I think I have identified the issue. As I mentioned earlier, the deprecated 
`TableEnvironment#registerTable` (which is equivalent to 
`TableEnvironment#createTemporaryView`) only creates a temporary table. You 
need to use the `createTable` method instead to create a persistent table. After 
creating the table in the file catalog from the Java/Scala code, you should be 
able to see it in the SQL client. Once you write a DataStream into this 
table from Java/Scala, you should be able to see the data in the SQL client as 
well.

I have tested the following code:

```Java
// BASE_PATH is a local constant pointing at a scratch directory;
// PATH is the statically imported 'path' ConfigOption of the test filesystem connector.
CatalogStore catalogStore = new FileCatalogStore(BASE_PATH + "store/");
final EnvironmentSettings settings =
    EnvironmentSettings.newInstance()
        .inStreamingMode()
        .withCatalogStore(catalogStore)
        .build();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

tableEnv.executeSql("DROP CATALOG IF EXISTS fscat");

// Register the catalog through the catalog store so it is persisted and
// visible to other sessions, e.g. the SQL client.
Map<String, String> catalogConf = new HashMap<>();
catalogConf.put(CommonCatalogOptions.CATALOG_TYPE.key(), TestFileSystemTableFactory.IDENTIFIER);
catalogConf.put(PATH.key(), BASE_PATH + "catalog/");
catalogConf.put(CommonCatalogOptions.DEFAULT_DATABASE_KEY, "fsdb");
tableEnv.createCatalog(
    "fscat", CatalogDescriptor.of("fscat", Configuration.fromMap(catalogConf)));
// or use DDL to create the catalog:
//    tableEnv.executeSql(
//        "CREATE CATALOG fscat WITH ("
//            + "'type' = 'test-filesystem', "
//            + "'path' = 'xxx', "
//            + "'default-database' = 'fsdb')");

tableEnv.useCatalog("fscat");
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS fsdb");

tableEnv.executeSql("DROP TABLE IF EXISTS fscat.fsdb.RandomNumbers");

// A simple bounded source emitting 100 integers.
DataStream<Integer> randomSource =
    env.addSource(
        new SourceFunction<Integer>() {
          @Override
          public void run(SourceContext<Integer> sourceContext) throws Exception {
            for (int i = 0; i < 100; i++) {
              sourceContext.collect(i);
            }
          }

          @Override
          public void cancel() {}
        });

Table table = tableEnv.fromDataStream(randomSource);
randomSource.print();
// Do NOT register a temporary view here; it would not be visible to the SQL client.
//    tableEnv.registerTable("fscat.fsdb.RandomNumbers", table);
tableEnv.createTable(
    "fscat.fsdb.RandomNumbers",
    TableDescriptor.forConnector(TestFileSystemTableFactory.IDENTIFIER)
        .schema(Schema.newBuilder().column("data", DataTypes.INT()).build())
        .option("path", BASE_PATH + "data/")
        .option(FactoryUtil.FORMAT.key(), "csv")
        .build());
// or use DDL to create the table:
//    tableEnv.executeSql(
//        "CREATE TABLE fscat.fsdb.RandomNumbers ("
//            + "data INT"
//            + ") WITH ("
//            + "  'connector' = 'test-filesystem',"
//            + "  'path' = 'xxx',"
//            + "  'format' = 'csv'"
//            + ")");
table.executeInsert("fscat.fsdb.RandomNumbers").await();

System.err.println("=======");
tableEnv.executeSql("SELECT * FROM fscat.fsdb.RandomNumbers").print();
env.execute("Flink Random Sequence Generator environment: TEST");
```

When you start the SQL client with `./sql-client.sh -Dtable.catalog-store.kind=file -Dtable.catalog-store.file.path=xxx`, you can finally find the table:

```
Flink SQL> show catalogs;
+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
|           fscat |
+-----------------+
2 rows in set

Flink SQL> use catalog fscat;
[INFO] Execute statement succeeded.

Flink SQL> show tables;
+---------------+
|    table name |
+---------------+
| RandomNumbers |
+---------------+
1 row in set
```
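
By the way, instead of passing the `-D` options on every start, you can also configure the catalog store once in the Flink configuration file. A minimal sketch, with a placeholder path:

```
# conf/config.yaml (or conf/flink-conf.yaml on older versions)
table.catalog-store.kind: file
table.catalog-store.file.path: /path/to/store/
```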

Please do not forget to package all classes related to TestFileSystemCatalogFactory, 
TestFileSystemCatalog, and TestFileSystemTableFactory into a jar, add the SPI 
file entries for TestFileSystemCatalogFactory and TestFileSystemTableFactory, and then 
copy this jar into the SQL client's `lib` directory.
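
For reference, the SPI registration is just a plain-text provider file inside the jar under `META-INF/services`. A minimal sketch — the fully qualified name of TestFileSystemTableFactory is an assumption here, so adjust both entries to the actual packages of your classes:

```
# META-INF/services/org.apache.flink.table.factories.Factory
org.apache.flink.table.file.testutils.catalog.TestFileSystemCatalogFactory
org.apache.flink.table.file.testutils.TestFileSystemTableFactory
```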

--

    Best!
    Xuyang




At 2025-01-07 23:56:18, "Vinay Agarwal" <vink...@gmail.com> wrote:

Still the same.
```SQL
$ sql-client.sh -Dtable.catalog-store.kind=file -Dtable.catalog-store.file.path=/Users/vagarwal/tmp/flink/store/

Flink SQL> show catalogs;
+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
+-----------------+
1 row in set

Flink SQL> show databases;
+------------------+
|    database name |
+------------------+
| default_database |
+------------------+
1 row in set

Flink SQL> show tables;
Empty set

Flink SQL> show jobs;
+----------------------------------+----------------------------------------------------+---------+-------------------------+
|                           job id |                                           job name |  status |              start time |
+----------------------------------+----------------------------------------------------+---------+-------------------------+
| 663f71ec8246a7b7f8023dba3acfc5fe | Flink Random Sequence Generator environment: local | RUNNING | 2025-01-07T15:51:37.160 |
+----------------------------------+----------------------------------------------------+---------+-------------------------+
1 row in set
```



On Mon, Jan 6, 2025 at 6:00 PM Xuyang <xyzhong...@163.com> wrote:


What about starting the cluster with `./bin/start-cluster.sh -Dtable.catalog-store.kind=file -Dtable.catalog-store.file.path=/Users/vagarwal/tmp/flink/store/` and then starting the SQL client?




--

    Best!
    Xuyang




On 2025-01-07 00:30:19, "Vinay Agarwal" <vink...@gmail.com> wrote:

Still doesn't show.


```SQL
Flink SQL> set 'table.catalog-store.kind' = 'file';
[INFO] Execute statement succeeded.

Flink SQL>
> set 'table.catalog-store.file.path' = '/Users/vagarwal/tmp/flink/store/';
[INFO] Execute statement succeeded.

Flink SQL> show catalogs;
+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
+-----------------+
1 row in set

Flink SQL> show tables;
Empty set

Flink SQL> show databases;
+------------------+
|    database name |
+------------------+
| default_database |
+------------------+
1 row in set
```


Thanks for all your help.
Vinay


On Mon, Jan 6, 2025 at 2:05 AM Xuyang <xyzhong...@163.com> wrote:


Can you try using the `SET` command[1] in the SQL client to set the configs related to the catalog store[2]?




[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/#sql-client-configuration

[2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#configure-catalog-store




--

    Best!
    Xuyang




On 2025-01-05 05:59:22, "Vinay Agarwal" <vink...@gmail.com> wrote:

I got Flink 1.20.0 to work (tweaked my `build.gradle` based on the 
documentation) and created a catalog store and a file system catalog, but I still 
don't see my catalog or table in the SQL client. My code is:

```Java
CatalogStore catalogStore = new FileCatalogStore(BASE_PATH + "store/");
final EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .inStreamingMode()
    .withCatalogStore(catalogStore)
    .build();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
TestFileSystemCatalog fsCatalog = new TestFileSystemCatalog(BASE_PATH + "catalog/", "fscat", "fsdb");
tableEnv.registerCatalog("fscat", fsCatalog);
tableEnv.useCatalog("fscat");
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS fsdb");

DataStream<RandomNumber> randomSource = env.addSource(new RandomNumberSource());
Table table = tableEnv.fromDataStream(randomSource);
randomSource.print();
tableEnv.registerTable("fscat.fsdb.RandomNumbers", table);

env.execute("Flink Random Sequence Generator environment: " + environment);
```


SQL client output:
```SQL
Flink SQL> show catalogs;
+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
+-----------------+
1 row in set

Flink SQL> show databases;
+------------------+
|    database name |
+------------------+
| default_database |
+------------------+
1 row in set

Flink SQL> show tables;
Empty set
```


I thought setting the catalog store in the SQL client might help, but I can't 
find a way to do that.


Thanks,
Vinay


On Thu, Jan 2, 2025 at 7:08 PM Xuyang <xyzhong...@163.com> wrote:


How about porting the `TestFileSystemCatalogFactory` back to 1.19 and rebuilding 
this catalog jar?




--

    Best!
    Xuyang




On 2025-01-03 06:23:25, "Vinay Agarwal" <vink...@gmail.com> wrote:

Thanks again for your answer. I have to use Flink version 1.20.0 because 
`TestFileSystemCatalogFactory` doesn't exist in prior versions. Unfortunately, 
I am not able to run 1.20.0 due to the following error. (Version 1.19.1 works 
just fine.)


```
14:19:47.094 [main] INFO  org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils  - Actor system started at pekko://flink
14:19:47.104 [main] INFO  org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils  - Trying to start local actor system
14:19:47.113 [flink-metrics-8] INFO  org.apache.pekko.event.slf4j.Slf4jLogger  - Slf4jLogger started
14:19:47.119 [main] INFO  org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils  - Actor system started at pekko://flink-metrics
14:19:47.128 [main] INFO  org.apache.flink.runtime.rpc.pekko.PekkoRpcService  - Starting RPC endpoint for org.apache.flink.runtime.metrics.dump.MetricQueryService at pekko://flink-metrics/user/rpc/MetricQueryService .
14:19:47.142 [flink-metrics-pekko.actor.supervisor-dispatcher-9] WARN  org.apache.flink.runtime.rpc.pekko.SupervisorActor  - RpcActor pekko://flink-metrics/user/rpc/MetricQueryService has failed. Shutting it down now.
java.lang.NullPointerException: null
    at java.base/java.util.HashMap.putMapEntries(HashMap.java:497)
    at java.base/java.util.HashMap.putAll(HashMap.java:781)
    at java.base/java.util.Collections$SynchronizedMap.putAll(Collections.java:2604)
    at ch.qos.logback.classic.util.LogbackMDCAdapter.setContextMap(LogbackMDCAdapter.java:197)
    at org.slf4j.MDC.setContextMap(MDC.java:264)
    at org.apache.flink.util.MdcUtils.lambda$withContext$0(MdcUtils.java:48)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleControlMessage(PekkoRpcActor.java:208)
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
    at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
    at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
    at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
    at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
    at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
    at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
    at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
```
How can I get around this error?


Thanks again.
Vinay


On Wed, Jan 1, 2025 at 5:58 PM Xuyang <xyzhong...@163.com> wrote:


As far as I know, it's not possible to do this without using a catalog.

Oh... I forgot that the FileSystemCatalog is currently only used for 
testing[1]. One approach is to implement and package a FileSystemCatalog of 
your own, referring to this testing catalog.




[1] https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogFactory.java
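
For what it's worth, here is a minimal sketch of such a factory. `GenericInMemoryCatalog` stands in for a real filesystem-backed `Catalog` implementation, and the `my-filesystem` identifier, class name, and option names are made up for illustration:

```Java
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.factories.CatalogFactory;

/** Hypothetical example factory; all names here are placeholders. */
public class MyFileSystemCatalogFactory implements CatalogFactory {

    public static final ConfigOption<String> PATH =
            ConfigOptions.key("path").stringType().noDefaultValue();
    public static final ConfigOption<String> DEFAULT_DATABASE =
            ConfigOptions.key("default-database").stringType().defaultValue("default");

    @Override
    public String factoryIdentifier() {
        // The value users put in 'type' = '...' when creating the catalog.
        return "my-filesystem";
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(PATH);
        return options;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(DEFAULT_DATABASE);
        return options;
    }

    @Override
    public Catalog createCatalog(Context context) {
        final Configuration options = Configuration.fromMap(context.getOptions());
        // A real implementation would return a Catalog that persists metadata
        // under options.get(PATH); the in-memory catalog only keeps this sketch
        // compilable and is NOT persistent.
        return new GenericInMemoryCatalog(context.getName(), options.get(DEFAULT_DATABASE));
    }
}
```

The factory is discovered via SPI, so its fully qualified class name must also be listed in `META-INF/services/org.apache.flink.table.factories.Factory` inside the jar.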




--

    Best!
    Xuyang




On 2025-01-01 00:31:43, "Vinay Agarwal" <vink...@gmail.com> wrote:

Thanks Xuyang for your answer. However, I can't find a way to create a 
filesystem catalog except by using Hive, which seems to require an external Hive 
installation. I was hoping to find a solution that doesn't require external 
dependencies. Is that possible?


Vinay


On Thu, Dec 26, 2024 at 10:30 PM Xuyang <xyzhong...@163.com> wrote:


Hi. 

In Java/Scala code, using `registerTable` and then querying from the SQL client 
will not work, as `registerTable` only creates a temporary table.

To fulfill your requirements, you may need to use a persistent catalog[1] (such 
as a filesystem catalog) in which you create your tables from Java/Scala code. 
Once the table is created under that catalog, you can query it from the 
SQL client within the same catalog[2].
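
For example, here is a minimal sketch of the idea (`mycat`, `mydb`, `MyTable`, and `myStream` are placeholders; the catalog behind `mycat` is assumed to be persistent and already registered):

```Java
// `mycat` is assumed to be a persistent catalog already registered with tableEnv.
tableEnv.useCatalog("mycat");
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS mydb");

// Create a persistent table in the catalog; its metadata outlives this session.
tableEnv.executeSql(
    "CREATE TABLE mycat.mydb.MyTable (data INT) WITH ("
        + "'connector' = 'filesystem', 'path' = '/tmp/mytable', 'format' = 'csv')");

// Write the DataStream-backed table into it; afterwards the SQL client can query it.
tableEnv.fromDataStream(myStream).executeInsert("mycat.mydb.MyTable").await();
```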

To avoid creating and using the same catalog twice, in both the Java/Scala code 
and the SQL client, you can leverage a catalog store (available only in newer versions)[3].

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#using-java-scala-or-python

[2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#using-sql-ddl

[3] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#catalog-store




--

    Best!
    Xuyang




At 2024-12-27 01:28:38, "Vinay Agarwal" <vink...@gmail.com> wrote:

Hello,


I am looking for an elaboration on, or a workaround for, an old answer to this 
question (https://stackoverflow.com/questions/56049472/create-sql-table-from-a-datastream-in-a-java-scala-program-and-query-it-from-sql), 
which mentions that registering the table can make it visible to the SQL 
client.


I am using Flink 1.18.1 with Java 11. I downloaded the Flink 1.18.1 bundle, put its 
`bin` folder on my PATH, and started the cluster using `start-cluster.sh`.

I created a simple Flink job that generates a stream of random integers and 
registers a table as follows:

```Java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Create a DataStream of random integers
DataStream<Integer> intStream = env.addSource(new RandomIntegerSource());
// Convert the DataStream to a Table
Table table = tableEnv.fromDataStream(intStream, "number");
// Register the table
tableEnv.registerTable("RandomNumbers", table);
// Execute the job
env.execute("Random Integer Job");
```
Then, I started `sql-client.sh` and queried it as follows:
```SQL
Flink SQL> show catalogs;
+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
+-----------------+
1 row in set

Flink SQL> USE CATALOG default_catalog;
[INFO] Execute statement succeed.

Flink SQL> SHOW databases;
+------------------+
|    database name |
+------------------+
| default_database |
+------------------+
1 row in set

Flink SQL> USE default_database;
[INFO] Execute statement succeed.

Flink SQL> SHOW tables;
Empty set
```
As can be seen, the `RandomNumbers` table does not show up.

How can I make that table visible to the SQL client?
