Hi,
I was assigned to migrate out Flink 1.7 to 1.10 so far it's going good, however I've encountered problem with Avro writing to hdfs. Currently we're using Bucketing sink - which is deprecated. I've managed to replace few Bucketing sinks with StreamingFileSink with row format. However I don't have any idea how to tackle Avro and Writer<> implementation. @Override protected void applySink(DataStream<Feature> outputStream) { outputStream .keyBy(Feature::getSessionId) .addSink(createSink()) .uid(UID_PART.concat("sink-v1")) .name(UID_PART.concat("hdfs_bucketing_sink")); } private SinkFunction<GenericRecord> createSFSink() { return StreamingFileSink .forBulkFormat(Path.fromLocalFile(new File(hdfsPath)), ParquetAvroWriters.forGenericRecord(new ComboFeatureAvroWriter().createSchema())) .build(); } private BucketingSink<Feature> createSink() { return new BucketingSink<Feature>(hdfsPath) .setBucketer(new DateTypeComboFeatureBucketer("yyyy-MM-dd", ZoneOffset.UTC)) .setBatchSize(batchSize) .setBatchRolloverInterval(batchRollingInterval) .setInactiveBucketCheckInterval(checkInactiveBucketInterval) .setInactiveBucketThreshold(inactiveBucketThreshold) .setUseTruncate(useTruncate) .setWriter(new ComboFeatureAvroWriter()); } Above function createSFSink() I took from https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html I've tried changing GenericRecord to Feature class - fail, I've tried to write empty GenericRecord map just to get rid of compilation error - failed (still giving improper type error). I've also tried to use ParquetAvroWriters.forSpecificRecord(Feature.class) and also failed