Yes, in local mode it works both from IntelliJ and via spark-submit, on my machine and on a Windows machine.
I have noticed the following error when adding this to the spark-submit above for K8s:

--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.2 \

:: resolving dependencies :: org.apache.spark#spark-submit-parent-683eee8e-9409-49ea-b0a9-7cf871af7f0c;1.0
        confs: [default]
Exception in thread "main" java.io.FileNotFoundException: /opt/spark/.ivy2/cache/resolved-org.apache.spark-spark-submit-parent-683eee8e-9409-49ea-b0a9-7cf871af7f0c-1.0.xml (No such file or directory)

Is there some way to verify that the K8s installation is correct? Other Spark processes that do not involve streaming work correctly.

On Mon, 6 Sept 2021 at 16:03, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Hi,
>
> Have you tried this in local mode, as opposed to Kubernetes, to see if it
> works?
>
> HTH
>
> View my LinkedIn profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
> On Mon, 6 Sept 2021 at 11:16, Stelios Philippou <stevo...@gmail.com> wrote:
>
>> Hello Jacek,
>>
>> Yes, this is a Spark Streaming app.
>> I have removed all code and created a new project with just the base
>> code that is enough to open a stream and loop over it, to see what I am
>> doing wrong.
>>
>> Not adding the packages results in the following error:
>>
>> 21/09/06 08:10:41 WARN org.apache.spark.scheduler.TaskSetManager: Lost
>> task 0.0 in stage 0.0 (TID 0) (10.60.60.128 executor 1):
>> java.lang.ClassNotFoundException:
>> org.apache.spark.streaming.kafka010.KafkaRDDPartition
>>   at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>>   at java.lang.Class.forName0(Native Method)
>>   at java.lang.Class.forName(Class.java:348)
>>   at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
>>
>> That should not really be the case, because this class should be included
>> in the Kubernetes pod. Is there any way I can confirm this?
>>
>> So my simple class is as follows:
>>
>> streamingContext = new JavaStreamingContext(javaSparkContext, Durations.seconds(5));
>>
>> stream = KafkaUtils.createDirectStream(streamingContext,
>>     LocationStrategies.PreferConsistent(),
>>     ConsumerStrategies.Subscribe(topics, kafkaConfiguration));
>>
>> stream.foreachRDD((VoidFunction<JavaRDD<ConsumerRecord<String, byte[]>>>) rdd -> {
>>     try {
>>         rdd.foreachPartition(partition -> {
>>             while (partition.hasNext()) {
>>                 ConsumerRecord<String, byte[]> consumerRecord = partition.next();
>>                 LOGGER.info("WORKING " + consumerRecord.topic()
>>                     + consumerRecord.partition() + ": " + consumerRecord.offset());
>>             }
>>         });
>>     } catch (Exception e) {
>>         e.printStackTrace();
>>     }
>> });
>>
>> streamingContext.start();
>> try {
>>     streamingContext.awaitTermination();
>> } catch (InterruptedException e) {
>>     e.printStackTrace();
>> } finally {
>>     streamingContext.stop();
>>     javaSparkContext.stop();
>> }
>>
>> This is all there is to the class, which is a Spring Boot @Component.
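>>
>> Two hedged, untested notes on the errors above. The Ivy FileNotFoundException
>> from --packages often just means the default Ivy directory inside the
>> container (/opt/spark/.ivy2, as shown in the error) is not writable for the
>> user the pod runs as; a common workaround is to point Ivy at a writable path
>> (the /tmp path here is an assumption):
>>
>>   --conf spark.jars.ivy=/tmp/.ivy2 \
>>   --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.2 \
>>
>> As for confirming whether the kafka010 classes are in the pod: the
>> spark-streaming-kafka-0-10 integration jar is typically not part of the stock
>> Spark distribution, so it will only be present if it was added when the image
>> was built. One way to check (pod name is a placeholder):
>>
>>   kubectl -n spark exec <driver-pod> -- ls /opt/spark/jars | grep -i kafka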
>>
>> Now, my pom is as follows:
>>
>> <?xml version="1.0" encoding="UTF-8"?>
>> <project xmlns="http://maven.apache.org/POM/4.0.0"
>>          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
>>     <modelVersion>4.0.0</modelVersion>
>>
>>     <groupId>com.kafka</groupId>
>>     <artifactId>SimpleKafkaStream</artifactId>
>>     <version>1.0</version>
>>
>>     <packaging>jar</packaging>
>>
>>     <properties>
>>         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>>         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
>>         <maven.compiler.source>8</maven.compiler.source>
>>         <maven.compiler.target>8</maven.compiler.target>
>>         <start-class>com.kafka.Main</start-class>
>>     </properties>
>>
>>     <parent>
>>         <groupId>org.springframework.boot</groupId>
>>         <artifactId>spring-boot-starter-parent</artifactId>
>>         <version>2.4.2</version>
>>         <relativePath/>
>>     </parent>
>>
>>     <dependencies>
>>         <dependency>
>>             <groupId>org.springframework.boot</groupId>
>>             <artifactId>spring-boot-starter</artifactId>
>>             <exclusions>
>>                 <exclusion>
>>                     <groupId>org.springframework.boot</groupId>
>>                     <artifactId>spring-boot-starter-logging</artifactId>
>>                 </exclusion>
>>             </exclusions>
>>         </dependency>
>>
>>         <dependency>
>>             <groupId>org.apache.spark</groupId>
>>             <artifactId>spark-core_2.12</artifactId>
>>             <version>3.1.2</version>
>>         </dependency>
>>
>>         <dependency>
>>             <groupId>org.apache.spark</groupId>
>>             <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
>>             <version>3.1.2</version>
>>             <scope>provided</scope>
>>         </dependency>
>>
>>         <dependency>
>>             <groupId>org.apache.spark</groupId>
>>             <artifactId>spark-streaming_2.12</artifactId>
>>             <version>3.1.2</version>
>>         </dependency>
>>     </dependencies>
>>
>>     <build>
>>         <plugins>
>>             <plugin>
>>                 <groupId>org.springframework.boot</groupId>
>>                 <artifactId>spring-boot-maven-plugin</artifactId>
>>             </plugin>
>>
>>             <plugin>
>>                 <groupId>org.apache.maven.plugins</groupId>
>>                 <artifactId>maven-compiler-plugin</artifactId>
>>                 <version>3.8.1</version>
>>                 <configuration>
>>                     <source>1.8</source>
>>                     <target>1.8</target>
>>                 </configuration>
>>             </plugin>
>>         </plugins>
>>     </build>
>>
>> </project>
>>
>> A simple pom; whether the spark-streaming-kafka-0-10_2.12 dependency is
>> scoped as provided or not, it still gives the same error.
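>>
>> A hedged way to see what actually ends up in the packaged jar (the artifact
>> name is derived from the pom above):
>>
>>   jar tf target/SimpleKafkaStream-1.0.jar | grep -i kafka
>>
>> Worth keeping in mind that the Spring Boot repackage goal nests dependencies
>> under BOOT-INF/lib, and, as far as I understand, the plain classloader on
>> Spark executors cannot load classes out of those nested jars, so even a
>> "fat" Boot jar may still need the Kafka integration supplied via --jars,
>> --packages, or the image itself.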
>>
>> I have tried to build an uber jar in order to test with that, but I was
>> still unable to make it work. The build section was as follows:
>>
>> <build>
>>     <plugins>
>>         <plugin>
>>             <groupId>org.springframework.boot</groupId>
>>             <artifactId>spring-boot-maven-plugin</artifactId>
>>             <configuration>
>>                 <fork>true</fork>
>>                 <mainClass>com.kafka.Main</mainClass>
>>             </configuration>
>>             <executions>
>>                 <execution>
>>                     <goals>
>>                         <goal>repackage</goal>
>>                     </goals>
>>                 </execution>
>>             </executions>
>>         </plugin>
>>
>>         <plugin>
>>             <artifactId>maven-assembly-plugin</artifactId>
>>             <version>3.2.0</version>
>>             <configuration>
>>                 <descriptorRefs>
>>                     <descriptorRef>dependencies</descriptorRef>
>>                 </descriptorRefs>
>>                 <archive>
>>                     <manifest>
>>                         <addClasspath>true</addClasspath>
>>                         <mainClass>com.kafka.Main</mainClass>
>>                     </manifest>
>>                 </archive>
>>             </configuration>
>>             <executions>
>>                 <execution>
>>                     <id>make-assembly</id>
>>                     <phase>package</phase>
>>                     <goals>
>>                         <goal>single</goal>
>>                     </goals>
>>                 </execution>
>>             </executions>
>>         </plugin>
>>
>>         <plugin>
>>             <groupId>org.apache.maven.plugins</groupId>
>>             <artifactId>maven-compiler-plugin</artifactId>
>>             <version>3.8.1</version>
>>             <configuration>
>>                 <source>1.8</source>
>>                 <target>1.8</target>
>>             </configuration>
>>         </plugin>
>>     </plugins>
>> </build>
>>
>> I am open to any suggestions as to why this is not working and what needs
>> to be done.
>>
>> Thank you for your time,
>> Stelios
>>
>> On Sun, 5 Sept 2021 at 16:56, Jacek Laskowski <ja...@japila.pl> wrote:
>>
>>> Hi,
>>>
>>> No idea still, but I noticed
>>> "org.apache.spark.streaming.kafka010.KafkaRDDPartition" and
>>> --jars "spark-yarn_2.12-3.1.2.jar,spark-core_2.12-3.1.2.jar,kafka-clients-2.8.0.jar,spark-streaming-kafka-0-10_2.12-3.1.2.jar,spark-token-provider-kafka-0-10_2.12-3.1.2.jar" \
>>> and that bothers me quite a lot.
>>>
>>> First of all, it's a Spark Streaming (not Structured Streaming) app.
>>> Correct? Please upgrade at your earliest convenience since it's no longer
>>> in active development (if supported at all).
>>>
>>> Secondly, why are these jars listed explicitly since they're part of
>>> Spark? You should not really be doing such risky config changes (unless
>>> you've got no other choice and you know what you're doing).
>>>
>>> Pozdrawiam,
>>> Jacek Laskowski
>>> ----
>>> https://about.me/JacekLaskowski
>>> "The Internals Of" Online Books <https://books.japila.pl/>
>>> Follow me on https://twitter.com/jaceklaskowski
>>>
>>> On Tue, Aug 31, 2021 at 1:00 PM Stelios Philippou <stevo...@gmail.com> wrote:
>>>
>>>> Yes, you are right.
>>>> I am using Spring Boot for this.
>>>>
>>>> The same setup does work for the job that does not involve any Kafka
>>>> events. But then again, I am not sending out extra jars there, so nothing
>>>> is replaced and we are using the default ones.
>>>>
>>>> If I do not use userClassPathFirst, which forces the service to use the
>>>> newer version, I end up with the same problem.
>>>>
>>>> We are using protobuf v3+, and as such we need to push that version,
>>>> since Spark core uses an older one.
>>>>
>>>> So all we should really need is the following:
>>>>
>>>> --jars "protobuf-java-3.17.3.jar" \
>>>>
>>>> and here we need userClassPathFirst=true in order to use the latest
>>>> version.
>>>>
>>>> Using only this jar (as it works locally), or with no jars defined, we
>>>> ended up with the following error:
>>>>
>>>> 21/08/31 10:53:40 WARN org.apache.spark.scheduler.TaskSetManager: Lost
>>>> task 0.0 in stage 18.0 (TID 139) (10.60.63.56 executor 1):
>>>> java.lang.ClassNotFoundException:
>>>> org.apache.spark.streaming.kafka010.KafkaRDDPartition
>>>>   at java.base/java.net.URLClassLoader.findClass(Unknown Source)
>>>>   at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>>>   at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>>>   at java.base/java.lang.Class.forName0(Native Method)
>>>>   at java.base/java.lang.Class.forName(Unknown Source)
>>>>   at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
>>>>
>>>> This can be resolved by passing more jars.
>>>>
>>>> Any idea about this error?
>>>>
>>>> K8s does not seem to like this. Spring should be the one responsible for
>>>> the version, but it seems K8s does not like these versions.
>>>>
>>>> Perhaps a misconfiguration on K8s? I haven't set that cluster up, so I am
>>>> not aware of what was done there.
>>>>
>>>> Downgrading to Java 8 on my K8s might not be so easy. I want to explore
>>>> whether there is something else before doing that, as we would need to
>>>> spin up new K8s instances to check it.
>>>>
>>>> Thank you for the time taken.
>>>>
>>>> On Tue, 31 Aug 2021 at 12:26, Jacek Laskowski <ja...@japila.pl> wrote:
>>>>
>>>>> Hi Stelios,
>>>>>
>>>>> I've never seen this error before, but a couple of things caught my
>>>>> attention that I would look at more closely to chase the root cause of
>>>>> the issue.
>>>>>
>>>>> "org.springframework.context.annotation.AnnotationConfigApplicationContext:"
>>>>> and "21/08/31 07:28:42 ERROR org.springframework.boot.SpringApplication:
>>>>> Application run failed" seem to indicate that you're using Spring Boot
>>>>> (which I know almost nothing about, so take the following with a pinch
>>>>> of salt :))
>>>>>
>>>>> Spring Boot manages the classpath by itself, and together with another
>>>>> interesting option in your spark-submit,
>>>>> spark.driver.userClassPathFirst=true, it makes me wonder how much this
>>>>> exception:
>>>>>
>>>>> > org.apache.spark.scheduler.ExternalClusterManager:
>>>>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
>>>>> subtype
>>>>>
>>>>> could be due to casting compatible types from two different
>>>>> classloaders?
>>>>>
>>>>> Just a thought, but I wanted to share it as I think it's worth
>>>>> investigating.
>>>>>
>>>>> Pozdrawiam,
>>>>> Jacek Laskowski
>>>>> ----
>>>>> https://about.me/JacekLaskowski
>>>>> "The Internals Of" Online Books <https://books.japila.pl/>
>>>>> Follow me on https://twitter.com/jaceklaskowski
>>>>>
>>>>> On Tue, Aug 31, 2021 at 9:44 AM Stelios Philippou <stevo...@gmail.com> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I have been facing this issue for some time now, and I was wondering
>>>>>> if someone might have some insight into how I can resolve the
>>>>>> following.
>>>>>>
>>>>>> The code (Java 11) works correctly on my local machine, but whenever
>>>>>> I try to launch the following on K8s I get this error:
>>>>>>
>>>>>> 21/08/31 07:28:42 ERROR org.apache.spark.SparkContext: Error
>>>>>> initializing SparkContext.
>>>>>>
>>>>>> java.util.ServiceConfigurationError:
>>>>>> org.apache.spark.scheduler.ExternalClusterManager:
>>>>>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
>>>>>> subtype
>>>>>>
>>>>>> I have a Spark job that monitors some directories and handles the data
>>>>>> accordingly. That part works correctly on K8s, and the SparkContext has
>>>>>> no issue being initialized there.
>>>>>>
>>>>>> This is the spark-submit for that:
>>>>>>
>>>>>> spark-submit \
>>>>>> --master=k8s://https://url:port \
>>>>>> --deploy-mode cluster \
>>>>>> --name a-name \
>>>>>> --conf spark.driver.userClassPathFirst=true \
>>>>>> --conf spark.kubernetes.file.upload.path=hdfs://upload-path \
>>>>>> --files "application-dev.properties,keystore.jks,truststore.jks" \
>>>>>> --conf spark.kubernetes.container.image=url/spark:spark-submit \
>>>>>> --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
>>>>>> --conf spark.kubernetes.namespace=spark \
>>>>>> --conf spark.kubernetes.container.image.pullPolicy=Always \
>>>>>> --conf spark.dynamicAllocation.enabled=false \
>>>>>> --driver-memory 525m --executor-memory 525m \
>>>>>> --num-executors 1 --executor-cores 1 \
>>>>>> target/SparkStream.jar continuous-merge
>>>>>>
>>>>>> My issue comes when I try to launch the service that listens to Kafka
>>>>>> events and stores them in HDFS:
>>>>>>
>>>>>> spark-submit \
>>>>>> --master=k8s://https://url:port \
>>>>>> --deploy-mode cluster \
>>>>>> --name consume-data \
>>>>>> --conf spark.driver.userClassPathFirst=true \
>>>>>> --conf spark.kubernetes.file.upload.path=hdfs://upload-path \
>>>>>> --files "application-dev.properties,keystore.jks,truststore.jks" \
>>>>>> --jars "spark-yarn_2.12-3.1.2.jar,spark-core_2.12-3.1.2.jar,kafka-clients-2.8.0.jar,spark-streaming-kafka-0-10_2.12-3.1.2.jar,spark-token-provider-kafka-0-10_2.12-3.1.2.jar" \
>>>>>> --conf spark.kubernetes.container.image=url/spark:spark-submit \
>>>>>> --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
>>>>>> --conf spark.kubernetes.authenticate.executor.serviceAccountName=spark \
>>>>>> --conf spark.kubernetes.namespace=spark \
>>>>>> --conf spark.kubernetes.container.image.pullPolicy=Always \
>>>>>> --conf spark.dynamicAllocation.enabled=false \
>>>>>> --driver-memory 1g --executor-memory 1g \
>>>>>> --num-executors 1 --executor-cores 1 \
>>>>>> target/SparkStream.jar consume
>>>>>>
>>>>>> It could be that I am launching the application wrongly, or perhaps
>>>>>> that my K8s is not configured correctly?
>>>>>>
>>>>>> I have stripped my code down to the bare bones and still end up with
>>>>>> the following issue:
>>>>>>
>>>>>> 21/08/31 07:28:42 ERROR org.apache.spark.SparkContext: Error
>>>>>> initializing SparkContext.
>>>>>>
>>>>>> java.util.ServiceConfigurationError:
>>>>>> org.apache.spark.scheduler.ExternalClusterManager:
>>>>>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
>>>>>> subtype
>>>>>>   at java.base/java.util.ServiceLoader.fail(Unknown Source)
>>>>>>   at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(Unknown Source)
>>>>>>   at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(Unknown Source)
>>>>>>   at java.base/java.util.ServiceLoader$2.hasNext(Unknown Source)
>>>>>>   at java.base/java.util.ServiceLoader$3.hasNext(Unknown Source)
>>>>>>   at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:
>>>>>>
>>>>>> 21/08/31 07:28:42 WARN
>>>>>> org.springframework.context.annotation.AnnotationConfigApplicationContext:
>>>>>> Exception encountered during context initialization - cancelling refresh
>>>>>> attempt:
>>>>>> org.springframework.beans.factory.UnsatisfiedDependencyException:
>>>>>> Error creating bean with name 'mainApplication': Unsatisfied dependency
>>>>>> expressed through field 'streamAllKafkaData'; nested exception is
>>>>>> org.springframework.beans.factory.UnsatisfiedDependencyException: Error
>>>>>> creating bean with name 'streamAllKafkaData': Unsatisfied dependency
>>>>>> expressed through field 'javaSparkContext'; nested exception is
>>>>>> org.springframework.beans.factory.BeanCreationException: Error creating
>>>>>> bean with name 'javaSparkContext' defined in class path resource
>>>>>> [com/configuration/SparkConfiguration.class]: Bean instantiation via
>>>>>> factory method failed; nested exception is
>>>>>> org.springframework.beans.BeanInstantiationException: Failed to
>>>>>> instantiate [org.apache.spark.api.java.JavaSparkContext]: Factory method
>>>>>> 'javaSparkContext' threw exception; nested exception is
>>>>>> java.util.ServiceConfigurationError:
>>>>>> org.apache.spark.scheduler.ExternalClusterManager:
>>>>>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
>>>>>> subtype
>>>>>>
>>>>>> 21/08/31 07:28:42 ERROR org.springframework.boot.SpringApplication:
>>>>>> Application run failed
>>>>>>
>>>>>> org.springframework.beans.factory.UnsatisfiedDependencyException:
>>>>>> Error creating bean with name 'mainApplication': Unsatisfied dependency
>>>>>> expressed through field 'streamAllKafkaData'; nested exception is
>>>>>> org.springframework.beans.factory.UnsatisfiedDependencyException: Error
>>>>>> creating bean with name 'streamAllKafkaData': Unsatisfied dependency
>>>>>> expressed through field 'javaSparkContext'; nested exception is
>>>>>> org.springframework.beans.factory.BeanCreationException: Error creating
>>>>>> bean with name 'javaSparkContext' defined in class path resource
>>>>>> [com/configuration/SparkConfiguration.class]: Bean instantiation via
>>>>>> factory method failed; nested exception is
>>>>>> org.springframework.beans.BeanInstantiationException: Failed to
>>>>>> instantiate [org.apache.spark.api.java.JavaSparkContext]: Factory method
>>>>>> 'javaSparkContext' threw exception; nested exception is
>>>>>> java.util.ServiceConfigurationError:
>>>>>> org.apache.spark.scheduler.ExternalClusterManager:
>>>>>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
>>>>>> subtype
>>>>>> >>>>>> Just that those seem to be needed or i am getting other errors when >>>>>> not including those. >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> Any help will be greatly appreciated. >>>>>> >>>>>> >>>>>> >>>>>> Cheers, >>>>>> >>>>>> Stelios >>>>>> >>>>>> >>>>>> >>>>>>