If you are using only a single task manager but want to get parallelism > 1, you will need to increase taskmanager.numberOfTaskSlots in your flink-conf.yaml. https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html#scheduling
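For example, a minimal flink-conf.yaml sketch (the values are illustrative; with a single task manager, the total slot count caps the parallelism a job can actually use):

```yaml
# flink-conf.yaml: illustrative values, not a recommendation.
# With one task manager, this is the maximum parallelism available,
# so size it to the parallelism you want to run with.
taskmanager.numberOfTaskSlots: 10

# Optional: default parallelism for jobs that don't set their own.
parallelism.default: 10
```

If I remember right, the entrypoint of the official flink:1.9 Docker image also honors a TASK_MANAGER_NUMBER_OF_TASK_SLOTS environment variable, which would let you set this from the taskmanager service in your docker-compose.yml instead of editing the file.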
On Thu, Apr 30, 2020 at 8:19 AM Robbe Sneyders <robbe.sneyd...@ml6.eu> wrote:

> Hi Kyle,
>
> Thanks for the quick response.
> The problem was that the pipeline could not access the input file. The
> Task Manager errors seem unrelated indeed.
>
> I'm now able to run the pipeline completely, but I'm running into problems
> when using parallelism.
> The pipeline can be summarized as:
> read file -> shuffle -> process -> write files
>
> When using parallelism > 1, the pipeline stalls and the Task Manager
> outputs the following warning:
> flink-taskmanager_1 | 2020-04-30 09:24:46,272 INFO
> org.apache.beam.sdk.fn.stream.DirectStreamObserver - Output channel
> stalled for 255s, outbound thread CHAIN MapPartition (MapPartition at
> [4]{Discard array, Load json, Process element, Dump json}) -> FlatMap
> (FlatMap at ExtractOutput[0]) (7/10). See:
> https://issues.apache.org/jira/browse/BEAM-4280 for the history for this
> issue.
>
> The referenced issue [1] doesn't contain a lot of information and is
> resolved. There is a Flink issue [2] that seems related, although I'm not
> seeing the reported stack trace. I guess this problem occurs because I'm
> reading from and writing to the same disk in parallel.
>
> Increasing the Task Manager memory seems to resolve the issue partially.
> I'm still getting the stalled channel warnings, but the pipeline does
> proceed, step-wise but slowly.
>
> Using BATCH_FORCED execution mode removes the warnings, but the pipeline
> still runs a lot slower than with parallelism=1.
>
> The pipeline shouldn't be I/O bound, so I guess I should still be able to
> get some benefit out of running tasks in parallel?
>
> 1. https://issues.apache.org/jira/browse/BEAM-4280
> 2. https://issues.apache.org/jira/browse/FLINK-10672?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=16970692#comment-16970692
>
> Kind regards,
> Robbe
>
> On Wed, 29 Apr 2020 at 19:28, Kyle Weaver <kcwea...@google.com> wrote:
>
>> > This seems to have worked, as the output file is created on the host
>> system. However the pipeline silently fails, and the output file remains
>> empty.
>>
>> Have you checked the SDK container logs? They are most likely to contain
>> relevant failure information.
>>
>> > I don't know if this is a result of me rebuilding the Job Server, or
>> caused by another issue.
>>
>> Looks like there is an old but unresolved bug with the same error:
>> https://issues.apache.org/jira/browse/BEAM-5397
>>
>> On Wed, Apr 29, 2020 at 11:24 AM Robbe Sneyders <robbe.sneyd...@ml6.eu>
>> wrote:
>>
>>> Hi all,
>>>
>>> We're working on a project where we're limited to one big development
>>> machine for now. We want to start developing data processing pipelines
>>> in Python, which should eventually be ported to a currently unknown
>>> setup on a separate cluster or cloud, so we went with Beam for its
>>> portability.
>>>
>>> For the development setup, we wanted to have the least amount of
>>> overhead possible, so we deployed a one-node Flink cluster with
>>> docker-compose.
>>> The whole setup is defined by the following docker-compose.yml:
>>>
>>> ```
>>> version: "2.1"
>>> services:
>>>   flink-jobmanager:
>>>     image: flink:1.9
>>>     network_mode: host
>>>     command: jobmanager
>>>     environment:
>>>       - JOB_MANAGER_RPC_ADDRESS=localhost
>>>
>>>   flink-taskmanager:
>>>     image: flink:1.9
>>>     network_mode: host
>>>     depends_on:
>>>       - flink-jobmanager
>>>     command: taskmanager
>>>     environment:
>>>       - JOB_MANAGER_RPC_ADDRESS=localhost
>>>     volumes:
>>>       - staging-dir:/tmp/beam-artifact-staging
>>>       - /usr/bin/docker:/usr/bin/docker
>>>       - /var/run/docker.sock:/var/run/docker.sock
>>>     user: flink:${DOCKER_GID}
>>>
>>>   beam-jobserver:
>>>     image: apache/beam_flink1.9_job_server:2.20.0
>>>     network_mode: host
>>>     command: --flink-master=localhost:8081
>>>     volumes:
>>>       - staging-dir:/tmp/beam-artifact-staging
>>>
>>> volumes:
>>>   staging-dir:
>>> ```
>>>
>>> We can submit and run pipelines with the following options:
>>> ```
>>> 'runner': 'PortableRunner',
>>> 'job_endpoint': 'localhost:8099',
>>> ```
>>> The environment type for the SDK Harness is configured to the default
>>> 'docker'.
>>>
>>> However, we cannot write output files to the host system. To fix this,
>>> I tried to mount a host directory into the Beam SDK container (I had to
>>> rebuild the Beam Job Server jar and image to do this). This seems to
>>> have worked, as the output file is created on the host system. However,
>>> the pipeline silently fails, and the output file remains empty. Running
>>> the pipeline with the DirectRunner confirms that the pipeline is
>>> working.
>>>
>>> Looking at the output logs, the following error is thrown in the Flink
>>> Task Manager:
>>> flink-taskmanager_1 | java.lang.NoClassDefFoundError:
>>> org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena$1
>>> I don't know if this is a result of me rebuilding the Job Server, or
>>> caused by another issue.
>>>
>>> We currently do not have a distributed file system available. Is there
>>> any way to make writing to the host system possible?
>>>
>>> Kind regards,
>>> Robbe
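For reference, here is a minimal sketch of how a pipeline shaped like the one described above (read file -> shuffle -> process -> write files) could be submitted with those options. The file paths and the processing function are hypothetical stand-ins, not the actual pipeline, and --parallelism is the Flink runner option that the slot advice at the top of this thread applies to:

```python
# Minimal sketch only; paths and the processing step are hypothetical
# stand-ins, not the poster's actual pipeline.
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=PortableRunner',
    '--job_endpoint=localhost:8099',
    '--environment_type=DOCKER',  # the default SDK harness environment
    '--parallelism=2',            # needs enough task slots, see above
])

with beam.Pipeline(options=options) as p:
    (p
     | 'Read file' >> beam.io.ReadFromText('/tmp/input.jsonl')
     | 'Shuffle' >> beam.Reshuffle()  # redistribute elements across workers
     | 'Process' >> beam.Map(lambda line: json.dumps(json.loads(line)))
     | 'Write files' >> beam.io.WriteToText('/tmp/output/part'))
```

Note that with the Docker environment, these paths resolve inside the SDK harness container rather than on the host, which is why the host directory had to be mounted into the SDK container in the first place.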