Github user taizilongxu commented on a diff in the pull request: https://github.com/apache/flink/pull/5050#discussion_r153426741 --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java --- @@ -148,4 +171,17 @@ public static Long getOffsetFromZooKeeper(CuratorFramework curatorClient, String } } } + + public static void registerPartitionOwnership(CuratorFramework curatorClient, String groupId, String consumerId, String topic, int partition, String taskId) throws Exception { + String path = "/consumers/" + groupId + "/owners/" + topic + "/" + Integer.toString(partition); + // register with task info that we can read from zookeeper which taskmanager consume the right topic + String info = consumerId + "_" + taskId; + try { + if (curatorClient.checkExists().forPath(path) == null) { + curatorClient.create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, info.getBytes()); + } + } catch (KeeperException.NodeExistsException e) { + LOG.warn("NodeExists for {}", e); --- End diff -- Thanks for the review, I will fix them.
---