What about this idea? It might sound ugly and unnecessarily complicated, but the message-sending chain should carry the task coordinator.
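To make it concrete, here is a rough sketch of what I have in mind (hypothetical signatures only, not the current API; OutgoingMessageEnvelope and TaskCoordinator are the existing types, and the only change is the extra coordinator parameter):

    // Rough sketch of the proposed signatures only, NOT the current Samza API.
    // The idea: thread the TaskCoordinator through the send() chain so that a
    // SystemProducer can request a commit right after it flushes its buffer.
    import org.apache.samza.system.OutgoingMessageEnvelope;
    import org.apache.samza.task.TaskCoordinator;

    // (In Samza these are separate files; shown together here for brevity.)
    interface MessageCollector {
      // today: send(OutgoingMessageEnvelope envelope)
      void send(OutgoingMessageEnvelope envelope, TaskCoordinator coordinator);
    }

    interface SystemProducer {
      void start();
      void stop();
      void register(String source);
      // today: send(String source, OutgoingMessageEnvelope envelope)
      void send(String source, OutgoingMessageEnvelope envelope, TaskCoordinator coordinator);
      void flush(String source);
    }
    // SystemProducers.send(source, envelope, coordinator) would simply forward
    // the coordinator from the MessageCollector down to the SystemProducer.

An implementation could then ask the coordinator for a commit right after it flushes, instead of waiting for the next commit interval.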
The full call chain would then be:

MessageCollector.send(envelope, coordinator)
SystemProducers.send(source, envelope, coordinator)
SystemProducer.send(source, envelope, coordinator)

If this sounds acceptable, I will file a JIRA and submit a PR. Let me know.
If it is not acceptable, I should switch my SystemProducer implementation
back to a StreamTask.

On Thu, Jan 29, 2015 at 9:29 AM, Bae, Jae Hyeon <metac...@gmail.com> wrote:

> Hi Chris
>
> On Thu, Jan 29, 2015 at 9:10 AM, Chris Riccomini <criccom...@apache.org>
> wrote:
>
>> Hey Jae,
>>
>> If I understand you correctly, your concern is that there could be
>> flushes in-between commits. For example:
>>
>> T=30s; flush
>> T=45s; flush
>> T=60s; flush && commit
>> T=65s; flush
>>
>> Your concern here is that if there's a failure before 60s, the messages
>> that were flushed at 30s and 45s will be duplicated when the container
>> reprocesses, right?
>
> Correct.
>
>> > Never mind. I found a solution. Flush should be synced with commit.
>
> Last night I was sleepy and struggling to find a solution, so this
> morning it turned out to be wrong :(
> My idea was that the send() function would not call flush() even when the
> buffer is full. But this is risky.
>
> Actually, I was writing our internal data pipeline component as a
> StreamTask, but I switched it to a SystemProducer, as Metamx's Druid
> Tranquility did. I overlooked the duplicate data that can be caused by a
> flush and commit mismatch.
>
> Do you have any idea?
>
>> Could you elaborate on this?
>>
>> Cheers,
>> Chris
>>
>> On Thu, Jan 29, 2015 at 12:27 AM, Bae, Jae Hyeon <metac...@gmail.com>
>> wrote:
>>
>> > Never mind. I found a solution. Flush should be synced with commit.
>> >
>> > On Thu, Jan 29, 2015 at 12:15 AM, Bae, Jae Hyeon <metac...@gmail.com>
>> > wrote:
>> >
>> > > Hi Samza Devs
>> > >
>> > > A StreamTask can control SamzaContainer.commit() through the task
>> > > coordinator. Can we make SystemProducer control commit after flush?
>> > > With this feature, we can prevent duplicate data on SamzaContainer
>> > > failure.
>> > >
>> > > For example, if we set the commit interval to 2 minutes, then before
>> > > the commit interval expires, the SystemProducer will flush the data
>> > > in its buffer whenever the buffer size is greater than the batch
>> > > size. If the container dies right after such a flush, another
>> > > container will start from the previous commit, and we will have
>> > > duplicate data.
>> > >
>> > > The longer the commit interval, the more duplicate data we will have.
>> > > I know this is not a big deal, because container failure will be a
>> > > rare case and only a few minutes of data will be duplicated. But I
>> > > would be happy if we could clear up this little concern.
>> > >
>> > > Any idea?
>> > >
>> > > Thank you
>> > > Best, Jae