[ 
https://issues.apache.org/jira/browse/FLINK-24224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

YUJIANBO updated FLINK-24224:
-----------------------------
    Summary: Table change to stream for CEP , only event  is ROW  datatype can 
works on CEP,  but  other POJOs、maps、 JsonObject datatype event  do not work.   
  Any datatype for event to CEP  is OK by  only stream api   (was: Table change 
to stream  for  CEP , only the row datatype Stream works on CEP, and other 
POJOs、maps、 JsonObject  datatype streams do not work, but any datatype stream 
for CEP can work by only stream api .)

> Table change to stream for CEP , only event  is ROW  datatype can works on 
> CEP,  but  other POJOs、maps、 JsonObject datatype event  do not work.     Any 
> datatype for event to CEP  is OK by  only stream api 
> -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-24224
>                 URL: https://issues.apache.org/jira/browse/FLINK-24224
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / CEP
>    Affects Versions: 1.11.2, 1.12.0, 1.13.2
>            Reporter: YUJIANBO
>            Priority: Major
>
>  
> 1、problem:*Table change to stream for CEP* , only event  is *ROW*  datatype 
> can works on CEP,  but  other POJOs、maps、 JsonObject datatype event  do not 
> work.
>                        Any datatype for event to CEP  is OK by  only stream 
> api 
> 2、version: I have tried 3 versions,such as 1.11.2、1.12.0、1.13.2
> 3、code:
> (1)table to Stream  to  CEP   (only row datatype is ok,  other datatype 
> Stream to CEP has no data print and it has no error message)
> {code:java}
> tableEnv.executeSql(creat_kafka_source);
> tableEnv.executeSql(calculateSql);
> Table tb = tableEnv.from("calculateSql");
> String[] fieldNames = tb.getSchema().getFieldNames();
> DataType[] fieldDataTypes = tb.getSchema().getFieldDataTypes();
> KeyedStream<JSONObject, String> ds = tableEnv
>         .toAppendStream(tb, Row.class)
>         .map(new RichMapFunction<Row, JSONObject>() {
>             Map<String, Object> map = new HashMap<>();
>             @Override
>             public void open(Configuration parameters) throws Exception {
>                 super.open(parameters);
>                 if (null == map) {
>                     map = new HashMap<>();
>                 }
>             }
>             @Override
>             public JSONObject map(Row value) throws Exception {
>                 //将数据key和value添加到map中
>                 RowParseUtil.setFieldAndValue2Map(map, fieldDataTypes, 
> fieldNames, value);
>                 JSONObject jsonObject = 
> JSONObject.parseObject(JSON.toJSONString(map));
>                 map.clear();
>                 return jsonObject;
>             }
>         })
>         .assignTimestampsAndWatermarks(new 
> BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.seconds(0)) {
>             @Override
>             public long extractTimestamp(JSONObject element) {
>                 return element.getLongValue("wStart") * 1000;
>             }
>         }).keyBy(x -> x.getString("x_forwarded_for"));
> //it has data to print
> ds.print();
> Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("begin")
>         .where(new SimpleCondition<JSONObject>() {
>             @Override
>             public boolean filter(JSONObject value) throws Exception {
>                 log.info("===================>" + value);
>                 return true;
>             }
>         }).timesOrMore(1).within(Time.seconds(10));
> PatternStream<JSONObject> patternStream = CEP.pattern(ds, pattern);
> //it has no data to print
> patternStream.process(new PatternProcessFunction<JSONObject, String>() {
>     @Override
>     public void processMatch(Map<String, List<JSONObject>> match, Context 
> ctx, Collector<String> out) throws Exception {
>         out.collect("==========>>>>>>>" + match.toString());
>     }
> }).print();
> {code}
> (2) *Olny Stream API  to CEP* ( Any datatype ,  it is OK)
> {code:java}
> Properties proPs = kafkaUtil.getReceiveKfkProPs(receive_brokers, groupid);
> FlinkKafkaConsumer<ConsumerRecord<String, String>> consumer =
>         new FlinkKafkaConsumer<>(receive_topic, new KafkaRecordSchema(), 
> proPs);
> consumer.setStartFromEarliest();
> SingleOutputStreamOperator<JSONObject> input = env.addSource(consumer)
>         .map(x -> {
>             return JSON.parseObject(x.value());
>         })
>         .assignTimestampsAndWatermarks(new 
> BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.milliseconds(10)) {
>             @Override
>             public long extractTimestamp(JSONObject element) {
>                 return element.getLongValue("ts");
>             }
>         })
>         .keyBy(x -> x.getString("x_forwarded_for")+x.getString("request_uri"))
>         .timeWindow(Time.seconds(1)).apply(new WindowFunction<JSONObject, 
> JSONObject, String, TimeWindow>() {
>             @Override
>             public void apply(String s, TimeWindow window, 
> Iterable<JSONObject> input, Collector<JSONObject> out) throws Exception {
>                 Iterator<JSONObject> iterator = input.iterator();
>                 ArrayList<JSONObject> list = new ArrayList<>();
>                 int n = 0;
>                 while (iterator.hasNext()) {
>                     n++;
>                     JSONObject next = iterator.next();
>                     list.add(next);
>                 }
>                 JSONObject jsonObject = list.get(0);
>                 jsonObject.put("ct",n);
>                 jsonObject.remove("ts");
>                 out.collect(jsonObject);
>             }
>         });
> input.print();
> //it is ok
> Pattern<JSONObject, JSONObject> minInterval = Pattern
>         .<JSONObject>begin("begin").where(new SimpleCondition<JSONObject>() {
>             @Override
>             public boolean filter(JSONObject jsonObject) throws Exception {
>                 return true;
>             }
>         }).timesOrMore(1).within(Time.seconds(10));
> PatternStream<JSONObject> pattern = CEP.pattern(input, minInterval);
> pattern.process(new PatternProcessFunction<JSONObject, String>() {
>     @Override
>     public void processMatch(Map<String, List<JSONObject>> map, Context 
> context, Collector<String> out) throws Exception {
>         out.collect("这个用户有嫌疑:====================>" + map.toString());
>     }
> }).print();
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to