Hello everybody, I found a solution myself; here it is in case somebody is interested:

Idea
The trick is to combine the two streams using the beam.Flatten operation and to use a *Stateful DoFn* to compute the number of pageviews before each request. Each stream contains JSON dictionaries. I tag every element with its event type, i.e. ('request', request) and ('pageview', pageview), so that I can keep the different events apart in the *Stateful DoFn*. Along the way I also compute things like the first pageview timestamp and the seconds since the first pageview. Both streams have to be keyed by the session_id, so that the *Stateful DoFn* only receives the events of a single session per key.

Code

First of all, this is the pipeline code:

    import apache_beam as beam
    from apache_beam.transforms import trigger, userstate, window

    # Beam pipeline that extends requests by the number of pageviews before the request in that session.
    # (options, request_sub, pageview_sub, FormatForOutputDoFn, requests_extended_table and
    #  REQUESTS_EXTENDED_TABLE_SCHEMA are defined elsewhere and not shown here.)
    with beam.Pipeline(options=options) as p:

        # The stream of requests
        requests = (
            'Read from PubSub subscription' >> beam.io.ReadFromPubSub(subscription=request_sub)
            | 'Extract JSON' >> beam.ParDo(ExtractJSON())
            | 'Add Timestamp' >> beam.ParDo(AssignTimestampFn())
            | 'Use Session ID as stream key' >> beam.Map(lambda request: (request['session_id'], request))
            | 'Add type of event' >> beam.Map(lambda r: (r[0], ('request', r[1])))
        )

        # The stream of pageviews
        pageviews = (
            'Read from PubSub subscription' >> beam.io.ReadFromPubSub(subscription=pageview_sub)
            | 'Extract JSON' >> beam.ParDo(ExtractJSON())
            | 'Add Timestamp' >> beam.ParDo(AssignTimestampFn())
            | 'Use Session ID as stream key' >> beam.Map(lambda pageview: (pageview['session_id'], pageview))
            | 'Add type of event' >> beam.Map(lambda p: (p[0], ('pageview', p[1])))
        )

        # Combine the streams and apply the stateful DoFn
        combined = (
            (
                p | ('Prepare requests stream' >> requests),
                p | ('Prepare pageviews stream' >> pageviews)
            )
            | 'Combine event streams' >> beam.Flatten()
            | 'Global Window' >> beam.WindowInto(windowfn=window.GlobalWindows(),
                                                 trigger=trigger.AfterCount(1),
                                                 accumulation_mode=trigger.AccumulationMode.DISCARDING)
            | 'Stateful DoFn' >> beam.ParDo(CountPageviewsStateful())
            | 'Compute processing delay' >> beam.ParDo(LogTimeDelay())
            | 'Format for BigQuery output' >> beam.ParDo(FormatForOutputDoFn())
        )

        # Write to BigQuery
        combined | 'Write' >> beam.io.WriteToBigQuery(
            requests_extended_table,
            schema=REQUESTS_EXTENDED_TABLE_SCHEMA,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
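For illustration only, here is a minimal, self-contained sketch with made-up in-memory data (not the production pipeline) that shows the element shape which reaches beam.Flatten and the stateful DoFn, namely (session_id, (event_type, payload)):

    import apache_beam as beam

    requests_data = [{'session_id': 's1', 'url': '/checkout'}]
    pageviews_data = [{'session_id': 's1', 'page': '/home'},
                      {'session_id': 's1', 'page': '/product'}]

    with beam.Pipeline() as p:
        requests = (p
                    | 'Create requests' >> beam.Create(requests_data)
                    | 'Key and tag requests' >> beam.Map(lambda r: (r['session_id'], ('request', r))))
        pageviews = (p
                     | 'Create pageviews' >> beam.Create(pageviews_data)
                     | 'Key and tag pageviews' >> beam.Map(lambda v: (v['session_id'], ('pageview', v))))
        # Every element is now (session_id, (event_type, payload)), e.g.
        #   ('s1', ('pageview', {'session_id': 's1', 'page': '/home'}))
        merged = (requests, pageviews) | 'Combine event streams' >> beam.Flatten()
        merged | beam.Map(print)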
The interesting part is the combination of the two streams using beam.Flatten and applying the stateful DoFn CountPageviewsStateful().

Here is the code of the custom DoFns used above:

    # This DoFn just loads a JSON message
    class ExtractJSON(beam.DoFn):
        def process(self, element):
            import json

            yield json.loads(element)


    # This DoFn adds the event timestamp of a message into its JSON element for further processing
    class AssignTimestampFn(beam.DoFn):
        def process(self, element, timestamp=beam.DoFn.TimestampParam):
            import datetime

            timestamped_element = element
            timestamp_utc = datetime.datetime.utcfromtimestamp(float(timestamp))
            timestamp = timestamp_utc.strftime("%Y-%m-%d %H:%M:%S")
            timestamped_element['timestamp_utc'] = timestamp_utc
            timestamped_element['timestamp'] = timestamp
            yield timestamped_element


    # This class is a stateful DoFn.
    # Input elements should be of the form (session_id, (event_type, event)),
    # where events can be requests or pageviews.
    # It keeps, per session, the number of pageviews and the first pageview timestamp
    # in its internal state.
    # Whenever a request comes in, it appends the internal state to the request and emits
    # an extended request.
    # Whenever a pageview comes in, the internal state is updated but nothing is emitted.
    class CountPageviewsStateful(beam.DoFn):
        # The internal states
        NUM_PAGEVIEWS = userstate.CombiningValueStateSpec('num_pageviews', combine_fn=sum)
        FIRST_PAGEVIEW = userstate.ReadModifyWriteStateSpec('first_pageview', coder=beam.coders.VarIntCoder())

        def process(self,
                    element,
                    num_pageviews_state=beam.DoFn.StateParam(NUM_PAGEVIEWS),
                    first_pageview_state=beam.DoFn.StateParam(FIRST_PAGEVIEW)):
            import datetime

            # Extract the element
            session_id = element[0]
            event_type, event = element[1]

            # Depending on the event type, different actions are taken
            if event_type == 'request':
                # This is a request
                request = event

                # First, the first pageview timestamp is read and the seconds since the first pageview are calculated
                first_pageview = first_pageview_state.read()
                if first_pageview is not None:
                    seconds_since_first_pageview = (int(request['timestamp_utc'].timestamp()) - first_pageview)
                    first_pageview_timestamp_utc = datetime.datetime.utcfromtimestamp(float(first_pageview))
                    first_pageview_timestamp = first_pageview_timestamp_utc.strftime("%Y-%m-%d %H:%M:%S")
                else:
                    seconds_since_first_pageview = -1
                    first_pageview_timestamp = None

                # The calculated data is appended to the request
                request['num_pageviews'] = num_pageviews_state.read()
                request['first_pageview_timestamp'] = first_pageview_timestamp
                request['seconds_since_first_pageview'] = seconds_since_first_pageview

                # The pageview counter is reset
                num_pageviews_state.clear()

                # The extended request is emitted
                yield (session_id, request)

            elif event_type == 'pageview':
                # This is a pageview
                pageview = event

                # Update the first pageview state
                first_pageview = first_pageview_state.read()
                if first_pageview is None:
                    first_pageview_state.write(int(pageview['timestamp_utc'].timestamp()))
                elif first_pageview > int(pageview['timestamp_utc'].timestamp()):
                    first_pageview_state.write(int(pageview['timestamp_utc'].timestamp()))

                # Increase the number of pageviews
                num_pageviews_state.add(1)

                # Nothing is emitted, pageviews are not processed further
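To make the state transitions easier to see, here is a rough sketch (my own illustration with hand-made data and a hypothetical helper ev(), not part of the pipeline above) that feeds the stateful DoFn one session via a TestStream on the direct runner. With two pageviews followed by one request for the same session_id, the emitted request should come out with num_pageviews = 2 and seconds_since_first_pageview = 60:

    import datetime

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.testing.test_stream import TestStream

    def ev(kind, ts):
        # Helper for this sketch only: build a (session_id, (event_type, payload)) element
        return ('s1', (kind, {'session_id': 's1', 'timestamp_utc': ts}))

    stream = (TestStream()
              .add_elements([ev('pageview', datetime.datetime(2021, 1, 8, 12, 0, 0))])
              .add_elements([ev('pageview', datetime.datetime(2021, 1, 8, 12, 0, 30))])
              .add_elements([ev('request', datetime.datetime(2021, 1, 8, 12, 1, 0))])
              .advance_watermark_to_infinity())

    with beam.Pipeline(options=PipelineOptions(streaming=True)) as p:
        (p
         | stream
         | 'Stateful DoFn' >> beam.ParDo(CountPageviewsStateful())
         | 'Print extended request' >> beam.Map(print))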
    # This DoFn logs the delay between the event time and the processing time
    class LogTimeDelay(beam.DoFn):
        def process(self, element, timestamp=beam.DoFn.TimestampParam):
            import datetime
            import logging

            timestamp_utc = datetime.datetime.utcfromtimestamp(float(timestamp))
            seconds_delay = (datetime.datetime.utcnow() - timestamp_utc).total_seconds()
            logging.warning('Delayed by %s seconds', seconds_delay)
            yield element

This seems to work and gives me an average delay of about 1-2 seconds on the direct runner. On Cloud Dataflow the average delay is about 0.5-1 seconds. So all in all, this seems to solve the problem definition.

Further considerations

There are some open questions, though:

- I am using global windows, which, as far as I understand, means the internal state is kept forever. Maybe session windows are the right way to go: when there are no pageviews/requests for x seconds, the window is closed and the internal state is freed.
- The processing delay is a little bit high, but maybe I need to tweak the Pub/Sub part a little.
- I do not know how much overhead or memory consumption this solution adds over standard Beam methods. I also have not tested high workloads or parallelisation yet.

Any input is welcome!

Kind regards,
Hendrik Gruß

On Fri, 8 Jan 2021 at 14:56, Gruß, Hendrik <h...@pixum.com> wrote:
> Hi everybody,
>
> I would like to hear your thoughts on which technique would be used in
> Apache Beam for the following problem:
>
> *Problem definition*:
>
> I have two streams of data, one with pageviews of users, and another with
> requests of the users. They share the key session_id, which describes the
> user's session, but each has other additional data.
>
> The goal is to append the number of pageviews in a session to the requests
> of that session. That means, I want to have a stream of data that has every
> request together with the number of pageviews before that request. It
> suffices to have the pageviews of, let's say, the last 5 minutes, and it is
> not important to have all the pageviews if there is late data. There
> should only be low latency on receiving the requests.
>
> What would be the appropriate technique? Side inputs? CoGroupByKey? Here
> are my first attempts:
> https://stackoverflow.com/questions/65625961/windowed-joins-in-apache-beam
>
> Kind regards,
> Hendrik Gruß