Hi Kostas,

Thanks for confirming that. I have started thinking it might be more useful, or at least more user-friendly, to use a counter that is unique across buckets for the same operator subtask. The way I could imagine this working is to pass the max counter into the write method (https://github.com/apache/flink/blob/e7e24471240dbaa6b5148d406575e57d170b1623/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L204), or to have each Bucket hold a reference to its Buckets instance and read the global counter from there. As far as I know, invocations of the write method are guaranteed to be thread-safe within the same subtask instance.
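To make it more concrete, here is a very rough sketch of what I have in mind. It is only an illustration, not the actual StreamingFileSink code, and all the names in it (SharedCounterBuckets, SimpleBucket and so on) are made up: a single per-subtask counter that every bucket consumes when it opens a new part file, instead of each bucket keeping its own copy.

import java.util.HashMap;
import java.util.Map;

/**
 * Illustration only, not the actual StreamingFileSink code; all names here
 * are hypothetical. The idea: one part counter per subtask, shared by all
 * buckets, so two active buckets can never open the same "part-0-N" file.
 */
public class SharedCounterBuckets {

    // Subtask-wide counter (subtask index 0 assumed, as in my job below).
    private long maxPartCounter = 0L;

    private final Map<String, SimpleBucket> activeBuckets = new HashMap<>();

    /** Routes an element to its bucket and hands it the shared counter. */
    public String write(String bucketId, String element) {
        SimpleBucket bucket =
                activeBuckets.computeIfAbsent(bucketId, SimpleBucket::new);
        // The bucket only consumes the counter when it opens a new part file.
        String partFile = bucket.write(element, maxPartCounter);
        maxPartCounter = Math.max(maxPartCounter, bucket.lastUsedCounter() + 1);
        return partFile;
    }

    /** Minimal stand-in for the per-bucket part-file handling. */
    private static final class SimpleBucket {
        private final String bucketId;
        private String inProgressPartFile; // null means no open part file
        private long lastUsedCounter = -1L;

        SimpleBucket(String bucketId) {
            this.bucketId = bucketId;
        }

        String write(String element, long sharedCounter) {
            if (inProgressPartFile == null) {
                // Opening a new part file: name it with the shared counter.
                lastUsedCounter = sharedCounter;
                inProgressPartFile = bucketId + "/part-0-" + sharedCounter;
            }
            // ... append the element to inProgressPartFile ...
            return inProgressPartFile;
        }

        /** Called when the rolling policy fires, e.g. on a checkpoint. */
        void rollPartFile() {
            inProgressPartFile = null;
        }

        long lastUsedCounter() {
            return lastUsedCounter;
        }
    }
}

With something like this, the two active buckets in the logs quoted below would have opened, say, part-0-8 and part-0-9 instead of both opening part-0-8. Does that sound reasonable, or am I missing a case (restore/rescaling) where the per-bucket counter is needed?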
Thanks,
Pawel

On Fri, 24 Jan 2020 at 20:45, Kostas Kloudas <kklou...@gmail.com> wrote:
> Hi Pawel,
>
> You are correct that counters are unique within the same bucket but NOT across buckets. Across buckets, you may see the same counter being used.
> The max counter is used only upon restoring from a failure, resuming from a savepoint or rescaling, and this is done to guarantee that no valid data are overwritten while limiting the state that Flink has to keep internally. For a more detailed discussion about the why, you can have a look here: https://issues.apache.org/jira/browse/FLINK-13609
>
> Cheers,
> Kostas
>
> On Fri, Jan 24, 2020 at 5:16 PM Pawel Bartoszek <pawelbartosze...@gmail.com> wrote:
> >
> > I have looked into the source code and it looks like the same counter value being used in two buckets is the expected behaviour.
> > Each Bucket instance (https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java) is passed a partCounter in its constructor. Whenever a part file is rolled over, the counter is incremented within the scope of that bucket. It can happen that two or more buckets are active at the same time and the counter is incremented independently inside each of them, so their counters end up equal. However, the global max counter maintained by the Buckets class always tracks the maximum part counter, so a newly created bucket is passed the correct (highest) part counter.
> >
> > I have done my analysis based on the logs from my job. Note the same counter value being used for "part-0-8" in two different buckets.
> >
> > 2020-01-24 14:57:41 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 opening new part file "part-0-6" for bucket id=2020-01-24T14_54_00Z.
> > 2020-01-24 14:57:41 [Async Sink: Unnamed (1/1)] INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=7.
> > 2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=8 (max part counter=7).
> > 2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_54_00Z on checkpoint.
> > 2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing: BucketState for bucketId=2020-01-24T14_54_00Z and bucketPath=s3://xxx
> > 2020-01-24 14:58:11 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_54_00Z due to element
> > 2020-01-24 14:58:11 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 opening new part file "part-0-7" for bucket id=2020-01-24T14_54_00Z.
> > 2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=8.
> > 2020-01-24 14:58:23 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_55_00Z due to element
> > 2020-01-24 14:58:23 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 opening new part file "part-0-8" for bucket id=2020-01-24T14_55_00Z.
> > 2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=9 (max part counter=9).
> > 2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_54_00Z on checkpoint.
> > 2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing: BucketState for bucketId=2020-01-24T14_54_00Z and bucketPath=s3://xxx
> > 2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_55_00Z on checkpoint.
> > 2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing: BucketState for bucketId=2020-01-24T14_55_00Z and bucketPath=s3://xxx
> > 2020-01-24 14:58:41 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_54_00Z due to element
> > 2020-01-24 14:58:42 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 opening new part file "part-0-8" for bucket id=2020-01-24T14_54_00Z.
> > 2020-01-24 14:58:42 [Async Sink: Unnamed (1/1)] INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=9.
> > 2020-01-24 14:58:42 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_55_00Z due to element
> > 2020-01-24 14:58:43 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 opening new part file "part-0-9" for bucket id=2020-01-24T14_55_00Z.
> > 2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=10 (max part counter=10).
> > 2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_54_00Z on checkpoint.
> > 2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing: BucketState for bucketId=2020-01-24T14_54_00Z and bucketPath=s3://xxx
> > 2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_55_00Z on checkpoint.
> > 2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing: BucketState for bucketId=2020-01-24T14_55_00Z and bucketPath=s3://xxx
> >
> > Thanks,
> > Pawel
> >
> > On Thu, 23 Jan 2020 at 23:29, Pawel Bartoszek <pawelbartosze...@gmail.com> wrote:
> >>
> >> Hi,
> >>
> >> The Flink StreamingFileSink is designed to use a global counter when creating files, to avoid overwrites. I am running Flink 1.8.2 on Kinesis Analytics (managed Flink provided by AWS) with bulk writes (the rolling policy is hardcoded to roll over on checkpoint).
> >> My job is configured to checkpoint every minute and runs with parallelism 1.
> >>
> >> The problem is that the same counter 616 is used for both files invalid-records/2020-01-22T15_06_00Z/part-0-616 and invalid-records/2020-01-22T15_05_00Z/part-0-616.
> >>
> >> 15:06:37
> >> { "locationInformation": "org.apache.flink.fs.s3.common.writer.S3Committer.commit(S3Committer.java:64)", "logger": "org.apache.flink.fs.s3.common.writer.S3Committer", "message": "Committing invalid-records/2020-01-22T15_06_00Z/part-0-616 with MPU ID f7PQc2D82.kKaDRS.RXYYS8AkLd5q_9ogw3WZJJg2KGABhYgjtv.eJbqQ_UwpzciYb.TDTIkixulkmaTMyyuwmr6c5eC61aenoo2m4cj7wAT9v0JXB3i6gBArw.HpSLxpUBTEW6PT3aN9XKPZmT2kg--", "threadName": "Async calls on Source: Custom Source -> Extract Td-agent message -> To DataRecord -> Find invalid records -> Sink: Unnamed (1/1)", "applicationARN": "arn:aws:kinesisanalytics:eu-west-1:679291754673:application/pawel", "applicationVersionId": "33", "messageSchemaVersion": "1", "messageType": "INFO" }
> >> 15:07:37
> >> { "locationInformation": "org.apache.flink.fs.s3.common.writer.S3Committer.commit(S3Committer.java:64)", "logger": "org.apache.flink.fs.s3.common.writer.S3Committer", "message": "Committing invalid-records/2020-01-22T15_05_00Z/part-0-616 with MPU ID XoliYkdvP1Cc3gePyteIGhTqF1LrID8rEyddaPXRNPQYkWDNKpDF0tnYuhDBqywAqCWf4nJTOJ2Kx_a_91KTyVTvZ7GkKs25nseGs4jDR6Y5Nxuai47aKNeWeS4bs9imMJ1iAxbd7lRQyxnM5qwDeA--", "threadName": "Async calls on Source: Custom Source -> Extract Td-agent message -> To DataRecord -> Find invalid records -> Sink: Unnamed (1/1)", "applicationARN": "arn:aws:kinesisanalytics:eu-west-1:679291754673:application/pawel", "applicationVersionId": "33", "messageSchemaVersion": "1", "messageType": "INFO" }
> >>
> >> Thanks,
> >> Pawel