Hi Erdem,

FLINK-3440 has been resolved. The fix is merged to master. 1.0-SNAPSHOT should already contain the fix, and it will be in 1.0.0 (for which I'll post a release candidate today) as well.
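For reference, here is a minimal sketch of the kind of job this affects: a checkpointed job reading from Kafka with the 0.8 connector. The topic name, group id, and broker/ZooKeeper addresses below are placeholders, not anything from your setup.

import java.util.Properties;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaCheckpointedJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Checkpoint every 5 seconds. With the fix for FLINK-3440, the
        // snapshotted state should contain an offset for every assigned
        // partition, not only for partitions that already delivered records.
        env.enableCheckpointing(5000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9092"); // placeholder
        props.setProperty("zookeeper.connect", "zk1:2181");     // placeholder
        props.setProperty("group.id", "my-group");               // placeholder

        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer08<>("my-topic", new SimpleStringSchema(), props));

        stream.print();
        env.execute("Kafka checkpointing example");
    }
}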
On Thu, Feb 18, 2016 at 3:24 PM, Erdem Agaoglu <erdem.agao...@gmail.com> wrote:

> Thanks Stephan
>
> On Thu, Feb 18, 2016 at 3:00 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> You are right, the checkpoints should contain all offsets.
>>
>> I created a ticket for this:
>> https://issues.apache.org/jira/browse/FLINK-3440
>>
>> On Thu, Feb 18, 2016 at 10:15 AM, agaoglu <erdem.agao...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> On a related and more exaggerated setup, our kafka-producer (flume) seems
>>> to send data to a single partition at a time and switches it every few
>>> minutes. So when I run my Flink DataStream program for the first time, it
>>> starts on the *largest* offsets and shows something like this:
>>>
>>> . Fetched the following start offsets [FetchPartition {partition=7, offset=15118832832}]
>>> . Fetched the following start offsets [FetchPartition {partition=1, offset=15203613236}]
>>> . Fetched the following start offsets [FetchPartition {partition=2, offset=15366811664}]
>>> . Fetched the following start offsets [FetchPartition {partition=0, offset=15393999709}]
>>> . Fetched the following start offsets [FetchPartition {partition=8, offset=15319475583}]
>>> . Fetched the following start offsets [FetchPartition {partition=5, offset=15482889767}]
>>> . Fetched the following start offsets [FetchPartition {partition=6, offset=15113885928}]
>>> . Fetched the following start offsets [FetchPartition {partition=3, offset=15182701991}]
>>> . Fetched the following start offsets [FetchPartition {partition=4, offset=15186569356}]
>>>
>>> For that instance flume happens to be sending data to partition-6 only, so
>>> the other consumers sit idle. Working with the default parallelism of 4, only
>>> one of the 4 threads is able to source data, and the checkpointing logs
>>> reflect that:
>>>
>>> Committing offsets [-915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776] to offset store: FLINK_ZOOKEEPER
>>> Committing offsets [-915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, 15114275927, -915623761776, -915623761776] to offset store: FLINK_ZOOKEEPER
>>> Committing offsets [-915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776] to offset store: FLINK_ZOOKEEPER
>>> Committing offsets [-915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776] to offset store: FLINK_ZOOKEEPER
>>>
>>> This also means the checkpoint will only contain the offset for partition-6.
>>> So if the program is stopped and restarted at a later time, it restores the
>>> offset for partition-6 only, and the other partitions are started at the
>>> largest offset. So it's able to process unseen data in partition-6 but not
>>> in the others. Say flume produces data to partition-3 while the Flink program
>>> is stopped: that data is lost, while the data in partition-6 is not. This
>>> generally causes multiple (late) windows to be fired after restart, because
>>> we now generate watermarks off partition-3, which say the windows of the
>>> unseen data in partition-6 are already complete.
>>>
>>> This also has a side effect of windows not triggering unless some
>>> rebalancing is done beforehand.
>>> Since only 1 of the 4 threads will source data and generate watermarks,
>>> the window triggers won't get watermarks from the other 3 sources and will
>>> wait long past the watermarks generated from the single source.
>>>
>>> I know producers shouldn't work like that, but consumers shouldn't care. I
>>> think it may also create some edge cases even if things were not as extreme
>>> as ours. If checkpoints could contain the offsets of all of the partitions
>>> regardless of their contents, probably by storing the start offsets on the
>>> first run, I guess that would solve the problems around restarting.
>>>
>>> --
>>> View this message in context:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-tp4782p4998.html
>>> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
>>
>
> --
> erdem agaoglu
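To make the suggestion at the end of the quoted mail concrete: the idea is that the state snapshotted at a checkpoint covers every assigned partition, falling back to the partition's start offset when no record has been read from it yet. The sketch below is purely illustrative; the class and method names are invented and this is not the actual FlinkKafkaConsumer code.

import java.util.HashMap;
import java.util.Map;

/**
 * Illustrative sketch of the idea behind FLINK-3440: a checkpoint should
 * carry an offset for every assigned partition, using the start offset for
 * partitions that have not delivered any records yet. Names and structure
 * are made up for illustration only.
 */
public class AllPartitionOffsetSnapshot {

    /** Offsets of the last records actually read, keyed by partition id. */
    private final Map<Integer, Long> readOffsets = new HashMap<>();

    /** Start offsets fetched when the consumer was opened, keyed by partition id. */
    private final Map<Integer, Long> startOffsets;

    public AllPartitionOffsetSnapshot(Map<Integer, Long> startOffsets) {
        this.startOffsets = startOffsets;
    }

    /** Called whenever a record is read from a partition. */
    public void recordRead(int partition, long offset) {
        readOffsets.put(partition, offset);
    }

    /** Builds the state to checkpoint: one offset per assigned partition. */
    public Map<Integer, Long> snapshot() {
        Map<Integer, Long> snapshot = new HashMap<>(startOffsets);
        // Partitions that have delivered records overwrite their start offset;
        // idle partitions keep the start offset instead of a "not set" sentinel.
        snapshot.putAll(readOffsets);
        return snapshot;
    }
}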