Hi Roman,

Thanks for working on this! I deployed the change and it appears to be
working as expected.

Will monitor over a period of time to compare the checkpoint counts and get
back to you if there are still issues.

Thomas


On Thu, Aug 13, 2020 at 3:41 AM Roman Khachatryan <ro...@data-artisans.com>
wrote:

> Hi Thomas,
>
> The fix is now merged to master and to release-1.11.
> So if you'd like you can check if it solves your problem (it would be
> helpful for us too).
>
> On Sat, Aug 8, 2020 at 9:26 AM Roman Khachatryan <ro...@data-artisans.com>
> wrote:
>
>> Hi Thomas,
>>
>> Thanks a lot for the detailed information.
>>
>> I think the problem is in CheckpointCoordinator. It stores the last
>> checkpoint completion time after checking queued requests.
>> I've created a ticket to fix this:
>> https://issues.apache.org/jira/browse/FLINK-18856
>>
>>
>> On Sat, Aug 8, 2020 at 5:25 AM Thomas Weise <t...@apache.org> wrote:
>>
>>> Just another update:
>>>
>>> The duration of snapshotState is capped by the Kinesis
>>> producer's "RecordTtl" setting (default 30s). The sleep time in flushSync
>>> does not contribute to the observed behavior.
>>>
>>> I guess the open question is why, with the same settings, is 1.11 since
>>> commit 355184d69a8519d29937725c8d85e8465d7e3a90 processing more checkpoints?
>>>
>>>
>>> On Fri, Aug 7, 2020 at 9:15 AM Thomas Weise <t...@apache.org> wrote:
>>>
>>>> Hi Roman,
>>>>
>>>> Here are the checkpoint summaries for both commits:
>>>>
>>>>
>>>> https://docs.google.com/presentation/d/159IVXQGXabjnYJk3oVm3UP2UW_5G-TGs_u9yzYb030I/edit#slide=id.g86d15b2fc7_0_0
>>>>
>>>> The config:
>>>>
>>>>     CheckpointConfig checkpointConfig = env.getCheckpointConfig();
>>>>
>>>> checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>>>     checkpointConfig.setCheckpointInterval(*10_000*);
>>>>     checkpointConfig.setMinPauseBetweenCheckpoints(*10_000*);
>>>>
>>>> checkpointConfig.enableExternalizedCheckpoints(DELETE_ON_CANCELLATION);
>>>>     checkpointConfig.setCheckpointTimeout(600_000);
>>>>     checkpointConfig.setMaxConcurrentCheckpoints(1);
>>>>     checkpointConfig.setFailOnCheckpointingErrors(true);
>>>>
>>>> The values marked bold when changed to *60_000* make the symptom
>>>> disappear. I meanwhile also verified that with the 1.11.0 release commit.
>>>>
>>>> I will take a look at the sleep time issue.
>>>>
>>>> Thanks,
>>>> Thomas
>>>>
>>>>
>>>> On Fri, Aug 7, 2020 at 1:44 AM Roman Khachatryan <
>>>> ro...@data-artisans.com> wrote:
>>>>
>>>>> Hi Thomas,
>>>>>
>>>>> Thanks for your reply!
>>>>>
>>>>> I think you are right, we can remove this sleep and improve
>>>>> KinesisProducer.
>>>>> Probably, it's snapshotState can also be sped up by forcing records
>>>>> flush more often.
>>>>> Do you see that 30s checkpointing duration is caused
>>>>> by KinesisProducer (or maybe other operators)?
>>>>>
>>>>> I'd also like to understand the reason behind this increase in
>>>>> checkpoint frequency.
>>>>> Can you please share these values:
>>>>>  - execution.checkpointing.min-pause
>>>>>  - execution.checkpointing.max-concurrent-checkpoints
>>>>>  - execution.checkpointing.timeout
>>>>>
>>>>> And what is the "new" observed checkpoint frequency (or how many
>>>>> checkpoints are created) compared to older versions?
>>>>>
>>>>>
>>>>> On Fri, Aug 7, 2020 at 4:49 AM Thomas Weise <t...@apache.org> wrote:
>>>>>
>>>>>> Hi Roman,
>>>>>>
>>>>>> Indeed there are more frequent checkpoints with this change! The
>>>>>> application was configured to checkpoint every 10s. With 1.10 ("good
>>>>>> commit"), that leads to fewer completed checkpoints compared to 1.11
>>>>>> ("bad
>>>>>> commit"). Just to be clear, the only difference between the two runs
>>>>>> was
>>>>>> the commit 355184d69a8519d29937725c8d85e8465d7e3a90
>>>>>>
>>>>>> Since the sync part of checkpoints with the Kinesis producer always
>>>>>> takes
>>>>>> ~30 seconds, the 10s configured checkpoint frequency really had no
>>>>>> effect
>>>>>> before 1.11. I confirmed that both commits perform comparably by
>>>>>> setting
>>>>>> the checkpoint frequency and min pause to 60s.
>>>>>>
>>>>>> I still have to verify with the final 1.11.0 release commit.
>>>>>>
>>>>>> It's probably good to take a look at the Kinesis producer. Is it
>>>>>> really
>>>>>> necessary to have 500ms sleep time? What's responsible for the ~30s
>>>>>> duration in snapshotState?
>>>>>>
>>>>>> As things stand it doesn't make sense to use checkpoint intervals <
>>>>>> 30s
>>>>>> when using the Kinesis producer.
>>>>>>
>>>>>> Thanks,
>>>>>> Thomas
>>>>>>
>>>>>> On Sat, Aug 1, 2020 at 2:53 PM Roman Khachatryan <
>>>>>> ro...@data-artisans.com>
>>>>>> wrote:
>>>>>>
>>>>>> > Hi Thomas,
>>>>>> >
>>>>>> > Thanks a lot for the analysis.
>>>>>> >
>>>>>> > The first thing that I'd check is whether checkpoints became more
>>>>>> frequent
>>>>>> > with this commit (as each of them adds at least 500ms if there is
>>>>>> at least
>>>>>> > one not sent record, according to
>>>>>> FlinkKinesisProducer.snapshotState).
>>>>>> >
>>>>>> > Can you share checkpointing statistics (1.10 vs 1.11 or last "good"
>>>>>> vs
>>>>>> > first "bad" commits)?
>>>>>> >
>>>>>> > On Fri, Jul 31, 2020 at 5:29 AM Thomas Weise <
>>>>>> thomas.we...@gmail.com>
>>>>>> > wrote:
>>>>>> >
>>>>>> > > I run git bisect and the first commit that shows the regression
>>>>>> is:
>>>>>> > >
>>>>>> > >
>>>>>> > >
>>>>>> >
>>>>>> https://github.com/apache/flink/commit/355184d69a8519d29937725c8d85e8465d7e3a90
>>>>>> > >
>>>>>> > >
>>>>>> > > On Thu, Jul 23, 2020 at 6:46 PM Kurt Young <ykt...@gmail.com>
>>>>>> wrote:
>>>>>> > >
>>>>>> > > > From my experience, java profilers are sometimes not accurate
>>>>>> enough to
>>>>>> > > > find out the performance regression
>>>>>> > > > root cause. In this case, I would suggest you try out intel
>>>>>> vtune
>>>>>> > > amplifier
>>>>>> > > > to watch more detailed metrics.
>>>>>> > > >
>>>>>> > > > Best,
>>>>>> > > > Kurt
>>>>>> > > >
>>>>>> > > >
>>>>>> > > > On Fri, Jul 24, 2020 at 8:51 AM Thomas Weise <t...@apache.org>
>>>>>> wrote:
>>>>>> > > >
>>>>>> > > > > The cause of the issue is all but clear.
>>>>>> > > > >
>>>>>> > > > > Previously I had mentioned that there is no suspect change to
>>>>>> the
>>>>>> > > Kinesis
>>>>>> > > > > connector and that I had reverted the AWS SDK change to no
>>>>>> effect.
>>>>>> > > > >
>>>>>> > > > > https://issues.apache.org/jira/browse/FLINK-17496 actually
>>>>>> fixed
>>>>>> > > another
>>>>>> > > > > regression in the previous release and is present before and
>>>>>> after.
>>>>>> > > > >
>>>>>> > > > > I repeated the run with 1.11.0 core and downgraded the entire
>>>>>> Kinesis
>>>>>> > > > > connector to 1.10.1: Nothing changes, i.e. the regression is
>>>>>> still
>>>>>> > > > present.
>>>>>> > > > > Therefore we will need to look elsewhere for the root cause.
>>>>>> > > > >
>>>>>> > > > > Regarding the time spent in snapshotState, repeat runs reveal
>>>>>> a wide
>>>>>> > > > range
>>>>>> > > > > for both versions, 1.10 and 1.11. So again this is nothing
>>>>>> pointing
>>>>>> > to
>>>>>> > > a
>>>>>> > > > > root cause.
>>>>>> > > > >
>>>>>> > > > > At this point, I have no ideas remaining other than doing a
>>>>>> bisect to
>>>>>> > > > find
>>>>>> > > > > the culprit. Any other suggestions?
>>>>>> > > > >
>>>>>> > > > > Thomas
>>>>>> > > > >
>>>>>> > > > >
>>>>>> > > > > On Thu, Jul 16, 2020 at 9:19 PM Zhijiang <
>>>>>> wangzhijiang...@aliyun.com
>>>>>> > > > > .invalid>
>>>>>> > > > > wrote:
>>>>>> > > > >
>>>>>> > > > > > Hi Thomas,
>>>>>> > > > > >
>>>>>> > > > > > Thanks for your further profiling information and glad to
>>>>>> see we
>>>>>> > > > already
>>>>>> > > > > > finalized the location to cause the regression.
>>>>>> > > > > > Actually I was also suspicious of the point of
>>>>>> #snapshotState in
>>>>>> > > > previous
>>>>>> > > > > > discussions since it indeed cost much time to block normal
>>>>>> operator
>>>>>> > > > > > processing.
>>>>>> > > > > >
>>>>>> > > > > > Based on your below feedback, the sleep time during
>>>>>> #snapshotState
>>>>>> > > > might
>>>>>> > > > > > be the main concern, and I also digged into the
>>>>>> implementation of
>>>>>> > > > > > FlinkKinesisProducer#snapshotState.
>>>>>> > > > > > while (producer.getOutstandingRecordsCount() > 0) {
>>>>>> > > > > >    producer.flush();
>>>>>> > > > > >    try {
>>>>>> > > > > >       Thread.sleep(500);
>>>>>> > > > > >    } catch (InterruptedException e) {
>>>>>> > > > > >       LOG.warn("Flushing was interrupted.");
>>>>>> > > > > >       break;
>>>>>> > > > > >    }
>>>>>> > > > > > }
>>>>>> > > > > > It seems that the sleep time is mainly affected by the
>>>>>> internal
>>>>>> > > > > operations
>>>>>> > > > > > inside KinesisProducer implementation provided by
>>>>>> amazonaws, which
>>>>>> > I
>>>>>> > > am
>>>>>> > > > > not
>>>>>> > > > > > quite familiar with.
>>>>>> > > > > > But I noticed there were two upgrades related to it in
>>>>>> > > release-1.11.0.
>>>>>> > > > > One
>>>>>> > > > > > is for upgrading amazon-kinesis-producer to 0.14.0 [1] and
>>>>>> another
>>>>>> > is
>>>>>> > > > for
>>>>>> > > > > > upgrading aws-sdk-version to 1.11.754 [2].
>>>>>> > > > > > You mentioned that you already reverted the SDK upgrade to
>>>>>> verify
>>>>>> > no
>>>>>> > > > > > changes. Did you also revert the [1] to verify?
>>>>>> > > > > > [1] https://issues.apache.org/jira/browse/FLINK-17496
>>>>>> > > > > > [2] https://issues.apache.org/jira/browse/FLINK-14881
>>>>>> > > > > >
>>>>>> > > > > > Best,
>>>>>> > > > > > Zhijiang
>>>>>> > > > > >
>>>>>> ------------------------------------------------------------------
>>>>>> > > > > > From:Thomas Weise <t...@apache.org>
>>>>>> > > > > > Send Time:2020年7月17日(星期五) 05:29
>>>>>> > > > > > To:dev <dev@flink.apache.org>
>>>>>> > > > > > Cc:Zhijiang <wangzhijiang...@aliyun.com>; Stephan Ewen <
>>>>>> > > > se...@apache.org
>>>>>> > > > > >;
>>>>>> > > > > > Arvid Heise <ar...@ververica.com>; Aljoscha Krettek <
>>>>>> > > > aljos...@apache.org
>>>>>> > > > > >
>>>>>> > > > > > Subject:Re: Kinesis Performance Issue (was [VOTE] Release
>>>>>> 1.11.0,
>>>>>> > > > release
>>>>>> > > > > > candidate #4)
>>>>>> > > > > >
>>>>>> > > > > > Sorry for the delay.
>>>>>> > > > > >
>>>>>> > > > > > I confirmed that the regression is due to the sink
>>>>>> (unsurprising,
>>>>>> > > since
>>>>>> > > > > > another job with the same consumer, but not the producer,
>>>>>> runs as
>>>>>> > > > > > expected).
>>>>>> > > > > >
>>>>>> > > > > > As promised I did CPU profiling on the problematic
>>>>>> application,
>>>>>> > which
>>>>>> > > > > gives
>>>>>> > > > > > more insight into the regression [1]
>>>>>> > > > > >
>>>>>> > > > > > The screenshots show that the average time for snapshotState
>>>>>> > > increases
>>>>>> > > > > from
>>>>>> > > > > > ~9s to ~28s. The data also shows the increase in sleep time
>>>>>> during
>>>>>> > > > > > snapshotState.
>>>>>> > > > > >
>>>>>> > > > > > Does anyone, based on changes made in 1.11, have a theory
>>>>>> why?
>>>>>> > > > > >
>>>>>> > > > > > I had previously looked at the changes to the Kinesis
>>>>>> connector and
>>>>>> > > > also
>>>>>> > > > > > reverted the SDK upgrade, which did not change the
>>>>>> situation.
>>>>>> > > > > >
>>>>>> > > > > > It will likely be necessary to drill into the sink /
>>>>>> checkpointing
>>>>>> > > > > details
>>>>>> > > > > > to understand the cause of the problem.
>>>>>> > > > > >
>>>>>> > > > > > Let me know if anyone has specific questions that I can
>>>>>> answer from
>>>>>> > > the
>>>>>> > > > > > profiling results.
>>>>>> > > > > >
>>>>>> > > > > > Thomas
>>>>>> > > > > >
>>>>>> > > > > > [1]
>>>>>> > > > > >
>>>>>> > > > > >
>>>>>> > > > >
>>>>>> > > >
>>>>>> > >
>>>>>> >
>>>>>> https://docs.google.com/presentation/d/159IVXQGXabjnYJk3oVm3UP2UW_5G-TGs_u9yzYb030I/edit?usp=sharing
>>>>>> > > > > >
>>>>>> > > > > > On Mon, Jul 13, 2020 at 11:14 AM Thomas Weise <
>>>>>> t...@apache.org>
>>>>>> > > wrote:
>>>>>> > > > > >
>>>>>> > > > > > > + dev@ for visibility
>>>>>> > > > > > >
>>>>>> > > > > > > I will investigate further today.
>>>>>> > > > > > >
>>>>>> > > > > > >
>>>>>> > > > > > > On Wed, Jul 8, 2020 at 4:42 AM Aljoscha Krettek <
>>>>>> > > aljos...@apache.org
>>>>>> > > > >
>>>>>> > > > > > > wrote:
>>>>>> > > > > > >
>>>>>> > > > > > >> On 06.07.20 20:39, Stephan Ewen wrote:
>>>>>> > > > > > >> >    - Did sink checkpoint notifications change in a
>>>>>> relevant
>>>>>> > way,
>>>>>> > > > for
>>>>>> > > > > > >> example
>>>>>> > > > > > >> > due to some Kafka issues we addressed in 1.11
>>>>>> (@Aljoscha
>>>>>> > maybe?)
>>>>>> > > > > > >>
>>>>>> > > > > > >> I think that's unrelated: the Kafka fixes were isolated
>>>>>> in Kafka
>>>>>> > > and
>>>>>> > > > > the
>>>>>> > > > > > >> one bug I discovered on the way was about the Task
>>>>>> reaper.
>>>>>> > > > > > >>
>>>>>> > > > > > >>
>>>>>> > > > > > >> On 07.07.20 17:51, Zhijiang wrote:
>>>>>> > > > > > >> > Sorry for my misunderstood of the previous information,
>>>>>> > Thomas.
>>>>>> > > I
>>>>>> > > > > was
>>>>>> > > > > > >> assuming that the sync checkpoint duration increased
>>>>>> after
>>>>>> > upgrade
>>>>>> > > > as
>>>>>> > > > > it
>>>>>> > > > > > >> was mentioned before.
>>>>>> > > > > > >> >
>>>>>> > > > > > >> > If I remembered correctly, the memory state backend
>>>>>> also has
>>>>>> > the
>>>>>> > > > > same
>>>>>> > > > > > >> issue? If so, we can dismiss the rocksDB state changes.
>>>>>> As the
>>>>>> > > slot
>>>>>> > > > > > sharing
>>>>>> > > > > > >> enabled, the downstream and upstream should
>>>>>> > > > > > >> > probably deployed into the same slot, then no network
>>>>>> shuffle
>>>>>> > > > > effect.
>>>>>> > > > > > >> >
>>>>>> > > > > > >> > I think we need to find out whether it has other
>>>>>> symptoms
>>>>>> > > changed
>>>>>> > > > > > >> besides the performance regression to further figure out
>>>>>> the
>>>>>> > > scope.
>>>>>> > > > > > >> > E.g. any metrics changes, the number of TaskManager
>>>>>> and the
>>>>>> > > number
>>>>>> > > > > of
>>>>>> > > > > > >> slots per TaskManager from deployment changes.
>>>>>> > > > > > >> > 40% regression is really big, I guess the changes
>>>>>> should also
>>>>>> > be
>>>>>> > > > > > >> reflected in other places.
>>>>>> > > > > > >> >
>>>>>> > > > > > >> > I am not sure whether we can reproduce the regression
>>>>>> in our
>>>>>> > AWS
>>>>>> > > > > > >> environment by writing any Kinesis jobs, since there are
>>>>>> also
>>>>>> > > normal
>>>>>> > > > > > >> Kinesis jobs as Thomas mentioned after upgrade.
>>>>>> > > > > > >> > So it probably looks like to touch some corner case. I
>>>>>> am very
>>>>>> > > > > willing
>>>>>> > > > > > >> to provide any help for debugging if possible.
>>>>>> > > > > > >> >
>>>>>> > > > > > >> >
>>>>>> > > > > > >> > Best,
>>>>>> > > > > > >> > Zhijiang
>>>>>> > > > > > >> >
>>>>>> > > > > > >> >
>>>>>> > > > > > >> >
>>>>>> > > ------------------------------------------------------------------
>>>>>> > > > > > >> > From:Thomas Weise <t...@apache.org>
>>>>>> > > > > > >> > Send Time:2020年7月7日(星期二) 23:01
>>>>>> > > > > > >> > To:Stephan Ewen <se...@apache.org>
>>>>>> > > > > > >> > Cc:Aljoscha Krettek <aljos...@apache.org>; Arvid
>>>>>> Heise <
>>>>>> > > > > > >> ar...@ververica.com>; Zhijiang <
>>>>>> wangzhijiang...@aliyun.com>
>>>>>> > > > > > >> > Subject:Re: Kinesis Performance Issue (was [VOTE]
>>>>>> Release
>>>>>> > > 1.11.0,
>>>>>> > > > > > >> release candidate #4)
>>>>>> > > > > > >> >
>>>>>> > > > > > >> > We are deploying our apps with FlinkK8sOperator. We
>>>>>> have one
>>>>>> > job
>>>>>> > > > > that
>>>>>> > > > > > >> works as expected after the upgrade and the one
>>>>>> discussed here
>>>>>> > > that
>>>>>> > > > > has
>>>>>> > > > > > the
>>>>>> > > > > > >> performance regression.
>>>>>> > > > > > >> >
>>>>>> > > > > > >> > "The performance regression is obvious caused by long
>>>>>> duration
>>>>>> > > of
>>>>>> > > > > sync
>>>>>> > > > > > >> checkpoint process in Kinesis sink operator, which would
>>>>>> block
>>>>>> > the
>>>>>> > > > > > normal
>>>>>> > > > > > >> data processing until back pressure the source."
>>>>>> > > > > > >> >
>>>>>> > > > > > >> > That's a constant. Before (1.10) and upgrade have the
>>>>>> same
>>>>>> > sync
>>>>>> > > > > > >> checkpointing time. The question is what change came in
>>>>>> with the
>>>>>> > > > > > upgrade.
>>>>>> > > > > > >> >
>>>>>> > > > > > >> >
>>>>>> > > > > > >> >
>>>>>> > > > > > >> > On Tue, Jul 7, 2020 at 7:33 AM Stephan Ewen <
>>>>>> se...@apache.org
>>>>>> > >
>>>>>> > > > > wrote:
>>>>>> > > > > > >> >
>>>>>> > > > > > >> > @Thomas Just one thing real quick: Are you using the
>>>>>> > standalone
>>>>>> > > > > setup
>>>>>> > > > > > >> scripts (like start-cluster.sh, and the former "slaves"
>>>>>> file) ?
>>>>>> > > > > > >> > Be aware that this is now called "workers" because of
>>>>>> avoiding
>>>>>> > > > > > >> sensitive names.
>>>>>> > > > > > >> > In one internal benchmark we saw quite a lot of
>>>>>> slowdown
>>>>>> > > > initially,
>>>>>> > > > > > >> before seeing that the cluster was not a distributed
>>>>>> cluster any
>>>>>> > > > more
>>>>>> > > > > > ;-)
>>>>>> > > > > > >> >
>>>>>> > > > > > >> >
>>>>>> > > > > > >> > On Tue, Jul 7, 2020 at 9:08 AM Zhijiang <
>>>>>> > > > wangzhijiang...@aliyun.com
>>>>>> > > > > >
>>>>>> > > > > > >> wrote:
>>>>>> > > > > > >> > Thanks for this kickoff and help analysis, Stephan!
>>>>>> > > > > > >> > Thanks for the further feedback and investigation,
>>>>>> Thomas!
>>>>>> > > > > > >> >
>>>>>> > > > > > >> > The performance regression is obvious caused by long
>>>>>> duration
>>>>>> > of
>>>>>> > > > > sync
>>>>>> > > > > > >> checkpoint process in Kinesis sink operator, which would
>>>>>> block
>>>>>> > the
>>>>>> > > > > > normal
>>>>>> > > > > > >> data processing until back pressure the source.
>>>>>> > > > > > >> > Maybe we could dig into the process of sync execution
>>>>>> in
>>>>>> > > > checkpoint.
>>>>>> > > > > > >> E.g. break down the steps inside respective
>>>>>> > operator#snapshotState
>>>>>> > > > to
>>>>>> > > > > > >> statistic which operation cost most of the time, then
>>>>>> > > > > > >> > we might probably find the root cause to bring such
>>>>>> cost.
>>>>>> > > > > > >> >
>>>>>> > > > > > >> > Look forward to the further progress. :)
>>>>>> > > > > > >> >
>>>>>> > > > > > >> > Best,
>>>>>> > > > > > >> > Zhijiang
>>>>>> > > > > > >> >
>>>>>> > > > > > >> >
>>>>>> > > ------------------------------------------------------------------
>>>>>> > > > > > >> > From:Stephan Ewen <se...@apache.org>
>>>>>> > > > > > >> > Send Time:2020年7月7日(星期二) 14:52
>>>>>> > > > > > >> > To:Thomas Weise <t...@apache.org>
>>>>>> > > > > > >> > Cc:Stephan Ewen <se...@apache.org>; Zhijiang <
>>>>>> > > > > > >> wangzhijiang...@aliyun.com>; Aljoscha Krettek <
>>>>>> > > aljos...@apache.org
>>>>>> > > > >;
>>>>>> > > > > > >> Arvid Heise <ar...@ververica.com>
>>>>>> > > > > > >> > Subject:Re: Kinesis Performance Issue (was [VOTE]
>>>>>> Release
>>>>>> > > 1.11.0,
>>>>>> > > > > > >> release candidate #4)
>>>>>> > > > > > >> >
>>>>>> > > > > > >> > Thank you for the digging so deeply.
>>>>>> > > > > > >> > Mysterious think this regression.
>>>>>> > > > > > >> >
>>>>>> > > > > > >> > On Mon, Jul 6, 2020, 22:56 Thomas Weise <
>>>>>> t...@apache.org>
>>>>>> > wrote:
>>>>>> > > > > > >> > @Stephan: yes, I refer to sync time in the web UI (it
>>>>>> is
>>>>>> > > unchanged
>>>>>> > > > > > >> between 1.10 and 1.11 for the specific pipeline).
>>>>>> > > > > > >> >
>>>>>> > > > > > >> > I verified that increasing the checkpointing interval
>>>>>> does not
>>>>>> > > > make
>>>>>> > > > > a
>>>>>> > > > > > >> difference.
>>>>>> > > > > > >> >
>>>>>> > > > > > >> > I looked at the Kinesis connector changes since 1.10.1
>>>>>> and
>>>>>> > don't
>>>>>> > > > see
>>>>>> > > > > > >> anything that could cause this.
>>>>>> > > > > > >> >
>>>>>> > > > > > >> > Another pipeline that is using the Kinesis consumer
>>>>>> (but not
>>>>>> > the
>>>>>> > > > > > >> producer) performs as expected.
>>>>>> > > > > > >> >
>>>>>> > > > > > >> > I tried reverting the AWS SDK version change, symptoms
>>>>>> remain
>>>>>> > > > > > unchanged:
>>>>>> > > > > > >> >
>>>>>> > > > > > >> > diff --git
>>>>>> a/flink-connectors/flink-connector-kinesis/pom.xml
>>>>>> > > > > > >> b/flink-connectors/flink-connector-kinesis/pom.xml
>>>>>> > > > > > >> > index a6abce23ba..741743a05e 100644
>>>>>> > > > > > >> > --- a/flink-connectors/flink-connector-kinesis/pom.xml
>>>>>> > > > > > >> > +++ b/flink-connectors/flink-connector-kinesis/pom.xml
>>>>>> > > > > > >> > @@ -33,7 +33,7 @@ under the License.
>>>>>> > > > > > >> >
>>>>>> > > > > > >>
>>>>>> > > > >
>>>>>> > >
>>>>>> <artifactId>flink-connector-kinesis_${scala.binary.version}</artifactId>
>>>>>> > > > > > >> >          <name>flink-connector-kinesis</name>
>>>>>> > > > > > >> >          <properties>
>>>>>> > > > > > >> > -
>>>>>>  <aws.sdk.version>1.11.754</aws.sdk.version>
>>>>>> > > > > > >> > +
>>>>>>  <aws.sdk.version>1.11.603</aws.sdk.version>
>>>>>> > > > > > >> >
>>>>>> > > > > > >> <aws.kinesis-kcl.version>1.11.2</aws.kinesis-kcl.version>
>>>>>> > > > > > >> >
>>>>>> > > > > > >> <aws.kinesis-kpl.version>0.14.0</aws.kinesis-kpl.version>
>>>>>> > > > > > >> >
>>>>>> > > > > > >>
>>>>>> > > > > >
>>>>>> > > > >
>>>>>> > > >
>>>>>> > >
>>>>>> >
>>>>>> <aws.dynamodbstreams-kinesis-adapter.version>1.5.0</aws.dynamodbstreams-kinesis-adapter.version>
>>>>>> > > > > > >> >
>>>>>> > > > > > >> > I'm planning to take a look with a profiler next.
>>>>>> > > > > > >> >
>>>>>> > > > > > >> > Thomas
>>>>>> > > > > > >> >
>>>>>> > > > > > >> >
>>>>>> > > > > > >> > On Mon, Jul 6, 2020 at 11:40 AM Stephan Ewen <
>>>>>> > se...@apache.org>
>>>>>> > > > > > wrote:
>>>>>> > > > > > >> > Hi all!
>>>>>> > > > > > >> >
>>>>>> > > > > > >> > Forking this thread out of the release vote thread.
>>>>>> > > > > > >> >  From what Thomas describes, it really sounds like a
>>>>>> > > sink-specific
>>>>>> > > > > > >> issue.
>>>>>> > > > > > >> >
>>>>>> > > > > > >> > @Thomas: When you say sink has a long synchronous
>>>>>> checkpoint
>>>>>> > > time,
>>>>>> > > > > you
>>>>>> > > > > > >> mean the time that is shown as "sync time" on the
>>>>>> metrics and
>>>>>> > web
>>>>>> > > > UI?
>>>>>> > > > > > That
>>>>>> > > > > > >> is not including any network buffer related operations.
>>>>>> It is
>>>>>> > > purely
>>>>>> > > > > the
>>>>>> > > > > > >> operator's time.
>>>>>> > > > > > >> >
>>>>>> > > > > > >> > Can we dig into the changes we did in sinks:
>>>>>> > > > > > >> >    - Kinesis version upgrade, AWS library updates
>>>>>> > > > > > >> >
>>>>>> > > > > > >> >    - Could it be that some call (checkpoint complete)
>>>>>> that was
>>>>>> > > > > > >> previously (1.10) in a separate thread is not in the
>>>>>> mailbox and
>>>>>> > > > this
>>>>>> > > > > > >> simply reduces the number of threads that do the work?
>>>>>> > > > > > >> >
>>>>>> > > > > > >> >    - Did sink checkpoint notifications change in a
>>>>>> relevant
>>>>>> > way,
>>>>>> > > > for
>>>>>> > > > > > >> example due to some Kafka issues we addressed in 1.11
>>>>>> (@Aljoscha
>>>>>> > > > > maybe?)
>>>>>> > > > > > >> >
>>>>>> > > > > > >> > Best,
>>>>>> > > > > > >> > Stephan
>>>>>> > > > > > >> >
>>>>>> > > > > > >> >
>>>>>> > > > > > >> > On Sun, Jul 5, 2020 at 7:10 AM Zhijiang <
>>>>>> > > > wangzhijiang...@aliyun.com
>>>>>> > > > > > .invalid>
>>>>>> > > > > > >> wrote:
>>>>>> > > > > > >> > Hi Thomas,
>>>>>> > > > > > >> >
>>>>>> > > > > > >> >   Regarding [2], it has more detail infos in the Jira
>>>>>> > > description
>>>>>> > > > (
>>>>>> > > > > > >> https://issues.apache.org/jira/browse/FLINK-16404).
>>>>>> > > > > > >> >
>>>>>> > > > > > >> >   I can also give some basic explanations here to
>>>>>> dismiss the
>>>>>> > > > > concern.
>>>>>> > > > > > >> >   1. In the past, the following buffers after the
>>>>>> barrier will
>>>>>> > > be
>>>>>> > > > > > >> cached on downstream side before alignment.
>>>>>> > > > > > >> >   2. In 1.11, the upstream would not send the buffers
>>>>>> after
>>>>>> > the
>>>>>> > > > > > >> barrier. When the downstream finishes the alignment, it
>>>>>> will
>>>>>> > > notify
>>>>>> > > > > the
>>>>>> > > > > > >> downstream of continuing sending following buffers,
>>>>>> since it can
>>>>>> > > > > process
>>>>>> > > > > > >> them after alignment.
>>>>>> > > > > > >> >   3. The only difference is that the temporary blocked
>>>>>> buffers
>>>>>> > > are
>>>>>> > > > > > >> cached either on downstream side or on upstream side
>>>>>> before
>>>>>> > > > alignment.
>>>>>> > > > > > >> >   4. The side effect would be the additional
>>>>>> notification cost
>>>>>> > > for
>>>>>> > > > > > >> every barrier alignment. If the downstream and upstream
>>>>>> are
>>>>>> > > deployed
>>>>>> > > > > in
>>>>>> > > > > > >> separate TaskManager, the cost is network transport
>>>>>> delay (the
>>>>>> > > > effect
>>>>>> > > > > > can
>>>>>> > > > > > >> be ignored based on our testing with 1s checkpoint
>>>>>> interval).
>>>>>> > For
>>>>>> > > > > > sharing
>>>>>> > > > > > >> slot in your case, the cost is only one method call in
>>>>>> > processor,
>>>>>> > > > can
>>>>>> > > > > be
>>>>>> > > > > > >> ignored also.
>>>>>> > > > > > >> >
>>>>>> > > > > > >> >   You mentioned "In this case, the downstream task has
>>>>>> a high
>>>>>> > > > > average
>>>>>> > > > > > >> checkpoint duration(~30s, sync part)." This duration is
>>>>>> not
>>>>>> > > > reflecting
>>>>>> > > > > > the
>>>>>> > > > > > >> changes above, and it is only indicating the duration for
>>>>>> > calling
>>>>>> > > > > > >> `Operation.snapshotState`.
>>>>>> > > > > > >> >   If this duration is beyond your expectation, you can
>>>>>> check
>>>>>> > or
>>>>>> > > > > debug
>>>>>> > > > > > >> whether the source/sink operations might take more time
>>>>>> to
>>>>>> > finish
>>>>>> > > > > > >> `snapshotState` in practice. E.g. you can
>>>>>> > > > > > >> >   make the implementation of this method as empty to
>>>>>> further
>>>>>> > > > verify
>>>>>> > > > > > the
>>>>>> > > > > > >> effect.
>>>>>> > > > > > >> >
>>>>>> > > > > > >> >   Best,
>>>>>> > > > > > >> >   Zhijiang
>>>>>> > > > > > >> >
>>>>>> > > > > > >> >
>>>>>> > > > > > >> >
>>>>>> > > >
>>>>>> ------------------------------------------------------------------
>>>>>> > > > > > >> >   From:Thomas Weise <t...@apache.org>
>>>>>> > > > > > >> >   Send Time:2020年7月5日(星期日) 12:22
>>>>>> > > > > > >> >   To:dev <dev@flink.apache.org>; Zhijiang <
>>>>>> > > > > wangzhijiang...@aliyun.com
>>>>>> > > > > > >
>>>>>> > > > > > >> >   Cc:Yingjie Cao <kevin.ying...@gmail.com>
>>>>>> > > > > > >> >   Subject:Re: [VOTE] Release 1.11.0, release candidate
>>>>>> #4
>>>>>> > > > > > >> >
>>>>>> > > > > > >> >   Hi Zhijiang,
>>>>>> > > > > > >> >
>>>>>> > > > > > >> >   Could you please point me to more details regarding:
>>>>>> "[2]:
>>>>>> > > Delay
>>>>>> > > > > > send
>>>>>> > > > > > >> the
>>>>>> > > > > > >> >   following buffers after checkpoint barrier on
>>>>>> upstream side
>>>>>> > > > until
>>>>>> > > > > > >> barrier
>>>>>> > > > > > >> >   alignment on downstream side."
>>>>>> > > > > > >> >
>>>>>> > > > > > >> >   In this case, the downstream task has a high average
>>>>>> > > checkpoint
>>>>>> > > > > > >> duration
>>>>>> > > > > > >> >   (~30s, sync part). If there was a change to hold
>>>>>> buffers
>>>>>> > > > depending
>>>>>> > > > > > on
>>>>>> > > > > > >> >   downstream performance, could this possibly apply to
>>>>>> this
>>>>>> > case
>>>>>> > > > > (even
>>>>>> > > > > > >> when
>>>>>> > > > > > >> >   there is no shuffle that would require alignment)?
>>>>>> > > > > > >> >
>>>>>> > > > > > >> >   Thanks,
>>>>>> > > > > > >> >   Thomas
>>>>>> > > > > > >> >
>>>>>> > > > > > >> >
>>>>>> > > > > > >> >   On Sat, Jul 4, 2020 at 7:39 AM Zhijiang <
>>>>>> > > > > wangzhijiang...@aliyun.com
>>>>>> > > > > > >> .invalid>
>>>>>> > > > > > >> >   wrote:
>>>>>> > > > > > >> >
>>>>>> > > > > > >> >   > Hi Thomas,
>>>>>> > > > > > >> >   >
>>>>>> > > > > > >> >   > Thanks for the further update information.
>>>>>> > > > > > >> >   >
>>>>>> > > > > > >> >   > I guess we can dismiss the network stack changes,
>>>>>> since in
>>>>>> > > > your
>>>>>> > > > > > >> case the
>>>>>> > > > > > >> >   > downstream and upstream would probably be deployed
>>>>>> in the
>>>>>> > > same
>>>>>> > > > > > slot
>>>>>> > > > > > >> >   > bypassing the network data shuffle.
>>>>>> > > > > > >> >   > Also I guess release-1.11 will not bring general
>>>>>> > performance
>>>>>> > > > > > >> regression in
>>>>>> > > > > > >> >   > runtime engine, as we also did the performance
>>>>>> testing for
>>>>>> > > all
>>>>>> > > > > > >> general
>>>>>> > > > > > >> >   > cases by [1] in real cluster before and the testing
>>>>>> > results
>>>>>> > > > > should
>>>>>> > > > > > >> fit the
>>>>>> > > > > > >> >   > expectation. But we indeed did not test the
>>>>>> specific
>>>>>> > source
>>>>>> > > > and
>>>>>> > > > > > sink
>>>>>> > > > > > >> >   > connectors yet as I known.
>>>>>> > > > > > >> >   >
>>>>>> > > > > > >> >   > Regarding your performance regression with 40%, I
>>>>>> wonder
>>>>>> > it
>>>>>> > > is
>>>>>> > > > > > >> probably
>>>>>> > > > > > >> >   > related to specific source/sink changes (e.g.
>>>>>> kinesis) or
>>>>>> > > > > > >> environment
>>>>>> > > > > > >> >   > issues with corner case.
>>>>>> > > > > > >> >   > If possible, it would be helpful to further locate
>>>>>> whether
>>>>>> > > the
>>>>>> > > > > > >> regression
>>>>>> > > > > > >> >   > is caused by kinesis, by replacing the kinesis
>>>>>> source &
>>>>>> > sink
>>>>>> > > > and
>>>>>> > > > > > >> keeping
>>>>>> > > > > > >> >   > the others same.
>>>>>> > > > > > >> >   >
>>>>>> > > > > > >> >   > As you said, it would be efficient to contact with
>>>>>> you
>>>>>> > > > directly
>>>>>> > > > > > >> next week
>>>>>> > > > > > >> >   > to further discuss this issue. And we are
>>>>>> willing/eager to
>>>>>> > > > > provide
>>>>>> > > > > > >> any help
>>>>>> > > > > > >> >   > to resolve this issue soon.
>>>>>> > > > > > >> >   >
>>>>>> > > > > > >> >   > Besides that, I guess this issue should not be the
>>>>>> blocker
>>>>>> > > for
>>>>>> > > > > the
>>>>>> > > > > > >> >   > release, since it is probably a corner case based
>>>>>> on the
>>>>>> > > > current
>>>>>> > > > > > >> analysis.
>>>>>> > > > > > >> >   > If we really conclude anything need to be resolved
>>>>>> after
>>>>>> > the
>>>>>> > > > > final
>>>>>> > > > > > >> >   > release, then we can also make the next minor
>>>>>> > release-1.11.1
>>>>>> > > > > come
>>>>>> > > > > > >> soon.
>>>>>> > > > > > >> >   >
>>>>>> > > > > > >> >   > [1]
>>>>>> https://issues.apache.org/jira/browse/FLINK-18433
>>>>>> > > > > > >> >   >
>>>>>> > > > > > >> >   > Best,
>>>>>> > > > > > >> >   > Zhijiang
>>>>>> > > > > > >> >   >
>>>>>> > > > > > >> >   >
>>>>>> > > > > > >> >   >
>>>>>> > > > >
>>>>>> ------------------------------------------------------------------
>>>>>> > > > > > >> >   > From:Thomas Weise <t...@apache.org>
>>>>>> > > > > > >> >   > Send Time:2020年7月4日(星期六) 12:26
>>>>>> > > > > > >> >   > To:dev <dev@flink.apache.org>; Zhijiang <
>>>>>> > > > > > wangzhijiang...@aliyun.com
>>>>>> > > > > > >> >
>>>>>> > > > > > >> >   > Cc:Yingjie Cao <kevin.ying...@gmail.com>
>>>>>> > > > > > >> >   > Subject:Re: [VOTE] Release 1.11.0, release
>>>>>> candidate #4
>>>>>> > > > > > >> >   >
>>>>>> > > > > > >> >   > Hi Zhijiang,
>>>>>> > > > > > >> >   >
>>>>>> > > > > > >> >   > It will probably be best if we connect next week
>>>>>> and
>>>>>> > discuss
>>>>>> > > > the
>>>>>> > > > > > >> issue
>>>>>> > > > > > >> >   > directly since this could be quite difficult to
>>>>>> reproduce.
>>>>>> > > > > > >> >   >
>>>>>> > > > > > >> >   > Before the testing result on our side comes out
>>>>>> for your
>>>>>> > > > > > respective
>>>>>> > > > > > >> job
>>>>>> > > > > > >> >   > case, I have some other questions to confirm for
>>>>>> further
>>>>>> > > > > analysis:
>>>>>> > > > > > >> >   >     -  How much percentage regression you found
>>>>>> after
>>>>>> > > > switching
>>>>>> > > > > to
>>>>>> > > > > > >> 1.11?
>>>>>> > > > > > >> >   >
>>>>>> > > > > > >> >   > ~40% throughput decline
>>>>>> > > > > > >> >   >
>>>>>> > > > > > >> >   >     -  Are there any network bottleneck in your
>>>>>> cluster?
>>>>>> > > E.g.
>>>>>> > > > > the
>>>>>> > > > > > >> network
>>>>>> > > > > > >> >   > bandwidth is full caused by other jobs? If so, it
>>>>>> might
>>>>>> > have
>>>>>> > > > > more
>>>>>> > > > > > >> effects
>>>>>> > > > > > >> >   > by above [2]
>>>>>> > > > > > >> >   >
>>>>>> > > > > > >> >   > The test runs on a k8s cluster that is also used
>>>>>> for other
>>>>>> > > > > > >> production jobs.
>>>>>> > > > > > >> >   > There is no reason be believe network is the
>>>>>> bottleneck.
>>>>>> > > > > > >> >   >
>>>>>> > > > > > >> >   >     -  Did you adjust the default network buffer
>>>>>> setting?
>>>>>> > > E.g.
>>>>>> > > > > > >> >   >
>>>>>> "taskmanager.network.memory.floating-buffers-per-gate" or
>>>>>> > > > > > >> >   > "taskmanager.network.memory.buffers-per-channel"
>>>>>> > > > > > >> >   >
>>>>>> > > > > > >> >   > The job is using the defaults, i.e we don't
>>>>>> configure the
>>>>>> > > > > > settings.
>>>>>> > > > > > >> If you
>>>>>> > > > > > >> >   > want me to try specific settings in the hope that
>>>>>> it will
>>>>>> > > help
>>>>>> > > > > to
>>>>>> > > > > > >> isolate
>>>>>> > > > > > >> >   > the issue please let me know.
>>>>>> > > > > > >> >   >
>>>>>> > > > > > >> >   >     -  I guess the topology has three vertexes
>>>>>> > > > "KinesisConsumer
>>>>>> > > > > ->
>>>>>> > > > > > >> Chained
>>>>>> > > > > > >> >   > FlatMap -> KinesisProducer", and the partition
>>>>>> mode for
>>>>>> > > > > > >> "KinesisConsumer ->
>>>>>> > > > > > >> >   > FlatMap" and "FlatMap->KinesisProducer" are both
>>>>>> > "forward"?
>>>>>> > > If
>>>>>> > > > > so,
>>>>>> > > > > > >> the edge
>>>>>> > > > > > >> >   > connection is one-to-one, not all-to-all, then the
>>>>>> above
>>>>>> > > > [1][2]
>>>>>> > > > > > >> should no
>>>>>> > > > > > >> >   > effects in theory with default network buffer
>>>>>> setting.
>>>>>> > > > > > >> >   >
>>>>>> > > > > > >> >   > There are only 2 vertices and the edge is
>>>>>> "forward".
>>>>>> > > > > > >> >   >
>>>>>> > > > > > >> >   >     - By slot sharing, I guess these three vertex
>>>>>> > > parallelism
>>>>>> > > > > task
>>>>>> > > > > > >> would
>>>>>> > > > > > >> >   > probably be deployed into the same slot, then the
>>>>>> data
>>>>>> > > shuffle
>>>>>> > > > > is
>>>>>> > > > > > >> by memory
>>>>>> > > > > > >> >   > queue, not network stack. If so, the above [2]
>>>>>> should no
>>>>>> > > > effect.
>>>>>> > > > > > >> >   >
>>>>>> > > > > > >> >   > Yes, vertices share slots.
>>>>>> > > > > > >> >   >
>>>>>> > > > > > >> >   >     - I also saw some Jira changes for kinesis in
>>>>>> this
>>>>>> > > > release,
>>>>>> > > > > > >> could you
>>>>>> > > > > > >> >   > confirm that these changes would not effect the
>>>>>> > performance?
>>>>>> > > > > > >> >   >
>>>>>> > > > > > >> >   > I will need to take a look. 1.10 already had a
>>>>>> regression
>>>>>> > > > > > >> introduced by the
>>>>>> > > > > > >> >   > Kinesis producer update.
>>>>>> > > > > > >> >   >
>>>>>> > > > > > >> >   >
>>>>>> > > > > > >> >   > Thanks,
>>>>>> > > > > > >> >   > Thomas
>>>>>> > > > > > >> >   >
>>>>>> > > > > > >> >   >
>>>>>> > > > > > >> >   > On Thu, Jul 2, 2020 at 11:46 PM Zhijiang <
>>>>>> > > > > > >> wangzhijiang...@aliyun.com
>>>>>> > > > > > >> >   > .invalid>
>>>>>> > > > > > >> >   > wrote:
>>>>>> > > > > > >> >   >
>>>>>> > > > > > >> >   > > Hi Thomas,
>>>>>> > > > > > >> >   > >
>>>>>> > > > > > >> >   > > Thanks for your reply with rich information!
>>>>>> > > > > > >> >   > >
>>>>>> > > > > > >> >   > > We are trying to reproduce your case in our
>>>>>> cluster to
>>>>>> > > > further
>>>>>> > > > > > >> verify it,
>>>>>> > > > > > >> >   > > and  @Yingjie Cao is working on it now.
>>>>>> > > > > > >> >   > >  As we have not kinesis consumer and producer
>>>>>> > internally,
>>>>>> > > so
>>>>>> > > > > we
>>>>>> > > > > > >> will
>>>>>> > > > > > >> >   > > construct the common source and sink instead in
>>>>>> the case
>>>>>> > > of
>>>>>> > > > > > >> backpressure.
>>>>>> > > > > > >> >   > >
>>>>>> > > > > > >> >   > > Firstly, we can dismiss the rockdb factor in this
>>>>>> > release,
>>>>>> > > > > since
>>>>>> > > > > > >> you also
>>>>>> > > > > > >> >   > > mentioned that "filesystem leads to same
>>>>>> symptoms".
>>>>>> > > > > > >> >   > >
>>>>>> > > > > > >> >   > > Secondly, if my understanding is right, you
>>>>>> emphasis
>>>>>> > that
>>>>>> > > > the
>>>>>> > > > > > >> regression
>>>>>> > > > > > >> >   > > only exists for the jobs with low checkpoint
>>>>>> interval
>>>>>> > > (10s).
>>>>>> > > > > > >> >   > > Based on that, I have two suspicions with the
>>>>>> network
>>>>>> > > > related
>>>>>> > > > > > >> changes in
>>>>>> > > > > > >> >   > > this release:
>>>>>> > > > > > >> >   > >     - [1]: Limited the maximum backlog value
>>>>>> (default
>>>>>> > 10)
>>>>>> > > in
>>>>>> > > > > > >> subpartition
>>>>>> > > > > > >> >   > > queue.
>>>>>> > > > > > >> >   > >     - [2]: Delay send the following buffers after
>>>>>> > > checkpoint
>>>>>> > > > > > >> barrier on
>>>>>> > > > > > >> >   > > upstream side until barrier alignment on
>>>>>> downstream
>>>>>> > side.
>>>>>> > > > > > >> >   > >
>>>>>> > > > > > >> >   > > These changes are motivated for reducing the
>>>>>> in-flight
>>>>>> > > > buffers
>>>>>> > > > > > to
>>>>>> > > > > > >> speedup
>>>>>> > > > > > >> >   > > checkpoint especially in the case of
>>>>>> backpressure.
>>>>>> > > > > > >> >   > > In theory they should have very minor
>>>>>> performance effect
>>>>>> > > and
>>>>>> > > > > > >> actually we
>>>>>> > > > > > >> >   > > also tested in cluster to verify within
>>>>>> expectation
>>>>>> > before
>>>>>> > > > > > >> merging them,
>>>>>> > > > > > >> >   > >  but maybe there are other corner cases we have
>>>>>> not
>>>>>> > > thought
>>>>>> > > > of
>>>>>> > > > > > >> before.
>>>>>> > > > > > >> >   > >
>>>>>> > > > > > >> >   > > Before the testing result on our side comes out
>>>>>> for your
>>>>>> > > > > > >> respective job
>>>>>> > > > > > >> >   > > case, I have some other questions to confirm for
>>>>>> further
>>>>>> > > > > > analysis:
>>>>>> > > > > > >> >   > >     -  How much percentage regression you found
>>>>>> after
>>>>>> > > > > switching
>>>>>> > > > > > >> to 1.11?
>>>>>> > > > > > >> >   > >     -  Are there any network bottleneck in your
>>>>>> cluster?
>>>>>> > > > E.g.
>>>>>> > > > > > the
>>>>>> > > > > > >> network
>>>>>> > > > > > >> >   > > bandwidth is full caused by other jobs? If so,
>>>>>> it might
>>>>>> > > have
>>>>>> > > > > > more
>>>>>> > > > > > >> effects
>>>>>> > > > > > >> >   > > by above [2]
>>>>>> > > > > > >> >   > >     -  Did you adjust the default network buffer
>>>>>> > setting?
>>>>>> > > > E.g.
>>>>>> > > > > > >> >   > >
>>>>>> "taskmanager.network.memory.floating-buffers-per-gate"
>>>>>> > or
>>>>>> > > > > > >> >   > > "taskmanager.network.memory.buffers-per-channel"
>>>>>> > > > > > >> >   > >     -  I guess the topology has three vertexes
>>>>>> > > > > "KinesisConsumer
>>>>>> > > > > > ->
>>>>>> > > > > > >> >   > Chained
>>>>>> > > > > > >> >   > > FlatMap -> KinesisProducer", and the partition
>>>>>> mode for
>>>>>> > > > > > >> "KinesisConsumer
>>>>>> > > > > > >> >   > ->
>>>>>> > > > > > >> >   > > FlatMap" and "FlatMap->KinesisProducer" are both
>>>>>> > > "forward"?
>>>>>> > > > If
>>>>>> > > > > > >> so, the
>>>>>> > > > > > >> >   > edge
>>>>>> > > > > > >> >   > > connection is one-to-one, not all-to-all, then
>>>>>> the above
>>>>>> > > > > [1][2]
>>>>>> > > > > > >> should no
>>>>>> > > > > > >> >   > > effects in theory with default network buffer
>>>>>> setting.
>>>>>> > > > > > >> >   > >     - By slot sharing, I guess these three vertex
>>>>>> > > > parallelism
>>>>>> > > > > > >> task would
>>>>>> > > > > > >> >   > > probably be deployed into the same slot, then
>>>>>> the data
>>>>>> > > > shuffle
>>>>>> > > > > > is
>>>>>> > > > > > >> by
>>>>>> > > > > > >> >   > memory
>>>>>> > > > > > >> >   > > queue, not network stack. If so, the above [2]
>>>>>> should no
>>>>>> > > > > effect.
>>>>>> > > > > > >> >   > >     - I also saw some Jira changes for kinesis
>>>>>> in this
>>>>>> > > > > release,
>>>>>> > > > > > >> could you
>>>>>> > > > > > >> >   > > confirm that these changes would not effect the
>>>>>> > > performance?
>>>>>> > > > > > >> >   > >
>>>>>> > > > > > >> >   > > Best,
>>>>>> > > > > > >> >   > > Zhijiang
>>>>>> > > > > > >> >   > >
>>>>>> > > > > > >> >   > >
>>>>>> > > > > > >> >   > >
>>>>>> > > > > >
>>>>>> ------------------------------------------------------------------
>>>>>> > > > > > >> >   > > From:Thomas Weise <t...@apache.org>
>>>>>> > > > > > >> >   > > Send Time:2020年7月3日(星期五) 01:07
>>>>>> > > > > > >> >   > > To:dev <dev@flink.apache.org>; Zhijiang <
>>>>>> > > > > > >> wangzhijiang...@aliyun.com>
>>>>>> > > > > > >> >   > > Subject:Re: [VOTE] Release 1.11.0, release
>>>>>> candidate #4
>>>>>> > > > > > >> >   > >
>>>>>> > > > > > >> >   > > Hi Zhijiang,
>>>>>> > > > > > >> >   > >
>>>>>> > > > > > >> >   > > The performance degradation manifests in
>>>>>> backpressure
>>>>>> > > which
>>>>>> > > > > > leads
>>>>>> > > > > > >> to
>>>>>> > > > > > >> >   > > growing backlog in the source. I switched a few
>>>>>> times
>>>>>> > > > between
>>>>>> > > > > > >> 1.10 and
>>>>>> > > > > > >> >   > 1.11
>>>>>> > > > > > >> >   > > and the behavior is consistent.
>>>>>> > > > > > >> >   > >
>>>>>> > > > > > >> >   > > The DAG is:
>>>>>> > > > > > >> >   > >
>>>>>> > > > > > >> >   > > KinesisConsumer -> (Flat Map, Flat Map, Flat Map)
>>>>>> > >  --------
>>>>>> > > > > > >> forward
>>>>>> > > > > > >> >   > > ---------> KinesisProducer
>>>>>> > > > > > >> >   > >
>>>>>> > > > > > >> >   > > Parallelism: 160
>>>>>> > > > > > >> >   > > No shuffle/rebalance.
>>>>>> > > > > > >> >   > >
>>>>>> > > > > > >> >   > > Checkpointing config:
>>>>>> > > > > > >> >   > >
>>>>>> > > > > > >> >   > > Checkpointing Mode Exactly Once
>>>>>> > > > > > >> >   > > Interval 10s
>>>>>> > > > > > >> >   > > Timeout 10m 0s
>>>>>> > > > > > >> >   > > Minimum Pause Between Checkpoints 10s
>>>>>> > > > > > >> >   > > Maximum Concurrent Checkpoints 1
>>>>>> > > > > > >> >   > > Persist Checkpoints Externally Enabled (delete on
>>>>>> > > > > cancellation)
>>>>>> > > > > > >> >   > >
>>>>>> > > > > > >> >   > > State backend: rocksdb  (filesystem leads to same
>>>>>> > > symptoms)
>>>>>> > > > > > >> >   > > Checkpoint size is tiny (500KB)
>>>>>> > > > > > >> >   > >
>>>>>> > > > > > >> >   > > An interesting difference to another job that I
>>>>>> had
>>>>>> > > upgraded
>>>>>> > > > > > >> successfully
>>>>>> > > > > > >> >   > > is the low checkpointing interval.
>>>>>> > > > > > >> >   > >
>>>>>> > > > > > >> >   > > Thanks,
>>>>>> > > > > > >> >   > > Thomas
>>>>>> > > > > > >> >   > >
>>>>>> > > > > > >> >   > >
>>>>>> > > > > > >> >   > > On Wed, Jul 1, 2020 at 9:02 PM Zhijiang <
>>>>>> > > > > > >> wangzhijiang...@aliyun.com
>>>>>> > > > > > >> >   > > .invalid>
>>>>>> > > > > > >> >   > > wrote:
>>>>>> > > > > > >> >   > >
>>>>>> > > > > > >> >   > > > Hi Thomas,
>>>>>> > > > > > >> >   > > >
>>>>>> > > > > > >> >   > > > Thanks for the efficient feedback.
>>>>>> > > > > > >> >   > > >
>>>>>> > > > > > >> >   > > > Regarding the suggestion of adding the release
>>>>>> notes
>>>>>> > > > > document,
>>>>>> > > > > > >> I agree
>>>>>> > > > > > >> >   > > > with your point. Maybe we should adjust the
>>>>>> vote
>>>>>> > > template
>>>>>> > > > > > >> accordingly
>>>>>> > > > > > >> >   > in
>>>>>> > > > > > >> >   > > > the respective wiki to guide the following
>>>>>> release
>>>>>> > > > > processes.
>>>>>> > > > > > >> >   > > >
>>>>>> > > > > > >> >   > > > Regarding the performance regression, could you
>>>>>> > provide
>>>>>> > > > some
>>>>>> > > > > > >> more
>>>>>> > > > > > >> >   > details
>>>>>> > > > > > >> >   > > > for our better measurement or reproducing on
>>>>>> our
>>>>>> > sides?
>>>>>> > > > > > >> >   > > > E.g. I guess the topology only includes two
>>>>>> vertexes
>>>>>> > > > source
>>>>>> > > > > > and
>>>>>> > > > > > >> sink?
>>>>>> > > > > > >> >   > > > What is the parallelism for every vertex?
>>>>>> > > > > > >> >   > > > The upstream shuffles data to the downstream
>>>>>> via
>>>>>> > > rebalance
>>>>>> > > > > > >> partitioner
>>>>>> > > > > > >> >   > or
>>>>>> > > > > > >> >   > > > other?
>>>>>> > > > > > >> >   > > > The checkpoint mode is exactly-once with
>>>>>> rocksDB state
>>>>>> > > > > > backend?
>>>>>> > > > > > >> >   > > > The backpressure happened in this case?
>>>>>> > > > > > >> >   > > > How much percentage regression in this case?
>>>>>> > > > > > >> >   > > >
>>>>>> > > > > > >> >   > > > Best,
>>>>>> > > > > > >> >   > > > Zhijiang
>>>>>> > > > > > >> >   > > >
>>>>>> > > > > > >> >   > > >
>>>>>> > > > > > >> >   > > >
>>>>>> > > > > > >> >   > > >
>>>>>> > > > > > >>
>>>>>> > ------------------------------------------------------------------
>>>>>> > > > > > >> >   > > > From:Thomas Weise <t...@apache.org>
>>>>>> > > > > > >> >   > > > Send Time:2020年7月2日(星期四) 09:54
>>>>>> > > > > > >> >   > > > To:dev <dev@flink.apache.org>
>>>>>> > > > > > >> >   > > > Subject:Re: [VOTE] Release 1.11.0, release
>>>>>> candidate
>>>>>> > #4
>>>>>> > > > > > >> >   > > >
>>>>>> > > > > > >> >   > > > Hi Till,
>>>>>> > > > > > >> >   > > >
>>>>>> > > > > > >> >   > > > Yes, we don't have the setting in
>>>>>> flink-conf.yaml.
>>>>>> > > > > > >> >   > > >
>>>>>> > > > > > >> >   > > > Generally, we carry forward the existing
>>>>>> configuration
>>>>>> > > and
>>>>>> > > > > any
>>>>>> > > > > > >> change
>>>>>> > > > > > >> >   > to
>>>>>> > > > > > >> >   > > > default configuration values would impact the
>>>>>> upgrade.
>>>>>> > > > > > >> >   > > >
>>>>>> > > > > > >> >   > > > Yes, since it is an incompatible change I
>>>>>> would state
>>>>>> > it
>>>>>> > > > in
>>>>>> > > > > > the
>>>>>> > > > > > >> release
>>>>>> > > > > > >> >   > > > notes.
>>>>>> > > > > > >> >   > > >
>>>>>> > > > > > >> >   > > > Thanks,
>>>>>> > > > > > >> >   > > > Thomas
>>>>>> > > > > > >> >   > > >
>>>>>> > > > > > >> >   > > > BTW I found a performance regression while
>>>>>> trying to
>>>>>> > > > upgrade
>>>>>> > > > > > >> another
>>>>>> > > > > > >> >   > > > pipeline with this RC. It is a simple Kinesis
>>>>>> to
>>>>>> > Kinesis
>>>>>> > > > > job.
>>>>>> > > > > > >> Wasn't
>>>>>> > > > > > >> >   > able
>>>>>> > > > > > >> >   > > > to pin it down yet, symptoms include increased
>>>>>> > > checkpoint
>>>>>> > > > > > >> alignment
>>>>>> > > > > > >> >   > time.
>>>>>> > > > > > >> >   > > >
>>>>>> > > > > > >> >   > > > On Wed, Jul 1, 2020 at 12:04 AM Till Rohrmann <
>>>>>> > > > > > >> trohrm...@apache.org>
>>>>>> > > > > > >> >   > > > wrote:
>>>>>> > > > > > >> >   > > >
>>>>>> > > > > > >> >   > > > > Hi Thomas,
>>>>>> > > > > > >> >   > > > >
>>>>>> > > > > > >> >   > > > > just to confirm: When starting the image in
>>>>>> local
>>>>>> > > mode,
>>>>>> > > > > then
>>>>>> > > > > > >> you
>>>>>> > > > > > >> >   > don't
>>>>>> > > > > > >> >   > > > have
>>>>>> > > > > > >> >   > > > > any of the JobManager memory configuration
>>>>>> settings
>>>>>> > > > > > >> configured in the
>>>>>> > > > > > >> >   > > > > effective flink-conf.yaml, right? Does this
>>>>>> mean
>>>>>> > that
>>>>>> > > > you
>>>>>> > > > > > have
>>>>>> > > > > > >> >   > > explicitly
>>>>>> > > > > > >> >   > > > > removed `jobmanager.heap.size: 1024m` from
>>>>>> the
>>>>>> > default
>>>>>> > > > > > >> configuration?
>>>>>> > > > > > >> >   > > If
>>>>>> > > > > > >> >   > > > > this is the case, then I believe it was more
>>>>>> of an
>>>>>> > > > > > >> unintentional
>>>>>> > > > > > >> >   > > artifact
>>>>>> > > > > > >> >   > > > > that it worked before and it has been
>>>>>> corrected now
>>>>>> > so
>>>>>> > > > > that
>>>>>> > > > > > >> one needs
>>>>>> > > > > > >> >   > > to
>>>>>> > > > > > >> >   > > > > specify the memory of the JM process
>>>>>> explicitly. Do
>>>>>> > > you
>>>>>> > > > > > think
>>>>>> > > > > > >> it
>>>>>> > > > > > >> >   > would
>>>>>> > > > > > >> >   > > > help
>>>>>> > > > > > >> >   > > > > to explicitly state this in the release
>>>>>> notes?
>>>>>> > > > > > >> >   > > > >
>>>>>> > > > > > >> >   > > > > Cheers,
>>>>>> > > > > > >> >   > > > > Till
>>>>>> > > > > > >> >   > > > >
>>>>>> > > > > > >> >   > > > > On Wed, Jul 1, 2020 at 7:01 AM Thomas Weise <
>>>>>> > > > > t...@apache.org
>>>>>> > > > > > >
>>>>>> > > > > > >> wrote:
>>>>>> > > > > > >> >   > > > >
>>>>>> > > > > > >> >   > > > > > Thanks for preparing another RC!
>>>>>> > > > > > >> >   > > > > >
>>>>>> > > > > > >> >   > > > > > As mentioned in the previous RC thread, it
>>>>>> would
>>>>>> > be
>>>>>> > > > > super
>>>>>> > > > > > >> helpful
>>>>>> > > > > > >> >   > if
>>>>>> > > > > > >> >   > > > the
>>>>>> > > > > > >> >   > > > > > release notes that are part of the
>>>>>> documentation
>>>>>> > can
>>>>>> > > > be
>>>>>> > > > > > >> included
>>>>>> > > > > > >> >   > [1].
>>>>>> > > > > > >> >   > > > > It's
>>>>>> > > > > > >> >   > > > > > a significant time-saver to have read
>>>>>> those first.
>>>>>> > > > > > >> >   > > > > >
>>>>>> > > > > > >> >   > > > > > I found one more non-backward compatible
>>>>>> change
>>>>>> > that
>>>>>> > > > > would
>>>>>> > > > > > >> be worth
>>>>>> > > > > > >> >   > > > > > addressing/mentioning:
>>>>>> > > > > > >> >   > > > > >
>>>>>> > > > > > >> >   > > > > > It is now necessary to configure the
>>>>>> jobmanager
>>>>>> > heap
>>>>>> > > > > size
>>>>>> > > > > > in
>>>>>> > > > > > >> >   > > > > > flink-conf.yaml (with either
>>>>>> jobmanager.heap.size
>>>>>> > > > > > >> >   > > > > > or jobmanager.memory.heap.size). Why would
>>>>>> I not
>>>>>> > > want
>>>>>> > > > to
>>>>>> > > > > > do
>>>>>> > > > > > >> that
>>>>>> > > > > > >> >   > > > anyways?
>>>>>> > > > > > >> >   > > > > > Well, we set it dynamically for a cluster
>>>>>> > deployment
>>>>>> > > > via
>>>>>> > > > > > the
>>>>>> > > > > > >> >   > > > > > flinkk8soperator, but the container image
>>>>>> can also
>>>>>> > > be
>>>>>> > > > > used
>>>>>> > > > > > >> for
>>>>>> > > > > > >> >   > > testing
>>>>>> > > > > > >> >   > > > > with
>>>>>> > > > > > >> >   > > > > > local mode (./bin/jobmanager.sh
>>>>>> start-foreground
>>>>>> > > > local).
>>>>>> > > > > > >> That will
>>>>>> > > > > > >> >   > > fail
>>>>>> > > > > > >> >   > > > > if
>>>>>> > > > > > >> >   > > > > > the heap wasn't configured and that's how I
>>>>>> > noticed
>>>>>> > > > it.
>>>>>> > > > > > >> >   > > > > >
>>>>>> > > > > > >> >   > > > > > Thanks,
>>>>>> > > > > > >> >   > > > > > Thomas
>>>>>> > > > > > >> >   > > > > >
>>>>>> > > > > > >> >   > > > > > [1]
>>>>>> > > > > > >> >   > > > > >
>>>>>> > > > > > >> >   > > > > >
>>>>>> > > > > > >> >   > > > >
>>>>>> > > > > > >> >   > > >
>>>>>> > > > > > >> >   > >
>>>>>> > > > > > >> >   >
>>>>>> > > > > > >>
>>>>>> > > > > >
>>>>>> > > > >
>>>>>> > > >
>>>>>> > >
>>>>>> >
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/release-notes/flink-1.11.html
>>>>>> > > > > > >> >   > > > > >
>>>>>> > > > > > >> >   > > > > > On Tue, Jun 30, 2020 at 3:18 AM Zhijiang <
>>>>>> > > > > > >> >   > wangzhijiang...@aliyun.com
>>>>>> > > > > > >> >   > > > > > .invalid>
>>>>>> > > > > > >> >   > > > > > wrote:
>>>>>> > > > > > >> >   > > > > >
>>>>>> > > > > > >> >   > > > > > > Hi everyone,
>>>>>> > > > > > >> >   > > > > > >
>>>>>> > > > > > >> >   > > > > > > Please review and vote on the release
>>>>>> candidate
>>>>>> > #4
>>>>>> > > > for
>>>>>> > > > > > the
>>>>>> > > > > > >> >   > version
>>>>>> > > > > > >> >   > > > > > 1.11.0,
>>>>>> > > > > > >> >   > > > > > > as follows:
>>>>>> > > > > > >> >   > > > > > > [ ] +1, Approve the release
>>>>>> > > > > > >> >   > > > > > > [ ] -1, Do not approve the release
>>>>>> (please
>>>>>> > provide
>>>>>> > > > > > >> specific
>>>>>> > > > > > >> >   > > comments)
>>>>>> > > > > > >> >   > > > > > >
>>>>>> > > > > > >> >   > > > > > > The complete staging area is available
>>>>>> for your
>>>>>> > > > > review,
>>>>>> > > > > > >> which
>>>>>> > > > > > >> >   > > > includes:
>>>>>> > > > > > >> >   > > > > > > * JIRA release notes [1],
>>>>>> > > > > > >> >   > > > > > > * the official Apache source release and
>>>>>> binary
>>>>>> > > > > > >> convenience
>>>>>> > > > > > >> >   > > releases
>>>>>> > > > > > >> >   > > > to
>>>>>> > > > > > >> >   > > > > > be
>>>>>> > > > > > >> >   > > > > > > deployed to dist.apache.org [2], which
>>>>>> are
>>>>>> > signed
>>>>>> > > > > with
>>>>>> > > > > > >> the key
>>>>>> > > > > > >> >   > > with
>>>>>> > > > > > >> >   > > > > > > fingerprint
>>>>>> > > 2DA85B93244FDFA19A6244500653C0A2CEA00D0E
>>>>>> > > > > > [3],
>>>>>> > > > > > >> >   > > > > > > * all artifacts to be deployed to the
>>>>>> Maven
>>>>>> > > Central
>>>>>> > > > > > >> Repository
>>>>>> > > > > > >> >   > [4],
>>>>>> > > > > > >> >   > > > > > > * source code tag "release-1.11.0-rc4"
>>>>>> [5],
>>>>>> > > > > > >> >   > > > > > > * website pull request listing the new
>>>>>> release
>>>>>> > and
>>>>>> > > > > > adding
>>>>>> > > > > > >> >   > > > announcement
>>>>>> > > > > > >> >   > > > > > > blog post [6].
>>>>>> > > > > > >> >   > > > > > >
>>>>>> > > > > > >> >   > > > > > > The vote will be open for at least 72
>>>>>> hours. It
>>>>>> > is
>>>>>> > > > > > >> adopted by
>>>>>> > > > > > >> >   > > > majority
>>>>>> > > > > > >> >   > > > > > > approval, with at least 3 PMC
>>>>>> affirmative votes.
>>>>>> > > > > > >> >   > > > > > >
>>>>>> > > > > > >> >   > > > > > > Thanks,
>>>>>> > > > > > >> >   > > > > > > Release Manager
>>>>>> > > > > > >> >   > > > > > >
>>>>>> > > > > > >> >   > > > > > > [1]
>>>>>> > > > > > >> >   > > > > > >
>>>>>> > > > > > >> >   > > > > >
>>>>>> > > > > > >> >   > > > >
>>>>>> > > > > > >> >   > > >
>>>>>> > > > > > >> >   > >
>>>>>> > > > > > >> >   >
>>>>>> > > > > > >>
>>>>>> > > > > >
>>>>>> > > > >
>>>>>> > > >
>>>>>> > >
>>>>>> >
>>>>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12346364
>>>>>> > > > > > >> >   > > > > > > [2]
>>>>>> > > > > > >> >   >
>>>>>> > > > https://dist.apache.org/repos/dist/dev/flink/flink-1.11.0-rc4/
>>>>>> > > > > > >> >   > > > > > > [3]
>>>>>> > > > > > https://dist.apache.org/repos/dist/release/flink/KEYS
>>>>>> > > > > > >> >   > > > > > > [4]
>>>>>> > > > > > >> >   > > > > > >
>>>>>> > > > > > >> >   > > > >
>>>>>> > > > > > >> >   > >
>>>>>> > > > > > >>
>>>>>> > > > >
>>>>>> > >
>>>>>> https://repository.apache.org/content/repositories/orgapacheflink-1377/
>>>>>> > > > > > >> >   > > > > > > [5]
>>>>>> > > > > > >> >   > >
>>>>>> > > > >
>>>>>> https://github.com/apache/flink/releases/tag/release-1.11.0-rc4
>>>>>> > > > > > >> >   > > > > > > [6]
>>>>>> > https://github.com/apache/flink-web/pull/352
>>>>>> > > > > > >> >   > > > > > >
>>>>>> > > > > > >> >   > > > > > >
>>>>>> > > > > > >> >   > > > > >
>>>>>> > > > > > >> >   > > > >
>>>>>> > > > > > >> >   > > >
>>>>>> > > > > > >> >   > > >
>>>>>> > > > > > >> >   > >
>>>>>> > > > > > >> >   > >
>>>>>> > > > > > >> >   >
>>>>>> > > > > > >> >   >
>>>>>> > > > > > >> >
>>>>>> > > > > > >> >
>>>>>> > > > > > >> >
>>>>>> > > > > > >>
>>>>>> > > > > > >>
>>>>>> > > > > >
>>>>>> > > > > >
>>>>>> > > > >
>>>>>> > > >
>>>>>> > >
>>>>>> >
>>>>>> >
>>>>>> > --
>>>>>> > Regards,
>>>>>> > Roman
>>>>>> >
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Regards,
>>>>> Roman
>>>>>
>>>>
>>
>> --
>> Regards,
>> Roman
>>
>
>
> --
> Regards,
> Roman
>

Reply via email to