Re: [DISCUSS] EXACTLY_ONCE delivery semantics for upsert-kafka connector

2023-04-12 Thread Jark Wu
Hi John,

Thank you for your valuable input. It sounds reasonable to me.

From this point of view, exactly-once is used to guarantee transaction
semantics rather than to avoid duplication/upserts.
This is similar to the JDBC connectors that already support eventual
consistency with idempotent updates, but we still add the support of 2PC[1].

Best,
Jark

[1]:
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/jdbc/#jdbcsinkexactlyoncesink
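
For illustration, a minimal sketch of what enabling the guarantee under
discussion could look like on an upsert-kafka table. The
'sink.delivery-guarantee' and 'sink.transactional-id-prefix' options are
borrowed from the plain kafka connector; whether upsert-kafka will accept them
in this exact form is what this thread is about, so treat the DDL as an
assumption rather than a released API:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UpsertKafkaExactlyOnceSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Hypothetical table/topic names; the two sink.* options mirror the
        // plain kafka connector and are NOT yet supported by upsert-kafka.
        tEnv.executeSql(
                "CREATE TABLE page_views (\n"
                        + "  url STRING,\n"
                        + "  views BIGINT,\n"
                        + "  PRIMARY KEY (url) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'connector' = 'upsert-kafka',\n"
                        + "  'topic' = 'page-views',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'key.format' = 'json',\n"
                        + "  'value.format' = 'json',\n"
                        + "  'sink.delivery-guarantee' = 'exactly-once',\n"
                        + "  'sink.transactional-id-prefix' = 'page-views-sink'\n"
                        + ")");
    }
}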

On Wed, 12 Apr 2023 at 10:36, John Roesler  wrote:

> Hi Jark,
>
> I hope you don’t mind if I chime in.
>
> You have a good point that the sequence of upserts will eventually
> converge to the correct value under the at-least-once delivery guarantee,
> but it can still be important to avoid passing on uncommitted results. Some
> thoughts, numbered for reference:
>
> 1. Most generally, if some result R is written to the sink topic, but then
> the job fails before a checkpoint, rolls back, and reprocesses, producing
> R’, then it is incorrect to call R an “upsert”. In fact, as far as the
> system is concerned, R never happened at all (because it was part of a
> rolled-back batch of processing).
>
> 2. Readers may reasonably wish to impose some meaning on the sequence of
> upserts itself, so including aborted results can lead to wrong semantics
> downstream. Eg: “how many times has ‘x’ been updated today”?
>
> 3. Note that processing may not be deterministic over failures, and,
> building on (2), readers may have an expectation that every record in the
> topic corresponds to a real value that was associated with that key at some
> point. Eg, if we start with x=1, checkpoint, then produce x=99, crash,
> restart and produce x=2. Under at-least-once, the history of x is [1,99,2],
> while exactly-once would give the correct history of [1,2]. If we set up an
> alert if the value of x is ever greater than 10, then at-least-once will
> erroneously alert us, while exactly-once does not.
>
> 4. Sending results for failed processing can also cause operational
> problems: if you’re processing a high volume of data, and you get into a
> crash loop, you can create a flood of repeated results. I’ve seen this case
> cause real world pain for people, and it’s nice to have a way to avoid it.
>
> I hope some of these examples show why a user might reasonably want to
> configure the connector with the exactly-once guarantee.
>
> Thanks!
> -John
>
> On Sat, Apr 8, 2023, at 10:03, Jark Wu wrote:
> > Hi Alexander,
> >
> > Yes, Kafka’s exactly-once semantics are used to avoid duplicated records
> in
> > case of producer retries
> > or failovers. But as I explained above, it can’t avoid intentionally
> > duplicated records. Actually, I would
> > like to call them "upsert records" instead of "duplicates", that's why
> the
> > connector is named "upsert-kafka",
> > to make Kafka work like a database that supports updating and deleting by
> > key.
> >
> > For example, there is a SQL query:
> >
> > SELECT URL, COUNT(*) page_views
> > FROM access_logs
> > GROUP BY URL;
> >
> > This is a continuous query[1] that continuously emits a new <URL, page_views> record once a new URL
> > access entry is received. The same URLs in the log may be far away and be
> > processed in different checkpoints.
> >
> > It's easy to make upsert-kafka support the exactly-once delivery
> guarantee,
> > but as we discussed above,
> > it's unnecessary to support it and we intend to expose as few
> > configurations to users as possible.
> >
> >
> > Best,
> > Jark
> >
> > [1]
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/group-agg/
> >
> >
> >
> > On Sat, 8 Apr 2023 at 02:42, Alexander Sorokoumov
> >  wrote:
> >
> >> Hi Jark,
> >>
> >> To my knowledge, Kafka's EXACTLY_ONCE transactions together with
> idempotent
> >> producers prevent duplicated records[1], at least in the cases when
> >> upstream does not produce them intentionally and across checkpoints.
> >>
> >> Could you please elaborate or point me to the docs that explain the
> reason
> >> for duplicated records upstream and across checkpoints? I am relatively
> new
> >> to Flink and not aware of it. According to the kafka connector
> >> documentation, it does support exactly once semantics by configuring '
> >> sink.delivery-guarantee'='exactly-once'[2]. It is not clear to me why we
> >> can't make upsert-kafka configurable in the same way to support this
> >> delivery guarantee.
> >>
> >> Thank you,
> >> Alexander
> >>
> >> 1.
> >>
> >>
> https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
> >> 2.
> >>
> >>
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/kafka/#consistency-guarantees
> >>
> >>
> >> On Fri, Apr 7, 2023 at 3:44 AM Jark Wu  wrote:
> >>
> >> > Hi Alexander,
> >> >
> >> > I’m not sure I fully understand the reasons. I left my comments
> inline.
> >> >
> >> > > 1. There might be other non-Flink topic consumers that wo

Re: [VOTE] Release flink-connector-aws, 4.1.0 for Flink 1.17

2023-04-12 Thread Robert Metzger
+1 (binding)

- tried out the kinesis sql binary with sql client
- staging binaries look fine

On Mon, Apr 3, 2023 at 10:12 PM Elphas Tori  wrote:

> +1 (non-binding)
>
> + verified hashes and signatures
> + checked local build of website pull request and approved
>
> On 2023/04/03 16:19:00 Danny Cranmer wrote:
> > Hi everyone,
> > Please review and vote on the binaries for flink-connector-aws version
> > 4.1.0-1.17, as follows:
> > [ ] +1, Approve the release
> > [ ] -1, Do not approve the release (please provide specific comments)
> >
> > The v4.1.0 source release has already been approved [1], this vote is to
> > distribute the binaries for Flink 1.17 support.
> >
> > The complete staging area is available for your review, which includes:
> > * all artifacts to be deployed to the Maven Central Repository [2], which
> > are signed with the key with fingerprint
> > 0F79F2AFB2351BC29678544591F9C1EC125FD8DB [3],
> > * website pull request listing the new release [4].
> >
> > The vote will be open for at least 72 hours. It is adopted by majority
> > approval, with at least 3 PMC affirmative votes.
> >
> > Thanks,
> > Danny
> >
> > [1] https://lists.apache.org/thread/7q3ysg9jz5cjwdgdmgckbnqhxh44ncmv
> > [2]
> https://repository.apache.org/content/repositories/orgapacheflink-1602/
> > [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> > [4] https://github.com/apache/flink-web/pull/628
> >
>


Re: [DISCUSS] FLIP-306: Unified File Merging Mechanism for Checkpoints

2023-04-12 Thread Zakelly Lan
Hi Yun,

I reorganized our discussion and added a comparison table of state
ownership with some previous designs. Please take a look at section
"4.9. State ownership comparison with other designs".

But I don't see them as alternatives, since the design of state
ownership is integrated with this FLIP. That is to say, we are
providing a file merging solution including file management for newly
merged files, and other ownership models are not feasible for the current
merging plan. If the state ownership changes, the design of merging
files at different granularities also needs to change accordingly.
WDYT?


Best regards,
Zakelly

On Tue, Apr 11, 2023 at 10:18 PM Yun Tang  wrote:
>
> Hi Zakelly,
>
> Since we already had some discussions on this topic in the doc I mentioned, 
> could you please describe the difference in your FLIP?
>
> I think we had better have a comparison table across the different options,
> just like that doc did. And we could also list some of them in your Rejected
> Alternatives part.
>
>
> Best
> Yun Tang
> 
> From: Zakelly Lan 
> Sent: Tuesday, April 11, 2023 17:57
> To: dev@flink.apache.org 
> Subject: Re: [DISCUSS] FLIP-306: Unified File Merging Mechanism for 
> Checkpoints
>
> Hi Rui Fan,
>
> Thanks for your comments!
>
> > (1) The temporary segment will remain in the physical file for a short 
> > time, right?
>
> Yes, any written segment will remain in the physical file until the
> physical file is deleted. It is controlled by the reference counting.
> And as discussed in 4.7, this will result in a space amplification
> problem.
>
>
> > (2) Is subtask granularity confused with shared state?
>
> Merging files at granularity of subtask is a general solution for
> shared states, considering the file may be reused by the following
> checkpoint after job restore. This design is applicable to sst files
> and any other shared states that may arise in the future. However, the
> DSTL files are a special case of shared states, since these files will
> no longer be shared after job restore. Therefore, we may do an
> optimization for these files and merge them at the TM level.
> Currently, the DSTL files are not in the shared directory of
> checkpoint storage, and I suggest we keep it as it is. I agree that
> this may bring in some confusion, and I suggest the FLIP mainly
> discuss the general situation and list the special situations
> separately without bringing in new concepts. I will add another
> paragraph describing the file merging for DSTL files. WDYT?
>
>
> > (3) When rescaling, do all shared files need to be copied?
>
> I agree with you that only sst files of the base DB need to be copied
> (or re-uploaded in the next checkpoint). However, section 4.2
> simplifies file copying issues (copying all files), following the
> concept of shared state.
>
>
> > (4) Does the space magnification ratio need a configuration option?
>
> Thanks for the reminder, I will add an option in this FLIP.
>
>
> > (5) How many physical files can a TM write at the same checkpoint at the 
> > same time?
>
> This is a very good point. Actually, there is a file reuse pool as
> section 4.6 described. There could be multiple files within this pool,
> supporting concurrent writing by multiple writers. I suggest providing
> two configurations to control the file number:
>
>   state.checkpoints.file-merging.max-file-pool-size: Specifies the
> upper limit of the file pool size.
>   state.checkpoints.file-merging.max-subtasks-per-file: Specifies the
> lower limit of the file pool size based on the number of subtasks
> within each TM.
>
> The number of simultaneously open files is controlled by these two
> options, and the first option takes precedence over the second.
>
> WDYT?
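
(For illustration only: how the two proposed keys might be set
programmatically. Both option names come from this discussion and are not part
of any released Flink version.)

import org.apache.flink.configuration.Configuration;

public class FileMergingOptionsSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Proposed upper limit of the file pool size per TM.
        conf.setString("state.checkpoints.file-merging.max-file-pool-size", "32");
        // Proposed lower limit, derived from the number of subtasks per file.
        conf.setString("state.checkpoints.file-merging.max-subtasks-per-file", "4");
        System.out.println(conf);
    }
}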
>
>
>
> Thanks a lot for your valuable insight.
>
> Best regards,
> Zakelly
>
>
> On Mon, Apr 10, 2023 at 7:08 PM Rui Fan <1996fan...@gmail.com> wrote:
> >
> > Hi all,
> >
> > Thanks Zakelly driving this proposal, and thank you all for
> > the warm discussions. It's really a useful feature.
> >
> > I have a few questions about this FLIP.
> >
> > (1) The temporary segment will remain in the physical file for
> > a short time, right?
> >
> > FLIP proposes to write segments instead of physical files.
> > If the physical files are written directly, these temporary files
> > will be deleted after the checkpoint is aborted. When writing
> > a segment, how to delete the temporary segment?
> > Decrement the reference count value by 1?
> >
> > (2) Is subtask granularity confused with shared state?
> >
> > From the "4.1.2 Merge files within a subtask or a TM" part,
> > based on the principle of sst files, it is concluded that
> > "For shared states, files are merged within each subtask."
> >
> > I'm not sure whether this conclusion is general or just for sst.
> > As Yanfei mentioned before:
> >
> > > DSTL files are shared between checkpoints, and are
> > > currently merged in batches at the task manager level.
> >
> > DSTL f

Re: [VOTE] Release flink-connector-aws, 4.1.0 for Flink 1.17

2023-04-12 Thread Samrat Deb
+1 (non binding)


On Tue, 4 Apr 2023 at 1:42 AM, Elphas Tori  wrote:

> +1 (non-binding)
>
> + verified hashes and signatures
> + checked local build of website pull request and approved
>
> On 2023/04/03 16:19:00 Danny Cranmer wrote:
> > Hi everyone,
> > Please review and vote on the binaries for flink-connector-aws version
> > 4.1.0-1.17, as follows:
> > [ ] +1, Approve the release
> > [ ] -1, Do not approve the release (please provide specific comments)
> >
> > The v4.1.0 source release has already been approved [1], this vote is to
> > distribute the binaries for Flink 1.17 support.
> >
> > The complete staging area is available for your review, which includes:
> > * all artifacts to be deployed to the Maven Central Repository [2], which
> > are signed with the key with fingerprint
> > 0F79F2AFB2351BC29678544591F9C1EC125FD8DB [3],
> > * website pull request listing the new release [4].
> >
> > The vote will be open for at least 72 hours. It is adopted by majority
> > approval, with at least 3 PMC affirmative votes.
> >
> > Thanks,
> > Danny
> >
> > [1] https://lists.apache.org/thread/7q3ysg9jz5cjwdgdmgckbnqhxh44ncmv
> > [2]
> https://repository.apache.org/content/repositories/orgapacheflink-1602/
> > [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> > [4] https://github.com/apache/flink-web/pull/628
> >
>


[jira] [Created] (FLINK-31776) Introducing sub-interface LeaderElectionService.LeaderElection

2023-04-12 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-31776:
-

 Summary: Introducing sub-interface 
LeaderElectionService.LeaderElection
 Key: FLINK-31776
 URL: https://issues.apache.org/jira/browse/FLINK-31776
 Project: Flink
  Issue Type: Sub-task
Reporter: Matthias Pohl






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31777) Upsert Kafka use Avro Confluent, key is ok, but all values are null.

2023-04-12 Thread Alvin Ge (Jira)
Alvin Ge created FLINK-31777:


 Summary: Upsert Kafka use Avro Confluent, key is ok, but all 
values are null.
 Key: FLINK-31777
 URL: https://issues.apache.org/jira/browse/FLINK-31777
 Project: Flink
  Issue Type: Improvement
  Components: kafka
Affects Versions: 1.16.0
 Environment: Flink: 1.16.0

Confluent version: 7.3.3

Debezium version: 2.1.0/2.0.0

 
Reporter: Alvin Ge


I use Debezium to send data to Kafka in Confluent Avro format. When I use the 
upsert-kafka connector, all values are null; with the 'kafka' connector all 
values are fine.

My upsert-kafka table like this:

 
{code:java}

create table TEA02
(
    SUB_SYSTEM_ENAME varchar(255),
    REC_CREATOR      varchar(255),
    REC_CREATE_TIME  varchar(255),
    REC_REVISOR      varchar(255),
    REC_REVISE_TIME  varchar(255),
    ARCHIVE_FLAG     varchar(255),
    SUB_SYSTEM_CNAME varchar(255),
    SUB_SYSTEM_FNAME varchar(255),
    SUB_SYSTEM_LEVEL varchar(255),
    primary key (SUB_SYSTEM_ENAME) not enforced
) WITH (
 'connector' = 'upsert-kafka',
 'topic' = 'dev.oracle.JNMMM1.TEA02',
 'properties.bootstrap.servers' = 
'10.0.170.213:9092,10.0.170.214:9092,10.0.170.215:9092',
 'properties.group.id' = 'TEA02',
 'key.format' = 'avro-confluent',
 'key.avro-confluent.url' = 'http://10.0.170.213:8081',
 'value.format' = 'avro-confluent',
 'value.avro-confluent.url' = 'http://10.0.170.213:8081',
 'value.fields-include' = 'EXCEPT_KEY'
); {code}
 

query result:

 
||SUB_SYSTEM_ENAME||REC_CREATOR||REC_CREATE_TIME||...||
|CJ|null|null|null|

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31778) Casting array of rows produces incorrect result

2023-04-12 Thread Ilya Soin (Jira)
Ilya Soin created FLINK-31778:
-

 Summary: Casting array of rows produces incorrect result
 Key: FLINK-31778
 URL: https://issues.apache.org/jira/browse/FLINK-31778
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.15.4, 1.16.1
Reporter: Ilya Soin


{code:java}
select CAST(commissions AS ARRAY>) as commissions
 from (select ARRAY[ROW(123), ROW(234)] commissions){code}
Expected output:
{code:java}
+++
| op |                    commissions |
+++
| +I |             [(123.0), (234.0)] |
+++
 {code}
Actual output:
{code:java}
+++
| op |                    commissions |
+++
| +I |             [(234.0), (234.0)] |
+++ {code}
Full working example: 
https://gist.github.com/soin08/5e0038dbefeba9192706e05a78ef3bc1



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31779) Track stable branch of externalized connector instead of specific release tag

2023-04-12 Thread Weijie Guo (Jira)
Weijie Guo created FLINK-31779:
--

 Summary: Track stable branch of externalized connector instead of 
specific release tag
 Key: FLINK-31779
 URL: https://issues.apache.org/jira/browse/FLINK-31779
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Common, Documentation
Reporter: Weijie Guo
Assignee: Weijie Guo


Docs should point to the branch where that version of the docs is published; 
otherwise, documentation fixes are not visible until a new release is made.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31780) Allow users to enable Ensemble tracking for ZooKeeper

2023-04-12 Thread Oleksandr Nitavskyi (Jira)
Oleksandr Nitavskyi created FLINK-31780:
---

 Summary: Allow users to enable Ensemble tracking for ZooKeeper
 Key: FLINK-31780
 URL: https://issues.apache.org/jira/browse/FLINK-31780
 Project: Flink
  Issue Type: Bug
Reporter: Oleksandr Nitavskyi


In Apache Curator, an option to skip ensemble tracking has been available since 
version 5.0.0 ([CURATOR-568|https://issues.apache.org/jira/browse/CURATOR-568]).

This can be useful in scenarios in which CuratorFramework accesses ZK clusters 
via a load balancer or virtual IPs.
Thus, in case a Flink user's ZooKeeper is running behind an LB or virtual IP, 
ensemble tracking could be disabled too.

*Acceptance Criteria:* Users of Apache Flink have a ZooKeeper config option to 
disable ensemble tracking for ZooKeeper.
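
For illustration, a sketch of the Curator-side flag this ticket refers to 
(CURATOR-568). The ensembleTracker(boolean) builder call is my reading of the 
Curator 5.x API and should be treated as an assumption; the Flink-side config 
option requested here does not exist yet.
{code:java}
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class EnsembleTrackingSketch {
    public static void main(String[] args) {
        // ZooKeeper reached through a load balancer / virtual IP.
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("zk-lb.example.com:2181")
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .ensembleTracker(false) // assumed Curator 5.x builder method
                .build();
        client.start();
        client.close();
    }
}
{code}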



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31782) Make DefaultLeaderElectionService implement MultipleComponentLeaderElectionService.Listener

2023-04-12 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-31782:
-

 Summary: Make DefaultLeaderElectionService implement 
MultipleComponentLeaderElectionService.Listener
 Key: FLINK-31782
 URL: https://issues.apache.org/jira/browse/FLINK-31782
 Project: Flink
  Issue Type: Sub-task
Reporter: Matthias Pohl






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31781) Introduce contender ID into LeaderElectionService interface

2023-04-12 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-31781:
-

 Summary: Introduce contender ID into LeaderElectionService 
interface
 Key: FLINK-31781
 URL: https://issues.apache.org/jira/browse/FLINK-31781
 Project: Flink
  Issue Type: Sub-task
Reporter: Matthias Pohl






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31783) Replace LeaderElectionDriver in DefaultLeaderElectionService with MultipleComponentLeaderElectionService

2023-04-12 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-31783:
-

 Summary: Replace LeaderElectionDriver in 
DefaultLeaderElectionService with MultipleComponentLeaderElectionService
 Key: FLINK-31783
 URL: https://issues.apache.org/jira/browse/FLINK-31783
 Project: Flink
  Issue Type: Sub-task
Reporter: Matthias Pohl






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31784) Add multiple-component support to DefaultLeaderElectionService

2023-04-12 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-31784:
-

 Summary: Add multiple-component support to 
DefaultLeaderElectionService
 Key: FLINK-31784
 URL: https://issues.apache.org/jira/browse/FLINK-31784
 Project: Flink
  Issue Type: Sub-task
Reporter: Matthias Pohl






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31785) Move LeaderElectionService out of LeaderContender

2023-04-12 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-31785:
-

 Summary: Move LeaderElectionService out of LeaderContender
 Key: FLINK-31785
 URL: https://issues.apache.org/jira/browse/FLINK-31785
 Project: Flink
  Issue Type: Sub-task
Reporter: Matthias Pohl






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31786) Removing unused HighAvailabilityServices implementations

2023-04-12 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-31786:
-

 Summary: Removing unused HighAvailabilityServices implementations
 Key: FLINK-31786
 URL: https://issues.apache.org/jira/browse/FLINK-31786
 Project: Flink
  Issue Type: Sub-task
Reporter: Matthias Pohl






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31787) Add the explicit ROW constructor to the system function doc

2023-04-12 Thread Aitozi (Jira)
Aitozi created FLINK-31787:
--

 Summary: Add the explicit ROW constructor to the system function 
doc
 Key: FLINK-31787
 URL: https://issues.apache.org/jira/browse/FLINK-31787
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Aitozi






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] Add support for Apache Arrow format

2023-04-12 Thread Martijn Visser
Which connectors would be commonly used when reading in Arrow format?
Filesystem?

On Wed, Apr 12, 2023 at 4:27 AM Jacky Lau  wrote:

> Hi
> I also think the Arrow format will be useful when reading/writing with a
> message queue.
> Arrow defines a language-independent columnar memory format for flat and
> hierarchical data, organized for efficient analytic operations on modern
> hardware like CPUs and GPUs. The Arrow memory format also supports
> zero-copy reads for lightning-fast data access without serialization
> overhead. It would bring a lot of benefit.
> And we may do some surveys on what other engines like Spark/Hive/Presto
> support, how they support it, and how it is used.
>
>Best,
>Jacky.
>
> Aitozi  于2023年4月2日周日 22:22写道:
>
> > Hi all,
> > Thanks for your input.
> >
> > @Ran > However, as mentioned in the issue you listed, it may take a lot
> of
> > work
> > and the community's consideration for integrating Arrow.
> >
> > To clarify, this proposal solely aims to introduce flink-arrow as a new
> > format,
> > similar to flink-csv and flink-protobuf. It will not impact the internal
> > data
> > structure representation in Flink. For proof of concept, please refer to:
> > https://github.com/Aitozi/flink/commits/arrow-format.
> >
> > @Martijn > I'm wondering if there's really much benefit for the Flink
> > project to
> > add another file format, over properly supporting the format that we
> > already
> > have in the project.
> >
> > Maintaining the formats we already have and introducing new formats should
> > be orthogonal. The requirement for supporting the Arrow format originally
> > came from our internal usage: deserializing data (VectorSchemaRoot) from
> > other storage systems into Flink's internal RowData, and serializing the
> > internal RowData back out to VectorSchemaRoot for the storage system. The
> > requirement from Slack[1] is to support the Arrow file format. Arrow is not
> > usually used as the final on-disk storage format, but it tends to be used
> > as an interchange format between different systems or as temporary storage
> > for analysis, thanks to its columnar layout and the fact that it can be
> > memory-mapped by other analysis programs.
> >
> > So, I think it's meaningful to support arrow formats in Flink.
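
(For illustration, a self-contained sketch of the direction described above:
reading a column out of an Arrow VectorSchemaRoot into Flink's internal
GenericRowData. It is only a toy conversion loop, not the proposed flink-arrow
format; see the POC branch linked above for that.)

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;

public class ArrowToRowDataSketch {
    public static void main(String[] args) {
        try (RootAllocator allocator = new RootAllocator()) {
            Field pageViews = new Field(
                    "page_views", FieldType.nullable(new ArrowType.Int(64, true)), null);
            Schema schema = new Schema(Collections.singletonList(pageViews));
            try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
                BigIntVector vector = (BigIntVector) root.getVector("page_views");
                vector.setSafe(0, 42L);
                vector.setSafe(1, 7L);
                root.setRowCount(2);

                // Column-to-row conversion into Flink's internal representation.
                List<RowData> rows = new ArrayList<>();
                for (int i = 0; i < root.getRowCount(); i++) {
                    rows.add(GenericRowData.of(vector.get(i)));
                }
                System.out.println(rows.size() + " rows converted");
            }
        }
    }
}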
> >
> > @Jim >  If the Flink format interface is used there, then it may be
> useful
> > to
> > consider Arrow along with other columnar formats.
> >
> > I am not well-versed with the formats utilized in Paimon. Upon checking
> > [2], it
> > appears that Paimon does not directly employ flink formats. Instead, it
> > utilizes
> > FormatWriterFactory and FormatReaderFactory to handle data serialization
> > and
> > deserialization. Therefore, I believe that the current work may not be
> > applicable for reuse in Paimon at this time.
> >
> > Best,
> > Aitozi.
> >
> > [1]:
> https://apache-flink.slack.com/archives/C03GV7L3G2C/p1677915016551629
> > [2]:
> >
> https://github.com/apache/incubator-paimon/tree/master/paimon-format/src/main/java/org/apache/paimon/format
> >
> > Jim Hughes  于2023年3月31日周五 00:36写道:
> > >
> > > Hi all,
> > >
> > > How do Flink formats relate to or interact with Paimon (formerly
> > > Flink-Table-Store)?  If the Flink format interface is used there, then
> it
> > > may be useful to consider Arrow along with other columnar formats.
> > >
> > > Separately, from previous experience, I've seen the Arrow format be
> > useful
> > > as an output format for clients to read efficiently.  Arrow does
> support
> > > returning batches of records, so there may be some options to use the
> > > format in a streaming situation where a sufficient collection of
> records
> > > can be gathered.
> > >
> > > Cheers,
> > >
> > > Jim
> > >
> > >
> > >
> > > On Thu, Mar 30, 2023 at 8:32 AM Martijn Visser <
> martijnvis...@apache.org
> > >
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > To be honest, I haven't seen that much demand for supporting the
> Arrow
> > > > format directly in Flink as a flink-format. I'm wondering if there's
> > really
> > > > much benefit for the Flink project to add another file format, over
> > > > properly supporting the format that we already have in the project.
> > > >
> > > > Best regards,
> > > >
> > > > Martijn
> > > >
> > > > On Thu, Mar 30, 2023 at 2:21 PM Ran Tao 
> wrote:
> > > >
> > > > > It is a good point that flink integrates apache arrow as a format.
> > > > > Arrow can take advantage of SIMD-specific or vectorized
> > optimizations,
> > > > > which should be of great benefit to batch tasks.
> > > > > However, as mentioned in the issue you listed, it may take a lot of
> > work
> > > > > and the community's consideration for integrating Arrow.
> > > > >
> > > > > I think you can try to make a simple poc for verification and some
> > > > specific
> > > > > plans.
> > > > >
> > > > >
> > > > > Best Regards,
> > > > > Ran Tao
> > > > >
> > > > >
> > > > > Aitozi  于2023年3月29日周三 19:12写道:
> > > > >
> > > > > > H

[jira] [Created] (FLINK-31788) Add back Support emitValueWithRetract for TableAggregateFunction

2023-04-12 Thread Feng Jin (Jira)
Feng Jin created FLINK-31788:


 Summary: Add back Support emitValueWithRetract for 
TableAggregateFunction
 Key: FLINK-31788
 URL: https://issues.apache.org/jira/browse/FLINK-31788
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Feng Jin


This feature was originally implemented in the old planner: 
[https://github.com/apache/flink/pull/8550/files]

However, this logic was not implemented in the new planner, the Blink planner.

With the removal of the old planner in version 1.14 
[https://github.com/apache/flink/pull/16080], this code was also removed.

 

We should add it back. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31789) Update axel to current

2023-04-12 Thread Morey Straus (Jira)
Morey Straus created FLINK-31789:


 Summary: Update axel to current
 Key: FLINK-31789
 URL: https://issues.apache.org/jira/browse/FLINK-31789
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.17.0
Reporter: Morey Straus


Flink ships with axel version 2.6, which contains CVE-2020-13614. This was 
fixed in axel 2.17.8.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP 295: Support persistence of Catalog configuration and asynchronous registration

2023-04-12 Thread Jing Ge
Hi Feng,

Thanks for raising this FLIP. I am still confused after completely reading
the thread, and have the following questions:

1. Naming confusion - registerCatalog() and addCatalog() have no big
difference based on their names. One of them is responsible for data
persistence. How about persistCatalog()?
2. You mentioned that Map<String, Catalog> catalogs is used as a cache
and catalogStore is used for data persistence. I would suggest describing
their purposes conceptually and clearly in the FLIP. Some common cache
features should be implemented, i.e. data in the cache and the store should
be consistent. The same Catalog instance should be found in the store and in
the cache (either already initialized or lazily initialized)
for the same catalog name. The consistency will be taken care of while
updating the catalog.
3. As the above discussion moves forward, the option 2 solution looks more
like a replacement of option 1, because, afaiu, issues mentioned
previously with option 1 are not solved yet. Do you still want to propose
both options and ask for suggestions for both of them?
4. After you updated the FLIP, there are some inconsistent descriptions in
the content.  Would you like to clean them up? Thanks!

Best regards,
Jing


On Fri, Apr 7, 2023 at 9:24 AM Feng Jin  wrote:

> hi Shammon
>
> Thank you for your response, and I completely agree with your point of
> view.
> Initially, I may have overcomplicated the whole issue. First and foremost,
> we need to consider the persistence of the Catalog's configuration.
> If we only need to provide persistence for the Catalog configuration, we can
> add a toConfiguration method to the Catalog interface.
> This method can convert a Catalog instance to a Map<String, String> of
> properties, and the default implementation will throw an exception.
>
> public interface Catalog {
>     /**
>      * Returns a map containing the properties of the catalog object.
>      *
>      * @return a map containing the properties of the catalog object
>      * @throws UnsupportedOperationException if the implementing class does
>      *     not override the default implementation of this method
>      */
>     default Map<String, String> toProperties() {
>         throw new UnsupportedOperationException(
>                 "Please implement toProperties for this catalog");
>     }
> }
>
> The specific process is as follows:
>
> 1.  If the user has configured a CatalogStore, the toConfiguration() method
> will be called when registering a Catalog instance with
> registerCatalog(String catalogName, Catalog catalog). The Catalog instance
> will be converted to a Map<String, String> of properties and saved to the
> CatalogStore. If some Catalog instances do not implement this method, an
> exception will be thrown.
>
> 2.  If the user does not use a CatalogStore, the toConfiguration() method
> will not be called, ensuring consistency with the original process.
>
> 3. Saving to both the Map<String, Catalog> catalogs and the CatalogStore at
> the same time can also avoid conflicts.
>
>
> For lazy initialization:
> we can start from the Catalog itself and provide a dedicated Catalog
> implementation for lazy loading, such as ConfigurationCatalog
>
> public class ConfigurationCatalog implements Catalog {
>     ConfigurationCatalog(Map<String, String> properties) {
>     }
> }
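
(For illustration, a toy stand-in showing how an individual catalog could
satisfy the proposed toProperties() contract. The interface below is a
simplified placeholder, not the real org.apache.flink.table.catalog.Catalog,
and the option keys are made up.)

import java.util.HashMap;
import java.util.Map;

// Simplified placeholder for the proposed contract above.
interface ConfigurableCatalog {
    default Map<String, String> toProperties() {
        throw new UnsupportedOperationException(
                "Please implement toProperties for this catalog");
    }
}

// Hypothetical catalog that can be re-created purely from configuration,
// so its properties can be persisted in the CatalogStore.
class JdbcLikeCatalog implements ConfigurableCatalog {
    private final String baseUrl;

    JdbcLikeCatalog(String baseUrl) {
        this.baseUrl = baseUrl;
    }

    @Override
    public Map<String, String> toProperties() {
        Map<String, String> props = new HashMap<>();
        props.put("type", "jdbc");      // illustrative option keys only
        props.put("base-url", baseUrl);
        return props;
    }
}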
>
> I have added this design to the FLIP.
>
>
> Best,
> Feng
>
> On Thu, Apr 6, 2023 at 10:03 AM Shammon FY  wrote:
>
> > Hi Feng
> >
> > Thanks for your answer.
> >
> > I have no questions about the functionality of `CatalogStore`, but the
> > different operations of multiple `registerCatalog` or `storeCatalog` in
> > `CatalogManager`. The implementation in Trino is also the same, the
> > `CatalogManager` in trino only has one `createCatalog`, which will save
> the
> > catalog to memory and `CatalogStore`.
> >
> > I think we don't need to add another `registerCatalog` or `addCatalog` to
> > save a catalog in `Map<String, Catalog> catalogs` or `CatalogStore
> > catalogStore`.  As you mentioned before, the `Map<String, Catalog>
> > catalogs` is a cache in `CatalogManager`. How about saving the catalog in
> > `Map<String, Catalog> catalogs` and `CatalogStore catalogStore` together
> > when it is registered or added in `CatalogManager`?
> >
> > Best,
> > Shammon FY
> >
> >
> > On Tue, Apr 4, 2023 at 5:22 PM Feng Jin  wrote:
> >
> > > Thank you for your reply. I am very sorry for the misunderstanding
> caused
> > > by my deviation from the original discussion.
> > >
> > > @Shammon
> > > > I found there is a pre-discussion [1] for this FLIP
> > > Yes, there was indeed such a discussion before.  However, in designing
> > the
> > > whole solution, I found that the logic of CatalogManager itself doesn't
> > > need to change much. *We cannot only persist Catalog instances
> > themselves*,
> > > so exposing only registerCatalog(String catalogName, Catalog catalog)
> > might
> > > not be enough to save Catalogs, because in the end we still need to
> save
> > > the configurations corresponding to the Catalog instances.  Therefore,
> I
> > > decided to introduce the CatalogStore interface for configuration
> > > persistence. Rega

[mongodb connector] which flink version(s) is the mongodb connector compatible with?

2023-04-12 Thread Saketh Kurnool
Hello!

I have a use case for the mongodb connector (see:
https://github.com/apache/flink-connector-mongodb), but I am currently
running flink 1.15. As far as I can tell, the connector relies on
dependencies that don't exist in 1.15 (one such example is
`org/apache/flink/table/connector/source/lookup/cache/LookupCache`).

Which flink version(s) is this connector implementation slated to be
compatible with?

Thank you!


Re: [DISCUSS] FLIP-304: Pluggable failure handling for Apache Flink

2023-04-12 Thread Panagiotis Garefalakis
Hello there,

Zhu: agree with the config option, great suggestion
Hong: global timeout is also interesting and a good addition -- only
downside I see is just another config option

If everyone is happy, I suggest we keep the discussion open until Friday
and start a Vote shortly after.

Cheers,
Panagiotis

On Tue, Apr 11, 2023 at 12:58 AM Teoh, Hong 
wrote:

> Hi Panagiotis,
>
> Thank you for the update. Looks great! Just one suggestion below:
>
> 1. We seem to be waiting for the future(s) to complete before restarting
> the job - should we add a configurable timeout for the enrichment? Since
> each failure enricher are run in parallel, we could probably settle for 1
> timeout for all failure handlers.
> 2. +1 to Zhu’s comment on adding a comma separated list of FailureHandlers
> instead of boolean toggle!
>
> Other than the above, the FLIP looks great! Thank you for your efforts.
>
> Regards,
> Hong
>
>
>
> > On 11 Apr 2023, at 08:01, Zhu Zhu  wrote:
> >
> > CAUTION: This email originated from outside of the organization. Do not
> click links or open attachments unless you can confirm the sender and know
> the content is safe.
> >
> >
> >
> > Hi Panagiotis,
> >
> > Thanks for updating the FLIP.
> >
> >> Regarding the config option
> `jobmanager.failure-enricher-plugins.enabled`
> > I think a config option `jobmanager.failure-enrichers`, which accepts
> > the names of enrichers to use, may be better. It allows the users to
> > deploy and use the plugins in a more flexible way. The default value
> > of the config can be none, which means failure enrichment will be
> > disabled by default.
> > A reference can be the config option `metrics.reporters` which helps
> > to load metric reporter plugins.
> >
> > Thanks,
> > Zhu
> >
> > Panagiotis Garefalakis  于2023年4月10日周一 03:47写道:
> >>
> >> Hello again everyone,
> >>
> >> FLIP is now updated based on our discussion!
> >> In short, FLIP-304 [1] proposes the addition of a pluggable interface
> that
> >> will allow users to add custom logic and enrich failures with custom
> >> metadata labels.
> >> While as discussed, custom restart strategies will be part of a
> different
> >> effort. Every pluggable FailureEnricher:
> >>
> >>   - Is triggered on every global/non-global failure
> >>   - Receives a Throwable cause and an immutable Context
> >>   - Performs asynchronous execution (separate IoExecutor) to avoid
> >>   blocking the main thread for RPCs
> >>   - Is completely independent from other Enrichers
> >>   - Emits failure labels/tags for its unique, pre-defined keys (defined
> at
> >>   startup time)
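
(For illustration, a toy stand-in mirroring the behaviour summarized above:
fixed output keys and asynchronous execution off the main thread. The interface
name and signature are placeholders, not the final FLIP-304 API, and wiring
enrichers up via a `jobmanager.failure-enrichers` option is only the proposal
discussed in this thread.)

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;

// Placeholder interface, not the final FLIP-304 API.
interface FailureEnricherSketch {
    Set<String> outputKeys(); // keys are fixed at startup time

    CompletableFuture<Map<String, String>> processFailure(Throwable cause, Executor ioExecutor);
}

class TimeoutClassifier implements FailureEnricherSketch {
    @Override
    public Set<String> outputKeys() {
        return Collections.singleton("failure.category");
    }

    @Override
    public CompletableFuture<Map<String, String>> processFailure(Throwable cause, Executor ioExecutor) {
        // Run on the IO executor so the JobManager main thread is never blocked.
        return CompletableFuture.supplyAsync(
                () -> Collections.singletonMap(
                        "failure.category",
                        cause instanceof TimeoutException ? "timeout" : "other"),
                ioExecutor);
    }
}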
> >>
> >>
> >> Check the link for implementation details and please let me know what
> you
> >> think :)
> >>
> >>
> >> [1]
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-304%3A+Pluggable+Failure+Enrichers
> >>
> >>
> >> Panagiotis
> >>
> >>
> >> On Tue, Mar 28, 2023 at 5:01 AM Zhu Zhu  wrote:
> >>
> >>> Hi Panagiotis,
> >>>
> >>> How about to introduce a config option to control which error handling
> >>> plugins should be used? It is more flexible for deployments.
> Additionally,
> >>> it can also enable users to explicitly specify the order that the
> plugins
> >>> take effects.
> >>>
> >>> Thanks,
> >>> Zhu
> >>>
> >>> Gen Luo  于2023年3月27日周一 15:02写道:
> 
>  Thanks for the summary!
> 
>  Also +1 to support custom restart strategies in a different FLIP,
>  as long as we can make sure that the plugin interface won't be
>  changed when the restart strategy interface is introduced.
> 
>  To achieve this, maybe we should think well how the handler
>  would cooperate with the restart strategy, like would it executes b
>  efore the strategy (e.g. some strategy may use the tag), or after
>  it (e.g. some metric reporting handler may use the handling result).
>  Though we can implement in one way, and extend if the other is
>  really necessary by someone.
> 
>  Besides, instead of using either of the names, shall we just make
>  them two subclasses named FailureEnricher and FailureListener?
>  The former executes synchronously and can modify the context,
>  while the latter executes asynchronously and has a read-only view
>  of context. In this way we can make sure a handler behaves in
>  the expected way.
> 
> 
>  On Thu, Mar 23, 2023 at 5:19 PM Zhu Zhu  wrote:
> 
> > +1 to support custom restart strategies in a different FLIP.
> >
> > It's fine to have a different plugin for custom restart strategy.
> > If so, since we do not treat the FLIP-304 plugin as a common failure
> > handler, but instead mainly targets to add labels to errors, I would
> > +1 for the name `FailureEnricher`.
> >
> > Thanks,
> > Zhu
> >
> > David Morávek  于2023年3月23日周四 15:51写道:
> >>
> >>>
> >>> One additional remark on introducing it as an async operation: We
> >>> would
> >>> need a new configuration parameter to define the timeout

[VOTE] Release flink-connector-kafka 3.0.0 for Flink 1.17, release candidate #2

2023-04-12 Thread Tzu-Li (Gordon) Tai
Hi everyone,

Please review and vote on release candidate #2 for version 3.0.0 of the
Apache Flink Kafka Connector, as follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)


The complete staging area is available for your review, which includes:
* JIRA release notes [1],
* the official Apache source release to be deployed to dist.apache.org [2],
which are signed with the key with fingerprint
1C1E2394D3194E1944613488F320986D35C33D6A [3],
* all artifacts to be deployed to the Maven Central Repository [4],
* source code tag v3.0.0-rc2 [5],
* website pull request listing the new release [6].

The vote will be open for at least 72 hours. It is adopted by majority
approval, with at least 3 PMC affirmative votes.

Thanks,
Gordon

[1]
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352577
[2]
https://dist.apache.org/repos/dist/dev/flink/flink-connector-kafka-3.0.0-rc2/
[3] https://dist.apache.org/repos/dist/release/flink/KEYS
[4] https://repository.apache.org/content/repositories/orgapacheflink-1607
[5] https://github.com/apache/flink-connector-kafka/releases/tag/v3.0.0-rc2
[6] https://github.com/apache/flink-web/pull/632


Re: [VOTE] Release flink-connector-kafka 3.0.0 for Flink 1.17, release candidate #2

2023-04-12 Thread Tzu-Li (Gordon) Tai
A few important remarks about this release candidate:

- As mentioned in the previous voting thread of RC1 [1], we've decided to
skip releasing a version of the externalized Flink Kafka Connector matching
with Flink 1.16.x since the original vote thread stalled, and meanwhile
we've already completed externalizing all Kafka connector code as of Flink
1.17.0.

- As such, this RC is basically identical to the Kafka connector code
bundled with the Flink 1.17.0 release, PLUS a few critical fixes for
exactly-once violations, namely FLINK-31305, FLINK-31363, and FLINK-31620
(please see release notes [2]).

- As part of preparing this RC, I've also deleted the original v3.0 branch
and re-named the v4.0 branch to replace it instead. Effectively, this
resets the versioning numbers for the externalized Flink Kafka Connector
code repository, so that this first release of the repo starts from v3.0.0.

Thanks,
Gordon

[1] https://lists.apache.org/thread/r97y5qt8x0c72460vs5cjm5c729ljmh6
[2]
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352577

On Wed, Apr 12, 2023 at 4:55 PM Tzu-Li (Gordon) Tai 
wrote:

> Hi everyone,
>
> Please review and vote on release candidate #2 for version 3.0.0 of the
> Apache Flink Kafka Connector, as follows:
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
>
>
> The complete staging area is available for your review, which includes:
> * JIRA release notes [1],
> * the official Apache source release to be deployed to dist.apache.org
> [2], which are signed with the key with fingerprint
> 1C1E2394D3194E1944613488F320986D35C33D6A [3],
> * all artifacts to be deployed to the Maven Central Repository [4],
> * source code tag v3.0.0-rc2 [5],
> * website pull request listing the new release [6].
>
> The vote will be open for at least 72 hours. It is adopted by majority
> approval, with at least 3 PMC affirmative votes.
>
> Thanks,
> Gordon
>
> [1]
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352577
> [2]
> https://dist.apache.org/repos/dist/dev/flink/flink-connector-kafka-3.0.0-rc2/
> [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> [4] https://repository.apache.org/content/repositories/orgapacheflink-1607
> [5]
> https://github.com/apache/flink-connector-kafka/releases/tag/v3.0.0-rc2
> [6] https://github.com/apache/flink-web/pull/632
>


Re: [VOTE] Release flink-connector-kafka, release candidate #1

2023-04-12 Thread Tzu-Li (Gordon) Tai
RC2 (for Flink 1.17.0) vote has started in a separate thread:
https://lists.apache.org/thread/mff76c2hzcb1mk8fm5m2h4z0j73qz2vk

Please test and cast your votes!

On Tue, Apr 11, 2023 at 11:45 AM Martijn Visser 
wrote:

> +1, thanks for driving this Gordon.
>
> On Tue, Apr 11, 2023 at 8:15 PM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi all,
>>
>> Martijn and I discussed offline to cancel this vote.
>>
>> Moreover, now that Flink 1.17 is out and we still haven't released
>> anything yet for the newly externalized Kafka connector, we've decided to
>> skip releasing a version that matches with Flink 1.16 all together, and
>> instead go straight to supporting Flink 1.17 for our first release.
>>
>> Practically this means:
>>
>>1. The code as of branch `flink-connector-kafka:v4.0` will be
>>re-versioned as `v3.0` and that will be the actual first release of
>>flink-connector-kafka.
>>2. v3.0.0 will be the first release of `flink-connector-kafka` and it
>>will initially support Flink 1.17.x series.
>>
>> I'm happy to drive the release efforts for this and will create a new RC
>> shortly over the next day or two.
>>
>> Thanks,
>> Gordon
>>
>> On Wed, Apr 5, 2023 at 9:32 PM Mason Chen  wrote:
>>
>>> +1 for new RC!
>>>
>>> Best,
>>> Mason
>>>
>>> On Tue, Apr 4, 2023 at 11:32 AM Tzu-Li (Gordon) Tai >> >
>>> wrote:
>>>
>>> > Hi all,
>>> >
>>> > I've ported the critical fixes I mentioned to v3.0 and v4.0 branches of
>>> > apache/flink-connector-kafka now.
>>> >
>>> > @martijnvis...@apache.org  let me know if
>>> you'd
>>> > need help with creating a new RC, if there's too much to juggle on
>>> > your end. Happy to help out.
>>> >
>>> > Thanks,
>>> > Gordon
>>> >
>>> > On Sun, Apr 2, 2023 at 11:21 PM Konstantin Knauf 
>>> > wrote:
>>> >
>>> > > +1. Thanks, Gordon!
>>> > >
>>> > > Am Mo., 3. Apr. 2023 um 06:37 Uhr schrieb Tzu-Li (Gordon) Tai <
>>> > > tzuli...@apache.org>:
>>> > >
>>> > > > Hi Martijn,
>>> > > >
>>> > > > Since this RC vote was opened, we had three critical bug fixes
>>> that was
>>> > > > merged for the Kafka connector:
>>> > > >
>>> > > >- https://issues.apache.org/jira/browse/FLINK-31363
>>> > > >- https://issues.apache.org/jira/browse/FLINK-31305
>>> > > >- https://issues.apache.org/jira/browse/FLINK-31620
>>> > > >
>>> > > > Given the severity of these issues (all of them are violations of
>>> > > > exactly-once semantics), and the fact that they are currently not
>>> > > included
>>> > > > yet in any released version, do you think it makes sense to cancel
>>> this
>>> > > RC
>>> > > > in favor of a new one that includes these?
>>> > > > Since this RC vote has been stale for quite some time already, it
>>> > doesn't
>>> > > > seem like we're throwing away too much effort that has already been
>>> > done
>>> > > if
>>> > > > we start a new RC with these critical fixes included.
>>> > > >
>>> > > > What do you think?
>>> > > >
>>> > > > Thanks,
>>> > > > Gordon
>>> > > >
>>> > > > On Thu, Feb 9, 2023 at 3:26 PM Tzu-Li (Gordon) Tai <
>>> > tzuli...@apache.org>
>>> > > > wrote:
>>> > > >
>>> > > > > +1 (binding)
>>> > > > >
>>> > > > > - Verified legals (license headers and root LICENSE / NOTICE
>>> file).
>>> > > > AFAICT
>>> > > > > no dependencies require explicit acknowledgement in the NOTICE
>>> files.
>>> > > > > - No binaries in staging area
>>> > > > > - Built source with tests
>>> > > > > - Verified signatures and hashes
>>> > > > > - Web PR changes LGTM
>>> > > > >
>>> > > > > Thanks Martijn!
>>> > > > >
>>> > > > > Cheers,
>>> > > > > Gordon
>>> > > > >
>>> > > > > On Mon, Feb 6, 2023 at 6:12 PM Mason Chen <
>>> mas.chen6...@gmail.com>
>>> > > > wrote:
>>> > > > >
>>> > > > >> That makes sense, thanks for the clarification!
>>> > > > >>
>>> > > > >> Best,
>>> > > > >> Mason
>>> > > > >>
>>> > > > >> On Wed, Feb 1, 2023 at 7:16 AM Martijn Visser <
>>> > > martijnvis...@apache.org
>>> > > > >
>>> > > > >> wrote:
>>> > > > >>
>>> > > > >> > Hi Mason,
>>> > > > >> >
>>> > > > >> > Thanks, [4] is indeed a copy-paste error and you've made the
>>> right
>>> > > > >> > assumption that
>>> > > > >> >
>>> > > > >> >
>>> > > > >>
>>> > > >
>>> > >
>>> >
>>> https://repository.apache.org/content/repositories/orgapacheflink-1582/org/apache/flink/
>>> > > > >> > is the correct maven central link.
>>> > > > >> >
>>> > > > >> > I think we should use FLINK-30052 to move the Kafka connector
>>> code
>>> > > > from
>>> > > > >> the
>>> > > > >> > 1.17 release also over the Kafka connector repo (especially
>>> since
>>> > > > >> there's
>>> > > > >> > now a v3.0 branch for the Kafka connector, so it can be
>>> merged in
>>> > > > main).
>>> > > > >> > When those commits have been merged, we can make a next Kafka
>>> > > > connector
>>> > > > >> > release (which is equivalent to the 1.17 release, which can
>>> only
>>> > be
>>> > > > done
>>> > > > >> > when 1.17 is done because of the split level watermark
>>> alignment)
>>> > > and
>>> > > > >> then
>>> > > > >> > FLINK-30859 can be fi

Re: [DISCUSS] FLIP 295: Support persistence of Catalog configuration and asynchronous registration

2023-04-12 Thread Shammon FY
Hi Feng

Thanks for your update.

I found there are two options in `Proposed Changes`, can you put the
unselected option in `Rejected Alternatives`? I think this may help us
better understand your proposal


Best,
Shammon FY


On Thu, Apr 13, 2023 at 4:49 AM Jing Ge  wrote:

> Hi Feng,
>
> Thanks for raising this FLIP. I am still confused after completely reading
> the thread with following questions:
>
> 1. Naming confusion - registerCatalog() and addCatalog() have no big
> difference based on their names. One of them is responsible for data
> persistence. How about persistCatalog()?
> 2. As you mentioned that Map catalogs is used as a cache
> and catalogStore is used for data persistence. I would suggest describing
> their purpose conceptually and clearly in the FLIP. Some common cache
> features should be implemented, i.e. data in the cache and the store should
> be consistent. Same Catalog instance should be found in the store and in
> the cache(either it has been initialized or it will be lazy initialized)
> for the same catalog name. The consistency will be taken care of while
> updating the catalog.
> 3. As the above discussion moves forward, the option 2 solution looks more
> like a replacement of option 1, because, afaiu, issues mentioned
> previously with option 1 are not solved yet. Do you still want to propose
> both options and ask for suggestions for both of them?
> 4. After you updated the FLIP, there are some inconsistent descriptions in
> the content.  Would you like to clean them up? Thanks!
>
> Best regards,
> Jing
>
>
> On Fri, Apr 7, 2023 at 9:24 AM Feng Jin  wrote:
>
> > hi Shammon
> >
> > Thank you for your response, and I completely agree with your point of
> > view.
> > Initially, I may have over complicated the whole issue. First and
> foremost,
> > we need to consider the persistence of the Catalog's Configuration.
> > If we only need to provide persistence for Catalog Configuration, we can
> > add a toConfiguration method to the Catalog interface.
> > This method can convert a Catalog instance to a Map
> > properties, and the default implementation will throw an exception.
> >
> > public interface Catalog {
> >/**
> >* Returns a map containing the properties of the catalog object.
> >*
> >* @return a map containing the properties of the catalog object
> >* @throws UnsupportedOperationException if the implementing class does
> > not override
> >* the default implementation of this method
> >*/
> >   default Map<String, String> toProperties() {
> > throw new UnsupportedOperationException("Please implement
> toProperties
> > for this catalog");
> >   }
> > }
> >
> > The specific process is as follows:
> >
> > 1.  If the user has configured a CatalogStore, the toConfiguration()
> method
> > will be called when registering a Catalog instance with
> > registerCatalog(String catalogName, Catalog catalog). The Catalog
> instance
> > will be converted to a Map properties and saved to the
> > CatalogStore. If some Catalog instances do not implement this method, an
> > exception will be thrown.
> >
> > 2.  If the user does not use a CatalogStore, the toConfiguration() method
> > will not be called, ensuring consistency with the original process.
> >
> > 3. Saving both the Map catalogs and the CatalogStore at
> > the same time can also avoid conflicts
> >
> >
> > For lazy initialization:
> > we can start from the Catalog itself and provide a dedicated Catalog
> > implementation for lazy loading, such as ConfigurationCatalog
> >
> > public class ConfigurationCatalog implements Catalog {
> > ConfigurationCatalog(Map<String, String> properties) {
> > }
> > }
> >
> > I have added this design to the FLIP.
> >
> >
> > Best,
> > Feng
> >
> > On Thu, Apr 6, 2023 at 10:03 AM Shammon FY  wrote:
> >
> > > Hi Feng
> > >
> > > Thanks for your answer.
> > >
> > > I have no questions about the functionality of `CatalogStore`, but the
> > > different operations of multiple `registerCatalog` or `storeCatalog` in
> > > `CatalogManager`. The implementation in Trino is also the same, the
> > > `CatalogManager` in trino only has one `createCatalog`, which will save
> > the
> > > catalog to memory and `CatalogStore`.
> > >
> > > I think we don't need to add another `registerCatalog` or `addCatalog`
> to
> > > save a catalog in `Map catalogs` or `CatalogStore
> > > catalogStore`.  As you mentioned before, the `Map
> > > catalogs` is a cache in `CatalogManager`. How about saving the catalog
> in
> > > `Map catalogs` and `CatalogStore catalogStore`
> together
> > > when it is registered or added in `CatalogManager`?
> > >
> > > Best,
> > > Shammon FY
> > >
> > >
> > > On Tue, Apr 4, 2023 at 5:22 PM Feng Jin  wrote:
> > >
> > > > Thank you for your reply. I am very sorry for the misunderstanding
> > caused
> > > > by my deviation from the original discussion.
> > > >
> > > > @Shammon
> > > > > I found there is a pre-discussion [1] for this FLIP
> > > > Yes, there was indeed such a discussion before.  However, in
> d

Re: [mongodb connector] which flink version(s) is the mongodb connector compatible with?

2023-04-12 Thread yuxia
From the pom[1] of the connector, it seems it's compatible with Flink 1.17.

[1] https://github.com/apache/flink-connector-mongodb/blob/main/pom.xml#L56

Best regards,
Yuxia

- 原始邮件 -
发件人: "Saketh Kurnool" 
收件人: "dev" 
发送时间: 星期四, 2023年 4 月 13日 上午 5:27:06
主题: [mongodb connector] which flink version(s) is the mongodb connector 
compatible with?

Hello!

I have a use case for the mongodb connector (see:
https://github.com/apache/flink-connector-mongodb), but I am currently
running flink 1.15. As far as I can tell, the connector relies on
dependencies that don't exist in 1.15 (one such example is
`org/apache/flink/table/connector/source/lookup/cache/LookupCache`).

Which flink version(s) is this connector implementation slated to be
compatible with?

Thank you!


Re: [DISCUSS] EXACTLY_ONCE delivery semantics for upsert-kafka connector

2023-04-12 Thread Alexander Sorokoumov
Hi Jark, John,

Thank you for the discussion! I will proceed with completing the patch that
adds exactly-once to upsert-kafka connector.

Best,
Alexander

On Wed, Apr 12, 2023 at 12:21 AM Jark Wu  wrote:

> Hi John,
>
> Thank you for your valuable input. It sounds reasonable to me.
>
> From this point of view, the exactly-once is used to guarantee transaction
> semantics other than avoid duplication/upserts.
> This is similar to the JDBC connectors that already support eventual
> consistency with idempotent updates, but we still add the support of
> 2PC[1].
>
> Best,
> Jark
>
> [1]:
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/jdbc/#jdbcsinkexactlyoncesink
>
> On Wed, 12 Apr 2023 at 10:36, John Roesler  wrote:
>
> > Hi Jark,
> >
> > I hope you don’t mind if I chime in.
> >
> > You have a good point that the sequence of upserts will eventually
> > converge to the correct value under the at-least-once delivery guarantee,
> > but it can still be important to avoid passing on uncommitted results.
> Some
> > thoughts, numbered for reference:
> >
> > 1. Most generally, if some result R is written to the sink topic, but
> then
> > the job fails before a checkpoint, rolls back, and reprocesses, producing
> > R’, then it is incorrect to call R an “upsert”. In fact, as far as the
> > system is concerned, R never happened at all (because it was part of a
> > rolled-back batch of processing).
> >
> > 2. Readers may reasonably wish to impose some meaning on the sequence of
> > upserts itself, so including aborted results can lead to wrong semantics
> > downstream. Eg: “how many times has ‘x’ been updated today”?
> >
> > 3. Note that processing may not be deterministic over failures, and,
> > building on (2), readers may have an expectation that every record in the
> > topic corresponds to a real value that was associated with that key at
> some
> > point. Eg, if we start with x=1, checkpoint, then produce x=99, crash,
> > restart and produce x=2. Under at-least-once, the history of x
> is [1,99,2],
> > while exactly-once would give the correct history of [1,2]. If we set up
> an
> > alert if the value of x is ever greater than 10, then at-least-once will
> > erroneously alert us, while exactly-once does not.
> >
> > 4. Sending results for failed processing can also cause operational
> > problems: if you’re processing a high volume of data, and you get into a
> > crash loop, you can create a flood of repeated results. I’ve seen this
> case
> > cause real world pain for people, and it’s nice to have a way to avoid
> it.
> >
> > I hope some of these examples show why a user might reasonably want to
> > configure the connector with the exactly-once guarantee.
> >
> > Thanks!
> > -John
> >
> > On Sat, Apr 8, 2023, at 10:03, Jark Wu wrote:
> > > Hi Alexander,
> > >
> > > Yes, Kafka’s exactly-once semantics are used to avoid duplicated
> records
> > in
> > > case of producer retries
> > > or failovers. But as I explained above, it can’t avoid intentionally
> > > duplicated records. Actually, I would
> > > like to call them "upsert records" instead of "duplicates", that's why
> > the
> > > connector is named "upsert-kafka",
> > > to make Kafka work like a database that supports updating and deleting
> by
> > > key.
> > >
> > > For example, there is a SQL query:
> > >
> > > SELECT URL, COUNT(*) page_views
> > > FROM access_logs
> > > GROUP BY URL;
> > >
> > > This is a continuous query[1] that continuously emits a new <URL, page_views> record once a new URL
> > > access entry is received. The same URLs in the log may be far away and
> be
> > > processed in different checkpoints.
> > >
> > > It's easy to make upsert-kafka support the exactly-once delivery
> > guarantee,
> > > but as we discussed above,
> > > it's unnecessary to support it and we intend to expose as few
> > > configurations to users as possible.
> > >
> > >
> > > Best,
> > > Jark
> > >
> > > [1]
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/group-agg/
> > >
> > >
> > >
> > > On Sat, 8 Apr 2023 at 02:42, Alexander Sorokoumov
> > >  wrote:
> > >
> > >> Hi Jark,
> > >>
> > >> To my knowledge, Kafka's EXACTLY_ONCE transactions together with
> > idempotent
> > >> producers prevent duplicated records[1], at least in the cases when
> > >> upstream does not produce them intentionally and across checkpoints.
> > >>
> > >> Could you please elaborate or point me to the docs that explain the
> > reason
> > >> for duplicated records upstream and across checkpoints? I am
> relatively
> > new
> > >> to Flink and not aware of it. According to the kafka connector
> > >> documentation, it does support exactly once semantics by configuring '
> > >> sink.delivery-guarantee'='exactly-once'[2]. It is not clear to me why
> we
> > >> can't make upsert-kafka configurable in the same way to support this
> > >> delivery guarantee.
> > >>
> > >> Thank you,
> > >> Alexander
> > >>
> > >> 1.
> > >>
> > >>
> >
> 

Re: [mongodb connector] which flink version(s) is the mongodb connector compatible with?

2023-04-12 Thread yuxia
Oh, let me correct what I said. 
Released connector jars on Maven Central have the supported Flink 
version as a suffix. 
So the mongodb connector with version 1.0.0-1.16[1] should be 
compatible with Flink 1.16.

[1] 
https://central.sonatype.com/artifact/org.apache.flink/flink-connector-mongodb/1.0.0-1.16

Best regards,
Yuxia

- 原始邮件 -
发件人: "yuxia" 
收件人: "dev" 
发送时间: 星期四, 2023年 4 月 13日 上午 9:28:37
主题: Re: [mongodb connector] which flink version(s) is the mongodb connector 
compatible with?

From the pom[1] of the connector, it seems it's compatible with Flink 1.17.

[1] https://github.com/apache/flink-connector-mongodb/blob/main/pom.xml#L56

Best regards,
Yuxia

- 原始邮件 -
发件人: "Saketh Kurnool" 
收件人: "dev" 
发送时间: 星期四, 2023年 4 月 13日 上午 5:27:06
主题: [mongodb connector] which flink version(s) is the mongodb connector 
compatible with?

Hello!

I have a use case for the mongodb connector (see:
https://github.com/apache/flink-connector-mongodb), but I am currently
running flink 1.15. As far as I can tell, the connector relies on
dependencies that don't exist in 1.15 (one such example is
`org/apache/flink/table/connector/source/lookup/cache/LookupCache`).

Which flink version(s) is this connector implementation slated to be
compatible with?

Thank you!


[DISCUSS] EncodingFormat and DecodingFormat provide copy API

2023-04-12 Thread tanjialiang
Hi, devs. 


I'd like to start a discussion about having EncodingFormat and DecodingFormat 
provide a copy API, which relates to FLINK-31686 [1].


Currently, DecodingFormat doesn't support copy(), so the same DecodingFormat 
instance is reused after filter/projection push-down. EncodingFormat has the same 
problem if a class implements EncodingFormat#applyWritableMetadata(). So I think 
EncodingFormat and DecodingFormat need to provide a copy function, and it 
should be a deep copy if the format implements 
DecodingFormat#applyReadableMetadata/EncodingFormat#applyWritableMetadata/BulkDecodingFormat#applyFilters.
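
To make this concrete, here is a rough sketch of the copy contract I have in
mind (the types below are simplified stand-ins for illustration, not the real
Flink interfaces; method names are assumptions):

// Simplified stand-ins for illustration only; not the actual Flink interfaces.
interface CopyableDecodingFormat<T> {
    T createRuntimeDecoder();             // stand-in for the real factory method
    CopyableDecodingFormat<T> copy();     // the hypothetical addition discussed here
}

class MetadataAwareFormat implements CopyableDecodingFormat<String> {
    private final java.util.List<String> metadataKeys = new java.util.ArrayList<>();

    void applyReadableMetadata(java.util.List<String> keys) {
        // mutates internal state, which is why sharing one instance between
        // two table sources after push-down is problematic
        metadataKeys.clear();
        metadataKeys.addAll(keys);
    }

    @Override
    public String createRuntimeDecoder() {
        return "decoder producing " + metadataKeys;
    }

    @Override
    public MetadataAwareFormat copy() {
        MetadataAwareFormat copied = new MetadataAwareFormat();
        copied.metadataKeys.addAll(this.metadataKeys);   // deep copy of the mutable state
        return copied;
    }
}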
 



Looking forward to your feedback.


[1]: [https://issues.apache.org/jira/browse/FLINK-31686]


Best regards, 
tanjialiang





Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement

2023-04-12 Thread liu ron
Hi, Mang
Atomicity is very important for CTAS, especially for batch jobs. This FLIP
is a continuation of FLIP-218, which is valuable for CTAS.
I just have one question: in the Motivation part of FLIP-218, we mentioned
three levels of atomicity semantics. Can this current design do the same as
Spark's DataSource V2, which can guarantee both atomicity and isolation?
For example, can it be done when writing to Hive tables using CTAS?
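
For readers following along, the kind of statement in question looks roughly
like this (a sketch: the table and column names are made up, and it assumes a
source table has already been registered in a batch TableEnvironment):

// Illustrative only. Today the target table is created before the job runs;
// the FLIP aims to make it visible only if the job succeeds.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CtasSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // assumes a `source_table` has been registered beforehand
        tEnv.executeSql("CREATE TABLE target_table AS SELECT id, name FROM source_table");
    }
}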

Best,
Ron

Mang Zhang  于2023年4月10日周一 11:03写道:

> Hi, everyone
>
>
>
>
> I'd like to start a discussion about FLIP-305: Support atomic for CREATE
> TABLE AS SELECT(CTAS) statement [1].
>
>
>
>
> The CREATE TABLE AS SELECT (CTAS) statement is already supported, but it's not
> atomic. It creates the table first, before the job runs. If the job
> execution fails or is cancelled, the table will not be dropped.
>
>
>
>
> So I want Flink to support atomic CTAS, where the table is only created
> when the job succeeds, to improve the user experience.
>
>
>
>
> Looking forward to your feedback.
>
>
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
>
>
>
>
>
>
>
>
>
>
> --
>
> Best regards,
> Mang Zhang


[jira] [Created] (FLINK-31790) Filesystem batch sink should also respect to the PartitionCommitPolicy

2023-04-12 Thread Aitozi (Jira)
Aitozi created FLINK-31790:
--

 Summary: Filesystem batch sink should also respect to the 
PartitionCommitPolicy
 Key: FLINK-31790
 URL: https://issues.apache.org/jira/browse/FLINK-31790
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / FileSystem
Reporter: Aitozi


Currently, the {{PartitionCommitPolicy}} only takes effect in the streaming file 
sink and the Hive file sink. The filesystem sink in batch mode should also respect 
the commit policy.
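
For illustration, a sketch of the kind of table this concerns (the option
values are illustrative; treat this as a sketch rather than a verified
configuration):

// Sketch: a partitioned filesystem table with a partition commit policy.
// The point of this ticket: a batch-mode INSERT INTO this table should honor
// 'sink.partition-commit.policy.kind' the same way the streaming sink does.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PartitionCommitPolicySketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inBatchMode());
        tEnv.executeSql(
                "CREATE TABLE fs_sink (id BIGINT, dt STRING) PARTITIONED BY (dt) WITH ("
                        + " 'connector' = 'filesystem',"
                        + " 'path' = 'file:///tmp/fs_sink',"
                        + " 'format' = 'csv',"
                        + " 'sink.partition-commit.policy.kind' = 'success-file')");
        tEnv.executeSql("INSERT INTO fs_sink VALUES (1, '2023-04-12')");
    }
}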



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [mongodb connector] which flink version(s) is the mongodb connector compatible with?

2023-04-12 Thread Jiabao Sun
Hi Saketh,

As yuxia said, the mongodb connector 1.0.0 is currently compatible with Flink 
1.16.
Some compatibility fixes have been made recently to make it compatible with 
Flink 1.17, but they have not been released yet.

Best,
Jiabao


> 2023年4月13日 上午9:36,yuxia  写道:
> 
> Oh, correct what I said. 
> The version of released connector jars on Maven central has the supported 
> Flink version as suffix. 
> For the case of the mongodb connector with version 1.0.0-1.16[1], it should 
> be compatible with Flink 1.16.
> 
> [1] 
> https://central.sonatype.com/artifact/org.apache.flink/flink-connector-mongodb/1.0.0-1.16
> 
> Best regards,
> Yuxia
> 
> - 原始邮件 -
> 发件人: "yuxia" 
> 收件人: "dev" 
> 发送时间: 星期四, 2023年 4 月 13日 上午 9:28:37
> 主题: Re: [mongodb connector] which flink version(s) is the mongodb connector 
> compatible with?
> 
> From the pom[1] of the connector, it seems it's compatible with Flink 1.17.
> 
> [1] https://github.com/apache/flink-connector-mongodb/blob/main/pom.xml#L56
> 
> Best regards,
> Yuxia
> 
> - 原始邮件 -
> 发件人: "Saketh Kurnool" 
> 收件人: "dev" 
> 发送时间: 星期四, 2023年 4 月 13日 上午 5:27:06
> 主题: [mongodb connector] which flink version(s) is the mongodb connector 
> compatible with?
> 
> Hello!
> 
> I have a use case for the mongodb connector (see:
> https://github.com/apache/flink-connector-mongodb), but I am currently
> running flink 1.15. As far as I can tell, the connector relies on
> dependencies that don't exist in 1.15 (one such example is
> `org/apache/flink/table/connector/source/lookup/cache/LookupCache`).
> 
> Which flink version(s) is this connector implementation slated to be
> compatible with?
> 
> Thank you!



[RESULT][VOTE] FLIP-292: Enhance COMPILED PLAN to support operator-level state TTL configuration

2023-04-12 Thread Jane Chan
Hi everyone,

FLIP-292 [1] has been accepted and voted through this thread [2].

There are seven approving votes, six of which are binding:

   - Lincoln Lee (binding)
   - Jing Ge (binding)
   - godfrey he (binding)
   - Jark Wu (binding)
   - Shuo Cheng
   - Benchao Li (binding)
   - Leonard Xu (binding)


[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240883951
[2] https://lists.apache.org/thread/jc5ngs6kxdn179xmj6oqchkl5frdkgr2

Best,
Jane Chan


[DISCUSS] Release Paimon 0.4

2023-04-12 Thread Jingsong Li
Hi everyone,

I'm going to cut the 0.4 branch next Monday, and won't merge
major refactoring-related PRs into the master branch until then.

Blockers:
- Entire Database Sync CC @Caizhi Weng
- CDC Ingestion mysql DATETIME(6) cast error [1]
- MySqlSyncTableAction should support case ignore mode [2]

If you have other blockers, please let us know.

[1] https://github.com/apache/incubator-paimon/issues/860
[2] https://github.com/apache/incubator-paimon/issues/890

Best,
Jingsong


Re: [DISCUSS] Release Paimon 0.4

2023-04-12 Thread Jingsong Li
Oh, sorry, Flink devs,

This email should have been sent to the Paimon dev list; please ignore it.

Best,
Jingsong

On Thu, Apr 13, 2023 at 10:56 AM Jingsong Li  wrote:
>
> Hi everyone,
>
> I'm going to cut the 0.4 branch next Monday, and won't merge
> major refactoring-related PRs into the master branch until then.
>
> Blockers:
> - Entire Database Sync CC @Caizhi Weng
> - CDC Ingestion mysql DATETIME(6) cast error [1]
> - MySqlSyncTableAction should support case ignore mode [2]
>
> If you have other blockers, please let us know.
>
> [1] https://github.com/apache/incubator-paimon/issues/860
> [2] https://github.com/apache/incubator-paimon/issues/890
>
> Best,
> Jingsong


[jira] [Created] (FLINK-31791) FLIP-292: Enhance COMPILED PLAN to support operator-level state TTL configuration

2023-04-12 Thread Jane Chan (Jira)
Jane Chan created FLINK-31791:
-

 Summary: FLIP-292: Enhance COMPILED PLAN to support operator-level 
state TTL configuration
 Key: FLINK-31791
 URL: https://issues.apache.org/jira/browse/FLINK-31791
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Planner
Affects Versions: 1.18.0
Reporter: Jane Chan






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-302: Support TRUNCATE TABLE statement

2023-04-12 Thread liu ron
Hi, xia

Thanks for your explanation. For the first question, given the current
status, I think we can provide the generic interface in the future if we
need it. For the second question, it makes sense to me if we can
support the table cache at the framework level.

Best,
Ron

yuxia  于2023年4月11日周二 16:12写道:

> Hi, ron.
>
> 1: Considering that for deleting rows, Flink also writes delete records to
> achieve the purpose of deleting data, it may not be so strange for connector
> devs to make DynamicTableSink implement SupportsTruncate to support
> truncating the table. Based on the assumption that DynamicTableSink is used for
> inserting/updating/deleting, I think it's reasonable for DynamicTableSink
> to implement SupportsTruncate. But I also think it sounds reasonable to add a
> generic interface like DynamicTable to differentiate DynamicTableSource &
> DynamicTableSink. However, that will definitely require much design and
> discussion, which deserves a dedicated FLIP. I prefer not to do that in this
> FLIP to avoid overdesign, and I think it's not a must for this FLIP. Maybe
> we can discuss it some day if we do need the new generic table interface.
>
> 2: Considering the variety of catalogs and tables, it's hard for Flink to do
> unified follow-up actions after truncating a table. But the external
> connector can still do such follow-up actions in the method `executeTruncation`.
> Btw, in Spark, for the new truncate table interface[1], Spark only
> recaches the table after truncation[2]. If Flink
> supports a table cache at the framework level,
> we can also recache at the framework level for the TRUNCATE TABLE statement.
>
> [1]
> https://github.com/apache/spark/blob/1a42aa5bd44e7524bb55463bbd85bea782715834/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TruncatableTable.java
> [2]
> https://github.com/apache/spark/blob/06c09a79b371c5ac3e4ebad1118ed94b460f48d1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TruncateTableExec.scala
>
>
> I think the external catalog can implement such logic in the method
> `executeTruncation`.
>
> Best regards,
> Yuxia
>
> - 原始邮件 -
> 发件人: "liu ron" 
> 收件人: "dev" 
> 发送时间: 星期二, 2023年 4 月 11日 上午 10:51:36
> 主题: Re: [DISCUSS] FLIP-302: Support TRUNCATE TABLE statement
>
> Hi, xia
> It's a nice improvement to support TRUNCATE TABLE statement, making Flink
> more feature-rich.
> I think the truncate syntax is a command that will be executed in the
> client's process, rather than pulling up a Flink job to execute on the
> cluster. So on the user-facing exposed interface, I think we should not let
> users implement the SupportsTruncate interface on the DynamicTableSink
> interface. This seems a bit strange and also confuses users, as hang said,
> why Source table does not support truncate. It would be nice if we could
> come up with a generic interface that supports truncate instead of binding
> it to the DynamicTableSink interface, and maybe in the future we will
> support more commands like truncate command.
>
> In addition, after truncating data, we may also need to update the metadata
> of the table, such as Hive table, we need to update the statistics, as well
> as clear the cache in the metastore, I think we should also consider these
> capabilities. Spark has considered these; refer to
>
> https://github.com/apache/spark/blob/69dd20b5e45c7e3533efbfdc1974f59931c1b781/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala#L573
> .
>
> Best,
>
> Ron
>
> Jim Hughes  于2023年4月11日周二 02:15写道:
>
> > Hi Yuxia,
> >
> > On Mon, Apr 10, 2023 at 10:35 AM yuxia 
> > wrote:
> >
> > > Hi, Jim.
> > >
> > > 1: I'm expecting all DynamicTableSinks to support. But it's hard to
> > > support all at one shot. For the DynamicTableSinks that haven't
> > implemented
> > > SupportsTruncate interface, we'll throw exception
> > > like 'The truncate statement for the table is not supported as it
> hasn't
> > > implemented the interface SupportsTruncate'. Also, for some sinks that
> > > doesn't support deleting data, it can also implements it but throw more
> > > > concrete exception like "xxx doesn't support truncating a table as
> > delete
> > > is impossible for xxx". It depends on the external connector's
> > > implementation.
> > > Thanks for your advice, I updated it to the FLIP.
> > >
> >
> > Makes sense.
> >
> >
> > > 2: What do you mean by saying "truncate an input to a streaming query"?
> > > This FLIP aims to support the TRUNCATE TABLE statement, which is for
> > > truncating a table. In which cases will it interoperate with streaming
> > queries?
> > >
> >
> > Let's take a source like Kafka as an example.  Suppose I have an input
> > topic Foo, and query which uses it as an input.
> >
> > When Foo is truncated, if the truncation works as a delete and create,
> then
> > the connector may need to be made aware (otherwise it may try to use
> > offsets from the previous topic).  On the other hand, one may have to ask
> > Kafka to delete records up to a certain

Re: [DISCUSS] FLIP-304: Pluggable failure handling for Apache Flink

2023-04-12 Thread Leonard Xu
Thanks Panagiotis for the update, 

the updated FLIP looks good to me.


Best,
Leonard


> On Apr 13, 2023, at 7:42 AM, Panagiotis Garefalakis  wrote:
> 
> Hello there,
> 
> Zhu: agree with the config option, great suggestion
> Hong: global timeout is also interesting and a good addition -- only
> downside I see is just another config option
> 
> If everyone is happy, I suggest we keep the discussion open until Friday
> and start a Vote shortly after.
> 
> Cheers,
> Panagiotis
> 
> On Tue, Apr 11, 2023 at 12:58 AM Teoh, Hong 
> wrote:
> 
>> Hi Panagiotis,
>> 
>> Thank you for the update. Looks great! Just one suggestion below:
>> 
>> 1. We seem to be waiting for the future(s) to complete before restarting
>> the job - should we add a configurable timeout for the enrichment? Since
>> each failure enricher is run in parallel, we could probably settle for one
>> timeout for all failure handlers.
>> 2. +1 to Zhu’s comment on adding a comma separated list of FailureHandlers
>> instead of boolean toggle!
>> 
>> Other than the above, the FLIP looks great! Thank you for your efforts.
>> 
>> Regards,
>> Hong
>> 
>> 
>> 
>>> On 11 Apr 2023, at 08:01, Zhu Zhu  wrote:
>>> 
>>> CAUTION: This email originated from outside of the organization. Do not
>> click links or open attachments unless you can confirm the sender and know
>> the content is safe.
>>> 
>>> 
>>> 
>>> Hi Panagiotis,
>>> 
>>> Thanks for updating the FLIP.
>>> 
 Regarding the config option
>> `jobmanager.failure-enricher-plugins.enabled`
>>> I think a config option `jobmanager.failure-enrichers`, which accepts
>>> the names of enrichers to use, may be better. It allows the users to
>>> deploy and use the plugins in a more flexible way. The default value
>>> of the config can be none, which means failure enrichment will be
>>> disabled by default.
>>> A reference can be the config option `metrics.reporters` which helps
>>> to load metric reporter plugins.
>>> 
>>> Thanks,
>>> Zhu
>>> 
>>> Panagiotis Garefalakis  于2023年4月10日周一 03:47写道:
 
 Hello again everyone,
 
 FLIP is now updated based on our discussion!
 In short, FLIP-304 [1] proposes the addition of a pluggable interface
>> that
 will allow users to add custom logic and enrich failures with custom
 metadata labels.
 While as discussed, custom restart strategies will be part of a
>> different
 effort. Every pluggable FailureEnricher:
 
  - Is triggered on every global/non-global failure
  - Receives a Throwable cause and an immutable Context
  - Performs asynchronous execution (separate IoExecutor) to avoid
  blocking the main thread for RPCs
  - Is completely independent from other Enrichers
  - Emits failure labels/tags for its unique, pre-defined keys (defined
>> at
  startup time)
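
 Put into code, a custom enricher under this contract might look roughly like
 the following (the class shape and method names are assumptions drawn from
 the description above, not the FLIP's exact API):

// Rough sketch only; the real FLIP-304 interface may differ.
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;

public class TimeoutLabelingEnricher {

    // the unique output keys are declared up front, at startup time
    private static final Set<String> KEYS = Set.of("failure.category");

    public Set<String> getOutputKeys() {
        return KEYS;
    }

    // invoked for every global/non-global failure; executed on a separate IO
    // executor, so a blocking RPC lookup here would not stall the main thread
    public CompletableFuture<Map<String, String>> processFailure(
            Throwable cause, Executor ioExecutor) {
        return CompletableFuture.supplyAsync(
                () -> Map.of(
                        "failure.category",
                        cause instanceof TimeoutException ? "TIMEOUT" : "OTHER"),
                ioExecutor);
    }
    // such an enricher would then be enabled via configuration, e.g. something
    // like `jobmanager.failure-enrichers: timeout-labeling` as suggested above
}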
 
 
 Check the link for implementation details and please let me know what
>> you
 think :)
 
 
 [1]
 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-304%3A+Pluggable+Failure+Enrichers
 
 
 Panagiotis
 
 
 On Tue, Mar 28, 2023 at 5:01 AM Zhu Zhu  wrote:
 
> Hi Panagiotis,
> 
> How about to introduce a config option to control which error handling
> plugins should be used? It is more flexible for deployments.
>> Additionally,
> it can also enable users to explicitly specify the order that the
>> plugins
> take effects.
> 
> Thanks,
> Zhu
> 
> Gen Luo  于2023年3月27日周一 15:02写道:
>> 
>> Thanks for the summary!
>> 
>> Also +1 to support custom restart strategies in a different FLIP,
>> as long as we can make sure that the plugin interface won't be
>> changed when the restart strategy interface is introduced.
>> 
>> To achieve this, maybe we should think carefully about how the handler
>> would cooperate with the restart strategy, like would it execute
>> before the strategy (e.g. some strategy may use the tag), or after
>> it (e.g. some metric reporting handler may use the handling result).
>> Though we can implement in one way, and extend if the other is
>> really necessary by someone.
>> 
>> Besides, instead of using either of the names, shall we just make
>> them two subclasses named FailureEnricher and FailureListener?
>> The former executes synchronously and can modify the context,
>> while the latter executes asynchronously and has a read-only view
>> of context. In this way we can make sure a handler behaves in
>> the expected way.
>> 
>> 
>> On Thu, Mar 23, 2023 at 5:19 PM Zhu Zhu  wrote:
>> 
>>> +1 to support custom restart strategies in a different FLIP.
>>> 
>>> It's fine to have a different plugin for custom restart strategy.
>>> If so, since we do not treat the FLIP-304 plugin as a common failure
>>> handler, but instead mainly targets to add labels to errors, I would
>>> +1 for the name `FailureEnricher`.
>>> 
>>> Th

Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement

2023-04-12 Thread Lincoln Lee
Hi, Mang

+1 for completing the support for atomicity of CTAS; this is very useful in
batch scenarios.

I have two questions:
1. naming-wise:
   a) can we rename `Catalog#getTwoPhaseCommitCreateTable` to
`Catalog#twoPhaseCreateTable` (and we may add
twoPhaseReplaceTable/twoPhaseCreateOrReplaceTable later)?
   b) for `TwoPhaseCommitCatalogTable`, might it be better to use
`TwoPhaseCatalogTable`?
   c) in `TwoPhaseCommitCatalogTable#beginTransaction`, the word 'transaction'
in the method name may suggest to users that transaction semantics are
implied (which is not strictly the case), so I suggest renaming it to
`begin` (see the sketch after question 2)
2. Has this design been validated by any relevant PoC on Hive or other
catalogs?
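
To make 1.b and 1.c concrete, the shape I have in mind is roughly the
following (a sketch using the suggested names; `commit`/`abort` are
illustrative placeholders, not the FLIP's current method set):

// Sketch only, using the renames suggested above; not the FLIP's current API.
public interface TwoPhaseCatalogTable {

    // "beginTransaction" in the current proposal; called when the CTAS job starts
    void begin();

    // called on successful job completion, making the table visible atomically
    void commit();

    // called on failure or cancellation, so that nothing is left behind
    void abort();
}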

Best,
Lincoln Lee


liu ron  于2023年4月13日周四 10:17写道:

> Hi, Mang
> Atomicity is very important for CTAS, especially for batch jobs. This FLIP
> is a continuation of FLIP-218, which is valuable for CTAS.
> I just have one question, in the Motivation part of FLIP-218, we mentioned
> three levels of atomicity semantics, can this current design do the same as
> Spark's DataSource V2, which can guarantee both atomicity and isolation,
> for example, can it be done by writing to Hive tables using CTAS?
>
> Best,
> Ron
>
> Mang Zhang  于2023年4月10日周一 11:03写道:
>
> > Hi, everyone
> >
> >
> >
> >
> > I'd like to start a discussion about FLIP-305: Support atomic for CREATE
> > TABLE AS SELECT(CTAS) statement [1].
> >
> >
> >
> >
> > The CREATE TABLE AS SELECT (CTAS) statement is already supported, but it's not
> > atomic. It creates the table first, before the job runs. If the job
> > execution fails or is cancelled, the table will not be dropped.
> >
> >
> >
> >
> > So I want Flink to support atomic CTAS, where the table is only created
> > when the job succeeds, to improve the user experience.
> >
> >
> >
> >
> > Looking forward to your feedback.
> >
> >
> >
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > --
> >
> > Best regards,
> > Mang Zhang
>


[jira] [Created] (FLINK-31792) Errors are not reported in the Web UI

2023-04-12 Thread Jira
David Morávek created FLINK-31792:
-

 Summary: Errors are not reported in the Web UI
 Key: FLINK-31792
 URL: https://issues.apache.org/jira/browse/FLINK-31792
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Web Frontend
Affects Versions: 1.17.0
Reporter: David Morávek
Assignee: David Morávek


After FLINK-29747, NzNotificationService can no longer be resolved by the injector, 
and because we're using the injector directly, this is silently ignored.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] Add support for Apache Arrow format

2023-04-12 Thread Aitozi
> Which connectors would be commonly used when reading in Arrow format?
Filesystem?

Currently, yes. Ideally it could be used in combination with
different connectors,
but I have not figured out how to integrate the Arrow format
deserializer with the
`DecodingFormat` or `DeserializationSchema` interfaces. So, as a first
step, I want to introduce
it as a file bulk format.
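
To illustrate what that first step could look like for SQL users, usage might
be something along these lines (the 'arrow' format identifier is a hypothetical
assumption; it does not exist in Flink yet):

// Hypothetical usage sketch: reading Arrow files through the filesystem
// connector with a new 'arrow' bulk format. The format identifier is assumed.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ArrowFormatSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inBatchMode());
        tEnv.executeSql(
                "CREATE TABLE arrow_source (id BIGINT, name STRING) WITH ("
                        + " 'connector' = 'filesystem',"
                        + " 'path' = 'file:///tmp/arrow-data',"
                        + " 'format' = 'arrow')");   // hypothetical format identifier
        tEnv.executeSql("SELECT * FROM arrow_source").print();
    }
}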

Martijn Visser  于2023年4月12日周三 22:53写道:

>
> Which connectors would be commonly used when reading in Arrow format?
> Filesystem?
>
> On Wed, Apr 12, 2023 at 4:27 AM Jacky Lau  wrote:
>
> > Hi
> >I also think arrow format  will be useful when reading/writing with
> > message queue.
> >Arrow defines a language-independent columnar memory format for flat and
> > hierarchical data, organized for efficient analytic operations on modern
> > hardware like CPUs and GPUs. The Arrow memory format also supports
> > zero-copy reads for lightning-fast data access without serialization
> > overhead. it will bring a lot.
> >And we  may do some surveys, what other engines support like
> > spark/hive/presto and so on, how that supports and how it be used.
> >
> >Best,
> >Jacky.
> >
> > Aitozi  于2023年4月2日周日 22:22写道:
> >
> > > Hi all,
> > > Thanks for your input.
> > >
> > > @Ran > However, as mentioned in the issue you listed, it may take a lot
> > of
> > > work
> > > and the community's consideration for integrating Arrow.
> > >
> > > To clarify, this proposal solely aims to introduce flink-arrow as a new
> > > format,
> > > similar to flink-csv and flink-protobuf. It will not impact the internal
> > > data
> > > structure representation in Flink. For proof of concept, please refer to:
> > > https://github.com/Aitozi/flink/commits/arrow-format.
> > >
> > > @Martijn > I'm wondering if there's really much benefit for the Flink
> > > project to
> > > add another file format, over properly supporting the format that we
> > > already
> > > have in the project.
> > >
> > > Maintaining the format we already have and introducing new formats should be
> > > orthogonal. The requirement for supporting the Arrow format was originally
> > observed
> > > in
> > > our internal usage to deserialize the data (VectorSchemaRoot) from other
> > > storage
> > > systems to flink internal RowData and serialize the flink internal
> > RowData
> > > to
> > > VectorSchemaRoot out to the storage system.  And the requirement from the
> > > slack[1] is to support the arrow file format. Although, Arrow is not
> > > usually
> > > used as the final disk storage format.  But it has a tendency to be used
> > > as the
> > > inter-exchange format between different systems or temporary storage for
> > > analysis due to its columnar format and can be memory mapped to other
> > > analysis
> > > programs.
> > >
> > > So, I think it's meaningful to support arrow formats in Flink.
> > >
> > > @Jim >  If the Flink format interface is used there, then it may be
> > useful
> > > to
> > > consider Arrow along with other columnar formats.
> > >
> > > I am not well-versed with the formats utilized in Paimon. Upon checking
> > > [2], it
> > > appears that Paimon does not directly employ flink formats. Instead, it
> > > utilizes
> > > FormatWriterFactory and FormatReaderFactory to handle data serialization
> > > and
> > > deserialization. Therefore, I believe that the current work may not be
> > > applicable for reuse in Paimon at this time.
> > >
> > > Best,
> > > Aitozi.
> > >
> > > [1]:
> > https://apache-flink.slack.com/archives/C03GV7L3G2C/p1677915016551629
> > > [2]:
> > >
> > https://github.com/apache/incubator-paimon/tree/master/paimon-format/src/main/java/org/apache/paimon/format
> > >
> > > Jim Hughes  于2023年3月31日周五 00:36写道:
> > > >
> > > > Hi all,
> > > >
> > > > How do Flink formats relate to or interact with Paimon (formerly
> > > > Flink-Table-Store)?  If the Flink format interface is used there, then
> > it
> > > > may be useful to consider Arrow along with other columnar formats.
> > > >
> > > > Separately, from previous experience, I've seen the Arrow format be
> > > useful
> > > > as an output format for clients to read efficiently.  Arrow does
> > support
> > > > returning batches of records, so there may be some options to use the
> > > > format in a streaming situation where a sufficient collection of
> > records
> > > > can be gathered.
> > > >
> > > > Cheers,
> > > >
> > > > Jim
> > > >
> > > >
> > > >
> > > > On Thu, Mar 30, 2023 at 8:32 AM Martijn Visser <
> > martijnvis...@apache.org
> > > >
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > To be honest, I haven't seen that much demand for supporting the
> > Arrow
> > > > > format directly in Flink as a flink-format. I'm wondering if there's
> > > really
> > > > > much benefit for the Flink project to add another file format, over
> > > > > properly supporting the format that we already have in the project.
> > > > >
> > > > > Best regards,
> > > > >
> > > > > Martijn
> > > > >
> > > > > On Thu, Mar 30, 2023 at 2

Re: [DISCUSS] FLIP-302: Support TRUNCATE TABLE statement

2023-04-12 Thread Aitozi
Hi, xia
   > which I think if Flink supports table cache in framework-level,
we can also recache in framework-level for truncate table statement.

I think the Flink catalog already stores some stats for the table,
e.g. after `ANALYZE TABLE`, the table's statistics will be stored in
the
catalog, but TRUNCATE TABLE will not correct those statistics.

I know it's hard for Flink to do unified follow-up actions after
truncating a table. But I think we need to have a clear position for
the Flink catalog in mind.
IMO, with Flink as a compute engine, it's hard for it to maintain the
catalog for different storage tables itself. So with more and more
`Executable`
commands introduced, the data in the catalog will become inconsistent.
In this case, after a truncate, the following parts of the catalog may
be affected (see the sketch below):

- the table/column statistics will no longer be correct
- the partitions of this table should be cleared
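
As a rough sketch of what I mean, assuming the FLIP's shape of a
SupportsTruncate interface with a single executeTruncation() method (the
types below are simplified stand-ins, and the catalog client is hypothetical):

// Simplified stand-ins for illustration; not the actual Flink interfaces.
interface SupportsTruncateSketch {
    void executeTruncation();
}

// hypothetical client for whatever external catalog backs the table
interface ExternalCatalogClient {
    void deleteAllData(String table);
    void resetStatistics(String table);
    void dropPartitions(String table);
}

class TruncatableSink implements SupportsTruncateSketch {
    private final ExternalCatalogClient catalog;
    private final String table;

    TruncatableSink(ExternalCatalogClient catalog, String table) {
        this.catalog = catalog;
        this.table = table;
    }

    @Override
    public void executeTruncation() {
        catalog.deleteAllData(table);
        // today the follow-up actions have to live in the connector, because
        // the framework does not do them for us:
        catalog.resetStatistics(table);   // otherwise table/column stats go stale
        catalog.dropPartitions(table);    // partition metadata should be cleared too
    }
}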


Best,
Aitozi.


liu ron  于2023年4月13日周四 11:28写道:

>
> Hi, xia
>
> Thanks for your explanation, for the first question, given the current
> status, I think we can provide the generic interface in the future if we
> need it. For the second question,  it makes sense to me if we can
> support the table cache at the framework level.
>
> Best,
> Ron
>
> yuxia  于2023年4月11日周二 16:12写道:
>
> > Hi, ron.
> >
> > 1: Considering for deleting rows, Flink will also write delete record to
> > achieve the purpose of deleting data, it may not be so strange for connector
> > devs to make DynamicTableSink implement SupportsTruncate to support
> > truncate the table. Based on the assume that DynamicTableSink is used for
> > inserting/updating/deleting, I think it's reasonable for DynamicTableSink
> > to implement SupportsTruncate. But I think it sounds reasonable to add a
> > generic interface like DynamicTable to differentiate DynamicTableSource &
> > DynamicTableSink. But it will definitely requires much design and
> > discussion which deserves a dedicated FLIP. I prefer not to do that in this
> > FLIP to avoid overdesign and I think it's not a must for this FLIP. Maybe
> > we can discuss it if some day if we do need the new generic table interface.
> >
> > 2: Considering various catalogs and tables, it's hard for Flink to do the
> > unified follow-up actions after truncating table. But still the external
> > connector can do such follow-up actions in method `executeTruncation`.
> > Btw, in Spark, for the newly truncate table interface[1], Spark only
> > recaches the table after truncating table[2] which I think if Flink
> > supports table cache in framework-level,
> > we can also recache in framework-level for truncate table statement.
> >
> > [1]
> > https://github.com/apache/spark/blob/1a42aa5bd44e7524bb55463bbd85bea782715834/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TruncatableTable.java
> > [2]
> > https://github.com/apache/spark/blob/06c09a79b371c5ac3e4ebad1118ed94b460f48d1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TruncateTableExec.scala
> >
> >
> > I think the external catalog can implement such logic in the method
> > `executeTruncation`.
> >
> > Best regards,
> > Yuxia
> >
> > - 原始邮件 -
> > 发件人: "liu ron" 
> > 收件人: "dev" 
> > 发送时间: 星期二, 2023年 4 月 11日 上午 10:51:36
> > 主题: Re: [DISCUSS] FLIP-302: Support TRUNCATE TABLE statement
> >
> > Hi, xia
> > It's a nice improvement to support TRUNCATE TABLE statement, making Flink
> > more feature-rich.
> > I think the truncate syntax is a command that will be executed in the
> > client's process, rather than pulling up a Flink job to execute on the
> > cluster. So on the user-facing exposed interface, I think we should not let
> > users implement the SupportsTruncate interface on the DynamicTableSink
> > interface. This seems a bit strange and also confuses users, as hang said,
> > why Source table does not support truncate. It would be nice if we could
> > come up with a generic interface that supports truncate instead of binding
> > it to the DynamicTableSink interface, and maybe in the future we will
> > support more commands like truncate command.
> >
> > In addition, after truncating data, we may also need to update the metadata
> > of the table, such as Hive table, we need to update the statistics, as well
> > as clear the cache in the metastore, I think we should also consider these
> > capabilities. Spark has considered these; refer to
> >
> > https://github.com/apache/spark/blob/69dd20b5e45c7e3533efbfdc1974f59931c1b781/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala#L573
> > .
> >
> > Best,
> >
> > Ron
> >
> > Jim Hughes  于2023年4月11日周二 02:15写道:
> >
> > > Hi Yuxia,
> > >
> > > On Mon, Apr 10, 2023 at 10:35 AM yuxia 
> > > wrote:
> > >
> > > > Hi, Jim.
> > > >
> > > > 1: I'm expecting all DynamicTableSinks to support. But it's hard to
> > > > support all at one shot. For the DynamicTableSinks that haven't
> > > implemented
> > > > SupportsTruncate interface, we'll throw exception
> > > > like 'The truncate statement for the