Hi,

I was assigned to migrate our Flink 1.7 jobs to 1.10, and so far it's going 
well. However, I've run into a problem with writing Avro to HDFS. Currently 
we're using the BucketingSink, which is deprecated. I've managed to replace a 
few Bucketing sinks with a StreamingFileSink using the row format, but I have 
no idea how to tackle the Avro case and our Writer<> implementation.
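
For reference, the row-format replacements look roughly like this (a minimal 
sketch; SimpleStringEncoder and DateTimeBucketAssigner are just placeholder 
choices here, not our actual encoder or bucketing config):

private SinkFunction<Feature> createRowFormatSink() {
    // Row format: each Feature is written as encoded text, bucketed by date.
    return StreamingFileSink
            .forRowFormat(new Path(hdfsPath), new SimpleStringEncoder<Feature>("UTF-8"))
            .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd"))
            .build();
}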

@Override
protected void applySink(DataStream<Feature> outputStream) {
    outputStream
            .keyBy(Feature::getSessionId)
            .addSink(createSink())
            .uid(UID_PART.concat("sink-v1"))
            .name(UID_PART.concat("hdfs_bucketing_sink"));
}

private SinkFunction<GenericRecord> createSFSink() {
    return StreamingFileSink
            .forBulkFormat(Path.fromLocalFile(new File(hdfsPath)),
                    ParquetAvroWriters.forGenericRecord(new ComboFeatureAvroWriter().createSchema()))
            .build();
}

private BucketingSink<Feature> createSink() {
    return new BucketingSink<Feature>(hdfsPath)
            .setBucketer(new DateTypeComboFeatureBucketer("yyyy-MM-dd", ZoneOffset.UTC))
            .setBatchSize(batchSize)
            .setBatchRolloverInterval(batchRollingInterval)
            .setInactiveBucketCheckInterval(checkInactiveBucketInterval)
            .setInactiveBucketThreshold(inactiveBucketThreshold)
            .setUseTruncate(useTruncate)
            .setWriter(new ComboFeatureAvroWriter());
}
The createSFSink() function above I took from 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
I've tried changing GenericRecord to our Feature class - failed. I've tried 
writing an empty GenericRecord map just to get rid of the compilation error - 
failed (it still gives an improper type error). I've also tried 
ParquetAvroWriters.forSpecificRecord(Feature.class), which failed as well.
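
For completeness, the forSpecificRecord attempt was wired up roughly like this 
(a sketch; forSpecificRecord assumes Feature is an Avro-generated class 
extending SpecificRecordBase, which it may not be in our case):

private SinkFunction<Feature> createSFSink() {
    // forSpecificRecord(Feature.class) only compiles if Feature extends
    // org.apache.avro.specific.SpecificRecordBase
    return StreamingFileSink
            .forBulkFormat(Path.fromLocalFile(new File(hdfsPath)),
                    ParquetAvroWriters.forSpecificRecord(Feature.class))
            .build();
}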
