Hi,
I am running a CEP use case on Flink 1.3.2, as follows:
1. the source is a Kafka topic configured with 128 partitions
2. data schema: logTime long, impressionId string, orderPlanId long, type int. If two events of different types (an impression and a click) with the same impressionId and orderPlanId are matched within 30 seconds, the result is written back to Kafka.
3. the input data is out of order
4. event time is used
5. a BoundedOutOfOrdernessTimestampExtractor generates the watermarks

When I deployed the app with -ys 2 -yn 4, it worked well. But when -yn was
larger than 4, no watermark was generated, and eventually the buffer was
destroyed.
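
My current understanding is that a downstream operator's watermark is the
minimum over all of its input channels, so a single source subtask whose
Kafka partitions carry no data could keep the CEP operator stuck at
Long.MIN_VALUE. To see which subtasks actually emit advancing watermarks,
I could swap the extractor for a minimal logging assigner like the sketch
below (the class name is mine; it assumes the same fastjson payload, a
static logger, and the same 60 s bound as the job):

public static class LoggingWatermarkAssigner
        implements AssignerWithPeriodicWatermarks<String> {

    private static final long MAX_OUT_OF_ORDERNESS_MS = 60_000L;
    private long maxTimestamp = Long.MIN_VALUE;

    @Override
    public long extractTimestamp(String s, long previousElementTimestamp) {
        // Same conversion as the job: logTime is in seconds.
        long ts = JSONObject.parseObject(s).getLongValue("logTime") * 1000;
        maxTimestamp = Math.max(maxTimestamp, ts);
        return ts;
    }

    @Override
    public Watermark getCurrentWatermark() {
        long wm = (maxTimestamp == Long.MIN_VALUE)
                ? Long.MIN_VALUE
                : maxTimestamp - MAX_OUT_OF_ORDERNESS_MS;
        // The thread name identifies the source subtask in the TaskManager log.
        logger.error("{} emitting watermark {}",
                Thread.currentThread().getName(), wm);
        return new Watermark(wm);
    }
}

The job itself: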

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
logger.error("default watermark interval {}", env.getConfig().getAutoWatermarkInterval());

final Properties consumerProperties = new Properties();
consumerProperties.setProperty("bootstrap.servers", srcBroker);
consumerProperties.setProperty("zookeeper.connect", srcZk);
consumerProperties.setProperty("group.id", srcGroup);

FlinkKafkaConsumer08<String> kafkaSource =
        new FlinkKafkaConsumer08<>(srcTopic, new SimpleStringSchema(), consumerProperties);

// Timestamps/watermarks are assigned on the consumer, so watermarking is
// done per Kafka partition.
kafkaSource.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(60)) {
            @Override
            public long extractTimestamp(String s) {
                JSONObject json = JSONObject.parseObject(s);
                // logTime is in seconds; convert to milliseconds.
                return json.getLongValue("logTime") * 1000;
            }
        });

DataStreamSource<String> inputStream = env.addSource(kafkaSource);

// Key by (impressionId, orderPlanId) so that impressions and clicks of the
// same impression end up on the same key.
DataStream<String> originStream = inputStream.keyBy(
        new KeySelector<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> getKey(String s) throws Exception {
                JSONObject json = JSONObject.parseObject(s);
                String impressionId = json.getString("impressionId");
                Long orderPlanId = json.getLongValue("orderPlanId");
                return new Tuple2<>(impressionId, orderPlanId);
            }
        });

// Match an impression (type == 0) followed by any click (type == 1)
// within 30 seconds.
Pattern<String, ?> pattern = Pattern.<String>begin("start")
        .where(new SimpleCondition<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                return JSONObject.parseObject(s).getIntValue("type") == 0;
            }
        })
        .followedByAny("end")
        .where(new SimpleCondition<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                return JSONObject.parseObject(s).getIntValue("type") == 1;
            }
        })
        .within(Time.seconds(30));

PatternStream<String> cepStream = CEP.pattern(originStream, pattern);

DataStream<String> outputStream = cepStream.flatSelect(
        new PatternFlatSelectFunction<String, String>() {
            @Override
            public void flatSelect(Map<String, List<String>> map,
                                   Collector<String> collector) throws Exception {
                List<String> impInfos = map.get("start");
                List<String> clkInfos = map.get("end");
                logger.error("flatSelect start size {}, end size {}",
                        impInfos.size(), clkInfos.size());

                for (String clkInfo : clkInfos) {
                    for (String impInfo : impInfos) {
                        JSONObject clkJson = JSONObject.parseObject(clkInfo);
                        JSONObject impJson = JSONObject.parseObject(impInfo);
                        String clkImpressionId = clkJson.getString("impressionId");
                        Long clkOrderPlanId = clkJson.getLong("orderPlanId");
                        String impImpressionId = impJson.getString("impressionId");
                        Long impOrderPlanId = impJson.getLong("orderPlanId");
                        logger.error("start size {}, end size {}, impression {}, orderPlan {}, impTime {}, clkTime {}",
                                impInfos.size(), clkInfos.size(), clkImpressionId, clkOrderPlanId,
                                impJson.getLong("logTime"), clkJson.getLong("logTime"));
                        if (StringUtils.equals(clkImpressionId, impImpressionId)
                                && clkOrderPlanId.equals(impOrderPlanId)) {
                            StringBuilder builder = new StringBuilder();
                            builder.append("impressionId:").append(clkImpressionId).append(",")
                                   .append("orderPlanId:").append(clkOrderPlanId).append(",")
                                   .append("clkTime:").append(clkJson.getLong("logTime")).append(",")
                                   .append("impTime:").append(impJson.getLong("logTime"));
                            collector.collect(builder.toString());
                        }
                    }
                }
            }
        });

outputStream.addSink(new FlinkKafkaProducer08<>(dstBrokers, dstTopic, new SimpleStringSchema()));

env.execute("CEPWaterMarkSampling");
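
If idle partitions turn out to be the cause, one workaround I am
considering is an assigner that lets processing time push the watermark
forward once a subtask has seen no data for a while. A rough sketch, again
with my own naming and the same payload assumptions:

public static class IdleTolerantWatermarkAssigner
        implements AssignerWithPeriodicWatermarks<String> {

    private static final long MAX_OUT_OF_ORDERNESS_MS = 60_000L;
    private static final long IDLE_TIMEOUT_MS = 10_000L;

    private long maxTimestamp = Long.MIN_VALUE;
    private long lastEventSeenAt = System.currentTimeMillis();

    @Override
    public long extractTimestamp(String s, long previousElementTimestamp) {
        long ts = JSONObject.parseObject(s).getLongValue("logTime") * 1000;
        maxTimestamp = Math.max(maxTimestamp, ts);
        lastEventSeenAt = System.currentTimeMillis();
        return ts;
    }

    @Override
    public Watermark getCurrentWatermark() {
        if (maxTimestamp == Long.MIN_VALUE) {
            // Nothing seen yet on this subtask; cannot advance safely.
            return new Watermark(Long.MIN_VALUE);
        }
        long watermark = maxTimestamp - MAX_OUT_OF_ORDERNESS_MS;
        long idleFor = System.currentTimeMillis() - lastEventSeenAt;
        if (idleFor > IDLE_TIMEOUT_MS) {
            // Idle beyond the timeout: advance with processing time so
            // downstream CEP windows can still fire.
            watermark += idleFor - IDLE_TIMEOUT_MS;
        }
        return new Watermark(watermark);
    }
}

This trades correctness for progress: events arriving on a partition after
a long idle gap may be treated as late, and a subtask that has never seen
any data would still stay at Long.MIN_VALUE.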

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1345/flink_cep.jpg>

Is there some relation between parallelism and watermark generation?
Any suggestion is appreciated; thanks in advance.

Best regards.


