On Mon, May 4, 2020 at 3:15 PM Rion Williams <rionmons...@gmail.com> wrote:
> No problem, I totally understand and can relate. As mentioned, I think you
> pointed me in the right direction and it makes sense on paper to me; I'm
> just not sure how to connect some of the dots syntactically.
>
> Greatly appreciate it!
>
> On May 4, 2020, at 5:11 PM, Luke Cwik <lc...@google.com> wrote:
>
> I have a lot of work stuff going on, so I'll try to provide a response but
> it might take days. Also, if you find an answer to one of your questions or
> have additional questions while you investigate, feel free to update this
> thread.
>
> On Mon, May 4, 2020 at 2:58 PM Rion Williams <rionmons...@gmail.com> wrote:
>
>> Hi Luke,
>>
>> Thanks for the detailed response; it sounds like an avenue that I'd like
>> to explore a bit further, although forgive me as I'm still quite new to
>> Beam in general. I haven't written any stateful DoFns previously, but I'd
>> imagine it'd look something like this (what you are proposing, that is):
>>
>> ```
>> val pipeline = Pipeline.create(options)
>>
>> // Users
>> val users = pipeline.apply("Read Users from Kafka",
>>     KafkaIO.read<User>(options.usersTopic, options))
>>
>> // Additional entities omitted for brevity here
>>
>> pipeline
>>     .apply("Read Events from Kafka",
>>         KafkaIO.read<Event>(options.identifiedEventsTopic, options))
>>     .apply(Window.into(/* Unsure what to put here? */))
>>     // (events would presumably need to be keyed, e.g. KV<userId, Event>,
>>     // before the stateful DoFn; omitted here)
>>     .apply("Enrich Event for Users", ParDo.of(
>>         object : DoFn<KV<String, Event>, KV<String, Event>>() {
>>             // Each piece of state is declared as a StateSpec and injected
>>             // into @ProcessElement by its @StateId
>>             @StateId("user")
>>             private val userSpec: StateSpec<ValueState<User>> =
>>                 StateSpecs.value()
>>
>>             @StateId("pending")
>>             private val pendingSpec: StateSpec<BagState<Event>> =
>>                 StateSpecs.bag()
>>
>>             @ProcessElement
>>             fun processElement(
>>                 context: ProcessContext,
>>                 @StateId("user") user: ValueState<User>,
>>                 @StateId("pending") pending: BagState<Event>) {
>>                 // Evaluate the incoming event
>>                 // Enrich if we have that user
>>                 // If enriched then output
>>                 // Otherwise, store in state (pending.add(...))
>>             }
>>         }
>>     ))
>>     .apply("Eventually Write Enriched Events to Kafka",
>>         KafkaIO.write<Event>(options.enrichedEventsTopic, options))
>> ```
>>
>> I'm not totally sure on the windowing usage yet or how/when the other
>> streams come into play, so any advice there would be useful.
>> Additionally, I have a few other questions if you have the time:
>>
>> - In this particular pipeline, users is a single entity; however, I may
>> potentially have multiple others. I'm assuming this would just require an
>> additional stateful function per entity?

Yes.

>> - Some events may contain multiple user instances and thus need to be
>> enriched multiple times from a single source. Is this a problem using
>> this approach?

No. You would need to split your event into multiple copies of itself,
having each copy be enriched using the userId as the primary key. Then you
would rekey to something that is unique for the event, such as an eventId
(or a UUID that you decide upfront, before splitting into multiple events),
and go through another stateful DoFn that buffers the enriched events per
user until you have them all (e.g. based on the number of users referenced
in the event) and then performs a join of all the enriched data. You could
do this style of join across multiple types of enrichment as long as you
know exactly how many you need to wait for before producing your final
enriched event.
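
A rough sketch of what that second buffering DoFn could look like (purely
illustrative; `Event`, `userCount`, and `mergeEnrichedCopies` are
placeholders you would define yourself, not Beam APIs):

```
import org.apache.beam.sdk.state.BagState
import org.apache.beam.sdk.state.StateSpec
import org.apache.beam.sdk.state.StateSpecs
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.values.KV

// Input is keyed by something unique per event (an eventId or an
// upfront-generated UUID), with one element per enriched per-user copy.
class JoinEnrichedCopiesFn : DoFn<KV<String, Event>, Event>() {

    // Buffers the enriched copies seen so far for this event key.
    @StateId("copies")
    private val copiesSpec: StateSpec<BagState<Event>> = StateSpecs.bag()

    @ProcessElement
    fun processElement(
        context: ProcessContext,
        @StateId("copies") copies: BagState<Event>) {
        val copy = context.element().value
        copies.add(copy)
        val buffered = copies.read().toList()
        // userCount is assumed metadata on the event recording how many
        // enriched copies to expect before the join can fire.
        if (buffered.size >= copy.userCount) {
            context.output(mergeEnrichedCopies(buffered))
            copies.clear()
        }
    }
}
```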

>> - The current plan for this in a production environment would be to rely
>> on Flink. I noticed that Flink has "partial" support for handling state;
>> would this fall under that supported umbrella?

To my knowledge, the partial support is related to a limitation with
timers and not related to user state, which is all your pipeline seems to
use.

>> Thanks so much for the advice Luke, I greatly appreciate it. Sorry for
>> such a long-winded question; I'm really excited about working with Beam,
>> but the learning curve has been pretty steep (coming from an all-Kafka
>> Kafka Streams world) thus far.
>>
>> Rion
>>
>> On 2020/05/04 16:11:59, Luke Cwik <lc...@google.com> wrote:
>> > You can shard the side input based upon some prefix of the key (e.g.
>> > the first byte of the key) into X shards, allowing each side input to
>> > be smaller for runners that don't work well with map/multimap side
>> > inputs. You should also take a look at the side input patterns[1],
>> > since they cover slowly changing side inputs, and I believe your use
>> > case has come up in the past, so going through these mailing
>> > threads[2][3] might help as well.
>> >
>> > Finally, you could also use a stateful DoFn instead of the side input.
>> >
>> > The stateful DoFn graph would create a single "object" type that
>> > contains either an "update" message or a "needsEnrichment" message.
>> > Something like:
>> >
>> > MainKafkaStream -------> Flatten -> Window.into(???) -> StatefulDoFn(EnrichFn) -> ...
>> > AdditionalInfoStream -/
>> >
>> > You need to ensure that the key you use for the stateful DoFn is the
>> > same key for both the "update" and "needsEnrichment" messages that you
>> > would join on. This might require multiple enrichment fns if there
>> > isn't a single key you can join on.
>> >
>> > The Window.into that is before the stateful DoFn would control when
>> > "events" are allowed to progress. If you want to have them
>> > "synchronized" on event time, then you could use the default event-time
>> > trigger, which would mean that update messages wouldn't be allowed to
>> > fall behind in processing when compared to the needsEnrichment messages
>> > (you might want to look into @RequiresTimeSortedInput here as well).
>> > You could also use an after-count-1 trigger that would allow both
>> > streams to go through the EnrichFn without one blocking on the other,
>> > meaning that the needsEnrichment messages might get "stale" data.
>> >
>> > Using the stateful DoFn would mean that you would only be
>> > retrieving/writing as much data as is ever associated with the key
>> > that you use, and you would have good parallelism if you have a lot of
>> > keys.
>> >
>> > 1: https://beam.apache.org/documentation/patterns/side-inputs/
>> > 2: https://lists.apache.org/list.html?user@beam.apache.org:lte=99M:slowly+changing+side+inputs
>> > 3: https://lists.apache.org/list.html?d...@beam.apache.org:lte=99M:slowly+changing+side+inputs
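
To sketch what that EnrichFn could look like (again illustrative only; the
EnrichmentMessage wrapper, the User/Event types, and the enrichedWith
helper are placeholders, not Beam APIs):

```
import org.apache.beam.sdk.state.StateSpec
import org.apache.beam.sdk.state.StateSpecs
import org.apache.beam.sdk.state.ValueState
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.values.KV

// The single "object" type: exactly one of the two branches is set.
// (Coder/schema registration for it is omitted for brevity.)
data class EnrichmentMessage(
    val update: User? = null,
    val needsEnrichment: Event? = null)

// Keyed by whatever both message kinds join on (e.g. userId).
class EnrichFn : DoFn<KV<String, EnrichmentMessage>, Event>() {

    // Latest known user info for this key.
    @StateId("user")
    private val userSpec: StateSpec<ValueState<User>> = StateSpecs.value()

    @ProcessElement
    fun processElement(
        context: ProcessContext,
        @StateId("user") user: ValueState<User>) {
        val msg = context.element().value
        when {
            // "update" message: remember the newest user info.
            msg.update != null -> user.write(msg.update)
            // "needsEnrichment" message: enrich with whatever is known so
            // far (may be nothing yet under an after-count-1 trigger).
            msg.needsEnrichment != null ->
                context.output(msg.needsEnrichment.enrichedWith(user.read()))
        }
    }
}
```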

>> > On Sun, May 3, 2020 at 1:00 PM Rion Williams <rionmons...@gmail.com>
>> > wrote:
>> >
>> > > Hi all,
>> > >
>> > > I'm in the process of migrating an existing pipeline over from the
>> > > Kafka Streams framework to Apache Beam, and I'm reaching out in
>> > > hopes of finding a pattern that could address the use case I'm
>> > > currently dealing with.
>> > >
>> > > In the most trivial sense, the scenario just involves an incoming
>> > > message (an event) within the pipeline that may contain multiple
>> > > different entities (users, endpoints, IP addresses, etc.), and I'm
>> > > trying to take the identifiers from each of those entities and
>> > > enrich the event from a source (Kafka) that has more information on
>> > > them.
>> > >
>> > > The workflow might go something like this:
>> > > - Read in an incoming event containing a single user entity from a
>> > >   Kafka topic
>> > > - Perform the equivalent of a LEFT JOIN on the user identifier
>> > >   against another Kafka topic that contains additional user
>> > >   information
>> > > - If the join succeeds, mutate the existing user instance on the
>> > >   event with any additional information from the source topic
>> > > - Write the "enriched" event to a destination (e.g. Kafka, Elastic,
>> > >   etc.)
>> > >
>> > > What would be the best pattern to try to tackle this problem? I know
>> > > that side inputs are an option; however, the source user topic could
>> > > potentially contain millions of distinct users (with any "new" users
>> > > being added to that source topic in near-real time, albeit upstream
>> > > from this process).
>> > >
>> > > Any information or recommendations would be greatly appreciated!
>> > >
>> > > Thanks,
>> > >
>> > > Rion
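
Putting the pieces together for the workflow described in that original
message, one possible end-to-end wiring might look like the following
(readUsers, readEvents, and options stand in for the KafkaIO reads plus the
keying and wrapping into the illustrative EnrichmentMessage/EnrichFn from
the sketches above; this uses the after-count-1 trigger option):

```
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.transforms.Flatten
import org.apache.beam.sdk.transforms.ParDo
import org.apache.beam.sdk.transforms.windowing.AfterPane
import org.apache.beam.sdk.transforms.windowing.GlobalWindows
import org.apache.beam.sdk.transforms.windowing.Repeatedly
import org.apache.beam.sdk.transforms.windowing.Window
import org.apache.beam.sdk.values.KV
import org.apache.beam.sdk.values.PCollectionList
import org.joda.time.Duration

val pipeline = Pipeline.create(options)

// Both streams mapped into the same keyed union type:
//   users  -> KV<userId, EnrichmentMessage(update = user)>
//   events -> KV<userId, EnrichmentMessage(needsEnrichment = event)>
val updates = readUsers(pipeline)
val events = readEvents(pipeline)

val enriched = PCollectionList.of(updates).and(events)
    .apply(Flatten.pCollections())
    // Global window plus a fire-on-every-element trigger: neither stream
    // blocks the other, at the cost of possibly enriching with stale or
    // missing user data (the after-count-1 option described above).
    .apply(Window.into<KV<String, EnrichmentMessage>>(GlobalWindows())
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes())
    .apply("Enrich", ParDo.of(EnrichFn()))

// ... then write `enriched` back out (Kafka, Elastic, etc.)
```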