Hi Alex,

Your approach is completely valid. What you want to achieve is a chain
between your state-managing operator and the consuming async operations.
That way, there is no serialization overhead between them.

To achieve that, you want to:
- use Flink 1.11+ [1]
- make sure that, if you have a legacy source, you break the chain right
before your state-managing operator (e.g. via disableChaining), as async I/O
cannot be (transitively) chained to legacy sources. So the topology should
be source -> ... -> (forward channel) -> (state-managing operator -> async1
-> async2 -> ...) ... -> sink
- call enableObjectReuse() [2] to avoid copying objects
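
A framework-free toy model in plain Java of why the last two points matter (this is not Flink code; Enriched and ToyChain are hypothetical names). Each record carries a possibly large config snapshot and passes through a chain of stages. With object reuse the same instance is handed from stage to stage; without it, the model deep-copies the record on every hand-off, which is the cost that enableObjectReuse() [2] avoids within a chain.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical record type: a payload plus a (potentially large) config snapshot.
final class Enriched {
    final String payload;
    final List<String> config;

    Enriched(String payload, List<String> config) {
        this.payload = payload;
        this.config = config;
    }

    // Deep copy: stands in for the copy made between chained operators
    // when object reuse is disabled.
    Enriched deepCopy() {
        return new Enriched(payload, new ArrayList<>(config));
    }
}

// Toy model of an operator chain; not Flink's implementation.
final class ToyChain {
    private final List<UnaryOperator<Enriched>> stages;
    private final boolean objectReuse;
    int copies = 0; // how many deep copies this chain has made so far

    ToyChain(List<UnaryOperator<Enriched>> stages, boolean objectReuse) {
        this.stages = stages;
        this.objectReuse = objectReuse;
    }

    // Passes the record through every stage, copying it on each
    // hand-off unless object reuse is enabled.
    Enriched run(Enriched in) {
        Enriched current = in;
        for (UnaryOperator<Enriched> stage : stages) {
            if (!objectReuse) {
                current = current.deepCopy();
                copies++;
            }
            current = stage.apply(current);
        }
        return current;
    }
}
```

With objectReuse set to true, copies stays at 0 and the original config instance comes out the other end; with false, the model makes one copy per stage. Records that cross a non-chained (network) boundary are serialized regardless of this setting, which is why the chain itself matters.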

[1] https://issues.apache.org/jira/browse/FLINK-16219
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/execution_configuration.html

On Thu, Apr 8, 2021 at 3:26 AM Alex Cruise <a...@cluonflux.com> wrote:

> Thanks Austin! I'll proceed with my idea, and keep the bootstrap config :)
>
> -0xe1a
>
> On Wed, Apr 7, 2021 at 2:18 PM Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:
>
>> Hey Alex,
>>
>> I'm not sure if there is a best practice here, but what I can tell you is
>> that I worked on a job that did exactly what you're suggesting with a
>> non-async operator to create a [record, config] tuple, which was then
>> passed to the async stage. Our config objects were also not tiny (~500 KB)
>> and our pipeline not huge (~1M records/day and 1 GB of data/day), but this
>> setup worked quite well. I'd say that if latency isn't your most important
>> metric, or if your pipeline is of a similar size, the ease of async I/O is
>> worth it.
>>
>> One thing you'll have to look out for (if you haven't already) is
>> bootstrapping the config objects when the job starts, since the broadcast
>> from the polling source can arrive after the first record does. We
>> solved this by calling the polling source's service in the `open()`
>> method of the non-async operator and storing the initial configs in memory.
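
The bootstrap trick can be sketched without Flink as below, assuming a hypothetical ConfigService that the polling source also calls. EnrichingOperator only mirrors the lifecycle of the non-async operator: open() runs once before any record and seeds the in-memory config, onConfig() stands in for handling a broadcast element, and process() tuples each record with the current config. None of these names are Flink APIs.

```java
import java.util.Map;

// Hypothetical stand-in for the service the polling source queries.
interface ConfigService {
    Map<String, String> fetchCurrent();
}

// Hypothetical output type: a record tupled with the config it should use.
final class WithConfig {
    final String record;
    final Map<String, String> config;

    WithConfig(String record, Map<String, String> config) {
        this.record = record;
        this.config = config;
    }
}

// Framework-free model of the non-async enriching operator; not a Flink class.
final class EnrichingOperator {
    private final ConfigService service;
    private Map<String, String> current; // latest known config snapshot

    EnrichingOperator(ConfigService service) {
        this.service = service;
    }

    // Mirrors open(): runs once before the first record and seeds the config
    // directly from the service, so early records never see "no config".
    void open() {
        current = service.fetchCurrent();
    }

    // Mirrors handling a broadcast element: a newer snapshot replaces the old one.
    void onConfig(Map<String, String> snapshot) {
        current = snapshot;
    }

    // Mirrors handling a data record: emit it paired with the current config.
    WithConfig process(String record) {
        if (current == null) {
            throw new IllegalStateException("open() must be called before process()");
        }
        return new WithConfig(record, current);
    }
}
```

In this model, onConfig() swaps the reference rather than mutating the old map, so a WithConfig emitted earlier keeps the snapshot that was current when it was created.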
>>
>> Hope that helps a bit,
>> Austin
>>
>> On Wed, Apr 7, 2021 at 4:39 PM Alex Cruise <a...@cluonflux.com> wrote:
>>
>>> Hi folks,
>>>
>>> I have a somewhat complex Flink job that has a few async stages, and a
>>> few stateful stages. It currently loads its configuration on startup, and
>>> doesn't attempt to refresh it.
>>>
>>> Now I'm working on dynamic reconfiguration. I've written a polling
>>> source which sends a configuration snapshot whenever anything has changed,
>>> I've set up a broadcast of that source, and I'm updating the operators in
>>> the data (i.e. not config) stream to be BroadcastProcessFunctions. But now
>>> I've reached the first async operator, and I recall that async functions
>>> aren't allowed to be stateful.
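
A polling source that only emits when something has changed can be modeled as below: each round fetches the full config, compares it with the last snapshot sent, and forwards it only if it differs. ChangeDetectingPoller and its fetch/emit callbacks are hypothetical; in a legacy Flink SourceFunction the emit step would correspond to SourceContext.collect feeding the broadcast stream.

```java
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Framework-free model of a change-detecting polling source; not a Flink class.
final class ChangeDetectingPoller {
    private final Supplier<Map<String, String>> fetch; // hypothetical full-config fetch
    private final Consumer<Map<String, String>> emit;  // forwards a snapshot downstream
    private Map<String, String> lastSent;               // null until the first emit

    ChangeDetectingPoller(Supplier<Map<String, String>> fetch,
                          Consumer<Map<String, String>> emit) {
        this.fetch = fetch;
        this.emit = emit;
    }

    // One polling round. Takes an immutable snapshot so later changes to the
    // fetched map cannot alter what was already sent, and emits only on change.
    // Returns true if a snapshot was emitted.
    boolean pollOnce() {
        Map<String, String> snapshot = Map.copyOf(fetch.get());
        if (Objects.equals(snapshot, lastSent)) {
            return false;
        }
        lastSent = snapshot;
        emit.accept(snapshot);
        return true;
    }
}
```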
>>>
>>> I've tried to find a best practice for this situation, without much
>>> luck. My best idea so far is to insert a new stage before the async one,
>>> which would tuple up each record with its corresponding config snapshot
>>> from the most recent broadcast state. This would increase the amount of
>>> data that needs to be serialized, and some of the configs are quite large,
>>> but would allow me to continue using async IO.
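
The proposed tuple-up stage keeps the async stage stateless, because everything it needs travels with the record. A plain-Java sketch of the consuming side, with CompletableFuture standing in for Flink's async I/O (Tupled, StatelessAsyncStage and the "endpoint" config key are hypothetical):

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical tuple produced by the non-async stage: record plus config snapshot.
final class Tupled {
    final String record;
    final Map<String, String> config;

    Tupled(String record, Map<String, String> config) {
        this.record = record;
        this.config = config;
    }
}

// Framework-free model of the async stage; not Flink's AsyncFunction.
final class StatelessAsyncStage {
    private final Executor executor;

    StatelessAsyncStage(Executor executor) {
        this.executor = executor;
    }

    // Holds no config state of its own: everything the call needs comes from
    // the tuple. The string concatenation is a placeholder for the real
    // asynchronous request.
    CompletableFuture<String> lookup(Tupled in) {
        String endpoint = in.config.getOrDefault("endpoint", "default");
        return CompletableFuture.supplyAsync(() -> in.record + "@" + endpoint, executor);
    }
}
```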
>>>
>>> Any suggestions?
>>>
>>> Thanks!
>>>
>>> -0xe1a
>>>
>>
