Hi Steve,

The in-memory catalog (GenericInMemoryCatalog) does not persist metadata and needs to be repopulated every time. However, you can implement a catalog that persists the metadata to a file or a database.
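For illustration, a rough sketch of that idea, assuming GenericInMemoryCatalog can be subclassed; the FileBackedCatalog name and the persistTable() helper are made up for the example and are not Flink API:

import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;

// Keeps the in-memory behaviour but also writes every created table definition to an
// external store, so it can be read back and re-registered after a restart.
public class FileBackedCatalog extends GenericInMemoryCatalog {

    public FileBackedCatalog(String name, String defaultDatabase) {
        super(name, defaultDatabase);
    }

    @Override
    public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
            throws TableAlreadyExistException, DatabaseNotExistException {
        super.createTable(tablePath, table, ignoreIfExists);
        persistTable(tablePath, table);  // hypothetical helper, not part of Flink
    }

    private void persistTable(ObjectPath tablePath, CatalogBaseTable table) {
        // placeholder: serialize the table's schema and properties to a file or a database
    }
}

The same hook would be needed for databases, views, functions, and so on, and the persisted definitions would have to be read back when the catalog is opened.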
There is an effort to implement a Catalog interface on top of Hive's metastore; a preview is available in the latest release (1.9.0). (See the P.S. at the bottom of this mail for a minimal registration sketch.)

Best, Fabian

On Thu, Sep 5, 2019 at 14:52, Steve Robert <contact.steverob...@gmail.com> wrote:

> Hi Fabian,
>
> Thank you for your answer, it is indeed the solution that I am currently testing.
> I use TypeInformation<Row> convert = JsonRowSchemaConverter.convert(JSON_SCHEMA); provided by
> flink-json and pass the TypeInformation to the operator stream.
> It looks like it works :) With this solution my schema can live outside my package.
>
> One additional question about GenericInMemoryCatalog
> (https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/catalogs.html):
> can a catalog be used across multiple jobs running on the same cluster, or is a catalog
> scoped to the job session only?
>
> DataStream<JsonNode> dataStreamJson = dataStream.map(new MapFunction<String, JsonNode>() {
>     @Override
>     public JsonNode map(String s) throws Exception {
>         ObjectMapper objectMapper = new ObjectMapper();
>         JsonNode node = objectMapper.readTree(s);
>         return node;
>     }
> });
> DataStream<Row> dataStreamRow = dataStreamJson.map(new MapFunction<JsonNode, Row>() {
>     @Override
>     public Row map(JsonNode jsonNode) throws Exception {
>         int pos = 0;
>         Row row = new Row(jsonNode.size());
>         Iterator<String> iterator = jsonNode.fieldNames();
>         while (iterator.hasNext()) {
>             String key = iterator.next();
>             row.setField(pos, jsonNode.get(key).asText());
>             pos++;
>         }
>         return row;
>     }
> }).returns(convert);
>
> Table tableA = tEnv.fromDataStream(dataStreamRow);
>
> On Thu, Sep 5, 2019 at 13:23, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Steve,
>>
>> Maybe you could implement a custom TableSource that queries the data from the REST API
>> and converts the JSON directly into a Row data type.
>> This would also avoid going through the DataStream API just for ingesting the data.
>>
>> Best, Fabian
>>
>> On Wed, Sep 4, 2019 at 15:57, Steve Robert <contact.steverob...@gmail.com> wrote:
>>
>>> Hi guys,
>>>
>>> I have been studying the Table API for a while now for integration into my system.
>>> When I take a look at this documentation:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#connectors
>>>
>>> I understand that it is possible to apply the JSON format on the connector and provide a
>>> JSON schema without any hardcoded Java POJO:
>>>
>>> .jsonSchema(
>>>     "{" +
>>>     "  type: 'object'," +
>>>     "  properties: {" +
>>>     "    lon: {" +
>>>     "      type: 'number'" +
>>>     "    }," +
>>>     "    rideTime: {" +
>>>     "      type: 'string'," +
>>>     "      format: 'date-time'" +
>>>     "    }" +
>>>     "  }" +
>>>     "}"
>>> )
>>>
>>> But my problem is the following: my data comes from a REST API, so I have to process the
>>> data and transmit it via a DataStream. The problem is that the conversion from a DataStream
>>> to a Table must go through a Java POJO:
>>>
>>> DataStream<YourPojo> input = ...;
>>> Table table = tEnv.fromDataStream(input);
>>>
>>> I tried a trick converting my JSON to Avro using a GenericRecord, but it does not seem possible.
>>>
>>> My use case is to be able to add REST API processing at runtime and to externalize and
>>> dynamically load my POJO / schema without hardcoding a Java POJO.
>>>
>>> Do you have an approach to suggest?
>>>
>>> Thanks a lot
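P.S. For reference, registering the Hive-backed catalog from the 1.9.0 preview looks roughly like the sketch below. This is only a sketch: the catalog name, default database, hive-site.xml directory, and Hive version are placeholders, and it assumes the flink-connector-hive dependency and the Hive client jars are on the classpath.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveCatalogExample {
    public static void main(String[] args) {
        // Unified TableEnvironment; the Blink planner is typically used with the Hive integration
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Catalog name, default database, directory containing hive-site.xml, Hive version
        HiveCatalog hive = new HiveCatalog("myhive", "default", "/opt/hive-conf", "2.3.4");

        tableEnv.registerCatalog("myhive", hive);
        tableEnv.useCatalog("myhive");
    }
}

Because the metadata then lives in the Hive metastore rather than in the TableEnvironment, tables registered this way are visible to other jobs and sessions and survive restarts, which also answers the scoping question above.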