-user@flink <u...@flink.apache.org>, as this looks like a purely Beam issue.
Could you please elaborate on what "stuck" means? Does the
watermark stop progressing? Does that happen at any specific instant
(e.g. at the end of a window, or at the end of a window plus allowed lateness)?
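For reference, a minimal Beam Python sketch of the situation "end of window + allowed lateness" refers to; the window size, lateness and data below are assumed values for illustration only, not anything from the pipeline discussed here:

import apache_beam as beam
from apache_beam import window
from apache_beam.transforms.trigger import AccumulationMode, AfterWatermark

# Illustrative only: elements are grouped into 60-second fixed windows; a
# window's result becomes final once the watermark passes the end of the
# window plus the 600 seconds of allowed lateness.
with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([("key", 1), ("key", 2)])
        | beam.Map(lambda kv: window.TimestampedValue(kv, 10))  # assumed event time
        | beam.WindowInto(
            window.FixedWindows(60),
            trigger=AfterWatermark(),
            accumulation_mode=AccumulationMode.DISCARDING,
            allowed_lateness=600)
        | beam.GroupByKey())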
On 6/1/22 15:43, Gorjan Todorovski wrote:
Hi Jan,
I have not checked the harness log before. I have now checked it (the Apache Beam
worker log) and found this, but I am currently not sure what it means:
2022/06/01 13:34:40 Python exited: <nil>
2022/06/01 13:34:41 Python exited: <nil>
Exception in thread read_grpc_client_inputs:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py", line 587, in <lambda>
    target=lambda: self._read_inputs(elements_iterator),
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py", line 570, in _read_inputs
    for elements in elements_iterator:
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 416, in __next__
    return self._next()
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 803, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
  status = StatusCode.CANCELLED
  details = "Multiplexer hanging up"
  debug_error_string = "{"created":"@1654090485.252525992","description":"Error received from peer ipv4:127.0.0.1:44439","file":"src/core/lib/surface/call.cc","file_line":1062,"grpc_message":"Multiplexer hanging up","grpc_status":1}"
>
2022/06/01 13:34:45 Python exited: <nil>
2022/06/01 13:34:46 Python exited: <nil>
2022/06/01 13:34:46 Python exited: <nil>
2022/06/01 13:34:47 Python exited: <nil>
Starting worker with command ['/opt/apache/beam/boot', '--id=3-1',
'--logging_endpoint=localhost:44267',
'--artifact_endpoint=localhost:36413',
'--provision_endpoint=localhost:42179',
'--control_endpoint=localhost:38825']
Starting worker with command ['/opt/apache/beam/boot', '--id=3-3',
'--logging_endpoint=localhost:38683',
'--artifact_endpoint=localhost:44867',
'--provision_endpoint=localhost:34833',
'--control_endpoint=localhost:44351']
Starting worker with command ['/opt/apache/beam/boot', '--id=3-2',
'--logging_endpoint=localhost:35391',
'--artifact_endpoint=localhost:46571',
'--provision_endpoint=localhost:44073',
'--control_endpoint=localhost:44133']
Starting work...
On Wed, Jun 1, 2022 at 11:21 AM Jan Lukavský <je...@seznam.cz> wrote:
Hi Gorjan,
+user@beam <user@beam.apache.org>
The trace you posted just shows the runner waiting for a bundle to finish in the
SDK harness. I would suspect the actual problem shows up in the logs of the
harness. Did you look for possible errors there?
Jan
On 5/31/22 13:54, Gorjan Todorovski wrote:
Hi,
I am running a TensorFlow Extended (TFX) pipeline which uses
Apache Beam for data processing, which in turn runs on the Flink Runner
(basically a batch job on a Flink 1.13.6 Session Cluster on
Kubernetes), but the job (for gathering stats) gets stuck.
There is nothing significant in the Job Manager or Task Manager
logs. The only thing that might tell why the task is
stuck seems to be a thread dump:
"MapPartition (MapPartition at [14]{TFXIORead[train],
GenerateStatistics[train]}) (1/32)#0" Id=188 WAITING on
java.util.concurrent.CompletableFuture$Signaller@6f078632
at sun.misc.Unsafe.park(Native Method)
- waiting on
java.util.concurrent.CompletableFuture$Signaller@6f078632
at
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
at
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
at
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
at
org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
...
I use a parallelism of 32. Task managers are configured so that each TM runs
in one container with 1 CPU and a total process memory of 20
GB. Each TM runs 1 task slot.
This fails with ~100 files with a total size of about 100
GB. If I run the pipeline with a smaller number of files to
process, it runs OK.
I need Flink to be able to process different amounts of data, as
it can scale by automatically adding pods depending on the
parallelism setting for the specific job (I set the parallelism
to max(number of files, 32)).
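For context, here is a minimal sketch (not the actual TFX pipeline) of how that parallelism could be computed and passed to the Beam Flink runner; the option names follow Beam's Flink runner documentation, while the file count, Flink master address and the toy transforms are assumptions:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

num_files = 100                   # assumed; in practice the number of input files
parallelism = max(num_files, 32)  # parallel degree as described above

# These flags (or the equivalent beam_pipeline_args in TFX) select the Flink
# runner and set the job parallelism; the master address is a placeholder.
options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_master=flink-jobmanager:8081",
    f"--parallelism={parallelism}",
])

with beam.Pipeline(options=options) as p:
    _ = p | beam.Create(range(10)) | beam.Map(print)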
Thanks,
Gorjan