Hi,

I am trying to upload avro records to AWS S3 using StreamingFileSink.
The Avro file is created and uploaded with valid data, but I want to add a 
rolling policy that rolls the file after a specific time or once the part file 
reaches a given total size.

With forBulkFormat() I can only use a CheckpointRollingPolicy, which 
automatically rolls all part files on every checkpoint.
Although CheckpointRollingPolicy allows us to override 'shouldRollOnEvent' and 
'shouldRollOnProcessingTime', it does not behave according to the overridden 
logic and just rolls on each checkpoint.
- The 'shouldRollOnProcessingTime' method is not invoked for each streaming 
message.
- In 'shouldRollOnEvent', the part file size always shows the fixed size of a 
single message, not the accumulated size of the part file.


Below is the code snippet -

    // Bulk-format StreamingFileSink that writes GenericRecords through an Avro DataFileWriter
    // into part files under avroOutputPath, bucketed by a configured prefix plus a
    // processing-time-derived suffix.
    //
    // NOTE(review): per the Flink documentation, bulk formats only accept policies extending
    // CheckpointRollingPolicy, and that policy rolls on every checkpoint. The overrides below
    // can therefore only add extra roll conditions, not suppress the checkpoint roll — confirm
    // against the Flink version in use.
    val avroOcfFilesink: StreamingFileSink[GenericRecord] =
      StreamingFileSink
        .forBulkFormat(
          new Path(avroOutputPath),
          new AvroWriterFactory[GenericRecord](new AvroBuilder[GenericRecord]() {
            // Builds the Avro writer for one part file on top of the supplied output stream.
            override def createWriter(out: OutputStream): DataFileWriter[GenericRecord] = {
              val schema: Schema = new Schema.Parser().parse(faultCodeOCFRecordSchema)
              val datumWriter = new ReflectDatumWriter[GenericRecord](schema)
              val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
              //dataFileWriter.setCodec(CodecFactory.snappyCodec)
              dataFileWriter.create(schema, out)
              dataFileWriter
            }
          })
        )
        .withBucketAssigner(new BucketAssigner[GenericRecord, String] {
          // Bucket id = configured prefix + run-date string derived from the current wall clock.
          // The record itself is not consulted.
          override def getBucketId(in: GenericRecord, context: Context): String = {
            val bucketIdPrefix = configurationParameters.getRequired("s3.bucket.id.prefix")
            val currentProcessingTimeUTC = System.currentTimeMillis()
            bucketIdPrefix + TimeConversion.convertTimestampToRunDate_HHMM(currentProcessingTimeUTC)
          }

          override def getSerializer: SimpleVersionedSerializer[String] =
            SimpleVersionedStringSerializer.INSTANCE
        })
        .withRollingPolicy(new CheckpointRollingPolicy[GenericRecord, String] {
          // Requests a roll once the reported part-file size reaches 1024*8
          // (8 KiB, assuming getSize is in bytes — TODO confirm).
          // NOTE(review): the Avro writer may buffer records before they reach the stream, so
          // the reported size may lag behind the records written — verify.
          override def shouldRollOnEvent(
              partFileState: PartFileInfo[String],
              element: GenericRecord): Boolean = {
            println(s"partFileState.getSize:${partFileState.getSize}")
            partFileState.getSize >= 1024 * 8
          }

          // Requests a roll once the part file is at least 600000 time units old
          // (10 minutes, assuming millisecond timestamps — TODO confirm).
          // NOTE(review): this callback is presumably driven by the sink's periodic bucket check
          // rather than by each record — confirm against the Flink docs.
          override def shouldRollOnProcessingTime(
              partFileState: PartFileInfo[String],
              currentTime: Long): Boolean = {
            val partFileAge = currentTime - partFileState.getCreationTime
            // The log line is kept on one line; the mail wrap had split its string literal.
            println(
              s"currentTime:$currentTime , partFileState.getCreationTime${partFileState.getCreationTime}, Diff:$partFileAge")
            partFileAge >= 600000
          }
        })
        .build()

    // Attach the sink with a single writer subtask.
    avroRecordStream.addSink(avroOcfFilesink).setParallelism(1).name("AvroToS3Bucket")

Reply via email to