What about this idea? It might sound ugly and unnecessarily complicated,
but the message-sending chain should have a coordinator:

MessageCollector.send(envelope, coordinator)
SystemProducers.send(source, envelope, coordinator)
SystemProducer.send(source, envelope, coordinator)
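
To make it concrete, here is a rough, self-contained Java sketch of what I
mean. The names mirror Samza's (SystemProducer, TaskCoordinator,
OutgoingMessageEnvelope), but the signatures and the BatchingSystemProducer
class below are placeholders I made up for illustration, not the real Samza
API, and BATCH_SIZE is just an arbitrary number:

  // Simplified placeholder types; the real Samza interfaces differ in detail.
  interface OutgoingMessageEnvelope {}

  interface TaskCoordinator {
    void commit(); // ask the container to checkpoint now
  }

  // Proposed shape: the coordinator travels with every send() call so the
  // producer can line a commit up with its own flush.
  interface SystemProducer {
    void register(String source);
    void send(String source, OutgoingMessageEnvelope envelope,
              TaskCoordinator coordinator);
    void flush(String source);
  }

  // Illustrative producer: flush when the buffer is full, then immediately
  // request a commit, so flush and commit always happen together.
  class BatchingSystemProducer implements SystemProducer {
    private static final int BATCH_SIZE = 1000; // illustrative only
    private final java.util.List<OutgoingMessageEnvelope> buffer =
        new java.util.ArrayList<OutgoingMessageEnvelope>();

    public void register(String source) {}

    public void send(String source, OutgoingMessageEnvelope envelope,
                     TaskCoordinator coordinator) {
      buffer.add(envelope);
      if (buffer.size() >= BATCH_SIZE) {
        flush(source);
        coordinator.commit(); // close the window where a flush has no commit
      }
    }

    public void flush(String source) {
      // write the buffered messages to the downstream system, then clear
      buffer.clear();
    }
  }

The point is only that, once the coordinator travels with send(), the
producer can request a checkpoint immediately after each flush, so a flush
is never left sitting in front of a failure without a matching commit.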

If this sounds acceptable, I will file a JIRA and submit a PR.

Let me know. If it is not acceptable, I will switch my SystemProducer
implementation back to a StreamTask.


On Thu, Jan 29, 2015 at 9:29 AM, Bae, Jae Hyeon <metac...@gmail.com> wrote:

> Hi Chris
>
>
>
> On Thu, Jan 29, 2015 at 9:10 AM, Chris Riccomini <criccom...@apache.org>
> wrote:
>
>> Hey Jae,
>>
>> If I understand you correctly, your concern is that there could be flushes
>> in-between commits. For example:
>>
>>   T=30s; flush
>>   T=45s; flush
>>   T=60s; flush && commit
>>   T=65s; flush
>>
>> Your concern here is that if there's a failure before 60s, the messages
>> that were flushed at 30s and 45s will be duplicated when the container
>> reprocesses, right?
>
>
> Correct
>
>>
>
>
>> > Never mind. I found a solution. Flush should be synced with commit.
>>
>
> Last night I was sleepy and struggling to find a solution, and this
> morning it turned out to be wrong :(
> My idea was that send() would not call flush() even when the buffer is
> full, but that is risky.
>
> Actually, I was writing our internal data pipeline component as a
> StreamTask, but I switched it to a SystemProducer, as Metamx Druid
> Tranquility did. But I overlooked the duplicate data that can be caused
> by a flush/commit mismatch.
>
> Do you have any idea?
>
>>
>> Could you elaborate on this?
>>
>> Cheers,
>> Chris
>>
>> On Thu, Jan 29, 2015 at 12:27 AM, Bae, Jae Hyeon <metac...@gmail.com>
>> wrote:
>>
>> > Never mind. I found a solution. Flush should be synced with commit.
>> >
>> > On Thu, Jan 29, 2015 at 12:15 AM, Bae, Jae Hyeon <metac...@gmail.com>
>> > wrote:
>> >
>> > > Hi Samza Devs
>> > >
>> > > A StreamTask can control SamzaContainer.commit() through the task
>> > > coordinator. Can we make SystemProducer control commit after flush?
>> > > With this feature, we can prevent any duplicate data on
>> > > SamzaContainer failure.
>> > >
>> > > For example, suppose we set the commit interval to 2 minutes. Before
>> > > the commit interval expires, whenever its buffer size is greater than
>> > > the batch size, the SystemProducer will flush the data in the buffer.
>> > > If the container dies right after such a flush, another container
>> > > will start from the previous commit, and we will have duplicate data.
>> > >
>> > > The longer the commit interval, the more duplicate data we will
>> > > have. I know this is not a big deal, because container failure is a
>> > > rare case and only a few minutes of data would be duplicated, but I
>> > > would be happy if we could clear up this little concern.
>> > >
>> > > Any idea?
>> > >
>> > > Thank you
>> > > Best, Jae
>> > >
>> >
>>
>
>
