Hi, Xinyu

In the logs I see:
MTVL16092bfbe:data-strmprocess-samza-driver mbenenson$ kubectl logs -f spp-driver-0 -c spp-driver | grep 'obtained consensus successfully'
INFO : JobModel version 53 obtained consensus successfully!
INFO : JobModel version 54 obtained consensus successfully!
MTVL16092bfbe:data-strmprocess-samza-driver mbenenson$ kubectl logs -f spp-driver-1 -c spp-driver | grep 'obtained consensus successfully'
INFO : JobModel version 54 obtained consensus successfully!
MTVL16092bfbe:data-strmprocess-samza-driver mbenenson$ kubectl logs spp-driver-2 -c spp-driver | grep 'obtained consensus successfully'
INFO : JobModel version 53 obtained consensus successfully!
INFO : JobModel version 54 obtained consensus successfully!

And yes, I can connect to ZK:

[zk: localhost:2181(CONNECTED) 1] ls /app-spp-test-driver-1-1-001/spp-test-driver-1-1-001-coordinationData/processors
[0000000041, 0000000043, 0000000042]
[zk: localhost:2181(CONNECTED) 5] ls /app-spp-test-driver-1-1-001/spp-test-driver-1-1-001-coordinationData/jobModelGeneration/jobModels
[45, 46, 47, 48, 49, 50, 51, 52, 53, 54]
[zk: localhost:2181(CONNECTED) 11] get /app-spp-test-driver-1-1-001/spp-test-driver-1-1-001-coordinationData/jobModelGeneration/jobModels/54
{
  "config" : { },
  "containers" : {
    "f2aa8e7f-2912-4030-b1ff-8b8962b6a48a" : {
      "tasks" : {
        "Partition 3" : { "task-name" : "Partition 3", "system-stream-partitions" : [ { "system" : "kafka", "partition" : 3, "stream" : "spp-test-driver-1-1-001-partition_by-gbk-7" }, { "system" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_", "partition" : 3, "stream" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_" } ], "changelog-partition" : 5, "task-mode" : "Active" },
        "Partition 6" : { "task-name" : "Partition 6", "system-stream-partitions" : [ { "system" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_", "partition" : 6, "stream" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_" }, { "system" : "kafka", "partition" : 6, "stream" : "spp-test-driver-1-1-001-partition_by-gbk-7" } ], "changelog-partition" : 8, "task-mode" : "Active" },
        "Partition 9" : { "task-name" : "Partition 9", "system-stream-partitions" : [ { "system" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_", "partition" : 9, "stream" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_" }, { "system" : "kafka", "partition" : 9, "stream" : "spp-test-driver-1-1-001-partition_by-gbk-7" } ], "changelog-partition" : 11, "task-mode" : "Active" },
        "Partition 10" : { "task-name" : "Partition 10", "system-stream-partitions" : [ { "system" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_", "partition" : 10, "stream" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_" }, { "system" : "kafka", "partition" : 10, "stream" : "spp-test-driver-1-1-001-partition_by-gbk-7" } ], "changelog-partition" : 2, "task-mode" : "Active" }
      },
      "processor-id" : "f2aa8e7f-2912-4030-b1ff-8b8962b6a48a"
    },
    "49939588-6d2f-4b32-b462-b60cbbf193a2" : {
      "tasks" : {
        "Partition 0" : { "task-name" : "Partition 0", "system-stream-partitions" : [ { "system" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_", "partition" : 0, "stream" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_" }, { "system" : "kafka", "partition" : 0, "stream" : "spp-test-driver-1-1-001-partition_by-gbk-7" } ], "changelog-partition" : 0, "task-mode" : "Active" },
        "Partition 4" : { "task-name" : "Partition 4", "system-stream-partitions" : [ { "system" : "kafka", "partition" : 4, "stream" : "spp-test-driver-1-1-001-partition_by-gbk-7" }, { "system" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_", "partition" : 4, "stream" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_" } ], "changelog-partition" : 6, "task-mode" : "Active" },
        "Partition 7" : { "task-name" : "Partition 7", "system-stream-partitions" : [ { "system" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_", "partition" : 7, "stream" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_" }, { "system" : "kafka", "partition" : 7, "stream" : "spp-test-driver-1-1-001-partition_by-gbk-7" } ], "changelog-partition" : 9, "task-mode" : "Active" },
        "Partition 11" : { "task-name" : "Partition 11", "system-stream-partitions" : [ { "system" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_", "partition" : 11, "stream" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_" }, { "system" : "kafka", "partition" : 11, "stream" : "spp-test-driver-1-1-001-partition_by-gbk-7" } ], "changelog-partition" : 3, "task-mode" : "Active" }
      },
      "processor-id" : "49939588-6d2f-4b32-b462-b60cbbf193a2"
    },
    "99ff20c4-fb5f-4ada-b3f8-446054036ce7" : {
      "tasks" : {
        "Partition 1" : { "task-name" : "Partition 1", "system-stream-partitions" : [ { "system" : "kafka", "partition" : 1, "stream" : "spp-test-driver-1-1-001-partition_by-gbk-7" }, { "system" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_", "partition" : 1, "stream" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_" } ], "changelog-partition" : 1, "task-mode" : "Active" },
        "Partition 2" : { "task-name" : "Partition 2", "system-stream-partitions" : [ { "system" : "kafka", "partition" : 2, "stream" : "spp-test-driver-1-1-001-partition_by-gbk-7" }, { "system" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_", "partition" : 2, "stream" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_" } ], "changelog-partition" : 4, "task-mode" : "Active" },
        "Partition 5" : { "task-name" : "Partition 5", "system-stream-partitions" : [ { "system" : "kafka", "partition" : 5, "stream" : "spp-test-driver-1-1-001-partition_by-gbk-7" }, { "system" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_", "partition" : 5, "stream" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_" } ], "changelog-partition" : 7, "task-mode" : "Active" },
        "Partition 8" : { "task-name" : "Partition 8", "system-stream-partitions" : [ { "system" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_", "partition" : 8, "stream" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_" }, { "system" : "kafka", "partition" : 8, "stream" : "spp-test-driver-1-1-001-partition_by-gbk-7" } ], "changelog-partition" : 10, "task-mode" : "Active" }
      },
      "processor-id" : "99ff20c4-fb5f-4ada-b3f8-446054036ce7"
    }
  },
  "max-change-log-stream-partitions" : 12,
  "all-container-locality" : {
    "49939588-6d2f-4b32-b462-b60cbbf193a2" : null,
    "99ff20c4-fb5f-4ada-b3f8-446054036ce7" : null,
    "f2aa8e7f-2912-4030-b1ff-8b8962b6a48a" : null
  }
}
cZxid = 0xe000003be
ctime = Thu Aug 15 19:25:42 UTC 2019
mZxid = 0xe000003be
mtime = Thu Aug 15 19:25:42 UTC 2019
pZxid = 0xe000003be
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 7677
numChildren = 0
[zk: localhost:2181(CONNECTED) 12]

Please let me know if you need any other info.

------------------
Michael Benenson

From: Xinyu Liu <xi...@linkedin.com>
Date: Thursday, August 15, 2019 at 2:38 PM
To: "Benenson, Mikhail" <mikhail_benen...@intuit.com>, Prateek Maheshwari <pmaheshw...@linkedin.com>, Hai Lu <h...@linkedin.com>
Cc: "Deshpande, Omkar" <omkar_deshpa...@intuit.com>, "Ho, Tom" <tom...@intuit.com>, "LeVeck, Matt" <matt_lev...@intuit.com>, Xinyu Liu <xinyuliu...@gmail.com>, Samarth Shetty <sshe...@linkedin.com>, "Audo, Nicholas" <nicholas_a...@intuit.com>, "Cesar, Scott" <scott_ce...@intuit.com>, Miguel Sanchez <misanc...@linkedin.com>, "Bansal, Ritesh" <ritesh_ban...@intuit.com>, Jagadish Venkatraman <jvenkatra...@linkedin.com>, "dev@samza.apache.org" <dev@samza.apache.org>, Shanthoosh Venkataraman <svenkatara...@linkedin.com>, Bharath Kumarasubramanian <bkumarasubraman...@linkedin.com>
Subject: Re: Beam w Samza Runner : problem with partition assignment.

+Shanthoosh and Bharath for more investigation.

@Michael: do you happen to have a tool that can connect to your ZK? I am wondering how many Samza job instances you see there. It would also be helpful if you could look into the JobModels node and see what the last one looks like (maybe compared with the previous ones too). If the leader was disconnected from the followers, we should see the job model change from assigning the partitions across all 3 hosts to assigning all the partitions to only one host (the leader).

Thanks,
Xinyu

________________________________
From: Benenson, Mikhail <mikhail_benen...@intuit.com>
Sent: Thursday, August 15, 2019 1:27 PM
To: Xinyu Liu <xi...@linkedin.com>; Prateek Maheshwari <pmaheshw...@linkedin.com>; Hai Lu <h...@linkedin.com>
Cc: Deshpande, Omkar <omkar_deshpa...@intuit.com>; Ho, Tom <tom...@intuit.com>; LeVeck, Matt <matt_lev...@intuit.com>; Xinyu Liu <xinyuliu...@gmail.com>; Samarth Shetty <sshe...@linkedin.com>; Audo, Nicholas <nicholas_a...@intuit.com>; Cesar, Scott <scott_ce...@intuit.com>; Miguel Sanchez <misanc...@linkedin.com>; Bansal, Ritesh <ritesh_ban...@intuit.com>; Jagadish Venkatraman <jvenkatra...@linkedin.com>; dev@samza.apache.org <dev@samza.apache.org>
Subject: Beam w Samza Runner : problem with partition assignment.

Hi, folks

We are running 3 instances of a Beam 2.13.0 application with Samza Runner 1.1.0 that reads from a Kafka topic with 12 partitions.
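With 12 partitions and 3 instances, a healthy assignment gives each instance a disjoint set of partitions (4 each when balanced) that together cover all 12. A quick way to confirm whether that invariant holds is to collect each pod's partition list (for example from the "Starting table manager in task instance Partition N" log lines) and check for overlaps and gaps. The sketch below assumes the lists have already been extracted; `check_assignment` is a hypothetical helper written for illustration, not part of Samza or Beam.

```python
from collections import defaultdict


def check_assignment(assignment, num_partitions):
    """Check a per-instance partition assignment for overlaps and gaps.

    Hypothetical helper (not a Samza/Beam API).
    assignment: dict mapping an instance name to the partitions it processes.
    Returns (duplicates, missing):
      duplicates maps each partition owned by more than one instance to the
      sorted list of its owners; missing is the sorted list of partitions
      in range(num_partitions) that no instance owns.
    """
    owners = defaultdict(list)
    for instance, partitions in assignment.items():
        for p in partitions:
            owners[p].append(instance)
    duplicates = {p: sorted(o) for p, o in owners.items() if len(o) > 1}
    # `in` on a defaultdict does not insert keys, so this is a pure lookup.
    missing = sorted(p for p in range(num_partitions) if p not in owners)
    return duplicates, missing


if __name__ == "__main__":
    # Partition lists each pod reported after the restart, per the grep output in this thread.
    after_restart = {
        "spp-driver-0": [6, 0, 4, 8, 10, 2],
        "spp-driver-1": [3, 6, 9, 10],
        "spp-driver-2": [3, 9, 1, 7, 5, 11],
    }
    duplicates, missing = check_assignment(after_restart, 12)
    for p in sorted(duplicates):
        print(f"partition {p} is processed by {', '.join(duplicates[p])}")
    print("unassigned partitions:", missing)
```

Run against the post-restart lists, it reports partitions 3, 6, 9 and 10 as each processed by two pods and no unassigned partitions.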
Two of these instances understand that there are 3 instances, and each picks 4 partitions for processing: instance 0 (3, 6, 9, 10) and instance 1 (1, 2, 5, 8). But the third instance (the ZK leader) somehow thinks that it is the only running instance, so it reads from all 12 partitions. As a result, some records are processed twice, on different instances, which looks like incorrect behavior.

This problem is very hard to reproduce, but I have it running now in my environment.

After one instance was restarted, here is the new partition assignment:

MTVL16092bfbe:data-strmprocess-samza-driver mbenenson$ kubectl logs -f spp-driver-1 -c spp-driver | grep 'Starting table manager in task instance Partition'
INFO : Starting table manager in task instance Partition 3
INFO : Starting table manager in task instance Partition 6
INFO : Starting table manager in task instance Partition 9
INFO : Starting table manager in task instance Partition 10
MTVL16092bfbe:data-strmprocess-samza-driver mbenenson$ kubectl logs -f spp-driver-2 -c spp-driver | grep 'Starting table manager in task instance Partition'
INFO : Starting table manager in task instance Partition 3
INFO : Starting table manager in task instance Partition 9
INFO : Starting table manager in task instance Partition 1
INFO : Starting table manager in task instance Partition 7
INFO : Starting table manager in task instance Partition 5
INFO : Starting table manager in task instance Partition 11
MTVL16092bfbe:data-strmprocess-samza-driver mbenenson$ kubectl logs -f spp-driver-0 -c spp-driver | grep 'Starting table manager in task instance Partition'
INFO : Starting table manager in task instance Partition 6
INFO : Starting table manager in task instance Partition 0
INFO : Starting table manager in task instance Partition 4
INFO : Starting table manager in task instance Partition 8
INFO : Starting table manager in task instance Partition 10
INFO : Starting table manager in task instance Partition 2

It looks like two instances think there are only two instances, so each processes 6 partitions, while the third instance thinks there are three instances, so it processes 4 partitions. Some partitions are therefore read by two instances; for example, partition 3 is read by both spp-driver-1 and spp-driver-2, so data from partition 3 is processed twice.

Any idea what could cause this issue? Please find the log from the misbehaving node attached.

Extract from Instance 0 log:
logs/problem/logs-0/app.log.1: INFO Samza StreamProcessor Container Thread-0 - 2019-08-15 16:31:44.316; - org.apache.samza.container.SamzaContainer - Starting table manager in task instance Partition 3
logs/problem/logs-0/app.log.1: INFO Samza StreamProcessor Container Thread-0 - 2019-08-15 16:31:44.317; - org.apache.samza.container.SamzaContainer - Starting table manager in task instance Partition 6
logs/problem/logs-0/app.log.1: INFO Samza StreamProcessor Container Thread-0 - 2019-08-15 16:31:44.317; - org.apache.samza.container.SamzaContainer - Starting table manager in task instance Partition 9
logs/problem/logs-0/app.log.1: INFO Samza StreamProcessor Container Thread-0 - 2019-08-15 16:31:44.317; - org.apache.samza.container.SamzaContainer - Starting table manager in task instance Partition 10

Extract from Instance 1 log:
logs/problem/logs-1/app.log: INFO Samza StreamProcessor Container Thread-0 - 2019-08-15 16:31:27.957; - org.apache.samza.container.SamzaContainer - Starting table manager in task instance Partition 1
logs/problem/logs-1/app.log: INFO Samza StreamProcessor Container Thread-0 - 2019-08-15 16:31:27.958; - org.apache.samza.container.SamzaContainer - Starting table manager in task instance Partition 2
logs/problem/logs-1/app.log: INFO Samza StreamProcessor Container Thread-0 - 2019-08-15 16:31:27.958; - org.apache.samza.container.SamzaContainer - Starting table manager in task instance Partition 5
logs/problem/logs-1/app.log: INFO Samza StreamProcessor Container Thread-0 - 2019-08-15 16:31:27.958; - org.apache.samza.container.SamzaContainer - Starting table manager in task instance Partition 8

Extract from Instance 2 log:
INFO ZkClient-EventThread-66-zk-cs.data-strmprocess-spp-api-usw2-ppd-e2e.svc.cluster.local:2181 - 2019-08-15 16:28:15.092; - org.apache.samza.zk.ZkJobCoordinator - ZkJobCoordinator::onBecomeLeader - I became the leader
…
INFO ZkClient-EventThread-66-zk-cs.data-strmprocess-spp-api-usw2-ppd-e2e.svc.cluster.local:2181 - 2019-08-15 16:29:41.401; - org.apache.samza.zk.ZkJobCoordinator - ProcessorChangeHandler::handleChildChange - Path: /app-spp-test-driver-1-1-001/spp-test-driver-1-1-001-coordinationData/processors Current Children: [0000000036, 0000000038, 0000000037]
…
logs/problem/logs-2/app.log.1: INFO Samza StreamProcessor Container Thread-0 - 2019-08-15 16:31:29.206; - org.apache.samza.container.SamzaContainer - Starting table manager in task instance Partition 6
logs/problem/logs-2/app.log.1: INFO Samza StreamProcessor Container Thread-0 - 2019-08-15 16:31:29.207; - org.apache.samza.container.SamzaContainer - Starting table manager in task instance Partition 3
logs/problem/logs-2/app.log.1: INFO Samza StreamProcessor Container Thread-0 - 2019-08-15 16:31:29.207; - org.apache.samza.container.SamzaContainer - Starting table manager in task instance Partition 9
logs/problem/logs-2/app.log.1: INFO Samza StreamProcessor Container Thread-0 - 2019-08-15 16:31:29.207; - org.apache.samza.container.SamzaContainer - Starting table manager in task instance Partition 0
logs/problem/logs-2/app.log.1: INFO Samza StreamProcessor Container Thread-0 - 2019-08-15 16:31:29.207; - org.apache.samza.container.SamzaContainer - Starting table manager in task instance Partition 4
logs/problem/logs-2/app.log.1: INFO Samza StreamProcessor Container Thread-0 - 2019-08-15 16:31:29.207; - org.apache.samza.container.SamzaContainer - Starting table manager in task instance Partition 8
logs/problem/logs-2/app.log.1: INFO Samza StreamProcessor Container Thread-0 - 2019-08-15 16:31:29.207; - org.apache.samza.container.SamzaContainer - Starting table manager in task instance Partition 1
logs/problem/logs-2/app.log.1: INFO Samza StreamProcessor Container Thread-0 - 2019-08-15 16:31:29.207; - org.apache.samza.container.SamzaContainer - Starting table manager in task instance Partition 10
logs/problem/logs-2/app.log.1: INFO Samza StreamProcessor Container Thread-0 - 2019-08-15 16:31:29.207; - org.apache.samza.container.SamzaContainer - Starting table manager in task instance Partition 7
logs/problem/logs-2/app.log.1: INFO Samza StreamProcessor Container Thread-0 - 2019-08-15 16:31:29.207; - org.apache.samza.container.SamzaContainer - Starting table manager in task instance Partition 5
logs/problem/logs-2/app.log.1: INFO Samza StreamProcessor Container Thread-0 - 2019-08-15 16:31:29.207; - org.apache.samza.container.SamzaContainer - Starting table manager in task instance Partition 11
logs/problem/logs-2/app.log.1: INFO Samza StreamProcessor Container Thread-0 - 2019-08-15 16:31:29.207; - org.apache.samza.container.SamzaContainer - Starting table manager in task instance Partition 2

------------------
Michael Benenson
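Xinyu's suggestion of comparing the latest JobModel with earlier versions can be partly automated. The sketch below assumes the JSON payload of each `jobModels/<version>` znode has been saved to its own file (copied out of the zkCli `get` output, dropping any leading non-JSON bytes and the stat lines), and that its layout matches the version 54 dump in this thread (containers, then tasks, then system-stream-partitions). The script name, file names and helper functions are hypothetical; the parsing relies only on the field names visible in that dump.

```python
import json
import sys


def partitions_by_processor(job_model, system=None):
    """Map each processor id to the sorted partition numbers its tasks consume.

    Uses only fields visible in the JobModel dump in this thread:
    containers -> <processor id> -> tasks -> <task name> -> system-stream-partitions.
    If `system` is given, only SSPs of that system are counted.
    """
    result = {}
    for processor_id, container in job_model["containers"].items():
        parts = set()
        for task in container["tasks"].values():
            for ssp in task["system-stream-partitions"]:
                if system is None or ssp["system"] == system:
                    parts.add(ssp["partition"])
        result[processor_id] = sorted(parts)
    return result


def diff_job_models(old, new, system=None):
    """Return (processor id, old partitions, new partitions) for every change.

    None on either side means the processor is absent from that JobModel.
    """
    before = partitions_by_processor(old, system)
    after = partitions_by_processor(new, system)
    changes = []
    for pid in sorted(before.keys() | after.keys()):
        if before.get(pid) != after.get(pid):
            changes.append((pid, before.get(pid), after.get(pid)))
    return changes


if __name__ == "__main__" and len(sys.argv) == 3:
    # Hypothetical usage: python diff_jobmodels.py jobModel-53.json jobModel-54.json
    with open(sys.argv[1]) as f_old, open(sys.argv[2]) as f_new:
        old_model, new_model = json.load(f_old), json.load(f_new)
    for pid, was, now in diff_job_models(old_model, new_model):
        print(f"{pid}: {was} -> {now}")
    print("processors in new model:", len(new_model["containers"]))
```

In the scenario Xinyu describes (the leader cut off from the followers), the newer model would show one processor owning all 12 partitions and the other processors disappearing, whereas version 54 above lists three processors with 4 partitions each.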