Lol, we just came to the same conclusion. Can you open up a JIRA so we can discuss there? I have some concerns about the new Kafka producer that we just upgraded to (see above), so that's a good place to dig into them.
On Thu, Jan 29, 2015 at 10:13 AM, Bae, Jae Hyeon <metac...@gmail.com> wrote:

> What about this idea? It might sound ugly and unnecessarily complicated,
> but the message-sending chain should have a coordinator:
>
> MessageCollector.send(envelope, coordinator)
> SystemProducers.send(source, envelope, coordinator)
> SystemProducer.send(source, envelope, coordinator)
>
> If that sounds acceptable, I will file a JIRA and submit a PR.
>
> Let me know. If it is not acceptable, I should switch my SystemProducer
> implementation back to a StreamTask.
>
> On Thu, Jan 29, 2015 at 9:29 AM, Bae, Jae Hyeon <metac...@gmail.com> wrote:
>
> > Hi Chris
> >
> > On Thu, Jan 29, 2015 at 9:10 AM, Chris Riccomini <criccom...@apache.org> wrote:
> >
> >> Hey Jae,
> >>
> >> If I understand you correctly, your concern is that there could be
> >> flushes in between commits. For example:
> >>
> >> T=30s; flush
> >> T=45s; flush
> >> T=60s; flush && commit
> >> T=65s; flush
> >>
> >> Your concern here is that if there's a failure before 60s, the messages
> >> that were flushed at 30s and 45s will be duplicated when the container
> >> reprocesses, right?
> >
> > Correct.
> >
> >> > Never mind. I found a solution. Flush should be synced with commit.
> >
> > Last night I was sleepy and struggling to find a solution, and this
> > morning it turned out to be wrong :(
> > My idea was that send() would not call flush() even when the buffer is
> > full, but that is risky.
> >
> > Actually, I was writing our internal data pipeline component as a
> > StreamTask, but I switched it to a SystemProducer, as Metamx's Druid
> > Tranquility did. I overlooked the duplicate data that can be caused by a
> > flush/commit mismatch.
> >
> > Do you have any idea?
> >
> >> Could you elaborate on this?
> >>
> >> Cheers,
> >> Chris
> >>
> >> On Thu, Jan 29, 2015 at 12:27 AM, Bae, Jae Hyeon <metac...@gmail.com> wrote:
> >>
> >> > Never mind. I found a solution. Flush should be synced with commit.
> >> >
> >> > On Thu, Jan 29, 2015 at 12:15 AM, Bae, Jae Hyeon <metac...@gmail.com> wrote:
> >> >
> >> > > Hi Samza Devs
> >> > >
> >> > > A StreamTask can control SamzaContainer.commit() through the task
> >> > > coordinator. Can we make SystemProducer control commit after flush?
> >> > > With this feature, we could prevent duplicate data on SamzaContainer
> >> > > failure.
> >> > >
> >> > > For example, say we set the commit interval to 2 minutes. If the
> >> > > buffer grows past the batch size before the commit interval expires,
> >> > > the SystemProducer will flush the data in the buffer. If the
> >> > > container dies right after that flush, another container will start
> >> > > from the previous commit, and we will have duplicate data.
> >> > >
> >> > > The longer the commit interval, the more data can be duplicated. I
> >> > > know this is not a big deal, since container failure is a rare case
> >> > > and only a few minutes of data would be duplicated, but I would be
> >> > > happy if we could clear up this little concern.
> >> > >
> >> > > Any idea?
> >> > >
> >> > > Thank you
> >> > > Best, Jae
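
For readers following the thread, here is a minimal Java sketch of the coordinator idea proposed above. The names Coordinator, requestCommit(), and CoordinatedSystemProducer are hypothetical, introduced only for illustration; they are not part of Samza's API. The actual proposal would thread a coordinator through MessageCollector, SystemProducers, and SystemProducer, per the signatures quoted in the thread.

import java.util.ArrayList;
import java.util.List;

// Hypothetical hook that lets a producer ask the container to checkpoint
// (commit input offsets) as soon as possible.
interface Coordinator {
  void requestCommit();
}

// Simplified, buffer-based producer illustrating the proposed send() chain.
class CoordinatedSystemProducer {
  private final int batchSize;
  private final List<Object> buffer = new ArrayList<>();

  CoordinatedSystemProducer(int batchSize) {
    this.batchSize = batchSize;
  }

  // The extra coordinator argument is the change proposed in the thread.
  void send(String source, Object envelope, Coordinator coordinator) {
    buffer.add(envelope);
    if (buffer.size() >= batchSize) {
      flush(source);
      // Couple every flush with a commit request. Without this, a flush
      // that lands between two commits writes messages downstream that a
      // restarted container will re-send when it replays input from the
      // last checkpoint -- the duplication described in the thread.
      coordinator.requestCommit();
    }
  }

  void flush(String source) {
    // Deliver buffered envelopes to the underlying system here, then clear.
    buffer.clear();
  }
}

In the real code path, send() is invoked from the MessageCollector inside StreamTask.process(), which already receives a TaskCoordinator, so the coordinator passed down the chain could plausibly be that same object. The effect is that the producer never flushes without also scheduling a commit, which is the flush/commit synchronization the thread is after.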