I'm wondering why it has to be so complex... Kafka can be configured
to delete items older than 24h in a topic. So if you want to get rid
of records that did not arrive in the last 24h, just configure the
topic accordingly?
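
For example (just a sketch; the topic name "inventory-input" is a
placeholder, and 86400000 ms = 24h), the per-topic override can be set
with kafka-configs.sh:

  bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
    --entity-type topics --entity-name inventory-input \
    --add-config retention.ms=86400000

Keep in mind that retention is enforced per log segment, so records are
not removed at exactly the 24h mark.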

On Wed, Feb 1, 2017 at 2:37 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> Understood now.
>
> It's a tricky problem you have, and the only solution I can come up with
> is quite complex -- maybe anybody else has a better idea?
> Honestly, I am not sure if this will work:
>
> For my proposal, the source ID must be part of the key of your records
> to distinguish records from different sources.
>
> You can read all sources as a single KStream. Additionally, you will
> need two KTables, one extra "side topic", and you will use IQ in your
> main() method in combination with an additional producer that writes to
> both input topics.
>
> 1. You put all new records into the first (auxiliary) KTable
> 2. You apply all new records to update your second (result) KTable
> 3. You query the result KTable to get all keys stored in it
> 4. You query the auxiliary KTable to get all keys stored in it
> 5. You compute the key set of non-updated keys
> 6. You write tombstone messages for those keys into the input topic,
>    using the additional producer (this will delete all non-updated keys
>    from your result KTable)
> 7. You write tombstones for all updated keys into your "side" topic,
>    using the additional producer (this will clear your auxiliary KTable
>    so that it is empty for the next batch of records to arrive)
>
> The "side topic" must be a second input topic for your auxiliary KTable.
>
> The dataflow would be like this:
>
> input topic ---+----------> result KTable
>                |
>                +---+
>                    +---> auxiliary KTable
> side topic --------+
>
>
> You need to read both topics as KStreams, duplicate the first KStream,
> and merge one duplicate with the side topic.
>
> Something like this:
>
> KStream input = builder.stream("source");
> KStream side = builder.stream("side");
> KStream merged = builder.merge(input, side);
>
> KTable result = input.groupByKey().aggregate(...);
> KTable auxiliary = merged.groupByKey().aggregate(...);
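>
> For the querying in steps 3 and 4, the two aggregate(...) calls above
> need to create named, queryable state stores. With the 0.10.x API that
> could look like this (a minimal sketch, assuming input and merged are
> KStream<String, String> and a latest-value-wins aggregation; adjust
> serdes and aggregation logic to your data):
>
> KTable<String, String> result = input
>     .groupByKey()
>     .aggregate(() -> null, (key, value, agg) -> value,
>                Serdes.String(), "result-store");
>
> KTable<String, String> auxiliary = merged
>     .groupByKey()
>     .aggregate(() -> null, (key, value, agg) -> value,
>                Serdes.String(), "aux-store");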
>
>
> For the querying step, you need to monitor the app's progress while it
> processes the CSV input. Once it is done processing, you start querying
> to compute the key sets for updated and non-updated keys, write all the
> tombstones, and then wait for the next batch of regular input to arrive
> and start the whole process over.
>
> Steps 1 to 3 will be covered by your Streams app, and this will run
> continuously. Steps 4 to 7 are additional user code that you need to put
> outside the streaming part of your app and trigger after a batch of CSV
> input has been processed.
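>
> A rough sketch of that additional user code (querying both stores via IQ
> and writing the tombstones), assuming the stores are named "result-store"
> and "aux-store" as above, String keys and values, a running KafkaStreams
> instance `streams`, and an already-configured KafkaProducer<String, String>
> `producer` (all of these names are placeholders):
>
> // QueryableStoreTypes, ReadOnlyKeyValueStore, and KeyValueIterator come
> // from org.apache.kafka.streams.state
> ReadOnlyKeyValueStore<String, String> resultStore =
>     streams.store("result-store", QueryableStoreTypes.<String, String>keyValueStore());
> ReadOnlyKeyValueStore<String, String> auxStore =
>     streams.store("aux-store", QueryableStoreTypes.<String, String>keyValueStore());
>
> // all keys currently in the result KTable
> Set<String> allKeys = new HashSet<>();
> try (KeyValueIterator<String, String> it = resultStore.all()) {
>     while (it.hasNext()) allKeys.add(it.next().key);
> }
>
> // all keys updated by the current batch
> Set<String> updatedKeys = new HashSet<>();
> try (KeyValueIterator<String, String> it = auxStore.all()) {
>     while (it.hasNext()) updatedKeys.add(it.next().key);
> }
>
> // keys that did not show up in the latest CSV (if a batch covers only one
> // source, filter here on the source-ID part of the key)
> Set<String> staleKeys = new HashSet<>(allKeys);
> staleKeys.removeAll(updatedKeys);
>
> // delete markers for stale keys into the input topic; depending on your
> // aggregation, a plain null value may be ignored by a KStream aggregate,
> // so an explicit "delete" marker value that your aggregator understands
> // might be needed instead
> for (String key : staleKeys) {
>     producer.send(new ProducerRecord<>("source", key, null));
> }
>
> // tombstones for updated keys into the side topic, to empty the auxiliary
> // KTable for the next batch
> for (String key : updatedKeys) {
>     producer.send(new ProducerRecord<>("side", key, null));
> }
> producer.flush();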
>
>
> Hope this does make sense...
>
>
> -Matthias
>
>
> On 1/31/17 4:53 PM, Eric Dain wrote:
>> Sorry for the confusion, I stopped the example before processing the file
>> from S2.
>>
>> So on day 2, if we get S2=[D, E, Z], we will have to remove F and add Z;
>> K = [A,B,D,E,Z].
>>
>> To elaborate more, A, B and C belong to S1 (items have a field stating
>> their source). Processing files from S1 should never delete or modify
>> items belonging to S2.
>>
>> Thanks for the feedback that I should not use Interactive Queries in a
>> SourceTask.
>>
>> Currently, I'm representing all CSV records in one KStream (adding the
>> source to each record), but I can represent them as separate KStreams if
>> needed. Are you suggesting windowing these KStreams with a 24-hour window
>> and then merging them?
>>
>>
>>
>>
>> On Tue, Jan 31, 2017 at 4:31 PM, Matthias J. Sax <matth...@confluent.io>
>> wrote:
>>
>>> Thanks for the update.
>>>
>>> What is not clear to me: why do you only need to remove C, but not
>>> D,E,F, too, as source2 does not deliver any data on day 2?
>>>
>>> Furthermore, IQ is designed to be used outside of your Streams code,
>>> and thus, you should not use it in a SourceTask (not sure if this would
>>> even be possible in the first place).
>>>
>>> However, you might be able to exploit joins... Your CSV input is a
>>> KStream, right?
>>>
>>>
>>> -Matthias
>>>
>>> On 1/31/17 3:10 PM, Eric Dain wrote:
>>>> Sorry for not being clear. Let me explain by example. Let's say I have
>>>> two sources, S1 and S2. The application that I need to write will load
>>>> the files from these sources every 24 hours. The result will be a
>>>> KTable K.
>>>>
>>>> For day 1:
>>>> S1=[A, B, C]   =>  the result K = [A,B,C]
>>>>
>>>> S2=[D,E,F] =>   K will be [A,B,C,D,E,F]
>>>>
>>>> For day 2:
>>>>
>>>> S1=[A,B]; because C is missing, I have to remove it from K: K = [A,B,D,E,F].
>>>> On the other hand, I will process A and B again in case of updates.
>>>>
>>>> In other words, I know how to process existing and new items; I'm not
>>>> sure how to remove items missing from the latest CSV file.
>>>>
>>>> If I can use Interactive Queries from inside the SourceTask to get a
>>>> snapshot of what is currently in K for a specific source S, then I can
>>>> send delete messages for the missing items by subtracting the latest
>>>> items in the CSV from the items of that source in K.
>>>>
>>>> Thanks,
>>>>
>>>> On Tue, Jan 31, 2017 at 1:54 PM, Matthias J. Sax <matth...@confluent.io>
>>>> wrote:
>>>>
>>>>> I am not sure if I understand the complete scenario yet.
>>>>>
>>>>>> I need to delete all items from that source that
>>>>>> don't exist in the latest CSV file.
>>>>>
>>>>> Cannot follow here. I thought your CSV files provide the data you want
>>>>> to process. But it seems you also have a second source?
>>>>>
>>>>> How does your Streams app compute the items you want to delete? If you
>>>>> have these items in a KTable, you can access them from outside your
>>>>> application using Interactive Queries.
>>>>>
>>>>> Thus, you can monitor the app's progress by observing its committed
>>>>> offsets, and once it is finished, you query your KTable to extract the
>>>>> items you want to delete and do the cleanup.
>>>>>
>>>>> Does this make sense?
>>>>>
>>>>> For Interactive Queries see the docs and blog post:
>>>>>
>>>>> http://docs.confluent.io/current/streams/developer-guide.html#interactive-queries
>>>>>
>>>>> https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
>>>>>
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>> On 1/30/17 9:10 PM, Eric Dain wrote:
>>>>>> Thanks Matthias for your reply.
>>>>>>
>>>>>> I'm not trying to stop the application. I'm importing inventory from
>>>>>> CSV files coming from 3rd-party sources. The CSVs are snapshots of
>>>>>> each source's inventory. I need to delete all items from that source
>>>>>> that don't exist in the latest CSV file.
>>>>>>
>>>>>> I was thinking of using an "End of Batch" message to initiate that
>>>>>> process. I might need to do the clean-up as part of the Connect code
>>>>>> instead, or is there a better way of doing that?
>>>>>>
>>>>>> Thanks,
>>>>>> Eric
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Sun, Jan 29, 2017 at 4:37 PM, Matthias J. Sax <
>>> matth...@confluent.io>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> currently, a Kafka Streams application is designed to "run forever"
>>>>>>> and there is no notion of "End of Batch" -- we have plans to add this
>>>>>>> though... (cf.
>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-95%3A+Incremental+Batch+Processing+for+Kafka+Streams)
>>>>>>>
>>>>>>> Thus, right now you need to stop your application manually. You would
>>>>>>> need to observe the application's committed offsets (and lag) using
>>>>>>> bin/kafka-consumer-groups.sh (the application ID is used as the group
>>>>>>> ID) to monitor the app's progress and see when it is done.
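>>>>>>>
>>>>>>> For example (a sketch; <application.id> is a placeholder, and the
>>>>>>> --new-consumer flag is needed on 0.10.x but not in later versions):
>>>>>>>
>>>>>>>   bin/kafka-consumer-groups.sh --new-consumer \
>>>>>>>     --bootstrap-server localhost:9092 \
>>>>>>>     --describe --group <application.id>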
>>>>>>>
>>>>>>> Cf.
>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Data+%28Re%29Processing+Scenarios
>>>>>>>
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>>
>>>>>>> On 1/28/17 1:07 PM, Eric Dain wrote:
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I'm pretty new to Kafka Streams. I am using Kafka Streams to ingest a
>>>>>>>> large CSV file. I need to run some clean-up code after all records in
>>>>>>>> the file are processed. Is there a way to send an "End of Batch" event
>>>>>>>> that is guaranteed to be processed after all records? If not, is there
>>>>>>>> an alternative solution?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Eric
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>
>



-- 
Gwen Shapira
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog
