[ 
https://issues.apache.org/jira/browse/FLINK-24224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

YUJIANBO updated FLINK-24224:
-----------------------------
    Description: 
 

1、problem: *When a Table is converted to a DataStream for CEP*, only events of 
the *ROW* datatype *work* with CEP, but events of *other* datatypes (POJOs, 
maps, JSONObject) *do not work*.

                       *When only the DataStream API is used, events of any 
datatype work with CEP.* 

2、versions: I have tried 3 versions: 1.11.2, 1.12.0 and 1.13.2

3、code:

(1) Table to DataStream to CEP (only the Row datatype works; with any other 
datatype the CEP output prints no data and there is no error message)
{code:java}
tableEnv.executeSql(creat_kafka_source);
tableEnv.executeSql(calculateSql);

Table tb = tableEnv.from("calculateSql");
String[] fieldNames = tb.getSchema().getFieldNames();
DataType[] fieldDataTypes = tb.getSchema().getFieldDataTypes();

KeyedStream<JSONObject, String> ds = tableEnv
        .toAppendStream(tb, Row.class)
        .map(new RichMapFunction<Row, JSONObject>() {
            Map<String, Object> map = new HashMap<>();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                if (null == map) {
                    map = new HashMap<>();
                }
            }

            @Override
            public JSONObject map(Row value) throws Exception {
                //将数据key和value添加到map中
                RowParseUtil.setFieldAndValue2Map(map, fieldDataTypes, 
fieldNames, value);
                JSONObject jsonObject = 
JSONObject.parseObject(JSON.toJSONString(map));
                map.clear();
                return jsonObject;
            }
        })
        .assignTimestampsAndWatermarks(new 
BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(JSONObject element) {
                return element.getLongValue("wStart") * 1000;
            }
        }).keyBy(x -> x.getString("x_forwarded_for"));
//it has data to print
ds.print();

Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("begin")
        .where(new SimpleCondition<JSONObject>() {
            @Override
            public boolean filter(JSONObject value) throws Exception {
                log.info("===================>" + value);
                return true;
            }
        }).timesOrMore(1).within(Time.seconds(10));

PatternStream<JSONObject> patternStream = CEP.pattern(ds, pattern);
//it has no data to print
patternStream.process(new PatternProcessFunction<JSONObject, String>() {
    @Override
    public void processMatch(Map<String, List<JSONObject>> match, Context ctx, 
Collector<String> out) throws Exception {
        out.collect("==========>>>>>>>" + match.toString());
    }
}).print();


{code}
(2) *Only DataStream API to CEP* (any datatype works)
{code:java}
Properties proPs = kafkaUtil.getReceiveKfkProPs(receive_brokers, groupid);
FlinkKafkaConsumer<ConsumerRecord<String, String>> consumer =
        new FlinkKafkaConsumer<>(receive_topic, new KafkaRecordSchema(), proPs);
consumer.setStartFromEarliest();

SingleOutputStreamOperator<JSONObject> input = env.addSource(consumer)
        .map(x -> {
            return JSON.parseObject(x.value());
        })
        .assignTimestampsAndWatermarks(new 
BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.milliseconds(10)) {
            @Override
            public long extractTimestamp(JSONObject element) {
                return element.getLongValue("ts");
            }
        })
        .keyBy(x -> x.getString("x_forwarded_for")+x.getString("request_uri"))
        .timeWindow(Time.seconds(1)).apply(new WindowFunction<JSONObject, 
JSONObject, String, TimeWindow>() {
            @Override
            public void apply(String s, TimeWindow window, Iterable<JSONObject> 
input, Collector<JSONObject> out) throws Exception {
                Iterator<JSONObject> iterator = input.iterator();
                ArrayList<JSONObject> list = new ArrayList<>();
                int n = 0;
                while (iterator.hasNext()) {
                    n++;
                    JSONObject next = iterator.next();
                    list.add(next);
                }
                JSONObject jsonObject = list.get(0);
                jsonObject.put("ct",n);
                jsonObject.remove("ts");
                out.collect(jsonObject);
            }
        });

input.print();

//it is ok
Pattern<JSONObject, JSONObject> minInterval = Pattern
        .<JSONObject>begin("begin").where(new SimpleCondition<JSONObject>() {
            @Override
            public boolean filter(JSONObject jsonObject) throws Exception {
                return true;
            }
        }).timesOrMore(1).within(Time.seconds(10));

PatternStream<JSONObject> pattern = CEP.pattern(input, minInterval);
pattern.process(new PatternProcessFunction<JSONObject, String>() {
    @Override
    public void processMatch(Map<String, List<JSONObject>> map, Context 
context, Collector<String> out) throws Exception {
        out.collect("这个用户有嫌疑:====================>" + map.toString());
    }
}).print();
{code}
 

  was:
 

1、problem: *When a Table is converted to a DataStream for CEP*, only events of 
the *ROW* datatype work with CEP, but events of other datatypes (POJOs, maps, 
JSONObject) do not work.

                       When only the DataStream API is used, events of any datatype work with CEP. 

2、version: I have tried 3 versions,such as 1.11.2、1.12.0、1.13.2

3、code:

(1) Table to DataStream to CEP (only the Row datatype works; with any other 
datatype the CEP output prints no data and there is no error message)
{code:java}
tableEnv.executeSql(creat_kafka_source);
tableEnv.executeSql(calculateSql);

Table tb = tableEnv.from("calculateSql");
String[] fieldNames = tb.getSchema().getFieldNames();
DataType[] fieldDataTypes = tb.getSchema().getFieldDataTypes();

KeyedStream<JSONObject, String> ds = tableEnv
        .toAppendStream(tb, Row.class)
        .map(new RichMapFunction<Row, JSONObject>() {
            Map<String, Object> map = new HashMap<>();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                if (null == map) {
                    map = new HashMap<>();
                }
            }

            @Override
            public JSONObject map(Row value) throws Exception {
                //将数据key和value添加到map中
                RowParseUtil.setFieldAndValue2Map(map, fieldDataTypes, 
fieldNames, value);
                JSONObject jsonObject = 
JSONObject.parseObject(JSON.toJSONString(map));
                map.clear();
                return jsonObject;
            }
        })
        .assignTimestampsAndWatermarks(new 
BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(JSONObject element) {
                return element.getLongValue("wStart") * 1000;
            }
        }).keyBy(x -> x.getString("x_forwarded_for"));
//it has data to print
ds.print();

Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("begin")
        .where(new SimpleCondition<JSONObject>() {
            @Override
            public boolean filter(JSONObject value) throws Exception {
                log.info("===================>" + value);
                return true;
            }
        }).timesOrMore(1).within(Time.seconds(10));

PatternStream<JSONObject> patternStream = CEP.pattern(ds, pattern);
//it has no data to print
patternStream.process(new PatternProcessFunction<JSONObject, String>() {
    @Override
    public void processMatch(Map<String, List<JSONObject>> match, Context ctx, 
Collector<String> out) throws Exception {
        out.collect("==========>>>>>>>" + match.toString());
    }
}).print();


{code}
(2) *Only DataStream API to CEP* (any datatype works)
{code:java}
Properties proPs = kafkaUtil.getReceiveKfkProPs(receive_brokers, groupid);
FlinkKafkaConsumer<ConsumerRecord<String, String>> consumer =
        new FlinkKafkaConsumer<>(receive_topic, new KafkaRecordSchema(), proPs);
consumer.setStartFromEarliest();

SingleOutputStreamOperator<JSONObject> input = env.addSource(consumer)
        .map(x -> {
            return JSON.parseObject(x.value());
        })
        .assignTimestampsAndWatermarks(new 
BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.milliseconds(10)) {
            @Override
            public long extractTimestamp(JSONObject element) {
                return element.getLongValue("ts");
            }
        })
        .keyBy(x -> x.getString("x_forwarded_for")+x.getString("request_uri"))
        .timeWindow(Time.seconds(1)).apply(new WindowFunction<JSONObject, 
JSONObject, String, TimeWindow>() {
            @Override
            public void apply(String s, TimeWindow window, Iterable<JSONObject> 
input, Collector<JSONObject> out) throws Exception {
                Iterator<JSONObject> iterator = input.iterator();
                ArrayList<JSONObject> list = new ArrayList<>();
                int n = 0;
                while (iterator.hasNext()) {
                    n++;
                    JSONObject next = iterator.next();
                    list.add(next);
                }
                JSONObject jsonObject = list.get(0);
                jsonObject.put("ct",n);
                jsonObject.remove("ts");
                out.collect(jsonObject);
            }
        });

input.print();

//it is ok
Pattern<JSONObject, JSONObject> minInterval = Pattern
        .<JSONObject>begin("begin").where(new SimpleCondition<JSONObject>() {
            @Override
            public boolean filter(JSONObject jsonObject) throws Exception {
                return true;
            }
        }).timesOrMore(1).within(Time.seconds(10));

PatternStream<JSONObject> pattern = CEP.pattern(input, minInterval);
pattern.process(new PatternProcessFunction<JSONObject, String>() {
    @Override
    public void processMatch(Map<String, List<JSONObject>> map, Context 
context, Collector<String> out) throws Exception {
        out.collect("这个用户有嫌疑:====================>" + map.toString());
    }
}).print();
{code}
 


> Table change to stream for CEP , only event  is ROW  datatype can works on 
> CEP,  but  other POJOs、maps、 JsonObject datatype event  do not work.     Any 
> datatype for event to CEP  is OK by  only stream api 
> -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-24224
>                 URL: https://issues.apache.org/jira/browse/FLINK-24224
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / CEP
>    Affects Versions: 1.11.2, 1.12.0, 1.13.2
>            Reporter: YUJIANBO
>            Priority: Major
>
>  
> 1、problem: *When a Table is converted to a DataStream for CEP*, only events 
> of the *ROW* datatype *work* with CEP, but events of *other* datatypes 
> (POJOs, maps, JSONObject) *do not work*.
>                        *When only the DataStream API is used, events of any 
> datatype work with CEP.* 
> 2、version: I have tried 3 versions,such as 1.11.2、1.12.0、1.13.2
> 3、code:
> (1) Table to DataStream to CEP (only the Row datatype works; with any other 
> datatype the CEP output prints no data and there is no error message)
> {code:java}
> tableEnv.executeSql(creat_kafka_source);
> tableEnv.executeSql(calculateSql);
> Table tb = tableEnv.from("calculateSql");
> String[] fieldNames = tb.getSchema().getFieldNames();
> DataType[] fieldDataTypes = tb.getSchema().getFieldDataTypes();
> KeyedStream<JSONObject, String> ds = tableEnv
>         .toAppendStream(tb, Row.class)
>         .map(new RichMapFunction<Row, JSONObject>() {
>             Map<String, Object> map = new HashMap<>();
>             @Override
>             public void open(Configuration parameters) throws Exception {
>                 super.open(parameters);
>                 if (null == map) {
>                     map = new HashMap<>();
>                 }
>             }
>             @Override
>             public JSONObject map(Row value) throws Exception {
>                 //将数据key和value添加到map中
>                 RowParseUtil.setFieldAndValue2Map(map, fieldDataTypes, 
> fieldNames, value);
>                 JSONObject jsonObject = 
> JSONObject.parseObject(JSON.toJSONString(map));
>                 map.clear();
>                 return jsonObject;
>             }
>         })
>         .assignTimestampsAndWatermarks(new 
> BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.seconds(0)) {
>             @Override
>             public long extractTimestamp(JSONObject element) {
>                 return element.getLongValue("wStart") * 1000;
>             }
>         }).keyBy(x -> x.getString("x_forwarded_for"));
> //it has data to print
> ds.print();
> Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("begin")
>         .where(new SimpleCondition<JSONObject>() {
>             @Override
>             public boolean filter(JSONObject value) throws Exception {
>                 log.info("===================>" + value);
>                 return true;
>             }
>         }).timesOrMore(1).within(Time.seconds(10));
> PatternStream<JSONObject> patternStream = CEP.pattern(ds, pattern);
> //it has no data to print
> patternStream.process(new PatternProcessFunction<JSONObject, String>() {
>     @Override
>     public void processMatch(Map<String, List<JSONObject>> match, Context 
> ctx, Collector<String> out) throws Exception {
>         out.collect("==========>>>>>>>" + match.toString());
>     }
> }).print();
> {code}
> (2) *Only DataStream API to CEP* (any datatype works)
> {code:java}
> Properties proPs = kafkaUtil.getReceiveKfkProPs(receive_brokers, groupid);
> FlinkKafkaConsumer<ConsumerRecord<String, String>> consumer =
>         new FlinkKafkaConsumer<>(receive_topic, new KafkaRecordSchema(), 
> proPs);
> consumer.setStartFromEarliest();
> SingleOutputStreamOperator<JSONObject> input = env.addSource(consumer)
>         .map(x -> {
>             return JSON.parseObject(x.value());
>         })
>         .assignTimestampsAndWatermarks(new 
> BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.milliseconds(10)) {
>             @Override
>             public long extractTimestamp(JSONObject element) {
>                 return element.getLongValue("ts");
>             }
>         })
>         .keyBy(x -> x.getString("x_forwarded_for")+x.getString("request_uri"))
>         .timeWindow(Time.seconds(1)).apply(new WindowFunction<JSONObject, 
> JSONObject, String, TimeWindow>() {
>             @Override
>             public void apply(String s, TimeWindow window, 
> Iterable<JSONObject> input, Collector<JSONObject> out) throws Exception {
>                 Iterator<JSONObject> iterator = input.iterator();
>                 ArrayList<JSONObject> list = new ArrayList<>();
>                 int n = 0;
>                 while (iterator.hasNext()) {
>                     n++;
>                     JSONObject next = iterator.next();
>                     list.add(next);
>                 }
>                 JSONObject jsonObject = list.get(0);
>                 jsonObject.put("ct",n);
>                 jsonObject.remove("ts");
>                 out.collect(jsonObject);
>             }
>         });
> input.print();
> //it is ok
> Pattern<JSONObject, JSONObject> minInterval = Pattern
>         .<JSONObject>begin("begin").where(new SimpleCondition<JSONObject>() {
>             @Override
>             public boolean filter(JSONObject jsonObject) throws Exception {
>                 return true;
>             }
>         }).timesOrMore(1).within(Time.seconds(10));
> PatternStream<JSONObject> pattern = CEP.pattern(input, minInterval);
> pattern.process(new PatternProcessFunction<JSONObject, String>() {
>     @Override
>     public void processMatch(Map<String, List<JSONObject>> map, Context 
> context, Collector<String> out) throws Exception {
>         out.collect("这个用户有嫌疑:====================>" + map.toString());
>     }
> }).print();
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to