Hi, I'd like to ask for help regarding the java exception: Caused by: java.lang.ClassCastException: class java.sql.Timestamp cannot be cast to class java.time.LocalDateTime (java.sql.Timestamp is in module java.sql of loader 'platform'; java.time.LocalDateTime is in module java.base of loader 'bootstrap')
Full backtrace: WARNING: An illegal reflective access operation has occurred WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/opt/venv/lib/python3.8/site-packages/pyflink/lib/flink-dist-1.15.0.jar) to field java.lang.String.value WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations WARNING: All illegal access operations will be denied in a future release +------+------------------------+-------+-----+--------+--------------------+ | name | type | null | key | extras | watermark | +------+------------------------+-------+-----+--------+--------------------+ | f0 | CHAR(36) | FALSE | | | | | f1 | BIGINT | FALSE | | | | | f2 | FLOAT | TRUE | | | | | f3 | FLOAT | TRUE | | | | | f4 | FLOAT | TRUE | | | | | f5 | VARCHAR(64) | TRUE | | | | | f6 | TIMESTAMP(3) *ROWTIME* | FALSE | | | SOURCE_WATERMARK() | +------+------------------------+-------+-----+--------+--------------------+ 7 rows in set +------------+----------------------------------------------------------------------+-------+-----+--------+-----------+ | name | type | null | key | extras | watermark | +------------+----------------------------------------------------------------------+-------+-----+--------+-----------+ | f0 | CHAR(36) | FALSE | | | | | start_ts | BIGINT | FALSE | | | | | end_ts | BIGINT | FALSE | | | | | trajectory | ARRAY<ROW<`timestamp_unix` BIGINT, `x` FLOAT, `y` FLOAT, `z` FLOAT>> | TRUE | | | | +------------+----------------------------------------------------------------------+-------+-----+--------+-----------+ 4 rows in set Exception in thread read_grpc_client_inputs: Traceback (most recent call last): File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner self.run() File "/usr/local/lib/python3.8/threading.py", line 870, in run self._target(*self._args, **self._kwargs) File "/opt/venv/lib/python3.8/site-packages/apache_beam/runners/worker/data_plane.py", line 598, in <lambda> target=lambda: self._read_inputs(elements_iterator), File "/opt/venv/lib/python3.8/site-packages/apache_beam/runners/worker/data_plane.py", line 581, in _read_inputs for elements in elements_iterator: File "/opt/venv/lib/python3.8/site-packages/grpc/_channel.py", line 426, in __next__ return self._next() File "/opt/venv/lib/python3.8/site-packages/grpc/_channel.py", line 826, in _next raise self grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with: status = StatusCode.CANCELLED details = "Multiplexer hanging up" debug_error_string = "{"created":"@1654778704.584603399","description":"Error received from peer ipv4:127.0.0.1:43123","file":"src/core/lib/surface/call.cc","file_line":1069,"grpc_message":"Multiplexer hanging up","grpc_status":1}" > Exception in thread read_grpc_client_inputs: Traceback (most recent call last): File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner self.run() File "/usr/local/lib/python3.8/threading.py", line 870, in run self._target(*self._args, **self._kwargs) File "/opt/venv/lib/python3.8/site-packages/apache_beam/runners/worker/data_plane.py", line 598, in <lambda> target=lambda: self._read_inputs(elements_iterator), File "/opt/venv/lib/python3.8/site-packages/apache_beam/runners/worker/data_plane.py", line 581, in _read_inputs for elements in elements_iterator: File "/opt/venv/lib/python3.8/site-packages/grpc/_channel.py", line 426, in __next__ return self._next() File "/opt/venv/lib/python3.8/site-packages/grpc/_channel.py", line 826, in _next raise self grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with: status = StatusCode.CANCELLED details = "Multiplexer hanging up" debug_error_string = "{"created":"@1654778704.633462921","description":"Error received from peer ipv4:127.0.0.1:41365","file":"src/core/lib/surface/call.cc","file_line":1069,"grpc_message":"Multiplexer hanging up","grpc_status":1}" > Traceback (most recent call last): File "/usr/local/lib/python3.8/runpy.py", line 194, in _run_module_as_main return _run_code(code, main_globals, None, File "/usr/local/lib/python3.8/runpy.py", line 87, in _run_code exec(code, run_globals) File "/app/trajectory_maker/__main__.py", line 29, in <module> loop.run_until_complete(main()) File "/usr/local/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete return future.result() File "/app/trajectory_maker/__main__.py", line 24, in main raise ex File "/app/trajectory_maker/__main__.py", line 20, in main job_res = exec_env.execute(job_name) File "/opt/venv/lib/python3.8/site-packages/pyflink/datastream/stream_execution_environment.py", line 761, in execute return JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph)) File "/opt/venv/lib/python3.8/site-packages/py4j/java_gateway.py", line 1321, in __call__ return_value = get_return_value( File "/opt/venv/lib/python3.8/site-packages/pyflink/util/exceptions.py", line 146, in deco return f(*a, **kw) File "/opt/venv/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value raise Py4JJavaError( py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute. : org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141) at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:259) at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92) at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47) at akka.dispatch.OnComplete.internal(Future.scala:300) at akka.dispatch.OnComplete.internal(Future.scala:297) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284) at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621) at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24) at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23) at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532) at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29) at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63) at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12) at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81) at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020) at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:301) at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:291) at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:282) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:739) at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78) at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:443) at jdk.internal.reflect.GeneratedMethodAccessor51.invoke(Unknown Source) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:304) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:302) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at akka.actor.Actor.aroundReceive(Actor.scala:537) at akka.actor.Actor.aroundReceive$(Actor.scala:535) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) at akka.actor.ActorCell.invoke(ActorCell.scala:548) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) at akka.dispatch.Mailbox.run(Mailbox.scala:231) at akka.dispatch.Mailbox.exec(Mailbox.scala:243) ... 5 more Caused by: org.apache.flink.runtime.taskmanager.AsynchronousException: Caught exception while processing timer. at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1535) at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1510) at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1650) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$21(StreamTask.java:1639) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: TimerException{org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator} ... 14 more Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) at org.apache.flink.streaming.api.operators.python.collector.RunnerOutputCollector.collect(RunnerOutputCollector.java:47) at org.apache.flink.streaming.api.operators.python.AbstractTwoInputPythonFunctionOperator.emitResult(AbstractTwoInputPythonFunctionOperator.java:121) at org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator.invokeFinishBundle(AbstractExternalPythonFunctionOperator.java:99) at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:274) at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:114) at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1648) ... 13 more *Caused by: org.apache.flink.util.FlinkRuntimeException: Error during input conversion from external DataStream API to internal Table API data structures. Make sure that the provided data types that configure the converters are correctly declared in the schema. Affected record:(00000000-0000-0000-0000-000000000001,1652186037,10.0,15.0,0.0,,2022-05-10 12:33:57.0)* at org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:95) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ... 24 more *Caused by: java.lang.ClassCastException: class java.sql.Timestamp cannot be cast to class java.time.LocalDateTime (java.sql.Timestamp is in module java.sql of loader 'platform'; java.time.LocalDateTime is in module java.base of loader 'bootstrap')* at org$apache$flink$api$java$tuple$Tuple7$2$Converter.toInternal(Unknown Source) at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:96) at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:46) at org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61) at org.apache.flink.table.runtime.connector.source.DataStructureConverterWrapper.toInternal(DataStructureConverterWrapper.java:51) at org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:92) ... 25 more make: *** [Makefile:69: run-dev] Error 1 ------------------------- * I run a match_recognize query on the 1st table creating the 2nd view using f6 as the order_by. * The 1st table is made from a stream whose elements are returned by a .key_by(..).process(MyProcessor()). * MyProcessor for f6 is using: datetime.utcfromtimestamp(self._state.my_event_ts()) * I am using *pyflink 1.15.0*. I am not sure if i do something wrong or if this is a bug in (py)flink. -- Sincerely, Mark