Awesome Arvid, thanks a lot! :)

And here I thought I was simplifying the test by doing this ...
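
A minimal sketch of the kind of source Arvid describes in the reply quoted below: it emits its elements once and then stays open until the job is cancelled, so checkpoints keep being triggered. `EmitThenIdleSource` and its `makeElements` parameter are hypothetical names used for illustration, not part of Flink. This is not the linked `FiniteTestSource`, and it assumes the `SourceFunction` API.

```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction

/** Hypothetical helper (not part of Flink): emits the elements once, then
  * idles until cancel() is called, so the job keeps checkpointing.
  *
  * The elements come from a factory function, so only the function itself
  * (not the elements) has to be serializable when the job is submitted.
  */
class EmitThenIdleSource[T](makeElements: () => Seq[T]) extends SourceFunction[T] {

  // Flipped by cancel(), which Flink calls from a different thread.
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[T]): Unit = {
    // Emit under the checkpoint lock so that emitting records and taking a
    // checkpoint do not interleave.
    ctx.getCheckpointLock.synchronized {
      makeElements().foreach(element => ctx.collect(element))
    }

    // Returning from run() would finish the source and stop checkpointing,
    // so stay here until the job is cancelled.
    while (running) {
      Thread.sleep(50)
    }
  }

  override def cancel(): Unit = {
    running = false
  }
}
```

Something like `env.addSource(new EmitThenIdleSource(() => Seq(records)))` would then take the place of `fromElements(records)`, assuming `org.apache.flink.streaming.api.scala._` is imported for the implicit type information. Because this source never finishes on its own, the test has to cancel the job once it has made its assertions.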

On Thu, Mar 5, 2020 at 8:27 PM Arvid Heise <ar...@ververica.com> wrote:

> Hi David,
>
> Bounded sources do not work well with checkpointing. As soon as the source
> is drained, no more checkpoints are performed. It's an unfortunate
> limitation that we want to get rid of, but we haven't found the time
> (because it requires larger changes).
>
> So for your test to work, you need to add a source that is continuously
> open, but does not output more than one element. Fortunately, there is
> already a working implementation in our test bed.
>
> https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/FiniteTestSource.java
>
> On Thu, Mar 5, 2020 at 7:54 PM David Magalhães <speeddra...@gmail.com>
> wrote:
>
>> I've implemented a CustomSink with TwoPhaseCommit. To test this, I've
>> created a test based on this one [1], and it works fine.
>>
>> To test the integration with S3 (and with an exponential backoff), I've
>> tried to implement a new test, using the following code:
>>
>> ...
>> val invalidWriter = writer
>>       .asInstanceOf[WindowParquetGenericRecordListFileSink]
>>       .copy(filePath = s"s3a://bucket_that_doesnt_exists/")
>>
>> val records: Iterable[GenericRecord] = Iterable(
>>   new GenericData.Record(GenericRecordSchema.schema) {
>>     put(KEY.name, "x")
>>     put(EVENT.name, "record.value()")
>>     put(LOGGER_TIMESTAMP.name, "2020-01-01T02:22:23.123456Z")
>>     put(EVENT_TYPE.name, "xpto")
>>   }
>> )
>>
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>
>> env
>>   .enableCheckpointing(1000)
>>   .fromElements(records)
>>   .addSink(invalidWriter)
>>
>> val task = executor.submit(() => env.execute("s3exponential"))
>> ...
>>
>> This sets up a small environment with one record and enables
>> checkpointing (so that the TPC works), and then executes the job in
>> another thread so the test can check whether the error count is
>> increasing.
>>
>> The test behaves as follows:
>>
>> If I use enableCheckpointing(10), the test passes 9 out of 10 times.
>> If I use other values, like 1000, the test fails most of the time, if
>> not every time.
>>
>> Here is a small example of the log when the test is successful.
>>
>> 2020-03-05 16:04:40,342 DEBUG
>> com.talkdesk.automatebackfill.kratos.writer.WindowParquetGenericRecordListFileSink
>>  - Invoke - f3b667c1-8d75-4ab8-9cab-71dfa6a71271 [1]
>> 2020-03-05 16:04:40,342 DEBUG
>> org.apache.flink.streaming.runtime.tasks.StreamTask           - Starting
>> checkpoint (5) CHECKPOINT on task Sink: Unnamed (2/2)
>> 2020-03-05 16:04:40,342 DEBUG
>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  -
>> WindowParquetGenericRecordListFileSink 1/2 - checkpoint 5 triggered,
>> flushing transaction
>> 'TransactionHolder{handle=f3b667c1-8d75-4ab8-9cab-71dfa6a71271,
>> transactionStartTime=1583424280304}'
>> ### 2020-03-05 16:04:40,342 DEBUG
>> com.talkdesk.automatebackfill.kratos.writer.WindowParquetGenericRecordListFileSink
>>  - Pre Commit - f3b667c1-8d75-4ab8-9cab-71dfa6a71271 [1] -
>> openedTransactions [1]
>> ### 2020-03-05 16:04:40,342 DEBUG
>> com.talkdesk.automatebackfill.kratos.writer.WindowParquetGenericRecordListFileSink
>>  - operation='preCommit', message='Start writing #1 records'
>>
>> When the test fails, here is part of the log:
>>
>> 2020-03-05 16:38:44,386 DEBUG
>> com.talkdesk.automatebackfill.kratos.writer.WindowParquetGenericRecordListFileSink
>>  - Invoke - 7c5dd046-54b8-4b13-b360-397da6743b3b [1]
>> 2020-03-05 16:38:44,386 DEBUG
>> org.apache.flink.streaming.runtime.tasks.StreamTask           - Aborting
>> checkpoint via cancel-barrier 5 for task Sink: Unnamed (1/2)
>> 2020-03-05 16:38:44,387 DEBUG
>> org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  - Sink:
>> Unnamed (1/2) (e5699032cb2eb874dcff740b6b7b62f1): End of stream alignment,
>> feeding buffered data back.
>> 2020-03-05 16:38:44,390 DEBUG
>> org.apache.flink.runtime.io.network.partition.ResultPartition  -
>> ReleaseOnConsumptionResultPartition
>> 8a4be598a77d12582a92096db8e6319c@b32dcfe3631e86d18cb893a258a6f0f9
>> [PIPELINED_BOUNDED, 2 subpartitions, 0 pending consumptions]: Received
>> consumed notification for subpartition 0.
>> 2020-03-05 16:38:44,390 DEBUG
>> org.apache.flink.runtime.io.network.partition.ResultPartitionManager  -
>> Received consume notification from ReleaseOnConsumptionResultPartition
>> 8a4be598a77d12582a92096db8e6319c@b32dcfe3631e86d18cb893a258a6f0f9
>> [PIPELINED_BOUNDED, 2 subpartitions, 0 pending consumptions].
>> 2020-03-05 16:38:44,390 DEBUG
>> org.apache.flink.runtime.io.network.partition.ResultPartition  - Source:
>> Collection Source (1/1) (b32dcfe3631e86d18cb893a258a6f0f9): Releasing
>> ReleaseOnConsumptionResultPartition
>> 8a4be598a77d12582a92096db8e6319c@b32dcfe3631e86d18cb893a258a6f0f9
>> [PIPELINED_BOUNDED, 2 subpartitions, 0 pending consumptions].
>>
>> I'm not sure why this behaviour changes with the checkpoint interval,
>> and so far I haven't found the reason why "pre commit" isn't executed.
>>
>> Does anyone have any thoughts? Is there something that I'm missing?
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
>>
>
