Got it, my bad. I should have used a bucket assigner. This seems to be working fine:

StreamingFileSink
  .forBulkFormat[Request](
    new Path(outputPath),
    ParquetAvroWriters.forReflectRecord(classOf[Request]))
  .withBucketAssigner(new DateTimeBucketAssigner[Request]())
  .withBucketCheckInterval(5000L)
  .build()
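For completeness, here is a minimal end-to-end sketch of how the sink is wired into the job (the Request case class, the topic name, and the consumer properties below are placeholders standing in for my actual job, and it assumes Flink 1.7 with flink-parquet and the Kafka 0.11 connector on the classpath):

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

// Placeholder record type; the real job uses its own Request class.
case class Request(payload: String)

object RequestsToParquet extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  // Bulk formats (Parquet) roll on checkpoint: a part file is only finalized
  // when a checkpoint completes, so checkpointing must be enabled, and a very
  // small interval means a new part file per checkpoint.
  env.enableCheckpointing(60000L)

  // Placeholder Kafka consumer configuration.
  val consumerProperties = new Properties()
  consumerProperties.setProperty("bootstrap.servers", "localhost:9092")
  consumerProperties.setProperty("group.id", "requests-to-parquet")

  // Placeholder source: read raw strings and wrap them in Request.
  val consumer = new FlinkKafkaConsumer011[String]("requests", new SimpleStringSchema(), consumerProperties)
  val requests: DataStream[Request] = env.addSource(consumer).map(Request(_))

  val outputPath = "streaming_files"
  val sink = StreamingFileSink
    .forBulkFormat[Request](
      new Path(outputPath),
      ParquetAvroWriters.forReflectRecord(classOf[Request]))
    .withBucketAssigner(new DateTimeBucketAssigner[Request]()) // one bucket per date-time window
    .withBucketCheckInterval(5000L)
    .build()

  requests.addSink(sink)
  env.execute("Write requests to Parquet")
}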
On Sun, Dec 9, 2018 at 2:13 PM Avi Levi <avi.l...@bluevoyant.com> wrote:
> Hi,
> I am trying to read from kafka and write to parquet. But I am getting
> thousands of ".part-0-0 in progress..." files (and counting ...)
> is that a bug or am I doing something wrong?
>
> object StreamParquet extends App {
>   implicit val env: StreamExecutionEnvironment =
>     StreamExecutionEnvironment.getExecutionEnvironment
>   env.enableCheckpointing(100)
>   env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>   env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
>   env.getCheckpointConfig.setCheckpointTimeout(600)
>   env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>   env.setParallelism(1)
>   val consumer = new FlinkKafkaConsumer011[Address](SOURCE_TOPIC, new AddressSchema(), consumerProperties)
>   val stream: DataStreamSource[Address] = env.addSource(QueueImpl.consumer)
>   val outputPath = "streaming_files"
>   val sink = StreamingFileSink.forBulkFormat(
>     new Path(outputPath),
>     ParquetAvroWriters.forReflectRecord(classOf[Address])).build()
>   stream.addSink(sink)
>   env.execute("Write to file")
> }