There may be a slight misunderstanding: all the FlinkSQL tasks _were_
set to a parallelism of 72 (18 nodes x 4 slots). I was hoping that the
setting cluster.evenly-spread-out-slots would spread the active
Kafka consumers evenly among the TMs, given the topic has 36
partitions, but I now realize that doesn't necessarily make sense. I
have since reduced the cluster to 9 pods with 4 slots each and things
run well. I'm learning the hard way. :) Thanks for your time.
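
For anyone who lands on this thread later, the sizing that works for
us comes down to matching total slots to partitions. A minimal sketch
with the standard flink-conf.yaml key (the pod count itself is set in
the operator spec, not here):

    # 9 TM pods x 4 slots each = 36 slots, matching the topic's 36
    # partitions, so every slot ends up with exactly one active source
    # subtask and no Kafka consumer sits idle
    taskmanager.numberOfTaskSlots: 4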

 Unfortunately I can't share the job manager logs.

On Mon, Mar 15, 2021 at 8:37 PM Xintong Song <tonysong...@gmail.com> wrote:
>
> If all the tasks have the same parallelism 36, your job should only allocate 
> 36 slots. The evenly-spread-out-slots option should help in your case.
>
> Is it possible for you to share the complete jobmanager logs?
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Mar 16, 2021 at 12:46 AM Aeden Jameson <aeden.jame...@gmail.com> wrote:
>>
>> Hi Xintong,
>>
>>     Thanks for replying. Yes, you understood my scenario. Every task
>> has the same parallelism since we're using FlinkSQL, unless there is
>> a way to change the parallelism of the source task that I have
>> missed. Your explanation of the setting makes sense and matches what
>> I ended up concluding. Assuming one can't change the parallelism of
>> FlinkSQL tasks other than via the sink-parallelism option, I've
>> concluded that when using FlinkSQL one has to plan at the cluster
>> level, e.g. reduce the task slots, increase the partitions, or reduce
>> the TMs (possibly making them bigger).
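>>
>> In concrete terms, those knobs would look roughly like the sketch
>> below, using standard flink-conf.yaml keys (the memory value is only
>> illustrative, and the TM replica count and the Kafka partition count
>> are controlled outside this file):
>>
>>     # fewer slots per TM, so that total slots == topic partitions
>>     taskmanager.numberOfTaskSlots: 2
>>     # bigger TMs if you then run fewer of them (illustrative value)
>>     taskmanager.memory.process.size: 8g
>>     # increasing the topic's partition count happens on the Kafka
>>     # side, not in the Flink configuration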
>>
>> Aeden
>>
>> On Sun, Mar 14, 2021 at 10:41 PM Xintong Song <tonysong...@gmail.com> wrote:
>> >
>> > Hi Aeden,
>> >
>> > IIUC, the topic being read having 36 partitions means that your source
>> > task has a parallelism of 36. What's the parallelism of the other tasks?
>> > Is the job making use of all 72 (18 TMs * 4 slots/TM) slots?
>> >
>> > I'm afraid currently there's no good way to guarantee subtasks of a task 
>> > are spread out evenly.
>> >
>> > The configuration option you mentioned makes sure slots are allocated
>> > from the TMs evenly; it does not affect how tasks are distributed over
>> > the allocated slots.
>> > E.g., say your job has two tasks A & B, with parallelism 36 & 54
>> > respectively. That means, with the default slot sharing strategy, your
>> > job needs 54 slots in total to be executed. With the configuration
>> > enabled, it is guaranteed that 3 slots are occupied on each TM (54
>> > slots spread over 18 TMs). For B (parallelism 54), there's a subtask
>> > deployed in each of those slots, thus 3 subtasks on each TM. As for A,
>> > only 36 of the 54 slots contain a subtask of it, and there's no
>> > guarantee which 36 those are.
>> >
>> > Thank you~
>> >
>> > Xintong Song
>> >
>> >
>> >
>> > On Mon, Mar 15, 2021 at 3:54 AM Chesnay Schepler <ches...@apache.org> wrote:
>> >>
>> >> Is this a brand-new job, with the cluster having all 18 TMs at the time
>> >> of submission? (or did you add more TMs while the job was running)
>> >>
>> >> On 3/12/2021 5:47 PM, Aeden Jameson wrote:
>> >> > Hi Matthias,
>> >> >
>> >> > Yes, all the task managers have the same hardware/memory configuration.
>> >> >
>> >> > Aeden
>> >> >
>> >> > On Fri, Mar 12, 2021 at 3:25 AM Matthias Pohl <matth...@ververica.com> wrote:
>> >> >> Hi Aeden,
>> >> >> just to be sure: all task managers have the same hardware/memory
>> >> >> configuration, don't they? I'm not 100% sure whether this affects
>> >> >> the slot selection in the end, but it looks like this parameter also
>> >> >> has an influence on the slot matching strategy, which prefers slots
>> >> >> with lower resource utilization [1].
>> >> >>
>> >> >> I'm going to add Chesnay to the thread. He might have more insights
>> >> >> here. @Chesnay, are there any other things that might affect the slot
>> >> >> selection when actually trying to spread out the slots evenly?
>> >> >>
>> >> >> Matthias
>> >> >>
>> >> >> [1] 
>> >> >> https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java#L141
>> >> >>
>> >> >> On Fri, Mar 12, 2021 at 12:58 AM Aeden Jameson <aeden.jame...@gmail.com> wrote:
>> >> >>> Hi Arvid,
>> >> >>>
>> >> >>>    Thanks for responding. I did check the configuration tab of the
>> >> >>> job manager and the setting cluster.evenly-spread-out-slots: true is
>> >> >>> there. However, I'm still observing unevenness in the distribution of
>> >> >>> source tasks. Perhaps this additional information could shed some
>> >> >>> light.
>> >> >>>
>> >> >>> Version: 1.12.1
>> >> >>> Deployment Mode: Application
>> >> >>> Deployment Type: Standalone,  Docker on Kubernetes using the Lyft
>> >> >>> Flink operator https://github.com/lyft/flinkk8soperator
>> >> >>>
>> >> >>> I did place the setting under the flinkConfig section:
>> >> >>>
>> >> >>> apiVersion: flink.k8s.io/v1beta1
>> >> >>> ....
>> >> >>> spec:
>> >> >>>    flinkConfig:
>> >> >>>      cluster.evenly-spread-out-slots: true
>> >> >>>      high-availability: zookeeper
>> >> >>>      ...
>> >> >>>      state.backend: filesystem
>> >> >>>      ...
>> >> >>>    jobManagerConfig:
>> >> >>>      envConfig:
>> >> >>>          ....
>> >> >>>
>> >> >>> Would you explain how the setting ends up evenly distributing active
>> >> >>> Kafka consumers? Is it a result of just assigning tasks to TM1, TM2,
>> >> >>> TM3 ... TM18 in order and then starting again? In my case I have 36
>> >> >>> partitions and 18 nodes, so after the second pass of assignment I
>> >> >>> would end up with 2 subtasks of the consumer group on each TM, and
>> >> >>> subsequent passes would result in inactive consumers.
>> >> >>>
>> >> >>>
>> >> >>> Thank you,
>> >> >>> Aeden
>> >> >>>
>> >> >>> On Thu, Mar 11, 2021 at 5:26 AM Arvid Heise <ar...@apache.org> wrote:
>> >> >>>> Hi Aeden,
>> >> >>>>
>> >> >>>> the option that you mentioned should actually have caused your
>> >> >>>> desired behavior. Can you double-check that it's set for the job?
>> >> >>>> (You can look at the config in the Flink UI to be 100% sure.)
>> >> >>>>
>> >> >>>> Another option is to simply give all task managers 2 slots. That
>> >> >>>> way, the scheduler can only distribute them evenly.
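>> >> >>>>
>> >> >>>> A minimal sketch of that second option, assuming the standard
>> >> >>>> flink-conf.yaml key:
>> >> >>>>
>> >> >>>>     # 18 TMs x 2 slots = 36 slots, one per Kafka partition, so an
>> >> >>>>     # even spread of source subtasks falls out by construction
>> >> >>>>     taskmanager.numberOfTaskSlots: 2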
>> >> >>>>
>> >> >>>> On Wed, Mar 10, 2021 at 7:21 PM Aeden Jameson <aeden.jame...@gmail.com> wrote:
>> >> >>>>>      I have a cluster with 18 task managers, 4 task slots each,
>> >> >>>>> running a job whose source/sink(s) are declared with FlinkSQL using
>> >> >>>>> the Kafka connector. The topic being read has 36 partitions. The
>> >> >>>>> problem I'm observing is that the subtasks for the sources are not
>> >> >>>>> evenly distributed. For example, 1 task manager will have 4 active
>> >> >>>>> source subtasks and other TMs none. Is there a way to force each
>> >> >>>>> task manager to have 2 active source subtasks? I tried using the
>> >> >>>>> setting cluster.evenly-spread-out-slots: true, but that didn't have
>> >> >>>>> the desired effect.
>> >> >>>>>
>> >> >>>>> --
>> >> >>>>> Thank you,
>> >> >>>>> Aeden
>> >>
>> >>



-- 
Cheers,
Aeden

GitHub: https://github.com/aedenj
Linked In: http://www.linkedin.com/in/aedenjameson
Blah Blah Blah: http://www.twitter.com/daliful
