Hi Fabian,

Thank you for your answer. It is indeed the solution that I am currently testing. I use

    TypeInformation<Row> convert = JsonRowSchemaConverter.convert(JSON_SCHEMA);

provided by flink-json, and pass the resulting TypeInformation to the stream operator. It seems to work :) With this solution my schema can live outside my package.

One additional question, about GenericInMemoryCatalog:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/catalogs.html

Can a catalog be used across multiple jobs running on the same cluster, or is a catalog scoped to the job session only?

The code I am currently testing:

    // Parse each incoming JSON string into a Jackson tree.
    DataStream<JsonNode> dataStreamJson = dataStream.map(new MapFunction<String, JsonNode>() {
        // Created lazily, once per task, instead of once per record.
        private transient ObjectMapper objectMapper;

        @Override
        public JsonNode map(String s) throws Exception {
            if (objectMapper == null) {
                objectMapper = new ObjectMapper();
            }
            return objectMapper.readTree(s);
        }
    });

    // Copy the fields of each JSON object into a Row, as text,
    // in the order in which they appear in the JSON document.
    DataStream<Row> dataStreamRow = dataStreamJson.map(new MapFunction<JsonNode, Row>() {
        @Override
        public Row map(JsonNode jsonNode) throws Exception {
            int pos = 0;
            Row row = new Row(jsonNode.size());
            Iterator<String> iterator = jsonNode.fieldNames();
            while (iterator.hasNext()) {
                String key = iterator.next();
                row.setField(pos, jsonNode.get(key).asText());
                pos++;
            }
            return row;
        }
    }).returns(convert); // type information derived from the JSON schema

    Table tableA = tEnv.fromDataStream(dataStreamRow);

On Thu, Sep 5, 2019 at 13:23, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Steve,
>
> Maybe you could implement a custom TableSource that queries the data from
> the REST API and converts the JSON directly into a Row data type.
> This would also avoid going through the DataStream API just for ingesting
> the data.
>
> Best, Fabian
>
> On Wed, Sep 4, 2019 at 15:57, Steve Robert <contact.steverob...@gmail.com> wrote:
>
>> Hi guys,
>>
>> I have been studying the Table API for a while, with a view to
>> integrating it into my system.
>> When I look at this documentation:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#connectors
>>
>> I understand that it is possible to apply a JSON format on the connector
>> and to apply a JSON schema without any hardcoded Java POJO:
>>
>> .jsonSchema(
>>   "{" +
>>   "  type: 'object'," +
>>   "  properties: {" +
>>   "    lon: {" +
>>   "      type: 'number'" +
>>   "    }," +
>>   "    rideTime: {" +
>>   "      type: 'string'," +
>>   "      format: 'date-time'" +
>>   "    }" +
>>   "  }" +
>>   "}"
>> )
>>
>> My problem is the following: my data comes from a REST API, so
>> I have to process the data and transmit it via a DataStream.
>> The conversion between a DataStream and a Table must pass
>> through a Java POJO:
>>
>> DataStream<YourPojo> input....
>> Table table = tEnv.fromDataStream(input);
>>
>> I tried a trick, converting my JSON to Avro using a
>> GenericRecord, but it does not seem possible.
>>
>> My use case is to be able to add REST API processing at runtime,
>> and to externalize and dynamically load my POJO / schema without
>> hardcoding a Java POJO.
>>
>> Do you have an approach to suggest?
>>
>> Thanks a lot
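
A note on the second map function above: it fills the Row by position, in whatever order the fields appear in each JSON document, while the type information passed to returns(convert) follows the order of the schema. If a record lists its fields in a different order, or carries an extra field, the values could end up in the wrong columns. Below is a minimal stand-alone sketch of filling the slots by schema field name instead. It uses plain arrays and maps in place of Row and JsonNode so that it runs without Flink; the class SchemaOrderedMapping and the method toOrderedValues are hypothetical names for illustration only. In the real job the field names could presumably be read from the type information (I believe RowTypeInfo exposes getFieldNames(), but that is an assumption to verify), and the values are still plain strings here, so typed conversion is left out.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-alone sketch: plain arrays and maps stand in for Flink's Row and Jackson's JsonNode.
// The class and method names are hypothetical, for illustration only.
final class SchemaOrderedMapping {

    // Fills one slot per schema field, looked up by name.
    // Fields missing from the record stay null; extra record fields are ignored.
    static Object[] toOrderedValues(String[] schemaFieldNames, Map<String, String> record) {
        Object[] values = new Object[schemaFieldNames.length];
        for (int i = 0; i < schemaFieldNames.length; i++) {
            values[i] = record.get(schemaFieldNames[i]);
        }
        return values;
    }

    public static void main(String[] args) {
        // Field order as declared in the JSON schema.
        String[] schemaFieldNames = {"lon", "rideTime"};

        // The record lists its fields in a different order and carries an extra one.
        Map<String, String> record = new LinkedHashMap<>();
        record.put("rideTime", "2019-09-05T13:23:00Z");
        record.put("extra", "ignored");
        record.put("lon", "2.35");

        // Values come out in schema order, regardless of the record's own order.
        System.out.println(Arrays.toString(toOrderedValues(schemaFieldNames, record)));
    }
}
```

With the sample record, the values come out in schema order (lon first, then rideTime) and the extra field is dropped. A purely positional copy would instead have produced three values, starting with rideTime.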