Thanks Arvid! I'm not completely clear on where to apply your suggestions.

I've included a sketch of my job below, and I have a couple of questions:

1. It looks like enableObjectReuse() is a global setting. Should I worry
about whether I'm passing any mutable data between stages?
2. Should I call disableChaining() on BOTH broadcast-dependent stages, or just
on the one immediately preceding the async stage?
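
A minimal plain-Kotlin sketch (no Flink involved) of the concern behind question 1: if a stage receives the same instance rather than a copy, a mutation made downstream is visible to whoever still holds the reference upstream. `MutableRecord`, `FrozenRecord`, `mutatingStage`, and `copyingStage` are hypothetical names used only for illustration; they are not part of the job.

```kotlin
// Hypothetical types for illustration only.
data class MutableRecord(var tenantId: Long, val tags: MutableList<String>)
data class FrozenRecord(val tenantId: Long, val tags: List<String>)

// Stands in for a downstream stage that is handed the same instance
// (no defensive copy in between) and mutates it in place.
fun mutatingStage(r: MutableRecord) {
  r.tenantId = -1L
  r.tags.add("seen")
}

// With immutable data, a stage has to build a new value instead.
fun copyingStage(r: FrozenRecord): FrozenRecord =
  r.copy(tags = r.tags + "seen")

fun main() {
  val shared = MutableRecord(42L, mutableListOf())
  mutatingStage(shared)
  println(shared) // the original holder now sees the mutated fields

  val frozen = FrozenRecord(42L, emptyList())
  val next = copyingStage(frozen)
  println(frozen) // unchanged
  println(next)   // carries the added tag
}
```

If every type passed between stages looks like `FrozenRecord` (only `val` properties and read-only collections), this kind of aliasing should not be observable.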

/** all the configs for a given tenant, as of the time when a change was
observed */
data class ConfigSnapshot(
  val tenantId: Long,
  val timestamp: Instant,
  val configs: Map<UUID, Config>
)

/** parse raw strings from input, rejecting those for unconfigured tenants */
class Parse(
  private val initialConfigs: Map<Long, ConfigSnapshot>
) : BroadcastProcessFunction<String, ConfigSnapshot, Record>() {
  override fun processBroadcastElement(
    value: ConfigSnapshot,
    ctx: Context,
    out: Collector<Record>
  ) {
    val snapshots = ctx.getBroadcastState(configSnapshotDescriptor)
    snapshots.put(value.tenantId, value)
  }

  override fun processElement(
    value: String,
    ctx: ReadOnlyContext,
    out: Collector<Record>
  ) {
    val validTenantIds = ctx.getBroadcastState(configSnapshotDescriptor)
      .ifEmpty { initialConfigs.keys }

    val parsed = Record(value)
    if (!validTenantIds.contains(parsed.tenantId)) {
      // reject: this tenant has no config
    } else {
      out.collect(parsed)
    }
  }
}

/** given a parsed record, identify which config(s) are interested in it,
and send an output value of the record tupled with the interested config */
class ValidateAndDistribute(
  private val initialConfigs: Map<Long, ConfigSnapshot>
) : BroadcastProcessFunction<Record, ConfigSnapshot, Pair<Record, Config>>() {
  override fun processBroadcastElement(
    value: ConfigSnapshot,
    ctx: Context,
    out: Collector<Pair<Record, Config>>
  ) {
    val snapshots = ctx.getBroadcastState(configSnapshotDescriptor)
    snapshots.put(value.tenantId, value)
  }

  override fun processElement(
    value: Record,
    ctx: ReadOnlyContext,
    out: Collector<Pair<Record, Config>>
  ) {
    val configsForThisTenant =
      .ifEmpty { initialConfigs }

    val configsInterestedInThisRecord = configsForThisTenant.values.filter

    for ((configId, config) in configsInterestedInThisRecord) {
      out.collect(value to config)
    }
  }
}

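The fallback-and-fan-out logic in processElement above can be illustrated with a plain-Kotlin sketch that uses ordinary maps in place of Flink's broadcast state. Everything here is an assumption made for illustration: the simplified `Record`, `Config`, and `ConfigSnapshot` shapes, the `kind`/`wantedKind` fields, and the interest predicate are hypothetical stand-ins, since the real types and filter are not shown in this message.

```kotlin
import java.util.UUID

// Hypothetical, simplified stand-ins for the job's types.
data class Record(val tenantId: Long, val kind: String)
data class Config(val wantedKind: String)
data class ConfigSnapshot(val tenantId: Long, val configs: Map<UUID, Config>)

// Use the broadcast snapshots if any have arrived; otherwise fall back to the
// bootstrap configs loaded at startup. Then emit one (record, config) pair
// per config that is interested in the record.
fun distribute(
  record: Record,
  broadcast: Map<Long, ConfigSnapshot>,
  initial: Map<Long, ConfigSnapshot>
): List<Pair<Record, Config>> {
  val snapshots = broadcast.ifEmpty { initial }
  val configsForThisTenant = snapshots[record.tenantId]?.configs ?: emptyMap()
  return configsForThisTenant.values
    .filter { it.wantedKind == record.kind } // assumed interest predicate
    .map { record to it }
}

fun main() {
  val initial = mapOf(
    1L to ConfigSnapshot(1L, mapOf(UUID.randomUUID() to Config("click")))
  )
  // No broadcast has arrived yet, so the bootstrap configs are consulted.
  println(distribute(Record(1L, "click"), emptyMap(), initial).size)
}
```

Note that, as in the sketch of the operator above, the fallback only applies while the broadcast side is completely empty; once any snapshot has arrived, tenants missing from it yield no output.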
/** given a pair of Record and Config, run the async operation and send an
enriched record including the result */
class Enrich() : RichAsyncFunction<Pair<Record, Config>, EnrichedRecord>

*Job Pseudocode:*

val initialConfigs: Map<Long, ConfigSnapshot> = ???
val dataSource: DataStream<String> = ???
// from a legacy "while (true) { poll; sleep }" source
val configSource: DataStream<ConfigSnapshot> = ???

// the config-subscribing operators keep the broadcast state in a
// Map<tenantId: Long, ConfigSnapshot>
val configSnapshotDescriptor = MapStateDescriptor(

// Broadcast the snapshots
val configBroadcast: BroadcastStream<ConfigSnapshot> =

val parsed: DataStream<Record> = dataSource

// input records can be duplicated now, as there may be multiple Configs
// that are interested in a record
val validated: DataStream<Pair<Record, Config>> = parsed

val enriched: DataStream<EnrichedRecord> = AsyncDataStream.unorderedWait(
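
For reference, one way the wiring above might be spelled out against Flink's Java DataStream API from Kotlin. This is a sketch under assumptions, not the actual job: the function name `wireJob` and the 30-second timeout are hypothetical, and the function is generic (C = ConfigSnapshot, R = Record, P = Pair<Record, Config>, E = EnrichedRecord) so that it does not depend on types not shown here. It deliberately takes no position on where disableChaining() belongs, since that is the open question.

```kotlin
import java.util.concurrent.TimeUnit
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.datastream.AsyncDataStream
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.async.AsyncFunction
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction

fun <C, R, P, E> wireJob(
  env: StreamExecutionEnvironment,
  dataSource: DataStream<String>,
  configSource: DataStream<C>,
  descriptor: MapStateDescriptor<Long, C>,
  parse: BroadcastProcessFunction<String, C, R>,
  validateAndDistribute: BroadcastProcessFunction<R, C, P>,
  enrich: AsyncFunction<P, E>
): DataStream<E> {
  // Set on the ExecutionConfig, so it applies to the whole job,
  // not only to the stage in front of the async operator.
  env.config.enableObjectReuse()

  // Broadcast the snapshots; the same broadcast stream feeds both stages.
  val configBroadcast: BroadcastStream<C> = configSource.broadcast(descriptor)

  val parsed = dataSource.connect(configBroadcast).process(parse)
  val validated = parsed.connect(configBroadcast).process(validateAndDistribute)
  // process(...) returns a SingleOutputStreamOperator, which is the type that
  // exposes disableChaining(); which stage (if any) needs it is not decided here.

  // The timeout value is an arbitrary placeholder.
  return AsyncDataStream.unorderedWait(validated, enrich, 30L, TimeUnit.SECONDS)
}
```

The descriptor passed in would be built once and shared with the operators, for example with the `MapStateDescriptor(name, keyClass, valueClass)` constructor; the state name is up to the job.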

On Wed, Apr 7, 2021 at 11:28 PM Arvid Heise <> wrote:

> Hi Alex,
> your approach is completely valid. What you want to achieve is that you
> have a chain between your state managing operator and the consuming async
> operations. In that way, you have no serialization overhead.
> To achieve that you want to
> - use Flink 1.11+ [1]
> - make sure that if you have a legacy source, you disableChaining before
> your state managing operator as asyncIO cannot be (transitively) chained to
> legacy sources. So it should be source -> ... -> (forward channel) ->
> (state managing operator -> async1 -> async2 -> ... ) ... -> sink
> - enableObjectReuse [2] to avoid copying of objects
> [1]
> [2]
> On Thu, Apr 8, 2021 at 3:26 AM Alex Cruise <> wrote:
>> Thanks Austin! I'll proceed with my idea, and keep the bootstrap config :)
>> -0xe1a
>> On Wed, Apr 7, 2021 at 2:18 PM Austin Cawley-Edwards <> wrote:
>>> Hey Alex,
>>> I'm not sure if there is a best practice here, but what I can tell you
>>> is that I worked on a job that did exactly what you're suggesting with a
>>> non-async operator to create a [record, config] tuple, which was then
>>> passed to the async stage. Our config objects were also not tiny (~500kb)
>>> and our pipeline not huge (~1M records/day and 1GB data/day), but this
>>> setup worked quite well. I'd say if latency isn't your most important
>>> metric, or if your pipeline is a similar size, the ease of async IO is
>>> worth it.
>>> One thing you'll have to look out for (if you haven't already) is
>>> bootstrapping the config objects when the job starts, since the broadcast
>>> from the polling source can happen later than receiving the first record –
>>> we solved this by calling the polling source's service in the `open()`
>>> method of the non-async operator and storing the initial configs in memory.
>>> Hope that helps a bit,
>>> Austin
>>> On Wed, Apr 7, 2021 at 4:39 PM Alex Cruise <> wrote:
>>>> Hi folks,
>>>> I have a somewhat complex Flink job that has a few async stages, and a
>>>> few stateful stages. It currently loads its configuration on startup, and
>>>> doesn't attempt to refresh it.
>>>> Now I'm working on dynamic reconfiguration. I've written a polling
>>>> source which sends a configuration snapshot whenever anything has changed,
>>>> I've set up a broadcast of that source, and I'm updating the operators in
>>>> the data (i.e. not config) stream to be BroadcastProcessFunctions. But now
>>>> I've reached the first async operator, and I recall that async functions
>>>> aren't allowed to be stateful.
>>>> I've tried to find a best practice for this situation, without much
>>>> luck. My best idea so far is to insert a new stage before the async one,
>>>> which would tuple up each record with its corresponding config snapshot
>>>> from the most recent broadcast state. This would increase the amount of
>>>> data that needs to be serialized, and some of the configs are quite large,
>>>> but would allow me to continue using async IO.
>>>> Any suggestions?
>>>> Thanks!
>>>> -0xe1a
