Great to hear that it is now working.

Cheers,
Till

On Sat, Jun 13, 2020, 12:58 Felipe Gutierrez <felipe.o.gutier...@gmail.com>
wrote:

> Yes, it caught me out again. It was the /etc/hosts file, which I change
> when I am using VMs. Now, even with the message "INFO
> org.apache.flink.api.java.typeutils.TypeExtractor             - class
> org.joda.time.DateTime does not contain a getter for field iMillis", my
> program is running. Thanks!
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>
> On Sat, Jun 13, 2020 at 12:40 PM Till Rohrmann <trohrm...@apache.org>
> wrote:
> >
> > Hi Felipe,
> >
> > the reason you cannot submit a job to the Flink cluster is that the
> > client cannot reach the blob server:
> >
> > Caused by: java.io.IOException: Could not connect to BlobServer at
> address localhost/192.168.56.1:35193
> >
> > Could you check whether the cluster has been properly started and is
> > reachable at 192.168.56.1:35193? You could also share the cluster logs
> > with us to further debug the problem.
> >
> > Cheers,
> > Till
> >
> > On Sat, Jun 13, 2020 at 12:09 PM Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
> >>
> >> Hi, I tried changing the joda.time Maven version to match the one in
> >> the flink-training example, and I am getting this error in IntelliJ.
> >> Maybe it is more precise:
> >>
> >> 2020-06-13 12:04:27,333 INFO
> >> org.apache.flink.streaming.runtime.tasks.StreamTask          [] - No
> >> state backend has been configured, using default (Memory / JobManager)
> >> MemoryStateBackend (data in heap memory / checkpoints to JobManager)
> >> (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE,
> >> maxStateSize: 5242880)
> >> 2020-06-13 12:04:27,333 INFO
> >> org.apache.flink.runtime.taskmanager.Task                    [] -
> >> reducer -> flat-output -> Sink: sink (4/4)
> >> (bb24f17c4869d5de9f684e64ff97cf5b) switched from DEPLOYING to RUNNING.
> >> 2020-06-13 12:04:27,337 INFO
> >> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] -
> >> reducer -> flat-output -> Sink: sink (4/4)
> >> (bb24f17c4869d5de9f684e64ff97cf5b) switched from DEPLOYING to RUNNING.
> >> 2020-06-13 12:04:27,360 INFO
> >> org.apache.flink.streaming.runtime.tasks.StreamTask          [] - No
> >> state backend has been configured, using default (Memory / JobManager)
> >> MemoryStateBackend (data in heap memory / checkpoints to JobManager)
> >> (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE,
> >> maxStateSize: 5242880)
> >> 2020-06-13 12:04:27,360 INFO
> >> org.apache.flink.runtime.taskmanager.Task                    [] -
> >> reducer -> flat-output -> Sink: sink (3/4)
> >> (0bd2d20546f4ee5eef1429bf5c80a1b4) switched from DEPLOYING to RUNNING.
> >> 2020-06-13 12:04:27,362 INFO
> >> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] -
> >> reducer -> flat-output -> Sink: sink (3/4)
> >> (0bd2d20546f4ee5eef1429bf5c80a1b4) switched from DEPLOYING to RUNNING.
> >> 2020-06-13 12:04:27,376 INFO
> >> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend    [] -
> >> Initializing heap keyed state backend with stream factory.
> >> 2020-06-13 12:04:27,381 INFO
> >> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend    [] -
> >> Initializing heap keyed state backend with stream factory.
> >> 2020-06-13 12:04:27,381 INFO
> >> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend    [] -
> >> Initializing heap keyed state backend with stream factory.
> >> 2020-06-13 12:04:27,389 INFO
> >> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend    [] -
> >> Initializing heap keyed state backend with stream factory.
> >> 2020-06-13 12:04:27,511 WARN
> >> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer [] -
> >> Falling back to default Kryo serializer because Chill serializer
> >> couldn't be found.
> >> java.lang.reflect.InvocationTargetException: null
> >> at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> ~[?:?]
> >> at
> jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >> ~[?:?]
> >> at
> jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> ~[?:?]
> >> at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
> >> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.getKryoInstance(KryoSerializer.java:436)
> >> [classes/:?]
> >> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.checkKryoInitialized(KryoSerializer.java:454)
> >> [classes/:?]
> >> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:289)
> >> [classes/:?]
> >> at
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:175)
> >> [classes/:?]
> >> at
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46)
> >> [classes/:?]
> >> at
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
> >> [classes/:?]
> >> at
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.serializeRecord(SpanningRecordSerializer.java:71)
> >> [classes/:?]
> >> at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:117)
> >> [classes/:?]
> >> at
> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
> >> [classes/:?]
> >> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
> >> [classes/:?]
> >> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> >> [classes/:?]
> >> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> >> [classes/:?]
> >> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> >> [classes/:?]
> >> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> >> [classes/:?]
> >> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> >> [classes/:?]
> >> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
> >> [classes/:?]
> >> at
> org.apache.flink.streaming.examples.aggregate.util.TaxiRideSource.generateTaxiRideArray(TaxiRideSource.java:114)
> >> [classes/:?]
> >> at
> org.apache.flink.streaming.examples.aggregate.util.TaxiRideSource.run(TaxiRideSource.java:96)
> >> [classes/:?]
> >> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> >> [classes/:?]
> >> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> >> [classes/:?]
> >> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:208)
> >> [classes/:?]
> >> Caused by: com.esotericsoftware.kryo.KryoException: Unable to resolve
> >> type variable: A
> >> at
> com.esotericsoftware.kryo.util.GenericsUtil.resolveTypeVariable(GenericsUtil.java:114)
> >> ~[kryo-5.0.0-RC1.jar:?]
> >> at
> com.esotericsoftware.kryo.util.GenericsUtil.resolveTypeVariable(GenericsUtil.java:86)
> >> ~[kryo-5.0.0-RC1.jar:?]
> >> at
> com.esotericsoftware.kryo.util.GenericsUtil.resolveType(GenericsUtil.java:41)
> >> ~[kryo-5.0.0-RC1.jar:?]
> >> at
> com.esotericsoftware.kryo.util.Generics$GenericType.initialize(Generics.java:263)
> >> ~[kryo-5.0.0-RC1.jar:?]
> >> at
> com.esotericsoftware.kryo.util.Generics$GenericType.<init>(Generics.java:228)
> >> ~[kryo-5.0.0-RC1.jar:?]
> >> at
> com.esotericsoftware.kryo.util.Generics$GenericType.initialize(Generics.java:242)
> >> ~[kryo-5.0.0-RC1.jar:?]
> >> at
> com.esotericsoftware.kryo.util.Generics$GenericType.<init>(Generics.java:228)
> >> ~[kryo-5.0.0-RC1.jar:?]
> >> at
> com.esotericsoftware.kryo.serializers.CachedFields.addField(CachedFields.java:139)
> >> ~[kryo-5.0.0-RC1.jar:?]
> >> at
> com.esotericsoftware.kryo.serializers.CachedFields.rebuild(CachedFields.java:99)
> >> ~[kryo-5.0.0-RC1.jar:?]
> >> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:82)
> >> ~[kryo-5.0.0-RC1.jar:?]
> >> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:68)
> >> ~[kryo-5.0.0-RC1.jar:?]
> >> at
> org.apache.flink.runtime.types.ScalaCollectionsRegistrar.useField$1(FlinkScalaKryoInstantiator.scala:93)
> >> ~[classes/:?]
> >> at
> org.apache.flink.runtime.types.ScalaCollectionsRegistrar.apply(FlinkScalaKryoInstantiator.scala:98)
> >> ~[classes/:?]
> >> at
> org.apache.flink.runtime.types.AllScalaRegistrar.apply(FlinkScalaKryoInstantiator.scala:172)
> >> ~[classes/:?]
> >> at
> org.apache.flink.runtime.types.FlinkScalaKryoInstantiator.newKryo(FlinkScalaKryoInstantiator.scala:84)
> >> ~[classes/:?]
> >> ... 25 more
> >> --
> >> -- Felipe Gutierrez
> >> -- skype: felipe.o.gutierrez
> >> -- https://felipeogutierrez.blogspot.com
> >>
> >> On Sat, Jun 13, 2020 at 10:57 AM Felipe Gutierrez
> >> <felipe.o.gutier...@gmail.com> wrote:
> >> >
> >> > Hi Yun, it logs an INFO message, "class org.joda.time.DateTime cannot
> >> > be used as a POJO type because not all fields are valid POJO fields,
> >> > and must be processed as GenericType. Please read the Flink
> >> > documentation on "Data Types & Serialization" for details of the
> >> > effect on performance", but I cannot submit my job. It is strange
> >> > because I can start and run it in IntelliJ, but not on the standalone
> >> > cluster on my machine.
> >> >
> >> > 2020-06-13 10:50:56,051 INFO  org.apache.flink.core.fs.FileSystem
> >> >                      - Hadoop is not in the classpath/dependencies.
> >> > The extended set of supported File Systems via Hadoop is not
> >> > available.
> >> > 2020-06-13 10:50:56,143 INFO
> >> > org.apache.flink.runtime.security.modules.HadoopModuleFactory  -
> >> > Cannot create Hadoop Security Module because Hadoop cannot be found in
> >> > the Classpath.
> >> > 2020-06-13 10:50:56,164 INFO
> >> > org.apache.flink.runtime.security.modules.JaasModule          - Jaas
> >> > file will be created as /tmp/jaas-837993701496785981.conf.
> >> > 2020-06-13 10:50:56,169 INFO
> >> >
> org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory
> >> >  - Cannot install HadoopSecurityContext because Hadoop cannot be found
> >> > in the Classpath.
> >> > 2020-06-13 10:50:56,169 WARN
> >> > org.apache.flink.runtime.security.SecurityUtils               - Unable
> >> > to install incompatible security context factory
> >> >
> org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory
> >> > 2020-06-13 10:50:56,171 INFO  org.apache.flink.client.cli.CliFrontend
> >> >                      - Running 'run' command.
> >> > 2020-06-13 10:50:56,242 INFO  org.apache.flink.client.cli.CliFrontend
> >> >                      - Building program from JAR file
> >> > 2020-06-13 10:50:57,084 INFO  org.apache.flink.client.ClientUtils
> >> >                      - Starting program (detached: false)
> >> > 2020-06-13 10:50:59,021 INFO
> >> > org.apache.flink.api.java.typeutils.TypeExtractor             - class
> >> > org.joda.time.DateTime does not contain a getter for field iMillis
> >> > 2020-06-13 10:50:59,021 INFO
> >> > org.apache.flink.api.java.typeutils.TypeExtractor             - class
> >> > org.joda.time.DateTime does not contain a setter for field iMillis
> >> > 2020-06-13 10:50:59,021 INFO
> >> > org.apache.flink.api.java.typeutils.TypeExtractor             - Class
> >> > class org.joda.time.DateTime cannot be used as a POJO type because not
> >> > all fields are valid POJO fields, and must be processed as
> >> > GenericType. Please read the Flink documentation on "Data Types &
> >> > Serialization" for details of the effect on performance.
> >> > 2020-06-13 10:50:59,028 INFO
> >> > org.apache.flink.api.java.typeutils.TypeExtractor             - class
> >> > org.joda.time.DateTime does not contain a getter for field iMillis
> >> > 2020-06-13 10:50:59,028 INFO
> >> > org.apache.flink.api.java.typeutils.TypeExtractor             - class
> >> > org.joda.time.DateTime does not contain a setter for field iMillis
> >> > 2020-06-13 10:50:59,028 INFO
> >> > org.apache.flink.api.java.typeutils.TypeExtractor             - Class
> >> > class org.joda.time.DateTime cannot be used as a POJO type because not
> >> > all fields are valid POJO fields, and must be processed as
> >> > GenericType. Please read the Flink documentation on "Data Types &
> >> > Serialization" for details of the effect on performance.
> >> > 2020-06-13 10:53:19,508 WARN  org.apache.flink.util.ExecutorUtils
> >> >                      - ExecutorService did not terminate in time.
> >> > Shutting it down now.
> >> > 2020-06-13 10:53:19,510 ERROR org.apache.flink.client.cli.CliFrontend
> >> >                      - Error while running the command.
> >> > org.apache.flink.client.program.ProgramInvocationException: The main
> >> > method caused an error: java.util.concurrent.ExecutionException:
> >> > org.apache.flink.runtime.client.JobSubmissionException: Failed to
> >> > submit JobGraph.
> >> > at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> >> > at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> >> > at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:148)
> >> > at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662)
> >> > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
> >> > at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)
> >> > at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
> >> > at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> >> > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
> >> > Caused by: java.lang.RuntimeException:
> >> > java.util.concurrent.ExecutionException:
> >> > org.apache.flink.runtime.client.JobSubmissionException: Failed to
> >> > submit JobGraph.
> >> > at
> org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:276)
> >> > at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1764)
> >> > at
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:106)
> >> > at
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:72)
> >> > at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1643)
> >> > at
> org.apache.flink.streaming.examples.aggregate.TaxiRideCountPreAggregate.main(TaxiRideCountPreAggregate.java:136)
> >> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >> > at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >> > at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> > at java.lang.reflect.Method.invoke(Method.java:498)
> >> > at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
> >> > ... 8 more
> >> > Caused by: java.util.concurrent.ExecutionException:
> >> > org.apache.flink.runtime.client.JobSubmissionException: Failed to
> >> > submit JobGraph.
> >> > at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> >> > at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> >> > at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1759)
> >> > ... 17 more
> >> > Caused by: org.apache.flink.runtime.client.JobSubmissionException:
> >> > Failed to submit JobGraph.
> >> > at
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:359)
> >> > at
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
> >> > at
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
> >> > at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> >> > at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
> >> > at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:274)
> >> > at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> >> > at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> >> > at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> >> > at
> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
> >> > at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
> >> > at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
> >> > at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >> > at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >> > at java.lang.Thread.run(Thread.java:748)
> >> > Caused by: org.apache.flink.runtime.rest.util.RestClientException:
> >> > [org.apache.flink.runtime.rest.handler.RestHandlerException: Could not
> >> > upload job files.
> >> > at
> org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:169)
> >> > at
> java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1119)
> >> > at
> java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1084)
> >> > at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> >> > at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
> >> > at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >> > at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >> > at java.lang.Thread.run(Thread.java:748)
> >> > Caused by: org.apache.flink.util.FlinkException: Could not upload job
> files.
> >> > at
> org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:80)
> >> > at
> org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:167)
> >> > ... 7 more
> >> > Caused by: java.io.IOException: Could not connect to BlobServer at
> >> > address localhost/192.168.56.1:35193
> >> > at
> org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:100)
> >> > at
> org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$null$3(JobSubmitHandler.java:167)
> >> > at
> org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:76)
> >> > ... 8 more
> >> > Caused by: java.net.ConnectException: Connection timed out (Connection
> >> > timed out)
> >> > at java.net.PlainSocketImpl.socketConnect(Native Method)
> >> > at
> java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
> >> > at
> java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
> >> > at
> java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
> >> > at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
> >> > at java.net.Socket.connect(Socket.java:607)
> >> > at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:95)
> >> > ... 10 more
> >> > ]
> >> > at
> org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390)
> >> > at
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374)
> >> > at
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
> >> > at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
> >> > ... 4 more
> >> > --
> >> > -- Felipe Gutierrez
> >> > -- skype: felipe.o.gutierrez
> >> > -- https://felipeogutierrez.blogspot.com
> >> >
> >> > On Sat, Jun 13, 2020 at 5:08 AM Yun Gao <yungao...@aliyun.com> wrote:
> >> > >
> >> > > Hi Felipe,
> >> > >
> >> > >    I tested the basic RideCleansingExercise[1] job, which uses the
> >> > >    TaxiRide type, locally and it seems to start up normally.
> >> > >
> >> > >    Could you also share the code you are currently running and the
> >> > >    full stack trace of the exception?
> >> > >
> >> > > Best,
> >> > >  Yun
> >> > >
> >> > >  [1]
> https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/exercises/datastream_java/basics/RideCleansingExercise.java
> >> > >
> >> > > ------------------Original Mail ------------------
> >> > > Sender:Felipe Gutierrez <felipe.o.gutier...@gmail.com>
> >> > > Send Date:Fri Jun 12 23:11:28 2020
> >> > > Recipients:user <user@flink.apache.org>
> >> > > Subject:How to add org.joda.time.DateTime in the
> StreamExecutionEnvironment to the TaxiRide training example?
> >> > >>
> >> > >> Hi,
> >> > >>
> >> > >> I am using the Flink training exercise TaxiRide [1] to run a
> >> > >> streaming count of events. On the cluster and on my local machine I
> >> > >> get the message that Joda-Time cannot be serialized: "class
> >> > >> org.joda.time.LocalDateTime is not a valid POJO type". However, the
> >> > >> job starts on the cluster, but not on my local machine. So I
> >> > >> searched the internet, and the advice is to register the Joda-Time
> >> > >> classes with the environment [2]. I did it like this:
> >> > >>
> >> > >> env.getConfig().registerTypeWithKryoSerializer(DateTime.class,
> >> > >> AvroKryoSerializerUtils.JodaDateTimeSerializer.class);
> >> > >> env.getConfig().registerTypeWithKryoSerializer(LocalDate.class,
> >> > >> AvroKryoSerializerUtils.JodaLocalDateSerializer.class);
> >> > >> env.getConfig().registerTypeWithKryoSerializer(LocalTime.class,
> >> > >> AvroKryoSerializerUtils.JodaLocalTimeSerializer.class);
> >> > >>
> >> > >> and I added the joda-time and flink-avro dependencies to the pom.xml:
> >> > >>
> >> > >> <dependency>
> >> > >> <groupId>joda-time</groupId>
> >> > >> <artifactId>joda-time</artifactId>
> >> > >> </dependency>
> >> > >> <dependency>
> >> > >> <groupId>org.apache.flink</groupId>
> >> > >> <artifactId>flink-avro</artifactId>
> >> > >> <version>${project.version}</version>
> >> > >> </dependency>
> >> > >>
> >> > >> I also tried addDefaultKryoSerializer, but I got the same error.
> >> > >> For some reason, it is still not working. Does anyone have a hint
> >> > >> about what could be happening?
> >> > >>
> >> > >> Thanks! Felipe
> >> > >> [1]
> https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/exercises/datastream_java/datatypes/TaxiRide.java
> >> > >> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/custom_serializers.html
> >> > >>
> >> > >> --
> >> > >> -- Felipe Gutierrez
> >> > >> -- skype: felipe.o.gutierrez
> >> > >> -- https://felipeogutierrez.blogspot.com
>
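
For anyone who hits the same "Could not connect to BlobServer at address
localhost/192.168.56.1:35193" error: the thread traces it to an /etc/hosts
entry, so a quick check is to see which address the JVM resolves for the host
name and whether a TCP connection to the BlobServer port succeeds. Below is a
minimal sketch using only the Java standard library. The class
BlobServerProbe and its methods resolve and canConnect are hypothetical names
made up for this illustration and are not part of Flink. The default port
35193 is simply the one from the log in this thread, so substitute the port
your own cluster reports.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;

// Hypothetical helper (not a Flink class): checks host name resolution
// and plain TCP reachability of a host:port pair.
class BlobServerProbe {

    // Returns the IP address the JVM resolves for the given host name.
    static String resolve(String host) throws UnknownHostException {
        return InetAddress.getByName(host).getHostAddress();
    }

    // Returns true if a TCP connection to host:port succeeds within
    // timeoutMillis, and false on refusal, timeout, or any other I/O error.
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        // Defaults mirror the values in the log above; both are assumptions.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 35193;
        System.out.println(host + " resolves to " + resolve(host));
        System.out.println("reachable: " + canConnect(host, port, 3000));
    }
}
```

After compiling, run it as "java BlobServerProbe localhost 35193". If
localhost resolves to something other than a loopback address, such as the
192.168.56.1 seen in the log, the /etc/hosts file is a likely place to look.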
