Hi Rob,

Maybe I am quite late to the party, but I think it might be worth having
a look at the Stateful Functions API [1] as well. Your latest approach in
particular reminds me a bit of the architecture of Stateful Functions:
there you can have arbitrary routing to functions, and you can also
delegate the execution of some functions to external services.

[1]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/concepts/distributed_architecture.html
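
For what it's worth, a minimal sketch of an embedded function with the
StateFun 2.1 Java SDK (the function types, the message key and the routing
target below are placeholders I made up; remote functions would instead be
served over HTTP, e.g. with the Python SDK):

import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.StatefulFunction;

public class ValidateFunction implements StatefulFunction {

    public static final FunctionType TYPE = new FunctionType("example", "validate");

    @Override
    public void invoke(Context context, Object input) {
        // Forward the message to the next (hypothetical) function in the pipeline.
        // The target can be chosen per message, which gives the arbitrary routing.
        context.send(new FunctionType("example", "enrich"), "message-key", input);
    }
}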

Best,

Dawid

On 09/07/2020 10:17, Arvid Heise wrote:
> Hi Rob,
>
> Your approach looks good, but I haven't used iterations in streams
> yet. If it works for you, it can't be bad anyway.
>
> If it is indeed working as expected, I'd recommend checking out
> broadcast state to distribute the rules [1]. This pattern allows you to
> dynamically add new rules via a special input source (like a Kafka topic).
> Also, at the DataStream API level, you have the option to emit side
> outputs [2]. That would be a natural choice for error handling. A rough
> sketch of both follows below the links.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/side_output.html
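>
> A minimal sketch of both patterns together (Flink 1.11 DataStream API,
> imports omitted; it assumes two DataStream<String> inputs called "events"
> and "rules", and the key selector and rule name are illustrative only):
>
> MapStateDescriptor<String, String> rulesDescriptor =
>     new MapStateDescriptor<>("rules", Types.STRING, Types.STRING);
> OutputTag<String> errorTag = new OutputTag<String>("errors") {};
>
> BroadcastStream<String> broadcastRules = rules.broadcast(rulesDescriptor);
>
> SingleOutputStreamOperator<String> processed = events
>     .keyBy(event -> event)  // illustrative key selector
>     .connect(broadcastRules)
>     .process(new KeyedBroadcastProcessFunction<String, String, String, String>() {
>         @Override
>         public void processElement(String event, ReadOnlyContext ctx,
>                 Collector<String> out) throws Exception {
>             String rule = ctx.getBroadcastState(rulesDescriptor).get("validate");
>             if (rule == null) {
>                 // error handling via side output
>                 ctx.output(errorTag, "no rule configured for: " + event);
>             } else {
>                 out.collect(event + " checked against " + rule);
>             }
>         }
>
>         @Override
>         public void processBroadcastElement(String rule, Context ctx,
>                 Collector<String> out) throws Exception {
>             // a dynamically added/updated rule arriving from the special input source
>             ctx.getBroadcastState(rulesDescriptor).put("validate", rule);
>         }
>     });
>
> DataStream<String> errors = processed.getSideOutput(errorTag);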
>
> On Wed, Jul 8, 2020 at 2:28 AM Rob Shepherd <rgsheph...@gmail.com> wrote:
>
>     Thank you for the excellent clarifications.
>     I couldn't quite figure out how to map the above to my domain.
>
>     Nevertheless I have a working demo that performs the following
>     pseudo code:
>
>     Let's say that each "channel" has slightly different stream
>     requirements,
>     and we can look up the list of operations that need to be performed
>     using a channel key.
>     (An operation is our term for some element of processing - a FaaS
>     call or a local routine, maybe.)
>
>     1. extract the channel key from the incoming message
>     2. look up the channel info and enrich the stream object with the
>     channel info and a list of operations
>     3. (i...n) using the iterative stream API, loop over each
>     operation in the list from (2) - sketched below
>     4. sink
>
>     https://gist.github.com/robshep/bf38b7753062e9d49d365e505e86385e#file-dynamicstramjob-java-L52
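>
>     The loop in step 3 is roughly the following shape (an illustrative
>     sketch only, not the gist code; the Job type, applyNextOperation()
>     and hasMoreOperations() are hypothetical names, imports omitted):
>
>     IterativeStream<Job> loop = enrichedStream.iterate();
>
>     // run the next operation for this job and advance its operation index
>     DataStream<Job> stepped = loop.map(job -> applyNextOperation(job));
>
>     // jobs with operations left are fed back into the loop
>     loop.closeWith(stepped.filter(job -> job.hasMoreOperations()));
>
>     // jobs that have completed all operations leave the loop
>     DataStream<Job> finished = stepped.filter(job -> !job.hasMoreOperations());
>     finished.print();  // placeholder sink
>
>     (One caveat I've seen mentioned: Flink's checkpointing guarantees don't
>     fully cover records in flight on iteration edges, so I need to check how
>     that interacts with my state handling.)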
>
>     I've some work to do to understand storing and retrieving state,
>     as my demo just stores the loop state in my main stream object - I
>     don't know whether this is wrong or just bad practice.
>
>     I'd be really grateful if anyone can cast their eye on this little
>     demo and see if there are any gotchas or pitfalls I'm likely to
>     succumb to with this approach.
>
>     With thanks for your help getting started
>
>
>
>     Rob Shepherd BEng PhD
>
>
>
>     On Tue, 7 Jul 2020 at 19:26, Arvid Heise <ar...@ververica.com> wrote:
>
>         Hi Rob,
>
>         1. When you start a Flink application, you actually just
>         execute a Java main called the driver. This driver submits a
>         job graph to the job manager, which executes the job. Since
>         the driver is an ordinary Java program that uses the Flink
>         API, you can compose the job graph in any way you want. Have a
>         look at one example to see what I mean [1]. It's not hard to
>         imagine that you can compose a query such as:
>
>         List<String> extractorQueries = new ArrayList<>();
>         Table table = tableEnvironment.from("testCatalog.`default`.testTable");
>         Table errors = tableEnvironment.fromValues();
>         for (int index = 0; index < extractorQueries.size(); index++) {
>             String extractorQuery = extractorQueries.get(index);
>             table = table.addColumns($(extractorQuery).as("extractedValue" + index, "error"));
>             errors = errors.unionAll(table.filter($("error").isNotNull()));
>             table = table.filter($("error").isNull()).dropColumns($("error"));
>         }
>         // write table and errors
>
>         This query first loads the data from testTable and then
>         successively applies SQL expressions that each calculate one
>         value plus one error column. The values are stored in
>         extractedValue0...99 (assuming 100 extractor queries). All
>         records that cause errors will have the error column set;
>         these are collected in the table "errors" for side output
>         (very useful for debugging and improving the extractor
>         queries). All good records (error IS NULL) are retained for
>         further processing and the error column is dropped.
>
>         Btw there is also a Python entry point available, which offers
>         you more or less the same. I just haven't tried it yet. [2]
>
>         Lastly, currently all extractors are executed in succession.
>         Of course, it is also possible to run them independently if
>         you have different source streams. You can then later join /
>         union them.
>
>         2. The downside of this driver approach is that changes to the
>         configuration are not reflected immediately. However, upon
>         restart Flink will pick up the changes and recover from the
>         last checkpoint [3] (= a snapshot of the current processing
>         state, which can be taken every second in your case as the
>         state is rather small). So you just need to find a way to
>         force a restart.
>
>         One approach is to kill it manually and start it again, but
>         that doesn't scale well. However, Flink's fault-tolerance
>         feature can be somewhat exploited: you can have one part of
>         your program fail on a config change, which, if configured
>         correctly, restarts the whole application automatically so
>         that it uses the latest configuration.
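>
>         A rough sketch of that trick (env is the StreamExecutionEnvironment;
>         MyConfig, Event and applyConfig() are hypothetical names, imports
>         omitted; the checkpoint interval and restart strategy values are
>         just examples):
>
>         env.enableCheckpointing(1000);  // snapshot the (small) state every second
>         env.setRestartStrategy(
>             RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, Time.seconds(10)));
>
>         stream.map(new RichMapFunction<Event, Event>() {
>             private transient MyConfig config;
>
>             @Override
>             public void open(Configuration parameters) {
>                 config = MyConfig.load();  // re-read the configuration on every (re)start
>             }
>
>             @Override
>             public Event map(Event event) {
>                 if (MyConfig.currentVersion() > config.version()) {
>                     // deliberately fail; Flink restarts from the last checkpoint
>                     // and open() runs again with the new configuration
>                     throw new IllegalStateException("configuration changed, forcing restart");
>                 }
>                 return applyConfig(config, event);
>             }
>         });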
>
>         [1] https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/StreamSQLExample.java#L77-L100
>         [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/python/python_udfs.html
>         [3] https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/stateful-stream-processing.html#checkpointing
>
>         On Tue, Jul 7, 2020 at 6:12 PM Rob Shepherd <rgsheph...@gmail.com> wrote:
>
>             Very helpful thank you Arvid.
>
>             I've been reading up but I'm not sure I grasp all of that
>             just yet.  Please may I ask for clarification?
>
>             1. Could I summarise correctly that I may build a list of
>             functions from an SQL call which can then be looped over?
>             This looping sounds appealing and you are right that "1 or
>             100" is a big bonus.
>
>             2. "during the start of the application and restart to
>             reflect changes"
>             "during the start" - do you mean when the job first boots,
>             or immediately upon ingress of the data event from the queue?
>             "restart" - is this an API call to perhaps abort an execution
>             of a piece of data but with more up-to-date context?
>
>
>             Trying to be a fast learner, and very grateful for the
>             pointers.
>
>             With thanks and best regards
>
>             Rob
>
>
>
>
>             Rob Shepherd BEng PhD
>
>
>
>             On Tue, 7 Jul 2020 at 15:33, Arvid Heise <ar...@ververica.com> wrote:
>
>                 Hi Rob,
>
>                 In the past I used a mixture of configuration and
>                 template queries to achieve a similar goal (I had only
>                 up to 150 of these jobs per application). My approach
>                 was not completely dynamic in the way you have
>                 described; rather, I composed a big query from a
>                 configuration during the start of the application and
>                 restarted to reflect changes.
>
>                 For the simple extractors/mappers, I'd use the Table
>                 API and plug in SQL statements [1] that could easily
>                 be provided by experienced end-users/analysts. Abort
>                 logic should be added programmatically to each
>                 extractor/mapper through the Table API (for example,
>                 an extractor can output an error column that also
>                 gives an explanation, and this column is then checked
>                 for being non-null). The big advantage of the Table
>                 API over a plain SQL query is that you can add
>                 structural variance: your application may use 1
>                 extractor or 100; it's just a matter of a loop.
>
>                 Note that async I/O is currently not available in the
>                 Table API, but you can easily switch back and forth
>                 between Table API and DataStream. I'd definitely
>                 suggest using async I/O for the use cases you
>                 described.
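>
>                 A sketch of that switch (imports omitted; lookupAsync(),
>                 the field names and the timeout values are made up for
>                 illustration):
>
>                 DataStream<Row> rows =
>                     tableEnvironment.toAppendStream(table, Row.class);
>
>                 DataStream<Tuple2<String, String>> enriched =
>                     AsyncDataStream.unorderedWait(
>                         rows,
>                         new RichAsyncFunction<Row, Tuple2<String, String>>() {
>                             @Override
>                             public void asyncInvoke(Row row,
>                                     ResultFuture<Tuple2<String, String>> result) {
>                                 String id = (String) row.getField(0);
>                                 // hypothetical non-blocking client call
>                                 // returning a CompletableFuture<String>
>                                 lookupAsync(id).thenAccept(value ->
>                                     result.complete(
>                                         Collections.singleton(Tuple2.of(id, value))));
>                             }
>                         },
>                         1, TimeUnit.SECONDS, 100);  // timeout, max in-flight requests
>
>                 Table enrichedTable =
>                     tableEnvironment.fromDataStream(enriched, $("id"), $("lookedUp"));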
>
>                 So please also consider that less dynamic approach;
>                 you'd get a lot for free: SQL support with proper
>                 query validation and meaningful error messages. It's
>                 also much easier to test/debug.
>
>
>                 [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/common.html#sql
>
>                 On Tue, Jul 7, 2020 at 4:01 PM Rob Shepherd <rgsheph...@gmail.com> wrote:
>
>                     Hi All,
>
>                     It'd be great to consider stream processing as a
>                     platform for our upcoming projects. Flink seems to
>                     be the closest match.
>
>                     However, we have numerous stream processing
>                     workloads and would want to be able to scale up to
>                     thousands of different streams, each quite similar
>                     in structure/sequence but with very different
>                     functional logic in each.
>
>                     For example, there is always a "validate" stage,
>                     but what that means is dependent on the
>                     client/data/context etc. and would typically map
>                     to a few lines of script.
>
>                     In essence, our sequences can often be
>                     deconstructed into 8-12 Python snippets, and the
>                     serverless/functional paradigm seems to fit well.
>
>                     Whilst we can readily deploy our functions to a
>                     FaaS/k8s platform or similar (which seems to fit
>                     the bill with remote functions), I don't yet see
>                     how to quickly draw these together into a dynamic
>                     stream.
>
>                     My initial thought would be to create a very
>                     general-purpose stream job that then works through
>                     the context of mapping functions to Flink tasks
>                     based on the client dataset.
>
>                     E.g. some pseudo code:
>
>                     ingress()
>                     extract()
>                     getDynamicStreamFunctionDefs()
>                     getFunction1()
>                     runFunction1()
>                     abortOnError()
>                     getFunction2()
>                     runFunction2()
>                     abortOnError()
>                     ...
>                     getFunction10()
>                     runFunction10()
>                     sinkData()
>
>                     Most functions are not, however, simple lexical
>                     operations or extractors/mappers; on the whole
>                     they require a few database/API calls to retrieve
>                     things like previous data, configurations, etc.
>
>                     They are not necessarily long-running, but async
>                     is certainly a consideration.
>
>                     I think every stage will be UDFs (and then
>                     Meta-UDFs at that)
>
>                     As a result I'm not sure whether we can get this
>                     to fit without a brittle set of workarounds that
>                     ruin any benefit of running through Flink, but it
>                     would be great to hear the opinions of others who
>                     might have tackled this kind of dynamic tasking.
>
>
>                     I'm happy to explain this better if it isn't clear.
>
>                     With best regards
>
>                     Rob
>
>
>
>
>                     Rob Shepherd BEng PhD
>
>
>
>
>
>
>
> -- 
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache
> Flink Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
>
> Ververica GmbH, Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason,
> Ji (Toni) Cheng
