Hi Luke,

Thanks for the detailed response. It sounds like an avenue I'd like to 
explore a bit further, although forgive me, as I'm still quite new to Beam in 
general. I haven't written any stateful DoFns previously, but I'd imagine 
what you're proposing would look something like this:

```
val pipeline = Pipeline.create(options)

// Users
val users = pipeline.apply("Read Users from Kafka",
    KafkaIO.read<User>(options.usersTopic, options))

// Additional entities omitted for brevity here

pipeline
    .apply("Read Events from Kafka",
        KafkaIO.read<Event>(options.identifiedEventsTopic, options))
    .apply(Window.into(/* Unsure what to put here? */))
    .apply("Enrich Event for Users", ParDo.of(
        object : DoFn<KV<String, Event>, KV<String, Event>>() {
            // State is declared via StateSpec fields; the runner hands the
            // corresponding state cells for the current key to @ProcessElement
            @StateId("user")
            private val userStateSpec: StateSpec<ValueState<User>> = StateSpecs.value()

            // Events seen before their user's info has arrived
            @StateId("pendingEvents")
            private val pendingEventsSpec: StateSpec<BagState<Event>> = StateSpecs.bag()

            @ProcessElement
            fun processElement(
                context: ProcessContext,
                @StateId("user") userState: ValueState<User>,
                @StateId("pendingEvents") pendingEvents: BagState<Event>
            ) {
                // Evaluate the incoming event

                // Enrich if we have that user

                // If enriched then output

                // Otherwise, store in state
            }
        }
    ))
    .apply("Eventually Write Enriched Events to Kafka",
        KafkaIO.write<Event>(options.enrichedEventsTopic, options))
```
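To make sure I'm picturing the per-key logic correctly, here's how I'd imagine the body of processElement behaving, sketched in plain Java with no Beam dependency (strings stand in for the real User/Event types, and two maps stand in for the runner-managed ValueState/BagState cells; all names here are hypothetical):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the per-key enrichment logic: enrich an event if the
// user is already known, otherwise park the event until the user info arrives.
class EnrichSketch {
    // Stand-in for @StateId("user") ValueState<User>: last seen info per user id
    private final Map<String, String> userState = new HashMap<>();
    // Stand-in for @StateId("pendingEvents") BagState<Event>: parked events
    private final Map<String, List<String>> pendingEvents = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    // An "update" message: remember the user info, then flush any parked events
    void onUserUpdate(String userId, String userInfo) {
        userState.put(userId, userInfo);
        for (String event : pendingEvents.getOrDefault(userId, List.of())) {
            output.add(event + " enriched-with " + userInfo);
        }
        pendingEvents.remove(userId);
    }

    // A "needsEnrichment" message: enrich now if we know the user,
    // otherwise store the event in state for later
    void onEvent(String userId, String event) {
        String userInfo = userState.get(userId);
        if (userInfo != null) {
            output.add(event + " enriched-with " + userInfo);
        } else {
            pendingEvents.computeIfAbsent(userId, k -> new ArrayList<>()).add(event);
        }
    }

    List<String> output() {
        return output;
    }
}
```

If that matches what you had in mind, the open question for me is just how the windowing/triggering controls the interleaving of the two message types.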

I'm not totally sure about the windowing usage yet, or how/when the other 
streams come into play, so any advice there would be useful. Additionally, I 
have a few other questions if you have the time:

- In this particular pipeline, users are a single entity; however, I may 
potentially have multiple others. I'm assuming this would just require an 
additional stateful function per entity?
- Some events may contain multiple user instances and thus need to be 
enriched multiple times from a single source. Is this a problem with this 
approach?
- The current plan for this in a production environment would be to rely on 
Flink. I noticed that Flink has "partial" support for handling state; would 
this fall under that supported umbrella?
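On that second question, here's a plain-Java sketch of how I'd imagine a multi-user event being handled (hypothetical names, strings standing in for the real types): hold the event in state until every referenced user can be resolved, then enrich all of them at once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch of enriching one event that references several users. Each user id
// is resolved independently against the known-users map; if any id is still
// unknown, the whole event stays parked in state (signaled here by null).
class MultiUserEnricher {
    static List<String> enrich(List<String> userIds, Map<String, String> knownUsers) {
        List<String> enriched = new ArrayList<>();
        for (String id : userIds) {
            String info = knownUsers.get(id);
            if (info == null) {
                return null; // still waiting on this user; keep event in state
            }
            enriched.add(id + "=" + info);
        }
        return enriched;
    }
}
```

I'm not sure whether buffering a whole event on several keys at once fits cleanly into a single stateful DoFn, or whether it would need one enrichment stage per entity as you alluded to.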

Thanks so much for the advice, Luke; I greatly appreciate it. Sorry for such a 
long-winded question. I'm really excited about working with Beam, but the 
learning curve has been pretty steep thus far (coming from an all-Kafka, 
Kafka Streams world).

Rion

On 2020/05/04 16:11:59, Luke Cwik <lc...@google.com> wrote: 
> You can shard the side input based upon some prefix of the key (e.g. the first
> byte of the key) into X shards, allowing each side input to be smaller for
> runners that don't work well with map/multimap side inputs. You should also
> take a look at the side input patterns[1] since they cover slowly changing
> side inputs and I believe your use case has come up in the past so going
> through these mailing threads[2] might help as well.
> 
> Finally, you could also use a stateful DoFn instead of the side input.
> 
> The stateful DoFn graph would create a single "object" type that contained
> either an "update" message or a "needsEnrichment" message. Something like:
> MainKafkaStream -------> Flatten -> Window.into(???) ->
> StatefulDoFn(EnrichFn) -> ...
> AdditionalInfoStream -/
> 
> You need to ensure that the key you use for the stateful DoFn is the same
> key for both the "update" and "needsEnrichment" message that you would join
> on. This might require multiple enrichment fns if there isn't a single key
> you can join on.
> 
> The Window.into that is before the stateful DoFn would control when
> "events" are allowed to progress. If you want to have them "synchronized"
> on event time then you could use the default event time trigger which would
> mean that update messages wouldn't be allowed to fall behind in processing
> when compared to the needsEnrichment messages (you might want to look into
> @RequiresTimeSortedInput here as well). You could also use an after count 1
> trigger that would allow both streams to go through the EnrichFn without
> one blocking on the other meaning that the needsEnrichment messages might
> get "stale" data.
> 
> Using the stateful DoFn would mean that you would only be
> retrieving/writing as much data as is ever associated with the key that
> you use, and would have good parallelism if you have a lot of keys.
> 
> 1: https://beam.apache.org/documentation/patterns/side-inputs/
> 2:
> https://lists.apache.org/list.html?user@beam.apache.org:lte=99M:slowly+changing+side+inputs
> 3:
> https://lists.apache.org/list.html?d...@beam.apache.org:lte=99M:slowly+changing+side+inputs
> 
> 
> On Sun, May 3, 2020 at 1:00 PM Rion Williams <rionmons...@gmail.com> wrote:
> 
> > Hi all,
> >
> > I'm in the process of migrating an existing pipeline over from the Kafka
> > Streams framework to Apache Beam and I'm reaching out in hopes of finding a
> > pattern that could address the use-case I'm currently dealing with.
> >
> > In the most trivial sense, the scenario just involves an incoming message
> > (an event) within the pipeline that may contain multiple different entities
> > (users, endpoints, ip addresses, etc.) and I'm trying to take the
> > identifiers from each of those entities and enrich the event from a source
> > (Kafka) that has more information on them.
> >
> > The workflow might go something like this:
> >  - Read in incoming event containing a single user entity from a Kafka
> > topic
> >  - Perform the equivalent of a LEFT JOIN on the user identifier against
> > another Kafka topic that contains additional user information
> >  - If the join succeeds, mutate the existing user instance on the event
> > with any additional information from the source topic
> >  - Write the "enriched" event to a destination (e.g. Kafka, Elastic, etc.)
> >
> > What would be the best pattern to try and tackle this problem? I know that
> > side inputs are an option, however the source user topic could potentially
> > contain millions of distinct users (with any "new" users being added to
> > that source topic in near-real time, albeit upstream from this process).
> >
> > Any information or recommendations would be greatly appreciated!
> >
> > Thanks,
> >
> > Rion
> >
> >
> 
