Re: [DISCUSS] FLIP-315: Support Operator Fusion Codegen for Flink SQL

2023-06-05 Thread liu ron
Hi, Lincoln

Thanks for your appreciation of this design. Regarding your question:

> do we consider adding a benchmark for the operators to intuitively
> understand the improvement brought by each optimization?

I think it makes sense to add a benchmark; Spark also has such a benchmark
framework. But introducing a benchmark framework in Flink is another story,
and we would need to start a new discussion for that work.

> for the implementation plan, it is mentioned in the FLIP that 1.18 will
> support Calc, HashJoin and HashAgg; what will be the next step? And which
> operators do we ultimately expect to cover (all or specific ones)?

Our ultimate goal is to support all operators in batch mode, but we
prioritize them according to their usage. Operators like Calc, HashJoin,
HashAgg, etc. are more commonly used, so we will support them first. Later,
we will support the rest of the operators step by step. Considering the time
factor and the development workload, we can only support Calc, HashJoin, and
HashAgg in 1.18. In 1.19 or 1.20, we will complete the remaining work.
I will make this clear in the FLIP.

Best,
Ron

Jingsong Li wrote on Mon, Jun 5, 2023 at 14:15:

> > For the state compatibility section, it seems that the checkpoint
> compatibility would be broken just like [1] did. Could FLIP-190 [2] still
> be helpful in this case for SQL version upgrades?
>
> I guess this is only for batch processing. Streaming should be another
> story?
>
> Best,
> Jingsong
>
> On Mon, Jun 5, 2023 at 2:07 PM Yun Tang  wrote:
> >
> > Hi Ron,
> >
> > I think this FLIP would help to improve the performance, looking forward
> to its completion in Flink!
> >
> > For the state compatibility section, it seems that the checkpoint
> compatibility would be broken just like [1] did. Could FLIP-190 [2] still
> be helpful in this case for SQL version upgrades?
> >
> >
> > [1]
> https://docs.google.com/document/d/1qKVohV12qn-bM51cBZ8Hcgp31ntwClxjoiNBUOqVHsI/edit#heading=h.fri5rtpte0si
> > [2]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336489
> >
> > Best
> > Yun Tang
> >
> > 
> > From: Lincoln Lee 
> > Sent: Monday, June 5, 2023 10:56
> > To: dev@flink.apache.org 
> > Subject: Re: [DISCUSS] FLIP-315: Support Operator Fusion Codegen for
> Flink SQL
> >
> > Hi Ron
> >
> > OFCG looks like an exciting optimization, looking forward to its
> completion
> > in Flink!
> > A small question, do we consider adding a benchmark for the operators to
> > intuitively understand the improvement brought by each optimization?
> > In addition, for the implementation plan, it is mentioned in the FLIP that
> > 1.18 will support Calc, HashJoin and HashAgg; what will be the next step?
> > And which operators do we ultimately expect to cover (all or specific
> > ones)?
> >
> > Best,
> > Lincoln Lee
> >
> >
liu ron wrote on Mon, Jun 5, 2023 at 09:40:
> >
> > > Hi, Jark
> > >
> > > Thanks for your feedback. According to my initial assessment, the work
> > > effort is relatively large.
> > >
> > > Moreover, I will add a test result of all queries to the FLIP.
> > >
> > > Best,
> > > Ron
> > >
Jark Wu wrote on Thu, Jun 1, 2023 at 20:45:
> > >
> > > > Hi Ron,
> > > >
> > > > Thanks a lot for the great proposal. The FLIP looks good to me in
> > > > general. It looks like no easy work, but the performance sounds
> > > > promising. So I think it's worth doing.
> > > >
> > > > Besides, if there is a complete test graph with all TPC-DS queries,
> > > > the effect of this FLIP will be more intuitive.
> > > >
> > > > Best,
> > > > Jark
> > > >
> > > >
> > > >
> > > > On Wed, 31 May 2023 at 14:27, liu ron  wrote:
> > > >
> > > > > Hi, Jingsong
> > > > >
> > > > > Thanks for your valuable suggestions.
> > > > >
> > > > > Best,
> > > > > Ron
> > > > >
> > > > > Jingsong Li wrote on Tue, May 30, 2023 at 13:22:
> > > > >
> > > > > > Thanks Ron for your information.
> > > > > >
> > > > > > I suggest that it can be written in the Motivation of FLIP.
> > > > > >
> > > > > > Best,
> > > > > > Jingsong
> > > > > >
> > > > > > On Tue, May 30, 2023 at 9:57 AM liu ron 
> wrote:
> > > > > > >
> > > > > > > Hi, Jingsong
> > > > > > >
> > > > > > > Thanks for your review. We have tested it on the TPC-DS case, and
> > > > > > > got a 12% gain overall when supporting only the Calc, HashJoin,
> > > > > > > and HashAgg operators. In some queries, we even get more than 30%
> > > > > > > gain, so it looks like an effective approach.
> > > > > > >
> > > > > > > Best,
> > > > > > > Ron
> > > > > > >
> > > > > > > Jingsong Li wrote on Mon, May 29, 2023 at 14:33:
> > > > > > >
> > > > > > > > Thanks Ron for the proposal.
> > > > > > > >
> > > > > > > > Do you have some benchmark results for the performance
> > > > > > > > improvement? I am more concerned about the improvement on
> > > > > > > > Flink than the data in other papers.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Jingsong
> > > > > > > >
> > > > > > > > On Mon, May 29, 2023 at 2:16 PM liu ron 
> > > > wrot

[jira] [Created] (FLINK-32252) SELECT COUNT(*) returns nothing when the source returns no data

2023-06-05 Thread tanjialiang (Jira)
tanjialiang created FLINK-32252:
---

 Summary: SELECT COUNT(*) returns nothing when the source returns no data
 Key: FLINK-32252
 URL: https://issues.apache.org/jira/browse/FLINK-32252
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.16.1
Reporter: tanjialiang


 

MySQL source
{code:java}
CREATE TABLE student(
id int primary key auto_increment,
name varchar(32),
age int
);

INSERT INTO student(name, age) VALUES 
('tanjl',18),('jark',20),('mike',16),('rose',21);{code}
 

Flink SQL
{code:java}
CREATE TABLE student (
`id` INT PRIMARY KEY,
`name` STRING,
`age` INT
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost/test?serverTimezone=UTC',
  'username' = 'root',
  'password' = 'root',
  'table-name' = 'student'
); 

SELECT count(*) FROM student WHERE age < 15;{code}
Flink will return nothing because the JDBC connector pushes the filter down 
(after flink-connector-jdbc-3.1.0), which makes the source return no data.
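For reference, the expected semantics here (independent of the connector): a 
global COUNT over empty input should still emit a single row with 0 in batch 
mode. A minimal sketch with the Table API:
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
// No row satisfies the predicate, yet COUNT(*) must still produce one row: +I[0]
tEnv.executeSql(
        "SELECT COUNT(*) FROM (VALUES (18), (20), (16), (21)) AS t(age) WHERE age < 15")
    .print();
{code}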

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode

2023-06-05 Thread Yun Tang
Hi Feng,

I think this FLIP would provide an important feature to unify stream SQL 
and batch SQL when we backfill historical data in batch mode.

For the "Syntax" section, I think you could add descriptions of how to align 
backfill time travel with querying the latest data. And I think you should also 
update the "Discussion thread" in the original FLIP.

Moreover, I have a question about getting the table schema from the catalog. 
I'm not sure whether Catalog#getTable(tablePath, timestamp) will be called 
only once. If we have a backfill query between 2023-05-29 and 2023-06-04 in the 
past week, and the table schema changed on 2023-06-01, will the query below 
detect the schema changes while backfilling the whole week?

SELECT * FROM paimon_tb FOR SYSTEM_TIME AS OF TIMESTAMP BETWEEN '2023-05-29 
00:00:00' AND '2023-06-05 00:00:00'

Best
Yun Tang



From: Shammon FY 
Sent: Thursday, June 1, 2023 17:57
To: dev@flink.apache.org 
Subject: Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode

Hi Feng,

I have one minor comment about the public interface `Optional<Long>
getSnapshot()` in the `CatalogTable`.

As we can get tables from the new method `Catalog.getTable(ObjectPath
tablePath, long timestamp)`, I think the returned `CatalogBaseTable` will
have the timestamp information. Flink or connectors such as
Iceberg/Paimon can create sources from the `CatalogBaseTable` directly,
without needing to get the snapshot ID from `CatalogTable.getSnapshot()`.
What do you think of it?
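
For reference, a minimal sketch of the new method being discussed (the default
body here is hypothetical and only illustrates a possible fallback for
catalogs without time travel support):

public interface Catalog {
    // existing lookup of the current table
    CatalogBaseTable getTable(ObjectPath tablePath)
            throws TableNotExistException, CatalogException;

    // new in FLIP-308: the table as of the given point in time (epoch millis)
    default CatalogBaseTable getTable(ObjectPath tablePath, long timestamp)
            throws TableNotExistException, CatalogException {
        throw new UnsupportedOperationException(
                "Time travel is not supported by this catalog.");
    }
}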

Best,
Shammon FY


On Thu, Jun 1, 2023 at 7:22 AM Jing Ge  wrote:

> Hi Feng,
>
> Thanks for the proposal! Very interesting feature. Would you like to update
> the FLIP with the thoughts described in your previous email about why
> SupportsTimeTravel has been rejected? This will help readers understand the
> context (in the future).
>
> We always directly add overloaded methods into Catalog according to new
> requirements, which makes the interface bloated. Just out of curiosity,
> does it make sense to introduce some DSL design? Like
> Catalog.getTable(tablePath).on(timeStamp),
> Catalog.getTable(tablePath).current() for the most current version, and
> more room for further extensions like timestamp ranges, etc. I haven't read
> all the source code yet and I'm not sure if it is possible. But a
> design like this would keep the Catalog API lean, and the API/DSL would be
> self-described and easier to use.
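>
> A rough sketch of the shape such a DSL could take (all names here are
> hypothetical and only illustrate the idea):
>
> public interface CatalogTableHandle {
>     CatalogBaseTable current();           // the most current version
>     CatalogBaseTable on(long timestamp);  // as of a point in time
> }
>
> // usage: catalog.getTable(tablePath).on(timestamp);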
>
> Best regards,
> Jing
>
>
> On Wed, May 31, 2023 at 12:08 PM Krzysztof Chmielewski <
> krzysiek.chmielew...@gmail.com> wrote:
>
> > OK, after second thought, I'm retracting my previous statement about the
> > Catalog changes you proposed.
> > I do see a benefit for the Delta connector with this change and see
> > why this could be coupled with the Catalog.
> >
> > Delta Connector's SQL support also ships a Delta Catalog implementation
> > for Flink.
> > For the Delta Catalog, table schema information is fetched from the
> > underlying _delta_log and not stored in the metastore. For time travel we
> > actually had a problem: if we wanted to time-travel back to some old
> > version where the schema was slightly different, we would have a conflict,
> > since the Catalog would return the current schema and not how it was for
> > version X.
> >
> > With your change, our Delta Catalog can actually fetch the schema for
> > version X and send it to DeltaTableFactory. Currently, the Catalog can
> > fetch only the current version. What we would also need, however, is the
> > version (number/timestamp) of this table passed to DynamicTableFactory so
> > we could properly set up the Delta standalone library.
> >
> > Regards,
> > Krzysztof
> >
> > On Wed, May 31, 2023 at 10:37, Krzysztof Chmielewski <
> > krzysiek.chmielew...@gmail.com> wrote:
> >
> > > Hi,
> > > happy to see such a feature.
> > > Small note from my end regarding Catalog changes.
> > >
> > > TL;DR
> > > I don't think it is necessary to delegate this feature to the catalog.
> > > I think that since "time travel" is a per-job/query property, it should
> > > not be coupled with the Catalog or table definition. In my opinion this
> > > is something that only DynamicTableFactory has to know about. I would
> > > rather see this feature as it is - a SQL syntax enhancement - but
> > > delegate clearly to DynamicTableFactory.
> > >
> > > I've implemented the time travel feature for the Delta Connector [1]
> > > using the current Flink API.
> > > Docs are pending code review, but you can find them here [2], and
> > > examples are available here [3].
> > >
> > > The time travel feature that I've implemented is based on Flink query
> > > hints:
> > > "SELECT * FROM sourceTable /*+ OPTIONS('versionAsOf' = '1') */"
> > >
> > > The " versionAsOf" (we also have 'timestampAsOf') parameter is handled
> > not
> > > by Catalog but by DyntamicTableFactory implementation for Delta
> > connector.
> > > The value of this property is passed to Delta standalone lib API that
> > > returns table view for give

Re: [DISCUSS] FLIP-315: Support Operator Fusion Codegen for Flink SQL

2023-06-05 Thread liu ron
Hi, Yun, Jingsong, Benchao

Thanks for your valuable input about this FLIP.

First of all, let me emphasize that from the technical implementation point
of view, this design is feasible in both stream and batch scenarios, so I
consider both stream and batch mode in the FLIP. In the stream scenario, for
stateful operators, according to our business experience, the bottleneck is
basically on state access, so the optimization effect of OFCG for streaming
will not be particularly obvious, and we will not prioritize supporting it
currently. On the contrary, in the batch scenario, where CPU is the
bottleneck, this optimization is beneficial.

Taking the above into account, we are able to support both stream and batch
mode optimization in this design, but we will give priority to supporting
batch operators. As Benchao said, when we find a suitable streaming
business scenario in the future, we can consider doing this optimization.
Back to Yun's issue: the design will break state compatibility in stream mode,
as [1] did, and version upgrades will not support OFCG. As mentioned
earlier, we will not support this feature in stream mode in the short term.

Also thanks for Benchao's suggestion; I will state the current goal of this
optimization in the FLIP, scoped to batch mode.

Best,
Ron

liu ron wrote on Mon, Jun 5, 2023 at 15:04:

> Hi, Lincoln
>
> Thanks for your appreciation of this design. Regarding your question:
>
> > do we consider adding a benchmark for the operators to intuitively
> > understand the improvement brought by each optimization?
>
> I think it makes sense to add a benchmark; Spark also has such a benchmark
> framework. But introducing a benchmark framework in Flink is another story,
> and we would need to start a new discussion for that work.
>
> > for the implementation plan, it is mentioned in the FLIP that 1.18 will
> > support Calc, HashJoin and HashAgg; what will be the next step? And
> > which operators do we ultimately expect to cover (all or specific ones)?
>
> Our ultimate goal is to support all operators in batch mode, but we
> prioritize them according to their usage. Operators like Calc, HashJoin,
> HashAgg, etc. are more commonly used, so we will support them first. Later,
> we will support the rest of the operators step by step. Considering the time
> factor and the development workload, we can only support Calc, HashJoin, and
> HashAgg in 1.18. In 1.19 or 1.20, we will complete the remaining work.
> I will make this clear in the FLIP.
>
> Best,
> Ron
>
> Jingsong Li wrote on Mon, Jun 5, 2023 at 14:15:
>
>> > For the state compatibility section, it seems that the checkpoint
>> compatibility would be broken just like [1] did. Could FLIP-190 [2] still
>> be helpful in this case for SQL version upgrades?
>>
>> I guess this is only for batch processing. Streaming should be another
>> story?
>>
>> Best,
>> Jingsong
>>
>> On Mon, Jun 5, 2023 at 2:07 PM Yun Tang  wrote:
>> >
>> > Hi Ron,
>> >
>> > I think this FLIP would help to improve the performance, looking
>> forward to its completion in Flink!
>> >
>> > For the state compatibility section, it seems that the checkpoint
>> compatibility would be broken just like [1] did. Could FLIP-190 [2] still
>> be helpful in this case for SQL version upgrades?
>> >
>> >
>> > [1]
>> https://docs.google.com/document/d/1qKVohV12qn-bM51cBZ8Hcgp31ntwClxjoiNBUOqVHsI/edit#heading=h.fri5rtpte0si
>> > [2]
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336489
>> >
>> > Best
>> > Yun Tang
>> >
>> > 
>> > From: Lincoln Lee 
>> > Sent: Monday, June 5, 2023 10:56
>> > To: dev@flink.apache.org 
>> > Subject: Re: [DISCUSS] FLIP-315: Support Operator Fusion Codegen for
>> Flink SQL
>> >
>> > Hi Ron
>> >
>> > OFCG looks like an exciting optimization, looking forward to its
>> completion
>> > in Flink!
>> > A small question, do we consider adding a benchmark for the operators to
>> > intuitively understand the improvement brought by each optimization?
>> > In addition, for the implementation plan, it is mentioned in the FLIP that
>> > 1.18 will support Calc, HashJoin and HashAgg; what will be the next step?
>> > And which operators do we ultimately expect to cover (all or specific
>> > ones)?
>> >
>> > Best,
>> > Lincoln Lee
>> >
>> >
>> > liu ron wrote on Mon, Jun 5, 2023 at 09:40:
>> >
>> > > Hi, Jark
>> > >
>> > > Thanks for your feedback. According to my initial assessment, the work
>> > > effort is relatively large.
>> > >
>> > > Moreover, I will add a test result of all queries to the FLIP.
>> > >
>> > > Best,
>> > > Ron
>> > >
>> > > Jark Wu wrote on Thu, Jun 1, 2023 at 20:45:
>> > >
>> > > > Hi Ron,
>> > > >
>> > > > Thanks a lot for the great proposal. The FLIP looks good to me in
>> > > > general. It looks like no easy work, but the performance sounds
>> > > > promising. So I think it's worth doing.
>> > > >
>> > > > Besides, if there is a complete test graph with all TPC-DS queries,
>> > > > the effect of this FLIP will be more intuitive.
>> > > >
>> > > > Best,

Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-06-05 Thread Aitozi
One more thing for discussion:

In our internal implementation, we reuse the options
`table.exec.async-lookup.buffer-capacity` and
`table.exec.async-lookup.timeout` to configure
the async UDTF. Do you think we should add two extra options to distinguish
them from the lookup options, such as (a usage sketch follows below):

`table.exec.async-udtf.buffer-capacity`
`table.exec.async-udtf.timeout`
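
If we add them, usage would look roughly like this (a sketch; the option
keys are just the proposed names above and are not final):

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
tEnv.getConfig().getConfiguration()
    .setString("table.exec.async-udtf.buffer-capacity", "100");
tEnv.getConfig().getConfiguration()
    .setString("table.exec.async-udtf.timeout", "3 min");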


Best,
Aitozi.



Aitozi wrote on Mon, Jun 5, 2023 at 12:20:

> Hi Jing,
>
> > what is the difference between the RPC call or query you mentioned
> > and the lookup in a very general way
>
> I think the RPC call or query service is quite similar to the lookup join.
> But lookup join has to work
> with `LookupTableSource`.
>
> Let's see how we can perform an async RPC call with lookup join:
>
> (1) Implement an AsyncTableFunction with RPC call logic.
> (2) Implement a `LookupTableSource` connector that runs with the async UDTF
> defined in (1).
> (3) Then define the DDL of this lookup table in SQL:
>
> CREATE TEMPORARY TABLE Customers (
>   id INT,
>   name STRING,
>   country STRING,
>   zip STRING
> ) WITH (
>   'connector' = 'custom'
> );
>
> (4) Run with the query as below:
>
> SELECT o.order_id, o.total, c.country, c.zip
> FROM Orders AS o
>   JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
> ON o.customer_id = c.id;
>
> This example is from the docs. You can imagine the lookup process as an
> async RPC call process.
>
> Let's see how we can perform an async RPC call with lateral join:
>
> (1) Implement an AsyncTableFunction with RPC call logic.
> (2) Run query with
>
> Create function f1 as '...' ;
>
> SELECT o.order_id, o.total, c.country, c.zip FROM Orders, lateral table
> (f1(order_id)) as T(...);
>
> As you can see, the lateral join version is simpler and more intuitive for
> users. Users do not have to wrap a
> LookupTableSource just for the purpose of using an async UDTF.
>
> In the end, we can also see that the user-defined async table function is an
> enhancement of the current lateral table join,
> which only supports synchronous lateral join now.
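>
> For step (1), a minimal sketch of such a function (`CustomerClient` and its
> `lookupAsync` method are hypothetical placeholders for the RPC logic):
>
> public class CustomerRpc extends AsyncTableFunction<Row> {
>     private transient CustomerClient client;
>
>     @Override
>     public void open(FunctionContext context) {
>         client = new CustomerClient();
>     }
>
>     // Flink emits the row(s) once the future is completed.
>     public void eval(CompletableFuture<Collection<Row>> future, Integer orderId) {
>         client.lookupAsync(orderId).whenComplete((c, err) -> {
>             if (err != null) {
>                 future.completeExceptionally(err);
>             } else {
>                 future.complete(Collections.singleton(Row.of(c.country, c.zip)));
>             }
>         });
>     }
> }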
>
> Best,
> Aitozi.
>
>
Jing Ge wrote on Fri, Jun 2, 2023 at 19:37:
>
>> Hi Aitozi,
>>
>> Thanks for the update. Just out of curiosity, what is the difference
>> between the RPC call or query you mentioned and the lookup in a very
>> general way? Since lateral join is used in the FLIP, is there any special
>> thought behind that? Sorry for asking so many questions. The FLIP contains
>> limited information to understand the motivation.
>>
>> Best regards,
>> Jing
>>
>> On Fri, Jun 2, 2023 at 3:48 AM Aitozi  wrote:
>>
>> > Hi Jing,
>> > I have updated the proposed changes in the FLIP. IMO, lookup has its
>> > clear async call requirement because it is an IO-heavy operator. In our
>> > usage, SQL users have logic that does RPC calls or queries third-party
>> > services, which is also IO intensive.
>> > In these cases, we'd like to leverage the async function to improve the
>> > throughput.
>> >
>> > Thanks,
>> > Aitozi.
>> >
>> > Jing Ge wrote on Thu, Jun 1, 2023 at 22:55:
>> >
>> > > Hi Aitozi,
>> > >
>> > > Sorry for the late reply. Would you like to update the proposed
>> > > changes with more details in the FLIP too?
>> > > I got your point. It looks like a reasonable idea. However, since lookup
>> > > has its clear async call requirement, are there any real use cases that
>> > > need this change? This will help us understand the motivation. After
>> > > all, lateral join and temporal lookup join [1] are quite different.
>> > >
>> > > Best regards,
>> > > Jing
>> > >
>> > >
>> > > [1]
>> > >
>> > >
>> >
>> https://github.com/apache/flink/blob/d90a72da2fd601ca4e2a46700e91ec5b348de2ad/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncTableFunction.java#L54
>> > >
>> > > On Wed, May 31, 2023 at 8:53 AM Aitozi  wrote:
>> > >
>> > > > Hi Jing,
>> > > > What do you think about it? Can we move this feature forward?
>> > > >
>> > > > Thanks,
>> > > > Aitozi.
>> > > >
>> > > > Aitozi wrote on Mon, May 29, 2023 at 09:56:
>> > > >
>> > > > > Hi Jing,
>> > > > > > "Do you mean to support the AyncTableFunction beyond the
>> > > > > LookupTableSource?"
>> > > > > Yes, I mean to support the AyncTableFunction beyond the
>> > > > LookupTableSource.
>> > > > >
>> > > > > The "AsyncTableFunction" is the function with ability to be
>> executed
>> > > > async
>> > > > > (with AsyncWaitOperator).
>> > > > > The async lookup join is a one of usage of this. So, we don't
>> have to
>> > > > bind
>> > > > > the AyncTableFunction with LookupTableSource.
>> > > > > If User-defined AsyncTableFunction is supported, user can directly
>> > use
>> > > > > lateral table syntax to perform async operation.
>> > > > >
>> > > > > > "It would be better if you could elaborate the proposed changes
>> wrt
>> > > the
>> > > > > CorrelatedCodeGenerator with more details"
>> > > > >
>> > > > > In the proposal, we use lateral table syntax to support

Re: [DISCUSS] FLIP-316: Introduce SQL Driver

2023-06-05 Thread Paul Lam
Hi Jark,

Thanks for your input! Please see my comments inline.

> Isn't Table API the same way as DataStream jobs to submit Flink SQL?
> DataStream API also doesn't provide a default main class for users,
> why do we need to provide such one for SQL?

Sorry for the confusion I caused. By DataStream jobs, I mean jobs submitted
via the Flink CLI, which actually could be DataStream/Table jobs.

I think a default main class would be user-friendly, as it eliminates the need
for users to write a main class like the SqlRunner in the Flink K8s operator [1].
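
For illustration, the kind of main class users have to write today looks
roughly like this (a simplified sketch of the operator's SqlRunner [1]; the
naive split on ';' stands in for proper statement splitting):

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SimpleSqlRunner {
    public static void main(String[] args) throws Exception {
        String script = new String(
                Files.readAllBytes(Paths.get(args[0])), StandardCharsets.UTF_8);
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        for (String statement : script.split(";")) {
            if (!statement.trim().isEmpty()) {
                tEnv.executeSql(statement.trim());
            }
        }
    }
}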

> I thought the proposed SqlDriver was a dedicated main class accepting SQL 
> files, is
> that correct?

Both JSON plans and SQL files are accepted. SQL Gateway should use JSON plans,
while CLI users may use either JSON plans or SQL files.

Please see the updated FLIP[2] for more details. 

> Personally, I prefer the way of init containers which doesn't depend on
> additional components.
> This can reduce the moving parts of a production environment.
> Depending on a distributed file system makes the testing, demo, and local
> setup harder than init containers.

Please note that we could reuse the checkpoint storage like S3/HDFS, which 
should
be required to run Flink in production, so I guess that would be acceptable for 
most
users. WDYT?

WRT testing, demos, and local setups, I think we could support the local 
filesystem scheme, i.e. file://**, as the state backends do. It works as long 
as SQL Gateway and the JobManager (or SQL Driver) can access the resource 
directory (specified via `sql-gateway.application.storage-dir`).

Thanks!

[1] 
https://github.com/apache/flink-kubernetes-operator/blob/main/examples/flink-sql-runner-example/src/main/java/org/apache/flink/examples/SqlRunner.java
[2] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-316:+Introduce+SQL+Driver
[3] 
https://github.com/apache/flink/blob/3245e0443b2a4663552a5b707c5c8c46876c1f6d/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/AbstractFileCheckpointStorageAccessTestBase.java#L161

Best,
Paul Lam

> On Jun 3, 2023, at 12:21, Jark Wu wrote:
> 
> Hi Paul,
> 
> Thanks for your reply. I left my comments inline.
> 
>> As the FLIP said, it’s good to have a default main class for Flink SQLs,
>> which allows users to submit Flink SQLs in the same way as DataStream
>> jobs, or else users need to write their own main class.
> 
> Isn't Table API the same way as DataStream jobs to submit Flink SQL?
> DataStream API also doesn't provide a default main class for users,
> why do we need to provide such one for SQL?
> 
>> With the help of ExecNodeGraph, do we still need the serialized
>> SessionState? If not, we could make SQL Driver accepts two serialized
>> formats:
> 
> No, ExecNodeGraph doesn't need to serialize SessionState. I thought the
> proposed SqlDriver was a dedicated main class accepting SQL files, is
> that correct?
> If true, we have to ship the SessionState for this case, which is a lot
> of work.
> I think we just need a JsonPlanDriver which is a main class that accepts
> JsonPlan as the parameter.
> 
> 
>> The common solutions I know is to use distributed file systems or use
>> init containers to localize the resources.
> 
> Personally, I prefer the way of init containers which doesn't depend on
> additional components.
> This can reduce the moving parts of a production environment.
> Depending on a distributed file system makes the testing, demo, and local
> setup harder than init containers.
> 
> Best,
> Jark
> 
> 
> 
> 
> On Fri, 2 Jun 2023 at 18:10, Paul Lam wrote:
> 
>> The FLIP is in the early phase and some details are not included, but
>> fortunately, we got lots of valuable ideas from the discussion.
>> 
>> Thanks to everyone who joined the discussion!
>> @Weihua @Shanmon @Shengkai @Biao @Jark
>> 
>> This weekend I’m gonna revisit and update the FLIP, adding more
>> details. Hopefully, we can further align our opinions.
>> 
>> Best,
>> Paul Lam
>> 
>>> On Jun 2, 2023, at 18:02, Paul Lam wrote:
>>> 
>>> Hi Jark,
>>> 
>>> Thanks a lot for your input!
>>> 
 If we decide to submit ExecNodeGraph instead of SQL file, is it still
 necessary to support SQL Driver?
>>> 
>>> I think so. Apart from usage in SQL Gateway, SQL Driver could simplify
>>> Flink SQL execution with Flink CLI.
>>> 
>>> As the FLIP said, it’s good to have a default main class for Flink SQLs,
>>> which allows users to submit Flink SQLs in the same way as DataStream
>>> jobs, or else users need to write their own main class.
>>> 
 SQL Driver needs to serialize SessionState, which is very challenging
 but not covered in detail in the FLIP.
>>> 
>>> With the help of ExecNodeGraph, do we still need the serialized
>>> SessionState? If not, we could make SQL Driver accepts two serialized
>>> formats:
>>> 
>>> - SQL files for user-facing public usage
>>> - ExecNodeGraph for internal usage
>>> 
>>> It’s kind of similar to the relationship between job jars and jobgraphs.
>>> 
 Regarding "K8S doesn'

[DISCUSS] Make Preconditions/VisibleForTesting public

2023-06-05 Thread Etienne Chauchot

Hi all,

As part of fixing the architecture-tests-production Connector rule in 
this PR [1], it appeared that (external) connectors depending on the 
non-public Preconditions/VisibleForTesting classes is a violation of 
the rule that "connectors should not depend on non-public classes that 
are not in the connector packages". I'd like to discuss making these 
utilities public.


[1] https://github.com/apache/flink/pull/22667


WDYT?

Best

Etienne



[jira] [Created] (FLINK-32253) Blocklist unblockResources does not update for pending containers

2023-06-05 Thread Prabhu Joseph (Jira)
Prabhu Joseph created FLINK-32253:
-

 Summary: Blocklist unblockResources does not update for pending 
containers
 Key: FLINK-32253
 URL: https://issues.apache.org/jira/browse/FLINK-32253
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.17.0
Reporter: Prabhu Joseph


Blocklist unblockResources does not update the existing pending resource 
requests from YARN/K8S. It updates only the new resource requests. The 
existing pending resource requests are not scheduled on the nodes which 
are unblocked.







--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-311: Support Call Stored Procedure

2023-06-05 Thread Martijn Visser
Hi Yuxia,

Thanks for the FLIP. I have a couple of questions:

1. The syntax talks about how to CALL or SHOW the available stored
procedures, but not about how to create one. Will there not be a SQL syntax
to create/save a stored procedure?
2. Is there a default syntax in Calcite for stored procedures? What do
other databases do, do they use CALL/SHOW or something like EXEC, USE?

Best regards,

Martijn

On Mon, Jun 5, 2023 at 3:23 AM yuxia  wrote:

> Hi, Jane.
> Thanks for your input. I think we can add the auxiliary command SHOW
> PROCEDURES in this FLIP,
> following the syntax for SHOW FUNCTIONS proposed in FLIP-297 [1].
> The syntax will be
> SHOW PROCEDURES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT]
> (LIKE | ILIKE)  ].
> I have updated the FLIP.
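>
> For example, usage from the Table API would look roughly like this (a
> sketch; the catalog and procedure names are made up):
>
> TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
> tEnv.executeSql("SHOW PROCEDURES FROM my_catalog.system").print();
> tEnv.executeSql("CALL my_catalog.system.compact('db.sample')");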
>
> The other auxiliary commands may not be suitable currently or need a
> further/dedicated discussion. Let's keep this FLIP focused.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-297%3A+Improve+Auxiliary+Sql+Statements
>
> Best regards,
> Yuxia
>
> - Original Message -
> From: "Jane Chan" 
> To: "dev" 
> Sent: Saturday, June 3, 2023 7:04:39 PM
> Subject: Re: [DISCUSS] FLIP-311: Support Call Stored Procedure
>
> Hi Yuxia,
>
> Thanks for bringing this to the discussion. The call procedure is a widely
> used feature and will be very useful for users.
>
> I just have one question regarding the usage. The FLIP mentioned that
>
> Flink will allow connector developers to develop their own built-in stored
> > procedures, and then enables users to call these predefined stored
> > procedures.
> >
> In this FLIP, we don't intend to allow users to customize their own stored
> > procedures, for we don't want to expose too much to users too early.
>
>
> If I understand correctly, we might need to provide some auxiliary commands
> to inform users what built-in procedures are provided and how to use them.
> For example, Snowflake provides commands like [1] [2], and MySQL provides
> commands like [3] [4].
>
> [1] SHOW PROCEDURES,
> https://docs.snowflake.com/en/sql-reference/sql/show-procedures
> [2] DESCRIBE PROCEDURE ,
> https://docs.snowflake.com/en/sql-reference/sql/desc-procedure
> [3] SHOW PROCEDURE CODE,
> https://dev.mysql.com/doc/refman/5.7/en/show-procedure-code.html
> [4] SHOW PROCEDURE STATUS,
> https://dev.mysql.com/doc/refman/5.7/en/show-procedure-status.html
>
> Best,
> Jane
>
> On Sat, Jun 3, 2023 at 3:20 PM Benchao Li  wrote:
>
> > Thanks Yuxia for the explanation, it makes sense to me. It would be great
> > if you also add this to the FLIP doc.
> >
> > yuxia wrote on Thu, Jun 1, 2023 at 17:11:
> >
> > > Hi, Benchao.
> > > Thanks for your attention.
> > >
> > > Initially, I also wanted to pass `TableEnvironment` to the procedure.
> > > But according to my investigation and an offline discussion with
> > > Jingsong, the really important thing for procedure devs is the ability
> > > to build a Flink datastream. But we can't get the
> > > `StreamExecutionEnvironment`, which is the entry point to build a
> > > datastream. That's to say, we would lose the ability to build a
> > > datastream if we just passed `TableEnvironment`.
> > >
> > > Of course, we can also pass `TableEnvironment` along with
> > > `StreamExecutionEnvironment` to Procedure. But I intend to be cautious
> > > about exposing too much too early to procedure devs. If someday we find
> > > we need `TableEnvironment` to customize a procedure, we can then add a
> > > method like `getTableEnvironment()` to `ProcedureContext`.
> > >
> > > Best regards,
> > > Yuxia
> > >
> > > - Original Message -
> > > From: "Benchao Li" 
> > > To: "dev" 
> > > Sent: Thursday, June 1, 2023 12:58:08 PM
> > > Subject: Re: [DISCUSS] FLIP-311: Support Call Stored Procedure
> > >
> > > Thanks Yuxia for opening this discussion,
> > >
> > > The general idea looks good to me, I only have one question about the
> > > `ProcedureContext#getExecutionEnvironment`. Why are you proposing to
> > return
> > > a `StreamExecutionEnvironment` instead of `TableEnvironment`, could you
> > > elaborate a little more on this?
> > >
> > > Jingsong Li wrote on Tue, May 30, 2023 at 17:58:
> > >
> > > > Thanks for your explanation.
> > > >
> > > > We can support Iterable in future. Current design looks good to me.
> > > >
> > > > Best,
> > > > Jingsong
> > > >
> > > > On Tue, May 30, 2023 at 4:56 PM yuxia 
> > > wrote:
> > > > >
> > > > > Hi, Jingsong.
> > > > > Thanks for your feedback.
> > > > >
> > > > > > Does this need to be a function call? Do you have some example?
> > > > > I think it'll be useful to support function calls when users call a
> > > > > procedure.
> > > > > The following example is from Iceberg [1]:
> > > > > CALL catalog_name.system.migrate('spark_catalog.db.sample',
> > > > > map('foo', 'bar'));
> > > > >
> > > > > It allows users to use `map('foo', 'bar')` to pass map data to the
> > > > > procedure.
> > > > >
> > > > > Another case that I can imagine is rolling back a table to the
> > > > > snapshot of one week ago.
> > > > > Then, with a function call, the user may call `roll

Re: [DISCUSS] FLIP-317: Upgrade Kryo from 2.24.0 to 5.5.0

2023-06-05 Thread ConradJam
Here I have a suggestion. Because I mentioned Flink 2.0 earlier, I am
wondering if there is a possibility that users could migrate all
state to Kryo5 when first starting up after migrating to version 2.0
in the future, until we give up maintaining Kryo2 later.

I don't know if my idea coincides with Chesnay's.

Chesnay Schepler wrote on Thu, Jun 1, 2023 at 23:25:
>
> The version in the state is the serializer version, and applies to the
> entire state, independent of what it contains.
> If you use Kryo2 for reading and Kryo5 for writing (which also implies
> writing the new serializer version into state), then I'd assume that a
> migration is an all-or-nothing kind of deal.
> IOW, you'd have to load a savepoint and write out an entirely new
> savepoint with the new state.
> Otherwise you may have only re-written part of the checkpoint, and now
> contains a mix of Kryo2/Kryo5 serialized classes, which should then fail
> _hard_ on any recovery attempt because we wouldn't use Kryo2 to read
> anything.
>
> If I'm right, then as is this sounds like quite a trap for users to fall
> into because from what I gathered this is the default behavior in the PR
> (I could be wrong though since I haven't fully dug through the 8k lines
> PR yet...)
>
> What we kind of want is this:
> 1) Kryo5 is used as the default for new jobs. (maybe not even that,
> making it an explicit opt-in)
> 2) Kryo2 is used for reading AND writing for existing* jobs by default.
> 3) Users can explicitly (and easily!) do a full migration of their jobs,
> after which 2) should no longer apply.
>
>
>
> In the PR you mentioned running into issues on Java 17; do you have
> some error stacktraces and example data/serializers still around?
>
> On 30/05/2023 00:38, Kurt Ostfeld wrote:
> >> I’d assumed that there wasn’t a good way to migrate state stored with an 
> >> older version of Kryo to a newer version - if you’ve solved that, kudos.
> > I hope I've solved this. The pull request is supposed to do exactly this. 
> > Please let me know if you can propose a scenario that would break this.
> >
> > The pull-request has both Kryo 2.x and 5.x dependencies. It looks at the 
> > state version number written to the state to determine which version of 
> > Kryo to use for deserialization. Kryo 2.x is not used to write new state.
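> >
> > In other words, the read path dispatches roughly like this (a sketch, not
> > the actual PR code; the names are made up):
> >
> > if (savedSerializerVersion < KRYO5_STATE_VERSION) {
> >     value = kryo2.readClassAndObject(kryo2Input);  // legacy state only
> > } else {
> >     value = kryo5.readClassAndObject(kryo5Input);  // all new writes
> > }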
> >
> > --- Original Message ---
> > On Monday, May 29th, 2023 at 5:17 PM, Ken Krugler 
> >  wrote:
> >
> >
> >>
> >> Hi Kurt,
> >>
> >> I personally think it’s a very nice improvement, and that the longer-term 
> >> goal of removing built-in Kryo support for state serialization (while a 
> >> good one) warrants a separate FLIP.
> >>
> >> Perhaps an intermediate approach would be to disable the use of Kryo for 
> >> state serialization by default, and force a user to disregard warnings and 
> >> explicitly enable it if they want to go down that path.
> >>
> >> I’d assumed that there wasn’t a good way to migrate state stored with an 
> >> older version of Kryo to a newer version - if you’ve solved that, kudos.
> >>
> >> — Ken
> >>
> >>
> >>> On May 29, 2023, at 2:21 PM, Kurt Ostfeld kurtostf...@proton.me.INVALID 
> >>> wrote:
> >>>
> >>> Hi everyone. I would like to start the discussion thread for FLIP-317: 
> >>> Upgrade Kryo from 2.24.0 to 5.5.0 [1].
> >>>
> >>> There is a pull-request associated with this linked in the FLIP.
> >>>
> >>> I'd particularly like to hear about:
> >>>
> >>> - Chesnay Schepler's request to consider removing Kryo serializers from 
> >>> the execution config. Is this a reasonable task to add into this FLIP? Is 
> >>> there consensus on how to resolve that? Would that be better addressed in 
> >>> a separate future FLIP after the Kryo upgrade FLIP is completed?
> >>>
> >>> - Backwards compatibility. The automated CI tests have a lot of backwards 
> >>> compatibility tests that are passing. I also wrote a Flink application 
> >>> with keyed state using custom Kryo v2 serializers and then an upgraded 
> >>> version with both Kryo v2 and Kryo v5 serializers to stress test the 
> >>> upgrade process. I'd like to hear about additional scenarios that need to 
> >>> be tested.
> >>>
> >>> - Is this worth pursuing or is the Flink project looking to go in a 
> >>> different direction? I'd like to do some more work on the pull request if 
> >>> this is being seriously considered for adoption.
> >>>
> >>> I'm looking forward to hearing everyone's feedback and suggestions.
> >>>
> >>> Thank you,
> >>> Kurt
> >>>
> >>> [1] 
> >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-317%3A+Upgrade+Kryo+from+2.24.0+to+5.5.0
> >>
> >> --
> >> Ken Krugler
> >> http://www.scaleunlimited.com
> >> Custom big data solutions
> >> Flink, Pinot, Solr, Elasticsearch
> >>
>


-- 
Best

ConradJam


Re: [DISCUSS] FLIP-307: Flink connector Redshift

2023-06-05 Thread Martijn Visser
Hi Samrat,

Thanks for the FLIP. If I understand this correctly, the Redshift sink
would not be able to support exactly-once, is that correct?

Best regards,

Martijn

On Sat, Jun 3, 2023 at 9:18 PM Samrat Deb  wrote:

> Hi Jing Ge,
>
> >>> Do you already have any prototype? I'd like to join the reviews.
> The prototype is in progress. I will raise the dedicated PR for review soon
> and will notify this thread as well.
>
> >>> Will the Redshift connector provide additional features
> beyond the mediator/wrapper of the jdbc connector?
>
> Here are the additional features that the Flink connector for AWS Redshift
> can provide on top of using JDBC:
>
> 1. Integration with AWS Redshift Workload Management (WLM): AWS Redshift
> allows you to configure WLM[1] to manage query prioritization and resource
> allocation. The Flink connector for Redshift will be agnostic to the
> configured WLM and utilize it for scaling in and out for the sink. This
> means that the connector can leverage the WLM capabilities of Redshift to
> optimize the execution of queries and allocate resources efficiently based
> on your defined workload priorities.
>
> 2. Abstraction of AWS Redshift Quotas and Limits: AWS Redshift imposes
> certain quotas and limits[2] on various aspects such as the number of
> clusters, concurrent connections, queries per second, etc. The Flink
> connector for Redshift will provide an abstraction layer for users,
> allowing them to work with Redshift without having to worry about these
> specific limits. The connector will handle the management of connections
> and queries within the defined quotas and limits, abstracting away the
> complexity and ensuring compliance with Redshift's restrictions.
>
> These features aim to simplify the integration of Flink with AWS Redshift,
> providing optimized resource utilization and transparent handling of
> Redshift-specific limitations.
>
> Bests,
> Samrat
>
> [1]
>
> https://docs.aws.amazon.com/redshift/latest/dg/cm-c-implementing-workload-management.html
> [2]
>
> https://docs.aws.amazon.com/redshift/latest/mgmt/amazon-redshift-limits.html
>
> On Sat, Jun 3, 2023 at 11:40 PM Samrat Deb  wrote:
>
> > Hi Ahmed,
> >
> > >>> please let me know If you need any collaboration regarding
> integration
> > with
> > AWS connectors credential providers or regarding FLIP-171 I would be more
> > than happy to assist.
> >
> > Sure, I will reach out in case any hands are required.
> >
> >
> >
> > On Fri, Jun 2, 2023 at 6:12 PM Jing Ge 
> wrote:
> >
> >> Hi Samrat,
> >>
> >> Excited to see your proposal. Supporting data warehouses is one of the
> >> major tracks for Flink. Thanks for driving it! Happy to see that we
> >> reached
> >> consensus to prioritize the Sink over Source in the previous discussion.
> >> Do
> >> you already have any prototype? I'd like to join the reviews.
> >>
> >> Just out of curiosity, speaking of JDBC mode, according to the FLIP, it
> >> should be doable to directly use the jdbc connector with Redshift, if I
> am
> >> not mistaken. Will the Redshift connector provide additional features
> >> beyond the mediator/wrapper of the jdbc connector?
> >>
> >> Best regards,
> >> Jing
> >>
> >> On Thu, Jun 1, 2023 at 8:22 PM Ahmed Hamdy 
> wrote:
> >>
> >> > Hi Samrat
> >> >
> >> > Thanks for putting up this FLIP. I agree regarding the importance of
> the
> >> > use case.
> >> > please let me know If you need any collaboration regarding integration
> >> with
> >> > AWS connectors credential providers or regarding FLIP-171 I would be
> >> more
> >> > than happy to assist.
> >> > I also like Leonard's proposal for starting with DataStreamSink and
> >> > TableSink, It would be great to have some milestones delivered as soon
> >> as
> >> > ready.
> >> > best regards
> >> > Ahmed Hamdy
> >> >
> >> >
> >> > On Wed, 31 May 2023 at 11:15, Samrat Deb 
> wrote:
> >> >
> >> > > Hi Liu Ron,
> >> > >
> >> > > > 1. Regarding the `read.mode` and `write.mode`, you say it provides
> >> > > > two modes, respectively `jdbc` and `unload or copy`. What is the
> >> > > > default value for `read.mode` and `write.mode`?
> >> > >
> >> > > I have made an effort to make the configuration options `read.mode`
> >> and
> >> > > `write.mode` mandatory for the "flink-connector-redshift" according
> to
> >> > > FLIP[1]. The rationale behind this decision is to empower users who
> >> are
> >> > > familiar with their Redshift setup and have specific expectations
> for
> >> the
> >> > > sink. By making these configurations mandatory, users can have more
> >> > control
> >> > > and flexibility in configuring the connector to meet their
> >> requirements.
> >> > >
> >> > > However, I am open to receiving feedback on whether it would be
> >> > beneficial
> >> > > to make the configuration options non-mandatory and set default
> values
> >> > for
> >> > > them. If you believe there are advantages to having default values
> or
> >> any
> >> > > other suggestions, please share your thoughts. Your 

Re: [DISCUSS] Hive dialect shouldn't fall back to Flink's default dialect

2023-06-05 Thread Martijn Visser
+1 for anything that helps us with externalizing the Hive connector :D

On Thu, Jun 1, 2023 at 3:34 PM Lincoln Lee  wrote:

> +1, thanks yuxia for driving the hive decoupling work!
> Since the 1.16 release, the compatibility of Hive queries has reached a
> relatively high level, so it is time to abandon the internal fallback,
> which will make the behavior of the Hive dialect clearer.
>
> Best,
> Lincoln Lee
>
>
> Jark Wu  于2023年6月1日周四 21:23写道:
>
> > +1, I think this can make the grammar more clear.
> > Please remember to add a release note once the issue is finished.
> >
> > Best,
> > Jark
> >
> > On Thu, 1 Jun 2023 at 11:28, yuxia  wrote:
> >
> > > Hi, Jingsong. It's hard to provide an option given the fact that we
> > > also want to decouple Hive from the Flink planner.
> > > If we still need this fallback behavior, HiveParser will still depend on
> > > the `ParserImpl` provided by flink-table-planner.
> > > But to try our best to minimize the impact on users and be more
> > > user-friendly, the error message will remind users that they may use SET
> > > table.sql-dialect = default to switch to Flink's default dialect when
> > > HiveParser fails to parse the SQL.
> > >
> > > Best regards,
> > > Yuxia
> > >
> > > Best regards,
> > > Yuxia
> > >
> > > - Original Message -
> > > From: "Jingsong Li" 
> > > To: "Rui Li" 
> > > Cc: "dev" , "yuxia" ,
> > > "User" 
> > > Sent: Tuesday, May 30, 2023 3:21:56 PM
> > > Subject: Re: [DISCUSS] Hive dialect shouldn't fall back to Flink's default
> > > dialect
> > >
> > > +1, the fallback looks weird now, it is outdated.
> > >
> > > But, it is good to provide an option. I don't know if there are some
> > > users who depend on this fallback.
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Tue, May 30, 2023 at 1:47 PM Rui Li  wrote:
> > > >
> > > > +1, the fallback was just intended as a temporary workaround to run
> > > catalog/module related statements with hive dialect.
> > > >
> > > > On Mon, May 29, 2023 at 3:59 PM Benchao Li 
> > wrote:
> > > >>
> > > >> Big +1 on this, thanks yuxia for driving this!
> > > >>
> > > >> yuxia wrote on Mon, May 29, 2023 at 14:55:
> > > >>
> > > >> > Hi, community.
> > > >> >
> > > >> > I want to start a discussion about why the Hive dialect shouldn't
> > > >> > fall back to Flink's default dialect.
> > > >> >
> > > >> > Currently, when HiveParser fails to parse a SQL statement in Hive
> > > >> > dialect, it falls back to Flink's default parser [1] to handle
> > > >> > Flink-specific statements like "CREATE CATALOG xx with (xx);".
> > > >> >
> > > >> > As I'm involved with the Hive dialect and have had some
> > > >> > communication recently with community users who use the Hive
> > > >> > dialect, I'm thinking of throwing an exception directly instead of
> > > >> > falling back to Flink's default dialect when failing to parse the
> > > >> > SQL in Hive dialect.
> > > >> >
> > > >> > Here are some reasons:
> > > >> >
> > > >> > First of all, the fallback hides some errors with the Hive dialect.
> > > >> > For example, we found we couldn't use the Hive dialect any more
> > > >> > with the Flink SQL client in the release validation phase [2].
> > > >> > Eventually we found that a modification in the Flink SQL client
> > > >> > caused it, but our test case couldn't catch it earlier: although
> > > >> > HiveParser failed to parse the statement, it fell back to the
> > > >> > default parser and the test case passed successfully.
> > > >> >
> > > >> > Second, conceptually, the Hive dialect should have nothing to do
> > > >> > with Flink's default dialect. They are two totally different
> > > >> > dialects. If we do need a dialect mixing the Hive dialect and the
> > > >> > default dialect, maybe we need to propose a new hybrid dialect and
> > > >> > announce the hybrid behavior to users.
> > > >> > Also, the fallback behavior has confused some users; I know this
> > > >> > from questions I have been asked by community users. Throwing an
> > > >> > exception directly when failing to parse a SQL statement in Hive
> > > >> > dialect will be more intuitive.
> > > >> >
> > > >> > Last but not least, it's important to decouple Hive from the Flink
> > > >> > planner [3] before we can externalize the Hive connector [4]. If we
> > > >> > still fall back to the Flink default dialect, we will still need to
> > > >> > depend on `ParserImpl` in the Flink planner, which will block us
> > > >> > from removing the provided dependency of the Hive dialect as well
> > > >> > as externalizing the Hive connector.
> > > >> >
> > > >> > Although we never announced the fallback behavior, some users may
> > > >> > implicitly depend on it in their SQL jobs. So, I hereby open the
> > > >> > discussion about abandoning the fallback behavior to make the Hive
> > > >> > dialect clear and isolated.
> > > >> > Please remember it won't break Hive syntax, but syntax specific to
> > > >> > Flink may fail after that. For the failed SQL, you can use `SET
> > > >> > table.sql-dialect=default;` to switch to the Flink dialect.
>

Re: [DISCUSS] FLIP-317: Upgrade Kryo from 2.24.0 to 5.5.0

2023-06-05 Thread Martijn Visser
Hi ConradJam,

That assumes that it will be possible to upgrade statefully to Flink 2.0:
given that it is a major breaking change, I wouldn't assume that will be
possible.

Best regards,

Martijn

On Mon, Jun 5, 2023 at 2:37 PM ConradJam  wrote:

> Here I have a suggestion. Because I mentioned Flink 2.0 earlier, I am
> wondering if there is a possibility that users could migrate all
> state to Kryo5 when first starting up after migrating to version 2.0
> in the future, until we give up maintaining Kryo2 later.
>
> I don't know if my idea coincides with Chesnay's.
>
> Chesnay Schepler wrote on Thu, Jun 1, 2023 at 23:25:
> >
> > The version in the state is the serializer version, and applies to the
> > entire state, independent of what it contains.
> > If you use Kryo2 for reading and Kryo5 for writing (which also implies
> > writing the new serializer version into state), then I'd assume that a
> > migration is an all-or-nothing kind of deal.
> > IOW, you'd have to load a savepoint and write out an entirely new
> > savepoint with the new state.
> > Otherwise you may have only re-written part of the checkpoint, and now
> > contains a mix of Kryo2/Kryo5 serialized classes, which should then fail
> > _hard_ on any recovery attempt because we wouldn't use Kryo2 to read
> > anything.
> >
> > If I'm right, then as is this sounds like quite a trap for users to fall
> > into because from what I gathered this is the default behavior in the PR
> > (I could be wrong though since I haven't fully dug through the 8k lines
> > PR yet...)
> >
> > What we kind of want is this:
> > 1) Kryo5 is used as the default for new jobs. (maybe not even that,
> > making it an explicit opt-in)
> > 2) Kryo2 is used for reading AND writing for existing* jobs by default.
> > 3) Users can explicitly (and easily!) do a full migration of their jobs,
> > after which 2) should no longer apply.
> >
> >
> >
> > In the PR you mentioned running into issues on Java 17; do you have
> > some error stacktraces and example data/serializers still around?
> >
> > On 30/05/2023 00:38, Kurt Ostfeld wrote:
> > >> I’d assumed that there wasn’t a good way to migrate state stored with
> an older version of Kryo to a newer version - if you’ve solved that, kudos.
> > > I hope I've solved this. The pull request is supposed to do exactly
> this. Please let me know if you can propose a scenario that would break
> this.
> > >
> > > The pull-request has both Kryo 2.x and 5.x dependencies. It looks at
> the state version number written to the state to determine which version of
> Kryo to use for deserialization. Kryo 2.x is not used to write new state.
> > >
> > > --- Original Message ---
> > > On Monday, May 29th, 2023 at 5:17 PM, Ken Krugler <
> kkrugler_li...@transpac.com> wrote:
> > >
> > >
> > >>
> > >> Hi Kurt,
> > >>
> > >> I personally think it’s a very nice improvement, and that the
> longer-term goal of removing built-in Kryo support for state serialization
> (while a good one) warrants a separate FLIP.
> > >>
> > >> Perhaps an intermediate approach would be to disable the use of Kryo
> for state serialization by default, and force a user to disregard warnings
> and explicitly enable it if they want to go down that path.
> > >>
> > >> I’d assumed that there wasn’t a good way to migrate state stored with
> an older version of Kryo to a newer version - if you’ve solved that, kudos.
> > >>
> > >> — Ken
> > >>
> > >>
> > >>> On May 29, 2023, at 2:21 PM, Kurt Ostfeld
> kurtostf...@proton.me.INVALID wrote:
> > >>>
> > >>> Hi everyone. I would like to start the discussion thread for
> FLIP-317: Upgrade Kryo from 2.24.0 to 5.5.0 [1].
> > >>>
> > >>> There is a pull-request associated with this linked in the FLIP.
> > >>>
> > >>> I'd particularly like to hear about:
> > >>>
> > >>> - Chesnay Schepler's request to consider removing Kryo serializers
> from the execution config. Is this a reasonable task to add into this FLIP?
> Is there consensus on how to resolve that? Would that be better addressed
> in a separate future FLIP after the Kryo upgrade FLIP is completed?
> > >>>
> > >>> - Backwards compatibility. The automated CI tests have a lot of
> backwards compatibility tests that are passing. I also wrote a Flink
> application with keyed state using custom Kryo v2 serializers and then an
> upgraded version with both Kryo v2 and Kryo v5 serializers to stress test
> the upgrade process. I'd like to hear about additional scenarios that need
> to be tested.
> > >>>
> > >>> - Is this worth pursuing or is the Flink project looking to go in a
> different direction? I'd like to do some more work on the pull request if
> this is being seriously considered for adoption.
> > >>>
> > >>> I'm looking forward to hearing everyone's feedback and suggestions.
> > >>>
> > >>> Thank you,
> > >>> Kurt
> > >>>
> > >>> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-317%3A+Upgrade+Kryo+from+2.24.0+to+5.5.0
> > >>
> > >> 

[jira] [Created] (FLINK-32254) FineGrainedSlotManager may not allocate enough taskmanagers if maxSlotNum is configured

2023-06-05 Thread Weijie Guo (Jira)
Weijie Guo created FLINK-32254:
--

 Summary: FineGrainedSlotManager may not allocate enough 
taskmanagers if maxSlotNum is configured
 Key: FLINK-32254
 URL: https://issues.apache.org/jira/browse/FLINK-32254
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.18.0
Reporter: Weijie Guo
Assignee: Weijie Guo


We ran a job with {{slotmanager.number-of-slots.max = 10}}, 
{{taskmanager.numberOfTaskSlots = 10}} and {{taskmanager.memory.process.size: 
24000m}}. The resources of the cluster are sufficient, but no TaskManager can 
be allocated. It seems that there is a problem with the calculation logic of 
{{SlotManagerConfiguration#getMaxTotalMem}}. Due to the rounding down of 
division, the calculated {{MemorySize}} is too small.
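
A generic illustration of the suspected arithmetic (plain longs; this is a 
sketch, not the actual Flink code): deriving a per-slot size with floor 
division and scaling it back up can undershoot the intended total.
{code:java}
long processBytes = MemorySize.parse("24000m").getBytes(); // taskmanager.memory.process.size
int slotsPerTm = 10;                                       // taskmanager.numberOfTaskSlots
int maxSlots = 10;                                         // slotmanager.number-of-slots.max

long perSlotBytes = processBytes / slotsPerTm;  // floor division drops any remainder
long maxTotalBytes = perSlotBytes * maxSlots;   // can end up below processBytes
// If maxTotalBytes falls below the memory of a single TaskManager,
// no TaskManager can ever be allocated.
{code}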



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-307: Flink connector Redshift

2023-06-05 Thread Jing Ge
Hi Samrat,

Thanks for the feedback. I would suggest adding that information into the
FLIP.

+1 Looking forward to your PR :-)

Best regards,
Jing

On Sat, Jun 3, 2023 at 9:19 PM Samrat Deb  wrote:

> Hi Jing Ge,
>
> >>> Do you already have any prototype? I'd like to join the reviews.
> The prototype is in progress. I will raise the dedicated PR for review soon
> and will notify this thread as well.
>
> >>> Will the Redshift connector provide additional features
> beyond the mediator/wrapper of the jdbc connector?
>
> Here are the additional features that the Flink connector for AWS Redshift
> can provide on top of using JDBC:
>
> 1. Integration with AWS Redshift Workload Management (WLM): AWS Redshift
> allows you to configure WLM[1] to manage query prioritization and resource
> allocation. The Flink connector for Redshift will be agnostic to the
> configured WLM and utilize it for scaling in and out for the sink. This
> means that the connector can leverage the WLM capabilities of Redshift to
> optimize the execution of queries and allocate resources efficiently based
> on your defined workload priorities.
>
> 2. Abstraction of AWS Redshift Quotas and Limits: AWS Redshift imposes
> certain quotas and limits[2] on various aspects such as the number of
> clusters, concurrent connections, queries per second, etc. The Flink
> connector for Redshift will provide an abstraction layer for users,
> allowing them to work with Redshift without having to worry about these
> specific limits. The connector will handle the management of connections
> and queries within the defined quotas and limits, abstracting away the
> complexity and ensuring compliance with Redshift's restrictions.
>
> These features aim to simplify the integration of Flink with AWS Redshift,
> providing optimized resource utilization and transparent handling of
> Redshift-specific limitations.
>
> Bests,
> Samrat
>
> [1]
>
> https://docs.aws.amazon.com/redshift/latest/dg/cm-c-implementing-workload-management.html
> [2]
>
> https://docs.aws.amazon.com/redshift/latest/mgmt/amazon-redshift-limits.html
>
> On Sat, Jun 3, 2023 at 11:40 PM Samrat Deb  wrote:
>
> > Hi Ahmed,
> >
> > >>> please let me know If you need any collaboration regarding
> integration
> > with
> > AWS connectors credential providers or regarding FLIP-171 I would be more
> > than happy to assist.
> >
> > Sure, I will reach out in case any hands are required.
> >
> >
> >
> > On Fri, Jun 2, 2023 at 6:12 PM Jing Ge 
> wrote:
> >
> >> Hi Samrat,
> >>
> >> Excited to see your proposal. Supporting data warehouses is one of the
> >> major tracks for Flink. Thanks for driving it! Happy to see that we
> >> reached
> >> consensus to prioritize the Sink over Source in the previous discussion.
> >> Do
> >> you already have any prototype? I'd like to join the reviews.
> >>
> >> Just out of curiosity, speaking of JDBC mode, according to the FLIP, it
> >> should be doable to directly use the jdbc connector with Redshift, if I
> am
> >> not mistaken. Will the Redshift connector provide additional features
> >> beyond the mediator/wrapper of the jdbc connector?
> >>
> >> Best regards,
> >> Jing
> >>
> >> On Thu, Jun 1, 2023 at 8:22 PM Ahmed Hamdy 
> wrote:
> >>
> >> > Hi Samrat
> >> >
> >> > Thanks for putting up this FLIP. I agree regarding the importance of
> the
> >> > use case.
> >> > Please let me know if you need any collaboration regarding integration
> >> > with AWS connectors' credential providers or regarding FLIP-171; I
> >> > would be more than happy to assist.
> >> > I also like Leonard's proposal for starting with DataStreamSink and
> >> > TableSink, It would be great to have some milestones delivered as soon
> >> as
> >> > ready.
> >> > best regards
> >> > Ahmed Hamdy
> >> >
> >> >
> >> > On Wed, 31 May 2023 at 11:15, Samrat Deb 
> wrote:
> >> >
> >> > > Hi Liu Ron,
> >> > >
> >> > > > 1. Regarding the `read.mode` and `write.mode`: you say here that the
> >> > > > connector provides two modes, respectively jdbc and `unload or copy`.
> >> > > > What are the default values for `read.mode` and `write.mode`?
> >> > >
> >> > > I have made an effort to make the configuration options `read.mode`
> >> and
> >> > > `write.mode` mandatory for the "flink-connector-redshift" according
> to
> >> > > FLIP[1]. The rationale behind this decision is to empower users who
> >> are
> >> > > familiar with their Redshift setup and have specific expectations
> for
> >> the
> >> > > sink. By making these configurations mandatory, users can have more
> >> > control
> >> > > and flexibility in configuring the connector to meet their
> >> requirements.
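For illustration, a hypothetical DDL sketch with a mandatory mode option (the 'redshift' connector identifier, the schema, and the option values are assumptions based on this thread, not a finalized FLIP-307 surface):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Hypothetical sketch only: the connector identifier and option keys follow
// this discussion's wording, not a released Redshift connector.
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
tEnv.executeSql(
        "CREATE TABLE redshift_sink (id BIGINT, name STRING) WITH ("
                + " 'connector' = 'redshift',"
                + " 'write.mode' = 'jdbc'"  // mandatory per the proposal above
                + ")");
```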
> >> > >
> >> > > However, I am open to receiving feedback on whether it would be
> >> > beneficial
> >> > > to make the configuration options non-mandatory and set default
> values
> >> > for
> >> > > them. If you believe there are advantages to having default values
> or
> >> any
> >> > > other suggestions, please share your thoughts. Your feedback is
> >> > > highly appreciated.

Call for Presentations, Community Over Code Asia 2023

2023-06-05 Thread Rich Bowen
You are receiving this message because you are subscribed to one or more
developer mailing lists at the Apache Software Foundation.

The call for presentations is now open at
https://apachecon.com/acasia2023/cfp.html and will close on
Sunday, June 18th, 2023, 11:59 PM GMT.

The event will be held in Beijing, China, August 18-20, 2023.

We are looking for presentations about anything relating to Apache
Software Foundation projects, open-source governance, community, and
software development.
In particular, this year we are building content tracks around the
following specific topics/projects:

AI / Machine learning
API / Microservice
Community
CloudNative
Data Storage & Computing
DataOps
Data Lake & Data Warehouse
OLAP & Data Analysis
Performance Engineering
Incubator
IoT/IIoT
Messaging
RPC
Streaming
Workflow / Data Processing
Web Server / Tomcat

If your proposed presentation falls into one of these categories,
please select that topic in the CFP entry form. Or select Others if
it’s related to another topic or project area.

Looking forward to hearing from you!

Willem Jiang, and the Community Over Code planners



[VOTE] FLIP-312: Add Yarn ACLs to Flink Containers

2023-06-05 Thread Archit Goyal
Hi everyone,

Thanks for all the feedback for FLIP-312: Add Yarn ACLs to Flink 
Containers.
Following is the discussion thread: Link

I'd like to start a vote for it. The vote will be open for at least 72 hours 
(until June 9th, 12:00AM GMT) unless there is an objection or an insufficient 
number of votes.

Thanks,
Archit Goyal


[jira] [Created] (FLINK-32255) Pre-deployed idle Task Managers

2023-06-05 Thread Christophe Bornet (Jira)
Christophe Bornet created FLINK-32255:
-

 Summary: Pre-deployed idle Task Managers
 Key: FLINK-32255
 URL: https://issues.apache.org/jira/browse/FLINK-32255
 Project: Flink
  Issue Type: New Feature
  Components: Kubernetes Operator
Reporter: Christophe Bornet


In the Kubernetes operator, it would be nice to have the possibility to define
a number of pre-deployed spare Task Managers. The operator would ensure that
there are always a number of ready-to-use TMs that can accept new incoming
jobs.

The goal would be to reduce the startup time of jobs, especially in the session 
cluster mode.

See for inspiration  
[https://www.decodable.co/blog/flink-deployments-at-decodable] -> Custom Flink 
Session Clusters for Preview Jobs: "To further reduce the preview response 
time, we also modified Flink to pre-deploy some idle TaskManagers. When a 
TaskManager is used, a new TaskManager is immediately created so there is 
always a pool of idle TaskManagers."



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32256) Add ARRAY_MAX support in SQL & Table API

2023-06-05 Thread Bonnie Varghese (Jira)
Bonnie Varghese created FLINK-32256:
---

 Summary: Add ARRAY_MAX support in SQL & Table API
 Key: FLINK-32256
 URL: https://issues.apache.org/jira/browse/FLINK-32256
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.18.0
Reporter: Bonnie Varghese
 Fix For: 1.18.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32257) Add ARRAY_MIN support in SQL & Table API

2023-06-05 Thread Bonnie Varghese (Jira)
Bonnie Varghese created FLINK-32257:
---

 Summary: Add ARRAY_MIN support in SQL & Table API
 Key: FLINK-32257
 URL: https://issues.apache.org/jira/browse/FLINK-32257
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.18.0
Reporter: Bonnie Varghese
 Fix For: 1.18.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32258) Add ARRAY_SORT support in SQL & Table API

2023-06-05 Thread Bonnie Varghese (Jira)
Bonnie Varghese created FLINK-32258:
---

 Summary: Add ARRAY_SORT support in SQL & Table API
 Key: FLINK-32258
 URL: https://issues.apache.org/jira/browse/FLINK-32258
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.18.0
Reporter: Bonnie Varghese
 Fix For: 1.18.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32259) Add ARRAY_JOIN support in SQL & Table API

2023-06-05 Thread Bonnie Varghese (Jira)
Bonnie Varghese created FLINK-32259:
---

 Summary: Add ARRAY_JOIN support in SQL & Table API
 Key: FLINK-32259
 URL: https://issues.apache.org/jira/browse/FLINK-32259
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.18.0
Reporter: Bonnie Varghese
 Fix For: 1.18.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32260) Add SLICE support in SQL & Table API

2023-06-05 Thread Bonnie Varghese (Jira)
Bonnie Varghese created FLINK-32260:
---

 Summary: Add SLICE support in SQL & Table API
 Key: FLINK-32260
 URL: https://issues.apache.org/jira/browse/FLINK-32260
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.18.0
Reporter: Bonnie Varghese
 Fix For: 1.18.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32261) Add MAP_UNION support in SQL & Table API

2023-06-05 Thread Bonnie Varghese (Jira)
Bonnie Varghese created FLINK-32261:
---

 Summary: Add MAP_UNION support in SQL & Table API
 Key: FLINK-32261
 URL: https://issues.apache.org/jira/browse/FLINK-32261
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.18.0
Reporter: Bonnie Varghese
 Fix For: 1.18.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32262) Add MAP_ENTRIES support in SQL & Table API

2023-06-05 Thread Bonnie Varghese (Jira)
Bonnie Varghese created FLINK-32262:
---

 Summary: Add MAP_ENTRIES support in SQL & Table API
 Key: FLINK-32262
 URL: https://issues.apache.org/jira/browse/FLINK-32262
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.18.0
Reporter: Bonnie Varghese
 Fix For: 1.18.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32263) Add ELT support in SQL & Table API

2023-06-05 Thread Bonnie Varghese (Jira)
Bonnie Varghese created FLINK-32263:
---

 Summary: Add ELT support in SQL & Table API
 Key: FLINK-32263
 URL: https://issues.apache.org/jira/browse/FLINK-32263
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.18.0
Reporter: Bonnie Varghese
 Fix For: 1.18.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32264) Add FIELD support in SQL & Table API

2023-06-05 Thread Bonnie Varghese (Jira)
Bonnie Varghese created FLINK-32264:
---

 Summary: Add FIELD support in SQL & Table API
 Key: FLINK-32264
 URL: https://issues.apache.org/jira/browse/FLINK-32264
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.18.0
Reporter: Bonnie Varghese
 Fix For: 1.18.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-312: Add Yarn ACLs to Flink Containers

2023-06-05 Thread Venkatakrishnan Sowrirajan
Thanks for starting the vote on this one, Archit.

+1 (non-binding)

Regards
Venkata krishnan


On Mon, Jun 5, 2023 at 9:55 AM Archit Goyal 
wrote:

> Hi everyone,
>
> Thanks for all the feedback for FLIP-312: Add Yarn ACLs to Flink
> Containers<
> https://urldefense.com/v3/__https://cwiki.apache.org/confluence/display/FLINK/FLIP*312*3A*Add*Yarn*ACLs*to*Flink*Containers__;KyUrKysrKys!!IKRxdwAv5BmarQ!aWkLc7eHAWyHz5kwEq8kKzEAgbtKtlMmi9ifOy_1GNbiO93taxiMcwdHfENc4inLU_cxZIKPDMwBP97Z4oibXUIM$
> >.
> Following is the discussion thread : Link<
> https://urldefense.com/v3/__https://lists.apache.org/thread/xj3ytkwj9lsl3hpjdb4n8pmy7lk3l8tv__;!!IKRxdwAv5BmarQ!aWkLc7eHAWyHz5kwEq8kKzEAgbtKtlMmi9ifOy_1GNbiO93taxiMcwdHfENc4inLU_cxZIKPDMwBP97Z4u3tNMqI$
> >
>
> I'd like to start a vote for it. The vote will be open for at least 72
> hours (until June 9th, 12:00AM GMT) unless there is an objection or an
> insufficient number of votes.
>
> Thanks,
> Archit Goyal
>


Re: [DISCUSS] FLIP-246: Multi Cluster Kafka Source

2023-06-05 Thread Mason Chen
Hi all,

I'm working on FLIP-246 again, for the Multi Cluster Kafka Source
contribution. The document has been updated with some more context about
how it can solve the Kafka topic removal scenario and a sequence diagram to
illustrate how the components interact.

Looking forward to any feedback!

Best,
Mason

On Wed, Oct 12, 2022 at 11:12 PM Mason Chen  wrote:

> Hi Ryan,
>
> Thanks for the additional context! Yes, the offset initializer would need
> to take a cluster as a parameter and the MultiClusterKafkaSourceSplit can
> be exposed in an initializer.
>
> Best,
> Mason
>
> On Thu, Oct 6, 2022 at 11:00 AM Ryan van Huuksloot <
> ryan.vanhuuksl...@shopify.com> wrote:
>
>> Hi Mason,
>>
>> Thanks for the clarification! In regards to the addition to the
>> OffsetInitializer of this API - this would be an awesome addition and I
>> think this entire FLIP would be a great addition to Flink.
>>
>> To provide more context as to why we need particular offsets, we use
>> Hybrid Source to currently backfill from buckets prior to reading from
>> Kafka. We have a service that will tell us what offset has last been loaded
>> into said bucket which we will use to initialize the KafkaSource
>> OffsetsInitializer. We couldn't use a timestamp here and the offset would
>> be different for each Cluster.
>>
>> In pseudocode, we'd want the ability to do something like this with
>> HybridSources - if this is possible.
>>
>> ```scala
>> val offsetsMetadata: Map[TopicPartition, Long] = // Get current offsets from OffsetReaderService
>> val multiClusterArchiveSource: MultiBucketFileSource[T] = // Data is read from different buckets (multiple topics)
>> val multiClusterKafkaSource: MultiClusterKafkaSource[T] =
>>   MultiClusterKafkaSource.builder()
>>     .setKafkaMetadataService(new KafkaMetadataServiceImpl())
>>     .setStreamIds(List.of("my-stream-1", "my-stream-2"))
>>     .setGroupId("myConsumerGroup")
>>     .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(classOf[StringDeserializer]))
>>     .setStartingOffsets(offsetsMetadata)
>>     .setProperties(properties)
>>     .build()
>> val source = HybridSource.builder(multiClusterArchiveSource).addSource(multiClusterKafkaSource).build()
>> ```
>>
>> Few notes:
>> - TopicPartition won't work because the topic may be the same name as
>> this is something that is supported IIRC
>> - I chose to pass a map into starting offsets just for demonstrative
>> purposes, I would be fine with whatever data structure would work best
>>
>> Ryan van Huuksloot
>> Data Developer | Production Engineering | Streaming Capabilities
>>
>>
>> On Mon, Oct 3, 2022 at 11:29 PM Mason Chen 
>> wrote:
>>
>>> Hi Ryan,
>>>
>>> Just copying your message over to the email chain.
>>>
>>> Hi Mason,
 First off, thanks for putting this FLIP together! Sorry for the delay.
 Full disclosure Mason and I chatted a little bit at Flink Forward 2022 but
 I have tried to capture the questions I had for him then.
 I'll start the conversation with a few questions:
 1. The concept of streamIds is not clear to me in the proposal and
 could use some more information. If I understand correctly, they will be
 used in the MetadataService to link KafkaClusters to ones you want to use?
 If you assign stream ids using `setStreamIds`, how can you dynamically
 increase the number of clusters you consume if the list of StreamIds is
 static? I am basing this off of your example .setStreamIds(List.of(
 "my-stream-1", "my-stream-2")) so I could be off base with my
 assumption. If you don't mind clearing up the intention, that would be
 great!
 2. How would offsets work if you wanted to use this
 MultiClusterKafkaSource with a file based backfill? In the case I am
 thinking of, you have a bucket backed archive of Kafka data per cluster.
 and you want to pick up from the last offset in the archived system, how
 would you set OffsetInitializers "per cluster" potentially as a function or
 are you limited to setting an OffsetInitializer for the entire Source?
 3. Just to make sure - because this system will layer on top of
 Flink-27 and use KafkaSource for some aspects under the hood, the watermark
 alignment that was introduced in FLIP-182 / Flink 1.15 would be possible
 across multiple clusters if you assign them to the same alignment group?
 Thanks!
 Ryan
>>>
>>>
>>> 1. The stream ids are static--however, the physical clusters and
>>> topics that they map to can mutate. Let's say my-stream-1 maps to cluster-1
>>> and topic-1. The KafkaMetadataService can return a different mapping when
>>> metadata is fetched the next time e.g. my-stream-1 mapping to cluster-1 and
>>> topic-1, and cluster-2 and topic-2. Let me add more details on how the
>>> KafkaMetadataService is used.
>>> 2. The current design limits itself to a single configured
>>> OffsetsInitializer.

Re: [DISCUSS] FLIP-316: Introduce SQL Driver

2023-06-05 Thread Mason Chen
Hi Paul,

+1 for this feature and supporting SQL file + JSON plans. We get a lot of
requests to just be able to submit a SQL file, but the JSON plan
optimizations make sense.

+1 for init containers or a more generalized way of obtaining arbitrary
files. File fetching isn't specific to just SQL--it also matters for Java
applications if the user doesn't want to rebuild a Flink image and just
wants to modify the user application fat jar.

> Please note that we could reuse the checkpoint storage like S3/HDFS, which
> should be required to run Flink in production, so I guess that would be
> acceptable for most users. WDYT?


If you do go this route, it would be nice to support writing these files to
S3/HDFS via Flink. This makes access control and policy management simpler.

Also, what do you think about prefixing the config options with
`sql-driver` instead of just `sql` to be more specific?

Best,
Mason

On Mon, Jun 5, 2023 at 2:28 AM Paul Lam  wrote:

> Hi Jark,
>
> Thanks for your input! Please see my comments inline.
>
> > Isn't Table API the same way as DataSream jobs to submit Flink SQL?
> > DataStream API also doesn't provide a default main class for users,
> > why do we need to provide such one for SQL?
>
> Sorry for the confusion I caused. By DataStream jobs, I mean jobs submitted
> via Flink CLI which actually could be DataStream/Table jobs.
>
> I think a default main class would be user-friendly, as it eliminates the
> need for users to write their own main class like the SqlRunner in the
> Flink K8s operator [1].
>
> > I thought the proposed SqlDriver was a dedicated main class accepting
> SQL files, is
> > that correct?
>
> Both JSON plans and SQL files are accepted. SQL Gateway should use JSON
> plans,
> while CLI users may use either JSON plans or SQL files.
>
> Please see the updated FLIP[2] for more details.
>
> > Personally, I prefer the way of init containers which doesn't depend on
> > additional components.
> > This can reduce the moving parts of a production environment.
> > Depending on a distributed file system makes the testing, demo, and local
> > setup harder than init containers.
>
> Please note that we could reuse the checkpoint storage like S3/HDFS, which
> should
> be required to run Flink in production, so I guess that would be
> acceptable for most
> users. WDYT?
>
> WRT testing, demo, and local setups, I think we could support the local
> filesystem
> scheme i.e. file://** as the state backends do. It works as long as SQL
> Gateway
> and JobManager(or SQL Driver) can access the resource directory (specified
> via
> `sql-gateway.application.storage-dir`).
>
> Thanks!
>
> [1]
> https://github.com/apache/flink-kubernetes-operator/blob/main/examples/flink-sql-runner-example/src/main/java/org/apache/flink/examples/SqlRunner.java
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-316:+Introduce+SQL+Driver
> [3]
> https://github.com/apache/flink/blob/3245e0443b2a4663552a5b707c5c8c46876c1f6d/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/AbstractFileCheckpointStorageAccessTestBase.java#L161
>
> Best,
> Paul Lam
>
> > 2023年6月3日 12:21,Jark Wu  写道:
> >
> > Hi Paul,
> >
> > Thanks for your reply. I left my comments inline.
> >
> >> As the FLIP said, it’s good to have a default main class for Flink SQLs,
> >> which allows users to submit Flink SQLs in the same way as DataStream
> >> jobs, or else users need to write their own main class.
> >
> > Isn't Table API the same way as DataSream jobs to submit Flink SQL?
> > DataStream API also doesn't provide a default main class for users,
> > why do we need to provide such one for SQL?
> >
> >> With the help of ExecNodeGraph, do we still need the serialized
> >> SessionState? If not, we could make SQL Driver accepts two serialized
> >> formats:
> >
> > No, ExecNodeGraph doesn't need to serialize SessionState. I thought the
> > proposed SqlDriver was a dedicated main class accepting SQL files, is
> > that correct?
> > If true, we have to ship the SessionState for this case which is a large
> > work.
> > I think we just need a JsonPlanDriver which is a main class that accepts
> > JsonPlan as the parameter.
> >
> >
> >> The common solutions I know is to use distributed file systems or use
> >> init containers to localize the resources.
> >
> > Personally, I prefer the way of init containers which doesn't depend on
> > additional components.
> > This can reduce the moving parts of a production environment.
> > Depending on a distributed file system makes the testing, demo, and local
> > setup harder than init containers.
> >
> > Best,
> > Jark
> >
> >
> >
> >
> > On Fri, 2 Jun 2023 at 18:10, Paul Lam <paullin3...@gmail.com> wrote:
> >
> >> The FLIP is in the early phase and some details are not included, but
> >> fortunately, we got lots of valuable ideas from the discussion.
> >>
> >> Thanks to everyone who joined the dissuasion!
> >> @Weihua @Shanmon @Shengkai @Biao @Jark
> >>
> >> This weekend I’m gonna revisit and update the FL

[jira] [Created] (FLINK-32265) Use default classloader in jobmanager when there are no user jars for job

2023-06-05 Thread Fang Yong (Jira)
Fang Yong created FLINK-32265:
-

 Summary: Use default classloader in jobmanager when there are no 
user jars for job
 Key: FLINK-32265
 URL: https://issues.apache.org/jira/browse/FLINK-32265
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.18.0
Reporter: Fang Yong


Currently the job manager will create a new classloader for each Flink job even
if it has no user jars, which may cause metaspace usage to increase. Flink can
use the system classloader for jobs without jars.
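A minimal sketch of the idea (method and variable names are assumptions for illustration, not the actual Flink change):

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;

// Hypothetical sketch: reuse the parent/system classloader when a job ships
// no user JARs, instead of creating a fresh classloader that grows metaspace.
static ClassLoader resolveJobClassLoader(List<URL> userJars, ClassLoader systemClassLoader) {
    if (userJars.isEmpty()) {
        return systemClassLoader;
    }
    return new URLClassLoader(userJars.toArray(new URL[0]), systemClassLoader);
}
```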



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-311: Support Call Stored Procedure

2023-06-05 Thread yuxia
Hi, Martijn.
Thanks for your feedback.
1: In this FLIP we don't intend to allow users to customize their own stored
procedures, because, as the FLIP says, we don't want to expose too much to
users too early.
The procedures are supposed to be provided only by the Catalog. Catalog devs
can write their built-in procedures and return them in the method
Catalog.getProcedure(ObjectPath procedurePath).
So, there won't be SQL syntax to create/save a stored procedure in this FLIP.
If we find we do need it, we can propose the SQL syntax to create a stored
procedure in another dedicated FLIP.

2: The syntax `CALL procedure_name(xx)` proposed in this FLIP is the default
syntax in Calcite for calling stored procedures. Actually, we don't need to
make any modification in the flink-sql-parser module for the syntax of calling
a procedure. MySQL [1], Postgres [2], and Oracle [3] also use this syntax to
call a stored procedure.


[1] https://dev.mysql.com/doc/refman/8.0/en/call.html
[2] https://www.postgresql.org/docs/15/sql-call.html
[3] https://docs.oracle.com/javadb/10.8.3.0/ref/rrefcallprocedure.html
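For illustration, a rough sketch of what calling a catalog-provided procedure might look like once this FLIP lands (the procedure name, its argument, and the catalog/database names below are made up for the example):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Hypothetical usage sketch: 'compact' is a made-up procedure name, not one
// that any catalog actually ships; CALL support is what this FLIP proposes.
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
tEnv.executeSql("CALL my_catalog.my_db.compact('my_table')");
tEnv.executeSql("SHOW PROCEDURES IN my_catalog.my_db");
```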

Best regards,
Yuxia

----- Original Message -----
From: "Martijn Visser" 
To: "dev" 
Sent: Monday, June 5, 2023, 8:35:44 PM
Subject: Re: [DISCUSS] FLIP-311: Support Call Stored Procedure

Hi Yuxia,

Thanks for the FLIP. I have a couple of questions:

1. The syntax talks about how to CALL or SHOW the available stored
procedures, but not on how to create one. Will there not be a SQL syntax to
create/save a stored procedure?
2. Is there a default syntax in Calcite for stored procedures? What do
other databases do, do they use CALL/SHOW or something like EXEC, USE?

Best regards,

Martijn

On Mon, Jun 5, 2023 at 3:23 AM yuxia  wrote:

> Hi, Jane.
> Thanks for your input. I think we can add the auxiliary command SHOW
> PROCEDURES in this FLIP.
> Following the syntax for SHOW FUNCTIONS proposed in FLIP-297, the syntax
> will be
> SHOW PROCEDURES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT]
> (LIKE | ILIKE) <sql_like_pattern> ].
> I have updated the FLIP.
>
> The other auxiliary commands may not be suitable currently or need a
> further, dedicated discussion. Let's keep this FLIP focused.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-297%3A+Improve+Auxiliary+Sql+Statements
>
> Best regards,
> Yuxia
>
> ----- Original Message -----
> From: "Jane Chan" 
> To: "dev" 
> Sent: Saturday, June 3, 2023, 7:04:39 PM
> Subject: Re: [DISCUSS] FLIP-311: Support Call Stored Procedure
>
> Hi Yuxia,
>
> Thanks for bringing this to the discussion. The call procedure is a widely
> used feature and will be very useful for users.
>
> I just have one question regarding the usage. The FLIP mentioned that
>
> Flink will allow connector developers to develop their own built-in stored
> > procedures, and then enables users to call these predefiend stored
> > procedures.
> >
> In this FLIP, we don't intend to allow users to customize their own stored
> > procedure  for we don't want to expose too much to users too early.
>
>
> If I understand correctly, we might need to provide some auxiliary commands
> to inform users what built-in procedures are provided and how to use them.
> For example, Snowflake provides commands like [1] [2], and MySQL provides
> commands like [3] [4].
>
> [1] SHOW PROCEDURES,
> https://docs.snowflake.com/en/sql-reference/sql/show-procedures
> [2] DESCRIBE PROCEDURE ,
> https://docs.snowflake.com/en/sql-reference/sql/desc-procedure
> [3] SHOW PROCEDURE CODE,
> https://dev.mysql.com/doc/refman/5.7/en/show-procedure-code.html
> [4] SHOW PROCEDURE STATUS,
> https://dev.mysql.com/doc/refman/5.7/en/show-procedure-status.html
>
> Best,
> Jane
>
> On Sat, Jun 3, 2023 at 3:20 PM Benchao Li  wrote:
>
> > Thanks Yuxia for the explanation, it makes sense to me. It would be great
> > if you also add this to the FLIP doc.
> >
> > yuxia wrote on Thu, Jun 1, 2023 at 17:11:
> >
> > > Hi, Benchao.
> > > Thanks for your attention.
> > >
> > > Initially, I also want to pass `TableEnvironment` to procedure. But
> > > according my investegation and offline discussion with Jingson, the
> real
> > > important thing for procedure devs is the ability to build Flink
> > > datastream. But we can't get the `StreamExecutionEnvironment` which is
> > the
> > > entrypoint to build datastream. That's to say we will lost the ability
> to
> > > build a datastream if we just pass `TableEnvironment`.
> > >
> > > Of course, we can also pass `TableEnvironment` along with
> > > `StreamExecutionEnvironment` to Procedure. But I'm intend to be
> cautious
> > > about exposing too much too early to procedure devs. If someday we find
> > we
> > > will need `TableEnvironment` to custom a procedure, we can then add a
> > > method like `getTableEnvironment()` in `ProcedureContext`.
> > >
> > > Best regards,
> > > Yuxia
> > >
> > > ----- Original Message -----
> > > From: "Benchao Li" 
> > > To: "dev" 
> > > Sent: Thursday, June 1, 2023, 12:58:08 PM
> > > Subject: Re: [DISCUSS] FLIP-311: Support Call Stored Procedure
> > >
Thanks Yuxia for opening this discussion.

Re: [VOTE] FLIP-312: Add Yarn ACLs to Flink Containers

2023-06-05 Thread Becket Qin
+1 (binding)

Thanks for driving the FLIP, Archit.

Cheers,

Jiangjie (Becket) Qin

On Tue, Jun 6, 2023 at 4:33 AM Venkatakrishnan Sowrirajan 
wrote:

> Thanks for starting the vote on this one, Archit.
>
> +1 (non-binding)
>
> Regards
> Venkata krishnan
>
>
> On Mon, Jun 5, 2023 at 9:55 AM Archit Goyal 
> wrote:
>
> > Hi everyone,
> >
> > Thanks for all the feedback for FLIP-312: Add Yarn ACLs to Flink
> > Containers<
> >
> https://urldefense.com/v3/__https://cwiki.apache.org/confluence/display/FLINK/FLIP*312*3A*Add*Yarn*ACLs*to*Flink*Containers__;KyUrKysrKys!!IKRxdwAv5BmarQ!aWkLc7eHAWyHz5kwEq8kKzEAgbtKtlMmi9ifOy_1GNbiO93taxiMcwdHfENc4inLU_cxZIKPDMwBP97Z4oibXUIM$
> > >.
> > Following is the discussion thread : Link<
> >
> https://urldefense.com/v3/__https://lists.apache.org/thread/xj3ytkwj9lsl3hpjdb4n8pmy7lk3l8tv__;!!IKRxdwAv5BmarQ!aWkLc7eHAWyHz5kwEq8kKzEAgbtKtlMmi9ifOy_1GNbiO93taxiMcwdHfENc4inLU_cxZIKPDMwBP97Z4u3tNMqI$
> > >
> >
> > I'd like to start a vote for it. The vote will be open for at least 72
> > hours (until June 9th, 12:00AM GMT) unless there is an objection or an
> > insufficient number of votes.
> >
> > Thanks,
> > Archit Goyal
> >
>


Re: [VOTE] FLIP-312: Add Yarn ACLs to Flink Containers

2023-06-05 Thread Yang Wang
+1 (binding)

Best,
Yang

Becket Qin wrote on Tue, Jun 6, 2023 at 10:35:

> +1 (binding)
>
> Thanks for driving the FLIP, Archit.
>
> Cheers,
>
> Jiangjie (Becket) Qin
>
> On Tue, Jun 6, 2023 at 4:33 AM Venkatakrishnan Sowrirajan <
> vsowr...@asu.edu>
> wrote:
>
> > Thanks for starting the vote on this one, Archit.
> >
> > +1 (non-binding)
> >
> > Regards
> > Venkata krishnan
> >
> >
> > > On Mon, Jun 5, 2023 at 9:55 AM Archit Goyal
> > wrote:
> >
> > > Hi everyone,
> > >
> > > Thanks for all the feedback for FLIP-312: Add Yarn ACLs to Flink
> > > Containers<
> > >
> >
> https://urldefense.com/v3/__https://cwiki.apache.org/confluence/display/FLINK/FLIP*312*3A*Add*Yarn*ACLs*to*Flink*Containers__;KyUrKysrKys!!IKRxdwAv5BmarQ!aWkLc7eHAWyHz5kwEq8kKzEAgbtKtlMmi9ifOy_1GNbiO93taxiMcwdHfENc4inLU_cxZIKPDMwBP97Z4oibXUIM$
> > > >.
> > > Following is the discussion thread : Link<
> > >
> >
> https://urldefense.com/v3/__https://lists.apache.org/thread/xj3ytkwj9lsl3hpjdb4n8pmy7lk3l8tv__;!!IKRxdwAv5BmarQ!aWkLc7eHAWyHz5kwEq8kKzEAgbtKtlMmi9ifOy_1GNbiO93taxiMcwdHfENc4inLU_cxZIKPDMwBP97Z4u3tNMqI$
> > > >
> > >
> > > I'd like to start a vote for it. The vote will be open for at least 72
> > > hours (until June 9th, 12:00AM GMT) unless there is an objection or an
> > > insufficient number of votes.
> > >
> > > Thanks,
> > > Archit Goyal
> > >
> >
>


[jira] [Created] (FLINK-32266) Kafka Source Continues Consuming Previous Topic After Loading Savepoint

2023-06-05 Thread xiechenling (Jira)
xiechenling created FLINK-32266:
---

 Summary: Kafka Source Continues Consuming Previous Topic After 
Loading Savepoint
 Key: FLINK-32266
 URL: https://issues.apache.org/jira/browse/FLINK-32266
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.15.3
 Environment: Flink version: 1.15.3
Kafka Connector version: 1.15.3 FLIP-27
Reporter: xiechenling


I encountered an issue with the Flink Kafka Connector's Kafka Source where it 
continues consuming data from a previously consumed topic even after loading a 
savepoint and configuring it to consume data from a different topic.

 

Steps to reproduce:
 # Set up the Kafka Source to consume data from Topic A.
 # Start the Flink job.
 # Stop the job and create a savepoint.
 # Modify the configuration to consume data from Topic B.
 # Load the job from the savepoint and start it.
 # Observe that the job consumes data from both Topic A and Topic B, instead of 
just Topic B.

 

Expected behavior:

After loading a savepoint and configuring the Kafka Source to consume data from 
a new topic, the job should only consume data from the newly configured topic.

 

Actual behavior:

The Kafka Source continues consuming data from the previous topic (Topic A), in 
addition to the newly configured topic (Topic B).
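If the analysis above is right, a restore keeps the old topic's splits because they are part of the snapshotted source state. A minimal sketch of the step-4 setup with the FLIP-27 builder (broker address and topic names are placeholders):

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

// Sketch of step 4 (placeholders only): the topic is switched to topic-b, but
// splits for topic-a restored from the savepoint state may still be read.
KafkaSource<String> source =
        KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("topic-b")  // switched from topic-a before restore
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
```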



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-294: Support Customized Job Meta Data Listener

2023-06-05 Thread Shammon FY
Hi Martijn,

Thanks for your attention, I will soon initiate a discussion about FLIP-314.

Best,
Shammon FY


On Fri, Jun 2, 2023 at 2:55 AM Martijn Visser 
wrote:

> Hi Shammon,
>
> Just wanted to chip-in that I like the overall FLIP. Will be interesting to
> see the follow-up discussion on FLIP-314.
>
> Best regards,
>
> Martijn
>
> On Thu, Jun 1, 2023 at 5:45 AM yuxia  wrote:
>
> > Thanks for the explanation. Makes sense to me.
> >
> > Best regards,
> > Yuxia
> >
> > ----- Original Message -----
> > From: "Shammon FY" 
> > To: "dev" 
> > Sent: Thursday, June 1, 2023, 10:45:12 AM
> > Subject: Re: [DISCUSS] FLIP-294: Support Customized Job Meta Data Listener
> >
> > Thanks yuxia, you're right and I'll add the new database to
> > AlterDatabaseEvent.
> >
> > I added `ignoreIfNotExists` for AlterDatabaseEvent because it is a
> > parameter in the `Catalog.alterDatabase` method. Although this value is
> > currently always false in `AlterDatabaseOperation`, I think it's better
> > to stay consistent with `Catalog.alterDatabase`. What do you think?
> >
> > Best,
> > Shammon FY
> >
> > On Thu, Jun 1, 2023 at 10:25 AM yuxia 
> wrote:
> >
> > > Hi, Shammon.
> > > I mean, do we need to contain the new database after the alter in
> > > AlterDatabaseEvent, so that the listener can know what has been modified
> > > for the database? Or does the listener not need to care about the actual
> > > modification?
> > > Also, I'm wondering whether AlterDatabaseEvent needs to include the
> > > ignoreIfNotExists method, since the alter database operation doesn't
> > > have syntax like 'alter database if exists xxx'.
> > >
> > > Best regards,
> > > Yuxia
> > >
> > > ----- Original Message -----
> > > From: "Shammon FY" 
> > > To: "dev" 
> > > Sent: Wednesday, May 31, 2023, 2:55:26 PM
> > > Subject: Re: [DISCUSS] FLIP-294: Support Customized Job Meta Data Listener
> > >
> > > Hi yuxia
> > >
> > > Thanks for your input. The `AlterDatabaseEvent` extends
> > > `DatabaseModificationEvent` which has the original database.
> > >
> > > Best,
> > > Shammon FY
> > >
> > > On Wed, May 31, 2023 at 2:24 PM yuxia 
> > wrote:
> > >
> > > > Thanks Shammon for driving it.
> > > > The FLIP generally looks good to me. I only have one question.
> > > > WRT AlterDatabaseEvent, IIUC, it'll contain the origin database name
> > > > and the new CatalogDatabase after modification. Is it enough to only
> > > > pass the origin database name? Would it be better to also contain the
> > > > origin CatalogDatabase so that the listener has a way to know what
> > > > changed?
> > > >
> > > > Best regards,
> > > > Yuxia
> > > >
> > > > ----- Original Message -----
> > > > From: "ron9 liu" 
> > > > To: "dev" 
> > > > Sent: Wednesday, May 31, 2023, 11:36:04 AM
> > > > Subject: Re: [DISCUSS] FLIP-294: Support Customized Job Meta Data Listener
> > > >
> > > > Hi, Shammon
> > > >
> > > > Thanks for driving this FLIP. It will strengthen the Flink metadata
> > > > capability from a platform perspective. The overall design looks good
> > > > to me, I just have some small questions:
> > > > 1. Regarding CatalogModificationListenerFactory#createListener
> method,
> > I
> > > > think it would be better to pass Context as its parameter instead of
> > two
> > > > specific Object. In this way, we can easily extend it in the future
> and
> > > > there will be no compatibility problems. Refer to
> > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/9880ba5324d4a1252d6ae1a3f0f061e4469a05ac/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java#L81
> > > > 2. In FLIP, you mentioned that multiple Flink tables may refer to the
> > > same
> > > > physical table, so does the Listener report this physical table
> > > repeatedly?
> > > > 3. When registering a Listener object, will it connect to an external
> > > > system such as Datahub? If the Listener object registration times out
> > due
> > > > to permission issues, it will affect the execution of all subsequent
> > SQL,
> > > > what should we do in this case?
> > > >
> > > > Best,
> > > > Ron
> > > >
> > > > Shammon FY wrote on Wed, May 31, 2023 at 08:53:
> > > >
> > > > > Thanks Feng, the catalog modification listener is only used to
> > > > > report read-only ddl information to other components or systems.
> > > > >
> > > > > > 1. Will an exception thrown by the listener affect the normal
> > > > > > execution process?
> > > > >
> > > > > Users need to handle the exception in the listener themselves. Many
> > > > > DDLs such as drop tables and alter tables cannot be rolled back, and
> > > > > Flink cannot handle these exceptions for the listener. It will cause
> > > > > the operation to exit if an exception is thrown, but the executed DDL
> > > > > will be successful.
> > > > >
> > > > >
> > > > > > 2. What is the order of execution? Is the listener executed first
> > > > > > or are specific operations executed first? If I want to perform
> > > > > > DDL permission verification (such as integrating with Ranger based
> > > > > > on the listener),

Re: [DISCUSS] FLIP-294: Support Customized Job Meta Data Listener

2023-06-05 Thread Shammon FY
Hi devs:

Thanks for all the feedback, and if there are no more comments, I will
start a vote on FLIP-294 [1] later. Thanks again.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Job+Meta+Data+Listener

Best,
Shammon FY

On Tue, Jun 6, 2023 at 1:40 PM Shammon FY  wrote:

> Hi Martijn,
>
> Thanks for your attention, I will soon initiate a discussion about
> FLIP-314.
>
> Best,
> Shammon FY
>
>
> On Fri, Jun 2, 2023 at 2:55 AM Martijn Visser 
> wrote:
>
>> Hi Shammon,
>>
>> Just wanted to chip-in that I like the overall FLIP. Will be interesting
>> to
>> see the follow-up discussion on FLIP-314.
>>
>> Best regards,
>>
>> Martijn
>>
>> On Thu, Jun 1, 2023 at 5:45 AM yuxia  wrote:
>>
>> > Thanks for the explanation. Makes sense to me.
>> >
>> > Best regards,
>> > Yuxia
>> >
>> > ----- Original Message -----
>> > From: "Shammon FY" 
>> > To: "dev" 
>> > Sent: Thursday, June 1, 2023, 10:45:12 AM
>> > Subject: Re: [DISCUSS] FLIP-294: Support Customized Job Meta Data Listener
>> >
>> > Thanks yuxia, you're right and I'll add the new database to
>> > AlterDatabaseEvent.
>> >
>> > I added `ignoreIfNotExists` for AlterDatabaseEvent because it is a
>> > parameter in the `Catalog.alterDatabase` method. Although this value is
>> > currently always false in `AlterDatabaseOperation`, I think it's better
>> > to stay consistent with `Catalog.alterDatabase`. What do you think?
>> >
>> > Best,
>> > Shammon FY
>> >
>> > On Thu, Jun 1, 2023 at 10:25 AM yuxia 
>> wrote:
>> >
>> > > Hi, Shammon.
>> > > I mean, do we need to contain the new database after the alter in
>> > > AlterDatabaseEvent, so that the listener can know what has been
>> > > modified for the database? Or does the listener not need to care about
>> > > the actual modification?
>> > > Also, I'm wondering whether AlterDatabaseEvent needs to include the
>> > > ignoreIfNotExists method, since the alter database operation doesn't
>> > > have syntax like 'alter database if exists xxx'.
>> > >
>> > > Best regards,
>> > > Yuxia
>> > >
>> > > ----- Original Message -----
>> > > From: "Shammon FY" 
>> > > To: "dev" 
>> > > Sent: Wednesday, May 31, 2023, 2:55:26 PM
>> > > Subject: Re: [DISCUSS] FLIP-294: Support Customized Job Meta Data Listener
>> > >
>> > > Hi yuxia
>> > >
>> > > Thanks for your input. The `AlterDatabaseEvent` extends
>> > > `DatabaseModificationEvent` which has the original database.
>> > >
>> > > Best,
>> > > Shammon FY
>> > >
>> > > On Wed, May 31, 2023 at 2:24 PM yuxia 
>> > wrote:
>> > >
>> > > > Thanks Shammon for driving it.
>> > > > The FLIP generally looks good to me. I only have one question.
>> > > > WRT AlterDatabaseEvent, IIUC, it'll contain the origin database name
>> > > > and the new CatalogDatabase after modification. Is it enough to only
>> > > > pass the origin database name? Would it be better to also contain the
>> > > > origin CatalogDatabase so that the listener has a way to know what
>> > > > changed?
>> > > >
>> > > > Best regards,
>> > > > Yuxia
>> > > >
>> > > > ----- Original Message -----
>> > > > From: "ron9 liu" 
>> > > > To: "dev" 
>> > > > Sent: Wednesday, May 31, 2023, 11:36:04 AM
>> > > > Subject: Re: [DISCUSS] FLIP-294: Support Customized Job Meta Data
>> Listener
>> > > >
>> > > > Hi, Shammon
>> > > >
>> > > > Thanks for driving this FLIP. It will strengthen the Flink metadata
>> > > > capability from a platform perspective. The overall design looks
>> > > > good to me, I just have some small questions:
>> > > > 1. Regarding CatalogModificationListenerFactory#createListener
>> method,
>> > I
>> > > > think it would be better to pass Context as its parameter instead of
>> > two
>> > > > specific Object. In this way, we can easily extend it in the future
>> and
>> > > > there will be no compatibility problems. Refer to
>> > > >
>> > > >
>> > >
>> >
>> https://github.com/apache/flink/blob/9880ba5324d4a1252d6ae1a3f0f061e4469a05ac/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java#L81
>> > > > 2. In FLIP, you mentioned that multiple Flink tables may refer to
>> the
>> > > same
>> > > > physical table, so does the Listener report this physical table
>> > > repeatedly?
>> > > > 3. When registering a Listener object, will it connect to an
>> external
>> > > > system such as Datahub? If the Listener object registration times
>> out
>> > due
>> > > > to permission issues, it will affect the execution of all subsequent
>> > SQL,
>> > > > what should we do in this case?
>> > > >
>> > > > Best,
>> > > > Ron
>> > > >
>> > > > Shammon FY wrote on Wed, May 31, 2023 at 08:53:
>> > > >
>> > > > > Thanks Feng, the catalog modification listener is only used to
>> > > > > report read-only ddl information to other components or systems.
>> > > > >
>> > > > > > 1. Will an exception thrown by the listener affect the normal
>> > > > > > execution process?
>> > > > >
>> > > > > Users need to handle the exception in the listener themselves. Many
>> > > > > DDLs such as drop tables and alter tables cannot be rolled back.

Re: [DISCUSSION] Improved USER/SYSTEM exception wrapping in Flink code base

2023-06-05 Thread Panagiotis Garefalakis
Thanks for bringing this up Hong!

Classifying exceptions was also the main driving factor behind pluggable
failure enrichers.
However, we could do a much better job maintaining a hierarchy of System
and User exceptions, thus making the classification logic more
straightforward.

   - Defining better system/user exceptions with some kind of hierarchy is
   definitely a step forward (and refactoring the existing ones)
   - Classloader filtering could definitely be used for discovering errors
   originating from user-defined code, see doc (a rough sketch follows
   below)
   - Eventually we could also release a simple failure enricher using the
   above improvements to automatically classify errors on the JM's
   exceptions endpoint
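For concreteness, a minimal sketch of the classloader-filtering idea from the second bullet (names are assumptions for illustration, not an actual Flink API):

```java
// Hypothetical classifier sketch: walk the cause chain and flag an exception
// as USER if any class in the chain was loaded by the user-code classloader.
static boolean isUserError(Throwable t, ClassLoader userCodeClassLoader) {
    for (Throwable cur = t; cur != null; cur = cur.getCause()) {
        if (cur.getClass().getClassLoader() == userCodeClassLoader) {
            return true;
        }
    }
    return false;
}
```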

Cheers,
Panagiotis

On Wed, May 31, 2023 at 9:12 PM Paul Lam  wrote:

> Hi Hong,
>
> Thanks for starting the discussion! I believe the exception classification
> between
> user exceptions and system exceptions has been long-awaited.
>
> It's worth mentioning that years ago there was a related discussion [1],
> FYI.
>
> I’m in favor of the heuristic approach to classify the exceptions by which
> classloader they come from. In addition, we could introduce extra
> configurations to allow manual exception classification based on the
> package name of exceptions.
>
> [1] https://lists.apache.org/thread/gms4nysnb3o4v2k6421m5hsq0g7gtr81
>
> Best,
> Paul Lam
>
> > On May 25, 2023, at 23:07, Teoh, Hong wrote:
> >
> > Hi all,
> >
> > This discussion thread is to gauge community opinion and gather feedback
> on implementing a better exception hierarchy in Flink to identify
> exceptions that come from running “User job code” and exceptions coming
> from “Flink engine code”.
> >
> > Problem:
> > Flink provides a distributed processing engine (SYSTEM) to run a data
> streaming job (USER). There are many places in code where the engine runs
> “user job provided java classes”, such as serialization/deserialization,
> configuration objects, credential loading, running setup() method on
> certain Operators.
> > Sometimes when evaluating a stack trace, it might be hard to
> automatically determine if an exception is arising out of a Flink engine
> problem, or a problem associated with a particular job.
> >
> > Proposed way forward:
> > - It would be good to have an exception hierarchy maintained by Flink
> that separates out the exceptions arising from running “USER provided
> classes”. That way, we can improve our ability to automatically classify
> and mitigate these exceptions.
> > - We could also include separating out the places where exception
> originates based on function - FlinkSerializationException,
> FlinkConfigurationException.. etc. (we already have a similar concept with
> IncompatibleKeysException)
> > - This has synergy with FLIP-304: Pluggable Failure Enrichers (since it
> would simplify the logic in the USER/SYSTEM classifier there) [1].
> > - In addition, this has been discussed before in the context of updating
> the exception thrown by serialisers to be a Flink-specific serialisation
> exception instead of IllegalStateException [2]
> >
> >
> > Any thoughts on the above?
> >
> > Regards,
> > Hong
> >
> >
> > [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-304%3A+Pluggable+Failure+Enrichers
> > [2] https://lists.apache.org/thread/0o859h1vdx6mwv0fqvmybpn574692jtg
> >
> >
>
>


Re: [DISCUSS] FLIP-294: Support Customized Job Meta Data Listener

2023-06-05 Thread Jing Ge
Hi Shammon,

Thanks for driving it! It is a really interesting proposal. Looking forward
to the follow-up FLIP for the lineage feature, users will love it :-)

There are some inconsistencies in the content. In the example at the very
bottom, listener.onEvent(CatalogModificationEvent) is called, while in the
CatalogModificationListener interface definition, only
onEvent(CatalogModificationEvent, CatalogModificationContext) has been
defined. I was wondering (NIT):

1. Should there be another overloaded method
onEvent(CatalogModificationEvent) alongside
onEvent(CatalogModificationEvent, CatalogModificationContext)?
2. Since onEvent(CatalogModificationEvent) could be used, do we really need
CatalogModificationContext? See the sketch after this list; API design
example as reference: [1]
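For concreteness, a rough sketch of the two method shapes under discussion (types and signatures are inferred from this thread, not the final FLIP-294 interfaces):

```java
// Hypothetical shapes only; the event/context types are placeholders.
interface CatalogModificationEvent {}
interface CatalogModificationContext {}

interface CatalogModificationListener {
    // Variant currently in the FLIP: event plus a separate context argument.
    void onEvent(CatalogModificationEvent event, CatalogModificationContext context);

    // Overload asked about in question 1: a context-free convenience variant.
    default void onEvent(CatalogModificationEvent event) {
        onEvent(event, null);
    }
}
```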

Best regards,
Jing


[1]
http://www.java2s.com/example/java-src/pkg/java/awt/event/actionlistener-add27.html

On Tue, Jun 6, 2023 at 7:43 AM Shammon FY  wrote:

> Hi devs:
>
> Thanks for all the feedback, and if there are no more comments, I will
> start a vote on FLIP-294 [1] later. Thanks again.
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Job+Meta+Data+Listener
>
> Best,
> Shammon FY
>
> On Tue, Jun 6, 2023 at 1:40 PM Shammon FY  wrote:
>
> > Hi Martijn,
> >
> > Thanks for your attention, I will soon initiate a discussion about
> > FLIP-314.
> >
> > Best,
> > Shammon FY
> >
> >
> > On Fri, Jun 2, 2023 at 2:55 AM Martijn Visser 
> > wrote:
> >
> >> Hi Shammon,
> >>
> >> Just wanted to chip-in that I like the overall FLIP. Will be interesting
> >> to
> >> see the follow-up discussion on FLIP-314.
> >>
> >> Best regards,
> >>
> >> Martijn
> >>
> >> On Thu, Jun 1, 2023 at 5:45 AM yuxia 
> wrote:
> >>
> >> > Thanks for the explanation. Makes sense to me.
> >> >
> >> > Best regards,
> >> > Yuxia
> >> >
> >> > ----- Original Message -----
> >> > From: "Shammon FY" 
> >> > To: "dev" 
> >> > Sent: Thursday, June 1, 2023, 10:45:12 AM
> >> > Subject: Re: [DISCUSS] FLIP-294: Support Customized Job Meta Data Listener
> >> >
> >> > Thanks yuxia, you're right and I'll add the new database to
> >> > AlterDatabaseEvent.
> >> >
> >> > I added `ignoreIfNotExists` for AlterDatabaseEvent because it is a
> >> > parameter in the `Catalog.alterDatabase` method. Although this value
> is
> >> > currently always false in `AlterDatabaseOperation`, I think it's
> better
> >> > to stay consistent with `Catalog.alterDatabase`. What do you think?
> >> >
> >> > Best,
> >> > Shammon FY
> >> >
> >> > On Thu, Jun 1, 2023 at 10:25 AM yuxia 
> >> wrote:
> >> >
> >> > > Hi, Shammon.
> >> > > I mean, do we need to contain the new database after the alter in
> >> > > AlterDatabaseEvent, so that the listener can know what has been
> >> > > modified for the database? Or does the listener not need to care
> >> > > about the actual modification?
> >> > > Also, I'm wondering whether AlterDatabaseEvent needs to include the
> >> > > ignoreIfNotExists method, since the alter database operation doesn't
> >> > > have syntax like 'alter database if exists xxx'.
> >> > >
> >> > > Best regards,
> >> > > Yuxia
> >> > >
> >> > > ----- Original Message -----
> >> > > From: "Shammon FY" 
> >> > > To: "dev" 
> >> > > Sent: Wednesday, May 31, 2023, 2:55:26 PM
> >> > > Subject: Re: [DISCUSS] FLIP-294: Support Customized Job Meta Data
> Listener
> >> > >
> >> > > Hi yuxia
> >> > >
> >> > > Thanks for your input. The `AlterDatabaseEvent` extends
> >> > > `DatabaseModificationEvent` which has the original database.
> >> > >
> >> > > Best,
> >> > > Shammon FY
> >> > >
> >> > > On Wed, May 31, 2023 at 2:24 PM yuxia 
> >> > wrote:
> >> > >
> >> > > > Thanks Shammon for driving it.
> >> > > > The FLIP generally looks good to me. I only have one question.
> >> > > > WRT AlterDatabaseEvent, IIUC, it'll contain the origin database
> >> > > > name and the new CatalogDatabase after modification. Is it enough
> >> > > > to only pass the origin database name? Would it be better to also
> >> > > > contain the origin CatalogDatabase so that the listener has a way
> >> > > > to know what changed?
> >> > > >
> >> > > > Best regards,
> >> > > > Yuxia
> >> > > >
> >> > > > ----- Original Message -----
> >> > > > From: "ron9 liu" 
> >> > > > To: "dev" 
> >> > > > Sent: Wednesday, May 31, 2023, 11:36:04 AM
> >> > > > Subject: Re: [DISCUSS] FLIP-294: Support Customized Job Meta Data
> >> Listener
> >> > > >
> >> > > > Hi, Shammon
> >> > > >
> >> > > > Thanks for driving this FLIP. It will strengthen the Flink
> >> > > > metadata capability from a platform perspective. The overall
> >> > > > design looks good to me, I just have some small questions:
> >> > > > 1. Regarding CatalogModificationListenerFactory#createListener
> >> method,
> >> > I
> >> > > > think it would be better to pass Context as its parameter instead
> of
> >> > two
> >> > > > specific Object. In this way, we can easily extend it in the
> future
> >> and
> >> > > > there will be no compatibility problems. Refer to
> >> > > >
> >> > > >
> >> > >
> >> >
> >>
> https://github.com/apache/flink/blob/9880ba5324d4a1252d6ae1a3f0f061e4469a05ac/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java#L81