+ Xinyu. I think he's still working on merging the remaining 1.0 changes to the Beam runner.
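On the NoClassDefFoundError quoted below: the failure happens while Beam reflects on the runner class (Class.getDeclaredMethods0), which would be consistent with the runner jar on the classpath having been compiled against the 0.14.1 API, though that is only a guess from the trace. Here is a minimal sketch, using only the JDK, for checking which jar (if any) a class is loaded from. The class name ClasspathProbe is made up for illustration, and the fully qualified name of SamzaRunner is an assumption on my part.

```java
import java.security.CodeSource;

// Hypothetical helper (not part of Samza or Beam): reports where a class is loaded from.
final class ClasspathProbe {

    /** Returns the jar/directory a class was loaded from, or a NOT FOUND marker. */
    static String locate(String className) {
        try {
            // initialize=false: resolve the class without running its static initializers
            Class<?> c = Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            CodeSource src = c.getProtectionDomain().getCodeSource();
            // JDK classes loaded by the bootstrap loader have no code source
            return src == null ? "bootstrap/JDK" : String.valueOf(src.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return "NOT FOUND (" + e.getClass().getSimpleName() + ")";
        }
    }

    public static void main(String[] args) {
        String[] names = {
            "org.apache.samza.operators.ContextManager", // class named in the stack trace
            "org.apache.beam.runners.samza.SamzaRunner"  // assumed FQCN of the Beam runner
        };
        for (String name : names) {
            System.out.println(name + " -> " + locate(name));
        }
    }
}
```

Running this with the job's classpath should show which jar the runner comes from. `mvn dependency:tree -Dincludes=org.apache.samza` can then show whether something is still pulling in a 0.14.1 artifact transitively.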
On Tue, Mar 26, 2019 at 3:35 PM LeVeck, Matt <matt_lev...@intuit.com> wrote:
> Thanks Prateek. I think I’ve switched all of the relevant libraries to
> Beam Runner 2.12.0-SNAPSHOT and Samza 1.1.0. But for some reason it’s
> still looking for a class, ContextManager, that existed in Samza API
> 0.14.1 and not in Samza 1.1.0. Any idea off the top of your head what I
> need to change for it to stop looking for that?
>
> Error:
> 2019-03-26 22:13:13 DEBUG PipelineOptionsFactory:296 - Provided Arguments: {runner=[SamzaRunner], jobName=[spp-kabini-transformer], maxSourceParallelism=[2], configFilePath=[config.properties]}
>
> Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/samza/operators/ContextManager
>     at java.lang.Class.getDeclaredMethods0(Native Method)
>     at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
>     at java.lang.Class.getDeclaredMethod(Class.java:2128)
>     at org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:191)
>     at org.apache.beam.sdk.util.InstanceBuilder.build(InstanceBuilder.java:155)
>     at org.apache.beam.sdk.PipelineRunner.fromOptions(PipelineRunner.java:55)
>     at org.apache.beam.sdk.Pipeline.create(Pipeline.java:145)
>     at com.intuit.idp.kabini.transformer.SampleJoiner.main(SampleJoiner.java:257)
> Caused by: java.lang.ClassNotFoundException: org.apache.samza.operators.ContextManager
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     ... 8 more
>
> Relevant snippets of pom:
> <properties>
>   <powermock.version>1.5.1</powermock.version>
>   <beam.version>2.12.0-SNAPSHOT</beam.version>
>   <samza.version>1.1.0</samza.version>
>
> …
>
> And
> <dependency>
>   <groupId>org.apache.samza</groupId>
>   <artifactId>samza-api</artifactId>
>   <version>${samza.version}</version>
>   <type>jar</type>
> </dependency>
>
> <dependency>
>   <groupId>org.apache.samza</groupId>
>   <artifactId>samza-core_2.12</artifactId>
>   <version>${samza.version}</version>
> </dependency>
>
> <dependency>
>   <groupId>org.apache.samza</groupId>
>   <artifactId>samza-kafka_2.12</artifactId>
>   <version>${samza.version}</version>
>   <scope>runtime</scope>
> </dependency>
>
> <dependency>
>   <groupId>org.apache.samza</groupId>
>   <artifactId>samza-kv_2.12</artifactId>
>   <version>${samza.version}</version>
>   <scope>runtime</scope>
> </dependency>
>
> <dependency>
>   <groupId>org.apache.samza</groupId>
>   <artifactId>samza-kv-rocksdb_2.12</artifactId>
>   <version>${samza.version}</version>
>   <scope>runtime</scope>
> </dependency>
>
> *From: *Prateek Maheshwari <prateek...@gmail.com>
> *Date: *Tuesday, March 26, 2019 at 11:31 AM
> *To: *"LeVeck, Matt" <matt_lev...@intuit.com>
> *Cc: *"dev@samza.apache.org" <dev@samza.apache.org>, "Deshpande, Omkar" <omkar_deshpa...@intuit.com>, "Audo, Nicholas" <nicholas_a...@intuit.com>
> *Subject: *Re: SSL with Samza 0.14.1?
>
> Hi Matt,
>
> You're right, the KafkaSystemConsumer in 0.14.1 does not support SSL since
> it uses SimpleConsumer in the BrokerProxy. The new KafkaSystemConsumer in
> Samza 1.0 does.
>
> Backporting this to 0.14.1 will be non-trivial. Can you upgrade to Samza
> 1.0 to pick up the new consumer? I think Xinyu already shared a Beam
> runner snapshot build with 1.0 that you can try.
>
> - Prateek
>
> On Mon, Mar 25, 2019 at 1:58 PM LeVeck, Matt <matt_lev...@intuit.com> wrote:
>
> So, this is far from definitive for SSL. However, it is consistent with
> what we would expect from an SSL error. We don’t get the error if I
> instantiate a full-fledged consumer, or use kafka-console-consumer with
> SSL configs. We do see this error if we try a console consumer with the
> deprecated interface of providing the ZooKeeper addresses instead of the
> broker addresses. That, combined with the code I linked to in my previous
> message (where Samza builds a consumer that doesn’t take, and is not
> passed, any SSL configs) is why we think SSL is the issue.
>
> Thanks,
>
> Matt
>
> 2019-03-25 20:53:41 DEBUG KafkaSystemAdmin:57 - Exception detail:
> kafka.common.KafkaException: fetching topic metadata for topics [Set(__samza_checkpoint_ver_1_for_spp-kabini-transformer_42)] from broker [ArrayBuffer(BrokerEndPoint(0,sppkafka.data-lake-dev.a.intuit.com,19701), BrokerEndPoint(2,sppkafka.data-lake-dev.a.intuit.com,19901), BrokerEndPoint(1,sppkafka.data-lake-dev.a.intuit.com,19801))] failed
>     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:75)
>     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:96)
>     at org.apache.samza.util.ClientUtilTopicMetadataStore.getTopicInfo(ClientUtilTopicMetadataStore.scala:37)
>     at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$validateStream$2$$anonfun$19.apply(KafkaSystemAdmin.scala:516)
>     at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$validateStream$2$$anonfun$19.apply(KafkaSystemAdmin.scala:516)
>     at org.apache.samza.system.kafka.TopicMetadataCache$.getTopicMetadata(TopicMetadataCache.scala:52)
>     at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$validateStream$2.apply(KafkaSystemAdmin.scala:516)
>     at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$validateStream$2.apply(KafkaSystemAdmin.scala:514)
>     at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:90)
>     at org.apache.samza.system.kafka.KafkaSystemAdmin.validateStream(KafkaSystemAdmin.scala:513)
>     at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager.createResources(KafkaCheckpointManager.scala:88)
>     at org.apache.samza.zk.ZkJobCoordinator.doOnProcessorChange(ZkJobCoordinator.java:245)
>     at org.apache.samza.zk.ZkJobCoordinator$LeaderElectorListenerImpl.lambda$onBecomingLeader$0(ZkJobCoordinator.java:366)
>     at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:163)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.EOFException
>     at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:103)
>     at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:131)
>     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:122)
>     at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:82)
>     at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
>     at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
>     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:61)
>     ... 20 more
>
> *From: *"LeVeck, Matt" <matt_lev...@intuit.com>
> *Date: *Monday, March 25, 2019 at 11:35 AM
> *To: *Prateek Maheshwari <prateek...@gmail.com>, "dev@samza.apache.org" <dev@samza.apache.org>
> *Cc: *"Deshpande, Omkar" <omkar_deshpa...@intuit.com>, "Audo, Nicholas" <nicholas_a...@intuit.com>
> *Subject: *Re: SSL with Samza 0.14.1?
>
> Thanks Prateek
>
> I’ll grab the logs here shortly. But having looked at the code a bit
> since my original email, I don’t see any possibility for getting that
> config into the consumer based on this:
>
> https://github.com/apache/samza/blob/0.14.1/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala#L77
> and
> https://github.com/apache/samza/blob/0.14.1/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala
>
> -Matt
>
> *From: *Prateek Maheshwari <prateek...@gmail.com>
> *Date: *Monday, March 25, 2019 at 10:02 AM
> *To: *"dev@samza.apache.org" <dev@samza.apache.org>
> *Cc: *"Deshpande, Omkar" <omkar_deshpa...@intuit.com>, "LeVeck, Matt" <matt_lev...@intuit.com>, "Audo, Nicholas" <nicholas_a...@intuit.com>
> *Subject: *Re: SSL with Samza 0.14.1?
>
> Hi Matt,
>
> It's possible that the old Kafka AdminClient does not support SSL for ZK
> out of the box. I'll check if this is the case, and if this is something
> that can be configured.
>
> In the meantime, can you tell us the following:
>
> 1. Kafka broker version you're running.
> 2. Kafka client version for the job.
> 3. Stack trace where you see the SSL connect errors.
>
> Thanks,
> Prateek
>
> On Mon, Mar 25, 2019 at 9:47 AM Prateek Maheshwari <prateek...@gmail.com> wrote:
>
> Forwarding again. Original email did not show up on the OSS mailing list.
>
> ---------- Forwarded message ---------
> From: *Deshpande, Omkar* <omkar_deshpa...@intuit.com>
> Date: Fri, Mar 22, 2019 at 5:08 PM
> Subject: Fwd: SSL with Samza 0.14.1?
> To: prateek...@gmail.com <prateek...@gmail.com>
>
> ++Prateek gmail
> ------------------------------
>
> *From:* LeVeck, Matt
> *Sent:* Thursday, March 21, 2019 10:33:11 PM
> *To:* dev@samza.apache.org; pmaheshw...@linkedin.com; Deshpande, Omkar; Audo, Nicholas
> *Subject:* SSL with Samza 0.14.1?
>
> Prateek, Samza dev team,
>
> This is Matt from Intuit. We met briefly at the beginning of this
> week’s meetup. I’m wondering if you could give us some guidance on
> Kafka SSL with Samza. Here, I’m talking about the Kafka cluster that
> Samza uses to store checkpoints, etc. We’re trying to connect to a
> cluster that has SSL enabled, and we’re getting some errors that are
> indicative of SSL connectivity failing. It might just be that our
> properties file isn’t correct. But we’re wondering if there is another
> possibility. This indicates that Samza 0.14.1 uses Kafka 0.11, which
> should have SSL support. But Samza 0.14.1 also requires access to
> ZooKeeper for its consumer client, which is indicative of older clients
> (see
> https://samza.apache.org/learn/documentation/0.14/jobs/configuration-table.html#kafka).
> Is it possible that Samza 0.14.1 doesn’t support SSL for Kafka when
> creating its checkpoint topics?
>
> Anyways, I’m hoping that’s not the case, and either our config is wrong
> or we’re doing something else wrong. Here is our properties snippet in
> case we’ve messed up the config key names. Any guidance is appreciated.
>
> # Kafka System
> systems.kafka.zookeeper.connect=sppzookeeper.data-lake-dev.a.intuit.com:2181, sppzookeeper.data-lake-dev.a.intuit.com:2182, sppzookeeper.data-lake-dev.a.intuit.com:2183
> systems.kafka.security.protocol=SSL
> systems.kafka.ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
> systems.kafka.ssl.truststore.type=JKS
> systems.kafka.ssl.truststore.location=/home/appuser/spp/kabini.jks
> systems.kafka.ssl.truststore.password=Intuit01
> systems.kafka.bootstrap.servers=sppkafka.data-lake-dev.a.intuit.com:19701, sppkafka.data-lake-dev.a.intuit.com:19801, sppkafka.data-lake-dev.a.intuit.com:19901
> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>
> We’ve also tried adding producer and consumer specific entries:
>
> systems.kafka.producer.security.protocol=SSL
> systems.kafka.producer.ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
> systems.kafka.producer.ssl.truststore.type=JKS
> systems.kafka.producer.ssl.truststore.location=/home/appuser/spp/kabini.jks
> systems.kafka.producer.ssl.truststore.password=Intuit01
> systems.kafka.producer.bootstrap.servers=sppkafka.data-lake-dev.a.intuit.com:19701, sppkafka.data-lake-dev.a.intuit.com:19801, sppkafka.data-lake-dev.a.intuit.com:19901
> systems.kafka.consumer.zookeeper.connect=sppzookeeper.data-lake-dev.a.intuit.com:2181, sppzookeeper.data-lake-dev.a.intuit.com:2182, sppzookeeper.data-lake-dev.a.intuit.com:2183
> systems.kafka.consumer.security.protocol=SSL
> systems.kafka.consumer.ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
> systems.kafka.consumer.ssl.truststore.type=JKS
> systems.kafka.consumer.ssl.truststore.location=/home/appuser/spp/kabini.jks
> systems.kafka.consumer.ssl.truststore.password=Intuit01
> systems.kafka.consumer.bootstrap.servers=sppkafka.data-lake-dev.a.intuit.com:19701, sppkafka.data-lake-dev.a.intuit.com:19801, sppkafka.data-lake-dev.a.intuit.com:19901
> systems.kafka.zookeeper.connect=sppzookeeper.data-lake-dev.a.intuit.com:2181, sppzookeeper.data-lake-dev.a.intuit.com:2182, sppzookeeper.data-lake-dev.a.intuit.com:2183
> systems.kafka.security.protocol=SSL
> systems.kafka.ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
> systems.kafka.ssl.truststore.type=JKS
> systems.kafka.ssl.truststore.location=/home/appuser/spp/kabini.jks
> systems.kafka.ssl.truststore.password=Intuit01
> systems.kafka.bootstrap.servers=sppkafka.data-lake-dev.a.intuit.com:19701, sppkafka.data-lake-dev.a.intuit.com:19801, sppkafka.data-lake-dev.a.intuit.com:19901
>
> Thanks,
>
> Matt
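
To rule out the truststore itself when checking the properties above, here is a minimal JDK-only sketch that tries to open a truststore with a given type and password. It does not touch Samza or Kafka, so it only tells you whether the file, type and password agree with each other, not whether Samza passes them to its clients. The class and method names are made up for illustration.

```java
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.KeyStore;

// Hypothetical helper: verifies that a truststore file opens with the given type/password.
final class TruststoreCheck {

    /** Loads the store and returns the number of entries it contains. */
    static int countEntries(Path file, String type, char[] password) throws Exception {
        KeyStore store = KeyStore.getInstance(type);
        try (InputStream in = Files.newInputStream(file)) {
            // Throws IOException if the file is unreadable or the password
            // fails the store's integrity check
            store.load(in, password);
        }
        return store.size();
    }

    public static void main(String[] args) throws Exception {
        // Usage: java TruststoreCheck <truststore path> <type, e.g. JKS> <password>
        int n = countEntries(Paths.get(args[0]), args[1], args[2].toCharArray());
        System.out.println("Loaded " + n + " entries from " + args[0]);
    }
}
```

If this loads cleanly with the same values used for ssl.truststore.location, ssl.truststore.type and ssl.truststore.password, the remaining suspect is how (or whether) those settings reach the Kafka clients.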