Re: [DISCUSS] EXACTLY_ONCE delivery semantics for upsert-kafka connector
Hi John, Thank you for your valuable input. It sounds reasonable to me. From this point of view, exactly-once is used to guarantee transaction semantics rather than to avoid duplication/upserts. This is similar to the JDBC connectors that already support eventual consistency with idempotent updates, but we still added support for 2PC [1]. Best, Jark [1]: https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/jdbc/#jdbcsinkexactlyoncesink On Wed, 12 Apr 2023 at 10:36, John Roesler wrote: > Hi Jark, > > I hope you don’t mind if I chime in. > > You have a good point that the sequence of upserts will eventually > converge to the correct value under the at-least-once delivery guarantee, > but it can still be important to avoid passing on uncommitted results. Some > thoughts, numbered for reference: > > 1. Most generally, if some result R is written to the sink topic, but then > the job fails before a checkpoint, rolls back, and reprocesses, producing > R’, then it is incorrect to call R an “upsert”. In fact, as far as the > system is concerned, R never happened at all (because it was part of a > rolled-back batch of processing). > > 2. Readers may reasonably wish to impose some meaning on the sequence of > upserts itself, so including aborted results can lead to wrong semantics > downstream. Eg: “how many times has ‘x’ been updated today”? > > 3. Note that processing may not be deterministic over failures, and, > building on (2), readers may have an expectation that every record in the > topic corresponds to a real value that was associated with that key at some > point. Eg, if we start with x=1, checkpoint, then produce x=99, crash, > restart and produce x=2. Under at-least-once, the history of x is [1,99,2], > while exactly-once would give the correct history of [1,2]. If we set up an > alert if the value of x is ever greater than 10, then at-least-once will > erroneously alert us, while exactly-once does not. > > 4. Sending results for failed processing can also cause operational > problems: if you’re processing a high volume of data, and you get into a > crash loop, you can create a flood of repeated results. I’ve seen this case > cause real world pain for people, and it’s nice to have a way to avoid it. > > I hope some of these examples show why a user might reasonably want to > configure the connector with the exactly-once guarantee. > > Thanks! > -John > > On Sat, Apr 8, 2023, at 10:03, Jark Wu wrote: > > Hi Alexander, > > > > Yes, Kafka’s exactly-once semantics are used to avoid duplicated records > in > > case of producer retries > > or failovers. But as I explained above, it can’t avoid intentionally > > duplicated records. Actually, I would > > like to call them "upsert records" instead of "duplicates", that's why > the > > connector is named "upsert-kafka", > > to make Kafka work like a database that supports updating and deleting by > > key. > > > > For example, there is a SQL query: > > > > SELECT URL, COUNT(*) page_views > > FROM access_logs > > GROUP BY URL; > > > > This is a continuous query[1] that continuously emits a new <URL, > page_views> record once a new URL > > access entry is received. The same URLs in the log may be far away and be > > processed in different checkpoints. > > > > It's easy to make upsert-kafka support the exactly-once delivery > guarantee, > > but as we discussed above, > > it's unnecessary to support it and we intend to expose as few > > configurations to users as possible.
> > > > > > Best, > > Jark > > > > [1] > > > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/group-agg/ > > > > > > > > On Sat, 8 Apr 2023 at 02:42, Alexander Sorokoumov > > wrote: > > > >> Hi Jark, > >> > >> To my knowledge, Kafka's EXACTLY_ONCE transactions together with > idempotent > >> producers prevent duplicated records[1], at least in the cases when > >> upstream does not produce them intentionally and across checkpoints. > >> > >> Could you please elaborate or point me to the docs that explain the > reason > >> for duplicated records upstream and across checkpoints? I am relatively > new > >> to Flink and not aware of it. According to the kafka connector > >> documentation, it does support exactly once semantics by configuring ' > >> sink.delivery-guarantee'='exactly-once'[2]. It is not clear to me why we > >> can't make upsert-kafka configurable in the same way to support this > >> delivery guarantee. > >> > >> Thank you, > >> Alexander > >> > >> 1. > >> > >> > https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/ > >> 2. > >> > >> > https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/kafka/#consistency-guarantees > >> > >> > >> On Fri, Apr 7, 2023 at 3:44 AM Jark Wu wrote: > >> > >> > Hi Alexander, > >> > > >> > I’m not sure I fully understand the reasons. I left my comments > inline. > >> > > >> > > 1. There might be other non-Flink topic consumers that wo
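For context on the option being discussed, below is a hedged sketch in Flink SQL of what an exactly-once upsert-kafka sink could look like for the page_views example above. The 'sink.delivery-guarantee' and 'sink.transactional-id-prefix' option names are assumptions borrowed from the plain 'kafka' connector, and the access_logs source table is assumed to exist elsewhere; the released upsert-kafka connector does not expose these options today, which is exactly what this thread is debating.

{code:sql}
-- Sketch only: the delivery-guarantee options are borrowed from the plain
-- 'kafka' connector and are NOT part of the released upsert-kafka connector;
-- the access_logs table is assumed to be defined elsewhere.
CREATE TABLE page_views_sink (
  url STRING,
  page_views BIGINT,
  PRIMARY KEY (url) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'page_views',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json',
  -- proposed/assumed options, mirroring the plain 'kafka' connector:
  'sink.delivery-guarantee' = 'exactly-once',
  'sink.transactional-id-prefix' = 'page-views-sink'
);

-- The continuous query from the thread, emitting an upsert per URL update.
INSERT INTO page_views_sink
SELECT url, COUNT(*) AS page_views
FROM access_logs
GROUP BY url;
{code}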
Re: [VOTE] Release flink-connector-aws, 4.1.0 for Flink 1.17
+1 (binding) - tried out the kinesis sql binary with sql client - staging binaries look fine On Mon, Apr 3, 2023 at 10:12 PM Elphas Tori wrote: > +1 (non-binding) > > + verified hashes and signatures > + checked local build of website pull request and approved > > On 2023/04/03 16:19:00 Danny Cranmer wrote: > > Hi everyone, > > Please review and vote on the binaries for flink-connector-aws version > > 4.1.0-1.17, as follows: > > [ ] +1, Approve the release > > [ ] -1, Do not approve the release (please provide specific comments) > > > > The v4.1.0 source release has already been approved [1], this vote is to > > distribute the binaries for Flink 1.17 support. > > > > The complete staging area is available for your review, which includes: > > * all artifacts to be deployed to the Maven Central Repository [2], which > > are signed with the key with fingerprint > > 0F79F2AFB2351BC29678544591F9C1EC125FD8DB [3], > > * website pull request listing the new release [4]. > > > > The vote will be open for at least 72 hours. It is adopted by majority > > approval, with at least 3 PMC affirmative votes. > > > > Thanks, > > Danny > > > > [1] https://lists.apache.org/thread/7q3ysg9jz5cjwdgdmgckbnqhxh44ncmv > > [2] > https://repository.apache.org/content/repositories/orgapacheflink-1602/ > > [3] https://dist.apache.org/repos/dist/release/flink/KEYS > > [4] https://github.com/apache/flink-web/pull/628 > > >
Re: [DISCUSS] FLIP-306: Unified File Merging Mechanism for Checkpoints
Hi Yun, I reorganized our discussion and added a comparison table of state ownership with some previous designs. Please take a look at section "4.9. State ownership comparison with other designs". But I don't see them as alternatives since the design of state ownership is integrated with this FLIP. That is to say, we are providing a file merging solution including file management for new merged files, other ownership models are not feasible for the current merging plan. If the state ownership changes, the design of merging files at different granularities also needs to be changed accordingly. WDYT? Best regards, Zakelly On Tue, Apr 11, 2023 at 10:18 PM Yun Tang wrote: > > Hi Zakelly, > > Since we already had some discussions on this topic in the doc I mentioned, > could you please describe the difference in your FLIP? > > I think we should better have a comparing table across different options just > like the doc wrote. And we could also list some of them in your Rejected > Alternatives part. > > > Best > Yun Tang > > From: Zakelly Lan > Sent: Tuesday, April 11, 2023 17:57 > To: dev@flink.apache.org > Subject: Re: [DISCUSS] FLIP-306: Unified File Merging Mechanism for > Checkpoints > > Hi Rui Fan, > > Thanks for your comments! > > > (1) The temporary segment will remain in the physical file for a short > > time, right? > > Yes, any written segment will remain in the physical file until the > physical file is deleted. It is controlled by the reference counting. > And as discussed in 4.7, this will result in a space amplification > problem. > > > > (2) Is subtask granularity confused with shared state? > > Merging files at granularity of subtask is a general solution for > shared states, considering the file may be reused by the following > checkpoint after job restore. This design is applicable to sst files > and any other shared states that may arise in the future. However, the > DSTL files are a special case of shared states, since these files will > no longer be shared after job restore. Therefore, we may do an > optimization for these files and merge them at the TM level. > Currently, the DSTL files are not in the shared directory of > checkpoint storage, and I suggest we keep it as it is. I agree that > this may bring in some confusion, and I suggest the FLIP mainly > discuss the general situation and list the special situations > separately without bringing in new concepts. I will add another > paragraph describing the file merging for DSTL files. WDYT? > > > > (3) When rescaling, do all shared files need to be copied? > > I agree with you that only sst files of the base DB need to be copied > (or re-uploaded in the next checkpoint). However, section 4.2 > simplifies file copying issues (copying all files), following the > concept of shared state. > > > > (4) Does the space magnification ratio need a configuration option? > > Thanks for the reminder, I will add an option in this FLIP. > > > > (5) How many physical files can a TM write at the same checkpoint at the > > same time? > > This is a very good point. Actually, there is a file reuse pool as > section 4.6 described. There could be multiple files within this pool, > supporting concurrent writing by multiple writers. I suggest providing > two configurations to control the file number: > > state.checkpoints.file-merging.max-file-pool-size: Specifies the > upper limit of the file pool size. 
> state.checkpoints.file-merging.max-subtasks-per-file: Specifies the > lower limit of the file pool size based on the number of subtasks > within each TM. > > The number of simultaneously open files is controlled by these two > options, and the first option takes precedence over the second. > > WDYT? > > > > Thanks a lot for your valuable insight. > > Best regards, > Zakelly > > > On Mon, Apr 10, 2023 at 7:08 PM Rui Fan <1996fan...@gmail.com> wrote: > > > > Hi all, > > > > Thanks Zakelly driving this proposal, and thank you all for > > the warm discussions. It's really a useful feature. > > > > I have a few questions about this FLIP. > > > > (1) The temporary segment will remain in the physical file for > > a short time, right? > > > > FLIP proposes to write segments instead of physical files. > > If the physical files are written directly, these temporary files > > will be deleted after the checkpoint is aborted. When writing > > a segment, how to delete the temporary segment? > > Decrement the reference count value by 1? > > > > (2) Is subtask granularity confused with shared state? > > > > From the "4.1.2 Merge files within a subtask or a TM" part, > > based on the principle of sst files, it is concluded that > > "For shared states, files are merged within each subtask." > > > > I'm not sure whether this conclusion is general or just for sst. > > As Yanfei mentioned before: > > > > > DSTL files are shared between checkpoints, and are > > > currently merged in batches at the task manager level. > > > > DSTL f
Re: [VOTE] Release flink-connector-aws, 4.1.0 for Flink 1.17
+1 (non binding) On Tue, 4 Apr 2023 at 1:42 AM, Elphas Tori wrote: > +1 (non-binding) > > + verified hashes and signatures > + checked local build of website pull request and approved > > On 2023/04/03 16:19:00 Danny Cranmer wrote: > > Hi everyone, > > Please review and vote on the binaries for flink-connector-aws version > > 4.1.0-1.17, as follows: > > [ ] +1, Approve the release > > [ ] -1, Do not approve the release (please provide specific comments) > > > > The v4.1.0 source release has already been approved [1], this vote is to > > distribute the binaries for Flink 1.17 support. > > > > The complete staging area is available for your review, which includes: > > * all artifacts to be deployed to the Maven Central Repository [2], which > > are signed with the key with fingerprint > > 0F79F2AFB2351BC29678544591F9C1EC125FD8DB [3], > > * website pull request listing the new release [4]. > > > > The vote will be open for at least 72 hours. It is adopted by majority > > approval, with at least 3 PMC affirmative votes. > > > > Thanks, > > Danny > > > > [1] https://lists.apache.org/thread/7q3ysg9jz5cjwdgdmgckbnqhxh44ncmv > > [2] > https://repository.apache.org/content/repositories/orgapacheflink-1602/ > > [3] https://dist.apache.org/repos/dist/release/flink/KEYS > > [4] https://github.com/apache/flink-web/pull/628 > > >
[jira] [Created] (FLINK-31776) Introducing sub-interface LeaderElectionService.LeaderElection
Matthias Pohl created FLINK-31776: - Summary: Introducing sub-interface LeaderElectionService.LeaderElection Key: FLINK-31776 URL: https://issues.apache.org/jira/browse/FLINK-31776 Project: Flink Issue Type: Sub-task Reporter: Matthias Pohl -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31777) Upsert Kafka use Avro Confluent, key is ok, but all values are null.
Alvin Ge created FLINK-31777: Summary: Upsert Kafka use Avro Confluent, key is ok, but all values are null. Key: FLINK-31777 URL: https://issues.apache.org/jira/browse/FLINK-31777 Project: Flink Issue Type: Improvement Components: kafka Affects Versions: 1.16.0 Environment: Flink: 1.16.0 Confluent version: 7.3.3 Debezium version: 2.1.0/2.0.0 Reporter: Alvin Ge I use Debezium to send data to Kafka in the Confluent Avro format. When I use the upsert-kafka connector, all values are null; with the 'kafka' connector, all values are fine. My upsert-kafka table looks like this: {code:java} // code placeholder create table TEA02 ( SUB_SYSTEM_ENAME varchar(255), REC_CREATOR varchar(255), REC_CREATE_TIME varchar(255), REC_REVISOR varchar(255), REC_REVISE_TIME varchar(255), ARCHIVE_FLAG varchar(255), SUB_SYSTEM_CNAME varchar(255), SUB_SYSTEM_FNAME varchar(255), SUB_SYSTEM_LEVEL varchar(255), primary key (SUB_SYSTEM_ENAME) not enforced ) WITH ( 'connector' = 'upsert-kafka', 'topic' = 'dev.oracle.JNMMM1.TEA02', 'properties.bootstrap.servers' = '10.0.170.213:9092,10.0.170.214:9092,10.0.170.215:9092', 'properties.group.id' = 'TEA02', 'key.format' = 'avro-confluent', 'key.avro-confluent.url' = 'http://10.0.170.213:8081', 'value.format' = 'avro-confluent', 'value.avro-confluent.url' = 'http://10.0.170.213:8081', 'value.fields-include' = 'EXCEPT_KEY' ); {code} query result: ||SUB_SYSTEM_ENAME||REC_CREATOR||REC_CREATE_TIME||...|| |CJ|null|null|null| -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31778) Casting array of rows produces incorrect result
Ilya Soin created FLINK-31778: - Summary: Casting array of rows produces incorrect result Key: FLINK-31778 URL: https://issues.apache.org/jira/browse/FLINK-31778 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.15.4, 1.16.1 Reporter: Ilya Soin {code:java} select CAST(commissions AS ARRAY>) as commissions from (select ARRAY[ROW(123), ROW(234)] commissions){code} Expected output: {code:java} | op | commissions | | +I | [(123.0), (234.0)] | {code} Actual output: {code:java} | op | commissions | | +I | [(234.0), (234.0)] | {code} Full working example: https://gist.github.com/soin08/5e0038dbefeba9192706e05a78ef3bc1 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31779) Track stable branch of externalized connector instead of specific release tag
Weijie Guo created FLINK-31779: -- Summary: Track stable branch of externalized connector instead of specific release tag Key: FLINK-31779 URL: https://issues.apache.org/jira/browse/FLINK-31779 Project: Flink Issue Type: Improvement Components: Connectors / Common, Documentation Reporter: Weijie Guo Assignee: Weijie Guo Docs should point to the branch where that version of the docs are published, otherwise documentation fixes are not visible until a new release is made. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31780) Allow users to enable Ensemble tracking for ZooKeeper
Oleksandr Nitavskyi created FLINK-31780: --- Summary: Allow users to enable Ensemble tracking for ZooKeeper Key: FLINK-31780 URL: https://issues.apache.org/jira/browse/FLINK-31780 Project: Flink Issue Type: Bug Reporter: Oleksandr Nitavskyi In Apache Curator, an option to skip ensemble tracking has been available since version 5.0.0 ([CURATOR-568|https://issues.apache.org/jira/browse/CURATOR-568]). This can be useful in certain scenarios in which CuratorFramework is accessing ZK clusters via a load balancer or virtual IPs. Thus, in case a Flink user's ZooKeeper is running behind an LB or virtual IP, ensemble tracking could be disabled too. *Acceptance Criteria:* Users of Apache Flink have a ZooKeeper config option to disable ensemble tracking for ZooKeeper. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31782) Make DefaultLeaderElectionService implement MultipleComponentLeaderElectionService.Listener
Matthias Pohl created FLINK-31782: - Summary: Make DefaultLeaderElectionService implement MultipleComponentLeaderElectionService.Listener Key: FLINK-31782 URL: https://issues.apache.org/jira/browse/FLINK-31782 Project: Flink Issue Type: Sub-task Reporter: Matthias Pohl -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31781) Introduce contender ID into LeaderElectionService interface
Matthias Pohl created FLINK-31781: - Summary: Introduce contender ID into LeaderElectionService interface Key: FLINK-31781 URL: https://issues.apache.org/jira/browse/FLINK-31781 Project: Flink Issue Type: Sub-task Reporter: Matthias Pohl -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31783) Replace LeaderElectionDriver in DefaultLeaderElectionService with MultipleComponentLeaderElectionService
Matthias Pohl created FLINK-31783: - Summary: Replace LeaderElectionDriver in DefaultLeaderElectionService with MultipleComponentLeaderElectionService Key: FLINK-31783 URL: https://issues.apache.org/jira/browse/FLINK-31783 Project: Flink Issue Type: Sub-task Reporter: Matthias Pohl -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31784) Add multiple-component support to DefaultLeaderElectionService
Matthias Pohl created FLINK-31784: - Summary: Add multiple-component support to DefaultLeaderElectionService Key: FLINK-31784 URL: https://issues.apache.org/jira/browse/FLINK-31784 Project: Flink Issue Type: Sub-task Reporter: Matthias Pohl -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31785) Move LeaderElectionService out of LeaderContender
Matthias Pohl created FLINK-31785: - Summary: Move LeaderElectionService out of LeaderContender Key: FLINK-31785 URL: https://issues.apache.org/jira/browse/FLINK-31785 Project: Flink Issue Type: Sub-task Reporter: Matthias Pohl -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31786) Removing unused HighAvailabilityServices implementations
Matthias Pohl created FLINK-31786: - Summary: Removing unused HighAvailabilityServices implementations Key: FLINK-31786 URL: https://issues.apache.org/jira/browse/FLINK-31786 Project: Flink Issue Type: Sub-task Reporter: Matthias Pohl -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31787) Add the explicit ROW constructor to the system function doc
Aitozi created FLINK-31787: -- Summary: Add the explicit ROW constructor to the system function doc Key: FLINK-31787 URL: https://issues.apache.org/jira/browse/FLINK-31787 Project: Flink Issue Type: Improvement Components: Documentation Reporter: Aitozi -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [DISCUSS] Add support for Apache Arrow format
Which connectors would be commonly used when reading in Arrow format? Filesystem? On Wed, Apr 12, 2023 at 4:27 AM Jacky Lau wrote: > Hi >I also think arrow format will be useful when reading/writing with > message queue. >Arrow defines a language-independent columnar memory format for flat and > hierarchical data, organized for efficient analytic operations on modern > hardware like CPUs and GPUs. The Arrow memory format also supports > zero-copy reads for lightning-fast data access without serialization > overhead. it will bring a lot. >And we may do some surveys, what other engines support like > spark/hive/presto and so on, how that supports and how it be used. > >Best, >Jacky. > > Aitozi 于2023年4月2日周日 22:22写道: > > > Hi all, > > Thanks for your input. > > > > @Ran > However, as mentioned in the issue you listed, it may take a lot > of > > work > > and the community's consideration for integrating Arrow. > > > > To clarify, this proposal solely aims to introduce flink-arrow as a new > > format, > > similar to flink-csv and flink-protobuf. It will not impact the internal > > data > > structure representation in Flink. For proof of concept, please refer to: > > https://github.com/Aitozi/flink/commits/arrow-format. > > > > @Martijn > I'm wondering if there's really much benefit for the Flink > > project to > > add another file format, over properly supporting the format that we > > already > > have in the project. > > > > Maintain the format we already have and introduce new formats should be > > orthogonal. The requirement of supporting arrow format originally > observed > > in > > our internal usage to deserialize the data(VectorSchemaRoot) from other > > storage > > systems to flink internal RowData and serialize the flink internal > RowData > > to > > VectorSchemaRoot out to the storage system. And the requirement from the > > slack[1] is to support the arrow file format. Although, Arrow is not > > usually > > used as the final disk storage format. But it has a tendency to be used > > as the > > inter-exchange format between different systems or temporary storage for > > analysis due to its columnar format and can be memory mapped to other > > analysis > > programs. > > > > So, I think it's meaningful to support arrow formats in Flink. > > > > @Jim > If the Flink format interface is used there, then it may be > useful > > to > > consider Arrow along with other columnar formats. > > > > I am not well-versed with the formats utilized in Paimon. Upon checking > > [2], it > > appears that Paimon does not directly employ flink formats. Instead, it > > utilizes > > FormatWriterFactory and FormatReaderFactory to handle data serialization > > and > > deserialization. Therefore, I believe that the current work may not be > > applicable for reuse in Paimon at this time. > > > > Best, > > Aitozi. > > > > [1]: > https://apache-flink.slack.com/archives/C03GV7L3G2C/p1677915016551629 > > [2]: > > > https://github.com/apache/incubator-paimon/tree/master/paimon-format/src/main/java/org/apache/paimon/format > > > > Jim Hughes 于2023年3月31日周五 00:36写道: > > > > > > Hi all, > > > > > > How do Flink formats relate to or interact with Paimon (formerly > > > Flink-Table-Store)? If the Flink format interface is used there, then > it > > > may be useful to consider Arrow along with other columnar formats. > > > > > > Separately, from previous experience, I've seen the Arrow format be > > useful > > > as an output format for clients to read efficiently. 
Arrow does > support > > > returning batches of records, so there may be some options to use the > > > format in a streaming situation where a sufficient collection of > records > > > can be gathered. > > > > > > Cheers, > > > > > > Jim > > > > > > > > > > > > On Thu, Mar 30, 2023 at 8:32 AM Martijn Visser < > martijnvis...@apache.org > > > > > > wrote: > > > > > > > Hi, > > > > > > > > To be honest, I haven't seen that much demand for supporting the > Arrow > > > > format directly in Flink as a flink-format. I'm wondering if there's > > really > > > > much benefit for the Flink project to add another file format, over > > > > properly supporting the format that we already have in the project. > > > > > > > > Best regards, > > > > > > > > Martijn > > > > > > > > On Thu, Mar 30, 2023 at 2:21 PM Ran Tao > wrote: > > > > > > > > > It is a good point that flink integrates apache arrow as a format. > > > > > Arrow can take advantage of SIMD-specific or vectorized > > optimizations, > > > > > which should be of great benefit to batch tasks. > > > > > However, as mentioned in the issue you listed, it may take a lot of > > work > > > > > and the community's consideration for integrating Arrow. > > > > > > > > > > I think you can try to make a simple poc for verification and some > > > > specific > > > > > plans. > > > > > > > > > > > > > > > Best Regards, > > > > > Ran Tao > > > > > > > > > > > > > > > Aitozi 于2023年3月29日周三 19:12写道: > > > > > > > > > > > H
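To make the proposal concrete for SQL users, here is a minimal usage sketch, assuming a future 'arrow' format identifier that would be registered the same way as 'csv' or 'protobuf'. Nothing below exists in a released Flink version; the PoC branch linked earlier in the thread is the actual reference, and the table schema and path are made up for illustration.

{code:sql}
-- Hypothetical usage of a flink-arrow format on the filesystem connector.
-- The 'arrow' format identifier is an assumption; it is not available in any
-- released Flink version.
CREATE TABLE arrow_events (
  user_id BIGINT,
  event_time TIMESTAMP(3),
  payload STRING
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///tmp/arrow-events',
  'format' = 'arrow'
);

-- Once declared, reading is plain SQL, independent of the storage format.
SELECT user_id, COUNT(*) AS events FROM arrow_events GROUP BY user_id;
{code}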
[jira] [Created] (FLINK-31788) Add back Support emitValueWithRetract for TableAggregateFunction
Feng Jin created FLINK-31788: Summary: Add back Support emitValueWithRetract for TableAggregateFunction Key: FLINK-31788 URL: https://issues.apache.org/jira/browse/FLINK-31788 Project: Flink Issue Type: Bug Components: Table SQL / Planner Reporter: Feng Jin This feature was originally implemented in the old planner: [https://github.com/apache/flink/pull/8550/files] However, this logic was not implemented in the new planner , the Blink planner. With the removal of the old planner in version 1.14 [https://github.com/apache/flink/pull/16080] , this code was also removed. We should add it back. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31789) Update axel to current
Morey Straus created FLINK-31789: Summary: Update axel to current Key: FLINK-31789 URL: https://issues.apache.org/jira/browse/FLINK-31789 Project: Flink Issue Type: Bug Affects Versions: 1.17.0 Reporter: Morey Straus Flink is shipping with version 2.6, which contains CVE-2020-13614. This was fixed in 2.17.8. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [DISCUSS] FLIP 295: Support persistence of Catalog configuration and asynchronous registration
Hi Feng, Thanks for raising this FLIP. I am still confused after completely reading the thread, with the following questions: 1. Naming confusion - registerCatalog() and addCatalog() have no big difference based on their names. One of them is responsible for data persistence. How about persistCatalog()? 2. As you mentioned, the Map<String, Catalog> catalogs is used as a cache and catalogStore is used for data persistence. I would suggest describing their purpose conceptually and clearly in the FLIP. Some common cache features should be implemented, i.e. data in the cache and the store should be consistent. The same Catalog instance should be found in the store and in the cache (either it has been initialized or it will be lazily initialized) for the same catalog name. The consistency will be taken care of while updating the catalog. 3. As the above discussion moves forward, the option 2 solution looks more like a replacement of option 1, because, afaiu, issues mentioned previously with option 1 are not solved yet. Do you still want to propose both options and ask for suggestions for both of them? 4. After you updated the FLIP, there are some inconsistent descriptions in the content. Would you like to clean them up? Thanks! Best regards, Jing On Fri, Apr 7, 2023 at 9:24 AM Feng Jin wrote: > hi Shammon > > Thank you for your response, and I completely agree with your point of > view. > Initially, I may have overcomplicated the whole issue. First and foremost, > we need to consider the persistence of the Catalog's Configuration. > If we only need to provide persistence for Catalog Configuration, we can > add a toConfiguration method to the Catalog interface. > This method can convert a Catalog instance to a Map<String, String> > properties, and the default implementation will throw an exception. > > public interface Catalog { >/** >* Returns a map containing the properties of the catalog object. >* >* @return a map containing the properties of the catalog object >* @throws UnsupportedOperationException if the implementing class does > not override >* the default implementation of this method >*/ > default Map<String, String> toProperties() { > throw new UnsupportedOperationException("Please implement toProperties > for this catalog"); > } > } > > The specific process is as follows: > > 1. If the user has configured a CatalogStore, the toConfiguration() method > will be called when registering a Catalog instance with > registerCatalog(String catalogName, Catalog catalog). The Catalog instance > will be converted to a Map<String, String> properties and saved to the > CatalogStore. If some Catalog instances do not implement this method, an > exception will be thrown. > > 2. If the user does not use a CatalogStore, the toConfiguration() method > will not be called, ensuring consistency with the original process. > > 3. Saving both the Map<String, Catalog> catalogs and the CatalogStore at > the same time can also avoid conflicts > > > For lazy initialization: > we can start from the Catalog itself and provide a dedicated Catalog > implementation for lazy loading, such as ConfigurationCatalog > > public class ConfigurationCatalog implements Catalog { > ConfigurationCatalog(Map<String, String> properties) { > } > } > > I have added this design to the FLIP. > > > Best, > Feng > > On Thu, Apr 6, 2023 at 10:03 AM Shammon FY wrote: > > > Hi Feng > > > > Thanks for your answer. > > > > I have no questions about the functionality of `CatalogStore`, but the > > different operations of multiple `registerCatalog` or `storeCatalog` in > > `CatalogManager`. The implementation in Trino is also the same; the > > `CatalogManager` in Trino only has one `createCatalog`, which will save > the > > catalog to memory and `CatalogStore`. > > > > I think we don't need to add another `registerCatalog` or `addCatalog` to > > save a catalog in `Map<String, Catalog> catalogs` or `CatalogStore > > catalogStore`. As you mentioned before, the `Map<String, Catalog> > > catalogs` is a cache in `CatalogManager`. How about saving the catalog in > > `Map<String, Catalog> catalogs` and `CatalogStore catalogStore` together > > when it is registered or added in `CatalogManager`? > > > > Best, > > Shammon FY > > > > > > On Tue, Apr 4, 2023 at 5:22 PM Feng Jin wrote: > > > Thank you for your reply. I am very sorry for the misunderstanding > caused > > > by my deviation from the original discussion. > > > > > > @Shammon > > > > I found there is a pre-discussion [1] for this FLIP > > > Yes, there was indeed such a discussion before. However, in designing > > the > > > whole solution, I found that the logic of CatalogManager itself doesn't > > > need to change much. *We cannot only persist Catalog instances > > themselves*, > > > so exposing only registerCatalog(String catalogName, Catalog catalog) > > might > > > not be enough to save Catalogs, because in the end we still need to > save > > > the configurations corresponding to the Catalog instances. Therefore, > I > > > decided to introduce the CatalogStore interface for configuration > > > persistence. Rega
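For readers following the FLIP, the configuration being persisted is essentially the options map a user already passes in a CREATE CATALOG statement. A small illustration follows (the 'hive' catalog and its options are only an example; the FLIP defines the persistence interfaces, not new SQL syntax):

{code:sql}
-- Existing Flink SQL: registers a catalog with the session's CatalogManager.
-- Under the proposal, a configured CatalogStore would persist the options map
-- below, so the catalog can be restored (possibly lazily) in later sessions.
CREATE CATALOG my_hive WITH (
  'type' = 'hive',
  'hive-conf-dir' = '/opt/hive-conf'
);

USE CATALOG my_hive;
SHOW DATABASES;
{code}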
[mongodb connector] which flink version(s) is the mongodb connector compatible with?
Hello! I have a use case for the mongodb connector (see: https://github.com/apache/flink-connector-mongodb), but I am currently running flink 1.15. As far as I can tell, the connector relies on dependencies that don't exist in 1.15 (one such example is `org/apache/flink/table/connector/source/lookup/cache/LookupCache`). Which flink version(s) is this connector implementation slated to be compatible with? Thank you!
Re: [DISCUSS] FLIP-304: Pluggable failure handling for Apache Flink
Hello there, Zhu: agree with the config option, great suggestion Hong: global timeout is also interesting and a good addition -- only downside I see is just another config option If everyone is happy, I suggest we keep the discussion open until Friday and start a Vote shortly after. Cheers, Panagiotis On Tue, Apr 11, 2023 at 12:58 AM Teoh, Hong wrote: > Hi Panagiotis, > > Thank you for the update. Looks great! Just one suggestion below: > > 1. We seem to be waiting for the future(s) to complete before restarting > the job - should we add a configurable timeout for the enrichment? Since > each failure enricher are run in parallel, we could probably settle for 1 > timeout for all failure handlers. > 2. +1 to Zhu’s comment on adding a comma separated list of FailureHandlers > instead of boolean toggle! > > Other than the above, the FLIP looks great! Thank you for your efforts. > > Regards, > Hong > > > > > On 11 Apr 2023, at 08:01, Zhu Zhu wrote: > > > > CAUTION: This email originated from outside of the organization. Do not > click links or open attachments unless you can confirm the sender and know > the content is safe. > > > > > > > > Hi Panagiotis, > > > > Thanks for updating the FLIP. > > > >> Regarding the config option > `jobmanager.failure-enricher-plugins.enabled` > > I think a config option `jobmanager.failure-enrichers`, which accepts > > the names of enrichers to use, may be better. It allows the users to > > deploy and use the plugins in a more flexible way. The default value > > of the config can be none, which means failure enrichment will be > > disabled by default. > > A reference can be the config option `metrics.reporters` which helps > > to load metric reporter plugins. > > > > Thanks, > > Zhu > > > > Panagiotis Garefalakis 于2023年4月10日周一 03:47写道: > >> > >> Hello again everyone, > >> > >> FLIP is now updated based on our discussion! > >> In short, FLIP-304 [1] proposes the addition of a pluggable interface > that > >> will allow users to add custom logic and enrich failures with custom > >> metadata labels. > >> While as discussed, custom restart strategies will be part of a > different > >> effort. Every pluggable FaulireEnricher: > >> > >> - Is triggered on every global/non-global failure > >> - Receives a Throwable cause and an immutable Context > >> - Performs asynchronous execution (separate IoExecutor) to avoid > >> blocking the main thread for RPCs > >> - Is completely independent from other Enrichers > >> - Emits failure labels/tags for its unique, pre-defined keys (defined > at > >> startup time) > >> > >> > >> Check the link for implementation details and please let me know what > you > >> think :) > >> > >> > >> [1] > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-304%3A+Pluggable+Failure+Enrichers > >> > >> > >> Panagiotis > >> > >> > >> On Tue, Mar 28, 2023 at 5:01 AM Zhu Zhu wrote: > >> > >>> Hi Panagiotis, > >>> > >>> How about to introduce a config option to control which error handling > >>> plugins should be used? It is more flexible for deployments. > Additionally, > >>> it can also enable users to explicitly specify the order that the > plugins > >>> take effects. > >>> > >>> Thanks, > >>> Zhu > >>> > >>> Gen Luo 于2023年3月27日周一 15:02写道: > > Thanks for the summary! > > Also +1 to support custom restart strategies in a different FLIP, > as long as we can make sure that the plugin interface won't be > changed when the restart strategy interface is introduced. 
> > To achieve this, maybe we should think well how the handler > would cooperate with the restart strategy, like would it executes b > efore the strategy (e.g. some strategy may use the tag), or after > it (e.g. some metric reporting handler may use the handling result). > Though we can implement in one way, and extend if the other is > really necessary by someone. > > Besides, instead of using either of the names, shall we just make > them two subclasses named FailureEnricher and FailureListener? > The former executes synchronously and can modify the context, > while the latter executes asynchronously and has a read-only view > of context. In this way we can make sure a handler behaves in > the expected way. > > > On Thu, Mar 23, 2023 at 5:19 PM Zhu Zhu wrote: > > > +1 to support custom restart strategies in a different FLIP. > > > > It's fine to have a different plugin for custom restart strategy. > > If so, since we do not treat the FLIP-304 plugin as a common failure > > handler, but instead mainly targets to add labels to errors, I would > > +1 for the name `FailureEnricher`. > > > > Thanks, > > Zhu > > > > David Morávek 于2023年3月23日周四 15:51写道: > >> > >>> > >>> One additional remark on introducing it as an async operation: We > >>> would > >>> need a new configuration parameter to define the timeout
[VOTE] Release flink-connector-kafka 3.0.0 for Flink 1.17, release candidate #2
Hi everyone, Please review and vote on release candidate #2 for version 3.0.0 of the Apache Flink Kafka Connector, as follows: [ ] +1, Approve the release [ ] -1, Do not approve the release (please provide specific comments) The complete staging area is available for your review, which includes: * JIRA release notes [1], * the official Apache source release to be deployed to dist.apache.org [2], which are signed with the key with fingerprint 1C1E2394D3194E1944613488F320986D35C33D6A [3], * all artifacts to be deployed to the Maven Central Repository [4], * source code tag v3.0.0-rc2 [5], * website pull request listing the new release [6]. The vote will be open for at least 72 hours. It is adopted by majority approval, with at least 3 PMC affirmative votes. Thanks, Gordon [1] https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352577 [2] https://dist.apache.org/repos/dist/dev/flink/flink-connector-kafka-3.0.0-rc2/ [3] https://dist.apache.org/repos/dist/release/flink/KEYS [4] https://repository.apache.org/content/repositories/orgapacheflink-1607 [5] https://github.com/apache/flink-connector-kafka/releases/tag/v3.0.0-rc2 [6] https://github.com/apache/flink-web/pull/632
Re: [VOTE] Release flink-connector-kafka 3.0.0 for Flink 1.17, release candidate #2
A few important remarks about this release candidate: - As mentioned in the previous voting thread of RC1 [1], we've decided to skip releasing a version of the externalized Flink Kafka Connector matching with Flink 1.16.x since the original vote thread stalled, and meanwhile we've already completed externalizing all Kafka connector code as of Flink 1.17.0. - As such, this RC is basically identical to the Kafka connector code bundled with the Flink 1.17.0 release, PLUS a few critical fixes for exactly-once violations, namely FLINK-31305, FLINK-31363, and FLINK-31620 (please see release notes [2]). - As part of preparing this RC, I've also deleted the original v3.0 branch and re-named the v4.0 branch to replace it instead. Effectively, this resets the versioning numbers for the externalized Flink Kafka Connector code repository, so that this first release of the repo starts from v3.0.0. Thanks, Gordon [1] https://lists.apache.org/thread/r97y5qt8x0c72460vs5cjm5c729ljmh6 [2] https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352577 On Wed, Apr 12, 2023 at 4:55 PM Tzu-Li (Gordon) Tai wrote: > Hi everyone, > > Please review and vote on release candidate #2 for version 3.0.0 of the > Apache Flink Kafka Connector, as follows: > [ ] +1, Approve the release > [ ] -1, Do not approve the release (please provide specific comments) > > > The complete staging area is available for your review, which includes: > * JIRA release notes [1], > * the official Apache source release to be deployed to dist.apache.org > [2], which are signed with the key with fingerprint > 1C1E2394D3194E1944613488F320986D35C33D6A [3], > * all artifacts to be deployed to the Maven Central Repository [4], > * source code tag v3.0.0-rc2 [5], > * website pull request listing the new release [6]. > > The vote will be open for at least 72 hours. It is adopted by majority > approval, with at least 3 PMC affirmative votes. > > Thanks, > Gordon > > [1] > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352577 > [2] > https://dist.apache.org/repos/dist/dev/flink/flink-connector-kafka-3.0.0-rc2/ > [3] https://dist.apache.org/repos/dist/release/flink/KEYS > [4] https://repository.apache.org/content/repositories/orgapacheflink-1607 > [5] > https://github.com/apache/flink-connector-kafka/releases/tag/v3.0.0-rc2 > [6] https://github.com/apache/flink-web/pull/632 >
Re: [VOTE] Release flink-connector-kafka, release candidate #1
RC2 (for Flink 1.17.0) vote has started in a separate thread: https://lists.apache.org/thread/mff76c2hzcb1mk8fm5m2h4z0j73qz2vk Please test and cast your votes! On Tue, Apr 11, 2023 at 11:45 AM Martijn Visser wrote: > +1, thanks for driving this Gordon. > > On Tue, Apr 11, 2023 at 8:15 PM Tzu-Li (Gordon) Tai > wrote: > >> Hi all, >> >> Martijn and I discussed offline to cancel this vote. >> >> Moreover, now that Flink 1.17 is out and we still haven't released >> anything yet for the newly externalized Kafka connector, we've decided to >> skip releasing a version that matches with Flink 1.16 all together, and >> instead go straight to supporting Flink 1.17 for our first release. >> >> Practically this means: >> >>1. The code as of branch `flink-connector-kafka:v4.0` will be >>re-versioned as `v3.0` and that will be the actual first release of >>flink-connector-kafka. >>2. v3.0.0 will be the first release of `flink-connector-kafka` and it >>will initially support Flink 1.17.x series. >> >> I'm happy to drive the release efforts for this and will create a new RC >> shortly over the next day or two. >> >> Thanks, >> Gordon >> >> On Wed, Apr 5, 2023 at 9:32 PM Mason Chen wrote: >> >>> +1 for new RC! >>> >>> Best, >>> Mason >>> >>> On Tue, Apr 4, 2023 at 11:32 AM Tzu-Li (Gordon) Tai >> > >>> wrote: >>> >>> > Hi all, >>> > >>> > I've ported the critical fixes I mentioned to v3.0 and v4.0 branches of >>> > apache/flink-connector-kafka now. >>> > >>> > @martijnvis...@apache.org let me know if >>> you'd >>> > need help with creating a new RC, if there's too much to juggle on >>> > your end. Happy to help out. >>> > >>> > Thanks, >>> > Gordon >>> > >>> > On Sun, Apr 2, 2023 at 11:21 PM Konstantin Knauf >>> > wrote: >>> > >>> > > +1. Thanks, Gordon! >>> > > >>> > > Am Mo., 3. Apr. 2023 um 06:37 Uhr schrieb Tzu-Li (Gordon) Tai < >>> > > tzuli...@apache.org>: >>> > > >>> > > > Hi Martijn, >>> > > > >>> > > > Since this RC vote was opened, we had three critical bug fixes >>> that was >>> > > > merged for the Kafka connector: >>> > > > >>> > > >- https://issues.apache.org/jira/browse/FLINK-31363 >>> > > >- https://issues.apache.org/jira/browse/FLINK-31305 >>> > > >- https://issues.apache.org/jira/browse/FLINK-31620 >>> > > > >>> > > > Given the severity of these issues (all of them are violations of >>> > > > exactly-once semantics), and the fact that they are currently not >>> > > included >>> > > > yet in any released version, do you think it makes sense to cancel >>> this >>> > > RC >>> > > > in favor of a new one that includes these? >>> > > > Since this RC vote has been stale for quite some time already, it >>> > doesn't >>> > > > seem like we're throwing away too much effort that has already been >>> > done >>> > > if >>> > > > we start a new RC with these critical fixes included. >>> > > > >>> > > > What do you think? >>> > > > >>> > > > Thanks, >>> > > > Gordon >>> > > > >>> > > > On Thu, Feb 9, 2023 at 3:26 PM Tzu-Li (Gordon) Tai < >>> > tzuli...@apache.org> >>> > > > wrote: >>> > > > >>> > > > > +1 (binding) >>> > > > > >>> > > > > - Verified legals (license headers and root LICENSE / NOTICE >>> file). >>> > > > AFAICT >>> > > > > no dependencies require explicit acknowledgement in the NOTICE >>> files. >>> > > > > - No binaries in staging area >>> > > > > - Built source with tests >>> > > > > - Verified signatures and hashes >>> > > > > - Web PR changes LGTM >>> > > > > >>> > > > > Thanks Martijn! 
>>> > > > > >>> > > > > Cheers, >>> > > > > Gordon >>> > > > > >>> > > > > On Mon, Feb 6, 2023 at 6:12 PM Mason Chen < >>> mas.chen6...@gmail.com> >>> > > > wrote: >>> > > > > >>> > > > >> That makes sense, thanks for the clarification! >>> > > > >> >>> > > > >> Best, >>> > > > >> Mason >>> > > > >> >>> > > > >> On Wed, Feb 1, 2023 at 7:16 AM Martijn Visser < >>> > > martijnvis...@apache.org >>> > > > > >>> > > > >> wrote: >>> > > > >> >>> > > > >> > Hi Mason, >>> > > > >> > >>> > > > >> > Thanks, [4] is indeed a copy-paste error and you've made the >>> right >>> > > > >> > assumption that >>> > > > >> > >>> > > > >> > >>> > > > >> >>> > > > >>> > > >>> > >>> https://repository.apache.org/content/repositories/orgapacheflink-1582/org/apache/flink/ >>> > > > >> > is the correct maven central link. >>> > > > >> > >>> > > > >> > I think we should use FLINK-30052 to move the Kafka connector >>> code >>> > > > from >>> > > > >> the >>> > > > >> > 1.17 release also over the Kafka connector repo (especially >>> since >>> > > > >> there's >>> > > > >> > now a v3.0 branch for the Kafka connector, so it can be >>> merged in >>> > > > main). >>> > > > >> > When those commits have been merged, we can make a next Kafka >>> > > > connector >>> > > > >> > release (which is equivalent to the 1.17 release, which can >>> only >>> > be >>> > > > done >>> > > > >> > when 1.17 is done because of the split level watermark >>> alignment) >>> > > and >>> > > > >> then >>> > > > >> > FLINK-30859 can be fi
Re: [DISCUSS] FLIP 295: Support persistence of Catalog configuration and asynchronous registration
Hi Feng Thanks for your update. I found there are two options in `Proposed Changes`, can you put the unselected option in `Rejected Alternatives`? I think this may help us better understand your proposal Best, Shammon FY On Thu, Apr 13, 2023 at 4:49 AM Jing Ge wrote: > Hi Feng, > > Thanks for raising this FLIP. I am still confused after completely reading > the thread with following questions: > > 1. Naming confusion - registerCatalog() and addCatalog() have no big > difference based on their names. One of them is responsible for data > persistence. How about persistCatalog()? > 2. As you mentioned that Map catalogs is used as a cache > and catalogStore is used for data persistence. I would suggest describing > their purpose conceptually and clearly in the FLIP. Some common cache > features should be implemented, i.e. data in the cache and the store should > be consistent. Same Catalog instance should be found in the store and in > the cache(either it has been initialized or it will be lazy initialized) > for the same catalog name. The consistency will be taken care of while > updating the catalog. > 3. As the above discussion moves forward, the option 2 solution looks more > like a replacement of option 1, because, afaiu, issues mentioned > previously with option 1 are not solved yet. Do you still want to propose > both options and ask for suggestions for both of them? > 4. After you updated the FLIP, there are some inconsistent descriptions in > the content. Would you like to clean them up? Thanks! > > Best regards, > Jing > > > On Fri, Apr 7, 2023 at 9:24 AM Feng Jin wrote: > > > hi Shammon > > > > Thank you for your response, and I completely agree with your point of > > view. > > Initially, I may have over complicated the whole issue. First and > foremost, > > we need to consider the persistence of the Catalog's Configuration. > > If we only need to provide persistence for Catalog Configuration, we can > > add a toConfiguration method to the Catalog interface. > > This method can convert a Catalog instance to a Map > > properties, and the default implementation will throw an exception. > > > > public interface Catalog { > >/** > >* Returns a map containing the properties of the catalog object. > >* > >* @return a map containing the properties of the catalog object > >* @throws UnsupportedOperationException if the implementing class does > > not override > >* the default implementation of this method > >*/ > > default Map toProperties() { > > throw new UnsupportedOperationException("Please implement > toProperties > > for this catalog"); > > } > > } > > > > The specific process is as follows: > > > > 1. If the user has configured a CatalogStore, the toConfiguration() > method > > will be called when registering a Catalog instance with > > registerCatalog(String catalogName, Catalog catalog). The Catalog > instance > > will be converted to a Map properties and saved to the > > CatalogStore. If some Catalog instances do not implement this method, an > > exception will be thrown. > > > > 2. If the user does not use a CatalogStore, the toConfiguration() method > > will not be called, ensuring consistency with the original process. > > > > 3. 
Saving both the Map catalogs and the CatalogStore at > > the same time can also avoid conflicts > > > > > > For lazy initialization: > > we can start from the Catalog itself and provide a dedicated Catalog > > implementation for lazy loading, such as ConfigurationCatalog > > > > public class ConfigurationCatalog implements Catalog { > > ConfigurationCatalog(Map properties) { > > } > > } > > > > I have added this design to the FLIP. > > > > > > Best, > > Feng > > > > On Thu, Apr 6, 2023 at 10:03 AM Shammon FY wrote: > > > > > Hi Feng > > > > > > Thanks for your answer. > > > > > > I have no questions about the functionality of `CatalogStore`, but the > > > different operations of multiple `registerCatalog` or `storeCatalog` in > > > `CatalogManager`. The implementation in Trino is also the same, the > > > `CatalogManager` in trino only has one `createCatalog`, which will save > > the > > > catalog to memory and `CatalogStore`. > > > > > > I think we don't need to add another `registerCatalog` or `addCatalog` > to > > > save a catalog in `Map catalogs` or `CatalogStore > > > catalogStore`. As you mentioned before, the `Map > > > catalogs` is a cache in `CatalogManager`. How about saving the catalog > in > > > `Map catalogs` and `CatalogStore catalogStore` > together > > > when it is registered or added in `CatalogManager`? > > > > > > Best, > > > Shammon FY > > > > > > > > > On Tue, Apr 4, 2023 at 5:22 PM Feng Jin wrote: > > > > > > > Thank you for your reply. I am very sorry for the misunderstanding > > caused > > > > by my deviation from the original discussion. > > > > > > > > @Shammon > > > > > I found there is a pre-discussion [1] for this FLIP > > > > Yes, there was indeed such a discussion before. However, in > d
Re: [mongodb connector] which flink version(s) is the mongodb connector compatible with?
From the pom[1] of the connector, it seems it's compatible with Flink 1.17. [1] https://github.com/apache/flink-connector-mongodb/blob/main/pom.xml#L56 Best regards, Yuxia - Original Message - From: "Saketh Kurnool" To: "dev" Sent: Thursday, Apr 13, 2023, 5:27:06 AM Subject: [mongodb connector] which flink version(s) is the mongodb connector compatible with? Hello! I have a use case for the mongodb connector (see: https://github.com/apache/flink-connector-mongodb), but I am currently running flink 1.15. As far as I can tell, the connector relies on dependencies that don't exist in 1.15 (one such example is `org/apache/flink/table/connector/source/lookup/cache/LookupCache`). Which flink version(s) is this connector implementation slated to be compatible with? Thank you!
Re: [DISCUSS] EXACTLY_ONCE delivery semantics for upsert-kafka connector
Hi Jark, John, Thank you for the discussion! I will proceed with completing the patch that adds exactly-once to upsert-kafka connector. Best, Alexander On Wed, Apr 12, 2023 at 12:21 AM Jark Wu wrote: > Hi John, > > Thank you for your valuable input. It sounds reasonable to me. > > From this point of view, the exactly-once is used to guarantee transaction > semantics other than avoid duplication/upserts. > This is similar to the JDBC connectors that already support eventual > consistency with idempotent updates, but we still add the support of > 2PC[1]. > > Best, > Jark > > [1]: > > https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/jdbc/#jdbcsinkexactlyoncesink > > On Wed, 12 Apr 2023 at 10:36, John Roesler wrote: > > > Hi Jark, > > > > I hope you don’t mind if I chime in. > > > > You have a good point that the sequence of upserts will eventually > > converge to the correct value under the at-least-once delivery guarantee, > > but it can still be important to avoid passing on uncommitted results. > Some > > thoughts, numbered for reference: > > > > 1. Most generally, if some result R is written to the sink topic, but > then > > the job fails before a checkpoint, rolls back, and reprocesses, producing > > R’, then it is incorrect to call R an “upsert”. In fact, as far as the > > system is concerned, R never happened at all (because it was part of a > > rolled-back batch of processing). > > > > 2. Readers may reasonably wish to impose some meaning on the sequence of > > upserts itself, so including aborted results can lead to wrong semantics > > downstream. Eg: “how many times has ‘x’ been updated today”? > > > > 3. Note that processing may not be deterministic over failures, and, > > building on (2), readers may have an expectation that every record in the > > topic corresponds to a real value that was associated with that key at > some > > point. Eg, if we start with x=1, checkpoint, then produce x=99, crash, > > restart and produce x=2. Under at-least-once, the history of x > is[1,99,2], > > while exactly-once would give the correct history of [1,2]. If we set up > an > > alert if the value of x is ever greater over 10, then at-least-once will > > erroneously alert us, while exactly-once does not. > > > > 4. Sending results for failed processing can also cause operational > > problems: if you’re processing a high volume of data, and you get into a > > crash loop, you can create a flood of repeated results. I’ve seen this > case > > cause real world pain for people, and it’s nice to have a way to avoid > it. > > > > I hope some of these examples show why a user might reasonably want to > > configure the connector with the exactly-once guarantee. > > > > Thanks! > > -John > > > > On Sat, Apr 8, 2023, at 10:03, Jark Wu wrote: > > > Hi Alexander, > > > > > > Yes, Kafka’s exactly-once semantics are used to avoid duplicated > records > > in > > > case of producer retries > > > or failovers. But as I explained above, it can’t avoid intentionally > > > duplicated records. Actually, I would > > > like to call them "upsert records" instead of "duplicates", that's why > > the > > > connector is named "upsert-kafka", > > > to make Kafka work like a database that supports updating and deleting > by > > > key. 
> > > > > > For example, there is a SQL query: > > > > > > SELECT URL, COUNT(*) page_views > > > FROM access_logs > > > GROUP BY URL; > > > > > > This is a continuous query[1] that continuously emits a new > > page_views> record once a new URL > > > access entry is received. The same URLs in the log may be far away and > be > > > processed in different checkpoints. > > > > > > It's easy to make upsert-kafka to support exactly-once delivery > > guarantee, > > > but as we discussed above, > > > it's unnecessary to support it and we intend to expose as few > > > configurations to users as possible. > > > > > > > > > Best, > > > Jark > > > > > > [1] > > > > > > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/group-agg/ > > > > > > > > > > > > On Sat, 8 Apr 2023 at 02:42, Alexander Sorokoumov > > > wrote: > > > > > >> Hi Jark, > > >> > > >> To my knowledge, Kafka's EXACTLY_ONCE transactions together with > > idempotent > > >> producers prevent duplicated records[1], at least in the cases when > > >> upstream does not produce them intentionally and across checkpoints. > > >> > > >> Could you please elaborate or point me to the docs that explain the > > reason > > >> for duplicated records upstream and across checkpoints? I am > relatively > > new > > >> to Flink and not aware of it. According to the kafka connector > > >> documentation, it does support exactly once semantics by configuring ' > > >> sink.delivery-guarantee'='exactly-once'[2]. It is not clear to me why > we > > >> can't make upsert-kafka configurable in the same way to support this > > >> delivery guarantee. > > >> > > >> Thank you, > > >> Alexander > > >> > > >> 1. > > >> > > >> > > >
Re: [mongodb connector] which flink version(s) is the mongodb connector compatible with?
Oh, let me correct what I said. The versions of released connector jars on Maven Central have the supported Flink version as a suffix. For the case of the mongodb connector with version 1.0.0-1.16[1], it should be compatible with Flink 1.16. [1] https://central.sonatype.com/artifact/org.apache.flink/flink-connector-mongodb/1.0.0-1.16 Best regards, Yuxia - Original Message - From: "yuxia" To: "dev" Sent: Thursday, Apr 13, 2023, 9:28:37 AM Subject: Re: [mongodb connector] which flink version(s) is the mongodb connector compatible with? From the pom[1] of the connector, it seems it's compatible with Flink 1.17. [1] https://github.com/apache/flink-connector-mongodb/blob/main/pom.xml#L56 Best regards, Yuxia - Original Message - From: "Saketh Kurnool" To: "dev" Sent: Thursday, Apr 13, 2023, 5:27:06 AM Subject: [mongodb connector] which flink version(s) is the mongodb connector compatible with? Hello! I have a use case for the mongodb connector (see: https://github.com/apache/flink-connector-mongodb), but I am currently running flink 1.15. As far as I can tell, the connector relies on dependencies that don't exist in 1.15 (one such example is `org/apache/flink/table/connector/source/lookup/cache/LookupCache`). Which flink version(s) is this connector implementation slated to be compatible with? Thank you!
[DISCUSS] EncodingFormat and DecodingFormat provide copy API
Hi, devs. I'd like to start a discussion about providing a copy API for EncodingFormat and DecodingFormat, which relates to FLINK-31686 [1]. Currently, DecodingFormat doesn't support copy(), so the same DecodingFormat instance ends up being reused after a filter/projection is pushed down. EncodingFormat has the same problem if a class implements EncodingFormat#applyWritableMetadata(). So I think EncodingFormat and DecodingFormat need to provide a copy method, and it should be a deep copy if the format implements DecodingFormat#applyReadableMetadata, EncodingFormat#applyWritableMetadata, or BulkDecodingFormat#applyFilters. Looking forward to your feedback. [1]: https://issues.apache.org/jira/browse/FLINK-31686 Best regards, tanjialiang
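For readers following FLINK-31686, a minimal sketch of the kind of copy API being discussed could look roughly like the following. This is illustrative only and assumes the method is exposed via a new sub-interface rather than added to DecodingFormat itself; the real proposal may differ.

    import org.apache.flink.table.connector.format.DecodingFormat;

    // Sketch only: a planner rule that pushes a projection/filter into the format
    // should work on its own copy, so the original DecodingFormat is not shared
    // and mutated between different table scans.
    public interface CopyableDecodingFormat<I> extends DecodingFormat<I> {

        // Returns a copy of this format. Implementations holding mutable state
        // (metadata keys from applyReadableMetadata, filters from
        // BulkDecodingFormat#applyFilters) should deep-copy that state.
        DecodingFormat<I> copy();
    }

An EncodingFormat counterpart would mirror this with a copy() that deep-copies any state set via applyWritableMetadata().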
Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement
Hi, Mang Atomicity is very important for CTAS, especially for batch jobs. This FLIP is a continuation of FLIP-218 and is valuable for CTAS. I just have one question: the Motivation section of FLIP-218 mentions three levels of atomicity semantics. Can the current design achieve the same as Spark's DataSource V2, which guarantees both atomicity and isolation, for example when writing to Hive tables using CTAS? Best, Ron Mang Zhang 于2023年4月10日周一 11:03写道: > Hi, everyone > > > > > I'd like to start a discussion about FLIP-305: Support atomic for CREATE > TABLE AS SELECT(CTAS) statement [1]. > > > > > CREATE TABLE AS SELECT(CTAS) statement has been support, but it's not > atomic. It will create the table first before job running. If the job > execution fails, or is cancelled, the table will not be dropped. > > > > > So I want Flink to support atomic CTAS, where only the table is created > when the Job succeeds. Improve user experience. > > > > > Looking forward to your feedback. > > > > > [1] > https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement > > > > > > > > > > > -- > > Best regards, > Mang Zhang
[jira] [Created] (FLINK-31790) Filesystem batch sink should also respect the PartitionCommitPolicy
Aitozi created FLINK-31790: -- Summary: Filesystem batch sink should also respect the PartitionCommitPolicy Key: FLINK-31790 URL: https://issues.apache.org/jira/browse/FLINK-31790 Project: Flink Issue Type: Improvement Components: Connectors / FileSystem Reporter: Aitozi Currently, the {{PartitionCommitPolicy}} only takes effect in the streaming file sink and the Hive file sink. The filesystem sink in batch mode should also respect the commit policy -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [mongodb connector] which flink version(s) is the mongodb connector compatible with?
Hi Saketh, As yuxia said, the mongodb connector 1.0.0 is currently compatible with Flink 1.16. Some compatibility fixes have been made recently to make it compatible with Flink 1.17, but they have not been released yet. Best, Jiabao > 2023年4月13日 上午9:36,yuxia 写道: > > Oh, correct what I said. > The version of released connector jars on Maven central has the supported > Flink version as suffix. > For the case of the mongodb connector with version 1.0.0-1.16[1], it should > be compatible with Flink 1.16. > > [1] > https://central.sonatype.com/artifact/org.apache.flink/flink-connector-mongodb/1.0.0-1.16 > > Best regards, > Yuxia > > - 原始邮件 - > 发件人: "yuxia" > 收件人: "dev" > 发送时间: 星期四, 2023年 4 月 13日 上午 9:28:37 > 主题: Re: [mongodb connector] which flink version(s) is the mongodb connector > compatible with? > > From the pom[1] of the connector, it seems it's compatible with Flink 1.17. > > [1] https://github.com/apache/flink-connector-mongodb/blob/main/pom.xml#L56 > > Best regards, > Yuxia > > - 原始邮件 - > 发件人: "Saketh Kurnool" > 收件人: "dev" > 发送时间: 星期四, 2023年 4 月 13日 上午 5:27:06 > 主题: [mongodb connector] which flink version(s) is the mongodb connector > compatible with? > > Hello! > > I have a use case for the mongodb connector (see: > https://github.com/apache/flink-connector-mongodb), but I am currently > running flink 1.15. As far as I can tell, the connector relies on > dependencies that don't exist in 1.15 (one such example is > `org/apache/flink/table/connector/source/lookup/cache/LookupCache`). > > Which flink version(s) is this connector implementation slated to be > compatible with? > > Thank you!
[RESULT][VOTE] FLIP-292: Enhance COMPILED PLAN to support operator-level state TTL configuration
Hi everyone, FLIP-292 [1] has been accepted and voted through this thread [2]. There are seven approving votes, six of which are binding: - Lincoln Lee (binding) - Jing Ge (binding) - godfrey he (binding) - Jark Wu (binding) - Shuo Cheng - Benchao Li (binding) - Leonard Xu (binding) [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240883951 [2] https://lists.apache.org/thread/jc5ngs6kxdn179xmj6oqchkl5frdkgr2 Best, Jane Chan
[DISCUSS] Release Paimon 0.4
Hi everyone, I'm going to check out the 0.4 branch next Monday, and won't merge major refactoring-related PRs into the master branch until then. Blockers: - Entire Database Sync CC @Caizhi Weng - CDC Ingestion mysql DATETIME(6) cast error [1] - MySqlSyncTableAction should support case ignore mode [2] If you have other blockers, please let us know. [1] https://github.com/apache/incubator-paimon/issues/860 [2] https://github.com/apache/incubator-paimon/issues/890 Best, Jingsong
Re: [DISCUSS] Release Paimon 0.4
Oh, sorry to the Flink devs: this email should have been sent to the Paimon dev list, please ignore it. Best, Jingsong On Thu, Apr 13, 2023 at 10:56 AM Jingsong Li wrote: > > Hi everyone, > > I'm going to check out the 0.4 branch out next Monday, and won't merge > major refactoring-related PRs into master branch until next Monday. > > Blockers: > - Entire Database Sync CC @Caizhi Weng > - CDC Ingestion mysql DATETIME(6) cast error [1] > - MySqlSyncTableAction should support case ignore mode [2] > > If you have other blockers, please let us know. > > [1] https://github.com/apache/incubator-paimon/issues/860 > [2] https://github.com/apache/incubator-paimon/issues/890 > > Best, > Jingsong
[jira] [Created] (FLINK-31791) FLIP-292: Enhance COMPILED PLAN to support operator-level state TTL configuration
Jane Chan created FLINK-31791: - Summary: FLIP-292: Enhance COMPILED PLAN to support operator-level state TTL configuration Key: FLINK-31791 URL: https://issues.apache.org/jira/browse/FLINK-31791 Project: Flink Issue Type: New Feature Components: Table SQL / Planner Affects Versions: 1.18.0 Reporter: Jane Chan -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [DISCUSS] FLIP-302: Support TRUNCATE TABLE statement
Hi, xia Thanks for your explanation, for the first question, given the current status, I think we can provide the generic interface in the future if we need it. For the second question, it makes sense to me if we can support the table cache at the framework level. Best, Ron yuxia 于2023年4月11日周二 16:12写道: > Hi, ron. > > 1: Considering for deleting rows, Flink will also write delete record to > achive purpose of deleting data, it may not as so strange for connector > devs to make DynamicTableSink implement SupportsTruncate to support > truncate the table. Based on the assume that DynamicTableSink is used for > inserting/updating/deleting, I think it's reasonable for DynamicTableSink > to implement SupportsTruncate. But I think it sounds reasonable to add a > generic interface like DynamicTable to differentiate DynamicTableSource & > DynamicTableSink. But it will definitely requires much design and > discussion which deserves a dedicated FLIP. I perfer not to do that in this > FLIP to avoid overdesign and I think it's not a must for this FLIP. Maybe > we can discuss it if some day if we do need the new generic table interface. > > 2: Considering various catalogs and tables, it's hard for Flink to do the > unified follow-up actions after truncating table. But still the external > connector can do such follow-up actions in method `executeTruncation`. > Btw, in Spark, for the newly truncate table interface[1], Spark only > recaches the table after truncating table[2] which I think if Flink > supports table cache in framework-level, > we can also recache in framework-level for truncate table statement. > > [1] > https://github.com/apache/spark/blob/1a42aa5bd44e7524bb55463bbd85bea782715834/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TruncatableTable.java > [2] > https://github.com/apache/spark/blob/06c09a79b371c5ac3e4ebad1118ed94b460f48d1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TruncateTableExec.scala > > > I think the external catalog can implemnet such logic in method > `executeTruncation`. > > Best regards, > Yuxia > > - 原始邮件 - > 发件人: "liu ron" > 收件人: "dev" > 发送时间: 星期二, 2023年 4 月 11日 上午 10:51:36 > 主题: Re: [DISCUSS] FLIP-302: Support TRUNCATE TABLE statement > > Hi, xia > It's a nice improvement to support TRUNCATE TABLE statement, making Flink > more feature-rich. > I think the truncate syntax is a command that will be executed in the > client's process, rather than pulling up a Flink job to execute on the > cluster. So on the user-facing exposed interface, I think we should not let > users implement the SupportsTruncate interface on the DynamicTableSink > interface. This seems a bit strange and also confuses users, as hang said, > why Source table does not support truncate. It would be nice if we could > come up with a generic interface that supports truncate instead of binding > it to the DynamicTableSink interface, and maybe in the future we will > support more commands like truncate command. > > In addition, after truncating data, we may also need to update the metadata > of the table, such as Hive table, we need to update the statistics, as well > as clear the cache in the metastore, I think we should also consider these > capabilities, Sparky has considered these, refer to > > https://github.com/apache/spark/blob/69dd20b5e45c7e3533efbfdc1974f59931c1b781/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala#L573 > . 
> > Best, > > Ron > > Jim Hughes 于2023年4月11日周二 02:15写道: > > > Hi Yuxia, > > > > On Mon, Apr 10, 2023 at 10:35 AM yuxia > > wrote: > > > > > Hi, Jim. > > > > > > 1: I'm expecting all DynamicTableSinks to support. But it's hard to > > > support all at one shot. For the DynamicTableSinks that haven't > > implemented > > > SupportsTruncate interface, we'll throw exception > > > like 'The truncate statement for the table is not supported as it > hasn't > > > implemented the interface SupportsTruncate'. Also, for some sinks that > > > doesn't support deleting data, it can also implements it but throw more > > > concrete exception like "xxx donesn't support to truncate a table as > > delete > > > is impossible for xxx". It depends on the external connector's > > > implementation. > > > Thanks for your advice, I updated it to the FLIP. > > > > > > > Makes sense. > > > > > > > 2: What do you mean by saying "truncate an input to a streaming query"? > > > This FLIP is aimed to support TRUNCATE TABLE statement which is for > > > truncating a table. In which case it will inoperates with streaming > > queries? > > > > > > > Let's take a source like Kafka as an example. Suppose I have an input > > topic Foo, and query which uses it as an input. > > > > When Foo is truncated, if the truncation works as a delete and create, > then > > the connector may need to be made aware (otherwise it may try to use > > offsets from the previous topic). On the other hand, one may have to ask > > Kafka to delete records up to a certain
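For reference while reading this thread, the ability interface under discussion is roughly of the following shape. This is a sketch based only on the executeTruncation method name mentioned above, not necessarily the final FLIP-302 definition.

    // Sketch only: a sink that can truncate its table implements this ability
    // interface; a sink that cannot delete data either does not implement it or
    // throws a descriptive exception from executeTruncation().
    public interface SupportsTruncate {

        // Truncate the target table, i.e. delete all of its rows.
        void executeTruncation();
    }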
Re: [DISCUSS] FLIP-304: Pluggable failure handling for Apache Flink
Thanks Panagiotis for the update, the updated FLIP looks good to me. Best, Leonard > On Apr 13, 2023, at 7:42 AM, Panagiotis Garefalakis wrote: > > Hello there, > > Zhu: agree with the config option, great suggestion > Hong: global timeout is also interesting and a good addition -- only > downside I see is just another config option > > If everyone is happy, I suggest we keep the discussion open until Friday > and start a Vote shortly after. > > Cheers, > Panagiotis > > On Tue, Apr 11, 2023 at 12:58 AM Teoh, Hong > wrote: > >> Hi Panagiotis, >> >> Thank you for the update. Looks great! Just one suggestion below: >> >> 1. We seem to be waiting for the future(s) to complete before restarting >> the job - should we add a configurable timeout for the enrichment? Since >> each failure enricher are run in parallel, we could probably settle for 1 >> timeout for all failure handlers. >> 2. +1 to Zhu’s comment on adding a comma separated list of FailureHandlers >> instead of boolean toggle! >> >> Other than the above, the FLIP looks great! Thank you for your efforts. >> >> Regards, >> Hong >> >> >> >>> On 11 Apr 2023, at 08:01, Zhu Zhu wrote: >>> >>> CAUTION: This email originated from outside of the organization. Do not >> click links or open attachments unless you can confirm the sender and know >> the content is safe. >>> >>> >>> >>> Hi Panagiotis, >>> >>> Thanks for updating the FLIP. >>> Regarding the config option >> `jobmanager.failure-enricher-plugins.enabled` >>> I think a config option `jobmanager.failure-enrichers`, which accepts >>> the names of enrichers to use, may be better. It allows the users to >>> deploy and use the plugins in a more flexible way. The default value >>> of the config can be none, which means failure enrichment will be >>> disabled by default. >>> A reference can be the config option `metrics.reporters` which helps >>> to load metric reporter plugins. >>> >>> Thanks, >>> Zhu >>> >>> Panagiotis Garefalakis 于2023年4月10日周一 03:47写道: Hello again everyone, FLIP is now updated based on our discussion! In short, FLIP-304 [1] proposes the addition of a pluggable interface >> that will allow users to add custom logic and enrich failures with custom metadata labels. While as discussed, custom restart strategies will be part of a >> different effort. Every pluggable FaulireEnricher: - Is triggered on every global/non-global failure - Receives a Throwable cause and an immutable Context - Performs asynchronous execution (separate IoExecutor) to avoid blocking the main thread for RPCs - Is completely independent from other Enrichers - Emits failure labels/tags for its unique, pre-defined keys (defined >> at startup time) Check the link for implementation details and please let me know what >> you think :) [1] >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-304%3A+Pluggable+Failure+Enrichers Panagiotis On Tue, Mar 28, 2023 at 5:01 AM Zhu Zhu wrote: > Hi Panagiotis, > > How about to introduce a config option to control which error handling > plugins should be used? It is more flexible for deployments. >> Additionally, > it can also enable users to explicitly specify the order that the >> plugins > take effects. > > Thanks, > Zhu > > Gen Luo 于2023年3月27日周一 15:02写道: >> >> Thanks for the summary! >> >> Also +1 to support custom restart strategies in a different FLIP, >> as long as we can make sure that the plugin interface won't be >> changed when the restart strategy interface is introduced. 
>> >> To achieve this, maybe we should think well how the handler >> would cooperate with the restart strategy, like would it executes b >> efore the strategy (e.g. some strategy may use the tag), or after >> it (e.g. some metric reporting handler may use the handling result). >> Though we can implement in one way, and extend if the other is >> really necessary by someone. >> >> Besides, instead of using either of the names, shall we just make >> them two subclasses named FailureEnricher and FailureListener? >> The former executes synchronously and can modify the context, >> while the latter executes asynchronously and has a read-only view >> of context. In this way we can make sure a handler behaves in >> the expected way. >> >> >> On Thu, Mar 23, 2023 at 5:19 PM Zhu Zhu wrote: >> >>> +1 to support custom restart strategies in a different FLIP. >>> >>> It's fine to have a different plugin for custom restart strategy. >>> If so, since we do not treat the FLIP-304 plugin as a common failure >>> handler, but instead mainly targets to add labels to errors, I would >>> +1 for the name `FailureEnricher`. >>> >>> Th
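To make the shape of the FLIP-304 plugin concrete for readers, here is a rough sketch based on the properties Panagiotis lists above (triggered on every failure, Throwable plus immutable context, asynchronous execution, labels only for pre-declared keys). The method and type names are illustrative assumptions, not the final API.

    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.CompletableFuture;

    // Sketch only: a failure-enrichment plugin as described in this thread.
    public interface FailureEnricher {

        // The label keys this enricher may emit; validated once at startup.
        Set<String> getOutputKeys();

        // Asynchronously derive labels for (a subset of) the declared keys.
        CompletableFuture<Map<String, String>> processFailure(Throwable cause, Context context);

        // Read-only view of failure metadata handed to the enricher.
        interface Context {
            // e.g. accessors for job information and the enricher's metric group
        }
    }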
Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement
Hi, Mang +1 for completing the support for atomicity of CTAS; this is very useful in batch scenarios. I have two questions: 1. naming-wise: a) can we rename `Catalog#getTwoPhaseCommitCreateTable` to `Catalog#twoPhaseCreateTable` (and we may add twoPhaseReplaceTable/twoPhaseCreateOrReplaceTable later)? b) for `TwoPhaseCommitCatalogTable`, might it be better to use `TwoPhaseCatalogTable`? c) in `TwoPhaseCommitCatalogTable#beginTransaction`, the word 'transaction' in the method name may remind users of transaction support (which is not strictly the case), so I suggest changing it to `begin` 2. Has this design been validated by any relevant PoC on Hive or other catalogs? Best, Lincoln Lee liu ron 于2023年4月13日周四 10:17写道: > Hi, Mang > Atomicity is very important for CTAS, especially for batch jobs. This FLIP > is a continuation of FLIP-218, which is valuable for CTAS. > I just have one question, in the Motivation part of FLIP-218, we mentioned > three levels of atomicity semantics, can this current design do the same as > Spark's DataSource V2, which can guarantee both atomicity and isolation, > for example, can it be done by writing to Hive tables using CTAS? > > Best, > Ron > > Mang Zhang 于2023年4月10日周一 11:03写道: > > > Hi, everyone > > > > > > > > > > I'd like to start a discussion about FLIP-305: Support atomic for CREATE > > TABLE AS SELECT(CTAS) statement [1]. > > > > > > > > > > CREATE TABLE AS SELECT(CTAS) statement has been support, but it's not > > atomic. It will create the table first before job running. If the job > > execution fails, or is cancelled, the table will not be dropped. > > > > > > > > > > So I want Flink to support atomic CTAS, where only the table is created > > when the Job succeeds. Improve user experience. > > > > > > > > > > Looking forward to your feedback. > > > > > > > > > > [1] > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement > > > > > > > > > > > > > > > > > > > > > > -- > > > > Best regards, > > Mang Zhang >
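For illustration, the renaming suggested in 1) above would give an interface roughly like the one below. This is a sketch only; the commit/abort hooks are assumptions added here to round out the two-phase lifecycle and are not taken from the FLIP.

    // Sketch only: a catalog table handle that could be returned by a renamed
    // Catalog#twoPhaseCreateTable, with begin() instead of beginTransaction().
    public interface TwoPhaseCatalogTable {

        // Called when the CTAS job is submitted; stage the table without exposing it.
        void begin();

        // Called when the job succeeds; make the staged table visible atomically.
        void commit();

        // Called when the job fails or is cancelled; drop any staged data.
        void abort();
    }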
[jira] [Created] (FLINK-31792) Errors are not reported in the Web UI
David Morávek created FLINK-31792: - Summary: Errors are not reported in the Web UI Key: FLINK-31792 URL: https://issues.apache.org/jira/browse/FLINK-31792 Project: Flink Issue Type: Bug Components: Runtime / Web Frontend Affects Versions: 1.17.0 Reporter: David Morávek Assignee: David Morávek After FLINK-29747, NzNotificationService can no longer be resolved by the injector, and because we're using the injector directly, the failure is silently ignored. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [DISCUSS] Add support for Apache Arrow format
> Which connectors would be commonly used when reading in Arrow format? Filesystem? Currently, yes. Ideally it could be combined with different connectors, but I have not figured out how to integrate the Arrow format deserializer with the `DecodingFormat` or `DeserializationSchema` interface. So, as a first step, I want to introduce it as a file bulk format. Martijn Visser 于2023年4月12日周三 22:53写道: > > Which connectors would be commonly used when reading in Arrow format? > Filesystem? > > On Wed, Apr 12, 2023 at 4:27 AM Jacky Lau wrote: > > > Hi > >I also think arrow format will be useful when reading/writing with > > message queue. > >Arrow defines a language-independent columnar memory format for flat and > > hierarchical data, organized for efficient analytic operations on modern > > hardware like CPUs and GPUs. The Arrow memory format also supports > > zero-copy reads for lightning-fast data access without serialization > > overhead. it will bring a lot. > >And we may do some surveys, what other engines support like > > spark/hive/presto and so on, how that supports and how it be used. > > > >Best, > >Jacky. > > > > Aitozi 于2023年4月2日周日 22:22写道: > > > > > Hi all, > > > Thanks for your input. > > > > > > @Ran > However, as mentioned in the issue you listed, it may take a lot > > of > > > work > > > and the community's consideration for integrating Arrow. > > > > > > To clarify, this proposal solely aims to introduce flink-arrow as a new > > > format, > > > similar to flink-csv and flink-protobuf. It will not impact the internal > > > data > > > structure representation in Flink. For proof of concept, please refer to: > > > https://github.com/Aitozi/flink/commits/arrow-format. > > > > > > @Martijn > I'm wondering if there's really much benefit for the Flink > > > project to > > > add another file format, over properly supporting the format that we > > > already > > > have in the project. > > > > > > Maintain the format we already have and introduce new formats should be > > > orthogonal. The requirement of supporting arrow format originally > > observed > > > in > > > our internal usage to deserialize the data(VectorSchemaRoot) from other > > > storage > > > systems to flink internal RowData and serialize the flink internal > > RowData > > > to > > > VectorSchemaRoot out to the storage system. And the requirement from the > > > slack[1] is to support the arrow file format. Although, Arrow is not > > > usually > > > used as the final disk storage format. But it has a tendency to be used > > > as the > > > inter-exchange format between different systems or temporary storage for > > > analysis due to its columnar format and can be memory mapped to other > > > analysis > > > programs. > > > > > > So, I think it's meaningful to support arrow formats in Flink. > > > > > > @Jim > If the Flink format interface is used there, then it may be > > useful > > > to > > > consider Arrow along with other columnar formats. > > > > > > I am not well-versed with the formats utilized in Paimon. Upon checking > > > [2], it > > > appears that Paimon does not directly employ flink formats. Instead, it > > > utilizes > > > FormatWriterFactory and FormatReaderFactory to handle data serialization > > > and > > > deserialization. Therefore, I believe that the current work may not be > > > applicable for reuse in Paimon at this time. > > > > > > Best, > > > Aitozi.
> > > > > > [1]: > > https://apache-flink.slack.com/archives/C03GV7L3G2C/p1677915016551629 > > > [2]: > > > > > https://github.com/apache/incubator-paimon/tree/master/paimon-format/src/main/java/org/apache/paimon/format > > > > > > Jim Hughes 于2023年3月31日周五 00:36写道: > > > > > > > > Hi all, > > > > > > > > How do Flink formats relate to or interact with Paimon (formerly > > > > Flink-Table-Store)? If the Flink format interface is used there, then > > it > > > > may be useful to consider Arrow along with other columnar formats. > > > > > > > > Separately, from previous experience, I've seen the Arrow format be > > > useful > > > > as an output format for clients to read efficiently. Arrow does > > support > > > > returning batches of records, so there may be some options to use the > > > > format in a streaming situation where a sufficient collection of > > records > > > > can be gathered. > > > > > > > > Cheers, > > > > > > > > Jim > > > > > > > > > > > > > > > > On Thu, Mar 30, 2023 at 8:32 AM Martijn Visser < > > martijnvis...@apache.org > > > > > > > > wrote: > > > > > > > > > Hi, > > > > > > > > > > To be honest, I haven't seen that much demand for supporting the > > Arrow > > > > > format directly in Flink as a flink-format. I'm wondering if there's > > > really > > > > > much benefit for the Flink project to add another file format, over > > > > > properly supporting the format that we already have in the project. > > > > > > > > > > Best regards, > > > > > > > > > > Martijn > > > > > > > > > > On Thu, Mar 30, 2023 at 2
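For readers wondering what the VectorSchemaRoot-to-RowData deserialization mentioned above boils down to, here is a simplified sketch. It assumes a production version would map each Arrow type to Flink's internal data structures (StringData, TimestampData, ...) rather than passing raw objects through.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.arrow.vector.FieldVector;
    import org.apache.arrow.vector.VectorSchemaRoot;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;

    // Sketch only: turn each Arrow row into a GenericRowData, column by column.
    public final class ArrowToRowData {

        public static List<RowData> convert(VectorSchemaRoot root) {
            List<FieldVector> vectors = root.getFieldVectors();
            List<RowData> rows = new ArrayList<>(root.getRowCount());
            for (int row = 0; row < root.getRowCount(); row++) {
                GenericRowData rowData = new GenericRowData(vectors.size());
                for (int col = 0; col < vectors.size(); col++) {
                    // getObject() returns the plain Java value; real code would convert
                    // it to Flink's internal representation per logical type here.
                    rowData.setField(col, vectors.get(col).getObject(row));
                }
                rows.add(rowData);
            }
            return rows;
        }
    }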
Re: [DISCUSS] FLIP-302: Support TRUNCATE TABLE statement
Hi, xia > which I think if Flink supports table cache in framework-level, we can also recache in framework-level for truncate table statement. I think the Flink catalog currently already stores some stats for the table, e.g. after `ANALYZE TABLE` the table's statistics are stored in the catalog, but TRUNCATE TABLE will not correct those statistics. I know it's hard for Flink to do unified follow-up actions after truncating a table. But I think we need to define a clear position for the Flink Catalog. IMO, Flink being a compute engine, it's hard for it to maintain the catalog for every storage table itself. So, with more and more `Executable` commands introduced, the data in the catalog will drift out of sync. In this case, after a truncate the following parts of the catalog may be affected: - the table/column statistics will no longer be correct - the partitions of this table should be cleared Best, Aitozi. liu ron 于2023年4月13日周四 11:28写道: > > Hi, xia > > Thanks for your explanation, for the first question, given the current > status, I think we can provide the generic interface in the future if we > need it. For the second question, it makes sense to me if we can > support the table cache at the framework level. > > Best, > Ron > > yuxia 于2023年4月11日周二 16:12写道: > > > Hi, ron. > > > > 1: Considering for deleting rows, Flink will also write delete record to > > achive purpose of deleting data, it may not as so strange for connector > > devs to make DynamicTableSink implement SupportsTruncate to support > > truncate the table. Based on the assume that DynamicTableSink is used for > > inserting/updating/deleting, I think it's reasonable for DynamicTableSink > > to implement SupportsTruncate. But I think it sounds reasonable to add a > > generic interface like DynamicTable to differentiate DynamicTableSource & > > DynamicTableSink. But it will definitely requires much design and > > discussion which deserves a dedicated FLIP. I perfer not to do that in this > > FLIP to avoid overdesign and I think it's not a must for this FLIP. Maybe > > we can discuss it if some day if we do need the new generic table interface. > > > > 2: Considering various catalogs and tables, it's hard for Flink to do the > > unified follow-up actions after truncating table. But still the external > > connector can do such follow-up actions in method `executeTruncation`. > > Btw, in Spark, for the newly truncate table interface[1], Spark only > > recaches the table after truncating table[2] which I think if Flink > > supports table cache in framework-level, > > we can also recache in framework-level for truncate table statement. > > > > [1] > > https://github.com/apache/spark/blob/1a42aa5bd44e7524bb55463bbd85bea782715834/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TruncatableTable.java > > [2] > > https://github.com/apache/spark/blob/06c09a79b371c5ac3e4ebad1118ed94b460f48d1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TruncateTableExec.scala > > > > > > I think the external catalog can implemnet such logic in method > > `executeTruncation`. > > > > Best regards, > > Yuxia > > > > - 原始邮件 - > > 发件人: "liu ron" > > 收件人: "dev" > > 发送时间: 星期二, 2023年 4 月 11日 上午 10:51:36 > > 主题: Re: [DISCUSS] FLIP-302: Support TRUNCATE TABLE statement > > > > Hi, xia > > It's a nice improvement to support TRUNCATE TABLE statement, making Flink > > more feature-rich. > > I think the truncate syntax is a command that will be executed in the > > client's process, rather than pulling up a Flink job to execute on the > > cluster.
So on the user-facing exposed interface, I think we should not let > > users implement the SupportsTruncate interface on the DynamicTableSink > > interface. This seems a bit strange and also confuses users, as hang said, > > why Source table does not support truncate. It would be nice if we could > > come up with a generic interface that supports truncate instead of binding > > it to the DynamicTableSink interface, and maybe in the future we will > > support more commands like truncate command. > > > > In addition, after truncating data, we may also need to update the metadata > > of the table, such as Hive table, we need to update the statistics, as well > > as clear the cache in the metastore, I think we should also consider these > > capabilities, Sparky has considered these, refer to > > > > https://github.com/apache/spark/blob/69dd20b5e45c7e3533efbfdc1974f59931c1b781/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala#L573 > > . > > > > Best, > > > > Ron > > > > Jim Hughes 于2023年4月11日周二 02:15写道: > > > > > Hi Yuxia, > > > > > > On Mon, Apr 10, 2023 at 10:35 AM yuxia > > > wrote: > > > > > > > Hi, Jim. > > > > > > > > 1: I'm expecting all DynamicTableSinks to support. But it's hard to > > > > support all at one shot. For the DynamicTableSinks that haven't > > > implemented > > > > SupportsTruncate interface, we'll throw exception > > > > like 'The truncate statement for the
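As a concrete illustration of the statistics concern raised above, one way a connector's executeTruncation() (or framework-level follow-up logic) could invalidate stale stats with the existing Catalog API is sketched below. This is illustrative only, not part of the FLIP.

    import java.util.Collections;

    import org.apache.flink.table.catalog.Catalog;
    import org.apache.flink.table.catalog.ObjectPath;
    import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
    import org.apache.flink.table.catalog.stats.CatalogTableStatistics;

    // Sketch only: reset table- and column-level statistics after TRUNCATE TABLE
    // so planners do not keep using pre-truncate row counts.
    public final class TruncateFollowUp {

        public static void resetStatistics(Catalog catalog, ObjectPath table) throws Exception {
            // Row count and data size are no longer valid after the truncate.
            catalog.alterTableStatistics(table, CatalogTableStatistics.UNKNOWN, false);
            // Drop per-column statistics as well (an empty map means no known column stats).
            catalog.alterTableColumnStatistics(
                    table, new CatalogColumnStatistics(Collections.emptyMap()), false);
        }
    }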