[ https://issues.apache.org/jira/browse/FLINK-24224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
YUJIANBO updated FLINK-24224: ----------------------------- Description: 1、problem:*Table change to stream for CEP* , only event is *ROW* datatype *can work* on CEP, but *other* POJOs、maps、 JsonObject datatype event *do not work*. *Any datatype for event to CEP is OK by only stream api* 2、version: I have tried 3 versions,such as 1.11.2、1.12.0、1.13.2 3、code: (1)table to Stream to CEP (only row datatype is ok, other datatype Stream to CEP has no data print and it has no error message) {code:java} tableEnv.executeSql(creat_kafka_source); tableEnv.executeSql(calculateSql); Table tb = tableEnv.from("calculateSql"); String[] fieldNames = tb.getSchema().getFieldNames(); DataType[] fieldDataTypes = tb.getSchema().getFieldDataTypes(); KeyedStream<JSONObject, String> ds = tableEnv .toAppendStream(tb, Row.class) .map(new RichMapFunction<Row, JSONObject>() { Map<String, Object> map = new HashMap<>(); @Override public void open(Configuration parameters) throws Exception { super.open(parameters); if (null == map) { map = new HashMap<>(); } } @Override public JSONObject map(Row value) throws Exception { //将数据key和value添加到map中 RowParseUtil.setFieldAndValue2Map(map, fieldDataTypes, fieldNames, value); JSONObject jsonObject = JSONObject.parseObject(JSON.toJSONString(map)); map.clear(); return jsonObject; } }) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.seconds(0)) { @Override public long extractTimestamp(JSONObject element) { return element.getLongValue("wStart") * 1000; } }).keyBy(x -> x.getString("x_forwarded_for")); //it has data to print ds.print(); Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("begin") .where(new SimpleCondition<JSONObject>() { @Override public boolean filter(JSONObject value) throws Exception { log.info("===================>" + value); return true; } }).timesOrMore(1).within(Time.seconds(10)); PatternStream<JSONObject> patternStream = CEP.pattern(ds, pattern); //it has no data to print 
patternStream.process(new PatternProcessFunction<JSONObject, String>() { @Override public void processMatch(Map<String, List<JSONObject>> match, Context ctx, Collector<String> out) throws Exception { out.collect("==========>>>>>>>" + match.toString()); } }).print(); {code} (2) *Only Stream API to CEP* ( Any datatype , it is OK) {code:java} Properties proPs = kafkaUtil.getReceiveKfkProPs(receive_brokers, groupid); FlinkKafkaConsumer<ConsumerRecord<String, String>> consumer = new FlinkKafkaConsumer<>(receive_topic, new KafkaRecordSchema(), proPs); consumer.setStartFromEarliest(); SingleOutputStreamOperator<JSONObject> input = env.addSource(consumer) .map(x -> { return JSON.parseObject(x.value()); }) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.milliseconds(10)) { @Override public long extractTimestamp(JSONObject element) { return element.getLongValue("ts"); } }) .keyBy(x -> x.getString("x_forwarded_for")+x.getString("request_uri")) .timeWindow(Time.seconds(1)).apply(new WindowFunction<JSONObject, JSONObject, String, TimeWindow>() { @Override public void apply(String s, TimeWindow window, Iterable<JSONObject> input, Collector<JSONObject> out) throws Exception { Iterator<JSONObject> iterator = input.iterator(); ArrayList<JSONObject> list = new ArrayList<>(); int n = 0; while (iterator.hasNext()) { n++; JSONObject next = iterator.next(); list.add(next); } JSONObject jsonObject = list.get(0); jsonObject.put("ct",n); jsonObject.remove("ts"); out.collect(jsonObject); } }); input.print(); //it is ok Pattern<JSONObject, JSONObject> minInterval = Pattern .<JSONObject>begin("begin").where(new SimpleCondition<JSONObject>() { @Override public boolean filter(JSONObject jsonObject) throws Exception { return true; } }).timesOrMore(1).within(Time.seconds(10)); PatternStream<JSONObject> pattern = CEP.pattern(input, minInterval); pattern.process(new PatternProcessFunction<JSONObject, String>() { @Override public void 
processMatch(Map<String, List<JSONObject>> map, Context context, Collector<String> out) throws Exception { out.collect("这个用户有嫌疑:====================>" + map.toString()); } }).print(); {code} was: 1、problem:*Table change to stream for CEP* , only event is *ROW* datatype can works on CEP, but other POJOs、maps、 JsonObject datatype event do not work. Any datatype for event to CEP is OK by only stream api 2、version: I have tried 3 versions,such as 1.11.2、1.12.0、1.13.2 3、code: (1)table to Stream to CEP (only row datatype is ok, other datatype Stream to CEP has no data print and it has no error message) {code:java} tableEnv.executeSql(creat_kafka_source); tableEnv.executeSql(calculateSql); Table tb = tableEnv.from("calculateSql"); String[] fieldNames = tb.getSchema().getFieldNames(); DataType[] fieldDataTypes = tb.getSchema().getFieldDataTypes(); KeyedStream<JSONObject, String> ds = tableEnv .toAppendStream(tb, Row.class) .map(new RichMapFunction<Row, JSONObject>() { Map<String, Object> map = new HashMap<>(); @Override public void open(Configuration parameters) throws Exception { super.open(parameters); if (null == map) { map = new HashMap<>(); } } @Override public JSONObject map(Row value) throws Exception { //将数据key和value添加到map中 RowParseUtil.setFieldAndValue2Map(map, fieldDataTypes, fieldNames, value); JSONObject jsonObject = JSONObject.parseObject(JSON.toJSONString(map)); map.clear(); return jsonObject; } }) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.seconds(0)) { @Override public long extractTimestamp(JSONObject element) { return element.getLongValue("wStart") * 1000; } }).keyBy(x -> x.getString("x_forwarded_for")); //it has data to print ds.print(); Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("begin") .where(new SimpleCondition<JSONObject>() { @Override public boolean filter(JSONObject value) throws Exception { log.info("===================>" + value); return true; } 
}).timesOrMore(1).within(Time.seconds(10)); PatternStream<JSONObject> patternStream = CEP.pattern(ds, pattern); //it has no data to print patternStream.process(new PatternProcessFunction<JSONObject, String>() { @Override public void processMatch(Map<String, List<JSONObject>> match, Context ctx, Collector<String> out) throws Exception { out.collect("==========>>>>>>>" + match.toString()); } }).print(); {code} (2) *Olny Stream API to CEP* ( Any datatype , it is OK) {code:java} Properties proPs = kafkaUtil.getReceiveKfkProPs(receive_brokers, groupid); FlinkKafkaConsumer<ConsumerRecord<String, String>> consumer = new FlinkKafkaConsumer<>(receive_topic, new KafkaRecordSchema(), proPs); consumer.setStartFromEarliest(); SingleOutputStreamOperator<JSONObject> input = env.addSource(consumer) .map(x -> { return JSON.parseObject(x.value()); }) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.milliseconds(10)) { @Override public long extractTimestamp(JSONObject element) { return element.getLongValue("ts"); } }) .keyBy(x -> x.getString("x_forwarded_for")+x.getString("request_uri")) .timeWindow(Time.seconds(1)).apply(new WindowFunction<JSONObject, JSONObject, String, TimeWindow>() { @Override public void apply(String s, TimeWindow window, Iterable<JSONObject> input, Collector<JSONObject> out) throws Exception { Iterator<JSONObject> iterator = input.iterator(); ArrayList<JSONObject> list = new ArrayList<>(); int n = 0; while (iterator.hasNext()) { n++; JSONObject next = iterator.next(); list.add(next); } JSONObject jsonObject = list.get(0); jsonObject.put("ct",n); jsonObject.remove("ts"); out.collect(jsonObject); } }); input.print(); //it is ok Pattern<JSONObject, JSONObject> minInterval = Pattern .<JSONObject>begin("begin").where(new SimpleCondition<JSONObject>() { @Override public boolean filter(JSONObject jsonObject) throws Exception { return true; } }).timesOrMore(1).within(Time.seconds(10)); PatternStream<JSONObject> pattern = 
CEP.pattern(input, minInterval); pattern.process(new PatternProcessFunction<JSONObject, String>() { @Override public void processMatch(Map<String, List<JSONObject>> map, Context context, Collector<String> out) throws Exception { out.collect("这个用户有嫌疑:====================>" + map.toString()); } }).print(); {code} > Table change to stream for CEP , only event is ROW datatype can works on > CEP, but other POJOs、maps、 JsonObject datatype event do not work. Any > datatype for event to CEP is OK by only stream api > ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- > > Key: FLINK-24224 > URL: https://issues.apache.org/jira/browse/FLINK-24224 > Project: Flink > Issue Type: Bug > Components: Library / CEP > Affects Versions: 1.11.2, 1.12.0, 1.13.2 > Reporter: YUJIANBO > Priority: Major > > > 1、problem:*Table change to stream for CEP* , only event is *ROW* datatype > *can work* on CEP, but *other* POJOs、maps、 JsonObject datatype event *do > not work*. 
> *Any datatype for event to CEP is OK by only stream > api* > 2、version: I have tried 3 versions,such as 1.11.2、1.12.0、1.13.2 > 3、code: > (1)table to Stream to CEP (only row datatype is ok, other datatype > Stream to CEP has no data print and it has no error message) > {code:java} > tableEnv.executeSql(creat_kafka_source); > tableEnv.executeSql(calculateSql); > Table tb = tableEnv.from("calculateSql"); > String[] fieldNames = tb.getSchema().getFieldNames(); > DataType[] fieldDataTypes = tb.getSchema().getFieldDataTypes(); > KeyedStream<JSONObject, String> ds = tableEnv > .toAppendStream(tb, Row.class) > .map(new RichMapFunction<Row, JSONObject>() { > Map<String, Object> map = new HashMap<>(); > @Override > public void open(Configuration parameters) throws Exception { > super.open(parameters); > if (null == map) { > map = new HashMap<>(); > } > } > @Override > public JSONObject map(Row value) throws Exception { > //将数据key和value添加到map中 > RowParseUtil.setFieldAndValue2Map(map, fieldDataTypes, > fieldNames, value); > JSONObject jsonObject = > JSONObject.parseObject(JSON.toJSONString(map)); > map.clear(); > return jsonObject; > } > }) > .assignTimestampsAndWatermarks(new > BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.seconds(0)) { > @Override > public long extractTimestamp(JSONObject element) { > return element.getLongValue("wStart") * 1000; > } > }).keyBy(x -> x.getString("x_forwarded_for")); > //it has data to print > ds.print(); > Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("begin") > .where(new SimpleCondition<JSONObject>() { > @Override > public boolean filter(JSONObject value) throws Exception { > log.info("===================>" + value); > return true; > } > }).timesOrMore(1).within(Time.seconds(10)); > PatternStream<JSONObject> patternStream = CEP.pattern(ds, pattern); > //it has no data to print > patternStream.process(new PatternProcessFunction<JSONObject, String>() { > @Override > public void processMatch(Map<String, 
List<JSONObject>> match, Context > ctx, Collector<String> out) throws Exception { > out.collect("==========>>>>>>>" + match.toString()); > } > }).print(); > {code} > (2) *Only Stream API to CEP* ( Any datatype , it is OK) > {code:java} > Properties proPs = kafkaUtil.getReceiveKfkProPs(receive_brokers, groupid); > FlinkKafkaConsumer<ConsumerRecord<String, String>> consumer = > new FlinkKafkaConsumer<>(receive_topic, new KafkaRecordSchema(), > proPs); > consumer.setStartFromEarliest(); > SingleOutputStreamOperator<JSONObject> input = env.addSource(consumer) > .map(x -> { > return JSON.parseObject(x.value()); > }) > .assignTimestampsAndWatermarks(new > BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.milliseconds(10)) { > @Override > public long extractTimestamp(JSONObject element) { > return element.getLongValue("ts"); > } > }) > .keyBy(x -> x.getString("x_forwarded_for")+x.getString("request_uri")) > .timeWindow(Time.seconds(1)).apply(new WindowFunction<JSONObject, > JSONObject, String, TimeWindow>() { > @Override > public void apply(String s, TimeWindow window, > Iterable<JSONObject> input, Collector<JSONObject> out) throws Exception { > Iterator<JSONObject> iterator = input.iterator(); > ArrayList<JSONObject> list = new ArrayList<>(); > int n = 0; > while (iterator.hasNext()) { > n++; > JSONObject next = iterator.next(); > list.add(next); > } > JSONObject jsonObject = list.get(0); > jsonObject.put("ct",n); > jsonObject.remove("ts"); > out.collect(jsonObject); > } > }); > input.print(); > //it is ok > Pattern<JSONObject, JSONObject> minInterval = Pattern > .<JSONObject>begin("begin").where(new SimpleCondition<JSONObject>() { > @Override > public boolean filter(JSONObject jsonObject) throws Exception { > return true; > } > }).timesOrMore(1).within(Time.seconds(10)); > PatternStream<JSONObject> pattern = CEP.pattern(input, minInterval); > pattern.process(new PatternProcessFunction<JSONObject, String>() { > @Override > public void processMatch(Map<String, 
List<JSONObject>> map, Context > context, Collector<String> out) throws Exception { > out.collect("这个用户有嫌疑:====================>" + map.toString()); > } > }).print(); > {code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)