Hello, everybody,

I found a solution myself; here it is, in case somebody is interested:
Idea

The trick is to combine the two streams using the beam.Flatten operation
and to use a *Stateful DoFn* to compute the number of pageviews before each
request. Each stream contains JSON dictionaries. I wrapped them as
{'request' : request} and {'pageview' : pageview} so that I can keep the
different event types apart in the *Stateful DoFn*. I also compute things
like the first pageview timestamp and the seconds since the first pageview
along the way. Both streams have to use the session_id as their key, so
that the *Stateful DoFn* receives only the events of a single session.
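To make the element shape concrete, here is a small sketch with made-up values (only session_id matters for the keying; the other field names are purely illustrative):

```python
# Hypothetical elements as they look after keying by session_id and tagging
# the event type; all field names besides session_id are made up.
pageview_event = ('session-42', ('pageview', {'session_id': 'session-42',
                                              'url': '/home'}))
request_event = ('session-42', ('request', {'session_id': 'session-42',
                                            'path': '/api/cart'}))

# The stateful DoFn unpacks them like this:
session_id, (event_type, event) = request_event
print(session_id, event_type)  # session-42 request
```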
Code

First of all, this is the pipeline code:

# Beam pipeline that extends requests by the number of pageviews before the
# request in that session
with beam.Pipeline(options=options) as p:
    # The stream of requests
    requests = (
          'Read from PubSub subscription'   >> beam.io.ReadFromPubSub(subscription=request_sub)
        | 'Extract JSON'                    >> beam.ParDo(ExtractJSON())
        | 'Add Timestamp'                   >> beam.ParDo(AssignTimestampFn())
        | 'Use Session ID as stream key'    >> beam.Map(lambda request: (request['session_id'], request))
        | 'Add type of event'               >> beam.Map(lambda r: (r[0], ('request', r[1])))
    )

    # The stream of pageviews
    pageviews = (
          'Read from PubSub subscription'   >> beam.io.ReadFromPubSub(subscription=pageview_sub)
        | 'Extract JSON'                    >> beam.ParDo(ExtractJSON())
        | 'Add Timestamp'                   >> beam.ParDo(AssignTimestampFn())
        | 'Use Session ID as stream key'    >> beam.Map(lambda pageview: (pageview['session_id'], pageview))
        | 'Add type of event'               >> beam.Map(lambda p: (p[0], ('pageview', p[1])))
    )

    # Combine the streams and apply the stateful DoFn
    combined = (
        (
            p | ('Prepare requests stream' >> requests),
            p | ('Prepare pageviews stream' >> pageviews)
        )
        | 'Combine event streams'       >> beam.Flatten()
        | 'Global Window'               >> beam.WindowInto(windowfn=window.GlobalWindows(),
                                                           trigger=trigger.AfterCount(1),
                                                           accumulation_mode=trigger.AccumulationMode.DISCARDING)
        | 'Stateful DoFn'               >> beam.ParDo(CountPageviewsStateful())
        | 'Compute processing delay'    >> beam.ParDo(LogTimeDelay())
        | 'Format for BigQuery output'  >> beam.ParDo(FormatForOutputDoFn())
    )

    # Write to BigQuery.
    combined | 'Write' >> beam.io.WriteToBigQuery(
        requests_extended_table,
        schema=REQUESTS_EXTENDED_TABLE_SCHEMA,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)

The interesting part is the combination of the two streams using
beam.Flatten and applying the stateful DoFn CountPageviewsStateful().

Here's the code of the used custom DoFns:

# This DoFn just loads a JSON message
class ExtractJSON(beam.DoFn):
  def process(self, element):
    import json

    yield json.loads(element)

# This DoFn adds the event timestamp of a message into its JSON element
# for further processing
class AssignTimestampFn(beam.DoFn):
  def process(self, element, timestamp=beam.DoFn.TimestampParam):
    import datetime

    timestamped_element = element
    timestamp_utc = datetime.datetime.utcfromtimestamp(float(timestamp))
    timestamp = timestamp_utc.strftime("%Y-%m-%d %H:%M:%S")
    timestamped_element['timestamp_utc'] = timestamp_utc
    timestamped_element['timestamp'] = timestamp
    yield timestamped_element
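
The timestamp handling in AssignTimestampFn, shown standalone: beam.DoFn.TimestampParam casts to float epoch seconds, which is then turned into a UTC datetime and a formatted string (the epoch value below is made up):

```python
import datetime

# A made-up event timestamp in epoch seconds, as float(timestamp) would yield
epoch_seconds = 1609459200.0

timestamp_utc = datetime.datetime.utcfromtimestamp(epoch_seconds)
formatted = timestamp_utc.strftime("%Y-%m-%d %H:%M:%S")
print(formatted)  # 2021-01-01 00:00:00
```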

# This class is a stateful DoFn.
# Input elements should be of the form (session_id, ('event_type', event)),
# where events can be requests or pageviews.
# It computes, on a per-session basis, the number of pageviews and the
# first pageview timestamp in its internal state.
# Whenever a request comes in, it appends the internal state to the request
# and emits an extended request.
# Whenever a pageview comes in, the internal state is updated but nothing
# is emitted.
class CountPageviewsStateful(beam.DoFn):
  # The internal states
  NUM_PAGEVIEWS = userstate.CombiningValueStateSpec('num_pageviews', combine_fn=sum)
  FIRST_PAGEVIEW = userstate.ReadModifyWriteStateSpec('first_pageview', coder=beam.coders.VarIntCoder())

  def process(self,
              element,
              num_pageviews_state=beam.DoFn.StateParam(NUM_PAGEVIEWS),
              first_pageview_state=beam.DoFn.StateParam(FIRST_PAGEVIEW)
              ):
    import datetime

    # Extract element
    session_id = element[0]
    event_type, event = element[1]

    # Process different event types
    # Depending on event type, different actions are done
    if event_type == 'request':
        # This is a request
        request = event

        # First, the first pageview timestamp is extracted and the seconds
        # since the first pageview are calculated
        first_pageview = first_pageview_state.read()
        if first_pageview is not None:
            seconds_since_first_pageview = (
                int(request['timestamp_utc'].timestamp()) - first_pageview)

            first_pageview_timestamp_utc = datetime.datetime.utcfromtimestamp(
                float(first_pageview))
            first_pageview_timestamp = first_pageview_timestamp_utc.strftime(
                "%Y-%m-%d %H:%M:%S")
        else:
            seconds_since_first_pageview = -1
            first_pageview_timestamp = None

        # The calculated data is appended to the request
        request['num_pageviews'] = num_pageviews_state.read()
        request['first_pageview_timestamp'] = first_pageview_timestamp
        request['seconds_since_first_pageview'] = seconds_since_first_pageview

        # The pageview counter is reset
        num_pageviews_state.clear()

        # The request is returned
        yield (session_id, request)
    elif event_type == 'pageview':
        # This is a pageview
        pageview = event

        # Update first pageview state
        first_pageview = first_pageview_state.read()
        if first_pageview is None:
            first_pageview_state.write(int(pageview['timestamp_utc'].timestamp()))
        elif first_pageview > int(pageview['timestamp_utc'].timestamp()):
            first_pageview_state.write(int(pageview['timestamp_utc'].timestamp()))

        # Increase number of pageviews
        num_pageviews_state.add(1)

        # Do not return anything, pageviews are not further processed

# This DoFn logs the delay between the event time and the processing time
class LogTimeDelay(beam.DoFn):
  def process(self, element, timestamp=beam.DoFn.TimestampParam):
    import datetime
    import logging

    timestamp_utc = datetime.datetime.utcfromtimestamp(float(timestamp))
    seconds_delay = (datetime.datetime.utcnow() - timestamp_utc).total_seconds()

    logging.warning('Delayed by %s seconds', seconds_delay)

    yield element
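
To sanity-check the stateful logic without running Beam, here is a plain-Python simulation of one session's event sequence. It mirrors the branches of CountPageviewsStateful (the events and epoch-second timestamps are made up):

```python
# Simulate the per-session state machine of the stateful DoFn with plain
# variables instead of Beam state; 'ts' values are illustrative epoch seconds.
def simulate(events):
    num_pageviews = 0
    first_pageview = None
    out = []
    for event_type, event in events:
        if event_type == 'pageview':
            # Keep the earliest pageview timestamp and count pageviews
            if first_pageview is None or first_pageview > event['ts']:
                first_pageview = event['ts']
            num_pageviews += 1
        elif event_type == 'request':
            # Extend the request with the accumulated state
            extended = dict(event)
            extended['num_pageviews'] = num_pageviews
            if first_pageview is not None:
                extended['seconds_since_first_pageview'] = event['ts'] - first_pageview
            else:
                extended['seconds_since_first_pageview'] = -1
            num_pageviews = 0  # the pageview counter is reset per request
            out.append(extended)
    return out

events = [('pageview', {'ts': 100}),
          ('pageview', {'ts': 130}),
          ('request',  {'ts': 160, 'path': '/api/cart'})]
result = simulate(events)
print(result[0]['num_pageviews'], result[0]['seconds_since_first_pageview'])  # 2 60
```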

This seems to work and gives me an average delay of about 1-2 seconds on
the direct runner. On Cloud Dataflow the average delay is about 0.5-1
seconds. So all in all, this seems to solve the problem definition.
Further considerations

There are some open questions, though:

   - I am using global windows, which means internal state will be kept
   forever, as far as I can tell. Maybe session windows are the correct way
   to go: when there are no pageviews/requests for x seconds, the window is
   closed and the internal state is freed.
   - Processing delay is a little high, but maybe I need to tweak the
   Pub/Sub part a little bit.
   - I do not know how much overhead or memory consumption this solution
   adds over standard Beam methods. I also didn't test high workloads and
   parallelisation.
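
On the first point: as far as I know, state and timers are not supported with merging window functions like sessions, so the usual pattern is to keep the global window and set a processing-time timer (userstate.TimerSpec with an @userstate.on_timer callback) that clears the state after x seconds of inactivity. The idea behind such a cleanup timer, sketched in plain Python without any Beam dependency (ExpiringSessionState and all its fields are hypothetical):

```python
import time

# Hypothetical per-session state cache that frees state after an idle
# timeout, mimicking what a Beam processing-time cleanup timer would do.
class ExpiringSessionState:
    def __init__(self, idle_seconds, clock=time.monotonic):
        self.idle_seconds = idle_seconds
        self.clock = clock
        self._state = {}      # session_id -> state dict
        self._deadline = {}   # session_id -> expiry time

    def touch(self, session_id):
        # Every event pushes the session's cleanup deadline into the future
        self._deadline[session_id] = self.clock() + self.idle_seconds
        return self._state.setdefault(session_id, {'num_pageviews': 0})

    def expire(self):
        # In Beam this would be the @on_timer callback clearing the state
        now = self.clock()
        for sid in [s for s, d in self._deadline.items() if d <= now]:
            del self._state[sid]
            del self._deadline[sid]

# Use a fake clock so the example is deterministic
fake_now = [0.0]
cache = ExpiringSessionState(idle_seconds=300, clock=lambda: fake_now[0])
cache.touch('session-42')['num_pageviews'] += 1
fake_now[0] = 301.0
cache.expire()
print('session-42' in cache._state)  # False: state was freed after 5 idle minutes
```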


Any input is welcome!

Kind regards,
Hendrik Gruß

On Fri, Jan 8, 2021 at 14:56, Gruß, Hendrik <h...@pixum.com> wrote:

> Hi everybody,
>
> I would like to hear your thoughts on which technique would be used in
> Apache Beam for the following problem:
>
> *Problem definition*:
>
> I have two streams of data, one with pageviews of users and another with
> requests of the users. They share the key session_id, which describes the
> user's session, but each has other additional data.
>
> The goal is to append the number of pageviews in a session to the requests
> of that session. That means I want to have a stream of data that has every
> request together with the number of pageviews before that request. It
> suffices to have the pageviews of, let's say, the last 5 minutes, and it is
> not important to have all the pageviews if there is late data. There
> should only be low latency on receiving the requests.
>
> What would be the appropriate technique? Side inputs? CoGroupByKey? Here
> are my first attempts:
> https://stackoverflow.com/questions/65625961/windowed-joins-in-apache-beam
> Kind regards,
> Hendrik Gruß
>
> ----
> Hendrik Gruß
> Data Engineer
> Diginet GmbH & Co. KG
>


-- 
Hendrik Gruß
Data Engineer
