Hi Robert, I switched to SNAPSHOT and confirm that it works. Thanks!
On Thu, Feb 25, 2016 at 10:50 AM, Robert Metzger <rmetz...@apache.org> wrote:

> Hi Erdem,
>
> FLINK-3440 has been resolved. The fix is merged to master. 1.0-SNAPSHOT
> should already contain the fix, and it will be in 1.0.0 (for which I'll
> post a release candidate today) as well.
>
> On Thu, Feb 18, 2016 at 3:24 PM, Erdem Agaoglu <erdem.agao...@gmail.com> wrote:
>
>> Thanks Stephan
>>
>> On Thu, Feb 18, 2016 at 3:00 PM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> You are right, the checkpoints should contain all offsets.
>>>
>>> I created a ticket for this:
>>> https://issues.apache.org/jira/browse/FLINK-3440
>>>
>>> On Thu, Feb 18, 2016 at 10:15 AM, agaoglu <erdem.agao...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> On a related and more exaggerated setup, our Kafka producer (Flume)
>>>> seems to send data to a single partition at a time and switches to
>>>> another one every few minutes. So when I run my Flink DataStream
>>>> program for the first time, it starts at the *largest* offsets and
>>>> logs something like this:
>>>>
>>>> . Fetched the following start offsets [FetchPartition {partition=7, offset=15118832832}]
>>>> . Fetched the following start offsets [FetchPartition {partition=1, offset=15203613236}]
>>>> . Fetched the following start offsets [FetchPartition {partition=2, offset=15366811664}]
>>>> . Fetched the following start offsets [FetchPartition {partition=0, offset=15393999709}]
>>>> . Fetched the following start offsets [FetchPartition {partition=8, offset=15319475583}]
>>>> . Fetched the following start offsets [FetchPartition {partition=5, offset=15482889767}]
>>>> . Fetched the following start offsets [FetchPartition {partition=6, offset=15113885928}]
>>>> . Fetched the following start offsets [FetchPartition {partition=3, offset=15182701991}]
>>>> . Fetched the following start offsets [FetchPartition {partition=4, offset=15186569356}]
>>>>
>>>> At that moment Flume happens to be sending data to partition-6 only,
>>>> so the other consumers sit idle. With the default parallelism of 4,
>>>> only one of the 4 threads is able to source data, and the
>>>> checkpointing logs reflect that:
>>>>
>>>> Committing offsets [-915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776] to offset store: FLINK_ZOOKEEPER
>>>> Committing offsets [-915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, 15114275927, -915623761776, -915623761776] to offset store: FLINK_ZOOKEEPER
>>>> Committing offsets [-915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776] to offset store: FLINK_ZOOKEEPER
>>>> Committing offsets [-915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776] to offset store: FLINK_ZOOKEEPER
>>>>
>>>> This also means a checkpoint will only contain the offset for
>>>> partition-6. So if the program is stopped and restarted at a later
>>>> time, it restores the offset for partition-6 only, and the other
>>>> partitions start at the largest offset again. It can therefore process
>>>> the data that arrived in partition-6 while it was down, but not the
>>>> data in the other partitions. Say Flume produced data to partition-3
>>>> while the Flink program was stopped: that data is lost, while the data
>>>> in partition-6 is not.
>>>> This generally causes multiple (late) windows to fire after the
>>>> restart, because we now generate watermarks off partition-3, which say
>>>> that the windows for the unseen data in partition-6 are already
>>>> complete.
>>>>
>>>> It also has the side effect that windows do not trigger unless some
>>>> rebalancing is done beforehand. Since only 1 of the 4 threads sources
>>>> data and generates watermarks, the window triggers never receive
>>>> watermarks from the other 3 sources and so wait long past the
>>>> watermarks generated by the single active source.
>>>>
>>>> I know producers shouldn't work like that, but consumers shouldn't
>>>> care. I think it may also create edge cases even when things are not
>>>> as extreme as in our case. If checkpoints contained the offsets of all
>>>> partitions regardless of their contents, probably by storing the start
>>>> offsets on the first run, I guess that would solve the problems around
>>>> restarting.
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-tp4782p4998.html
>>>> Sent from the Apache Flink User Mailing List archive at Nabble.com.
>>
>> --
>> erdem agaoglu

--
erdem agaoglu
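
[Editor's note] For readers following the thread, below is a minimal sketch of the kind of job discussed above, assuming the Flink 1.0-era FlinkKafkaConsumer08 API. The topic name, group id, broker/Zookeeper addresses, and checkpoint interval are placeholders, not details taken from the thread. With no offsets committed in Zookeeper for the group, the consumer starts from the largest offsets, which matches the "Fetched the following start offsets" log lines; with checkpointing enabled, offsets are committed to Zookeeper when checkpoints complete, which matches the "Committing offsets ... to offset store: FLINK_ZOOKEEPER" lines.

import java.util.Properties;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaEventTimeJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Event-time processing and periodic checkpoints, as in the setup above.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.enableCheckpointing(60000); // checkpoint (and commit offsets) every minute

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
        props.setProperty("zookeeper.connect", "zk:2181");    // placeholder
        props.setProperty("group.id", "flink-consumer");      // placeholder
        // With no committed offsets for the group, start from the largest offsets,
        // which is what the start-offset log lines in the thread show.
        props.setProperty("auto.offset.reset", "largest");

        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer08<>("events", new SimpleStringSchema(), props));

        stream.print();

        env.execute("Kafka event-time consumer");
    }
}

Timestamp extraction, watermark generation, and the actual windowing are left out here; the point is only the source and checkpointing setup that produces the log lines quoted in the thread.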
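[Editor's note] And purely as an illustration of the suggestion at the end of the thread (store offsets for all partitions, seeded with the fetched start offsets), not as the actual FLINK-3440 patch, the idea can be reduced to something like the following; the class and method names are invented for the sketch.

import java.util.HashMap;
import java.util.Map;

/**
 * Conceptual sketch only, not the FLINK-3440 fix itself: keep an offset entry
 * for every assigned partition from the start, so that a checkpoint always
 * covers idle partitions too and a restore never has to fall back to the
 * largest offset for them.
 */
public class PartitionOffsetTracker {

    private final Map<Integer, Long> offsets = new HashMap<>();

    /**
     * Register every assigned partition with its fetched start offset.
     * Seeding idle partitions here is what avoids the "not set" sentinel
     * value visible in the committed-offsets log lines above.
     */
    public void initialize(Map<Integer, Long> startOffsets) {
        offsets.putAll(startOffsets);
    }

    /** Record the latest offset read from a partition. */
    public void update(int partition, long offset) {
        offsets.put(partition, offset);
    }

    /** State for a checkpoint: includes partitions that never received data. */
    public Map<Integer, Long> snapshot() {
        return new HashMap<>(offsets);
    }
}

On restore, every partition then has an offset to resume from (initially its start offset), so data produced to an otherwise idle partition while the job is down would no longer be skipped.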