Hi.

> *1. Is there some easy way to use deserialized JSON in DataStream without case classes or POJOs?*
Could you explain what you expected? Do you mean you want to just register a DataType that is able to bridge the received bytes to POJO objects? I am not sure whether the current RAW type[1] in Flink Table is enough for you.

> *2. How can I use a DeserializationSchema<Row> to get a DataStream<Row> or even DataStreamSource<Row> in a unit test from either a file or String[]/byte[] of serialized JSON?*

For DeserializationSchema<RowData>, you can refer to the Kafka connector[2]. I think it should be similar for DeserializationSchema<Row>. Some rough sketches of this approach follow below the quoted message.

Best,
Shengkai

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/types/
[2] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java#L234

Andrew Otto <o...@wikimedia.org> wrote on Fri, Jun 17, 2022, 02:26:

> At the Wikimedia Foundation, we use JSON and JSONSchemas for our events in
> Kafka. There are hundreds of these schemas and topics in Kafka. I'd like
> to provide library-level integration between our 'Event Platform' JSON data
> and Flink. My main goal:
>
> *No case classes or POJOs.* The JSONSchemas should be enough.
>
> I can actually do this pretty easily with the Table API. I can convert
> from JSONSchema to a DataType, and then create a table with that DataType
> and format('json').
>
> I'd like to be able to do the same for the DataStream API. From what I
> can tell, to do this I should be using a Row
> <https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/types/Row.html>
> as the record type. I can also convert from JSONSchema to
> TypeInformation<Row> pretty easily, using the Types factory
> <https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/common/typeinfo/Types.html>.
>
> While I can convert to and from
> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/data_stream_api/>
> the Table API to DataStream<Row>, it seems that working directly with a
> DataStream<Row> of our JSON could be pretty useful, and would make it
> possible to use Flink without instantiating a StreamTableEnvironment or
> requiring a 'table planner'. Also, to convert back up to the Table API
> from a DataStream<Row>, I need the explicit TypeInformation<Row>, which I
> have to construct manually.
>
> Ah, but JsonRowDeserializationSchema
> <https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.html>
> is deprecated. Okay, fine, I can copy it into my code and modify it for
> my purposes. But even if I do, I'm confused about something else:
>
> DeserializationSchema is not Table API specific (e.g. it can be used as
> the value deserializer in KafkaSource). Row is also not Table API specific
> (although I know its main purpose is to bridge the Table and DataStream
> APIs). However, it seems that constructing a Source using a
> DeserializationSchema is not really that common? KafkaSource uses it, but
> FileSource and env.fromElements don't? I'm trying to write integration
> tests for this that use the DataStream API.
>
> *tl;dr questions:*
>
> *1. Is there some easy way to use deserialized JSON in DataStream without
> case classes or POJOs?*
>
> *2. How can I use a DeserializationSchema<Row> to get a DataStream<Row> or
> even DataStreamSource<Row> in a unit test from either a file or
> String[]/byte[] of serialized JSON?*
>
> Thank you!
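Here is a minimal sketch for question 2. Since FileSource and env.fromElements do not accept a DeserializationSchema, one option in a unit test is to apply the DeserializationSchema<Row> by hand to the serialized JSON and then hand the resulting rows to env.fromCollection() together with the explicit TypeInformation<Row>. The field names and values here are made up for illustration; substitute whatever your JSONSchema converter produces. This uses the deprecated JsonRowDeserializationSchema, but your copied/modified version should slot in the same way.

    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.formats.json.JsonRowDeserializationSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.types.Row;

    public class JsonRowStreamTest {

        public static void main(String[] args) throws Exception {
            // Explicit TypeInformation<Row>, e.g. produced from a JSONSchema.
            // These fields are hypothetical.
            TypeInformation<Row> typeInfo = Types.ROW_NAMED(
                    new String[] {"id", "name"},
                    Types.LONG, Types.STRING);

            // The deprecated JsonRowDeserializationSchema (or a copy of it)
            // can be driven directly, outside of any Source.
            DeserializationSchema<Row> schema =
                    new JsonRowDeserializationSchema.Builder(typeInfo).build();

            String[] json = {
                    "{\"id\": 1, \"name\": \"aardvark\"}",
                    "{\"id\": 2, \"name\": \"badger\"}"
            };

            // Deserialize by hand; each byte[] becomes one Row.
            List<Row> rows = new ArrayList<>();
            for (String line : json) {
                rows.add(schema.deserialize(line.getBytes(StandardCharsets.UTF_8)));
            }

            // Passing typeInfo explicitly keeps Flink from falling back to
            // treating Row as a generic (Kryo-serialized) type.
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<Row> stream = env.fromCollection(rows, typeInfo);

            stream.print();
            env.execute("json-row-test");
        }
    }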
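On the point about converting back up to the Table API: fromDataStream derives the table schema from the stream's TypeInformation, so the manually built Types.ROW_NAMED(...) above is what lets the columns come out named and typed rather than as a single opaque column. Continuing from the 'env' and 'stream' variables in the sketch above:

    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    // 'stream' carries the explicit TypeInformation<Row>, so
    // fromDataStream can derive named, typed columns from it.
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    Table table = tEnv.fromDataStream(stream);
    table.printSchema();  // expected: `id` BIGINT, `name` STRING, given the fields above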
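For completeness, the Table API path described in the original mail might look roughly like this, plugging the DataType produced from a JSONSchema into the table's schema via Schema.newBuilder().fromRowDataType(). The connector options, topic, and fields are all hypothetical:

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.Schema;
    import org.apache.flink.table.api.TableDescriptor;
    import org.apache.flink.table.types.DataType;

    // A row DataType, standing in for what a JSONSchema-to-DataType
    // converter would return. The fields are hypothetical.
    DataType rowType = DataTypes.ROW(
            DataTypes.FIELD("id", DataTypes.BIGINT()),
            DataTypes.FIELD("name", DataTypes.STRING()));

    tEnv.createTemporaryTable("events", TableDescriptor.forConnector("kafka")
            .schema(Schema.newBuilder().fromRowDataType(rowType).build())
            .format("json")
            .option("topic", "events")  // hypothetical topic
            .option("properties.bootstrap.servers", "localhost:9092")
            .option("scan.startup.mode", "earliest-offset")
            .build());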