For reference, I answered this myself on Stack Overflow [1].

> Turns out that Flink 1.12 defaults the TimeCharacteristic to EventTime and
> deprecates the whole TimeCharacteristic flow. So to downgrade to Flink
> 1.11, you must add the following statement to configure the
> StreamExecutionEnvironment:
>
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>
[1] https://stackoverflow.com/a/67111541/10299342
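
As a sanity check on the logs quoted below: the "Emitting watermark" values are simply the record timestamp minus one day, emitted only when that value increases, which is what the WatermarkGenerator in the test does. Here is a minimal standalone sketch of that arithmetic, with no Flink dependency. The class and method names (WatermarkArithmetic, emittedWatermarks) are made up for illustration and are not part of the test or of Flink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class WatermarkArithmetic {
    // Same lag as the test's generator: one day, in milliseconds.
    static final long ONE_DAY_MS = TimeUnit.DAYS.toMillis(1);

    /**
     * Returns the watermarks that would be emitted for the given record
     * timestamps, in arrival order. A watermark is emitted only when
     * (timestamp - one day) is larger than the last emitted value.
     */
    static List<Long> emittedWatermarks(long[] timestamps) {
        List<Long> emitted = new ArrayList<>();
        long latest = Long.MIN_VALUE;
        for (long ts : timestamps) {
            long candidate = ts - ONE_DAY_MS;
            if (candidate > latest) {
                emitted.add(candidate);
                latest = candidate;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // The five record timestamps produced by the test: 1, 10, 100 and
        // 1000 days in milliseconds, followed by Long.MAX_VALUE.
        long[] timestamps = {
            TimeUnit.DAYS.toMillis(1),
            TimeUnit.DAYS.toMillis(10),
            TimeUnit.DAYS.toMillis(100),
            TimeUnit.DAYS.toMillis(1000),
            Long.MAX_VALUE
        };
        // prints [0, 777600000, 8553600000, 86313600000, 9223372036768375807]
        System.out.println(emittedWatermarks(timestamps));
    }
}
```

These match the "Emitting watermark" lines in both runs, so the generator itself behaves identically on 1.11 and 1.12; the difference is only in whether the timestamps and watermarks reach the downstream operator.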

On Thu, Apr 15, 2021 at 12:08 AM Edward Bingham <edward.bing...@siden.io>
wrote:

> Hi everyone,
>
> I'm seeing some strange behavior from FlinkKafkaConsumer. I wrote up some
> Flink processors using Flink 1.12, and tried to get them working on Amazon
> EMR. However Amazon EMR only supports Flink 1.11.2 at the moment. When I
> went to downgrade, I found, inexplicably, that watermarks were no longer
> propagating.
>
> There is only one partition on the topic, and parallelism is set to 1. Is
> there something I'm missing here? I feel like I'm going a bit crazy.
>
> I've cross-posted this on stackoverflow, but I figure the mailing list is
> probably a better avenue for this question.
>
> Thanks,
> Ned
>
>
> Here's the output for Flink 1.12 (correctly propagating the watermark):
>
> input 1
> (name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
> {
>   "nodes" : [ {
>     "id" : 1,
>     "type" : "Source: Custom Source",
>     "pact" : "Data Source",
>     "contents" : "Source: Custom Source",
>     "parallelism" : 1
>   }, {
>     "id" : 2,
>     "type" : "Process",
>     "pact" : "Operator",
>     "contents" : "Process",
>     "parallelism" : 1,
>     "predecessors" : [ {
>       "id" : 1,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   } ]
> }
> input 1
> (name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
> Assigning timestamp 86400000
> Source [timestamp=86400000 watermark=-9223372036854775808] "test message"
> Emitting watermark 0
> Assigning timestamp 864000000
> Source [timestamp=864000000 watermark=0] "test message"
> Emitting watermark 777600000
> Assigning timestamp 8640000000
> Source [timestamp=8640000000 watermark=777600000] "test message"
> Emitting watermark 8553600000
> Assigning timestamp 86400000000
> Source [timestamp=86400000000 watermark=8553600000] "test message"
> Emitting watermark 86313600000
> Assigning timestamp 9223372036854775807
> Source [timestamp=9223372036854775807 watermark=86313600000] "test message"
> Emitting watermark 9223372036768375807
>
> And here is the output for Flink 1.11 (not propagating the watermark):
>
> input 1
> (name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
> {
>   "nodes" : [ {
>     "id" : 1,
>     "type" : "Source: Custom Source",
>     "pact" : "Data Source",
>     "contents" : "Source: Custom Source",
>     "parallelism" : 1
>   }, {
>     "id" : 2,
>     "type" : "Process",
>     "pact" : "Operator",
>     "contents" : "Process",
>     "parallelism" : 1,
>     "predecessors" : [ {
>       "id" : 1,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   } ]
> }
> input 1
> (name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
> Assigning timestamp 86400000
> Source [timestamp=0 watermark=-9223372036854775808] "test message"
> Emitting watermark 0
> Assigning timestamp 864000000
> Source [timestamp=0 watermark=-9223372036854775808] "test message"
> Emitting watermark 777600000
> Assigning timestamp 8640000000
> Source [timestamp=0 watermark=-9223372036854775808] "test message"
> Emitting watermark 8553600000
> Assigning timestamp 86400000000
> Source [timestamp=0 watermark=-9223372036854775808] "test message"
> Emitting watermark 86313600000
> Assigning timestamp 9223372036854775807
> Source [timestamp=0 watermark=-9223372036854775808] "test message"
> Emitting watermark 9223372036768375807
>
> Here's the integration test that exposes it:
>
> package mytest;
> import com.fasterxml.jackson.core.JsonProcessingException;
> import com.fasterxml.jackson.databind.ObjectMapper;
>
> import java.io.FileInputStream;
> import java.io.InputStream;
> import java.io.IOException;
> import java.nio.file.Files;
> import java.nio.file.Paths;
> import java.text.SimpleDateFormat;
> import java.util.Arrays;
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.TimeUnit;
> import java.util.Date;
> import java.util.HashMap;
> import java.util.Map;
> import java.util.Properties;
>
> import kafka.server.KafkaConfig;
> import kafka.server.KafkaServer;
> import kafka.utils.MockTime;
> import kafka.utils.TestUtils;
> import kafka.zk.EmbeddedZookeeper;
>
> import org.apache.flink.api.common.eventtime.TimestampAssigner;
> import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
> import org.apache.flink.api.common.eventtime.Watermark;
> import org.apache.flink.api.common.eventtime.WatermarkGenerator;
> import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
> import org.apache.flink.api.common.eventtime.WatermarkOutput;
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.common.JobExecutionResult;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.core.execution.JobClient;
> import org.apache.flink.runtime.client.JobCancellationException;
> import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.ProcessFunction;
> import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
> import org.apache.flink.streaming.api.TimerService;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
> import org.apache.flink.test.util.MiniClusterWithClientResource;
> import org.apache.flink.util.Collector;
>
> import org.apache.kafka.clients.admin.AdminClient;
> import org.apache.kafka.clients.admin.CreateTopicsResult;
> import org.apache.kafka.clients.admin.DescribeTopicsResult;
> import org.apache.kafka.clients.admin.NewTopic;
> import org.apache.kafka.clients.admin.TopicDescription;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.Serializer;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import org.apache.kafka.common.serialization.StringSerializer;
> import org.apache.kafka.streams.StreamsConfig;
>
> import org.junit.*;
>
> public class FailTest {
>     private static EmbeddedZookeeper zooKeeper = null;
>     private static KafkaServer server = null;
>     public static AdminClient admin = null;
>     private static int connected = 0;
>
>     private static StringSerializer stringSerializer = new StringSerializer();
>     private static StringDeserializer stringDeserializer = new StringDeserializer();
>
>     private static final Properties ZooKeeperProperties = getZooKeeperProperties();
>     private static final Properties ServerProperties = getServerProperties();
>     private static final Properties ProducerProperties = getProducerProperties();
>
>     public static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>     public static Properties getProducerProperties() {
>         // Use Kafka provided properties
>         Properties result = new Properties();
>         result.put("bootstrap.servers", "localhost:9092");
>         result.put("compression.type", "none");
>         return result;
>     }
>
>     public static Properties getServerProperties() {
>         // Use Kafka provided properties
>         Properties result = new Properties();
>         result.put("broker.id", "0");
>         result.put("num.network.threads", "3");
>         result.put("num.io.threads", "8");
>         result.put("socket.send.buffer.bytes", "102400");
>         result.put("socket.recv.buffer.bytes", "102400");
>         result.put("log.dirs", "target/kafka-logs");
>         result.put("num.partitions", "1");
>         result.put("offset.topic.replication.factor", "1");
>         result.put("transaction.state.log.replication.factor", "1");
>         result.put("transaction.state.log.min.isr", "1");
>         result.put("auto.create.topics.enable", "true");
>         result.put("log.retention.hours", "168");
>         result.put("log.segment.bytes", "1073741824");
>         result.put("log.retention.check.interval.ms", "300000");
>         result.put("zookeeper.connect", "localhost:2181");
>         result.put("zookeeper.connection.timeout.ms", "18000");
>         result.put("group.initial.rebalance.delay.ms", "0");
>         return result;
>     }
>
>     public static Properties getZooKeeperProperties() {
>         // Use Kafka provided properties
>         Properties result = new Properties();
>         result.put("dataDir", "/tmp/zookeeper");
>         result.put("clientPort", "2181");
>         result.put("maxClientCnxns", "0");
>         result.put("admin.enableServer", "false");
>         return result;
>     }
>
>     private static Properties getNewLogDir(Properties props) {
>         String path = props.getProperty("log.dirs");
>         path = path + "/run.";
>         int index = 0;
>         boolean done = false;
>         while (!Files.notExists(Paths.get(path + String.valueOf(index)))) {
>             index += 1;
>         }
>         props.setProperty("log.dirs", path + String.valueOf(index));
>         return props;
>     }
>
>     public static class Print<V> extends ProcessFunction<V, V> {
>         private static final ObjectMapper mapper = new ObjectMapper();
>         public String prefix;
>
>         public Print() {
>             this.prefix = "";
>         }
>
>         public Print(String prefix) {
>             this.prefix = prefix;
>         }
>
>         @Override
>         public void processElement(V value, Context ctx, Collector<V> out) {
>             System.out.printf("%s ", prefix);
>             if (ctx != null) {
>                 TimerService srv = ctx.timerService();
>                 Long timestampLong = ctx.timestamp();
>                 long timestamp = 0;
>                 if (timestampLong != null) {
>                     timestamp = timestampLong;
>                 }
>                 long watermark = 0;
>                 if (srv != null) {
>                     watermark = srv.currentWatermark();
>                 }
>                 System.out.printf("[timestamp=%d watermark=%d] ", timestamp, watermark);
>             }
>
>             if (value == null) {
>                 System.out.println("null");
>             } else {
>                 try {
>                     System.out.println(new String(mapper.writeValueAsBytes(value)));
>                 } catch (Exception e) {
>                     System.out.println("exception");
>                     e.printStackTrace();
>                 }
>             }
>             out.collect(value);
>         }
>     }
>
>     @ClassRule
>     public static MiniClusterWithClientResource flinkCluster =
>         new MiniClusterWithClientResource(
>             new MiniClusterResourceConfiguration.Builder()
>                 .setNumberSlotsPerTaskManager(2)
>                 .setNumberTaskManagers(1)
>                 .build());
>
>     @BeforeClass
>     public static void setup() {
>         env.setParallelism(1);
>         if (connected == 0) {
>             zooKeeper = new EmbeddedZookeeper();
>             ServerProperties.setProperty("zookeeper.connect", "localhost:" + zooKeeper.port());
>
>             server = TestUtils.createServer(new KafkaConfig(getNewLogDir(ServerProperties)), new MockTime());
>             admin = AdminClient.create(ProducerProperties);
>         }
>         connected += 1;
>     }
>
>     @AfterClass
>     public static void tearDown() {
>         if (connected == 1) {
>             try {
>                 server.shutdown();
>                 zooKeeper.shutdown();
>             } catch (Exception e) {
>                 e.printStackTrace();
>             }
>
>             zooKeeper = null;
>             server = null;
>             admin = null;
>         }
>         connected -= 1;
>     }
>
>     @Test
>     public void testFail() throws Exception {
>         String inputTopic = "input";
>
>         Map<String, String> configs = new HashMap<>();
>         int partitions = 1;
>         short replication = 1;
>
>         CreateTopicsResult result = admin.createTopics(Arrays.asList(
>             new NewTopic(inputTopic, partitions, replication).configs(configs)
>         ));
>         result.all().get();
>
>         KafkaProducer<String, String> producer = new KafkaProducer<String, String>(ProducerProperties, stringSerializer, stringSerializer);
>
>         DescribeTopicsResult topics = admin.describeTopics(Arrays.asList(inputTopic));
>         for (Map.Entry<String, TopicDescription> topic : topics.all().get().entrySet()) {
>             System.out.printf("%s %d\n", topic.getValue().name(), topic.getValue().partitions().size());
>             System.out.println(topic.getValue().toString());
>         }
>
>         // Some subscription events
>         producer.send(new ProducerRecord<String, String>(inputTopic, 0, Time.days(1).toMilliseconds(), "0", "test message"));
>         producer.send(new ProducerRecord<String, String>(inputTopic, 0, Time.days(10).toMilliseconds(), "0", "test message"));
>         producer.send(new ProducerRecord<String, String>(inputTopic, 0, Time.days(100).toMilliseconds(), "0", "test message"));
>         producer.send(new ProducerRecord<String, String>(inputTopic, 0, Time.days(1000).toMilliseconds(), "0", "test message"));
>         producer.send(new ProducerRecord<String, String>(inputTopic, 0, Long.MAX_VALUE, "0", "test message"));
>         producer.flush();
>         producer.close();
>
>         Properties prop = new Properties();
>         prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application");
>         prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         prop.put("group.id", "0");
>         prop.put("enable.auto.commit", "true");
>         prop.put("auto.commit.interval.ms", "1000");
>         prop.put("session.timeout.ms", "30000");
>         FlinkKafkaConsumer<String> source = new FlinkKafkaConsumer<String>(inputTopic, new SimpleStringSchema(), prop);
>         source.assignTimestampsAndWatermarks(
>             new WatermarkStrategy<String>() {
>                 @Override
>                 public TimestampAssigner<String> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
>                     return new TimestampAssigner<String>() {
>                         @Override
>                         public long extractTimestamp(String event, long recordTimestamp) {
>                             System.out.printf("Assigning timestamp %d\n", recordTimestamp);
>                             return recordTimestamp;
>                         }
>                     };
>                 }
>
>                 @Override
>                 public WatermarkGenerator<String> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
>                     return new WatermarkGenerator<String>() {
>                         public long latestWatermark = Long.MIN_VALUE;
>
>                         @Override
>                         public void onEvent(String event, long timestamp, WatermarkOutput output) {
>                             long eventWatermark = timestamp - Time.days(1).toMilliseconds();
>                             if (eventWatermark > latestWatermark) {
>                                 System.out.printf("Emitting watermark %d\n", eventWatermark);
>                                 output.emitWatermark(new Watermark(eventWatermark));
>                                 latestWatermark = eventWatermark;
>                             }
>                         }
>
>                         @Override
>                         public void onPeriodicEmit(WatermarkOutput output) {
>                         }
>                     };
>                 }
>             });
>         source.setStartFromEarliest();
>
>         env.addSource(source)
>             .process(new Print<String>("Source"));
>
>         System.out.println(env.getExecutionPlan());
>         JobClient client = null;
>         try {
>             client = env.executeAsync("Fail Test");
>         } catch (Exception e) {
>             e.printStackTrace();
>             throw e;
>         }
>
>         topics = admin.describeTopics(Arrays.asList(inputTopic));
>         for (Map.Entry<String, TopicDescription> topic : topics.all().get().entrySet()) {
>             System.out.printf("%s %d\n", topic.getValue().name(), topic.getValue().partitions().size());
>             System.out.println(topic.getValue().toString());
>         }
>
>         TimeUnit.SECONDS.sleep(5);
>         client.cancel().get(5, TimeUnit.SECONDS);
>     }
> }
>
>
