Hello, I'm playing with deploying a Python pipeline to a Flink cluster on Kubernetes via the Flink Kubernetes Operator. The pipeline simply calculates the average word length in a fixed time window of 5 seconds, and it works with the embedded Flink cluster.
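For context, the computation itself is straightforward; this is just a plain-Python sketch of what the windowed average is meant to produce (no Beam involved - the function name and sample data are hypothetical, not from the actual pipeline):

```python
from collections import defaultdict

WINDOW_SECONDS = 5

def avg_word_len_by_window(timestamped_words):
    """Group (epoch_seconds, word) pairs into fixed 5-second windows
    and return {window_start: average word length}."""
    buckets = defaultdict(list)
    for ts, word in timestamped_words:
        window_start = int(ts // WINDOW_SECONDS) * WINDOW_SECONDS
        buckets[window_start].append(len(word))
    return {start: sum(lens) / len(lens) for start, lens in sorted(buckets.items())}
```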
First, I created a k8s cluster (v1.25.3) on minikube and a Docker image named beam-python-example:1.17 using the following Dockerfile - the full details can be checked in https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-deploy/beam-deploy/beam/Dockerfile. The Java SDK is used for the SDK harness of the Kafka IO's expansion service, while the job server is used to execute the Python pipeline on the Flink operator.

FROM flink:1.17
...
## add java SDK and job server
COPY --from=apache/beam_java8_sdk:2.56.0 /opt/apache/beam/ /opt/apache/beam/
COPY --from=apache/beam_flink1.17_job_server:2.56.0 \
  /opt/apache/beam/jars/beam-runners-flink-job-server.jar /opt/apache/beam/jars/beam-runners-flink-job-server.jar
RUN chown -R flink:flink /opt/apache/beam
## install python 3.10.13
RUN apt-get update -y && \
  apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev liblzma-dev && \
  wget https://www.python.org/ftp/python/${PYTHON_VERSION}/Python-${PYTHON_VERSION}.tgz && \
  ...
## install apache beam 2.56.0
RUN pip3 install apache-beam==${BEAM_VERSION}
## copy pipeline source
RUN mkdir /opt/flink/app
COPY word_len.py /opt/flink/app/

Then the pipeline is deployed using the following manifest - the full details can be found in https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-deploy/beam-deploy/beam/word_len.yml

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: beam-word-len
spec:
  image: beam-python-example:1.17
  imagePullPolicy: Never
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "5"
  serviceAccount: flink
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          env:
            - name: BOOTSTRAP_SERVERS
              value: demo-cluster-kafka-bootstrap:9092
          ...
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    replicas: 2
    resource:
      memory: "2048m"
      cpu: 1
    podTemplate:
      spec:
        containers:
          - name: python-worker-harness
            image: apache/beam_python3.10_sdk:2.56.0
            imagePullPolicy: Never
            args: ["--worker_pool"]
            ports:
              - containerPort: 50000
  job:
    jarURI: local:///opt/apache/beam/jars/beam-runners-flink-job-server.jar
    entryClass: org.apache.beam.runners.flink.FlinkPortableClientEntryPoint
    args:
      - "--driver-cmd"
      - "python /opt/flink/app/word_len.py --deploy"
    parallelism: 3
    upgradeMode: stateless

Here is the pipeline source - the full details can be found in https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-deploy/beam-deploy/beam/word_len.py

When I add the --deploy flag, the Python SDK harness is set to EXTERNAL and its config is set to localhost:50000 - I believe it'll point to the sidecar container of the task manager. For the Kafka IO, the expansion service's SDK harness is configured as PROCESS, and the command points to the Java SDK that is added in the beam-python-example:1.17 image.

...
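As a side note, since the pipeline assumes the worker-pool sidecar is reachable at localhost:50000, a quick TCP check like the sketch below is what I have in mind for verifying that from inside the task manager container (the helper name is mine, not from the pipeline; any listening port can be probed the same way):

```python
import socket
from contextlib import closing

def harness_reachable(host="localhost", port=50000, timeout=2.0):
    """Return True if something (e.g. the Beam worker-pool sidecar)
    is accepting TCP connections at host:port."""
    with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
        sock.settimeout(timeout)
        # connect_ex returns 0 on success instead of raising
        return sock.connect_ex((host, port)) == 0
```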
def run(args=None):
    parser = argparse.ArgumentParser(description="Beam pipeline arguments")
    parser.add_argument("--runner", default="FlinkRunner", help="Apache Beam runner")
    parser.add_argument(
        "--deploy",
        action="store_true",
        default=False,
        help="Flag to indicate whether to use an own local cluster",
    )
    opts, _ = parser.parse_known_args(args)

    pipeline_opts = {
        "runner": opts.runner,
        "job_name": "avg-word-length-beam",
        "streaming": True,
        "environment_type": "EXTERNAL" if opts.deploy else "LOOPBACK",
        "checkpointing_interval": "60000",
    }
    expansion_service = None
    if pipeline_opts["environment_type"] == "EXTERNAL":
        pipeline_opts = {
            **pipeline_opts,
            **{
                "environment_config": "localhost:50000",
                "flink_submit_uber_jar": True,
            },
        }
        expansion_service = kafka.default_io_expansion_service(
            append_args=[
                "--defaultEnvironmentType=PROCESS",
                '--defaultEnvironmentConfig={"command":"/opt/apache/beam/boot"}',
                "--experiments=use_deprecated_read",  # https://github.com/apache/beam/issues/20979
            ]
        )
    print(pipeline_opts)
    options = PipelineOptions([], **pipeline_opts)
    # Required, else it will complain when importing worker functions
    options.view_as(SetupOptions).save_main_session = True

    with beam.Pipeline(options=options) as p:
        (
            p
            | "ReadWordsFromKafka"
            >> ReadWordsFromKafka(
                bootstrap_servers=os.getenv(
                    "BOOTSTRAP_SERVERS",
                    "host.docker.internal:29092",
                ),
                topics=[os.getenv("INPUT_TOPIC", "input-topic")],
                group_id=os.getenv("GROUP_ID", "beam-word-len"),
                expansion_service=expansion_service,
            )
            | "CalculateAvgWordLen" >> CalculateAvgWordLen()
            | "WriteWordLenToKafka"
            >> WriteWordLenToKafka(
                bootstrap_servers=os.getenv(
                    "BOOTSTRAP_SERVERS",
                    "host.docker.internal:29092",
                ),
                topic=os.getenv("OUTPUT_TOPIC", "output-topic-beam"),
                expansion_service=expansion_service,
            )
        )

    logging.getLogger().setLevel(logging.INFO)
    logging.info("Building pipeline ...")

(Note: the original --deploy argument passed its help text to default= instead of help=; fixed above.)

When I run the pipeline, I see the following error and it fails to be submitted to the job manager.
It looks like the pipeline DAG is not created, but I'm not sure what causes the error. Every dill line was printed twice (bare and with an INFO:dill: prefix) and the pickle frames repeat deeply, so repeated lines are elided with ... below.

T4: <class 'apache_beam.transforms.core.CallableWrapperDoFn'>
D2: <dict object at 0x7f6c918c1d00>
F1: <function Map.<locals>.<lambda> at 0x7f6c918bb640>
...
F1: <function ReadWordsFromKafka.expand.<locals>.decode_message at 0x7f6c91857b50>
...
T2: <class '__main__.ReadWordsFromKafka'>
...
T4: <class 'apache_beam.transforms.external.BeamJarExpansionService'>
...
T4: <class 'apache_beam.utils.subprocess_server.JavaJarServer'>
...
T4: <class 'apache_beam.transforms.external.ExpansionAndArtifactRetrievalStub'>
...
T4: <class 'grpc._channel.Channel'>
...
{'runner': 'FlinkRunner', 'job_name': 'avg-word-length-beam', 'streaming': True, 'environment_type': 'EXTERNAL', 'checkpointing_interval': '60000', 'environment_config': 'localhost:50000', 'flink_submit_uber_jar': True}
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py", line 379, in dumps
    s = dill.dumps(o, byref=settings['dill_byref'])
  File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 265, in dumps
    dump(obj, file, protocol, byref, fmode, recurse, **kwds)#, strictio)
  ...
  File "/usr/local/lib/python3.10/pickle.py", line 578, in save
    rv = reduce(self.proto)
  File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/flink/app/word_len.py", line 212, in <module>
    run()
  File "/opt/flink/app/word_len.py", line 184, in run
    p
  File "/usr/local/lib/python3.10/site-packages/apache_beam/transforms/ptransform.py", line 1110, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
  ...
  File "/opt/flink/app/word_len.py", line 94, in expand
    | "DecodeMessage" >> beam.Map(decode_message)
  File "/usr/local/lib/python3.10/site-packages/apache_beam/transforms/core.py", line 1994, in Map
    pardo = FlatMap(wrapper, *args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/apache_beam/transforms/core.py", line 1937, in FlatMap
    pardo = ParDo(CallableWrapperDoFn(fn), *args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/apache_beam/transforms/core.py", line 1473, in __init__
    super().__init__(fn, *args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/apache_beam/transforms/ptransform.py", line 870, in __init__
    self.fn = pickler.loads(pickler.dumps(self.fn))
  File "/usr/local/lib/python3.10/site-packages/apache_beam/internal/pickler.py", line 44, in dumps
    return desired_pickle_lib.dumps(
  File "/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py", line 383, in dumps
    s = dill.dumps(o, byref=settings['dill_byref'])
  ...
  File "/usr/local/lib/python3.10/pickle.py", line 578, in save
    rv = reduce(self.proto)
  File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__

        at org.apache.beam.runners.flink.FlinkPortableClientEntryPoint.runDriverProgram(FlinkPortableClientEntryPoint.java:192) ~[?:?]
        at org.apache.beam.runners.flink.FlinkPortableClientEntryPoint.main(FlinkPortableClientEntryPoint.java:100) ~[?:?]
        ...
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.17.2.jar:1.17.2]
        ... 13 more
Caused by: java.util.concurrent.TimeoutException: Timeout of 30 seconds waiting for job submission.
        at org.apache.beam.runners.flink.FlinkPortableClientEntryPoint$DetachedJobInvokerFactory.executeDetachedJob(FlinkPortableClientEntryPoint.java:256) ~[?:?]
        at org.apache.beam.runners.flink.FlinkPortableClientEntryPoint$DetachedJobInvokerFactory.access$300(FlinkPortableClientEntryPoint.java:206) ~[?:?]
        at org.apache.beam.runners.flink.FlinkPortableClientEntryPoint.runDriverProgram(FlinkPortableClientEntryPoint.java:180) ~[?:?]
        ...
        ... 13 more
2024-05-10 13:11:41,217 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
        ...
Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
        ... 14 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Job python /opt/flink/app/word_len.py --deploy failed.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist-1.17.2.jar:1.17.2]
        ...
        ... 13 more
Caused by: java.lang.RuntimeException: Job python /opt/flink/app/word_len.py --deploy failed.
        at org.apache.beam.runners.flink.FlinkPortableClientEntryPoint.main(FlinkPortableClientEntryPoint.java:102) ~[?:?]
        ...
        ... 13 more
Caused by: java.lang.RuntimeException: Failed to start job with driver program: bash [-c, python /opt/flink/app/word_len.py --deploy --job_endpoint=localhost:37015] output: WARNING:root:Waiting for grpc channel to be ready at localhost:32895.

Can you please inform me how to fix this issue?

Cheers,
Jaehyeon
