Lol, we just came to the same conclusion. Can you open a JIRA so we can
discuss there? I also have some concerns about the new Kafka producer that
we just upgraded to (see above); we can dig into those in the same ticket.

On Thu, Jan 29, 2015 at 10:13 AM, Bae, Jae Hyeon <metac...@gmail.com> wrote:

> What about this idea? It might sound ugly and unnecessarily complicated,
> but the message-sending chain should carry a coordinator:
>
> MessageCollector.send(envelope, coordinator)
> SystemProducers.send(source, envelope, coordinator)
> SystemProducer.send(source, envelope, coordinator)
>
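> As a rough sketch of what that could look like (the interface below is a
> hypothetical variant for illustration, not Samza's current API):
>
>   import org.apache.samza.system.OutgoingMessageEnvelope;
>   import org.apache.samza.task.TaskCoordinator;
>
>   // Hypothetical: thread the coordinator through the send chain, so a
>   // producer implementation can hold onto it and request a commit right
>   // after a successful flush, keeping checkpoints aligned with flushes.
>   public interface CoordinatedSystemProducer {
>     void send(String source, OutgoingMessageEnvelope envelope,
>               TaskCoordinator coordinator);
>     void flush(String source);
>   }
>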
> If this sounds acceptable, I will file a JIRA and submit a PR.
>
> Let me know. If this is not acceptable, I will switch my SystemProducer
> implementation back to a StreamTask.
>
>
> On Thu, Jan 29, 2015 at 9:29 AM, Bae, Jae Hyeon <metac...@gmail.com>
> wrote:
>
> > Hi Chris
> >
> >
> >
> > On Thu, Jan 29, 2015 at 9:10 AM, Chris Riccomini <criccom...@apache.org>
> > wrote:
> >
> >> Hey Jae,
> >>
> >> If I understand you correctly, your concern is that there could be
> >> flushes in-between commits. For example:
> >>
> >>   T=30s; flush
> >>   T=45s; flush
> >>   T=60s; flush && commit
> >>   T=65s; flush
> >>
> >> Your concern here is that if there's a failure before 60s, the messages
> >> that were flushed at 30s and 45s will be duplicated when the container
> >> reprocesses, right?
> >
> >
> > Correct
> >
> >>
> >
> >
> >> > Never mind. I found a solution. Flush should be synced with commit.
> >>
> >
> > Last night I was sleepy and struggled to find a solution, and this
> > morning it turned out to be wrong :(
> > My idea was that send() would not call flush() even when the buffer is
> > full. But this is risky.
> >
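> > Concretely, the idea I dropped looked roughly like this (a sketch
> > against Samza's SystemProducer interface, with the actual I/O elided):
> >
> >   import java.util.ArrayList;
> >   import java.util.List;
> >   import org.apache.samza.system.OutgoingMessageEnvelope;
> >   import org.apache.samza.system.SystemProducer;
> >
> >   // Sketch: never flush from send(), only when the container calls
> >   // flush() at commit time. Risky: the buffer can grow without bound.
> >   public class DeferredFlushProducer implements SystemProducer {
> >     private final List<OutgoingMessageEnvelope> buffer = new ArrayList<>();
> >
> >     public void register(String source) {}
> >     public void start() {}
> >     public void stop() {}
> >
> >     public void send(String source, OutgoingMessageEnvelope envelope) {
> >       buffer.add(envelope); // no flush here, even when the buffer is full
> >     }
> >
> >     public void flush(String source) {
> >       // a real implementation would write the buffered envelopes out to
> >       // the downstream system here, before clearing the buffer
> >       buffer.clear();
> >     }
> >   }
> >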
> > Actually, I was writing our internal data pipeline component as a
> > StreamTask, but I switched it to a SystemProducer, as Metamx's Druid
> > Tranquility did. But I overlooked the duplicate data that a flush/commit
> > mismatch can cause.
> >
> > Do you have any idea?
> >
> >>
> >> Could you elaborate on this?
> >>
> >> Cheers,
> >> Chris
> >>
> >> On Thu, Jan 29, 2015 at 12:27 AM, Bae, Jae Hyeon <metac...@gmail.com>
> >> wrote:
> >>
> >> > Never mind. I found a solution. Flush should be synced with commit.
> >> >
> >> > On Thu, Jan 29, 2015 at 12:15 AM, Bae, Jae Hyeon <metac...@gmail.com>
> >> > wrote:
> >> >
> >> > > Hi Samza Devs
> >> > >
> >> > > A StreamTask can control SamzaContainer.commit() through the task
> >> > > coordinator. Can we let a SystemProducer control commit after a
> >> > > flush? With this feature, we could prevent duplicate data on
> >> > > SamzaContainer failure.
> >> > >
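> >> > > For reference, this is roughly how a StreamTask can request a
> >> > > commit today (a sketch; the exact TaskCoordinator commit signature
> >> > > varies by Samza version):
> >> > >
> >> > >   import org.apache.samza.system.IncomingMessageEnvelope;
> >> > >   import org.apache.samza.task.MessageCollector;
> >> > >   import org.apache.samza.task.StreamTask;
> >> > >   import org.apache.samza.task.TaskCoordinator;
> >> > >
> >> > >   // Sketch: a StreamTask asking the container to commit. There is
> >> > >   // no equivalent hook for a SystemProducer after its flush().
> >> > >   public class CommitAfterSendTask implements StreamTask {
> >> > >     public void process(IncomingMessageEnvelope envelope,
> >> > >                         MessageCollector collector,
> >> > >                         TaskCoordinator coordinator) {
> >> > >       // ... send downstream via collector, then request a commit:
> >> > >       coordinator.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
> >> > >     }
> >> > >   }
> >> > >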
> >> > > For example, suppose we set the commit interval to 2 minutes.
> >> > > Before the commit interval expires, whenever its buffer size exceeds
> >> > > the batch size, the SystemProducer will flush the buffered data. If
> >> > > the container dies right after such a flush, another container will
> >> > > start from the previous commit, and we will get duplicate data.
> >> > >
> >> > > The longer the commit interval, the more duplicate data we get. I
> >> > > know this is not a big deal, because container failures will be rare
> >> > > and only a few minutes of data would be duplicated, but I would be
> >> > > happy if we could clear up this little concern.
> >> > >
> >> > > Any idea?
> >> > >
> >> > > Thank you
> >> > > Best, Jae
> >> > >
> >> >
> >>
> >
> >
>
