I suppose the topic won't be deleted, but this would be a rare enough occurrence that there won't be too many dormant topics hanging around.
Alternatively, perhaps I can store the undeleted topics somewhere, and
whenever a new node starts, it could check this list and delete them.

On Sat, Dec 3, 2016 at 3:23 PM, Matthias J. Sax <matth...@confluent.io> wrote:

> Not sure. Would need to think about it more. However, the default
> commit interval in Streams is 30 sec. You can configure it via the
> StreamsConfig parameter COMMIT_INTERVAL_MS_CONFIG. So using the
> additional thread and waiting for 5 minutes sounds ok. The question
> is, what would happen if the JVM goes down before you delete the
> topic?
>
> -Matthias
>
> On 12/3/16 2:07 AM, Ali Akhtar wrote:
> > Is there a way to make sure the offsets got committed? Perhaps,
> > after the last msg has been consumed, I can set up a task to run
> > after a safe time (say 5 mins?) in another thread, which would
> > delete the topic. What would be a safe time to use?
> >
> > On Sat, Dec 3, 2016 at 3:04 PM, Matthias J. Sax
> > <matth...@confluent.io> wrote:
> >
> >> I guess yes. You might only want to make sure the topic offsets
> >> got committed -- not sure if committing offsets of a deleted topic
> >> could cause issues (i.e., crashing your Streams app).
> >>
> >> -Matthias
> >>
> >> On 12/2/16 11:04 PM, Ali Akhtar wrote:
> >>> Thank you very much. Last q - is it safe to do this from within a
> >>> callback processing that topic, once it reaches the last message?
> >>> (It keeps a count of how many messages are processed vs. how many
> >>> remain.)
> >>>
> >>> On 3 Dec 2016 11:36 a.m., "Matthias J. Sax" <matth...@confluent.io>
> >>> wrote:
> >>>
> >>>> You can use TopicCommand to delete a topic within Java:
> >>>>
> >>>>> final TopicCommand.TopicCommandOptions commandOptions =
> >>>>>     new TopicCommand.TopicCommandOptions(new String[]{
> >>>>>         "--zookeeper", "zookeeperHost:2181",
> >>>>>         "--delete",
> >>>>>         "--topic", "TOPIC-TO-BE-DELETED"});
> >>>>> TopicCommand.deleteTopic(zkUtils, commandOptions);
> >>>>
> >>>> So you can delete a topic within your Streams app.
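On newer Kafka client versions, the same deletion can also be done without TopicCommand or a ZooKeeper connection, via the AdminClient API. A minimal sketch, assuming a broker at localhost:9092 and a throwaway topic named batch_42 (both placeholders):

```java
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class DeleteBatchTopic {

    // Build the Admin client config; the broker address is a placeholder.
    static Properties adminProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return props;
    }

    public static void main(String[] args)
            throws ExecutionException, InterruptedException {
        try (AdminClient admin = AdminClient.create(adminProps("localhost:9092"))) {
            // deleteTopics() is asynchronous; all().get() blocks until the
            // broker has accepted the deletion of every listed topic.
            admin.deleteTopics(Collections.singleton("batch_42")).all().get();
        }
    }
}
```

Note that deletion still requires delete.topic.enable=true on the broker, and (as discussed above) it is safest to run this only after the consumer offsets for the topic have been committed.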
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 12/2/16 9:25 PM, Ali Akhtar wrote:
> >>>>> Is there a way to delete the processed topics via Streams or the
> >>>>> Java driver? Or only through the bash script?
> >>>>>
> >>>>> On 3 Dec 2016 5:27 a.m., "Matthias J. Sax" <matth...@confluent.io>
> >>>>> wrote:
> >>>>>
> >>>>>> If you keep old topics that are completely processed, there
> >>>>>> would be increasing overhead, because Streams would try to read
> >>>>>> from those topics as long as they exist. Thus, more fetch
> >>>>>> requests will be sent to more topics over time, while most
> >>>>>> fetch requests will return without any new data (as those old
> >>>>>> topics do not have new data).
> >>>>>>
> >>>>>> If you delete completely processed topics, there will be no
> >>>>>> overhead.
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 12/2/16 3:58 PM, Ali Akhtar wrote:
> >>>>>>> Hey Matthias,
> >>>>>>>
> >>>>>>> So I have a scenario where I need to batch a group of messages
> >>>>>>> together.
> >>>>>>>
> >>>>>>> I'm considering creating a new topic for each batch that
> >>>>>>> arrives, i.e., batch_<some_id>.
> >>>>>>>
> >>>>>>> Each batch_<id> topic will have a finite number of messages,
> >>>>>>> and then it will remain empty. Essentially these will be
> >>>>>>> throwaway topics.
> >>>>>>>
> >>>>>>> Is there any overhead to there being a lot of these topics,
> >>>>>>> and having a listener for batch_.*, or is this effectively
> >>>>>>> like having one listener for one topic?
> >>>>>>>
> >>>>>>> On Fri, Dec 2, 2016 at 11:09 PM, Matthias J. Sax
> >>>>>>> <matth...@confluent.io> wrote:
> >>>>>>>
> >>>>>>>> 1) There will be one consumer per thread. The number of
> >>>>>>>> threads is defined by the number of instances you start and
> >>>>>>>> how many threads you configure for each instance via the
> >>>>>>>> StreamsConfig parameter NUM_STREAM_THREADS_CONFIG. Thus, you
> >>>>>>>> control this completely by yourself.
> >>>>>>>>
> >>>>>>>> Depending on the number of partitions in your topics, each
> >>>>>>>> thread will process one or multiple partitions. As a
> >>>>>>>> partition will be processed by exactly one thread, the
> >>>>>>>> overall number of partitions over all your input topics
> >>>>>>>> limits your max number of threads (if you have more threads,
> >>>>>>>> those will just be idle).
> >>>>>>>>
> >>>>>>>> 2) Thus, there should be no performance issues. Furthermore,
> >>>>>>>> if you create new topics while your application is running --
> >>>>>>>> and if this might overload your current application -- you
> >>>>>>>> can always start new instances and scale out your application
> >>>>>>>> dynamically -- Kafka Streams is fully elastic.
> >>>>>>>>
> >>>>>>>> Have a look here for more details:
> >>>>>>>> http://docs.confluent.io/current/streams/architecture.html
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>> On 12/2/16 6:23 AM, Ali Akhtar wrote:
> >>>>>>>>> That's pretty useful to know - thanks.
> >>>>>>>>>
> >>>>>>>>> 1) If I listened to foo-.*, and there were 5 foo topics
> >>>>>>>>> created after Kafka Streams was running (foo1, foo2, foo3,
> >>>>>>>>> foo4, foo5), will this create 5 consumers / threads /
> >>>>>>>>> instances, or will it be just 1 instance that receives the
> >>>>>>>>> messages for all of those topics?
> >>>>>>>>>
> >>>>>>>>> 2) Will this cause performance issues if I had a lot of
> >>>>>>>>> throwaway foo topics being created, or will this scale?
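The thread-count and commit-interval settings discussed above correspond to the following Streams configuration keys. A sketch with illustrative values only; the application id and broker address are placeholders:

```properties
# Kafka Streams configuration sketch -- illustrative values only
application.id=batch-processor
bootstrap.servers=localhost:9092
# threads per application instance (NUM_STREAM_THREADS_CONFIG)
num.stream.threads=4
# offset commit interval in ms (COMMIT_INTERVAL_MS_CONFIG); default is 30000
commit.interval.ms=30000
```

The effective parallelism is min(total threads across all instances, total input partitions); extra threads beyond the partition count sit idle, as noted above.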
> >>>>>>>>>
> >>>>>>>>> On Fri, Dec 2, 2016 at 7:17 PM, Damian Guy
> >>>>>>>>> <damian....@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Ali,
> >>>>>>>>>>
> >>>>>>>>>> The only way KafkaStreams will process new topics after
> >>>>>>>>>> start is if the original stream was defined with a regular
> >>>>>>>>>> expression, i.e.,
> >>>>>>>>>> kafka.stream(Pattern.compile("foo-.*"));
> >>>>>>>>>>
> >>>>>>>>>> If any new topics are added after start that match the
> >>>>>>>>>> pattern, then they will also be consumed.
> >>>>>>>>>>
> >>>>>>>>>> Thanks,
> >>>>>>>>>> Damian
> >>>>>>>>>>
> >>>>>>>>>> On Fri, 2 Dec 2016 at 13:13 Ali Akhtar
> >>>>>>>>>> <ali.rac...@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Heya,
> >>>>>>>>>>>
> >>>>>>>>>>> Normally, you add your topics and their callbacks to a
> >>>>>>>>>>> StreamBuilder, and then call KafkaStreams.start() to start
> >>>>>>>>>>> ingesting those topics.
> >>>>>>>>>>>
> >>>>>>>>>>> Is it possible to add a new topic to the StreamBuilder,
> >>>>>>>>>>> and start ingesting that as well, after
> >>>>>>>>>>> KafkaStreams.start() has been called?
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks.
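To make the pattern subscription concrete, here is a minimal sketch against the current Streams API (StreamsBuilder; older releases used KStreamBuilder). The application id, broker address, and batch_ topic prefix are placeholders:

```java
import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class PatternSubscription {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "batch-processor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        // Subscribes to every topic matching batch_.* -- including topics
        // created after start(), which is the behavior described above.
        KStream<String, String> batches = builder.stream(Pattern.compile("batch_.*"));
        batches.foreach((key, value) -> System.out.println(key + " -> " + value));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
```

One caveat implied by the thread: the pattern is matched against topics that exist in the cluster, so deleting a fully processed batch_<id> topic (as discussed above) also stops the idle fetch requests for it.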