Dear Yuxia,

I have already sent this message on Slack (#troubleshooting). Anyway, I am
writing it here again:
To understand things better, I tried running an existing project from
https://apache.googlesource.com/flink-playgrounds/. I was able to run it
successfully with Docker, but I still have some questions:

1) *First*: In the *payment_msg_proccessing.py* code
(https://apache.googlesource.com/flink-playgrounds/+/HEAD/pyflink-walkthrough/payment_msg_proccessing.py),
I want to run a simple query on the Kafka stream (the payment_msg table)
and do some data processing, without inserting data into the sink table
(es_sink here). (In my project, I won't insert any data.) So, is it
possible to run a query (or queries) on sources (streams) *without
inserting data into other tables*?
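
To make the question concrete, here is roughly the pattern I am hoping
exists (just a sketch; the table and column names are from the playground
script, and whether this is supported is exactly my question):

table = t_env.sql_query("SELECT provinceId, payAmount FROM payment_msg")
# iterate locally instead of writing to the es_sink table
with table.execute().collect() as results:
    for row in results:
        ...  # my own data processing here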

2) *Second*: How can I iterate over the results and print the data in the
output? For example, I wrote this simple query:

table_result = t_env.execute_sql("select provinceId, payAmount from payment_msg")

and then:

with table_result.collect() as results:
    for result in results:
        print(result)

but this code does not work. How can I iterate over table_result and
extract all columns?
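
What I would expect to work is something like this sketch (assuming each
result is a pyflink Row; the positional indexing is my guess):

with table_result.collect() as results:
    for row in results:
        # row[0] -> provinceId, row[1] -> payAmount (if positional access is right)
        province_id, pay_amount = row[0], row[1]
        print(province_id, pay_amount)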

Also, I tried the print() function, but it raised an exception as well:

t_env.execute_sql("select payAmount from payment_msg").print()

I got this error:

py4j.protocol.Py4JJavaError: An error occurred while calling o157.print.
: java.lang.RuntimeException: Failed to fetch next result
        at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
        at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
        at 
org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222)
        at 
org.apache.flink.table.utils.print.TableauStyle.print(TableauStyle.java:120)
        at 
org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:153)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at 
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.IOException: Failed to fetch job execution result
        at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:184)
        at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121)
        at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
        ... 15 more
Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.client.program.ProgramInvocationException: Job failed
(JobID: deb1b7e20b9919964146f2feaa3ea507)
        at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
        at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:182)
        ... 17 more
Caused by: org.apache.flink.client.program.ProgramInvocationException:
Job failed (JobID: deb1b7e20b9919964146f2feaa3ea507)
        at 
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:130)
        at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at 
org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:301)
        at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$31(RestClusterClient.java:772)
        at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at 
org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:301)
        at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at 
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
        at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
        at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
execution failed.
        at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
        at 
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:128)
        ... 24 more
Caused by: org.apache.flink.runtime.JobException: Recovery is
suppressed by NoRestartBackoffTimeStrategy
        at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
        at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:256)
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:247)
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:240)
        at 
org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:738)
        at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:715)
        at 
org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
        at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1622)
        at 
org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1137)
        at 
org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1077)
        at 
org.apache.flink.runtime.executiongraph.Execution.markFailed(Execution.java:916)
        at 
org.apache.flink.runtime.scheduler.DefaultExecutionOperations.markFailed(DefaultExecutionOperations.java:43)
        at 
org.apache.flink.runtime.scheduler.DefaultExecutionDeployer.handleTaskDeploymentFailure(DefaultExecutionDeployer.java:327)
        at 
org.apache.flink.runtime.scheduler.DefaultExecutionDeployer.lambda$assignAllResourcesAndRegisterProducedPartitions$2(DefaultExecutionDeployer.java:170)
        at 
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
        at 
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
        at 
org.apache.flink.runtime.jobmaster.slotpool.PendingRequest.failRequest(PendingRequest.java:88)
        at 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.cancelPendingRequests(DeclarativeSlotPoolBridge.java:185)
        at 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.failPendingRequests(DeclarativeSlotPoolBridge.java:408)
        at 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.notifyNotEnoughResourcesAvailable(DeclarativeSlotPoolBridge.java:396)
        at 
org.apache.flink.runtime.jobmaster.JobMaster.notifyNotEnoughResourcesAvailable(JobMaster.java:889)
        at sun.reflect.GeneratedMethodAccessor51.invoke(Unknown Source)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$0(AkkaRpcActor.java:301)
        at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:300)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
        at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at akka.actor.Actor.aroundReceive(Actor.scala:537)
        at akka.actor.Actor.aroundReceive$(Actor.scala:535)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
        at akka.actor.ActorCell.invoke(ActorCell.scala:548)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
        at akka.dispatch.Mailbox.run(Mailbox.scala:231)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: java.util.concurrent.CompletionException:
java.util.concurrent.CompletionException:
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Could not acquire the minimum required resources.
        at 
org.apache.flink.runtime.scheduler.DefaultExecutionDeployer.lambda$assignResource$4(DefaultExecutionDeployer.java:227)
        ... 38 more
Caused by: java.util.concurrent.CompletionException:
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Could not acquire the minimum required resources.
        at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
        at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
        at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
        at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        ... 36 more
Caused by: 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Could not acquire the minimum required resources.

The client log also shows:
org.apache.flink.client.program.ProgramAbortException:
java.lang.RuntimeException: Python process exits with code: 1
        at 
org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98)
        at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:846)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:240)
        at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1090)
        at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1168)
        at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1168)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
        at 
org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130)
        ... 13 more


Thanks a lot,

Best,

Amir



On Tue, Feb 7, 2023 at 10:48 AM Amir Hossein Sharifzadeh <
amirsharifza...@gmail.com> wrote:

> Traceback (most recent call last):
>   File
> "/Users/amir/PycharmProjects/VariMat/org/varimat/model/test/sample2.py",
> line 59, in <module>
>     log_processing()
>   File
> "/Users/amir/PycharmProjects/VariMat/org/varimat/model/test/sample2.py",
> line 34, in log_processing
>     table_result = t_env.execute_sql("select raw_id, raw_data from
> raw_table")
>   File
> "/Users/amir/.local/share/virtualenvs/VariMat-dmrWdVEG/lib/python3.9/site-packages/pyflink/table/table_environment.py",
> line 836, in execute_sql
>     return TableResult(self._j_tenv.executeSql(stmt))
>   File
> "/Users/amir/.local/share/virtualenvs/VariMat-dmrWdVEG/lib/python3.9/site-packages/py4j/java_gateway.py",
> line 1321, in __call__
>     return_value = get_return_value(
>   File
> "/Users/amir/.local/share/virtualenvs/VariMat-dmrWdVEG/lib/python3.9/site-packages/pyflink/util/exceptions.py",
> line 158, in deco
>     raise java_exception
> pyflink.util.exceptions.TableException:
> org.apache.flink.table.api.TableException: Failed to execute sql
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:903)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1382)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> at
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> at
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.lang.Thread.run(Thread.java:750)
> Caused by: org.apache.flink.util.FlinkException: Failed to execute job
> 'collect'.
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2203)
> at
> org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95)
> at
> org.apache.flink.table.executor.python.ChainingOptimizingExecutor.executeAsync(ChainingOptimizingExecutor.java:73)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:884)
> ... 13 more
> Caused by: java.lang.RuntimeException:
> org.apache.flink.runtime.client.JobInitializationException: Could not start
> the JobMaster.
> at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
> at
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
> at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
> at
> java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> at
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
> at
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
> Caused by: org.apache.flink.runtime.client.JobInitializationException:
> Could not start the JobMaster.
> at
> org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:750)
> Caused by: java.util.concurrent.CompletionException:
> java.lang.RuntimeException: org.apache.flink.runtime.JobException: Cannot
> instantiate the coordinator for operator Source: raw_table[1]
> at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
> at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
> ... 3 more
> Caused by: java.lang.RuntimeException:
> org.apache.flink.runtime.JobException: Cannot instantiate the coordinator
> for operator Source: raw_table[1]
> at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
> at
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114)
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
> ... 3 more
> Caused by: org.apache.flink.runtime.JobException: Cannot instantiate the
> coordinator for operator Source: raw_table[1]
> at
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:229)
> at
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:901)
> at
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:891)
> at
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:848)
> at
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:830)
> at
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:203)
> at
> org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:156)
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:361)
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:206)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:134)
> at
> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:152)
> at
> org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:119)
> at
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:369)
> at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:346)
> at
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:123)
> at
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
> at
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
> ... 4 more
> Caused by: java.lang.ClassCastException: cannot assign instance of
> org.apache.kafka.clients.consumer.OffsetResetStrategy to field
> org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer.offsetResetStrategy
> of type org.apache.kafka.clients.consumer.OffsetResetStrategy in instance
> of
> org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer
> at
> java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2302)
> at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1432)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2460)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2378)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2236)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1692)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2454)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2378)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2236)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1692)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2454)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2378)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2236)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1692)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
> at
> org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:67)
> at
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:488)
> at
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286)
> at
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223)
> ... 20 more
>
>
> Process finished with exit code 1
>
> On Mon, Feb 6, 2023 at 11:05 PM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
>
>> Hi, could you please share with us the root cause?
>> It seems the error message you posted didn't contain the root cause. Maybe
>> you can post the full error message.
>>
>> Best regards,
>> Yuxia
>>
>> ------------------------------
>> *From: *"Amir Hossein Sharifzadeh" <amirsharifza...@gmail.com>
>> *To: *"yuxia" <luoyu...@alumni.sjtu.edu.cn>
>> *Cc: *"dev" <dev@flink.apache.org>
>> *Sent: *Tuesday, February 7, 2023, 10:39:25 AM
>> *Subject: *Re: Need help how to use Table API to join two Kafka streams
>>
>> Thank you for your reply. I tried it with a sample stream, but it did not
>> work. I am trying to get the results from my producer with a very simple
>> query, and I want to see the results in the console/output.
>> This is my code:
>>
>> // Docker: docker-compose.yml
>>
>> version: '2'
>> services:
>>   zookeeper:
>>     image: confluentinc/cp-zookeeper:6.1.1
>>     hostname: zookeeper
>>     container_name: zookeeper
>>     ports:
>>       - "2181:2181"
>>     environment:
>>       ZOOKEEPER_CLIENT_PORT: 2181
>>       ZOOKEEPER_TICK_TIME: 2000
>>
>>   broker:
>>     image: confluentinc/cp-kafka:6.1.1
>>     hostname: broker
>>     container_name: broker
>>     depends_on:
>>       - zookeeper
>>     ports:
>>       - "29092:29092"
>>       - "9092:9092"
>>       - "9101:9101"
>>     environment:
>>       KAFKA_BROKER_ID: 1
>>       KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
>>       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
>>       KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
>>       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
>>       KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
>>       KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
>>       KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
>>
>>
>>
>> // Producer
>>
>> import json
>> import sys
>>
>> from kafka import KafkaProducer
>>
>> KAFKA_SERVER = "127.0.0.1:9092"
>>
>> def serializer(dictionary):
>>     try:
>>         message = json.dumps(dictionary)
>>     except Exception as e:
>>         sys.stderr.write(str(e) + '\n')
>>         message = str(dictionary)
>>     return message.encode('utf8')
>>
>> def create_sample_empad_json(raw_id):
>>     return {'raw_id': int(raw_id), 'raw_data': str(int(raw_id) + 7)}
>>
>> def do_produce():
>>     producer = KafkaProducer(bootstrap_servers=KAFKA_SERVER, value_serializer=serializer)
>>     for raw_id in range(1, 10):
>>         empad_json = create_sample_empad_json(raw_id)
>>         producer.send('EMPAD', empad_json)
>>         producer.flush()
>>
>> if __name__ == '__main__':
>>     do_produce()
>>
>>
>> // Flink
>>
>> from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
>> from pyflink.table import EnvironmentSettings
>> from pyflink.table.table_environment import StreamTableEnvironment
>>
>> def data_processing():
>>     env = StreamExecutionEnvironment.get_execution_environment()
>>     env.add_jars("file:///Users/amir/empad_jar/kafka-clients-3.3.2.jar")
>>     env.add_jars("file:///Users/amir/empad_jar/flink-connector-kafka-1.16.1.jar")
>>     env.add_jars("file:///Users/amir/empad_jar/flink-sql-connector-kafka-1.16.1.jar")
>>
>>     settings = EnvironmentSettings.new_instance() \
>>         .in_streaming_mode() \
>>         .build()
>>
>>     t_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=settings)
>>
>>     t1 = f"""
>>             CREATE TEMPORARY TABLE raw_table(
>>                 raw_id INT,
>>                 raw_data STRING
>>             ) WITH (
>>               'connector' = 'kafka',
>>               'topic' = 'EMPAD',
>>               'properties.bootstrap.servers' = 'localhost:9092',
>>               'properties.group.id' = 'MY_GRP',
>>               'scan.startup.mode' = 'latest-offset',
>>               'format' = 'json'
>>             )
>>             """
>>
>>     t_env.execute_sql(t1)
>>
>>     table_result = t_env.execute_sql("select raw_id, raw_data from raw_table")
>>
>>     with table_result.collect() as results:
>>         for result in results:
>>             print(result)
>>
>> if __name__ == '__main__':
>>     data_processing()
>>
>>
>>
>> I am getting this error message:
>>
>> pyflink.util.exceptions.TableException: 
>> org.apache.flink.table.api.TableException: Failed to execute sql
>>      at 
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:903)
>>      at 
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1382)
>>      at 
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
>>      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>      at 
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>      at 
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>      at java.lang.reflect.Method.invoke(Method.java:498)
>>      at 
>> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>      at 
>> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>      at 
>> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>>      at 
>> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>      at 
>> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>>      at 
>> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>>      at java.lang.Thread.run(Thread.java:750)
>> Caused by: org.apache.flink.util.FlinkException: Failed to execute job 
>> 'collect'.
>>      at 
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2203)
>>      at 
>> org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95)
>>      at 
>> org.apache.flink.table.executor.python.ChainingOptimizingExecutor.executeAsync(ChainingOptimizingExecutor.java:73)
>>      at 
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:884)
>>      ... 13 more
>>
>>
>> Best,
>>
>> Amir
>>
>>
>> On Sun, Feb 5, 2023 at 8:20 PM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
>>
>>> Hi, thanks for reaching out.
>>> For your question: you don't need to consume data in your consumer class
>>> separately and then insert it into those tables. The data will be
>>> consumed by what we implemented here.
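>>>
>>> In other words, once a table is registered with the Kafka connector,
>>> executing a query against it is what actually consumes from Kafka. A
>>> rough sketch (the DDL string and table name here are only placeholders):
>>>
>>> # the DDL with 'connector' = 'kafka' only registers the table
>>> t_env.execute_sql(create_table_ddl)
>>> # executing a query on the registered table starts consuming the topic
>>> result = t_env.execute_sql("SELECT * FROM my_table")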
>>>
>>> Best regards,
>>> Yuxia
>>>
>>> ------------------------------
>>> *From: *"Amir Hossein Sharifzadeh" <amirsharifza...@gmail.com>
>>> *To: *luoyu...@alumni.sjtu.edu.cn
>>> *Sent: *Sunday, February 5, 2023, 6:07:02 AM
>>> *Subject: *Re: Need help how to use Table API to join two Kafka streams
>>>
>>> Dear Yuxia, dev@flink.apache.org
>>> Thank you again for your help. I am implementing the code in Python, but
>>> I still have some confusion about my application.
>>> As I mentioned before, I am sending two simple messages (JSON) on two
>>> different topics:
>>> This is my Kafka producer class:
>>>
>>> import json
>>> import sys
>>>
>>> from kafka import KafkaProducer
>>>
>>> KAFKA_SERVER = "127.0.0.1:9092"
>>> def serializer(dictionary):
>>>     try:
>>>         message = json.dumps(dictionary)
>>>     except Exception as e:
>>>         sys.stderr.write(str(e) + '\n')
>>>         message = str(dictionary)
>>>     return message.encode('utf8')
>>>
>>> def create_sample_json(row_id):
>>>     return {'row_id':int(row_id), 'my_data': str(int(row_id) + 7)}
>>>
>>> def do_produce(topic_name):
>>>     producer = KafkaProducer(bootstrap_servers=KAFKA_SERVER, value_serializer=serializer)
>>>     for row_id in range(1, 10):
>>>         my_data = create_sample_json(row_id)
>>>         producer.send(topic_name, my_data)
>>>         producer.flush()
>>>
>>> if __name__ == '__main__':
>>>     do_produce('topic1')
>>>     do_produce('topic2')
>>>
>>> ==================================================================================
>>>
>>> Following your help, this is my Flink consumer; I want it to consume
>>> data from the producer and run queries on it:
>>>
>>> from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
>>> from pyflink.table import EnvironmentSettings
>>> from pyflink.table.expressions import col
>>> from pyflink.table.table_environment import StreamTableEnvironment
>>>
>>> from org.varimat.model.com.varimat_constants import EMPAD_TOPIC
>>>
>>> KAFKA_SERVERS = 'localhost:9092'
>>>
>>> def log_processing():
>>>     env = StreamExecutionEnvironment.get_execution_environment()
>>>     env.add_jars("file:///Users/amir/empad_jar/kafka-clients-3.3.2.jar")
>>>     env.add_jars("file:///Users/amir/empad_jar/flink-connector-kafka-1.16.1.jar")
>>>     env.add_jars("file:///Users/amir/empad_jar/flink-sql-connector-kafka-1.16.1.jar")
>>>
>>>     settings = EnvironmentSettings.new_instance() \
>>>         .in_streaming_mode() \
>>>         .build()
>>>
>>>     t_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=settings)
>>>
>>>     t1 = f"""
>>>             CREATE TEMPORARY TABLE table1(
>>>                 row_id INT,
>>>                 row_data STRING
>>>             ) WITH (
>>>               'connector' = 'kafka',
>>>               'topic' = 'topic1',
>>>               'properties.bootstrap.servers' = '{KAFKA_SERVERS}',
>>>               'properties.group.id' = 'MY_GRP',
>>>               'scan.startup.mode' = 'latest-offset',
>>>               'format' = 'json'
>>>             )
>>>             """
>>>
>>>     t2 = f"""
>>>             CREATE TEMPORARY TABLE table2(
>>>                 row_id INT,
>>>                 row_data STRING
>>>             ) WITH (
>>>               'connector' = 'kafka',
>>>               'topic' = 'topic2',
>>>               'properties.bootstrap.servers' = '{KAFKA_SERVERS}',
>>>               'properties.group.id' = 'MY_GRP',
>>>               'scan.startup.mode' = 'latest-offset',
>>>               'format' = 'json'
>>>             )
>>>             """
>>>
>>>     t_env.execute_sql(t1)
>>>     t_env.execute_sql(t2)
>>>
>>>     t3 = t_env.sql_query("SELECT row_id, row_data as my_raw_data FROM table2")
>>>
>>> # Please tell me what I should do next.
>>>
>>> # Questions:
>>>
>>> # 1) Do I need to consume data in my consumer class separately and then
>>> #    insert it into those tables, or will the data be consumed by what we
>>> #    implemented here (since I passed the connector name, topic,
>>> #    bootstrap.servers, etc.)?
>>>
>>> # 2) If so:
>>>
>>> #    2.1) How can I join those streams in Python? (See my guess below.)
>>>
>>> #    2.2) How can I avoid reprocessing previous data? My procedure will
>>> #         send thousands of messages to each topic, and I want to make
>>> #         sure I don't process duplicates.
>>>
>>> # 3) If not, what should I do?
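>>>
>>> For 2.1, my guess based on your earlier Scala example would be something
>>> like the following sketch (untested; the row_id2 rename mirrors your ssn1
>>> trick to avoid the ambiguous-column error):
>>>
>>> t1 = t_env.sql_query("SELECT row_id, row_data FROM table1")
>>> t3 = t_env.sql_query("SELECT row_id AS row_id2, row_data AS my_raw_data FROM table2")
>>> joined = t1.join(t3).where(col('row_id') == col('row_id2'))
>>>
>>> Is that the right direction?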
>>>
>>>
>>> Thank you very much.
>>>
>>> Amir
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Fri, Feb 3, 2023 at 5:45 AM yuxia <luoyu...@alumni.sjtu.edu.cn>
>>> wrote:
>>>
>>>> Hi, Amir.
>>>> In Scala code, it may look like this:
>>>>
>>>> tableEnv.executeSql("CREATE TEMPORARY TABLE s1 (id int, ssn string) WITH ('connector' = 'kafka', ...)")
>>>> tableEnv.executeSql("CREATE TEMPORARY TABLE s2 (id int, ssn string) WITH ('connector' = 'kafka', ...)")
>>>> val t1 = tableEnv.from("s1")
>>>>
>>>> // you will need to rename the join field; otherwise you'll get
>>>> // "org.apache.flink.table.api.ValidationException: Ambiguous column name: ssn".
>>>> val t3 = tableEnv.sqlQuery("SELECT id, ssn as ssn1 FROM s2")
>>>> val result = t1.join(t3).where($"ssn" === $"ssn1")
>>>>
>>>> Also, you can refer here for more detail[1].
>>>> [1]
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tableapi/#joins
>>>>
>>>> Best regards,
>>>> Yuxia
>>>>
>>>> ----- Original Message -----
>>>> From: "Amir Hossein Sharifzadeh" <amirsharifza...@gmail.com>
>>>> To: "dev" <dev@flink.apache.org>
>>>> Sent: Friday, February 3, 2023, 4:45:08 AM
>>>> Subject: Need help how to use Table API to join two Kafka streams
>>>>
>>>> Hello,
>>>>
>>>> I have a Kafka producer and a Kafka consumer that produce and consume
>>>> multiple records, respectively. You can think of two datasets here; both
>>>> have a similar structure but carry different data.
>>>>
>>>> I want to use the Table API to join the two Kafka streams while I
>>>> consume them, for example on data1.ssn == data2.ssn (see the sketch
>>>> below).
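>>>>
>>>> Concretely, what I have in mind is roughly this (a sketch only; data1
>>>> and data2 stand for the two streams once they are registered as tables):
>>>>
>>>> joined = t_env.sql_query(
>>>>     "SELECT * FROM data1 JOIN data2 ON data1.ssn = data2.ssn")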
>>>>
>>>> Constraints:
>>>> I don't want to change my producer or use FlinkKafkaProducer.
>>>>
>>>> Thank you very much.
>>>>
>>>> Best,
>>>> Amir
>>>>
>>>
>>
