Hi,

I am trying to find the best pattern to solve a specific problem using
Kafka Streams.  All of our current processing uses the Kafka Streams
API (multiple joins, windows, repartitions, etc.), so I think I already
have a decent grasp of the fundamentals.

We have 2 streams of events:
- primary events (P), which indicate some key event in the system and carry
a large amount of data
- secondary events (S), which should *always* occur as a follow-on to the
primary event and only contain a reference to the single associated primary
event.

I want to join secondary events to primary events (the simple part) BUT I
also want to find out when secondary events could *not* be joined.
A secondary event cannot be joined:
- when primary event delivery has been delayed (so that secondary events
are received before the associated primary event)
- when primary events go missing (the event collection system is noisy, so
we do lose a small but significant number of primary events)
- due to coding errors in the collectors, where an incorrect reference has
been inserted into the secondary event

Currently this functionality is implemented using a database (a rough
sketch follows this list):
- primary events are inserted into the database, and then secondary events
look up the primary by reference.  If the primary is found, the secondary is
sent to a "JOINED" topic.
- if the primary is not found, the secondary event is buffered in the
database until the primary is received and then joined+emitted (and the
secondary event is removed from the DB)
- after some arbitrary time period, the database is queried for outstanding
still-unjoined secondary events and they are emitted to an "UNJOINED" topic.
This allows alerting on unmatched secondary events to drive quality
measures, and allows offline analysis (to understand why).

Some options:
1. Implement the same strategy as the existing one, except using Kafka state
stores instead of the DB.  With this approach I am concerned about atomic
correctness - i.e. whether state in the Kafka store can be managed so that an
event is never sent to both JOINED and UNJOINED.  (A sketch of what I have in
mind follows.)
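Roughly this, using the Processor API (assuming the two topics are merged into
one stream co-partitioned by primary id, and only one pending secondary per
primary for brevity; the event shapes, store and sink names, and the timeout
are all invented):

import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical event shapes; the real ones carry much more data.
record Primary(String id) {}
record Secondary(String primaryRef, long receivedAtMs) {}
record Joined(Primary primary, Secondary secondary) {}

// One processor owns both stores and makes every routing decision, so a
// secondary can only ever be forwarded to one of the two sinks.
public class JoinProcessor implements Processor<String, Object, String, Object> {
    private static final long TIMEOUT_MS = Duration.ofHours(1).toMillis(); // arbitrary

    private ProcessorContext<String, Object> context;
    private KeyValueStore<String, Primary> primaries;
    private KeyValueStore<String, Secondary> pending;

    @Override
    public void init(ProcessorContext<String, Object> context) {
        this.context = context;
        this.primaries = context.getStateStore("primaries-store");
        this.pending = context.getStateStore("pending-secondaries-store");
        context.schedule(Duration.ofMinutes(5),
                PunctuationType.WALL_CLOCK_TIME, this::expirePending);
    }

    @Override
    public void process(Record<String, Object> record) {
        if (record.value() instanceof Primary p) {
            primaries.put(record.key(), p);
            Secondary s = pending.delete(record.key()); // was a secondary waiting?
            if (s != null) {
                context.forward(record.withValue(new Joined(p, s)), "joined-sink");
            }
        } else if (record.value() instanceof Secondary s) {
            Primary p = primaries.get(s.primaryRef());
            if (p != null) {
                context.forward(record.withValue(new Joined(p, s)), "joined-sink");
            } else {
                pending.put(s.primaryRef(), s); // buffer until primary or timeout
            }
        }
    }

    private void expirePending(long nowMs) {
        try (KeyValueIterator<String, Secondary> it = pending.all()) {
            while (it.hasNext()) {
                KeyValue<String, Secondary> kv = it.next();
                if (nowMs - kv.value.receivedAtMs() > TIMEOUT_MS) {
                    pending.delete(kv.key); // remove first so it can't be emitted twice
                    context.forward(new Record<>(kv.key, kv.value, nowMs), "unjoined-sink");
                }
            }
        }
    }
}

My (possibly naive) expectation is that with processing.guarantee set to
exactly_once_v2 the store updates and the forwards commit atomically, so a
secondary could never land in both topics - but that is exactly the bit I
would like to confirm.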


2. Continually emit key-values to a "PENDING" topic for the secondary join.
An example sequence could be something like the following (where primary
events = P, secondary events = S):
    a) receive S with no matching P => emit {S, false}
    b) receive matching P for S => emit {S, null} (to effectively delete it
from the topic)
    c) receive S with matching P => do not emit anything

Now the problem becomes more like building a time window over the events in
PENDING, to eventually emit the still-unmatched events to UNJOINED (see the
sketch below).  I am also uncertain as to how to ensure events can never end
up in both JOINED and UNJOINED.
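For the aging step in option 2, I was imagining a small processor over
PENDING, something like the following (this assumes PENDING is keyed by
secondary event id and log-compacted, so the {S, null} tombstones eventually
get cleaned up; the store name, the PendingEntry shape, and the timeouts are
invented):

import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical shape of a PENDING record's value ({S, false} in the sequence above).
record PendingEntry(String secondaryId, long firstSeenMs) {}

// Consumes PENDING: remembers each still-unjoined secondary, drops it again on
// a tombstone, and forwards anything older than the timeout to UNJOINED.
public class PendingAger implements Processor<String, PendingEntry, String, PendingEntry> {
    private static final long TIMEOUT_MS = Duration.ofHours(1).toMillis(); // arbitrary

    private ProcessorContext<String, PendingEntry> context;
    private KeyValueStore<String, PendingEntry> store;

    @Override
    public void init(ProcessorContext<String, PendingEntry> context) {
        this.context = context;
        this.store = context.getStateStore("pending-age-store");
        context.schedule(Duration.ofMinutes(1),
                PunctuationType.WALL_CLOCK_TIME, this::sweep);
    }

    @Override
    public void process(Record<String, PendingEntry> record) {
        if (record.value() == null) {
            store.delete(record.key());                      // case (b): joined later
        } else {
            store.putIfAbsent(record.key(), record.value()); // case (a): first sighting wins
        }
    }

    private void sweep(long nowMs) {
        try (KeyValueIterator<String, PendingEntry> it = store.all()) {
            while (it.hasNext()) {
                KeyValue<String, PendingEntry> kv = it.next();
                if (nowMs - kv.value.firstSeenMs() > TIMEOUT_MS) {
                    store.delete(kv.key);
                    context.forward(new Record<>(kv.key, kv.value, nowMs)); // -> UNJOINED
                }
            }
        }
    }
}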

My apologies for the wall of text ... I find it a difficult problem to
explain. 😏


Is there some pattern I am missing that will help solve this problem?
Any help / other suggestions would be appreciated.

Thanks,
Ross
