Hi Flink Team,

*FLINK Streaming:* I have DataStream[String] from kafkaconsumer

DataStream<String> stream = env
    .addSource(new FlinkKafkaConsumer<>("topic", new
SimpleStringSchema(), properties));

https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html

I have to sink this string stream using StreamingFileSink, which needs
DataStream[GenericRecord]

val schema: Schema = ...
val input: DataStream[GenericRecord] = ...
val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
    .forBulkFormat(outputBasePath, AvroWriters.forGenericRecord(schema))
    .build()
input.addSink(sink)

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/streamfile_sink.html

*Question:* How to convert DataStream[String] to DataStream[GenericRecord]
before Sinking so that I can write AVRO files ?

https://stackoverflow.com/questions/63065945/flink-datastreamstring-kafkacosumer-convert-to-avro-for-sink

Regards,
Vijay

Reply via email to