Hi, I was working on a CEP use case on Flink 1.3.2, as follows:

1. Source: Kafka, configured with 128 partitions.
2. Data schema: logTime long, impressionId string, orderPlanId long, type int. If two events of different types (click and impression) with the same impressionId and orderPlanId match within 30 seconds, the result is written out to Kafka.
3. Input data arrives out of order.
4. Event time is used.
5. Watermarks are generated with BoundedOutOfOrdernessTimestampExtractor.
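For reference, two hypothetical records matching that schema (the values are made up; logTime is in seconds, since the extractor below multiplies it by 1000):

    {"logTime": 1510000000, "impressionId": "imp-001", "orderPlanId": 42, "type": 0}   <- impression
    {"logTime": 1510000010, "impressionId": "imp-001", "orderPlanId": 42, "type": 1}   <- click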
When I deploy the app with -ys 2 -yn 4, it works well. But when -yn is larger than 4, no watermark is generated, and eventually the buffer is destroyed.

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    logger.error("default watermark interval {}", env.getConfig().getAutoWatermarkInterval());

    final Properties consumerProperties = new Properties();
    consumerProperties.setProperty("bootstrap.servers", srcBroker);
    consumerProperties.setProperty("zookeeper.connect", srcZk);
    consumerProperties.setProperty("group.id", srcGroup);

    FlinkKafkaConsumer08<String> kafkaSource =
            new FlinkKafkaConsumer08<String>(srcTopic, new SimpleStringSchema(), consumerProperties);

    // Assign timestamps and watermarks directly on the consumer, allowing
    // events to be up to 60 seconds out of order.
    kafkaSource.assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(60)) {
        @Override
        public long extractTimestamp(String s) {
            JSONObject json = JSONObject.parseObject(s);
            return json.getLongValue("logTime") * 1000; // logTime is in seconds
        }
    });

    DataStreamSource<String> inputStream = env.addSource(kafkaSource);

    // Key the stream by (impressionId, orderPlanId) so CEP matches per key.
    DataStream<String> originStream = inputStream.keyBy(
            new KeySelector<String, Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> getKey(String s) throws Exception {
            JSONObject json = JSONObject.parseObject(s);
            String impressionId = json.getString("impressionId");
            Long orderPlanId = json.getLongValue("orderPlanId");
            return new Tuple2<String, Long>(impressionId, orderPlanId);
        }
    });

    // Pattern: an impression (type == 0) followed by a click (type == 1)
    // within 30 seconds.
    Pattern<String, ?> pattern = Pattern.<String>begin("start").where(new SimpleCondition<String>() {
        @Override
        public boolean filter(String s) throws Exception {
            JSONObject json = JSONObject.parseObject(s);
            int type = json.getInteger("type");
            return type == 0;
        }
    }).followedByAny("end").where(new SimpleCondition<String>() {
        @Override
        public boolean filter(String s) throws Exception {
            JSONObject json = JSONObject.parseObject(s);
            int type = json.getInteger("type");
            return type == 1;
        }
    }).within(Time.seconds(30));

    PatternStream<String> cepStream = CEP.pattern(originStream, pattern);

    DataStream<String> outputStream = cepStream.flatSelect(new PatternFlatSelectFunction<String, String>() {
        @Override
        public void flatSelect(Map<String, List<String>> map, Collector<String> collector) throws Exception {
            List<String> impInfos = map.get("start");
            List<String> clkInfos = map.get("end");
            logger.error("flatSelect start size {}, end size {}", impInfos.size(), clkInfos.size());
            for (String clkInfo : clkInfos) {
                for (String impInfo : impInfos) {
                    JSONObject clkJson = JSONObject.parseObject(clkInfo);
                    JSONObject impJson = JSONObject.parseObject(impInfo);
                    String clkImpressionId = clkJson.getString("impressionId");
                    Long clkOrderPlanId = clkJson.getLong("orderPlanId");
                    String impImpressionId = impJson.getString("impressionId");
                    Long impOrderPlanId = impJson.getLong("orderPlanId");
                    logger.error("start size {}, end size {}, impression {}, orderPlan {}, impTime {}, clkTime {}",
                            impInfos.size(), clkInfos.size(), clkImpressionId, clkOrderPlanId,
                            impJson.getLong("logTime"), clkJson.getLong("logTime"));
                    if (StringUtils.equals(clkImpressionId, impImpressionId)
                            && clkOrderPlanId.equals(impOrderPlanId)) {
                        StringBuilder builder = new StringBuilder();
                        builder.append("impressionId:").append(clkImpressionId).append(",")
                               .append("orderPlanId:").append(clkOrderPlanId).append(",")
                               .append("clkTime:").append(clkJson.getLong("logTime")).append(",")
                               .append("impTime:").append(impJson.getLong("logTime"));
                        collector.collect(builder.toString());
                    }
                }
            }
        }
    });
    outputStream.addSink(new FlinkKafkaProducer08<String>(dstBrokers, dstTopic, new SimpleStringSchema()));

    env.execute("CEPWaterMarkSampling");

(Screenshot: <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1345/flink_cep.jpg>)

Is there some relation between parallelism and watermark generation? Any suggestion is well appreciated, thanks in advance.

Best regards.
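In case it helps with debugging, here is a minimal sketch of a periodic assigner that logs the watermark each subtask emits, to see whether some subtasks stall when parallelism is raised. BoundedOutOfOrdernessTimestampExtractor declares getCurrentWatermark() as final, so this reimplements the same bounded-lateness logic by hand; the 60-second bound mirrors the code above, and the logging is purely illustrative:

    kafkaSource.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<String>() {
        private final long maxOutOfOrderness = 60000L; // same 60s bound as above
        // Start shifted up so the first watermark does not underflow Long.MIN_VALUE.
        private long currentMaxTimestamp = Long.MIN_VALUE + 60000L;

        @Override
        public long extractTimestamp(String s, long previousElementTimestamp) {
            long ts = JSONObject.parseObject(s).getLongValue("logTime") * 1000;
            currentMaxTimestamp = Math.max(currentMaxTimestamp, ts);
            return ts;
        }

        @Override
        public Watermark getCurrentWatermark() {
            Watermark wm = new Watermark(currentMaxTimestamp - maxOutOfOrderness);
            // Log which watermark this subtask emits on each periodic call.
            logger.error("emitting watermark {}", wm.getTimestamp());
            return wm;
        }
    });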