Re: [VOTE] FLIP-466: Introduce ProcessFunction Attribute in DataStream API V2

2024-07-19 Thread Xintong Song
+1(binding)

Best,

Xintong



On Thu, Jul 18, 2024 at 9:40 AM weijie guo 
wrote:

> +1(binding)
>
> Best regards,
>
> Weijie
>
>
> Wencong Liu  于2024年7月17日周三 21:31写道:
>
> > Hi dev,
> >
> > I'd like to start a vote on FLIP-466.
> >
> > Discussion thread:
> > https://lists.apache.org/thread/sw2or62299w0hw9jm5kdqjqj3j8rnrdt
> > FLIP:
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-466%3A+Introduce+ProcessFunction+Attribute+in+DataStream+API+V2
> >
> > Best regards,
> > Wencong Liu
>


Re: [DISCUSS] FLIP-470: Support Adaptive Broadcast Join

2024-07-19 Thread Zhu Zhu
+1 for the FLIP

It's a good start to adaptively optimize the logical execution plan with
runtime information.

Thanks,
Zhu

Xia Sun  于2024年7月18日周四 18:23写道:

> Hi devs,
>
> Junrui Lee, Lei Yang, and I would like to initiate a discussion about
> FLIP-470: Support Adaptive Broadcast Join[1].
>
> In general, Broadcast Hash Join is currently the most efficient join
> strategy available in Flink. However, its prerequisite is that the input
> data on one side must be sufficiently small; otherwise, it may lead to
> memory overuse or other issues. Currently, due to the lack of precise
> statistics, it is difficult to make accurate estimations during the Flink
> SQL Planning phase. For example, when an upstream Filter operator is
> present, it is easy to overestimate the size of the table, whereas with
> an expansion operator, the table size tends to be underestimated. Moreover,
> once the join operator is determined, it cannot be modified at runtime.
>
> To address this issue, we plan to introduce Adaptive Broadcast Join
> capability based on FLIP-468: Introducing StreamGraph-Based Job
> Submission[2]
> and FLIP-469: Supports Adaptive Optimization of StreamGraph[3]. This will
> allow the join operator to be dynamically optimized to Broadcast Join based
> on the actual input data volume at runtime and fall back when the
> optimization conditions are not met.
>
> For more details, please refer to FLIP-470[1]. We look forward to your
> feedback.
>
> Best,
> Junrui Lee, Lei Yang and Xia Sun
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-470%3A+Support+Adaptive+Broadcast+Join
> [2]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-468%3A+Introducing+StreamGraph-Based+Job+Submission
> [3]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-469%3A+Supports+Adaptive+Optimization+of+StreamGraph
>
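
To make the size-estimation problem described above concrete, here is a
minimal, self-contained sketch (table names, row counts, and the datagen setup
are illustrative assumptions, not taken from the FLIP) of a join whose build
side only becomes small after a selective filter:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BroadcastJoinEstimationSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // A large fact table and a dimension table, both generated locally.
        tEnv.executeSql(
            "CREATE TABLE orders (order_id BIGINT, user_id BIGINT, amount DOUBLE) "
                + "WITH ('connector' = 'datagen', 'number-of-rows' = '10000000')");
        tEnv.executeSql(
            "CREATE TABLE users (user_id BIGINT, country STRING) "
                + "WITH ('connector' = 'datagen', 'number-of-rows' = '5000000')");

        // At planning time the optimizer only sees the raw size of `users`; the highly
        // selective filter below means the actual build side is tiny at runtime, which
        // is the situation where adaptively switching to a broadcast join helps.
        tEnv.executeSql(
                "EXPLAIN SELECT o.order_id, u.country "
                    + "FROM orders AS o "
                    + "JOIN (SELECT * FROM users WHERE user_id < 100) AS u "
                    + "ON o.user_id = u.user_id")
            .print();
    }
}

Whether the static plan picks a broadcast or a shuffled join for this query
depends on the planner's statistics and configuration; the point of the FLIP is
that the final choice no longer has to be fixed before the actual input sizes
are known.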


Re: [VOTE] Release 1.20.0, release candidate #1

2024-07-19 Thread Xintong Song
+1 (binding)

- reviewed flink-web PR
- verified checksum and signature
- verified source archives don't contain binaries
- built from source
- tried example jobs on a standalone cluster, and everything looks fine

Best,

Xintong



On Thu, Jul 18, 2024 at 4:25 PM Rui Fan <1996fan...@gmail.com> wrote:

> +1(binding)
>
> - Reviewed the flink-web PR (Left some comments)
> - Checked Github release tag
> - Verified signatures
> - Verified sha512 (hashsums)
> - The source archives don't contain any binaries
> - Built the source with Maven 3 and Java 8 (checked the license as well)
> - Started the cluster locally with JDK 8 and ran the StateMachineExample job;
> it works fine.
>
> Best,
> Rui
>
> On Mon, Jul 15, 2024 at 10:59 PM weijie guo 
> wrote:
>
> > Hi everyone,
> >
> >
> > Please review and vote on the release candidate #1 for the version
> 1.20.0,
> >
> > as follows:
> >
> >
> > [ ] +1, Approve the release
> >
> > [ ] -1, Do not approve the release (please provide specific comments)
> >
> >
> > The complete staging area is available for your review, which includes:
> >
> > * JIRA release notes [1], and the pull request adding release note for
> >
> > users [2]
> >
> > * the official Apache source release and binary convenience releases to
> be
> >
> > deployed to dist.apache.org [3], which are signed with the key with
> >
> > fingerprint 8D56AE6E7082699A4870750EA4E8C4C05EE6861F  [4],
> >
> > * all artifacts to be deployed to the Maven Central Repository [5],
> >
> > * source code tag "release-1.20.0-rc1" [6],
> >
> > * website pull request listing the new release and adding announcement
> blog
> >
> > post [7].
> >
> >
> > The vote will be open for at least 72 hours. It is adopted by majority
> >
> > approval, with at least 3 PMC affirmative votes.
> >
> >
> > [1]
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12354210
> >
> > [2] https://github.com/apache/flink/pull/25091
> >
> > [3] https://dist.apache.org/repos/dist/dev/flink/flink-1.20.0-rc1/
> >
> > [4] https://dist.apache.org/repos/dist/release/flink/KEYS
> >
> > [5]
> > https://repository.apache.org/content/repositories/orgapacheflink-1744/
> >
> > [6] https://github.com/apache/flink/releases/tag/release-1.20.0-rc1
> >
> > [7] https://github.com/apache/flink-web/pull/751
> >
> >
> > Best,
> >
> > Robert, Rui, Ufuk, Weijie
> >
>


[jira] [Created] (FLINK-35868) Bump Mongo driver version to support Mongo 7.0+

2024-07-19 Thread yux (Jira)
yux created FLINK-35868:
---

 Summary: Bump Mongo driver version to support Mongo 7.0+
 Key: FLINK-35868
 URL: https://issues.apache.org/jira/browse/FLINK-35868
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: yux


Currently, the MongoDB CDC connector depends on mongodb-driver v4.9.1, which 
doesn't support MongoDB Server 7.0+ [1]. Upgrading the dependency would be 
nice, since MongoDB 7.0 was released nearly a year ago.

[1] https://www.mongodb.com/docs/drivers/java/sync/current/compatibility/



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35869) Right side columns of temporal left join are always null when right side is filtered

2024-07-19 Thread Jira
Grzegorz Kołakowski created FLINK-35869:
---

 Summary: Right side columns of temporal left join are always null 
when right side is filtered
 Key: FLINK-35869
 URL: https://issues.apache.org/jira/browse/FLINK-35869
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.19.1
Reporter: Grzegorz Kołakowski


Background:

I have a stream of user information updates, but those updates are partial;
that is, each event contains only the fields that have changed ("untouched"
fields are empty strings in this example). For instance:
{noformat}
{"user_id":1}   {"ts":"2024-07-18 
12:00:00","user_id":1,"city":"Warszawa","phone_number":""}
{"user_id":1}   {"ts":"2024-07-18 
12:01:00","user_id":1,"city":"","phone_number":"+48 123456789"}{noformat}
To be able to create a correct versioned table for the temporal join, I tried
to filter the right side:
{noformat}
location_changes AS (SELECT * FROM user_data WHERE city <> ''){noformat}
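For context, a minimal sketch of the kind of job involved (connector choices,
watermark definitions, and the exact query below are assumptions reconstructed
from the output, not the original DDL):
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TemporalJoinFilterSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Probe side: login events (datagen stands in for the real source).
        tEnv.executeSql(
            "CREATE TABLE logins (user_id BIGINT, action STRING, ts TIMESTAMP(3), "
                + "WATERMARK FOR ts AS ts - INTERVAL '5' SECOND) "
                + "WITH ('connector' = 'datagen')");

        // Versioned side: partial user updates; an upsert source is assumed here.
        tEnv.executeSql(
            "CREATE TABLE user_data (user_id BIGINT, city STRING, phone_number STRING, "
                + "ts TIMESTAMP(3), PRIMARY KEY (user_id) NOT ENFORCED, "
                + "WATERMARK FOR ts AS ts - INTERVAL '5' SECOND) "
                + "WITH ('connector' = 'upsert-kafka', 'topic' = 'user_data', "
                + "'properties.bootstrap.servers' = 'localhost:9092', "
                + "'key.format' = 'json', 'value.format' = 'json')");

        // The filtered view from above; with this filter in place the right-hand
        // columns of the temporal join come back NULL for every row, as described above.
        tEnv.executeSql(
            "CREATE TEMPORARY VIEW location_changes AS "
                + "SELECT * FROM user_data WHERE city <> ''");

        tEnv.executeSql(
                "SELECT l.ts, l.user_id, l.action, u.ts AS ts0, u.user_id AS user_id0, "
                    + "u.city, u.phone_number "
                    + "FROM logins AS l "
                    + "LEFT JOIN location_changes FOR SYSTEM_TIME AS OF l.ts AS u "
                    + "ON l.user_id = u.user_id")
            .print();
    }
}
{code}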
If the *WHERE city <> ''* filter is present, then all columns from the right
side are always null:
{noformat}
+----+-------------------------+---------+--------+-------------------------+----------+----------+---------------+
| op |                      ts | user_id | action |                     ts0 | user_id0 |     city |  phone_number |
+----+-------------------------+---------+--------+-------------------------+----------+----------+---------------+
| +I | 2024-07-18 11:59:59.000 |       1 |  login |                         |          |          |               |
| +I | 2024-07-18 12:00:01.000 |       1 |  login |                         |          |          |               |
| +I | 2024-07-18 12:01:01.000 |       1 |  login |                         |          |          |               |
| +I | 2024-07-18 12:02:01.000 |       1 |  login |                         |          |          |               |
| +I | 2024-07-18 12:03:01.000 |       1 |  login |                         |          |          |               |
| +I | 2024-07-18 12:04:01.000 |       1 |  login |                         |          |          |               |
{noformat}
If the *WHERE city <> ''* filter is NOT present, the right-side columns are not
always null (but obviously the results are not as I wanted them to be).
{noformat}
+----+-------------------------+---------+--------+-------------------------+----------+----------+---------------+
| op |                      ts | user_id | action |                     ts0 | user_id0 |     city |  phone_number |
+----+-------------------------+---------+--------+-------------------------+----------+----------+---------------+
| +I | 2024-07-18 11:59:59.000 |       1 |  login |                         |          |          |               |
| +I | 2024-07-18 12:00:01.000 |       1 |  login | 2024-07-18 12:00:00.000 |        1 | Warszawa |               |
| +I | 2024-07-18 12:01:01.000 |       1 |  login | 2024-07-18 12:01:00.000 |        1 |          | +48 123456789 |
| +I | 2024-07-18 12:02:01.000 |       1 |  login | 2024-07-18 12:02:00.000 |        1 |   Kraków |               |
| +I | 2024-07-18 12:03:01.000 |       1 |  login | 2024-07-18 12:03:00.000 |        1 |          | +48 987654321 |
| +I | 2024-07-18 12:04:01.000 |       1 |  login | 2024-07-18 12:04:00.000 |        1 |   Gdańsk |               |
{noformat}

 

I ran the job with a debugger and noticed that in
{{org.apache.flink.table.runtime.operators.join.temporal.TemporalRowTimeJoinOperator#processElement2}}
the UPDATE_AFTER event usually comes before the corresponding UPDATE_BEFORE,
which I think is weird. As a consequence, rightState usually contains only the
UPDATE_BEFORE event for a given timestamp.

 
{code:java}
@Override
public void processElement2(StreamRecord<RowData> element) 

[DISCUSS] FLIP-XXX Amazon SQS Source Connector

2024-07-19 Thread Saurabh Singh
Hi Flink Devs,

Our team has been working on migrating various data pipelines to Flink to
leverage the benefits of exactly-once processing, checkpointing, and
stateful computing. We have several use cases built around the AWS SQS
Service. For this migration, we have developed an SQS Source Connector,
which enables us to run both stateless and stateful Flink-based jobs.

We believe that this SQS Source Connector would be a valuable addition to
the existing connector set. Therefore, we propose a FLIP to include it.

For more information, please refer to the FLIP document.

https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit?usp=sharing

Thanks
Saurabh & Abhi


Re: [DISCUSS] FLINK-34440 Support Debezium Protobuf Confluent Format

2024-07-19 Thread Kevin Lam
Hi all,

I had chimed in on Anupam's PR that we might have some time to look at this
issue FLINK-34440.

Adding @David Mariassy to this thread, as he's started to work on our
implementation of FLINK-34440:
https://github.com/Shopify/flink/tree/protobuf-confluent-dynamic-deser
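
For readers following the thread below, here is a small, self-contained sketch
of the writer-schema resolution order being debated; the option and method
names are hypothetical illustrations, not existing Flink or Confluent Schema
Registry API:

public class SchemaResolutionSketch {

    /** The format options under discussion; both are optional. */
    static final class FormatOptions {
        final Integer schemaIdForSerialization; // explicit, pre-registered schema id
        final String schemaString;              // explicit schema text
        final String subject;                   // registry subject, e.g. "orders-value"

        FormatOptions(Integer schemaIdForSerialization, String schemaString, String subject) {
            this.schemaIdForSerialization = schemaIdForSerialization;
            this.schemaString = schemaString;
            this.subject = subject;
        }
    }

    /** Describes how the writer schema would be obtained on the serialization path.
     *  On the deserialization path no option is needed: the writer schema id always
     *  comes from the Confluent wire format of the message itself. */
    static String resolveWriterSchema(FormatOptions opts) {
        if (opts.schemaIdForSerialization != null) {
            // Explicit id: use the already-registered schema as-is, never register a new one.
            return "use registered schema id " + opts.schemaIdForSerialization;
        }
        if (opts.schemaString != null) {
            // Explicit schema text: register or look it up under the subject, which may
            // mint a new id (see the schema-registry issue linked in the thread).
            return "register or look up the provided schema under subject " + opts.subject;
        }
        // Neither given: fall back to the subject and auto-register a schema derived
        // from the Flink table definition.
        return "auto-register a derived schema under subject " + opts.subject;
    }

    public static void main(String[] args) {
        System.out.println(resolveWriterSchema(new FormatOptions(42, null, "orders-value")));
        System.out.println(resolveWriterSchema(
            new FormatOptions(null, "syntax = \"proto3\"; message Order {}", "orders-value")));
        System.out.println(resolveWriterSchema(new FormatOptions(null, null, "orders-value")));
    }
}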

On Fri, Apr 19, 2024 at 12:41 AM Anupam Aggarwal 
wrote:

> Thanks David.
> That's a great idea. For deserialization, the external schema id will be
> used to obtain a dynamic message, so in a way it has to be in line with the
> writer schema.
> We could limit it to serialization and rename it according to your
> suggestion.
>
> Thanks
> Anupam
> On Tue, Apr 16, 2024 at 3:38 PM David Radley 
> wrote:
>
> > Hi Anupam,
> > Thanks for your response. I was wondering around the schema id and had
> > some thoughts:
> >
> > I assume that for Confluent Avro, specifying the schema is not normally
> > done, but could be useful to force a particular shape.
> >
> > If you specify a schema id in the format configuration:
> > - for deserialization: does this mean the schema id in the payload has to
> > match it? If so we lose the ability to have multiple versions of the schema
> > on a topic. For me schemaId makes less sense for deserialization as the
> > existing mechanism used by Avro / confluent avro formats is working well.
> >
> > - I can see it makes sense for the serialization where there is an
> > existing schema in the registry you want to target.
> >
> > I suggest the schemaId be called something like schemaIdForSink or
> > schemaIdForSerialization, to prevent confusion with the deserialization
> > case. We could have the schema as you suggest so we are compatible with the
> > confluent avro format.
> >
> >
> > WDYT?
> > Kind regards, David.
> >
> >
> > From: Anupam Aggarwal 
> > Date: Saturday, 13 April 2024 at 16:08
> > To: dev@flink.apache.org 
> > Subject: [EXTERNAL] Re: [DISCUSS] FLINK-34440 Support Debezium Protobuf
> > Confluent Format
> > Hi David,
> >
> > Thank you for the suggestion.
> > IIUC, you are proposing using an explicit schema string, instead of the
> > schemaID.
> > This makes sense, as it would make the behavior consistent with Avro,
> > although a bit more verbose from a config standpoint.
> >
> > If we go via the schema string route, the user would have to ensure that
> > the input schema string corresponds to an existing schemaID.
> > This however, might end up registering a new id (based on
> >
> >
> https://github.com/confluentinc/schema-registry/issues/878#issuecomment-437510493
> > ).
> >
> > How about adding both options (explicit schema string / schemaID)?
> > If a schema string is specified we register a new schemaID, and if the user
> > specifies an explicit schemaID we just use it directly?
> >
> > Thanks
> > Anupam
> >
> > On Wed, Apr 10, 2024 at 2:27 PM David Radley 
> > wrote:
> >
> > > Hi,
> > > I notice in the draft pr that there is a schema id in the format config.
> > > I was wondering why? In the confluent avro and existing debezium formats,
> > > there is no schema id in the config, but there is the ability to specify a
> > > complete schema. In the protobuf format there is no schema id.
> > >
> > > I assume the schema id would be used during serialization in the case
> > > where there is already an existing registered schema and you have its id.
> > > I see in the docs
> > >
> >
> https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-protobuf.html
> > > there is a serialize example where 2 schemas are registered.
> > >
> > > I would suggest aiming to copy what the confluent DeSer libraries do
> > > rather than having a schema id hard coded in the config.
> > >
> > > WDYT?
> > > Kind regards, David.
> > >
> > > From: Kevin Lam 
> > > Date: Tuesday, 26 March 2024 at 20:06
> > > To: dev@flink.apache.org 
> > > Subject: [EXTERNAL] Re: [DISCUSS] FLINK-34440 Support Debezium Protobuf
> > > Confluent Format
> > > Thanks Anupam! Looking forward to it.
> > >
> > > On Thu, Mar 14, 2024 at 1:50 AM Anupam Aggarwal <
> > anupam.aggar...@gmail.com
> > > >
> > > wrote:
> > >
> > > > Hi Kevin,
> > > >
> > > > Thanks, these are some great points.
> > > > Just to clarify, I do agree that the subject should be an option
> (like
> > in
> > > > the case of RegistryAvroFormatFactory).
> > > > We could fallback to subject and auto-register schemas, if schema-Id
> > not
> > > > provided explicitly.
> > > > In general, I think it would be good to be more explicit about the
> > > schemas
> > > > used (
> > > >
> > > >
> > >
> >
> https://docs.confluent.io/platform/curren/schema-registry/schema_registry_onprem_tutorial.html#auto-schema-registration
> > > > <
> > > >
> > >
> >
> https://docs.confluent.io/platform/current/schema-registry/schema_registry_onprem_tutorial.html#auto-schema-registration
> > > > >
> > > > ).
> > > > This

Re: Potential Kafka Connector FLIP: Large Message Handling

2024-07-19 Thread Kevin Lam
Apologies David, here's a temporary link to the image:
https://screenshot.click/19-19-0xwls-jfx7s.png
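
For readers following the quoted messages below, here is a minimal sketch of
the claim-check style deserializer that the `properties.value.deserializer`
override discussed in this thread would plug in; the class and header names are
assumptions for illustration, and the external-store lookup is stubbed out:

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;

/**
 * Claim-check style deserializer: if a record carries a pointer header, the real
 * payload is fetched from external storage; otherwise the raw bytes are returned,
 * matching the ByteArrayDeserializer behaviour Flink expects.
 */
public class ClaimCheckDeserializer implements Deserializer<byte[]> {

    // Hypothetical header name; any convention agreed with the producer side works.
    private static final String CLAIM_CHECK_HEADER = "x-claim-check-ref";

    @Override
    public byte[] deserialize(String topic, byte[] data) {
        return data; // no headers available on this code path
    }

    @Override
    public byte[] deserialize(String topic, Headers headers, byte[] data) {
        Header ref = headers.lastHeader(CLAIM_CHECK_HEADER);
        if (ref == null) {
            return data; // small message, stored inline as usual
        }
        String pointer = new String(ref.value(), StandardCharsets.UTF_8);
        return fetchFromExternalStore(pointer);
    }

    private byte[] fetchFromExternalStore(String pointer) {
        // Stub: a real implementation would read the payload from S3/GCS/etc.
        throw new UnsupportedOperationException("external store lookup not implemented: " + pointer);
    }
}

Such a class would be wired in through the connector's pass-through Kafka
options (e.g. 'properties.value.deserializer' set to its fully-qualified class
name); see the messages below for which of the source and sink paths currently
pick up that override.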


On Thu, Jul 18, 2024 at 9:00 AM David Radley 
wrote:

> Hi Kevin,
> That sounds good; unfortunately, the image did not come through in the
> email for me. Kind regards, David
>
> From: Kevin Lam 
> Date: Wednesday, 10 July 2024 at 19:20
> To: dev@flink.apache.org 
> Subject: [EXTERNAL] Re: Potential Kafka Connector FLIP: Large Message
> Handling
>
> Hey David,
>
> Yes my proposal was originally to do this at the connector level, as you
> mentioned it doesn't make sense in a format. In the end I took the approach
> in my previous e-mail:
>
> I was able to insert our Large Message handling by overriding
> value.serializer
> (https://kafka.apache.org/documentation/#producerconfigs_value.serializer)
> and value.deserializer
> (https://kafka.apache.org/documentation/#consumerconfigs_value.deserializer)
> in the consumer and producer configuration that Flink sets, using the
> `properties.*` option in the Kafka Connector. This approach doesn't require
> Flink to know anything about large messages or have any major changes made
> to it.
> Flink uses the ByteArray(De|S)erializers by default in its source
> (https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java#L457-L470)
> and sink
> (https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L82-L83).
> Overriding the source serializer requires a small change to flip this
> boolean
> (https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java#L464)
> to make it overridable. I'm planning to start a separate thread to propose
> making `value.serializer` overridable.
>
> This is a rough representation of the before and after for producers
> (similar applies for consumers):
> (inline image omitted; see https://screenshot.click/19-19-0xwls-jfx7s.png above)
> I assume you would want to add configuration to define where the external
> storage lives and authentication.  Limitations around stack and heap sizes
> would be worth considering.
>
> Yes, we're implementing the 'claim check' pattern and this is definitely
> something we're considering! Thanks for raising it
>
> On Wed, Jul 10, 2024 at 11:06 AM David Radley  > wrote:
> Hi Kevin,
> You mention the link
> https://www.conduktor.io/kafka/how-to-send-large-messages-in-apache-kafka/#Option-1:-using-an-external-store-(GB-size-messages)-0
> , I assume this is the approach you are considering. And that this is being
> done at the connector level, as the message could be in any of the existing
> supported formats – so is not appropriate as a new format. It sounds like
> for deserialization, the reference to external storage header would be
> found in your deser and the contents then taken from external source and
> put into the Kafka body or the other way round for serialization. This is
> different to my Apicurio work that is to handle format specific headers.
>
> I assume you would want to add configuration to define where the external
> storage lives and authentication.  Limitations around stack and heap sizes
> would be worth considering.
>
> Am I understanding your intent correctly?
>   Kind regards,  David.
>
>
> From: Kevin Lam 
> Date: Wednesday, 10 July 2024 at 14:35
> To: dev@flink.apache.org
> Subject: [EXTERNAL] Re: Potential Kafka Connector FLIP: Large Message
> Handling
> Hey all, just a follow-up here. I was able to insert our Large Message
> handling by overriding value.serializer
> (https://kafka.apache.org/documentation/#producerconfigs_value.serializer)
> and value.deserializer
> (https://kafka.apache.org/documentation/#consumerconfigs_value.deserializer)
>  in the consumer and producer configuration that Flink sets, using the
> `properties.*` option in the Kafka Connector. This approach doesn't require
> Flink to know anything about large messages or have any major changes made
> to it.
>
> Flink uses the ByteArray(De|S)erializers by default in its source
> (https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java#L457-L470)
> and sink
> <
> https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-k