[ https://issues.apache.org/jira/browse/FLINK-24224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17412366#comment-17412366 ]
YUJIANBO commented on FLINK-24224: ---------------------------------- [~dwysakowicz]: Hello, I am very eager to know whether this is a bug. Do you have any ideas that could help me solve it? Thank you very much~ > Table to stream, only the row datatype Stream works on CEP, and other POJOs, > maps and jsonobjects datatype streams do not work, but any datatype stream to > CEP can work by only stream api . > -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- > > Key: FLINK-24224 > URL: https://issues.apache.org/jira/browse/FLINK-24224 > Project: Flink > Issue Type: Bug > Components: Library / CEP > Affects Versions: 1.11.2, 1.12.0, 1.13.2 > Reporter: YUJIANBO > Priority: Major > > > 1、Problem: when converting a *Table to a stream*, only a stream of the *ROW* datatype works with *CEP*; > streams of other datatypes (POJOs, maps, and JSONObjects) do not work. However, a > stream of any datatype works with CEP when it is built using only the stream API. 
> 2、Version: I have tried 3 versions: 1.11.2, 1.12.0, and 1.13.2 > 3、Code: > (1) Table to Stream to CEP (only the Row datatype works; a stream of any other datatype > passed to CEP prints no data and produces no error message) > {code:java} > tableEnv.executeSql(creat_kafka_source); > tableEnv.executeSql(calculateSql); > Table tb = tableEnv.from("calculateSql"); > String[] fieldNames = tb.getSchema().getFieldNames(); > DataType[] fieldDataTypes = tb.getSchema().getFieldDataTypes(); > KeyedStream<JSONObject, String> ds = tableEnv > .toAppendStream(tb, Row.class) > .map(new RichMapFunction<Row, JSONObject>() { > Map<String, Object> map = new HashMap<>(); > @Override > public void open(Configuration parameters) throws Exception { > super.open(parameters); > if (null == map) { > map = new HashMap<>(); > } > } > @Override > public JSONObject map(Row value) throws Exception { > //将数据key和value添加到map中 > RowParseUtil.setFieldAndValue2Map(map, fieldDataTypes, > fieldNames, value); > JSONObject jsonObject = > JSONObject.parseObject(JSON.toJSONString(map)); > map.clear(); > return jsonObject; > } > }) > .assignTimestampsAndWatermarks(new > BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.seconds(0)) { > @Override > public long extractTimestamp(JSONObject element) { > return element.getLongValue("wStart") * 1000; > } > }).keyBy(x -> x.getString("x_forwarded_for")); > //it has data to print > ds.print(); > Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("begin") > .where(new SimpleCondition<JSONObject>() { > @Override > public boolean filter(JSONObject value) throws Exception { > log.info("===================>" + value); > return true; > } > }).timesOrMore(1).within(Time.seconds(10)); > PatternStream<JSONObject> patternStream = CEP.pattern(ds, pattern); > //it has no data to print > patternStream.process(new PatternProcessFunction<JSONObject, String>() { > @Override > public void processMatch(Map<String, List<JSONObject>> match, Context > ctx, Collector<String> out) 
throws Exception { > out.collect("==========>>>>>>>" + match.toString()); > } > }).print(); > {code} > (2) *Only Stream API to CEP* (any datatype, it is OK) > {code:java} > Properties proPs = kafkaUtil.getReceiveKfkProPs(receive_brokers, groupid); > FlinkKafkaConsumer<ConsumerRecord<String, String>> consumer = > new FlinkKafkaConsumer<>(receive_topic, new KafkaRecordSchema(), > proPs); > consumer.setStartFromEarliest(); > SingleOutputStreamOperator<JSONObject> input = env.addSource(consumer) > .map(x -> { > return JSON.parseObject(x.value()); > }) > .assignTimestampsAndWatermarks(new > BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.milliseconds(10)) { > @Override > public long extractTimestamp(JSONObject element) { > return element.getLongValue("ts"); > } > }) > .keyBy(x -> x.getString("x_forwarded_for")+x.getString("request_uri")) > .timeWindow(Time.seconds(1)).apply(new WindowFunction<JSONObject, > JSONObject, String, TimeWindow>() { > @Override > public void apply(String s, TimeWindow window, > Iterable<JSONObject> input, Collector<JSONObject> out) throws Exception { > Iterator<JSONObject> iterator = input.iterator(); > ArrayList<JSONObject> list = new ArrayList<>(); > int n = 0; > while (iterator.hasNext()) { > n++; > JSONObject next = iterator.next(); > list.add(next); > } > JSONObject jsonObject = list.get(0); > jsonObject.put("ct",n); > jsonObject.remove("ts"); > out.collect(jsonObject); > } > }); > input.print(); > //it is ok > Pattern<JSONObject, JSONObject> minInterval = Pattern > .<JSONObject>begin("begin").where(new SimpleCondition<JSONObject>() { > @Override > public boolean filter(JSONObject jsonObject) throws Exception { > return true; > } > }).timesOrMore(1).within(Time.seconds(10)); > PatternStream<JSONObject> pattern = CEP.pattern(input, minInterval); > pattern.process(new PatternProcessFunction<JSONObject, String>() { > @Override > public void processMatch(Map<String, List<JSONObject>> map, Context > context, Collector<String> out) 
throws Exception { > out.collect("这个用户有嫌疑:====================>" + map.toString()); > } > }).print(); > {code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)