Thanks Arvid! I'm not completely clear on where to apply your suggestions. I've included a sketch of my job below, and I have a couple of questions:
1. It looks like enableObjectReuse() is a global setting; should I worry about whether I'm passing any mutable data between stages?
2. Should I disableChaining() on BOTH broadcast-dependent stages, or just the one immediately preceding the async stage?

Thanks!

-0xe1a

*Types:*

import java.time.Instant
import java.util.UUID
import java.util.concurrent.TimeUnit
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.datastream.AsyncDataStream
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.functions.async.ResultFuture
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.util.Collector

/** All the configs for a given tenant, as of the time when a change was observed */
data class ConfigSnapshot(
    val tenantId: Long,
    val timestamp: Instant,
    val configs: Map<UUID, Config>
)

/** Parse raw strings from input, rejecting those for unconfigured tenants */
class Parse(
    private val initialConfigs: Map<Long, ConfigSnapshot>
) : BroadcastProcessFunction<String, ConfigSnapshot, Record>() {

    override fun processBroadcastElement(
        value: ConfigSnapshot,
        ctx: Context,
        out: Collector<Record>
    ) {
        // Keep only the latest snapshot per tenant
        val snapshots = ctx.getBroadcastState(configSnapshotDescriptor)
        snapshots.put(value.tenantId, value)
    }

    override fun processElement(value: String, ctx: ReadOnlyContext, out: Collector<Record>) {
        val parsed = Record(value)
        // A tenant counts as configured if a snapshot has been broadcast for it,
        // or if it was in the bootstrap configs (fallback is per tenant, not all-or-nothing)
        val isConfigured =
            ctx.getBroadcastState(configSnapshotDescriptor).contains(parsed.tenantId) ||
                parsed.tenantId in initialConfigs
        if (isConfigured) {
            out.collect(parsed)
        }
    }
}

/**
 * Given a parsed record, identify which config(s) are interested in it, and emit
 * the record tupled with each interested config
 */
class ValidateAndDistribute(
    private val initialConfigs: Map<Long, ConfigSnapshot>
) : BroadcastProcessFunction<Record, ConfigSnapshot, Pair<Record, Config>>() {

    override fun processBroadcastElement(
        value: ConfigSnapshot,
        ctx: Context,
        out: Collector<Pair<Record, Config>>
    ) {
        val snapshots = ctx.getBroadcastState(configSnapshotDescriptor)
        snapshots.put(value.tenantId, value)
    }

    override fun processElement(
        value: Record,
        ctx: ReadOnlyContext,
        out: Collector<Pair<Record, Config>>
    ) {
        // Prefer the latest broadcast snapshot; fall back to this tenant's bootstrap snapshot
        val snapshot = ctx.getBroadcastState(configSnapshotDescriptor).get(value.tenantId)
            ?: initialConfigs[value.tenantId]
        val configsForThisTenant = snapshot?.configs.orEmpty()

        // One output per interested config, so a record may be emitted several times
        for (config in configsForThisTenant.values) {
            if (config.interestedIn(value)) {
                out.collect(value to config)
            }
        }
    }
}

/** Given a pair of Record and Config, run the async operation and emit an enriched record including the result */
class Enrich : RichAsyncFunction<Pair<Record, Config>, EnrichedRecord>() {
    override fun asyncInvoke(input: Pair<Record, Config>, resultFuture: ResultFuture<EnrichedRecord>) {
        // ... start the async call; when it finishes, call resultFuture.complete(listOf(enrichedRecord))
    }
}

*Job Pseudocode:*

val initialConfigs: Map<Long, ConfigSnapshot> = TODO()
val dataSource: DataStream<String> = TODO()
val configSource: DataStream<ConfigSnapshot> = TODO() // from a legacy "while (true) { poll; sleep }" source

// The config-subscribing operators keep the broadcast state in a Map<tenantId: Long, ConfigSnapshot>.
// Declared at top level so Parse and ValidateAndDistribute can reference it.
val configSnapshotDescriptor = MapStateDescriptor(
    "currentConfigSnapshots",
    Long::class.java,
    ConfigSnapshot::class.java
)

// Broadcast the snapshots
val configBroadcast: BroadcastStream<ConfigSnapshot> = configSource.broadcast(configSnapshotDescriptor)

val parsed: DataStream<Record> = dataSource
    .connect(configBroadcast)
    .process(Parse(initialConfigs))

// Input records can be duplicated now, as there may be multiple Configs that are interested in a record
val validated: DataStream<Pair<Record, Config>> = parsed
    .connect(configBroadcast)
    .process(ValidateAndDistribute(initialConfigs))

val enriched: DataStream<EnrichedRecord> = AsyncDataStream.unorderedWait(
    validated,
    Enrich(),
    5L,
    TimeUnit.SECONDS
)

On Wed, Apr 7, 2021 at 11:28 PM Arvid Heise <ar...@apache.org> wrote:
> Hi Alex,
>
> your approach is completely valid. What you want to achieve is that you
> have a chain between your state managing operator and the consuming async
> operations. In that way, you have no serialization overhead.
>
> To achieve that you want to
> - use Flink 1.11+ [1]
> - make sure that if you have a legacy source, you disableChaining before
> your state managing operator as asyncIO cannot be (transitively) chained to
> legacy sources. So it should be source -> ...
> -> (forward channel) -> (state managing operator -> async1 -> async2 -> ... ) ... -> sink
> - enableObjectReuse [2] to avoid copying of objects
>
> [1] https://issues.apache.org/jira/browse/FLINK-16219
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/execution_configuration.html
>
> On Thu, Apr 8, 2021 at 3:26 AM Alex Cruise <a...@cluonflux.com> wrote:
>
>> Thanks Austin! I'll proceed with my idea, and keep the bootstrap config :)
>>
>> -0xe1a
>>
>> On Wed, Apr 7, 2021 at 2:18 PM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> Hey Alex,
>>>
>>> I'm not sure if there is a best practice here, but what I can tell you
>>> is that I worked on a job that did exactly what you're suggesting with a
>>> non-async operator to create a [record, config] tuple, which was then
>>> passed to the async stage. Our config objects were also not tiny (~500KB)
>>> and our pipeline not huge (~1M records/day and 1GB data/day), but this
>>> setup worked quite well. I'd say if latency isn't your most important
>>> metric, or if your pipeline is a similar size, the ease of async IO is
>>> worth it.
>>>
>>> One thing you'll have to look out for (if you haven't already) is
>>> bootstrapping the config objects when the job starts, since the broadcast
>>> from the polling source can happen later than receiving the first record;
>>> we solved this by calling the polling source's service in the `open()`
>>> method of the non-async operator and storing the initial configs in memory.
>>>
>>> Hope that helps a bit,
>>> Austin
>>>
>>> On Wed, Apr 7, 2021 at 4:39 PM Alex Cruise <a...@cluonflux.com> wrote:
>>>
>>>> Hi folks,
>>>>
>>>> I have a somewhat complex Flink job that has a few async stages, and a
>>>> few stateful stages. It currently loads its configuration on startup, and
>>>> doesn't attempt to refresh it.
>>>>
>>>> Now I'm working on dynamic reconfiguration.
>>>> I've written a polling source which sends a configuration snapshot
>>>> whenever anything has changed, I've set up a broadcast of that source,
>>>> and I'm updating the operators in the data (i.e. not config) stream to be
>>>> BroadcastProcessFunctions. But now I've reached the first async operator,
>>>> and I recall that async functions aren't allowed to be stateful.
>>>>
>>>> I've tried to find a best practice for this situation, without much
>>>> luck. My best idea so far is to insert a new stage before the async one,
>>>> which would tuple up each record with its corresponding config snapshot
>>>> from the most recent broadcast state. This would increase the amount of
>>>> data that needs to be serialized, and some of the configs are quite large,
>>>> but would allow me to continue using async IO.
>>>>
>>>> Any suggestions?
>>>>
>>>> Thanks!
>>>>
>>>> -0xe1a
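For unit-testing, the tupling step that `ValidateAndDistribute` performs can be pulled out into a plain function with no Flink dependency. Below is a minimal standalone Kotlin sketch. It assumes a per-tenant version of the bootstrap fallback Austin describes: a tenant's bootstrap snapshot is used until a broadcast snapshot for that same tenant arrives. The `Record`/`Config` fields, the prefix-match `interestedIn` rule, and the `distribute` name are all hypothetical stand-ins for the real types.

```kotlin
// Hypothetical stand-ins for the real Record / Config / ConfigSnapshot types.
data class Record(val tenantId: Long, val body: String)

data class Config(val name: String, val prefix: String) {
    // Hypothetical interest rule: a config wants records whose body starts with its prefix.
    fun interestedIn(record: Record): Boolean = record.body.startsWith(prefix)
}

data class ConfigSnapshot(val tenantId: Long, val configs: Map<String, Config>)

/**
 * Tuples a record with every config interested in it.
 * The tenant's latest broadcast snapshot wins; otherwise its bootstrap snapshot is used.
 * Records for tenants found in neither map produce no output.
 */
fun distribute(
    record: Record,
    broadcastState: Map<Long, ConfigSnapshot>,
    initialConfigs: Map<Long, ConfigSnapshot>
): List<Pair<Record, Config>> {
    val snapshot = broadcastState[record.tenantId] ?: initialConfigs[record.tenantId]
    return snapshot?.configs.orEmpty().values
        .filter { it.interestedIn(record) }
        .map { record to it }
}

fun main() {
    val bootstrap = mapOf(
        1L to ConfigSnapshot(1L, mapOf("a" to Config("a", "x"), "b" to Config("b", "xy")))
    )
    // Only tenant 2 has been broadcast so far.
    val broadcast = mapOf(
        2L to ConfigSnapshot(2L, mapOf("c" to Config("c", "x")))
    )
    // Tenant 1 falls back to its bootstrap configs, and both "a" and "b" match "xyz".
    println(distribute(Record(1L, "xyz"), broadcast, bootstrap).map { it.second.name }) // prints [a, b]
}
```

Inside Flink, the map lookup would be replaced by a lookup in the read-only broadcast state, with each pair emitted via `out.collect`, so the async stage only ever sees self-contained (record, config) tuples.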