[jira] [Created] (FLINK-25069) YARNHighAvailabilityITCase.testJobRecoversAfterKillingTaskManager failed on AZP

2021-11-26 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-25069:
-

 Summary: 
YARNHighAvailabilityITCase.testJobRecoversAfterKillingTaskManager failed on AZP
 Key: FLINK-25069
 URL: https://issues.apache.org/jira/browse/FLINK-25069
 Project: Flink
  Issue Type: Bug
  Components: Deployment / YARN
Affects Versions: 1.15.0
Reporter: Till Rohrmann
 Fix For: 1.15.0


The test {{YARNHighAvailabilityITCase.testJobRecoversAfterKillingTaskManager}} 
fails on AZP with:

{code}
2021-11-25T18:28:27.9848753Z Nov 25 18:28:27 [ERROR] Tests run: 3, Failures: 0, 
Errors: 3, Skipped: 0, Time elapsed: 3,676.541 s <<< FAILURE! - in 
org.apache.flink.yarn.YARNHighAvailabilityITCase
2021-11-25T18:28:27.9849967Z Nov 25 18:28:27 [ERROR] 
org.apache.flink.yarn.YARNHighAvailabilityITCase.testJobRecoversAfterKillingTaskManager
  Time elapsed: 70.846 s  <<< ERROR!
2021-11-25T18:28:27.9850929Z Nov 25 18:28:27 
java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
JobGraph.
2021-11-25T18:28:27.9854591Z Nov 25 18:28:27 at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
2021-11-25T18:28:27.9855441Z Nov 25 18:28:27 at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
2021-11-25T18:28:27.9856301Z Nov 25 18:28:27 at 
org.apache.flink.yarn.YARNHighAvailabilityITCase.submitJob(YARNHighAvailabilityITCase.java:378)
2021-11-25T18:28:27.9857202Z Nov 25 18:28:27 at 
org.apache.flink.yarn.YARNHighAvailabilityITCase.lambda$testJobRecoversAfterKillingTaskManager$1(YARNHighAvailabilityITCase.java:204)
2021-11-25T18:28:27.9858300Z Nov 25 18:28:27 at 
org.apache.flink.yarn.YarnTestBase.runTest(YarnTestBase.java:288)
2021-11-25T18:28:27.9859245Z Nov 25 18:28:27 at 
org.apache.flink.yarn.YARNHighAvailabilityITCase.testJobRecoversAfterKillingTaskManager(YARNHighAvailabilityITCase.java:197)
2021-11-25T18:28:27.9860026Z Nov 25 18:28:27 at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2021-11-25T18:28:27.9860705Z Nov 25 18:28:27 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2021-11-25T18:28:27.9861466Z Nov 25 18:28:27 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2021-11-25T18:28:27.9862158Z Nov 25 18:28:27 at 
java.lang.reflect.Method.invoke(Method.java:498)
2021-11-25T18:28:27.9863016Z Nov 25 18:28:27 at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
2021-11-25T18:28:27.9863959Z Nov 25 18:28:27 at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
2021-11-25T18:28:27.9864829Z Nov 25 18:28:27 at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
2021-11-25T18:28:27.9865604Z Nov 25 18:28:27 at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
2021-11-25T18:28:27.9866300Z Nov 25 18:28:27 at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
2021-11-25T18:28:27.9867044Z Nov 25 18:28:27 at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
2021-11-25T18:28:27.9867692Z Nov 25 18:28:27 at 
java.util.concurrent.FutureTask.run(FutureTask.java:266)
2021-11-25T18:28:27.9868220Z Nov 25 18:28:27 at 
java.lang.Thread.run(Thread.java:748)
2021-11-25T18:28:27.9869072Z Nov 25 18:28:27 Suppressed: 
java.lang.AssertionError: There is at least one application on the cluster that 
is not finished. [App application_1637861234319_0001 is in state RUNNING.]
2021-11-25T18:28:27.9870263Z Nov 25 18:28:27 at 
org.junit.Assert.fail(Assert.java:89)
2021-11-25T18:28:27.9870862Z Nov 25 18:28:27 at 
org.apache.flink.yarn.YarnTestBase$CleanupYarnApplication.close(YarnTestBase.java:325)
2021-11-25T18:28:27.9871516Z Nov 25 18:28:27 at 
org.apache.flink.yarn.YarnTestBase.runTest(YarnTestBase.java:289)
2021-11-25T18:28:27.9871986Z Nov 25 18:28:27 ... 13 more
2021-11-25T18:28:27.9872665Z Nov 25 18:28:27 Caused by: 
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
JobGraph.
2021-11-25T18:28:27.9873393Z Nov 25 18:28:27 at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$11(RestClusterClient.java:433)
2021-11-25T18:28:27.9874102Z Nov 25 18:28:27 at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
2021-11-25T18:28:27.9874774Z Nov 25 18:28:27 at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
2021-11-25T18:28:27.9875454Z Nov 25 18:28:27 at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
2021-11-25T18:28:27.9876123Z Nov 25 18:28:27 at 
java.util.concurrent.CompletableFuture.completeEx

Re: [VOTE][FLIP-195] Improve the name and structure of vertex and operator name for job

2021-11-26 Thread wenlong.lwl
Hi everyone,

I'm happy to announce that we have unanimously approved this FLIP.

There are 12 approved votes, 6 of which are binding:

Terry Wang (non-binding)
Aitozi (non-binding)
Martijn Visser (non-binding)
Jing Zhang (binding)
Jark Wu (binding)
Lijie Wang (non-binding)
Godfrey He (binding)
Sergey Nuyanzin (non-binding)
Yun Tang (binding)
Yangze Guo (binding)
Xianxun Ye (non-binding)
Yun Gao (binding)

There are no disapproving votes. Thanks everyone for voting!

On Fri, 26 Nov 2021 at 11:15, Yun Gao  wrote:

>
> +1 (binding).
>
> Very thanks Wenlong for the proposal!
>
> Best,
> Yun
> --
> Sender: Xianxun Ye
> Date: 2021/11/24 16:49:44
> Recipient: dev@flink.apache.org
> Theme: [VOTE][FLIP-195] Improve the name and structure of vertex and
> operator name for job
>
> +1 (non-binding)
>
>
>
>
> On 11/24/2021 15:50,Sergey Nuyanzin wrote:
> +1 (non-binding)
>
> On Wed, Nov 24, 2021 at 8:38 AM godfrey he  wrote:
>
> +1 (binding)
>
> Best,
> Godfrey
>
> Jark Wu  于2021年11月24日周三 下午12:02写道:
>
> +1 (binding)
>
> Btw, @JingZhang I think your vote can be counted as binding now.
>
> Best,
> Jark
>
> On Tue, 23 Nov 2021 at 20:19, Jing Zhang  wrote:
>
> +1 (non-binding)
>
> Best,
> Jing Zhang
>
> Martijn Visser  于2021年11月23日周二 下午7:42写道:
>
> +1 (non-binding)
>
> On Tue, 23 Nov 2021 at 12:13, Aitozi  wrote:
>
> +1 (non-binding)
>
> Best,
> Aitozi
>
> wenlong.lwl  于2021年11月23日周二 下午4:00写道:
>
> Hi everyone,
>
> Based on the discussion[1], we seem to have consensus, so I would
> like
> to
> start a vote on FLIP-195 [2].
> Thanks for all of your feedback.
>
> The vote will last for at least 72 hours (Nov 26th 16:00 GMT)
> unless
> there is an objection or insufficient votes.
>
> [1]
> https://lists.apache.org/thread/kvdxr8db0l5s6wk7hwlt0go5fms99b8t
> [2]
>
>
>
>
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-195%3A+Improve+the+name+and+structure+of+vertex+and+operator+name+for+job
>
> Best,
> Wenlong Lyu
>
>
>
>
>
>
>
> --
> Best regards,
> Sergey
>
>


[jira] [Created] (FLINK-25070) FLIP-195: Improve the name and structure of vertex and operator name for job

2021-11-26 Thread Wenlong Lyu (Jira)
Wenlong Lyu created FLINK-25070:
---

 Summary: FLIP-195: Improve the name and structure of vertex and 
operator name for job
 Key: FLINK-25070
 URL: https://issues.apache.org/jira/browse/FLINK-25070
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream, Runtime / Web Frontend, Table SQL / 
Runtime
Reporter: Wenlong Lyu


This is an umbrella issue tracking the improvement of operator/vertex names in 
Flink.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] FLIP-193: Snapshots ownership

2021-11-26 Thread Dawid Wysakowicz
Hi all,

I updated the FLIP with a few clarifications:

 1. I added a description of how we would trigger a "full snapshot" in the
changelog state backend:
  * (We would go for this option in the 1st version.) Trigger a
snapshot of the base state backend in the 1st checkpoint, which
induces materializing the changelog. In this approach we could
duplicate SST files, but we would not duplicate the diff files.
  * Add a hook for the logic that computes which task should duplicate
the diff files. We would have to do a pass over all states after
the state assignment in StateAssignmentOperation.
 2. I clarified that the "no-claim" mode requires a completed/successful
checkpoint before we can remove the one we are restoring from. I also
added a note that we can assume a checkpoint is completed if it is
confirmed by Flink's API for checkpointing stats or by checking an
entry in HA services; a checkpoint cannot be assumed completed by
just looking at the checkpoint files (see the sketch below this list).
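
As a concrete illustration of that note (a sketch only; host, port, and job
id are placeholders), completion can be confirmed through the JobManager's
monitoring REST API, e.g. GET /jobs/<jobid>/checkpoints, rather than by
inspecting the checkpoint directory:

{code:java}
// Sketch: ask the JobManager which checkpoint completed last, instead of
// guessing from the files on disk. Host/port and job id are placeholders.
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class CheckpointStatsProbe {
    public static void main(String[] args) throws Exception {
        String jobId = "<job-id>"; // placeholder
        URL url = new URL("http://localhost:8081/jobs/" + jobId + "/checkpoints");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream()))) {
            // The JSON response contains "latest" -> "completed" with the id
            // and external_path of the most recent completed checkpoint.
            reader.lines().forEach(System.out::println);
        }
    }
}
{code}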

I suggest going on with the proposal for "no-claim" as suggested so far,
as it is easier for users to understand. They can reliably tell when they
can expect the checkpoint to be deletable. If we see that the time to
take the 1st checkpoint becomes a problem, we can extend the set of
restore methods and e.g. add a "claim-temporarily" method.

I hope we can reach a consensus and start a vote some time early next week.

Best,

Dawid

On 23/11/2021 22:39, Roman Khachatryan wrote:
>> I also referred to the "no-claim" mode and I still think neither of them
>> works in that mode, as you'd have to keep lineage of checkpoints externally
>> to be able to delete any checkpoint.
> I think the lineage is needed in all approaches with arbitrary
> histories; the difference is whether a running Flink is required or
> not. Is that what you mean?
> (If not, could you please explain how the scenario you mentioned above
> with multiple jobs branching from the same checkpoint is handled?)
>
>> BTW, the state key for RocksDB is actually: backend UID + key group range + 
>> SST file name, so the key would be different (the key group range is 
>> different for two tasks) and we would've two separate counters for the same 
>> file.
> You're right. But there is also a collision between old and new entries.
>
>> To be on the same page here. It is not a problem so far in RocksDB, because 
>> we do not reuse any shared files in case of rescaling.
> As I mentioned above, collision happens not only because of rescaling;
> and AFAIK, there are some ideas to reuse files on rescaling (probably
> Yuan could clarify). Anyways, I think it makes sense to not bake in
> this assumption unless it's hard to implement (or at least state it
> explicitly in FLIP).
>
>> It is not suggested as an optimization. It is suggested as a must for state
>> backends that need it. I did not elaborate on it, because it could affect
>> only the changelog state backend at the moment, which I don't have much
>> insight into. I agree it might make sense to look a bit at how we could
>> force full snapshots in the changelog state backend. I will spend some
>> extra time on that.
> I see. For the Changelog state backend, the easiest way would be to
> obtain a full snapshot from the underlying backend in snapshot(),
> ignoring all non-materialized changes. This will effectively
> materialize all the changes, so only new non-materialized state will
> be used in subsequent checkpoints.
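
(A toy illustration of the idea above, with invented types; the real
changelog state backend's interfaces differ. The point is that every write
already lives in the base backend, so snapshotting the base and clearing the
log materializes all changes:)

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Invented stand-in for a changelog-style backend: each write goes to the
// base backend AND is appended to a change log; incremental checkpoints
// normally ship only the log entries.
class ChangelogBackendSketch {
    private final Map<String, String> base = new HashMap<>();     // materialized state
    private final Deque<String[]> changelog = new ArrayDeque<>(); // non-materialized diffs

    void put(String key, String value) {
        base.put(key, value);                     // state is always in the base backend
        changelog.add(new String[] {key, value}); // diff shipped by incremental checkpoints
    }

    Map<String, String> fullSnapshot() {
        // The base backend already holds every change, so a base snapshot is
        // complete; clearing the log means later checkpoints carry only new diffs.
        Map<String, String> snapshot = new HashMap<>(base);
        changelog.clear();
        return snapshot;
    }

    public static void main(String[] args) {
        ChangelogBackendSketch backend = new ChangelogBackendSketch();
        backend.put("k", "v");
        System.out.println(backend.fullSnapshot()); // {k=v}; the log is now empty
    }
}
{code}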
>
>> Only the task that gets assigned [1,16] would be responsible for duplicating 
>> files of the old range [1, 64].
> Wouldn't it be likely that the same TM will be responsible for [1, 64]
> "windowState", [1, 64] "timerState", and so on, for all operators in
> the chain, and probably other chains? (that's what I mean by skew)
> If we want to address this, preserving handle immutability then we'll
> probably have to rebuild the whole task state snapshot.
> (depending on how we approach RocksDB re-uploading, it might not be
> relevant though)
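
(To make the quoted example concrete, here is a toy version of the assignment
rule, not Flink code: the new subtask whose key-group range contains the
first key group of the old range duplicates that range's files.)

{code:java}
// Toy illustration of "only the task that gets assigned [1,16] duplicates
// the files of the old range [1,64]". Ranges are inclusive [start, end].
public class DuplicationAssignment {

    static int responsibleSubtask(int oldRangeStart, int[][] newRanges) {
        for (int i = 0; i < newRanges.length; i++) {
            if (newRanges[i][0] <= oldRangeStart && oldRangeStart <= newRanges[i][1]) {
                return i; // this subtask's range covers the old range's first key group
            }
        }
        throw new IllegalArgumentException("no new range covers key group " + oldRangeStart);
    }

    public static void main(String[] args) {
        // Old DoP=1 range [1,64] rescaled to DoP=4.
        int[][] newRanges = {{1, 16}, {17, 32}, {33, 48}, {49, 64}};
        System.out.println(responsibleSubtask(1, newRanges)); // prints 0: range [1,16]
    }
}
{code}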
>
>
> Regards,
> Roman
>
>
> On Tue, Nov 23, 2021 at 4:06 PM Dawid Wysakowicz  
> wrote:
>> I think I know where the confusion comes from regarding arbitrarily
>> recovery histories: Both my counter-proposals were for "no-claim"
>> mode; I didn't mean to replace "claim" mode with them.
>> However, as Yun pointed out, it's impossible to guarantee that all the
>> files will be compacted in a finite number of checkpoints; so let's
>> withdraw those proposals.
>>
>> I also referred to the "no-claim" mode and I still think neither of them 
>> works in that mode, as you'd have to keep lineage of checkpoints externally 
>> to be able delete any checkpoint.
>>
>> Let's consider a job running with DoP=1; it created checkpoint C1 with
>> a single file F1 and then stopped.
>> We start a new job from C1 in no-claim mode with DoP=2; so two tasks
>> will receive the same file F1.
>>
>> To be on the same page here. It is not a problem so far in

[jira] [Created] (FLINK-25071) Enable ParquetFileSystemITCase.testLimitableBulkFormat

2021-11-26 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-25071:


 Summary: Enable ParquetFileSystemITCase.testLimitableBulkFormat
 Key: FLINK-25071
 URL: https://issues.apache.org/jira/browse/FLINK-25071
 Project: Flink
  Issue Type: Bug
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.15.0
Reporter: Jingsong Lee
 Fix For: 1.15.0


The test {{ParquetFileSystemITCase.testLimitableBulkFormat}} is unstable. See 
more in FLINK-24763.

We have ignored this test. We should find the cause of the instability and 
re-enable it.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] Drop Zookeeper 3.4

2021-11-26 Thread Till Rohrmann
According to this SO question [1], it seems that Zk 3.5 clients cannot talk
to 3.4 servers. I also tried it out with a local deployment and Flink was
not able to start.

Newer Zk versions can talk to older Zk servers if no new APIs are used [2].

[1] https://stackoverflow.com/a/61630617/4815083
[2] https://zookeeper.apache.org/releases.html
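
For anyone who wants to reproduce the check locally, a minimal probe along
these lines (plain ZooKeeper client API; the server address is a placeholder)
makes the failure visible as a connection that never reaches SyncConnected:

{code:java}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class ZkCompatProbe {
    public static void main(String[] args) throws Exception {
        CountDownLatch connected = new CountDownLatch(1);
        // Point a 3.5/3.6 client at a 3.4 server to observe the incompatibility.
        ZooKeeper zk = new ZooKeeper("localhost:2181", 15_000, event -> {
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                connected.countDown();
            }
        });
        System.out.println(connected.await(15, TimeUnit.SECONDS)
                ? "connected: " + zk.getState()
                : "no connection within timeout (possible client/server mismatch)");
        zk.close();
    }
}
{code}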

Cheers,
Till

On Thu, Nov 25, 2021 at 10:39 PM Chesnay Schepler 
wrote:

> I included the user ML in the thread.
>
> @users Are you still using Zookeeper 3.4? If so, were you planning to
> upgrade Zookeeper in the near future?
>
> I'm not sure about ZK compatibility, but we'd also upgrade Curator to
> 5.x, which doesn't support ZooKeeper 3.4 anymore.
>
> On 25/11/2021 21:56, Till Rohrmann wrote:
> > Should we ask on the user mailing list whether anybody is still using
> > ZooKeeper 3.4 and thus needs support for this version or can a ZooKeeper
> > 3.5/3.6 client talk to a ZooKeeper 3.4 cluster? I would expect that not a
> > lot of users depend on it but just to make sure that we aren't annoying a
> > lot of our users with this change. Apart from that, +1 for removing it if
> > not a lot of users depend on it.
> >
> > Cheers,
> > Till
> >
> > On Wed, Nov 24, 2021 at 11:03 AM Matthias Pohl 
> > wrote:
> >
> >> Thanks for starting this discussion, Chesnay. +1 from my side. It's
> time to
> >> move forward with the ZK support considering the EOL of 3.4 you already
> >> mentioned. The benefits we gain from upgrading Curator to 5.x as a
> >> consequence is another plus point. Just for reference on the
> inconsistent
> >> state issue you mentioned: FLINK-24543 [1].
> >>
> >> Matthias
> >>
> >> [1] https://issues.apache.org/jira/browse/FLINK-24543
> >>
> >> On Wed, Nov 24, 2021 at 10:19 AM Chesnay Schepler 
> >> wrote:
> >>
> >>> Hello,
> >>>
> >>> I'd like to drop support for Zookeeper 3.4 in 1.15, upgrading the
> >>> default to 3.5 with an opt-in for 3.6.
> >>>
> >>> Supporting Zookeeper 3.4 (which is already EOL) prevents us from
> >>> upgrading Curator to 5.x, which would allow us to properly fix an issue
> >>> with inconsistent state. It is also required to eventually support ZK
> >> 3.6.
>
>
>


Re: [DISCUSS] Deprecate Java 8 support

2021-11-26 Thread Nico Kruber
+1 for the deprecation but let me raise one more point that we have discovered 
in our training exercises:

All our training jobs that aim for high throughput run slower on Java 11 than 
on Java 8! That may not be a general problem for the various applications in 
the wild, but it could be and this should definitely be investigated.

@Piotr I guess that could be investigated along with an update of flink-
benchmarks to run on Java 11 as well...


Nico

On Thursday, 25 November 2021 22:08:58 CET Till Rohrmann wrote:
> +1 for the deprecation and reaching out to the user ML to ask for feedback
> from our users. Thanks for driving this Chesnay!
> 
> Cheers,
> Till
> 
> On Thu, Nov 25, 2021 at 10:15 AM Roman Khachatryan  wrote:
> > The situation is probably a bit different now compared to the previous
> > upgrade: some users might be using Amazon Corretto (or other builds)
> > which have longer support.
> > 
> > Still +1 for deprecation to trigger migration, and thanks for bringing
> > this up!
> > 
> > Regards,
> > Roman
> > 
> > On Thu, Nov 25, 2021 at 10:09 AM Arvid Heise  wrote:
> > > +1 to deprecate Java 8, so we can hopefully incorporate the module
> > > concept in Flink.
> > >
> > > On Thu, Nov 25, 2021 at 9:49 AM Chesnay Schepler  wrote:
> > > > Users can already use APIs from Java 8/11.
> > > >
> > > > On 25/11/2021 09:35, Francesco Guardiani wrote:
> > > > > +1 with what both Ingo and Matthias said; personally, I cannot wait
> > > > > to start using some of the APIs introduced in Java 9. And I'm pretty
> > > > > sure that's the same for our users as well.
> > > > >
> > > > > On Tuesday, 23 November 2021 13:35:07 CET Ingo Bürk wrote:
> > > > >> Hi everyone,
> > > > >>
> > > > >> continued support for Java 8 can also create project risks, e.g. if a
> > > > >> vulnerability arises in Flink's dependencies and we cannot upgrade
> > > > >> them because they no longer support Java 8. Some projects already
> > > > >> started deprecating support as well, like Kafka, and other projects
> > > > >> will likely follow.
> > > > >> Let's also keep in mind that the proposal here is not to drop support
> > > > >> right away, but to deprecate it, send the message, and motivate users
> > > > >> to start migrating. Delaying this process could ironically mean users
> > > > >> have less time to prepare for it.
> > > > >>
> > > > >>
> > > > >> Ingo
> > > > >>
> > > > >> On Tue, Nov 23, 2021 at 8:54 AM Matthias Pohl <matth...@ververica.com>
> > > > >> wrote:
> > > > >>> Thanks for constantly driving these maintenance topics, Chesnay. +1
> > > > >>> from my side for deprecating Java 8. I see the point Jingsong is
> > > > >>> raising. But I agree with what David already said here. Deprecating
> > > > >>> the Java version is a tool to make users aware of it (same as
> > > > >>> starting this discussion thread). If there's no major opposition
> > > > >>> against deprecating it in the community we should move forward in
> > > > >>> this regard to make the users who do not regularly browse the
> > > > >>> mailing list aware of it. That said, deprecating Java 8 in 1.15
> > > > >>> does not necessarily mean that it is dropped in 1.16.
> > > > >>>
> > > > >>> Best,
> > > > >>> Matthias
> > > > >>>
> > > > >>> On Tue, Nov 23, 2021 at 8:46 AM David Morávek  wrote:
> > > >  Thank you Chesnay for starting the discussion! This will generate a
> > > >  bit of work for some users, but it's a good thing to keep moving the
> > > >  project forward. Big +1 for this.
> > > > 
> > > >  Jingsong:
> > > > 
> > > > > Receiving this signal, the user may be unhappy because his
> > > > > application may be all on Java 8. Upgrading is a big job, after
> > > > > all, many systems have not been upgraded yet. (Like you said,
> > > > > HBase and Hive)
> > > > 
> > > >  The whole point of deprecation is to raise awareness, that this
> > > >  will be happening eventually and users should take some steps to
> > > >  address this in the medium-term. If I understand Chesnay correctly,
> > > >  we'd still keep Java 8 around for quite some time to give users
> > > >  enough time to upgrade, but without raising awareness we'd fight
> > > >  the very same argument later in time.
> > > > 
> > > >  All of the prerequisites from 3rd party projects for both HBase [1]
> > > >  and Hive [

[jira] [Created] (FLINK-25072) Introduce description for operator

2021-11-26 Thread Wenlong Lyu (Jira)
Wenlong Lyu created FLINK-25072:
---

 Summary: Introduce description for operator
 Key: FLINK-25072
 URL: https://issues.apache.org/jira/browse/FLINK-25072
 Project: Flink
  Issue Type: Sub-task
Reporter: Wenlong Lyu






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25073) Introduce Tree Mode

2021-11-26 Thread Wenlong Lyu (Jira)
Wenlong Lyu created FLINK-25073:
---

 Summary: Introduce Tree Mode
 Key: FLINK-25073
 URL: https://issues.apache.org/jira/browse/FLINK-25073
 Project: Flink
  Issue Type: Sub-task
Reporter: Wenlong Lyu






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25074) Simplify name of window operators in DS by moving details to description

2021-11-26 Thread Wenlong Lyu (Jira)
Wenlong Lyu created FLINK-25074:
---

 Summary: Simplify name of window operators in DS by moving details 
to description
 Key: FLINK-25074
 URL: https://issues.apache.org/jira/browse/FLINK-25074
 Project: Flink
  Issue Type: Sub-task
Reporter: Wenlong Lyu






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25075) Remove reflection to instantiate PlannerExpressionParser

2021-11-26 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25075:
---

 Summary: Remove reflection to instantiate PlannerExpressionParser
 Key: FLINK-25075
 URL: https://issues.apache.org/jira/browse/FLINK-25075
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API, Table SQL / Planner
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani


This reflection leaks the planner module classpath and can cause issues when 
isolating the classpath.
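
For illustration, the general pattern and a decoupled alternative; the class
and interface names below are hypothetical, not the actual Flink ones:

{code:java}
import java.util.ServiceLoader;

public class InstantiationSketch {

    // Hypothetical stand-in for the interface the planner provides.
    interface ExpressionParser {}

    static ExpressionParser viaReflection() throws Exception {
        // Hard-codes a planner class name: the planner jar must be on this
        // module's classpath, which defeats classpath isolation.
        return (ExpressionParser) Class
                .forName("org.example.planner.PlannerExpressionParserImpl")
                .getDeclaredConstructor()
                .newInstance();
    }

    static ExpressionParser viaServiceLoader() {
        // Discovers whatever implementation the planner module registers under
        // META-INF/services, without naming planner classes here.
        return ServiceLoader.load(ExpressionParser.class)
                .findFirst()
                .orElseThrow(() -> new IllegalStateException("no parser found"));
    }
}
{code}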



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] FLIP-193: Snapshots ownership

2021-11-26 Thread Konstantin Knauf
Hi Dawid,

sounds good, specifically 2., too.

Best,

Konstantin

On Fri, Nov 26, 2021 at 9:25 AM Dawid Wysakowicz 
wrote:

> Hi all,
>
> I updated the FLIP with a few clarifications:
>
>1. I added a description how would we trigger a "full snapshot" in the
>changelog state backend
>   - (We would go for this option in the 1st version). Trigger a
>   snapshot of the base state backend in the 1st checkpoint, which induces
>   materializing the changelog. In this approach we could duplicate SST 
> files,
>   but we would not duplicate the diff files.
>- Add a hook for logic for computing which task should duplicate the
>   diff files. We would have to do a pass over all states after the state
>   assignment in StateAssignmentOperation
>   2. I clarified that the "no-claim" mode requires a
>completed/successful checkpoint before we can remove the one we are
>restoring from. Also added a note that we can assume a checkpoint is
>completed if it is confirmed by Flink's API for checkpointing stats or by
>checking an entry in HA services. A checkpoint can not be assumed completed
>by just looking at the checkpoint files.
>
> I suggest going on with the proposal for "no-claim" as suggested so far,
> as it is easier to understand by users. They can reliably tell when they
> can expect the checkpoint to be deletable. If we see that the time to take
> the 1st checkpoint becomes a problem we can extend the set of restore
> methods and e.g. add a "claim-temporarily" method.
>
> I hope we can reach a consensus and start a vote, some time early next
> week.
>
> Best,
>
> Dawid
>
> On 23/11/2021 22:39, Roman Khachatryan wrote:
>
> I also referred to the "no-claim" mode and I still think neither of them 
> works in that mode, as you'd have to keep lineage of checkpoints externally 
> to be able delete any checkpoint.
>
> I think the lineage is needed in all approaches with arbitrary
> histories; the difference is whether a running Flink is required or
> not. Is that what you mean?
> (If not, could you please explain how the scenario you mentioned above
> with multiple jobs branching from the same checkpoint is handled?)
>
>
> BTW, the state key for RocksDB is actually: backend UID + key group range + 
> SST file name, so the key would be different (the key group range is 
> different for two tasks) and we would've two separate counters for the same 
> file.
>
> You're right. But there is also a collision between old and new entries.
>
>
> To be on the same page here. It is not a problem so far in RocksDB, because 
> we do not reuse any shared files in case of rescaling.
>
> As I mentioned above, collision happens not only because of rescaling;
> and AFAIK, there are some ideas to reuse files on rescaling (probably
> Yuan could clarify). Anyways, I think it makes sense to not bake in
> this assumption unless it's hard to implement (or at least state it
> explicitly in FLIP).
>
>
> It is not suggested as an optimization. It is suggested as a must for state 
> backends that need it. I did not elaborate on it, because it could affected 
> only the changelog state backend at the moment, which I don't have much 
> insights. I agree it might make sense to look a bit how we could force full 
> snapshots in the changelog state backend. I will spend some extra time on 
> that.
>
> I see. For the Changelog state backend, the easiest way would be to
> obtain a full snapshot from the underlying backend in snapshot(),
> ignoring all non-materialized changes. This will effectively
> materialize all the changes, so only new non-materialized state will
> be used in subsequent checkpoints.
>
>
> Only the task that gets assigned [1,16] would be responsible for duplicating 
> files of the old range [1, 64].
>
> Wouldn't it be likely that the same TM will be responsible for [1, 64]
> "windowState", [1, 64] "timerState", and so on, for all operators in
> the chain, and probably other chains? (that what I mean by skew)
> If we want to address this, preserving handle immutability then we'll
> probably have to rebuild the whole task state snapshot.
> (depending on how we approach RocksDB re-uploading, it might not be
> relevant though)
>
>
> Regards,
> Roman
>
>
> On Tue, Nov 23, 2021 at 4:06 PM Dawid Wysakowicz  
>  wrote:
>
> I think I know where the confusion comes from regarding arbitrarily
> recovery histories: Both my counter-proposals were for "no-claim"
> mode; I didn't mean to replace "claim" mode with them.
> However, as Yun pointed out, it's impossible to guarantee that all the
> files will be compacted in a finite number of checkpoints; so let's
> withdraw those proposals.
>
> I also referred to the "no-claim" mode and I still think neither of them 
> works in that mode, as you'd have to keep lineage of checkpoints externally 
> to be able delete any checkpoint.
>
> Let's consider a job running with DoP=1; it created checkpoint C1 with
> a single file F1 and then stopped.
> We 

[jira] [Created] (FLINK-25076) Simplify name of SQL operators

2021-11-26 Thread Wenlong Lyu (Jira)
Wenlong Lyu created FLINK-25076:
---

 Summary: Simplify name of SQL operators
 Key: FLINK-25076
 URL: https://issues.apache.org/jira/browse/FLINK-25076
 Project: Flink
  Issue Type: Sub-task
Reporter: Wenlong Lyu






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25077) Postgresql connector fails in case column with nested arrays

2021-11-26 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-25077:
---

 Summary: Postgresql connector fails in case column with nested 
arrays
 Key: FLINK-25077
 URL: https://issues.apache.org/jira/browse/FLINK-25077
 Project: Flink
  Issue Type: Bug
  Components: Connectors / JDBC
Reporter: Sergey Nuyanzin


On Postgres
{code:sql}
create table sal_emp (
name varchar,
pay_by_quarter int[],
schedule varchar[][]
);
insert into sal_emp values ('test', array[1], array[array['nested']]);
{code}

on Flink
{code:sql}
CREATE TABLE flink_sal_emp (
  name string,
  pay_by_quarter array<int>,
  schedule array<array<string>>
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:postgresql://localhost:5432/postgres',
   'table-name' = 'sal_emp',
   'username' = 'postgres',
   'password' = 'postgres'
);
select * from default_catalog.default_database.flink_sal_emp;
{code}

The result:
{noformat}
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassCastException: class [Ljava.lang.String; cannot be cast to class 
org.postgresql.jdbc.PgArray ([Ljava.lang.String; is in module java.base of 
loader 'bootstrap'; org.postgresql.jdbc.PgArray is in unnamed module of loader 
'app')
    at 
org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.lambda$createPostgresArrayConverter$4f4cdb95$2(PostgresRowConverter.java:104)
    at 
org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.lambda$wrapIntoNullableInternalConverter$ea5b8348$1(AbstractJdbcRowConverter.java:127)
    at 
org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.lambda$createPostgresArrayConverter$4f4cdb95$2(PostgresRowConverter.java:108)
    at 
org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.lambda$wrapIntoNullableInternalConverter$ea5b8348$1(AbstractJdbcRowConverter.java:127)
    at 
org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.toInternal(AbstractJdbcRowConverter.java:78)
    at 
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:257)
    at 
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:56)
    at 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90)
    at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
    at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:330)
{noformat}
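
The cast failure can be reproduced outside Flink with plain JDBC (a probe
sketch reusing the table and credentials from above): on a varchar[][]
column, Array#getArray() already yields nested String[] values rather than
PgArray instances, which is exactly the cast the converter attempts:

{code:java}
import java.sql.Array;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class NestedArrayProbe {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:postgresql://localhost:5432/postgres", "postgres", "postgres");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT schedule FROM sal_emp")) {
            while (rs.next()) {
                Array outer = rs.getArray(1);                 // PgArray wrapping varchar[][]
                Object[] elements = (Object[]) outer.getArray();
                // Each element is already a String[], not another PgArray:
                System.out.println(elements[0].getClass());   // class [Ljava.lang.String;
            }
        }
    }
}
{code}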




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] FLIP-193: Snapshots ownership

2021-11-26 Thread Roman Khachatryan
Hi,

Thanks for updating the FLIP Dawid

There seems to be a consensus in the discussion; however, I couldn't
find stop-with-savepoint in the document.

A few minor things:
- maybe include "checkpoint" in mode names, i.e. --no-claim-checkpoint?
- add an explicit option to preserve the current behavior (no claim
and no duplicate)?
And I still think it would be nice to list object stores which support a
duplicate operation.

Regards,
Roman


On Fri, Nov 26, 2021 at 10:37 AM Konstantin Knauf  wrote:
>
> Hi Dawid,
>
> sounds good, specifically 2., too.
>
> Best,
>
> Konstantin
>
> On Fri, Nov 26, 2021 at 9:25 AM Dawid Wysakowicz 
> wrote:
>
> > Hi all,
> >
> > I updated the FLIP with a few clarifications:
> >
> >1. I added a description how would we trigger a "full snapshot" in the
> >changelog state backend
> >   - (We would go for this option in the 1st version). Trigger a
> >   snapshot of the base state backend in the 1st checkpoint, which 
> > induces
> >   materializing the changelog. In this approach we could duplicate SST 
> > files,
> >   but we would not duplicate the diff files.
> >- Add a hook for logic for computing which task should duplicate the
> >   diff files. We would have to do a pass over all states after the state
> >   assignment in StateAssignmentOperation
> >   2. I clarified that the "no-claim" mode requires a
> >completed/successful checkpoint before we can remove the one we are
> >restoring from. Also added a note that we can assume a checkpoint is
> >completed if it is confirmed by Flink's API for checkpointing stats or by
> >checking an entry in HA services. A checkpoint can not be assumed 
> > completed
> >by just looking at the checkpoint files.
> >
> > I suggest going on with the proposal for "no-claim" as suggested so far,
> > as it is easier to understand by users. They can reliably tell when they
> > can expect the checkpoint to be deletable. If we see that the time to take
> > the 1st checkpoint becomes a problem we can extend the set of restore
> > methods and e.g. add a "claim-temporarily" method.
> >
> > I hope we can reach a consensus and start a vote, some time early next
> > week.
> >
> > Best,
> >
> > Dawid
> >
> > On 23/11/2021 22:39, Roman Khachatryan wrote:
> >
> > I also referred to the "no-claim" mode and I still think neither of them 
> > works in that mode, as you'd have to keep lineage of checkpoints externally 
> > to be able delete any checkpoint.
> >
> > I think the lineage is needed in all approaches with arbitrary
> > histories; the difference is whether a running Flink is required or
> > not. Is that what you mean?
> > (If not, could you please explain how the scenario you mentioned above
> > with multiple jobs branching from the same checkpoint is handled?)
> >
> >
> > BTW, the state key for RocksDB is actually: backend UID + key group range + 
> > SST file name, so the key would be different (the key group range is 
> > different for two tasks) and we would've two separate counters for the same 
> > file.
> >
> > You're right. But there is also a collision between old and new entries.
> >
> >
> > To be on the same page here. It is not a problem so far in RocksDB, because 
> > we do not reuse any shared files in case of rescaling.
> >
> > As I mentioned above, collision happens not only because of rescaling;
> > and AFAIK, there are some ideas to reuse files on rescaling (probably
> > Yuan could clarify). Anyways, I think it makes sense to not bake in
> > this assumption unless it's hard to implement (or at least state it
> > explicitly in FLIP).
> >
> >
> > It is not suggested as an optimization. It is suggested as a must for state 
> > backends that need it. I did not elaborate on it, because it could affected 
> > only the changelog state backend at the moment, which I don't have much 
> > insights. I agree it might make sense to look a bit how we could force full 
> > snapshots in the changelog state backend. I will spend some extra time on 
> > that.
> >
> > I see. For the Changelog state backend, the easiest way would be to
> > obtain a full snapshot from the underlying backend in snapshot(),
> > ignoring all non-materialized changes. This will effectively
> > materialize all the changes, so only new non-materialized state will
> > be used in subsequent checkpoints.
> >
> >
> > Only the task that gets assigned [1,16] would be responsible for 
> > duplicating files of the old range [1, 64].
> >
> > Wouldn't it be likely that the same TM will be responsible for [1, 64]
> > "windowState", [1, 64] "timerState", and so on, for all operators in
> > the chain, and probably other chains? (that what I mean by skew)
> > If we want to address this, preserving handle immutability then we'll
> > probably have to rebuild the whole task state snapshot.
> > (depending on how we approach RocksDB re-uploading, it might not be
> > relevant though)
> >
> >
> > Regards,
> > Roman
> >
> >
> > On Tue, Nov 23, 20

[jira] [Created] (FLINK-25078) An error is reported when the flink SQL connects to Phoenix

2021-11-26 Thread xinli liang (Jira)
xinli liang created FLINK-25078:
---

 Summary: An error is reported when the flink SQL connects to 
Phoenix
 Key: FLINK-25078
 URL: https://issues.apache.org/jira/browse/FLINK-25078
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
 Environment: Maven configuration environment is as follows:
{code:xml}
<!-- code placeholder -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <parent>
        <artifactId>iyb-db-account</artifactId>
        <groupId>org.iyunbao</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <modelVersion>4.0.0</modelVersion>
    <artifactId>iyb-realtime</artifactId>

    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <flink.version>1.12.0</flink.version>
        <scala.version>2.12</scala.version>
        <hadoop.version>3.1.3</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hbase-2.2_2.11</artifactId>
            <version>1.12.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop2</artifactId>
            <version>1.6.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.12</artifactId>
            <version>1.8.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.blink</groupId>
            <artifactId>flink-jdbc</artifactId>
            <version>1.5.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-core</artifactId>
            <version>5.0.0-HBase-2.0</version>
        </dependency>

        <dependency>
            <groupId>commons-beanutils</groupId>
            <artifactId>commons-beanutils-core</artifactId>
            <version>1.8.0</version>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.3.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
{code}
Reporter: xinli liang


I want to know where the problem is. Thanks

 

The code is as follows
{code:java}
// code placeholder
package com.iyunbao.app.dim;

import com.iyunbao.common.GmallConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author liangxinli
 * @create 2021-11-03 10:33
 */
public class dim_agt_account_cloud_hhbak {
    public static void main(String[] args) {

        // create the execution environment
        StreamExecutionEnvironment senv =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // todo: create the table environment (fairly buggy; pinning the planner version is recommended)
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .useBlinkPlanner()
                .build();
        // convert the stream environment into a table environment
        StreamTableEnvironment tableEnv =
                StreamTableEnvironment.create(senv, settings);

        try {
 

[jira] [Created] (FLINK-25079) Add assertj assertions for table types

2021-11-26 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25079:
---

 Summary: Add assertj assertions for table types
 Key: FLINK-25079
 URL: https://issues.apache.org/jira/browse/FLINK-25079
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] FLIP-193: Snapshots ownership

2021-11-26 Thread Dawid Wysakowicz
> - maybe include "checkpoint" in mode names, i.e. --no-claim-checkpoint?

I don't think this is a good idea. The modes apply to both savepoints
and checkpoints, plus it's much longer to type in. (minor)

> - add an explicit option to preserve the current behavior (no claim
> and no duplicate)?

We had an offline discussion about it and so far we were leaning towards
keeping the set of supported options minimal. However, if we really
think the old behaviour is useful, we can add a --legacy restore mode. cc
@Konstantin @Piotr

> There seems to be a consensus in the discussion, however, I couldn't
> find stop-with-savepoint in the document.

Sorry, I forgot about this one. I added a note that savepoints
generated from stop-with-savepoint should commit side effects.

> And I still think it would be nice to list object stores which support
> duplicate operation.

I listed a couple of file systems that do have some sort of a COPY API.
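
(For illustration, this is the kind of server-side COPY such stores expose.
AWS SDK v1 for S3 is shown; bucket and key names are placeholders, and this
is not the FLIP's proposed API.)

{code:java}
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

public class DuplicateViaCopy {
    public static void main(String[] args) {
        AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
        // Server-side copy: the object is duplicated inside S3 without the
        // bytes travelling through the client.
        s3.copyObject("checkpoints", "job-a/chk-42/shared/some-file",
                      "checkpoints", "job-b/chk-1/shared/some-file");
    }
}
{code}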

Best,

Dawid

On 26/11/2021 11:03, Roman Khachatryan wrote:
> Hi,
>
> Thanks for updating the FLIP Dawid
>
> There seems to be a consensus in the discussion, however, I couldn't
> find stop-with-savepoint in the document.
>
> A few minor things:
> - maybe include "checkpoint" in mode names, i.e. --no-claim-checkpoint?
> - add an explicit option to preserve the current behavior (no claim
> and no duplicate)?
> And I still think it would be nice to list object stores which support
> duplicate operation.
>
> Regards,
> Roman
>
>
> On Fri, Nov 26, 2021 at 10:37 AM Konstantin Knauf  wrote:
>> Hi Dawid,
>>
>> sounds good, specifically 2., too.
>>
>> Best,
>>
>> Konstantin
>>
>> On Fri, Nov 26, 2021 at 9:25 AM Dawid Wysakowicz 
>> wrote:
>>
>>> Hi all,
>>>
>>> I updated the FLIP with a few clarifications:
>>>
>>>1. I added a description how would we trigger a "full snapshot" in the
>>>changelog state backend
>>>   - (We would go for this option in the 1st version). Trigger a
>>>   snapshot of the base state backend in the 1st checkpoint, which 
>>> induces
>>>   materializing the changelog. In this approach we could duplicate SST 
>>> files,
>>>   but we would not duplicate the diff files.
>>>- Add a hook for logic for computing which task should duplicate the
>>>   diff files. We would have to do a pass over all states after the state
>>>   assignment in StateAssignmentOperation
>>>   2. I clarified that the "no-claim" mode requires a
>>>completed/successful checkpoint before we can remove the one we are
>>>restoring from. Also added a note that we can assume a checkpoint is
>>>completed if it is confirmed by Flink's API for checkpointing stats or by
>>>checking an entry in HA services. A checkpoint can not be assumed 
>>> completed
>>>by just looking at the checkpoint files.
>>>
>>> I suggest going on with the proposal for "no-claim" as suggested so far,
>>> as it is easier to understand by users. They can reliably tell when they
>>> can expect the checkpoint to be deletable. If we see that the time to take
>>> the 1st checkpoint becomes a problem we can extend the set of restore
>>> methods and e.g. add a "claim-temporarily" method.
>>>
>>> I hope we can reach a consensus and start a vote, some time early next
>>> week.
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>> On 23/11/2021 22:39, Roman Khachatryan wrote:
>>>
>>> I also referred to the "no-claim" mode and I still think neither of them 
>>> works in that mode, as you'd have to keep lineage of checkpoints externally 
>>> to be able delete any checkpoint.
>>>
>>> I think the lineage is needed in all approaches with arbitrary
>>> histories; the difference is whether a running Flink is required or
>>> not. Is that what you mean?
>>> (If not, could you please explain how the scenario you mentioned above
>>> with multiple jobs branching from the same checkpoint is handled?)
>>>
>>>
>>> BTW, the state key for RocksDB is actually: backend UID + key group range + 
>>> SST file name, so the key would be different (the key group range is 
>>> different for two tasks) and we would've two separate counters for the same 
>>> file.
>>>
>>> You're right. But there is also a collision between old and new entries.
>>>
>>>
>>> To be on the same page here. It is not a problem so far in RocksDB, because 
>>> we do not reuse any shared files in case of rescaling.
>>>
>>> As I mentioned above, collision happens not only because of rescaling;
>>> and AFAIK, there are some ideas to reuse files on rescaling (probably
>>> Yuan could clarify). Anyways, I think it makes sense to not bake in
>>> this assumption unless it's hard to implement (or at least state it
>>> explicitly in FLIP).
>>>
>>>
>>> It is not suggested as an optimization. It is suggested as a must for state 
>>> backends that need it. I did not elaborate on it, because it could affected 
>>> only the changelog state backend at the moment, which I don't have much 
>>> insights. I agree it might make sense to look a bit how we could

Re: [DISCUSS] Releasing Flink 1.14.1

2021-11-26 Thread David Morávek
Hi Martijn,

* https://issues.apache.org/jira/browse/FLINK-24543 - Zookeeper connection
> issue causes inconsistent state in Flink -> I think this depends on the
> outcome of dropping Zookeeper 3.4 as was proposed on the Dev mailing list
>

We already have an approved patch for master, I'll be preparing backports
today.

* https://issues.apache.org/jira/browse/FLINK-24789 - IllegalStateException
> with CheckpointCleaner being closed already
>

We already have an approved patch for master, I'll be preparing backports
today.

* https://issues.apache.org/jira/browse/FLINK-23946 - Application mode
> fails fatally when being shut down -> This depends on
>

We've briefly talked with Till about this one and we'll provide a patch for
1.14.1. We just need to disable one test that could be flaky because of
FLINK-24038.

https://issues.apache.org/jira/browse/FLINK-24038 and I don't see much
> happening there, so I also expect that this would move to Flink 1.15.
> David, could you confirm?
>

Till has prepared a prototype for this, but the change is too invasive to
be introduced in a patch version. We're moving this to 1.15.

Best,
D.

On Thu, Nov 25, 2021 at 9:03 AM Dawid Wysakowicz 
wrote:

> Hey Martijn,
>
> +1 for releasing 1.14.1
>
> As for https://issues.apache.org/jira/browse/FLINK-24328 I removed the
> 1.14.1 fix version. It definitely should not block the release. If we
> decide to backport it to 1.14.x it can safely land in 1.14.2.
>
> Best,
>
> Dawid
>
> On 24/11/2021 19:40, Martijn Visser wrote:
> > Hi all,
> >
> > I would like to start a discussion on releasing Flink 1.14.1. Flink 1.14
> > was released on the 29th of September [1] and so far 107 issues have been
> > resolved, including multiple blockers and critical priorities [2].
> >
> > There are currently 169 open tickets which contain a fixVersion for
> 1.14.1
> > [3]. I'm including the ones that are currently marked as critical or a
> > blocker to verify if these should be included in Flink 1.14.1. It would
> be
> > great if those that are assigned or working on one or more of these
> tickets
> > can give an update on its status.
> >
> > * https://issues.apache.org/jira/browse/FLINK-24543 - Zookeeper
> connection
> > issue causes inconsistent state in Flink -> I think this depends on the
> > outcome of dropping Zookeeper 3.4 as was proposed on the Dev mailing list
> > * https://issues.apache.org/jira/browse/FLINK-25027 - Allow GC of a
> > finished job's JobMaster before the slot timeout is reached
> > * https://issues.apache.org/jira/browse/FLINK-25022 - ClassLoader leak
> with
> > ThreadLocals on the JM when submitting a job through the REST API
> > * https://issues.apache.org/jira/browse/FLINK-24789 -
> IllegalStateException
> > with CheckpointCleaner being closed already
> > * https://issues.apache.org/jira/browse/FLINK-24328 - Long term fix for
> > receiving new buffer size before network reader configured -> I'm not
> sure
> > if this would end up in Flink 1.14.1, I think it's more likely that it
> > would be Flink 1.15. Anton/Dawid, could you confirm this?
> > * https://issues.apache.org/jira/browse/FLINK-23946 - Application mode
> > fails fatally when being shut down -> This depends on
> > https://issues.apache.org/jira/browse/FLINK-24038 and I don't see much
> > happening there, so I also expect that this would move to Flink 1.15.
> > David, could you confirm?
> > * https://issues.apache.org/jira/browse/FLINK-22113 - UniqueKey
> constraint
> > is lost with multiple sources join in SQL
> > * https://issues.apache.org/jira/browse/FLINK-21788 - Throw
> > PartitionNotFoundException if the partition file has been lost for
> blocking
> > shuffle -> I'm also expecting that this would move to Flink 1.15, can you
> > confirm Yingjie ?
> >
> > There are quite some other tickets that I've excluded from this list,
> > because they are either test instabilities or are not depending on a
> Flink
> > release to be resolved.
> >
> > Note: there are quite a few test instabilities in the list and help on
> > those is always appreciated. You can check all unassigned tickets
> > instabilities in Jira [4].
> >
> > Are there any other open tickets that we should wait for? Is there a PMC
> > member who would like to manage the release? I'm more than happy to help
> > with monitoring the status of the tickets.
> >
> > Best regards,
> >
> > Martijn
> >
> > [1] https://flink.apache.org/news/2021/09/29/release-1.14.0.html
> > [2]
> >
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20FLINK%20AND%20status%20in%20(Resolved%2C%20Closed)%20AND%20fixVersion%20%3D%201.14.1%20ORDER%20BY%20priority%20DESC%2C%20created%20DESC
> > [3]
> >
> https://issues.apache.org/jira/issues?jql=project%20%3D%20FLINK%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened)%20AND%20fixVersion%20%3D%201.14.1%20ORDER%20BY%20priority%20DESC%2C%20created%20DESC
> >
> > [4]
> >
> https://issues.apache.org/jira/issues?jql=project%20%3D%20FLINK%20AND%20status%20in%20(Open%2C%20

Re: [DISCUSS] Releasing Flink 1.14.1

2021-11-26 Thread Chesnay Schepler
FLINK-25022: I will open a PR later today, and it should be easy to 
backport.
FLINK-25027: Unlikely to make it for 1.14.1; I also wouldn't consider it 
a blocker.


On 24/11/2021 19:40, Martijn Visser wrote:

Hi all,

I would like to start a discussion on releasing Flink 1.14.1. Flink 1.14
was released on the 29th of September [1] and so far 107 issues have been
resolved, including multiple blockers and critical priorities [2].

There are currently 169 open tickets which contain a fixVersion for 1.14.1
[3]. I'm including the ones that are currently marked as critical or a
blocker to verify if these should be included in Flink 1.14.1. It would be
great if those that are assigned or working on one or more of these tickets
can give an update on its status.

* https://issues.apache.org/jira/browse/FLINK-24543 - Zookeeper connection
issue causes inconsistent state in Flink -> I think this depends on the
outcome of dropping Zookeeper 3.4 as was proposed on the Dev mailing list
* https://issues.apache.org/jira/browse/FLINK-25027 - Allow GC of a
finished job's JobMaster before the slot timeout is reached
* https://issues.apache.org/jira/browse/FLINK-25022 - ClassLoader leak with
ThreadLocals on the JM when submitting a job through the REST API
* https://issues.apache.org/jira/browse/FLINK-24789 - IllegalStateException
with CheckpointCleaner being closed already
* https://issues.apache.org/jira/browse/FLINK-24328 - Long term fix for
receiving new buffer size before network reader configured -> I'm not sure
if this would end up in Flink 1.14.1, I think it's more likely that it
would be Flink 1.15. Anton/Dawid, could you confirm this?
* https://issues.apache.org/jira/browse/FLINK-23946 - Application mode
fails fatally when being shut down -> This depends on
https://issues.apache.org/jira/browse/FLINK-24038 and I don't see much
happening there, so I also expect that this would move to Flink 1.15.
David, could you confirm?
* https://issues.apache.org/jira/browse/FLINK-22113 - UniqueKey constraint
is lost with multiple sources join in SQL
* https://issues.apache.org/jira/browse/FLINK-21788 - Throw
PartitionNotFoundException if the partition file has been lost for blocking
shuffle -> I'm also expecting that this would move to Flink 1.15, can you
confirm Yingjie ?

There are quite some other tickets that I've excluded from this list,
because they are either test instabilities or are not depending on a Flink
release to be resolved.

Note: there are quite a few test instabilities in the list and help on
those is always appreciated. You can check all unassigned tickets
instabilities in Jira [4].

Are there any other open tickets that we should wait for? Is there a PMC
member who would like to manage the release? I'm more than happy to help
with monitoring the status of the tickets.

Best regards,

Martijn

[1] https://flink.apache.org/news/2021/09/29/release-1.14.0.html
[2]
https://issues.apache.org/jira/issues/?jql=project%20%3D%20FLINK%20AND%20status%20in%20(Resolved%2C%20Closed)%20AND%20fixVersion%20%3D%201.14.1%20ORDER%20BY%20priority%20DESC%2C%20created%20DESC
[3]
https://issues.apache.org/jira/issues?jql=project%20%3D%20FLINK%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened)%20AND%20fixVersion%20%3D%201.14.1%20ORDER%20BY%20priority%20DESC%2C%20created%20DESC

[4]
https://issues.apache.org/jira/issues?jql=project%20%3D%20FLINK%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened)%20AND%20fixVersion%20%3D%201.14.1%20AND%20labels%20%3D%20test-stability%20AND%20assignee%20in%20(EMPTY)%20ORDER%20BY%20priority%20DESC%2C%20created%20DESC

Martijn Visser | Product Manager

mart...@ververica.com









[jira] [Created] (FLINK-25080) FutureUtils is located in flink-core whereas the corresponding test is still located in flink-runtime

2021-11-26 Thread Matthias (Jira)
Matthias created FLINK-25080:


 Summary: FutureUtils is located in flink-core whereas the 
corresponding test is still located in flink-runtime
 Key: FLINK-25080
 URL: https://issues.apache.org/jira/browse/FLINK-25080
 Project: Flink
  Issue Type: Technical Debt
  Components: Runtime / Coordination
Affects Versions: 1.14.0, 1.15.0
Reporter: Matthias
 Fix For: 1.15.0


The same applies to the {{RetryStrategy}} implementation tests.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] Creating an external connector repository

2021-11-26 Thread Till Rohrmann
Hi Arvid,

Thanks for updating this thread with the latest findings. The described
limitations for a single connector repo sound suboptimal to me.

* Option 2. sounds as if we try to simulate multi-connector repos inside of
a single repo. I also don't know how we would share code between the
different branches (sharing infrastructure would probably be easier
though). This seems to have the same limitations as dedicated repos with
the downside of having a not very intuitive branching model.
* Isn't option 1. kind of a degenerate version of option 2. where we have
some unrelated code from other connectors in the individual connector
branches?
* Option 3. has the downside that someone creating a release has to release
all connectors. This means that she either has to sync with the different
connector maintainers or has to be able to release all connectors on her
own. We are already seeing in the Flink community that releases require
quite good communication/coordination between the different people working
on different Flink components. Given our goals to make connector releases
easier and more frequent, I think that coupling different connector
releases might be counter-productive.

To me it sounds not very practical to mainly use a mono repository w/o
having some more advanced build infrastructure that, for example, allows for
different git roots in different connector directories. Maybe the mono
repo can be a catch all repository for connectors that want to be released
in lock-step (Option 3.) with all other connectors the repo contains. But
for connectors that get changed frequently, having a dedicated repository
that allows independent releases sounds preferable to me.

What utilities and infrastructure code do you intend to share? Using git
submodules can definitely be one option to share code. However, it might
also be ok to depend on flink-connector-common artifacts which could make
things easier. Where I am unsure is whether git submodules can be used to
share infrastructure code (e.g. the .github/workflows) because you need
these files in the repo to trigger the CI infrastructure.

Cheers,
Till

On Thu, Nov 25, 2021 at 1:59 PM Arvid Heise  wrote:

> Hi Brian,
>
> Thank you for sharing. I think your approach is very valid and is in line
> with what I had in mind.
>
> Basically Pravega community aligns the connector releases with the Pravega
> > mainline release
> >
> This certainly would mean that there is little value in coupling connector
> versions. So it's making a good case for having separate connector repos.
>
>
> > and maintains the connector with the latest 3 Flink versions(CI will
> > publish snapshots for all these 3 branches)
> >
> I'd like to give connector devs a simple way to express to which Flink
> versions the current branch is compatible. From there we can generate the
> compatibility matrix automatically and optionally also create different
> releases per supported Flink version. Not sure if the latter is indeed
> better than having just one artifact that happens to run with multiple
> Flink versions. I guess it depends on what dependencies we are exposing. If
> the connector uses flink-connector-base, then we probably need separate
> artifacts with poms anyways.
>
> Best,
>
> Arvid
>
> On Fri, Nov 19, 2021 at 10:55 AM Zhou, Brian  wrote:
>
> > Hi Arvid,
> >
> > For the branching model, the Pravega Flink connector has some experience
> > that I would like to share. Here[1][2] are the compatibility matrix and wiki
> > explaining the branching model and releases. Basically, the Pravega community
> > aligns the connector releases with the Pravega mainline release, and
> > maintains the connector for the latest 3 Flink versions (CI will publish
> > snapshots for all these 3 branches).
> > For example, recently we had the 0.10.1 release[3], and in Maven Central we
> > needed to upload three artifacts (for Flink 1.13, 1.12, 1.11) for the 0.10.1
> > version[4].
> >
> > There are some alternatives. Another solution that we once discussed but
> > finally abandoned is to have an independent version, just like the
> > current CDC connector, and then give users a big compatibility matrix.
> > We think it would become too confusing as the connector evolves. We could
> > also go the opposite way and align with the Flink version, maintaining
> > several branches for the different system versions.
> >
> > I would say this is only a fairly-OK solution, because it is a bit painful
> > for maintainers: cherry-picks are very common and releases require much
> > work. However, if neither system has nice backward compatibility, there
> > seems to be no comfortable solution for their connector.
> >
> > [1] https://github.com/pravega/flink-connectors#compatibility-matrix
> > [2]
> >
> https://github.com/pravega/flink-connectors/wiki/Versioning-strategy-for-Flink-connector
> > [3] https://github.com/pravega/flink-connectors/releases/tag/v0.10.1
> > [4] https://search.maven.org/search?

Re: [DISCUSS] Creating an external connector repository

2021-11-26 Thread Chesnay Schepler
For sharing workflows we should be able to use composite actions. We'd 
have the main definition files in the flink-connectors repo, which we 
also need to tag/release and which other branches/repos can then import. 
These are also versioned, so we don't have to worry about accidentally 
breaking stuff.
These could also be used to enforce certain standards / interfaces such 
that we can automate more things (e.g., integration into the Flink 
documentation).


It is true that Option 2) and dedicated repositories share a lot of 
properties. While I did say in an offline conversation that we in that 
case might just as well use separate repositories, I'm not so sure 
anymore. One repo would make administration a bit easier, for example 
secrets wouldn't have to be applied to each repo (we wouldn't want 
certain secrets to be set up organization-wide).
I overall also like that one repo would present a single access point; 
you can't "miss" a connector repo, and I would hope that having it as 
one repo would nurture more collaboration between the connectors, which 
after all need to solve similar problems.


It is a fair point that the branching model would be quite weird, but I 
think that would subside pretty quickly.


Personally I'd go with Option 2, and if that doesn't work out we can 
still split the repo later on. (Which should then be a trivial matter of 
copying all the per-connector /* branches and renaming them).


On 26/11/2021 12:47, Till Rohrmann wrote:


[jira] [Created] (FLINK-25081) When chaining an operator of a side output stream, the num records sent displayed on the dashboard is incorrect

2021-11-26 Thread Lijie Wang (Jira)
Lijie Wang created FLINK-25081:
--

 Summary: When chaining an operator of a side output stream, the 
num records sent displayed on the dashboard is incorrect
 Key: FLINK-25081
 URL: https://issues.apache.org/jira/browse/FLINK-25081
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Metrics
Affects Versions: 1.14.0
Reporter: Lijie Wang
 Attachments: image-2021-11-26-20-32-08-443.png

As shown in the following figure, "Map" is an operator consuming a side output 
stream, yet the num records sent of the first vertex is displayed as 0.

!image-2021-11-26-20-32-08-443.png|width=750,height=253!

 

The job code is as follows:
{code:java}
final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

SingleOutputStreamOperator<Long> dataStream =
        env.addSource(new DataGeneratorSource<>(RandomGenerator.longGenerator(1, 1000)))
                .returns(Long.class)
                .setParallelism(10)
                .slotSharingGroup("group1");

DataStream<Long> sideOutput = dataStream.getSideOutput(new OutputTag<Long>("10") {});
sideOutput.map(num -> num).setParallelism(10).slotSharingGroup("group1");

dataStream.addSink(new DiscardingSink<>()).setParallelism(10).slotSharingGroup("group2");

env.execute("WordCount");
{code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] FLIP-189: SQL Client Usability Improvements

2021-11-26 Thread Daisy Tsang
Hi,

Would a command that shows the available table source/sink factories be
implemented? Something that can show whether a factory is loaded, etc.

- Daisy

On Thu, Nov 4, 2021 at 8:30 AM Sergey Nuyanzin  wrote:

> I've started a [VOTE] thread for this FLIP
> https://lists.apache.org/thread/f14jjhrscrdv7h6zw6h1k72nfx232qxs
>
> On Wed, Nov 3, 2021 at 2:59 PM Sergey Nuyanzin 
> wrote:
>
> > Hi Timo,
> >
> > I completely agree it would be great if we can propagate Calcite parser
> > config in
> > the way you have described. As you mentioned we could discuss this
> > when it comes to the implementation.
> >
> > Meanwhile it looks like I can start voting (please correct me if I'm
> > wrong).
> > I will start it a bit later today
> >
> > On Wed, Nov 3, 2021 at 1:37 PM Timo Walther  wrote:
> >
> >> Hi Sergey,
> >>
> >> thanks for your explanation.
> >>
> >> Regarding keywords and other info: We should receive the information
> >> from the Flink SQL parser directly. We have added a couple of new
> >> keywords such as WATERMARK or MATCH_RECOGNIZE clauses. SQL92 would not
> >> help a user understand why a column name needs to be escaped. And in
> >> general, we should not have duplicate code. Let's discuss this when it
> >> comes to the implementation. I'm sure we can propagate the Calcite
> >> parser config into a nice POJO that the CLI can receive from the
> Executor.
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >> On 03.11.21 11:12, Sergey Nuyanzin wrote:
> >> > Hi 李宇彬,
> >> >
> >> > I think you are right. Thank you very much for the idea.
> >> > I came across MySQL[1] and PostgreSQL[2] prompts and also
> >> > found several interesting features like control symbols to change
> style,
> >> > showing current property value and different datetime formats.
> >> >
> >> > I have added your proposals and my findings to FLIP's page, please
> have
> >> a
> >> > look.
> >> >
> >> > [1] https://dev.mysql.com/doc/refman/8.0/en/mysql-commands.html
> >> > [2]
> https://www.postgresql.org/docs/14/app-psql.html#APP-PSQL-PROMPTING
> >> >
> >> > On Wed, Nov 3, 2021 at 2:31 AM 李宇彬  wrote:
> >> >
> >> >> Hi Sergey
> >> >>
> >> >>
> >> >> It is a very useful improvement I'm looking forward to. in addition,
> I
> >> >> think prompt
> >> >> can play a greater role.
> >> >>
> >> >>
> >> >> To help users call commands in expected context, we can get session
> >> >> context
> >> >> (current catalog/db/time) from cli prompt like MySQL,
> >> >> please see details as below:
> >> >>
> >> >>
> >> >> https://issues.apache.org/jira/browse/FLINK-24730
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> On 11/2/2021 21:09,Sergey Nuyanzin wrote:
> >> >> Hi Timo
> >> >>
> >> >> Thank you for your questions.
> >> >>
> >> >> I will answer your questions here and update FLIP's page as well
> >> >>
> >> >> For example, who is responsible for parsing comments? I guess the SQL
> >> >> Client and not the Flink SQL parser will take care of this?
> >> >> Yes, you are right. SQL Client is responsible for parsing here.
> >> >> However, it does not validate the SQL; it only validates that comments,
> >> >> brackets and quotes are closed and that the statement ends with a
> >> >> semicolon (a sketch of such a check follows this message).
> >> >> Also, under the hood, jline splits the input into words and works with
> >> >> them. Within a custom parser it is possible to specify what should be
> >> >> considered a word or not considered at all (e.g. it is possible to remove
> >> >> all line and block comments before submitting a query,
> >> >> probably as another non-default option)... During parsing it marks
> >> >> what is a comment, a keyword, a quoted string etc. based on rules
> >> >> defined in the SQL Client parser.
> >> >> The SQL Client highlighter could use the result of this marking to
> >> >> highlight. The completer could use it for completion, e.g. if, based on
> >> >> the parser's marks, the completer knows that the cursor is inside a
> >> >> comment or a string, then there is no need to complete anything.
> >> >>
> >> >> Also, will the prompt hints for `'>` and ``>` support escaping? This
> >> can
> >> >> be a tricky topic sometimes.
> >> >> Ideally yes, I played with lots of tricky cases and it behaves ok.
> >> >> At least I do not see limitations here.
> >> >> In case you do please share...
> >> >>
> >> >> In general, how do we deal with different SQL dialects in the SQL
> >> >> Client? For example, it is possible to set `table.sql-dialect` to
> >> >> `HIVE`. Will all highlighting, auto-complete and prompt hints be
> >> >> disabled in this case?
> >> >> It could be turned off for the beginning.
> >> >> To make it supported across different dialects it is required to have
> >> such
> >> >> info:
> >> >> 1) Set of keywords
> >> >> 2) Quote sign
> >> >> 3) SQL identifier quote
> >> >> 4) Start of a line comment
> >> >> 5) Start and end of a block comment
> >> >> 6) Start and end of hints
> >> >> I see at least 2 ways:
> >> >> 1. provide such an API
> >> >> 2. create this mapping in SQL Client and use it based on current
> >> dialect
> >> >> Then it 
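For illustration, a minimal sketch of the statement-completeness check described above: it only verifies that quotes, comments and brackets are closed and that the input ends with a semicolon. This is a hand-rolled scanner written under those assumptions, not SQL Client's actual jline-based parser; the class and method names are made up.

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;

public final class StatementCompletenessCheck {

    // Returns true only if all quotes, comments and parentheses are closed
    // and the last significant character is a semicolon.
    public static boolean isComplete(String sql) {
        boolean inQuote = false;        // inside '...'
        boolean inIdQuote = false;      // inside `...`
        boolean inLineComment = false;  // after --
        boolean inBlockComment = false; // inside /* ... */
        Deque<Character> parens = new ArrayDeque<>();
        char lastSignificant = ' ';

        for (int i = 0; i < sql.length(); i++) {
            char c = sql.charAt(i);
            char next = (i + 1 < sql.length()) ? sql.charAt(i + 1) : '\0';

            if (inLineComment) {
                if (c == '\n') { inLineComment = false; }
            } else if (inBlockComment) {
                if (c == '*' && next == '/') { inBlockComment = false; i++; }
            } else if (inQuote) {
                if (c == '\'') { inQuote = false; }
            } else if (inIdQuote) {
                if (c == '`') { inIdQuote = false; }
            } else if (c == '-' && next == '-') {
                inLineComment = true; i++;
            } else if (c == '/' && next == '*') {
                inBlockComment = true; i++;
            } else if (c == '\'') {
                inQuote = true;
            } else if (c == '`') {
                inIdQuote = true;
            } else if (c == '(') {
                parens.push(c);
            } else if (c == ')') {
                if (parens.isEmpty()) { return false; } // unbalanced bracket
                parens.pop();
                lastSignificant = c;
            } else if (!Character.isWhitespace(c)) {
                lastSignificant = c;
            }
        }
        return !inQuote && !inIdQuote && !inBlockComment
                && parens.isEmpty() && lastSignificant == ';';
    }

    public static void main(String[] args) {
        System.out.println(isComplete("SELECT 1;"));            // true
        System.out.println(isComplete("SELECT ';' FROM t"));    // false: no terminating ';'
        System.out.println(isComplete("SELECT 1; -- comment")); // true
    }
}
{code}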

Re: [DISCUSS] FLIP-190: Support Version Upgrades for Table API & SQL Programs

2021-11-26 Thread Till Rohrmann
Thanks for writing this FLIP Timo. I think this will be a very important
improvement for Flink and our SQL users :-)

Similar to Francesco I would like to understand the statement

> For simplification of the design, we assume that upgrades use a step size
of a single
minor version. We don't guarantee skipping minor versions (e.g. 1.11 to
1.14).

a bit better. Is it because Flink does not guarantee that a savepoint
created by version 1.x can be directly recovered by version 1.y with x + 1
< y but users might have to go through a cascade of upgrades? From how I
understand your proposal, the compiled plan won't be changed after being
written initially. Hence, I would assume that for the plan alone Flink will
have to give backwards compatibility guarantees for all versions. Am I
understanding this part correctly?

Cheers,
Till

On Thu, Nov 25, 2021 at 4:55 PM Francesco Guardiani 
wrote:

> Hi Timo,
>
> Thanks for putting this amazing work together, I have some
> considerations/questions
> about the FLIP:
> *Proposed changes #6*: Other than defining this rule of thumb, we must
> also make sure
> that compiling a plan containing objects that cannot be serialized fails
> hard,
> so users don't bite themselves with such issues, or at least we need to
> output warning
> logs. In general, whenever the user is trying to use the CompiledPlan APIs
> and at the same
> time, they're trying to do something "illegal" for the plan, we should
> immediately either
> log or fail depending on the issue, in order to avoid any surprises once
> the user upgrades.
> I would also say the same for things like registering a function,
> registering a DataStream,
> and for every other thing which won't end up in the plan, we should log
> such info to the
> user by default.
>
> *General JSON Plan Assumptions #9:* When thinking to connectors and
> formats, I think
> it's reasonable to assume and keep out of the feature design that no
> feature/ability can be
> deleted from a connector/format. I also don't think new features/abilities
> can influence
> this FLIP as well, given the plan is static, so if for example,
> MyCoolTableSink in the next
> flink version implements SupportsProjectionsPushDown, then it shouldn't be
> a problem
> for the upgrade story since the plan is still configured as computed from
> the previous flink
> version. What worries me is breaking changes, in particular behavioural
> changes that
> might happen in connectors/formats. Although this argument doesn't seem
> relevant for
> the connectors shipped by the flink project itself, because we try to keep
> them as stable as
> possible and avoid eventual breaking changes, it's compelling to external
> connectors and
> formats, which might be decoupled from the flink release cycle and might
> have different
> backward compatibility guarantees. It's totally reasonable if we don't
> want to tackle it in
> this first iteration of the feature, but it's something we need to keep in
> mind for the future.
>
>
> *Functions:* It's not clear to me what you mean by "identifier", because
> somewhere
> else in the same context you talk about "name". Are we talking about the
> function name
> or the function complete signature? Let's assume for example we have these
> function
> definitions:
>
>
> * TO_TIMESTAMP_LTZ(BIGINT)
> * TO_TIMESTAMP_LTZ(STRING)
> * TO_TIMESTAMP_LTZ(STRING, STRING)
>
> These for me are very different functions with different implementations,
> where each of
> them might evolve separately at a different pace. Hence when we store them
> in the json
> plan we should perhaps use a logically defined unique id like
> /bigIntToTimestamp/,
> /stringToTimestamp/ and /stringToTimestampWithFormat/. This also solves the
> issue of
> correctly referencing the functions when restoring the plan, without
> running again the
> inference logic (which might have been changed in the meantime) and it
> might also solve
> the versioning, that is the function identifier can contain the function
> version like /
> stringToTimestampWithFormat_1_1/ or /stringToTimestampWithFormat_1_2/. An
> alternative could be to use the string signature representation, which
> might not be trivial
> to compute, given the complexity of our type inference logic. (A small
> sketch of such an id mapping follows this message.)
>
> *The term "JSON plan"*: I think we should rather keep JSON out of the
> concept and just
> name it "Compiled Plan" (like the proposed API) or something similar, as I
> see how in
> future we might decide to support/modify our persistence format to
> something more
> efficient storage-wise like BSON. For example, I would rename
> /CompiledPlan.fromJsonFile/ to simply /CompiledPlan.fromFile/.
>
> *Who is the owner of the plan file?* I asked myself this question when
> reading this:
>
>
> > For simplification of the design, we assume that upgrades use a step
> size of a single
> minor version. We don't guarantee skipping minor versions (e.g. 1.11 to
> 1.14).
>
> My understanding of this statement is that a user can upgrad
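To make the identifier idea above concrete, here is a hedged sketch of mapping function overloads to stable, versioned plan identifiers. Everything in it (class name, signature strings, ids) is hypothetical and only illustrates the proposal; it is not an actual Flink API.

{code:java}
import java.util.HashMap;
import java.util.Map;

public final class PlanFunctionIds {

    // One stable, versioned id per overload, decoupled from runtime type
    // inference so each implementation can evolve at its own pace.
    private static final Map<String, String> OVERLOAD_TO_ID = new HashMap<>();

    static {
        OVERLOAD_TO_ID.put("TO_TIMESTAMP_LTZ(BIGINT)", "bigIntToTimestamp_1_1");
        OVERLOAD_TO_ID.put("TO_TIMESTAMP_LTZ(STRING)", "stringToTimestamp_1_1");
        OVERLOAD_TO_ID.put(
                "TO_TIMESTAMP_LTZ(STRING, STRING)", "stringToTimestampWithFormat_1_1");
    }

    // Returns the identifier that would be persisted in the compiled plan.
    public static String idFor(String signature) {
        String id = OVERLOAD_TO_ID.get(signature);
        if (id == null) {
            throw new IllegalArgumentException("No plan identifier for " + signature);
        }
        return id;
    }

    public static void main(String[] args) {
        // Restoring a plan would look up the implementation by this stable id
        // instead of re-running type inference.
        System.out.println(idFor("TO_TIMESTAMP_LTZ(STRING, STRING)"));
    }
}
{code}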

Re: [DISCUSS] Deprecate Java 8 support

2021-11-26 Thread Becket Qin
Thanks for raising the discussion, Chesnay.

I think it is OK to deprecate Java 8 to let the users know that Java 11
migration should be put into the schedule. However, according to some of
the statistics of the Java version adoption[1], a large number of users are
still using Java 8 in production. I doubt that Java 8 users will drop to a
negligible amount within the next 2 - 3 Flink releases. I would suggest
making the time to drop Java 8 support flexible.

Thanks,

Jiangjie (Becket) Qin

[1] https://www.infoq.com/news/2021/07/snyk-jvm-2021/

On Fri, Nov 26, 2021 at 5:09 AM Till Rohrmann  wrote:

> +1 for the deprecation and reaching out to the user ML to ask for feedback
> from our users. Thanks for driving this Chesnay!
>
> Cheers,
> Till
>
> On Thu, Nov 25, 2021 at 10:15 AM Roman Khachatryan 
> wrote:
>
> > The situation is probably a bit different now compared to the previous
> > upgrade: some users might be using Amazon Coretto (or other builds)
> > which have longer support.
> >
> > Still +1 for deprecation to trigger migration, and thanks for bringing
> > this up!
> >
> > Regards,
> > Roman
> >
> > On Thu, Nov 25, 2021 at 10:09 AM Arvid Heise  wrote:
> > >
> > > +1 to deprecate Java 8, so we can hopefully incorporate the module
> > concept
> > > in Flink.
> > >
> > > On Thu, Nov 25, 2021 at 9:49 AM Chesnay Schepler 
> > wrote:
> > >
> > > > Users can already use APIs from Java 8/11.
> > > >
> > > > On 25/11/2021 09:35, Francesco Guardiani wrote:
> > > > > +1 with what both Ingo and Matthias said, personally, I cannot wait
> to
> > > > start using some of
> > > > > the APIs introduced in Java 9. And I'm pretty sure that's the same
> > for
> > > > our users as well.
> > > > >
> > > > > On Tuesday, 23 November 2021 13:35:07 CET Ingo Bürk wrote:
> > > > >> Hi everyone,
> > > > >>
> > > > >> continued support for Java 8 can also create project risks, e.g.
> if
> > a
> > > > >> vulnerability arises in Flink's dependencies and we cannot upgrade
> > them
> > > > >> because they no longer support Java 8. Some projects already
> started
> > > > >> deprecating support as well, like Kafka, and other projects will
> > likely
> > > > >> follow.
> > > > >> Let's also keep in mind that the proposal here is not to drop
> > support
> > > > right
> > > > >> away, but to deprecate it, send the message, and motivate users to
> > start
> > > > >> migrating. Delaying this process could ironically mean users have
> > less
> > > > time
> > > > >> to prepare for it.
> > > > >>
> > > > >>
> > > > >> Ingo
> > > > >>
> > > > >> On Tue, Nov 23, 2021 at 8:54 AM Matthias Pohl <
> > matth...@ververica.com>
> > > > >>
> > > > >> wrote:
> > > > >>> Thanks for constantly driving these maintenance topics, Chesnay.
> +1
> > > > from
> > > > >>> my
> > > > >>> side for deprecating Java 8. I see the point Jingsong is raising.
> > But I
> > > > >>> agree with what David already said here. Deprecating the Java
> > version
> > > > is a
> > > > >>> tool to make users aware of it (same as starting this discussion
> > > > thread).
> > > > >>> If there's no major opposition against deprecating it in the
> > community
> > > > we
> > > > >>> should move forward in this regard to make the users who do not
> > > > >>> regularly browse the mailing list aware of it. That said,
> > deprecating
> > > > Java
> > > > >>> 8 in 1.15 does not necessarily mean that it is dropped in 1.16.
> > > > >>>
> > > > >>> Best,
> > > > >>> Matthias
> > > > >>>
> > > > >>> On Tue, Nov 23, 2021 at 8:46 AM David Morávek 
> > wrote:
> > > >  Thank you Chesnay for starting the discussion! This will
> generate
> > bit
> > > > of
> > > > >>> a
> > > > >>>
> > > >  work for some users, but it's a good thing to keep moving the
> > project
> > > >  forward. Big +1 for this.
> > > > 
> > > >  Jingsong:
> > > > 
> > > >  Receiving this signal, the user may be unhappy because his
> > application
> > > > 
> > > > > may be all on Java 8. Upgrading is a big job, after all, many
> > systems
> > > > > have not been upgraded yet. (Like you said, HBase and Hive)
> > > >  The whole point of deprecation is to raise awareness, that this
> > will
> > > > be
> > > >  happening eventually and users should take some steps to address
> > this
> > > > in
> > > >  medium-term. If I understand Chesnay correctly, we'd still keep
> > Java 8
> > > >  around for quite some time to give users enough time to upgrade,
> > but
> > > >  without raising awareness we'd fight the very same argument
> later
> > in
> > > > >>> time.
> > > > >>>
> > > >  All of the prerequisites from 3rd party projects for both HBase
> > [1]
> > > > and
> > > >  Hive [2] to fully support Java 11 have been completed, so the
> > ball is
> > > > on
> > > >  their side and there doesn't seem to be much activity.
> Generating
> > bit
> > > > >>> more
> > > > >>>
> > > >  pressure on these efforts might be a good thing.
> > > > 
> > > >  It would be great

Re: [DISCUSS] FLIP-193: Snapshots ownership

2021-11-26 Thread Till Rohrmann
Thanks for writing this FLIP Dawid. Just to clarify one thing for the
support of forced full snapshots: if a state backend does not support this
feature and the user wants to use --no-claim, then they either have to copy
the snapshot manually, or resume using --claim mode, create a savepoint in
canonical format, and then change the state backend, right?

Cheers,
Till

On Fri, Nov 26, 2021 at 11:49 AM Dawid Wysakowicz 
wrote:

> - maybe include "checkpoint" in mode names, i.e. --no-claim-checkpoint?
>
> I don't think this is a good idea. The modes apply to both savepoints and
> checkpoints, plus it's much longer to type in. (minor)
>
> - add an explicit option to preserve the current behavior (no claim
> and no duplicate)?
>
> We had an offline discussion about it and so far we were leaning towards
> keeping the set of supported options minimal. However, if we really think
> the old behaviour is useful we can add a --legacy restore mode. cc
> @Konstantin @Piotr
>
> There seems to be a consensus in the discussion, however, I couldn't
> find stop-with-savepoint in the document.
>
> Sorry, I forgot about this one. I added a note that savepoints generated
> from stop-with-savepoint should commit side effects.
>
> And I still think it would be nice to list object stores which support
> duplicate operation.
>
> I listed a couple of file systems that do have some sort of a COPY API.
>
> Best,
>
> Dawid
> On 26/11/2021 11:03, Roman Khachatryan wrote:
>
> Hi,
>
> Thanks for updating the FLIP Dawid
>
> There seems to be a consensus in the discussion, however, I couldn't
> find stop-with-savepoint in the document.
>
> A few minor things:
> - maybe include "checkpoint" in mode names, i.e. --no-claim-checkpoint?
> - add an explicit option to preserve the current behavior (no claim
> and no duplicate)?
> And I still think it would be nice to list object stores which support
> duplicate operation.
>
> Regards,
> Roman
>
>
> On Fri, Nov 26, 2021 at 10:37 AM Konstantin Knauf  
>  wrote:
>
> Hi Dawid,
>
> sounds good, specifically 2., too.
>
> Best,
>
> Konstantin
>
> On Fri, Nov 26, 2021 at 9:25 AM Dawid Wysakowicz  
> 
> wrote:
>
>
> Hi all,
>
> I updated the FLIP with a few clarifications:
>
>    1. I added a description of how we would trigger a "full snapshot" in the
>       changelog state backend:
>       - (We would go for this option in the 1st version.) Trigger a snapshot
>         of the base state backend in the 1st checkpoint, which induces
>         materializing the changelog. In this approach we could duplicate SST
>         files, but we would not duplicate the diff files.
>       - Add a hook for the logic that computes which task should duplicate
>         the diff files. We would have to do a pass over all states after the
>         state assignment in StateAssignmentOperation.
>    2. I clarified that the "no-claim" mode requires a completed/successful
>       checkpoint before we can remove the one we are restoring from. Also
>       added a note that we can assume a checkpoint is completed if it is
>       confirmed by Flink's API for checkpointing stats or by checking an
>       entry in HA services. A checkpoint cannot be assumed completed by just
>       looking at the checkpoint files.
>
> I suggest going on with the proposal for "no-claim" as suggested so far,
> as it is easier for users to understand. They can reliably tell when they
> can expect the checkpoint to be deletable. If we see that the time to take
> the 1st checkpoint becomes a problem we can extend the set of restore
> methods and e.g. add a "claim-temporarily" method.
>
> I hope we can reach a consensus and start a vote sometime early next
> week.
>
> Best,
>
> Dawid
>
> On 23/11/2021 22:39, Roman Khachatryan wrote:
>
> I also referred to the "no-claim" mode and I still think neither of them 
> works in that mode, as you'd have to keep lineage of checkpoints externally 
> to be able to delete any checkpoint.
>
> I think the lineage is needed in all approaches with arbitrary
> histories; the difference is whether a running Flink is required or
> not. Is that what you mean?
> (If not, could you please explain how the scenario you mentioned above
> with multiple jobs branching from the same checkpoint is handled?)
>
>
> BTW, the state key for RocksDB is actually: backend UID + key group range + 
> SST file name, so the key would be different (the key group range is 
> different for two tasks) and we would have two separate counters for the same 
> file (see the sketch after this message).
>
> You're right. But there is also a collision between old and new entries.
>
>
> To be on the same page here. It is not a problem so far in RocksDB, because 
> we do not reuse any shared files in case of rescaling.
>
> As I mentioned above, collision happens not only because of rescaling;
> and AFAIK, there are some ideas to reuse files on rescaling (probably
> Yuan could clarify). Anyways, I think it makes sense to not bake in
> this assumption unless it's hard to implement (or at least state it
> explicitly in FLIP).
>
>
> 
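For illustration, a hedged sketch of the composite key structure described above (backend UID + key-group range + SST file name). The class name and field types are assumptions made for this example; this is not Flink's actual SharedStateRegistry key implementation.

{code:java}
import java.util.Objects;

public final class SharedStateKey {
    final String backendUid;   // UID of the state backend instance
    final int keyGroupStart;   // first key group of the task's range
    final int keyGroupEnd;     // last key group of the task's range
    final String sstFileName;  // name of the shared SST file

    SharedStateKey(String backendUid, int keyGroupStart, int keyGroupEnd, String sstFileName) {
        this.backendUid = backendUid;
        this.keyGroupStart = keyGroupStart;
        this.keyGroupEnd = keyGroupEnd;
        this.sstFileName = sstFileName;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SharedStateKey)) {
            return false;
        }
        SharedStateKey that = (SharedStateKey) o;
        return keyGroupStart == that.keyGroupStart
                && keyGroupEnd == that.keyGroupEnd
                && backendUid.equals(that.backendUid)
                && sstFileName.equals(that.sstFileName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(backendUid, keyGroupStart, keyGroupEnd, sstFileName);
    }

    public static void main(String[] args) {
        SharedStateKey a = new SharedStateKey("backend-1", 0, 63, "000042.sst");
        SharedStateKey b = new SharedStateKey("backend-1", 64, 127, "000042.sst");
        // Different key-group ranges yield different registry entries, and
        // therefore separate reference counters for the same physical file.
        System.out.println(a.equals(b)); // false
    }
}
{code}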

Re: [DISCUSS] FLIP-193: Snapshots ownership

2021-11-26 Thread Dawid Wysakowicz
If they'd like to use the --no-claim mode that would be the way to go, yes.

Two points to be on the same page here:

  * all Flink native state backends (RocksDB, HashMap, changelog) would
already support --no-claim
  * if in the end we add the --legacy mode, users can also use that mode
instead of --claim.

Best,

Dawid

On 26/11/2021 15:57, Till Rohrmann wrote:

Re: [DISCUSS] FLIP-194: Introduce the JobResultStore

2021-11-26 Thread Till Rohrmann
Thanks for creating this FLIP Matthias, Mika and David.

I think the JobResultStore is an important piece for fixing Flink's last
high-availability problem (afaik). Once we have this piece in place, users
no longer risk re-executing a successfully completed job.

I have one comment concerning breaking interfaces:

If we don't want to break interfaces, then we could keep the
HighAvailabilityServices.getRunningJobsRegistry() method and add a default
implementation for HighAvailabilityServices.getJobResultStore(). We could
then deprecate the former method and remove it in the subsequent
release (1.16).
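A minimal sketch of that evolution path, with both registry interfaces reduced to placeholders; the default-method body is an assumption about how source compatibility for custom HA services could be preserved, not the actual Flink code.

{code:java}
interface RunningJobsRegistry { /* existing registry methods elided */ }

interface JobResultStore { /* new store methods elided */ }

public interface HighAvailabilityServices {

    /** Kept for one release; deprecated in favor of {@link #getJobResultStore()}. */
    @Deprecated
    RunningJobsRegistry getRunningJobsRegistry();

    /**
     * The default implementation keeps existing custom HA services compiling;
     * it could be removed together with the deprecated method in 1.16.
     */
    default JobResultStore getJobResultStore() {
        throw new UnsupportedOperationException(
                "This HighAvailabilityServices implementation does not provide a JobResultStore yet.");
    }
}
{code}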

Apart from that, +1 for the FLIP.

Cheers,
Till

On Wed, Nov 17, 2021 at 6:05 PM David Morávek  wrote:

> Hi everyone,
>
> Matthias, Mika and I want to start a discussion about the introduction of a new
> Flink component, the *JobResultStore*.
>
> The main motivation is to address shortcomings of the *RunningJobsRegistry*
> and supersede it with the new component. These shortcomings were first
> described in FLINK-11813 [1].
>
> This change should improve the overall stability of the JobManager's
> components and address the race conditions in some of the failover
> scenarios during the job cleanup lifecycle.
>
> It should also help to ensure that Flink doesn't leave any uncleaned
> resources behind.
>
> We've prepared a FLIP-194 [2], which outlines the design and reasoning
> behind this new component.
>
> [1] https://issues.apache.org/jira/browse/FLINK-11813
> [2]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=195726435
>
> We're looking forward to your feedback ;)
>
> Best,
> Matthias, Mika and David
>


[jira] [Created] (FLINK-25082) RMQSource is not parallel

2021-11-26 Thread Mike Lynch (Jira)
Mike Lynch created FLINK-25082:
--

 Summary: RMQSource is not parallel
 Key: FLINK-25082
 URL: https://issues.apache.org/jira/browse/FLINK-25082
 Project: Flink
  Issue Type: Bug
  Components: Connectors / RabbitMQ
Reporter: Mike Lynch


The official 
[documentation|https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/rabbitmq/]
 states that {{RMQSource}} can provide at-least-once processing "when 
checkpointing is enabled, but correlation ids are not used or the source is 
parallel." However, as {{RMQSource}} does not implement 
{{ParallelSourceFunction}}, {{StreamExecutionEnvironment}} will throw an 
exception whenever its parallelism is set to a value higher than 1.

Either the documentation should be corrected or {{RMQSource}} should implement 
{{ParallelSourceFunction}}.

Implementing it seems like it should be fairly easy and I'd be willing to 
submit a patch, but as I'm new to the Flink code base I'm not sure if there 
would be unexpected side effects. If I can get feedback from more experienced 
contributors, I'll submit a patch in the near future.
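Since {{ParallelSourceFunction}} is only a marker interface on top of {{SourceFunction}}, a user-side sketch of the change could be as small as the following. The subclass name is made up, the constructor shown is one of {{RMQSource}}'s public constructors, and whether parallel instances interact correctly with checkpointed acknowledgements is exactly the open side-effect question above.

{code:java}
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;

// Hedged sketch: ParallelSourceFunction declares no methods, so opting in is
// a one-line change; correctness under parallelism still needs verification.
public class ParallelRMQSource<OUT> extends RMQSource<OUT>
        implements ParallelSourceFunction<OUT> {

    public ParallelRMQSource(
            RMQConnectionConfig rmqConnectionConfig,
            String queueName,
            boolean usesCorrelationId,
            DeserializationSchema<OUT> deserializationSchema) {
        super(rmqConnectionConfig, queueName, usesCorrelationId, deserializationSchema);
    }
}
{code}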



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25083) Flaky Test in RowTest

2021-11-26 Thread Minyang Ye (Jira)
Minyang Ye created FLINK-25083:
--

 Summary: Flaky Test in RowTest
 Key: FLINK-25083
 URL: https://issues.apache.org/jira/browse/FLINK-25083
 Project: Flink
  Issue Type: Improvement
  Components: API / Core
Reporter: Minyang Ye


The description of the {{Row}} class says that a Row has a deterministic field 
order. However, the tests "testRowNamed" and "testDeepToString" are flaky.
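For illustration, a minimal sketch of the kind of nondeterminism that can make such assertions flaky, assuming the named-field mode is backed by a map whose iteration order is not guaranteed (which is the open question of this report):

{code:java}
import org.apache.flink.types.Row;

public class RowOrderExample {

    public static void main(String[] args) {
        // Row.withNames() creates a row whose fields are addressed by name only.
        Row row = Row.withNames();
        row.setField("b", 2);
        row.setField("a", 1);

        // If the backing map does not guarantee a deterministic iteration
        // order, string renderings may differ between runs, so asserting on
        // an exact toString()/deepToString() value becomes flaky.
        System.out.println(row);
    }
}
{code}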



--
This message was sent by Atlassian Jira
(v8.20.1#820001)