Hi Flink Team,

*FLINK Streaming:*
I have a DataStream[String] from a Kafka consumer:

    DataStream<String> stream = env
        .addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));

https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html

I have to sink this string stream using StreamingFileSink, which needs a DataStream[GenericRecord]:

    val schema: Schema = ...
    val input: DataStream[GenericRecord] = ...
    val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
        .forBulkFormat(outputBasePath, AvroWriters.forGenericRecord(schema))
        .build()
    input.addSink(sink)

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/streamfile_sink.html

*Question:*
How do I convert the DataStream[String] to a DataStream[GenericRecord] before sinking, so that I can write Avro files?

https://stackoverflow.com/questions/63065945/flink-datastreamstring-kafkacosumer-convert-to-avro-for-sink

Regards,
Vijay