Thank you for your question. I responded on StackOverflow. Let's finish the discussion there.
On Fri, Jul 24, 2020 at 5:07 AM Vijayendra Yadav <contact....@gmail.com> wrote: > Hi Flink Team, > > *FLINK Streaming:* I have DataStream[String] from kafkaconsumer > > DataStream<String> stream = env > .addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), > properties)); > > > https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html > > I have to sink this string stream using StreamingFileSink, which needs > DataStream[GenericRecord] > > val schema: Schema = ... > val input: DataStream[GenericRecord] = ... > val sink: StreamingFileSink[GenericRecord] = StreamingFileSink > .forBulkFormat(outputBasePath, AvroWriters.forGenericRecord(schema)) > .build() > input.addSink(sink) > > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/streamfile_sink.html > > *Question:* How to convert DataStream[String] to > DataStream[GenericRecord] before Sinking so that I can write AVRO files ? > > > https://stackoverflow.com/questions/63065945/flink-datastreamstring-kafkacosumer-convert-to-avro-for-sink > > Regards, > Vijay >