Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5050#discussion_r153141352 --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java --- @@ -64,6 +71,20 @@ public ZookeeperOffsetHandler(Properties props) { int backoffBaseSleepTime = Integer.valueOf(props.getProperty("flink.zookeeper.base-sleep-time.ms", "100")); int backoffMaxRetries = Integer.valueOf(props.getProperty("flink.zookeeper.max-retries", "10")); + // set consumerId to register ownership in zookeeper, just like kafka high level API + UUID uuid = UUID.randomUUID(); + String hostName = "Unkonw"; + try { + hostName = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + LOG.error("Can not get host name!"); --- End diff -- I would add the exception to the log also.
---