Hello, I was wondering if I could get some pointers on what I'm doing wrong
here. I posted this on Stack Overflow
<https://stackoverflow.com/questions/49655460/flink-does-not-checkpoint-and-bucketingsink-leaves-files-in-pending-state-when>,
but I thought I'd also ask here.
I'm trying to generate some test data from a collection and write that
data to S3. Flink doesn't seem to do any checkpointing at all when I do
this, but it does checkpoint when the source reads from S3.
For example, this DOES checkpoint and leaves output files in a completed
state:
```scala
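import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}

val env: StreamExecutionEnvironment =
  StreamExecutionEnvironment.getExecutionEnvironment
env.setMaxParallelism(128)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// Trigger a checkpoint every 2000 ms
env.enableCheckpointing(2000L)
env.setStateBackend(
  new RocksDBStateBackend("s3a://my_bucket/simple_job/rocksdb_checkpoints"))

// Source: monitor the S3 input path continuously, rescanning every 5000 ms
val lines: DataStream[String] = {
  val path = "s3a://my_bucket/simple_job/in"
  env.readFile(
    inputFormat = new TextInputFormat(new Path(path)),
    filePath = path,
    watchType = FileProcessingMode.PROCESS_CONTINUOUSLY,
    interval = 5000L
  )
}

// Sink: write to S3, bucketed by minute
val sinkFunction: BucketingSink[String] =
  new BucketingSink[String]("s3a://my_bucket/simple_job/out")
    .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))

lines.addSink(sinkFunction)
env.execute()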
```
Meanwhile, this DOES NOT checkpoint, and leaves files in a .pending state
even after the job has finished:
```scala
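import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}

val env: StreamExecutionEnvironment =
  StreamExecutionEnvironment.getExecutionEnvironment
env.setMaxParallelism(128)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// Trigger a checkpoint every 2000 ms
env.enableCheckpointing(2000L)
env.setStateBackend(
  new RocksDBStateBackend("s3a://my_bucket/simple_job/rocksdb_checkpoints"))

// Source: a finite in-memory collection of the strings "1" to "100"
val lines: DataStream[String] =
  env.fromCollection((1 to 100).map(_.toString))

// Sink: same bucketed S3 sink as in the first example
val sinkFunction: BucketingSink[String] =
  new BucketingSink[String]("s3a://my_bucket/simple_job/out")
    .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))

lines.addSink(sinkFunction)
env.execute()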
```
Is this a bug in Flink, or am I doing something wrong? Thank you!