Hi Luke,

Thanks for the detailed response. It sounds like an avenue I'd like to explore a bit further, although forgive me, as I'm still quite new to Beam in general. I haven't written any stateful DoFns previously, but I'd imagine what you're proposing would look something like this:
```
val pipeline = Pipeline.create(options)

// Users
val users = pipeline
    .apply("Read Users from Kafka", KafkaIO.read<User>(options.usersTopic, options))
// Additional entities omitted for brevity here

pipeline
    .apply("Read Events from Kafka", KafkaIO.read<Event>(options.identifiedEventsTopic, options))
    .apply(Window.into(/* Unsure what to put here? */))
    .apply("Enrich Event for Users", ParDo.of(
        object : DoFn<KV<String, Event>, KV<String, Event>>() {

            // State cell holding whatever we know about the user for this key
            @StateId("user")
            private val userState: StateSpec<ValueState<SomeObjectToStoreStateAboutUsersAndOrEvents>> =
                StateSpecs.value()

            @ProcessElement
            fun processElement(
                context: ProcessContext,
                @StateId("user") userState: ValueState<SomeObjectToStoreStateAboutUsersAndOrEvents>
            ) {
                // Evaluate the incoming event
                // Enrich if we have that user
                // If enriched then output
                // Otherwise, store in state
            }
        }
    ))
    .apply("Eventually Write Enriched Events to Kafka", KafkaIO.write<Event>(options.enrichedEventsTopic, options))
```

I'm not totally sure on the windowing usage yet, or how/when the other streams come into play, so any advice there would be useful (I've taken a rough stab at that at the very bottom of this mail, below the quoted thread, if you don't mind checking it). Additionally, I have a few other questions if you have the time:

- In this particular pipeline, users is a single entity, however I may potentially have multiple others. I'm assuming this would just require an additional stateful function per entity?
- Some events may contain multiple user instances and thus need to be enriched multiple times from a single source. Is this a problem using this approach?
- The current plan for this in a production environment would be to rely on Flink. I noticed that Flink has "partial" support for handling state; would this fall under that supported umbrella?

Thanks so much for the advice Luke, I greatly appreciate it. Sorry for such a long-winded question; I'm really excited about working with Beam, but the learning curve has been pretty steep (coming from an all-Kafka, Kafka Streams world) thus far.

Rion

On 2020/05/04 16:11:59, Luke Cwik <lc...@google.com> wrote:
> You can shard the side input based upon some prefix of the key (e.g. the first byte of the key) into X shards, allowing each side input to be smaller for runners that don't work well with map/multimap side inputs. You should also take a look at the side input patterns[1] since they cover slowly changing side inputs, and I believe your use case has come up in the past, so going through these mailing threads[2] might help as well.
>
> Finally, you could also use a stateful DoFn instead of the side input.
>
> The stateful DoFn graph would create a single "object" type that contained either an "update" message or a "needsEnrichment" message. Something like:
>
> MainKafkaStream -------> Flatten -> Window.into(???) -> StatefulDoFn(EnrichFn) -> ...
> AdditionalInfoStream -/
>
> You need to ensure that the key you use for the stateful DoFn is the same key for both the "update" and "needsEnrichment" message that you would join on. This might require multiple enrichment fns if there isn't a single key you can join on.
>
> The Window.into that is before the stateful DoFn would control when "events" are allowed to progress. If you want to have them "synchronized" on event time then you could use the default event time trigger, which would mean that update messages wouldn't be allowed to fall behind in processing when compared to the needsEnrichment messages (you might want to look into @RequiresTimeSortedInput here as well).
> You could also use an after count 1 trigger that would allow both streams to go through the EnrichFn without one blocking on the other, meaning that the needsEnrichment messages might get "stale" data.
>
> Using the stateful DoFn would mean that you would only be retrieving/writing as much data as is ever associated with the key that you use, and would have good parallelism if you have a lot of keys.
>
> 1: https://beam.apache.org/documentation/patterns/side-inputs/
> 2: https://lists.apache.org/list.html?user@beam.apache.org:lte=99M:slowly+changing+side+inputs
> 3: https://lists.apache.org/list.html?d...@beam.apache.org:lte=99M:slowly+changing+side+inputs
>
> On Sun, May 3, 2020 at 1:00 PM Rion Williams <rionmons...@gmail.com> wrote:
> > Hi all,
> >
> > I'm in the process of migrating an existing pipeline over from the Kafka Streams framework to Apache Beam, and I'm reaching out in hopes of finding a pattern that could address the use-case I'm currently dealing with.
> >
> > In the most trivial sense, the scenario just involves an incoming message (an event) within the pipeline that may contain multiple different entities (users, endpoints, ip addresses, etc.), and I'm trying to take the identifiers from each of those entities and enrich the event from a source (Kafka) that has more information on them.
> >
> > The workflow might go something like this:
> > - Read in an incoming event containing a single user entity from a Kafka topic
> > - Perform the equivalent of a LEFT JOIN on the user identifier against another Kafka topic that contains additional user information
> > - If the join succeeds, mutate the existing user instance on the event with any additional information from the source topic
> > - Write the "enriched" event to a destination (e.g. Kafka, Elastic, etc.)
> >
> > What would be the best pattern to try and tackle this problem? I know that side inputs are an option, however the source user topic could potentially contain millions of distinct users (with any "new" users being added to that source topic in near-real time, albeit upstream from this process).
> >
> > Any information or recommendations would be greatly appreciated!
> >
> > Thanks,
> >
> > Rion
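
P.S. To make my windowing/flattening question a bit more concrete, here is a rough, untested sketch of how I'm picturing the two streams coming together before the stateful DoFn, based on your Flatten diagram. Everything here is a placeholder on my end: `EnrichmentMessage` and the `id`/`userId` fields are just names I made up, `events` would be the identified-events topic read into a `PCollection<Event>`, `EnrichFn` would be the stateful DoFn from my snippet above reworked to take the union type, and I've guessed at a global window with the "after count 1" trigger you mentioned. Imports and coder registration are omitted as in my sketch above. Please correct me if I've misread your proposal:

```
// Union type so both the "update" (user) and "needsEnrichment" (event)
// messages can flow through the same stateful DoFn.
data class EnrichmentMessage(val user: User? = null, val event: Event? = null) : Serializable

// Key each stream by the user id they share and wrap elements in the union type.
val keyedUsers = users.apply("Wrap Users",
    MapElements.via(object : SimpleFunction<User, KV<String, EnrichmentMessage>>() {
        override fun apply(input: User): KV<String, EnrichmentMessage> =
            KV.of(input.id, EnrichmentMessage(user = input))
    }))

val keyedEvents = events.apply("Wrap Events",
    MapElements.via(object : SimpleFunction<Event, KV<String, EnrichmentMessage>>() {
        override fun apply(input: Event): KV<String, EnrichmentMessage> =
            KV.of(input.userId, EnrichmentMessage(event = input))
    }))

// Flatten into a single stream, window it (guessing at the global window with an
// element-count trigger, i.e. the "after count 1" option), then enrich per key.
PCollectionList.of(keyedUsers).and(keyedEvents)
    .apply("Flatten Streams", Flatten.pCollections<KV<String, EnrichmentMessage>>())
    .apply("Window", Window.into<KV<String, EnrichmentMessage>>(GlobalWindows())
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes())
    .apply("Enrich Event for Users", ParDo.of(EnrichFn()))
    // ... and then write the enriched events back out to Kafka as before
```

Does that roughly match what you had in mind, or have I got the windowing/trigger part backwards?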