Thanks Stephan

On Thu, Feb 18, 2016 at 3:00 PM, Stephan Ewen <se...@apache.org> wrote:

> You are right, the checkpoints should contain all offsets.
>
> I created a ticket for this:
> https://issues.apache.org/jira/browse/FLINK-3440
>
> On Thu, Feb 18, 2016 at 10:15 AM, agaoglu <erdem.agao...@gmail.com> wrote:
>
>> Hi,
>>
>> On a related and more exaggerated setup, our Kafka producer (Flume)
>> seems to send data to a single partition at a time and switches
>> partitions every few minutes. So when I run my Flink DataStream program
>> for the first time, it starts at the *largest* offsets and logs
>> something like this:
>>
>> . Fetched the following start offsets [FetchPartition {partition=7, offset=15118832832}]
>> . Fetched the following start offsets [FetchPartition {partition=1, offset=15203613236}]
>> . Fetched the following start offsets [FetchPartition {partition=2, offset=15366811664}]
>> . Fetched the following start offsets [FetchPartition {partition=0, offset=15393999709}]
>> . Fetched the following start offsets [FetchPartition {partition=8, offset=15319475583}]
>> . Fetched the following start offsets [FetchPartition {partition=5, offset=15482889767}]
>> . Fetched the following start offsets [FetchPartition {partition=6, offset=15113885928}]
>> . Fetched the following start offsets [FetchPartition {partition=3, offset=15182701991}]
>> . Fetched the following start offsets [FetchPartition {partition=4, offset=15186569356}]
>>
>> At that instant Flume happens to be sending data to partition-6 only,
>> so the other consumers sit idle. With the default parallelism of 4,
>> only one of the 4 source threads gets any data, and the checkpointing
>> logs reflect that:
>>
>> Committing offsets [-915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776] to offset store: FLINK_ZOOKEEPER
>> Committing offsets [-915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, 15114275927, -915623761776, -915623761776] to offset store: FLINK_ZOOKEEPER
>> Committing offsets [-915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776] to offset store: FLINK_ZOOKEEPER
>> Committing offsets [-915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776] to offset store: FLINK_ZOOKEEPER
>>
>> This also means the checkpoint will only contain the offset for
>> partition-6. So if the program is stopped and restarted at a later
>> time, it restores the offset for partition-6 only, and the other
>> partitions start at the largest offset again. It can therefore process
>> the data it missed in partition-6, but not in the others: if Flume
>> produces data to partition-3 while the Flink program is stopped, that
>> data is lost, while the data in partition-6 is not. This generally
>> causes multiple (late) windows to fire after a restart, because we now
>> generate watermarks off partition-3, and those say the windows for the
>> unseen data in partition-6 are already complete.
>>
>> This also has the side effect of windows not triggering unless some
>> rebalancing is done beforehand. Since only 1 of the 4 source threads
>> gets data and generates watermarks, window triggers never receive
>> watermarks from the other 3 sources and keep waiting long past the
>> watermarks generated by the single active source.
>>
>> I know producers shouldn't work like that, but consumers shouldn't care.
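To make the watermark stall concrete, here is a minimal sketch of the kind of job that runs into it, assuming the 1.0-era APIs (FlinkKafkaConsumer08, AssignerWithPeriodicWatermarks); the topic name, connection properties and the extractEventTime helper are made up for illustration:

import java.util.Properties;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaEventTimeSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(4);           // 4 source subtasks share the 9 partitions
        env.enableCheckpointing(60000);  // offsets are committed on checkpoints

        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "zk:2181");     // made-up address
        props.setProperty("bootstrap.servers", "kafka:9092");  // made-up address
        props.setProperty("group.id", "my-group");

        DataStream<String> events = env
                .addSource(new FlinkKafkaConsumer08<>("events", new SimpleStringSchema(), props))
                // Watermarks are generated per source subtask. A downstream
                // window operator advances event time with the MINIMUM
                // watermark over all its inputs, so the three subtasks whose
                // partitions receive nothing hold everything back.
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<String>() {
                    private long maxTs = Long.MIN_VALUE + 1;

                    @Override
                    public long extractTimestamp(String element, long previousTimestamp) {
                        long ts = extractEventTime(element);
                        maxTs = Math.max(maxTs, ts);
                        return ts;
                    }

                    @Override
                    public Watermark getCurrentWatermark() {
                        return new Watermark(maxTs - 1);
                    }
                });

        // An event-time window on this stream, e.g.
        //   events.timeWindowAll(Time.minutes(5))
        // only fires once ALL four subtasks have emitted watermarks past the
        // window end, which never happens while three of them sit idle.
        events.print();
        env.execute("kafka event-time sketch");
    }

    // hypothetical helper: pull the event timestamp out of the payload
    private static long extractEventTime(String element) {
        return Long.parseLong(element.split(",")[0]);
    }
}

Since a window operator only advances event time with the minimum watermark across all its input subtasks, the three idle source subtasks pin it at the initial low watermark no matter how far the single active subtask progresses.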
>> I think it may also create some edge cases even if things were not as
>> extreme as ours. If checkpoints could contain offsets for all of the
>> partitions regardless of their contents, probably by storing the start
>> offsets on the first run, I guess that would solve the problems around
>> restarting.

--
erdem agaoglu
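For reference, the change requested in FLINK-3440 boils down to seeding the consumer's offset state with a start offset for every assigned partition, instead of recording offsets only as records arrive (the repeated -915623761776 in the commit logs above is the consumer's OFFSET_NOT_SET sentinel). A minimal sketch of that bookkeeping follows; the class and method names are hypothetical, not the actual FlinkKafkaConsumer internals:

import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical bookkeeping for a Kafka source, sketching the fix suggested
 * above: the snapshot covers EVERY assigned partition from the start, so a
 * restore never falls back to auto.offset.reset for a partition that simply
 * had no traffic while the job was running.
 */
class PartitionOffsetTracker {

    // partition -> next offset to read, seeded at startup rather than on
    // the first record from that partition
    private final Map<Integer, Long> offsets = new HashMap<>();

    /** Called once when the fetcher starts, with the resolved start offsets. */
    void seed(Map<Integer, Long> startOffsets) {
        offsets.putAll(startOffsets);
    }

    /** Called for every fetched record. */
    void advance(int partition, long offset) {
        offsets.put(partition, offset + 1);
    }

    /** Snapshot taken on a checkpoint: includes partitions that never saw data. */
    Map<Integer, Long> snapshot() {
        return new HashMap<>(offsets);
    }
}

Seeded this way, a snapshot always covers all partitions, so a restart resumes every partition from where the previous run left off, whether or not it ever received a record.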