-user@flink <u...@flink.apache.org> as this looks like a purely Beam issue

Could you please elaborate more about what "stuck" means? Does the watermark stop progressing? Does that happen at any specific instant (e.g. end of window or end of window + allowed lateness)?

On 6/1/22 15:43, Gorjan Todorovski wrote:
Hi Jan,

I had not checked the harness log. I have now checked it (Apache Beam worker log) and found this, but I am currently not sure what it means:

2022/06/01 13:34:40 Python exited: <nil>
2022/06/01 13:34:41 Python exited: <nil>
Exception in thread read_grpc_client_inputs:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py", line 587, in <lambda>
    target=lambda: self._read_inputs(elements_iterator),
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py", line 570, in _read_inputs
    for elements in elements_iterator:
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 416, in __next__
    return self._next()
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 803, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.CANCELLED
details = "Multiplexer hanging up"
debug_error_string = "{"created":"@1654090485.252525992","description":"Error received from peer ipv4:127.0.0.1:44439","file":"src/core/lib/surface/call.cc","file_line":1062,"grpc_message":"Multiplexer hanging up","grpc_status":1}"
>

2022/06/01 13:34:45 Python exited: <nil>
2022/06/01 13:34:46 Python exited: <nil>
2022/06/01 13:34:46 Python exited: <nil>
2022/06/01 13:34:47 Python exited: <nil>
Starting worker with command ['/opt/apache/beam/boot', '--id=3-1', '--logging_endpoint=localhost:44267', '--artifact_endpoint=localhost:36413', '--provision_endpoint=localhost:42179', '--control_endpoint=localhost:38825']
Starting worker with command ['/opt/apache/beam/boot', '--id=3-3', '--logging_endpoint=localhost:38683', '--artifact_endpoint=localhost:44867', '--provision_endpoint=localhost:34833', '--control_endpoint=localhost:44351']
Starting worker with command ['/opt/apache/beam/boot', '--id=3-2', '--logging_endpoint=localhost:35391', '--artifact_endpoint=localhost:46571', '--provision_endpoint=localhost:44073', '--control_endpoint=localhost:44133']
Starting work...

On Wed, Jun 1, 2022 at 11:21 AM Jan Lukavský <je...@seznam.cz> wrote:

    Hi Gorjan,

    +user@beam <mailto:user@beam.apache.org>

    The trace you posted is just waiting for a bundle to finish in the
    SDK harness. I would suspect there is a problem in the logs of the
    harness. Did you look for possible errors there?

     Jan

    On 5/31/22 13:54, Gorjan Todorovski wrote:
    Hi,

    I am running a TensorFlow Extended (TFX) pipeline that uses
    Apache Beam for data processing, which in turn uses the Flink
    Runner (basically a batch job on a Flink Session Cluster on
    Kubernetes, version 1.13.6), but the job (for gathering stats)
    gets stuck.

    There is nothing significant in the Job Manager or Task Manager
    logs. The only thing that might indicate why the task is stuck
    is a thread dump:

    "MapPartition (MapPartition at [14]{TFXIORead[train], GenerateStatistics[train]}) (1/32)#0" Id=188 WAITING on java.util.concurrent.CompletableFuture$Signaller@6f078632
        at sun.misc.Unsafe.park(Native Method)
        - waiting on java.util.concurrent.CompletableFuture$Signaller@6f078632
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
        at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
        at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
        at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
        at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
        ...

    I use a parallelism of 32. The task managers are set up so that
    each TM runs in one container with 1 CPU and a total process
    memory of 20 GB. Each TM runs 1 task slot.

    This is failing with ~100 files with a total size of about 100
    GB. If I run the pipeline with a smaller number of files to
    process, it runs OK.

    I need Flink to be able to process different amounts of data, as
    it is able to scale by automatically adding pods depending on the
    parallelism setting for the specific job (I set the parallelism
    to max(number of files, 32)).

    Thanks,
    Gorjan
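
For readers following the thread, the parallelism rule described in the message above could be sketched roughly as follows. This is only an illustration, not the poster's actual configuration: the helper name `flink_pipeline_args` and the address `flink-jobmanager:8081` are hypothetical, and the sketch assumes the job is configured through the Beam Python pipeline options `--runner`, `--flink_master` and `--parallelism`.

```python
def flink_pipeline_args(num_files, flink_master, min_parallelism=32):
    """Build a list of Beam pipeline arguments for the Flink runner.

    Hypothetical helper for illustration; the name and defaults are
    assumptions, not taken from the original message.
    """
    # Rule quoted in the message: parallelism = max(number of files, 32).
    parallelism = max(num_files, min_parallelism)
    return [
        "--runner=FlinkRunner",
        f"--flink_master={flink_master}",  # address is an assumed placeholder
        f"--parallelism={parallelism}",
    ]


# Example: ~100 input files, as in the failing run described above.
args = flink_pipeline_args(num_files=100, flink_master="flink-jobmanager:8081")
print(args)
```

In a TFX setup, a list like this would typically be handed over as the Beam pipeline arguments of the pipeline; how exactly that was done in this case is not stated in the thread.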
