Re: Flink 1.14. Bi-weekly 2021-06-22

2021-06-29 Thread Yu Li
Thanks for taking care of this, Joe! The information is very helpful!

Best Regards,
Yu


On Mon, 28 Jun 2021 at 18:40, Till Rohrmann  wrote:

> Thanks a lot for the update, Joe. This is very helpful!
>
> Cheers,
> Till
>
> On Mon, Jun 28, 2021 at 10:10 AM Xintong Song 
> wrote:
>
> > Thanks for the update, Joe.
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> > On Mon, Jun 28, 2021 at 3:54 PM Johannes Moser 
> > wrote:
> >
> > > Hello,
> > >
> > > Last Tuesday was our second bi-weekly.
> > >
> > > You can read up on the outcome on the Confluence wiki page [1].
> > >
> > > *Feature freeze date*
> > > As we didn't come to a clear agreement, we will keep the anticipated
> > > feature freeze date as it is: early August.
> > >
> > > *Build stability*
> > > The good news: we decreased the number of issues. The not-so-good
> > > news: only by ten.
> > > We as a community need to put further effort into this.
> > >
> > > *Dependencies*
> > > We'd like to ask all contributors to have a look at the components they
> > > are heavily involved with to see if any dependencies require updating.
> > > Some users recently had issues passing security scans. In the future,
> > > this should be a default task at the beginning of every release cycle.
> > >
> > > *Criteria for merging PRs*
> > > We want to avoid merging PRs with unrelated CI failures. We are quite
> > > aware that we
> > > need to raise the importance of the Docker caching issue.
> > >
> > > What can you do to make the Flink 1.14 release a good one:
> > > * Identify and update outdated dependencies
> > > * Get rid of test instabilities
> > > * Don't merge PRs including unrelated CI failures
> > >
> > > Best,
> > > Joe
> > >
> > >
> > > [1] https://cwiki.apache.org/confluence/display/FLINK/1.14+Release
> >
>


[jira] [Created] (FLINK-23178) Raise an error for writing stream data into partitioned hive/FS tables without a partition committer

2021-06-29 Thread Rui Li (Jira)
Rui Li created FLINK-23178:
--

 Summary: Raise an error for writing stream data into partitioned 
hive/FS tables without a partition committer
 Key: FLINK-23178
 URL: https://issues.apache.org/jira/browse/FLINK-23178
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / FileSystem, Connectors / Hive
Reporter: Rui Li






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23179) Support PIVOT and UNPIVOT

2021-06-29 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-23179:


 Summary: Support PIVOT and UNPIVOT 
 Key: FLINK-23179
 URL: https://issues.apache.org/jira/browse/FLINK-23179
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Runtime
Reporter: Jingsong Lee


See PIVOT in SQL Server: 
[https://docs.microsoft.com/en-us/sql/t-sql/queries/from-using-pivot-and-unpivot?view=sql-server-ver15]

This is required not only in batch; streaming has the same requirement.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23180) Initialize checkpoint location lazily in DataStream Batch Jobs

2021-06-29 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-23180:
--

 Summary: Initialize checkpoint location lazily in DataStream Batch 
Jobs
 Key: FLINK-23180
 URL: https://issues.apache.org/jira/browse/FLINK-23180
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Affects Versions: 1.11.0
Reporter: Jiayi Liao


Currently, batch jobs initialize the checkpoint location eagerly when the 
{{CheckpointCoordinator}} is created, which creates lots of useless 
directories on the distributed filesystem. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23181) Add support for multi hop window

2021-06-29 Thread Aitozi (Jira)
Aitozi created FLINK-23181:
--

 Summary: Add support for multi hop window 
 Key: FLINK-23181
 URL: https://issues.apache.org/jira/browse/FLINK-23181
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Planner, Table SQL / Runtime
Reporter: Aitozi






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Moving to JUnit5

2021-06-29 Thread Qingsheng Ren
Thanks for wrapping things up and for your effort on the migration, Arvid!

I’m +1 for the migration plan.

To summarize the migration path proposed by Arvid:

1. Remove JUnit 4 dependency and introduce junit5-vintage-engine in the project 
(all existing cases will still work)
2. Rewrite JUnit 4 rules in JUnit 5 extension style (~10 rules)
3. Migrate all existing tests to JUnit 5 (This is a giant commit similar to 
code formatting)
4. Remove vintage runner and some cleanup
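
To illustrate the mechanical part of step 3, here is a minimal sketch of a 
migrated test (the class below is hypothetical). Besides the annotation 
renames, note that the optional assertion message moves from the first 
argument to the last:

import java.util.ArrayList;
import java.util.List;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

// Hypothetical test class, only to illustrate the mechanical changes.
class NumbersTest {

    private List<Integer> numbers;

    @BeforeEach // was @Before in JUnit 4
    void setUp() {
        numbers = new ArrayList<>();
        numbers.add(1);
        numbers.add(2);
    }

    @Test // now org.junit.jupiter.api.Test instead of org.junit.Test
    void testSize() {
        // JUnit 4: assertEquals("unexpected size", 2, numbers.size());
        // JUnit 5 moves the optional message to the last argument:
        assertEquals(2, numbers.size(), "unexpected size");
    }
}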

One issue is that we cannot totally get rid of JUnit 4 dependency from the 
project because:

1. Testcontainers. As mentioned in their official docs[1], Testcontainers still 
depends on JUnit 4, and the problem might not be solved until Testcontainer 2, 
which still has no roadmap[2].
2. JUnit 4 can still appear as a transitive dependency

Since the classes of JUnit 4 and 5 live under different packages, there won’t 
be a conflict in having JUnit 4 in the project. To prevent the project from 
splitting into 4 & 5 again, we can ban JUnit 4 imports in CheckStyle, 
preventing developers from writing test cases in JUnit 4 style intentionally 
or by mistake.

I’m happy and willing to take over the migration work. This migration indeed 
takes some effort, but it will help with test development in the future.

[1] https://www.testcontainers.org/test_framework_integration/junit_5/
[2] 
https://github.com/testcontainers/testcontainers-java/issues/970#issuecomment-437273363

--
Best Regards,

Qingsheng Ren
Email: renqs...@gmail.com
On Jun 16, 2021, 3:13 AM +0800, Arvid Heise , wrote:
> Sorry for following up so late. A while ago, I spiked a junit 5 migration.
>
> To recap: here is the migration plan.
>
> 0. (There is a way to use JUnit4 + 5 at the same time in a project - you'd
> > use a specific JUnit4 runner to execute JUnit5. I'd like to skip this
> > step as it would slow down migration significantly)
> > 1. Use JUnit5 with vintage runner. JUnit4 tests run mostly out of the
> > box. The most important difference is that only 3 base rules are supported
> > and the remainder needs to be migrated. Luckily, most of our rules derive
> > from the supported ExternalResource. So in this step, we would need to
> > migrate the rules.
> > 2. Implement new tests in JUnit5.
> > 3. Soft-migrate old tests in JUnit5. This is mostly a renaming of
> > annotation (@Before -> @BeforeEach, etc.). Adjust parameterized tests
> > (~400), replace rule usages (~670) with extensions, exception handling
> > (~1600 tests), and timeouts (~200). This can be done on a test class by
> > test class base and there is no hurry.
> > 4. Remove vintage runner, once most tests are migrated by doing a final
> > push for lesser used modules.
> >
>
> Here are my insights:
> 0. works but I don't see the benefit
> 1. works well [1] with a small diff [2]. Note that the branch is based on a
> quite old master.
> 2. works well as well [3].
> 2a. However, we should be aware that we need to port quite a few rules to
> extensions before we can implement more complex JUnit5 tests, especially
> ITCases (I'd probably skip junit-jupiter-migrationsupport that allows us to
> reuse _some_ rules using specific base classes). We have ~10-15 rules that
> need to be ported.
> 3. Soft migration will take forever and probably never finish. Many tests
> can be automatically ported with some (I used 8) simple regexes. I'd rather
> do a hard migration of all tests at a particular point (no freeze) and have
> that git commit excluded from blame, similar to the spotless commit.
> 3a. A huge chunk of changes (>90%) comes from the optional message in
> assertX being moved from the first to the last position. @Chesnay Schepler
>  proposed to rather implement our own Assertion class
> in the old junit package that translates it. But this would need to go hand
> in hand with 4) to avoid name clash. It could also just be a temporary
> thing that we use during hard migration and then inline before merging.
> 4. If we do hard migration, we should probably do that in a follow-up PR
> (contains just the migrations of the tests that have been added in the
> meantime).
>
> Here is my time-limited take on the hard migration [4]. It was a matter of
> ~10h.
>
> [1]
> https://dev.azure.com/arvidheise0209/arvidheise/_build/results?buildId=1208&view=ms.vss-test-web.build-test-results-tab&runId=24838&resultId=10&paneView=debug
> [2]
> https://github.com/AHeise/flink/commit/7f3e7faac9ba53615bda89e51d5fd17d940c4a55
> [3]
> https://github.com/AHeise/flink/commit/c0dd3d12fbd07b327b560107396ee0bb1e2d8969
> [4] https://github.com/apache/flink/compare/master...AHeise:junit5?expand=1
>
> On Tue, Dec 1, 2020 at 9:54 AM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
> > +1 for the migration
> >
> > (I agree with Dawid, for me the most important benefit is better support of
> > parameterized tests).
> >
> > Regards,
> > Roman
> >
> >
> > On Mon, Nov 30, 2020 at 9:42 PM Arvid Heise  wrote:
> >
> > > Hi Till,
> > >
> > > immediate benefit would be

[jira] [Created] (FLINK-23182) Connection leak in RMQSource

2021-06-29 Thread Jira
Michał Ciesielczyk created FLINK-23182:
--

 Summary: Connection leak in RMQSource 
 Key: FLINK-23182
 URL: https://issues.apache.org/jira/browse/FLINK-23182
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / RabbitMQ
Affects Versions: 1.12.4, 1.13.1, 1.14.0
Reporter: Michał Ciesielczyk


The RabbitMQ connection is not closed properly in the RMQSource connector in 
case of failures. This leads to a connection leak (we lose handles to still 
open connections) that lasts until the Flink TaskManager is either stopped or 
crashes.

The issue is caused by improper resource releasing in the open and close 
methods of RMQSource:
 - 
[https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java#L260]
 - here the connection is opened, but not closed in case of failure (e.g. 
caused by an invalid queue configuration)
 - 
[https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java#L282]
 - here the connection might not be closed properly if stopping the consumer 
causes a failure first

In both cases the solution is relatively simple - make sure that 
connection#close is always called when it should be (failing to close one 
resource should not prevent other close methods from being called). In open we 
can probably close allocated resources silently (as the process ultimately did 
not succeed anyway). In close, we should throw either the first or the last 
caught exception, and log all the others as warnings.
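
A minimal sketch of that close pattern (illustrative only, not the actual 
RMQSource code; the helper and its names are hypothetical):

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative sketch: close both resources unconditionally, rethrow the
// first exception, and log any later ones as warnings.
public final class RmqCloseSketch {

    private static final Logger LOG = LoggerFactory.getLogger(RmqCloseSketch.class);

    public static void closeAll(Channel channel, Connection connection) throws Exception {
        Exception first = null;
        try {
            if (channel != null) {
                channel.close();
            }
        } catch (Exception e) {
            first = e;
        }
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (Exception e) {
            if (first == null) {
                first = e;
            } else {
                LOG.warn("Suppressed error while closing connection", e);
            }
        }
        if (first != null) {
            throw first;
        }
    }
}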



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23183) Lost ACKs for redelivered messages in RMQSource

2021-06-29 Thread Jira
Michał Ciesielczyk created FLINK-23183:
--

 Summary: Lost ACKs for redelivered messages in RMQSource 
 Key: FLINK-23183
 URL: https://issues.apache.org/jira/browse/FLINK-23183
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / RabbitMQ
Affects Versions: 1.12.4, 1.13.1
Reporter: Michał Ciesielczyk


As described in FLINK-20244, redelivered messages are not acknowledged 
properly (only applicable when autoAck is disabled). When a prefetch count is 
set on the consumer, this may even cause the source to stop consuming messages 
altogether.


 The solution (proposed in FLINK-20244) should resolve the issue. All 
successfully consumed RMQ messages should be acknowledged, regardless of 
whether the message is ignored or processed further in the pipeline. The 
{{sessionIds.add(deliveryTag)}} 
([RMQSource.java#L423|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java#L423])
 should be called before checking if the message has already been processed.
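
A minimal sketch of the proposed ordering (simplified and hypothetical, not 
the actual RMQSource code):

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative sketch: the delivery tag is registered for acknowledgement
// *before* the de-duplication check, so a redelivered message that is
// ignored still gets ACKed at the next checkpoint.
public final class AckBeforeDedupSketch {

    private final List<Long> sessionIds = new ArrayList<>();   // tags to ACK
    private final Set<String> processedIds = new HashSet<>();  // de-duplication

    /** Returns true if the message should be emitted downstream. */
    public boolean onMessage(long deliveryTag, String correlationId) {
        sessionIds.add(deliveryTag);             // always schedule the ACK
        return processedIds.add(correlationId);  // false for a duplicate
    }
}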



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23184) CompileException Assignment conversion not possible from type "int" to type "short"

2021-06-29 Thread xiaojin.wy (Jira)
xiaojin.wy created FLINK-23184:
--

 Summary: CompileException Assignment conversion not possible from 
type "int" to type "short"
 Key: FLINK-23184
 URL: https://issues.apache.org/jira/browse/FLINK-23184
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.14.0
Reporter: xiaojin.wy


CREATE TABLE MySink (
  `a` SMALLINT
) WITH (
  'connector' = 'filesystem',
  'format' = 'testcsv',
  'path' = '$resultPath'
)

CREATE TABLE database8_t0 (
  `c0` SMALLINT
) WITH (
  'connector' = 'filesystem',
  'format' = 'testcsv',
  'path' = '$resultPath11'
)

CREATE TABLE database8_t1 (
  `c0` SMALLINT,
  `c1` TINYINT
) WITH (
  'connector' = 'filesystem',
  'format' = 'testcsv',
  'path' = '$resultPath22'
)

INSERT OVERWRITE database8_t0(c0) VALUES(cast(22424 as SMALLINT))
INSERT OVERWRITE database8_t1(c0, c1) VALUES(cast(-17443 as SMALLINT), cast(97 
as TINYINT))
insert into MySink
SELECT database8_t0.c0 AS ref0 FROM database8_t0, database8_t1 WHERE CAST ((- 
(database8_t0.c0)) AS BOOLEAN)


Executing that, you will get the following error:
2021-06-29 19:39:27
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
  at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
  at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
  at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
  at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
  at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
  at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
  at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
  at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:440)
  at sun.reflect.GeneratedMethodAccessor32.invoke(Unknown Source)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
  at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
  at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
  at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
  at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
  at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
  at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
  at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
  at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
  at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
  at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
  at akka.actor.ActorCell.invoke(ActorCell.scala:561)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
  at akka.dispatch.Mailbox.run(Mailbox.scala:225)
  at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
  at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Could not instantiate generated class 'BatchExecCalc$4536'
  at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:66)
  at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:43)
  at org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:80)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:626)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:600)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorCha

Re: [DISCUSS] Dashboard/HistoryServer authentication

2021-06-29 Thread Márton Balassi
Hi all,

I commend Konstantin and Till when it comes to standing up for the
community values.

Based on your feedback we are withdrawing the original proposal and
attaching a more general custom netty handler API proposal [1] written by
G. The change necessary to the Flink repository is approximately 500 lines
of code. [2]

Please let us focus on discussing the details of this API and whether it
covers the necessary use cases.
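
To give a flavor of what such a handler could look like, here is a generic 
Netty sketch (not the proposed Flink API; the actual API is defined in the 
design doc [1]). It rejects HTTP requests that lack an Authorization header:

import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.ReferenceCountUtil;

// Generic sketch of an authenticating inbound handler; real implementations
// would validate the credential instead of only checking for the header.
public class AuthCheckHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof HttpRequest) {
            HttpRequest request = (HttpRequest) msg;
            if (request.headers().get("Authorization") == null) {
                ReferenceCountUtil.release(msg);
                ctx.writeAndFlush(new DefaultFullHttpResponse(
                                HttpVersion.HTTP_1_1, HttpResponseStatus.UNAUTHORIZED))
                        .addListener(ChannelFutureListener.CLOSE);
                return;
            }
        }
        ctx.fireChannelRead(msg); // authenticated (or non-HTTP) messages pass through
    }
}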

[1]
https://docs.google.com/document/d/1Idnw8YauMK1x_14iv0rVF0Hqm58J6Dg-hi-hEuL6hwM/edit#heading=h.ijcbce3c5gip
[2]
https://github.com/gaborgsomogyi/flink/commit/942f23679ac21428bb87fc85557b9b443fcaf310

Thanks,
Marton

On Wed, Jun 23, 2021 at 9:36 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hi all,
>
> Thanks, Konstantin and Till, for guiding the discussion.
>
> I was not aware of the results of the call with Konstantin and was
> attempting to resolve the unanswered questions before more, potentially
> fruitless, work was done.
>
> I am also looking forward to the coming proposal, as well as increasing my
> understanding of this specific use case + its limitations!
>
> Best,
> Austin
>
> On Tue, Jun 22, 2021 at 6:32 AM Till Rohrmann 
> wrote:
>
> > Hi everyone,
> >
> > I do like the idea of keeping the actual change outside of Flink but to
> > enable Flink to support such a use case (different authentication
> > mechanisms). I think this is a good compromise for the community that
> > combines long-term maintainability with support for new use-cases. I am
> > looking forward to your proposal.
> >
> > I also want to second Konstantin here that the tone of your last email,
> > Marton, does not reflect the values and manners of the Flink community
> and
> > is not representative of how we conduct discussions. Especially, the more
> > senior community members should know this and act accordingly in order to
> > be good role models for others in the community. Technical discussions
> > should not be decided by who wields presumably the greatest authority but
> > by the soundness of arguments and by what is the best solution for a
> > problem.
> >
> > Let us now try to find the best solution for the problem at hand!
> >
> > Cheers,
> > Till
> >
> > On Tue, Jun 22, 2021 at 11:24 AM Konstantin Knauf 
> > wrote:
> >
> > > Hi everyone,
> > >
> > > First, Marton and I had a brief conversation yesterday offline and
> > > discussed exploring the approach of exposing the authentication
> > > functionality via an API. So, I am looking forward to your proposal in
> > that
> > > direction. The benefit of such a solution would be that it is
> extensible
> > > for others and it does add a smaller maintenance (in particular
> testing)
> > > footprint to Apache Flink itself. If we end up going down this route,
> > > flink-packages.org would be a great way to promote these third party
> > > "authentication modules".
> > >
> > > Second, Marton, I understand your frustration about the long discussion
> > on
> > > this "simple matter", but the condescending tone of your last mail
> feels
> > > uncalled for to me. Austin expressed a valid opinion on the topic,
> which
> > is
> > > based on his experience from other Open Source frameworks (CNCF
> mostly).
> > I
> > > am sure you agree that it is important for Apache Flink to stay open
> and
> > to
> > > consider different approaches and ideas and I don't think it helps the
> > > culture of discussion to shoot it down like this ("This is where this
> > > discussion stops.").
> > >
> > > Let's continue to move this discussion forward and I am sure we'll
> find a
> > > consensus based on product and technological considerations.
> > >
> > > Thanks,
> > >
> > > Konstantin
> > >
> > > On Tue, Jun 22, 2021 at 9:31 AM Márton Balassi <
> balassi.mar...@gmail.com
> > >
> > > wrote:
> > >
> > > > Hi Austin,
> > > >
> > > > Thank you for your thoughts. This is where this discussion stops.
> This
> > > > email thread already contains more characters than the implementation
> > and
> > > > what is needed for the next 20 years of maintenance.
> > > >
> > > > It is great that you have a view on modern solutions and thank you
> for
> > > > offering your help with brainstorming solutions. I am responsible for
> > > Flink
> > > > at Cloudera and we do need an implementation like this and it is in
> > fact
> > > > already in production at dozens of customers. We are open to adapting
> > > that
> > > > to expose a more generic API (and keeping Kerberos to our fork), to
> > > > contribute this to the community as others have asked for it and to
> > > protect
> > > > ourselves from occasionally having to update this critical
> > implementation
> > > > path based on changes in the Apache codebase. I have worked with
> close
> > > to a
> > > > hundred Big Data customers as a consultant and an engineering manager
> > and
> > > > committed hundreds of changes to Apache Flink over the past decade,
> > > please
> > > > trust my judgement on a simple matter like this.

Re: [DISCUSS] Moving to JUnit5

2021-06-29 Thread Arvid Heise
Hi Qingsheng,

I like the idea of enforcing JUnit5 tests with checkstyle. I'm assuming
JUnit4 will bleed from time to time into the test classpath.

Obviously, we can only do that after all tests are migrated and we are
confident that no small change would require a contributor to do the
migration of a test in an unrelated change.

For the big commit, I'd propose to have it after branch cut for 1.14
release. So for 1.14 we would just have the coexistence PR with the vintage
engine. In that way, the least possible number of contributors should be
affected. Of course, the big commit can be prepared beforehand.

On Tue, Jun 29, 2021 at 11:44 AM Qingsheng Ren  wrote:

> Thanks for wrapping things up and for your effort on the migration, Arvid!
>
> I’m +1 for the migration plan.
>
> To summarize the migration path proposed by Arvid:
>
> 1. Remove JUnit 4 dependency and introduce junit5-vintage-engine in the
> project (all existing cases will still work)
> 2. Rewrite JUnit 4 rules in JUnit 5 extension style (~10 rules)
> 3. Migrate all existing tests to JUnit 5 (This is a giant commit similar
> to code formatting)
> 4. Remove vintage runner and some cleanup
>
> One issue is that we cannot totally get rid of JUnit 4 dependency from the
> project because:
>
> 1. Testcontainers. As mentioned in their official docs[1], Testcontainers
> still depends on JUnit 4, and the problem might not be solved until
> Testcontainer 2, which still has no roadmap[2].
> 2. JUnit 4 can still appear as a transitive dependency
>
> Since the classes of JUnit 4 and 5 live under different packages, there
> won’t be a conflict in having JUnit 4 in the project. To prevent the
> project from splitting into 4 & 5 again, we can ban JUnit 4 imports in
> CheckStyle, preventing developers from writing test cases in JUnit 4 style
> intentionally or by mistake.
>
> I’m happy and willing to take over the migration work. This migration
> indeed takes some effort, but it will help with test development in the
> future.
>
> [1] https://www.testcontainers.org/test_framework_integration/junit_5/
> [2]
> https://github.com/testcontainers/testcontainers-java/issues/970#issuecomment-437273363
>
> --
> Best Regards,
>
> Qingsheng Ren
> Email: renqs...@gmail.com
> On Jun 16, 2021, 3:13 AM +0800, Arvid Heise , wrote:
> > Sorry for following up so late. A while ago, I spiked a junit 5
> migration.
> >
> > To recap: here is the migration plan.
> >
> > 0. (There is a way to use JUnit4 + 5 at the same time in a project -
> you'd
> > > use a specific JUnit4 runner to execute JUnit5. I'd like to skip this
> > > step as it would slow down migration significantly)
> > > 1. Use JUnit5 with vintage runner. JUnit4 tests run mostly out of the
> > > box. The most important difference is that only 3 base rules are
> supported
> > > and the remainder needs to be migrated. Luckily, most of our rules
> derive
> > > from the supported ExternalResource. So in this step, we would need to
> > > migrate the rules.
> > > 2. Implement new tests in JUnit5.
> > > 3. Soft-migrate old tests in JUnit5. This is mostly a renaming of
> > > annotation (@Before -> @BeforeEach, etc.). Adjust parameterized tests
> > > (~400), replace rule usages (~670) with extensions, exception handling
> > > (~1600 tests), and timeouts (~200). This can be done on a test class by
> > > test class base and there is no hurry.
> > > 4. Remove vintage runner, once most tests are migrated by doing a final
> > > push for lesser used modules.
> > >
> >
> > Here are my insights:
> > 0. works but I don't see the benefit
> > 1. works well [1] with a small diff [2]. Note that the branch is based
> on a
> > quite old master.
> > 2. works well as well [3].
> > 2a. However, we should be aware that we need to port quite a few rules to
> > extensions before we can implement more complex JUnit5 tests, especially
> > ITCases (I'd probably skip junit-jupiter-migrationsupport that allows us
> to
> > reuse _some_ rules using specific base classes). We have ~10-15 rules
> that
> > need to be ported.
> > 3. Soft migration will take forever and probably never finish. Many tests
> > can be automatically ported with some (I used 8) simple regexes. I'd
> rather
> > do a hard migration of all tests at a particular point (no freeze) and
> have
> > that git commit excluded from blame, similar to the spotless commit.
> > 3a. A huge chunk of changes (>90%) comes from the optional message in
> > assertX being moved from the first to the last position. @Chesnay
> Schepler
> >  proposed to rather implement our own Assertion
> class
> > in the old junit package that translates it. But this would need to go
> hand
> > in hand with 4) to avoid name clash. It could also just be a temporary
> > thing that we use during hard migration and then inline before merging.
> > 4. If we do hard migration, we should probably do that in a follow-up PR
> > (contains just the migrations of the tests that have been added in the
> > meantime).
> >
> > Here is my time-limited take on the h

[jira] [Created] (FLINK-23185) REST API docs refer to removed rescaling endpoints

2021-06-29 Thread Juha (Jira)
Juha created FLINK-23185:


 Summary: REST API docs refer to removed rescaling endpoints
 Key: FLINK-23185
 URL: https://issues.apache.org/jira/browse/FLINK-23185
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.13.1
Reporter: Juha


The REST API docs say:

"There exist several async operations among these APIs, e.g. trigger savepoint, 
rescale a job."

Additionally, according to the docs, the endpoints "/jobs/:jobid/rescaling" 
and "/jobs/:jobid/rescaling/:triggerid" exist and return HTTP 200. AFAIK 
they've been removed and return HTTP 503. The operations were removed from the 
CLI in FLINK-12312.

The docs for the above endpoints should be changed to state that they've been 
removed (or always fail), and the examples of async operations shouldn't 
reference a removed endpoint.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-171: Async Sink

2021-06-29 Thread Hausmann, Steffen
Hey Piotr,

I've just adapted the FLIP and changed the signature for the 
`submitRequestEntries` method:

protected abstract void submitRequestEntries(List 
requestEntries, ResultFuture requestResult);

In addition, we are likely to use an AtomicLong to track the number of 
outstanding requests, as you have proposed in 2b). I've already indicated this 
in the FLIP, but it's not fully fleshed out. But as you have said, that seems 
to be an implementation detail and the important part is the change of the 
`submitRequestEntries` signature.

Thanks for your feedback!

Cheers, Steffen


On 25.06.21, 17:05, "Hausmann, Steffen"  wrote:




Hi Piotr,

I’m happy to take your guidance on this. I need to think through your 
proposals and I’ll follow-up on Monday with some more context so that we can 
close the discussion on these details. But for now, I’ll close the vote.

Thanks, Steffen

From: Piotr Nowojski 
Date: Friday, 25. June 2021 at 14:48
To: Till Rohrmann 
Cc: Steffen Hausmann , "dev@flink.apache.org" 
, Arvid Heise 
Subject: RE: [EXTERNAL] [DISCUSS] FLIP-171: Async Sink




Hey,

I've just synced with Arvid about a couple more remarks from my side and 
he shares my concerns.

1. I would very strongly recommend ditching `CompletableFuture ` from 
the  `protected abstract CompletableFuture 
submitRequestEntries(List requestEntries);`  in favor of 
something like the `org.apache.flink.streaming.api.functions.async.ResultFuture` 
interface. `CompletableFuture` would partially make the threading model of 
the `AsyncSinkWriter` part of the public API and it would tie our hands. 
Regardless of how `CompletableFuture` is used, it imposes performance overhead 
because of the synchronisation/volatile accesses inside of it. On the other 
hand, something like:

protected abstract void submitRequestEntries(List 
requestEntries, ResultFuture requestResult);

Would allow us to implement the threading model as we wish. `ResultFuture` 
could be backed via `CompletableFuture` underneath, but it could also be 
something more efficient.  I will explain what I have in mind in a second.

2. It looks to me that proposed `AsyncSinkWriter` Internals are not very 
efficient and maybe the threading model hasn't been thought through? Especially 
private fields:

private final BlockingDeque bufferedRequestEntries;
private BlockingDeque> inFlightRequests;

are a bit strange to me. Why do we need two separate thread safe 
collections? Why do we need a `BlockingDeque` of `CompletableFuture`s? If we 
are already using a fully synchronised collection, there should be no need for 
another layer of thread safe `CompletableFuture`.

As I understand, the threading model of the `AsyncSinkWriter` is very 
similar to that of the `AsyncWaitOperator`, with very similar requirements for 
inducing backpressure. How I would see it implemented is for example:

a) Having a single lock, that would encompass the whole 
`AsyncSinkWriter#flush()` method. `flush()` would be called from the task 
thread (mailbox). To induce backpressure, `#flush()` would just call 
`lock.wait()`. `ResultFuture#complete(...)` called from an async thread, would 
also synchronize on the same lock, and mark some of the inflight requests as 
completed and call `lock.notify()`.

b) More efficient solution. On the hot path we would have for example only an 
`AtomicLong numberOfInFlightRequests`. The task thread would bump it, and 
`ResultFuture#complete()` would decrease it. If the task thread exceeds a 
threshold when bumping `numberOfInFlightRequests`, it goes to sleep/waits on a 
lock or some `CompletableFuture`. If `ResultFuture#complete()` goes below the 
threshold when decreasing the count, it wakes up the task thread. Compared to 
option a), option b) would have only the AtomicLong increment overhead on the 
hot path.

c) We could use the mailbox, the same way as the AsyncWaitOperator does. In 
this case `ResultFuture#complete()` would enqueue a mailbox action, which is 
thread-safe on its own.

Either of those options would be more efficient and simpler (from the 
threading model perspective) than having two `BlockingQueues` and 
`CompletableFuture`. Also, as you can see, neither of those solutions requires 
the overhead of ` CompletableFuture submitRequestEntries(List 
requestEntries)`. Each one of those could use a more efficient and custom 
implementation of `ResultFuture.complete(...)`.
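
For illustration, here is a minimal sketch of option b); all names are 
hypothetical, and this is not the actual AsyncSinkWriter code:

import java.util.concurrent.atomic.AtomicLong;

// Sketch: the task thread bumps a counter per in-flight request and blocks
// once a threshold is exceeded; the async completion callback decrements the
// counter and wakes the task thread up again.
public final class InFlightRequestLimiter {

    private final long maxInFlightRequests;
    private final AtomicLong numberOfInFlightRequests = new AtomicLong();
    private final Object lock = new Object();

    public InFlightRequestLimiter(long maxInFlightRequests) {
        this.maxInFlightRequests = maxInFlightRequests;
    }

    /** Called from the task (mailbox) thread before submitting a request. */
    public void registerRequest() throws InterruptedException {
        if (numberOfInFlightRequests.incrementAndGet() > maxInFlightRequests) {
            synchronized (lock) {
                // re-check under the lock so a concurrent wake-up is not missed
                while (numberOfInFlightRequests.get() > maxInFlightRequests) {
                    lock.wait();
                }
            }
        }
    }

    /** Called from the async thread when a request completes. */
    public void requestCompleted() {
        if (numberOfInFlightRequests.decrementAndGet() <= maxInFlightRequests) {
            synchronized (lock) {
                lock.notifyAll();
            }
        }
    }
}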


Whether we use a), b) or c) I think should be an implementation detail. But 
to allow this to truly be an implementation detail, we would need to agree on 
1. Neve

[jira] [Created] (FLINK-23186) Update "Local Installation" page

2021-06-29 Thread Daisy Tsang (Jira)
Daisy Tsang created FLINK-23186:
---

 Summary: Update "Local Installation" page
 Key: FLINK-23186
 URL: https://issues.apache.org/jira/browse/FLINK-23186
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Daisy Tsang


This page could be updated with more detail and more user empathy in mind. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Dashboard/HistoryServer authentication

2021-06-29 Thread Austin Cawley-Edwards
Hi all,

Thanks for the updated proposal. I have a few questions about the API,
please see below.

* What stability semantics do you envision for this API?
* Does Flink expose dependencies’ APIs in other places? Since this exposes
the Netty API, will this make it difficult to upgrade Netty?
* I share Till's concern about multiple factories – other HTTP middleware
frameworks commonly support chaining middlewares. Since the proposed API
does not include these features/guarantee ordering, do you see any reason
to allow more than one factory?

Best,
Austin

On Tue, Jun 29, 2021 at 8:55 AM Márton Balassi 
wrote:

> Hi all,
>
> I commend Konstantin and Till when it comes to standing up for the
> community values.
>
> Based on your feedback we are withdrawing the original proposal and
> attaching a more general custom netty handler API proposal [1] written by
> G. The change necessary to the Flink repository is approximately 500 lines
> of code. [2]
>
> Please let us focus on discussing the details of this API and whether it
> covers the necessary use cases.
>
> [1]
>
> https://docs.google.com/document/d/1Idnw8YauMK1x_14iv0rVF0Hqm58J6Dg-hi-hEuL6hwM/edit#heading=h.ijcbce3c5gip
> [2]
>
> https://github.com/gaborgsomogyi/flink/commit/942f23679ac21428bb87fc85557b9b443fcaf310
>
> Thanks,
> Marton
>
> On Wed, Jun 23, 2021 at 9:36 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
> > Hi all,
> >
> > Thanks, Konstantin and Till, for guiding the discussion.
> >
> > I was not aware of the results of the call with Konstantin and was
> > attempting to resolve the unanswered questions before more, potentially
> > fruitless, work was done.
> >
> > I am also looking forward to the coming proposal, as well as increasing
> my
> > understanding of this specific use case + its limitations!
> >
> > Best,
> > Austin
> >
> > On Tue, Jun 22, 2021 at 6:32 AM Till Rohrmann 
> > wrote:
> >
> > > Hi everyone,
> > >
> > > I do like the idea of keeping the actual change outside of Flink but to
> > > enable Flink to support such a use case (different authentication
> > > mechanisms). I think this is a good compromise for the community that
> > > combines long-term maintainability with support for new use-cases. I am
> > > looking forward to your proposal.
> > >
> > > I also want to second Konstantin here that the tone of your last email,
> > > Marton, does not reflect the values and manners of the Flink community
> > and
> > > is not representative of how we conduct discussions. Especially, the
> more
> > > senior community members should know this and act accordingly in order
> to
> > > be good role models for others in the community. Technical discussions
> > > should not be decided by who wields presumably the greatest authority
> but
> > > by the soundness of arguments and by what is the best solution for a
> > > problem.
> > >
> > > Let us now try to find the best solution for the problem at hand!
> > >
> > > Cheers,
> > > Till
> > >
> > > On Tue, Jun 22, 2021 at 11:24 AM Konstantin Knauf 
> > > wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > First, Marton and I had a brief conversation yesterday offline and
> > > > discussed exploring the approach of exposing the authentication
> > > > functionality via an API. So, I am looking forward to your proposal
> in
> > > that
> > > > direction. The benefit of such a solution would be that it is
> > extensible
> > > > for others and it does add a smaller maintenance (in particular
> > testing)
> > > > footprint to Apache Flink itself. If we end up going down this route,
> > > > flink-packages.org would be a great way to promote these third party
> > > > "authentication modules".
> > > >
> > > > Second, Marton, I understand your frustration about the long
> discussion
> > > on
> > > > this "simple matter", but the condescending tone of your last mail
> > feels
> > > > uncalled for to me. Austin expressed a valid opinion on the topic,
> > which
> > > is
> > > > based on his experience from other Open Source frameworks (CNCF
> > mostly).
> > > I
> > > > am sure you agree that it is important for Apache Flink to stay open
> > and
> > > to
> > > > consider different approaches and ideas and I don't think it helps
> the
> > > > culture of discussion to shoot it down like this ("This is where this
> > > > discussion stops.").
> > > >
> > > > Let's continue to move this discussion forward and I am sure we'll
> > find a
> > > > consensus based on product and technological considerations.
> > > >
> > > > Thanks,
> > > >
> > > > Konstantin
> > > >
> > > > On Tue, Jun 22, 2021 at 9:31 AM Márton Balassi <
> > balassi.mar...@gmail.com
> > > >
> > > > wrote:
> > > >
> > > > > Hi Austin,
> > > > >
> > > > > Thank you for your thoughts. This is where this discussion stops.
> > This
> > > > > email thread already contains more characters than the
> implementation
> > > and
> > > > > what is needed for the next 20 years of maintenance.
> > > > >
> > > > > It is great that you have a view on modern s

[jira] [Created] (FLINK-23187) [Documentation] PubSubSink should use "withTopicName" rather than "withSubscriptionName"

2021-06-29 Thread Jared Wasserman (Jira)
Jared Wasserman created FLINK-23187:
---

 Summary: [Documentation] PubSubSink should use "withTopicName" 
rather than "withSubscriptionName"
 Key: FLINK-23187
 URL: https://issues.apache.org/jira/browse/FLINK-23187
 Project: Flink
  Issue Type: Bug
Reporter: Jared Wasserman


There are two PubSubSink code examples in the documentation:

[https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/pubsub/#pubsub-sink]

[https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/pubsub/#integration-testing]

These examples should use "withTopicName" rather than "withSubscriptionName" in 
the builders.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Feedback Collection Jira Bot

2021-06-29 Thread Stephan Ewen
Being a bit late to the party, I don't want to ask to change everything,
just to share some observations.

My main observation and concern is still that this puts pressure and
confusion on contributors, which are mostly blocked on committers for
reviews, or are taking tickets as multi-week projects. I think it is not a
great experience for contributors, when they are already unsure why their
work isn't getting the attention from committers that they hoped for, to
then see issues unassigned or deprioritized automatically. I think we
really need to weigh this discouragement of contributors against the desire
for a tidy ticket system.

I also think by now this isn't just a matter of phrasing the bot's message
correctly. Auto unassignment and deprioritization sends a subtle message
that jira resolution is a more important goal than paying attention to
contributors (at least I think that is how it will be perceived by many).

Back to the original motivation, to not have issues lying around forever,
ensuring there is closure eventually.
For that, even much longer intervals would be fine. Like pinging every
three months, closing after three pings - would resolve most tickets in a
year, which is not too bad in the scope of a project like Flink. Many
features/wishes easily move to two releases in the future, which is almost
a year. We would get rid of long dead tickets and interfere little with
current tickets. Contributors can probably understand ticket closing after
a year of inactivity.

I am curious if a very simple bot that really just looks at stale issues
(no comment/change in three months), pings the
issue/reporter/assignee/watchers and closes it after three pings would do
the job.
We would get out of the un-assigning business (which can send very tricky
signals) and would rely on reporters/assignees/watchers to unassign if they
see that the contributor abandoned the issue. With a cadence of three
months for pinging, this isn't much work for the ones that get pinged.

Issues where we rely on faster handling are probably the ones where
committers have a stake in getting those into an upcoming release, so these
tend to be watched anyways.

On Wed, Jun 23, 2021 at 2:39 PM JING ZHANG  wrote:

> Hi Konstantin, Chesnay,
>
> > I would like it to not unassign people if a PR is open. These are
> > usually blocked by the reviewer, not the assignee, and having the
> > assignees now additionally having to update JIRA periodically is a bit
> > like rubbing salt into the wound.
>
> I agree with Chesnay about not un-assign an issue if a PR is open.
> Besides, could assignees remove the "stale-assigned" tag by themselves? It
> seems assignees have no permission to delete the tag if the issue is not
> created by themselves.
>
> Best regards,
> JING ZHANG
>
> Konstantin Knauf  wrote on Wed, Jun 23, 2021, at 4:17 PM:
>
> > > I agree there are such tickets, but I don't see how this is addressing
> my
> > concerns. There are also tickets that just shouldn't be closed as I
> > described above. Why do you think that duplicating tickets and losing
> > discussions/knowledge is a good solution?
> >
> > I don't understand why we are necessarily losing discussion/knowledge.
> The
> > tickets are still there, just in "Closed" state, which are included in
> > default Jira search. We could of course just add a label, but closing
> seems
> > clearer to me given that likely this ticket will not get comitter
> attention
> > in the foreseeable future.
> >
> > > I would like to avoid having to constantly fight against the bot. It's
> > already responsible for the majority of my daily emails, with quite
> little
> > benefit for me personally. I initially thought that after some period of
> > time it will settle down, but now I'm afraid it won't happen.
> >
> > Can you elaborate which rules you are running into mostly? I'd rather
> like
> > to understand how we work right now and where this conflicts with the
> Jira
> > bot vs slowly disabling the jira bot via labels.
> >
> > On Wed, Jun 23, 2021 at 10:00 AM Piotr Nowojski 
> > wrote:
> >
> > > Hi Konstantin,
> > >
> > > > In my opinion it is important that we close tickets eventually. There
> > are
> > > a
> > > > lot of tickets (bugs, improvements, tech debt) that over time became
> > > > irrelevant, out-of-scope, irreproducible, etc.  In my experience,
> these
> > > > tickets are usually not closed by anyone but the bot.
> > >
> > > I agree there are such tickets, but I don't see how this is addressing
> my
> > > concerns. There are also tickets that just shouldn't be closed as I
> > > described above. Why do you think that duplicating tickets and losing
> > > discussions/knowledge is a good solution?
> > >
> > > I would like to avoid having to constantly fight against the bot. It's
> > > already responsible for the majority of my daily emails, with quite
> > little
> > > benefit for me personally. I initially thought that after some period
> of
> > > time it will settle down, but now I'm afraid it won't happen

[jira] [Created] (FLINK-23188) Unsupported function definition: IFNULL. Only user defined functions are supported as inline functions

2021-06-29 Thread xiaojin.wy (Jira)
xiaojin.wy created FLINK-23188:
--

 Summary: Unsupported function definition: IFNULL. Only user 
defined functions are supported as inline functions
 Key: FLINK-23188
 URL: https://issues.apache.org/jira/browse/FLINK-23188
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.14.0
Reporter: xiaojin.wy


CREATE TABLE database0_t0(
c0 FLOAT
) WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs:///tmp/database0_t0.csv',
  'format' = 'csv'
);
INSERT OVERWRITE database0_t0(c0) VALUES(0.40445197);

SELECT database0_t0.c0 AS ref0 FROM database0_t0 WHERE 
((IFNULL(database0_t0.c1, database0_t0.c1)) IS NULL);

The error:
"(BridgingSqlFunction.java:76)
  at 
org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction.of(BridgingSqlFunction.java:116)
  at 
org.apache.flink.table.planner.expressions.converter.FunctionDefinitionConvertRule.convert(FunctionDefinitionConvertRule.java:65)
  at 
org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:97)
  at 
org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:71)
  at 
org.apache.flink.table.expressions.CallExpression.accept(CallExpression.java:134)
  at 
org.apache.flink.table.planner.expressions.converter.ExpressionConverter$1.toRexNode(ExpressionConverter.java:247)
  at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)  
at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)  
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)  at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)  
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)  
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)  at 
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)  at 
org.apache.flink.table.planner.expressions.converter.ExpressionConverter.toRexNodes(ExpressionConverter.java:240)
  at 
org.apache.flink.table.planner.expressions.converter.DirectConvertRule.lambda$convert$0(DirectConvertRule.java:220)
  at java.util.Optional.map(Optional.java:215)  at 
org.apache.flink.table.planner.expressions.converter.DirectConvertRule.convert(DirectConvertRule.java:217)
  at 
org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:97)
  at 
org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:71)
  at 
org.apache.flink.table.expressions.CallExpression.accept(CallExpression.java:134)
  at 
org.apache.flink.table.planner.plan.rules.logical.PushFilterIntoSourceScanRuleBase.lambda$convertExpressionToRexNode$0(PushFilterIntoSourceScanRuleBase.java:73)
  at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)  
at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)  
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)  at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)  
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)  
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)  at 
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)  at 
org.apache.flink.table.planner.plan.rules.logical.PushFilterIntoSourceScanRuleBase.convertExpressionToRexNode(PushFilterIntoSourceScanRuleBase.java:73)
  at 
org.apache.flink.table.planner.plan.rules.logical.PushFilterIntoSourceScanRuleBase.resolveFiltersAndCreateTableSourceTable(PushFilterIntoSourceScanRuleBase.java:116)
  at 
org.apache.flink.table.planner.plan.rules.logical.PushFilterIntoTableSourceScanRule.pushFilterIntoScan(PushFilterIntoTableSourceScanRule.java:95)
  at 
org.apache.flink.table.planner.plan.rules.logical.PushFilterIntoTableSourceScanRule.onMatch(PushFilterIntoTableSourceScanRule.java:70)
  at 
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
  at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)  at 
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)  at 
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)  
at 
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
  at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202) 
 at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)  at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
  at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
  at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$

Re: [DISCUSS] FLIP-171: Async Sink

2021-06-29 Thread 123052475





------ Original message ------
From: Till Rohrmann <trohrm...@apache.org>

Adding the InterruptException to the write method would make it explicit
that the write call can block but must react to interruptions (e.g. when
Flink wants to cancel the operation). I think this makes the contract a
bit clearer.

I think starting simple and then extending the API as we see the need is a
good idea.

Cheers,
Till

On Tue, Jun 22, 2021 at 11:20 AM Hausmann, Steffen <shau...@amazon.de> wrote:

Hey,

Agreed on starting with a blocking `write`. I've adapted the FLIP
accordingly.

For now I've chosen to add the `InterruptedException` to the `write`
method signature as I'm not fully understanding the implications of
swallowing the exception. Depending on the details of the code that is
calling the write method, it may cause event loss. But this seems more of
an implementation detail, that we can revisit once we are actually
implementing the sink.

Unless there are additional comments, does it make sense to start the
voting process in the next day or two?

Cheers, Steffen


On 21.06.21, 14:51, "Piotr Nowojski" <pnowoj...@apache.org> wrote:

Hi,

Thanks Steffen for the explanations. I think it makes sense to me.

Re Arvid/Steffen:

- Keep in mind that even if we choose to provide a non blocking API using
the `isAvailable()`/`getAvailableFuture()` method, we would still need to
support blocking inside the sinks. For example at the very least, emitting
many records at once (`flatMap`) or firing timers are scenarios when
output availability would be ignored at the moment by the runtime. Also I
would imagine writing very large (like 1GB) records would be blocking on
something as well.
- Secondly, exposing availability to the API level might not be that
easy/trivial. The availability pattern as defined in the
`AvailabilityProvider` class is quite complicated and not that easy to
implement by a user.

Both of those combined with the lack of a clear motivation for adding
`AvailabilityProvider` to the sinks/operators/functions, I would vote on
just starting with blocking `write` calls. This can always be extended in
the future with availability if needed/motivated properly.

That would be aligned with either Arvid's option 1 or 2. I don't know what
are the best practices with `InterruptedException`, but I'm always afraid
of it, so I would feel personally safer with option 2.

I'm not sure what problem option 3 is helping to solve? Adding `wakeUp()`
would sound strange to me.

Best,
Piotrek

On Mon, Jun 21, 2021 at 12:15 PM Arvid Heise <ar...@apache.org> wrote:

> Hi Piotr,
>
> to pick up this discussion thread again:
> - This FLIP is about providing some base implementation for FLIP-143
> sinks that make adding new implementations easier, similar to the
> SourceReaderBase.
> - The whole availability topic will most likely be a separate FLIP. The
> basic issue just popped up here because we currently have no way to
> signal backpressure in sinks except by blocking `write`. This feels
> quite natural in sinks with sync communication but quite unnatural in
> async sinks.
>
> Now we have a couple of options. In all cases, we would have some WIP
> limit on the number of records/requests being able to be processed in
> parallel asynchronously (similar to asyncIO).
> 1. We use some blocking queue in `write`, then we need to handle
> interruptions. In the easiest case, we extend `write` to throw the
> `InterruptedException`, which is a small API change.
> 2. We use a blocking queue, but handle interrupts and swallow/translate
> them. No API change.
> Both solutions block the task thread, so any RPC message / unaligned
> checkpoint would be processed only after the backpressure is temporarily
> lifted. That's similar to the discussions that you linked. Cancellation
> may also be a tad harder on 2.
> 3. We could also add some `wakeUp` to the `SinkWriter` similar to
> `SplitFetcher` [1]. Basically, you use a normal queue with a completable
> future on which you block. Wakeup would be a clean way to complete it
> next to the natural completion through finished requests.
> 4. We add availability to the sink. However, this API change also
> requires that we allow operators to be

Re: [DISCUSS] Do not merge PRs with "unrelated" test failures.

2021-06-29 Thread Rui Li
Thanks Xintong. +1 to the proposal.

On Tue, Jun 29, 2021 at 11:05 AM 刘建刚  wrote:

> +1 for the proposal. Since test runs are long and environments may vary,
> unstable tests are really annoying for developers. The solution is welcome.
>
> Best
> liujiangang
>
> Jingsong Li  wrote on Tue, Jun 29, 2021, at 10:31 AM:
>
> > +1 Thanks Xintong for the update!
> >
> > Best,
> > Jingsong
> >
> > On Mon, Jun 28, 2021 at 6:44 PM Till Rohrmann 
> > wrote:
> >
> > > +1, thanks for updating the guidelines Xintong!
> > >
> > > Cheers,
> > > Till
> > >
> > > On Mon, Jun 28, 2021 at 11:49 AM Yangze Guo 
> wrote:
> > >
> > > > +1
> > > >
> > > > Thanks Xintong for drafting this doc.
> > > >
> > > > Best,
> > > > Yangze Guo
> > > >
> > > > On Mon, Jun 28, 2021 at 5:42 PM JING ZHANG 
> > wrote:
> > > > >
> > > > > Thanks Xintong for giving detailed documentation.
> > > > >
> > > > > The best practice for handling test failure is very detailed, it's
> a
> > > good
> > > > > guidelines document with clear action steps.
> > > > >
> > > > > +1 to Xintong's proposal.
> > > > >
> > > > > Xintong Song  wrote on Mon, Jun 28, 2021, at 4:07 PM:
> > > > >
> > > > > > Thanks all for the discussion.
> > > > > >
> > > > > > Based on the opinions so far, I've drafted the new guidelines
> [1],
> > > as a
> > > > > > potential replacement of the original wiki page [2].
> > > > > >
> > > > > > Hopefully this draft has covered the most opinions discussed and
> > > > consensus
> > > > > > made in this discussion thread.
> > > > > >
> > > > > > Looking forward to your feedback.
> > > > > >
> > > > > > Thank you~
> > > > > >
> > > > > > Xintong Song
> > > > > >
> > > > > >
> > > > > > [1]
> > > > > >
> > > > > >
> > > >
> > >
> >
> https://docs.google.com/document/d/1uUbxbgbGErBXtmEjhwVhBWG3i6nhQ0LXs96OlntEYnU/edit?usp=sharing
> > > > > >
> > > > > > [2]
> > > > > >
> > > >
> > https://cwiki.apache.org/confluence/display/FLINK/Merging+Pull+Requests
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Fri, Jun 25, 2021 at 10:40 PM Piotr Nowojski <
> > > pnowoj...@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > > > Thanks for the clarification Till. +1 for what you have
> written.
> > > > > > >
> > > > > > > Piotrek
> > > > > > >
> > > > > > > On Fri, Jun 25, 2021 at 4:00 PM Till Rohrmann 
> > > > > > > wrote:
> > > > > > >
> > > > > > > > One quick note for clarification. I don't have anything
> against
> > > > builds
> > > > > > > > running on your personal Azure account and this is not what I
> > > > > > understood
> > > > > > > > under "local environment". For me "local environment" means
> > that
> > > > > > someone
> > > > > > > > runs the test locally on his machine and then says that the
> > > > > > > > tests have passed locally.
> > > > > > > >
> > > > > > > > I do agree that there might be a conflict of interests if a
> PR
> > > > author
> > > > > > > > disables tests. Here I would argue that we don't have
> malignant
> > > > > > > committers
> > > > > > > > which means that every committer will probably first check
> the
> > > > > > respective
> > > > > > > > ticket for how often the test failed. Then I guess the next
> > step
> > > > would
> > > > > > be
> > > > > > > > to discuss on the ticket whether to disable it or not. And
> > > finally,
> > > > > > after
> > > > > > > > reaching a consensus, it will be disabled. If we see someone
> > > > abusing
> > > > > > this
> > > > > > > > policy, then we can still think about how to guard against
> it.
> > > But,
> > > > > > > > honestly, I have very rarely seen such a case. I am also ok
> to
> > > > pull in
> > > > > > > the
> > > > > > > > release manager to make the final call if this resolves
> > concerns.
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > > Till
> > > > > > > >
> > > > > > > > On Fri, Jun 25, 2021 at 9:07 AM Piotr Nowojski <
> > > > pnowoj...@apache.org>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > +1 for the general idea, however I have concerns about a
> > couple
> > > > of
> > > > > > > > details.
> > > > > > > > >
> > > > > > > > > > I would first try to not introduce the exception for
> local
> > > > builds.
> > > > > > > > > > It makes it quite hard for others to verify the build and
> > to
> > > > make
> > > > > > > sure
> > > > > > > > > that the right things were executed.
> > > > > > > > >
> > > > > > > > > I would counter Till's proposal to ignore local green
> builds.
> > > If
> > > > > > > > committer
> > > > > > > > > is merging and closing a PR, with official azure failure,
> but
> > > > there
> > > > > > > was a
> > > > > > > > > green build before or in local azure it's IMO enough to
> leave
> > > the
> > > > > > > > message:
> > > > > > > > >
> > > > > > > > > > Latest build failure is a known issue: FLINK-12345
> > > > > > > > > > Green local build: URL
> > > > > > > > >
> > > > > > > > > This should address Till's concern about verification.
> > > > > > > > >
> > > > > > > > > On the other hand I have concerns about disabling tests.*
> It
> > > > > > shouldn't
> > 

[jira] [Created] (FLINK-23189) Count and fail the task when the disk is error on JobManager

2021-06-29 Thread zlzhang0122 (Jira)
zlzhang0122 created FLINK-23189:
---

 Summary: Count and fail the task when the disk is error on 
JobManager
 Key: FLINK-23189
 URL: https://issues.apache.org/jira/browse/FLINK-23189
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.13.1, 1.12.2
Reporter: zlzhang0122


When the JobManager disk has an error, triggerCheckpoint will throw an 
IOException and fail. This causes a TRIGGER_CHECKPOINT_FAILURE, but that 
failure won't fail the job. Users can hardly find this error unless they check 
the JobManager logs. To avoid this case, I propose that we detect these 
IOException cases and increase the failure counter, which can eventually fail 
the job.
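
A rough sketch of the proposed counting, with illustrative names 
(tolerableFailures, failJob); this is not the actual CheckpointFailureManager 
code, just the idea:

{code:java}
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: treat IOExceptions thrown while triggering a checkpoint
// (e.g. a bad JobManager disk) like other checkpoint failures.
class TriggerFailureCounter {
    private final int tolerableFailures;
    private final AtomicInteger failureCounter = new AtomicInteger();
    private final Runnable failJob;

    TriggerFailureCounter(int tolerableFailures, Runnable failJob) {
        this.tolerableFailures = tolerableFailures;
        this.failJob = failJob;
    }

    void onTriggerFailure(Throwable cause) {
        if (cause instanceof IOException
                && failureCounter.incrementAndGet() > tolerableFailures) {
            failJob.run(); // surface the disk error instead of only logging it
        }
    }
}
{code}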



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23190) Make task-slot allocation much more evenly

2021-06-29 Thread loyi (Jira)
loyi created FLINK-23190:


 Summary: Make task-slot allocation much more evenly
 Key: FLINK-23190
 URL: https://issues.apache.org/jira/browse/FLINK-23190
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Task
Affects Versions: 1.12.3
Reporter: loyi


Description:

FLINK-12122 only guarantees spreading out tasks across the set of TMs that are 
registered at the time of scheduling. Our jobs all run in active YARN mode, and 
a job with smaller source parallelism often causes load-balance issues.

For this job:

{code:java}
// -ys 4 (4 slots per TM), so the 40 slots come from 10 TaskManagers

env.addSource(...).name("A").setParallelism(10)
 .map(...).name("B").setParallelism(30)
 .map(...).name("C").setParallelism(40)
 .addSink(...).name("D").setParallelism(20);
{code}

release-1.12.3 allocation:

||operator||tm1||tm2||tm3||tm4||tm5||tm6||tm7||tm8||tm9||tm10||
|A|1|2|2|1|1|3|0|0|0|0|
|B|3|3|3|3|3|3|3|3|2|4|
|C|4|4|4|4|4|4|4|4|4|4|
|D|2|2|2|2|2|1|1|2|2|4|

Suggestions:

When TMs register slots with the SlotManager, we could group the pending 
requests by their "ExecutionVertexGroup" and then allocate the slots 
proportionally to each group, as in the sketch below.

I have implemented a proof-of-concept version based on release-1.12.3, and the 
job gets a fully even task allocation. I want to know whether there are other 
points that have not been considered.
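
A minimal sketch of the proportional split, assuming per-group pending-request 
counts; this is only the arithmetic, not the actual SlotManager code:

{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: split offered slots across groups in proportion to their pending
// requests, then hand out rounding leftovers to groups with unmet demand.
class ProportionalSlotSplit {
    static Map<String, Integer> split(Map<String, Integer> pendingByGroup, int offeredSlots) {
        int totalPending = pendingByGroup.values().stream().mapToInt(Integer::intValue).sum();
        Map<String, Integer> allocation = new LinkedHashMap<>();
        if (totalPending == 0) {
            return allocation;
        }
        int assigned = 0;
        for (Map.Entry<String, Integer> e : pendingByGroup.entrySet()) {
            int share = offeredSlots * e.getValue() / totalPending; // floor
            allocation.put(e.getKey(), share);
            assigned += share;
        }
        for (Map.Entry<String, Integer> e : allocation.entrySet()) {
            if (assigned == offeredSlots) {
                break;
            }
            if (e.getValue() < pendingByGroup.get(e.getKey())) {
                e.setValue(e.getValue() + 1);
                assigned++;
            }
        }
        return allocation;
    }
}
{code}

For a TM offering 4 slots with pending counts {A=10, B=30, C=40, D=20}, the 
floors give B and C one slot each, and the two leftovers go to A and B, so 
every registration round spreads the slots across the groups.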

--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Do not merge PRs with "unrelated" test failures.

2021-06-29 Thread godfrey he
+1 for the proposal. Thanks Xintong!

Best,
Godfrey



Rui Li  wrote on Wed, Jun 30, 2021 at 11:36 AM:

> Thanks Xintong. +1 to the proposal.
>

Re: [DISCUSS] Do not merge PRs with "unrelated" test failures.

2021-06-29 Thread tison
Hi,

There are a number of PRs that modify docs only, but we still require all
tests to pass on them.

It is a good proposal that we avoid merging PRs with "unrelated" failures, but
can we improve the case where the contributor only works on docs?

For example, based on the changed file set, we could run only the doc tests,
as in the sketch below.
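
A tiny sketch of the gating idea; the path prefix and helper are made up for
illustration, not an existing CI hook:

import java.util.List;

// Sketch: a docs-only change set could trigger only the docs build stage.
class CiGate {
    static boolean isDocsOnly(List<String> changedFiles) {
        return !changedFiles.isEmpty()
                && changedFiles.stream().allMatch(f -> f.startsWith("docs/"));
    }
}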

Best,
tison.


godfrey he  wrote on Wed, Jun 30, 2021 at 2:17 PM:

> +1 for the proposal. Thanks Xintong!
>
> Best,
> Godfrey

Re: [DISCUSS] FLIP-171: Async Sink

2021-06-29 Thread Piotr Nowojski
Thanks for addressing this issue :)

Best, Piotrek

On Tue, 29 Jun 2021 at 17:58, Hausmann, Steffen  wrote:

> Hey Piotr,
>
> I've just adapted the FLIP and changed the signature for the
> `submitRequestEntries` method:
>
> protected abstract void submitRequestEntries(List
> requestEntries, ResultFuture requestResult);
>
> In addition, we are likely to use an AtomicLong to track the number of
> outstanding requests, as you have proposed in 2b). I've already indicated
> this in the FLIP, but it's not fully fleshed out. But as you have said,
> that seems to be an implementation detail and the important part is the
> change of the `submitRequestEntries` signature.
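
A minimal sketch of the callback shape being discussed, with made-up names and
signatures (this is not the final FLIP-171 API):

import java.util.List;

// Sketch: the sink implementation hands the entries to its async client and
// invokes requestResult.complete(...) from the client's callback; this way
// the writer, not the sink implementer, controls the threading model.
interface ResultFuture<RequestEntryT> {
    void complete(List<RequestEntryT> entriesToRetry); // made-up signature
}

abstract class SketchedAsyncWriter<RequestEntryT> {
    protected abstract void submitRequestEntries(
            List<RequestEntryT> requestEntries, ResultFuture<RequestEntryT> requestResult);
}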
>
> Thanks for your feedback!
>
> Cheers, Steffen
>
>
> On 25.06.21, 17:05, "Hausmann, Steffen"  wrote:
>
> Hi Piotr,
>
> I’m happy to take your guidance on this. I need to think through your
> proposals and I’ll follow up on Monday with some more context so that we
> can close the discussion on these details. But for now, I’ll close the vote.
>
> Thanks, Steffen
>
> From: Piotr Nowojski 
> Date: Friday, 25. June 2021 at 14:48
> To: Till Rohrmann 
> Cc: Steffen Hausmann , "dev@flink.apache.org" <dev@flink.apache.org>, Arvid Heise 
> Subject: RE: [EXTERNAL] [DISCUSS] FLIP-171: Async Sink
>
> Hey,
>
> I've just synced with Arvid about a couple more remarks from my
> side, and he shared my concerns.
>
> 1. I would very strongly recommend ditching `CompletableFuture`
> from the `protected abstract CompletableFuture
> submitRequestEntries(List requestEntries);` in favor of
> something like the
> `org.apache.flink.streaming.api.functions.async.ResultFuture` interface.
> `CompletableFuture` would partially make the threading model of the
> `AsyncSinkWriter` part of the public API and it would tie our hands.
> Regardless of how `CompletableFuture` is used, it imposes performance
> overhead because of the synchronisation/volatile inside of it. On the other
> hand, something like:
>
> protected abstract void submitRequestEntries(List
> requestEntries, ResultFuture requestResult);
>
> Would allow us to implement the threading model as we wish.
> `ResultFuture` could be backed via `CompletableFuture` underneath, but
> it could also be something more efficient.  I will explain what I have in
> mind in a second.
>
> 2. It looks to me that the proposed `AsyncSinkWriter` internals are not
> very efficient, and maybe the threading model hasn't been thought through?
> Especially the private fields:
>
> private final BlockingDeque<RequestEntryT> bufferedRequestEntries;
> private BlockingDeque<CompletableFuture<?>> inFlightRequests;
>
> are a bit strange to me. Why do we need two separate thread safe
> collections? Why do we need a `BlockingDeque` of `CompletableFuture`s?
> If we are already using a fully synchronised collection, there should be no
> need for another layer of thread safe `CompletableFuture`.
>
> As I understand it, the threading model of the `AsyncSinkWriter` is very
> similar to that of the `AsyncWaitOperator`, with very similar requirements
> for inducing backpressure. How I would see it implemented is, for example:
>
> a) Having a single lock that would encompass the whole
> `AsyncSinkWriter#flush()` method. `flush()` would be called from the task
> thread (mailbox). To induce backpressure, `#flush()` would just call
> `lock.wait()`. `ResultFuture#complete(...)` called from an async thread,
> would also synchronize on the same lock, and mark some of the inflight
> requests as completed and call `lock.notify()`.
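
A minimal sketch of option a), with illustrative names; the single lock scope
is the point here, not the exact fields:

// Option a) sketch: one lock guards the in-flight count; flush() blocks the
// task (mailbox) thread until completions bring the count under the limit.
class LockBasedWriter {
    private final Object lock = new Object();
    private final int maxInFlight;
    private int inFlight;

    LockBasedWriter(int maxInFlight) {
        this.maxInFlight = maxInFlight;
    }

    void flush() throws InterruptedException {
        synchronized (lock) {
            while (inFlight >= maxInFlight) {
                lock.wait(); // backpressure: task thread sleeps here
            }
            inFlight++;
            // submitRequestEntries(drainBuffer(), this::onComplete);
        }
    }

    // Called from the async client thread, i.e. ResultFuture#complete(...).
    void onComplete() {
        synchronized (lock) {
            inFlight--;
            lock.notify(); // wake the blocked task thread
        }
    }
}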
>
> b) A more efficient solution. On the hot path we would have, for example,
> only an `AtomicLong numberOfInFlightRequests`. The task thread would be
> bumping it, and `ResultFuture#complete()` would be decreasing it. If the
> task thread, when bumping `numberOfInFlightRequests`, exceeds a threshold,
> it goes to sleep/wait on a lock or some `CompletableFuture`. If
> `ResultFuture#complete()`, when decreasing the count, goes below the
> threshold, it would wake up the task thread. Compared to option a),
> on the hot path, option b) would have only the AtomicLong.increment overhead.
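
And a sketch of option b), again with made-up names; only the AtomicLong is
touched on the hot path, and the monitor is used just to park and wake the
task thread around the threshold:

import java.util.concurrent.atomic.AtomicLong;

// Option b) sketch: cheap counter on the hot path, blocking only at the limit.
class CounterBasedWriter {
    private final AtomicLong numberOfInFlightRequests = new AtomicLong();
    private final long threshold;

    CounterBasedWriter(long threshold) {
        this.threshold = threshold;
    }

    // Task (mailbox) thread, before handing a request to the async client.
    void beforeSubmit() throws InterruptedException {
        if (numberOfInFlightRequests.incrementAndGet() > threshold) {
            synchronized (this) {
                while (numberOfInFlightRequests.get() > threshold) {
                    wait(); // sleep until completions catch up
                }
            }
        }
    }

    // Async client thread, from ResultFuture#complete().
    void onComplete() {
        if (numberOfInFlightRequests.decrementAndGet() <= threshold) {
            synchronized (this) {
                notifyAll(); // wake the task thread if it was parked
            }
        }
    }
}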
>
> c) We could use the mailbox, the same way as the AsyncWaitOperator is doing.
> In this case `ResultFuture#complete()` would be enqueuing a mailbox action,
> which is thread-safe on its own.
>
> Either of those options would be more efficient and simpler (from the
> threading model perspective) than having two `BlockingQueues` and
> `CompletableFuture`. Also, as you can see, neither of those solutions
> requires the overhead of `CompletableFuture
> submitRequestEntries(List requestEntrie