Hi Kostas,

Thanks for confirming that. I started thinking it might be more useful, or
more user-friendly, to use a unique counter across buckets for the same
operator subtask.
The way I could imagine this working is either to pass the max counter to
the write method (
https://github.com/apache/flink/blob/e7e24471240dbaa6b5148d406575e57d170b1623/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L204
), or to have each Bucket hold a reference to its owning Buckets instance
and read the global counter from there. As far as I know, the write method
invocation is guaranteed to be thread-safe for the same operator subtask
instance.
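To make the second option concrete, here is a rough sketch of what I have
in mind (the names are mine, not the actual Flink code), relying on the
fact that each subtask is single-threaded so a plain long is enough:

    // Hypothetical sketch: Buckets owns the counter and hands out the
    // next value whenever any bucket rolls a part file, so the counter
    // is unique across buckets within the same subtask.
    class Buckets<IN, BucketID> {
        private long maxPartCounter;

        long nextPartCounter() {
            return ++maxPartCounter;
        }
    }

    class Bucket<IN, BucketID> {
        private final Buckets<IN, BucketID> owner; // back-reference

        Bucket(Buckets<IN, BucketID> owner) {
            this.owner = owner;
        }

        void rollPartFile() {
            // every roll gets a counter no other bucket has used
            long partCounter = owner.nextPartCounter();
            // ... open "part-<subtask>-<partCounter>" ...
        }
    }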

Thanks,
Pawel


On Fri, 24 Jan 2020 at 20:45, Kostas Kloudas <kklou...@gmail.com> wrote:

> Hi Pawel,
>
> You are correct that counters are unique within the same bucket but
> NOT across buckets. Across buckets, you may see the same counter being
> used.
> The max counter is used only upon restoring from a failure, resuming
> from a savepoint or rescaling, and this is done to guarantee that no
> valid data are overwritten while limiting the state that Flink has to
> keep internally. For a more detailed discussion of the why, you can
> have a look here: https://issues.apache.org/jira/browse/FLINK-13609
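>
> To give a rough idea, the restore logic boils down to something like
> the following (heavily simplified, not the exact code):
>
>     // on restore, take the max over all restored part counters so
>     // that newly opened part files start above anything that may
>     // already exist on the file system
>     long restoredMax = 0L;
>     for (long counter : restoredPartCounters) {
>         restoredMax = Math.max(restoredMax, counter);
>     }
>     maxPartCounter = restoredMax;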
>
> Cheers,
> Kostas
>
> On Fri, Jan 24, 2020 at 5:16 PM Pawel Bartoszek
> <pawelbartosze...@gmail.com> wrote:
> >
> > I have looked into the source code and it looks like the same counter
> > value being used in two buckets is expected behavior.
> > Each Bucket instance (
> > https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
> > ) is passed a partCounter in its constructor. Whenever a part file is
> > rolled over, the counter is incremented within the scope of that
> > bucket. It can happen that two or more buckets are active at the same
> > time and their counters are incremented independently, so that they
> > become equal. However, the global max counter maintained by the
> > Buckets class always tracks the maximum part counter, so that a newly
> > created bucket is passed the correct starting counter.
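> >
> > In pseudocode, the current behavior looks roughly like this (my own
> > simplification of Bucket/Buckets, not the actual code):
> >
> >     class Bucket {
> >         private long partCounter; // per-bucket copy, set in constructor
> >
> >         void rollPartFile() {
> >             // each active bucket increments its own copy, so two
> >             // buckets can independently reach the same value (e.g. 8)
> >             partCounter++;
> >         }
> >     }
> >
> >     class Buckets {
> >         private long maxPartCounter; // global per subtask
> >
> >         void track(Bucket bucket) {
> >             // only used to seed newly created buckets correctly
> >             maxPartCounter = Math.max(maxPartCounter, bucket.partCounter);
> >         }
> >     }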
> >
> > My analysis is based on the logs from my job below; note the same
> > counter value, part-0-8, being used for two different buckets.
> >
> > 2020-01-24 14:57:41 [Sink (1/1)-thread-0] DEBUG
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask
> 0 opening new part file "part-0-6" for bucket id=2020-01-24T14_54_00Z.
> > 2020-01-24 14:57:41 [Async Sink: Unnamed (1/1)] INFO
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask
> 0 received completion notification for checkpoint with id=7.
> > 2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] INFO
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask
> 0 checkpointing for checkpoint with id=8 (max part counter=7).
> > 2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] DEBUG
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask
> 0 closing in-progress part file for bucket id=2020-01-24T14_54_00Z on
> checkpoint.
> > 2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] DEBUG
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask
> 0 checkpointing: BucketState for bucketId=2020-01-24T14_54_00Z and
> bucketPath=s3://xxx
> > 2020-01-24 14:58:11 [Sink (1/1)-thread-0] DEBUG
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask
> 0 closing in-progress part file for bucket id=2020-01-24T14_54_00Z due to
> element
> > 2020-01-24 14:58:11 [Sink (1/1)-thread-0] DEBUG
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask
> 0 opening new part file "part-0-7" for bucket id=2020-01-24T14_54_00Z.
> > 2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] INFO
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask
> 0 received completion notification for checkpoint with id=8.
> > 2020-01-24 14:58:23 [Sink (1/1)-thread-0] DEBUG
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask
> 0 closing in-progress part file for bucket id=2020-01-24T14_55_00Z due to
> element
> > 2020-01-24 14:58:23 [Sink (1/1)-thread-0] DEBUG
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask
> 0 opening new part file "part-0-8" for bucket id=2020-01-24T14_55_00Z.
> > 2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] INFO
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask
> 0 checkpointing for checkpoint with id=9 (max part counter=9).
> > 2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask
> 0 closing in-progress part file for bucket id=2020-01-24T14_54_00Z on
> checkpoint.
> > 2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask
> 0 checkpointing: BucketState for bucketId=2020-01-24T14_54_00Z and
> bucketPath=s3://xxx
> > 2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask
> 0 closing in-progress part file for bucket id=2020-01-24T14_55_00Z on
> checkpoint.
> > 2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask
> 0 checkpointing: BucketState for bucketId=2020-01-24T14_55_00Z and
> bucketPath=s3://xxx
> > 2020-01-24 14:58:41 [Sink (1/1)-thread-0] DEBUG
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask
> 0 closing in-progress part file for bucket id=2020-01-24T14_54_00Z due to
> element
> > 2020-01-24 14:58:42 [Sink (1/1)-thread-0] DEBUG
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask
> 0 opening new part file "part-0-8" for bucket id=2020-01-24T14_54_00Z.
> > 2020-01-24 14:58:42 [Async Sink: Unnamed (1/1)] INFO
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask
> 0 received completion notification for checkpoint with id=9.
> > 2020-01-24 14:58:42 [Sink (1/1)-thread-0] DEBUG
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask
> 0 closing in-progress part file for bucket id=2020-01-24T14_55_00Z due to
> element
> > 2020-01-24 14:58:43 [Sink (1/1)-thread-0] DEBUG
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask
> 0 opening new part file "part-0-9" for bucket id=2020-01-24T14_55_00Z.
> > 2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] INFO
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask
> 0 checkpointing for checkpoint with id=10 (max part counter=10).
> > 2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] DEBUG
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask
> 0 closing in-progress part file for bucket id=2020-01-24T14_54_00Z on
> checkpoint.
> > 2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] DEBUG
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask
> 0 checkpointing: BucketState for bucketId=2020-01-24T14_54_00Z and
> bucketPath=s3://xxx
> > 2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] DEBUG
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask
> 0 closing in-progress part file for bucket id=2020-01-24T14_55_00Z on
> checkpoint.
> > 2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] DEBUG
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask
> 0 checkpointing: BucketState for bucketId=2020-01-24T14_55_00Z and
> bucketPath=s3://xxx
> >
> >
> > Thanks,
> > Pawel
> >
> >
> > On Thu, 23 Jan 2020 at 23:29, Pawel Bartoszek <
> pawelbartosze...@gmail.com> wrote:
> >>
> >> Hi,
> >>
> >>
> >> The Flink StreamingFileSink is designed to use a global counter when
> >> creating files, in order to avoid overwrites. I am running Flink
> >> 1.8.2 on Kinesis Analytics (managed Flink provided by AWS) with bulk
> >> writes (the rolling policy is hardcoded to roll over on checkpoint).
> >> My job is configured to checkpoint every minute and runs with
> >> parallelism 1.
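> >>
> >> For context, the sink is set up roughly like this (the path, writer
> >> factory and bucket date format below are placeholders, not my exact
> >> code):
> >>
> >>     StreamingFileSink<DataRecord> sink = StreamingFileSink
> >>         .forBulkFormat(new Path("s3://xxx/invalid-records"), writerFactory)
> >>         .withBucketAssigner(
> >>             new DateTimeBucketAssigner<>("yyyy-MM-dd'T'HH_mm_00'Z'"))
> >>         .build();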
> >>
> >> The problem is that the same counter 616 is used for both files
> invalid-records/2020-01-22T15_06_00Z/part-0-616 and
> invalid-records/2020-01-22T15_05_00Z/part-0-616.
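> >>
> >> For reference, the part file name is built roughly as
> >> "part-" + subtaskIndex + "-" + partCounter, so two buckets reusing
> >> the same counter produce identically named files under different
> >> bucket directories:
> >>
> >>     // illustrative only; mirrors the names seen in the logs below
> >>     String partFile = String.format("part-%d-%d", subtaskIndex, partCounter);
> >>     // -> invalid-records/2020-01-22T15_05_00Z/part-0-616
> >>     // -> invalid-records/2020-01-22T15_06_00Z/part-0-616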
> >>
> >> 15:06:37
> >> { "locationInformation":
> "org.apache.flink.fs.s3.common.writer.S3Committer.commit(S3Committer.java:64)",
> "logger": "org.apache.flink.fs.s3.common.writer.S3Committer", "message":
> "Committing invalid-records/2020-01-22T15_06_00Z/part-0-616 with MPU ID
> f7PQc2D82.kKaDRS.RXYYS8AkLd5q_9ogw3WZJJg2KGABhYgjtv.eJbqQ_UwpzciYb.TDTIkixulkmaTMyyuwmr6c5eC61aenoo2m4cj7wAT9v0JXB3i6gBArw.HpSLxpUBTEW6PT3aN9XKPZmT2kg--",
> "threadName": "Async calls on Source: Custom Source -> Extract Td-agent
> message -> To DataRecord -> Find invalid records -> Sink: Unnamed (1/1)",
> "applicationARN":
> "arn:aws:kinesisanalytics:eu-west-1:679291754673:application/pawel",
> "applicationVersionId": "33", "messageSchemaVersion": "1", "messageType":
> "INFO"}
> >> }
> >> 15:07:37
> >> { "locationInformation":
> "org.apache.flink.fs.s3.common.writer.S3Committer.commit(S3Committer.java:64)",
> "logger": "org.apache.flink.fs.s3.common.writer.S3Committer", "message":
> "Committing invalid-records/2020-01-22T15_05_00Z/part-0-616 with MPU ID
> XoliYkdvP1Cc3gePyteIGhTqF1LrID8rEyddaPXRNPQYkWDNKpDF0tnYuhDBqywAqCWf4nJTOJ2Kx_a_91KTyVTvZ7GkKs25nseGs4jDR6Y5Nxuai47aKNeWeS4bs9imMJ1iAxbd7lRQyxnM5qwDeA--",
> "threadName": "Async calls on Source: Custom Source -> Extract Td-agent
> message -> To DataRecord -> Find invalid records -> Sink: Unnamed (1/1)",
> "applicationARN":
> "arn:aws:kinesisanalytics:eu-west-1:679291754673:application/pawel",
> "applicationVersionId": "33", "messageSchemaVersion": "1", "messageType":
> "INFO" }
> >>
> >> Thanks,
> >> Pawel
>
