Hi Frank,

I think we have a similar case. In mine, I need to be able to set the POJO from the outside in order to analyze a REST API. My strategy is to define the schema with JSON Schema and use it to convert the JSON data into Flink Rows.
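To show just the core idea first (copying JSON fields by name into positional row slots), here is a minimal standalone sketch in plain Java, with no Flink dependency. `SimpleRow` and `toRow` are hypothetical stand-ins I made up for illustration: `SimpleRow` plays the role of `org.apache.flink.types.Row`, and a `LinkedHashMap` stands in for the parsed `JsonNode` fields.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RowSketch {

    // Hypothetical stand-in for org.apache.flink.types.Row: a fixed-arity positional record.
    static class SimpleRow {
        private final Object[] fields;
        SimpleRow(int arity) { fields = new Object[arity]; }
        void setField(int pos, Object value) { fields[pos] = value; }
        Object getField(int pos) { return fields[pos]; }
    }

    // Copy each named JSON field into the next positional slot, in field order.
    static SimpleRow toRow(Map<String, String> jsonFields) {
        SimpleRow row = new SimpleRow(jsonFields.size());
        int pos = 0;
        for (String key : jsonFields.keySet()) {
            row.setField(pos, jsonFields.get(key));
            pos++;
        }
        return row;
    }

    public static void main(String[] args) {
        // LinkedHashMap preserves insertion order, like JsonNode.fieldNames() iteration.
        Map<String, String> node = new LinkedHashMap<>();
        node.put("firstname", "steve");
        node.put("name", "flink");
        SimpleRow row = toRow(node);
        System.out.println(row.getField(0) + "," + row.getField(1)); // prints steve,flink
    }
}
```

Note the field positions come from iteration order, so the schema's field order and the JSON field order have to agree, which is why a fixed JSON Schema is useful here.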
So, for example, imagine your data is something like this:

public static String json = "{\"firstname\":\"steve\",\"name\":\"flink\"}";

I would define the schema like this:

public static String JSON_SCHEMA = "{\n" +
    " \"$schema\": \"http://json-schema.org/draft-04/schema#\",\n" +
    " \"type\": \"object\",\n" +
    " \"properties\": {\n" +
    " \"firstname\": {\n" +
    " \"type\": \"string\"\n" +
    " },\n" +
    " \"name\": {\n" +
    " \"type\": \"string\"\n" +
    " }\n" +
    " },\n" +
    " \"required\": [\n" +
    " \"firstname\",\n" +
    " \"name\"\n" +
    " ]\n" +
    "}";

and create the conversion of your data to Row like this (if you want to use the Table API):

TypeInformation<Row> convert = JsonRowSchemaConverter.convert(JSON_SCHEMA);

DataStream<JsonNode> dataStreamJson = dataStream.map(new MapFunction<String, JsonNode>() {
    @Override
    public JsonNode map(String s) throws Exception {
        ObjectMapper objectMapper = new ObjectMapper();
        return objectMapper.readTree(s);
    }
});

DataStream<Row> dataStreamRow = dataStreamJson.map(new MapFunction<JsonNode, Row>() {
    @Override
    public Row map(JsonNode jsonNode) throws Exception {
        int pos = 0;
        Row row = new Row(jsonNode.size());
        Iterator<String> iterator = jsonNode.fieldNames();
        while (iterator.hasNext()) {
            String key = iterator.next();
            row.setField(pos, jsonNode.get(key).asText());
            pos++;
        }
        return row;
    }
}).returns(convert);

// convert DataStream to Table
Table tableA = tEnv.fromDataStream(dataStreamRow);

Maybe this workflow can give you some ideas ^^

On Thu, Sep 5, 2019 at 11:14, Frank Wilson <fajwil...@gmail.com> wrote:

> Hi,
>
> So far I've been developing my flink pipelines using the datastream API. I
> have a pipeline that calculates windowed statistics on a given pojo field.
> Ideally I would like this field to be user configurable via a config file.
> To do this I would need to extract pojo fields by name. The Table API seems
> to have the facilities to do this. I gather datastreams can be converted to
> Tables.
> Is it sensible to convert to tables in the middle of my datastream
> pipeline to do this or is there a better way?
>
> Thanks,
>
> Frank