Re: [DISCUSS] Enabling dynamic partition discovery by default in Kafka source
This sounds like a no-brainer, +1. Two things that seem obvious but might be good to double check:

1. All newly discovered partitions will be consumed from the earliest offset possible. That's how it's documented for version 1.12 [1], but not for later versions, which is why I would like to double check. If so, I think it would also be good to restore that in the documentation.

2. Let's assume the following running situation:
* Dynamic partition discovery is enabled, set to 30 secs
* Job runs
* Job checkpoints to checkpoint X
* New partitions are discovered and consumption from the new partitions starts
* Job crashes before a new checkpoint has been created

My assumption is that the job restarts from checkpoint X, which includes the old number of partitions. Either directly when the job restarts, or when partitions are discovered the next time, those partitions that are new/unknown to the job would be consumed. We still have exactly-once guarantees, since no new checkpoint was made before the job crashed.

Best regards,

Martijn

[1] https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/kafka.html#kafka-consumers-topic-and-partition-discovery

On Fri, 13 Jan 2023 at 08:27, Leonard Xu wrote:

> Thanks Qingsheng for driving this, enable the dynamic partition discovery
> would be very useful for kafka topic scale partitions scenarios.
>
> +1 for the change.
>
> CC: Becket
>
> Best,
> Leonard
>
>> On Jan 13, 2023, at 3:15 PM, Jark Wu wrote:
>>
>> +1 for the change. I think this is beneficial for users and is compatible.
>>
>> Best,
>> Jark
>>
>> On Fri, 13 Jan 2023 at 14:22, 何军 wrote:
>>
>>> +1 for this idea, we have enabled kafka dynamic partition discovery in
>>> all jobs.
Re: [DISCUSS] Enabling dynamic partition discovery by default in Kafka source
Thanks for this proposal, Qingsheng! If you want to be a little conservative with the default, 5 minutes might be better than 30 seconds. The equivalent config in Kafka seems to be metadata.max.age.ms (https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms), which has a default value of 5 minutes. Other than that, I’m in favor. I agree, this should be on by default. Thanks again, John On Fri, Jan 13, 2023, at 08:26, Leonard Xu wrote: > Thanks Qingsheng for driving this, enable the dynamic partition > discovery would be very useful for kafka topic scale partitions > scenarios. > > +1 for the change. > > CC: Becket > > > Best, > Leonard > > > >> On Jan 13, 2023, at 3:15 PM, Jark Wu wrote: >> >> +1 for the change. I think this is beneficial for users and is compatible. >> >> Best, >> Jark >> >> On Fri, 13 Jan 2023 at 14:22, 何军 wrote: >> +1 for this idea, we have enabled kafka dynamic partition discovery in >>> all jobs. >>>
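For concreteness, opting into discovery on a KafkaSource currently looks roughly like the sketch below (a minimal sketch, assuming the connector's `partition.discovery.interval.ms` property; broker, topic, and group names are placeholders):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class PartitionDiscoveryExample {
    public static void main(String[] args) {
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")        // placeholder
                .setTopics("input-topic")                  // placeholder
                .setGroupId("example-group")               // placeholder
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Check for new partitions every 5 minutes, matching the
                // metadata.max.age.ms default suggested above.
                .setProperty("partition.discovery.interval.ms", "300000")
                .build();
    }
}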
[jira] [Created] (FLINK-30675) Decompose printing logic from Executor
Shengkai Fang created FLINK-30675: - Summary: Decompose printing logic from Executor Key: FLINK-30675 URL: https://issues.apache.org/jira/browse/FLINK-30675 Project: Flink Issue Type: Sub-task Components: Table SQL / Client Affects Versions: 1.17.0 Reporter: Shengkai Fang Fix For: 1.17.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [DISCUSS] Enabling dynamic partition discovery by default in Kafka source
+1 on the overall direction, it's an important feature.

I've had a look at the latest master, and it looks like removed-partition handling is not yet added, but I think this is essential.

https://github.com/apache/flink/blob/28c3e1a3923ba560b559a216985c1abeb794ebaa/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L305

If a partition all of a sudden disappears, it could lead to data loss. Are you planning to add it? If yes, then when?

G

On Fri, Jan 13, 2023 at 9:22 AM John Roesler wrote:

> Thanks for this proposal, Qingsheng!
>
> If you want to be a little conservative with the default, 5 minutes might
> be better than 30 seconds.
>
> The equivalent config in Kafka seems to be metadata.max.age.ms (
> https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms),
> which has a default value of 5 minutes.
>
> Other than that, I’m in favor. I agree, this should be on by default.
>
> Thanks again,
> John
>
> On Fri, Jan 13, 2023, at 08:26, Leonard Xu wrote:
> > Thanks Qingsheng for driving this, enable the dynamic partition
> > discovery would be very useful for kafka topic scale partitions
> > scenarios.
> >
> > +1 for the change.
> >
> > CC: Becket
> >
> > Best,
> > Leonard
> >
> >> On Jan 13, 2023, at 3:15 PM, Jark Wu wrote:
> >>
> >> +1 for the change. I think this is beneficial for users and is compatible.
> >>
> >> Best,
> >> Jark
> >>
> >> On Fri, 13 Jan 2023 at 14:22, 何军 wrote:
> >>
> >>> +1 for this idea, we have enabled kafka dynamic partition discovery in
> >>> all jobs.
Re: [VOTE] FLIP-281: Sink Supports Speculative Execution For Batch Job
Hi everyone,

I'm happy to announce that FLIP-281[1] has been accepted.

Thanks for all your feedback and votes. Here is the voting result:

+1 (binding), 3 in total:
- Zhu Zhu
- Lijie Wang
- Jing Zhang

+1 (non-binding), 2 in total:
- yuxia
- Jing Ge

+0 (binding), 1 in total:
- Martijn

There are no disapproving votes.

By the way, the discussion of deprecating SinkFunction can be continued in the discussion thread[2]. I think it's more of an orthogonal issue. We might need more time to come to an agreement about it.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-281+Sink+Supports+Speculative+Execution+For+Batch+Job
[2] https://lists.apache.org/thread/l05m6cf8fwkkbpnjtzbg9l2lo40oxzd1

Thanks,
Biao /'bɪ.aʊ/

On Fri, 13 Jan 2023 at 08:46, Jing Ge wrote:

> + 1(not binding)
>
> Best Regards,
> Jing
>
> On Thu, Jan 12, 2023 at 10:01 AM Jing Zhang wrote:
>
> > +1 (binding)
> >
> > Best,
> > Jing Zhang
> >
> > On Thu, Jan 12, 2023 at 16:39, Lijie Wang wrote:
> >
> > > +1 (binding)
> > >
> > > Best,
> > > Lijie
> > >
> > > On Thu, Jan 12, 2023 at 15:56, Martijn Visser wrote:
> > >
> > > > +0 (binding)
> > > >
> > > > On Tue, 10 Jan 2023 at 13:11, yuxia < luoyu...@alumni.sjtu.edu.cn > wrote:
> > > >
> > > > > +1 (non-binding).
> > > > >
> > > > > Best regards,
> > > > > Yuxia
> > > > >
> > > > > ----- Original Message -----
> > > > > From: "Zhu Zhu"
> > > > > To: "dev"
> > > > > Sent: Tuesday, Jan 10, 2023, 5:50:39 PM
> > > > > Subject: Re: [VOTE] FLIP-281: Sink Supports Speculative Execution For Batch Job
> > > > >
> > > > > +1 (binding)
> > > > >
> > > > > Thanks,
> > > > > Zhu
> > > > >
> > > > > On Thu, Jan 5, 2023 at 10:37, Biao Liu wrote:
> > > > > >
> > > > > > Hi Martijn,
> > > > > >
> > > > > > Sure, thanks for the reminder about the holiday period.
> > > > > > Looking forward to your feedback!
> > > > > >
> > > > > > Thanks,
> > > > > > Biao /'bɪ.aʊ/
> > > > > >
> > > > > > On Thu, 5 Jan 2023 at 03:07, Martijn Visser < martijnvis...@apache.org > wrote:
> > > > > >
> > > > > > > Hi Biao,
> > > > > > >
> > > > > > > To be honest, I haven't read the FLIP yet since this is still a
> > > > > > > holiday period in Europe. I would like to read it in the next few
> > > > > > > days. Can you keep the vote open a little longer?
> > > > > > >
> > > > > > > Best regards,
> > > > > > >
> > > > > > > Martijn
> > > > > > >
> > > > > > > On Wed, Jan 4, 2023 at 1:31 PM Biao Liu wrote:
> > > > > > >
> > > > > > > > Hi everyone,
> > > > > > > >
> > > > > > > > Thanks for all the feedback!
> > > > > > > >
> > > > > > > > Based on the discussion[1], we seem to have a consensus. So I'd
> > > > > > > > like to start a vote on FLIP-281: Sink Supports Speculative
> > > > > > > > Execution For Batch Job[2]. The vote will last for 72 hours,
> > > > > > > > unless there is an objection or insufficient votes.
> > > > > > > >
> > > > > > > > [1] https://lists.apache.org/thread/l05m6cf8fwkkbpnjtzbg9l2lo40oxzd1
> > > > > > > > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-281+Sink+Supports+Speculative+Execution+For+Batch+Job
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Biao /'bɪ.aʊ/
Re: [VOTE] Apache Flink Table Store 0.3.0, release candidate #1
+1 (binding) - Build and compile the source code locally: *OK* - Verified signatures and hashes: *OK* - Checked no missing artifacts in the staging area: *OK* - Reviewed the website release PR: *OK* - Checked the licenses: *OK* - Went through the quick start: *OK* * Verified with both flink 1.14.5 and 1.15.1 using * Verified web UI and log output, nothing unexpected Best, Jark On Thu, 12 Jan 2023 at 20:52, Jingsong Li wrote: > Thanks Yu, I have created table-store-0.3.1 and move these jiras to 0.3.1. > > Best, > Jingsong > > On Thu, Jan 12, 2023 at 7:41 PM Yu Li wrote: > > > > Thanks for the quick action Jingsong! Here is my vote with the new > staging > > directory: > > > > +1 (binding) > > > > > > - Checked release notes: *Action Required* > > > > * The fix version of FLINK-30620 and FLINK-30628 are 0.3.0 but still > > open, please confirm whether this should be included or we should move it > > out of 0.3.0 > > > > - Checked sums and signatures: *OK* > > > > - Checked the jars in the staging repo: *OK* > > > > - Checked source distribution doesn't include binaries: *OK* > > > > - Maven clean install from source: *OK* > > > > - Checked version consistency in pom files: *OK* > > > > - Went through the quick start: *OK* > > > > * Verified with flink 1.14.6, 1.15.3 and 1.16.0 > > > > - Checked the website updates: *OK* > > > > Best Regards, > > Yu > > > > > > On Thu, 12 Jan 2023 at 15:36, Jingsong Li > wrote: > > > > > Thanks Yu for your validation. > > > > > > I created a new staging directory [1] > > > > > > [1] > > > > https://repository.apache.org/content/repositories/orgapacheflink-1577/ > > > > > > Best, > > > Jingsong > > > > > > On Thu, Jan 12, 2023 at 3:07 PM Yu Li wrote: > > > > > > > > Hi Jingsong, > > > > > > > > It seems the given staging directory [1] is not exposed, could you > double > > > > check and republish if necessary? Thanks. 
> > > > > > > > Best Regards, > > > > Yu > > > > > > > > [1] > > > > https://repository.apache.org/content/repositories/orgapacheflink-1576/ > > > > > > > > > > > > On Tue, 10 Jan 2023 at 16:53, Jingsong Li > > > wrote: > > > > > > > > > Hi everyone, > > > > > > > > > > Please review and vote on the release candidate #1 for the version > > > > > 0.3.0 of Apache Flink Table Store, as follows: > > > > > > > > > > [ ] +1, Approve the release > > > > > [ ] -1, Do not approve the release (please provide specific > comments) > > > > > > > > > > **Release Overview** > > > > > > > > > > As an overview, the release consists of the following: > > > > > a) Table Store canonical source distribution to be deployed to the > > > > > release repository at dist.apache.org > > > > > b) Table Store binary convenience releases to be deployed to the > > > > > release repository at dist.apache.org > > > > > c) Maven artifacts to be deployed to the Maven Central Repository > > > > > > > > > > **Staging Areas to Review** > > > > > > > > > > The staging areas containing the above mentioned artifacts are as > > > follows, > > > > > for your review: > > > > > * All artifacts for a) and b) can be found in the corresponding dev > > > > > repository at dist.apache.org [2] > > > > > * All artifacts for c) can be found at the Apache Nexus Repository > [3] > > > > > > > > > > All artifacts are signed with the key > > > > > 2C2B6A653B07086B65E4369F7C76245E0A318150 [4] > > > > > > > > > > Other links for your review: > > > > > * JIRA release notes [5] > > > > > * source code tag "release-0.3.0-rc1" [6] > > > > > * PR to update the website Downloads page to include Table Store > links > > > [7] > > > > > > > > > > **Vote Duration** > > > > > > > > > > The voting time will run for at least 72 hours. > > > > > It is adopted by majority approval, with at least 3 PMC affirmative > > > votes. > > > > > > > > > > Best, > > > > > Jingsong Lee > > > > > > > > > > [1] > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Table+Store+Release > > > > > [2] > > > > > > > > > https://dist.apache.org/repos/dist/dev/flink/flink-table-store-0.3.0-rc1/ > > > > > [3] > > > > > > > > > https://repository.apache.org/content/repositories/orgapacheflink-1576/ > > > > > [4] https://dist.apache.org/repos/dist/release/flink/KEYS > > > > > [5] > > > > > > > > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352111 > > > > > [6] > https://github.com/apache/flink-table-store/tree/release-0.3.0-rc1 > > > > > [7] https://github.com/apache/flink-web/pull/601 > > > > > > > > >
Re: [DISCUSS] Enabling dynamic partition discovery by default in Kafka source
Thanks everyone for joining the discussion! @Martijn: > All newly discovered partitions will be consumed from the earliest offset possible. Thanks for the reminder! I checked the logic of KafkaSource and found that new partitions will start from the offset initializer specified by the user instead of the earliest. We need to correct this behavior to avoid dropping messages from new partitions. > Job restarts from checkpoint I think the current logic guarantees the exactly-once semantic. New partitions created after the checkpoint will be re-discovered again and picked up by the source. @John: > If you want to be a little conservative with the default, 5 minutes might be better than 30 seconds. Thanks for the suggestion! I tried to find the equivalent config in Kafka but missed it. It would be neat to align with the default value of " metadata.max.age.ms". @Gabor: > removed partition handling is not yet added There was a detailed discussion about removing partitions [1] but it looks like this is not an easy task considering the potential data loss and state inconsistency. I'm afraid there's no clear plan on this one and maybe we could trigger a new discussion thread about how to correctly handle removed partitions. [1] https://lists.apache.org/thread/7r4h7v5k281w9cnbfw9lb8tp56r30lwt Best regards, Qingsheng On Fri, Jan 13, 2023 at 4:33 PM Gabor Somogyi wrote: > +1 on the overall direction, it's an important feature. > > I've had a look on the latest master and looks like removed partition > handling is not yet added but I think this is essential. > > > https://github.com/apache/flink/blob/28c3e1a3923ba560b559a216985c1abeb794ebaa/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L305 > > If a partition all of a sudden disappears then it could lead to data loss. > Are you planning to add it? > If yes then when? > > G > > > On Fri, Jan 13, 2023 at 9:22 AM John Roesler wrote: > > > Thanks for this proposal, Qingsheng! > > > > If you want to be a little conservative with the default, 5 minutes might > > be better than 30 seconds. > > > > The equivalent config in Kafka seems to be metadata.max.age.ms ( > > > https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms > ), > > which has a default value of 5 minutes. > > > > Other than that, I’m in favor. I agree, this should be on by default. > > > > Thanks again, > > John > > > > On Fri, Jan 13, 2023, at 08:26, Leonard Xu wrote: > > > Thanks Qingsheng for driving this, enable the dynamic partition > > > discovery would be very useful for kafka topic scale partitions > > > scenarios. > > > > > > +1 for the change. > > > > > > CC: Becket > > > > > > > > > Best, > > > Leonard > > > > > > > > > > > >> On Jan 13, 2023, at 3:15 PM, Jark Wu wrote: > > >> > > >> +1 for the change. I think this is beneficial for users and is > > compatible. > > >> > > >> Best, > > >> Jark > > >> > > >> On Fri, 13 Jan 2023 at 14:22, 何军 wrote: > > >> > > > > +1 for this idea, we have enabled kafka dynamic partition discovery > in > > >>> all > > jobs. > > > > > > >>> > > >
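To make the first point above concrete: with the current behavior described by Qingsheng, a source built like the hypothetical snippet below would apply the user's initializer to newly discovered partitions too, so records written to a new partition before it is discovered would be skipped (all names are placeholders; only the initializer line matters):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class NewPartitionPitfall {
    public static void main(String[] args) {
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")        // placeholder
                .setTopics("input-topic")                  // placeholder
                .setGroupId("example-group")               // placeholder
                // Per the observation above, "latest" currently also governs
                // newly discovered partitions, instead of starting them from
                // the earliest offset as documented for 1.12.
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setProperty("partition.discovery.interval.ms", "30000")
                .build();
    }
}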
[jira] [Created] (FLINK-30676) Introduce Data Structures for table store
Jingsong Lee created FLINK-30676: Summary: Introduce Data Structures for table store Key: FLINK-30676 URL: https://issues.apache.org/jira/browse/FLINK-30676 Project: Flink Issue Type: Sub-task Components: Table Store Reporter: Jingsong Lee Assignee: Jingsong Lee Fix For: table-store-0.4.0 Copy data structures to table store from Flink. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30677) SqlGatewayServiceStatementITCase.testFlinkSqlStatements fails
Matthias Pohl created FLINK-30677: - Summary: SqlGatewayServiceStatementITCase.testFlinkSqlStatements fails Key: FLINK-30677 URL: https://issues.apache.org/jira/browse/FLINK-30677 Project: Flink Issue Type: Bug Components: Table SQL / Gateway Affects Versions: 1.17.0 Reporter: Matthias Pohl We're observing a test instability with {{SqlGatewayServiceStatementITCase.testFlinkSqlStatements}} in the following build: [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44775&view=logs&j=a9db68b9-a7e0-54b6-0f98-010e0aff39e2&t=cdd32e0b-6047-565b-c58f-14054472f1be&l=14251] {code:java} Jan 13 02:46:10 [ERROR] Tests run: 9, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 27.279 s <<< FAILURE! - in org.apache.flink.table.gateway.service.SqlGatewayServiceStatementITCase Jan 13 02:46:10 [ERROR] org.apache.flink.table.gateway.service.SqlGatewayServiceStatementITCase.testFlinkSqlStatements(String)[5] Time elapsed: 1.573 s <<< FAILURE! Jan 13 02:46:10 org.opentest4j.AssertionFailedError: Jan 13 02:46:10 Jan 13 02:46:10 expected: Jan 13 02:46:10 "# table.q - CREATE/DROP/SHOW/ALTER/DESCRIBE TABLE Jan 13 02:46:10 # Jan 13 02:46:10 # Licensed to the Apache Software Foundation (ASF) under one or more Jan 13 02:46:10 # contributor license agreements. See the NOTICE file distributed with Jan 13 02:46:10 # this work for additional information regarding copyright ownership. Jan 13 02:46:10 # The ASF licenses this file to you under the Apache License, Version 2.0 Jan 13 02:46:10 # (the "License"); you may not use this file except in compliance with Jan 13 02:46:10 # the License. You may obtain a copy of the License at Jan 13 02:46:10 # Jan 13 02:46:10 # http://www.apache.org/licenses/LICENSE-2.0 Jan 13 02:46:10 # Jan 13 02:46:10 # Unless required by applicable law or agreed to in writing, software Jan 13 02:46:10 # distributed under the License is distributed on an "AS IS" BASIS, Jan 13 02:46:10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. Jan 13 02:46:10 # See the License for the specific language governing permissions and Jan 13 02:46:10 # limitations under the License. [...] {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30678) NettyConnectionManagerTest.testManualConfiguration appears to be unstable
Matthias Pohl created FLINK-30678: - Summary: NettyConnectionManagerTest.testManualConfiguration appears to be unstable Key: FLINK-30678 URL: https://issues.apache.org/jira/browse/FLINK-30678 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.16.0 Reporter: Matthias Pohl We observe a test instability due to an "Address already in use" issue: {code:java} Jan 13 02:29:43 [ERROR] org.apache.flink.runtime.io.network.netty.NettyConnectionManagerTest.testManualConfiguration Time elapsed: 0.132 s <<< ERROR! Jan 13 02:29:43 org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: bind(..) failed: Address already in use Jan 13 02:29:43{code} https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44777&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=24c3384f-1bcb-57b3-224f-51bf973bbee8&l=6744 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [DISCUSS] FLIP-286: Fix the API stability/scope annotation inconsistency in AbstractStreamOperator
Hi Becket,

May I ask what the motivation for the change is?

I'm really skeptical about making any of those classes `Public` or `PublicEvolving`. As far as I am concerned, there is no agreement in the community on whether StreamOperator is part of the `Public(Evolving)` API. At least I think it should not be. I understand `AbstractStreamOperator` was marked with `PublicEvolving`, but I am really not convinced it was the right decision.

The listed classes are not the only `Internal` classes exposed via `AbstractStreamOperator` that break the consistency, and I am sure there is no question that those should remain `Internal`, such as e.g. StreamTask, StreamConfig, StreamingRuntimeContext and many more.

As it stands, I am strongly against giving any additional guarantees on API stability to the classes there unless there is a good motivation for a given feature and we're sure this is the best way to go forward.

Thus I'm inclined to go with -1 on any proposal on changing annotations for the sake of matching the one on `AbstractStreamOperator`. I might be convinced to support requests to give better guarantees for well-motivated features that are now internal, though.

Best,

Dawid

On 12/01/2023 10:20, Becket Qin wrote:

Hi flink devs,

I'd like to start a discussion thread for FLIP-286[1].

As a recap, currently while AbstractStreamOperator is a class marked as @PublicEvolving, some classes exposed via its methods / fields are marked as @Internal. This FLIP wants to fix this inconsistency of stability / scope annotation.

Comments are welcome!

Thanks,

Jiangjie (Becket) Qin

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880841
Re: [DISCUSS] FLIP-276: Data Consistency of Streaming and Batch ETL in Flink and Table Store
Hi Piotr,

I discussed `Timestamp Barrier` and `Aligned Checkpoint` for data consistency in the FLIP with @jinsong lee; we think there are indeed many defects in using `Aligned Checkpoint` to support data consistency, as you mentioned.

According to our historical discussion, I think we have reached an agreement on an important point: we ultimately need a `Timestamp Barrier Mechanism` to support data consistency. But in our (@jinsong lee's and my) opinion, the total design and implementation based on 'Timestamp Barrier' will be too complex, and it's also too big for one FLIP. So we'd like to use FLIP-276[1] as an overview design of data consistency in Flink Streaming and Batch ETL based on `Timestamp Barrier`. @jinsong and I hope that we can reach an agreement on the overall design in FLIP-276 first, and then, on the basis of FLIP-276, we can create other FLIPs with detailed designs according to modules and drive them. Finally, we can support data consistency based on Timestamp in Flink.

I have updated FLIP-276, deleted the Checkpoint section, and added the overall design of `Timestamp Barrier`. Here I briefly describe the modules of `Timestamp Barrier` as follows:

1. Generation: JobManager must coordinate all source subtasks and generate a unified timestamp barrier from System Time or Event Time for them
2. Checkpoint: Store when the timestamp barrier is generated, so that the job can recover the same timestamp barrier for an uncompleted checkpoint.
3. Replay data: Store for the source when it broadcasts a timestamp barrier, so that the source can replay the same data according to the same timestamp barrier.
4. Align data: Align data for stateful operators (aggregation, join, etc.) and temporal operators (window)
5. Computation: Operator computation for a specific timestamp barrier based on the results of the previous timestamp barrier.
6. Output: Operators output or commit results when they collect all the timestamp barriers, including operators with data buffers or async operations.

I also list the main work in Flink and Table Store in FLIP-276. Please help to review the FLIP when you're free and feel free to give any comments. Looking forward to your feedback, THX

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-276%3A+Data+Consistency+of+Streaming+and+Batch+ETL+in+Flink+and+Table+Store

Best,
Shammon

On Tue, Dec 20, 2022 at 10:01 AM Shammon FY wrote:

> Hi Piotr,
>
> Thanks for your syncing. I will update the FLIP later and keep this
> discussion open. Looking forward to your feedback, thanks
>
> Best,
> Shammon
>
> On Mon, Dec 19, 2022 at 10:45 PM Piotr Nowojski wrote:
>
>> Hi Shammon,
>>
>> I've tried to sync with Timo, David Moravek and Dawid Wysakowicz about this
>> subject. We have only briefly chatted and exchanged some thoughts/ideas,
>> but unfortunately we were not able to finish the discussions before the
>> holiday season/vacations. Can we get back to this topic in January?
>>
>> Best,
>> Piotrek
>>
>> On Fri, 16 Dec 2022 at 10:53, Shammon FY wrote:
>>
>>> Hi Piotr,
>>>
>>> I found there may be several points in our discussion, it will cause
>>> misunderstanding between us when we focus on different one. I list each
>>> point in our discussion as follows
>>>
>>> > Point 1: Is "Aligned Checkpoint" the only mechanism to guarantee data
>>> consistency in the current Flink implementation, and "Watermark" and
>>> "Aligned Checkpoint cannot do that?
>> > My answer is "Yes", the "Aligned Checkpoint" is the only one due to its >> > "Align Data" ability, we can do it in the first stage. >> > >> > > Point2: Can the combination of "Checkpoint Barrier" and "Watermark" >> > support the complete consistency semantics based on "Timestamp" in the >> > current Flink implementation? >> > My answer is "No", we need a new "Timestamp Barrier" mechanism to do >> that >> > which may be upgraded from current "Watermark" or a new mechanism, we >> can >> > do it in the next second or third stage. >> > >> > > Point3: Are the "Checkpoint" and the new "Timestamp Barrier" >> completely >> > independent? The "Checkpoint" whatever "Aligned" or "Unaligned" or "Task >> > Local" supports the "Exactly-Once" between ETLs, and the "Timestamp >> > Barrier" mechanism guarantees data consistency between tables according >> to >> > timestamp for queries. >> > My answer is "Yes", I totally agree with you. Let "Checkpoint" be >> > responsible for fault tolerance and "Timestamp Barrier" for consistency >> > independently. >> > >> > @Piotr, What do you think? If I am missing or misunderstanding anything, >> > please correct me, thanks >> > >> > Best, >> > Shammon >> > >> > On Fri, Dec 16, 2022 at 4:17 PM Piotr Nowojski >> > wrote: >> > >> > > Hi Shammon, >> > > >> > > > I don't think we can combine watermarks and checkpoint barriers >> > together >> > > to >> > > > guarantee data consistency. There will be a "Timestamp Barrier" in >> our >> > > > system to "commit data", "single etl failover", "low lat
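As a toy illustration of the align/compute/output cycle in modules 4-6 of Shammon's mail above (purely illustrative, not code from the FLIP, which also has to cover state, failover, and replay):

import java.util.HashSet;
import java.util.Set;

// An operator only computes and emits results for timestamp barrier T once it
// has received T from every input channel, building on the result of the
// previous barrier. Class and method names here are made up for illustration.
class ToyTimestampBarrierAligner {
    private final int numChannels;
    private final Set<Integer> channelsSeen = new HashSet<>();
    private long currentBarrier = Long.MIN_VALUE;

    ToyTimestampBarrierAligner(int numChannels) {
        this.numChannels = numChannels;
    }

    /** Called when barrier {@code ts} arrives on {@code channel}; true means aligned. */
    boolean onBarrier(int channel, long ts) {
        if (ts > currentBarrier) { // first channel announcing a newer barrier
            currentBarrier = ts;
            channelsSeen.clear();
        }
        channelsSeen.add(channel);
        // Aligned: the operator may now compute for `ts` and emit/commit results.
        return channelsSeen.size() == numChannels;
    }
}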
[jira] [Created] (FLINK-30679) Can not load the data of hive dim table when project-push-down is introduced
hehuiyuan created FLINK-30679: - Summary: Can not load the data of hive dim table when project-push-down is introduced Key: FLINK-30679 URL: https://issues.apache.org/jira/browse/FLINK-30679 Project: Flink Issue Type: Bug Reporter: hehuiyuan vectorize read: {code:java} java.lang.ArrayIndexOutOfBoundsException: 3 at org.apache.flink.connectors.hive.read.HiveTableInputFormat.useOrcVectorizedRead(HiveTableInputFormat.java:276) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:129) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] at org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] at LookupFunction$26.flatMap(Unknown Source) ~[?:?] {code} mapreduce read: {code:java} java.lang.ArrayIndexOutOfBoundsException: 3 at org.apache.flink.connectors.hive.read.HiveMapredSplitReader.lambda$new$0(HiveMapredSplitReader.java:139) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250) ~[?:1.8.0_301] at java.util.Spliterators$IntArraySpliterator.forEachRemaining(Spliterators.java:1032) ~[?:1.8.0_301] at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693) ~[?:1.8.0_301] at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_301] at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[?:1.8.0_301] at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546) ~[?:1.8.0_301] at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260) ~[?:1.8.0_301] at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438) ~[?:1.8.0_301] at org.apache.flink.connectors.hive.read.HiveMapredSplitReader.(HiveMapredSplitReader.java:141) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:157) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] at org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] at LookupFunction$26.flatMap(Unknown Source) ~[?:?] 
at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:81) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:34) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
{code}
The SQL:
{code:java}
CREATE TABLE kafkaTableSource (
  name string,
  age int,
  sex string,
  address string,
  ptime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'hehuiyuan1',
  'scan.startup.mode' = 'latest-offset',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.client.id' = 'test-consumer-group',
  'properties.group.id' = 'test-consumer-group',
  'format' = 'csv'
);

CREATE TABLE printsink (
  name string,
  age int,
  sex string,
  address string,
  score bigint,
  dt string
) WITH (
  'connector' = 'print'
);

CREATE CATALOG myhive WITH (
  'type' = 'hive',
  'default-database' = 'hhy',
  'hive-version' = '2.0.0',
  'hadoop-conf-dir' = '/Users/hehuiyuan/soft/hadoop/hadoop-2.7.3/etc/hadoop'
);

USE CATALOG myhive;
USE hhy;

set table.sql-dialect=hive;
CREATE TABLE IF NOT EXISTS tmp_flink_test_text (
  name STRING,
  age INT,
  score BIGINT
) PARTITIONED BY (dt STRING) STORED AS TEXTFILE TBLPROPERTIES (
  'streaming-source.enable' = 'false',
  'streaming-source.partition.include' = 'all',
  'lookup.join.cache.ttl' = '5 min'
);
set table.sql-dialect=default;

USE CATALOG default_catalog;
INSERT INTO default_catalog.default_database.printsink
SELECT s.name, s.age, s.sex, s.address, r.score, r.dt
FROM default_catalog.default_database.kafkaTableSource as s
JOIN myhive.hhy.tmp_flink
[DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID
Hi flink devs, I'd like to start a discussion thread for FLIP-287[1]. This comes from an offline discussion with @Lijie Wang, from FLIP-239[2] specially for the sink[3]. Basically to expose the ExecutionConfig and JobId on SinkV2#InitContext. This changes are necessary to correct migrate the current sinks to SinkV2 like JdbcSink, KafkaTableSink and so on, that relies on RuntimeContext Comments are welcome! Thanks, [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853 [2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217386271 [3] https://issues.apache.org/jira/browse/FLINK-25421
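A rough sketch of the shape these additions could take (illustrative only; the accessor names below are assumptions, and the FLIP defines the actual signatures):

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;

// Hypothetical additions to the Sink's InitContext; method names are assumed
// for illustration, not taken from the FLIP text.
public interface InitContextAdditions {
    // Lets a writer consult job settings such as
    // ExecutionConfig#isObjectReuseEnabled().
    ExecutionConfig getExecutionConfig();

    // The ID of the job the sink writer runs in.
    JobID getJobId();
}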
Re: [DISCUSS] Enabling dynamic partition discovery by default in Kafka source
+1 for the proposal that makes users' daily work easier and therefore makes Flink more attractive. Best regards, Jing On Fri, Jan 13, 2023 at 11:27 AM Qingsheng Ren wrote: > Thanks everyone for joining the discussion! > > @Martijn: > > > All newly discovered partitions will be consumed from the earliest offset > possible. > > Thanks for the reminder! I checked the logic of KafkaSource and found that > new partitions will start from the offset initializer specified by the user > instead of the earliest. We need to correct this behavior to avoid dropping > messages from new partitions. > > > Job restarts from checkpoint > > I think the current logic guarantees the exactly-once semantic. New > partitions created after the checkpoint will be re-discovered again and > picked up by the source. > > @John: > > > If you want to be a little conservative with the default, 5 minutes might > be better than 30 seconds. > > Thanks for the suggestion! I tried to find the equivalent config in Kafka > but missed it. It would be neat to align with the default value of " > metadata.max.age.ms". > > @Gabor: > > > removed partition handling is not yet added > > There was a detailed discussion about removing partitions [1] but it looks > like this is not an easy task considering the potential data loss and state > inconsistency. I'm afraid there's no clear plan on this one and maybe we > could trigger a new discussion thread about how to correctly handle removed > partitions. > > [1] https://lists.apache.org/thread/7r4h7v5k281w9cnbfw9lb8tp56r30lwt > > Best regards, > Qingsheng > > > On Fri, Jan 13, 2023 at 4:33 PM Gabor Somogyi > wrote: > > > +1 on the overall direction, it's an important feature. > > > > I've had a look on the latest master and looks like removed partition > > handling is not yet added but I think this is essential. > > > > > > > https://github.com/apache/flink/blob/28c3e1a3923ba560b559a216985c1abeb794ebaa/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L305 > > > > If a partition all of a sudden disappears then it could lead to data > loss. > > Are you planning to add it? > > If yes then when? > > > > G > > > > > > On Fri, Jan 13, 2023 at 9:22 AM John Roesler > wrote: > > > > > Thanks for this proposal, Qingsheng! > > > > > > If you want to be a little conservative with the default, 5 minutes > might > > > be better than 30 seconds. > > > > > > The equivalent config in Kafka seems to be metadata.max.age.ms ( > > > > > > https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms > > ), > > > which has a default value of 5 minutes. > > > > > > Other than that, I’m in favor. I agree, this should be on by default. > > > > > > Thanks again, > > > John > > > > > > On Fri, Jan 13, 2023, at 08:26, Leonard Xu wrote: > > > > Thanks Qingsheng for driving this, enable the dynamic partition > > > > discovery would be very useful for kafka topic scale partitions > > > > scenarios. > > > > > > > > +1 for the change. > > > > > > > > CC: Becket > > > > > > > > > > > > Best, > > > > Leonard > > > > > > > > > > > > > > > >> On Jan 13, 2023, at 3:15 PM, Jark Wu wrote: > > > >> > > > >> +1 for the change. I think this is beneficial for users and is > > > compatible. > > > >> > > > >> Best, > > > >> Jark > > > >> > > > >> On Fri, 13 Jan 2023 at 14:22, 何军 wrote: > > > >> > > > > > > +1 for this idea, we have enabled kafka dynamic partition > discovery > > in > > > >>> all > > > jobs. > > > > > > > > > >>> > > > > > >
[jira] [Created] (FLINK-30680) Consider using the autoscaler to detect slow taskmanagers
Gyula Fora created FLINK-30680: -- Summary: Consider using the autoscaler to detect slow taskmanagers Key: FLINK-30680 URL: https://issues.apache.org/jira/browse/FLINK-30680 Project: Flink Issue Type: New Feature Components: Autoscaler, Kubernetes Operator Reporter: Gyula Fora We could leverage logic in the autoscaler to detect slow taskmanagers by comparing the per-record processing times between them. If we notice that all subtasks on a single TM are considerably slower than the rest (at similar input rates) we should try simply restarting the job instead of scaling it up. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [VOTE] Apache Flink Table Store 0.3.0, release candidate #1
+1 (binding) Best, Jingsong On Fri, Jan 13, 2023 at 5:16 PM Jark Wu wrote: > > +1 (binding) > > - Build and compile the source code locally: *OK* > - Verified signatures and hashes: *OK* > - Checked no missing artifacts in the staging area: *OK* > - Reviewed the website release PR: *OK* > - Checked the licenses: *OK* > - Went through the quick start: *OK* > * Verified with both flink 1.14.5 and 1.15.1 using > * Verified web UI and log output, nothing unexpected > > Best, > Jark > > On Thu, 12 Jan 2023 at 20:52, Jingsong Li wrote: > > > Thanks Yu, I have created table-store-0.3.1 and move these jiras to 0.3.1. > > > > Best, > > Jingsong > > > > On Thu, Jan 12, 2023 at 7:41 PM Yu Li wrote: > > > > > > Thanks for the quick action Jingsong! Here is my vote with the new > > staging > > > directory: > > > > > > +1 (binding) > > > > > > > > > - Checked release notes: *Action Required* > > > > > > * The fix version of FLINK-30620 and FLINK-30628 are 0.3.0 but still > > > open, please confirm whether this should be included or we should move it > > > out of 0.3.0 > > > > > > - Checked sums and signatures: *OK* > > > > > > - Checked the jars in the staging repo: *OK* > > > > > > - Checked source distribution doesn't include binaries: *OK* > > > > > > - Maven clean install from source: *OK* > > > > > > - Checked version consistency in pom files: *OK* > > > > > > - Went through the quick start: *OK* > > > > > > * Verified with flink 1.14.6, 1.15.3 and 1.16.0 > > > > > > - Checked the website updates: *OK* > > > > > > Best Regards, > > > Yu > > > > > > > > > On Thu, 12 Jan 2023 at 15:36, Jingsong Li > > wrote: > > > > > > > Thanks Yu for your validation. > > > > > > > > I created a new staging directory [1] > > > > > > > > [1] > > > > > > https://repository.apache.org/content/repositories/orgapacheflink-1577/ > > > > > > > > Best, > > > > Jingsong > > > > > > > > On Thu, Jan 12, 2023 at 3:07 PM Yu Li wrote: > > > > > > > > > > Hi Jingsong, > > > > > > > > > > It seems the given staging directory [1] is not exposed, could you > > double > > > > > check and republish if necessary? Thanks. 
> > > > > > > > > > Best Regards, > > > > > Yu > > > > > > > > > > [1] > > > > > > https://repository.apache.org/content/repositories/orgapacheflink-1576/ > > > > > > > > > > > > > > > On Tue, 10 Jan 2023 at 16:53, Jingsong Li > > > > wrote: > > > > > > > > > > > Hi everyone, > > > > > > > > > > > > Please review and vote on the release candidate #1 for the version > > > > > > 0.3.0 of Apache Flink Table Store, as follows: > > > > > > > > > > > > [ ] +1, Approve the release > > > > > > [ ] -1, Do not approve the release (please provide specific > > comments) > > > > > > > > > > > > **Release Overview** > > > > > > > > > > > > As an overview, the release consists of the following: > > > > > > a) Table Store canonical source distribution to be deployed to the > > > > > > release repository at dist.apache.org > > > > > > b) Table Store binary convenience releases to be deployed to the > > > > > > release repository at dist.apache.org > > > > > > c) Maven artifacts to be deployed to the Maven Central Repository > > > > > > > > > > > > **Staging Areas to Review** > > > > > > > > > > > > The staging areas containing the above mentioned artifacts are as > > > > follows, > > > > > > for your review: > > > > > > * All artifacts for a) and b) can be found in the corresponding dev > > > > > > repository at dist.apache.org [2] > > > > > > * All artifacts for c) can be found at the Apache Nexus Repository > > [3] > > > > > > > > > > > > All artifacts are signed with the key > > > > > > 2C2B6A653B07086B65E4369F7C76245E0A318150 [4] > > > > > > > > > > > > Other links for your review: > > > > > > * JIRA release notes [5] > > > > > > * source code tag "release-0.3.0-rc1" [6] > > > > > > * PR to update the website Downloads page to include Table Store > > links > > > > [7] > > > > > > > > > > > > **Vote Duration** > > > > > > > > > > > > The voting time will run for at least 72 hours. > > > > > > It is adopted by majority approval, with at least 3 PMC affirmative > > > > votes. > > > > > > > > > > > > Best, > > > > > > Jingsong Lee > > > > > > > > > > > > [1] > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Table+Store+Release > > > > > > [2] > > > > > > > > > > > > https://dist.apache.org/repos/dist/dev/flink/flink-table-store-0.3.0-rc1/ > > > > > > [3] > > > > > > > > > > > > https://repository.apache.org/content/repositories/orgapacheflink-1576/ > > > > > > [4] https://dist.apache.org/repos/dist/release/flink/KEYS > > > > > > [5] > > > > > > > > > > > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352111 > > > > > > [6] > > https://github.com/apache/flink-table-store/tree/release-0.3.0-rc1 > > > > > > [7] https://github.com/apache/flink-web/pull/601 > > > > > > > > > > > >
[RESULT][VOTE] Apache Flink Table Store 0.3.0, release candidate #1
I'm happy to announce that we have unanimously approved this release. There are 3 approving votes, 3 of which are binding: * Yu Li (binding) * Jark Wu (binding) * Jingsong Lee (binding) There are no disapproving votes. Thank you for verifying the release candidate. I will now proceed to finalize the release and announce it once everything is published. Best, Jingsong
Re: [DISCUSS] Enabling dynamic partition discovery by default in Kafka source
+1, we've enabled this by default (10 mins) in our production for years.

On Fri, Jan 13, 2023 at 22:22, Jing Ge wrote:

> +1 for the proposal that makes users' daily work easier and therefore makes
> Flink more attractive.
>
> Best regards,
> Jing
>
> On Fri, Jan 13, 2023 at 11:27 AM Qingsheng Ren wrote:
>
> > Thanks everyone for joining the discussion!
> >
> > @Martijn:
> >
> > > All newly discovered partitions will be consumed from the earliest offset
> > possible.
> >
> > Thanks for the reminder! I checked the logic of KafkaSource and found that
> > new partitions will start from the offset initializer specified by the user
> > instead of the earliest. We need to correct this behavior to avoid dropping
> > messages from new partitions.
> >
> > > Job restarts from checkpoint
> >
> > I think the current logic guarantees the exactly-once semantic. New
> > partitions created after the checkpoint will be re-discovered again and
> > picked up by the source.
> >
> > @John:
> >
> > > If you want to be a little conservative with the default, 5 minutes might
> > be better than 30 seconds.
> >
> > Thanks for the suggestion! I tried to find the equivalent config in Kafka
> > but missed it. It would be neat to align with the default value of
> > "metadata.max.age.ms".
> >
> > @Gabor:
> >
> > > removed partition handling is not yet added
> >
> > There was a detailed discussion about removing partitions [1] but it looks
> > like this is not an easy task considering the potential data loss and state
> > inconsistency. I'm afraid there's no clear plan on this one and maybe we
> > could trigger a new discussion thread about how to correctly handle removed
> > partitions.
> >
> > [1] https://lists.apache.org/thread/7r4h7v5k281w9cnbfw9lb8tp56r30lwt
> >
> > Best regards,
> > Qingsheng
> >
> > On Fri, Jan 13, 2023 at 4:33 PM Gabor Somogyi wrote:
> >
> > > +1 on the overall direction, it's an important feature.
> > >
> > > I've had a look on the latest master and looks like removed partition
> > > handling is not yet added but I think this is essential.
> > >
> > > https://github.com/apache/flink/blob/28c3e1a3923ba560b559a216985c1abeb794ebaa/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L305
> > >
> > > If a partition all of a sudden disappears then it could lead to data loss.
> > > Are you planning to add it?
> > > If yes then when?
> > >
> > > G
> > >
> > > On Fri, Jan 13, 2023 at 9:22 AM John Roesler wrote:
> > >
> > > > Thanks for this proposal, Qingsheng!
> > > >
> > > > If you want to be a little conservative with the default, 5 minutes might
> > > > be better than 30 seconds.
> > > >
> > > > The equivalent config in Kafka seems to be metadata.max.age.ms (
> > > > https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms),
> > > > which has a default value of 5 minutes.
> > > >
> > > > Other than that, I’m in favor. I agree, this should be on by default.
> > > >
> > > > Thanks again,
> > > > John
> > > >
> > > > On Fri, Jan 13, 2023, at 08:26, Leonard Xu wrote:
> > > > > Thanks Qingsheng for driving this, enable the dynamic partition
> > > > > discovery would be very useful for kafka topic scale partitions
> > > > > scenarios.
> > > > >
> > > > > +1 for the change.
> > > > >
> > > > > CC: Becket
> > > > >
> > > > > Best,
> > > > > Leonard
> > > > >
> > > > >> On Jan 13, 2023, at 3:15 PM, Jark Wu wrote:
> > > > >>
> > > > >> +1 for the change. I think this is beneficial for users and is compatible.
> > > > >>
> > > > >> Best,
> > > > >> Jark
> > > > >>
> > > > >> On Fri, 13 Jan 2023 at 14:22, 何军 wrote:
> > > > >>
> > > > >>> +1 for this idea, we have enabled kafka dynamic partition discovery in
> > > > >>> all jobs.

--

Best,
Benchao Li
[ANNOUNCE] Apache Flink Table Store 0.3.0 released
The Apache Flink community is very happy to announce the release of Apache Flink Table Store 0.3.0. Apache Flink Table Store is a unified storage to build dynamic tables for both streaming and batch processing in Flink, supporting high-speed data ingestion and timely data query. Please check out the release blog post for an overview of the release: https://flink.apache.org/news/2023/01/13/release-table-store-0.3.0.html The release is available for download at: https://flink.apache.org/downloads.html Maven artifacts for Flink Table Store can be found at: https://central.sonatype.dev/search?q=flink-table-store The full release notes are available in Jira: https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352111 We would like to thank all contributors of the Apache Flink community who made this release possible! Best, Jingsong Lee
Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID
Hey Joao, Thanks for this FLIP! One question on the proposed interface changes: is it expected that the configuration is *mutated* via the InitContext passed to Sink::createWriter()? If that's not the case, how about establishing a read-only contract representing the current configuration and passing in that one instead? That would probably deserve its own FLIP upon which yours here then would depend. Later on, other contracts which effectively shouldn't modify a config could use that one, too. Note I don't mean to stall your efforts here, but I thought it'd be a good idea to bring it up and gauge the general interest in this. Best, --Gunnar Am Fr., 13. Jan. 2023 um 15:17 Uhr schrieb Joao Boto : > > Hi flink devs, > > I'd like to start a discussion thread for FLIP-287[1]. > This comes from an offline discussion with @Lijie Wang, from FLIP-239[2] > specially for the sink[3]. > > Basically to expose the ExecutionConfig and JobId on SinkV2#InitContext. > This changes are necessary to correct migrate the current sinks to SinkV2 > like JdbcSink, KafkaTableSink and so on, that relies on RuntimeContext > > Comments are welcome! > Thanks, > > [1] > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853 > [2] > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217386271 > [3] https://issues.apache.org/jira/browse/FLINK-25421
Re: [DISCUSS] FLIP-286: Fix the API stability/scope annotation inconsistency in AbstractStreamOperator
Hi Dawid,

Thanks for the reply. I am currently looking at the Beam Flink runner, and there are quite some hacks the Beam runner has to do in order to deal with the backwards-incompatible changes in AbstractStreamOperator and some of the classes exposed by it. Regardless of what we think, the fact is that AbstractStreamOperator is marked as PublicEvolving today, and our users use it. It is a basic rule of public API that anything exposed by a public interface should also be public. This is the direct motivation of this FLIP.

Regarding the StreamTask / StreamConfig exposure, if you look at the StreamOperatorFactory, which is also a PublicEvolving class, it actually exposes the StreamTask, StreamConfig as well as some other classes in the StreamOperatorParameters. So these classes are already exposed in multiple public APIs.

Keeping our public API stability guarantee is really fundamental and critical to the users. With the current status of inconsistent API stability annotations, I don't see how we can assure that. From what I can see, accidental backwards-incompatible changes are likely going to keep happening. So I'd say let's see how to fix forward instead of doing nothing.

BTW, initially I thought this was just an accidental mismatch, but after further examination, it looks like a bigger issue. I guess one of the reasons we ended up in this situation is that we haven't really thought it through regarding the boundary between framework and user space, i.e. what framework primitives we want to expose to the users. So instead we just expose a bunch of internal things and hope users only use the "stable" part of them.

Thanks,

Jiangjie (Becket) Qin

On Fri, Jan 13, 2023 at 7:28 PM Dawid Wysakowicz wrote:

> Hi Becket,
>
> May I ask what is the motivation for the change?
>
> I'm really skeptical about making any of those classes `Public` or
> `PublicEvolving`. As far as I am concerned there is no agreement in the
> community if StreamOperator is part of the `Public(Evolving)` API. At
> least I think it should not. I understand `AbstractStreamOperator` was
> marked with `PublicEvolving`, but I am really not convinced it was the
> right decision.
>
> The listed classes are not the only classes exposed to
> `AbstractStreamOperator` that are `Internal` that break the consistency
> and I am sure there is no question those should remain `Internal` such
> as e.g. StreamTask, StreamConfig, StreamingRuntimeContext and many more.
>
> As it stands I am strongly against giving any additional guarantees on
> API stability to the classes there unless there is a good motivation for
> a given feature and we're sure this is the best way to go forward.
>
> Thus I'm inclined to go with -1 on any proposal on changing annotations
> for the sake of matching the one on `AbstractStreamOperator`. I might be
> convinced to support requests to give better guarantees for well
> motivated features that are now internal though.
>
> Best,
>
> Dawid
>
> On 12/01/2023 10:20, Becket Qin wrote:
> > Hi flink devs,
> >
> > I'd like to start a discussion thread for FLIP-286[1].
> >
> > As a recap, currently while AbstractStreamOperator is a class marked as
> > @PublicEvolving, some classes exposed via its methods / fields are
> > marked as @Internal. This FLIP wants to fix this inconsistency of
> > stability / scope annotation.
> >
> > Comments are welcome!
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > [1]
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880841
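The inconsistency at issue can be shown with a contrived sketch (these are not real Flink classes, only the annotation pattern):

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;

// Made-up classes illustrating the mismatch: the stability promise of the
// @PublicEvolving type is undermined by the @Internal type it exposes.
@Internal
class SomeInternalHelper {
    // Free to change or disappear in any release.
}

@PublicEvolving
public abstract class SomePublicOperator {
    // Anyone subclassing this @PublicEvolving operator compiles against the
    // @Internal helper, so changes to the helper break user code.
    protected SomeInternalHelper helper = new SomeInternalHelper();
}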
Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID
Hi Gunnar,

Thanks for your time and response. I think the problem you want to solve is the exposure of the ExecutionConfig (which can be mutated), no?

The configuration is not mutated; we only need to know if object reuse is enabled. This is already exposed on RuntimeContext, and we thought keeping it similar would simplify any migrations. But as said, for this migration we only need isObjectReuseEnabled from the ExecutionConfig, and we could expose only this configuration.

Best regards,

On 2023/01/13 15:50:09 Gunnar Morling wrote:
> Hey Joao,
>
> Thanks for this FLIP! One question on the proposed interface changes:
> is it expected that the configuration is *mutated* via the InitContext
> passed to Sink::createWriter()? If that's not the case, how about
> establishing a read-only contract representing the current
> configuration and passing in that one instead? That would probably
> deserve its own FLIP upon which yours here then would depend. Later
> on, other contracts which effectively shouldn't modify a config could
> use that one, too.
>
> Note I don't mean to stall your efforts here, but I thought it'd be a
> good idea to bring it up and gauge the general interest in this.
>
> Best,
>
> --Gunnar
>
> On Fri, 13 Jan 2023 at 15:17, Joao Boto wrote:
> >
> > Hi flink devs,
> >
> > I'd like to start a discussion thread for FLIP-287[1].
> > This comes from an offline discussion with @Lijie Wang, from FLIP-239[2]
> > specially for the sink[3].
> >
> > Basically to expose the ExecutionConfig and JobId on SinkV2#InitContext.
> > This changes are necessary to correct migrate the current sinks to SinkV2
> > like JdbcSink, KafkaTableSink and so on, that relies on RuntimeContext
> >
> > Comments are welcome!
> > Thanks,
> >
> > [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853
> > [2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217386271
> > [3] https://issues.apache.org/jira/browse/FLINK-25421
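If the read-only route Gunnar sketches were taken, a narrow contract might be enough for the migration need described here (a sketch only; all names below are illustrative, not proposed API):

// Illustrative only: a read-only view over ExecutionConfig, narrowed to what
// the sinks being migrated actually consult today.
public interface ReadableExecutionConfig {
    // Mirrors ExecutionConfig#isObjectReuseEnabled() without exposing setters.
    boolean isObjectReuseEnabled();
}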
[jira] [Created] (FLINK-30681) Pulsar-Flink connector corrupts its output topic
Jacek Wislicki created FLINK-30681: -- Summary: Pulsar-Flink connector corrupts its output topic Key: FLINK-30681 URL: https://issues.apache.org/jira/browse/FLINK-30681 Project: Flink Issue Type: Bug Components: Connectors / Pulsar Affects Versions: 1.15.3 Reporter: Jacek Wislicki

When PulsarSink writes a message to its output topic, the topic gets permanently corrupted and cannot be used anymore (even with newly created subscriptions). We have isolated this behaviour to a minimal project demonstrating the problem, available on [GitHub|https://github.com/JacekWislicki/vp-test5]:

# There are 2 topics: IN and OUT
# IN is subscribed by Flink's InToOutJob (with PulsarSource), which writes to OUT (with PulsarSink)
# OUT is subscribed by Pulsar's OutReadFunction
# When we write directly to OUT (e.g., with OutTopicProducer), OutReadFunction gets each message from its backlog and processes it with no issue (the ledger position updates)
# When we write to IN (e.g., with InTopicProducer), InToOutJob reads the message, processes it and writes to OUT
# OutReadFunction reads the message, the ledger position updates, but nothing happens
## Further messages written to OUT are not read, as OUT is blocked on the last message from Flink
## Truncating OUT does not help, neither does unsubscribing or creating a new subscription

Reproduced with Pulsar 2.9.1, 2.9.2 and 2.10.2. The issue does not occur when we use our old custom SinkFunction implementation based on a Pulsar producer writing to OUT.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [DISCUSS] FLIP-286: Fix the API stability/scope annotation inconsistency in AbstractStreamOperator
Hi Becket, > It is a basic rule of public API that anything exposed by a public interface should also be public. I agree with this in general. Did you get an overview of where we currently violate this? Is this something that the Arc42 architecture tests could test for so that as a first measure we don't introduce more occurrences (cc @Ingo). Maybe its justified to make a pass over all of these occurrences and resolve these occurrences one way or another either making the members/parameters @PublicEvoling or actually making a class/method @Internal even if its was @PublicEvoling before. I think, this could be the better option than having @PublicEvolving classes/methods that really aren't. Cheers, Konstantin Am Fr., 13. Jan. 2023 um 17:02 Uhr schrieb Becket Qin : > Hi Dawid, > > Thanks for the reply. I am currently looking at the Beam Flink runner, and > there are quite some hacks the Beam runner has to do in order to deal with > the backwards incompatible changes in AbstractStreamOperator and some of > the classes exposed by it. Regardless of what we think, the fact is that > AbstractStreamOperator is marked as PublicEvolving today, and our users use > it. It is a basic rule of public API that anything exposed by a public > interface should also be public. This is the direct motivation of this > FLIP. > > Regarding the StreamTask / StreamConfig exposure, if you look at the > StreamOperatorFactory which is also a PublicEvolving class, it actually > exposes the StreamTask, StreamConfig as well as some other classes in the > StreamOperatorParameters. So these classes are already exposed in multiple > public APIs. > > Keeping our public API stability guarantee is really fundamental and > critical to the users. With the current status of inconsistent API > stability annotations, I don't see how can we assure of that. From what I > can see, accidental backwards incompatible changes is likely going to keep > happening. So I'd say let's see how to fix forward instead of doing > nothing. > > BTW, Initially I thought this is just an accidental mismatch, but after > further exam, it looks that it is a bigger issue. I guess one of the > reasons we end up in this situation is that we haven't really thought it > through regarding the boundary between framework and user space, i.e. what > framework primitives we want to expose to the users. So instead we just > expose a bunch of internal things and hope users only use the "stable" part > of them. > > Thanks, > > Jiangjie (Becket) Qin > > > On Fri, Jan 13, 2023 at 7:28 PM Dawid Wysakowicz > wrote: > > > Hi Becket, > > > > May I ask what is the motivation for the change? > > > > I'm really skeptical about making any of those classes `Public` or > > `PublicEvolving`. As far as I am concerned there is no agreement in the > > community if StreamOperator is part of the `Public(Evolving)` API. At > > least I think it should not. I understand `AbstractStreamOperator` was > > marked with `PublicEvolving`, but I am really not convinced it was the > > right decision. > > > > The listed classes are not the only classes exposed to > > `AbstractStreamOperator` that are `Internal` that break the consistency > > and I am sure there is no question those should remain `Internal` such > > as e.g. StreamTask, StreamConfig, StreamingRuntimeContext and many more. 
> > > > As it stands I am strongly against giving any additional guarantees on > > API stability to the classes there unless there is a good motivation for > > a given feature and we're sure this is the best way to go forward. > > > > Thus I'm inclined to go with -1 on any proposal on changing annotations > > for the sake of matching the one on `AbstractStreamOperator`. I might be > > convinced to support requests to give better guarantees for well-motivated > > features that are now internal though. > > > > Best, > > > > Dawid > > > > On 12/01/2023 10:20, Becket Qin wrote: > > > Hi flink devs, > > > > > > I'd like to start a discussion thread for FLIP-286[1]. > > > > > > As a recap, currently while AbstractStreamOperator is a class marked as > > > @PublicEvolving, some classes exposed via its methods / fields are > > > marked as @Internal. This FLIP wants to fix this inconsistency of > > > stability / scope annotation. > > > > > > Comments are welcome! > > > > > > Thanks, > > > > > > Jiangjie (Becket) Qin > > > > > > [1] > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880841 > > > > > > -- https://twitter.com/snntrable https://github.com/knaufk
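To make the rule under discussion concrete, here is a self-contained toy showing the shape of the inconsistency: a stability-annotated class whose API surface leaks a type with a weaker guarantee. The annotation names mirror Flink's @PublicEvolving/@Internal but are declared locally, so none of this is Flink's actual code:
{code:java}
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

// Local stand-ins for Flink's stability annotations.
@Retention(RetentionPolicy.RUNTIME) @interface PublicEvolving {}
@Retention(RetentionPolicy.RUNTIME) @interface Internal {}

// Stand-in for an internal class such as StreamConfig.
@Internal
class InternalConfig {}

// A @PublicEvolving class: everything it exposes is de facto public API...
@PublicEvolving
abstract class EvolvingOperator {
    // ...yet this accessor's return type only promises @Internal stability,
    // so user subclasses compile against an interface with no guarantee.
    protected InternalConfig getOperatorConfig() {
        return new InternalConfig();
    }
}

public class AnnotationLeakDemo {
    public static void main(String[] args) throws Exception {
        Class<?> returnType =
                EvolvingOperator.class.getDeclaredMethod("getOperatorConfig").getReturnType();
        // Prints "true": the @PublicEvolving surface hands out an @Internal type.
        System.out.println(returnType.isAnnotationPresent(Internal.class));
    }
}
{code}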
Re: [DISCUSS] FLIP-286: Fix the API stability/scope annotation inconsistency in AbstractStreamOperator
I don't have an overview of all the holes in our public API surface at the moment. It would be great if there's some tool to do a scan. In addition to fixing the annotation consistency formally, I think it is equally, if not more, important to see whether the public APIs we expose tell a good story. For example, if we say StreamConfig should be internal, some fair questions to ask are: why does our own AbstractStreamOperator need it? Why doesn't a user-defined operator need it? Is there something in StreamConfig we should expose as a public interface, if not the entire class? Thanks, Jiangjie (Becket) Qin On Sat, Jan 14, 2023 at 5:36 AM Konstantin Knauf wrote: > Hi Becket, > > > It is a basic rule of public API that anything exposed by a public > interface should also be public. > > I agree with this in general. Did you get an overview of where we currently > violate this? Is this something that the Arc42 architecture tests could > test for, so that as a first measure we don't introduce more occurrences > (cc @Ingo)? > > Maybe it's justified to make a pass over all of these occurrences and > resolve them one way or another, either making the > members/parameters @PublicEvolving or actually making a class/method > @Internal even if it was @PublicEvolving before. I think this could be the > better option than having @PublicEvolving classes/methods that really > aren't. > > Cheers, > > Konstantin > > Am Fr., 13. Jan. 2023 um 17:02 Uhr schrieb Becket Qin < > becket@gmail.com > >: > > > Hi Dawid, > > > > Thanks for the reply. I am currently looking at the Beam Flink runner, > > and there are quite some hacks the Beam runner has to do in order to deal > > with the backwards incompatible changes in AbstractStreamOperator and some of > > the classes exposed by it. Regardless of what we think, the fact is that > > AbstractStreamOperator is marked as PublicEvolving today, and our users use > > it. It is a basic rule of public API that anything exposed by a public > > interface should also be public. This is the direct motivation of this > > FLIP. > > > > Regarding the StreamTask / StreamConfig exposure, if you look at the > > StreamOperatorFactory which is also a PublicEvolving class, it actually > > exposes the StreamTask, StreamConfig as well as some other classes in the > > StreamOperatorParameters. So these classes are already exposed in multiple > > public APIs. > > > > Keeping our public API stability guarantee is really fundamental and > > critical to the users. With the current status of inconsistent API > > stability annotations, I don't see how we can assure that. From what I > > can see, accidental backwards incompatible changes are likely going to keep > > happening. So I'd say let's see how to fix forward instead of doing > > nothing. > > > > BTW, initially I thought this was just an accidental mismatch, but after > > further examination, it looks like it is a bigger issue. I guess one of the > > reasons we ended up in this situation is that we haven't really thought it > > through regarding the boundary between framework and user space, i.e. what > > framework primitives we want to expose to the users. So instead we just > > expose a bunch of internal things and hope users only use the "stable" part > > of them. > > > > Thanks, > > > > Jiangjie (Becket) Qin > > > > > > On Fri, Jan 13, 2023 at 7:28 PM Dawid Wysakowicz > > > wrote: > > > > Hi Becket, > > > > > > > > May I ask what is the motivation for the change?
> > > > I'm really skeptical about making any of those classes `Public` or > > > `PublicEvolving`. As far as I am concerned there is no agreement in the > > > community on whether StreamOperator is part of the `Public(Evolving)` API. > > > At least I think it should not be. I understand `AbstractStreamOperator` was > > > marked with `PublicEvolving`, but I am really not convinced it was the > > > right decision. > > > > > > The listed classes are not the only `Internal` classes exposed by > > > `AbstractStreamOperator` that break the consistency, and I am sure there > > > is no question that those should remain `Internal`, such as e.g. > > > StreamTask, StreamConfig, StreamingRuntimeContext and many more. > > > > > > As it stands I am strongly against giving any additional guarantees on > > > API stability to the classes there unless there is a good motivation for > > > a given feature and we're sure this is the best way to go forward. > > > > > > Thus I'm inclined to go with -1 on any proposal on changing annotations > > > for the sake of matching the one on `AbstractStreamOperator`. I might be > > > convinced to support requests to give better guarantees for well-motivated > > > features that are now internal though. > > > > > > Best, > > > > > > Dawid > > > > > > On 12/01/2023 10:20, Becket Qin wrote: > > > > Hi flink devs, > > > > > > > > I'd like to start a discussion thread for FLIP-286[1]. > > > > > > > > As a recap, currently while AbstractStreamOperator is a class marked as > > > > @PublicEvolving, some classes exposed via its methods / fields are > > > > marked as @Internal. This FLIP wants to fix this inconsistency of > > > > stability / scope annotation.
Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID
Hi Joao, Thanks for bringing this up. Exposing internal domain instances depends on your requirements. Technically, it is even possible to expose the RuntimeContext [1] (this must be considered very carefully). Since you mentioned that you only need to know if objectReuse is enabled, how about just exposing isObjectReuseEnabled instead of the whole ExecutionConfig? The idea is to keep the scope as small as possible while still satisfying the requirement. If more information from ExecutionConfig is needed later, we can still refactor the code properly, given a strong motivation. Best regards, Jing [1] https://github.com/apache/flink/blob/560b4612735a2b9cd3b5db88adf5cb223e85535b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java#L279 On Fri, Jan 13, 2023 at 6:19 PM João Boto wrote: > Hi Gunnar, > Thanks for your time and response... > > I think the problem you want to solve is the exposure of the > ExecutionConfig (that can be mutated), no? > The configuration is not mutated; we only need to know if objectReuse is > enabled. > This is already exposed on RuntimeContext; we kept it similar to that to > simplify any migration. But as said, for this migration we only need > isObjectReuseEnabled from ExecutionConfig, and we could expose only this > configuration. > > Best regards, > > > On 2023/01/13 15:50:09 Gunnar Morling wrote: > > Hey Joao, > > > > Thanks for this FLIP! One question on the proposed interface changes: > > is it expected that the configuration is *mutated* via the InitContext > > passed to Sink::createWriter()? If that's not the case, how about > > establishing a read-only contract representing the current > > configuration and passing in that one instead? That would probably > > deserve its own FLIP upon which yours here then would depend. Later > > on, other contracts which effectively shouldn't modify a config could > > use that one, too. > > > > Note I don't mean to stall your efforts here, but I thought it'd be a > > good idea to bring it up and gauge the general interest in this. > > > > Best, > > > > --Gunnar > > > > Am Fr., 13. Jan. 2023 um 15:17 Uhr schrieb Joao Boto : > > > > > > Hi flink devs, > > > > > > I'd like to start a discussion thread for FLIP-287[1]. > > > This comes from an offline discussion with @Lijie Wang, from FLIP-239[2], > > > specifically for the sink[3]. > > > > > > Basically, it proposes to expose the ExecutionConfig and JobId on > > > SinkV2#InitContext. These changes are necessary to correctly migrate the > > > current sinks to SinkV2, like JdbcSink, KafkaTableSink and so on, which > > > rely on RuntimeContext. > > > > > > Comments are welcome! > > > Thanks, > > > > > > [1] > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853 > > > [2] > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217386271 > > > [3] https://issues.apache.org/jira/browse/FLINK-25421 > > >
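As a sketch of the narrower exposure suggested here (method names are illustrative, not the FLIP's final API): instead of handing the whole mutable ExecutionConfig to Sink#InitContext, surface only the two pieces of information the sink migrations actually need.
{code:java}
import org.apache.flink.api.common.JobID;

// Hypothetical additions to SinkV2's InitContext -- a sketch only.
public interface InitContextAdditions {

    // Mirrors ExecutionConfig#isObjectReuseEnabled() without exposing
    // the mutable ExecutionConfig itself.
    boolean isObjectReuseEnabled();

    // The ID of the running job, the other piece FLIP-287 asks for.
    JobID getJobId();
}
{code}
A read-only view like this also addresses Gunnar's concern: nothing handed to createWriter() can be mutated.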
Re: [DISCUSS] FLIP-286: Fix the API stability/scope annotation inconsistency in AbstractStreamOperator
Hi Becket, Speaking of tools, we have ArchUnit integrated in Flink. Extending the defined ArchRules [1] a little, you will get the desired scan result. [1] https://github.com/apache/flink/blob/560b4612735a2b9cd3b5db88adf5cb223e85535b/flink-architecture-tests/flink-architecture-tests-production/src/main/java/org/apache/flink/architecture/rules/ApiAnnotationRules.java#L99 Best regards, Jing On Sat, Jan 14, 2023 at 12:46 AM Becket Qin wrote: > I don't have an overview of all the holes in our public API surface at the > moment. It would be great if there's some tool to do a scan. > > In addition to fixing the annotation consistency formally, I think it is > equally, if not more, important to see whether the public APIs we expose > tell a good story. For example, if we say StreamConfig should be internal, > some fair questions to ask are: why does our own AbstractStreamOperator > need it? Why doesn't a user-defined operator need it? Is there something in > StreamConfig we should expose as a public interface, if not the entire > class? > > Thanks, > > Jiangjie (Becket) Qin > > On Sat, Jan 14, 2023 at 5:36 AM Konstantin Knauf > wrote: > > > Hi Becket, > > > > > It is a basic rule of public API that anything exposed by a public > > interface should also be public. > > > > I agree with this in general. Did you get an overview of where we > > currently violate this? Is this something that the Arc42 architecture tests could > > test for, so that as a first measure we don't introduce more occurrences > > (cc @Ingo)? > > > > Maybe it's justified to make a pass over all of these occurrences and > > resolve them one way or another, either making the > > members/parameters @PublicEvolving or actually making a class/method > > @Internal even if it was @PublicEvolving before. I think this could be > > the better option than having @PublicEvolving classes/methods that really > > aren't. > > > > Cheers, > > > > Konstantin > > > > Am Fr., 13. Jan. 2023 um 17:02 Uhr schrieb Becket Qin < > > becket@gmail.com > > >: > > > Hi Dawid, > > > > > > Thanks for the reply. I am currently looking at the Beam Flink runner, > > > and there are quite some hacks the Beam runner has to do in order to deal > > > with the backwards incompatible changes in AbstractStreamOperator and some > > > of the classes exposed by it. Regardless of what we think, the fact is > > > that AbstractStreamOperator is marked as PublicEvolving today, and our users > > > use it. It is a basic rule of public API that anything exposed by a public > > > interface should also be public. This is the direct motivation of this > > > FLIP. > > > > > > Regarding the StreamTask / StreamConfig exposure, if you look at the > > > StreamOperatorFactory which is also a PublicEvolving class, it actually > > > exposes the StreamTask, StreamConfig as well as some other classes in > > > the StreamOperatorParameters. So these classes are already exposed in > > > multiple public APIs. > > > > > > Keeping our public API stability guarantee is really fundamental and > > > critical to the users. With the current status of inconsistent API > > > stability annotations, I don't see how we can assure that. From what I > > > can see, accidental backwards incompatible changes are likely going to > > > keep happening. So I'd say let's see how to fix forward instead of doing > > > nothing. > > > > > > BTW, initially I thought this was just an accidental mismatch, but after > > > further examination, it looks like it is a bigger issue.
I guess one of the > > > reasons we ended up in this situation is that we haven't really thought > > > it through regarding the boundary between framework and user space, i.e. > > > what framework primitives we want to expose to the users. So instead we just > > > expose a bunch of internal things and hope users only use the "stable" > > > part of them. > > > > > > Thanks, > > > > > > Jiangjie (Becket) Qin > > > > > > > > > On Fri, Jan 13, 2023 at 7:28 PM Dawid Wysakowicz < dwysakow...@apache.org > > > > > > wrote: > > > > > > > Hi Becket, > > > > > > > > May I ask what is the motivation for the change? > > > > > > > > I'm really skeptical about making any of those classes `Public` or > > > > `PublicEvolving`. As far as I am concerned there is no agreement in the > > > > community on whether StreamOperator is part of the `Public(Evolving)` API. > > > > At least I think it should not be. I understand `AbstractStreamOperator` > > > > was marked with `PublicEvolving`, but I am really not convinced it was > > > > the right decision. > > > > > > > > The listed classes are not the only `Internal` classes exposed by > > > > `AbstractStreamOperator` that break the consistency, and I am sure there > > > > is no question that those should remain `Internal`, such as e.g. > > > > StreamTask, StreamConfig, StreamingRuntimeContext and many more. > > > > > > > > As it stands I am strongly against giving any additional guarantees on > > > > API stability to the classes there unless there is a good motivation for > > > > a given feature and we're sure this is the best way to go forward.
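For illustration, an ArchUnit rule along the lines Jing points to might look like the following sketch. This is not the existing rule in ApiAnnotationRules; it only checks method return types, whereas a full scan would also cover parameter and field types:
{code:java}
import com.tngtech.archunit.core.domain.JavaMethod;
import com.tngtech.archunit.lang.ArchCondition;
import com.tngtech.archunit.lang.ArchRule;
import com.tngtech.archunit.lang.ConditionEvents;
import com.tngtech.archunit.lang.SimpleConditionEvent;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;

import static com.tngtech.archunit.lang.syntax.ArchRuleDefinition.methods;

public class ApiLeakRuleSketch {

    // Flags public methods of @PublicEvolving classes whose return type
    // is itself annotated @Internal.
    public static final ArchRule NO_INTERNAL_LEAKS =
            methods()
                    .that().arePublic()
                    .and().areDeclaredInClassesThat().areAnnotatedWith(PublicEvolving.class)
                    .should(new ArchCondition<JavaMethod>("not return @Internal types") {
                        @Override
                        public void check(JavaMethod method, ConditionEvents events) {
                            if (method.getRawReturnType().isAnnotatedWith(Internal.class)) {
                                events.add(SimpleConditionEvent.violated(
                                        method,
                                        method.getFullName() + " leaks @Internal type "
                                                + method.getRawReturnType().getName()));
                            }
                        }
                    });
}
{code}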
[jira] [Created] (FLINK-30682) FLIP-283: Use adaptive batch scheduler as default scheduler for batch jobs
JunRui Li created FLINK-30682: - Summary: FLIP-283: Use adaptive batch scheduler as default scheduler for batch jobs Key: FLINK-30682 URL: https://issues.apache.org/jira/browse/FLINK-30682 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Reporter: JunRui Li Fix For: 1.17.0 To further use the adaptive batch scheduler to improve Flink's batch capability, this FLIP aims to make the adaptive batch scheduler the default batch scheduler and to optimize its configuration. For more details, see [FLIP-283|https://cwiki.apache.org/confluence/display/FLINK/FLIP-283%3A+Use+adaptive+batch+scheduler+as+default+scheduler+for+batch+jobs]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30683) Make adaptive batch scheduler the default batch scheduler
Junrui Li created FLINK-30683: - Summary: Make adaptive batch scheduler the default batch scheduler Key: FLINK-30683 URL: https://issues.apache.org/jira/browse/FLINK-30683 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Junrui Li Fix For: 1.17.0 Based on [FLIP-283|https://cwiki.apache.org/confluence/display/FLINK/FLIP-283%3A+Use+adaptive+batch+scheduler+as+default+scheduler+for+batch+jobs], this ticket covers the first part of the proposal. This change proposes to make the AdaptiveBatchScheduler the default batch scheduler so that users can use it without explicitly configuring it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30684) Use the default parallelism as a flag for vertices that can automatically derive parallelism.
Junrui Li created FLINK-30684: - Summary: Use the default parallelism as a flag for vertices that can automatically derive parallelism. Key: FLINK-30684 URL: https://issues.apache.org/jira/browse/FLINK-30684 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Junrui Li Fix For: 1.17.0 This change proposes to add a *parallelismConfigured* property as a flag identifying whether a vertex's parallelism was set explicitly or taken from "parallelism.default". If a vertex's *parallelismConfigured* is true, the AdaptiveBatchScheduler will not automatically decide its parallelism. Otherwise, the AdaptiveBatchScheduler will decide the parallelism automatically, using "parallelism.default" as a fallback value for "jobmanager.adaptive-batch-scheduler.max-parallelism". With this change, users no longer need to set "parallelism.default" to "-1" to have parallelism decided automatically for vertices. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30685) Support marking the transformations whose parallelism is inherited from the input transformation
Junrui Li created FLINK-30685: - Summary: Support marking the transformations whose parallelism is inherited from the input transformation Key: FLINK-30685 URL: https://issues.apache.org/jira/browse/FLINK-30685 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Junrui Li Fix For: 1.17.0 In order to chain operators together as much as possible, many downstream operators in the table planner use the parallelism of their upstream input operators. If an operator needs its own parallelism, that parallelism is set explicitly. Therefore, the parallelism of an operator that takes its upstream operator's parallelism as its own should be derived automatically by the AdaptiveBatchScheduler. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30686) Simplify the configuration of adaptive batch scheduler
Junrui Li created FLINK-30686: - Summary: Simplify the configuration of adaptive batch scheduler Key: FLINK-30686 URL: https://issues.apache.org/jira/browse/FLINK-30686 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Junrui Li Fix For: 1.17.0 Based on [FLIP-283|https://cwiki.apache.org/confluence/display/FLINK/FLIP-283%3A+Use+adaptive+batch+scheduler+as+default+scheduler+for+batch+jobs], this ticket covers the second part of the proposal. This change includes three parts: 1. Introduce "execution.batch.adaptive.auto-parallelism.enabled" as a switch for automatic parallelism derivation. 2. Modify the default values of the adaptive batch scheduler configuration. 3. Rename the adaptive batch scheduler configuration options. -- This message was sent by Atlassian Jira (v8.20.10#820010)
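As a reference point for the current (pre-FLIP-283) behaviour these tickets change, the following is a sketch of enabling the adaptive batch scheduler programmatically, using the option names as of Flink 1.16 (the FLIP proposes renaming them; the max-parallelism value is illustrative):
{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AdaptiveBatchSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Select the scheduler explicitly; FLIP-283 proposes making it the default.
        conf.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.AdaptiveBatch);
        // Pre-FLIP-283, parallelism is only derived for vertices whose
        // parallelism resolves to the default of -1 (see FLINK-30684 above).
        conf.set(CoreOptions.DEFAULT_PARALLELISM, -1);
        // One of the knobs FLINK-30686 proposes to rename; key as of 1.16.
        conf.setString("jobmanager.adaptive-batch-scheduler.max-parallelism", "128");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(conf);
        // ... build and execute a bounded (batch) pipeline on env ...
    }
}
{code}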
Re: [DISCUSS] Enabling dynamic partition discovery by default in Kafka source
+1 for this proposal and thanks Qingsheng for driving this. Considering the interval, we also set the value as 5min, equivalent to the default value of metadata.max.age.ms. Best Yun Tang From: Benchao Li Sent: Friday, January 13, 2023 23:06 To: dev@flink.apache.org Subject: Re: [DISCUSS] Enabling dynamic partition discovery by default in Kafka source +1, we've enabled this by default (10mins) in our production for years. Jing Ge 于2023年1月13日周五 22:22写道: > +1 for the proposal that makes users' daily work easier and therefore makes > Flink more attractive. > > Best regards, > Jing > > > On Fri, Jan 13, 2023 at 11:27 AM Qingsheng Ren wrote: > > > Thanks everyone for joining the discussion! > > > > @Martijn: > > > > > All newly discovered partitions will be consumed from the earliest > offset > > possible. > > > > Thanks for the reminder! I checked the logic of KafkaSource and found > that > > new partitions will start from the offset initializer specified by the > user > > instead of the earliest. We need to correct this behavior to avoid > dropping > > messages from new partitions. > > > > > Job restarts from checkpoint > > > > I think the current logic guarantees the exactly-once semantic. New > > partitions created after the checkpoint will be re-discovered again and > > picked up by the source. > > > > @John: > > > > > If you want to be a little conservative with the default, 5 minutes > might > > be better than 30 seconds. > > > > Thanks for the suggestion! I tried to find the equivalent config in Kafka > > but missed it. It would be neat to align with the default value of " > > metadata.max.age.ms". > > > > @Gabor: > > > > > removed partition handling is not yet added > > > > There was a detailed discussion about removing partitions [1] but it > looks > > like this is not an easy task considering the potential data loss and > state > > inconsistency. I'm afraid there's no clear plan on this one and maybe we > > could trigger a new discussion thread about how to correctly handle > removed > > partitions. > > > > [1] https://lists.apache.org/thread/7r4h7v5k281w9cnbfw9lb8tp56r30lwt > > > > Best regards, > > Qingsheng > > > > > > On Fri, Jan 13, 2023 at 4:33 PM Gabor Somogyi > > > wrote: > > > > > +1 on the overall direction, it's an important feature. > > > > > > I've had a look on the latest master and looks like removed partition > > > handling is not yet added but I think this is essential. > > > > > > > > > > > > https://github.com/apache/flink/blob/28c3e1a3923ba560b559a216985c1abeb794ebaa/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L305 > > > > > > If a partition all of a sudden disappears then it could lead to data > > loss. > > > Are you planning to add it? > > > If yes then when? > > > > > > G > > > > > > > > > On Fri, Jan 13, 2023 at 9:22 AM John Roesler > > wrote: > > > > > > > Thanks for this proposal, Qingsheng! > > > > > > > > If you want to be a little conservative with the default, 5 minutes > > might > > > > be better than 30 seconds. > > > > > > > > The equivalent config in Kafka seems to be metadata.max.age.ms ( > > > > > > > > > > https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms > > > ), > > > > which has a default value of 5 minutes. > > > > > > > > Other than that, I’m in favor. I agree, this should be on by default. 
> > > > > > > > Thanks again, > > > > John > > > > > > > > On Fri, Jan 13, 2023, at 08:26, Leonard Xu wrote: > > > > > Thanks Qingsheng for driving this, enable the dynamic partition > > > > > discovery would be very useful for kafka topic scale partitions > > > > > scenarios. > > > > > > > > > > +1 for the change. > > > > > > > > > > CC: Becket > > > > > > > > > > > > > > > Best, > > > > > Leonard > > > > > > > > > > > > > > > > > > > >> On Jan 13, 2023, at 3:15 PM, Jark Wu wrote: > > > > >> > > > > >> +1 for the change. I think this is beneficial for users and is > > > > compatible. > > > > >> > > > > >> Best, > > > > >> Jark > > > > >> > > > > >> On Fri, 13 Jan 2023 at 14:22, 何军 wrote: > > > > >> > > > > > > > > +1 for this idea, we have enabled kafka dynamic partition > > discovery > > > in > > > > >>> all > > > > jobs. > > > > > > > > > > > > >>> > > > > > > > > > > -- Best, Benchao Li
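For context on what the proposal changes on the user side, here is a sketch of enabling partition discovery explicitly today (broker, topic and group values are illustrative; 300000 ms matches the 5-minute default of metadata.max.age.ms discussed above):
{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class DiscoveryEnabledSourceSketch {
    public static KafkaSource<String> build() {
        return KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("orders")
                .setGroupId("orders-consumer")
                // Newly discovered partitions currently start from this
                // initializer, the behaviour Qingsheng notes above.
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Explicit opt-in today; the proposal turns this on by default.
                .setProperty("partition.discovery.interval.ms", "300000")
                .build();
    }
}
{code}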
[jira] [Created] (FLINK-30687) FILTER has no effect in count(*)
tanjialiang created FLINK-30687: --- Summary: FILTER has no effect in count(*) Key: FLINK-30687 URL: https://issues.apache.org/jira/browse/FLINK-30687 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.16.0 Reporter: tanjialiang When I try using Flink SQL like this: {code:java} CREATE TABLE student ( id INT NOT NULL, name STRING, class_id INT NOT NULL ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/test', 'table-name' = 'student', 'username' = 'root', 'password' = '12345678' ); SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student; {code} I found that 'FILTER (WHERE class_id = 1)' has no effect. But when I tried Flink SQL like this, it worked: {code:java} SELECT COUNT(*) FROM student WHERE class_id = 1; -- or SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code} By the way, the MySQL connector has a bug that was fixed in [FLINK-27268|https://issues.apache.org/jira/browse/FLINK-27268]. If you try this demo, you may need to cherry-pick that fix first. -- This message was sent by Atlassian Jira (v8.20.10#820010)