[ https://issues.apache.org/jira/browse/FLINK-29674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17621432#comment-17621432 ]
hongcha edited comment on FLINK-29674 at 10/21/22 1:15 AM:
-----------------------------------------------------------

[~coderap] This is my test source code:
{code:java}
package test;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;

import com.alibaba.fastjson.JSONObject;

/**
 * User: jiangwei
 * Date: Oct 12, 2022
 * Time: 10:22:20 AM
 */
public class KafkaWindowTest {

    @SuppressWarnings("serial")
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        String jaasTemplate = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";";
        String jaasCfg = String.format(jaasTemplate, "xxx", "xxx");
        properties.put("sasl.jaas.config", jaasCfg);
        properties.put("security.protocol", "SASL_PLAINTEXT");
        properties.put("sasl.mechanism", "PLAIN");

        // Bound consumption of partition 0 at offset 6.
        Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
        String topic = "jw-test-kafka-w-offset-002";
        offsets.put(new TopicPartition(topic, 0), 6L);

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("xxx:9092")
                .setProperties(properties)
                // .setProperty("commit.offsets.on.checkpoint", "false")
                .setTopics(topic)
                // .setTopicPattern(java.util.regex.Pattern.compile(topic + ".*"))
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setBounded(OffsetsInitializer.offsets(offsets))
                .build();

        try {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // env.setParallelism(1);
            env.enableCheckpointing(5000);
            env.getCheckpointConfig()
                    .setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

            SingleOutputStreamOperator<String> fromSource = env
                    .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
            fromSource.print("first");

            fromSource.keyBy(data -> JSONObject.parseObject(data).getString("id"))
                    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
                    .process(new ProcessWindowFunction<String, Iterable<String>, String, TimeWindow>() {
                        @Override
                        public void process(String key,
                                ProcessWindowFunction<String, Iterable<String>, String, TimeWindow>.Context context,
                                Iterable<String> datas,
                                Collector<Iterable<String>> out) throws Exception {
                            out.collect(datas);
                        }
                    })
                    .print("window-data");

            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
{code}

This is my test data:
{code:java}
"{'id':'1','kpi':11,'kpi1':90,'time':1}",
"{'id':'1','kpi':11,'kpi1':90,'time':2}",
"{'id':'1','kpi':11,'kpi1':90,'time':3}",
"{'id':'1','kpi':11,'kpi1':90,'time':4}",
"{'id':'1','kpi':11,'kpi1':90,'time':5}",
"{'id':'1','kpi':11,'kpi1':90,'time':6}",
"end"
{code}

This is the successful result:
{code:java}
first> {"kpi":11,"id":"1","time":1,"kpi1":90}
first> {"kpi":11,"id":"1","time":2,"kpi1":90}
first> {"kpi":11,"id":"1","time":3,"kpi1":90}
first> {"kpi":11,"id":"1","time":4,"kpi1":90}
window-data> [{"kpi":11,"id":"1","time":1,"kpi1":90}, {"kpi":11,"id":"1","time":2,"kpi1":90}]
first> {"kpi":11,"id":"1","time":5,"kpi1":90}
first> {"kpi":11,"id":"1","time":6,"kpi1":90}
window-data> [{"kpi":11,"id":"1","time":3,"kpi1":90}, {"kpi":11,"id":"1","time":4,"kpi1":90}, {"kpi":11,"id":"1","time":5,"kpi1":90}]
window-data> [{"kpi":11,"id":"1","time":6,"kpi1":90}]
{code}

This is the result when checkpointing is used: the last "window-data" line is not printed, and the job keeps running:
{code:java}
first> {"kpi":11,"id":"1","time":1,"kpi1":90}
first> {"kpi":11,"id":"1","time":2,"kpi1":90}
first> {"kpi":11,"id":"1","time":3,"kpi1":90}
first> {"kpi":11,"id":"1","time":4,"kpi1":90}
window-data> [{"kpi":11,"id":"1","time":1,"kpi1":90}, {"kpi":11,"id":"1","time":2,"kpi1":90}]
first> {"kpi":11,"id":"1","time":5,"kpi1":90}
first> {"kpi":11,"id":"1","time":6,"kpi1":90}
window-data> [{"kpi":11,"id":"1","time":3,"kpi1":90}, {"kpi":11,"id":"1","time":4,"kpi1":90}, {"kpi":11,"id":"1","time":5,"kpi1":90}]
{code}
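The comment does not show how the test records were written to the topic. For reference, a minimal producer that sends them could look like the sketch below; it reuses the broker address, SASL/PLAIN settings, topic name, and partition from the test job, but the class itself (TestDataProducer) and the producing code are my assumption, not the reporter's actual setup:
{code:java}
// Hypothetical producer sketch (not part of the original report): writes the
// test records listed above to partition 0 of the test topic, using the same
// placeholder broker address and SASL/PLAIN credentials as the Flink job.
package test;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TestDataProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxx\" password=\"xxx\";");

        String[] records = {
                "{'id':'1','kpi':11,'kpi1':90,'time':1}",
                "{'id':'1','kpi':11,'kpi1':90,'time':2}",
                "{'id':'1','kpi':11,'kpi1':90,'time':3}",
                "{'id':'1','kpi':11,'kpi1':90,'time':4}",
                "{'id':'1','kpi':11,'kpi1':90,'time':5}",
                "{'id':'1','kpi':11,'kpi1':90,'time':6}",
                "end"
        };

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (String record : records) {
                // Send every record to partition 0, matching the TopicPartition
                // used in the bounded-offsets map of the Flink job above.
                producer.send(new ProducerRecord<>("jw-test-kafka-w-offset-002", 0, null, record));
            }
            producer.flush();
        }
    }
}
{code}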
> Apache Kafka Connector's "setBounded" not valid
> ------------------------------------------------
>
> Key: FLINK-29674
> URL: https://issues.apache.org/jira/browse/FLINK-29674
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.15.2
> Reporter: hongcha
> Priority: Major
> Attachments: image-2022-10-18-20-38-34-515.png
>
> I am using the Kafka connector and setting a consumption boundary with
> "setBounded". When my job runs normally (with no failure), the bound takes
> effect and the job finishes. However, when my job fails and I restore it
> from the checkpoint taken before the failure, the job cannot finish and
> keeps running, even though I can see in the log that data has been consumed
> up to the boundary I set. I don't know if there is a problem with my usage;
> here is part of my code:
>
> {code:java}
> // code placeholder
> String topicName = "jw-test-kafka-w-offset-002";
> Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
> offsets.put(new TopicPartition(topicName, 0), 6L);
> KafkaSource<String> source = KafkaSource.<String>builder()
>         .setBootstrapServers("xxx:9092")
>         .setProperties(properties)
>         .setTopics(topicName)
>         .setGroupId("my-group")
>         .setStartingOffsets(OffsetsInitializer.earliest())
>         .setValueOnlyDeserializer(new SimpleStringSchema())
>         .setBounded(OffsetsInitializer.offsets(offsets))
>         .build();
> {code}
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)