For reference: self-answered on [1]. It turns out that Flink 1.12 defaults the
TimeCharacteristic to EventTime and deprecates the whole TimeCharacteristic
flow. So to downgrade to Flink 1.11, you must add the following statement to
configure the StreamExecutionEnvironment:

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

[1] https://stackoverflow.com/a/67111541/10299342
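In case it saves someone else the search, here is a minimal sketch of the
workaround in context. The class name and the placeholder pipeline are only
illustrative (they are not taken from the test in the quoted message below);
the one line that matters is the setStreamTimeCharacteristic call.

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EventTimeJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Flink 1.11 defaults to TimeCharacteristic.ProcessingTime, and under
        // that setting the timestamps and watermarks produced by
        // assignTimestampsAndWatermarks() on the Kafka source appear to be
        // dropped. Enable event time explicitly before building the pipeline.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // On Flink 1.12+ this call is deprecated and unnecessary, because
        // event time is the default behavior.

        // Illustrative placeholder pipeline; the real job would attach the
        // FlinkKafkaConsumer with its WatermarkStrategy here instead.
        env.fromElements("a", "b", "c").print();

        env.execute("Event Time Job");
    }
}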
On Thu, Apr 15, 2021 at 12:08 AM Edward Bingham <edward.bing...@siden.io> wrote:
> Hi everyone,
>
> I'm seeing some strange behavior from FlinkKafkaConsumer. I wrote up some
> Flink processors using Flink 1.12, and tried to get them working on Amazon
> EMR. However Amazon EMR only supports Flink 1.11.2 at the moment. When I
> went to downgrade, I found, inexplicably, that watermarks were no longer
> propagating.
>
> There is only one partition on the topic, and parallelism is set to 1. Is
> there something I'm missing here? I feel like I'm going a bit crazy.
>
> I've cross-posted this on stackoverflow, but I figure the mailing list is
> probably a better avenue for this question.
>
> Thanks,
> Ned
>
>
> Here's the output for Flink 1.12 (correctly propagating the watermark):
>
> input 1
> (name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
> {
>   "nodes" : [ {
>     "id" : 1,
>     "type" : "Source: Custom Source",
>     "pact" : "Data Source",
>     "contents" : "Source: Custom Source",
>     "parallelism" : 1
>   }, {
>     "id" : 2,
>     "type" : "Process",
>     "pact" : "Operator",
>     "contents" : "Process",
>     "parallelism" : 1,
>     "predecessors" : [ {
>       "id" : 1,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   } ]
> }
> input 1
> (name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
> Assigning timestamp 86400000
> Source [timestamp=86400000 watermark=-9223372036854775808] "test message"
> Emitting watermark 0
> Assigning timestamp 864000000
> Source [timestamp=864000000 watermark=0] "test message"
> Emitting watermark 777600000
> Assigning timestamp 8640000000
> Source [timestamp=8640000000 watermark=777600000] "test message"
> Emitting watermark 8553600000
> Assigning timestamp 86400000000
> Source [timestamp=86400000000 watermark=8553600000] "test message"
> Emitting watermark 86313600000
> Assigning timestamp 9223372036854775807
> Source [timestamp=9223372036854775807 watermark=86313600000] "test message"
> Emitting watermark 9223372036768375807
>
> And here is the output for Flink 1.11 (not propagating the watermark):
>
> input 1
> (name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
> {
>   "nodes" : [ {
>     "id" : 1,
>     "type" : "Source: Custom Source",
>     "pact" : "Data Source",
>     "contents" : "Source: Custom Source",
>     "parallelism" : 1
>   }, {
>     "id" : 2,
>     "type" : "Process",
>     "pact" : "Operator",
>     "contents" : "Process",
>     "parallelism" : 1,
>     "predecessors" : [ {
>       "id" : 1,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   } ]
> }
> input 1
> (name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
> Assigning timestamp 86400000
> Source [timestamp=0 watermark=-9223372036854775808] "test message"
> Emitting watermark 0
> Assigning timestamp 864000000
> Source [timestamp=0 watermark=-9223372036854775808] "test message"
> Emitting watermark 777600000
> Assigning timestamp 8640000000
> Source [timestamp=0 watermark=-9223372036854775808] "test message"
> Emitting watermark 8553600000
> Assigning timestamp 86400000000
> Source [timestamp=0 watermark=-9223372036854775808] "test message"
> Emitting watermark 86313600000
> Assigning timestamp 9223372036854775807
> Source [timestamp=0 watermark=-9223372036854775808] "test message"
> Emitting watermark 9223372036768375807
>
> Here's the integration test that exposes it:
>
> package mytest;
>
> import com.fasterxml.jackson.core.JsonProcessingException;
> import com.fasterxml.jackson.databind.ObjectMapper;
>
> import java.io.FileInputStream;
> import java.io.InputStream;
> import java.io.IOException;
> import java.nio.file.Files;
> import java.nio.file.Paths;
> import java.text.SimpleDateFormat;
> import java.util.Arrays;
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.TimeUnit;
> import java.util.Date;
> import java.util.HashMap;
> import java.util.Map;
> import java.util.Properties;
>
> import kafka.server.KafkaConfig;
> import kafka.server.KafkaServer;
> import kafka.utils.MockTime;
> import kafka.utils.TestUtils;
> import kafka.zk.EmbeddedZookeeper;
>
> import org.apache.flink.api.common.eventtime.TimestampAssigner;
> import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
> import org.apache.flink.api.common.eventtime.Watermark;
> import org.apache.flink.api.common.eventtime.WatermarkGenerator;
> import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
> import org.apache.flink.api.common.eventtime.WatermarkOutput;
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.common.JobExecutionResult;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.core.execution.JobClient;
> import org.apache.flink.runtime.client.JobCancellationException;
> import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.ProcessFunction;
> import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
> import org.apache.flink.streaming.api.TimerService;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
> import org.apache.flink.test.util.MiniClusterWithClientResource;
> import org.apache.flink.util.Collector;
>
> import org.apache.kafka.clients.admin.AdminClient;
> import org.apache.kafka.clients.admin.CreateTopicsResult;
> import org.apache.kafka.clients.admin.DescribeTopicsResult;
> import org.apache.kafka.clients.admin.NewTopic;
> import org.apache.kafka.clients.admin.TopicDescription;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.Serializer;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import org.apache.kafka.common.serialization.StringSerializer;
> import org.apache.kafka.streams.StreamsConfig;
>
> import org.junit.*;
>
> public class FailTest {
>     private static EmbeddedZookeeper zooKeeper = null;
>     private static KafkaServer server = null;
>     public static AdminClient admin = null;
>     private static int connected = 0;
>
>     private static StringSerializer stringSerializer = new StringSerializer();
>     private static StringDeserializer stringDeserializer = new StringDeserializer();
>
>     private static final Properties ZooKeeperProperties = getZooKeeperProperties();
>     private static final Properties ServerProperties = getServerProperties();
>     private static final Properties ProducerProperties = getProducerProperties();
>
>     public static StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>
>     public static Properties getProducerProperties() {
>         // Use Kafka provided properties
>         Properties result = new Properties();
>         result.put("bootstrap.servers", "localhost:9092");
>         result.put("compression.type", "none");
>         return result;
>     }
>
>     public static Properties getServerProperties() {
>         // Use Kafka provided properties
>         Properties result = new Properties();
>         result.put("broker.id", "0");
>         result.put("num.network.threads", "3");
>         result.put("num.io.threads", "8");
>         result.put("socket.send.buffer.bytes", "102400");
>         result.put("socket.recv.buffer.bytes", "102400");
>         result.put("log.dirs", "target/kafka-logs");
>         result.put("num.partitions", "1");
>         result.put("offset.topic.replication.factor", "1");
>         result.put("transaction.state.log.replication.factor", "1");
>         result.put("transaction.state.log.min.isr", "1");
>         result.put("auto.create.topics.enable", "true");
>         result.put("log.retention.hours", "168");
>         result.put("log.segment.bytes", "1073741824");
>         result.put("log.retention.check.interval.ms", "300000");
>         result.put("zookeeper.connect", "localhost:2181");
>         result.put("zookeeper.connection.timeout.ms", "18000");
>         result.put("group.initial.rebalance.delay.ms", "0");
>         return result;
>     }
>
>     public static Properties getZooKeeperProperties() {
>         // Use Kafka provided properties
>         Properties result = new Properties();
>         result.put("dataDir", "/tmp/zookeeper");
>         result.put("clientPort", "2181");
>         result.put("maxClientCnxns", "0");
>         result.put("admin.enableServer", "false");
>         return result;
>     }
>
>     private static Properties getNewLogDir(Properties props) {
>         String path = props.getProperty("log.dirs");
>         path = path + "/run.";
>         int index = 0;
>         boolean done = false;
>         while (!Files.notExists(Paths.get(path + String.valueOf(index)))) {
>             index += 1;
>         }
>         props.setProperty("log.dirs", path + String.valueOf(index));
>         return props;
>     }
>
>     public static class Print<V> extends ProcessFunction<V, V> {
>         private static final ObjectMapper mapper = new ObjectMapper();
>         public String prefix;
>
>         public Print() {
>             this.prefix = "";
>         }
>
>         public Print(String prefix) {
>             this.prefix = prefix;
>         }
>
>         @Override
>         public void processElement(V value, Context ctx, Collector<V> out) {
>             System.out.printf("%s ", prefix);
>             if (ctx != null) {
>                 TimerService srv = ctx.timerService();
>                 Long timestampLong = ctx.timestamp();
>                 long timestamp = 0;
>                 if (timestampLong != null) {
>                     timestamp = timestampLong;
>                 }
>                 long watermark = 0;
>                 if (srv != null) {
>                     watermark = srv.currentWatermark();
>                 }
>                 System.out.printf("[timestamp=%d watermark=%d] ", timestamp, watermark);
>             }
>
>             if (value == null) {
>                 System.out.println("null");
>             } else {
>                 try {
>                     System.out.println(new String(mapper.writeValueAsBytes(value)));
>                 } catch (Exception e) {
>                     System.out.println("exception");
>                     e.printStackTrace();
>                 }
>             }
>             out.collect(value);
>         }
>     }
>
>     @ClassRule
>     public static MiniClusterWithClientResource flinkCluster =
>             new MiniClusterWithClientResource(
>                     new MiniClusterResourceConfiguration.Builder()
>                             .setNumberSlotsPerTaskManager(2)
>                             .setNumberTaskManagers(1)
>                             .build());
>
>     @BeforeClass
>     public static void setup() {
>         env.setParallelism(1);
>         if (connected == 0) {
>             zooKeeper = new EmbeddedZookeeper();
"localhost:" + > zooKeeper.port()); > > server = TestUtils.createServer(new > KafkaConfig(getNewLogDir(ServerProperties)), new MockTime()); > admin = AdminClient.create(ProducerProperties); > } > connected += 1; > } > > @AfterClass > public static void tearDown() { > if (connected == 1) { > try { > server.shutdown(); > zooKeeper.shutdown(); > } catch (Exception e) { > e.printStackTrace(); > } > > zooKeeper = null; > server = null; > admin = null; > } > connected -= 1; > } > > @Test > public void testFail() throws Exception { > String inputTopic = "input"; > > Map<String, String> configs = new HashMap<>(); > int partitions = 1; > short replication = 1; > > CreateTopicsResult result = admin.createTopics(Arrays.asList( > new NewTopic(inputTopic, partitions, replication).configs(configs) > )); > result.all().get(); > > KafkaProducer<String, String> producer = new KafkaProducer<String, > String>(ProducerProperties, stringSerializer, stringSerializer); > ; > > DescribeTopicsResult topics = > admin.describeTopics(Arrays.asList(inputTopic)); > for (Map.Entry<String, TopicDescription> topic : > topics.all().get().entrySet()) { > System.out.printf("%s %d\n", topic.getValue().name(), > topic.getValue().partitions().size()); > System.out.println(topic.getValue().toString()); > } > > // Some subscription events > producer.send(new ProducerRecord<String, String>(inputTopic, 0, > Time.days(1).toMilliseconds(), "0", "test message")); > producer.send(new ProducerRecord<String, String>(inputTopic, 0, > Time.days(10).toMilliseconds(), "0", "test message")); > producer.send(new ProducerRecord<String, String>(inputTopic, 0, > Time.days(100).toMilliseconds(), "0", "test message")); > producer.send(new ProducerRecord<String, String>(inputTopic, 0, > Time.days(1000).toMilliseconds(), "0", "test message")); > producer.send(new ProducerRecord<String, String>(inputTopic, 0, > Long.MAX_VALUE, "0", "test message")); > producer.flush(); > producer.close(); > > Properties prop = new Properties(); > prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application"); > prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); > prop.put("group.id", "0"); > prop.put("enable.auto.commit", "true"); > prop.put("auto.commit.interval.ms", "1000"); > prop.put("session.timeout.ms", "30000"); > FlinkKafkaConsumer<String> source = new > FlinkKafkaConsumer<String>(inputTopic, new SimpleStringSchema(), prop); > source.assignTimestampsAndWatermarks( > new WatermarkStrategy<String>() { > @Override > public TimestampAssigner<String> > createTimestampAssigner(TimestampAssignerSupplier.Context context) { > return new TimestampAssigner<String>() { > @Override > public long extractTimestamp(String event, long > recordTimestamp) { > System.out.printf("Assigning timestamp %d\n", > recordTimestamp); > return recordTimestamp; > } > }; > } > > @Override > public WatermarkGenerator<String> > createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { > return new WatermarkGenerator<String>() { > public long latestWatermark = Long.MIN_VALUE; > > @Override > public void onEvent(String event, long timestamp, > WatermarkOutput output) { > long eventWatermark = timestamp - > Time.days(1).toMilliseconds(); > if (eventWatermark > latestWatermark) { > System.out.printf("Emitting watermark %d\n", > eventWatermark); > output.emitWatermark(new > Watermark(eventWatermark)); > latestWatermark = eventWatermark; > } > } > > @Override > public void onPeriodicEmit(WatermarkOutput output) { > } > }; > } > }); > source.setStartFromEarliest(); > 
>         env.addSource(source)
>                 .process(new Print<String>("Source"));
>
>         System.out.println(env.getExecutionPlan());
>         JobClient client = null;
>         try {
>             client = env.executeAsync("Fail Test");
>         } catch (Exception e) {
>             e.printStackTrace();
>             throw e;
>         }
>
>         topics = admin.describeTopics(Arrays.asList(inputTopic));
>         for (Map.Entry<String, TopicDescription> topic : topics.all().get().entrySet()) {
>             System.out.printf("%s %d\n", topic.getValue().name(), topic.getValue().partitions().size());
>             System.out.println(topic.getValue().toString());
>         }
>
>         TimeUnit.SECONDS.sleep(5);
>         client.cancel().get(5, TimeUnit.SECONDS);
>     }
> }