Hi Fabian,

thank you for your answer, it is indeed the solution that I am currently
testing.
I use TypeInformation<Row> convert =
JsonRowSchemaConverter.convert(JSON_SCHEMA); provided by
flink-json and pass the TypeInformation to the operator stream.
It looks like it works :) With this solution my schema can live outside my
package, for example loaded from an external file as in the sketch below.
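
For reference, this is roughly how I wire it up (a minimal sketch, assuming
the schema sits in an external file; "schema.json" is just a placeholder
path on my side):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.json.JsonRowSchemaConverter;
import org.apache.flink.types.Row;

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

// Load the JSON schema from a file outside the jar (placeholder path).
String JSON_SCHEMA = new String(
        Files.readAllBytes(Paths.get("schema.json")), StandardCharsets.UTF_8);

// Derive the Row TypeInformation from the JSON schema (flink-json utility).
TypeInformation<Row> convert = JsonRowSchemaConverter.convert(JSON_SCHEMA);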

One additional question, about GenericInMemoryCatalog (
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/catalogs.html
):
Can a catalog be used across multiple jobs running on the same cluster, or
is a catalog scoped to the job session only? For context, I register it
roughly as in the sketch below.
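
(A minimal sketch of what I mean, assuming the standard Table API catalog
calls; "myCatalog" is just a placeholder name:)

import org.apache.flink.table.catalog.GenericInMemoryCatalog;

// Register an in-memory catalog on this TableEnvironment (placeholder name).
tEnv.registerCatalog("myCatalog", new GenericInMemoryCatalog("myCatalog"));
tEnv.useCatalog("myCatalog");

Below is the full DataStream-to-Table pipeline I currently have, for context: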

DataStream<JsonNode> dataStreamJson = dataStream.map(new MapFunction<String, JsonNode>() {
    @Override
    public JsonNode map(String s) throws Exception {
        // Parse the raw JSON string coming from the REST API.
        ObjectMapper objectMapper = new ObjectMapper();
        return objectMapper.readTree(s);
    }
});

DataStream<Row> dataStreamRow = dataStreamJson.map(new MapFunction<JsonNode, Row>() {
    @Override
    public Row map(JsonNode jsonNode) throws Exception {
        // Copy every JSON field into a positional Row field (as text for now).
        Row row = new Row(jsonNode.size());
        int pos = 0;
        Iterator<String> iterator = jsonNode.fieldNames();
        while (iterator.hasNext()) {
            String key = iterator.next();
            row.setField(pos, jsonNode.get(key).asText());
            pos++;
        }
        return row;
    }
}).returns(convert); // TypeInformation<Row> from JsonRowSchemaConverter

Table tableA = tEnv.fromDataStream(dataStreamRow);
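
As a possible next step (again just a sketch on my side; "jsonTable" and the
query are placeholders):

tEnv.registerTable("jsonTable", tableA);
Table result = tEnv.sqlQuery("SELECT * FROM jsonTable");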


On Thu, Sep 5, 2019 at 13:23, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Steve,
>
> Maybe you could implement a custom TableSource that queries the data from
> the REST API and converts the JSON directly into a Row data type.
> This would also avoid going through the DataStream API just for ingesting
> the data.
>
> Best, Fabian
>
> On Wed, Sep 4, 2019 at 15:57, Steve Robert <contact.steverob...@gmail.com> wrote:
>
>> Hi guys,
>>
>> It's been a while since I started studying the Table API for integration
>> into my system.
>> When I take a look at this documentation:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#connectors
>>
>>
>> I understand that it is possible to apply a JSON format on the connector
>> and a JSON schema without any hardcoded Java POJO:
>> .jsonSchema(
>>       "{" +
>>       "  type: 'object'," +
>>       "  properties: {" +
>>       "    lon: {" +
>>       "      type: 'number'" +
>>       "    }," +
>>       "    rideTime: {" +
>>       "      type: 'string'," +
>>       "      format: 'date-time'" +
>>       "    }" +
>>       "  }" +
>>       "}"
>>     )
>>
>>
>>     But my problem is the following: my data comes from a REST API, so
>> I have to process the data and transmit it via a DataStream.
>>     The problem is that the conversion between a DataStream and a
>> Table must pass through a Java POJO: DataStream<YourPojo> input....
>> Table table = tEnv.fromDataStream(input);
>>     I tried a trick, converting my JSON to Avro
>> using a GenericRecord, but it does not seem possible.
>>
>>     My use case is being able to add REST API processing at runtime
>> and to be able to outsource and dynamically load my POJO/schema without
>> hardcoding a Java POJO.
>>
>>
>> Do you have an approach to suggest to me?
>>
>>
>> Thanks a lot
>>
>
