I'm wondering why it has to be so complex... Kafka can be configured to delete items older than 24h in a topic. So if you want to get rid of records that did not arrive in the last 24h, just configure the topic accordingly?
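For example, a 24-hour retention on the input topic could be set along
these lines (the topic name and ZooKeeper address are placeholders, and
the exact tool options depend on the broker version):

  bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
    --entity-type topics --entity-name inventory-input \
    --add-config retention.ms=86400000
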
On Wed, Feb 1, 2017 at 2:37 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> Understood now.
>
> It's a tricky problem you have, and the only solution I can come up with
> is quite complex -- maybe anybody else has a better idea? Honestly, I am
> not sure if this will work:
>
> For my proposal, the source ID must be part of the key of your records
> to distinguish records from different sources.
>
> You can read all sources as a single KStream. Additionally, you will
> need two KTables, one additional "side topic", and IQ in your main()
> method, in combination with an additional producer that writes to both
> input topics.
>
> 1. You put all new records into the first (auxiliary) KTable
> 2. You apply all new records to update your second (result) KTable
> 3. You query the result KTable to get all keys stored in it
> 4. You query your auxiliary KTable to get all keys stored in it
> 5. You compute the key set of non-updated keys
> 6. You write tombstone messages for those keys into the input topic,
>    using the additional producer (this will delete all non-updated keys
>    from your result KTable)
> 7. You write tombstones for all updated keys into your "side" topic,
>    using the additional producer (this will clear your auxiliary KTable
>    so it is empty for the next batch of records to arrive)
>
> The "side topic" must be a second input topic for your auxiliary KTable.
>
> The dataflow would be like this:
>
>   input topic ---+----------> result KTable
>                  |
>                  +---+
>                      +---> auxiliary KTable
>   side topic --------+
>
> You need to read both topics as KStreams, duplicate the first KStream,
> and merge one duplicate with the side topic.
>
> Something like this:
>
>   KStream input = builder.stream("source");
>   KStream side = builder.stream("side");
>   KStream merged = builder.merge(input, side);
>
>   KTable result = input.groupByKey().aggregate(...);
>   KTable auxiliary = merged.groupByKey().aggregate(...);
>
> For the querying step, you need to monitor the app's progress while it
> processes the CSV input. When it is done processing, you start querying
> to compute the key sets for updated and non-updated keys, write all
> tombstones, and then wait for the next batch of regular input to arrive,
> and start the whole process over.
>
> Steps 1 to 3 will be covered by your Streams app, and this will run
> continuously. Steps 4 to 7 are additional user code that you need to put
> outside the streaming part of your app, and trigger after processing a
> batch of CSV input has finished.
>
> Hope this does make sense...
>
> -Matthias
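
For what it's worth, a fuller sketch of that topology against the same
0.10.x API as the snippet above; the topic names, the String key/value
types, the store names, and the latest-value reduce() standing in for
aggregate(...) are all assumptions, not something from the thread:

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.KTable;

    public class TwoTableTopologySketch {

        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "csv-import-app");    // assumed app id
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
            props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

            final KStreamBuilder builder = new KStreamBuilder();

            // keys are assumed to carry the source ID, e.g. "S1|itemId"
            final KStream<String, String> input = builder.stream("input-topic"); // assumed topic name
            final KStream<String, String> side  = builder.stream("side-topic");  // assumed topic name

            // result KTable: fed by the input topic only (step 2)
            final KTable<String, String> result = input
                .groupByKey()
                .reduce((oldValue, newValue) -> newValue, "result-store");

            // auxiliary KTable: fed by the input topic merged with the side
            // topic (step 1, plus the tombstones written in step 7)
            final KTable<String, String> auxiliary = builder.merge(input, side)
                .groupByKey()
                .reduce((oldValue, newValue) -> newValue, "auxiliary-store");

            final KafkaStreams streams = new KafkaStreams(builder, props);
            streams.start();
        }
    }

The "result-store" and "auxiliary-store" names are what the querying
step would later address via Interactive Queries; reduce() simply keeps
the latest value per key, and whatever aggregate(...) stood for in the
original sketch can be swapped in.
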
> On 1/31/17 4:53 PM, Eric Dain wrote:
>> Sorry for the confusion, I stopped the example before processing the
>> file from S2.
>>
>> So in day 2, if we get S2=[D,E,Z], we will have to remove F and add Z;
>> K = [A,B,D,E,Z]
>>
>> To elaborate more, A, B and C belong to S1 (items have a field to state
>> their source). Processing files from S1 should never delete or modify
>> items belonging to S2.
>>
>> Thanks for the feedback that I should not use Interactive Queries in
>> SourceTask.
>>
>> Currently, I'm representing all CSV records in one KStream (adding the
>> source to each record). But I can represent them as separate KStreams
>> if needed. Are you suggesting windowing these KStreams with a 24-hour
>> window and then merging them?
>>
>> On Tue, Jan 31, 2017 at 4:31 PM, Matthias J. Sax <matth...@confluent.io>
>> wrote:
>>
>>> Thanks for the update.
>>>
>>> What is not clear to me: why do you only need to remove C, but not
>>> D,E,F, too, as source2 does not deliver any data on day 2?
>>>
>>> Furthermore, IQ is designed to be used outside of your Streams code,
>>> and thus, you should not use it in SourceTask (not sure if this would
>>> even be possible in the first place).
>>>
>>> However, you might be able to exploit joins... Your CSV input is a
>>> KStream, right?
>>>
>>> -Matthias
>>>
>>> On 1/31/17 3:10 PM, Eric Dain wrote:
>>>> Sorry for not being clear. Let me explain by example. Let's say I have
>>>> two sources S1 and S2. The application that I need to write will load
>>>> the files from these sources every 24 hours. The results will be
>>>> KTable K.
>>>>
>>>> For day 1:
>>>> S1=[A,B,C] => the result K = [A,B,C]
>>>> S2=[D,E,F] => K will be [A,B,C,D,E,F]
>>>>
>>>> For day 2:
>>>> S1=[A,B]; because C is missing I have to remove it from K;
>>>> K = [A,B,D,E,F]. On the other hand, I will process A and B again in
>>>> case of updates.
>>>>
>>>> In other words, I know how to process existing and new items; I'm not
>>>> sure how to remove items missing from the latest CSV file.
>>>>
>>>> If I can use Interactive Queries from inside the SourceTask to get a
>>>> snapshot of what is currently in K for a specific source S, then I can
>>>> send delete messages for the missing items by subtracting the latest
>>>> items in the CSV from the items of that source in K.
>>>>
>>>> Thanks,
>>>>
>>>> On Tue, Jan 31, 2017 at 1:54 PM, Matthias J. Sax <matth...@confluent.io>
>>>> wrote:
>>>>
>>>>> I am not sure if I understand the complete scenario yet.
>>>>>
>>>>>> I need to delete all items from that source that
>>>>>> doesn't exist in the latest CSV file.
>>>>>
>>>>> Cannot follow here. I thought your CSV files provide the data you
>>>>> want to process. But it seems you also have a second source?
>>>>>
>>>>> How does your Streams app compute the items you want to delete? If
>>>>> you have these items in a KTable, you can access them from outside
>>>>> your application using Interactive Queries.
>>>>>
>>>>> Thus, you can monitor the app's progress by observing committed
>>>>> offsets, and when it is finished, you query your KTable to extract
>>>>> the items you want to delete and do the cleanup.
>>>>>
>>>>> Does this make sense?
>>>>>
>>>>> For Interactive Queries see the docs and blog post:
>>>>>
>>>>> http://docs.confluent.io/current/streams/developer-guide.html#interactive-queries
>>>>>
>>>>> https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 1/30/17 9:10 PM, Eric Dain wrote:
>>>>>> Thanks Matthias for your reply.
>>>>>>
>>>>>> I'm not trying to stop the application. I'm importing inventory from
>>>>>> CSV files coming from 3rd-party sources. The CSVs are snapshots of
>>>>>> each source's inventory. I need to delete all items from that source
>>>>>> that don't exist in the latest CSV file.
>>>>>>
>>>>>> I was thinking of using an "End of Batch" message to initiate that
>>>>>> process. I might need to do the clean-up as part of the Connect code
>>>>>> instead, or is there a better way of doing that?
>>>>>>
>>>>>> Thanks,
>>>>>> Eric
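
To make the Interactive Queries step a bit more concrete, a rough sketch
of pulling a per-source key snapshot out of the result KTable; the
"result-store" name, the String types, and the "sourceId|itemId" key
convention are assumptions carried over from the sketch above, and
`streams` is the running KafkaStreams instance of that topology:

    import java.util.HashSet;
    import java.util.Set;

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

    public class SnapshotQuery {

        // Collect all keys currently in the result KTable that belong to one
        // source, assuming keys are formatted as "<sourceId>|<itemId>" and the
        // KTable is materialized as "result-store" (both assumptions).
        static Set<String> keysForSource(final KafkaStreams streams, final String sourceId) {
            final ReadOnlyKeyValueStore<String, String> store =
                streams.store("result-store", QueryableStoreTypes.<String, String>keyValueStore());

            final Set<String> keys = new HashSet<>();
            try (final KeyValueIterator<String, String> it = store.all()) {
                while (it.hasNext()) {
                    final KeyValue<String, String> entry = it.next();
                    if (entry.key.startsWith(sourceId + "|")) {
                        keys.add(entry.key);
                    }
                }
            }
            return keys;
        }
    }

Running the same query against the auxiliary store gives the updated
keys, and the difference between the two sets is what gets tombstoned
(records with a null value) in steps 6 and 7 above.
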
>>>>>> On Sun, Jan 29, 2017 at 4:37 PM, Matthias J. Sax <matth...@confluent.io>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> currently, a Kafka Streams application is designed to "run forever"
>>>>>>> and there is no notion of "End of Batch" -- we have plans to add
>>>>>>> this though... (cf.
>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-95%3A+Incremental+Batch+Processing+for+Kafka+Streams)
>>>>>>>
>>>>>>> Thus, right now you need to stop your application manually. You
>>>>>>> would need to observe the application's committed offsets (and lag)
>>>>>>> using bin/kafka-consumer-groups.sh (the application ID is used as
>>>>>>> group ID) to monitor the app's progress and see when it is done.
>>>>>>>
>>>>>>> Cf.
>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Data+%28Re%29Processing+Scenarios
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>> On 1/28/17 1:07 PM, Eric Dain wrote:
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I'm pretty new to Kafka Streams. I am using Kafka Streams to ingest
>>>>>>>> a large CSV file. I need to run some clean-up code after all
>>>>>>>> records in the file are processed. Is there a way to send an
>>>>>>>> "End of Batch" event that is guaranteed to be processed after all
>>>>>>>> records? If not, is there an alternative solution?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Eric

--
Gwen Shapira
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog
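
For reference, the offset/lag check suggested above would look roughly
like this; the group name is the Streams application.id (a placeholder
here, matching the earlier sketch), and older 0.10.x tooling also needs
the --new-consumer flag:

  bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
    --describe --group csv-import-app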