Hi, Xinyu

In the logs I see:


MTVL16092bfbe:data-strmprocess-samza-driver mbenenson$ kubectl logs -f spp-driver-0 -c spp-driver | grep 'obtained consensus successfully'
 INFO : JobModel version 53 obtained consensus successfully!
 INFO : JobModel version 54 obtained consensus successfully!

MTVL16092bfbe:data-strmprocess-samza-driver mbenenson$ kubectl logs -f spp-driver-1 -c spp-driver | grep 'obtained consensus successfully'
 INFO : JobModel version 54 obtained consensus successfully!

MTVL16092bfbe:data-strmprocess-samza-driver mbenenson$ kubectl logs spp-driver-2 -c spp-driver | grep 'obtained consensus successfully'
 INFO : JobModel version 53 obtained consensus successfully!
 INFO : JobModel version 54 obtained consensus successfully!


And yes, I can connect to ZK:


[zk: localhost:2181(CONNECTED) 1] ls /app-spp-test-driver-1-1-001/spp-test-driver-1-1-001-coordinationData/processors
[0000000041, 0000000043, 0000000042]

[zk: localhost:2181(CONNECTED) 5] ls /app-spp-test-driver-1-1-001/spp-test-driver-1-1-001-coordinationData/jobModelGeneration/jobModels
[45, 46, 47, 48, 49, 50, 51, 52, 53, 54]

[zk: localhost:2181(CONNECTED) 11] get /app-spp-test-driver-1-1-001/spp-test-driver-1-1-001-coordinationData/jobModelGeneration/jobModels/54
??t?{
  "config" : { },
  "containers" : {
    "f2aa8e7f-2912-4030-b1ff-8b8962b6a48a" : {
      "tasks" : {
        "Partition 3" : {
          "task-name" : "Partition 3",
          "system-stream-partitions" : [ {
            "system" : "kafka",
            "partition" : 3,
            "stream" : "spp-test-driver-1-1-001-partition_by-gbk-7"
          }, {
            "system" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_",
            "partition" : 3,
            "stream" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_"
          } ],
          "changelog-partition" : 5,
          "task-mode" : "Active"
        },
        "Partition 6" : {
          "task-name" : "Partition 6",
          "system-stream-partitions" : [ {
            "system" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_",
            "partition" : 6,
            "stream" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_"
          }, {
            "system" : "kafka",
            "partition" : 6,
            "stream" : "spp-test-driver-1-1-001-partition_by-gbk-7"
          } ],
          "changelog-partition" : 8,
          "task-mode" : "Active"
        },
        "Partition 9" : {
          "task-name" : "Partition 9",
          "system-stream-partitions" : [ {
            "system" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_",
            "partition" : 9,
            "stream" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_"
          }, {
            "system" : "kafka",
            "partition" : 9,
            "stream" : "spp-test-driver-1-1-001-partition_by-gbk-7"
          } ],
          "changelog-partition" : 11,
          "task-mode" : "Active"
        },
        "Partition 10" : {
          "task-name" : "Partition 10",
          "system-stream-partitions" : [ {
            "system" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_",
            "partition" : 10,
            "stream" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_"
          }, {
            "system" : "kafka",
            "partition" : 10,
            "stream" : "spp-test-driver-1-1-001-partition_by-gbk-7"
          } ],
          "changelog-partition" : 2,
          "task-mode" : "Active"
        }
      },
      "processor-id" : "f2aa8e7f-2912-4030-b1ff-8b8962b6a48a"
    },
    "49939588-6d2f-4b32-b462-b60cbbf193a2" : {
      "tasks" : {
        "Partition 0" : {
          "task-name" : "Partition 0",
          "system-stream-partitions" : [ {
            "system" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_",
            "partition" : 0,
            "stream" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_"
          }, {
            "system" : "kafka",
            "partition" : 0,
            "stream" : "spp-test-driver-1-1-001-partition_by-gbk-7"
          } ],
          "changelog-partition" : 0,
          "task-mode" : "Active"
        },
        "Partition 4" : {
          "task-name" : "Partition 4",
          "system-stream-partitions" : [ {
            "system" : "kafka",
            "partition" : 4,
            "stream" : "spp-test-driver-1-1-001-partition_by-gbk-7"
          }, {
            "system" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_",
            "partition" : 4,
            "stream" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_"
          } ],
          "changelog-partition" : 6,
          "task-mode" : "Active"
        },
        "Partition 7" : {
          "task-name" : "Partition 7",
          "system-stream-partitions" : [ {
            "system" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_",
            "partition" : 7,
            "stream" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_"
          }, {
            "system" : "kafka",
            "partition" : 7,
            "stream" : "spp-test-driver-1-1-001-partition_by-gbk-7"
          } ],
          "changelog-partition" : 9,
          "task-mode" : "Active"
        },
        "Partition 11" : {
          "task-name" : "Partition 11",
          "system-stream-partitions" : [ {
            "system" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_",
            "partition" : 11,
            "stream" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_"
          }, {
            "system" : "kafka",
            "partition" : 11,
            "stream" : "spp-test-driver-1-1-001-partition_by-gbk-7"
          } ],
          "changelog-partition" : 3,
          "task-mode" : "Active"
        }
      },
      "processor-id" : "49939588-6d2f-4b32-b462-b60cbbf193a2"
    },
    "99ff20c4-fb5f-4ada-b3f8-446054036ce7" : {
      "tasks" : {
        "Partition 1" : {
          "task-name" : "Partition 1",
          "system-stream-partitions" : [ {
            "system" : "kafka",
            "partition" : 1,
            "stream" : "spp-test-driver-1-1-001-partition_by-gbk-7"
          }, {
            "system" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_",
            "partition" : 1,
            "stream" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_"
          } ],
          "changelog-partition" : 1,
          "task-mode" : "Active"
        },
        "Partition 2" : {
          "task-name" : "Partition 2",
          "system-stream-partitions" : [ {
            "system" : "kafka",
            "partition" : 2,
            "stream" : "spp-test-driver-1-1-001-partition_by-gbk-7"
          }, {
            "system" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_",
            "partition" : 2,
            "stream" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_"
          } ],
          "changelog-partition" : 4,
          "task-mode" : "Active"
        },
        "Partition 5" : {
          "task-name" : "Partition 5",
          "system-stream-partitions" : [ {
            "system" : "kafka",
            "partition" : 5,
            "stream" : "spp-test-driver-1-1-001-partition_by-gbk-7"
          }, {
            "system" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_",
            "partition" : 5,
            "stream" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_"
          } ],
          "changelog-partition" : 7,
          "task-mode" : "Active"
        },
        "Partition 8" : {
          "task-name" : "Partition 8",
          "system-stream-partitions" : [ {
            "system" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_",
            "partition" : 8,
            "stream" : "0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_"
          }, {
            "system" : "kafka",
            "partition" : 8,
            "stream" : "spp-test-driver-1-1-001-partition_by-gbk-7"
          } ],
          "changelog-partition" : 10,
          "task-mode" : "Active"
        }
      },
      "processor-id" : "99ff20c4-fb5f-4ada-b3f8-446054036ce7"
    }
  },
  "max-change-log-stream-partitions" : 12,
  "all-container-locality" : {
    "49939588-6d2f-4b32-b462-b60cbbf193a2" : null,
    "99ff20c4-fb5f-4ada-b3f8-446054036ce7" : null,
    "f2aa8e7f-2912-4030-b1ff-8b8962b6a48a" : null
  }
}
cZxid = 0xe000003be
ctime = Thu Aug 15 19:25:42 UTC 2019
mZxid = 0xe000003be
mtime = Thu Aug 15 19:25:42 UTC 2019
pZxid = 0xe000003be
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 7677
numChildren = 0
[zk: localhost:2181(CONNECTED) 12]
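
To check a stored JobModel like version 54 mechanically, a small script can map each input partition to the processor that owns it and flag any partition that is unassigned or owned by more than one processor. This is a minimal sketch, assuming the znode payload has been saved locally as raw bytes and that the non-printable bytes before the first `{` (rendered as `??t?` by zkCli) are a serialization header that can simply be skipped; `partition_owners` and `check_exactly_once` are hypothetical helper names, not Samza APIs.

```python
import json
from collections import defaultdict


def partition_owners(raw: bytes, system: str) -> dict:
    """Map each partition of `system` to the list of processor ids that own it.

    Assumption: the JSON document starts at the first '{'; any bytes before
    it (e.g. the '??t?' header shown by zkCli) are skipped.
    """
    doc = json.loads(raw[raw.index(b"{"):].decode("utf-8"))
    owners = defaultdict(list)
    for processor_id, container in doc["containers"].items():
        for task in container["tasks"].values():
            for ssp in task["system-stream-partitions"]:
                if ssp["system"] == system:
                    owners[ssp["partition"]].append(processor_id)
    return dict(owners)


def check_exactly_once(owners: dict, num_partitions: int) -> list:
    """Return the partitions that have zero owners or more than one owner."""
    return [p for p in range(num_partitions) if len(owners.get(p, [])) != 1]
```

For version 54 as shown, each `kafka` partition from 0 to 11 appears under exactly one processor, so `check_exactly_once(partition_owners(data, "kafka"), 12)` would be expected to return an empty list. Note that a single model can pass this check while processors that are running different JobModel versions still read the same partitions, which appears to be the situation described in this thread.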

Please let me know if you need any other info.

------------------
Michael Benenson


From: Xinyu Liu <xi...@linkedin.com>
Date: Thursday, August 15, 2019 at 2:38 PM
To: "Benenson, Mikhail" <mikhail_benen...@intuit.com>, Prateek Maheshwari 
<pmaheshw...@linkedin.com>, Hai Lu <h...@linkedin.com>
Cc: "Deshpande, Omkar" <omkar_deshpa...@intuit.com>, "Ho, Tom" 
<tom...@intuit.com>, "LeVeck, Matt" <matt_lev...@intuit.com>, Xinyu Liu 
<xinyuliu...@gmail.com>, Samarth Shetty <sshe...@linkedin.com>, "Audo, 
Nicholas" <nicholas_a...@intuit.com>, "Cesar, Scott" <scott_ce...@intuit.com>, 
Miguel Sanchez <misanc...@linkedin.com>, "Bansal, Ritesh" 
<ritesh_ban...@intuit.com>, Jagadish Venkatraman <jvenkatra...@linkedin.com>, 
"dev@samza.apache.org" <dev@samza.apache.org>, Shanthoosh Venkataraman 
<svenkatara...@linkedin.com>, Bharath Kumarasubramanian 
<bkumarasubraman...@linkedin.com>
Subject: Re: Beam w Samza Runner : problem with partition assignment.

+Shanthoosh and Bharath for more investigation.

@Michael: do you happen to have a tool that can connect to your ZK? I am
wondering how many Samza job instances you see there. It would also be helpful
if you could look into the JobModels node and see what the last one looks like
(maybe compared with the previous ones too). If the leader was disconnected
from the followers, we should see the job model change from assigning the
partitions across all 3 hosts to assigning all of them to one host (the leader).
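
One way to follow this suggestion without stepping through zkCli by hand is to read the newest few JobModels and compare how many tasks each processor holds in each one. This is a minimal sketch under stated assumptions: the base path is copied from the zkCli session in this thread, the client passed in only needs `get_children(path)` and `get(path)` returning `(data, stat)` (which is how the `kazoo` Python client's `KazooClient` behaves), and the helper names are hypothetical.

```python
import json

# Base path copied from the zkCli session in this thread; adjust for other jobs.
JOB_MODELS_PATH = (
    "/app-spp-test-driver-1-1-001/spp-test-driver-1-1-001-coordinationData"
    "/jobModelGeneration/jobModels"
)


def newest_versions(children, last=2):
    """Sort JobModel child names numerically (not as strings) and keep the newest."""
    return sorted(int(c) for c in children)[-last:]


def tasks_per_container(raw: bytes) -> dict:
    """Count tasks per processor id in one JobModel payload.

    Assumption: the JSON starts at the first '{'; bytes before it are treated
    as a serialization header and skipped.
    """
    doc = json.loads(raw[raw.index(b"{"):].decode("utf-8"))
    return {pid: len(c["tasks"]) for pid, c in doc["containers"].items()}


def compare_latest(zk, path=JOB_MODELS_PATH, last=2):
    """Return [(version, {processor_id: task_count})] for the newest JobModels.

    `zk` must provide get_children(path) -> [str] and get(path) -> (bytes, stat),
    e.g. a started kazoo KazooClient.
    """
    return [
        (v, tasks_per_container(zk.get(f"{path}/{v}")[0]))
        for v in newest_versions(zk.get_children(path), last)
    ]
```

With kazoo, something like `zk = KazooClient(hosts="localhost:2181"); zk.start(); print(compare_latest(zk))` would be one way to call it (untested against this cluster). If the hypothesis above holds, a healthy model should list three processor ids with 4 tasks each, and the suspect model a single processor id with all 12.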

Thanks,
Xinyu
________________________________
From: Benenson, Mikhail <mikhail_benen...@intuit.com>
Sent: Thursday, August 15, 2019 1:27 PM
To: Xinyu Liu <xi...@linkedin.com>; Prateek Maheshwari 
<pmaheshw...@linkedin.com>; Hai Lu <h...@linkedin.com>
Cc: Deshpande, Omkar <omkar_deshpa...@intuit.com>; Ho, Tom <tom...@intuit.com>; 
LeVeck, Matt <matt_lev...@intuit.com>; Xinyu Liu <xinyuliu...@gmail.com>; 
Samarth Shetty <sshe...@linkedin.com>; Audo, Nicholas 
<nicholas_a...@intuit.com>; Cesar, Scott <scott_ce...@intuit.com>; Miguel 
Sanchez <misanc...@linkedin.com>; Bansal, Ritesh <ritesh_ban...@intuit.com>; 
Jagadish Venkatraman <jvenkatra...@linkedin.com>; dev@samza.apache.org 
<dev@samza.apache.org>
Subject: Beam w Samza Runner : problem with partition assignment.


Hi, folks

We are running 3 instances of a Beam 2.13.0 application with the Samza Runner
1.1.0, reading from a Kafka topic with 12 partitions.

Two of these instances understand that there are 3 instances, and each picks 4
partitions for processing: instance 0 (3, 6, 9, 10) and instance 1 (1, 2, 5, 8).

But the third instance (the ZK leader) somehow thinks that it is the only
running instance, so it reads from all 12 partitions.

As a result, some records are processed twice, on different instances, which
looks like incorrect behavior.

This problem is very hard to reproduce, but it is happening in my environment
right now.

After one instance was restarted, here is the new partition assignment:

MTVL16092bfbe:data-strmprocess-samza-driver mbenenson$ kubectl logs -f spp-driver-1 -c spp-driver | grep 'Starting table manager in task instance Partition'
 INFO : Starting table manager in task instance Partition 3
 INFO : Starting table manager in task instance Partition 6
 INFO : Starting table manager in task instance Partition 9
 INFO : Starting table manager in task instance Partition 10

MTVL16092bfbe:data-strmprocess-samza-driver mbenenson$ kubectl logs -f spp-driver-2 -c spp-driver | grep 'Starting table manager in task instance Partition'
 INFO : Starting table manager in task instance Partition 3
 INFO : Starting table manager in task instance Partition 9
 INFO : Starting table manager in task instance Partition 1
 INFO : Starting table manager in task instance Partition 7
 INFO : Starting table manager in task instance Partition 5
 INFO : Starting table manager in task instance Partition 11

MTVL16092bfbe:data-strmprocess-samza-driver mbenenson$ kubectl logs -f spp-driver-0 -c spp-driver | grep 'Starting table manager in task instance Partition'
 INFO : Starting table manager in task instance Partition 6
 INFO : Starting table manager in task instance Partition 0
 INFO : Starting table manager in task instance Partition 4
 INFO : Starting table manager in task instance Partition 8
 INFO : Starting table manager in task instance Partition 10
 INFO : Starting table manager in task instance Partition 2

It looks like two instances think there are only two instances, so each
processes 6 partitions, but the third instance thinks there are three
instances, so it processes 4 partitions. As a result, partitions 3, 6, 9 and 10
are each read by two instances, so data from those partitions is processed twice.
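
The overlap can be checked directly from the partition lists in the `kubectl logs` output above. This is a minimal sketch with the lists copied by hand from those logs; `duplicated_partitions` is a hypothetical helper, not part of Samza.

```python
from collections import Counter

# Partitions each pod reported via "Starting table manager in task instance
# Partition N" after the restart, copied from the kubectl output above.
ASSIGNMENTS = {
    "spp-driver-0": [6, 0, 4, 8, 10, 2],
    "spp-driver-1": [3, 6, 9, 10],
    "spp-driver-2": [3, 9, 1, 7, 5, 11],
}


def duplicated_partitions(assignments: dict) -> dict:
    """Return {partition: [pods]} for every partition started on more than one pod."""
    counts = Counter(p for parts in assignments.values() for p in parts)
    return {
        p: sorted(pod for pod, parts in assignments.items() if p in parts)
        for p in sorted(counts)
        if counts[p] > 1
    }


if __name__ == "__main__":
    for partition, pods in duplicated_partitions(ASSIGNMENTS).items():
        print(f"Partition {partition}: {', '.join(pods)}")
```

Run against these lists, it reports partitions 3, 6, 9 and 10, each started on two pods. spp-driver-0 and spp-driver-2 together cover all 12 partitions (a two-instance split), while spp-driver-1 holds a three-instance share, which is consistent with the pods acting on different JobModels.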



Any idea what could cause this issue?

Please find the log from the misbehaving node attached.

Extract from Instance 0 log:

logs/problem/logs-0/app.log.1: INFO Samza StreamProcessor Container Thread-0 - 2019-08-15 16:31:44.316; - org.apache.samza.container.SamzaContainer - Starting table manager in task instance Partition 3
logs/problem/logs-0/app.log.1: INFO Samza StreamProcessor Container Thread-0 - 2019-08-15 16:31:44.317; - org.apache.samza.container.SamzaContainer - Starting table manager in task instance Partition 6
logs/problem/logs-0/app.log.1: INFO Samza StreamProcessor Container Thread-0 - 2019-08-15 16:31:44.317; - org.apache.samza.container.SamzaContainer - Starting table manager in task instance Partition 9
logs/problem/logs-0/app.log.1: INFO Samza StreamProcessor Container Thread-0 - 2019-08-15 16:31:44.317; - org.apache.samza.container.SamzaContainer - Starting table manager in task instance Partition 10



Extract from Instance 1 log:

logs/problem/logs-1/app.log: INFO Samza StreamProcessor Container Thread-0 - 2019-08-15 16:31:27.957; - org.apache.samza.container.SamzaContainer - Starting table manager in task instance Partition 1
logs/problem/logs-1/app.log: INFO Samza StreamProcessor Container Thread-0 - 2019-08-15 16:31:27.958; - org.apache.samza.container.SamzaContainer - Starting table manager in task instance Partition 2
logs/problem/logs-1/app.log: INFO Samza StreamProcessor Container Thread-0 - 2019-08-15 16:31:27.958; - org.apache.samza.container.SamzaContainer - Starting table manager in task instance Partition 5
logs/problem/logs-1/app.log: INFO Samza StreamProcessor Container Thread-0 - 2019-08-15 16:31:27.958; - org.apache.samza.container.SamzaContainer - Starting table manager in task instance Partition 8

Extract from Instance 2 log:

INFO ZkClient-EventThread-66-zk-cs.data-strmprocess-spp-api-usw2-ppd-e2e.svc.cluster.local:2181 - 2019-08-15 16:28:15.092; - org.apache.samza.zk.ZkJobCoordinator - ZkJobCoordinator::onBecomeLeader - I became the leader

…

INFO ZkClient-EventThread-66-zk-cs.data-strmprocess-spp-api-usw2-ppd-e2e.svc.cluster.local:2181 - 2019-08-15 16:29:41.401; - org.apache.samza.zk.ZkJobCoordinator - ProcessorChangeHandler::handleChildChange - Path: /app-spp-test-driver-1-1-001/spp-test-driver-1-1-001-coordinationData/processors Current Children: [0000000036, 0000000038, 0000000037]

…

logs/problem/logs-2/app.log.1: INFO Samza StreamProcessor Container Thread-0 - 2019-08-15 16:31:29.206; - org.apache.samza.container.SamzaContainer - Starting table manager in task instance Partition 6
logs/problem/logs-2/app.log.1: INFO Samza StreamProcessor Container Thread-0 - 2019-08-15 16:31:29.207; - org.apache.samza.container.SamzaContainer - Starting table manager in task instance Partition 3
logs/problem/logs-2/app.log.1: INFO Samza StreamProcessor Container Thread-0 - 2019-08-15 16:31:29.207; - org.apache.samza.container.SamzaContainer - Starting table manager in task instance Partition 9
logs/problem/logs-2/app.log.1: INFO Samza StreamProcessor Container Thread-0 - 2019-08-15 16:31:29.207; - org.apache.samza.container.SamzaContainer - Starting table manager in task instance Partition 0
logs/problem/logs-2/app.log.1: INFO Samza StreamProcessor Container Thread-0 - 2019-08-15 16:31:29.207; - org.apache.samza.container.SamzaContainer - Starting table manager in task instance Partition 4
logs/problem/logs-2/app.log.1: INFO Samza StreamProcessor Container Thread-0 - 2019-08-15 16:31:29.207; - org.apache.samza.container.SamzaContainer - Starting table manager in task instance Partition 8
logs/problem/logs-2/app.log.1: INFO Samza StreamProcessor Container Thread-0 - 2019-08-15 16:31:29.207; - org.apache.samza.container.SamzaContainer - Starting table manager in task instance Partition 1
logs/problem/logs-2/app.log.1: INFO Samza StreamProcessor Container Thread-0 - 2019-08-15 16:31:29.207; - org.apache.samza.container.SamzaContainer - Starting table manager in task instance Partition 10
logs/problem/logs-2/app.log.1: INFO Samza StreamProcessor Container Thread-0 - 2019-08-15 16:31:29.207; - org.apache.samza.container.SamzaContainer - Starting table manager in task instance Partition 7
logs/problem/logs-2/app.log.1: INFO Samza StreamProcessor Container Thread-0 - 2019-08-15 16:31:29.207; - org.apache.samza.container.SamzaContainer - Starting table manager in task instance Partition 5
logs/problem/logs-2/app.log.1: INFO Samza StreamProcessor Container Thread-0 - 2019-08-15 16:31:29.207; - org.apache.samza.container.SamzaContainer - Starting table manager in task instance Partition 11
logs/problem/logs-2/app.log.1: INFO Samza StreamProcessor Container Thread-0 - 2019-08-15 16:31:29.207; - org.apache.samza.container.SamzaContainer - Starting table manager in task instance Partition 2







------------------

Michael Benenson

