Got it, my bad. I should have used a bucket assigner. This seems to be working fine:
StreamingFileSink.forBulkFormat[Request](
    new Path(outputPath),
    ParquetAvroWriters.forReflectRecord(classOf[Request]))
  .withBucketAssigner(new DateTimeBucketAssigner[Request]())
  .withBucketCheckInterval(5000L)
  .build()
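
For reference, here is how it wires into the rest of the job (a minimal sketch with explicit imports; Request, outputPath, and stream are the names from my code, and I'm relying on DateTimeBucketAssigner's default "yyyy-MM-dd--HH" bucket path format):

import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner

// Bulk formats roll a new part file on every checkpoint, so the
// checkpoint interval also controls how many part files each bucket gets.
val sink: StreamingFileSink[Request] = StreamingFileSink
  .forBulkFormat(new Path(outputPath),
    ParquetAvroWriters.forReflectRecord(classOf[Request]))
  .withBucketAssigner(new DateTimeBucketAssigner[Request]())
  .withBucketCheckInterval(5000L) // check buckets for rollover every 5s
  .build()

stream.addSink(sink) // stream: DataStream[Request]

The 5000L check interval is just the value I'm using; the default is 60 seconds, if I remember right.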

On Sun, Dec 9, 2018 at 2:13 PM Avi Levi <avi.l...@bluevoyant.com> wrote:

> Hi,
> I am trying to read from Kafka and write to Parquet, but I am getting
> thousands of ".part-0-0.inprogress..." files (and counting...).
> Is that a bug, or am I doing something wrong?
>
> object StreamParquet extends App {
>   implicit val env: StreamExecutionEnvironment =
>     StreamExecutionEnvironment.getExecutionEnvironment
>   env.enableCheckpointing(100)
>   env.getCheckpointConfig.enableExternalizedCheckpoints(
>     ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>   env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
>   env.getCheckpointConfig.setCheckpointTimeout(600)
>   env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>   env.setParallelism(1)
>   val consumer = new FlinkKafkaConsumer011[Address](
>     SOURCE_TOPIC, new AddressSchema(), consumerProperties)
>   val stream: DataStreamSource[Address] = env.addSource(consumer)
>   val outputPath = "streaming_files"
>   val sink = StreamingFileSink.forBulkFormat(
>     new Path(outputPath),
>     ParquetAvroWriters.forReflectRecord(classOf[Address])).build()
>   stream.addSink(sink)
>   env.execute("Write to file")
> }
>
