1.3.2 -- should I update to the latest version?

Thanks,

Le

On Wed, Mar 6, 2019 at 4:24 AM Till Rohrmann <trohrm...@apache.org> wrote:

> Which version of Flink are you using?
>
> On Tue, Mar 5, 2019 at 10:58 PM Le Xu <sharonx...@gmail.com> wrote:
>
>> Hi Till:
>>
>> Thanks for the reply. The setup of the jobs is roughly as follows: for a
>> cluster with N machines, we deploy X simple map/reduce-style jobs (the job
>> DAG and settings are exactly the same, except that they consume different
>> data). Each job has N mappers, evenly distributed with one mapper on each
>> machine, so there are X mappers on each machine (as there are X jobs in
>> total). Each job has only one reducer, to which all mappers point. What I'm
>> observing is that all reducers are allocated to machine 1 (where mapper 1
>> of every job is allocated). That does make sense, since the reducer and
>> mapper 1 are in the same slot sharing group. The original purpose of the
>> question was to find out whether it is possible to explicitly specify that
>> the reducer be co-located with a different mapper (such as mapper 2, so
>> that the reducer of job 2 is placed on machine 2). I'm just trying to
>> figure out whether this is possible without using a more expensive
>> approach (through YARN, for example). If it is not possible, I will see
>> whether I can move to job mode, as Piotr suggests.
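>>
>> For concreteness, each job is shaped roughly like this (a simplified
>> sketch; MySource, MyMapper, MyReducer and MySink stand in for our real
>> classes):
>>
>>     StreamExecutionEnvironment env =
>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>     env.addSource(new MySource())        // one source partition per machine
>>        .map(new MyMapper())
>>        .setParallelism(numMachines)      // N mappers, one per machine
>>        .keyBy(record -> record.getKey())
>>        .reduce(new MyReducer())
>>        .setParallelism(1)                // the single reducer per job
>>        .addSink(new MySink());
>>
>>     env.execute();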
>>
>> Thanks,
>>
>> Le
>>
>> On Tue, Mar 5, 2019 at 9:24 AM Till Rohrmann <trohrm...@apache.org>
>> wrote:
>>
>>> Hard to tell whether this is related to FLINK-11815.
>>>
>>> To me the setup is not fully clear. Let me try to sum it up: according
>>> to Le Xu's description there are n jobs running on a session cluster. I
>>> assume that every TaskManager has n slots. The observed behaviour is that
>>> every job allocates the slot for the first mapper and chained sink from the
>>> first TM, right? Since Flink does not give strict guarantees for the slot
>>> allocation, this is possible; however, it should be highly unlikely, or at
>>> least change when re-executing the same setup. At the moment there is no
>>> functionality in place to control the task-slot assignment.
>>>
>>> Chaining only affects which tasks will be grouped together and executed
>>> by the same Task (i.e. by the same thread). Separate tasks can still be
>>> executed in the same slot if they have the same slot sharing group. This
>>> means that there can be multiple threads running in each slot.
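>>>
>>> To make the distinction concrete, here is a minimal sketch against the
>>> DataStream API (the operator names are placeholders):
>>>
>>>     // Breaks the chain: map and sink become separate tasks (separate
>>>     // threads), but they may still be scheduled into the same slot.
>>>     stream.map(new MyMapper()).disableChaining();
>>>
>>>     // Puts the operator into a different slot sharing group, so it can
>>>     // no longer share a slot with operators of the default group.
>>>     stream.map(new MyMapper()).slotSharingGroup("separate-group");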
>>>
>>> For me it would be helpful to get more information about the actual job
>>> deployments.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Mar 5, 2019 at 12:00 PM Piotr Nowojski <pi...@ververica.com>
>>> wrote:
>>>
>>>> Hi Le,
>>>>
>>>> As I wrote, you can try running Flink in job mode, which spawns a
>>>> separate cluster per job.
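>>>>
>>>> On YARN that would be something along these lines (a sketch; the exact
>>>> flags depend on your setup and Flink version):
>>>>
>>>>     ./bin/flink run -m yarn-cluster -p <parallelism> your-job.jar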
>>>>
>>>> Till, is this issue covered by FLINK-11815
>>>> <https://issues.apache.org/jira/browse/FLINK-11815>? Is this the same
>>>> as:
>>>>
>>>> > Known issues:
>>>> > 1. (…)
>>>> > 2. if task slots are registered before slot request, the code have a
>>>> tendency to group requests together on the same machine because we
>>>> are using a LinkedHashMap
>>>>
>>>> ?
>>>>
>>>> Piotrek
>>>>
>>>> On 4 Mar 2019, at 21:08, Le Xu <sharonx...@gmail.com> wrote:
>>>>
>>>> Thanks Piotr.
>>>>
>>>> I didn't realize that the email attachment isn't working, so the example
>>>> I was referring to was this figure from the Flink website:
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/fig/slot_sharing.svg
>>>>
>>>> I am trying to run multiple identical jobs concurrently in a cluster --
>>>> the DAG looks very similar to the one in the figure. Each machine holds
>>>> one map task from each job, and I end up with X sinks on machine 1 (X
>>>> being the number of jobs). I assumed this was caused by operator chaining
>>>> (all sinks are chained to mapper 1 and therefore all end up on machine
>>>> 1), but I also tried disabling chaining and I still get the same result.
>>>> Somehow, even when the sink and the map belong to different threads, they
>>>> are still placed in the same slot.
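>>>>
>>>> (For reference, I disabled chaining globally, roughly like this:
>>>>
>>>>     env.disableOperatorChaining();
>>>>
>>>> or per operator via .disableChaining(), but the placement did not
>>>> change.)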
>>>>
>>>> My goal was to see whether it is possible to have the sinks evenly
>>>> distributed across the cluster (instead of all on machine 1). One way to
>>>> do this would be to chain the sink to one of the other mappers -- the
>>>> other would be to change the placement of the mappers altogether (e.g.
>>>> placing map 1 of job 2 on machine 2 and map 1 of job 3 on machine 3, so
>>>> that the sinks sit evenly throughout the cluster).
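>>>>
>>>> One workaround I am considering is to move the sink into its own slot
>>>> sharing group, so that it is no longer tied to mapper 1's slot (a
>>>> sketch; the group name is arbitrary):
>>>>
>>>>     stream.addSink(new MySink())
>>>>           .slotSharingGroup("sinks");  // e.g. one group per job
>>>>
>>>> though that reserves an extra slot instead of co-locating the sink with
>>>> a specific mapper.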
>>>>
>>>> Thanks.
>>>>
>>>> Le
>>>>
>>>> On Mon, Mar 4, 2019 at 6:49 AM Piotr Nowojski <pi...@ververica.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Are you asking whether that is the expected behaviour, or have you
>>>>> actually observed this issue? I'm not entirely sure, but I would guess
>>>>> that the Sink tasks would be distributed randomly across the cluster --
>>>>> though maybe I'm mixing this issue up with resource allocation for Task
>>>>> Managers. Maybe Till will know something more about this?
>>>>>
>>>>> One thing that might solve or work around the issue is to run those
>>>>> jobs in job mode (one cluster per job) rather than session mode, since
>>>>> containers for Task Managers are created/requested randomly.
>>>>>
>>>>> Piotrek
>>>>>
>>>>> On 2 Mar 2019, at 23:53, Le Xu <sharonx...@gmail.com> wrote:
>>>>>
>>>>> Hello!
>>>>>
>>>>> I'm trying to find out if there is a way to force task slot sharing
>>>>> within a job. The example on the website looks like the following (as in
>>>>> the attached screenshot):
>>>>>
>>>>> <image.png>
>>>>> In this example, the single sink shares a slot with source/map (1)
>>>>> and window operator (1). If I deploy multiple identical jobs like the
>>>>> one shown above, all sink operators end up on the first machine, which
>>>>> creates an unbalanced load. Is there a way to avoid this situation,
>>>>> i.e., to have the sink operators of different jobs spread evenly across
>>>>> the task slots of the entire cluster? Specifically, I was wondering
>>>>> whether either of the following options is possible:
>>>>> 1. Force sink[1] to share a slot with a mapper from a different
>>>>> partition, such as source[2] and window[2].
>>>>> 2. If option 1 is not possible, is there a "hacky" way to have Flink
>>>>> deploy jobs starting from a different machine: e.g. for job 2, allocate
>>>>> source/map[1], window[1] and sink[1] to machine 2 instead of machine 1
>>>>> again. The slot sharing groups would stay the same, but the sinks of
>>>>> the two jobs would end up on different machines.
>>>>>
>>>>> Thanks!
>>>>>
>>>>
