There may be a slight misunderstanding: all the FlinkSQL tasks _were_ set to a
parallelism of 72 -- 18 nodes with 4 slots each. I was hoping that the setting
cluster.evenly-spread-out-slots would spread the active Kafka consumers evenly
among the TMs given the topic has 36 partitions, but I now realize that doesn't
necessarily make sense. I have since reduced the cluster to 9 pods with 4 slots
each, so the 36 slots line up with the 36 partitions, and things run well. I'm
learning the hard way. :) Thanks for your time.

Unfortunately I can't share the job manager logs.
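Roughly, the resized deployment looks like the sketch below. This is
approximate rather than the literal manifest -- only the flinkConfig keys come
from the snippet quoted below in this thread, and the parallelism field is how
I remember the flinkk8soperator spec, so treat it as a placeholder and check
the operator docs:

    apiVersion: flink.k8s.io/v1beta1
    ....
    spec:
      flinkConfig:
        cluster.evenly-spread-out-slots: true
        # 9 TM pods x 4 slots = 36 slots, one per Kafka partition, so
        # every slot hosts exactly one active consumer.
        taskmanager.numberOfTaskSlots: 4
        high-availability: zookeeper
        ...
        state.backend: filesystem
        ...
      # FlinkSQL gives every operator the job parallelism; 36 matches the
      # partition count and, at 4 slots per TM, works out to 9 pods.
      # (Field name is my recollection -- verify against the operator.)
      parallelism: 36
      jobManagerConfig:
        envConfig:
          ....

In short, the cluster now has exactly as many slots as the source has
partitions, so there is nothing left for the scheduler to spread unevenly.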
On Mon, Mar 15, 2021 at 8:37 PM Xintong Song <tonysong...@gmail.com> wrote:
>
> If all the tasks have the same parallelism of 36, your job should only
> allocate 36 slots. The evenly-spread-out-slots option should help in
> your case.
>
> Is it possible for you to share the complete jobmanager logs?
>
> Thank you~
>
> Xintong Song
>
>
> On Tue, Mar 16, 2021 at 12:46 AM Aeden Jameson <aeden.jame...@gmail.com> wrote:
>>
>> Hi Xintong,
>>
>> Thanks for replying. Yes, you understood my scenario. Every task has
>> the same parallelism since we're using FlinkSQL, unless there is a way
>> to change the parallelism of the source task that I have missed. Your
>> explanation of the setting makes sense and is what I ended up
>> concluding. Assuming one can't change the parallelism of FlinkSQL
>> tasks other than with the sink-parallelism option, I've concluded that
>> with FlinkSQL one has to plan at the cluster level, e.g. reduce the
>> task slots, increase the partitions, reduce the TMs (possibly making
>> them bigger), etc.
>>
>> Aeden
>>
>> On Sun, Mar 14, 2021 at 10:41 PM Xintong Song <tonysong...@gmail.com> wrote:
>> >
>> > Hi Aeden,
>> >
>> > IIUC, that the topic being read has 36 partitions means that your
>> > source task has a parallelism of 36. What's the parallelism of the
>> > other tasks? Is the job making use of all 72 (18 TMs * 4 slots/TM)
>> > slots?
>> >
>> > I'm afraid there's currently no good way to guarantee that the
>> > subtasks of a task are spread out evenly.
>> >
>> > The configuration option you mentioned makes sure slots are allocated
>> > from the TMs evenly; it does not affect how tasks are distributed
>> > over the allocated slots.
>> > E.g., say your job has two tasks A & B, with parallelism 36 & 54
>> > respectively. That means, with the default slot sharing strategy,
>> > your job needs 54 slots in total to be executed. With the
>> > configuration enabled, it is guaranteed that 3 slots are occupied on
>> > each TM. For B (parallelism 54), there's a subtask deployed in each
>> > slot, thus 3 subtasks on each TM. As for A, only 36 of the 54 slots
>> > contain a subtask of it, and there's no guarantee which 36 those are.
>> >
>> > Thank you~
>> >
>> > Xintong Song
>> >
>> >
>> > On Mon, Mar 15, 2021 at 3:54 AM Chesnay Schepler <ches...@apache.org> wrote:
>> >>
>> >> Is this a brand-new job, with the cluster having all 18 TMs at the
>> >> time of submission? (Or did you add more TMs while the job was
>> >> running?)
>> >>
>> >> On 3/12/2021 5:47 PM, Aeden Jameson wrote:
>> >> > Hi Matthias,
>> >> >
>> >> > Yes, all the task managers have the same hardware/memory
>> >> > configuration.
>> >> >
>> >> > Aeden
>> >> >
>> >> > On Fri, Mar 12, 2021 at 3:25 AM Matthias Pohl <matth...@ververica.com> wrote:
>> >> >> Hi Aeden,
>> >> >> just to be sure: all task managers have the same hardware/memory
>> >> >> configuration, don't they? I'm not 100% sure whether this affects
>> >> >> the slot selection in the end, but it looks like this parameter
>> >> >> also has an influence on the slot matching strategy, preferring
>> >> >> slots with less resource utilization [1].
>> >> >>
>> >> >> I'm gonna add Chesnay to the thread. He might have more insights
>> >> >> here. @Chesnay, are there any other things that might affect the
>> >> >> slot selection when actually trying to evenly spread out the
>> >> >> slots?
>> >> >>
>> >> >> Matthias
>> >> >>
>> >> >> [1] https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java#L141
>> >> >>
>> >> >> On Fri, Mar 12, 2021 at 12:58 AM Aeden Jameson <aeden.jame...@gmail.com> wrote:
>> >> >>> Hi Arvid,
>> >> >>>
>> >> >>> Thanks for responding. I did check the configuration tab of the
>> >> >>> job manager, and the setting cluster.evenly-spread-out-slots: true
>> >> >>> is there. However, I'm still observing unevenness in the
>> >> >>> distribution of source tasks. Perhaps this additional information
>> >> >>> could shed light.
>> >> >>>
>> >> >>> Version: 1.12.1
>> >> >>> Deployment Mode: Application
>> >> >>> Deployment Type: Standalone, Docker on Kubernetes using the Lyft
>> >> >>> Flink operator https://github.com/lyft/flinkk8soperator
>> >> >>>
>> >> >>> I did place the setting under the flinkConfig section:
>> >> >>>
>> >> >>> apiVersion: flink.k8s.io/v1beta1
>> >> >>> ....
>> >> >>> spec:
>> >> >>>   flinkConfig:
>> >> >>>     cluster.evenly-spread-out-slots: true
>> >> >>>     high-availability: zookeeper
>> >> >>>     ...
>> >> >>>     state.backend: filesystem
>> >> >>>     ...
>> >> >>>   jobManagerConfig:
>> >> >>>     envConfig:
>> >> >>>       ....
>> >> >>>
>> >> >>> Would you explain how the setting ends up evenly distributing
>> >> >>> active kafka consumers? Is it a result of just assigning tasks to
>> >> >>> TM1, TM2, TM3 ... TM18 in order and starting again? In my case I
>> >> >>> have 36 partitions and 18 nodes, so after the second pass of
>> >> >>> assignment I would end up with 2 subtasks in the consumer group on
>> >> >>> each TM, and then subsequent passes result in inactive consumers.
>> >> >>>
>> >> >>> Thank you,
>> >> >>> Aeden
>> >> >>>
>> >> >>> On Thu, Mar 11, 2021 at 5:26 AM Arvid Heise <ar...@apache.org> wrote:
>> >> >>>> Hi Aeden,
>> >> >>>>
>> >> >>>> the option that you mentioned should have actually caused your
>> >> >>>> desired behavior. Can you double-check that it's set for the job?
>> >> >>>> (You can look at the config in the Flink UI to be 100% sure.)
>> >> >>>>
>> >> >>>> Another option is to simply give all task managers 2 slots. That
>> >> >>>> way, the scheduler can only distribute evenly.
>> >> >>>>
>> >> >>>> On Wed, Mar 10, 2021 at 7:21 PM Aeden Jameson <aeden.jame...@gmail.com> wrote:
>> >> >>>>> I have a cluster with 18 task managers, 4 task slots each,
>> >> >>>>> running a job whose source/sink(s) are declared with FlinkSQL
>> >> >>>>> using the Kafka connector. The topic being read has 36
>> >> >>>>> partitions. The problem I'm observing is that the subtasks for
>> >> >>>>> the sources are not evenly distributed. For example, 1 task
>> >> >>>>> manager will have 4 active source subtasks and other TMs none.
>> >> >>>>> Is there a way to force each task manager to have 2 active
>> >> >>>>> source subtasks? I tried using the setting
>> >> >>>>> cluster.evenly-spread-out-slots: true, but that didn't have the
>> >> >>>>> desired effect.
>> >> >>>>>
>> >> >>>>> --
>> >> >>>>> Thank you,
>> >> >>>>> Aeden

--
Cheers,
Aeden

GitHub: https://github.com/aedenj
Linked In: http://www.linkedin.com/in/aedenjameson
Blah Blah Blah: http://www.twitter.com/daliful
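P.S. For completeness, the other fix Arvid suggested above -- keeping all 18
TMs but giving each of them only 2 slots, so 18 x 2 = 36 slots again equals
the 36 partitions -- would look roughly like this (same caveats as the sketch
near the top of this mail; approximate, not a literal manifest):

    spec:
      flinkConfig:
        cluster.evenly-spread-out-slots: true
        taskmanager.numberOfTaskSlots: 2   # 18 TMs x 2 slots = 36 slots
      parallelism: 36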