Hello,

I'm playing with deploying a python pipeline to a flink cluster on
kubernetes via flink kubernetes operator. The pipeline simply calculates
average word lengths in a fixed time window of 5 seconds and it works with
the embedded flink cluster.

First, I created a k8s cluster (v1.25.3) on minikube and a docker image
named beam-python-example:1.17 created using the following docker file -
the full details can be checked in
https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-deploy/beam-deploy/beam/Dockerfile

The java sdk is used for the sdk harness of the kafka io's expansion
service while the job server is used to execute the python pipeline in the
flink operator.

FROM flink:1.17
...
## add java SDK and job server
COPY --from=apache/beam_java8_sdk:2.56.0 /opt/apache/beam/ /opt/apache/beam/

COPY --from=apache/beam_flink1.17_job_server:2.56.0  \
  /opt/apache/beam/jars/beam-runners-flink-job-server.jar
/opt/apache/beam/jars/beam-runners-flink-job-server.jar

RUN chown -R flink:flink /opt/apache/beam

## install python 3.10.13
RUN apt-get update -y && \
  apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev
libffi-dev liblzma-dev && \
  wget https://www.python.org/ftp/python/${PYTHON_VERSION}/Python-
${PYTHON_VERSION}.tgz && \
...
## install apache beam 2.56.0
RUN pip3 install apache-beam==${BEAM_VERSION}

## copy pipeline source
RUN mkdir /opt/flink/app
COPY word_len.py /opt/flink/app/

Then the pipeline is deployed using the following manifest - the full
details can be found in
https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-deploy/beam-deploy/beam/word_len.yml

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: beam-word-len
spec:
  image: beam-python-example:1.17
  imagePullPolicy: Never
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "5"
  serviceAccount: flink
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          env:
            - name: BOOTSTRAP_SERVERS
              value: demo-cluster-kafka-bootstrap:9092
...
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    replicas: 2
    resource:
      memory: "2048m"
      cpu: 1
    podTemplate:
      spec:
        containers:
          - name: python-worker-harness
            image: apache/beam_python3.10_sdk:2.56.0
            imagePullPolicy: Never
            args: ["--worker_pool"]
            ports:
              - containerPort: 50000

  job:
    jarURI: local:///opt/apache/beam/jars/beam-runners-flink-job-server.jar
    entryClass: org.apache.beam.runners.flink.FlinkPortableClientEntryPoint
    args:
      - "--driver-cmd"
      - "python /opt/flink/app/word_len.py --deploy"
    parallelism: 3
    upgradeMode: stateless

Here is the pipeline source - the full details can be found in
https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-deploy/beam-deploy/beam/word_len.py

When I add the --deploy flag, the python sdk harness is set to EXTERNAL and
its config is set to localhost:50000 - I believe it'll point to the side
car container of the task manager. For the kafka io, the expansion
service's sdk harness is configured as PROCESS and the command points to
the java sdk that is added in the beam-python-example:1.17 image.

...
def run(args=None):
    parser = argparse.ArgumentParser(description="Beam pipeline arguments")
    parser.add_argument("--runner", default="FlinkRunner", help="Apache
Beam runner")
    parser.add_argument(
        "--deploy",
        action="store_true",
        default="Flag to indicate whether to use an own local cluster",
    )
    opts, _ = parser.parse_known_args(args)

    pipeline_opts = {
        "runner": opts.runner,
        "job_name": "avg-word-length-beam",
        "streaming": True,
        "environment_type": "EXTERNAL" if opts.deploy is True else
"LOOPBACK",
        "checkpointing_interval": "60000",
    }

    expansion_service = None
    if pipeline_opts["environment_type"] == "EXTERNAL":
        pipeline_opts = {
            **pipeline_opts,
            **{
                "environment_config": "localhost:50000",
                "flink_submit_uber_jar": True,
            },
        }
        expansion_service = kafka.default_io_expansion_service(
            append_args=[
                "--defaultEnvironmentType=PROCESS",

'--defaultEnvironmentConfig={"command":"/opt/apache/beam/boot"}',
                "--experiments=use_deprecated_read",  #
https://github.com/apache/beam/issues/20979
            ]
        )
    print(pipeline_opts)
    options = PipelineOptions([], **pipeline_opts)
    # Required, else it will complain that when importing worker functions
    options.view_as(SetupOptions).save_main_session = True

    with beam.Pipeline(options=options) as p:
        (
            p
            | "ReadWordsFromKafka"
            >> ReadWordsFromKafka(
                bootstrap_servers=os.getenv(
                    "BOOTSTRAP_SERVERS",
                    "host.docker.internal:29092",
                ),
                topics=[os.getenv("INPUT_TOPIC", "input-topic")],
                group_id=os.getenv("GROUP_ID", "beam-word-len"),
                expansion_service=expansion_service,
            )
            | "CalculateAvgWordLen" >> CalculateAvgWordLen()
            | "WriteWordLenToKafka"
            >> WriteWordLenToKafka(
                bootstrap_servers=os.getenv(
                    "BOOTSTRAP_SERVERS",
                    "host.docker.internal:29092",
                ),
                topic=os.getenv("OUTPUT_TOPIC", "output-topic-beam"),
                expansion_service=expansion_service,
            )
        )

        logging.getLogger().setLevel(logging.INFO)
        logging.info("Building pipeline ...")

When I run the pipeline, I see the following error and it fails to be
submitted to the job manager. It looks like the pipeline DAG is not created
but I'm not sure what makes the error.

T4: <class 'apache_beam.transforms.core.CallableWrapperDoFn'>
INFO:dill:T4: <class 'apache_beam.transforms.core.CallableWrapperDoFn'>
# T4
INFO:dill:# T4
D2: <dict object at 0x7f6c918c1d00>
INFO:dill:D2: <dict object at 0x7f6c918c1d00>
F1: <function Map.<locals>.<lambda> at 0x7f6c918bb640>
INFO:dill:F1: <function Map.<locals>.<lambda> at 0x7f6c918bb640>
F2: <function _create_function at 0x7f6cb03de4d0>
INFO:dill:F2: <function _create_function at 0x7f6cb03de4d0>
# F2
INFO:dill:# F2
T1: <class 'code'>
INFO:dill:T1: <class 'code'>
F2: <function _load_type at 0x7f6cb03de3b0>
INFO:dill:F2: <function _load_type at 0x7f6cb03de3b0>
# F2
INFO:dill:# F2
# T1
INFO:dill:# T1
B1: <built-in function getattr>
INFO:dill:B1: <built-in function getattr>
F2: <function _get_attr at 0x7f6cb03deef0>
INFO:dill:F2: <function _get_attr at 0x7f6cb03deef0>
# F2
INFO:dill:# F2
# B1
INFO:dill:# B1
M2: <module 'apache_beam.transforms.core' from
'/usr/local/lib/python3.10/site-packages/apache_beam/transforms/core.py'>
INFO:dill:M2: <module 'apache_beam.transforms.core' from
'/usr/local/lib/python3.10/site-packages/apache_beam/transforms/core.py'>
F2: <function _import_module at 0x7f6cb03df010>
INFO:dill:F2: <function _import_module at 0x7f6cb03df010>
# F2
INFO:dill:# F2
# M2
INFO:dill:# M2
Ce: <cell at 0x7f6c91862c50: function object at 0x7f6c91857b50>
INFO:dill:Ce: <cell at 0x7f6c91862c50: function object at 0x7f6c91857b50>
F2: <function _create_cell at 0x7f6cb03de8c0>
INFO:dill:F2: <function _create_cell at 0x7f6cb03de8c0>
# F2
INFO:dill:# F2
F1: <function ReadWordsFromKafka.expand.<locals>.decode_message at
0x7f6c91857b50>
INFO:dill:F1: <function ReadWordsFromKafka.expand.<locals>.decode_message
at 0x7f6c91857b50>
D1: <dict object at 0x7f6cb0a5a040>
INFO:dill:D1: <dict object at 0x7f6cb0a5a040>
# D1
INFO:dill:# D1
Ce: <cell at 0x7f6c918636a0: ReadWordsFromKafka object at 0x7f6c918639a0>
INFO:dill:Ce: <cell at 0x7f6c918636a0: ReadWordsFromKafka object at
0x7f6c918639a0>
T2: <class '__main__.ReadWordsFromKafka'>
INFO:dill:T2: <class '__main__.ReadWordsFromKafka'>
F2: <function _create_type at 0x7f6cb03de440>
INFO:dill:F2: <function _create_type at 0x7f6cb03de440>
# F2
INFO:dill:# F2
T1: <class 'type'>
INFO:dill:T1: <class 'type'>
# T1
INFO:dill:# T1
T4: <class 'apache_beam.transforms.ptransform.PTransform'>
INFO:dill:T4: <class 'apache_beam.transforms.ptransform.PTransform'>
# T4
INFO:dill:# T4
D2: <dict object at 0x7f6c922f44c0>
INFO:dill:D2: <dict object at 0x7f6c922f44c0>
F1: <function ReadWordsFromKafka.__init__ at 0x7f6c922d3b50>
INFO:dill:F1: <function ReadWordsFromKafka.__init__ at 0x7f6c922d3b50>
D1: <dict object at 0x7f6cb0a5a040>
INFO:dill:D1: <dict object at 0x7f6cb0a5a040>
# D1
INFO:dill:# D1
Ce: <cell at 0x7f6cb0ac35e0: type object at 0x7f6c93424410>
INFO:dill:Ce: <cell at 0x7f6cb0ac35e0: type object at 0x7f6c93424410>
T5: <class '__main__.ReadWordsFromKafka'>
INFO:dill:T5: <class '__main__.ReadWordsFromKafka'>
# T5
INFO:dill:# T5
# Ce
INFO:dill:# Ce
D2: <dict object at 0x7f6c9186a740>
INFO:dill:D2: <dict object at 0x7f6c9186a740>
# D2
INFO:dill:# D2
# F1
INFO:dill:# F1
F1: <function ReadWordsFromKafka.expand at 0x7f6c922d3be0>
INFO:dill:F1: <function ReadWordsFromKafka.expand at 0x7f6c922d3be0>
D1: <dict object at 0x7f6cb0a5a040>
INFO:dill:D1: <dict object at 0x7f6cb0a5a040>
# D1
INFO:dill:# D1
D2: <dict object at 0x7f6c9186a5c0>
INFO:dill:D2: <dict object at 0x7f6c9186a5c0>
# D2
INFO:dill:# D2
# F1
INFO:dill:# F1
T6: <class 'apache_beam.typehints.decorators.IOTypeHints'>
INFO:dill:T6: <class 'apache_beam.typehints.decorators.IOTypeHints'>
F2: <function _create_namedtuple at 0x7f6cb03dedd0>
INFO:dill:F2: <function _create_namedtuple at 0x7f6cb03dedd0>
# F2
INFO:dill:# F2
# T6
INFO:dill:# T6
# D2
INFO:dill:# D2
# T2
INFO:dill:# T2
D2: <dict object at 0x7f6c9184be80>
INFO:dill:D2: <dict object at 0x7f6c9184be80>
T4: <class 'apache_beam.transforms.external.BeamJarExpansionService'>
INFO:dill:T4: <class
'apache_beam.transforms.external.BeamJarExpansionService'>
# T4
INFO:dill:# T4
D2: <dict object at 0x7f6c922d7c00>
INFO:dill:D2: <dict object at 0x7f6c922d7c00>
T4: <class 'apache_beam.utils.subprocess_server.JavaJarServer'>
INFO:dill:T4: <class 'apache_beam.utils.subprocess_server.JavaJarServer'>
# T4
INFO:dill:# T4
D2: <dict object at 0x7f6c9184aa80>
INFO:dill:D2: <dict object at 0x7f6c9184aa80>
T4: <class
'apache_beam.transforms.external.ExpansionAndArtifactRetrievalStub'>
INFO:dill:T4: <class
'apache_beam.transforms.external.ExpansionAndArtifactRetrievalStub'>
# T4
INFO:dill:# T4
# D2
INFO:dill:# D2
D2: <dict object at 0x7f6c918b78c0>
INFO:dill:D2: <dict object at 0x7f6c918b78c0>
T4: <class 'grpc._channel.Channel'>
INFO:dill:T4: <class 'grpc._channel.Channel'>
# T4
INFO:dill:# T4
D2: <dict object at 0x7f6c91868440>
INFO:dill:D2: <dict object at 0x7f6c91868440>
{'runner': 'FlinkRunner', 'job_name': 'avg-word-length-beam', 'streaming':
True, 'environment_type': 'EXTERNAL', 'checkpointing_interval': '60000',
'environment_config': 'localhost:50000', 'flink_submit_uber_jar': True}
Traceback (most recent call last):
  File
"/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py",
line 379, in dumps
    s = dill.dumps(o, byref=settings['dill_byref'])
  File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 265,
in dumps
    dump(obj, file, protocol, byref, fmode, recurse, **kwds)#, strictio)
  File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 259,
in dump
    Pickler(file, protocol, **_kwds).dump(obj)
  File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 445,
in dump
    StockPickler.dump(self, obj)
  File "/usr/local/lib/python3.10/pickle.py", line 487, in dump
    self.save(obj)
  File "/usr/local/lib/python3.10/pickle.py", line 603, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/local/lib/python3.10/pickle.py", line 717, in save_reduce
    save(state)
  File "/usr/local/lib/python3.10/pickle.py", line 560, in save
    f(self, obj)  # Call unbound method with explicit self
  File
"/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py",
line 349, in new_save_module_dict
    return old_save_module_dict(pickler, obj)
  File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 912,
in save_module_dict
    StockPickler.save_dict(pickler, obj)
  File "/usr/local/lib/python3.10/pickle.py", line 972, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/local/lib/python3.10/pickle.py", line 998, in _batch_setitems
    save(v)
  File "/usr/local/lib/python3.10/pickle.py", line 560, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 1410,
in save_function
    pickler.save_reduce(_create_function, (obj.__code__,
  File "/usr/local/lib/python3.10/pickle.py", line 692, in save_reduce
    save(args)
  File "/usr/local/lib/python3.10/pickle.py", line 560, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/usr/local/lib/python3.10/pickle.py", line 902, in save_tuple
    save(element)
  File "/usr/local/lib/python3.10/pickle.py", line 560, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/usr/local/lib/python3.10/pickle.py", line 887, in save_tuple
    save(element)
  File "/usr/local/lib/python3.10/pickle.py", line 560, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 1147,
in save_cell
    pickler.save_reduce(_create_cell, (f,), obj=obj)
  File "/usr/local/lib/python3.10/pickle.py", line 692, in save_reduce
    save(args)
  File "/usr/local/lib/python3.10/pickle.py", line 560, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/usr/local/lib/python3.10/pickle.py", line 887, in save_tuple
    save(element)
  File "/usr/local/lib/python3.10/pickle.py", line 560, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 1410,
in save_function
    pickler.save_reduce(_create_function, (obj.__code__,
  File "/usr/local/lib/python3.10/pickle.py", line 692, in save_reduce
    save(args)
  File "/usr/local/lib/python3.10/pickle.py", line 560, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/usr/local/lib/python3.10/pickle.py", line 902, in save_tuple
    save(element)
  File "/usr/local/lib/python3.10/pickle.py", line 560, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/usr/local/lib/python3.10/pickle.py", line 887, in save_tuple
    save(element)
  File "/usr/local/lib/python3.10/pickle.py", line 560, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 1147,
in save_cell
    pickler.save_reduce(_create_cell, (f,), obj=obj)
  File "/usr/local/lib/python3.10/pickle.py", line 692, in save_reduce
    save(args)
  File "/usr/local/lib/python3.10/pickle.py", line 560, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/usr/local/lib/python3.10/pickle.py", line 887, in save_tuple
    save(element)
  File "/usr/local/lib/python3.10/pickle.py", line 603, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/local/lib/python3.10/pickle.py", line 717, in save_reduce
    save(state)
  File "/usr/local/lib/python3.10/pickle.py", line 560, in save
    f(self, obj)  # Call unbound method with explicit self
  File
"/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py",
line 349, in new_save_module_dict
    return old_save_module_dict(pickler, obj)
  File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 912,
in save_module_dict
    StockPickler.save_dict(pickler, obj)
  File "/usr/local/lib/python3.10/pickle.py", line 972, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/local/lib/python3.10/pickle.py", line 998, in _batch_setitems
    save(v)
  File "/usr/local/lib/python3.10/pickle.py", line 603, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/local/lib/python3.10/pickle.py", line 717, in save_reduce
    save(state)
  File "/usr/local/lib/python3.10/pickle.py", line 560, in save
    f(self, obj)  # Call unbound method with explicit self
  File
"/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py",
line 349, in new_save_module_dict
    return old_save_module_dict(pickler, obj)
  File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 912,
in save_module_dict
    StockPickler.save_dict(pickler, obj)
  File "/usr/local/lib/python3.10/pickle.py", line 972, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/local/lib/python3.10/pickle.py", line 998, in _batch_setitems
    save(v)
  File "/usr/local/lib/python3.10/pickle.py", line 603, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/local/lib/python3.10/pickle.py", line 717, in save_reduce
    save(state)
  File "/usr/local/lib/python3.10/pickle.py", line 560, in save
    f(self, obj)  # Call unbound method with explicit self
  File
"/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py",
line 349, in new_save_module_dict
    return old_save_module_dict(pickler, obj)
  File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 912,
in save_module_dict
    StockPickler.save_dict(pickler, obj)
  File "/usr/local/lib/python3.10/pickle.py", line 972, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/local/lib/python3.10/pickle.py", line 998, in _batch_setitems
    save(v)
  File "/usr/local/lib/python3.10/pickle.py", line 603, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/local/lib/python3.10/pickle.py", line 717, in save_reduce
    save(state)
  File "/usr/local/lib/python3.10/pickle.py", line 560, in save
    f(self, obj)  # Call unbound method with explicit self
  File
"/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py",
line 349, in new_save_module_dict
    return old_save_module_dict(pickler, obj)
  File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 912,
in save_module_dict
    StockPickler.save_dict(pickler, obj)
  File "/usr/local/lib/python3.10/pickle.py", line 972, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/local/lib/python3.10/pickle.py", line 998, in _batch_setitems
    save(v)
  File "/usr/local/lib/python3.10/pickle.py", line 578, in save
    rv = reduce(self.proto)
  File "stringsource", line 2, in
grpc._cython.cygrpc.Channel.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/flink/app/word_len.py", line 212, in <module>
    run()
  File "/opt/flink/app/word_len.py", line 184, in run
    p
  File
"/usr/local/lib/python3.10/site-packages/apache_beam/transforms/ptransform.py",
line 1110, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
  File
"/usr/local/lib/python3.10/site-packages/apache_beam/transforms/ptransform.py",
line 623, in __ror__
    result = p.apply(self, pvalueish, label)
  File "/usr/local/lib/python3.10/site-packages/apache_beam/pipeline.py",
line 679, in apply
    return self.apply(transform, pvalueish)
  File "/usr/local/lib/python3.10/site-packages/apache_beam/pipeline.py",
line 732, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options
)
  File
"/usr/local/lib/python3.10/site-packages/apache_beam/runners/runner.py",
line 203, in apply
    return self.apply_PTransform(transform, input, options)
  File
"/usr/local/lib/python3.10/site-packages/apache_beam/runners/runner.py",
line 207, in apply_PTransform
    return transform.expand(input)
  File "/opt/flink/app/word_len.py", line 94, in expand
    | "DecodeMessage" >> beam.Map(decode_message)
  File
"/usr/local/lib/python3.10/site-packages/apache_beam/transforms/core.py",
line 1994, in Map
    pardo = FlatMap(wrapper, *args, **kwargs)
  File
"/usr/local/lib/python3.10/site-packages/apache_beam/transforms/core.py",
line 1937, in FlatMap
    pardo = ParDo(CallableWrapperDoFn(fn), *args, **kwargs)
  File
"/usr/local/lib/python3.10/site-packages/apache_beam/transforms/core.py",
line 1473, in __init__
    super().__init__(fn, *args, **kwargs)
  File
"/usr/local/lib/python3.10/site-packages/apache_beam/transforms/ptransform.py",
line 870, in __init__
    self.fn = pickler.loads(pickler.dumps(self.fn))
  File
"/usr/local/lib/python3.10/site-packages/apache_beam/internal/pickler.py",
line 44, in dumps
    return desired_pickle_lib.dumps(
  File
"/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py",
line 383, in dumps
    s = dill.dumps(o, byref=settings['dill_byref'])
  File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 265,
in dumps
    dump(obj, file, protocol, byref, fmode, recurse, **kwds)#, strictio)
  File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 259,
in dump
    Pickler(file, protocol, **_kwds).dump(obj)
  File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 445,
in dump
    StockPickler.dump(self, obj)
  File "/usr/local/lib/python3.10/pickle.py", line 487, in dump
    self.save(obj)
  File "/usr/local/lib/python3.10/pickle.py", line 603, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/local/lib/python3.10/pickle.py", line 717, in save_reduce
    save(state)
  File "/usr/local/lib/python3.10/pickle.py", line 560, in save
    f(self, obj)  # Call unbound method with explicit self
  File
"/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py",
line 349, in new_save_module_dict
    return old_save_module_dict(pickler, obj)
  File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 912,
in save_module_dict
    StockPickler.save_dict(pickler, obj)
  File "/usr/local/lib/python3.10/pickle.py", line 972, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/local/lib/python3.10/pickle.py", line 998, in _batch_setitems
    save(v)
  File "/usr/local/lib/python3.10/pickle.py", line 560, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 1410,
in save_function
    pickler.save_reduce(_create_function, (obj.__code__,
  File "/usr/local/lib/python3.10/pickle.py", line 692, in save_reduce
    save(args)
  File "/usr/local/lib/python3.10/pickle.py", line 560, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/usr/local/lib/python3.10/pickle.py", line 902, in save_tuple
    save(element)
  File "/usr/local/lib/python3.10/pickle.py", line 560, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/usr/local/lib/python3.10/pickle.py", line 887, in save_tuple
    save(element)
  File "/usr/local/lib/python3.10/pickle.py", line 560, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 1147,
in save_cell
    pickler.save_reduce(_create_cell, (f,), obj=obj)
  File "/usr/local/lib/python3.10/pickle.py", line 692, in save_reduce
    save(args)
  File "/usr/local/lib/python3.10/pickle.py", line 560, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/usr/local/lib/python3.10/pickle.py", line 887, in save_tuple
    save(element)
  File "/usr/local/lib/python3.10/pickle.py", line 560, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 1410,
in save_function
    pickler.save_reduce(_create_function, (obj.__code__,
  File "/usr/local/lib/python3.10/pickle.py", line 692, in save_reduce
    save(args)
  File "/usr/local/lib/python3.10/pickle.py", line 560, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/usr/local/lib/python3.10/pickle.py", line 902, in save_tuple
    save(element)
  File "/usr/local/lib/python3.10/pickle.py", line 560, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/usr/local/lib/python3.10/pickle.py", line 887, in save_tuple
    save(element)
  File "/usr/local/lib/python3.10/pickle.py", line 560, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 1147,
in save_cell
    pickler.save_reduce(_create_cell, (f,), obj=obj)
  File "/usr/local/lib/python3.10/pickle.py", line 692, in save_reduce
    save(args)
  File "/usr/local/lib/python3.10/pickle.py", line 560, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/usr/local/lib/python3.10/pickle.py", line 887, in save_tuple
    save(element)
  File "/usr/local/lib/python3.10/pickle.py", line 603, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/local/lib/python3.10/pickle.py", line 717, in save_reduce
    save(state)
  File "/usr/local/lib/python3.10/pickle.py", line 560, in save
    f(self, obj)  # Call unbound method with explicit self
  File
"/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py",
line 349, in new_save_module_dict
    return old_save_module_dict(pickler, obj)
  File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 912,
in save_module_dict
    StockPickler.save_dict(pickler, obj)
  File "/usr/local/lib/python3.10/pickle.py", line 972, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/local/lib/python3.10/pickle.py", line 998, in _batch_setitems
    save(v)
  File "/usr/local/lib/python3.10/pickle.py", line 603, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/local/lib/python3.10/pickle.py", line 717, in save_reduce
    save(state)
  File "/usr/local/lib/python3.10/pickle.py", line 560, in save
    f(self, obj)  # Call unbound method with explicit self
  File
"/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py",
line 349, in new_save_module_dict
    return old_save_module_dict(pickler, obj)
  File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 912,
in save_module_dict
    StockPickler.save_dict(pickler, obj)
  File "/usr/local/lib/python3.10/pickle.py", line 972, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/local/lib/python3.10/pickle.py", line 998, in _batch_setitems
    save(v)
  File "/usr/local/lib/python3.10/pickle.py", line 603, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/local/lib/python3.10/pickle.py", line 717, in save_reduce
    save(state)
  File "/usr/local/lib/python3.10/pickle.py", line 560, in save
    f(self, obj)  # Call unbound method with explicit self
  File
"/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py",
line 349, in new_save_module_dict
    return old_save_module_dict(pickler, obj)
  File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 912,
in save_module_dict
    StockPickler.save_dict(pickler, obj)
  File "/usr/local/lib/python3.10/pickle.py", line 972, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/local/lib/python3.10/pickle.py", line 998, in _batch_setitems
    save(v)
  File "/usr/local/lib/python3.10/pickle.py", line 603, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/local/lib/python3.10/pickle.py", line 717, in save_reduce
    save(state)
  File "/usr/local/lib/python3.10/pickle.py", line 560, in save
    f(self, obj)  # Call unbound method with explicit self
  File
"/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py",
line 349, in new_save_module_dict
    return old_save_module_dict(pickler, obj)
  File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 912,
in save_module_dict
    StockPickler.save_dict(pickler, obj)
  File "/usr/local/lib/python3.10/pickle.py", line 972, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/local/lib/python3.10/pickle.py", line 998, in _batch_setitems
    save(v)
  File "/usr/local/lib/python3.10/pickle.py", line 578, in save
    rv = reduce(self.proto)
  File "stringsource", line 2, in
grpc._cython.cygrpc.Channel.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__

        at
org.apache.beam.runners.flink.FlinkPortableClientEntryPoint.runDriverProgram
(FlinkPortableClientEntryPoint.java:192) ~[?:?]
        at org.apache.beam.runners.flink.FlinkPortableClientEntryPoint.main(
FlinkPortableClientEntryPoint.java:100) ~[?:?]
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method) ~[?:?]
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown
Source) ~[?:?]
        at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
Source) ~[?:?]
        at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(
PackagedProgram.java:355) ~[flink-dist-1.17.2.jar:1.17.2]
        at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution
(PackagedProgram.java:222) ~[flink-dist-1.17.2.jar:1.17.2]
        at org.apache.flink.client.ClientUtils.executeProgram(
ClientUtils.java:105) ~[flink-dist-1.17.2.jar:1.17.2]
        at
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint
(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.17.2.jar:1.17.2]
        ... 13 more
Caused by: java.util.concurrent.TimeoutException: Timeout of 30 seconds
waiting for job submission.
        at org.apache.beam.runners.flink.FlinkPortableClientEntryPoint$
DetachedJobInvokerFactory.executeDetachedJob(
FlinkPortableClientEntryPoint.java:256) ~[?:?]
        at org.apache.beam.runners.flink.FlinkPortableClientEntryPoint$
DetachedJobInvokerFactory.access$300(FlinkPortableClientEntryPoint.java:206)
~[?:?]
        at
org.apache.beam.runners.flink.FlinkPortableClientEntryPoint.runDriverProgram
(FlinkPortableClientEntryPoint.java:180) ~[?:?]
        at org.apache.beam.runners.flink.FlinkPortableClientEntryPoint.main(
FlinkPortableClientEntryPoint.java:100) ~[?:?]
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method) ~[?:?]
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown
Source) ~[?:?]
        at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
Source) ~[?:?]
        at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(
PackagedProgram.java:355) ~[flink-dist-1.17.2.jar:1.17.2]
        at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution
(PackagedProgram.java:222) ~[flink-dist-1.17.2.jar:1.17.2]
        at org.apache.flink.client.ClientUtils.executeProgram(
ClientUtils.java:105) ~[flink-dist-1.17.2.jar:1.17.2]
        at
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint
(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.17.2.jar:1.17.2]
        ... 13 more
2024-05-10 13:11:41,217 ERROR
org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal
error occurred in the cluster entrypoint.
java.util.concurrent.CompletionException:
org.apache.flink.client.deployment.application.ApplicationExecutionException:
Could not execute application.
        at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown
Source) ~[?:?]
        at java.util.concurrent.CompletableFuture.completeThrowable(Unknown
Source) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown
Source) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(Unknown
Source) ~[?:?]
        at java.util.concurrent.CompletableFuture.completeExceptionally(Unknown
Source) ~[?:?]
        at
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint
(ApplicationDispatcherBootstrap.java:337) ~[flink-dist-1.17.2.jar:1.17.2]
        at
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda
$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:254) ~[
flink-dist-1.17.2.jar:1.17.2]
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown
Source) ~[?:?]
        at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
        at
org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter
$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171) ~[
flink-rpc-akka_974f9cb1-cb21-497a-9825-4e660c5425bc.jar:1.17.2]
        at
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader
(ClassLoadingUtils.java:68) ~[
flink-rpc-akka_974f9cb1-cb21-497a-9825-4e660c5425bc.jar:1.17.2]
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda
$withContextClassLoader$0(ClassLoadingUtils.java:41) ~[
flink-rpc-akka_974f9cb1-cb21-497a-9825-4e660c5425bc.jar:1.17.2]
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) [
flink-rpc-akka_974f9cb1-cb21-497a-9825-4e660c5425bc.jar:1.17.2]
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(
ForkJoinExecutorConfigurator.scala:48) [
flink-rpc-akka_974f9cb1-cb21-497a-9825-4e660c5425bc.jar:1.17.2]
        at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
        at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown
Source) [?:?]
        at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
        at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) [?:?]
        at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
[?:?]
Caused by:
org.apache.flink.client.deployment.application.ApplicationExecutionException:
Could not execute application.
        ... 14 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The
main method caused an error: Job python /opt/flink/app/word_len.py --deploy
failed.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(
PackagedProgram.java:372) ~[flink-dist-1.17.2.jar:1.17.2]
        at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution
(PackagedProgram.java:222) ~[flink-dist-1.17.2.jar:1.17.2]
        at org.apache.flink.client.ClientUtils.executeProgram(
ClientUtils.java:105) ~[flink-dist-1.17.2.jar:1.17.2]
        at
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint
(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.17.2.jar:1.17.2]
        ... 13 more
Caused by: java.lang.RuntimeException: Job python /opt/flink/app/word_len.py
--deploy failed.
        at org.apache.beam.runners.flink.FlinkPortableClientEntryPoint.main(
FlinkPortableClientEntryPoint.java:102) ~[?:?]
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method) ~[?:?]
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown
Source) ~[?:?]
        at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
Source) ~[?:?]
        at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(
PackagedProgram.java:355) ~[flink-dist-1.17.2.jar:1.17.2]
        at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution
(PackagedProgram.java:222) ~[flink-dist-1.17.2.jar:1.17.2]
        at org.apache.flink.client.ClientUtils.executeProgram(
ClientUtils.java:105) ~[flink-dist-1.17.2.jar:1.17.2]
        at
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint
(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.17.2.jar:1.17.2]
        ... 13 more
Caused by: java.lang.RuntimeException: Failed to start job with driver
program: bash [-c, python /opt/flink/app/word_len.py --deploy
--job_endpoint=localhost:37015] output: WARNING:root:Waiting for grpc
channel to be ready at localhost:32895.

Can you please inform me how to fix this issue?

Cheers,
Jaehyeon

Reply via email to