Awesome Arvid, thanks a lot! :) And I thought when doing this that I was simplifying the test ...
On Thu, Mar 5, 2020 at 8:27 PM Arvid Heise <ar...@ververica.com> wrote:

> Hi David,
>
> Bounded sources do not work well with checkpointing. As soon as the source
> is drained, no checkpoints are performed anymore. It's an unfortunate
> limitation that we want to get rid of, but haven't found the time yet
> (because it requires larger changes).
>
> So for your test to work, you need to add a source that is continuously
> open, but does not output more than one element. Fortunately, there is
> already a working implementation in our test bed:
>
> https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/FiniteTestSource.java
>
> (see the usage sketch at the end of this thread)
>
> On Thu, Mar 5, 2020 at 7:54 PM David Magalhães <speeddra...@gmail.com>
> wrote:
>
>> I've implemented a CustomSink with TwoPhaseCommit. To test it I've
>> created a test using this [1] one as a baseline, and it works fine.
>>
>> To test the integration with S3 (and with an exponential back-off), I've
>> tried to implement a new test, using the following code:
>>
>> ...
>> val invalidWriter = writer
>>   .asInstanceOf[WindowParquetGenericRecordListFileSink]
>>   .copy(filePath = s"s3a://bucket_that_doesnt_exists/")
>>
>> val records: Iterable[GenericRecord] = Iterable apply {
>>   new GenericData.Record(GenericRecordSchema.schema) {
>>     put(KEY.name, "x")
>>     put(EVENT.name, "record.value()")
>>     put(LOGGER_TIMESTAMP.name, "2020-01-01T02:22:23.123456Z")
>>     put(EVENT_TYPE.name, "xpto")
>>   }
>> }
>>
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>
>> env
>>   .enableCheckpointing(1000)
>>   .fromElements(records)
>>   .addSink(invalidWriter)
>>
>> val task = executor.submit(() => env.execute("s3exponential"))
>> ...
>>
>> This will set up a small environment with one record and enable
>> checkpointing (needed for the TPC to work), and then execute the job in
>> another thread so the test can check if the error count is increasing.
>>
>> The test has the following behaviour:
>>
>> If I use enableCheckpointing(10), the test passes 9 out of 10 times.
>> If I use other values, like 1000, the test fails most of the time, if
>> not every time.
>>
>> Here is a small example of the log when the test is successful:
>>
>> 2020-03-05 16:04:40,342 DEBUG
>> com.talkdesk.automatebackfill.kratos.writer.WindowParquetGenericRecordListFileSink
>> - Invoke - f3b667c1-8d75-4ab8-9cab-71dfa6a71271 [1]
>> 2020-03-05 16:04:40,342 DEBUG
>> org.apache.flink.streaming.runtime.tasks.StreamTask - Starting
>> checkpoint (5) CHECKPOINT on task Sink: Unnamed (2/2)
>> 2020-03-05 16:04:40,342 DEBUG
>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction -
>> WindowParquetGenericRecordListFileSink 1/2 - checkpoint 5 triggered,
>> flushing transaction
>> 'TransactionHolder{handle=f3b667c1-8d75-4ab8-9cab-71dfa6a71271,
>> transactionStartTime=1583424280304}'
>> ### 2020-03-05 16:04:40,342 DEBUG
>> com.talkdesk.automatebackfill.kratos.writer.WindowParquetGenericRecordListFileSink
>> - Pre Commit - f3b667c1-8d75-4ab8-9cab-71dfa6a71271 [1] -
>> openedTransactions [1]
>> ### 2020-03-05 16:04:40,342 DEBUG
>> com.talkdesk.automatebackfill.kratos.writer.WindowParquetGenericRecordListFileSink
>> - operation='preCommit', message='Start writing #1 records'
>>
>> When the test fails, here is part of the log:
>>
>> 2020-03-05 16:38:44,386 DEBUG
>> com.talkdesk.automatebackfill.kratos.writer.WindowParquetGenericRecordListFileSink
>> - Invoke - 7c5dd046-54b8-4b13-b360-397da6743b3b [1]
>> 2020-03-05 16:38:44,386 DEBUG
>> org.apache.flink.streaming.runtime.tasks.StreamTask - Aborting
>> checkpoint via cancel-barrier 5 for task Sink: Unnamed (1/2)
>> 2020-03-05 16:38:44,387 DEBUG
>> org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - Sink:
>> Unnamed (1/2) (e5699032cb2eb874dcff740b6b7b62f1): End of stream alignment,
>> feeding buffered data back.
>> 2020-03-05 16:38:44,390 DEBUG
>> org.apache.flink.runtime.io.network.partition.ResultPartition -
>> ReleaseOnConsumptionResultPartition
>> 8a4be598a77d12582a92096db8e6319c@b32dcfe3631e86d18cb893a258a6f0f9
>> [PIPELINED_BOUNDED, 2 subpartitions, 0 pending consumptions]: Received
>> consumed notification for subpartition 0.
>> 2020-03-05 16:38:44,390 DEBUG
>> org.apache.flink.runtime.io.network.partition.ResultPartitionManager -
>> Received consume notification from ReleaseOnConsumptionResultPartition
>> 8a4be598a77d12582a92096db8e6319c@b32dcfe3631e86d18cb893a258a6f0f9
>> [PIPELINED_BOUNDED, 2 subpartitions, 0 pending consumptions].
>> 2020-03-05 16:38:44,390 DEBUG
>> org.apache.flink.runtime.io.network.partition.ResultPartition - Source:
>> Collection Source (1/1) (b32dcfe3631e86d18cb893a258a6f0f9): Releasing
>> ReleaseOnConsumptionResultPartition
>> 8a4be598a77d12582a92096db8e6319c@b32dcfe3631e86d18cb893a258a6f0f9
>> [PIPELINED_BOUNDED, 2 subpartitions, 0 pending consumptions].
>>
>> I'm not sure why this behaviour changes with the checkpoint interval,
>> but so far I haven't found the reason why "pre commit" isn't executed.
>>
>> Does anyone have any thoughts, or is there something I'm missing?
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
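
For reference, a minimal sketch of Arvid's suggestion, swapping fromElements
for FiniteTestSource so the source stays open and checkpoints keep firing. It
assumes the flink-test-utils test dependency is on the classpath; the String
element and the print() sink are stand-ins for the GenericRecord input and
the TwoPhaseCommit sink in the snippet above:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.util.FiniteTestSource

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Checkpoints only keep firing while at least one source is still open.
env.enableCheckpointing(1000)

env
  // FiniteTestSource emits its elements and then stays open until enough
  // checkpoints have completed, so the two-phase commit can finish.
  .addSource(new FiniteTestSource[String]("x"))
  .print() // stand-in for the TwoPhaseCommit sink under test

env.execute("s3exponential")

With a source like this in place, the checkpoint interval should no longer
decide whether "pre commit" runs before the job shuts down.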