[ 
https://issues.apache.org/jira/browse/FLINK-24224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17412366#comment-17412366
 ] 

YUJIANBO commented on FLINK-24224:
----------------------------------

[~dwysakowicz]:  

           hello, I am very eager to know if this is a bug. Do you have any 
ideas to help me solve it.
           Thank you very much~

> Table to stream, only the row datatype Stream works on CEP, and other POJOs, 
> maps and jsonobjects datatype streams do not work, but any datatype stream to 
> CEP can work by only stream api .
> --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-24224
>                 URL: https://issues.apache.org/jira/browse/FLINK-24224
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / CEP
>    Affects Versions: 1.11.2, 1.12.0, 1.13.2
>            Reporter: YUJIANBO
>            Priority: Major
>
>  
> 1、problem:*Table to stream*, only the *ROW* datatype Stream works on *CEP*, 
> and other POJOs, maps and jsonobjects datatype streams do not work, but any 
> datatype stream to CEP can work by only stream api .
> 2、version: I have tried 3 versions,such as 1.11.2、1.12.0、1.13.2
> 3、code:
> (1)table to Stream  to  CEP   (only row datatype is ok,  other datatype 
> Stream to CEP has no data print and it has no error message)
> {code:java}
> tableEnv.executeSql(creat_kafka_source);
> tableEnv.executeSql(calculateSql);
> Table tb = tableEnv.from("calculateSql");
> String[] fieldNames = tb.getSchema().getFieldNames();
> DataType[] fieldDataTypes = tb.getSchema().getFieldDataTypes();
> KeyedStream<JSONObject, String> ds = tableEnv
>         .toAppendStream(tb, Row.class)
>         .map(new RichMapFunction<Row, JSONObject>() {
>             Map<String, Object> map = new HashMap<>();
>             @Override
>             public void open(Configuration parameters) throws Exception {
>                 super.open(parameters);
>                 if (null == map) {
>                     map = new HashMap<>();
>                 }
>             }
>             @Override
>             public JSONObject map(Row value) throws Exception {
>                 //将数据key和value添加到map中
>                 RowParseUtil.setFieldAndValue2Map(map, fieldDataTypes, 
> fieldNames, value);
>                 JSONObject jsonObject = 
> JSONObject.parseObject(JSON.toJSONString(map));
>                 map.clear();
>                 return jsonObject;
>             }
>         })
>         .assignTimestampsAndWatermarks(new 
> BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.seconds(0)) {
>             @Override
>             public long extractTimestamp(JSONObject element) {
>                 return element.getLongValue("wStart") * 1000;
>             }
>         }).keyBy(x -> x.getString("x_forwarded_for"));
> //it has data to print
> ds.print();
> Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("begin")
>         .where(new SimpleCondition<JSONObject>() {
>             @Override
>             public boolean filter(JSONObject value) throws Exception {
>                 log.info("===================>" + value);
>                 return true;
>             }
>         }).timesOrMore(1).within(Time.seconds(10));
> PatternStream<JSONObject> patternStream = CEP.pattern(ds, pattern);
> //it has no data to print
> patternStream.process(new PatternProcessFunction<JSONObject, String>() {
>     @Override
>     public void processMatch(Map<String, List<JSONObject>> match, Context 
> ctx, Collector<String> out) throws Exception {
>         out.collect("==========>>>>>>>" + match.toString());
>     }
> }).print();
> {code}
> (2) *Olny Stream API  to CEP* ( Any datatype ,  it is OK)
> {code:java}
> Properties proPs = kafkaUtil.getReceiveKfkProPs(receive_brokers, groupid);
> FlinkKafkaConsumer<ConsumerRecord<String, String>> consumer =
>         new FlinkKafkaConsumer<>(receive_topic, new KafkaRecordSchema(), 
> proPs);
> consumer.setStartFromEarliest();
> SingleOutputStreamOperator<JSONObject> input = env.addSource(consumer)
>         .map(x -> {
>             return JSON.parseObject(x.value());
>         })
>         .assignTimestampsAndWatermarks(new 
> BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.milliseconds(10)) {
>             @Override
>             public long extractTimestamp(JSONObject element) {
>                 return element.getLongValue("ts");
>             }
>         })
>         .keyBy(x -> x.getString("x_forwarded_for")+x.getString("request_uri"))
>         .timeWindow(Time.seconds(1)).apply(new WindowFunction<JSONObject, 
> JSONObject, String, TimeWindow>() {
>             @Override
>             public void apply(String s, TimeWindow window, 
> Iterable<JSONObject> input, Collector<JSONObject> out) throws Exception {
>                 Iterator<JSONObject> iterator = input.iterator();
>                 ArrayList<JSONObject> list = new ArrayList<>();
>                 int n = 0;
>                 while (iterator.hasNext()) {
>                     n++;
>                     JSONObject next = iterator.next();
>                     list.add(next);
>                 }
>                 JSONObject jsonObject = list.get(0);
>                 jsonObject.put("ct",n);
>                 jsonObject.remove("ts");
>                 out.collect(jsonObject);
>             }
>         });
> input.print();
> //it is ok
> Pattern<JSONObject, JSONObject> minInterval = Pattern
>         .<JSONObject>begin("begin").where(new SimpleCondition<JSONObject>() {
>             @Override
>             public boolean filter(JSONObject jsonObject) throws Exception {
>                 return true;
>             }
>         }).timesOrMore(1).within(Time.seconds(10));
> PatternStream<JSONObject> pattern = CEP.pattern(input, minInterval);
> pattern.process(new PatternProcessFunction<JSONObject, String>() {
>     @Override
>     public void processMatch(Map<String, List<JSONObject>> map, Context 
> context, Collector<String> out) throws Exception {
>         out.collect("这个用户有嫌疑:====================>" + map.toString());
>     }
> }).print();
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to