Hi.

> *1. Is there some easy way to use deserialized JSON in DataStream without
> case classes or POJOs?*

Could you explain a bit more about what you expect? Do you mean you want
to register a DataType that can bridge the received bytes to POJO
objects? I am not sure whether the current RAW type[1] in the Table API
is enough for you.
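
For example, a minimal sketch of what I mean (MyEvent stands in for
whatever class carries your parsed JSON):

  import org.apache.flink.api.common.typeinfo.TypeInformation;
  import org.apache.flink.table.api.DataTypes;
  import org.apache.flink.table.types.DataType;

  // RAW carries the value through the Table API as an opaque blob,
  // serialized by the type's Flink serializer.
  DataType rawType = DataTypes.RAW(TypeInformation.of(MyEvent.class));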

> *2. How can I use a DeserializationSchema<Row> to get a DataStream<Row>
> or even DataStreamSource<Row> in a unit test from either a file or
> String[]/byte[] of serialized JSON?*

For DeserializationSchema<RowData>, you can refer to the Kafka
connector[2]. I think a DeserializationSchema<Row> should work in much
the same way.
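
For the unit-test part, a minimal sketch (here schema and rowTypeInfo
are placeholders for whatever you build from your JSONSchema, and
messages is your byte[] fixture data):

  import java.util.ArrayList;
  import java.util.List;
  import org.apache.flink.api.common.serialization.DeserializationSchema;
  import org.apache.flink.api.common.typeinfo.TypeInformation;
  import org.apache.flink.streaming.api.datastream.DataStreamSource;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.types.Row;

  StreamExecutionEnvironment env =
      StreamExecutionEnvironment.getExecutionEnvironment();

  // DeserializationSchema#deserialize throws IOException, so declare
  // it on the test method.
  List<Row> rows = new ArrayList<>();
  for (byte[] message : messages) {
      rows.add(schema.deserialize(message));
  }

  DataStreamSource<Row> stream = env.fromCollection(rows, rowTypeInfo);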

Best,
Shengkai

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/types/
[2]
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java#L234



On Fri, Jun 17, 2022 at 02:26 Andrew Otto <o...@wikimedia.org> wrote:

> At the Wikimedia Foundation, we use JSON and JSONSchemas for our events in
> Kafka.  There are hundreds of these schemas and topics in Kafka.  I'd like
> to provide library level integration between our 'Event Platform' JSON data
> and Flink.  My main goal:
>
> *No case classes or POJOs.*  The JSONSchemas should be enough.
>
> I can actually do this pretty easily with the Table API. I can
> convert from JSONSchema to a DataType, and then create a table with that
> DataType and format('json').
>
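> Roughly like this (jsonSchemaToDataType is our own hypothetical helper,
> and the connector options are abbreviated):
>
>     DataType dataType = jsonSchemaToDataType(jsonSchemaString);
>     tableEnv.createTemporaryTable(
>         "events",
>         TableDescriptor.forConnector("kafka")
>             .schema(Schema.newBuilder().fromRowDataType(dataType).build())
>             .format("json")
>             .option("topic", "my_topic")
>             .build());
>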
> I'd like to be able to do the same for the DataStream API.  From what I
> can tell, to do this I should be using a Row
> <https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/types/Row.html>
> as the record type.  I can also convert from JSONSchema to
> TypeInformation<Row> pretty easily, using the Types factory
> <https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/common/typeinfo/Types.html>.
>
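> For instance (field names made up):
>
>     TypeInformation<Row> typeInfo = Types.ROW_NAMED(
>         new String[] {"id", "meta"},
>         Types.STRING,
>         Types.ROW_NAMED(new String[] {"dt"}, Types.STRING));
>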
> While I can convert back and forth
> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/data_stream_api/>
> between the Table API and DataStream<Row>, it seems that directly using
> DataStream<Row>
> of our JSON could be pretty useful, and would make it possible to use Flink
> without instantiating a StreamTableEnvironment or requiring a 'table
> planner'.  Also, to convert back up to the Table API from a
> DataStream<Row>, I need the explicit TypeInformation<Row>, which I need to
> manually construct.
>
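> That is, something like this, where deserializeJson is a stand-in for
> our deserialization logic and rowTypeInfo is the hand-built
> TypeInformation<Row>:
>
>     DataStream<Row> stream = byteStream
>         .map(bytes -> deserializeJson(bytes))
>         .returns(rowTypeInfo);   // without this, the type is generic
>     Table table = tableEnv.fromDataStream(stream);
>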
> Ah but, JsonRowDeserializationSchema
> <https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.html>
> is deprecated. Okay, fine, I can copy it into my code and modify it for my
> purposes.  But even if I do, I'm confused about something else:
>
> DeserializationSchema is not Table API specific (e.g. it can be used as
> the value deserializer in KafkaSource).  Row is also not Table API specific
> (although I know the main purpose is to bridge Table to DataStream API).
> However, it seems that constructing a Source using DeserializationSchema is
> not really that common?  KafkaSource uses it, but FileSource and
> env.fromElements don't?  I'm trying to write integration tests for this
> that use the DataStream API.
>
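> (For reference, the KafkaSource wiring I mean is roughly this, with
> placeholder broker/topic/schema values:
>
>     KafkaSource<Row> source = KafkaSource.<Row>builder()
>         .setBootstrapServers(brokers)
>         .setTopics("my_topic")
>         .setValueOnlyDeserializer(jsonRowDeserializationSchema)
>         .build();
>
> I don't see an equivalent hook for FileSource or fromElements.)
>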
> *tl;dr questions:*
>
> *1. Is there some easy way to use deserialized JSON in DataStream without
> case classes or POJOs?*
>
> *2. How can I use a DeserializationSchema<Row> to get a DataStream<Row> or
> even DataStreamSource<Row> in a unit test from either a file or
> String[]/byte[] of serialized JSON?*
>
> Thank you!