[ https://issues.apache.org/jira/browse/FLINK-31198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rascal Wu updated FLINK-31198:
------------------------------
    Description: 
Hi, I ran into a weird problem when using a Flink connector via `./bin/sql-client.sh 
-j connector.jar` in Flink 1.16.1. It seems that an incorrect class loader is used to 
load the user classes. The same setup works in Flink 1.15; in Flink 1.16 I have to put 
the connector jar into the `lib` folder so that it is loaded as a system jar.

 

The details are as follows. I reproduced the issue on a standalone cluster and tested 
the Hudi, Iceberg, and Flink Table Store connectors: Flink Table Store worked well, 
but the others failed.

1. Start a standalone cluster without any connector jar in the `lib` folder

`./bin/start-cluster.sh`

2. Creating a Flink Table Store catalog succeeds.

```
./bin/sql-client.sh embedded -j flink-table-store-dist-0.3.0.jar

CREATE CATALOG fts WITH (
  'type'='table-store',
  'warehouse'='file:///Users/xxxxx/Downloads/flink-1.16.1/data/fts'
);
```

!image-2023-02-23-20-04-01-936.png!

3. Creating an Iceberg Hadoop catalog fails.

```
./bin/sql-client.sh embedded -j iceberg-flink-runtime-1.16-1.1.0.jar

CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='file:///Users/xxxxx/Downloads/flink-1.16.1/data/iceberg/',
  'property-version'='1'
);
```

!image-2023-02-23-20-08-54-885.png!

However, the Iceberg `FlinkCatalogFactory` is loaded by the Flink user class loader, 
which is weird.

!image-2023-02-23-20-10-27-274.png!
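
As a side note, a quick way to see the mismatch is to compare the two loaders directly. This is a hypothetical standalone check (run with the Iceberg runtime jar on the classpath), not SQL Client code:

```
// Hypothetical diagnostic: compare the loader that defined the Iceberg
// factory with the current thread's context class loader. A mismatch means
// anything resolved through the context class loader will miss the connector.
public class LoaderCheck {
    public static void main(String[] args) throws Exception {
        Class<?> factory = Class.forName("org.apache.iceberg.flink.FlinkCatalogFactory");
        System.out.println("factory loaded by: " + factory.getClassLoader());
        System.out.println("thread context CL: "
                + Thread.currentThread().getContextClassLoader());
    }
}
```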

4. Creating a Hudi catalog succeeds, but inserting data fails.

```
./bin/sql-client.sh embedded -j hudi-flink1.16-bundle-0.13.0.jar

CREATE CATALOG dfs_catalog WITH (
  'type'='hudi',
  'catalog.path'='file:///Users/xxxx/Downloads/flink-1.16.1/data/'
);

USE CATALOG dfs_catalog;
CREATE DATABASE hudi_dfs_db;
USE hudi_dfs_db;

CREATE TABLE flink_hudi_mor_tbl(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field' = 'uuid',
  'precombine.field' = 'ts'
);

INSERT INTO `dfs_catalog`.`hudi_dfs_db`.`flink_hudi_mor_tbl` VALUES
  ('id31','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1');
```
Similar to the Iceberg case, the Flink Hudi connector can create the dfs catalog 
successfully, but inserting data fails because `HoodieRecord` is not found. Flink 
1.15.1 works.

!image-2023-02-23-20-14-24-895.png!

!image-2023-02-23-20-21-52-113.png!
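
To illustrate the failure mode (again a hypothetical standalone check, not Flink code): a class bundled only in the jar passed via `-j` resolves through a loader that knows about that jar, but not through the loader that started the SQL Client:

```
import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;

// Hypothetical illustration: HoodieRecord lives only in the bundle jar, so it
// resolves through a URLClassLoader over that jar but not through the system
// class loader, matching the "HoodieRecord not found" failure above.
public class BundleCheck {
    public static void main(String[] args) throws Exception {
        URL bundle = new File("hudi-flink1.16-bundle-0.13.0.jar").toURI().toURL();
        ClassLoader userCl =
                new URLClassLoader(new URL[]{bundle}, ClassLoader.getSystemClassLoader());

        // Succeeds: the bundle jar is on the user class loader's path.
        Class.forName("org.apache.hudi.common.model.HoodieRecord", false, userCl);
        System.out.println("resolved via user class loader");

        // Throws ClassNotFoundException: the jar is not in lib/, so the
        // system class loader cannot see it.
        Class.forName("org.apache.hudi.common.model.HoodieRecord", false,
                ClassLoader.getSystemClassLoader());
    }
}
```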

After investigating, I found a change from 
https://issues.apache.org/jira/browse/FLINK-28451 that modified the logic around 
`parser.parse(statement)`. In Flink 1.15 the parse operation was wrapped by 
`context.wrapClassLoader(() -> parser.parse(statement))`, which set the Flink user 
class loader as the thread context class loader, so the connector classes could be 
loaded successfully. Flink 1.16 no longer does this. I'm not sure whether this is 
the root cause.
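
For reference, such wrapping amounts to temporarily swapping the thread context class loader. Below is a minimal sketch of what the 1.15-style wrapping does; the helper name and signature are illustrative, not the exact Flink internals:

```
import java.util.function.Supplier;

public final class ClassLoaderUtil {

    // Runs the action with the given user class loader installed as the
    // thread context class loader and restores the previous loader afterwards.
    // Illustrative sketch of the wrapping described above, e.g.
    // wrapClassLoader(userCl, () -> parser.parse(statement)).
    public static <T> T wrapClassLoader(ClassLoader userClassLoader, Supplier<T> action) {
        final Thread thread = Thread.currentThread();
        final ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(userClassLoader);
        try {
            return action.get();
        } finally {
            thread.setContextClassLoader(previous); // always restore
        }
    }
}
```

With wrapping like this in place, `Class.forName`/`ServiceLoader` lookups triggered during parsing resolve against the jar passed via `-j`; without it, they fall back to whatever loader the calling thread happens to carry.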

 

 

> Class loader problem by incorrect classloader in flink sql 1.16
> ---------------------------------------------------------------
>
>                 Key: FLINK-31198
>                 URL: https://issues.apache.org/jira/browse/FLINK-31198
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Rascal Wu
>            Priority: Major
>         Attachments: image-2023-02-23-19-25-20-679.png, 
> image-2023-02-23-19-58-48-273.png, image-2023-02-23-19-59-37-233.png, 
> image-2023-02-23-20-04-01-936.png, image-2023-02-23-20-08-54-885.png, 
> image-2023-02-23-20-10-27-274.png, image-2023-02-23-20-14-24-895.png, 
> image-2023-02-23-20-21-52-113.png
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
