[jira] [Created] (FLINK-24624) Add clean up phase when kubernetes session start failed

2021-10-24 Thread Aitozi (Jira)
Aitozi created FLINK-24624:
--

 Summary: Add clean up phase when kubernetes session start failed
 Key: FLINK-24624
 URL: https://issues.apache.org/jira/browse/FLINK-24624
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / Kubernetes
Affects Versions: 1.14.0
Reporter: Aitozi


Several k8s resources are created when deploying the Kubernetes session, but 
these resources are left behind when the deployment fails. This can cause the 
next deployment to fail, or leak resources. So I think we should add a clean-up 
phase for when the start fails.
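A minimal sketch of what such a clean-up phase could look like (all names here are hypothetical, not the actual Flink deployment code): track each resource as it is created, and delete everything in reverse creation order if the deployment throws.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of a clean-up phase for a failed session deployment;
// SessionDeployer and Resource are invented names, not Flink classes.
public class SessionDeployer {
    public interface Resource {
        void delete();
    }

    private final Deque<Resource> created = new ArrayDeque<>();

    // Record each k8s resource as it is created.
    public void track(Resource resource) {
        created.push(resource);
    }

    // Run the deployment; on failure, delete everything created so far
    // in reverse creation order, then rethrow.
    public void deployOrCleanUp(Runnable deploy) {
        try {
            deploy.run();
        } catch (RuntimeException e) {
            while (!created.isEmpty()) {
                created.pop().delete();
            }
            throw e;
        }
    }
}
```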



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24625) Expose the required kubernetes permission from the shell command

2021-10-24 Thread Aitozi (Jira)
Aitozi created FLINK-24625:
--

 Summary: Expose the required kubernetes permission from the shell 
command
 Key: FLINK-24625
 URL: https://issues.apache.org/jira/browse/FLINK-24625
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / Kubernetes
Affects Versions: 1.14.0
Reporter: Aitozi








[jira] [Created] (FLINK-24626) Flink JDBC Sink may lose data in retract stream

2021-10-24 Thread Kenyore (Jira)
Kenyore created FLINK-24626:
---

 Summary: Flink JDBC Sink may lose data in retract stream
 Key: FLINK-24626
 URL: https://issues.apache.org/jira/browse/FLINK-24626
 Project: Flink
  Issue Type: Bug
  Components: Connectors / JDBC
Reporter: Kenyore


The JDBC sink may lose data when using TableBufferReducedStatementExecutor.

Here are some snippets.

{code}
@Override
public void addToBatch(RowData record) throws SQLException {
    RowData key = keyExtractor.apply(record);
    if (record.getRowKind() == RowKind.DELETE) {
        // XXX cut delete off because the retract stream would generate
        return;
    }
    boolean flag = changeFlag(record.getRowKind());
    RowData value = valueTransform.apply(record); // copy or not
    reduceBuffer.put(key, Tuple2.of(flag, value));
}

private boolean changeFlag(RowKind rowKind) {
    switch (rowKind) {
        case INSERT:
        case UPDATE_AFTER:
            return true;
        case DELETE:
        case UPDATE_BEFORE:
            return false;
        default:
            throw new UnsupportedOperationException(
                    String.format(
                            "Unknown row kind, the supported row kinds is:"
                                    + " INSERT, UPDATE_BEFORE, UPDATE_AFTER,"
                                    + " DELETE, but get: %s.",
                            rowKind));
    }
}
{code}

The code above adds changeFlag to the Tuple2 as a marker for upsert or delete.

{code}
@Override
public void executeBatch() throws SQLException {
    for (Map.Entry<RowData, Tuple2<Boolean, RowData>> entry :
            reduceBuffer.entrySet()) {
        if (entry.getValue().f0) {
            upsertExecutor.addToBatch(entry.getValue().f1);
        } else {
            // delete by key
            deleteExecutor.addToBatch(entry.getKey());
        }
    }
    upsertExecutor.executeBatch();
    deleteExecutor.executeBatch();
    reduceBuffer.clear();
}
{code}
executeBatch executes all false-flag (delete) entries after the true-flag 
(upsert) entries. This means an UPDATE_BEFORE could be executed after its 
UPDATE_AFTER, and we would lose data because of this.
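To make the ordering problem concrete, here is a minimal, self-contained simulation (invented names, not the actual sink code) of a retract update whose -U and +U changelog entries end up as separate buffer entries: the upsert executes first, and the later delete removes the freshly written row.

```java
import java.util.HashMap;
import java.util.Map;

// Simulates the flush order of TableBufferReducedStatementExecutor:
// all upserts execute first, all deletes second. Names are invented.
public class RetractOrderDemo {
    public static Map<Integer, String> run() {
        Map<Integer, String> db = new HashMap<>();
        db.put(1, "v1"); // existing row with primary key 1

        // Retract update of row 1: -U (1, "v1") then +U (1, "v2").
        // Assume the two changelog entries do not collapse into one
        // buffer slot, so both survive until the flush.
        db.put(1, "v2"); // upsert from +U executes first
        db.remove(1);    // delete from -U executes afterwards

        return db; // the fresh value "v2" is gone: data loss
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints {}
    }
}
```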





Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-10-24 Thread Jingsong Li
Hi Ingo,

I thought again.

I'll try to sort out the current catalog behaviors.
Actually, we can divide catalogs into three categories:

1. ExternalCatalog: it can only read or create a single table kind
which connects to external storage. TableFactory is provided by
Catalog, which can have nothing to do with Flink's Factory discovery
mechanism, such as IcebergCatalog, JdbcCatalog, PostgresCatalog, etc.
Catalog manages the life cycle of its **managed** tables, which means
that creation and drop will affect the real physical storage. The DDL
has no "connector" option.

2. GenericCatalog (or FlinkCatalog): only Flink tables are saved and
factories are created through Flink's factory discovery mechanism. At
this time, the catalog is actually only a storage medium for saving
schema and options, such as GenericInMemoryCatalog. Catalog only saves
meta information and does not manage the underlying physical storage
of tables. These tables are **unmanaged**. The DDL must have a
"connector" option.

3. HybridCatalog: It can save both its own **managed** table and
generic Flink **unmanaged** table, such as HiveCatalog.

We want to use the "connector" option to distinguish whether it is
managed or not.

Now, consider the Flink managed table in this FLIP.
a. ExternalCatalog cannot support Flink managed tables.
b. GenericCatalog can support Flink managed tables without the
"connector" option.
c. What about HybridCatalog (HiveCatalog)? Yes, we want HiveCatalog to
support Flink managed tables:
- With the "connector" option in Flink dialect: unmanaged tables.
- Hive DDL in Hive dialect: Hive managed tables; the parser will add
"connector = hive" automatically. At present, there are many
differences between Flink DDL and Hive DDL, and even their features
differ considerably.
- Without the "connector" option in Flink dialect: Flink managed tables.

In this way, we can support Flink managed tables while maintaining
compatibility.

In any case, we need to introduce a "SupportsFlinkManagedTable" interface for catalogs.

## Back to your questions ##

> but we should make it clear that this is a limitation and probably document 
> how users can clean up the underlying physical storage manually in this case

Yes, it's strange that the catalog should manage tables, but some
catalogs don't have this ability.
- For PersistentCatalog, the meta will continue until the underlying
physical storage is deleted.
- For InMemoryCatalog, yes, we should document it for the underlying
physical storage of Flink managed tables.

> the HiveCatalog doesn't list a 'connector' option for its tables.

Actually, it can be divided into two steps: create and save.
- When creating a table, the table seen by HiveCatalog must have
"connector = hive", which is the hive table (Hive managed table). You
can see the "HiveCatalog.isHiveTable".
- When saving the table, it will remove the connector of the Hive
table. We can do this: a table with the "connector" option is a Flink
generic table, one without it is a Hive table, and one with
"flink-managed = true" is a Flink managed table.
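The option-based classification described above could be sketched as follows ("flink-managed" is the option name from this thread, not a finalized API; the class and method names are invented for illustration):

```java
import java.util.Map;

// Sketch of the table classification proposed above for HiveCatalog:
//   "flink-managed = true" -> Flink managed table
//   a "connector" option   -> Flink generic (unmanaged) table
//   neither                -> Hive table
public class TableKindSketch {
    public static String classify(Map<String, String> options) {
        if ("true".equals(options.get("flink-managed"))) {
            return "flink-managed";
        }
        if (options.containsKey("connector")) {
            return "flink-generic";
        }
        return "hive";
    }
}
```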

Best,
Jingsong Lee

On Thu, Oct 21, 2021 at 8:23 PM Ingo Bürk  wrote:
>
> Hi JingSong,
>
> thank you for the answers!
>
> > BDT only can be dropped by Flink SQL DDL now.
>
> Maybe I'm misunderstanding, but that's only true from the Flink side. What
> I meant is that a table could disappear from a catalog entirely outside of
> Flink. As a simple example, consider a catalog which represents an IMAP
> mail server and each folder as a table. If a folder is deleted from the
> mail account, the table would disappear, but Flink would have no way of
> knowing that. I don't see a way around this problem, to be honest, but we
> should make it clear that this is a limitation and probably document how
> users can clean up the underlying physical storage manually in this case?
>
> > - Option 1: Create table without the connector option, the table will
> > be forcibly translated to BDT.
>
> This would be a breaking change, right? If I remember correctly (but I
> might not :-)), even the HiveCatalog doesn't list a 'connector' option for
> its tables.
>
> This approach is also very implicit, and creating physical storage isn't
> exactly "free", so I personally would favor one of the other approaches.
> Option (2) would be explicit for the end user, while Option (3) is again
> implicit for the user and only explicit for the catalog implementor, so I
> kind of favor Option (2) because I feel that users should be aware of
> creating a Flink-managed table.
>
> We also need to consider the upgrade path here: if a catalog exposes tables
> without 'connector' options today, we need to make sure that once this FLIP
> is implemented no errors are thrown because codepaths assume that physical
> storage must exist for such tables (since they were created before the
> FLIP).
>
>
> Best
> Ingo
>
> On Thu, Oct 21, 2021 at 1:31 PM Jingsong Li  wrote:
>
> > Hi Ingo and wenlong,
> >
> > Thanks for your feedback. Very good 

[jira] [Created] (FLINK-24627) Add some generic junit5 extensions to replace junit4 rules

2021-10-24 Thread Hang Ruan (Jira)
Hang Ruan created FLINK-24627:
-

 Summary: Add some generic junit5 extensions to replace junit4 rules
 Key: FLINK-24627
 URL: https://issues.apache.org/jira/browse/FLINK-24627
 Project: Flink
  Issue Type: Improvement
  Components: Tests
Reporter: Hang Ruan


We need JUnit 5 extensions to replace the existing JUnit 4 rules in order 
to migrate Flink's tests to JUnit 5. Some generic extensions should be 
provided in advance.
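As an illustration, a JUnit 4 ExternalResource-style rule maps to a JUnit 5 extension roughly like this (a generic sketch, not one of the specific extensions this ticket has in mind):

```java
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;

// Generic sketch: the JUnit 5 counterpart of a JUnit 4 ExternalResource
// rule, acquiring a resource before each test and releasing it after.
public class ResourceExtension implements BeforeEachCallback, AfterEachCallback {
    private boolean open;

    @Override
    public void beforeEach(ExtensionContext context) {
        open = true; // acquire whatever the old rule set up
    }

    @Override
    public void afterEach(ExtensionContext context) {
        open = false; // release it
    }

    public boolean isOpen() {
        return open;
    }
}
```

Such an extension is registered on a test class via @ExtendWith(ResourceExtension.class) or a @RegisterExtension field, instead of the old @Rule annotation.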





Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-10-24 Thread Ingo Bürk
Hi Jingsong,

thanks again for the answers. I think requiring catalogs to implement an
interface to support BDTs is something we'll need (though personally I
still prefer explicit DDL here over the "no connector option" approach).

What about more edge cases like

CREATE TABLE T (f0 INT);
ALTER TABLE T SET ('connector' = '…');

This would have to first create the physical storage and then delete it
again, right?

On a separate note, the FLIP currently only discusses SQL DDL, and you have
also mentioned

> BDT only can be dropped by Flink SQL DDL now.

Something Flink suffers from a lot is inconsistencies across APIs. I think
it is important that we support features on all major APIs, i.e. including
Table API.
For example for creating a BDT this would mean e.g. adding something like
#forManaged(…) to TableDescriptor.


Best
Ingo

On Mon, Oct 25, 2021 at 5:27 AM Jingsong Li  wrote:

> Hi Ingo,
>
> I thought again.
>
> I'll try to sort out the current catalog behaviors.
> Actually, we can divide catalogs into three categories:
>
> 1. ExternalCatalog: it can only read or create a single table kind
> which connects to external storage. TableFactory is provided by
> Catalog, which can have nothing to do with Flink's Factory discovery
> mechanism, such as IcebergCatalog, JdbcCatalog, PostgresCatalog, etc.
> Catalog manages the life cycle of its **managed** tables, which means
> that creation and drop will affect the real physical storage. The DDL
> has no "connector" option.
>
> 2. GenericCatalog (or FlinkCatalog): only Flink tables are saved and
> factories are created through Flink's factory discovery mechanism. At
> this time, the catalog is actually only a storage medium for saving
> schema and options, such as GenericInMemoryCatalog. Catalog only saves
> meta information and does not manage the underlying physical storage
> of tables. These tables are **unmanaged**. The DDL must have a
> "connector" option.
>
> 3. HybridCatalog: It can save both its own **managed** table and
> generic Flink **unmanaged** table, such as HiveCatalog.
>
> We want to use the "connector" option to distinguish whether it is
> managed or not.
>
> Now, consider the Flink managed table in this FLIP.
> a. ExternalCatalog can not support Flink managed tables.
> b. GenericCatalog can support Flink managed tables without the
> "connector" option.
> c. What about HybridCatalog (HiveCatalog)? Yes, we want HiveCatalog to
> support Flink managed tables:
> - with "connector" option in Flink dialect is unmanaged tables
> - Hive DDL in Hive dialect is Hive managed tables, the parser will add
> "connector = hive" automatically. At present, there are many
> differences between Flink DDL and Hive DDL, and even their features
> have many differences.
> - without "connector" option in Flink dialect is Flink managed tables.
>
> In this way, we can support Flink managed tables while maintaining
> compatibility.
>
> Anyway, we need introduce a "SupportsFlinkManagedTable" to catalog.
>
> ## Back to your question #
>
> > but we should make it clear that this is a limitation and probably
> document how users can clean up the underlying physical storage manually in
> this case
>
> Yes, it's strange that the catalog should manage tables, but some
> catalogs don't have this ability.
> - For PersistentCatalog, the meta will continue until the underlying
> physical storage is deleted.
> - For InMemoryCatalog, yes, we should document it for the underlying
> physical storage of Flink managed tables.
>
> > the HiveCatalog doesn't list a 'connector' option for its tables.
>
> Actually, It can be divided into two steps: create and save:
> - When creating a table, the table seen by HiveCatalog must have
> "connector = hive", which is the hive table (Hive managed table). You
> can see the "HiveCatalog.isHiveTable".
> - When saving the table, it will remove the connector of the hive
> table. We can do this: with "connector" option is Flink generic table,
> without "connector" option is Hive table, with "flink-managed = true"
> is Flink managed table.
>
> Best,
> Jingsong Lee
>
> On Thu, Oct 21, 2021 at 8:23 PM Ingo Bürk  wrote:
> >
> > Hi JingSong,
> >
> > thank you for the answers!
> >
> > > BDT only can be dropped by Flink SQL DDL now.
> >
> > Maybe I'm misunderstanding, but that's only true from the Flink side.
> What
> > I meant is that a table could disappear from a catalog entirely outside
> of
> > Flink. As a simple example, consider a catalog which represents an IMAP
> > mail server and each folder as a table. If a folder is deleted from the
> > mail account, the table would disappear, but Flink would have no way of
> > knowing that. I don't see a way around this problem, to be honest, but we
> > should make it clear that this is a limitation and probably document how
> > users can clean up the underlying physical storage manually in this case?
> >
> > > - Option 1: Create table without the connector option, the table will
> > > be f