Hi Harshit, According to the stack you provided, I guess you define your Python function in the main file, and the Python function imports xgboost globally. The reason for the error is that the xgboost library is difficult to be serialized by cloudpickle. There are two ways to solve
1. Move `import xgboost` to the inside of the Python function. 2. Move the Python function to another Python file, and then add that Python file as a dependency via `add_python_file`[1]. Best, Xingbo [1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/dependency_management/#python-libraries harshit.varsh...@iktara.ai <harshit.varsh...@iktara.ai> 于2022年11月21日周一 20:51写道: > Dear Team, > > I am facing a issue while running pyflink program in flink cluster as it > stop running while reading the machine learning model > > > > This is the error : > > > > ./bin/flink run --python /home/desktop/ProjectFiles/test_new.py > > > > Job has been submitted with JobID 0a561cb330eeac5aa7b40ac047d3c6a3 > > /home/desktop/.local/lib/python3.8/site-packages/sklearn/base.py:329: > UserWarning: Trying to unpickle estimator LabelEncoder from version 1.1.1 > when using version 1.1.2. This might lead to breaking code or invalid > results. Use at your own risk. For more info please refer to: > > > https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations > > warnings.warn( > > /home/desktop/.local/lib/python3.8/site-packages/xgboost/compat.py:31: > FutureWarning: pandas.Int64Index is deprecated and will be removed from > pandas in a future version. Use pandas.Index with the appropriate dtype > instead. > > from pandas import MultiIndex, Int64Index > > /home/desktop/.local/lib/python3.8/site-packages/pkg_resources/__init__.py:116: > PkgResourcesDeprecationWarning: 0.1.36ubuntu1 is an invalid version and > will not be supported in a future release > > warnings.warn( > > /home/desktop/.local/lib/python3.8/site-packages/pkg_resources/__init__.py:116: > PkgResourcesDeprecationWarning: 0.23ubuntu1 is an invalid version and will > not be supported in a future release > > warnings.warn( > > Traceback (most recent call last): > > File "/usr/lib/python3.8/runpy.py", line 194, in _run_module_as_main > > return _run_code(code, main_globals, None, > > File "/usr/lib/python3.8/runpy.py", line 87, in _run_code > > exec(code, run_globals) > > File > "/tmp/pyflink/bc5f6233-c53d-45a0-b8f2-21b41ea0c6ad/71b84799-5210-4a85-ae80-9efe0fb192f1/test_new.py", > line 510, in <module> > > new_user_ratings(envir) > > File > "/tmp/pyflink/bc5f6233-c53d-45a0-b8f2-21b41ea0c6ad/71b84799-5210-4a85-ae80-9efe0fb192f1/test_new.py", > line 504, in new_user_ratings > > environment.execute('new_user_ratings') > > File > "/home/desktop/Downloads/flink/opt/python/pyflink.zip/pyflink/datastream/stream_execution_environment.py", > line 761, in execute > > File > "/home/desktop/Downloads/flink/opt/python/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", > line 1321, in __call__ > > File > "/home/desktop/Downloads/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", > line 146, in deco > > File > "/home/desktop/Downloads/flink/opt/python/py4j-0.10.9.3-src.zip/py4j/protocol.py", > line 326, in get_return_value > > py4j.protocol.Py4JJavaError: An error occurred while calling o8.execute. > > : java.util.concurrent.ExecutionException: > org.apache.flink.client.program.ProgramInvocationException: Job failed > (JobID: 0a561cb330eeac5aa7b40ac047d3c6a3) > > at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > > at > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) > > at > org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:173) > > at > org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:123) > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > > at java.lang.reflect.Method.invoke(Method.java:498) > > at > org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) > > at > org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) > > at > org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) > > at > org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) > > at > org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) > > at > org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) > > at java.lang.Thread.run(Thread.java:750) > > Caused by: org.apache.flink.client.program.ProgramInvocationException: Job > failed (JobID: 0a561cb330eeac5aa7b40ac047d3c6a3) > > at > org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:130) > > at > java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) > > at > java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) > > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) > > at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) > > at > org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:403) > > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) > > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) > > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) > > at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) > > at > org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$26(RestClusterClient.java:708) > > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) > > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) > > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) > > at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) > > at > org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:403) > > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) > > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) > > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) > > at > java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575) > > at > java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943) > > at > java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) > > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > > ... 1 more > > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job > execution failed. > > at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) > > at > org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:128) > > ... 24 more > > Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed > by NoRestartBackoffTimeStrategy > > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) > > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) > > at > org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:301) > > at > org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:291) > > at > org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:282) > > at > org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:739) > > at > org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78) > > at > org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:443) > > at sun.reflect.GeneratedMethodAccessor125.invoke(Unknown > Source) > > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > > at java.lang.reflect.Method.invoke(Method.java:498) > > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:304) > > at > org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) > > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:302) > > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217) > > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) > > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) > > at > akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) > > at > akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) > > at > scala.PartialFunction.applyOrElse(PartialFunction.scala:123) > > at > scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) > > at > akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) > > at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > > at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) > > at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) > > at akka.actor.Actor.aroundReceive(Actor.scala:537) > > at akka.actor.Actor.aroundReceive$(Actor.scala:535) > > at > akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) > > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) > > at akka.actor.ActorCell.invoke(ActorCell.scala:548) > > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) > > at akka.dispatch.Mailbox.run(Mailbox.scala:231) > > at akka.dispatch.Mailbox.exec(Mailbox.scala:243) > > at > java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) > > at > java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) > > at > java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) > > at > java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) > > Caused by: org.apache.flink.runtime.taskmanager.AsynchronousException: > Caught exception while processing timer. > > at > org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1535) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1510) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1650) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$21(StreamTask.java:1639) > > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) > > at > org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) > > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338) > > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324) > > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) > > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) > > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) > > at > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) > > at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) > > at java.lang.Thread.run(Thread.java:750) > > Caused by: TimerException{java.lang.RuntimeException: Error while waiting > for BeamPythonFunctionRunner flush} > > ... 14 more > > Caused by: java.lang.RuntimeException: Error while waiting for > BeamPythonFunctionRunner flush > > at > org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator.invokeFinishBundle(AbstractExternalPythonFunctionOperator.java:106) > > at > org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:299) > > at > org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:115) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1648) > > ... 13 more > > Caused by: java.lang.RuntimeException: Failed to close remote bundle > > at > org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:382) > > at > org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:366) > > at > org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator.lambda$invokeFinishBundle$0(AbstractExternalPythonFunctionOperator.java:85) > > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > > ... 1 more > > Caused by: java.util.concurrent.ExecutionException: > java.lang.RuntimeException: Error received from SDK harness for instruction > 1: Traceback (most recent call last): > > File > "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", > line 289, in _execute > > response = task() > > File > "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", > line 362, in <lambda> > > lambda: self.create_worker().do_instruction(request), request) > > File > "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", > line 606, in do_instruction > > return getattr(self, request_type)( > > File > "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", > line 637, in process_bundle > > bundle_processor = self.bundle_processor_cache.get( > > File > "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", > line 463, in get > > processor = bundle_processor.BundleProcessor( > > File > "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 868, in __init__ > > self.ops = self.create_execution_tree(self.process_bundle_descriptor) > > File > "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 921, in create_execution_tree > > return collections.OrderedDict([( > > File > "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 924, in <listcomp> > > get_operation(transform_id))) for transform_id in sorted( > > File > "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 812, in wrapper > > result = cache[args] = func(*args) > > File > "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 903, in get_operation > > transform_consumers = { > > File > "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 904, in <dictcomp> > > tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]] > > File > "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 904, in <listcomp> > > tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]] > > File > "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 812, in wrapper > > result = cache[args] = func(*args) > > File > "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 908, in get_operation > > return transform_factory.create_operation( > > File > "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 1198, in create_operation > > return creator(self, transform_id, transform_proto, payload, consumers) > > File > "/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations.py", > line 129, in create_data_stream_keyed_process_function > > return _create_user_defined_function_operation( > > File > "/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations.py", > line 200, in _create_user_defined_function_operation > > return beam_operation_cls( > > File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 198, in > pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__ > > File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 129, in > pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__ > > File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 202, in > pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation > > File > "/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/datastream/operations.py", > line 75, in __init__ > > extract_stateless_function( > > File > "/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/datastream/operations.py", > line 155, in extract_stateless_function > > user_defined_func = pickle.loads(user_defined_function_proto.payload) > > File > "/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/pickle.py", > line 29, in loads > > return cloudpickle.loads(payload) > > File > "/home/desktop/.local/lib/python3.8/site-packages/xgboost/__init__.py", > line 9, in <module> > > from .core import DMatrix, DeviceQuantileDMatrix, Booster > > File "/home/desktop/.local/lib/python3.8/site-packages/xgboost/core.py", > line 23, in <module> > > from .compat import (STRING_TYPES, DataFrame, py_str, PANDAS_INSTALLED, > > File > "/home/desktop/.local/lib/python3.8/site-packages/xgboost/compat.py", line > 110, in <module> > > import sparse > > File > "/home/desktop/.local/lib/python3.8/site-packages/sparse/__init__.py", line > 1, in <module> > > from ._coo import COO, as_coo > > File > "/home/desktop/.local/lib/python3.8/site-packages/sparse/_coo/__init__.py", > line 1, in <module> > > from .core import COO, as_coo > > File > "/home/desktop/.local/lib/python3.8/site-packages/sparse/_coo/core.py", > line 9, in <module> > > import numba > > File > "/home/desktop/.local/lib/python3.8/site-packages/numba/__init__.py", line > 38, in <module> > > from numba.core.decorators import (cfunc, generated_jit, jit, njit, > stencil, > > File > "/home/desktop/.local/lib/python3.8/site-packages/numba/core/decorators.py", > line 12, in <module> > > from numba.stencils.stencil import stencil > > File > "/home/desktop/.local/lib/python3.8/site-packages/numba/stencils/stencil.py", > line 11, in <module> > > from numba.core import types, typing, utils, ir, config, ir_utils, > registry > > File > "/home/desktop/.local/lib/python3.8/site-packages/numba/core/registry.py", > line 4, in <module> > > from numba.core import utils, typing, dispatcher, cpu > > File > "/home/desktop/.local/lib/python3.8/site-packages/numba/core/dispatcher.py", > line 13, in <module> > > from numba.core import ( > > File > "/home/desktop/.local/lib/python3.8/site-packages/numba/core/compiler.py", > line 6, in <module> > > from numba.core import (utils, errors, typing, interpreter, bytecode, > postproc, > > File > "/home/desktop/.local/lib/python3.8/site-packages/numba/core/callconv.py", > line 12, in <module> > > from numba.core.base import PYOBJECT, GENERIC_POINTER > > File > "/home/desktop/.local/lib/python3.8/site-packages/numba/core/base.py", line > 24, in <module> > > from numba.cpython import builtins > > File > "/home/desktop/.local/lib/python3.8/site-packages/numba/cpython/builtins.py", > line 524, in <module> > > from numba.core.typing.builtins import IndexValue, IndexValueType > > File > "/home/desktop/.local/lib/python3.8/site-packages/numba/core/typing/builtins.py", > line 22, in <module> > > @infer_global(print) > > File > "/home/desktop/.local/lib/python3.8/site-packages/numba/core/typing/templates.py", > line 1278, in register_global > > if getattr(mod, val.__name__) is not val: > > AttributeError: module 'pyflink.fn_execution.beam.beam_sdk_worker_main' > has no attribute 'print' > > > > at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > > at > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) > > at > org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60) > > at > org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504) > > at > org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:555) > > at > org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:380) > > ... 7 more > > Caused by: java.lang.RuntimeException: Error received from SDK harness for > instruction 1: Traceback (most recent call last): > > File > "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", > line 289, in _execute > > response = task() > > File > "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", > line 362, in <lambda> > > lambda: self.create_worker().do_instruction(request), request) > > File > "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", > line 606, in do_instruction > > return getattr(self, request_type)( > > File > "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", > line 637, in process_bundle > > bundle_processor = self.bundle_processor_cache.get( > > File > "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", > line 463, in get > > processor = bundle_processor.BundleProcessor( > > File > "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 868, in __init__ > > self.ops = self.create_execution_tree(self.process_bundle_descriptor) > > File > "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 921, in create_execution_tree > > return collections.OrderedDict([( > > File > "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 924, in <listcomp> > > get_operation(transform_id))) for transform_id in sorted( > > File > "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 812, in wrapper > > result = cache[args] = func(*args) > > File > "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 903, in get_operation > > transform_consumers = { > > File > "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 904, in <dictcomp> > > tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]] > > File > "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 904, in <listcomp> > > tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]] > > File > "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 812, in wrapper > > result = cache[args] = func(*args) > > File > "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 908, in get_operation > > return transform_factory.create_operation( > > File > "/home/desktop/.local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", > line 1198, in create_operation > > return creator(self, transform_id, transform_proto, payload, consumers) > > File > "/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations.py", > line 129, in create_data_stream_keyed_process_function > > return _create_user_defined_function_operation( > > File > "/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations.py", > line 200, in _create_user_defined_function_operation > > return beam_operation_cls( > > File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 198, in > pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__ > > File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 129, in > pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__ > > File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 202, in > pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation > > File > "/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/datastream/operations.py", > line 75, in __init__ > > extract_stateless_function( > > File > "/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/datastream/operations.py", > line 155, in extract_stateless_function > > user_defined_func = pickle.loads(user_defined_function_proto.payload) > > File > "/home/desktop/.local/lib/python3.8/site-packages/pyflink/fn_execution/pickle.py", > line 29, in loads > > return cloudpickle.loads(payload) > > File > "/home/desktop/.local/lib/python3.8/site-packages/xgboost/__init__.py", > line 9, in <module> > > from .core import DMatrix, DeviceQuantileDMatrix, Booster > > File "/home/desktop/.local/lib/python3.8/site-packages/xgboost/core.py", > line 23, in <module> > > from .compat import (STRING_TYPES, DataFrame, py_str, PANDAS_INSTALLED, > > File > "/home/desktop/.local/lib/python3.8/site-packages/xgboost/compat.py", line > 110, in <module> > > import sparse > > File > "/home/desktop/.local/lib/python3.8/site-packages/sparse/__init__.py", line > 1, in <module> > > from ._coo import COO, as_coo > > File > "/home/desktop/.local/lib/python3.8/site-packages/sparse/_coo/__init__.py", > line 1, in <module> > > from .core import COO, as_coo > > File > "/home/desktop/.local/lib/python3.8/site-packages/sparse/_coo/core.py", > line 9, in <module> > > import numba > > File > "/home/desktop/.local/lib/python3.8/site-packages/numba/__init__.py", line > 38, in <module> > > from numba.core.decorators import (cfunc, generated_jit, jit, njit, > stencil, > > File > "/home/desktop/.local/lib/python3.8/site-packages/numba/core/decorators.py", > line 12, in <module> > > from numba.stencils.stencil import stencil > > File > "/home/desktop/.local/lib/python3.8/site-packages/numba/stencils/stencil.py", > line 11, in <module> > > from numba.core import types, typing, utils, ir, config, ir_utils, > registry > > File > "/home/desktop/.local/lib/python3.8/site-packages/numba/core/registry.py", > line 4, in <module> > > from numba.core import utils, typing, dispatcher, cpu > > File > "/home/desktop/.local/lib/python3.8/site-packages/numba/core/dispatcher.py", > line 13, in <module> > > from numba.core import ( > > File > "/home/desktop/.local/lib/python3.8/site-packages/numba/core/compiler.py", > line 6, in <module> > > from numba.core import (utils, errors, typing, interpreter, bytecode, > postproc, > > File > "/home/desktop/.local/lib/python3.8/site-packages/numba/core/callconv.py", > line 12, in <module> > > from numba.core.base import PYOBJECT, GENERIC_POINTER > > File > "/home/desktop/.local/lib/python3.8/site-packages/numba/core/base.py", line > 24, in <module> > > from numba.cpython import builtins > > File > "/home/desktop/.local/lib/python3.8/site-packages/numba/cpython/builtins.py", > line 524, in <module> > > from numba.core.typing.builtins import IndexValue, IndexValueType > > File > "/home/desktop/.local/lib/python3.8/site-packages/numba/core/typing/builtins.py", > line 22, in <module> > > @infer_global(print) > > File > "/home/desktop/.local/lib/python3.8/site-packages/numba/core/typing/templates.py", > line 1278, in register_global > > if getattr(mod, val.__name__) is not val: > > AttributeError: module 'pyflink.fn_execution.beam.beam_sdk_worker_main' > has no attribute 'print' > > > > at > org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:180) > > at > org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:160) > > at > org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251) > > at > org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33) > > at > org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76) > > at > org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309) > > at > org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292) > > at > org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782) > > at > org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) > > at > org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) > > ... 3 more > > > > org.apache.flink.client.program.ProgramAbortException: > java.lang.RuntimeException: Python process exits with code: 1 > > at > org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140) > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > > at java.lang.reflect.Method.invoke(Method.java:498) > > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) > > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) > > at > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) > > at > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:836) > > at > org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247) > > at > org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1078) > > at > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1156) > > at > org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) > > at > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1156) > > Caused by: java.lang.RuntimeException: Python process exits with code: 1 > > at > org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130) > > ... 13 more > > > > > > Can anyone tell me what is the reason for this error or any suggestion . > > > > With Regards, > > Harshit Varshney > > >