Re: [DISCUSS] FLIP-304: Pluggable failure handling for Apache Flink
Hi Panagiotis, Thanks for updating the FLIP. > Regarding the config option `jobmanager.failure-enricher-plugins.enabled` I think a config option `jobmanager.failure-enrichers`, which accepts the names of enrichers to use, may be better. It allows the users to deploy and use the plugins in a more flexible way. The default value of the config can be none, which means failure enrichment will be disabled by default. A reference can be the config option `metrics.reporters` which helps to load metric reporter plugins. Thanks, Zhu

Panagiotis Garefalakis wrote on Mon, Apr 10, 2023, 03:47: > > Hello again everyone, > > The FLIP is now updated based on our discussion! > In short, FLIP-304 [1] proposes the addition of a pluggable interface that > will allow users to add custom logic and enrich failures with custom > metadata labels. > As discussed, custom restart strategies will be part of a different > effort. Every pluggable FailureEnricher: > >- Is triggered on every global/non-global failure >- Receives a Throwable cause and an immutable Context >- Performs asynchronous execution (separate IoExecutor) to avoid >blocking the main thread for RPCs >- Is completely independent from other Enrichers >- Emits failure labels/tags for its unique, pre-defined keys (defined at >startup time) > > > Check the link for implementation details and please let me know what you > think :) > > > [1] > https://cwiki.apache.org/confluence/display/FLINK/FLIP-304%3A+Pluggable+Failure+Enrichers > > > Panagiotis > > >

On Tue, Mar 28, 2023 at 5:01 AM Zhu Zhu wrote: > > > Hi Panagiotis, > > > > How about introducing a config option to control which error handling > > plugins should be used? It is more flexible for deployments. Additionally, > > it can also enable users to explicitly specify the order in which the plugins > > take effect. > > > > Thanks, > > Zhu > > > >

Gen Luo wrote on Mon, Mar 27, 2023, 15:02: > > > > > > Thanks for the summary! > > > > > > Also +1 to support custom restart strategies in a different FLIP, > > > as long as we can make sure that the plugin interface won't be > > > changed when the restart strategy interface is introduced. > > > > > > To achieve this, maybe we should think through how the handler > > > would cooperate with the restart strategy, e.g. would it execute before > > > the strategy (e.g. some strategy may use the tag), or after > > > it (e.g. some metric reporting handler may use the handling result). > > > Though we can implement it one way, and extend it if the other is > > > really needed by someone. > > > > > > Besides, instead of using either of the names, shall we just make > > > them two subclasses named FailureEnricher and FailureListener? > > > The former executes synchronously and can modify the context, > > > while the latter executes asynchronously and has a read-only view > > > of the context. In this way we can make sure a handler behaves in > > > the expected way. > > > > > >

On Thu, Mar 23, 2023 at 5:19 PM Zhu Zhu wrote: > > > > > > > +1 to support custom restart strategies in a different FLIP. > > > > > > > > It's fine to have a different plugin for custom restart strategy. > > > > If so, since we do not treat the FLIP-304 plugin as a common failure > > > > handler, but instead mainly target adding labels to errors, I would > > > > +1 for the name `FailureEnricher`. 
> > > > > > > > Thanks, > > > > Zhu > > > > > > > > David Morávek 于2023年3月23日周四 15:51写道: > > > > > > > > > > > > > > > > > One additional remark on introducing it as an async operation: We > > would > > > > > > need a new configuration parameter to define the timeout for such a > > > > > > listener call, wouldn't we? > > > > > > > > > > > > > > > > This could be left up to the implementor to handle. > > > > > > > > > > What about adding an extra method getNamespace() to the Listener > > > > interface > > > > > > which returns an Optional. > > > > > > > > > > > > > > > > I'd avoid mixing an additional concept into this. We can simply have > > a > > > > new > > > > > method that returns a set of keys the listener can output. We can > > > > validate > > > > > this at the JM startup time and fail fast (since it's a configuration > > > > > error) if there is an overlap. If the listener outputs the key that > > is > > > > not > > > > > allowed to, I wouldn't be afraid to call into a fatal error handler > > since > > > > > it's an invalid implementation. > > > > > > > > > > Best, > > > > > D. > > > > > > > > > > On Thu, Mar 23, 2023 at 8:34 AM Matthias Pohl > > > > > wrote: > > > > > > > > > > > Sounds good. Two points I want to add: > > > > > > > > > > > >- Listener execution should be independent — however we need a > > way > > > > to > > > > > > > enforce a Label key/key-prefix is only assigned to a single > > Listener, > > > > > > > thinking of a validation step both at Listener init and runtime > > > > stages > > > > > > > > > > > > > What about adding an extra method getNamespace() to the Listener > >
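For readers following the thread, here is a minimal sketch of what such a plugin could look like, based only on the behaviour described above (triggered per failure, given the Throwable and a read-only Context, declaring its keys at startup, returning labels asynchronously). The interface shape and method names are illustrative assumptions, not the final FLIP-304 API:

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Illustrative sketch only; FailureEnricher and Context stand in for the proposed FLIP-304 interfaces.
public class ClassifyingFailureEnricher implements FailureEnricher {

    // Keys are declared up front so the JobManager can validate at startup
    // that no two enrichers emit labels for the same key.
    @Override
    public Set<String> getOutputKeys() {
        return Collections.singleton("failure.type");
    }

    // Called for every global/non-global failure on a separate IO executor,
    // so blocking work (e.g. an RPC to an external classification service) is acceptable here.
    @Override
    public CompletableFuture<Map<String, String>> processFailure(Throwable cause, Context context) {
        String type = (cause instanceof OutOfMemoryError) ? "USER_OOM" : "UNKNOWN";
        return CompletableFuture.completedFuture(
                Collections.singletonMap("failure.type", type));
    }
}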
[jira] [Created] (FLINK-31768) Create FLIP-285 interfaces without changing the behavior
Matthias Pohl created FLINK-31768: - Summary: Create FLIP-285 interfaces without changing the behavior Key: FLINK-31768 URL: https://issues.apache.org/jira/browse/FLINK-31768 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Matthias Pohl -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [DISCUSS] FLIP-304: Pluggable failure handling for Apache Flink
Hi Panagiotis, Thank you for the update. Looks great! Just a couple of suggestions below: 1. We seem to be waiting for the future(s) to complete before restarting the job - should we add a configurable timeout for the enrichment? Since each failure enricher is run in parallel, we could probably settle for one timeout for all failure handlers. 2. +1 to Zhu’s comment on adding a comma-separated list of FailureHandlers instead of a boolean toggle! Other than the above, the FLIP looks great! Thank you for your efforts. Regards, Hong

> On 11 Apr 2023, at 08:01, Zhu Zhu wrote: > > Hi Panagiotis, > > Thanks for updating the FLIP. > >> Regarding the config option `jobmanager.failure-enricher-plugins.enabled` > I think a config option `jobmanager.failure-enrichers`, which accepts > the names of enrichers to use, may be better. It allows the users to > deploy and use the plugins in a more flexible way. The default value > of the config can be none, which means failure enrichment will be > disabled by default. > A reference can be the config option `metrics.reporters` which helps > to load metric reporter plugins. > > Thanks, > Zhu > >

Panagiotis Garefalakis wrote on Mon, Apr 10, 2023, 03:47: >> >> Hello again everyone, >> >> The FLIP is now updated based on our discussion! >> In short, FLIP-304 [1] proposes the addition of a pluggable interface that >> will allow users to add custom logic and enrich failures with custom >> metadata labels. >> As discussed, custom restart strategies will be part of a different >> effort. Every pluggable FailureEnricher: >> >> - Is triggered on every global/non-global failure >> - Receives a Throwable cause and an immutable Context >> - Performs asynchronous execution (separate IoExecutor) to avoid >> blocking the main thread for RPCs >> - Is completely independent from other Enrichers >> - Emits failure labels/tags for its unique, pre-defined keys (defined at >> startup time) >> >> >> Check the link for implementation details and please let me know what you >> think :) >> >> >> [1] >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-304%3A+Pluggable+Failure+Enrichers >> >> >> Panagiotis >> >>

>> On Tue, Mar 28, 2023 at 5:01 AM Zhu Zhu wrote: >> >>> Hi Panagiotis, >>> >>> How about introducing a config option to control which error handling >>> plugins should be used? It is more flexible for deployments. Additionally, >>> it can also enable users to explicitly specify the order in which the plugins >>> take effect. >>> >>> Thanks, >>> Zhu >>>

>>> Gen Luo wrote on Mon, Mar 27, 2023, 15:02: Thanks for the summary! Also +1 to support custom restart strategies in a different FLIP, as long as we can make sure that the plugin interface won't be changed when the restart strategy interface is introduced. To achieve this, maybe we should think through how the handler would cooperate with the restart strategy, e.g. would it execute before the strategy (e.g. some strategy may use the tag), or after it (e.g. some metric reporting handler may use the handling result). Though we can implement it one way, and extend it if the other is really needed by someone. Besides, instead of using either of the names, shall we just make them two subclasses named FailureEnricher and FailureListener? The former executes synchronously and can modify the context, while the latter executes asynchronously and has a read-only view of the context. 
In this way we can make sure a handler behaves in the expected way. On Thu, Mar 23, 2023 at 5:19 PM Zhu Zhu wrote: > +1 to support custom restart strategies in a different FLIP. > > It's fine to have a different plugin for custom restart strategy. > If so, since we do not treat the FLIP-304 plugin as a common failure > handler, but instead mainly targets to add labels to errors, I would > +1 for the name `FailureEnricher`. > > Thanks, > Zhu > > David Morávek 于2023年3月23日周四 15:51写道: >> >>> >>> One additional remark on introducing it as an async operation: We >>> would >>> need a new configuration parameter to define the timeout for such a >>> listener call, wouldn't we? >>> >> >> This could be left up to the implementor to handle. >> >> What about adding an extra method getNamespace() to the Listener > interface >>> which returns an Optional. >>> >> >> I'd avoid mixing an additional concept into this. We can simply have >>> a > new >> method that returns a set of keys the listener can output. We can > validate >> this at the JM startup time and fail fast (since it's a configuration >> error) if there is an overlap. If the l
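To make Zhu's and Hong's suggestion concrete, here is a small sketch of how a comma-separated `jobmanager.failure-enrichers` option could be consumed, mirroring how `metrics.reporters` selects reporter plugins by name. The option key is the one proposed in this thread, not a released Flink option, and `FailureEnricher` plus the plugin-discovery step are assumptions for illustration:

import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public final class FailureEnricherLoader {

    // Option name proposed in this thread (assumption, not yet in Flink).
    static final String OPTION_KEY = "jobmanager.failure-enrichers";

    // Keep only the enrichers explicitly listed in the option value; an empty or
    // absent value means failure enrichment stays disabled, as suggested above.
    static List<FailureEnricher> select(String optionValue, List<FailureEnricher> discovered) {
        if (optionValue == null || optionValue.trim().isEmpty()) {
            return List.of();
        }
        Set<String> wanted = Arrays.stream(optionValue.split(","))
                .map(String::trim)
                .collect(Collectors.toSet());
        return discovered.stream()
                .filter(enricher -> wanted.contains(enricher.getClass().getSimpleName()))
                .collect(Collectors.toList());
    }
}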
[jira] [Created] (FLINK-31769) Add percentiles to aggregated metrics
Zhanghao Chen created FLINK-31769: - Summary: Add percentiles to aggregated metrics Key: FLINK-31769 URL: https://issues.apache.org/jira/browse/FLINK-31769 Project: Flink Issue Type: Improvement Components: Autoscaler, Runtime / Metrics Reporter: Zhanghao Chen Attachments: image-2023-04-11-15-11-51-471.png

*Background* Currently only min/avg/max of metrics are exposed via the REST API. The Flink Autoscaler relies on these aggregated metrics to make predictions, and the type of aggregation plays an important role. [FLINK-30652] Use max busytime instead of average to compute true processing rate - ASF JIRA (apache.org) suggests that using the max aggregator instead of the avg of busy time can handle data skew more robustly. However, we found that for large-scale jobs, using max aggregation may be too sensitive. As a result, the true processing rate is underestimated with severe turbulence. The graph below is the true processing rate estimated with different aggregators for a real production data transmission job with a parallelism of 750. !image-2023-04-11-15-11-51-471.png!

*Proposal* Add percentiles (p50, p90, p99) to aggregated metrics. Apache Commons Math can be used for computing them. A follow-up would be making the Flink autoscaler make use of the new aggregators. -- This message was sent by Atlassian Jira (v8.20.10#820010)
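As a rough illustration of the proposal (the helper class below is hypothetical; only the use of Apache Commons Math is taken from the description above):

{code:java}
import org.apache.commons.math3.stat.descriptive.rank.Percentile;

// Hypothetical helper: compute p50/p90/p99 over per-subtask metric samples
// (e.g. busyTimeMsPerSecond), next to the existing min/avg/max aggregations.
public class PercentileAggregator {

    private final Percentile percentile = new Percentile();

    // p is in (0, 100], e.g. 50, 90 or 99.
    public double aggregate(double[] subtaskValues, double p) {
        return percentile.evaluate(subtaskValues, p);
    }
}
{code}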
[jira] [Created] (FLINK-31770) OracleExactlyOnceSinkE2eTest.testInsert fails for JDBC connector
Martijn Visser created FLINK-31770: -- Summary: OracleExactlyOnceSinkE2eTest.testInsert fails for JDBC connector Key: FLINK-31770 URL: https://issues.apache.org/jira/browse/FLINK-31770 Project: Flink Issue Type: Bug Components: Connectors / JDBC Affects Versions: jdbc-3.1.0 Reporter: Martijn Visser {code:java} Caused by: org.apache.flink.util.FlinkRuntimeException: unable to start XA transaction, xid: 201:cea0dbd44c6403283f4050f627bed37c0200:e0070697, error -3: resource manager error has occurred. [XAErr (-3): A resource manager error has occured in the transaction branch. ORA-2045 SQLErr (0)] at org.apache.flink.connector.jdbc.xa.XaFacadeImpl.wrapException(XaFacadeImpl.java:369) at org.apache.flink.connector.jdbc.xa.XaFacadeImpl.access$800(XaFacadeImpl.java:67) at org.apache.flink.connector.jdbc.xa.XaFacadeImpl$Command.lambda$fromRunnable$0(XaFacadeImpl.java:301) at org.apache.flink.connector.jdbc.xa.XaFacadeImpl$Command.lambda$fromRunnable$4(XaFacadeImpl.java:340) at org.apache.flink.connector.jdbc.xa.XaFacadeImpl.execute(XaFacadeImpl.java:280) at org.apache.flink.connector.jdbc.xa.XaFacadeImpl.start(XaFacadeImpl.java:170) at org.apache.flink.connector.jdbc.xa.XaFacadePoolingImpl.start(XaFacadePoolingImpl.java:84) at org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunction.beginTx(JdbcXaSinkFunction.java:316) at org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunction.open(JdbcXaSinkFunction.java:241) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100) at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46) at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:731) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:706) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:672) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) at java.lang.Thread.run(Thread.java:750) {code} https://github.com/apache/flink-connector-jdbc/actions/runs/4647776511/jobs/8224977183#step:13:325 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [DISCUSS] FLIP-302: Support TRUNCATE TABLE statement
Hi, ron. 1: Considering that for deleting rows Flink will also write delete records to achieve the purpose of deleting data, it may not be so strange for connector devs to make DynamicTableSink implement SupportsTruncate to support truncating the table. Based on the assumption that DynamicTableSink is used for inserting/updating/deleting, I think it's reasonable for DynamicTableSink to implement SupportsTruncate. But I think it sounds reasonable to add a generic interface like DynamicTable to differentiate DynamicTableSource & DynamicTableSink. That will definitely require much design and discussion, which deserves a dedicated FLIP. I prefer not to do that in this FLIP to avoid overdesign, and I think it's not a must for this FLIP. Maybe we can discuss it some day if we do need the new generic table interface. 2: Considering the variety of catalogs and tables, it's hard for Flink to do unified follow-up actions after truncating a table. But the external connector can still do such follow-up actions in the method `executeTruncation`. Btw, in Spark, for the new truncate table interface[1], Spark only recaches the table after truncating it[2]. If Flink supports table caching at the framework level, we can also recache at the framework level for the truncate table statement. [1] https://github.com/apache/spark/blob/1a42aa5bd44e7524bb55463bbd85bea782715834/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TruncatableTable.java [2] https://github.com/apache/spark/blob/06c09a79b371c5ac3e4ebad1118ed94b460f48d1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TruncateTableExec.scala I think the external catalog can implement such logic in the method `executeTruncation`. Best regards, Yuxia

----- Original Message ----- From: "liu ron" To: "dev" Sent: Tuesday, Apr 11, 2023, 10:51:36 AM Subject: Re: [DISCUSS] FLIP-302: Support TRUNCATE TABLE statement

Hi, xia It's a nice improvement to support the TRUNCATE TABLE statement, making Flink more feature-rich. I think the truncate syntax is a command that will be executed in the client's process, rather than pulling up a Flink job to execute on the cluster. So on the user-facing exposed interface, I think we should not let users implement the SupportsTruncate interface on the DynamicTableSink interface. This seems a bit strange and also confuses users; as Hang said, why does the source table not support truncate? It would be nice if we could come up with a generic interface that supports truncate instead of binding it to the DynamicTableSink interface, and maybe in the future we will support more commands like the truncate command. In addition, after truncating data, we may also need to update the metadata of the table. For a Hive table, for example, we need to update the statistics as well as clear the cache in the metastore. I think we should also consider these capabilities; Spark has considered these, refer to https://github.com/apache/spark/blob/69dd20b5e45c7e3533efbfdc1974f59931c1b781/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala#L573 . Best, Ron

Jim Hughes wrote on Tue, Apr 11, 2023, 02:15: > Hi Yuxia, > > On Mon, Apr 10, 2023 at 10:35 AM yuxia > wrote: > > > Hi, Jim. > > > > 1: I'm expecting all DynamicTableSinks to support it. But it's hard to > > support all in one shot. For the DynamicTableSinks that haven't implemented > > the SupportsTruncate interface, we'll throw an exception > > like 'The truncate statement for the table is not supported as it hasn't > > implemented the interface SupportsTruncate'. 
Also, for some sinks that > > don't support deleting data, they can also implement it but throw a more > > concrete exception like "xxx doesn't support truncating a table as delete > > is impossible for xxx". It depends on the external connector's > > implementation. > > Thanks for your advice, I updated it in the FLIP. > > > > Makes sense. > > > > 2: What do you mean by saying "truncate an input to a streaming query"? > > This FLIP aims to support the TRUNCATE TABLE statement, which is for > > truncating a table. In which cases will it interoperate with streaming queries? > > > > Let's take a source like Kafka as an example. Suppose I have an input > topic Foo, and a query which uses it as an input. > > When Foo is truncated, if the truncation works as a delete and create, then > the connector may need to be made aware (otherwise it may try to use > offsets from the previous topic). On the other hand, one may have to ask > Kafka to delete records up to a certain point. > > Also, savepoints for the query may contain information from the truncated > table. Should this FLIP involve invalidating that information in some > manner? Or does truncating a source table for a query cause undefined > behavior on that query? > > Basically, I'm trying to think through the implications of a truncate > operation for streaming sources and queries. > > Cheers, > > Jim > > > > Best regards, > > Yuxia > > > > ----- Original Message ----- > > From: "Jim Hughes" > > To: "
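For readers skimming the thread, here is a minimal sketch of the shape being discussed: a sink-side ability interface with a single `executeTruncation` method that a connector implements to delete all data and perform connector-specific follow-ups (statistics, caches). The real FLIP-302 interface lives in Flink and would be mixed into a DynamicTableSink; the toy classes below are only an illustration:

import java.util.ArrayList;
import java.util.List;

// Illustration only; the actual interface is defined by FLIP-302 and would be
// implemented alongside DynamicTableSink, e.g.
//   public class JdbcDynamicTableSink implements DynamicTableSink, SupportsTruncate { ... }
interface SupportsTruncate {
    // Invoked when TRUNCATE TABLE is executed against the table backed by this sink.
    void executeTruncation();
}

// Toy in-memory "sink" showing where connector-specific follow-up work would go.
class InMemoryTruncatableSink implements SupportsTruncate {
    private final List<String> rows = new ArrayList<>();

    @Override
    public void executeTruncation() {
        rows.clear();
        // Follow-up actions would go here, e.g. invalidating caches or
        // refreshing table statistics in the external system.
    }
}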
Re: [DISCUSS] FLIP-306: Unified File Merging Mechanism for Checkpoints
Hi Rui Fan, Thanks for your comments! > (1) The temporary segment will remain in the physical file for a short time, > right? Yes, any written segment will remain in the physical file until the physical file is deleted. It is controlled by the reference counting. And as discussed in 4.7, this will result in a space amplification problem. > (2) Is subtask granularity confused with shared state? Merging files at granularity of subtask is a general solution for shared states, considering the file may be reused by the following checkpoint after job restore. This design is applicable to sst files and any other shared states that may arise in the future. However, the DSTL files are a special case of shared states, since these files will no longer be shared after job restore. Therefore, we may do an optimization for these files and merge them at the TM level. Currently, the DSTL files are not in the shared directory of checkpoint storage, and I suggest we keep it as it is. I agree that this may bring in some confusion, and I suggest the FLIP mainly discuss the general situation and list the special situations separately without bringing in new concepts. I will add another paragraph describing the file merging for DSTL files. WDYT? > (3) When rescaling, do all shared files need to be copied? I agree with you that only sst files of the base DB need to be copied (or re-uploaded in the next checkpoint). However, section 4.2 simplifies file copying issues (copying all files), following the concept of shared state. > (4) Does the space magnification ratio need a configuration option? Thanks for the reminder, I will add an option in this FLIP. > (5) How many physical files can a TM write at the same checkpoint at the same > time? This is a very good point. Actually, there is a file reuse pool as section 4.6 described. There could be multiple files within this pool, supporting concurrent writing by multiple writers. I suggest providing two configurations to control the file number: state.checkpoints.file-merging.max-file-pool-size: Specifies the upper limit of the file pool size. state.checkpoints.file-merging.max-subtasks-per-file: Specifies the lower limit of the file pool size based on the number of subtasks within each TM. The number of simultaneously open files is controlled by these two options, and the first option takes precedence over the second. WDYT? Thanks a lot for your valuable insight. Best regards, Zakelly On Mon, Apr 10, 2023 at 7:08 PM Rui Fan <1996fan...@gmail.com> wrote: > > Hi all, > > Thanks Zakelly driving this proposal, and thank you all for > the warm discussions. It's really a useful feature. > > I have a few questions about this FLIP. > > (1) The temporary segment will remain in the physical file for > a short time, right? > > FLIP proposes to write segments instead of physical files. > If the physical files are written directly, these temporary files > will be deleted after the checkpoint is aborted. When writing > a segment, how to delete the temporary segment? > Decrement the reference count value by 1? > > (2) Is subtask granularity confused with shared state? > > From the "4.1.2 Merge files within a subtask or a TM" part, > based on the principle of sst files, it is concluded that > "For shared states, files are merged within each subtask." > > I'm not sure whether this conclusion is general or just for sst. > As Yanfei mentioned before: > > > DSTL files are shared between checkpoints, and are > > currently merged in batches at the task manager level. 
> > DSTL files as the shared state in FLIP-306, however, it > would be better to merge at TM granularity. So, I'm not > sure whether the subtask granularity confused with > shared state? > > And I'm not familiar with DSTL file merging, should > shared state be divided into shared subtask state > and shared TM state? > > (3) When rescaling, do all shared files need to be copied? > > From the "4.2 Rescaling and Physical File Lifecycle" part, > I see a lot of file copying. > > As I understand, only sst files of the baseDB need to be copied. > From the restore code[1], when restoreWithRescaling, flink will > init a base DB instance, read all contents from other temporary > rocksdb instances, and write them into the base DB, and then > the temporary rocksdb instance will be discarded. > > So, I think copying the files of the base rocksdb is enough, and > the files of other rocksdb instances aren't used. > > Or do not copy any files during recovery, upload all sst files > at the first checkpoint. > > (4) Does the space magnification ratio need a configuration option? > > From the step1 of "4.7 Space amplification" part, I see: > > > Checking whether the space amplification of each file is greater than a > preset threshold and collecting files that exceed the threshold for > compaction. > > Should we add a configuration option about the compaction threshold? > I didn't see it at "5. Public interfaces and User Cases" pa
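To make the reference-counting point in this thread concrete, here is a simplified sketch of how a merged physical file could be retained per logical segment and deleted only once the last segment referencing it is discarded. This is an illustration of the idea being discussed, not the actual FLIP-306 implementation:

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified illustration of the reference counting discussed above.
class PhysicalFile {
    private final Path path;
    private final AtomicInteger refCount = new AtomicInteger(0);

    PhysicalFile(Path path) {
        this.path = path;
    }

    // Called when a new segment is written into (or restored from) this file.
    void retain() {
        refCount.incrementAndGet();
    }

    // Called when a segment is discarded, e.g. its checkpoint is subsumed or aborted.
    void release() throws Exception {
        if (refCount.decrementAndGet() == 0) {
            // No live segment points at this file any more, so it can be deleted.
            // Space amplification comes from the window where some segments are
            // already dead but the count has not yet reached zero.
            Files.deleteIfExists(path);
        }
    }
}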
[jira] [Created] (FLINK-31771) Improve select available slot from SlotPool
Weihua Hu created FLINK-31771: - Summary: Improve select available slot from SlotPool Key: FLINK-31771 URL: https://issues.apache.org/jira/browse/FLINK-31771 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Reporter: Weihua Hu DefaultScheduler will request slots from SlotPool for tasks one by one. For each task, PhysicalSlotProviderImpl#tryAllocateFromAvailable will retrieve all available slots from DefaultAllocatedSlotPool#getFreeSlotsInformation and then select the best slot via SlotSelectionStrategy. Currently DefaultAllocatedSlotPool#getFreeSlotsInformation always calculates the taskExecutorUtilization. This causes task scheduling to be too slow when there are lots of slots, such as 2 slots total. But only the EvenlySpreadOutLocationPreferenceSlotSelectionStrategy uses this utilization. So I would like to move the calculation of taskExecutorUtilization to where it is used: DefaultAllocatedSlotPool provides a function getTaskExecutorUtilization, which is only used in EvenlySpreadOutLocationPreferenceSlotSelectionStrategy. This change could reduce the latency of allocating 2 slots from 72s to 12s in my local IDE. -- This message was sent by Atlassian Jira (v8.20.10#820010)
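A rough sketch of the proposed direction (simplified, not the actual Flink code): instead of eagerly attaching a utilization value to every free slot returned by getFreeSlotsInformation, the pool exposes an on-demand lookup that only the evenly-spread-out strategy calls.

{code:java}
// Simplified sketch, not the actual DefaultAllocatedSlotPool code.
import java.util.Map;
import java.util.Set;

class SlotPoolSketch {
    // taskExecutorId -> total slot count, and taskExecutorId -> free slot ids
    private final Map<String, Integer> totalSlotsPerTaskExecutor;
    private final Map<String, Set<String>> freeSlotsPerTaskExecutor;

    SlotPoolSketch(Map<String, Integer> total, Map<String, Set<String>> free) {
        this.totalSlotsPerTaskExecutor = total;
        this.freeSlotsPerTaskExecutor = free;
    }

    // Computed only when a strategy actually needs it
    // (EvenlySpreadOutLocationPreferenceSlotSelectionStrategy), instead of being
    // recomputed for every free slot on every allocation request.
    double getTaskExecutorUtilization(String taskExecutorId) {
        int total = totalSlotsPerTaskExecutor.getOrDefault(taskExecutorId, 0);
        int free = freeSlotsPerTaskExecutor.getOrDefault(taskExecutorId, Set.of()).size();
        return total == 0 ? 1.0 : (double) (total - free) / total;
    }
}
{code}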
[jira] [Created] (FLINK-31772) AsyncSinkWriter Performance regression due to AIMD rate limiting strategy
Ahmed Hamdy created FLINK-31772: --- Summary: AsyncSinkWriter Performance regression due to AIMD rate limiting strategy Key: FLINK-31772 URL: https://issues.apache.org/jira/browse/FLINK-31772 Project: Flink Issue Type: Bug Components: Connectors / Common, Connectors / Firehose, Connectors / Kinesis Affects Versions: aws-connector-4.1.0, aws-connector-4.0.0, 1.16.1, aws-connector-3.0.0, 1.16.0 Reporter: Ahmed Hamdy Fix For: 1.16.2, aws-connector-4.2.0 Attachments: Screenshot 2023-04-11 at 12.56.10.png, Screenshot 2023-04-11 at 12.58.09.png, Screenshot 2023-04-11 at 13.01.47.png

h1. Issue While benchmarking the {{KinesisStreamSink}} for 1.15 against the legacy {{FlinkKinesisProducer}}, it is observed that the new sink has a performance regression against the deprecated sink for the same environment settings. Further investigation identified that the AIMD rate limiting strategy is the bottleneck causing the regression. Attached are results for {{KinesisStreamSink}} against {{FlinkKinesisProducer}}, and for {{KinesisStreamSink}} after disabling the {{AIMDRatelimitingStrategy}}.

h2. Environment Settings - Benchmarking was performed on AWS KDA. - Application logic is just sending records downstream. - Application parallelism was tested to be 1. - Kinesis stream number of shards was tested with 8 and 12. - Payload size was 1Kb and 100Kb. -- This message was sent by Atlassian Jira (v8.20.10#820010)
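For context, AIMD (additive increase / multiplicative decrease) grows the allowed request budget by a constant on success and cuts it by a factor on throttling or failure. The snippet below is a generic illustration of the algorithm, not the AsyncSinkWriter implementation; it hints at why a slow additive ramp-up combined with an aggressive decrease can cap throughput well below what the destination can absorb.

{code:java}
// Generic AIMD illustration, not Flink's actual RateLimitingStrategy.
class AimdRateLimiter {
    private final double increase;       // added after each successful batch
    private final double decreaseFactor; // multiplied in on a throttled/failed batch
    private final double maxRate;
    private double currentRate;

    AimdRateLimiter(double initialRate, double increase, double decreaseFactor, double maxRate) {
        this.currentRate = initialRate;
        this.increase = increase;
        this.decreaseFactor = decreaseFactor;
        this.maxRate = maxRate;
    }

    void onSuccess() {
        // Additive increase: ramp up slowly towards the cap.
        currentRate = Math.min(maxRate, currentRate + increase);
    }

    void onThrottle() {
        // Multiplicative decrease: back off sharply on any throttling signal.
        currentRate = Math.max(1, currentRate * decreaseFactor);
    }

    double currentRate() {
        return currentRate;
    }
}
{code}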
[jira] [Created] (FLINK-31773) Introduces LeaderElection sub-interface
Matthias Pohl created FLINK-31773: - Summary: Introduces LeaderElection sub-interface Key: FLINK-31773 URL: https://issues.apache.org/jira/browse/FLINK-31773 Project: Flink Issue Type: Sub-task Reporter: Matthias Pohl -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [DISCUSS] FLIP-306: Unified File Merging Mechanism for Checkpoints
Hi Zakelly, Since we already had some discussions on this topic in the doc I mentioned, could you please describe the difference in your FLIP? I think we should better have a comparing table across different options just like the doc wrote. And we could also list some of them in your Rejected Alternatives part. Best Yun Tang From: Zakelly Lan Sent: Tuesday, April 11, 2023 17:57 To: dev@flink.apache.org Subject: Re: [DISCUSS] FLIP-306: Unified File Merging Mechanism for Checkpoints Hi Rui Fan, Thanks for your comments! > (1) The temporary segment will remain in the physical file for a short time, > right? Yes, any written segment will remain in the physical file until the physical file is deleted. It is controlled by the reference counting. And as discussed in 4.7, this will result in a space amplification problem. > (2) Is subtask granularity confused with shared state? Merging files at granularity of subtask is a general solution for shared states, considering the file may be reused by the following checkpoint after job restore. This design is applicable to sst files and any other shared states that may arise in the future. However, the DSTL files are a special case of shared states, since these files will no longer be shared after job restore. Therefore, we may do an optimization for these files and merge them at the TM level. Currently, the DSTL files are not in the shared directory of checkpoint storage, and I suggest we keep it as it is. I agree that this may bring in some confusion, and I suggest the FLIP mainly discuss the general situation and list the special situations separately without bringing in new concepts. I will add another paragraph describing the file merging for DSTL files. WDYT? > (3) When rescaling, do all shared files need to be copied? I agree with you that only sst files of the base DB need to be copied (or re-uploaded in the next checkpoint). However, section 4.2 simplifies file copying issues (copying all files), following the concept of shared state. > (4) Does the space magnification ratio need a configuration option? Thanks for the reminder, I will add an option in this FLIP. > (5) How many physical files can a TM write at the same checkpoint at the same > time? This is a very good point. Actually, there is a file reuse pool as section 4.6 described. There could be multiple files within this pool, supporting concurrent writing by multiple writers. I suggest providing two configurations to control the file number: state.checkpoints.file-merging.max-file-pool-size: Specifies the upper limit of the file pool size. state.checkpoints.file-merging.max-subtasks-per-file: Specifies the lower limit of the file pool size based on the number of subtasks within each TM. The number of simultaneously open files is controlled by these two options, and the first option takes precedence over the second. WDYT? Thanks a lot for your valuable insight. Best regards, Zakelly On Mon, Apr 10, 2023 at 7:08 PM Rui Fan <1996fan...@gmail.com> wrote: > > Hi all, > > Thanks Zakelly driving this proposal, and thank you all for > the warm discussions. It's really a useful feature. > > I have a few questions about this FLIP. > > (1) The temporary segment will remain in the physical file for > a short time, right? > > FLIP proposes to write segments instead of physical files. > If the physical files are written directly, these temporary files > will be deleted after the checkpoint is aborted. When writing > a segment, how to delete the temporary segment? 
> Decrement the reference count value by 1? > > (2) Is subtask granularity confused with shared state? > > From the "4.1.2 Merge files within a subtask or a TM" part, > based on the principle of sst files, it is concluded that > "For shared states, files are merged within each subtask." > > I'm not sure whether this conclusion is general or just for sst. > As Yanfei mentioned before: > > > DSTL files are shared between checkpoints, and are > > currently merged in batches at the task manager level. > > DSTL files as the shared state in FLIP-306, however, it > would be better to merge at TM granularity. So, I'm not > sure whether the subtask granularity confused with > shared state? > > And I'm not familiar with DSTL file merging, should > shared state be divided into shared subtask state > and shared TM state? > > (3) When rescaling, do all shared files need to be copied? > > From the "4.2 Rescaling and Physical File Lifecycle" part, > I see a lot of file copying. > > As I understand, only sst files of the baseDB need to be copied. > From the restore code[1], when restoreWithRescaling, flink will > init a base DB instance, read all contents from other temporary > rocksdb instances, and write them into the base DB, and then > the temporary rocksdb instance will be discarded. > > So, I think copying the files of the base rocksdb is enough, and > the files of other rocksdb instances aren't u
[jira] [Created] (FLINK-31774) Add document for delete and update statement
Aitozi created FLINK-31774: -- Summary: Add document for delete and update statement Key: FLINK-31774 URL: https://issues.apache.org/jira/browse/FLINK-31774 Project: Flink Issue Type: Improvement Components: Documentation Reporter: Aitozi I could not find documentation about the usage of the DELETE and UPDATE statements in the SQL section. -- This message was sent by Atlassian Jira (v8.20.10#820010)
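For the new documentation page, a small batch-mode example along the lines below could anchor the text. The table name and filters are made up, the table would have to be registered first, and the statements require a connector whose sink supports row-level deletes/updates:

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DeleteUpdateExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Assumes a table "orders" is already registered and its connector
        // supports row-level delete/update.
        tEnv.executeSql("DELETE FROM orders WHERE status = 'CANCELLED'");
        tEnv.executeSql("UPDATE orders SET status = 'ARCHIVED' WHERE order_date < '2022-01-01'");
    }
}
{code}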
Re: [VOTE] Release flink-connector-kafka, release candidate #1
Hi all, Martijn and I discussed offline to cancel this vote. Moreover, now that Flink 1.17 is out and we still haven't released anything yet for the newly externalized Kafka connector, we've decided to skip releasing a version that matches with Flink 1.16 all together, and instead go straight to supporting Flink 1.17 for our first release. Practically this means: 1. The code as of branch `flink-connector-kafka:v4.0` will be re-versioned as `v3.0` and that will be the actual first release of flink-connector-kafka. 2. v3.0.0 will be the first release of `flink-connector-kafka` and it will initially support Flink 1.17.x series. I'm happy to drive the release efforts for this and will create a new RC shortly over the next day or two. Thanks, Gordon On Wed, Apr 5, 2023 at 9:32 PM Mason Chen wrote: > +1 for new RC! > > Best, > Mason > > On Tue, Apr 4, 2023 at 11:32 AM Tzu-Li (Gordon) Tai > wrote: > > > Hi all, > > > > I've ported the critical fixes I mentioned to v3.0 and v4.0 branches of > > apache/flink-connector-kafka now. > > > > @martijnvis...@apache.org let me know if > you'd > > need help with creating a new RC, if there's too much to juggle on > > your end. Happy to help out. > > > > Thanks, > > Gordon > > > > On Sun, Apr 2, 2023 at 11:21 PM Konstantin Knauf > > wrote: > > > > > +1. Thanks, Gordon! > > > > > > Am Mo., 3. Apr. 2023 um 06:37 Uhr schrieb Tzu-Li (Gordon) Tai < > > > tzuli...@apache.org>: > > > > > > > Hi Martijn, > > > > > > > > Since this RC vote was opened, we had three critical bug fixes that > was > > > > merged for the Kafka connector: > > > > > > > >- https://issues.apache.org/jira/browse/FLINK-31363 > > > >- https://issues.apache.org/jira/browse/FLINK-31305 > > > >- https://issues.apache.org/jira/browse/FLINK-31620 > > > > > > > > Given the severity of these issues (all of them are violations of > > > > exactly-once semantics), and the fact that they are currently not > > > included > > > > yet in any released version, do you think it makes sense to cancel > this > > > RC > > > > in favor of a new one that includes these? > > > > Since this RC vote has been stale for quite some time already, it > > doesn't > > > > seem like we're throwing away too much effort that has already been > > done > > > if > > > > we start a new RC with these critical fixes included. > > > > > > > > What do you think? > > > > > > > > Thanks, > > > > Gordon > > > > > > > > On Thu, Feb 9, 2023 at 3:26 PM Tzu-Li (Gordon) Tai < > > tzuli...@apache.org> > > > > wrote: > > > > > > > > > +1 (binding) > > > > > > > > > > - Verified legals (license headers and root LICENSE / NOTICE file). > > > > AFAICT > > > > > no dependencies require explicit acknowledgement in the NOTICE > files. > > > > > - No binaries in staging area > > > > > - Built source with tests > > > > > - Verified signatures and hashes > > > > > - Web PR changes LGTM > > > > > > > > > > Thanks Martijn! > > > > > > > > > > Cheers, > > > > > Gordon > > > > > > > > > > On Mon, Feb 6, 2023 at 6:12 PM Mason Chen > > > > wrote: > > > > > > > > > >> That makes sense, thanks for the clarification! 
> > > > >> > > > > >> Best, > > > > >> Mason > > > > >> > > > > >> On Wed, Feb 1, 2023 at 7:16 AM Martijn Visser < > > > martijnvis...@apache.org > > > > > > > > > >> wrote: > > > > >> > > > > >> > Hi Mason, > > > > >> > > > > > >> > Thanks, [4] is indeed a copy-paste error and you've made the > right > > > > >> > assumption that > > > > >> > > > > > >> > > > > > >> > > > > > > > > > > https://repository.apache.org/content/repositories/orgapacheflink-1582/org/apache/flink/ > > > > >> > is the correct maven central link. > > > > >> > > > > > >> > I think we should use FLINK-30052 to move the Kafka connector > code > > > > from > > > > >> the > > > > >> > 1.17 release also over the Kafka connector repo (especially > since > > > > >> there's > > > > >> > now a v3.0 branch for the Kafka connector, so it can be merged > in > > > > main). > > > > >> > When those commits have been merged, we can make a next Kafka > > > > connector > > > > >> > release (which is equivalent to the 1.17 release, which can only > > be > > > > done > > > > >> > when 1.17 is done because of the split level watermark > alignment) > > > and > > > > >> then > > > > >> > FLINK-30859 can be finished. > > > > >> > > > > > >> > Best regards, > > > > >> > > > > > >> > Martijn > > > > >> > > > > > >> > Op wo 1 feb. 2023 om 09:16 schreef Mason Chen < > > > mas.chen6...@gmail.com > > > > >: > > > > >> > > > > > >> > > +1 (non-binding) > > > > >> > > > > > > >> > > * Verified hashes and signatures > > > > >> > > * Verified no binaries > > > > >> > > * Verified LICENSE and NOTICE files > > > > >> > > * Verified poms point to 3.0.0-1.16 > > > > >> > > * Reviewed web PR > > > > >> > > * Built from source > > > > >> > > * Verified git tag > > > > >> > > > > > > >> > > I think [4] your is a copy-paste error and I did all the > > > > verification > > > > >> > > assuming that
Re: [VOTE] Release flink-connector-kafka, release candidate #1
+1, thanks for driving this Gordon. On Tue, Apr 11, 2023 at 8:15 PM Tzu-Li (Gordon) Tai wrote: > Hi all, > > Martijn and I discussed offline to cancel this vote. > > Moreover, now that Flink 1.17 is out and we still haven't released > anything yet for the newly externalized Kafka connector, we've decided to > skip releasing a version that matches with Flink 1.16 all together, and > instead go straight to supporting Flink 1.17 for our first release. > > Practically this means: > >1. The code as of branch `flink-connector-kafka:v4.0` will be >re-versioned as `v3.0` and that will be the actual first release of >flink-connector-kafka. >2. v3.0.0 will be the first release of `flink-connector-kafka` and it >will initially support Flink 1.17.x series. > > I'm happy to drive the release efforts for this and will create a new RC > shortly over the next day or two. > > Thanks, > Gordon > > On Wed, Apr 5, 2023 at 9:32 PM Mason Chen wrote: > >> +1 for new RC! >> >> Best, >> Mason >> >> On Tue, Apr 4, 2023 at 11:32 AM Tzu-Li (Gordon) Tai >> wrote: >> >> > Hi all, >> > >> > I've ported the critical fixes I mentioned to v3.0 and v4.0 branches of >> > apache/flink-connector-kafka now. >> > >> > @martijnvis...@apache.org let me know if >> you'd >> > need help with creating a new RC, if there's too much to juggle on >> > your end. Happy to help out. >> > >> > Thanks, >> > Gordon >> > >> > On Sun, Apr 2, 2023 at 11:21 PM Konstantin Knauf >> > wrote: >> > >> > > +1. Thanks, Gordon! >> > > >> > > Am Mo., 3. Apr. 2023 um 06:37 Uhr schrieb Tzu-Li (Gordon) Tai < >> > > tzuli...@apache.org>: >> > > >> > > > Hi Martijn, >> > > > >> > > > Since this RC vote was opened, we had three critical bug fixes that >> was >> > > > merged for the Kafka connector: >> > > > >> > > >- https://issues.apache.org/jira/browse/FLINK-31363 >> > > >- https://issues.apache.org/jira/browse/FLINK-31305 >> > > >- https://issues.apache.org/jira/browse/FLINK-31620 >> > > > >> > > > Given the severity of these issues (all of them are violations of >> > > > exactly-once semantics), and the fact that they are currently not >> > > included >> > > > yet in any released version, do you think it makes sense to cancel >> this >> > > RC >> > > > in favor of a new one that includes these? >> > > > Since this RC vote has been stale for quite some time already, it >> > doesn't >> > > > seem like we're throwing away too much effort that has already been >> > done >> > > if >> > > > we start a new RC with these critical fixes included. >> > > > >> > > > What do you think? >> > > > >> > > > Thanks, >> > > > Gordon >> > > > >> > > > On Thu, Feb 9, 2023 at 3:26 PM Tzu-Li (Gordon) Tai < >> > tzuli...@apache.org> >> > > > wrote: >> > > > >> > > > > +1 (binding) >> > > > > >> > > > > - Verified legals (license headers and root LICENSE / NOTICE >> file). >> > > > AFAICT >> > > > > no dependencies require explicit acknowledgement in the NOTICE >> files. >> > > > > - No binaries in staging area >> > > > > - Built source with tests >> > > > > - Verified signatures and hashes >> > > > > - Web PR changes LGTM >> > > > > >> > > > > Thanks Martijn! >> > > > > >> > > > > Cheers, >> > > > > Gordon >> > > > > >> > > > > On Mon, Feb 6, 2023 at 6:12 PM Mason Chen > > >> > > > wrote: >> > > > > >> > > > >> That makes sense, thanks for the clarification! 
>> > > > >> >> > > > >> Best, >> > > > >> Mason >> > > > >> >> > > > >> On Wed, Feb 1, 2023 at 7:16 AM Martijn Visser < >> > > martijnvis...@apache.org >> > > > > >> > > > >> wrote: >> > > > >> >> > > > >> > Hi Mason, >> > > > >> > >> > > > >> > Thanks, [4] is indeed a copy-paste error and you've made the >> right >> > > > >> > assumption that >> > > > >> > >> > > > >> > >> > > > >> >> > > > >> > > >> > >> https://repository.apache.org/content/repositories/orgapacheflink-1582/org/apache/flink/ >> > > > >> > is the correct maven central link. >> > > > >> > >> > > > >> > I think we should use FLINK-30052 to move the Kafka connector >> code >> > > > from >> > > > >> the >> > > > >> > 1.17 release also over the Kafka connector repo (especially >> since >> > > > >> there's >> > > > >> > now a v3.0 branch for the Kafka connector, so it can be merged >> in >> > > > main). >> > > > >> > When those commits have been merged, we can make a next Kafka >> > > > connector >> > > > >> > release (which is equivalent to the 1.17 release, which can >> only >> > be >> > > > done >> > > > >> > when 1.17 is done because of the split level watermark >> alignment) >> > > and >> > > > >> then >> > > > >> > FLINK-30859 can be finished. >> > > > >> > >> > > > >> > Best regards, >> > > > >> > >> > > > >> > Martijn >> > > > >> > >> > > > >> > Op wo 1 feb. 2023 om 09:16 schreef Mason Chen < >> > > mas.chen6...@gmail.com >> > > > >: >> > > > >> > >> > > > >> > > +1 (non-binding) >> > > > >> > > >> > > > >> > > * Verified hashes and signatures >> > > > >> > > * Verified no binaries >> > > > >> > > * Verified LICENSE and NOTICE files >>
[jira] [Created] (FLINK-31775) High-Availability not supported in kubernetes when istio enabled
Sergio Sainz created FLINK-31775: Summary: High-Availability not supported in kubernetes when istio enabled Key: FLINK-31775 URL: https://issues.apache.org/jira/browse/FLINK-31775 Project: Flink Issue Type: Bug Components: Deployment / Kubernetes Affects Versions: 1.16.1 Reporter: Sergio Sainz When using native kubernetes deployment mode, and when new TaskManager is started to process a job, the TaskManager will attempt to register itself to the resource manager (job manager). the TaskManager looks up the resource manager per ip-address (akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1) Nevertheless when istio is enabled, the resolution by ip address is blocked, and hence we see that the job cannot start because task manager cannot register with the resource manager: 2023-04-10 23:24:19,752 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1, retrying in 1 ms: Could not connect to rpc endpoint under address akka.tcp://flink@192.168.140.164:6123/user/rpc/resourcemanager_1. Notice that when HA is disabled, the resolution of the resource manager is made by service name and so the resource manager can be found 2023-04-11 00:49:34,162 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Successful registration at resource manager akka.tcp://fl...@local-mci-ar32a-dev-flink-cluster.mstr-env-mci-ar32a-dev:6123/user/rpc/resourcemanager_* under registration id 83ad942597f86aa880ee96f1c2b8b923. Notice it is not possible to disable istio (as explained here : https://doc.akka.io/docs/akka-management/current/bootstrap/istio.html) Although similar to https://issues.apache.org/jira/browse/FLINK-28171 , logging as separate defect as I believe the fix of FLINK-28171 won't fix this case. FLINK-28171 is about Flink Kubernetes Operator. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [DISCUSS] Add support for Apache Arrow format
Hi, I also think the Arrow format will be useful when reading from and writing to message queues. Arrow defines a language-independent columnar memory format for flat and hierarchical data, organized for efficient analytic operations on modern hardware like CPUs and GPUs. The Arrow memory format also supports zero-copy reads for lightning-fast data access without serialization overhead. It will bring a lot of benefit. And we may do some surveys of what other engines like Spark/Hive/Presto support, how they support it, and how it is used. Best, Jacky.

Aitozi wrote on Sun, Apr 2, 2023, 22:22: > Hi all, > Thanks for your input. > > @Ran > However, as mentioned in the issue you listed, it may take a lot of > work > and the community's consideration for integrating Arrow. > > To clarify, this proposal solely aims to introduce flink-arrow as a new > format, > similar to flink-csv and flink-protobuf. It will not impact the internal > data > structure representation in Flink. For proof of concept, please refer to: > https://github.com/Aitozi/flink/commits/arrow-format. > > @Martijn > I'm wondering if there's really much benefit for the Flink > project to > add another file format, over properly supporting the format that we > already > have in the project. > > Maintaining the formats we already have and introducing new formats should be > orthogonal. The requirement of supporting the Arrow format was originally observed > in > our internal usage: to deserialize the data (VectorSchemaRoot) from other > storage > systems into Flink internal RowData and serialize the Flink internal RowData > out to > VectorSchemaRoot for the storage system. And the requirement from > Slack[1] is to support the Arrow file format. Although Arrow is not > usually > used as the final disk storage format, it tends to be used as an > inter-exchange format between different systems or as temporary storage for > analysis, due to its columnar format, and it can be memory mapped into other > analysis > programs. > > So, I think it's meaningful to support Arrow formats in Flink. > > @Jim > If the Flink format interface is used there, then it may be useful > to > consider Arrow along with other columnar formats. > > I am not well-versed with the formats utilized in Paimon. Upon checking > [2], it > appears that Paimon does not directly employ Flink formats. Instead, it > utilizes > FormatWriterFactory and FormatReaderFactory to handle data serialization > and > deserialization. Therefore, I believe that the current work may not be > applicable for reuse in Paimon at this time. > > Best, > Aitozi. > > [1]: https://apache-flink.slack.com/archives/C03GV7L3G2C/p1677915016551629 > [2]: > https://github.com/apache/incubator-paimon/tree/master/paimon-format/src/main/java/org/apache/paimon/format > >

Jim Hughes wrote on Fri, Mar 31, 2023, 00:36: > > Hi all, > > > > How do Flink formats relate to or interact with Paimon (formerly > > Flink-Table-Store)? If the Flink format interface is used there, then it > > may be useful to consider Arrow along with other columnar formats. > > > > Separately, from previous experience, I've seen the Arrow format be > useful > > as an output format for clients to read efficiently. Arrow does support > > returning batches of records, so there may be some options to use the > > format in a streaming situation where a sufficient collection of records > > can be gathered. 
> > > > Cheers, > > > > Jim > > > > > > > > On Thu, Mar 30, 2023 at 8:32 AM Martijn Visser > > > wrote: > > > > > Hi, > > > > > > To be honest, I haven't seen that much demand for supporting the Arrow > > > format directly in Flink as a flink-format. I'm wondering if there's > really > > > much benefit for the Flink project to add another file format, over > > > properly supporting the format that we already have in the project. > > > > > > Best regards, > > > > > > Martijn > > > > > > On Thu, Mar 30, 2023 at 2:21 PM Ran Tao wrote: > > > > > > > It is a good point that flink integrates apache arrow as a format. > > > > Arrow can take advantage of SIMD-specific or vectorized > optimizations, > > > > which should be of great benefit to batch tasks. > > > > However, as mentioned in the issue you listed, it may take a lot of > work > > > > and the community's consideration for integrating Arrow. > > > > > > > > I think you can try to make a simple poc for verification and some > > > specific > > > > plans. > > > > > > > > > > > > Best Regards, > > > > Ran Tao > > > > > > > > > > > > Aitozi 于2023年3月29日周三 19:12写道: > > > > > > > > > Hi guys > > > > > I'm opening this thread to discuss supporting the Apache Arrow > > > > format > > > > > in Flink. > > > > > Arrow is a language-independent columnar memory format that > has > > > > become > > > > > widely used in different systems, and It can also serve as an > > > > > inter-exchange format between other systems. > > > > > So, using it directly in the Flink system will be nice. We also > > > received > > > > > some requ
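To make the conversion discussed above concrete, here is a small standalone sketch that reads rows out of an Arrow VectorSchemaRoot into Flink's internal RowData, roughly the direction described for the proof-of-concept branch. It is illustrative code written for this thread, not taken from that branch, and it assumes a fixed two-column (id INT, name STRING) schema:

import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;

// Illustrative conversion from one Arrow batch to Flink internal rows.
public class ArrowToRowData {
    public static RowData[] convert(VectorSchemaRoot root) {
        IntVector ids = (IntVector) root.getVector("id");
        VarCharVector names = (VarCharVector) root.getVector("name");

        RowData[] rows = new RowData[root.getRowCount()];
        for (int i = 0; i < root.getRowCount(); i++) {
            GenericRowData row = new GenericRowData(2);
            row.setField(0, ids.isNull(i) ? null : ids.get(i));
            row.setField(1, names.isNull(i) ? null : StringData.fromBytes(names.get(i)));
            rows[i] = row;
        }
        return rows;
    }
}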
Re: [DISCUSS] EXACTLY_ONCE delivery semantics for upsert-kafka connector
Hi Jark, I hope you don’t mind if I chime in. You have a good point that the sequence of upserts will eventually converge to the correct value under the at-least-once delivery guarantee, but it can still be important to avoid passing on uncommitted results. Some thoughts, numbered for reference: 1. Most generally, if some result R is written to the sink topic, but then the job fails before a checkpoint, rolls back, and reprocesses, producing R’, then it is incorrect to call R an “upsert”. In fact, as far as the system is concerned, R never happened at all (because it was part of a rolled-back batch of processing). 2. Readers may reasonably wish to impose some meaning on the sequence of upserts itself, so including aborted results can lead to wrong semantics downstream. Eg: “how many times has ‘x’ been updated today”? 3. Note that processing may not be deterministic over failures, and, building on (2), readers may have an expectation that every record in the topic corresponds to a real value that was associated with that key at some point. Eg, if we start with x=1, checkpoint, then produce x=99, crash, restart and produce x=2. Under at-least-once, the history of x is[1,99,2], while exactly-once would give the correct history of [1,2]. If we set up an alert if the value of x is ever greater over 10, then at-least-once will erroneously alert us, while exactly-once does not. 4. Sending results for failed processing can also cause operational problems: if you’re processing a high volume of data, and you get into a crash loop, you can create a flood of repeated results. I’ve seen this case cause real world pain for people, and it’s nice to have a way to avoid it. I hope some of these examples show why a user might reasonably want to configure the connector with the exactly-once guarantee. Thanks! -John On Sat, Apr 8, 2023, at 10:03, Jark Wu wrote: > Hi Alexander, > > Yes, Kafka’s exactly-once semantics are used to avoid duplicated records in > case of producer retries > or failovers. But as I explained above, it can’t avoid intentionally > duplicated records. Actually, I would > like to call them "upsert records" instead of "duplicates", that's why the > connector is named "upsert-kafka", > to make Kafka work like a database that supports updating and deleting by > key. > > For example, there is a SQL query: > > SELECT URL, COUNT(*) page_views > FROM access_logs > GROUP BY URL; > > This is a continuous query[1] that continuously emits a new page_views> record once a new URL > access entry is received. The same URLs in the log may be far away and be > processed in different checkpoints. > > It's easy to make upsert-kafka to support exactly-once delivery guarantee, > but as we discussed above, > it's unnecessary to support it and we intend to expose as few > configurations to users as possible. > > > Best, > Jark > > [1] > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/group-agg/ > > > > On Sat, 8 Apr 2023 at 02:42, Alexander Sorokoumov > wrote: > >> Hi Jark, >> >> To my knowledge, Kafka's EXACTLY_ONCE transactions together with idempotent >> producers prevent duplicated records[1], at least in the cases when >> upstream does not produce them intentionally and across checkpoints. >> >> Could you please elaborate or point me to the docs that explain the reason >> for duplicated records upstream and across checkpoints? I am relatively new >> to Flink and not aware of it. 
According to the kafka connector >> documentation, it does support exactly once semantics by configuring ' >> sink.delivery-guarantee'='exactly-once'[2]. It is not clear to me why we >> can't make upsert-kafka configurable in the same way to support this >> delivery guarantee. >> >> Thank you, >> Alexander >> >> 1. >> >> https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/ >> 2. >> >> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/kafka/#consistency-guarantees >> >> >> On Fri, Apr 7, 2023 at 3:44 AM Jark Wu wrote: >> >> > Hi Alexander, >> > >> > I’m not sure I fully understand the reasons. I left my comments inline. >> > >> > > 1. There might be other non-Flink topic consumers that would rather not >> > have duplicated records. >> > >> > Exactly once can’t avoid producing duplicated records. Because the >> upstream >> > produces duplicated records intentionally and across checkpoints. Exactly >> > once >> > can’t recognize duplicated records and drop duplications. That means >> > duplicated >> > records are written into topics even if exactly-once mode is enabled. >> > >> > >> > > 2. Multiple upsert-kafka producers might cause keys to roll back to >> > previous values. >> > >> > Sorry, I don’t understand how exactly once can prevent this rollback >> > behavior. >> > Even in your example with EXACTLY_ONCE enabled, the x will go to a5, and >> > b5, >> > then back a5 if jobs perform checkpoints after produci
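For reference, the delivery guarantee that the linked documentation describes is configured on the regular kafka connector roughly as below (topic and broker names are made up). The open question in this thread is whether upsert-kafka should expose the same 'sink.delivery-guarantee' knob or keep it hidden:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KafkaExactlyOnceSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Regular kafka sink with EXACTLY_ONCE delivery; topic and broker are made up.
        tEnv.executeSql(
                "CREATE TABLE page_views_sink (" +
                "  url STRING," +
                "  page_views BIGINT" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'page-views'," +
                "  'properties.bootstrap.servers' = 'broker:9092'," +
                "  'format' = 'json'," +
                "  'sink.delivery-guarantee' = 'exactly-once'," +
                "  'sink.transactional-id-prefix' = 'page-views-tx'" +
                ")");
    }
}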
Re: [DISCUSS] FLIP-288:Enable Dynamic Partition Discovery by Default in Kafka Source
Hi everyone, I have already modified FLIP-288 to provide a `newDiscoveryOffsetsInitializer` in the KafkaSourceBuilder and KafkaSourceEnumerator. Users can use KafkaSourceBuilder#setNewDiscoveryOffsets to change the strategy for new partitions. Surely, enabling the partition discovery strategy by default and modifying the offset strategy for new partitions should be brought to the users' attention. Therefore, it will be explained in the 1.18 release notes. WDYT? CC Ruan, Shammon, Gordon and Leonard. Best, Hongshun

On Fri, Mar 31, 2023 at 2:56 PM Hongshun Wang wrote: > Hi everyone, > Thanks for your participation. > > @Gordon, I looked at the questions you raised: > >1. Should we use the firstDiscovery flag or two separate >OffsetsInitializers? Actually, I have considered the latter. If we follow >my initial idea, we can provide a default earliest OffsetsInitializer >for a new partition. However, according to @Shammon's suggestion, different >startup OffsetsInitializers correspond to different post-startup >OffsetsInitializers for Flink's built-in offset strategies. >2. "Future-time" TIMESTAMP OffsetsInitializer. I looked at the code >again, and it seems that neither @Shammon nor I had figured this out. >TimestampOffsetsInitializer#getPartitionOffsets has a comment: "First >get the current end offsets of the partitions. This is going to be used in >case we cannot find a suitable offset based on the timestamp, i.e., the >message meeting the requirement of the timestamp has not been produced to >Kafka yet. *In this case, we just use the latest offset*." Therefore, >using the TimestampOffsetsInitializer will always yield an offset at >startup. >3. Clarification on coupling SPECIFIC-OFFSET startup with >SPECIFIC-OFFSET post-startup. The SPECIFIC-OFFSET strategy already uses the >"auto.offset.reset" position for partitions that are not hit. > > @Gordon, @Shammon, @Leonard, the core issue we are concerned about is > whether the offsets specified at the beginning include non-existent > partitions. The previous design may have a SPECIFIC-OFFSET startup with > future partitions. However, I think since different strategies have been > used for the first discovered partitions and the later discovered partitions, > the offsets specified at startup should be for partitions that have been > confirmed to exist; if not, an error will be thrown. If a partition does not > exist yet, it should be covered by the post-startup OffsetsInitializer > (default EARLIEST). > > Best > Hongshun > >

> On Thu, Mar 30, 2023 at 1:43 PM Shammon FY wrote: > >> Thanks Gordon and Leonard >> >> I'm sorry that there is no specific case from my side, but I consider the >> issue as follows >> >> 1. Users may set an offset later than the current time because Flink does >> not limit it >> 2. If we use EARLIEST for newly discovered partitions with different >> OFFSETs, it may be different from the previous strategy. I think it's >> best to keep the same strategy as before if it does not cause data loss >> 3. I think supporting different OFFSETs in the FLIP will not make the >> implementation more complex >> >> Of course, if it is confirmed that this is an illegal Timestamp OFFSET and >> Flink validates it, then we can apply the same strategy to the newly >> discovered partitions. I think this will be nice too >> >> Best, >> Shammon FY >> >>

>> On Thu, Mar 30, 2023 at 12:29 PM Leonard Xu wrote: >> >> > Thanks Hongshun and Shammon for driving the FLIP! >> > >> > >> > > *2. Clarification on "future-time" TIMESTAMP OffsetsInitializer* >> > > *3. 
Clarification on coupling SPECIFIC-OFFSET startup with SPECIFIC-OFFSET >> > post-startup* >> > >> > Gordon raised a good point about the future TIMESTAMP and SPECIFIC-OFFSET: >> > the timestamp/offset of a newly added partition is undetermined when the >> > job starts (the partition has not been created yet), and it is a >> > timestamp/offset in the future. >> > >> > I have used many message queue systems like Kafka, Pulsar, xxMQ. In my past >> > experience, the TIMESTAMP and SPECIFIC-OFFSET startup modes are usually used >> > to specify existing timestamps/offsets, which are used for business >> > scenarios such as backfilling data and re-refreshing data. At present, it's >> > hard to imagine a user scenario that specifies a future timestamp to filter >> > data in the current topic of a message queue system. Is it overthinking to >> > consider future TIMESTAMP and SPECIFIC-OFFSET? >> > >> > >> > Best, >> > Leonard >> >
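To tie the thread together, here is how the pieces would fit from a user's point of view. Everything except setNewDiscoveryOffsets is the existing KafkaSource API; setNewDiscoveryOffsets is the method proposed in FLIP-288 (name taken from the message above), is not released, and is therefore kept commented out. Broker address and topic name are made up:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class Flip288Sketch {
    public static void main(String[] args) {
        KafkaSource<String> source =
                KafkaSource.<String>builder()
                        .setBootstrapServers("broker:9092")
                        .setTopics("input-topic")
                        .setGroupId("my-group")
                        // Offsets for the partitions that exist at startup.
                        .setStartingOffsets(OffsetsInitializer.timestamp(1681171200000L))
                        // Proposed in FLIP-288 (not released yet): offsets for partitions
                        // discovered after startup, defaulting to EARLIEST.
                        // .setNewDiscoveryOffsets(OffsetsInitializer.earliest())
                        // Periodic partition discovery, to be enabled by default per FLIP-288.
                        .setProperty("partition.discovery.interval.ms", "30000")
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .build();
    }
}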