Hi everyone,

sorry for the late reply, feature freeze kept me busy. Mayank, Hao and I synced offline and came up with an improved proposal. Before we update the FLIP, let me summarize the key points, which hopefully address most concerns:

1) SecretStore
- Similar to CatalogStore, we introduce a SecretStore as the highest level in TableEnvironment.
- SecretStore is initialized with options and potentially environment variables. This includes EnvironmentSettings.withSecretStore(SecretStore).
- The SecretStore is pluggable and discovered using the regular factory approach.
- For example, it could be backed by Azure Key Vault or other cloud providers' secret stores.
- Goal: Flink and Flink catalogs do not have to deal with sensitive data.
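
To make the SecretStore idea more concrete, here is a minimal Java sketch. Only the name `SecretStore` comes from the proposal; the method names (`storeSecret`, `getSecret`, `removeSecret`) and the in-memory implementation are assumptions for illustration, not the interfaces the FLIP will define. A real implementation would delegate to an external product such as Azure Key Vault.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical interface; the method names are assumptions, not the FLIP's API. */
interface SecretStore {
    /** Stores the secret payload and returns an opaque id that is safe to persist. */
    String storeSecret(Map<String, String> secret);

    /** Resolves an id back to the secret payload, if it exists. */
    Optional<Map<String, String>> getSecret(String secretId);

    /** Deletes the secret, e.g. when the owning connection is dropped. */
    void removeSecret(String secretId);
}

/** In-memory stand-in for local testing only; it offers no real protection. */
class InMemorySecretStore implements SecretStore {
    private final Map<String, Map<String, String>> secrets = new ConcurrentHashMap<>();

    @Override
    public String storeSecret(Map<String, String> secret) {
        String id = UUID.randomUUID().toString();
        // Defensive immutable copy so later changes by the caller do not leak in.
        secrets.put(id, Map.copyOf(secret));
        return id;
    }

    @Override
    public Optional<Map<String, String>> getSecret(String secretId) {
        return Optional.ofNullable(secrets.get(secretId));
    }

    @Override
    public void removeSecret(String secretId) {
        secrets.remove(secretId);
    }
}
```

The point of the shape is that callers only ever persist the returned id, so neither Flink nor a catalog needs to hold the secret payload itself.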

2) Connections
- Connections are catalog objects identified with 3-part identifiers. 3-part identifiers are crucial for manageability of larger projects and align with existing catalog objects.
- They contain connection details, e.g. URL, query parameters, and other configuration.
- They do not contain secrets, but only pointers to secrets in the SecretStore.

3) Connection DDL

CREATE [TEMPORARY] CONNECTION mycat.mydb.OpenAPI WITH (
  'type' = 'basic' | 'bearer' | 'jwt' | 'oauth' | ...,
  ...
)

- Connection type is pluggable and discovered using the regular factory-approach.
- The factory extracts secrets and puts them into SecretStore.
- The factory leaves only non-confidential options, which can then be stored in a catalog.

When executing:
CREATE [TEMPORARY] CONNECTION mycat.mydb.OpenAPI WITH (
  'type' = 'basic',
  'url' = 'api.example.com',
  'username' = 'bob',
  'password' = 'xyz'
)

The catalog will receive something similar to:
CREATE [TEMPORARY] CONNECTION mycat.mydb.OpenAPI WITH (
  'type' = 'basic',
  'url' = 'api.example.com',
  'secret.store' = 'azure-key-vault',
  'secret.id' = 'secretId'
)

- However, the exact property design is up to the connection factory.
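
The extraction step described above can be sketched in Java as follows. This is a minimal illustration under stated assumptions, not the FLIP's API: the class `BasicConnectionSketch`, the method `sanitize`, and the choice to treat both `username` and `password` as confidential (matching the example, where neither reaches the catalog) are hypothetical. A plain map stands in for the SecretStore.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical helper showing what a 'basic' connection factory might do. */
class BasicConnectionSketch {
    // Assumption: the option keys considered confidential for type 'basic'.
    static final Set<String> SECRET_KEYS = Set.of("username", "password");

    /**
     * Moves confidential options into secretSink (a stand-in for the SecretStore)
     * and returns only the options that are safe to hand to the catalog.
     */
    static Map<String, String> sanitize(
            Map<String, String> ddlOptions,
            String storeName,
            String secretId,
            Map<String, Map<String, String>> secretSink) {
        Map<String, String> secrets = new HashMap<>();
        Map<String, String> safe = new TreeMap<>();
        for (Map.Entry<String, String> option : ddlOptions.entrySet()) {
            if (SECRET_KEYS.contains(option.getKey())) {
                secrets.put(option.getKey(), option.getValue());
            } else {
                safe.put(option.getKey(), option.getValue());
            }
        }
        if (!secrets.isEmpty()) {
            // The catalog only sees a pointer to the secret, never the secret itself.
            secretSink.put(secretId, secrets);
            safe.put("secret.store", storeName);
            safe.put("secret.id", secretId);
        }
        return safe;
    }
}
```

Called with the four options from the example and the store name `azure-key-vault`, the returned map contains `type`, `url`, `secret.store`, and `secret.id`, while the username and password end up only in the sink.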

4) Connection Usage

CREATE TABLE t (...) USING CONNECTION mycat.mydb.OpenAPI;

- MODEL, FUNCTION, TABLE DDL will support the USING CONNECTION keyword, similar to BigQuery.
- The connection will be provided in a table/model provider/function definition factory.

5) CatalogStore / Catalog Initialization

Catalog store or catalog can make use of SecretStore to retrieve initial credentials for bootstrapping. All objects lower than catalog store/catalog can then use connections. If you think we still need system-level connections, we can support CREATE SYSTEM CONNECTION GlobalName WITH (..), similar to SYSTEM functions, stored directly in a ConnectionManager in TableEnvironment. But for now I would suggest starting simple with per-catalog connections and evolving the design later.
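
If system connections were added later, lookup could work roughly like the following Java sketch. Everything here is an assumption for illustration: the class `ConnectionManagerSketch`, its methods, and the rule that a one-part name refers to a system connection while a three-part name refers to a catalog connection. In a real design, catalog connections would be stored by the catalog itself rather than in a map inside the manager.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch of a ConnectionManager; not the FLIP's API. */
class ConnectionManagerSketch {
    // System connections are keyed by a single global name.
    private final Map<String, Map<String, String>> systemConnections = new HashMap<>();
    // Catalog connections are keyed by "catalog.database.name" (stand-in for a catalog lookup).
    private final Map<String, Map<String, String>> catalogConnections = new HashMap<>();

    void registerSystem(String name, Map<String, String> options) {
        systemConnections.put(name, Map.copyOf(options));
    }

    void registerCatalog(String catalog, String database, String name, Map<String, String> options) {
        catalogConnections.put(String.join(".", catalog, database, name), Map.copyOf(options));
    }

    /** Assumption: one-part names are system connections, three-part names are catalog objects. */
    Optional<Map<String, String>> resolve(String... identifier) {
        if (identifier.length == 1) {
            return Optional.ofNullable(systemConnections.get(identifier[0]));
        }
        if (identifier.length == 3) {
            return Optional.ofNullable(catalogConnections.get(String.join(".", identifier)));
        }
        throw new IllegalArgumentException("Expected a 1-part or 3-part identifier");
    }
}
```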

Dealing with secrets is a very sensitive topic and I'm clearly not an expert on it. This is why we should try to push the problem to existing solutions and not start storing secrets in Flink in any way. Thus, the interfaces will be defined very generically.

Looking forward to your feedback.

Cheers,
Timo





On 09.06.25 04:01, Leonard Xu wrote:
Thanks, Timo, for joining this thread.

I agree that this feature is needed by the community; the current disagreement 
is only about the implementation method or solution.
Your thoughts look generally good to me; looking forward to your proposal.

Best,
Leonard

On Jun 6, 2025, at 22:46, Timo Walther <twal...@apache.org> wrote:

Hi everyone,

thanks for this healthy discussion. Looking at the high number of participants, it looks like
we definitely want this feature. We just need to figure out the "how".

This reminds me very much of the discussion we had for CREATE FUNCTION. There, 
we discussed whether functions should be named globally or catalog-specific. In 
the end, we decided on both `CREATE SYSTEM FUNCTION` and `CREATE FUNCTION`,
satisfying both the data platform team of an organization (which might provide 
system functions) and individual data teams or use cases (scoped by 
catalog/database).

Looking at other modern vendors like Snowflake, there is SECRET (scoped to
schema) [1] and API INTEGRATION [2] (scoped to account). So other vendors also
offer global and per-team / per-use-case connection details.

In general, I think fitting connections into the existing concepts for catalog 
objects (with three-part identifier) makes managing them easier. But I also see 
the need for global defaults.

Btw keep in mind that a catalog implementation should only store metadata. 
Similar to how a CatalogTable doesn't store the actual data, a CatalogConnection
should not store the credentials. It should only offer a factory that allows 
for storing and retrieving them. In real world scenarios a factory is most 
likely backed by a product like Azure Key Vault.

So code-wise having a ConnectionManager that behaves similar to FunctionManager 
sounds reasonable.

+1 for having special syntax instead of using properties. This allows accessing
connections in tables, models, and functions, and also in catalogs if we agree
to have global ones as well.

What do you think?

Let me give this some more thought and come back with a concrete proposal
by early next week.

Cheers,
Timo

[1] https://docs.snowflake.com/en/sql-reference/sql/create-secret
[2] https://docs.snowflake.com/en/sql-reference/sql/create-api-integration

On 04.06.25 10:47, Leonard Xu wrote:
Hey Mayank,
Please see my feedback below:
1. One of the motivations of this FLIP is to improve security. However, the 
current design stores all connection information in the catalog,
and each Flink SQL job reads from the catalog during compilation. The 
connection information is passed between SQL Gateway and the
catalog in plaintext, which actually introduces new security risks.
2. The name "Connection" should be changed to something like ConnectionSpec to 
clearly indicate that it is an object containing only static
properties without a lifecycle. Putting aside the naming issue, I think the 
current model and hierarchy design is somewhat strange. Storing
various kinds of connections (e.g., Kafka, MySQL) in the same Catalog with 
hierarchical identifiers like catalog-name.db-name.connection-name
raises the following questions:
 (1) What is the purpose of this hierarchical structure of the Connection object?
 (2) If we can use a Connection to create a MySQL table, why can't we use a 
Connection to create a MySQL Catalog?
3. Regarding the connector usage examples given in this FLIP:
```sql
1  -- Example 2: Using connection for jdbc tables
2  CREATE OR REPLACE CONNECTION mysql_customer_db
3  WITH (
4    'type' = 'jdbc',
5    'jdbc.url' = 'jdbc:mysql://customer-db.example.com:3306/customerdb',
6    'jdbc.connection.ssl.enabled' = 'true'
7  );
8
9  CREATE TABLE customers (
10   customer_id INT,
11   PRIMARY KEY (customer_id) NOT ENFORCED
12 ) WITH (
13   'connector' = 'jdbc',
14   'jdbc.connection' = 'mysql_customer_db',
15   'jdbc.connection.ssl.enabled' = 'true',
16   'jdbc.connection.max-retry-timeout' = '60s',
17   'jdbc.table-name' = 'customers',
18   'jdbc.lookup.cache' = 'PARTIAL'
19 );
```
I see three issues from SQL semantics and Connector compatibility perspectives:
(1) Look at line 14: `mysql_customer_db` is an object identifier of a 
CONNECTION defined in SQL. However, this identifier is referenced
     via a string value inside the table’s WITH clause, which feels hacky to me.
(2) Look at lines 14–16: the use of the specific prefix `jdbc.connection` will 
confuse users because `connection.xx` may already be used as
  a prefix for existing configuration items.
(3) Look at lines 14–18: Why do all existing configuration options need to be 
prefixed with `jdbc`, even if they’re not related to Connection properties?
This completely changes user habits — is it backward compatible?
  In my opinion, Connection should be a model independent of both Catalog and 
Table, and can be referenced by all catalog/table/udf/model objects.
It should be managed by a Component such as a ConnectionManager to enable 
reuse. For security purposes, authentication mechanisms could
be supported within the ConnectionManager.
Best,
Leonard
On Jun 4, 2025, at 02:04, Martijn Visser <martijnvis...@apache.org> wrote:

Hi all,

First of all, I think having a Connection resource is something that will
be beneficial for Apache Flink. I could see that being extended in the
future to allow for easier secret handling [1].
In my mind, I'm comparing this proposal against SQL/MED from the ISO
standard [2]. I do think that SQL/MED isn't a very user-friendly syntax
though, looking at Postgres for example [3].

I think it's a valid question if Connection should be considered with a
catalog or database-level scope. @Ryan can you share something more, since
you've mentioned "Note: I much prefer catalogs for this case. Which is what
we use internally to manage connection properties". It looks like there
isn't a clearly favoured approach when looking at other vendors (for example,
Databricks scopes it to a Unity Catalog, Snowflake to the database
level).

Also looking forward to Leonard's input.

Best regards,

Martijn

[1] https://issues.apache.org/jira/browse/FLINK-36818
[2] https://www.iso.org/standard/84804.html
[3] https://www.postgresql.org/docs/current/sql-createserver.html

On Fri, May 30, 2025 at 5:07 AM Leonard Xu <xbjt...@gmail.com> wrote:

Hey Mayank.

Thanks for the FLIP. I went through it quickly and found some issues that I
think we need to discuss in depth later. As we’re on a short Dragon Boat
Festival holiday, could you kindly hold this thread? We will be back to
continue the FLIP discussion.

Best,
Leonard


On Apr 29, 2025, at 23:07, Mayank Juneja <mayankjunej...@gmail.com> wrote:

Hi all,

I would like to open up for discussion a new FLIP-529 [1].

Motivation:
Currently, Flink SQL handles external connectivity by defining endpoints
and credentials in table configuration. This approach prevents reusability
of these connections and makes table definition less secure by exposing
sensitive information.
We propose the introduction of a new "connection" resource in Flink. This
will be a pluggable resource configured with a remote endpoint and
associated access key. Once defined, connections can be reused across table
definitions, and eventually for model definition (as discussed in FLIP-437)
for inference, enabling seamless and secure integration with external
systems.
The connection resource will provide a new, optional way to manage external
connectivity in Flink. Existing methods for table definitions will remain
unchanged.

[1] https://cwiki.apache.org/confluence/x/cYroF

Best Regards,
Mayank Juneja




