If you are using only a single task manager but want to get parallelism >
1, you will need to increase taskmanager.numberOfTaskSlots in
your flink-conf.yaml.
https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html#scheduling
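For example, to allow parallelism 10 on a single task manager (the slot count here is illustrative, not a recommendation):

```yaml
# flink-conf.yaml
# Each task manager offers this many slots; the total slots across all
# task managers must be >= the pipeline's requested parallelism.
taskmanager.numberOfTaskSlots: 10
```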

On Thu, Apr 30, 2020 at 8:19 AM Robbe Sneyders <robbe.sneyd...@ml6.eu>
wrote:

> Hi Kyle,
>
> Thanks for the quick response.
> The problem was that the pipeline could not access the input file. The
> Task Manager errors do indeed seem unrelated.
>
> I'm now able to run the pipeline completely, but I'm running into problems
> when using parallelism.
> The pipeline can be summarized as:
> read file -> shuffle -> process -> write files
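> For concreteness, that shape can be sketched with the Beam Python SDK
> roughly as follows (a minimal, self-contained sketch using an in-memory
> source and sink; the transform names and process function are
> placeholders, not our actual code):
>
> ```python
> import apache_beam as beam
>
> def process(element):
>     return element.upper()  # placeholder for the real processing step
>
> with beam.Pipeline() as p:  # defaults to the DirectRunner
>     (p
>      | "Read" >> beam.Create(["a", "b", "c"])  # stands in for the file read
>      | "Shuffle" >> beam.Reshuffle()  # redistributes elements across workers
>      | "Process" >> beam.Map(process)
>      | "Write" >> beam.Map(print))  # stands in for the file write
> ```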
>
> When using parallelism > 1, the pipeline stalls and the Task Manager
> outputs the following warnings:
> flink-taskmanager_1  | 2020-04-30 09:24:46,272 INFO
>  org.apache.beam.sdk.fn.stream.DirectStreamObserver - Output channel
> stalled for 255s, outbound thread CHAIN MapPartition (MapPartition at
> [4]{Discard array, Load json, Process element, Dump json}) -> FlatMap
> (FlatMap at ExtractOutput[0]) (7/10). See:
> https://issues.apache.org/jira/browse/BEAM-4280 for the history for this
> issue.
>
> The referenced issue [1] doesn't contain much information and is already
> resolved. There is a Flink issue [2] that seems related, although I'm not
> seeing the reported stacktrace. I guess this problem occurs because I'm
> reading from and writing to the same disk in parallel.
>
> Increasing the Task Manager memory seems to resolve the issue partially.
> I'm still getting the stalled channel warnings, but the pipeline does
> proceed, albeit step-wise and slowly.
>
> Using BATCH_FORCED execution mode removes the warnings, but still runs a
> lot slower than running with parallelism=1.
>
> The pipeline shouldn't be I/O bound, so I guess I should still be able
> to get some benefit out of running tasks in parallel?
>
> 1. https://issues.apache.org/jira/browse/BEAM-4280
> 2.
> https://issues.apache.org/jira/browse/FLINK-10672?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=16970692#comment-16970692
>
> Kind regards,
> Robbe
>
> Robbe Sneyders
>
> ML6 Gent
>
> M: +32 474 71 31 08
>
>
> On Wed, 29 Apr 2020 at 19:28, Kyle Weaver <kcwea...@google.com> wrote:
>
>> > This seems to have worked, as the output file is created on the host
>> system. However the pipeline silently fails, and the output file remains
>> empty.
>>
>> Have you checked the SDK container logs? They are most likely to contain
>> relevant failure information.
>>
>> > I don't know if this is a result of me rebuilding the Job Server, or
>> caused by another issue.
>>
>> Looks like there is an old but unresolved bug with the same error:
>> https://issues.apache.org/jira/browse/BEAM-5397
>>
>> On Wed, Apr 29, 2020 at 11:24 AM Robbe Sneyders <robbe.sneyd...@ml6.eu>
>> wrote:
>>
>>> Hi all,
>>>
>>> We're working on a project where we're limited to one big development
>>> machine for now. We want to start developing data processing pipelines in
>>> Python, which should eventually be ported to a currently unknown setup on a
>>> separate cluster or cloud, so we went with Beam for its portability.
>>>
>>> For the development setup, we wanted as little overhead as possible, so
>>> we deployed a single-node Flink cluster with docker-compose. The whole
>>> setup is defined by the following docker-compose.yml:
>>>
>>> ```
>>> version: "2.1"
>>> services:
>>>   flink-jobmanager:
>>>     image: flink:1.9
>>>     network_mode: host
>>>     command: jobmanager
>>>     environment:
>>>       - JOB_MANAGER_RPC_ADDRESS=localhost
>>>
>>>   flink-taskmanager:
>>>     image: flink:1.9
>>>     network_mode: host
>>>     depends_on:
>>>       - flink-jobmanager
>>>     command: taskmanager
>>>     environment:
>>>       - JOB_MANAGER_RPC_ADDRESS=localhost
>>>     volumes:
>>>       - staging-dir:/tmp/beam-artifact-staging
>>>       - /usr/bin/docker:/usr/bin/docker
>>>       - /var/run/docker.sock:/var/run/docker.sock
>>>     user: flink:${DOCKER_GID}
>>>
>>>   beam-jobserver:
>>>     image: apache/beam_flink1.9_job_server:2.20.0
>>>     network_mode: host
>>>     command: --flink-master=localhost:8081
>>>     volumes:
>>>       - staging-dir:/tmp/beam-artifact-staging
>>>
>>> volumes:
>>>   staging-dir:
>>> ```
>>>
>>> We can submit and run pipelines with the following options:
>>> ```
>>> 'runner': 'PortableRunner',
>>> 'job_endpoint': 'localhost:8099',
>>> ```
>>> The environment type for the SDK Harness is configured to the default
>>> 'docker'.
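>>> Concretely, these options can be passed to the Python SDK along these
>>> lines (a sketch; the environment_type flag just makes the default
>>> explicit):
>>>
>>> ```python
>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>
>>> options = PipelineOptions([
>>>     "--runner=PortableRunner",
>>>     "--job_endpoint=localhost:8099",
>>>     "--environment_type=DOCKER",  # default SDK harness environment
>>> ])
>>> ```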
>>>
>>> However, we cannot write output files to the host system. To fix this,
>>> I tried to mount a host directory into the Beam SDK container (I had to
>>> rebuild the Beam Job Server jar and image to do this). This seems to have
>>> worked, as the output file is created on the host system. However, the
>>> pipeline silently fails, and the output file remains empty. Running the
>>> pipeline with the DirectRunner confirms that the pipeline itself works.
>>>
>>> Looking at the output logs, the following error is thrown in the Flink
>>> Task Manager:
>>> flink-taskmanager_1  | java.lang.NoClassDefFoundError:
>>> org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena$1
>>> I don't know if this is a result of me rebuilding the Job Server, or
>>> caused by another issue.
>>>
>>> We currently do not have a distributed file system available. Is there
>>> any way to make writing to the host system possible?
>>>
>>> Kind regards,
>>> Robbe
>>>
>>>
>>
