Great to hear that it is now working. Cheers, Till
On Sat, Jun 13, 2020, 12:58 Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:

> Yes, again it trapped me. It was the /etc/hosts file that I change when I
> am using VMs. Now, even with the INFO message "INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - class
> org.joda.time.DateTime does not contain a getter for field iMillis", my
> program is running. Thanks!
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>
> On Sat, Jun 13, 2020 at 12:40 PM Till Rohrmann <trohrm...@apache.org> wrote:
> >
> > Hi Felipe,
> >
> > the reason you cannot submit a job to the Flink cluster is that the
> > client cannot reach the blob server:
> >
> > Caused by: java.io.IOException: Could not connect to BlobServer at
> > address localhost/192.168.56.1:35193
> >
> > Could you check whether the cluster has been properly started and is
> > reachable under 192.168.56.1:35193? You could also share the cluster logs
> > with us to further debug the problem.
> >
> > Cheers,
> > Till
> >
> > On Sat, Jun 13, 2020 at 12:09 PM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
> >>
> >> Hi, I tried to change the joda-time Maven version to match the one in
> >> the flink-training example, and I am getting this error in IntelliJ.
> >> Maybe it is more precise:
> >>
> >> 2020-06-13 12:04:27,333 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
> >> 2020-06-13 12:04:27,333 INFO org.apache.flink.runtime.taskmanager.Task [] - reducer -> flat-output -> Sink: sink (4/4) (bb24f17c4869d5de9f684e64ff97cf5b) switched from DEPLOYING to RUNNING.
> >> 2020-06-13 12:04:27,337 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - reducer -> flat-output -> Sink: sink (4/4) (bb24f17c4869d5de9f684e64ff97cf5b) switched from DEPLOYING to RUNNING.
> >> 2020-06-13 12:04:27,360 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
> >> 2020-06-13 12:04:27,360 INFO org.apache.flink.runtime.taskmanager.Task [] - reducer -> flat-output -> Sink: sink (3/4) (0bd2d20546f4ee5eef1429bf5c80a1b4) switched from DEPLOYING to RUNNING.
> >> 2020-06-13 12:04:27,362 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - reducer -> flat-output -> Sink: sink (3/4) (0bd2d20546f4ee5eef1429bf5c80a1b4) switched from DEPLOYING to RUNNING.
> >> 2020-06-13 12:04:27,376 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend [] - Initializing heap keyed state backend with stream factory.
> >> 2020-06-13 12:04:27,381 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend [] - Initializing heap keyed state backend with stream factory.
> >> 2020-06-13 12:04:27,381 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend [] - Initializing heap keyed state backend with stream factory.
> >> 2020-06-13 12:04:27,389 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend [] - Initializing heap keyed state backend with stream factory.
> >> 2020-06-13 12:04:27,511 WARN org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer [] - Falling back to default Kryo serializer because Chill serializer couldn't be found.
> >> java.lang.reflect.InvocationTargetException: null
> >>     at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
> >>     at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
> >>     at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
> >>     at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
> >>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.getKryoInstance(KryoSerializer.java:436) [classes/:?]
> >>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.checkKryoInitialized(KryoSerializer.java:454) [classes/:?]
> >>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:289) [classes/:?]
> >>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:175) [classes/:?]
> >>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46) [classes/:?]
> >>     at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54) [classes/:?]
> >>     at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.serializeRecord(SpanningRecordSerializer.java:71) [classes/:?]
> >>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:117) [classes/:?]
> >>     at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60) [classes/:?]
> >>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107) [classes/:?]
> >>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) [classes/:?]
> >>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45) [classes/:?]
> >>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [classes/:?]
> >>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [classes/:?]
> >>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) [classes/:?]
> >>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) [classes/:?]
> >>     at org.apache.flink.streaming.examples.aggregate.util.TaxiRideSource.generateTaxiRideArray(TaxiRideSource.java:114) [classes/:?]
> >>     at org.apache.flink.streaming.examples.aggregate.util.TaxiRideSource.run(TaxiRideSource.java:96) [classes/:?]
> >>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) [classes/:?]
> >>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) [classes/:?]
> >>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:208) [classes/:?]
> >> Caused by: com.esotericsoftware.kryo.KryoException: Unable to resolve type variable: A
> >>     at com.esotericsoftware.kryo.util.GenericsUtil.resolveTypeVariable(GenericsUtil.java:114) ~[kryo-5.0.0-RC1.jar:?]
> >>     at com.esotericsoftware.kryo.util.GenericsUtil.resolveTypeVariable(GenericsUtil.java:86) ~[kryo-5.0.0-RC1.jar:?]
> >>     at com.esotericsoftware.kryo.util.GenericsUtil.resolveType(GenericsUtil.java:41) ~[kryo-5.0.0-RC1.jar:?]
> >>     at com.esotericsoftware.kryo.util.Generics$GenericType.initialize(Generics.java:263) ~[kryo-5.0.0-RC1.jar:?]
> >>     at com.esotericsoftware.kryo.util.Generics$GenericType.<init>(Generics.java:228) ~[kryo-5.0.0-RC1.jar:?]
> >>     at com.esotericsoftware.kryo.util.Generics$GenericType.initialize(Generics.java:242) ~[kryo-5.0.0-RC1.jar:?]
> >>     at com.esotericsoftware.kryo.util.Generics$GenericType.<init>(Generics.java:228) ~[kryo-5.0.0-RC1.jar:?]
> >>     at com.esotericsoftware.kryo.serializers.CachedFields.addField(CachedFields.java:139) ~[kryo-5.0.0-RC1.jar:?]
> >>     at com.esotericsoftware.kryo.serializers.CachedFields.rebuild(CachedFields.java:99) ~[kryo-5.0.0-RC1.jar:?]
> >>     at com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:82) ~[kryo-5.0.0-RC1.jar:?]
> >>     at com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:68) ~[kryo-5.0.0-RC1.jar:?]
> >>     at org.apache.flink.runtime.types.ScalaCollectionsRegistrar.useField$1(FlinkScalaKryoInstantiator.scala:93) ~[classes/:?]
> >>     at org.apache.flink.runtime.types.ScalaCollectionsRegistrar.apply(FlinkScalaKryoInstantiator.scala:98) ~[classes/:?]
> >>     at org.apache.flink.runtime.types.AllScalaRegistrar.apply(FlinkScalaKryoInstantiator.scala:172) ~[classes/:?]
> >>     at org.apache.flink.runtime.types.FlinkScalaKryoInstantiator.newKryo(FlinkScalaKryoInstantiator.scala:84) ~[classes/:?]
> >>     ... 25 more
> >> --
> >> -- Felipe Gutierrez
> >> -- skype: felipe.o.gutierrez
> >> -- https://felipeogutierrez.blogspot.com
> >>
> >> On Sat, Jun 13, 2020 at 10:57 AM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
> >> >
> >> > Hi Yun, it logs the INFO message "class org.joda.time.DateTime cannot be
> >> > used as a POJO type because not all fields are valid POJO fields, and must
> >> > be processed as GenericType. Please read the Flink documentation on
> >> > "Data Types & Serialization" for details of the effect on
> >> > performance". However, I cannot submit my job. It is strange because I
> >> > can start and run it in IntelliJ, but not on the standalone cluster on
> >> > my machine.
> >> >
> >> > 2020-06-13 10:50:56,051 INFO org.apache.flink.core.fs.FileSystem - Hadoop is not in the classpath/dependencies. The extended set of supported File Systems via Hadoop is not available.
> >> > 2020-06-13 10:50:56,143 INFO org.apache.flink.runtime.security.modules.HadoopModuleFactory - Cannot create Hadoop Security Module because Hadoop cannot be found in the Classpath.
> >> > 2020-06-13 10:50:56,164 INFO org.apache.flink.runtime.security.modules.JaasModule - Jaas file will be created as /tmp/jaas-837993701496785981.conf.
> >> > 2020-06-13 10:50:56,169 INFO org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory - Cannot install HadoopSecurityContext because Hadoop cannot be found in the Classpath.
> >> > 2020-06-13 10:50:56,169 WARN org.apache.flink.runtime.security.SecurityUtils - Unable to install incompatible security context factory org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory
> >> > 2020-06-13 10:50:56,171 INFO org.apache.flink.client.cli.CliFrontend - Running 'run' command.
> >> > 2020-06-13 10:50:56,242 INFO org.apache.flink.client.cli.CliFrontend - Building program from JAR file
> >> > 2020-06-13 10:50:57,084 INFO org.apache.flink.client.ClientUtils - Starting program (detached: false)
> >> > 2020-06-13 10:50:59,021 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.joda.time.DateTime does not contain a getter for field iMillis
> >> > 2020-06-13 10:50:59,021 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.joda.time.DateTime does not contain a setter for field iMillis
> >> > 2020-06-13 10:50:59,021 INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class org.joda.time.DateTime cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
> >> > 2020-06-13 10:50:59,028 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.joda.time.DateTime does not contain a getter for field iMillis
> >> > 2020-06-13 10:50:59,028 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.joda.time.DateTime does not contain a setter for field iMillis
> >> > 2020-06-13 10:50:59,028 INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class org.joda.time.DateTime cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
> >> > 2020-06-13 10:53:19,508 WARN org.apache.flink.util.ExecutorUtils - ExecutorService did not terminate in time. Shutting it down now.
> >> > 2020-06-13 10:53:19,510 ERROR org.apache.flink.client.cli.CliFrontend - Error while running the command.
> >> > org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
> >> >     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> >> >     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> >> >     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:148)
> >> >     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662)
> >> >     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
> >> >     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)
> >> >     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
> >> >     at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> >> >     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
> >> > Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
> >> >     at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:276)
> >> >     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1764)
> >> >     at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:106)
> >> >     at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:72)
> >> >     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1643)
> >> >     at org.apache.flink.streaming.examples.aggregate.TaxiRideCountPreAggregate.main(TaxiRideCountPreAggregate.java:136)
> >> >     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >> >     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >> >     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> >     at java.lang.reflect.Method.invoke(Method.java:498)
> >> >     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
> >> >     ... 8 more
> >> > Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
> >> >     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> >> >     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> >> >     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1759)
> >> >     ... 17 more
> >> > Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
> >> >     at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:359)
> >> >     at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
> >> >     at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
> >> >     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> >> >     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
> >> >     at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:274)
> >> >     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> >> >     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> >> >     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> >> >     at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
> >> >     at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
> >> >     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
> >> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >> >     at java.lang.Thread.run(Thread.java:748)
> >> > Caused by: org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.rest.handler.RestHandlerException: Could not upload job files.
> >> >     at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:169)
> >> >     at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1119)
> >> >     at java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1084)
> >> >     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> >> >     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
> >> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >> >     at java.lang.Thread.run(Thread.java:748)
> >> > Caused by: org.apache.flink.util.FlinkException: Could not upload job files.
> >> >     at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:80)
> >> >     at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:167)
> >> >     ... 7 more
> >> > Caused by: java.io.IOException: Could not connect to BlobServer at address localhost/192.168.56.1:35193
> >> >     at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:100)
> >> >     at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$null$3(JobSubmitHandler.java:167)
> >> >     at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:76)
> >> >     ... 8 more
> >> > Caused by: java.net.ConnectException: Connection timed out (Connection timed out)
> >> >     at java.net.PlainSocketImpl.socketConnect(Native Method)
> >> >     at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
> >> >     at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
> >> >     at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
> >> >     at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
> >> >     at java.net.Socket.connect(Socket.java:607)
> >> >     at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:95)
> >> >     ... 10 more
> >> > ]
> >> >     at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390)
> >> >     at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374)
> >> >     at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
> >> >     at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
> >> >     ... 4 more
> >> > --
> >> > -- Felipe Gutierrez
> >> > -- skype: felipe.o.gutierrez
> >> > -- https://felipeogutierrez.blogspot.com
> >> >
> >> > On Sat, Jun 13, 2020 at 5:08 AM Yun Gao <yungao...@aliyun.com> wrote:
> >> > >
> >> > > Hi Felipe,
> >> > >
> >> > > I tested the basic RideCleansingExercise [1] job, which uses the
> >> > > TaxiRide type, locally, and it seems to start up normally.
> >> > >
> >> > > Could you also share the code you are currently executing and the
> >> > > full stack trace of the exception?
> >> > >
> >> > > Best,
> >> > > Yun
> >> > >
> >> > > [1] https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/exercises/datastream_java/basics/RideCleansingExercise.java
> >> > >
> >> > > ------------------Original Mail ------------------
> >> > > Sender: Felipe Gutierrez <felipe.o.gutier...@gmail.com>
> >> > > Send Date: Fri Jun 12 23:11:28 2020
> >> > > Recipients: user <user@flink.apache.org>
> >> > > Subject: How to add org.joda.time.DateTime in the StreamExecutionEnvironment to the TaxiRide training example?
> >> > >>
> >> > >> Hi,
> >> > >>
> >> > >> I am using the Flink training exercise TaxiRide [1] to run a
> >> > >> streaming count of events. On the cluster and on my local machine I
> >> > >> receive the message that Joda-Time cannot be serialized: "class
> >> > >> org.joda.time.LocalDateTime is not a valid POJO type". However, the
> >> > >> job starts on the cluster but not on my local machine. I searched
> >> > >> the internet and found that the Joda-Time classes need to be
> >> > >> registered on the environment [2]. I did it like this:
> >> > >>
> >> > >> env.getConfig().registerTypeWithKryoSerializer(DateTime.class,
> >> > >>     AvroKryoSerializerUtils.JodaDateTimeSerializer.class);
> >> > >> env.getConfig().registerTypeWithKryoSerializer(LocalDate.class,
> >> > >>     AvroKryoSerializerUtils.JodaLocalDateSerializer.class);
> >> > >> env.getConfig().registerTypeWithKryoSerializer(LocalTime.class,
> >> > >>     AvroKryoSerializerUtils.JodaLocalTimeSerializer.class);
> >> > >>
> >> > >> and I added the joda-time and flink-avro dependencies to the pom.xml:
> >> > >>
> >> > >> <dependency>
> >> > >>   <groupId>joda-time</groupId>
> >> > >>   <artifactId>joda-time</artifactId>
> >> > >> </dependency>
> >> > >> <dependency>
> >> > >>   <groupId>org.apache.flink</groupId>
> >> > >>   <artifactId>flink-avro</artifactId>
> >> > >>   <version>${project.version}</version>
> >> > >> </dependency>
> >> > >>
> >> > >> I also tested using addDefaultKryoSerializer, but I got the same
> >> > >> error. For some reason, it is still not working. Does anyone have a
> >> > >> hint about what could be happening?
> >> > >>
> >> > >> Thanks! Felipe
> >> > >> [1] https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/exercises/datastream_java/datatypes/TaxiRide.java
> >> > >> [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/custom_serializers.html
> >> > >>
> >> > >> --
> >> > >> -- Felipe Gutierrez
> >> > >> -- skype: felipe.o.gutierrez
> >> > >> -- https://felipeogutierrez.blogspot.com
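
Note on the resolution: the trace in this thread shows the client resolving "localhost" to 192.168.56.1, and the fix was a correction to /etc/hosts. The sketch below is one possible way to check both symptoms with the Java standard library only. It is not part of Flink; the class name BlobServerCheck and its two methods are made up for illustration, and the default port 35193 is simply the BlobServer port that appears in the log above (the actual port may differ between cluster starts).

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;

// Hypothetical helper (not a Flink class): checks how a host name resolves
// and whether a TCP port, such as the BlobServer port from the log, is reachable.
final class BlobServerCheck {

    // Returns true only if every address the name resolves to is a loopback
    // address (127.0.0.0/8 or ::1). Returns false if the name cannot be resolved.
    static boolean resolvesToLoopback(String host) {
        try {
            for (InetAddress address : InetAddress.getAllByName(host)) {
                if (!address.isLoopbackAddress()) {
                    return false;
                }
            }
            return true;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    // Tries to open a TCP connection to host:port within the given timeout.
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults are assumptions taken from the log in this thread.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 35193;
        System.out.println(host + " resolves to loopback only: " + resolvesToLoopback(host));
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 3000));
    }
}
```

If the first check reports false for "localhost", the hosts file is a likely place to look, as it was here. The second check only says whether something accepts connections on that port, not whether it is actually the BlobServer.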