Hi,

I'd like to ask for help with the following Java exception:
Caused by: java.lang.ClassCastException: class java.sql.Timestamp cannot be
cast to class java.time.LocalDateTime (java.sql.Timestamp is in module
java.sql of loader 'platform'; java.time.LocalDateTime is in module
java.base of loader 'bootstrap')

Full backtrace:

WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by
org.apache.flink.api.java.ClosureCleaner
(file:/opt/venv/lib/python3.8/site-packages/pyflink/lib/flink-dist-1.15.0.jar)
to field java.lang.String.value
WARNING: Please consider reporting this to the maintainers of
org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal
reflective access operations
WARNING: All illegal access operations will be denied in a future release
+------+------------------------+-------+-----+--------+--------------------+
| name |                   type |  null | key | extras |          watermark |
+------+------------------------+-------+-----+--------+--------------------+
|   f0 |               CHAR(36) | FALSE |     |        |                    |
|   f1 |                 BIGINT | FALSE |     |        |                    |
|   f2 |                  FLOAT |  TRUE |     |        |                    |
|   f3 |                  FLOAT |  TRUE |     |        |                    |
|   f4 |                  FLOAT |  TRUE |     |        |                    |
|   f5 |            VARCHAR(64) |  TRUE |     |        |                    |
|   f6 | TIMESTAMP(3) *ROWTIME* | FALSE |     |        | SOURCE_WATERMARK() |
+------+------------------------+-------+-----+--------+--------------------+
7 rows in set
+------------+----------------------------------------------------------------------+-------+-----+--------+-----------+
|       name |                                                                 type |  null | key | extras | watermark |
+------------+----------------------------------------------------------------------+-------+-----+--------+-----------+
|         f0 |                                                             CHAR(36) | FALSE |     |        |           |
|   start_ts |                                                               BIGINT | FALSE |     |        |           |
|     end_ts |                                                               BIGINT | FALSE |     |        |           |
| trajectory | ARRAY<ROW<`timestamp_unix` BIGINT, `x` FLOAT, `y` FLOAT, `z` FLOAT>> |  TRUE |     |        |           |
+------------+----------------------------------------------------------------------+-------+-----+--------+-----------+
4 rows in set
Exception in thread read_grpc_client_inputs:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/threading.py", line 932, in
_bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File
"/opt/venv/lib/python3.8/site-packages/apache_beam/runners/worker/data_plane.py",
line 598, in <lambda>
    target=lambda: self._read_inputs(elements_iterator),
  File
"/opt/venv/lib/python3.8/site-packages/apache_beam/runners/worker/data_plane.py",
line 581, in _read_inputs
    for elements in elements_iterator:
  File "/opt/venv/lib/python3.8/site-packages/grpc/_channel.py", line 426,
in __next__
    return self._next()
  File "/opt/venv/lib/python3.8/site-packages/grpc/_channel.py", line 826,
in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC
that terminated with:
status = StatusCode.CANCELLED
details = "Multiplexer hanging up"
debug_error_string =
"{"created":"@1654778704.584603399","description":"Error received from peer
ipv4:127.0.0.1:43123","file":"src/core/lib/surface/call.cc","file_line":1069,"grpc_message":"Multiplexer
hanging up","grpc_status":1}"
>
Exception in thread read_grpc_client_inputs:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/threading.py", line 932, in
_bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File
"/opt/venv/lib/python3.8/site-packages/apache_beam/runners/worker/data_plane.py",
line 598, in <lambda>
    target=lambda: self._read_inputs(elements_iterator),
  File
"/opt/venv/lib/python3.8/site-packages/apache_beam/runners/worker/data_plane.py",
line 581, in _read_inputs
    for elements in elements_iterator:
  File "/opt/venv/lib/python3.8/site-packages/grpc/_channel.py", line 426,
in __next__
    return self._next()
  File "/opt/venv/lib/python3.8/site-packages/grpc/_channel.py", line 826,
in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC
that terminated with:
status = StatusCode.CANCELLED
details = "Multiplexer hanging up"
debug_error_string =
"{"created":"@1654778704.633462921","description":"Error received from peer
ipv4:127.0.0.1:41365","file":"src/core/lib/surface/call.cc","file_line":1069,"grpc_message":"Multiplexer
hanging up","grpc_status":1}"
>
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/runpy.py", line 194, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/local/lib/python3.8/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/app/trajectory_maker/__main__.py", line 29, in <module>
    loop.run_until_complete(main())
  File "/usr/local/lib/python3.8/asyncio/base_events.py", line 616, in
run_until_complete
    return future.result()
  File "/app/trajectory_maker/__main__.py", line 24, in main
    raise ex
  File "/app/trajectory_maker/__main__.py", line 20, in main
    job_res = exec_env.execute(job_name)
  File
"/opt/venv/lib/python3.8/site-packages/pyflink/datastream/stream_execution_environment.py",
line 761, in execute
    return
JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))
  File "/opt/venv/lib/python3.8/site-packages/py4j/java_gateway.py", line
1321, in __call__
    return_value = get_return_value(
  File "/opt/venv/lib/python3.8/site-packages/pyflink/util/exceptions.py",
line 146, in deco
    return f(*a, **kw)
  File "/opt/venv/lib/python3.8/site-packages/py4j/protocol.py", line 326,
in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute.
: org.apache.flink.runtime.client.JobExecutionException: Job execution
failed.
at
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
at
java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
at
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at
java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
at
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:259)
at
java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at
java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
at
org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389)
at
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
at
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
at
java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at
java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
at
org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
at akka.dispatch.OnComplete.internal(Future.scala:300)
at akka.dispatch.OnComplete.internal(Future.scala:297)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at
org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
at
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
at
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
at
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
at
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
at
akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
at
akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at
java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at
java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy
at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:301)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:291)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:282)
at
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:739)
at
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78)
at
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:443)
at jdk.internal.reflect.GeneratedMethodAccessor51.invoke(Unknown Source)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:304)
at
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:302)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
... 5 more
Caused by: org.apache.flink.runtime.taskmanager.AsynchronousException:
Caught exception while processing timer.
at
org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1535)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1510)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1650)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$21(StreamTask.java:1639)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by:
TimerException{org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator}
... 14 more
Caused by:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at
org.apache.flink.streaming.api.operators.python.collector.RunnerOutputCollector.collect(RunnerOutputCollector.java:47)
at
org.apache.flink.streaming.api.operators.python.AbstractTwoInputPythonFunctionOperator.emitResult(AbstractTwoInputPythonFunctionOperator.java:121)
at
org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator.invokeFinishBundle(AbstractExternalPythonFunctionOperator.java:99)
at
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:274)
at
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:114)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1648)
... 13 more

*Caused by: org.apache.flink.util.FlinkRuntimeException: Error during input conversion from external DataStream API to internal Table API data structures. Make sure that the provided data types that configure the converters are correctly declared in the schema. Affected record:(00000000-0000-0000-0000-000000000001,1652186037,10.0,15.0,0.0,,2022-05-10 12:33:57.0)*
at org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:95)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
... 24 more
*Caused by: java.lang.ClassCastException: class java.sql.Timestamp cannot be cast to class java.time.LocalDateTime (java.sql.Timestamp is in module java.sql of loader 'platform'; java.time.LocalDateTime is in module java.base of loader 'bootstrap')*
at org$apache$flink$api$java$tuple$Tuple7$2$Converter.toInternal(Unknown Source)
at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:96)
at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:46)
at org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
at org.apache.flink.table.runtime.connector.source.DataStructureConverterWrapper.toInternal(DataStructureConverterWrapper.java:51)
at org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:92)
... 25 more

make: *** [Makefile:69: run-dev] Error 1

-------------------------


* I run a MATCH_RECOGNIZE query on the 1st table, which creates the 2nd
view, using f6 as the ORDER BY column.
* The 1st table is created from a stream whose elements are emitted by
.key_by(..).process(MyProcessor()).
* MyProcessor computes the f6 value with:
datetime.utcfromtimestamp(self._state.my_event_ts())

* I am using *pyflink 1.15.0*.
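
To illustrate the f6 point above, here is a minimal sketch, independent of
Flink, of what that call produces. The literal 1652186037 is the f1 value from
the affected record and only stands in for self._state.my_event_ts(); that
substitution and the variable names are assumptions for illustration.

```python
from datetime import datetime

# Assumption: f1 of the affected record stands in for self._state.my_event_ts().
event_ts = 1652186037

# Same call as in MyProcessor: yields a naive (timezone-unaware) datetime in UTC.
f6 = datetime.utcfromtimestamp(event_ts)

print(f6)         # 2022-05-10 12:33:57
print(f6.tzinfo)  # None
```

The printed value matches the f6 field of the affected record
(2022-05-10 12:33:57.0). The sketch says nothing about how that value is
converted on the Java side.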

I am not sure whether I am doing something wrong or whether this is a bug in (Py)Flink.

-- 

Sincerely,
Mark
