Hi Frank,

I think we have a similar case. In my case I need to be able to set the POJO
from the outside in order to analyze a REST API. My strategy is to define the
schema with a JSON schema and use it to convert the JSON data to a Flink Row.

So for example, imagine your data is something like this:

public static String json = "{\"firstname\":\"steve\",\"name\":\"flink\"}";

I would define a JSON schema for it like this:

public static String JSON_SCHEMA = "{\n" +
        "  \"$schema\": \"http://json-schema.org/draft-04/schema#\",\n"; +
        "  \"type\": \"object\",\n" +
        "  \"properties\": {\n" +
        "    \"firstname\": {\n" +
        "      \"type\": \"string\"\n" +
        "    },\n" +
        "    \"name\": {\n" +
        "      \"type\": \"string\"\n" +
        "    }\n" +
        "  },\n" +
        "  \"required\": [\n" +
        "    \"firstname\",\n" +
        "    \"name\"\n" +
        "  ]\n" +
        "}";


and create the conversion of your data to Row like this (if you want to use
the Table API):
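First, a quick sketch of the setup the snippet assumes (dataStream is a
DataStream<String>, and env.fromElements(json) is just a hypothetical
stand-in for your real source):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// hypothetical stand-in source: one hard-coded record instead of your real REST source
DataStream<String> dataStream = env.fromElements(json);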

TypeInformation<Row> convert = JsonRowSchemaConverter.convert(JSON_SCHEMA);
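If you print the resulting type you should see the field names and types
taken from the schema:

System.out.println(convert); // prints something like: Row(firstname: String, name: String)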

DataStream<JsonNode> dataStreamJson = dataStream.map(new MapFunction<String, JsonNode>() {
    // reuse one ObjectMapper per task rather than creating one per record
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public JsonNode map(String s) throws Exception {
        return objectMapper.readTree(s);
    }
});
// take the field order from the schema-derived type instead of the incoming
// JSON object, so the positions always match the declared row type
String[] fieldNames = ((RowTypeInfo) convert).getFieldNames();

DataStream<Row> dataStreamRow = dataStreamJson.map(new MapFunction<JsonNode, Row>() {
    @Override
    public Row map(JsonNode jsonNode) throws Exception {
        Row row = new Row(fieldNames.length);
        for (int pos = 0; pos < fieldNames.length; pos++) {
            row.setField(pos, jsonNode.get(fieldNames[pos]).asText());
        }
        return row;
    }
}).returns(convert);

// convert the DataStream to a Table (tEnv being your StreamTableEnvironment)

Table tableA = tEnv.fromDataStream(dataStreamRow);
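And since you want the field to be user-configurable, you can then select it
by name. A minimal sketch (configuredField is a hypothetical value read from
your config file, and I am using the string-expression variant of select):

// hypothetical: the field name comes from your config file
String configuredField = "firstname";

// select that field by name and, if needed, go back to a DataStream
Table projected = tableA.select(configuredField);
DataStream<Row> selected = tEnv.toAppendStream(projected, Row.class);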


Maybe this workflow can give you some ideas ^^

On Thu, Sep 5, 2019 at 11:14, Frank Wilson <fajwil...@gmail.com> wrote:

> Hi,
>
> So far I’ve been developing my flink pipelines using the datastream API. I
> have a pipeline that calculates windowed statistics on a given pojo field.
> Ideally I would like this field to be user configurable via a config file.
> To do this I would need to extract pojo fields by name. The Table API seems
> to have the facilities to do this. I gather datastreams can be converted to
> Tables. Is it sensible to convert to tables in the middle of my datastream
> pipeline to do this or is there a better way?
>
> Thanks,
>
> Frank
>
