Hi Rob,

Maybe I am quite late to the party, but I think it might be worth having a look at the Stateful Functions API [1] as well. Your latest approach in particular reminds me a bit of the architecture of Stateful Functions: there you can have arbitrary routing to functions, and you can also delegate the execution of some functions to external services.
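Just to illustrate the idea, a minimal sketch of a function with the embedded Java SDK could look roughly like this (the function types, ids and message type below are made-up placeholders, not taken from your design):

import org.apache.flink.statefun.sdk.Address;
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.StatefulFunction;

public class ValidateFunction implements StatefulFunction {

    // Every function has a logical type; routers address messages to (type, id) pairs.
    public static final FunctionType TYPE = new FunctionType("example", "validate");
    public static final FunctionType NEXT = new FunctionType("example", "enrich");

    @Override
    public void invoke(Context context, Object input) {
        // 'input' would be whatever your router extracted from the ingress record.
        String message = (String) input;

        // ... perform the channel-specific validation here ...

        // Forward to the next stage; reusing the id keeps all messages of one channel together.
        context.send(new Address(NEXT, context.self().id()), message);
    }
}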
[1] https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/concepts/distributed_architecture.html

Best,
Dawid

On 09/07/2020 10:17, Arvid Heise wrote:

Hi Rob,

your approach looks good, but I haven't used iterations in streams yet. If it works for you, it can't be bad anyway.

If it is indeed working as expected, I'd recommend checking out broadcast state to distribute the rules [1]. This pattern will allow you to dynamically add new rules via a special input source (like a Kafka topic). Also, at the DataStream API level you have the option to emit side outputs [2]. That would be a natural choice for error handling.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/side_output.html

On Wed, Jul 8, 2020 at 2:28 AM Rob Shepherd <rgsheph...@gmail.com> wrote:

Thank you for the excellent clarifications. I couldn't quite figure out how to map the above to my domain. Nevertheless I have a working demo that performs the following pseudo code.

Let's say that each "channel" has slightly different stream requirements, and we can look up the list of operations that need to be performed using a channel key (an "operation" is our term for some element of processing - a FaaS call or a local routine, maybe).

1. extract the channel key from the incoming message
2. look up the channel info and enrich the stream object with the channel info and a list of operations
3. (i...n) using the iterative stream API, loop over each operation in the list from (2)
4. sink

https://gist.github.com/robshep/bf38b7753062e9d49d365e505e86385e#file-dynamicstramjob-java-L52

I still have some work to do to understand storing and retrieving state, as my demo just stores the loop state in my main stream object - I don't know whether this is wrong or simply bad practice.

I'd be really grateful if anyone could cast their eye over this little demo and see if there are any gotchas or pitfalls I'm likely to succumb to with this approach.

With thanks for your help getting started

Rob Shepherd BEng PhD

On Tue, 7 Jul 2020 at 19:26, Arvid Heise <ar...@ververica.com> wrote:

Hi Rob,

1. When you start a Flink application, you actually just execute a Java main method, called the driver. This driver submits a job graph to the job manager, which executes the job. Since the driver is an ordinary Java program that uses the Flink API, you can compose the job graph in any way you want. Have a look at one example to see what I mean [1]. It's not hard to imagine that you can compose a query such as:

List<String> extractorQueries = new ArrayList<>();
Table table = tableEnvironment.from("testCatalog.`default`.testTable");
Table errors = tableEnvironment.fromValues();
for (int index = 0; index < extractorQueries.size(); index++) {
    String extractorQuery = extractorQueries.get(index);
    table = table.addColumns($(extractorQuery).as("extractedValue" + index, "error"));
    errors = errors.unionAll(table.filter($("error").isNotNull()));
    table = table.filter($("error").isNull()).dropColumns($("error"));
}
// write table and errors

This query first loads the data from testTable and then successively applies SQL expressions that each calculate one value plus one error column. The values are stored in extractedValue0...99 (assuming 100 extractor queries).
All records that cause errors will have a value set in the error column. These are collected in the table "errors" for side output (very useful for debugging and improving the extractor queries). All good records (error IS NULL) are retained for further processing and the error column gets dropped.

Btw, there is also a Python entry point available, which offers you more or less the same; I just haven't tried it yet [2].

Lastly, currently all extractors are executed in succession. Of course, it is also possible to run them independently if you have different source streams. You can then later join / union them.

2. The downside of this driver approach is that changes in the configuration are not directly reflected. However, upon restart Flink will pick up the changes and recover from the last checkpoint [3] (= a snapshot of the current processing state, which can be taken every second in your case as the state is rather small). So now you just need to find a way to force a restart.

One approach is to kill the job manually and start it again, but that doesn't scale well. However, Flink's fault-tolerance feature can be somewhat exploited: you can have one part of your program fail on a config change, which will restart the whole application automatically (if configured correctly) and thus pick up the latest configuration.

[1] https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/StreamSQLExample.java#L77-L100
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/python/python_udfs.html
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/stateful-stream-processing.html#checkpointing

On Tue, Jul 7, 2020 at 6:12 PM Rob Shepherd <rgsheph...@gmail.com> wrote:

Very helpful, thank you Arvid.

I've been reading up, but I'm not sure I grasp all of that just yet. Please may I ask for clarification?

1. Could I summarise correctly that I may build a list of functions from an SQL call which can then be looped over? This looping sounds appealing, and you are right that "1 or 100" is a big bonus.

2. Regarding "during the start of the application and restart to reflect changes": by "during the start", do you mean when the job first boots, or immediately upon ingress of the data event from the queue? And by "restart", is this an API call to maybe abort an execution of a piece of data but with more up-to-date context?

Trying to be a fast learner, and very grateful for the pointers.

With thanks and best regards

Rob

Rob Shepherd BEng PhD

On Tue, 7 Jul 2020 at 15:33, Arvid Heise <ar...@ververica.com> wrote:

Hi Rob,

In the past I used a mixture of configuration and template queries to achieve a similar goal (I had only up to 150 of these jobs per application). My approach was not completely dynamic as you have described, but rather to compose a big query from a configuration during the start of the application and restart to reflect changes.

For the simple extractors/mappers, I'd use the Table API and plug in SQL statements [1] that could easily be supplied by experienced end users/analysts.
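To make the "plug in SQL statements" idea concrete, here is a minimal sketch; the view, column names and the validation expression are invented for the example, only the general pattern of splicing a user-supplied snippet into a fixed template matters:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class TemplateQueryJob {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inStreamingMode().build());

        // Stand-in source so the sketch is self-contained; in a real job this
        // would be a Kafka or catalog table registered by the driver.
        tableEnv.executeSql(
            "CREATE TEMPORARY VIEW sourceTable AS "
                + "SELECT * FROM (VALUES ('order-1', 10), ('order-2', -3)) AS t(id, amount)");

        // An analyst-supplied snippet, e.g. loaded from configuration per channel.
        String validateExpression =
            "CASE WHEN amount >= 0 THEN CAST(NULL AS STRING) ELSE 'negative amount' END AS error";

        // The surrounding query is a fixed template; only the snippet varies.
        Table withError = tableEnv.sqlQuery(
            "SELECT *, " + validateExpression + " FROM sourceTable");

        // Rows with a non-null error column could be routed to an error sink;
        // the rest would continue through the pipeline.
        withError.execute().print();
    }
}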
Abort logic should be added programmatically to each of the extractors/mappers through the Table API (for example, an extractor can output an error column that also gives an explanation, and this column is then checked for non-null). The big advantage of using the Table API over a simple SQL query is that you can add structural variance: your application may use 1 extractor or 100; it's just a matter of a loop.

Note that async I/O is currently not available in the Table API, but you can easily switch back and forth between the Table API and the DataStream API. I'd definitely suggest using async I/O for the use cases you describe.

So please also consider that less dynamic approach; you'd get a lot for free: SQL support with proper query validation and meaningful error messages. And it's also much easier to test/debug.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/common.html#sql

On Tue, Jul 7, 2020 at 4:01 PM Rob Shepherd <rgsheph...@gmail.com> wrote:

Hi All,

It'd be great to consider stream processing as a platform for our upcoming projects. Flink seems to be the closest match.

However, we have numerous stream processing workloads and would want to be able to scale up to 1000s of different streams, each quite similar in structure/sequence but with very different functional logic.

For example, there is always a "validate" stage - but what that means is dependent on the client/data/context etc. and would typically map to a few lines of script.

In essence, our sequences can often be deconstructed down to 8-12 Python snippets, so the serverless/functional paradigm seems to fit well.

Whilst we can readily deploy our functions to FaaS/k8s or similar (which seems to fit the bill with remote functions), I don't yet see how to quickly draw these together into a dynamic stream.

My initial thought would be to create a very general-purpose stream job that then works through the context of mapping functions to Flink tasks based on the client dataset.

E.g. some pseudo code:

ingress()
extract()
getDynamicStreamFunctionDefs()
getFunction1()
runFunction1()
abortOnError()
getFunction2()
runFunction2()
abortOnError()
...
getFunction10()
runFunction10()
sinkData()

Most functions are not, however, simple lexical operations or extractors/mappers - on the whole they require a few database/API calls to retrieve things like previous data, configurations etc. They are not necessarily long-running, but async is certainly a consideration (see the sketch below).

I think every stage will be UDFs (and then meta-UDFs at that).

As a result I'm not sure if we can get this to fit without a brittle set of workarounds that ruin the benefit of running through Flink etc., but it would be great to hear the opinions of others who might have tackled this kind of dynamic tasking.

I'm happy to explain this better if it isn't clear.
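To sketch what one of those runFunctionN() stages might look like as an async call to an externally hosted function, something along these lines with Flink's async I/O operator seems plausible (the endpoint URL, class names and plain HTTP call are illustrative assumptions only; error and timeout handling omitted for brevity):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class AsyncFunctionStageJob {

    /** Calls one externally hosted "operation" (e.g. a FaaS endpoint) per record. */
    public static class RemoteOperation extends RichAsyncFunction<String, String> {

        private final String endpoint;
        private transient HttpClient client;

        public RemoteOperation(String endpoint) {
            this.endpoint = endpoint;
        }

        @Override
        public void open(Configuration parameters) {
            client = HttpClient.newHttpClient();
        }

        @Override
        public void asyncInvoke(String record, ResultFuture<String> resultFuture) {
            HttpRequest request = HttpRequest.newBuilder(URI.create(endpoint))
                .POST(HttpRequest.BodyPublishers.ofString(record))
                .build();
            // Non-blocking call; the result is handed back to Flink when it arrives.
            client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
                .thenAccept(response -> resultFuture.complete(
                    Collections.singleton(response.body())));
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in source; in practice this would be the ingress (e.g. a Kafka topic).
        DataStream<String> ingress = env.fromElements("event-1", "event-2");

        // One async stage per operation; the endpoint URL is a made-up placeholder.
        DataStream<String> validated = AsyncDataStream.unorderedWait(
            ingress, new RemoteOperation("http://validate.example.com/run"),
            5, TimeUnit.SECONDS, 100);

        validated.print();
        env.execute("async stage sketch");
    }
}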
With best regards

Rob

Rob Shepherd BEng PhD