Hi Mahendra,  

    Sorry for the late reply. I noticed that your code implements a 
bucket assigner that switches to a new bucket every minute. Could this be 
related to the problems you are seeing? Since different buckets use 
different directories and files, new files are created and used whenever 
the bucket changes.
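
To illustrate, here is a minimal, self-contained sketch (plain Scala, no Flink 
dependencies) of what a minute-granularity bucket id does. The yyyyMMdd_HHmm 
pattern is only an assumption standing in for your 
TimeConversion.convertTimestampToRunDate_HHMM helper; whatever the exact 
format, any bucket id that embeds the current minute forces a new bucket, and 
hence a new part file, every minute regardless of the rolling policy:

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

object BucketIdDemo {
  // Hypothetical stand-in for TimeConversion.convertTimestampToRunDate_HHMM:
  // formats an epoch-millisecond timestamp as "yyyyMMdd_HHmm" in UTC.
  private val fmt =
    DateTimeFormatter.ofPattern("yyyyMMdd_HHmm").withZone(ZoneOffset.UTC)

  def bucketId(prefix: String, epochMillis: Long): String =
    prefix + fmt.format(Instant.ofEpochMilli(epochMillis))

  def main(args: Array[String]): Unit = {
    val t0 = 1609200000000L // 2020-12-29 00:00:00 UTC, on a minute boundary
    println(bucketId("faults/", t0))          // faults/20201229_0000
    println(bucketId("faults/", t0 + 59000L)) // 59 s later: same minute, same bucket
    println(bucketId("faults/", t0 + 61000L)) // 61 s later: new minute, new bucket
  }
}
```

If you want part files to roll only on checkpoints, the bucket id would need a 
coarser granularity (or stay constant), so that records keep landing in the 
same bucket between checkpoints.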


Best,
 Yun


 ------------------ Original Mail ------------------
Sender: Mahendra Hegde <mahendra.he...@omnitracs.com>
Send Date: Tue Dec 29 20:23:33 2020
Recipients: user@flink.apache.org <user@flink.apache.org>
Subject: StreamingFileSink.forBulkFormat() with CheckpointRollingPolicy issues

Hello,

I am trying to use StreamingFileSink.forBulkFormat() for writing Avro to S3.
I have used ‘CheckpointRollingPolicy’, as DefaultRollingPolicy cannot be used 
with bulk formats.

But when I use this I am facing 2 issues:
1. The ‘shouldRollOnEvent’ method is called on each record addition, but 
getSize() always returns the size of a single message instead of the current 
part-file size.
2. Files are rolled every 1 minute even though my checkpoint interval is 
bigger (3 mins), and I don’t find any way to override this 1-minute default 
rolling.

Any suggestion would be appreciated.


Code:

val avroOcfFilesink: StreamingFileSink[GenericRecord] =
  StreamingFileSink.forBulkFormat(new Path(avroOutputPath),
    new AvroWriterFactory[GenericRecord](new AvroBuilder[GenericRecord]() {
      override def createWriter(out: OutputStream): DataFileWriter[GenericRecord] = {
        val schema: Schema = new Schema.Parser().parse(faultCodeOCFRecordSchema)
        val datumWriter = new ReflectDatumWriter[GenericRecord](schema)
        val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
        dataFileWriter.setCodec(CodecFactory.snappyCodec)
        dataFileWriter.create(schema, out)
        dataFileWriter
      }
    }))
    .withBucketAssigner(new BucketAssigner[GenericRecord, String] {
      override def getBucketId(in: GenericRecord, context: Context): String = {
        val bucketIdPrefix = configurationParameters.getRequired("s3.bucket.id.prefix")
        val currentProcessingTimeUTC = System.currentTimeMillis()
        bucketIdPrefix + TimeConversion.convertTimestampToRunDate_HHMM(currentProcessingTimeUTC)
      }
      override def getSerializer: SimpleVersionedSerializer[String] =
        SimpleVersionedStringSerializer.INSTANCE
    })
    .withBucketCheckInterval(120000)
    .withRollingPolicy(
      new CheckpointRollingPolicy[GenericRecord, String] {
        override def shouldRollOnEvent(partFileState: PartFileInfo[String],
                                       element: GenericRecord): Boolean = {
          log.info("###### PartFileState.getSize:" + partFileState.getSize +
            ", Creation:" + partFileState.getCreationTime +
            ", LastUpdate:" + partFileState.getLastUpdateTime)
          false
        }
        override def shouldRollOnProcessingTime(partFileState: PartFileInfo[String],
                                                currentTime: Long): Boolean = {
          val result: Boolean = (currentTime - partFileState.getCreationTime) >= 10000
          log.info("currentTime:" + currentTime +
            ", partFileState.getCreationTime:" + partFileState.getCreationTime +
            ", Diff:" + (currentTime - partFileState.getCreationTime) +
            ", result:" + result)
          false // note: `result` is only logged; this method always returns false
        }
      }
    ).build()


Thanks
MH
