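Hi David,

Bounded sources do not work well with checkpointing: as soon as the source is drained, no more checkpoints are performed. It's an unfortunate limitation that we want to get rid of, but we haven't found the time yet, because it requires larger changes. It is most likely also why your test depends on the checkpoint interval: with 10 ms, a checkpoint usually gets through before the one-element collection source finishes, whereas with 1000 ms the source is typically drained before any checkpoint gets through, so preCommit is never called.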
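A source avoids this by not finishing right after it has emitted its data. Below is a minimal, Flink-free sketch of that idea in Scala: it emits its elements and then keeps running until a given number of further checkpoints have completed, or until it is cancelled. `StayOpenSource`, `checkpointsToAwait` and the plain `T => Unit` collector are hypothetical names chosen for this sketch; only `run`, `cancel` and `notifyCheckpointComplete` mirror the method names of Flink's `SourceFunction` and `CheckpointListener`, and a real implementation would wait on the source context's checkpoint lock rather than on a private one.

```scala
// Flink-free sketch (hypothetical names): a "finite" source that does not
// finish right after emitting. In Flink, this logic would live in a
// SourceFunction that also implements CheckpointListener.
final class StayOpenSource[T](elements: Seq[T], checkpointsToAwait: Int = 2) {
  private val lock = new Object         // stands in for the checkpoint lock
  private var running = true            // guarded by lock
  private var completedCheckpoints = 0  // guarded by lock

  // Emits all elements, then blocks until `checkpointsToAwait` further
  // checkpoints have completed or cancel() is called. A plain bounded
  // source would return right after emitting, which ends checkpointing.
  def run(collect: T => Unit): Unit = lock.synchronized {
    val target = completedCheckpoints + checkpointsToAwait
    elements.foreach(collect)
    // wait() releases the lock, so completion notifications can get in
    // while the source stays open; the loop also guards against spurious wakeups.
    while (running && completedCheckpoints < target) lock.wait()
  }

  // Stops waiting early, e.g. when the job is cancelled.
  def cancel(): Unit = lock.synchronized {
    running = false
    lock.notifyAll()
  }

  // Called once per completed checkpoint (mirrors
  // CheckpointListener.notifyCheckpointComplete).
  def notifyCheckpointComplete(checkpointId: Long): Unit = lock.synchronized {
    completedCheckpoints += 1
    lock.notifyAll()
  }
}
```

Waiting for two completions rather than one is an assumption of this sketch, meant to cover a checkpoint that was already in flight when the element was emitted and therefore does not include it.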
So for your test to work, you need a source that stays open, so that checkpoints keep being triggered, but does not emit more than one element. Fortunately, there is already a working implementation in our test bed:
https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/FiniteTestSource.java

On Thu, Mar 5, 2020 at 7:54 PM David Magalhães wrote:
> I've implemented a CustomSink with TwoPhaseCommit. To test this, I've
> created a test based on this one [1], and it works fine.
>
> To test the integration with S3 (and with an exponential back-off), I've
> tried to implement a new test, using the following code:
>
> ...
> val invalidWriter = writer
>   .asInstanceOf[WindowParquetGenericRecordListFileSink]
>   .copy(filePath = s"s3a://bucket_that_doesnt_exists/")
>
> val records: Iterable[GenericRecord] = Iterable apply {
>   new GenericData.Record(GenericRecordSchema.schema) {
>     put(KEY.name, "x")
>     put(EVENT.name, "record.value()")
>     put(LOGGER_TIMESTAMP.name, "2020-01-01T02:22:23.123456Z")
>     put(EVENT_TYPE.name, "xpto")
>   }
> }
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>
> env
>   .enableCheckpointing(1000)
>   .fromElements(records)
>   .addSink(invalidWriter)
>
> val task = executor.submit(() => env.execute("s3exponential"))
> ...
>
> This sets up a small environment with one record and enables
> checkpointing (required for the TPC to work), and then executes it in
> another thread so the test can check whether the error count increases.
>
> So, the test has the following behaviour:
>
> If I use enableCheckpointing(10), the test passes 9 out of 10 times.
> If I use other values, like 1000, the test fails most of the time, if
> not every time.
>
> Here is a small example of the log when the test is successful:
>
> 2020-03-05 16:04:40,342 DEBUG com.talkdesk.automatebackfill.kratos.writer.WindowParquetGenericRecordListFileSink - Invoke - f3b667c1-8d75-4ab8-9cab-71dfa6a71271 [1]
> 2020-03-05 16:04:40,342 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Starting checkpoint (5) CHECKPOINT on task Sink: Unnamed (2/2)
> 2020-03-05 16:04:40,342 DEBUG org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - WindowParquetGenericRecordListFileSink 1/2 - checkpoint 5 triggered, flushing transaction 'TransactionHolder{handle=f3b667c1-8d75-4ab8-9cab-71dfa6a71271, transactionStartTime=1583424280304}'
> ### 2020-03-05 16:04:40,342 DEBUG com.talkdesk.automatebackfill.kratos.writer.WindowParquetGenericRecordListFileSink - Pre Commit - f3b667c1-8d75-4ab8-9cab-71dfa6a71271 [1] - openedTransactions [1]
> ### 2020-03-05 16:04:40,342 DEBUG com.talkdesk.automatebackfill.kratos.writer.WindowParquetGenericRecordListFileSink - operation='preCommit', message='Start writing #1 records'
>
> When the test fails, here is part of the log:
>
> 2020-03-05 16:38:44,386 DEBUG com.talkdesk.automatebackfill.kratos.writer.WindowParquetGenericRecordListFileSink - Invoke - 7c5dd046-54b8-4b13-b360-397da6743b3b [1]
> 2020-03-05 16:38:44,386 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Aborting checkpoint via cancel-barrier 5 for task Sink: Unnamed (1/2)
> 2020-03-05 16:38:44,387 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - Sink: Unnamed (1/2) (e5699032cb2eb874dcff740b6b7b62f1): End of stream alignment, feeding buffered data back.
> 2020-03-05 16:38:44,390 DEBUG org.apache.flink.runtime.io.network.partition.ResultPartition - ReleaseOnConsumptionResultPartition 8a4be598a77d12582a92096db8e6319c@b32dcfe3631e86d18cb893a258a6f0f9 [PIPELINED_BOUNDED, 2 subpartitions, 0 pending consumptions]: Received consumed notification for subpartition 0.
> 2020-03-05 16:38:44,390 DEBUG org.apache.flink.runtime.io.network.partition.ResultPartitionManager - Received consume notification from ReleaseOnConsumptionResultPartition 8a4be598a77d12582a92096db8e6319c@b32dcfe3631e86d18cb893a258a6f0f9 [PIPELINED_BOUNDED, 2 subpartitions, 0 pending consumptions].
> 2020-03-05 16:38:44,390 DEBUG org.apache.flink.runtime.io.network.partition.ResultPartition - Source: Collection Source (1/1) (b32dcfe3631e86d18cb893a258a6f0f9): Releasing ReleaseOnConsumptionResultPartition 8a4be598a77d12582a92096db8e6319c@b32dcfe3631e86d18cb893a258a6f0f9 [PIPELINED_BOUNDED, 2 subpartitions, 0 pending consumptions].
>
> I'm not sure why this behaviour changes with the checkpoint interval,
> but so far I haven't found the reason why "pre commit" isn't executed.
>
> Does anyone have any thoughts, or see something I'm missing?
>
> [1] https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
