Hello, For change capture streams from JDBC that may include deletes, a common suggestion is to represent them as KTables (i.e. a changelog stream) instead of KStreams, since the former stream will override the values with the same key with the newer record in the stream. Then when you are doing the joins, deletion records (usually represented as a {key: null}) will also be propagated to the result KTable as a deletion record as well.
Guozhang On Thu, Apr 20, 2017 at 8:51 AM, Olivia Batistuta < olivia.batist...@gmail.com> wrote: > Hi Guys, > > I just wanted to run this by the group to see if this strategy makes sense. > > I have a Kafka Streams App that is using the JDBC connector to stream in > updates and inserts happening on multiple tables at the RDBMS layer. > > Inside the Streams app, we are performing joins between two or more > KStreams and KTables to get new KTable and KStream pipelines. > > The final messages are pushed to output topics that are eventually pushed > into ElasticSearch permanently. > > I need to add support so that when records are physically deleted from the > RDBMS tables, we can pick up those changes at the Kafka Streams layer and > get rid of the documents linked to this deleted objects from ElasticSearch. > > I also need to make sure that the streams and stores do not have any > records associated with these deleted objects. > > The deletes for each RDBMS table are being captured and INSERTed into RDBMS > audit tables that is used to create another set of streams via the JDBC > Source connector > > The delete streams are being pushed into Kafka Stream and picked up as > KStreams inside the app. > > http://docs.confluent.io/3.2.0/connect/connect-jdbc/docs/index.html > > When the delete event comes in, I have logic that picks up each event via > the Kafka Streams Processor API to remove the documents from ElasticSearch > that are tied to this deleted object. > > http://docs.confluent.io/3.2.0/streams/developer-guide. > html?highlight=processor%20api#applying-processors-and- > transformers-processor-api-integration > > Since we also need to remove all dependent records that were associated > with this deleted parent records, I also have logic that performs a LEFT > JOIN first between the delete stream and the individuals streams and then > setting a delete flag on messages in the results of the LEFT join that have > to be removed. > > Records whose keys are in the DELETED object streams are passed as non-null > values and those are flagged to be deleted in the result from the Value > joiner. > > After the LEFT join result is obtained, I am filtering to exclude records > that have the DELETED flag set on those messages. > > I am basically attempting to perform an ANTI JOIN that can be described > like this using SQL > > SELECT * FROM each stream WHERE id NOT IN (SELECT id FROM > stream_of_deleted_objects) > > Is there a simpler, better way of implementing this? > > Thanks in advance for any feedback. > -- -- Guozhang