Have you tried using an existing operator such as https://github.com/GoogleCloudPlatform/flink-on-k8s-operator?
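
Also, on the point further down in the thread that parallelism is configured via env variables and that the consumer group stays the same across restarts: below is a minimal sketch of what that wiring might look like in the job code. It assumes the universal Kafka connector, and the variable names (PARALLELISM, KAFKA_BROKERS, KAFKA_GROUP_ID, "input-topic") are made up for illustration, not taken from your setup.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class EnvDrivenParallelismJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Parallelism comes from the deployment's env variables, so the operator can
        // patch the JobManager deployment and restart the job with a new value.
        // PARALLELISM is a hypothetical variable name used only for this sketch.
        int parallelism = Integer.parseInt(System.getenv().getOrDefault("PARALLELISM", "2"));
        env.setParallelism(parallelism);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers",
                System.getenv().getOrDefault("KAFKA_BROKERS", "localhost:9092"));
        // Keeping group.id fixed across restarts is what lets the new job pick up
        // from the committed group offsets instead of starting over.
        props.setProperty("group.id",
                System.getenv().getOrDefault("KAFKA_GROUP_ID", "my-consumer-group"));

        DataStream<String> input = env.addSource(
                new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props));

        input.print();
        env.execute("env-driven-parallelism-job");
    }
}

With a fixed group.id and no savepoint, the restarted job should fall back to the committed group offsets by default, which matches the behavior described in the thread.
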
On Wed, Mar 11, 2020 at 4:46 AM Xintong Song <tonysong...@gmail.com> wrote:

> Hi Eleanore,
>
> That doesn't sound like a scaling issue. It's probably data skew, where the
> data volume on some of the keys is significantly higher than on others.
> I'm not familiar with this area though, and have copied Jark for you, who
> is one of the community experts in this area.
>
> Thank you~
>
> Xintong Song
>
>
> On Wed, Mar 11, 2020 at 10:37 AM Eleanore Jin <eleanore....@gmail.com>
> wrote:
>
>> Hi Xintong,
>>
>> Thanks for the prompt reply! To answer your questions:
>>
>> - Which Flink version are you using?
>>
>> v1.8.2
>>
>> - Is this skew observed only after a scaling-up? What happens if the
>> parallelism is initially set to the scaled-up value?
>>
>> I also tried this; it seems the skew also happens even when I do not
>> change the parallelism, so it may not be caused by scale-up/down.
>>
>> - Keeping the job running a while after the scale-up, does the skew
>> ease?
>>
>> The skew happens in such a way that some partitions' lag drops down to 0,
>> but other partitions are still at a level of 10,000, and I am seeing that
>> the back pressure is ok.
>>
>> Thanks a lot!
>> Eleanore
>>
>>
>> On Tue, Mar 10, 2020 at 7:03 PM Xintong Song <tonysong...@gmail.com>
>> wrote:
>>
>>> Hi Eleanore,
>>>
>>> I have a few more questions regarding your issue.
>>>
>>> - Which Flink version are you using?
>>> - Is this skew observed only after a scaling-up? What happens if the
>>> parallelism is initially set to the scaled-up value?
>>> - Keeping the job running a while after the scale-up, does the skew
>>> ease?
>>>
>>> I suspect the performance difference might be an outcome of some
>>> warming-up issues. E.g., the existing TMs might have some files already
>>> localized, or some memory buffers already promoted to the JVM tenured
>>> area, while the new TMs have not.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>> On Wed, Mar 11, 2020 at 9:25 AM Eleanore Jin <eleanore....@gmail.com>
>>> wrote:
>>>
>>>> Hi Experts,
>>>>
>>>> I have my Flink application running on Kubernetes, initially with 1 Job
>>>> Manager and 2 Task Managers.
>>>>
>>>> We have a custom operator that watches the CRD. When the CRD replicas
>>>> change, it patches the Flink Job Manager deployment's parallelism and
>>>> max parallelism according to the replicas from the CRD (parallelism can
>>>> be configured via env variables for our application). This causes the
>>>> Job Manager to restart, hence a new Flink job, but the consumer group
>>>> does not change, so it continues from the offset where it left off.
>>>>
>>>> In addition, the operator also updates the Task Manager deployment's
>>>> replicas and adjusts the pod count.
>>>>
>>>> In case of a scale-up, the existing Task Manager pods are not killed,
>>>> but new Task Manager pods are created.
>>>>
>>>> We observed a skew in the partition offsets consumed, e.g. some
>>>> partitions have huge lags while other partitions have small lags
>>>> (observed from Burrow).
>>>>
>>>> This is also validated by the metrics from the Flink UI, which show
>>>> that the throughput differs across slots.
>>>>
>>>> Any clue why this is the case?
>>>>
>>>> Thanks a lot!
>>>> Eleanore
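
One more thought on the data skew Xintong mentioned above: a quick way to check it is to count records per key over short windows and compare the counts across keys. Below is a rough sketch of such a check, again only an illustration; KeyedEvent and getKey() are hypothetical stand-ins for the actual record type and key extractor.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

public class KeySkewCheck {

    public static DataStream<Tuple2<String, Long>> perKeyCounts(DataStream<KeyedEvent> events) {
        return events
                .map(new MapFunction<KeyedEvent, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(KeyedEvent e) {
                        return Tuple2.of(e.getKey(), 1L);
                    }
                })
                .keyBy(0)                    // same key the real pipeline partitions on
                .timeWindow(Time.minutes(1)) // 1-minute tumbling windows
                .sum(1);                     // records per key per window
    }

    /** Hypothetical record type; replace with the job's actual event class. */
    public static class KeyedEvent {
        private final String key;
        public KeyedEvent(String key) { this.key = key; }
        public String getKey() { return key; }
    }
}

If a handful of keys dominate the counts, that would support the data skew explanation rather than anything related to the scale-up itself.
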