Thanks Stephan

On Thu, Feb 18, 2016 at 3:00 PM, Stephan Ewen <se...@apache.org> wrote:

> You are right, the checkpoints should contain all offsets.
>
> I created a Ticket for this:
> https://issues.apache.org/jira/browse/FLINK-3440
>
>
>
>
> On Thu, Feb 18, 2016 at 10:15 AM, agaoglu <erdem.agao...@gmail.com> wrote:
>
>> Hi,
>>
>> On a related and more exaggerated setup, our Kafka producer (Flume)
>> seems to send data to a single partition at a time and switches
>> partitions every few minutes. So when I run my Flink DataStream program
>> for the first time, it starts at the *largest* offsets and shows
>> something like this:
>>
>> . Fetched the following start offsets [FetchPartition {partition=7, offset=15118832832}]
>> . Fetched the following start offsets [FetchPartition {partition=1, offset=15203613236}]
>> . Fetched the following start offsets [FetchPartition {partition=2, offset=15366811664}]
>> . Fetched the following start offsets [FetchPartition {partition=0, offset=15393999709}]
>> . Fetched the following start offsets [FetchPartition {partition=8, offset=15319475583}]
>> . Fetched the following start offsets [FetchPartition {partition=5, offset=15482889767}]
>> . Fetched the following start offsets [FetchPartition {partition=6, offset=15113885928}]
>> . Fetched the following start offsets [FetchPartition {partition=3, offset=15182701991}]
>> . Fetched the following start offsets [FetchPartition {partition=4, offset=15186569356}]
>>
>> At that moment Flume happens to be sending data to partition-6 only, so
>> the other consumers sit idle. With the default parallelism of 4, only one
>> of the 4 threads is able to source data, and the checkpointing logs
>> reflect that:
>>
>> Committing offsets [-915623761776, -915623761776, -915623761776,
>> -915623761776, -915623761776, -915623761776, -915623761776, -915623761776,
>> -915623761776] to offset store: FLINK_ZOOKEEPER
>> Committing offsets [-915623761776, -915623761776, -915623761776,
>> -915623761776, -915623761776, -915623761776, 15114275927, -915623761776,
>> -915623761776] to offset store: FLINK_ZOOKEEPER
>> Committing offsets [-915623761776, -915623761776, -915623761776,
>> -915623761776, -915623761776, -915623761776, -915623761776, -915623761776,
>> -915623761776] to offset store: FLINK_ZOOKEEPER
>> Committing offsets [-915623761776, -915623761776, -915623761776,
>> -915623761776, -915623761776, -915623761776, -915623761776, -915623761776,
>> -915623761776] to offset store: FLINK_ZOOKEEPER
>>
>> This also means the checkpoint will only contain the offset for
>> partition-6. So if the program is stopped and restarted later, it
>> restores the offset for partition-6 only, and the other partitions start
>> at the largest offset. It is therefore able to process unseen data in
>> partition-6 but not in the others. Say Flume produces data to partition-3
>> while the Flink program is stopped: that data is lost, while the data in
>> partition-6 is not. This generally causes multiple (late) windows to fire
>> after the restart, because we now generate watermarks off partition-3,
>> which say that the windows of the unseen data in partition-6 are already
>> complete.
>>
>> This also has the side effect that windows do not trigger unless some
>> rebalancing is done beforehand. Since only 1 of the 4 threads sources
>> data and generates watermarks, the window triggers get no watermarks from
>> the other 3 sources and keep waiting long past the watermarks generated
>> by the single active source.
>>
>> I know producers shouldn't work like that, but consumers shouldn't care.
>> I think it may also create some edge cases even if things were not as
>> extreme as ours. If checkpoints could contain the offsets of all
>> partitions regardless of their contents, probably by storing the start
>> offsets on the first run, I guess that would solve the problems around
>> restarting.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-tp4782p4998.html
>> Sent from the Apache Flink User Mailing List archive at Nabble.com.
>>
>
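
To illustrate the window-trigger problem described in the quoted message:
as far as I understand it, a downstream operator advances its event time to
the minimum of the watermarks received from its inputs, so sources that
never emit a watermark hold everything back. The sketch below is not Flink
code; the class and method names are made up for illustration, and
Long.MIN_VALUE stands in for "no watermark received yet".

```java
import java.util.Arrays;

public class MinWatermark {
    /**
     * Hypothetical helper: the event time a downstream operator may assume,
     * taken as the minimum over the latest watermark of each input.
     */
    static long combinedWatermark(long[] inputWatermarks) {
        return Arrays.stream(inputWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Four parallel sources; only the third one has seen data so far.
        // The timestamp is an arbitrary example value in epoch milliseconds.
        long[] watermarks = {
            Long.MIN_VALUE, Long.MIN_VALUE, 1455800000000L, Long.MIN_VALUE
        };
        long combined = combinedWatermark(watermarks);
        // While any input is still at Long.MIN_VALUE the combined value
        // stays there too, so no event-time window can fire.
        System.out.println("combined watermark: " + combined);
    }
}
```

Rebalancing before the window spreads the records of the one active source
over all parallel instances, which is why it works around the stall.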
>
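
The idea from the quoted message (have the checkpoint carry an offset for
every partition by falling back to the start offset fetched on the first
run) can be sketched roughly as below. This is only an illustration, not
the actual consumer code or the FLINK-3440 fix: the class and method names
are hypothetical, the "not set" marker is assumed to be the negative value
that shows up in the commit log above, and any off-by-one convention
between "last read" and "next to read" offsets is ignored.

```java
import java.util.Arrays;

public class OffsetSeeding {
    // Assumption: the value printed in the commit log for partitions
    // without a recorded offset is treated as a "not set" marker.
    static final long OFFSET_NOT_SET = -915623761776L;

    /**
     * Returns a copy of lastSeen in which every unset slot is replaced by
     * the start offset of that partition, so no partition is left out.
     */
    static long[] snapshotOffsets(long[] lastSeen, long[] startOffsets) {
        if (lastSeen.length != startOffsets.length) {
            throw new IllegalArgumentException("partition count mismatch");
        }
        long[] snapshot = Arrays.copyOf(lastSeen, lastSeen.length);
        for (int p = 0; p < snapshot.length; p++) {
            if (snapshot[p] == OFFSET_NOT_SET) {
                snapshot[p] = startOffsets[p];
            }
        }
        return snapshot;
    }

    public static void main(String[] args) {
        // Start offsets for partitions 0..8, taken from the log above.
        long[] start = {
            15393999709L, 15203613236L, 15366811664L, 15182701991L,
            15186569356L, 15482889767L, 15113885928L, 15118832832L,
            15319475583L
        };
        // Only partition 6 has received data since startup.
        long[] lastSeen = new long[start.length];
        Arrays.fill(lastSeen, OFFSET_NOT_SET);
        lastSeen[6] = 15114275927L;

        System.out.println(Arrays.toString(snapshotOffsets(lastSeen, start)));
    }
}
```

With a snapshot like this, a restart would resume the idle partitions from
where the first run started instead of jumping to the largest offset again.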


-- 
erdem agaoglu
