I've implemented a CustomSink with TwoPhaseCommit. To test this I've create
a test using the baselines of this [1] one, and it works fine.

To test the integration with S3 (and with an exponential back off), I've
tried to implement a new test, using the following code:

...
val invalidWriter = writer
      .asInstanceOf[WindowParquetGenericRecordListFileSink]
      .copy(filePath = s"s3a://bucket_that_doesnt_exists/")

val records: Iterable[GenericRecord] = Iterable apply {
    new GenericData.Record(GenericRecordSchema.schema) {
    put(KEY.name, "x")
    put(EVENT.name, "record.value()")
    put(LOGGER_TIMESTAMP.name, "2020-01-01T02:22:23.123456Z")
    put(EVENT_TYPE.name, "xpto")
    }
}

val env = StreamExecutionEnvironment.getExecutionEnvironment

env

    .enableCheckpointing(1000)
    .fromElements(records)
    .addSink(invalidWriter)

val task = executor.submit(() => env.execute("s3exponential"))
...

This will setup a small environment with one record and enable checkpoint
(in order for the TPC works), and then execute in another thread so the
test check check if the error count is increasing.

So, the test have the following behaviour:

If I use enableCheckpointing(10), the test passes 9 of 10 times.
If I use other values, like 1000, the test fails if not all the times, most
of the times.

Here is a small example of the log when the test is successful.

2020-03-05 16:04:40,342 DEBUG
com.talkdesk.automatebackfill.kratos.writer.WindowParquetGenericRecordListFileSink
 - Invoke - f3b667c1-8d75-4ab8-9cab-71dfa6a71271 [1]
2020-03-05 16:04:40,342 DEBUG
org.apache.flink.streaming.runtime.tasks.StreamTask           - Starting
checkpoint (5) CHECKPOINT on task Sink: Unnamed (2/2)
2020-03-05 16:04:40,342 DEBUG
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  -
WindowParquetGenericRecordListFileSink 1/2 - checkpoint 5 triggered,
flushing transaction
'TransactionHolder{handle=f3b667c1-8d75-4ab8-9cab-71dfa6a71271,
transactionStartTime=1583424280304}'
### 2020-03-05 16:04:40,342 DEBUG
com.talkdesk.automatebackfill.kratos.writer.WindowParquetGenericRecordListFileSink
 - Pre Commit - f3b667c1-8d75-4ab8-9cab-71dfa6a71271 [1] -
openedTransactions [1]
### 2020-03-05 16:04:40,342 DEBUG
com.talkdesk.automatebackfill.kratos.writer.WindowParquetGenericRecordListFileSink
 - operation='preCommit', message='Start writing #1 records'

When the test fails, here is some part of the log:

2020-03-05 16:38:44,386 DEBUG
com.talkdesk.automatebackfill.kratos.writer.WindowParquetGenericRecordListFileSink
 - Invoke - 7c5dd046-54b8-4b13-b360-397da6743b3b [1]
2020-03-05 16:38:44,386 DEBUG
org.apache.flink.streaming.runtime.tasks.StreamTask           - Aborting
checkpoint via cancel-barrier 5 for task Sink: Unnamed (1/2)
2020-03-05 16:38:44,387 DEBUG
org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  - Sink:
Unnamed (1/2) (e5699032cb2eb874dcff740b6b7b62f1): End of stream alignment,
feeding buffered data back.
2020-03-05 16:38:44,390 DEBUG
org.apache.flink.runtime.io.network.partition.ResultPartition  -
ReleaseOnConsumptionResultPartition
8a4be598a77d12582a92096db8e6319c@b32dcfe3631e86d18cb893a258a6f0f9
[PIPELINED_BOUNDED, 2 subpartitions, 0 pending consumptions]: Received
consumed notification for subpartition 0.
2020-03-05 16:38:44,390 DEBUG
org.apache.flink.runtime.io.network.partition.ResultPartitionManager  -
Received consume notification from ReleaseOnConsumptionResultPartition
8a4be598a77d12582a92096db8e6319c@b32dcfe3631e86d18cb893a258a6f0f9
[PIPELINED_BOUNDED, 2 subpartitions, 0 pending consumptions].
2020-03-05 16:38:44,390 DEBUG
org.apache.flink.runtime.io.network.partition.ResultPartition  - Source:
Collection Source (1/1) (b32dcfe3631e86d18cb893a258a6f0f9): Releasing
ReleaseOnConsumptionResultPartition
8a4be598a77d12582a92096db8e6319c@b32dcfe3631e86d18cb893a258a6f0f9
[PIPELINED_BOUNDED, 2 subpartitions, 0 pending consumptions].

Not sure why this behaviour changes with the time for each checkpoint, but
until now I didn't find the reason why "pre commit" isn't execute

Does anyone have any thought, something that I'm missing ?

[1]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java

Reply via email to