Thanks Arvid! I'm not completely clear on where to apply your suggestions.

I've included a sketch of my job below, and I have a couple of questions:

1. It looks like enableObjectReuse() is a global setting. Should I worry
about whether I'm using any mutable data between stages?
2. Should I disableChaining() on BOTH broadcast-dependent stages, or just
the one immediately preceding the async?

Thanks!

-0xe1a

*Types:*

/** all the configs for a given tenant, as of the time when a change was
observed */
data class ConfigSnapshot(
  val tenantId: Long,
  val timestamp: Instant,
  val configs: Map<UUID, Config>
)

/** parse raw strings from input, rejecting those for unconfigured tenants */
class Parse(
  private val initialConfigs: Map<Long, ConfigSnapshot>
) : BroadcastProcessFunction<String, ConfigSnapshot, Record>() {
  override fun processBroadcastElement(
    value: ConfigSnapshot,
    ctx: Context,
    out: Collector<Record>
  ) {
    val snapshots = ctx.getBroadcastState(configSnapshotDescriptor)
    snapshots.put(value.tenantId, value)
  }

  override fun processElement(
    value: String,
    ctx: ReadOnlyContext,
    out: Collector<Record>
  ) {
    // ReadOnlyBroadcastState exposes immutableEntries() rather than toMap()
    val validTenantIds = ctx.getBroadcastState(configSnapshotDescriptor)
      .immutableEntries()
      .map { it.key }
      .toSet()
      .ifEmpty { initialConfigs.keys }

    val parsed = Record(value)
    if (parsed.tenantId in validTenantIds) {
      out.collect(parsed)
    }
  }
}

/** given a parsed record, identify which config(s) are interested in it,
and send an output value of the record tupled with the interested config */
class ValidateAndDistribute(
  private val initialConfigs: Map<Long, ConfigSnapshot>
) : BroadcastProcessFunction<Record, ConfigSnapshot, Pair<Record, Config>>() {
  override fun processBroadcastElement(
    value: ConfigSnapshot,
    ctx: Context,
    out: Collector<Pair<Record, Config>>
  ) {
    val snapshots = ctx.getBroadcastState(configSnapshotDescriptor)
    snapshots.put(value.tenantId, value)
  }

  override fun processElement(
    value: Record,
    ctx: ReadOnlyContext,
    out: Collector<Pair<Record, Config>>
  ) {
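    val broadcastState = ctx.getBroadcastState(configSnapshotDescriptor)
    // fall back to the bootstrap configs only while nothing has been broadcast yet
    val snapshot =
      if (broadcastState.immutableEntries().any()) broadcastState.get(value.tenantId)
      else initialConfigs[value.tenantId]
    // a tenant with no snapshot simply has no configs
    val configsForThisTenant = snapshot?.configs.orEmpty()

    val configsInterestedInThisRecord = configsForThisTenant.values.filter {
      it.interestedIn(value)
    }

    // emit one (record, config) pair per interested config
    for (config in configsInterestedInThisRecord) {
      out.collect(value to config)
    }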
  }
}

/** given a pair of Record and Config, run the async operation and send an
enriched record including the result */
class Enrich : RichAsyncFunction<Pair<Record, Config>, EnrichedRecord>() // asyncInvoke() body omitted

*Job Pseudocode:*

val initialConfigs: Map<Long, ConfigSnapshot> = ???
val dataSource: DataStream<String> = ???
// from a legacy "while (true) { poll; sleep }" source
val configSource: DataStream<ConfigSnapshot> = ???

// the config-subscribing operators keep the broadcast state in a
// Map<tenantId: Long, ConfigSnapshot>
val configSnapshotDescriptor = MapStateDescriptor(
  "currentConfigSnapshots",
  Long::class.java,
  ConfigSnapshot::class.java
)

// Broadcast the snapshots
val configBroadcast: BroadcastStream<ConfigSnapshot> =
configSource.broadcast(configSnapshotDescriptor)

val parsed: DataStream<Record> = dataSource
  .connect(configBroadcast)
  .process(Parse(initialConfigs))

// input records can be duplicated now, as there may be multiple Configs
// that are interested in a record
val validated: DataStream<Pair<Record, Config>> = parsed
  .connect(configBroadcast)
  .process(ValidateAndDistribute(initialConfigs))

val enriched: DataStream<EnrichedRecord> = AsyncDataStream.unorderedWait(
  validated,
  Enrich(),
  5L,
  TimeUnit.SECONDS
)
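
For reference, here is a minimal, Flink-free sketch of the lookup-with-bootstrap-fallback that ValidateAndDistribute performs, assuming the broadcast state can be modeled as a plain Map. The names `Snapshot`, `Config(name)` and `configsFor` are made up for illustration and are not part of the real job.

```kotlin
import java.util.UUID

// Hypothetical stand-ins for the job's types, reduced to what the lookup needs.
data class Config(val name: String)
data class Snapshot(val tenantId: Long, val configs: Map<UUID, Config>)

/**
 * Returns the configs for [tenantId]. The bootstrap map is consulted only
 * while no snapshot has been broadcast yet (i.e. [broadcast] is empty).
 */
fun configsFor(
    tenantId: Long,
    broadcast: Map<Long, Snapshot>,
    bootstrap: Map<Long, Snapshot>
): Map<UUID, Config> {
    val source = broadcast.ifEmpty { bootstrap }
    // Unknown tenants yield an empty map rather than a null.
    return source[tenantId]?.configs.orEmpty()
}

fun main() {
    val id = UUID.randomUUID()
    val bootstrap = mapOf(1L to Snapshot(1L, mapOf(id to Config("boot"))))
    val noBroadcastYet = emptyMap<Long, Snapshot>()
    val afterBroadcast = mapOf(2L to Snapshot(2L, emptyMap()))

    println(configsFor(1L, noBroadcastYet, bootstrap).size) // bootstrap is used
    println(configsFor(1L, afterBroadcast, bootstrap).size) // only broadcast state is used
    println(configsFor(9L, noBroadcastYet, bootstrap).size) // unknown tenant
}
```

One consequence of this fallback rule: as soon as any tenant's snapshot has been broadcast, tenants that exist only in the bootstrap map are no longer found. That may or may not be the intended behavior.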





On Wed, Apr 7, 2021 at 11:28 PM Arvid Heise <ar...@apache.org> wrote:

> Hi Alex,
>
> your approach is completely valid. What you want to achieve is that you
> have a chain between your state managing operator and the consuming async
> operations. In that way, you have no serialization overhead.
>
> To achieve that you want to
> - use Flink 1.11+ [1]
> - make sure that if you have a legacy source, you disableChaining before
> your state managing operator as asyncIO cannot be (transitively) chained to
> legacy sources. So it should be source -> ... -> (forward channel) ->
> (state managing operator -> async1 -> async2 -> ... ) ... -> sink
> - enableObjectReuse [2] to avoid copying of objects
>
> [1] https://issues.apache.org/jira/browse/FLINK-16219
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/execution_configuration.html
>
> On Thu, Apr 8, 2021 at 3:26 AM Alex Cruise <a...@cluonflux.com> wrote:
>
>> Thanks Austin! I'll proceed with my idea, and keep the bootstrap config :)
>>
>> -0xe1a
>>
>> On Wed, Apr 7, 2021 at 2:18 PM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> Hey Alex,
>>>
>>> I'm not sure if there is a best practice here, but what I can tell you
>>> is that I worked on a job that did exactly what you're suggesting with a
>>> non-async operator to create a [record, config] tuple, which was then
>>> passed to the async stage. Our config objects were also not tiny (~500kb)
>>> and our pipeline not huge (~1M records/day and 1GB data/ day), but this
>>> setup worked quite well. I'd say if latency isn't your most important
>>> metric, or if your pipeline is a similar size, the ease of async IO is
>>> worth it.
>>>
>>> One thing you'll have to look out for (if you haven't already) is
>>> bootstrapping the config objects when the job starts, since the broadcast
>>> from the polling source can happen later than receiving the first record –
>>> we solved this by calling the polling source's service in the `open()`
>>> method of the non-async operator and storing the initial configs in memory.
>>>
>>> Hope that helps a bit,
>>> Austin
>>>
>>> On Wed, Apr 7, 2021 at 4:39 PM Alex Cruise <a...@cluonflux.com> wrote:
>>>
>>>> Hi folks,
>>>>
>>>> I have a somewhat complex Flink job that has a few async stages, and a
>>>> few stateful stages. It currently loads its configuration on startup, and
>>>> doesn't attempt to refresh it.
>>>>
>>>> Now I'm working on dynamic reconfiguration. I've written a polling
>>>> source which sends a configuration snapshot whenever anything has changed,
>>>> I've set up a broadcast of that source, and I'm updating the operators in
>>>> the data (i.e. not config) stream to be BroadcastProcessFunctions. But now
>>>> I've reached the first async operator, and I recall that async functions
>>>> aren't allowed to be stateful.
>>>>
>>>> I've tried to find a best practice for this situation, without much
>>>> luck. My best idea so far is to insert a new stage before the async one,
>>>> which would tuple up each record with its corresponding config snapshot
>>>> from the most recent broadcast state. This would increase the amount of
>>>> data that needs to be serialized, and some of the configs are quite large,
>>>> but would allow me to continue using async IO.
>>>>
>>>> Any suggestions?
>>>>
>>>> Thanks!
>>>>
>>>> -0xe1a
>>>>
>>>
