Hi Mahendra,

Sorry for the late reply. I noticed that the bucket assigner in your code switches to a new bucket every minute. Could that be related to the problems you are seeing? Different buckets use different directories and files, so whenever the bucket changes, new files are created and used.
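To illustrate the point, here is a minimal sketch in plain Scala with no Flink dependency. It assumes that TimeConversion.convertTimestampToRunDate_HHMM renders the timestamp at minute granularity; the formatter patterns, the "faults/" prefix and the helper names minuteBucket and hourBucket are hypothetical stand-ins, not taken from your job. It counts how many distinct bucket IDs appear within one 3-minute checkpoint interval:

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

object BucketIdSketch {
  // Hypothetical stand-in for TimeConversion.convertTimestampToRunDate_HHMM:
  // assumed to format a UTC timestamp down to the minute.
  private val minuteFormat =
    DateTimeFormatter.ofPattern("yyyyMMdd_HHmm").withZone(ZoneOffset.UTC)

  // Coarser alternative for comparison: one bucket ID per hour.
  private val hourFormat =
    DateTimeFormatter.ofPattern("yyyyMMdd_HH").withZone(ZoneOffset.UTC)

  def minuteBucket(prefix: String, epochMillis: Long): String =
    prefix + minuteFormat.format(Instant.ofEpochMilli(epochMillis))

  def hourBucket(prefix: String, epochMillis: Long): String =
    prefix + hourFormat.format(Instant.ofEpochMilli(epochMillis))

  def main(args: Array[String]): Unit = {
    // One record every 10 seconds across a 3-minute window.
    val start = Instant.parse("2020-12-29T20:00:00Z").toEpochMilli
    val timestamps = (0 until 18).map(i => start + i * 10000L)

    val perMinute = timestamps.map(minuteBucket("faults/", _)).distinct
    val perHour = timestamps.map(hourBucket("faults/", _)).distinct

    println(s"minute-granularity bucket IDs: ${perMinute.size}") // 3
    println(s"hour-granularity bucket IDs: ${perHour.size}")     // 1
  }
}
```

With minute-granularity IDs the sink sees three different buckets inside a single checkpoint interval, and each of them gets its own part file, which would look like a roll every minute regardless of the rolling policy. If that matches what you observe, a coarser bucket ID may be worth trying.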
Best,
Yun

------------------Original Mail ------------------
Sender:Mahendra Hegde <mahendra.he...@omnitracs.com>
Send Date:Tue Dec 29 20:23:33 2020
Recipients:user@flink.apache.org <user@flink.apache.org>
Subject:StreamingFileSink.forBulkFormat() with CheckpointRollingPolicy issues

Hello,

I am trying to use StreamingFileSink.forBulkFormat() for writing avro to S3.
I have used ‘CheckpointRollingPolicy’ as DefaultRolling cannot be used with bulk formats.
But when I use this I am facing 2 issues:

1. ‘shouldRollOnEvent’ method is getting called on each record addition but .getsize() always gives one message size instead of current partFile size.
2. Files are getting rolled out at every 1 minute even though my checkpoint is bigger (3 mins), I don’t find any way to override this 1 min default rolling.

Any suggestion would be appreciated.

Code:

val avroOcfFilesink : StreamingFileSink[GenericRecord] =
  StreamingFileSink.forBulkFormat(new Path(avroOutputPath),
    new AvroWriterFactory[GenericRecord](new AvroBuilder[GenericRecord]() {
      override def createWriter(out: OutputStream): DataFileWriter[GenericRecord] = {
        val schema: Schema = new Schema.Parser().parse(faultCodeOCFRecordSchema)
        val datumWriter = new ReflectDatumWriter[GenericRecord](schema)
        val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
        dataFileWriter.setCodec(CodecFactory.snappyCodec)
        dataFileWriter.create(schema, out)
        dataFileWriter
      }
    }))
    .withBucketAssigner(new BucketAssigner[GenericRecord, String] {
      override def getBucketId(in: GenericRecord, context: Context): String = {
        val bucketIdPrefix = configurationParameters.getRequired("s3.bucket.id.prefix")
        val currentProcessingTimeUTC = System.currentTimeMillis()
        bucketIdPrefix + TimeConversion.convertTimestampToRunDate_HHMM(currentProcessingTimeUTC)
      }
      override def getSerializer: SimpleVersionedSerializer[String] = {
        SimpleVersionedStringSerializer.INSTANCE
      }
    })
    .withBucketCheckInterval(120000)
    .withRollingPolicy(
      new CheckpointRollingPolicy[GenericRecord, String] {
        override def shouldRollOnEvent(partFileState: PartFileInfo[String], element: GenericRecord): Boolean = {
          log.info("###### PartFileState.getSize:"+partFileState.getSize+", Creation"+partFileState.getCreationTime+", Lastupdate:"+partFileState.getLastUpdateTime)
          false
        }
        override def shouldRollOnProcessingTime(partFileState: PartFileInfo[String], currentTime: Long): Boolean = {
          val result : Boolean = (currentTime - partFileState.getCreationTime) >= 10000
          log.info(" currentTime:"+currentTime+" , partFileState.getCreationTime"+partFileState.getCreationTime+", Diff:"+(currentTime - partFileState.getCreationTime)+", result:"+result)
          false
        }
      }
    ).build()

Thanks
MH