Thank you for your question. I responded on StackOverflow.
Let's finish the discussion there.
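For the archives, here is the short version of the answer: map each incoming String into a GenericRecord before attaching the sink. This is a minimal sketch only, assuming the Kafka messages are JSON documents that match your Avro schema; `schemaString` and `records` are placeholder names, not from the thread, and the Flink/Avro dependencies must be on the classpath.

```scala
// Sketch: convert DataStream[String] (JSON) to DataStream[GenericRecord].
// Assumes each message is JSON conforming to the Avro schema.
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory
import org.apache.flink.streaming.api.scala._

val schemaString: String = "..." // your Avro schema, as a JSON string

val records: DataStream[GenericRecord] = stream.map { json =>
  // Parser, reader, and decoder are built per record here for clarity;
  // in practice use a RichMapFunction and create them once in open().
  val schema  = new Schema.Parser().parse(schemaString)
  val reader  = new GenericDatumReader[GenericRecord](schema)
  val decoder = DecoderFactory.get().jsonDecoder(schema, json)
  reader.read(null, decoder)
}

records.addSink(sink)
```

Two caveats: Avro's `Schema` is not serializable, so hold it in a transient field (or re-parse in `open()`) rather than capturing it in the closure; and Flink cannot derive efficient serialization for `GenericRecord` on its own, so you will likely want to supply the type information explicitly (e.g. via `GenericRecordAvroTypeInfo`) to avoid falling back to Kryo.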

On Fri, Jul 24, 2020 at 5:07 AM Vijayendra Yadav <contact....@gmail.com>
wrote:

> Hi Flink Team,
>
> *FLINK Streaming:* I have DataStream[String] from kafkaconsumer
>
> DataStream<String> stream = env
>     .addSource(new FlinkKafkaConsumer<>("topic",
>         new SimpleStringSchema(), properties));
>
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
>
> I have to sink this String stream using StreamingFileSink, which needs a
> DataStream[GenericRecord]:
>
> val schema: Schema = ...
> val input: DataStream[GenericRecord] = ...
> val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
>     .forBulkFormat(outputBasePath, AvroWriters.forGenericRecord(schema))
>     .build()
> input.addSink(sink)
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/streamfile_sink.html
>
> *Question:* How do I convert DataStream[String] to
> DataStream[GenericRecord] before sinking, so that I can write Avro files?
>
>
> https://stackoverflow.com/questions/63065945/flink-datastreamstring-kafkacosumer-convert-to-avro-for-sink
>
> Regards,
> Vijay
>
