Hi everyone,

I'm seeing some strange behavior from FlinkKafkaConsumer. I wrote some Flink processors against Flink 1.12 and tried to get them working on Amazon EMR. However, Amazon EMR only supports Flink 1.11.2 at the moment. When I went to downgrade, I found, inexplicably, that watermarks were no longer propagating.
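For context, the only intentional difference between the two runs shown below is the Flink version in the build. The dependency change is roughly the following sketch (this is illustrative, not my literal build file; artifact names and the Scala suffix may differ slightly):

    <properties>
      <!-- was 1.12.x; downgraded to match what EMR currently ships -->
      <flink.version>1.11.2</flink.version>
    </properties>

    <dependencies>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>${flink.version}</version>
      </dependency>
    </dependencies>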
There is only one partition on the topic, and parallelism is set to 1. Is there something I'm missing here? I feel like I'm going a bit crazy. I've cross-posted this on Stack Overflow, but I figure the mailing list is probably a better avenue for this question.

Thanks,
Ned

Here's the output for Flink 1.12 (correctly propagating the watermark):

input 1
(name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: Custom Source",
    "pact" : "Data Source",
    "contents" : "Source: Custom Source",
    "parallelism" : 1
  }, {
    "id" : 2,
    "type" : "Process",
    "pact" : "Operator",
    "contents" : "Process",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
}
input 1
(name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
Assigning timestamp 86400000
Source [timestamp=86400000 watermark=-9223372036854775808] "test message"
Emitting watermark 0
Assigning timestamp 864000000
Source [timestamp=864000000 watermark=0] "test message"
Emitting watermark 777600000
Assigning timestamp 8640000000
Source [timestamp=8640000000 watermark=777600000] "test message"
Emitting watermark 8553600000
Assigning timestamp 86400000000
Source [timestamp=86400000000 watermark=8553600000] "test message"
Emitting watermark 86313600000
Assigning timestamp 9223372036854775807
Source [timestamp=9223372036854775807 watermark=86313600000] "test message"
Emitting watermark 9223372036768375807

And here is the output for Flink 1.11 (not propagating the watermark):

input 1
(name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: Custom Source",
    "pact" : "Data Source",
    "contents" : "Source: Custom Source",
    "parallelism" : 1
  }, {
    "id" : 2,
    "type" : "Process",
    "pact" : "Operator",
    "contents" : "Process",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
}
input 1
(name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
Assigning timestamp 86400000
Source [timestamp=0 watermark=-9223372036854775808] "test message"
Emitting watermark 0
Assigning timestamp 864000000
Source [timestamp=0 watermark=-9223372036854775808] "test message"
Emitting watermark 777600000
Assigning timestamp 8640000000
Source [timestamp=0 watermark=-9223372036854775808] "test message"
Emitting watermark 8553600000
Assigning timestamp 86400000000
Source [timestamp=0 watermark=-9223372036854775808] "test message"
Emitting watermark 86313600000
Assigning timestamp 9223372036854775807
Source [timestamp=0 watermark=-9223372036854775808] "test message"
Emitting watermark 9223372036768375807

Note that on 1.11 the timestamp assigner and watermark generator are still invoked with the correct record timestamps, but the downstream ProcessFunction never sees an event timestamp (ctx.timestamp() prints as 0) and its current watermark never moves off Long.MIN_VALUE.

Here's the integration test that exposes it:

package mytest;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.FileInputStream;
import java.io.InputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import kafka.zk.EmbeddedZookeeper;

import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.client.JobCancellationException;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.Collector;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;

import org.junit.*;

public class FailTest {

    private static EmbeddedZookeeper zooKeeper = null;
    private static KafkaServer server = null;
    public static AdminClient admin = null;
    private static int connected = 0;
    private static StringSerializer stringSerializer = new StringSerializer();
    private static StringDeserializer stringDeserializer = new StringDeserializer();
    private static final Properties ZooKeeperProperties = getZooKeeperProperties();
    private static final Properties ServerProperties = getServerProperties();
    private static final Properties ProducerProperties = getProducerProperties();
    public static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    public static Properties getProducerProperties() {
        // Use Kafka provided properties
        Properties result = new Properties();
        result.put("bootstrap.servers", "localhost:9092");
        result.put("compression.type", "none");
        return result;
    }

    public static Properties getServerProperties() {
        // Use Kafka provided properties
        Properties result = new Properties();
        result.put("broker.id", "0");
        result.put("num.network.threads", "3");
        result.put("num.io.threads", "8");
        result.put("socket.send.buffer.bytes", "102400");
result.put("socket.recv.buffer.bytes", "102400"); result.put("log.dirs", "target/kafka-logs"); result.put("num.partitions", "1"); result.put("offset.topic.replication.factor", "1"); result.put("transaction.state.log.replication.factor", "1"); result.put("transaction.state.log.min.isr", "1"); result.put("auto.create.topics.enable", "true"); result.put("log.retention.hours", "168"); result.put("log.segment.bytes", "1073741824"); result.put("log.retention.check.interval.ms", "300000"); result.put("zookeeper.connect", "localhost:2181"); result.put("zookeeper.connection.timeout.ms", "18000"); result.put("group.initial.rebalance.delay.ms", "0"); return result; } public static Properties getZooKeeperProperties() { // Use Kafka provided properties Properties result = new Properties(); result.put("dataDir", "/tmp/zookeeper"); result.put("clientPort", "2181"); result.put("maxClientCnxns", "0"); result.put("admin.enableServer", "false"); return result; } private static Properties getNewLogDir(Properties props) { String path = props.getProperty("log.dirs"); path = path + "/run."; int index = 0; boolean done = false; while (!Files.notExists(Paths.get(path + String.valueOf(index)))) { index += 1; } props.setProperty("log.dirs", path + String.valueOf(index)); return props; } public static class Print<V> extends ProcessFunction<V, V> { private static final ObjectMapper mapper = new ObjectMapper(); public String prefix; public Print() { this.prefix = ""; } public Print(String prefix) { this.prefix = prefix; } @Override public void processElement(V value, Context ctx, Collector<V> out) { System.out.printf("%s ", prefix); if (ctx != null) { TimerService srv = ctx.timerService(); Long timestampLong = ctx.timestamp(); long timestamp = 0; if (timestampLong != null) { timestamp = timestampLong; } long watermark = 0; if (srv != null) { watermark = srv.currentWatermark(); } System.out.printf("[timestamp=%d watermark=%d] ", timestamp, watermark); } if (value == null) { System.out.println("null"); } else { try { System.out.println(new String(mapper.writeValueAsBytes(value))); } catch (Exception e) { System.out.println("exception"); e.printStackTrace(); } } out.collect(value); } } @ClassRule public static MiniClusterWithClientResource flinkCluster = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setNumberSlotsPerTaskManager(2) .setNumberTaskManagers(1) .build()); @BeforeClass public static void setup() { env.setParallelism(1); if (connected == 0) { zooKeeper = new EmbeddedZookeeper(); ServerProperties.setProperty("zookeeper.connect", "localhost:" + zooKeeper.port()); server = TestUtils.createServer(new KafkaConfig(getNewLogDir(ServerProperties)), new MockTime()); admin = AdminClient.create(ProducerProperties); } connected += 1; } @AfterClass public static void tearDown() { if (connected == 1) { try { server.shutdown(); zooKeeper.shutdown(); } catch (Exception e) { e.printStackTrace(); } zooKeeper = null; server = null; admin = null; } connected -= 1; } @Test public void testFail() throws Exception { String inputTopic = "input"; Map<String, String> configs = new HashMap<>(); int partitions = 1; short replication = 1; CreateTopicsResult result = admin.createTopics(Arrays.asList( new NewTopic(inputTopic, partitions, replication).configs(configs) )); result.all().get(); KafkaProducer<String, String> producer = new KafkaProducer<String, String>(ProducerProperties, stringSerializer, stringSerializer); ; DescribeTopicsResult topics = admin.describeTopics(Arrays.asList(inputTopic)); for 
        for (Map.Entry<String, TopicDescription> topic : topics.all().get().entrySet()) {
            System.out.printf("%s %d\n", topic.getValue().name(), topic.getValue().partitions().size());
            System.out.println(topic.getValue().toString());
        }

        // Produce test records with increasing timestamps: 1, 10, 100 and 1000 days, plus Long.MAX_VALUE
        producer.send(new ProducerRecord<String, String>(inputTopic, 0, Time.days(1).toMilliseconds(), "0", "test message"));
        producer.send(new ProducerRecord<String, String>(inputTopic, 0, Time.days(10).toMilliseconds(), "0", "test message"));
        producer.send(new ProducerRecord<String, String>(inputTopic, 0, Time.days(100).toMilliseconds(), "0", "test message"));
        producer.send(new ProducerRecord<String, String>(inputTopic, 0, Time.days(1000).toMilliseconds(), "0", "test message"));
        producer.send(new ProducerRecord<String, String>(inputTopic, 0, Long.MAX_VALUE, "0", "test message"));
        producer.flush();
        producer.close();

        Properties prop = new Properties();
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application");
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        prop.put("group.id", "0");
        prop.put("enable.auto.commit", "true");
        prop.put("auto.commit.interval.ms", "1000");
        prop.put("session.timeout.ms", "30000");

        FlinkKafkaConsumer<String> source =
                new FlinkKafkaConsumer<String>(inputTopic, new SimpleStringSchema(), prop);

        // Use the Kafka record timestamp as the event timestamp and emit a
        // punctuated watermark one day behind every event.
        source.assignTimestampsAndWatermarks(
                new WatermarkStrategy<String>() {
                    @Override
                    public TimestampAssigner<String> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                        return new TimestampAssigner<String>() {
                            @Override
                            public long extractTimestamp(String event, long recordTimestamp) {
                                System.out.printf("Assigning timestamp %d\n", recordTimestamp);
                                return recordTimestamp;
                            }
                        };
                    }

                    @Override
                    public WatermarkGenerator<String> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                        return new WatermarkGenerator<String>() {
                            public long latestWatermark = Long.MIN_VALUE;

                            @Override
                            public void onEvent(String event, long timestamp, WatermarkOutput output) {
                                long eventWatermark = timestamp - Time.days(1).toMilliseconds();
                                if (eventWatermark > latestWatermark) {
                                    System.out.printf("Emitting watermark %d\n", eventWatermark);
                                    output.emitWatermark(new Watermark(eventWatermark));
                                    latestWatermark = eventWatermark;
                                }
                            }

                            @Override
                            public void onPeriodicEmit(WatermarkOutput output) {
                            }
                        };
                    }
                });
        source.setStartFromEarliest();

        env.addSource(source)
                .process(new Print<String>("Source"));

        System.out.println(env.getExecutionPlan());

        JobClient client = null;
        try {
            client = env.executeAsync("Fail Test");
        } catch (Exception e) {
            e.printStackTrace();
            throw e;
        }

        topics = admin.describeTopics(Arrays.asList(inputTopic));
        for (Map.Entry<String, TopicDescription> topic : topics.all().get().entrySet()) {
            System.out.printf("%s %d\n", topic.getValue().name(), topic.getValue().partitions().size());
            System.out.println(topic.getValue().toString());
        }

        TimeUnit.SECONDS.sleep(5);
        client.cancel().get(5, TimeUnit.SECONDS);
    }
}
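If it's relevant, a variant I can also try is applying the same WatermarkStrategy on the DataStream rather than on the consumer itself, roughly like the sketch below (untested on my side against 1.11; buildStrategy() is just a stand-in for the anonymous WatermarkStrategy shown in the test above):

        // Hypothetical variant: assign timestamps/watermarks in a separate operator
        // on the stream instead of inside the FlinkKafkaConsumer.
        FlinkKafkaConsumer<String> source =
                new FlinkKafkaConsumer<String>(inputTopic, new SimpleStringSchema(), prop);
        source.setStartFromEarliest();

        env.addSource(source)
                .assignTimestampsAndWatermarks(buildStrategy())
                .process(new Print<String>("Source"));

As far as I understand, the two forms should differ only in whether watermarking happens inside the Kafka source (per partition) or in a downstream operator, but maybe that is exactly where 1.11 and 1.12 diverge?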