Re: [DISCUSS] Enabling dynamic partition discovery by default in Kafka source

2023-01-13 Thread Martijn Visser
This sounds like a no-brainer. +1

Two things that seem to be obvious, but might be good to double check:

1. All newly discovered partitions will be consumed from the earliest
offset possible. That's how it's documented for version 1.12 [1], but not
for later versions, which is why I would like to double check. If so, I
think it would also be good to restore that in the documentation.

2. Let's assume the following running situation:

* Dynamic partition discovery is enabled, set to 30 secs
* Job runs
* Job checkpoints to checkpoint X
* New partitions are discovered and consumption from the new partitions is
starting
* Job crashes before new checkpoint has been created

My assumption is that the job restarts from checkpoint X, which includes
the old set of partitions. Either directly when the job restarts, or when
the next partition discovery runs, those new partitions (unknown to the
job) would be consumed. We still have exactly-once guarantees, since no
new checkpoint was made before the job crashed.
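
For reference, a minimal sketch of the setup described above (assuming the
KafkaSource API of recent releases; topic, brokers, and intervals are purely
illustrative):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DiscoveryExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint X is taken here; the job may crash before checkpoint X+1.
        env.enableCheckpointing(60_000L);

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("input-topic")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Dynamic partition discovery every 30 seconds; <= 0 disables it.
                .setProperty("partition.discovery.interval.ms", "30000")
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source").print();
        env.execute("discovery-example");
    }
}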

Best regards,

Martijn

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/kafka.html#kafka-consumers-topic-and-partition-discovery

On Fri, Jan 13, 2023 at 08:27, Leonard Xu wrote:

> Thanks Qingsheng for driving this, enable the dynamic partition discovery
> would be very useful for kafka topic scale partitions scenarios.
>
> +1 for the change.
>
> CC: Becket
>
>
> Best,
> Leonard
>
>
>
> > On Jan 13, 2023, at 3:15 PM, Jark Wu  wrote:
> >
> > +1 for the change. I think this is beneficial for users and is
> compatible.
> >
> > Best,
> > Jark
> >
> > On Fri, 13 Jan 2023 at 14:22, 何军  wrote:
> >
> >>>
> >>> +1 for this idea, we have enabled kafka dynamic partition discovery in
> >>> all jobs.
> >>>
> >>>
> >>
>
>


Re: [DISCUSS] Enabling dynamic partition discovery by default in Kafka source

2023-01-13 Thread John Roesler
Thanks for this proposal, Qingsheng!

If you want to be a little conservative with the default, 5 minutes might be 
better than 30 seconds.

The equivalent config in Kafka seems to be metadata.max.age.ms 
(https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms), 
which has a default value of 5 minutes.
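
For illustration, a sketch of aligning the two (property names taken from
kafka-clients and the Flink Kafka connector; values illustrative):

import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.kafka.clients.consumer.ConsumerConfig;

// 300000 ms = 5 minutes, the kafka-clients default for metadata.max.age.ms.
KafkaSourceBuilder<String> builder = KafkaSource.<String>builder();
builder.setProperty("partition.discovery.interval.ms", "300000");
builder.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "300000");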

Other than that, I’m in favor. I agree, this should be on by default.

Thanks again,
John

On Fri, Jan 13, 2023, at 08:26, Leonard Xu wrote:
> Thanks Qingsheng for driving this, enable the dynamic partition 
> discovery would be very useful for kafka topic scale partitions 
> scenarios.
>
> +1 for the change.
>
> CC: Becket 
>
>
> Best,
> Leonard 
>
>
>
>> On Jan 13, 2023, at 3:15 PM, Jark Wu  wrote:
>> 
>> +1 for the change. I think this is beneficial for users and is compatible.
>> 
>> Best,
>> Jark
>> 
>> On Fri, 13 Jan 2023 at 14:22, 何军  wrote:
>> 
 
>>>> +1 for this idea, we have enabled kafka dynamic partition discovery in
>>>> all jobs.
>>>


[jira] [Created] (FLINK-30675) Decompose printing logic from Executor

2023-01-13 Thread Shengkai Fang (Jira)
Shengkai Fang created FLINK-30675:
-

 Summary: Decompose printing logic from Executor
 Key: FLINK-30675
 URL: https://issues.apache.org/jira/browse/FLINK-30675
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Client
Affects Versions: 1.17.0
Reporter: Shengkai Fang
 Fix For: 1.17.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] Enabling dynamic partition discovery by default in Kafka source

2023-01-13 Thread Gabor Somogyi
+1 on the overall direction, it's an important feature.

I've had a look at the latest master and it looks like removed partition
handling is not yet added, but I think this is essential.

https://github.com/apache/flink/blob/28c3e1a3923ba560b559a216985c1abeb794ebaa/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L305

If a partition all of a sudden disappears then it could lead to data loss.
Are you planning to add it?
If yes then when?
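
To make the gap concrete, a hypothetical sketch (variable names invented,
not the actual enumerator code) of the check that appears to be missing:

import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;

// Compare a fresh metadata snapshot against the partitions already assigned.
Set<TopicPartition> removedPartitions = new HashSet<>(assignedPartitions);
removedPartitions.removeAll(fetchedPartitions);
if (!removedPartitions.isEmpty()) {
    // Not handled today: readers keep polling partitions that no longer
    // exist, and the pipeline may silently lose data.
}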

G


On Fri, Jan 13, 2023 at 9:22 AM John Roesler  wrote:

> Thanks for this proposal, Qingsheng!
>
> If you want to be a little conservative with the default, 5 minutes might
> be better than 30 seconds.
>
> The equivalent config in Kafka seems to be metadata.max.age.ms (
> https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms),
> which has a default value of 5 minutes.
>
> Other than that, I’m in favor. I agree, this should be on by default.
>
> Thanks again,
> John
>
> On Fri, Jan 13, 2023, at 08:26, Leonard Xu wrote:
> > Thanks Qingsheng for driving this, enable the dynamic partition
> > discovery would be very useful for kafka topic scale partitions
> > scenarios.
> >
> > +1 for the change.
> >
> > CC: Becket
> >
> >
> > Best,
> > Leonard
> >
> >
> >
> >> On Jan 13, 2023, at 3:15 PM, Jark Wu  wrote:
> >>
> >> +1 for the change. I think this is beneficial for users and is
> compatible.
> >>
> >> Best,
> >> Jark
> >>
> >> On Fri, 13 Jan 2023 at 14:22, 何军  wrote:
> >>
> >>>> +1 for this idea, we have enabled kafka dynamic partition discovery in
> >>>> all jobs.
> >>>
>


Re: [VOTE] FLIP-281: Sink Supports Speculative Execution For Batch Job

2023-01-13 Thread Biao Liu
Hi everyone,

I'm happy to announce that FLIP-281[1] has been accepted.

Thanks for all your feedback and votes. Here is the voting result:

+1 (binding), 3 in total:
- Zhu Zhu
- Lijie Wang
- Jing Zhang

+1 (non-binding), 2 in total:
- yuxia
- Jing Ge

+0 (binding), 1 in total:
- Martijn

There are no disapproving votes.

By the way, the discussion about deprecating SinkFunction can be continued in
the discussion thread[2].
I think it's more like an orthogonal issue. We might need more time to come
to an agreement about it.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-281+Sink+Supports+Speculative+Execution+For+Batch+Job
[2] https://lists.apache.org/thread/l05m6cf8fwkkbpnjtzbg9l2lo40oxzd1

Thanks,
Biao /'bɪ.aʊ/



On Fri, 13 Jan 2023 at 08:46, Jing Ge  wrote:

> + 1(not binding)
>
> Best Regards,
> Jing
>
> On Thu, Jan 12, 2023 at 10:01 AM Jing Zhang  wrote:
>
> > +1 (binding)
> >
> > Best,
> > Jing Zhang
> >
> > > Lijie Wang  wrote on Thu, Jan 12, 2023 at 16:39:
> >
> > > +1 (binding)
> > >
> > > Best,
> > > Lijie
> > >
> > > > Martijn Visser  wrote on Thu, Jan 12, 2023 at 15:56:
> > >
> > > > +0 (binding)
> > > >
> > > > Op di 10 jan. 2023 om 13:11 schreef yuxia <
> luoyu...@alumni.sjtu.edu.cn
> > >:
> > > >
> > > > > +1 (non-binding).
> > > > >
> > > > > Best regards,
> > > > > Yuxia
> > > > >
> > > > > - Original Message -
> > > > > From: "Zhu Zhu" 
> > > > > To: "dev" 
> > > > > Sent: Tuesday, January 10, 2023, 5:50:39 PM
> > > > > Subject: Re: [VOTE] FLIP-281: Sink Supports Speculative Execution For
> > Batch
> > > > Job
> > > > >
> > > > > +1 (binding)
> > > > >
> > > > > Thanks,
> > > > > Zhu
> > > > >
> > > > > Biao Liu  wrote on Thu, Jan 5, 2023 at 10:37:
> > > > > >
> > > > > > Hi Martijn,
> > > > > >
> > > > > > Sure, thanks for the reminder about the holiday period.
> > > > > > Looking forward to your feedback!
> > > > > >
> > > > > > Thanks,
> > > > > > Biao /'bɪ.aʊ/
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Thu, 5 Jan 2023 at 03:07, Martijn Visser <
> > > martijnvis...@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Biao,
> > > > > > >
> > > > > > > To be honest, I haven't read the FLIP yet since this is still a
> > > > holiday
> > > > > > > period in Europe. I would like to read it in the next few days.
> > Can
> > > > you
> > > > > > > keep the vote open a little longer?
> > > > > > >
> > > > > > > Best regards,
> > > > > > >
> > > > > > > Martijn
> > > > > > >
> > > > > > > On Wed, Jan 4, 2023 at 1:31 PM Biao Liu 
> > > wrote:
> > > > > > >
> > > > > > > > Hi everyone,
> > > > > > > >
> > > > > > > > Thanks for all the feedback!
> > > > > > > >
> > > > > > > > Based on the discussion[1], we seem to have a consensus. So
> I'd
> > > > like
> > > > > to
> > > > > > > > start a vote on FLIP-281: Sink Supports Speculative Execution
> > For
> > > > > Batch
> > > > > > > > Job[2]. The vote will last for 72 hours, unless there is an
> > > > > objection or
> > > > > > > > insufficient votes.
> > > > > > > >
> > > > > > > > [1]
> > > > https://lists.apache.org/thread/l05m6cf8fwkkbpnjtzbg9l2lo40oxzd1
> > > > > > > > [2]
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-281+Sink+Supports+Speculative+Execution+For+Batch+Job
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Biao /'bɪ.aʊ/
> > > > > > > >
> > > > > > >
> > > > >
> > > >
> > >
> >
>


Re: [VOTE] Apache Flink Table Store 0.3.0, release candidate #1

2023-01-13 Thread Jark Wu
+1 (binding)

- Build and compile the source code locally: *OK*
- Verified signatures and hashes: *OK*
- Checked no missing artifacts in the staging area: *OK*
- Reviewed the website release PR: *OK*
- Checked the licenses: *OK*
- Went through the quick start: *OK*
  * Verified with both flink 1.14.5 and 1.15.1
  * Verified web UI and log output, nothing unexpected

Best,
Jark

On Thu, 12 Jan 2023 at 20:52, Jingsong Li  wrote:

> Thanks Yu, I have created table-store-0.3.1 and move these jiras to 0.3.1.
>
> Best,
> Jingsong
>
> On Thu, Jan 12, 2023 at 7:41 PM Yu Li  wrote:
> >
> > Thanks for the quick action Jingsong! Here is my vote with the new
> staging
> > directory:
> >
> > +1 (binding)
> >
> >
> > - Checked release notes: *Action Required*
> >
> >   * The fix version of FLINK-30620 and FLINK-30628 are 0.3.0 but still
> > open, please confirm whether this should be included or we should move it
> > out of 0.3.0
> >
> > - Checked sums and signatures: *OK*
> >
> > - Checked the jars in the staging repo: *OK*
> >
> > - Checked source distribution doesn't include binaries: *OK*
> >
> > - Maven clean install from source: *OK*
> >
> > - Checked version consistency in pom files: *OK*
> >
> > - Went through the quick start: *OK*
> >
> >   * Verified with flink 1.14.6, 1.15.3 and 1.16.0
> >
> > - Checked the website updates: *OK*
> >
> > Best Regards,
> > Yu
> >
> >
> > On Thu, 12 Jan 2023 at 15:36, Jingsong Li 
> wrote:
> >
> > > Thanks Yu for your validation.
> > >
> > > I created a new staging directory [1]
> > >
> > > [1]
> > >
> https://repository.apache.org/content/repositories/orgapacheflink-1577/
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Thu, Jan 12, 2023 at 3:07 PM Yu Li  wrote:
> > > >
> > > > Hi Jingsong,
> > > >
> > > > It seems the given staging directory [1] is not exposed, could you
> double
> > > > check and republish if necessary? Thanks.
> > > >
> > > > Best Regards,
> > > > Yu
> > > >
> > > > [1]
> > >
> https://repository.apache.org/content/repositories/orgapacheflink-1576/
> > > >
> > > >
> > > > On Tue, 10 Jan 2023 at 16:53, Jingsong Li 
> > > wrote:
> > > >
> > > > > Hi everyone,
> > > > >
> > > > > Please review and vote on the release candidate #1 for the version
> > > > > 0.3.0 of Apache Flink Table Store, as follows:
> > > > >
> > > > > [ ] +1, Approve the release
> > > > > [ ] -1, Do not approve the release (please provide specific
> comments)
> > > > >
> > > > > **Release Overview**
> > > > >
> > > > > As an overview, the release consists of the following:
> > > > > a) Table Store canonical source distribution to be deployed to the
> > > > > release repository at dist.apache.org
> > > > > b) Table Store binary convenience releases to be deployed to the
> > > > > release repository at dist.apache.org
> > > > > c) Maven artifacts to be deployed to the Maven Central Repository
> > > > >
> > > > > **Staging Areas to Review**
> > > > >
> > > > > The staging areas containing the above mentioned artifacts are as
> > > follows,
> > > > > for your review:
> > > > > * All artifacts for a) and b) can be found in the corresponding dev
> > > > > repository at dist.apache.org [2]
> > > > > * All artifacts for c) can be found at the Apache Nexus Repository
> [3]
> > > > >
> > > > > All artifacts are signed with the key
> > > > > 2C2B6A653B07086B65E4369F7C76245E0A318150 [4]
> > > > >
> > > > > Other links for your review:
> > > > > * JIRA release notes [5]
> > > > > * source code tag "release-0.3.0-rc1" [6]
> > > > > * PR to update the website Downloads page to include Table Store
> links
> > > [7]
> > > > >
> > > > > **Vote Duration**
> > > > >
> > > > > The voting time will run for at least 72 hours.
> > > > > It is adopted by majority approval, with at least 3 PMC affirmative
> > > votes.
> > > > >
> > > > > Best,
> > > > > Jingsong Lee
> > > > >
> > > > > [1]
> > > > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Table+Store+Release
> > > > > [2]
> > > > >
> > >
> https://dist.apache.org/repos/dist/dev/flink/flink-table-store-0.3.0-rc1/
> > > > > [3]
> > > > >
> > >
> https://repository.apache.org/content/repositories/orgapacheflink-1576/
> > > > > [4] https://dist.apache.org/repos/dist/release/flink/KEYS
> > > > > [5]
> > > > >
> > >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352111
> > > > > [6]
> https://github.com/apache/flink-table-store/tree/release-0.3.0-rc1
> > > > > [7] https://github.com/apache/flink-web/pull/601
> > > > >
> > >
>


Re: [DISCUSS] Enabling dynamic partition discovery by default in Kafka source

2023-01-13 Thread Qingsheng Ren
Thanks everyone for joining the discussion!

@Martijn:

> All newly discovered partitions will be consumed from the earliest offset
possible.

Thanks for the reminder! I checked the logic of KafkaSource and found that
new partitions will start from the offset initializer specified by the user
instead of the earliest. We need to correct this behavior to avoid dropping
messages from new partitions.
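
A concrete example (a sketch assuming the current KafkaSourceBuilder API,
not code from the connector):

// The user configures the job to start from the latest offset:
KafkaSource<String> source = KafkaSource.<String>builder()
        .setStartingOffsets(OffsetsInitializer.latest())
        // ... other settings ...
        .build();

// Today latest() is also applied to partitions discovered later, so records
// produced to a new partition before its first discovery fetch are skipped.
// The fix is to always start newly discovered partitions from earliest.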

> Job restarts from checkpoint

I think the current logic guarantees the exactly-once semantics. New
partitions created after the checkpoint will be re-discovered and
picked up by the source.

@John:

> If you want to be a little conservative with the default, 5 minutes might
be better than 30 seconds.

Thanks for the suggestion! I tried to find the equivalent config in Kafka
but missed it. It would be neat to align with the default value of "
metadata.max.age.ms".

@Gabor:

> removed partition handling is not yet added

There was a detailed discussion about removing partitions [1] but it looks
like this is not an easy task considering the potential data loss and state
inconsistency. I'm afraid there's no clear plan on this one and maybe we
could trigger a new discussion thread about how to correctly handle removed
partitions.

[1] https://lists.apache.org/thread/7r4h7v5k281w9cnbfw9lb8tp56r30lwt

Best regards,
Qingsheng


On Fri, Jan 13, 2023 at 4:33 PM Gabor Somogyi 
wrote:

> +1 on the overall direction, it's an important feature.
>
> I've had a look on the latest master and looks like removed partition
> handling is not yet added but I think this is essential.
>
>
> https://github.com/apache/flink/blob/28c3e1a3923ba560b559a216985c1abeb794ebaa/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L305
>
> If a partition all of a sudden disappears then it could lead to data loss.
> Are you planning to add it?
> If yes then when?
>
> G
>
>
> On Fri, Jan 13, 2023 at 9:22 AM John Roesler  wrote:
>
> > Thanks for this proposal, Qingsheng!
> >
> > If you want to be a little conservative with the default, 5 minutes might
> > be better than 30 seconds.
> >
> > The equivalent config in Kafka seems to be metadata.max.age.ms (
> >
> https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms
> ),
> > which has a default value of 5 minutes.
> >
> > Other than that, I’m in favor. I agree, this should be on by default.
> >
> > Thanks again,
> > John
> >
> > On Fri, Jan 13, 2023, at 08:26, Leonard Xu wrote:
> > > Thanks Qingsheng for driving this, enable the dynamic partition
> > > discovery would be very useful for kafka topic scale partitions
> > > scenarios.
> > >
> > > +1 for the change.
> > >
> > > CC: Becket
> > >
> > >
> > > Best,
> > > Leonard
> > >
> > >
> > >
> > >> On Jan 13, 2023, at 3:15 PM, Jark Wu  wrote:
> > >>
> > >> +1 for the change. I think this is beneficial for users and is
> > compatible.
> > >>
> > >> Best,
> > >> Jark
> > >>
> > >> On Fri, 13 Jan 2023 at 14:22, 何军  wrote:
> > >>
> > 
> > >>>> +1 for this idea, we have enabled kafka dynamic partition discovery
> > >>>> in all jobs.
> > >>>
> >
>


[jira] [Created] (FLINK-30676) Introduce Data Structures for table store

2023-01-13 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-30676:


 Summary: Introduce Data Structures for table store
 Key: FLINK-30676
 URL: https://issues.apache.org/jira/browse/FLINK-30676
 Project: Flink
  Issue Type: Sub-task
  Components: Table Store
Reporter: Jingsong Lee
Assignee: Jingsong Lee
 Fix For: table-store-0.4.0


Copy data structures to table store from Flink.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30677) SqlGatewayServiceStatementITCase.testFlinkSqlStatements fails

2023-01-13 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-30677:
-

 Summary: SqlGatewayServiceStatementITCase.testFlinkSqlStatements 
fails
 Key: FLINK-30677
 URL: https://issues.apache.org/jira/browse/FLINK-30677
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Gateway
Affects Versions: 1.17.0
Reporter: Matthias Pohl


We're observing a test instability with 
{{SqlGatewayServiceStatementITCase.testFlinkSqlStatements}} in the following 
build:

[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44775&view=logs&j=a9db68b9-a7e0-54b6-0f98-010e0aff39e2&t=cdd32e0b-6047-565b-c58f-14054472f1be&l=14251]
{code:java}
Jan 13 02:46:10 [ERROR] Tests run: 9, Failures: 1, Errors: 0, Skipped: 0, Time 
elapsed: 27.279 s <<< FAILURE! - in 
org.apache.flink.table.gateway.service.SqlGatewayServiceStatementITCase
Jan 13 02:46:10 [ERROR] 
org.apache.flink.table.gateway.service.SqlGatewayServiceStatementITCase.testFlinkSqlStatements(String)[5]
  Time elapsed: 1.573 s  <<< FAILURE!
Jan 13 02:46:10 org.opentest4j.AssertionFailedError: 
Jan 13 02:46:10 
Jan 13 02:46:10 expected: 
Jan 13 02:46:10   "# table.q - CREATE/DROP/SHOW/ALTER/DESCRIBE TABLE
Jan 13 02:46:10   #
Jan 13 02:46:10   # Licensed to the Apache Software Foundation (ASF) under one 
or more
Jan 13 02:46:10   # contributor license agreements.  See the NOTICE file 
distributed with
Jan 13 02:46:10   # this work for additional information regarding copyright 
ownership.
Jan 13 02:46:10   # The ASF licenses this file to you under the Apache License, 
Version 2.0
Jan 13 02:46:10   # (the "License"); you may not use this file except in 
compliance with
Jan 13 02:46:10   # the License.  You may obtain a copy of the License at
Jan 13 02:46:10   #
Jan 13 02:46:10   # http://www.apache.org/licenses/LICENSE-2.0
Jan 13 02:46:10   #
Jan 13 02:46:10   # Unless required by applicable law or agreed to in writing, 
software
Jan 13 02:46:10   # distributed under the License is distributed on an "AS IS" 
BASIS,
Jan 13 02:46:10   # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied.
Jan 13 02:46:10   # See the License for the specific language governing 
permissions and
Jan 13 02:46:10   # limitations under the License.
[...] {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30678) NettyConnectionManagerTest.testManualConfiguration appears to be unstable

2023-01-13 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-30678:
-

 Summary: NettyConnectionManagerTest.testManualConfiguration 
appears to be unstable
 Key: FLINK-30678
 URL: https://issues.apache.org/jira/browse/FLINK-30678
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.16.0
Reporter: Matthias Pohl


We observe a test instability due to an "Address already in use" issue:
{code:java}
Jan 13 02:29:43 [ERROR] 
org.apache.flink.runtime.io.network.netty.NettyConnectionManagerTest.testManualConfiguration
  Time elapsed: 0.132 s  <<< ERROR!
Jan 13 02:29:43 
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: 
bind(..) failed: Address already in use
Jan 13 02:29:43{code}
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44777&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=24c3384f-1bcb-57b3-224f-51bf973bbee8&l=6744



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-286: Fix the API stability/scope annotation inconsistency in AbstractStreamOperator

2023-01-13 Thread Dawid Wysakowicz

Hi Becket,

May I ask what is the motivation for the change?

I'm really skeptical about making any of those classes `Public` or 
`PublicEvolving`. As far as I am concerned there is no agreement in the 
community if StreamOperator is part of the `Public(Evolving)` API. At 
least I think it should not. I understand `AbstractStreamOperator` was 
marked with `PublicEvolving`, but I am really not convinced it was the 
right decision.


The listed classes are not the only `Internal` classes exposed through
`AbstractStreamOperator` that break the consistency, and I am sure there
is no question that those should remain `Internal`, such as StreamTask,
StreamConfig, StreamingRuntimeContext and many more.


As it stands I am strongly against giving any additional guarantees on 
API stability to the classes there unless there is a good motivation for 
a given feature and we're sure this is the best way to go forward.


Thus I'm inclined to go with -1 on any proposal on changing annotations 
for the sake of matching the one on `AbstractStreamOperator`. I might be 
convinced to support requests to give better guarantees for well 
motivated features that are now internal though.


Best,

Dawid

On 12/01/2023 10:20, Becket Qin wrote:

Hi flink devs,

I'd like to start a discussion thread for FLIP-286[1].

As a recap, currently while AbstractStreamOperator is a class marked as
@PublicEvolving, some classes exposed via its methods / fields are
marked as @Internal. This FLIP wants to fix this inconsistency of
stability / scope annotation.

Comments are welcome!

Thanks,

Jiangjie (Becket) Qin

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880841





Re: [DISCUSS] FLIP-276: Data Consistency of Streaming and Batch ETL in Flink and Table Store

2023-01-13 Thread Shammon FY
Hi Piotr,

I discussed `Timestamp Barrier` and `Aligned Checkpoint` for data
consistency in the FLIP with @jinsong lee; we think there are indeed many
defects in using `Aligned Checkpoint` to support data consistency, as you
mentioned.

According to our historical discussion, I think we have reached an
agreement on an important point: we ultimately need a `Timestamp Barrier`
mechanism to support data consistency. But in our (@jinsong lee's and my)
opinion, the total design and implementation based on `Timestamp Barrier`
will be too complex, and also too big for one FLIP.

So we'd like to use FLIP-276[1] as an overview design of data consistency
in Flink Streaming and Batch ETL based on `Timestamp Barrier`. @jinsong and
I hope that we can reach an agreement on the overall design in FLIP-276
first, and then on the basis of FLIP-276 we can create other FLIPs with
detailed designs per module and drive them. Finally, we can support data
consistency based on timestamps in Flink.

I have updated FLIP-276, deleted the Checkpoint section, and added the
overall design of `Timestamp Barrier`. Here I briefly describe the modules
of `Timestamp Barrier` as follows:
1. Generation: JobManager must coordinate all source subtasks and generate
a unified timestamp barrier from System Time or Event Time for them
2. Checkpoint: Store the <checkpoint, timestamp barrier> mapping when the
timestamp barrier is generated, so that the job can recover the same
timestamp barrier for the uncompleted checkpoint.
3. Replay data: Store the <timestamp barrier, offset> mapping for the source
when it broadcasts a timestamp barrier, so that the source can replay the
same data according to the same timestamp barrier.
4. Align data: Align data for stateful operators (aggregation, join, etc.)
and temporal operators (window)
5. Computation: Operator computation for a specific timestamp barrier, based
on the results of the previous timestamp barrier.
6. Output: Operators output or commit results once they have collected all
the timestamp barriers, including operators with data buffers or async
operations.
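
To make steps 4 and 6 concrete, an illustrative pseudocode sketch (no such
class exists in Flink today):

import java.util.HashSet;
import java.util.Set;

// Illustrative only: an operator aligns a timestamp barrier across all of
// its input channels, then outputs/commits the results for that timestamp.
class TimestampBarrierAligner {
    private final Set<Integer> channelsSeen = new HashSet<>();
    private final int numChannels;

    TimestampBarrierAligner(int numChannels) {
        this.numChannels = numChannels;
    }

    // Called when the barrier with timestamp barrierTs arrives on a channel.
    void onTimestampBarrier(int channel, long barrierTs, Runnable emitUpToBarrier) {
        channelsSeen.add(channel);
        if (channelsSeen.size() == numChannels) { // seen on every input
            emitUpToBarrier.run(); // step 6: output/commit results for barrierTs
            channelsSeen.clear();  // ready for the next barrier
        }
    }
}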

I also list the main work in Flink and Table Store in FLIP-276. Please help
to review the FLIP when you're free and feel free to give any comments.

Looking forward to your feedback, THX

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-276%3A+Data+Consistency+of+Streaming+and+Batch+ETL+in+Flink+and+Table+Store

Best,
Shammon


On Tue, Dec 20, 2022 at 10:01 AM Shammon FY  wrote:

> Hi Piotr,
>
> Thanks for your syncing. I will update the FLIP later and keep this
> discussion open. Looking forward to your feedback, thanks
>
>
> Best,
> Shammon
>
>
> On Mon, Dec 19, 2022 at 10:45 PM Piotr Nowojski 
> wrote:
>
>> Hi Shammon,
>>
>> I've tried to sync with Timo, David Moravek and Dawid Wysakowicz about
>> this
>> subject. We have only briefly chatted and exchanged some thoughts/ideas,
>> but unfortunately we were not able to finish the discussions before the
>> holiday season/vacations. Can we get back to this topic in January?
>>
>> Best,
>> Piotrek
>>
>> On Fri, 16 Dec 2022 at 10:53, Shammon FY wrote:
>>
>> > Hi Piotr,
>> >
>> > I found there may be several points in our discussion, it will cause
>> > misunderstanding between us when we focus on different one. I list each
>> > point in our discussion as follows
>> >
>> > > Point 1: Is "Aligned Checkpoint" the only mechanism to guarantee data
>> > consistency in the current Flink implementation, and "Watermark" and
>> > "Aligned Checkpoint cannot do that?
>> > My answer is "Yes", the "Aligned Checkpoint" is the only one due to its
>> > "Align Data" ability, we can do it in the first stage.
>> >
>> > > Point2: Can the combination of "Checkpoint Barrier" and "Watermark"
>> > support the complete consistency semantics based on "Timestamp" in the
>> > current Flink implementation?
>> > My answer is "No", we need a new "Timestamp Barrier" mechanism to do
>> that
>> > which may be upgraded from current "Watermark" or a new mechanism, we
>> can
>> > do it in the next second or third stage.
>> >
>> > > Point3: Are the "Checkpoint" and the new "Timestamp Barrier"
>> completely
>> > independent? The "Checkpoint" whatever "Aligned" or "Unaligned" or "Task
>> > Local" supports the "Exactly-Once" between ETLs, and the "Timestamp
>> > Barrier" mechanism guarantees data consistency between tables according
>> to
>> > timestamp for queries.
>> > My answer is "Yes", I totally agree with you. Let "Checkpoint" be
>> > responsible for fault tolerance and "Timestamp Barrier" for consistency
>> > independently.
>> >
>> > @Piotr, What do you think? If I am missing or misunderstanding anything,
>> > please correct me, thanks
>> >
>> > Best,
>> > Shammon
>> >
>> > On Fri, Dec 16, 2022 at 4:17 PM Piotr Nowojski 
>> > wrote:
>> >
>> > > Hi Shammon,
>> > >
>> > > > I don't think we can combine watermarks and checkpoint barriers
>> > together
>> > > to
>> > > > guarantee data consistency. There will be a "Timestamp Barrier" in
>> our
>> > > > system to "commit data", "single etl failover", "low lat

[jira] [Created] (FLINK-30679) Can not load the data of hive dim table when project-push-down is introduced

2023-01-13 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-30679:
-

 Summary: Can not load the data of hive dim table when 
project-push-down is introduced
 Key: FLINK-30679
 URL: https://issues.apache.org/jira/browse/FLINK-30679
 Project: Flink
  Issue Type: Bug
Reporter: hehuiyuan


Vectorized read:

 
{code:java}
java.lang.ArrayIndexOutOfBoundsException: 3
    at 
org.apache.flink.connectors.hive.read.HiveTableInputFormat.useOrcVectorizedRead(HiveTableInputFormat.java:276)
 ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
    at 
org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:129)
 ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
    at 
org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86)
 ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
    at 
org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132)
 ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
    at 
org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105)
 ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
    at LookupFunction$26.flatMap(Unknown Source) ~[?:?] {code}
 

 

MapReduce read:

 
{code:java}
java.lang.ArrayIndexOutOfBoundsException: 3
    at 
org.apache.flink.connectors.hive.read.HiveMapredSplitReader.lambda$new$0(HiveMapredSplitReader.java:139)
 ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
    at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250) 
~[?:1.8.0_301]
    at 
java.util.Spliterators$IntArraySpliterator.forEachRemaining(Spliterators.java:1032)
 ~[?:1.8.0_301]
    at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693) 
~[?:1.8.0_301]
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) 
~[?:1.8.0_301]
    at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) 
~[?:1.8.0_301]
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546) 
~[?:1.8.0_301]
    at 
java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
 ~[?:1.8.0_301]
    at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438) 
~[?:1.8.0_301]
    at 
org.apache.flink.connectors.hive.read.HiveMapredSplitReader.(HiveMapredSplitReader.java:141)
 ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
    at 
org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:157)
 ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
    at 
org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86)
 ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
    at 
org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132)
 ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
    at 
org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105)
 ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
    at LookupFunction$26.flatMap(Unknown Source) ~[?:?]
    at 
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:81)
 ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
    at 
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:34)
 ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] {code}
 

 

The SQL:

 
{code:java}
CREATE TABLE kafkaTableSource (
name string,
age int,
sex string,
address string,
ptime AS PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'hehuiyuan1',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.client.id' = 'test-consumer-group',
'properties.group.id' = 'test-consumer-group',
'format' = 'csv'
);

CREATE TABLE printsink (
name string,
age int,
sex string,
address string,
score bigint,
dt string
) WITH (
'connector' = 'print'
);

CREATE CATALOG myhive
WITH (
'type' = 'hive',
'default-database' = 'hhy',
'hive-version' = '2.0.0',
'hadoop-conf-dir'='/Users/hehuiyuan/soft/hadoop/hadoop-2.7.3/etc/hadoop'
);

USE CATALOG myhive;
USE hhy;

set table.sql-dialect=hive;
CREATE TABLE IF NOT EXISTS tmp_flink_test_text (
name STRING,
age INT,
score BIGINT
) PARTITIONED BY (dt STRING) STORED AS TEXTFILE TBLPROPERTIES (
'streaming-source.enable' = 'false',
'streaming-source.partition.include' = 'all',
'lookup.join.cache.ttl' = '5 min'
);
set table.sql-dialect=default;

USE CATALOG default_catalog;
INSERT INTO default_catalog.default_database.printsink
SELECT s.name, s.age, s.sex, s.address, r.score, r.dt
FROM default_catalog.default_database.kafkaTableSource  as s
JOIN myhive.hhy.tmp_flink

[DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

2023-01-13 Thread Joao Boto
Hi flink devs,

I'd like to start a discussion thread for FLIP-287[1].
This comes from an offline discussion with @Lijie Wang about FLIP-239[2],
specifically for the sink[3].

Basically, the idea is to expose the ExecutionConfig and JobId on
SinkV2#InitContext. These changes are necessary to correctly migrate the
current sinks to SinkV2, like JdbcSink, KafkaTableSink and so on, which
rely on RuntimeContext.
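
A hedged sketch of what the addition could look like (method names are my
assumption here, not final):

// Proposed additions to org.apache.flink.api.connector.sink2.Sink.InitContext:
public interface InitContext {
    // ... existing methods such as metricGroup() and getUserCodeClassLoader() ...

    // Read access to the job's ExecutionConfig, e.g. isObjectReuseEnabled().
    ExecutionConfig getExecutionConfig();

    // The ID of the running job.
    JobID getJobId();
}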

Comments are welcome!
Thanks,

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853
[2]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217386271
[3] https://issues.apache.org/jira/browse/FLINK-25421


Re: [DISCUSS] Enabling dynamic partition discovery by default in Kafka source

2023-01-13 Thread Jing Ge
+1 for the proposal that makes users' daily work easier and therefore makes
Flink more attractive.

Best regards,
Jing


On Fri, Jan 13, 2023 at 11:27 AM Qingsheng Ren  wrote:

> Thanks everyone for joining the discussion!
>
> @Martijn:
>
> > All newly discovered partitions will be consumed from the earliest offset
> possible.
>
> Thanks for the reminder! I checked the logic of KafkaSource and found that
> new partitions will start from the offset initializer specified by the user
> instead of the earliest. We need to correct this behavior to avoid dropping
> messages from new partitions.
>
> > Job restarts from checkpoint
>
> I think the current logic guarantees the exactly-once semantic. New
> partitions created after the checkpoint will be re-discovered again and
> picked up by the source.
>
> @John:
>
> > If you want to be a little conservative with the default, 5 minutes might
> be better than 30 seconds.
>
> Thanks for the suggestion! I tried to find the equivalent config in Kafka
> but missed it. It would be neat to align with the default value of "
> metadata.max.age.ms".
>
> @Gabor:
>
> > removed partition handling is not yet added
>
> There was a detailed discussion about removing partitions [1] but it looks
> like this is not an easy task considering the potential data loss and state
> inconsistency. I'm afraid there's no clear plan on this one and maybe we
> could trigger a new discussion thread about how to correctly handle removed
> partitions.
>
> [1] https://lists.apache.org/thread/7r4h7v5k281w9cnbfw9lb8tp56r30lwt
>
> Best regards,
> Qingsheng
>
>
> On Fri, Jan 13, 2023 at 4:33 PM Gabor Somogyi 
> wrote:
>
> > +1 on the overall direction, it's an important feature.
> >
> > I've had a look on the latest master and looks like removed partition
> > handling is not yet added but I think this is essential.
> >
> >
> >
> https://github.com/apache/flink/blob/28c3e1a3923ba560b559a216985c1abeb794ebaa/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L305
> >
> > If a partition all of a sudden disappears then it could lead to data
> loss.
> > Are you planning to add it?
> > If yes then when?
> >
> > G
> >
> >
> > On Fri, Jan 13, 2023 at 9:22 AM John Roesler 
> wrote:
> >
> > > Thanks for this proposal, Qingsheng!
> > >
> > > If you want to be a little conservative with the default, 5 minutes
> might
> > > be better than 30 seconds.
> > >
> > > The equivalent config in Kafka seems to be metadata.max.age.ms (
> > >
> >
> https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms
> > ),
> > > which has a default value of 5 minutes.
> > >
> > > Other than that, I’m in favor. I agree, this should be on by default.
> > >
> > > Thanks again,
> > > John
> > >
> > > On Fri, Jan 13, 2023, at 08:26, Leonard Xu wrote:
> > > > Thanks Qingsheng for driving this, enable the dynamic partition
> > > > discovery would be very useful for kafka topic scale partitions
> > > > scenarios.
> > > >
> > > > +1 for the change.
> > > >
> > > > CC: Becket
> > > >
> > > >
> > > > Best,
> > > > Leonard
> > > >
> > > >
> > > >
> > > >> On Jan 13, 2023, at 3:15 PM, Jark Wu  wrote:
> > > >>
> > > >> +1 for the change. I think this is beneficial for users and is
> > > compatible.
> > > >>
> > > >> Best,
> > > >> Jark
> > > >>
> > > >> On Fri, 13 Jan 2023 at 14:22, 何军  wrote:
> > > >>
> > > 
> > > >>>> +1 for this idea, we have enabled kafka dynamic partition
> > > >>>> discovery in all jobs.
> > > >>>
> > >
> >
>


[jira] [Created] (FLINK-30680) Consider using the autoscaler to detect slow taskmanagers

2023-01-13 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-30680:
--

 Summary: Consider using the autoscaler to detect slow taskmanagers
 Key: FLINK-30680
 URL: https://issues.apache.org/jira/browse/FLINK-30680
 Project: Flink
  Issue Type: New Feature
  Components: Autoscaler, Kubernetes Operator
Reporter: Gyula Fora


We could leverage logic in the autoscaler to detect slow taskmanagers by 
comparing the per-record processing times between them.

If we notice that all subtasks on a single TM are considerably slower than the 
rest (at similar input rates) we should try simply restarting the job instead 
of scaling it up.
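
A rough sketch of the comparison (all names hypothetical):
{code:java}
// Flag a TM whose subtasks' average processing time per record is far above
// the median across TMs at comparable input rates.
static boolean isSlowTaskManager(double tmAvgTimePerRecordMs,
                                 double medianAcrossTmsMs,
                                 double slownessFactor) {
    return tmAvgTimePerRecordMs > slownessFactor * medianAcrossTmsMs;
}
{code}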



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] Apache Flink Table Store 0.3.0, release candidate #1

2023-01-13 Thread Jingsong Li
+1 (binding)

Best,
Jingsong

On Fri, Jan 13, 2023 at 5:16 PM Jark Wu  wrote:
>
> +1 (binding)
>
> - Build and compile the source code locally: *OK*
> - Verified signatures and hashes: *OK*
> - Checked no missing artifacts in the staging area: *OK*
> - Reviewed the website release PR: *OK*
> - Checked the licenses: *OK*
> - Went through the quick start: *OK*
>   * Verified with both flink 1.14.5 and 1.15.1
>   * Verified web UI and log output, nothing unexpected
>
> Best,
> Jark
>
> On Thu, 12 Jan 2023 at 20:52, Jingsong Li  wrote:
>
> > Thanks Yu, I have created table-store-0.3.1 and move these jiras to 0.3.1.
> >
> > Best,
> > Jingsong
> >
> > On Thu, Jan 12, 2023 at 7:41 PM Yu Li  wrote:
> > >
> > > Thanks for the quick action Jingsong! Here is my vote with the new
> > staging
> > > directory:
> > >
> > > +1 (binding)
> > >
> > >
> > > - Checked release notes: *Action Required*
> > >
> > >   * The fix version of FLINK-30620 and FLINK-30628 are 0.3.0 but still
> > > open, please confirm whether this should be included or we should move it
> > > out of 0.3.0
> > >
> > > - Checked sums and signatures: *OK*
> > >
> > > - Checked the jars in the staging repo: *OK*
> > >
> > > - Checked source distribution doesn't include binaries: *OK*
> > >
> > > - Maven clean install from source: *OK*
> > >
> > > - Checked version consistency in pom files: *OK*
> > >
> > > - Went through the quick start: *OK*
> > >
> > >   * Verified with flink 1.14.6, 1.15.3 and 1.16.0
> > >
> > > - Checked the website updates: *OK*
> > >
> > > Best Regards,
> > > Yu
> > >
> > >
> > > On Thu, 12 Jan 2023 at 15:36, Jingsong Li 
> > wrote:
> > >
> > > > Thanks Yu for your validation.
> > > >
> > > > I created a new staging directory [1]
> > > >
> > > > [1]
> > > >
> > https://repository.apache.org/content/repositories/orgapacheflink-1577/
> > > >
> > > > Best,
> > > > Jingsong
> > > >
> > > > On Thu, Jan 12, 2023 at 3:07 PM Yu Li  wrote:
> > > > >
> > > > > Hi Jingsong,
> > > > >
> > > > > It seems the given staging directory [1] is not exposed, could you
> > double
> > > > > check and republish if necessary? Thanks.
> > > > >
> > > > > Best Regards,
> > > > > Yu
> > > > >
> > > > > [1]
> > > >
> > https://repository.apache.org/content/repositories/orgapacheflink-1576/
> > > > >
> > > > >
> > > > > On Tue, 10 Jan 2023 at 16:53, Jingsong Li 
> > > > wrote:
> > > > >
> > > > > > Hi everyone,
> > > > > >
> > > > > > Please review and vote on the release candidate #1 for the version
> > > > > > 0.3.0 of Apache Flink Table Store, as follows:
> > > > > >
> > > > > > [ ] +1, Approve the release
> > > > > > [ ] -1, Do not approve the release (please provide specific
> > comments)
> > > > > >
> > > > > > **Release Overview**
> > > > > >
> > > > > > As an overview, the release consists of the following:
> > > > > > a) Table Store canonical source distribution to be deployed to the
> > > > > > release repository at dist.apache.org
> > > > > > b) Table Store binary convenience releases to be deployed to the
> > > > > > release repository at dist.apache.org
> > > > > > c) Maven artifacts to be deployed to the Maven Central Repository
> > > > > >
> > > > > > **Staging Areas to Review**
> > > > > >
> > > > > > The staging areas containing the above mentioned artifacts are as
> > > > follows,
> > > > > > for your review:
> > > > > > * All artifacts for a) and b) can be found in the corresponding dev
> > > > > > repository at dist.apache.org [2]
> > > > > > * All artifacts for c) can be found at the Apache Nexus Repository
> > [3]
> > > > > >
> > > > > > All artifacts are signed with the key
> > > > > > 2C2B6A653B07086B65E4369F7C76245E0A318150 [4]
> > > > > >
> > > > > > Other links for your review:
> > > > > > * JIRA release notes [5]
> > > > > > * source code tag "release-0.3.0-rc1" [6]
> > > > > > * PR to update the website Downloads page to include Table Store
> > links
> > > > [7]
> > > > > >
> > > > > > **Vote Duration**
> > > > > >
> > > > > > The voting time will run for at least 72 hours.
> > > > > > It is adopted by majority approval, with at least 3 PMC affirmative
> > > > votes.
> > > > > >
> > > > > > Best,
> > > > > > Jingsong Lee
> > > > > >
> > > > > > [1]
> > > > > >
> > > >
> > https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Table+Store+Release
> > > > > > [2]
> > > > > >
> > > >
> > https://dist.apache.org/repos/dist/dev/flink/flink-table-store-0.3.0-rc1/
> > > > > > [3]
> > > > > >
> > > >
> > https://repository.apache.org/content/repositories/orgapacheflink-1576/
> > > > > > [4] https://dist.apache.org/repos/dist/release/flink/KEYS
> > > > > > [5]
> > > > > >
> > > >
> > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352111
> > > > > > [6]
> > https://github.com/apache/flink-table-store/tree/release-0.3.0-rc1
> > > > > > [7] https://github.com/apache/flink-web/pull/601
> > > > > >
> > > >
> >


[RESULT][VOTE] Apache Flink Table Store 0.3.0, release candidate #1

2023-01-13 Thread Jingsong Li
I'm happy to announce that we have unanimously approved this release.

There are 3 approving votes, 3 of which are binding:
* Yu Li (binding)
* Jark Wu (binding)
* Jingsong Lee (binding)

There are no disapproving votes.

Thank you for verifying the release candidate. I will now proceed to
finalize the release and announce it once everything is published.

Best,
Jingsong


Re: [DISCUSS] Enabling dynamic partition discovery by default in Kafka source

2023-01-13 Thread Benchao Li
+1, we've enabled this by default (10mins) in our production for years.

Jing Ge  wrote on Fri, Jan 13, 2023 at 22:22:

> +1 for the proposal that makes users' daily work easier and therefore makes
> Flink more attractive.
>
> Best regards,
> Jing
>
>
> On Fri, Jan 13, 2023 at 11:27 AM Qingsheng Ren  wrote:
>
> > Thanks everyone for joining the discussion!
> >
> > @Martijn:
> >
> > > All newly discovered partitions will be consumed from the earliest
> offset
> > possible.
> >
> > Thanks for the reminder! I checked the logic of KafkaSource and found
> that
> > new partitions will start from the offset initializer specified by the
> user
> > instead of the earliest. We need to correct this behavior to avoid
> dropping
> > messages from new partitions.
> >
> > > Job restarts from checkpoint
> >
> > I think the current logic guarantees the exactly-once semantic. New
> > partitions created after the checkpoint will be re-discovered again and
> > picked up by the source.
> >
> > @John:
> >
> > > If you want to be a little conservative with the default, 5 minutes
> might
> > be better than 30 seconds.
> >
> > Thanks for the suggestion! I tried to find the equivalent config in Kafka
> > but missed it. It would be neat to align with the default value of "
> > metadata.max.age.ms".
> >
> > @Gabor:
> >
> > > removed partition handling is not yet added
> >
> > There was a detailed discussion about removing partitions [1] but it
> looks
> > like this is not an easy task considering the potential data loss and
> state
> > inconsistency. I'm afraid there's no clear plan on this one and maybe we
> > could trigger a new discussion thread about how to correctly handle
> removed
> > partitions.
> >
> > [1] https://lists.apache.org/thread/7r4h7v5k281w9cnbfw9lb8tp56r30lwt
> >
> > Best regards,
> > Qingsheng
> >
> >
> > On Fri, Jan 13, 2023 at 4:33 PM Gabor Somogyi  >
> > wrote:
> >
> > > +1 on the overall direction, it's an important feature.
> > >
> > > I've had a look on the latest master and looks like removed partition
> > > handling is not yet added but I think this is essential.
> > >
> > >
> > >
> >
> https://github.com/apache/flink/blob/28c3e1a3923ba560b559a216985c1abeb794ebaa/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L305
> > >
> > > If a partition all of a sudden disappears then it could lead to data
> > loss.
> > > Are you planning to add it?
> > > If yes then when?
> > >
> > > G
> > >
> > >
> > > On Fri, Jan 13, 2023 at 9:22 AM John Roesler 
> > wrote:
> > >
> > > > Thanks for this proposal, Qingsheng!
> > > >
> > > > If you want to be a little conservative with the default, 5 minutes
> > might
> > > > be better than 30 seconds.
> > > >
> > > > The equivalent config in Kafka seems to be metadata.max.age.ms (
> > > >
> > >
> >
> https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms
> > > ),
> > > > which has a default value of 5 minutes.
> > > >
> > > > Other than that, I’m in favor. I agree, this should be on by default.
> > > >
> > > > Thanks again,
> > > > John
> > > >
> > > > On Fri, Jan 13, 2023, at 08:26, Leonard Xu wrote:
> > > > > Thanks Qingsheng for driving this, enable the dynamic partition
> > > > > discovery would be very useful for kafka topic scale partitions
> > > > > scenarios.
> > > > >
> > > > > +1 for the change.
> > > > >
> > > > > CC: Becket
> > > > >
> > > > >
> > > > > Best,
> > > > > Leonard
> > > > >
> > > > >
> > > > >
> > > > >> On Jan 13, 2023, at 3:15 PM, Jark Wu  wrote:
> > > > >>
> > > > >> +1 for the change. I think this is beneficial for users and is
> > > > compatible.
> > > > >>
> > > > >> Best,
> > > > >> Jark
> > > > >>
> > > > >> On Fri, 13 Jan 2023 at 14:22, 何军  wrote:
> > > > >>
> > > > 
> > > > >>>> +1 for this idea, we have enabled kafka dynamic partition
> > > > >>>> discovery in all jobs.
> > > > >>>
> > > >
> > >
> >
>


-- 

Best,
Benchao Li


[ANNOUNCE] Apache Flink Table Store 0.3.0 released

2023-01-13 Thread Jingsong Li
The Apache Flink community is very happy to announce the release of
Apache Flink Table Store 0.3.0.

Apache Flink Table Store is a unified storage to build dynamic tables
for both streaming and batch processing in Flink, supporting
high-speed data ingestion and timely data query.

Please check out the release blog post for an overview of the release:
https://flink.apache.org/news/2023/01/13/release-table-store-0.3.0.html

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Flink Table Store can be found at:
https://central.sonatype.dev/search?q=flink-table-store

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352111

We would like to thank all contributors of the Apache Flink community
who made this release possible!

Best,
Jingsong Lee


Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

2023-01-13 Thread Gunnar Morling
Hey Joao,

Thanks for this FLIP! One question on the proposed interface changes:
is it expected that the configuration is *mutated* via the InitContext
passed to Sink::createWriter()? If that's not the case, how about
establishing a read-only contract representing the current
configuration and passing in that one instead? That would probably
deserve its own FLIP upon which yours here then would depend. Later
on, other contracts which effectively shouldn't modify a config could
use that one, too.

Note I don't mean to stall your efforts here, but I thought it'd be a
good idea to bring it up and gauge the general interest in this.

Best,

--Gunnar

Am Fr., 13. Jan. 2023 um 15:17 Uhr schrieb Joao Boto :
>
> Hi flink devs,
>
> I'd like to start a discussion thread for FLIP-287[1].
> This comes from an offline discussion with @Lijie Wang, from FLIP-239[2]
> specially for the sink[3].
>
> Basically to expose the ExecutionConfig and JobId on SinkV2#InitContext.
> This  changes are necessary to correct migrate the current sinks to SinkV2
> like JdbcSink, KafkaTableSink and so on, that relies on RuntimeContext
>
> Comments are welcome!
> Thanks,
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853
> [2]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217386271
> [3] https://issues.apache.org/jira/browse/FLINK-25421


Re: [DISCUSS] FLIP-286: Fix the API stability/scope annotation inconsistency in AbstractStreamOperator

2023-01-13 Thread Becket Qin
Hi Dawid,

Thanks for the reply. I am currently looking at the Beam Flink runner, and
there are quite some hacks the Beam runner has to do in order to deal with
the backwards incompatible changes in AbstractStreamOperator and some of
the classes exposed by it. Regardless of what we think, the fact is that
AbstractStreamOperator is marked as PublicEvolving today, and our users use
it. It is a basic rule of public API that anything exposed by a public
interface should also be public. This is the direct motivation of this FLIP.

Regarding the StreamTask / StreamConfig exposure, if you look at the
StreamOperatorFactory which is also a PublicEvolving class, it actually
exposes the StreamTask, StreamConfig as well as some other classes in the
StreamOperatorParameters. So these classes are already exposed in multiple
public APIs.

Keeping our public API stability guarantee is really fundamental and
critical to the users. With the current status of inconsistent API
stability annotations, I don't see how we can assure that. From what I
can see, accidental backwards-incompatible changes are likely to keep
happening. So I'd say let's see how to fix this going forward instead of
doing nothing.

BTW, initially I thought this was just an accidental mismatch, but after
further examination, it looks like a bigger issue. I guess one of the
reasons we ended up in this situation is that we haven't really thought
through the boundary between framework and user space, i.e. which
framework primitives we want to expose to the users. So instead we just
expose a bunch of internal things and hope users only use the "stable"
part of them.

Thanks,

Jiangjie (Becket) Qin


On Fri, Jan 13, 2023 at 7:28 PM Dawid Wysakowicz 
wrote:

> Hi Becket,
>
> May I ask what is the motivation for the change?
>
> I'm really skeptical about making any of those classes `Public` or
> `PublicEvolving`. As far as I am concerned there is no agreement in the
> community if StreamOperator is part of the `Public(Evolving)` API. At
> least I think it should not. I understand `AbstractStreamOperator` was
> marked with `PublicEvolving`, but I am really not convinced it was the
> right decision.
>
> The listed classes are not the only classes exposed to
> `AbstractStreamOperator` that are `Internal` that break the consistency
> and I am sure there is no question those should remain `Internal` such
> as e.g. StreamTask, StreamConfig, StreamingRuntimeContext and many more.
>
> As it stands I am strongly against giving any additional guarantees on
> API stability to the classes there unless there is a good motivation for
> a given feature and we're sure this is the best way to go forward.
>
> Thus I'm inclined to go with -1 on any proposal on changing annotations
> for the sake of matching the one on `AbstractStreamOperator`. I might be
> convinced to support requests to give better guarantees for well
> motivated features that are now internal though.
>
> Best,
>
> Dawid
>
> On 12/01/2023 10:20, Becket Qin wrote:
> > Hi flink devs,
> >
> > I'd like to start a discussion thread for FLIP-286[1].
> >
> > As a recap, currently while AbstractStreamOperator is a class marked as
> > @PublicEvolving, some classes exposed via its methods / fields are
> > marked as @Internal. This FLIP wants to fix this inconsistency of
> > stability / scope annotation.
> >
> > Comments are welcome!
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880841
> >
>


Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

2023-01-13 Thread João Boto
Hi Gunnar,
Thanks for your time and response...

I think the problem you want to solve is the exposure of the ExecutionConfig
(which can be mutated), no?
The configuration is not mutated; we only need to know if objectReuse is
enabled.
This is already exposed on RuntimeContext, and we kept it similar to that to
simplify any migrations. But as said, for this migration from ExecutionConfig
we only need isObjectReuseEnabled, and we could expose only this
configuration.
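
For illustration, the narrower read-only alternative could be as small as
this (hypothetical method, not in SinkV2 today):

// On Sink#InitContext, instead of exposing the mutable ExecutionConfig:
boolean isObjectReuseEnabled();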

Best regards,


On 2023/01/13 15:50:09 Gunnar Morling wrote:
> Hey Joao,
> 
> Thanks for this FLIP! One question on the proposed interface changes:
> is it expected that the configuration is *mutated* via the InitContext
> passed to Sink::createWriter()? If that's not the case, how about
> establishing a read-only contract representing the current
> configuration and passing in that one instead? That would probably
> deserve its own FLIP upon which yours here then would depend. Later
> on, other contracts which effectively shouldn't modify a config could
> use that one, too.
> 
> Note I don't mean to stall your efforts here, but I thought it'd be a
> good idea to bring it up and gauge the general interest in this.
> 
> Best,
> 
> --Gunnar
> 
> Am Fr., 13. Jan. 2023 um 15:17 Uhr schrieb Joao Boto :
> >
> > Hi flink devs,
> >
> > I'd like to start a discussion thread for FLIP-287[1].
> > This comes from an offline discussion with @Lijie Wang, from FLIP-239[2]
> > specially for the sink[3].
> >
> > Basically to expose the ExecutionConfig and JobId on SinkV2#InitContext.
> > This  changes are necessary to correct migrate the current sinks to SinkV2
> > like JdbcSink, KafkaTableSink and so on, that relies on RuntimeContext
> >
> > Comments are welcome!
> > Thanks,
> >
> > [1]
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853
> > [2]
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217386271
> > [3] https://issues.apache.org/jira/browse/FLINK-25421
> 


[jira] [Created] (FLINK-30681) Pulsar-Flink connector corrupts its output topic

2023-01-13 Thread Jacek Wislicki (Jira)
Jacek Wislicki created FLINK-30681:
--

 Summary: Pulsar-Flink connector corrupts its output topic
 Key: FLINK-30681
 URL: https://issues.apache.org/jira/browse/FLINK-30681
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Pulsar
Affects Versions: 1.15.3
Reporter: Jacek Wislicki


When PulsarSink writes a message to its output topic, the topic gets 
permanently corrupted and cannot be used anymore (even with newly created 
subscriptions).

We have isolated this behaviour to a minimal project demonstrating the problem 
available on [GitHub|https://github.com/JacekWislicki/vp-test5]:
# There are 2 topics: IN and OUT
# IN is subscribed by a Flink's InToOutJob (with PulsarSource) and writes to 
OUT (with PulsarSink)
# OUT is subscribed by a Pulsar's OutReadFunction
# When we write directly to OUT (e.g., with OutTopicProducer), OutReadFunction 
gets each message from its backlog and processes it with no issue (the ledger 
position updates)
# When we write to IN (e.g., with InTopicProducer), InToOutJob reads the 
message, processes it and writes to OUT
# OutReadFunction reads the message, the ledger position updates, but nothing 
happens
## Further messages written to OUT are not read as OUT is blocked on the last 
message from Flink
## Truncating OUT does not help, neither does unsubscribing or creating a new 
subscription

Reproduced with Pulsar 2.9.1, 2.9.2 and 2.10.2.

The issue does not occur when we use our temporary custom SinkFunction 
implementation, based on a plain Pulsar producer writing to OUT.
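
For reference, the sink in the repro is built roughly like this (a sketch based 
on the PulsarSink builder in 1.15; names and settings are illustrative, see the 
linked project for the exact code):
{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.pulsar.sink.PulsarSink;
import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;

public class OutSinkSketch {
    public static PulsarSink<String> buildSink() {
        // Writes to the OUT topic; after the first message written this way,
        // subscriptions on OUT stop receiving further messages.
        return PulsarSink.builder()
                .setServiceUrl("pulsar://localhost:6650")
                .setAdminUrl("http://localhost:8080")
                .setTopics("persistent://public/default/OUT")
                .setSerializationSchema(
                        PulsarSerializationSchema.flinkSchema(new SimpleStringSchema()))
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
    }
}
{code}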



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-286: Fix the API stability/scope annotation inconsistency in AbstractStreamOperator

2023-01-13 Thread Konstantin Knauf
Hi Becket,

> It is a basic rule of public API that anything exposed by a public
interface should also be public.

I agree with this in general. Did you get an overview of where we currently
violate this? Is this something that the ArchUnit architecture tests could
check for, so that as a first measure we don't introduce more occurrences
(cc @Ingo)?

Maybe it's justified to make a pass over all of these occurrences and
resolve them one way or another, either making the members/parameters
@PublicEvolving or actually making a class/method @Internal even if it was
@PublicEvolving before. I think this could be the better option than having
@PublicEvolving classes/methods that really aren't.

Cheers,

Konstantin

On Fri, Jan 13, 2023 at 17:02, Becket Qin wrote:

> Hi Dawid,
>
> Thanks for the reply. I am currently looking at the Beam Flink runner, and
> there are quite some hacks the Beam runner has to do in order to deal with
> the backwards incompatible changes in AbstractStreamOperator and some of
> the classes exposed by it. Regardless of what we think, the fact is that
> AbstractStreamOperator is marked as PublicEvolving today, and our users use
> it. It is a basic rule of public API that anything exposed by a public
> interface should also be public. This is the direct motivation of this
> FLIP.
>
> Regarding the StreamTask / StreamConfig exposure, if you look at the
> StreamOperatorFactory which is also a PublicEvolving class, it actually
> exposes the StreamTask, StreamConfig as well as some other classes in the
> StreamOperatorParameters. So these classes are already exposed in multiple
> public APIs.
>
> Keeping our public API stability guarantee is really fundamental and
> critical to the users. With the current status of inconsistent API
> stability annotations, I don't see how we can assure that. From what I
> can see, accidental backwards-incompatible changes are likely going to keep
> happening. So I'd say let's see how to fix forward instead of doing
> nothing.
>
> BTW, initially I thought this was just an accidental mismatch, but after
> further examination it looks like a bigger issue. I guess one of the
> reasons we end up in this situation is that we haven't really thought it
> through regarding the boundary between framework and user space, i.e. what
> framework primitives we want to expose to the users. So instead we just
> expose a bunch of internal things and hope users only use the "stable" part
> of them.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
> On Fri, Jan 13, 2023 at 7:28 PM Dawid Wysakowicz 
> wrote:
>
> > Hi Becket,
> >
> > May I ask what is the motivation for the change?
> >
> > I'm really skeptical about making any of those classes `Public` or
> > `PublicEvolving`. As far as I am concerned there is no agreement in the
> > community if StreamOperator is part of the `Public(Evolving)` API. At
> > least I think it should not. I understand `AbstractStreamOperator` was
> > marked with `PublicEvolving`, but I am really not convinced it was the
> > right decision.
> >
> > The listed classes are not the only classes exposed to
> > `AbstractStreamOperator` that are `Internal` that break the consistency
> > and I am sure there is no question those should remain `Internal` such
> > as e.g. StreamTask, StreamConfig, StreamingRuntimeContext and many more.
> >
> > As it stands I am strongly against giving any additional guarantees on
> > API stability to the classes there unless there is a good motivation for
> > a given feature and we're sure this is the best way to go forward.
> >
> > Thus I'm inclined to go with -1 on any proposal on changing annotations
> > for the sake of matching the one on `AbstractStreamOperator`. I might be
> > convinced to support requests to give better guarantees for well
> > motivated features that are now internal though.
> >
> > Best,
> >
> > Dawid
> >
> > On 12/01/2023 10:20, Becket Qin wrote:
> > > Hi flink devs,
> > >
> > > I'd like to start a discussion thread for FLIP-286[1].
> > >
> > > As a recap, currently while AbstractStreamOperator is a class marked as
> > > @PublicEvolving, some classes exposed via its methods / fields are
> > > marked as @Internal. This FLIP wants to fix this inconsistency of
> > > stability / scope annotation.
> > >
> > > Comments are welcome!
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880841
> > >
> >
>


-- 
https://twitter.com/snntrable
https://github.com/knaufk


Re: [DISCUSS] FLIP-286: Fix the API stability/scope annotation inconsistency in AbstractStreamOperator

2023-01-13 Thread Becket Qin
I don't have an overview of all the holes in our public API surface at the
moment. It would be great if there's some tool to do a scan.

In addition to fixing the annotation consistency formally, I think it is
equally, if not more, important to see whether the public APIs we expose
tell a good story. For example, if we say StreamConfig should be internal,
some fair questions to ask are: why does our own AbstractStreamOperator need
it? Why doesn't a user-defined operator need it? Is there something in
StreamConfig we should expose as a public interface, if not the entire class?
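
For example (purely illustrative, hypothetical names), instead of handing out
StreamConfig we could carve out a small read-only view of the pieces an
operator author legitimately needs:

    /** Sketch only: a narrow read-only view instead of the internal StreamConfig. */
    @PublicEvolving
    public interface OperatorConfiguration {
        /** The operator's name as shown in the web UI. */
        String getOperatorName();

        /** Whether object reuse is enabled for this job. */
        boolean isObjectReuseEnabled();
    }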

Thanks,

Jiangjie (Becket) Qin

On Sat, Jan 14, 2023 at 5:36 AM Konstantin Knauf  wrote:

> Hi Becket,
>
> > It is a basic rule of public API that anything exposed by a public
> interface should also be public.
>
> I agree with this in general. Did you get an overview of where we currently
> violate this? Is this something that the ArchUnit architecture tests could
> check for, so that as a first measure we don't introduce more occurrences
> (cc @Ingo)?
>
> Maybe it's justified to make a pass over all of these occurrences and
> resolve them one way or another, either making the members/parameters
> @PublicEvolving or actually making a class/method @Internal even if it was
> @PublicEvolving before. I think this could be the better option than having
> @PublicEvolving classes/methods that really aren't.
>
> Cheers,
>
> Konstantin
>
> On Fri, Jan 13, 2023 at 17:02, Becket Qin <
> becket@gmail.com
> > wrote:
>
> > Hi Dawid,
> >
> > Thanks for the reply. I am currently looking at the Beam Flink runner,
> and
> > there are quite some hacks the Beam runner has to do in order to deal
> with
> > the backwards incompatible changes in AbstractStreamOperator and some of
> > the classes exposed by it. Regardless of what we think, the fact is that
> > AbstractStreamOperator is marked as PublicEvolving today, and our users
> use
> > it. It is a basic rule of public API that anything exposed by a public
> > interface should also be public. This is the direct motivation of this
> > FLIP.
> >
> > Regarding the StreamTask / StreamConfig exposure, if you look at the
> > StreamOperatorFactory which is also a PublicEvolving class, it actually
> > exposes the StreamTask, StreamConfig as well as some other classes in the
> > StreamOperatorParameters. So these classes are already exposed in
> multiple
> > public APIs.
> >
> > Keeping our public API stability guarantee is really fundamental and
> > critical to the users. With the current status of inconsistent API
> > stability annotations, I don't see how we can assure that. From what I
> > can see, accidental backwards-incompatible changes are likely going to
> keep
> > happening. So I'd say let's see how to fix forward instead of doing
> > nothing.
> >
> > BTW, initially I thought this was just an accidental mismatch, but after
> > further examination it looks like a bigger issue. I guess one of the
> > reasons we end up in this situation is that we haven't really thought it
> > through regarding the boundary between framework and user space, i.e.
> what
> > framework primitives we want to expose to the users. So instead we just
> > expose a bunch of internal things and hope users only use the "stable"
> part
> > of them.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> > On Fri, Jan 13, 2023 at 7:28 PM Dawid Wysakowicz  >
> > wrote:
> >
> > > Hi Becket,
> > >
> > > May I ask what is the motivation for the change?
> > >
> > > I'm really skeptical about making any of those classes `Public` or
> > > `PublicEvolving`. As far as I am concerned there is no agreement in the
> > > community if StreamOperator is part of the `Public(Evolving)` API. At
> > > least I think it should not. I understand `AbstractStreamOperator` was
> > > marked with `PublicEvolving`, but I am really not convinced it was the
> > > right decision.
> > >
> > > The listed classes are not the only classes exposed to
> > > `AbstractStreamOperator` that are `Internal` that break the consistency
> > > and I am sure there is no question those should remain `Internal` such
> > > as e.g. StreamTask, StreamConfig, StreamingRuntimeContext and many
> more.
> > >
> > > As it stands I am strongly against giving any additional guarantees on
> > > API stability to the classes there unless there is a good motivation
> for
> > > a given feature and we're sure this is the best way to go forward.
> > >
> > > Thus I'm inclined to go with -1 on any proposal on changing annotations
> > > for the sake of matching the one on `AbstractStreamOperator`. I might
> be
> > > convinced to support requests to give better guarantees for well
> > > motivated features that are now internal though.
> > >
> > > Best,
> > >
> > > Dawid
> > >
> > > On 12/01/2023 10:20, Becket Qin wrote:
> > > > Hi flink devs,
> > > >
> > > > I'd like to start a discussion thread for FLIP-286[1].
> > > >
> > > > As a recap, currently while AbstractStreamOperator is a cla

Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

2023-01-13 Thread Jing Ge
Hi Joao,

Thanks for bringing this up. Exposing internal domain instances depends on
your requirements. Technically, it is even possible to expose the
RuntimeContext [1] (which must be considered very carefully). Since you
mentioned that you only need to know if objectReuse is enabled, how about
just exposing isObjectReuseEnabled instead of the whole ExecutionConfig? The
idea is to keep the scope as small as possible while still satisfying the
requirement. If more information from the ExecutionConfig is needed later, we
can still refactor the code properly, given a strong motivation.

Best regards,
Jing

[1]
https://github.com/apache/flink/blob/560b4612735a2b9cd3b5db88adf5cb223e85535b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java#L279

On Fri, Jan 13, 2023 at 6:19 PM João Boto  wrote:

> Hi Gunnar,
> Thanks for your time and response...
>
> I think the problem you want to solve is the exposure of the
> ExecutionConfig (which can be mutated), no?
> The configuration is not mutated; we only need to know whether objectReuse
> is enabled.
> This is already exposed on RuntimeContext, and we kept the new interface
> similar to it to simplify migrations. But as said, for this migration we
> only need isObjectReuseEnabled from ExecutionConfig, so we could expose
> just that flag.
>
> Best regards,
>
>
> On 2023/01/13 15:50:09 Gunnar Morling wrote:
> > Hey Joao,
> >
> > Thanks for this FLIP! One question on the proposed interface changes:
> > is it expected that the configuration is *mutated* via the InitContext
> > passed to Sink::createWriter()? If that's not the case, how about
> > establishing a read-only contract representing the current
> > configuration and passing in that one instead? That would probably
> > deserve its own FLIP upon which yours here then would depend. Later
> > on, other contracts which effectively shouldn't modify a config could
> > use that one, too.
> >
> > Note I don't mean to stall your efforts here, but I thought it'd be a
> > good idea to bring it up and gauge the general interest in this.
> >
> > Best,
> >
> > --Gunnar
> >
> > On Fri, Jan 13, 2023 at 15:17, Joao Boto wrote:
> > >
> > > Hi flink devs,
> > >
> > > I'd like to start a discussion thread for FLIP-287[1].
> > > This comes from an offline discussion with @Lijie Wang, from
> FLIP-239[2]
> > > specially for the sink[3].
> > >
> > > Basically to expose the ExecutionConfig and JobId on
> SinkV2#InitContext.
> > > These changes are necessary to correctly migrate the current sinks to
> SinkV2,
> > > like JdbcSink, KafkaTableSink and so on, which rely on RuntimeContext
> > >
> > > Comments are welcome!
> > > Thanks,
> > >
> > > [1]
> > >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853
> > > [2]
> > >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217386271
> > > [3] https://issues.apache.org/jira/browse/FLINK-25421
> >
>


Re: [DISCUSS] FLIP-286: Fix the API stability/scope annotation inconsistency in AbstractStreamOperator

2023-01-13 Thread Jing Ge
Hi Becket,

Speaking of tools, we have ArchUnit integrated in Flink. By extending the
defined ArchRules [1] a little, you will get the desired scan result; see the
sketch below.

[1]
https://github.com/apache/flink/blob/560b4612735a2b9cd3b5db88adf5cb223e85535b/flink-architecture-tests/flink-architecture-tests-production/src/main/java/org/apache/flink/architecture/rules/ApiAnnotationRules.java#L99
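
For example, a rule along these lines could flag the mismatches (a sketch on
top of the ArchUnit API; a real rule would need exemptions for primitives, JDK
types, etc.):

    import com.tngtech.archunit.base.DescribedPredicate;
    import com.tngtech.archunit.core.domain.JavaClass;
    import com.tngtech.archunit.lang.ArchRule;
    import org.apache.flink.annotation.Public;
    import org.apache.flink.annotation.PublicEvolving;

    import static com.tngtech.archunit.lang.syntax.ArchRuleDefinition.methods;

    public class PublicApiConsistencyRules {
        static final DescribedPredicate<JavaClass> PUBLICLY_ANNOTATED =
                DescribedPredicate.describe(
                        "annotated with @Public or @PublicEvolving",
                        c -> c.isAnnotatedWith(Public.class)
                                || c.isAnnotatedWith(PublicEvolving.class));

        // Every method declared in a @PublicEvolving class should only return
        // types that are themselves part of the public API surface.
        static final ArchRule RETURN_TYPES_ARE_PUBLIC =
                methods()
                        .that().areDeclaredInClassesThat()
                        .areAnnotatedWith(PublicEvolving.class)
                        .should().haveRawReturnType(PUBLICLY_ANNOTATED);
    }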

Best regards,
Jing

On Sat, Jan 14, 2023 at 12:46 AM Becket Qin  wrote:

> I don't have an overview of all the holes in our public API surface at the
> moment. It would be great if there's some tool to do a scan.
>
> In addition to fixing the annotation consistency formally, I think it is
> equally, if not more, important to see whether the public APIs we expose
> tell a good story. For example, if we say StreamConfig should be internal,
> some fair questions to ask are: why does our own AbstractStreamOperator
> need it? Why doesn't a user-defined operator need it? Is there something in
> StreamConfig we should expose as a public interface, if not the entire
> class?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Sat, Jan 14, 2023 at 5:36 AM Konstantin Knauf 
> wrote:
>
> > Hi Becket,
> >
> > > It is a basic rule of public API that anything exposed by a public
> > interface should also be public.
> >
> > I agree with this in general. Did you get an overview of where we
> > currently violate this? Is this something that the ArchUnit architecture
> > tests could check for, so that as a first measure we don't introduce more
> > occurrences (cc @Ingo)?
> >
> > Maybe it's justified to make a pass over all of these occurrences and
> > resolve them one way or another, either making the members/parameters
> > @PublicEvolving or actually making a class/method @Internal even if it
> > was @PublicEvolving before. I think this could be the better option than
> > having @PublicEvolving classes/methods that really aren't.
> >
> > Cheers,
> >
> > Konstantin
> >
> > On Fri, Jan 13, 2023 at 17:02, Becket Qin <
> > becket@gmail.com
> > > wrote:
> >
> > > Hi Dawid,
> > >
> > > Thanks for the reply. I am currently looking at the Beam Flink runner,
> > and
> > > there are quite some hacks the Beam runner has to do in order to deal
> > with
> > > the backwards incompatible changes in AbstractStreamOperator and some
> of
> > > the classes exposed by it. Regardless of what we think, the fact is
> that
> > > AbstractStreamOperator is marked as PublicEvolving today, and our users
> > use
> > > it. It is a basic rule of public API that anything exposed by a public
> > > interface should also be public. This is the direct motivation of this
> > > FLIP.
> > >
> > > Regarding the StreamTask / StreamConfig exposure, if you look at the
> > > StreamOperatorFactory which is also a PublicEvolving class, it actually
> > > exposes the StreamTask, StreamConfig as well as some other classes in
> the
> > > StreamOperatorParameters. So these classes are already exposed in
> > multiple
> > > public APIs.
> > >
> > > Keeping our public API stability guarantee is really fundamental and
> > > critical to the users. With the current status of inconsistent API
> > > stability annotations, I don't see how we can assure that. From
> what I
> > > can see, accidental backwards-incompatible changes are likely going to
> > keep
> > > happening. So I'd say let's see how to fix forward instead of doing
> > > nothing.
> > >
> > > BTW, initially I thought this was just an accidental mismatch, but after
> > > further examination it looks like a bigger issue. I guess one of the
> > > reasons we end up in this situation is that we haven't really thought
> it
> > > through regarding the boundary between framework and user space, i.e.
> > what
> > > framework primitives we want to expose to the users. So instead we just
> > > expose a bunch of internal things and hope users only use the "stable"
> > part
> > > of them.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > >
> > > On Fri, Jan 13, 2023 at 7:28 PM Dawid Wysakowicz <
> dwysakow...@apache.org
> > >
> > > wrote:
> > >
> > > > Hi Becket,
> > > >
> > > > May I ask what is the motivation for the change?
> > > >
> > > > I'm really skeptical about making any of those classes `Public` or
> > > > `PublicEvolving`. As far as I am concerned there is no agreement in
> the
> > > > community if StreamOperator is part of the `Public(Evolving)` API. At
> > > > least I think it should not. I understand `AbstractStreamOperator`
> was
> > > > marked with `PublicEvolving`, but I am really not convinced it was
> the
> > > > right decision.
> > > >
> > > > The listed classes are not the only classes exposed to
> > > > `AbstractStreamOperator` that are `Internal` that break the
> consistency
> > > > and I am sure there is no question those should remain `Internal`
> such
> > > > as e.g. StreamTask, StreamConfig, StreamingRuntimeContext and many
> > more.
> > > >
> > > > As it stands I am strongly against giving any additiona

[jira] [Created] (FLINK-30682) FLIP-283: Use adaptive batch scheduler as default scheduler for batch jobs

2023-01-13 Thread JunRui Li (Jira)
JunRui Li created FLINK-30682:
-

 Summary: FLIP-283: Use adaptive batch scheduler as default 
scheduler for batch jobs
 Key: FLINK-30682
 URL: https://issues.apache.org/jira/browse/FLINK-30682
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: JunRui Li
 Fix For: 1.17.0


To further leverage the adaptive batch scheduler to improve Flink's batch 
capability, this FLIP aims to make the adaptive batch scheduler the default 
batch scheduler and to optimize its current configuration.

For more details, see 
[FLIP-283|https://cwiki.apache.org/confluence/display/FLINK/FLIP-283%3A+Use+adaptive+batch+scheduler+as+default+scheduler+for+batch+jobs].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30683) Make adaptive batch scheduler as the default batch scheduler

2023-01-13 Thread Junrui Li (Jira)
Junrui Li created FLINK-30683:
-

 Summary: Make adaptive batch scheduler as the default batch 
scheduler
 Key: FLINK-30683
 URL: https://issues.apache.org/jira/browse/FLINK-30683
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Junrui Li
 Fix For: 1.17.0


Based on 
[FLIP-283|https://cwiki.apache.org/confluence/display/FLINK/FLIP-283%3A+Use+adaptive+batch+scheduler+as+default+scheduler+for+batch+jobs],
 this ticket focuses on the first part.

This change proposes to make the AdaptiveBatchScheduler the default batch 
scheduler so that users can use it without explicitly configuring it (see the 
sketch below for today's explicit opt-in).
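
For context, this is roughly how a user opts in today, which this change would 
make unnecessary (a sketch; "AdaptiveBatch" is the documented value of the 
"jobmanager.scheduler" option):
{code:java}
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AdaptiveBatchOptIn {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Explicit opt-in that would no longer be needed.
        conf.setString("jobmanager.scheduler", "AdaptiveBatch");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(conf);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        env.fromSequence(0, 10).print();
        env.execute("adaptive-batch-opt-in");
    }
}
{code}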



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30684) Use the default parallelism as a flag for vertices that can automatically derive parallelism.

2023-01-13 Thread Junrui Li (Jira)
Junrui Li created FLINK-30684:
-

 Summary: Use the default parallelism as a flag for vertices that 
can automatically derive parallelism.
 Key: FLINK-30684
 URL: https://issues.apache.org/jira/browse/FLINK-30684
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Junrui Li
 Fix For: 1.17.0


This change proposes to add a *parallelismConfigured* property as a flag that 
identifies whether a node's parallelism was set explicitly or taken from 
"parallelism.default". If a vertex's *parallelismConfigured* is true, the 
AdaptiveBatchScheduler will not automatically decide its parallelism. 
Otherwise, the AdaptiveBatchScheduler will decide the parallelism automatically 
and use "parallelism.default" as a fallback value for 
"jobmanager.adaptive-batch-scheduler.max-parallelism".

With this change, users no longer need to set "parallelism.default" to "-1" to 
have parallelisms decided automatically for vertices.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30685) Support mark the transformations whose parallelism is infected by the input transformation

2023-01-13 Thread Junrui Li (Jira)
Junrui Li created FLINK-30685:
-

 Summary: Support mark the transformations whose parallelism is 
infected by the input transformation
 Key: FLINK-30685
 URL: https://issues.apache.org/jira/browse/FLINK-30685
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Junrui Li
 Fix For: 1.17.0


In order to chain operators together as much as possible, many downstream 
operators in the table planner take the parallelism of their upstream input 
operators.

Operators that need a parallelism of their own have it set explicitly. 
Therefore, an operator that merely takes the parallelism of its upstream 
operator as its own should have its parallelism derived automatically by the 
AdaptiveBatchScheduler, as sketched below.
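
For illustration, a DataStream-level sketch of an operator that merely inherits 
its input's parallelism (the table planner does the equivalent internally):
{code:java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class InheritedParallelismExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Long> source = env.fromSequence(0, 1000);

        // The map takes its input's parallelism so the two operators can
        // chain. Under this proposal such transformations would be marked,
        // letting the adaptive batch scheduler derive their parallelism
        // instead of treating it as user-defined.
        source.map(new MapFunction<Long, Long>() {
                    @Override
                    public Long map(Long value) {
                        return value * 2;
                    }
                })
                .setParallelism(source.getParallelism())
                .print();

        env.execute("inherited-parallelism-sketch");
    }
}
{code}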



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30686) Simplify the configuration of adaptive batch scheduler

2023-01-13 Thread Junrui Li (Jira)
Junrui Li created FLINK-30686:
-

 Summary: Simplify the configuration of adaptive batch scheduler
 Key: FLINK-30686
 URL: https://issues.apache.org/jira/browse/FLINK-30686
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Junrui Li
 Fix For: 1.17.0


Based on 
[FLIP-283|https://cwiki.apache.org/confluence/display/FLINK/FLIP-283%3A+Use+adaptive+batch+scheduler+as+default+scheduler+for+batch+jobs],
 this ticket focuses on the second part.

This change includes three parts:

1. Introduce "execution.batch.adaptive.auto-parallelism.enabled" as a switch for 
automatic parallelism derivation
2. Modify the default values of the adaptive batch scheduler configuration
3. Rename the configuration options of the adaptive batch scheduler
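
A sketch of the proposed switch (the key name is taken from this ticket and may 
still change):
{code:java}
import org.apache.flink.configuration.Configuration;

public class AutoParallelismSwitchSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Proposed switch for automatic parallelism derivation.
        conf.setString("execution.batch.adaptive.auto-parallelism.enabled", "true");
        System.out.println(conf.toMap());
    }
}
{code}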



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] Enabling dynamic partition discovery by default in Kafka source

2023-01-13 Thread Yun Tang
+1 for this proposal and thanks Qingsheng for driving this.

Regarding the interval, we also set the value to 5 min, equivalent to the 
default value of metadata.max.age.ms.
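
For reference, a minimal sketch of how a job opts in with a 5-minute interval 
today (assuming the current KafkaSource builder API):

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

    KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers("broker:9092")
            .setTopics("input-topic")
            .setGroupId("my-group")
            .setStartingOffsets(OffsetsInitializer.earliest())
            // 5 min, matching Kafka's metadata.max.age.ms default
            .setProperty("partition.discovery.interval.ms", "300000")
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();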


Best
Yun Tang

From: Benchao Li 
Sent: Friday, January 13, 2023 23:06
To: dev@flink.apache.org 
Subject: Re: [DISCUSS] Enabling dynamic partition discovery by default in Kafka 
source

+1, we've enabled this by default (10mins) in our production for years.

Jing Ge  于2023年1月13日周五 22:22写道:

> +1 for the proposal that makes users' daily work easier and therefore makes
> Flink more attractive.
>
> Best regards,
> Jing
>
>
> On Fri, Jan 13, 2023 at 11:27 AM Qingsheng Ren  wrote:
>
> > Thanks everyone for joining the discussion!
> >
> > @Martijn:
> >
> > > All newly discovered partitions will be consumed from the earliest
> offset
> > possible.
> >
> > Thanks for the reminder! I checked the logic of KafkaSource and found
> that
> > new partitions will start from the offset initializer specified by the
> user
> > instead of the earliest. We need to correct this behavior to avoid
> dropping
> > messages from new partitions.
> >
> > > Job restarts from checkpoint
> >
> > I think the current logic guarantees the exactly-once semantic. New
> > partitions created after the checkpoint will be re-discovered again and
> > picked up by the source.
> >
> > @John:
> >
> > > If you want to be a little conservative with the default, 5 minutes
> might
> > be better than 30 seconds.
> >
> > Thanks for the suggestion! I tried to find the equivalent config in Kafka
> > but missed it. It would be neat to align with the default value of "
> > metadata.max.age.ms".
> >
> > @Gabor:
> >
> > > removed partition handling is not yet added
> >
> > There was a detailed discussion about removing partitions [1] but it
> looks
> > like this is not an easy task considering the potential data loss and
> state
> > inconsistency. I'm afraid there's no clear plan on this one and maybe we
> > could trigger a new discussion thread about how to correctly handle
> removed
> > partitions.
> >
> > [1] https://lists.apache.org/thread/7r4h7v5k281w9cnbfw9lb8tp56r30lwt
> >
> > Best regards,
> > Qingsheng
> >
> >
> > On Fri, Jan 13, 2023 at 4:33 PM Gabor Somogyi  >
> > wrote:
> >
> > > +1 on the overall direction, it's an important feature.
> > >
> > > I've had a look at the latest master and it looks like removed-partition
> > > handling is not yet added, but I think it is essential.
> > >
> > >
> > >
> >
> https://github.com/apache/flink/blob/28c3e1a3923ba560b559a216985c1abeb794ebaa/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L305
> > >
> > > If a partition all of a sudden disappears then it could lead to data
> > loss.
> > > Are you planning to add it?
> > > If yes then when?
> > >
> > > G
> > >
> > >
> > > On Fri, Jan 13, 2023 at 9:22 AM John Roesler 
> > wrote:
> > >
> > > > Thanks for this proposal, Qingsheng!
> > > >
> > > > If you want to be a little conservative with the default, 5 minutes
> > might
> > > > be better than 30 seconds.
> > > >
> > > > The equivalent config in Kafka seems to be metadata.max.age.ms (
> > > >
> > >
> >
> https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms
> > > ),
> > > > which has a default value of 5 minutes.
> > > >
> > > > Other than that, I’m in favor. I agree, this should be on by default.
> > > >
> > > > Thanks again,
> > > > John
> > > >
> > > > On Fri, Jan 13, 2023, at 08:26, Leonard Xu wrote:
> > > > > Thanks Qingsheng for driving this, enable the dynamic partition
> > > > > discovery would be very useful for kafka topic scale partitions
> > > > > scenarios.
> > > > >
> > > > > +1 for the change.
> > > > >
> > > > > CC: Becket
> > > > >
> > > > >
> > > > > Best,
> > > > > Leonard
> > > > >
> > > > >
> > > > >
> > > > >> On Jan 13, 2023, at 3:15 PM, Jark Wu  wrote:
> > > > >>
> > > > >> +1 for the change. I think this is beneficial for users and is
> > > > compatible.
> > > > >>
> > > > >> Best,
> > > > >> Jark
> > > > >>
> > > > >> On Fri, 13 Jan 2023 at 14:22, 何军  wrote:
> > > > >>
> > > > 
> > > >  +1 for this idea, we have enabled kafka dynamic partition
> > discovery
> > > in
> > > > >>> all
> > > >  jobs.
> > > > 
> > > > 
> > > > >>>
> > > >
> > >
> >
>


--

Best,
Benchao Li


[jira] [Created] (FLINK-30687) FILTER not effect in count(*)

2023-01-13 Thread tanjialiang (Jira)
tanjialiang created FLINK-30687:
---

 Summary: FILTER not effect in count(*)
 Key: FLINK-30687
 URL: https://issues.apache.org/jira/browse/FLINK-30687
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.16.0
Reporter: tanjialiang


When I try to use Flink SQL like this:
{code:java}
CREATE TABLE student
(
id INT NOT NULL,
name STRING,
class_id INT NOT NULL
)
WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/test',
'table-name' = 'student',
'username' = 'root',
'password' = '12345678'
);

SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student;
{code}

I found that 'FILTER (WHERE class_id = 1)' has no effect.

But when I tried Flink SQL like this, it worked:
{code:java}
SELECT COUNT(*) FROM student WHERE class_id = 1;

-- or

SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code}

By the way, the MySQL connector has a bug that was fixed in 
[FLINK-27268|https://issues.apache.org/jira/browse/FLINK-27268]. If you try this 
demo, you should cherry-pick that PR first.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)