Dear Yuxia, I have already sent this message in Slack (#troubleshooting); anyway, I am writing it here again. To understand things better, I was trying to run an existing project from https://apache.googlesource.com/flink-playgrounds/. I was able to run it successfully in Docker, but I still have questions:
1) *First*: In the payment_msg_proccessing.py code
(https://apache.googlesource.com/flink-playgrounds/+/HEAD/pyflink-walkthrough/payment_msg_proccessing.py),
I want to run a simple query on the Kafka stream (the payment_msg table) and do some data processing, without inserting the data into the sink table (es_sink here). (In my project, I won't insert any data.) So, is it possible to run a query (or queries) on sources (streams) *without inserting data into other tables*?

2) *Second*: How can I iterate over the results and print the data to the output? For example, I wrote this simple query:

table_result = t_env.execute_sql("select provinceId, payAmount from payment_msg")

and then:

with table_result.collect() as results:
    for result in results:
        print(result)

but this code does not work. How can I iterate over table_result and extract all the columns? I also tried the print() function, but it raised an exception:

t_env.execute_sql("select payAmount from payment_msg").print()

I got this error:

py4j.protocol.Py4JJavaError: An error occurred while calling o157.print.
: java.lang.RuntimeException: Failed to fetch next result
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
    at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222)
    at org.apache.flink.table.utils.print.TableauStyle.print(TableauStyle.java:120)
    at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:153)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.IOException: Failed to fetch job execution result
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:184)
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121)
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
    ... 15 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: deb1b7e20b9919964146f2feaa3ea507)
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:182)
    ... 17 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: deb1b7e20b9919964146f2feaa3ea507)
    at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:130)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:301)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$31(RestClusterClient.java:772)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:301)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:128)
    ... 24 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:256)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:247)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:240)
    at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:738)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:715)
    at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1622)
    at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1137)
    at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1077)
    at org.apache.flink.runtime.executiongraph.Execution.markFailed(Execution.java:916)
    at org.apache.flink.runtime.scheduler.DefaultExecutionOperations.markFailed(DefaultExecutionOperations.java:43)
    at org.apache.flink.runtime.scheduler.DefaultExecutionDeployer.handleTaskDeploymentFailure(DefaultExecutionDeployer.java:327)
    at org.apache.flink.runtime.scheduler.DefaultExecutionDeployer.lambda$assignAllResourcesAndRegisterProducedPartitions$2(DefaultExecutionDeployer.java:170)
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
    at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
    at org.apache.flink.runtime.jobmaster.slotpool.PendingRequest.failRequest(PendingRequest.java:88)
    at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.cancelPendingRequests(DeclarativeSlotPoolBridge.java:185)
    at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.failPendingRequests(DeclarativeSlotPoolBridge.java:408)
    at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.notifyNotEnoughResourcesAvailable(DeclarativeSlotPoolBridge.java:396)
    at org.apache.flink.runtime.jobmaster.JobMaster.notifyNotEnoughResourcesAvailable(JobMaster.java:889)
    at sun.reflect.GeneratedMethodAccessor51.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$0(AkkaRpcActor.java:301)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:300)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
    at akka.actor.ActorCell.invoke(ActorCell.scala:548)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
    at org.apache.flink.runtime.scheduler.DefaultExecutionDeployer.lambda$assignResource$4(DefaultExecutionDeployer.java:227)
    ... 38 more
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
    ... 36 more
Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
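For reference, here is a minimal sketch of what question 2 is after: iterating a SELECT result on the client without defining any sink table. This is a sketch under assumptions, not the walkthrough's own code: it assumes PyFlink 1.16 and that the payment_msg Kafka table from payment_msg_proccessing.py has already been registered. (Note that the final "Caused by" above is a NoResourceAvailableException, which typically means the cluster had no free task slots for the collect job; that is a cluster-resource problem, not a problem with the collect()/print() API usage itself.)

from pyflink.table import EnvironmentSettings, TableEnvironment

# Minimal sketch (assumes the payment_msg table from the pyflink-walkthrough
# is registered below via its CREATE TABLE DDL).
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)

# ... t_env.execute_sql("CREATE TABLE payment_msg (...) WITH ('connector' = 'kafka', ...)") ...

# A SELECT needs no sink table; the rows are streamed back to the client.
table_result = t_env.execute_sql("SELECT provinceId, payAmount FROM payment_msg")

# collect() returns a closeable iterator of Row objects. Closing it cancels
# the underlying streaming job, which would otherwise run forever on an
# unbounded Kafka source.
with table_result.collect() as results:
    for row in results:
        print(row[0], row[1])  # access fields by position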
Hi Amir

org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:846)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:240)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1090)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1168)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1168)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130)
    ... 13 more

Thanks a lot,
Best,
Amir

On Tue, Feb 7, 2023 at 10:48 AM Amir Hossein Sharifzadeh <amirsharifza...@gmail.com> wrote:

> Traceback (most recent call last):
>   File "/Users/amir/PycharmProjects/VariMat/org/varimat/model/test/sample2.py", line 59, in <module>
>     log_processing()
>   File "/Users/amir/PycharmProjects/VariMat/org/varimat/model/test/sample2.py", line 34, in log_processing
>     table_result = t_env.execute_sql("select raw_id, raw_data from raw_table")
>   File "/Users/amir/.local/share/virtualenvs/VariMat-dmrWdVEG/lib/python3.9/site-packages/pyflink/table/table_environment.py", line 836, in execute_sql
>     return TableResult(self._j_tenv.executeSql(stmt))
>   File "/Users/amir/.local/share/virtualenvs/VariMat-dmrWdVEG/lib/python3.9/site-packages/py4j/java_gateway.py", line 1321, in __call__
>     return_value = get_return_value(
>   File "/Users/amir/.local/share/virtualenvs/VariMat-dmrWdVEG/lib/python3.9/site-packages/pyflink/util/exceptions.py", line 158, in deco
>     raise java_exception
> pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Failed to execute sql
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:903)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1382)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>     at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>     at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>     at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>     at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>     at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>     at java.lang.Thread.run(Thread.java:750)
> Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'collect'.
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2203)
>     at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95)
>     at org.apache.flink.table.executor.python.ChainingOptimizingExecutor.executeAsync(ChainingOptimizingExecutor.java:73)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:884)
>     ... 13 more
> Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
>     at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>     at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
>     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>     at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>     at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
>     at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>     at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
>     at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
>     at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
> Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
>     at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:750)
> Caused by: java.util.concurrent.CompletionException: java.lang.RuntimeException: org.apache.flink.runtime.JobException: Cannot instantiate the coordinator for operator Source: raw_table[1]
>     at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
>     at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
>     ... 3 more
> Caused by: java.lang.RuntimeException: org.apache.flink.runtime.JobException: Cannot instantiate the coordinator for operator Source: raw_table[1]
>     at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>     at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114)
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>     ... 3 more
> Caused by: org.apache.flink.runtime.JobException: Cannot instantiate the coordinator for operator Source: raw_table[1]
>     at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:229)
>     at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:901)
>     at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:891)
>     at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:848)
>     at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:830)
>     at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:203)
>     at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:156)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:361)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:206)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:134)
>     at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:152)
>     at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:119)
>     at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:369)
>     at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:346)
>     at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:123)
>     at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
>     at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
>     ... 4 more
> Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.kafka.clients.consumer.OffsetResetStrategy to field org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer.offsetResetStrategy of type org.apache.kafka.clients.consumer.OffsetResetStrategy in instance of org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer
>     at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2302)
>     at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1432)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2460)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2378)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2236)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1692)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2454)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2378)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2236)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1692)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2454)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2378)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2236)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1692)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
>     at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:67)
>     at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:488)
>     at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286)
>     at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223)
>     ... 20 more
>
> Process finished with exit code 1
>
> On Mon, Feb 6, 2023 at 11:05 PM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
>
>> Hi, could you please share the root cause with us?
>> It seems the error message you posted doesn't contain the root cause. Maybe you can post the full error message.
>>
>> Best regards,
>> Yuxia
>>
>> ------------------------------
>> From: "Amir Hossein Sharifzadeh" <amirsharifza...@gmail.com>
>> To: "yuxia" <luoyu...@alumni.sjtu.edu.cn>
>> Cc: "dev" <dev@flink.apache.org>
>> Sent: Tuesday, February 7, 2023, 10:39:25 AM
>> Subject: Re: Need help how to use Table API to join two Kafka streams
>>
>> Thank you for your reply. I tried it with a sample stream, but it did not work. I am trying to get the results from my producer here with a very simple query. I want to see the results in the console/output.
>> This is my code:
>>
>> // Docker: docker-compose.yml
>>
>> version: '2'
>> services:
>>   zookeeper:
>>     image: confluentinc/cp-zookeeper:6.1.1
>>     hostname: zookeeper
>>     container_name: zookeeper
>>     ports:
>>       - "2181:2181"
>>     environment:
>>       ZOOKEEPER_CLIENT_PORT: 2181
>>       ZOOKEEPER_TICK_TIME: 2000
>>
>>   broker:
>>     image: confluentinc/cp-kafka:6.1.1
>>     hostname: broker
>>     container_name: broker
>>     depends_on:
>>       - zookeeper
>>     ports:
>>       - "29092:29092"
>>       - "9092:9092"
>>       - "9101:9101"
>>     environment:
>>       KAFKA_BROKER_ID: 1
>>       KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
>>       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
>>       KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
>>       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
>>       KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
>>       KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
>>       KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
>>
>> # Producer
>>
>> import json
>> import sys
>>
>> from kafka import KafkaProducer
>>
>> KAFKA_SERVER = "127.0.0.1:9092"
>>
>> def serializer(dictionary):
>>     try:
>>         message = json.dumps(dictionary)
>>     except Exception as e:
>>         sys.stderr.write(str(e) + '\n')
>>         message = str(dictionary)
>>     return message.encode('utf8')
>>
>> def create_sample_empad_json(raw_id):
>>     return {'raw_id': int(raw_id), 'raw_data': str(int(raw_id) + 7)}
>>
>> def do_produce():
>>     producer = KafkaProducer(bootstrap_servers=KAFKA_SERVER, value_serializer=serializer)
>>     for raw_id in range(1, 10):
>>         empad_json = create_sample_empad_json(raw_id)
>>         producer.send('EMPAD', empad_json)
>>     producer.flush()
>>
>> if __name__ == '__main__':
>>     do_produce()
>>
>> # Flink
>>
>> from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
>> from pyflink.table import EnvironmentSettings
>> from pyflink.table.table_environment import StreamTableEnvironment
>>
>> def data_processing():
>>     env = StreamExecutionEnvironment.get_execution_environment()
>>     env.add_jars("file:///Users/amir/empad_jar/kafka-clients-3.3.2.jar")
>>     env.add_jars("file:///Users/amir/empad_jar/flink-connector-kafka-1.16.1.jar")
>>     env.add_jars("file:///Users/amir/empad_jar/flink-sql-connector-kafka-1.16.1.jar")
>>
>>     settings = EnvironmentSettings.new_instance() \
>>         .in_streaming_mode() \
>>         .build()
>>
>>     t_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=settings)
>>
>>     t1 = """
>>         CREATE TEMPORARY TABLE raw_table(
>>             raw_id INT,
>>             raw_data STRING
>>         ) WITH (
>>             'connector' = 'kafka',
>>             'topic' = 'EMPAD',
>>             'properties.bootstrap.servers' = 'localhost:9092',
>>             'properties.group.id' = 'MY_GRP',
>>             'scan.startup.mode' = 'latest-offset',
>>             'format' = 'json'
>>         )
>>     """
>>
>>     t_env.execute_sql(t1)
>>
>>     table_result = t_env.execute_sql("select raw_id, raw_data from raw_table")
>>
>>     with table_result.collect() as results:
>>         for result in results:
>>             print(result)
>>
>> if __name__ == '__main__':
>>     data_processing()
>>
>> and I am getting this error message:
>>
>> pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Failed to execute sql
>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:903)
>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1382)
>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>     at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>     at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>>     at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>     at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>>     at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>>     at java.lang.Thread.run(Thread.java:750)
>> Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'collect'.
>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2203)
>>     at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95)
>>     at org.apache.flink.table.executor.python.ChainingOptimizingExecutor.executeAsync(ChainingOptimizingExecutor.java:73)
>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:884)
>>     ... 13 more
>>
>> Best,
>>
>> Amir
>>
>> On Sun, Feb 5, 2023 at 8:20 PM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
>>
>>> Hi, thanks for reaching out to me.
>>> For your question: no, you don't need to consume the data in your consumer class separately and then insert it into those tables. The data will be consumed by what we implemented here.
>>>
>>> Best regards,
>>> Yuxia
>>>
>>> ------------------------------
>>> From: "Amir Hossein Sharifzadeh" <amirsharifza...@gmail.com>
>>> To: luoyu...@alumni.sjtu.edu.cn
>>> Sent: Sunday, February 5, 2023, 6:07:02 AM
>>> Subject: Re: Need help how to use Table API to join two Kafka streams
>>>
>>> Dear Yuxia, dev@flink.apache.org
>>> Thank you again for your help. I am implementing the code in Python, but I still have some confusion about my application.
>>> As I mentioned before, I am sending two simple messages (JSON) on two different topics.
>>> This is my Kafka producer class:
>>>
>>> import json
>>> import sys
>>>
>>> from kafka import KafkaProducer
>>>
>>> KAFKA_SERVER = 'localhost:9092'
>>>
>>> def serializer(dictionary):
>>>     try:
>>>         message = json.dumps(dictionary)
>>>     except Exception as e:
>>>         sys.stderr.write(str(e) + '\n')
>>>         message = str(dictionary)
>>>     return message.encode('utf8')
>>>
>>> def create_sample_json(row_id):
>>>     return {'row_id': int(row_id), 'row_data': str(int(row_id) + 7)}
>>>
>>> def do_produce(topic_name):
>>>     producer = KafkaProducer(bootstrap_servers=KAFKA_SERVER, value_serializer=serializer)
>>>     for row_id in range(1, 10):
>>>         my_data = create_sample_json(row_id)
>>>         producer.send(topic_name, my_data)
>>>     producer.flush()
>>>
>>> if __name__ == '__main__':
>>>     do_produce('topic1')
>>>     do_produce('topic2')
>>>
>>> ==================================================================================
>>>
>>> As you helped me, this is my Flink consumer, with which I want to consume the data from the producer and run queries on it:
>>>
>>> from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
>>> from pyflink.table import EnvironmentSettings
>>> from pyflink.table.table_environment import StreamTableEnvironment
>>>
>>> KAFKA_SERVERS = 'localhost:9092'
>>>
>>> def log_processing():
>>>     env = StreamExecutionEnvironment.get_execution_environment()
>>>     env.add_jars("file:///Users/amir/empad_jar/kafka-clients-3.3.2.jar")
>>>     env.add_jars("file:///Users/amir/empad_jar/flink-connector-kafka-1.16.1.jar")
>>>     env.add_jars("file:///Users/amir/empad_jar/flink-sql-connector-kafka-1.16.1.jar")
>>>
>>>     settings = EnvironmentSettings.new_instance() \
>>>         .in_streaming_mode() \
>>>         .build()
>>>
>>>     t_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=settings)
>>>
>>>     t1 = f"""
>>>         CREATE TEMPORARY TABLE table1(
>>>             row_id INT,
>>>             row_data STRING
>>>         ) WITH (
>>>             'connector' = 'kafka',
>>>             'topic' = 'topic1',
>>>             'properties.bootstrap.servers' = '{KAFKA_SERVERS}',
>>>             'properties.group.id' = 'MY_GRP',
>>>             'scan.startup.mode' = 'latest-offset',
>>>             'format' = 'json'
>>>         )
>>>     """
>>>
>>>     t2 = f"""
>>>         CREATE TEMPORARY TABLE table2(
>>>             row_id INT,
>>>             row_data STRING
>>>         ) WITH (
>>>             'connector' = 'kafka',
>>>             'topic' = 'topic2',
>>>             'properties.bootstrap.servers' = '{KAFKA_SERVERS}',
>>>             'properties.group.id' = 'MY_GRP',
>>>             'scan.startup.mode' = 'latest-offset',
>>>             'format' = 'json'
>>>         )
>>>     """
>>>
>>>     t_env.execute_sql(t1)
>>>     t_env.execute_sql(t2)
>>>
>>>     t3 = t_env.sql_query("SELECT row_id, row_data AS my_raw_data FROM table2")
>>>
>>>     # please tell me what I should do next
>>>
>>> Questions:
>>>
>>> 1) Do I need to consume the data in my consumer class separately and then insert it into those tables, or will the data be consumed through what we implemented here (as I passed the name of the connector, topic, bootstrap.servers, etc.)?
>>>
>>> 2) If so:
>>>
>>>     2.1) How can I join those streams in Python?
>>>
>>>     2.2) How can I avoid reprocessing the previous data, as my procedure will send thousands of messages on each topic? I want to make sure not to run duplicate queries.
>>>
>>> 3) If not, what should I do?
>>>
>>> Thank you very much.
>>>
>>> Amir
>>>
>>> On Fri, Feb 3, 2023 at 5:45 AM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
>>>
>>>> Hi, Amir.
>>>> It may look like the following Scala code:
>>>>
>>>> tableEnv.executeSql("CREATE TEMPORARY TABLE s1 (id int, ssn string) WITH ('connector' = 'kafka', ...)")
>>>> tableEnv.executeSql("CREATE TEMPORARY TABLE s2 (id int, ssn string) WITH ('connector' = 'kafka', ...)")
>>>> val t1 = tableEnv.from("s1")
>>>>
>>>> // You will need to rename the field before joining; otherwise it will fail with
>>>> // "org.apache.flink.table.api.ValidationException: Ambiguous column name: ssn".
>>>> val t3 = tableEnv.sqlQuery("SELECT id, ssn AS ssn1 FROM s2")
>>>> val result = t1.join(t3).where($"ssn" === $"ssn1")
>>>>
>>>> Also, you can refer here for more detail [1].
>>>> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tableapi/#joins
>>>>
>>>> Best regards,
>>>> Yuxia
>>>>
>>>> ----- Original Message -----
>>>> From: "Amir Hossein Sharifzadeh" <amirsharifza...@gmail.com>
>>>> To: "dev" <dev@flink.apache.org>
>>>> Sent: Friday, February 3, 2023, 4:45:08 AM
>>>> Subject: Need help how to use Table API to join two Kafka streams
>>>>
>>>> Hello,
>>>>
>>>> I have a Kafka producer and a Kafka consumer that produce and consume multiple data records, respectively. You can think of two data sets here. Both datasets have a similar structure but carry different data.
>>>>
>>>> I want to implement the Table API to join two Kafka streams while I consume them. For example, data1.ssn == data2.ssn.
>>>>
>>>> Constraints:
>>>> I don't want to change my producer or use FlinkKafkaProducer.
>>>>
>>>> Thank you very much.
>>>>
>>>> Best,
>>>> Amir
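For completeness, here is a hedged PyFlink counterpart to the Scala sketch above, since the rest of the thread is in Python. It is a sketch under assumptions, not code from the thread's setup: it assumes the table1/table2 DDLs from the earlier message have already been executed on t_env, and it joins on row_id (standing in for the ssn field in the Scala example).

# Minimal PyFlink sketch of the same join, expressed in SQL.
# Assumes table1 and table2 (row_id INT, row_data STRING) are already
# registered on t_env, as in the earlier message.
joined = t_env.sql_query("""
    SELECT t1.row_id, t1.row_data, t2.row_data AS row_data2
    FROM table1 AS t1
    JOIN table2 AS t2
    ON t1.row_id = t2.row_id
""")

# Iterate the joined rows on the client; closing the iterator cancels the job.
with joined.execute().collect() as results:
    for row in results:
        print(row)

One design note: a regular join over two unbounded Kafka streams must keep all previously seen rows in state so that late matches can still be produced. For long-running jobs it is worth bounding that state, for example with the table.exec.state.ttl option or with an interval join on a time attribute.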