Hi Rob,

your approach looks good, but I haven't used iterations in streams yet. If
it works for you, it can't be bad anyways.

If it is indeed working as expected, I'd recommend checking out broadcasts
to distribute the rules [1]. This pattern will allow you to dynamically add
new rules via a special input source (like Kafka topic).
Also on datastream API level, you have the option to emit side-outputs [2].
That would be a natural choice for error handling.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/side_output.html

On Wed, Jul 8, 2020 at 2:28 AM Rob Shepherd <rgsheph...@gmail.com> wrote:

> Thank you for the excellent clarifications.
> I couldn't quite figure out how to map the above to my domain.
>
> Nevertheless i have a working demo that performs the following pseudo code:
>
> Let's say that each "channel" has slightly different stream requirements
> and we can look up the list of operations needed to be performed using a
> channel key.
> (an operation is our term for some element of processing, a FaaS call or
> local routine maybe)
>
> 1. extract channel key from incoming message
> 2. lookup channel info and enrich the stream object with channel info and
> a list of operations
> 3. i...n using the iterative stream API, loop around each operation in the
> list from (2).
> 4. sink
>
>
> https://gist.github.com/robshep/bf38b7753062e9d49d365e505e86385e#file-dynamicstramjob-java-L52
>
> I've some work to do to understand storing and retrieving state, as my
> demo just stores the loop-state in my main stream object - I don't know
> whether this is bad or bad-practice.
>
> I'd be really grateful if anyone can cast their eye on this little demo
> and see if there are any gotchas or pitfalls I'm likely to succumb to with
> this approach.
>
> With thanks for your help getting started
>
>
>
> Rob Shepherd BEng PhD
>
>
>
> On Tue, 7 Jul 2020 at 19:26, Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Rob,
>>
>> 1. When you start a flink application, you actually just execute a Java
>> main called the driver. This driver submits a job graph to the job manager,
>> which executes the job. Since the driver is an ordinary Java program that
>> uses the Flink API, you can compose the job graph in any way you want. Have
>> a look at one example to see what I mean [1]. It's not hard to imagine that
>> you can compose a query such as
>>
>> List<String> extractorQueries = new ArrayList<>();
>> Table table = tableEnvironment.from("testCatalog.`default`.testTable");
>> Table errors = tableEnvironment.fromValues();
>> for (int index = 0; index < extractorQueries.size(); index++) {
>>    String extractorQuery = extractorQueries.get(index);
>>    table = table.addColumns($(extractorQuery).as("extractedValue" + index, 
>> "error"));
>>    errors = errors.unionAll(table.filter($("error").isNotNull()));
>>    table = table.filter($("error").isNull()).dropColumns($("error"));
>> }
>> // write table and errors
>>
>> This query first loads the data from a testTable and then successively
>> applies sql expressions that calculate one value + one error column each.
>> The value is stored in extractedValue0...99 (assuming 100 extractor
>> queries). All values that cause errors, will have a value in the error
>> column set. These are collected in the table "errors" for side output (very
>> useful for debugging and improving the extractor queries). All good records
>> (error IS NULL) are retained for further processing and the error column
>> gets dropped.
>>
>> Btw there is also a Python entry point available, which offers you more
>> or less the same. I just haven't tried it yet. [2]
>>
>> Lastly, currently all extractors are executed in succession. Of course,
>> it is also possible to run them independently if you have different source
>> streams. You can then later join / union them.
>>
>> 2. The downside of this driver approach is that changes in the
>> configuration are not directly reflected. However, upon restart Flink will
>> adapt the changes and recover from the last checkpoint [3] (= snapshot of
>> the current processing state, which can be done every second in your case
>> as the state is rather small). So now you just need to find a way to force
>> a restart.
>>
>> One approach is to kill it manually and start again, but that's not
>> scaling well. However, Flink's fault tolerance feature can be somewhat
>> exploited: You can have one part of your program fail on config change,
>> which will restart the whole application automatically if configured
>> correctly and thus using the latest configuration.
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/StreamSQLExample.java#L77-L100
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/python/python_udfs.html
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/stateful-stream-processing.html#checkpointing
>>
>> On Tue, Jul 7, 2020 at 6:12 PM Rob Shepherd <rgsheph...@gmail.com> wrote:
>>
>>> Very helpful thank you Arvid.
>>>
>>> I've been reading up but I'm not sure I grasp all of that just yet.
>>> Please may I ask for clarification?
>>>
>>> 1. Could I summarise correctly that I may build a list of functions from
>>> an SQL call which can then be looped over?
>>> This looping sounds appealing and you are right that "1 or 100" is a big
>>> bonus.
>>>
>>> 2. "during the start of the application and restart to reflect changes"
>>> "during the start" do you mean when the job first boots, or immediately
>>> upon ingress of the data event from the queue?
>>> "restart" is this an API call to maybe abort an execution of a piece of
>>> data but with more up-to-date context.
>>>
>>>
>>> Trying to be a fast learner, and very grateful for the pointers.
>>>
>>> With thanks and best regards
>>>
>>> Rob
>>>
>>>
>>>
>>>
>>> Rob Shepherd BEng PhD
>>>
>>>
>>>
>>> On Tue, 7 Jul 2020 at 15:33, Arvid Heise <ar...@ververica.com> wrote:
>>>
>>>> Hi Rob,
>>>>
>>>> In the past I used a mixture of configuration and template queries to
>>>> achieve a similar goal (I had only up to 150 of these jobs per
>>>> application). My approach was not completely dynamic as you have described
>>>> but rather to compose a big query from a configuration during the start of
>>>> the application and restart to reflect changes.
>>>>
>>>> For the simple extractor/mapper, I'd use Table API and plug in SQL
>>>> statements [1] that could be easily given by experienced
>>>> end-users/analysts. Abort logic should be added programmatically to each of
>>>> the extractor/mapper through Table API (for example, extractor can output
>>>> an error column that also gives an explanation and this column is then
>>>> checked for non-null). The big advantage of using Table API over a simple
>>>> SQL query is that you can add structural variance: your application may use
>>>> 1 extractor or 100; it's just a matter of a loop.
>>>>
>>>> Note that async IO is currently not available in Table API, but you can
>>>> easily switch back and forth between Table API and Datastream. I'd
>>>> definitely suggest to use async IO for your described use cases.
>>>>
>>>> So please consider to also use that less dynamic approach; you'd get
>>>> much for free: SQL support with proper query validation and meaningful
>>>> error messages. And it's also much easier to test/debug.
>>>>
>>>>
>>>>
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/common.html#sql
>>>>
>>>> On Tue, Jul 7, 2020 at 4:01 PM Rob Shepherd <rgsheph...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> It'd be great to consider stream processing as a platform for our
>>>>> upcoming projects. Flink seems to be the closeted match.
>>>>>
>>>>> However we have numerous stream processing workloads and would want to
>>>>> be able to scale up to 1000's different streams;  each quite similar in
>>>>> structure/sequence but with the functional logic being very different in
>>>>> each.
>>>>>
>>>>> For example, there is always a "validate" stage - but what that means
>>>>> is dependant on the client/data/context etc and would typically map to a
>>>>> few line of script to perform.
>>>>>
>>>>> In essence, our sequences can often be deconstructed down to 8-12
>>>>> python snippets and the serverless/functional paradigm seems to fit well.
>>>>>
>>>>> Whilst we can deploy our functions readily to a faas/k8s or something
>>>>> (which seems to fit the bill with remote functions) I don't yet see how to
>>>>> quickly draw these together in a dynamic stream.
>>>>>
>>>>> My initial thoughts would be to create a very general purpose stream
>>>>> job that then works through the context of mapping functions to flink 
>>>>> tasks
>>>>> based on the client dataset.
>>>>>
>>>>> E.g. some pseudo code:
>>>>>
>>>>> ingress()
>>>>> extract()
>>>>> getDynamicStreamFunctionDefs()
>>>>> getFunction1()
>>>>> runFunction1()
>>>>> abortOnError()
>>>>> getFunction2()
>>>>> runFunction2()
>>>>> abortOnError()
>>>>> ...
>>>>> getFunction10()
>>>>> runFunction10()
>>>>> sinkData()
>>>>>
>>>>> Most functions are not however simple lexical operations, or
>>>>> extractors/mappers - but on the whole require a few database/API calls to
>>>>> retrieve things like previous data, configurations etc.
>>>>>
>>>>> They are not necessarily long running but certainly Async is a
>>>>> consideration.
>>>>>
>>>>> I think every stage will be UDFs (and then Meta-UDFs at that)
>>>>>
>>>>> As a result I'm not sure if we can get this to fit without a brittle
>>>>> set of workarounds, and ruin any benefit of running through flink etc...
>>>>> but it would great to hear opinions of others who might have tackled
>>>>> this kind of dynamic tasking.
>>>>>
>>>>>
>>>>> I'm happy to explain this better if it isn't clear.
>>>>>
>>>>> With best regards
>>>>>
>>>>> Rob
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Rob Shepherd BEng PhD
>>>>>
>>>>>
>>>>
>>>> --
>>>>
>>>> Arvid Heise | Senior Java Developer
>>>>
>>>> <https://www.ververica.com/>
>>>>
>>>> Follow us @VervericaData
>>>>
>>>> --
>>>>
>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>>> Conference
>>>>
>>>> Stream Processing | Event Driven | Real Time
>>>>
>>>> --
>>>>
>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>
>>>> --
>>>> Ververica GmbH
>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>>>> (Toni) Cheng
>>>>
>>>
>>
>> --
>>
>> Arvid Heise | Senior Java Developer
>>
>> <https://www.ververica.com/>
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Toni) Cheng
>>
>

-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Reply via email to