[jira] [Created] (FLINK-24624) Add clean up phase when kubernetes session start failed
Aitozi created FLINK-24624: -- Summary: Add clean up phase when kubernetes session start failed Key: FLINK-24624 URL: https://issues.apache.org/jira/browse/FLINK-24624 Project: Flink Issue Type: Improvement Components: Deployment / Kubernetes Affects Versions: 1.14.0 Reporter: Aitozi Several k8s resources are created when deploying the Kubernetes session, but they are left behind when the deployment fails. This can make the next deployment fail or leak resources, so I think we should add a clean-up phase for when the start fails. -- This message was sent by Atlassian Jira (v8.3.4#803005)
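A minimal sketch of the kind of clean-up the ticket asks for, using the fabric8 Kubernetes client. This is not Flink's actual deployer code: the resource names derived from the cluster id and the creation helpers are assumptions for illustration only.

{code}
// Illustrative sketch: wrap session deployment so that resources created so far
// are deleted again if the deployment fails part-way through.
import io.fabric8.kubernetes.client.KubernetesClient;

public final class SessionDeployExample {

    public static void deploySession(KubernetesClient client, String namespace, String clusterId)
            throws Exception {
        try {
            createConfigMaps(client, namespace, clusterId);
            createRestService(client, namespace, clusterId);
            createJobManagerDeployment(client, namespace, clusterId);
        } catch (Exception e) {
            // Clean-up phase: best-effort removal of whatever was created so far,
            // so a retry does not hit "already exists" errors or leak resources.
            try {
                client.apps().deployments().inNamespace(namespace).withName(clusterId).delete();
                client.services().inNamespace(namespace).withName(clusterId + "-rest").delete();
                client.configMaps().inNamespace(namespace).withName("flink-config-" + clusterId).delete();
            } catch (Exception cleanupError) {
                e.addSuppressed(cleanupError);
            }
            throw e;
        }
    }

    // Placeholders for the actual resource creation steps (hypothetical helpers).
    private static void createConfigMaps(KubernetesClient c, String ns, String id) {}
    private static void createRestService(KubernetesClient c, String ns, String id) {}
    private static void createJobManagerDeployment(KubernetesClient c, String ns, String id) {}
}
{code}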
[jira] [Created] (FLINK-24625) Expose the required kubernetes permission from the shell command
Aitozi created FLINK-24625: -- Summary: Expose the required kubernetes permission from the shell command Key: FLINK-24625 URL: https://issues.apache.org/jira/browse/FLINK-24625 Project: Flink Issue Type: Improvement Components: Deployment / Kubernetes Affects Versions: 1.14.0 Reporter: Aitozi -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24626) Flink JDBC Sink may lose data in retract stream
Kenyore created FLINK-24626: --- Summary: Flink JDBC Sink may lose data in retract stream Key: FLINK-24626 URL: https://issues.apache.org/jira/browse/FLINK-24626 Project: Flink Issue Type: Bug Components: Connectors / JDBC Reporter: Kenyore The JDBC sink may lose data when using TableBufferReducedStatementExecutor. Here are some snippets.
{code}
@Override
public void addToBatch(RowData record) throws SQLException {
    RowData key = keyExtractor.apply(record);
    if (record.getRowKind() == RowKind.DELETE) {
        // XXX cut delete off because the retract stream would generate
        return;
    }
    boolean flag = changeFlag(record.getRowKind());
    RowData value = valueTransform.apply(record); // copy or not
    reduceBuffer.put(key, Tuple2.of(flag, value));
}

private boolean changeFlag(RowKind rowKind) {
    switch (rowKind) {
        case INSERT:
        case UPDATE_AFTER:
            return true;
        case DELETE:
        case UPDATE_BEFORE:
            return false;
        default:
            throw new UnsupportedOperationException(
                    String.format(
                            "Unknown row kind, the supported row kinds is: INSERT, UPDATE_BEFORE, UPDATE_AFTER,"
                                    + " DELETE, but get: %s.",
                            rowKind));
    }
}
{code}
The code above adds the changeFlag to the Tuple2 as the sign of upsert or delete.
{code}
@Override
public void executeBatch() throws SQLException {
    for (Map.Entry<RowData, Tuple2<Boolean, RowData>> entry : reduceBuffer.entrySet()) {
        if (entry.getValue().f0) {
            upsertExecutor.addToBatch(entry.getValue().f1);
        } else {
            // delete by key
            deleteExecutor.addToBatch(entry.getKey());
        }
    }
    upsertExecutor.executeBatch();
    deleteExecutor.executeBatch();
    reduceBuffer.clear();
}
{code}
executeBatch runs all the false-flag (delete) records after the true-flag (upsert) records. This means an UPDATE_BEFORE can be executed after its UPDATE_AFTER, and we can lose data because of this. -- This message was sent by Atlassian Jira (v8.3.4#803005)
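One way to think about a fix (purely a sketch, not what the connector does today): keep the batches order-aware by flushing whenever the operation kind flips, so an earlier group of deletes can never be replayed after a later group of upserts. The executor interface below is a hypothetical stand-in for the real statement executors.

{code}
import java.sql.SQLException;

// Hedged sketch of order-aware batching; types and names are illustrative only.
final class OrderPreservingJdbcBuffer<K, V> {

    interface BatchExecutor<T> {
        void addToBatch(T record) throws SQLException;
        void executeBatch() throws SQLException;
    }

    private final BatchExecutor<V> upsertExecutor;
    private final BatchExecutor<K> deleteExecutor;
    private Boolean lastWasUpsert; // null until the first record arrives

    OrderPreservingJdbcBuffer(BatchExecutor<V> upsertExecutor, BatchExecutor<K> deleteExecutor) {
        this.upsertExecutor = upsertExecutor;
        this.deleteExecutor = deleteExecutor;
    }

    void add(boolean upsert, K key, V value) throws SQLException {
        if (lastWasUpsert != null && lastWasUpsert != upsert) {
            // The operation kind flipped (upsert -> delete or delete -> upsert):
            // flush what is buffered so far so the relative order is preserved.
            flush();
        }
        if (upsert) {
            upsertExecutor.addToBatch(value);
        } else {
            deleteExecutor.addToBatch(key);
        }
        lastWasUpsert = upsert;
    }

    void flush() throws SQLException {
        // At most one of the two batches actually holds records at this point.
        upsertExecutor.executeBatch();
        deleteExecutor.executeBatch();
    }
}
{code}

Note that this gives up the per-key reduction that TableBufferReducedStatementExecutor performs; it is only meant to illustrate the ordering constraint, not to propose a drop-in replacement.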
Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage
Hi Ingo,

I thought again.

I'll try to sort out the current catalog behaviors. Actually, we can divide catalogs into three categories:

1. ExternalCatalog: it can only read or create a single table kind which connects to external storage. The TableFactory is provided by the Catalog and can have nothing to do with Flink's Factory discovery mechanism, such as IcebergCatalog, JdbcCatalog, PostgresCatalog, etc. The Catalog manages the life cycle of its **managed** tables, which means that creation and drop will affect the real physical storage. The DDL has no "connector" option.

2. GenericCatalog (or FlinkCatalog): only Flink tables are saved and factories are created through Flink's factory discovery mechanism. In this case, the catalog is actually only a storage medium for saving schema and options, such as GenericInMemoryCatalog. The Catalog only saves meta information and does not manage the underlying physical storage of tables. These tables are **unmanaged**. The DDL must have a "connector" option.

3. HybridCatalog: it can save both its own **managed** tables and generic Flink **unmanaged** tables, such as HiveCatalog.

We want to use the "connector" option to distinguish whether a table is managed or not.

Now, consider the Flink managed table in this FLIP.
a. ExternalCatalog can not support Flink managed tables.
b. GenericCatalog can support Flink managed tables without the "connector" option.
c. What about HybridCatalog (HiveCatalog)? Yes, we want HiveCatalog to support Flink managed tables:
- with a "connector" option in Flink dialect: unmanaged tables
- Hive DDL in Hive dialect: Hive managed tables; the parser will add "connector = hive" automatically. At present, there are many differences between Flink DDL and Hive DDL, and even their features differ in many ways.
- without a "connector" option in Flink dialect: Flink managed tables.

In this way, we can support Flink managed tables while maintaining compatibility.

Anyway, we need to introduce a "SupportsFlinkManagedTable" to the catalog.

## Back to your question #

> but we should make it clear that this is a limitation and probably document how users can clean up the underlying physical storage manually in this case

Yes, it's strange that the catalog should manage tables, but some catalogs don't have this ability.
- For a PersistentCatalog, the meta will remain until the underlying physical storage is deleted.
- For the InMemoryCatalog, yes, we should document it for the underlying physical storage of Flink managed tables.

> the HiveCatalog doesn't list a 'connector' option for its tables.

Actually, it can be divided into two steps: create and save.
- When creating a table, the table seen by HiveCatalog must have "connector = hive", which is the Hive table (Hive managed table). You can see "HiveCatalog.isHiveTable".
- When saving the table, it will remove the connector of the Hive table.

We can do this: with a "connector" option it is a Flink generic table, without a "connector" option it is a Hive table, with "flink-managed = true" it is a Flink managed table.

Best,
Jingsong Lee

On Thu, Oct 21, 2021 at 8:23 PM Ingo Bürk wrote:
>
> Hi JingSong,
>
> thank you for the answers!
>
> > BDT only can be dropped by Flink SQL DDL now.
>
> Maybe I'm misunderstanding, but that's only true from the Flink side. What I meant is that a table could disappear from a catalog entirely outside of Flink. As a simple example, consider a catalog which represents an IMAP mail server and each folder as a table. If a folder is deleted from the mail account, the table would disappear, but Flink would have no way of knowing that. I don't see a way around this problem, to be honest, but we should make it clear that this is a limitation and probably document how users can clean up the underlying physical storage manually in this case?
>
> > - Option 1: Create table without the connector option, the table will
> > be forcibly translated to BDT.
>
> This would be a breaking change, right? If I remember correctly (but I might not :-)), even the HiveCatalog doesn't list a 'connector' option for its tables.
>
> This approach is also very implicit, and creating physical storage isn't exactly "free", so I personally would favor one of the other approaches. Option (2) would be explicit for the end user, while Option (3) is again implicit for the user and only explicit for the catalog implementor, so I kind of favor Option (2) because I feel that users should be aware of creating a Flink-managed table.
>
> We also need to consider the upgrade path here: if a catalog exposes tables without 'connector' options today, we need to make sure that once this FLIP is implemented no errors are thrown because codepaths assume that physical storage must exist for such tables (since they were created before the FLIP).
>
>
> Best
> Ingo
>
> On Thu, Oct 21, 2021 at 1:31 PM Jingsong Li wrote:
>
> > Hi Ingo and wenlong,
> >
> > Thanks for your feedback. Very good
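To make the "SupportsFlinkManagedTable" idea from Jingsong's mail a bit more concrete, here is a rough sketch of how such a marker interface on a catalog could look. The method name and default behavior are assumptions; nothing here is defined by the FLIP or by Flink's Catalog API today.

{code}
// Hypothetical sketch of the marker interface mentioned above.
public interface SupportsFlinkManagedTable {

    // A catalog opting in declares that a CREATE TABLE without a 'connector'
    // option should be treated as a Flink managed table, i.e. the planner may
    // create (and on DROP, delete) the built-in physical storage for it.
    default boolean supportsFlinkManagedTable() {
        return true;
    }
}

// Usage sketch (elided): a generic catalog could opt in simply by implementing it.
// class MyCatalog implements Catalog, SupportsFlinkManagedTable { ... }
{code}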
[jira] [Created] (FLINK-24627) Add some generic junit5 extensions to replace junit4 rules
Hang Ruan created FLINK-24627: - Summary: Add some generic junit5 extensions to replace junit4 rules Key: FLINK-24627 URL: https://issues.apache.org/jira/browse/FLINK-24627 Project: Flink Issue Type: Improvement Components: Tests Reporter: Hang Ruan We have to use JUnit 5 extensions to replace the existing JUnit 4 rules in order to migrate Flink's tests to JUnit 5. Some generic extensions should be provided in advance. -- This message was sent by Atlassian Jira (v8.3.4#803005)
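As an illustration of the migration pattern (not a specific extension proposed in this ticket), an ExternalResource-style JUnit 4 rule typically becomes a JUnit 5 extension implementing the lifecycle callbacks; the "embedded server" resource below is just a placeholder.

{code}
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.RegisterExtension;

// JUnit 5 replacement for an ExternalResource-style JUnit 4 rule.
class EmbeddedServerExtension implements BeforeEachCallback, AfterEachCallback {

    private AutoCloseable server;

    @Override
    public void beforeEach(ExtensionContext context) throws Exception {
        // Equivalent of ExternalResource#before(): start the resource.
        server = startEmbeddedServer();
    }

    @Override
    public void afterEach(ExtensionContext context) throws Exception {
        // Equivalent of ExternalResource#after(): tear the resource down.
        if (server != null) {
            server.close();
        }
    }

    private static AutoCloseable startEmbeddedServer() {
        return () -> {}; // placeholder for a real resource
    }
}

class MyMigratedTest {

    // Replaces the JUnit 4 @Rule / @ClassRule field.
    @RegisterExtension
    static final EmbeddedServerExtension SERVER = new EmbeddedServerExtension();

    @Test
    void testSomething() {
        // test body using the resource managed by the extension
    }
}
{code}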
Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage
Hi Jingsong,

thanks again for the answers. I think requiring catalogs to implement an interface to support BDTs is something we'll need (though personally I still prefer explicit DDL here over the "no connector option" approach).

What about more edge cases like

CREATE TABLE T (f0 INT);
ALTER TABLE T SET ('connector' = '…');

This would have to first create the physical storage and then delete it again, right?

On a separate note, the FLIP currently only discusses SQL DDL, and you have also mentioned

> BDT only can be dropped by Flink SQL DDL now.

Something Flink suffers from a lot is inconsistencies across APIs. I think it is important that we support features on all major APIs, i.e. including Table API. For example, for creating a BDT this would mean e.g. adding something like #forManaged(…) to TableDescriptor.

Best
Ingo

On Mon, Oct 25, 2021 at 5:27 AM Jingsong Li wrote:

> Hi Ingo,
>
> I thought again.
>
> I'll try to sort out the current catalog behaviors. Actually, we can divide catalogs into three categories:
>
> 1. ExternalCatalog: it can only read or create a single table kind which connects to external storage. The TableFactory is provided by the Catalog and can have nothing to do with Flink's Factory discovery mechanism, such as IcebergCatalog, JdbcCatalog, PostgresCatalog, etc. The Catalog manages the life cycle of its **managed** tables, which means that creation and drop will affect the real physical storage. The DDL has no "connector" option.
>
> 2. GenericCatalog (or FlinkCatalog): only Flink tables are saved and factories are created through Flink's factory discovery mechanism. In this case, the catalog is actually only a storage medium for saving schema and options, such as GenericInMemoryCatalog. The Catalog only saves meta information and does not manage the underlying physical storage of tables. These tables are **unmanaged**. The DDL must have a "connector" option.
>
> 3. HybridCatalog: it can save both its own **managed** tables and generic Flink **unmanaged** tables, such as HiveCatalog.
>
> We want to use the "connector" option to distinguish whether a table is managed or not.
>
> Now, consider the Flink managed table in this FLIP.
> a. ExternalCatalog can not support Flink managed tables.
> b. GenericCatalog can support Flink managed tables without the "connector" option.
> c. What about HybridCatalog (HiveCatalog)? Yes, we want HiveCatalog to support Flink managed tables:
> - with a "connector" option in Flink dialect: unmanaged tables
> - Hive DDL in Hive dialect: Hive managed tables; the parser will add "connector = hive" automatically. At present, there are many differences between Flink DDL and Hive DDL, and even their features differ in many ways.
> - without a "connector" option in Flink dialect: Flink managed tables.
>
> In this way, we can support Flink managed tables while maintaining compatibility.
>
> Anyway, we need to introduce a "SupportsFlinkManagedTable" to the catalog.
>
> ## Back to your question #
>
> > but we should make it clear that this is a limitation and probably document how users can clean up the underlying physical storage manually in this case
>
> Yes, it's strange that the catalog should manage tables, but some catalogs don't have this ability.
> - For a PersistentCatalog, the meta will remain until the underlying physical storage is deleted.
> - For the InMemoryCatalog, yes, we should document it for the underlying physical storage of Flink managed tables.
>
> > the HiveCatalog doesn't list a 'connector' option for its tables.
>
> Actually, it can be divided into two steps: create and save.
> - When creating a table, the table seen by HiveCatalog must have "connector = hive", which is the Hive table (Hive managed table). You can see "HiveCatalog.isHiveTable".
> - When saving the table, it will remove the connector of the Hive table.
>
> We can do this: with a "connector" option it is a Flink generic table, without a "connector" option it is a Hive table, with "flink-managed = true" it is a Flink managed table.
>
> Best,
> Jingsong Lee
>
> On Thu, Oct 21, 2021 at 8:23 PM Ingo Bürk wrote:
> >
> > Hi JingSong,
> >
> > thank you for the answers!
> >
> > > BDT only can be dropped by Flink SQL DDL now.
> >
> > Maybe I'm misunderstanding, but that's only true from the Flink side. What I meant is that a table could disappear from a catalog entirely outside of Flink. As a simple example, consider a catalog which represents an IMAP mail server and each folder as a table. If a folder is deleted from the mail account, the table would disappear, but Flink would have no way of knowing that. I don't see a way around this problem, to be honest, but we should make it clear that this is a limitation and probably document how users can clean up the underlying physical storage manually in this case?
> >
> > > - Option 1: Create table without the connector option, the table will
> > > be f
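Regarding the Table API point in Ingo's mail above (adding something like #forManaged(…) to TableDescriptor), a purely illustrative sketch of the contrast follows. The forManaged() builder shown in the comment does not exist in Flink; only TableDescriptor.forConnector(...) does, and the exact shape of a managed-table descriptor would be up to the FLIP.

{code}
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;

public class ManagedTableApiIdea {
    public static void main(String[] args) {
        // Today: an unmanaged table always names its connector explicitly.
        TableDescriptor unmanaged =
                TableDescriptor.forConnector("datagen")
                        .schema(Schema.newBuilder().column("f0", DataTypes.INT()).build())
                        .build();

        // Hypothetical: a managed table would omit the connector entirely,
        // e.g. something like
        //     TableDescriptor managed =
        //             TableDescriptor.forManaged()
        //                     .schema(Schema.newBuilder().column("f0", DataTypes.INT()).build())
        //                     .build();
        // mirroring the SQL-side rule "no 'connector' option == Flink managed table".
        System.out.println(unmanaged);
    }
}
{code}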