Is there a way to make sure the offsets got committed? Perhaps, after the last message has been consumed, I can set up a task to run after a safe time (say, 5 minutes?) in another thread, which would delete the topic? What would be a safe time to use?
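For illustration, a rough sketch of the deferred-deletion idea above. The DeferredTopicDeleter class and its helpers are hypothetical; it assumes the Streams app commits its offsets under its application.id as the consumer group.id (which is how Kafka Streams names its consumer group), and it needs endOffsets(), available from Kafka 0.10.1:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class DeferredTopicDeleter {

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    // Check after `delayMinutes`; if the offsets are not committed yet, retry.
    public void scheduleDeletion(String topic, String bootstrapServers,
                                 String applicationId, long delayMinutes) {
        scheduler.schedule(() -> {
            if (allOffsetsCommitted(topic, bootstrapServers, applicationId)) {
                deleteTopic(topic);  // e.g. via TopicCommand, as in the reply below
            } else {
                scheduleDeletion(topic, bootstrapServers, applicationId, delayMinutes);
            }
        }, delayMinutes, TimeUnit.MINUTES);
    }

    // True once the committed offset of every partition has reached its end offset.
    private boolean allOffsetsCommitted(String topic, String bootstrapServers,
                                        String applicationId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Kafka Streams commits offsets under its application.id as group.id.
        props.put("group.id", applicationId);
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo info : consumer.partitionsFor(topic)) {
                partitions.add(new TopicPartition(topic, info.partition()));
            }
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
            for (TopicPartition tp : partitions) {
                OffsetAndMetadata committed = consumer.committed(tp);
                if (committed == null || committed.offset() < endOffsets.get(tp)) {
                    return false;
                }
            }
            return true;
        }
    }

    private void deleteTopic(String topic) {
        // Delete via TopicCommand (see below) or the kafka-topics.sh script.
    }
}

Rather than guessing at a safe delay, this re-checks the committed offsets and only deletes once every partition's committed offset has reached its end offset.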
On Sat, Dec 3, 2016 at 3:04 PM, Matthias J. Sax <matth...@confluent.io> wrote:

> I guess yes. You might only want to make sure the topic offsets got
> committed -- not sure if committing offsets of a deleted topic could
> cause issues (i.e., crash your Streams app).
>
> -Matthias
>
> On 12/2/16 11:04 PM, Ali Akhtar wrote:
>> Thank you very much. Last q: is it safe to do this from within a
>> callback processing that topic, once it reaches the last message? (It
>> keeps a count of how many messages are processed vs. how many remain.)
>>
>> On 3 Dec 2016 11:36 a.m., "Matthias J. Sax" <matth...@confluent.io> wrote:
>>
>>> You can use TopicCommand to delete a topic within Java:
>>>
>>>> final TopicCommand.TopicCommandOptions commandOptions =
>>>>     new TopicCommand.TopicCommandOptions(new String[]{
>>>>         "--zookeeper", "zookeeperHost:2181",
>>>>         "--delete",
>>>>         "--topic", "TOPIC-TO-BE-DELETED"});
>>>> TopicCommand.deleteTopic(zkUtils, commandOptions);
>>>
>>> So you can delete a topic within your Streams app.
>>>
>>> -Matthias
>>>
>>> On 12/2/16 9:25 PM, Ali Akhtar wrote:
>>>> Is there a way to delete the processed topics via Streams or the Java
>>>> driver? Or only through the bash script?
>>>>
>>>> On 3 Dec 2016 5:27 a.m., "Matthias J. Sax" <matth...@confluent.io> wrote:
>>>>
>>>>> If you keep old topics that are completely processed, there would be
>>>>> increasing overhead, because Streams would try to read from those
>>>>> topics as long as they exist. Thus, more fetch requests will be sent
>>>>> to those topics over time, while most fetch requests will return
>>>>> without any new data (as those old topics do not have new data).
>>>>>
>>>>> If you delete completely processed topics, there will be no overhead.
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 12/2/16 3:58 PM, Ali Akhtar wrote:
>>>>>> Hey Matthias,
>>>>>>
>>>>>> So I have a scenario where I need to batch a group of messages
>>>>>> together.
>>>>>>
>>>>>> I'm considering creating a new topic for each batch that arrives,
>>>>>> i.e. batch_<some_id>.
>>>>>>
>>>>>> Each batch_<id> topic will have a finite number of messages, and
>>>>>> then it will remain empty. Essentially these will be throwaway
>>>>>> topics.
>>>>>>
>>>>>> Is there any overhead to there being a lot of these topics, and
>>>>>> having a listener for batch_.*, or is this effectively like having
>>>>>> one listener for one topic?
>>>>>>
>>>>>> On Fri, Dec 2, 2016 at 11:09 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>
>>>>>>> 1) There will be one consumer per thread. The number of threads is
>>>>>>> defined by the number of instances you start and how many threads
>>>>>>> you configure for each instance via the StreamsConfig parameter
>>>>>>> NUM_STREAM_THREADS_CONFIG. Thus, you control this completely
>>>>>>> yourself.
>>>>>>>
>>>>>>> Depending on the number of partitions in your topics, each thread
>>>>>>> will process one or multiple partitions. As a partition will be
>>>>>>> processed by exactly one thread, the overall number of partitions
>>>>>>> over all your input topics limits your max number of threads (if
>>>>>>> you have more threads, those will just be idle).
>>>>>>>
>>>>>>> 2) Thus, there should be no performance issues. Furthermore, if you
>>>>>>> create new topics while your application is running -- and if this
>>>>>>> might overload your current application -- you can always start new
>>>>>>> instances and scale out your application dynamically -- Kafka
>>>>>>> Streams is fully elastic.
>>>>>>>
>>>>>>> Have a look here for more details:
>>>>>>> http://docs.confluent.io/current/streams/architecture.html
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>> On 12/2/16 6:23 AM, Ali Akhtar wrote:
>>>>>>>> That's pretty useful to know - thanks.
>>>>>>>>
>>>>>>>> 1) If I listened to foo-.*, and there were 5 foo topics created
>>>>>>>> after Kafka Streams was running: foo1, foo2, foo3, foo4, foo5,
>>>>>>>> will this create 5 consumers / threads / instances, or will it be
>>>>>>>> just 1 instance that receives the messages for all of those
>>>>>>>> topics?
>>>>>>>>
>>>>>>>> 2) Will this cause performance issues if I had a lot of throwaway
>>>>>>>> foo topics being created, or will this scale?
>>>>>>>>
>>>>>>>> On Fri, Dec 2, 2016 at 7:17 PM, Damian Guy <damian....@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Ali,
>>>>>>>>>
>>>>>>>>> The only way KafkaStreams will process new topics after start is
>>>>>>>>> if the original stream was defined with a regular expression,
>>>>>>>>> i.e., kafka.stream(Pattern.compile("foo-.*"));
>>>>>>>>>
>>>>>>>>> If any new topics are added after start that match the pattern,
>>>>>>>>> then they will also be consumed.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Damian
>>>>>>>>>
>>>>>>>>> On Fri, 2 Dec 2016 at 13:13 Ali Akhtar <ali.rac...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Heya,
>>>>>>>>>>
>>>>>>>>>> Normally, you add your topics and their callbacks to a
>>>>>>>>>> StreamBuilder, and then call KafkaStreams.start() to start
>>>>>>>>>> ingesting those topics.
>>>>>>>>>>
>>>>>>>>>> Is it possible to add a new topic to the StreamBuilder, and
>>>>>>>>>> start ingesting that as well, after KafkaStreams.start() has
>>>>>>>>>> been called?
>>>>>>>>>>
>>>>>>>>>> Thanks.
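Pulling the thread together: a minimal sketch of the pattern-based subscription discussed above, written against the KStreamBuilder API current at the time of this thread. The application id, bootstrap server, thread count, and batch_ prefix are placeholders:

import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class BatchTopicConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "batch-processor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // One consumer per thread; partitions are spread across the threads.
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);

        KStreamBuilder builder = new KStreamBuilder();
        // Subscribing with a Pattern means topics created after start()
        // that match batch_.* are picked up automatically.
        KStream<byte[], byte[]> batches = builder.stream(Pattern.compile("batch_.*"));
        batches.foreach((key, value) ->
                System.out.println("Received " + (value == null ? 0 : value.length) + " bytes"));

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
    }
}

This is what makes the throwaway-topic approach workable: one Streams instance with a single pattern subscription covers all batch_<id> topics, and scaling out just means starting more instances with the same application.id.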