Thanks Xuyang, that shows the table in the SQL client correctly! Two
follow-up questions.

   1. When I query that table from the SQL client, it seems to just wait
   for something, and then fails with the error below. How can I get that
   query to work?

```
Flink SQL> show tables;
+---------------+
|    table name |
+---------------+
| RandomNumbers |
+---------------+
1 row in set

Flink SQL> select * from RandomNumbers;
[ERROR] Could not execute SQL statement. Reason:
java.lang.InterruptedException: sleep interrupted
```

   2. I tried modifying the number generator from a limit of 100 to a
   never-ending sequence. The code is as follows:

```
CatalogStore catalogStore = new FileCatalogStore(BASE_PATH + "store/");
final EnvironmentSettings settings =
    EnvironmentSettings.newInstance()
        .inStreamingMode()
        .withCatalogStore(catalogStore)
        .build();
final StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tableEnv =
    StreamTableEnvironment.create(env, settings);

tableEnv.executeSql("drop catalog if exists fscat");
Map<String, String> catalogConf = new HashMap<>();
catalogConf.put(
    CommonCatalogOptions.CATALOG_TYPE.key(),
    TestFileSystemTableFactory.IDENTIFIER);
catalogConf.put(PATH.key(), BASE_PATH + "catalog/");
catalogConf.put(CommonCatalogOptions.DEFAULT_DATABASE_KEY, "fsdb");
tableEnv.createCatalog(
    "fscat",
    CatalogDescriptor.of("fscat", Configuration.fromMap(catalogConf)));

tableEnv.useCatalog("fscat");
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS fsdb");

tableEnv.executeSql("drop table if exists fscat.fsdb.RandomNumbers");

DataStream<Integer> randomSource =
    env.addSource(
        new SourceFunction<Integer>() {
          private static final int BOUND = 100;
          private final Random rnd = new Random();

          @Override
          public void run(SourceContext<Integer> sourceContext) throws Exception {
            while (true) {
              sourceContext.collect(rnd.nextInt(BOUND));
              Thread.sleep(1000L);
            }
          }

          @Override
          public void cancel() {}
        });

Table table = tableEnv.fromDataStream(randomSource);
tableEnv.createTable(
    "fscat.fsdb.RandomNumbers",
    TableDescriptor.forConnector(TestFileSystemTableFactory.IDENTIFIER)
        .schema(Schema.newBuilder().column("data", DataTypes.INT()).build())
        .option("path", BASE_PATH + "data/")
        .option(FactoryUtil.FORMAT.key(), "csv")
        .build());
// table.executeInsert("fscat.fsdb.RandomNumbers");

System.err.println("=======");
// tableEnv.executeSql("select * from fscat.fsdb.RandomNumbers").print();
randomSource.print();

// execute program
env.execute("Flink Random Sequence Generator");
```

The logs show random numbers being generated, and the SQL client shows the
table. But when I try to query the `RandomNumbers` table in the SQL client,
I get
```
Flink SQL> select * from RandomNumbers;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.runtime.rest.util.RestClientException: [POST request not allowed]
```

and in the logs I see
```
2025-01-10 08:10:48,327 WARN  org.apache.flink.runtime.rest.FileUploadHandler              [] - POST request not allowed
```

The directory at BASE_PATH + "data/" does exist, but it is empty:
```
$ ls -l /Users/vagarwal/tmp/flink/data/
total 0
```

What am I doing wrong?

Thanks again for going out of your way to help out!



On Wed, Jan 8, 2025 at 10:18 PM Xuyang <xyzhong...@163.com> wrote:

> I think I have identified the issue. As I mentioned earlier, the
> deprecated `TableEnvironment#registerTable` (which is equivalent to
> `TableEnvironment#createTemporaryView`) only creates a *temporary* table.
> You need to use the existing createTable method to create a persistent
> table. After creating the table in the file catalog using the Java/Scala
> code, you should be able to see the table in the SQL client. Once you write
> a DataStream into this table from Java/Scala, you should be able to see the
> data in the SQL client as well.
>
>
> I have tested the code as below:
>
> ```
>
> CatalogStore catalogStore = new FileCatalogStore(BASE_PATH + "store/");
> final EnvironmentSettings settings =
>     EnvironmentSettings.newInstance()
>         .inStreamingMode()
>         .withCatalogStore(catalogStore)
>         .build();
> final StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.getExecutionEnvironment();
> final StreamTableEnvironment tableEnv =
>     StreamTableEnvironment.create(env, settings);
>
> tableEnv.executeSql("drop catalog if exists fscat");
> Map<String, String> catalogConf = new HashMap<>();
> catalogConf.put(
>     CommonCatalogOptions.CATALOG_TYPE.key(),
>     TestFileSystemTableFactory.IDENTIFIER);
> catalogConf.put(PATH.key(), BASE_PATH + "catalog/");
> catalogConf.put(CommonCatalogOptions.DEFAULT_DATABASE_KEY, "fsdb");
> tableEnv.createCatalog(
>     "fscat",
>     CatalogDescriptor.of("fscat", Configuration.fromMap(catalogConf)));
> // or use ddl to create catalog
> //    tableEnv.executeSql(
> //        "CREATE CATALOG fscat with ("
> //            + "'type' = 'test-filesystem', "
> //            + "'path' = 'xxx', "
> //            + "'default-database' = 'fsdb')");
>
> tableEnv.useCatalog("fscat");
> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS fsdb");
>
> tableEnv.executeSql("drop table if exists fscat.fsdb.RandomNumbers");
>
> DataStream<Integer> randomSource =
>     env.addSource(
>         new SourceFunction<Integer>() {
>           @Override
>           public void run(SourceContext<Integer> sourceContext) throws Exception {
>             for (int i = 0; i < 100; i++) {
>               sourceContext.collect(i);
>             }
>           }
>
>           @Override
>           public void cancel() {}
>         });
>
> Table table = tableEnv.fromDataStream(randomSource);
> randomSource.print();
> // should not register a temporary table
> //    tableEnv.registerTable("fscat.fsdb.RandomNumbers", table);
> tableEnv.createTable(
>     "fscat.fsdb.RandomNumbers",
>     TableDescriptor.forConnector(TestFileSystemTableFactory.IDENTIFIER)
>         .schema(Schema.newBuilder().column("data", DataTypes.INT()).build())
>         .option("path", BASE_PATH + "data/")
>         .option(FactoryUtil.FORMAT.key(), "csv")
>         .build());
> // or use ddl to create table
> //    tableEnv.executeSql(
> //        "CREATE TABLE fscat.fsdb.RandomNumbers ("
> //            + "data INT"
> //            + ") WITH ("
> //            + "  'connector' = 'test-filesystem',"
> //            + "  'path' = 'xxx',"
> //            + "  'format' = 'csv'"
> //            + ")");
> table.executeInsert("fscat.fsdb.RandomNumbers").await();
>
> System.err.println("=======");
> tableEnv.executeSql("select * from fscat.fsdb.RandomNumbers").print();
>
> env.execute("Flink Random Sequence Generator environment: TEST");
>
> ```
>
> When using `./sql-client.sh -Dtable.catalog-store.kind=file
> -Dtable.catalog-store.file.path=xxx` to start the sql client, you can
> finally see the table.
>
> ```
>
> Flink SQL> show catalogs;
>         +-----------------+
>         |    catalog name |
>         +-----------------+
>         | default_catalog |
>         |           fscat |
>         +-----------------+
>         2 rows in set
>
> Flink SQL> use catalog fscat;
> [INFO] Execute statement succeeded.
>
> Flink SQL> show tables;
>         +---------------+
>         |    table name |
>         +---------------+
>         | RandomNumbers |
>         +---------------+
>         1 row in set
>
> ```
>
> Please do not forget to package all the classes for
> TestFileSystemCatalogFactory, TestFileSystemCatalog and
> TestFileSystemTableFactory into a jar, add the SPI file for
> TestFileSystemCatalogFactory and TestFileSystemTableFactory, and then copy
> this jar into the `lib` directory.
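>
> For reference, a minimal sketch of that SPI file. I'm assuming both
> factories are discovered through Flink's unified `Factory` service file,
> and the exact package of TestFileSystemTableFactory is my guess; please
> verify both against the sources:
>
> ```
> # src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
> org.apache.flink.table.file.testutils.catalog.TestFileSystemCatalogFactory
> org.apache.flink.table.file.testutils.TestFileSystemTableFactory
> ```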
>
>
> --
>     Best!
>     Xuyang
>
>
> At 2025-01-07 23:56:18, "Vinay Agarwal" <vink...@gmail.com> wrote:
>
> Still the same.
>
> $ sql-client.sh -Dtable.catalog-store.kind=file -Dtable.catalog-store.file.path=/Users/vagarwal/tmp/flink/store/
>
>     ______ _ _       _       _____  ____  _         _____ _ _            _  
> BETA
>    |  ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | |
>    | |__  | |_ _ __ | | __ | (___ | |  | | |      | |    | |_  ___ _ __ | |_
>    |  __| | | | '_ \| |/ /  \___ \| |  | | |      | |    | | |/ _ \ '_ \| __|
>    | |    | | | | | |   <   ____) | |__| | |____  | |____| | |  __/ | | | |_
>    |_|    |_|_|_| |_|_|\_\ |_____/ \___\_\______|  \_____|_|_|\___|_| |_|\__|
>
>         Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to 
> exit.
>
> Command history file path: /Users/vagarwal/.flink-sql-history
>
> Flink SQL> show catalogs;
> +-----------------+
> |    catalog name |
> +-----------------+
> | default_catalog |
> +-----------------+
> 1 row in set
>
> Flink SQL> show databases;
> +------------------+
> |    database name |
> +------------------+
> | default_database |
> +------------------+
> 1 row in set
>
> Flink SQL> show tables;
> Empty set
>
> Flink SQL> show jobs;
> +----------------------------------+-----------------------------------------------------+---------+-------------------------+
> |                           job id |                                            job name |  status |              start time |
> +----------------------------------+-----------------------------------------------------+---------+-------------------------+
> | 663f71ec8246a7b7f8023dba3acfc5fe | Flink Random Sequence Generator environment: local  | RUNNING | 2025-01-07T15:51:37.160 |
> +----------------------------------+-----------------------------------------------------+---------+-------------------------+
> 1 row in set
>
>
> On Mon, Jan 6, 2025 at 6:00 PM Xuyang <xyzhong...@163.com> wrote:
>
>> What about using "./bin/start-cluster.sh -Dtable.catalog-store.kind=file
>> -Dtable.catalog-store.file.path=/Users/vagarwal/tmp/flink/store/" to start
>> sql client?
>>
>>
>> --
>>     Best!
>>     Xuyang
>>
>>
>> On 2025-01-07 00:30:19, "Vinay Agarwal" <vink...@gmail.com> wrote:
>>
>> Still doesn't show.
>>
>> ```SQL
>> Flink SQL> set 'table.catalog-store.kind' = 'file';
>> [INFO] Execute statement succeeded.
>>
>> Flink SQL>
>> > set 'table.catalog-store.file.path' =
>> '/Users/vagarwal/tmp/flink/store/';
>> [INFO] Execute statement succeeded.
>>
>> Flink SQL> show catalogs;
>> +-----------------+
>> |    catalog name |
>> +-----------------+
>> | default_catalog |
>> +-----------------+
>> 1 row in set
>>
>> Flink SQL> show tables;
>> Empty set
>>
>> Flink SQL> show databases;
>> +------------------+
>> |    database name |
>> +------------------+
>> | default_database |
>> +------------------+
>> 1 row in set
>> ```
>>
>> Thanks for all your help.
>> Vinay
>>
>> On Mon, Jan 6, 2025 at 2:05 AM Xuyang <xyzhong...@163.com> wrote:
>>
>>> Can you try using the `SET` command[1] in the sql client to set the
>>> configs related to the catalog store[2]?
>>>
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/#sql-client-configuration
>>>
>>> [2]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#configure-catalog-store
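>>>
>>> For example, something along these lines (the store path is a placeholder):
>>>
>>> ```SQL
>>> SET 'table.catalog-store.kind' = 'file';
>>> SET 'table.catalog-store.file.path' = '/path/to/store/';
>>> ```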
>>>
>>>
>>> --
>>>     Best!
>>>     Xuyang
>>>
>>>
>>> On 2025-01-05 05:59:22, "Vinay Agarwal" <vink...@gmail.com> wrote:
>>>
>>> I got Flink 1.20.0 to work (I tweaked my `build.gradle` based on the
>>> documentation) and created a catalog store and a file system catalog. But I
>>> still don't see my catalog or table in the SQL client. My code is
>>>
>>> CatalogStore catalogStore = new FileCatalogStore(BASE_PATH + "store/");
>>> final EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>    .inStreamingMode()
>>>    .withCatalogStore(catalogStore)
>>>    .build();
>>> final StreamExecutionEnvironment env =
>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>> final StreamTableEnvironment tableEnv =
>>>     StreamTableEnvironment.create(env, settings);
>>> TestFileSystemCatalog fsCatalog =
>>>     new TestFileSystemCatalog(BASE_PATH + "catalog/", "fscat", "fsdb");
>>> tableEnv.registerCatalog("fscat", fsCatalog);
>>> tableEnv.useCatalog("fscat");
>>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS fsdb");
>>>
>>> DataStream<RandomNumber> randomSource =
>>>     env.addSource(new RandomNumberSource());
>>> Table table = tableEnv.fromDataStream(randomSource);
>>> randomSource.print();
>>> tableEnv.registerTable("fscat.fsdb.RandomNumbers", table);
>>>
>>> env.execute("Flink Random Sequence Generator environment: " + environment);
>>>
>>>
>>> SQL Client
>>> ```SQL
>>> Flink SQL> show catalogs;
>>> +-----------------+
>>> |    catalog name |
>>> +-----------------+
>>> | default_catalog |
>>> +-----------------+
>>> 1 row in set
>>>
>>> Flink SQL> show databases;
>>> +------------------+
>>> |    database name |
>>> +------------------+
>>> | default_database |
>>> +------------------+
>>> 1 row in set
>>>
>>> Flink SQL> show tables;
>>> Empty set
>>> ```
>>>
>>> I thought setting the catalog store in the SQL client might help, but I
>>> can't find a way to do that.
>>>
>>> Thanks,
>>> Vinay
>>>
>>> On Thu, Jan 2, 2025 at 7:08 PM Xuyang <xyzhong...@163.com> wrote:
>>>
>>>> How about porting the `TestFileSystemCatalogFactory` back to 1.19 and
>>>> rebuilding this catalog jar?
>>>>
>>>>
>>>> --
>>>>     Best!
>>>>     Xuyang
>>>>
>>>>
>>>> On 2025-01-03 06:23:25, "Vinay Agarwal" <vink...@gmail.com> wrote:
>>>>
>>>> Thanks again for your answer. I have to use Flink version 1.20.0
>>>> because `TestFileSystemCatalogFactory` doesn't exist in prior versions.
>>>> Unfortunately, I am not able to run 1.20.0 due to the following error.
>>>> (Version 1.19.1 works just fine.)
>>>>
>>>> ```
>>>> 14:19:47.094 [main] INFO  org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils  - Actor system started at pekko://flink
>>>> 14:19:47.104 [main] INFO  org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils  - Trying to start local actor system
>>>> 14:19:47.113 [flink-metrics-8] INFO  org.apache.pekko.event.slf4j.Slf4jLogger  - Slf4jLogger started
>>>> 14:19:47.119 [main] INFO  org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils  - Actor system started at pekko://flink-metrics
>>>> 14:19:47.128 [main] INFO  org.apache.flink.runtime.rpc.pekko.PekkoRpcService  - Starting RPC endpoint for org.apache.flink.runtime.metrics.dump.MetricQueryService at pekko://flink-metrics/user/rpc/MetricQueryService .
>>>> 14:19:47.142 [flink-metrics-pekko.actor.supervisor-dispatcher-9] WARN  org.apache.flink.runtime.rpc.pekko.SupervisorActor  - RpcActor pekko://flink-metrics/user/rpc/MetricQueryService has failed. Shutting it down now.
>>>> java.lang.NullPointerException: null
>>>>     at java.base/java.util.HashMap.putMapEntries(HashMap.java:497)
>>>>     at java.base/java.util.HashMap.putAll(HashMap.java:781)
>>>>     at java.base/java.util.Collections$SynchronizedMap.putAll(Collections.java:2604)
>>>>     at ch.qos.logback.classic.util.LogbackMDCAdapter.setContextMap(LogbackMDCAdapter.java:197)
>>>>     at org.slf4j.MDC.setContextMap(MDC.java:264)
>>>>     at org.apache.flink.util.MdcUtils.lambda$withContext$0(MdcUtils.java:48)
>>>>     at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleControlMessage(PekkoRpcActor.java:208)
>>>>     at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
>>>>     at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
>>>>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
>>>>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
>>>>     at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
>>>>     at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
>>>>     at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
>>>>     at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
>>>>     at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
>>>>     at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
>>>>     at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
>>>>     at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
>>>>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>>>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>>>     at java.base/java.lang.Thread.run(Thread.java:829)
>>>> ```
>>>> How can I get around this error?
>>>>
>>>> Thanks again.
>>>> Vinay
>>>>
>>>> On Wed, Jan 1, 2025 at 5:58 PM Xuyang <xyzhong...@163.com> wrote:
>>>>
>>>>> As far as I know, it's not possible to do this without using a Catalog.
>>>>>
>>>>> Oh... I forgot that the FileSystemCatalog is currently only used for
>>>>> testing[1]. One approach is to implement and package a FileSystemCatalog
>>>>> by referring to this testing catalog.
>>>>>
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogFactory.java
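>>>>>
>>>>> In case it helps, a rough sketch of what such a ported factory might
>>>>> look like. This is hypothetical code modeled on the testing classes: the
>>>>> TestFileSystemCatalog constructor arguments (path, catalog name, default
>>>>> database) match the usage elsewhere in this thread, and everything else
>>>>> should be verified against the linked source:
>>>>>
>>>>> ```
>>>>> import java.util.HashSet;
>>>>> import java.util.Set;
>>>>>
>>>>> import org.apache.flink.configuration.ConfigOption;
>>>>> import org.apache.flink.configuration.ConfigOptions;
>>>>> import org.apache.flink.table.catalog.Catalog;
>>>>> import org.apache.flink.table.factories.CatalogFactory;
>>>>> import org.apache.flink.table.factories.FactoryUtil;
>>>>>
>>>>> // Hypothetical sketch only; copy the real logic from the linked
>>>>> // TestFileSystemCatalogFactory and verify the option names there.
>>>>> public class PortedFileSystemCatalogFactory implements CatalogFactory {
>>>>>
>>>>>   public static final String IDENTIFIER = "test-filesystem";
>>>>>
>>>>>   public static final ConfigOption<String> PATH =
>>>>>       ConfigOptions.key("path").stringType().noDefaultValue();
>>>>>
>>>>>   public static final ConfigOption<String> DEFAULT_DATABASE =
>>>>>       ConfigOptions.key("default-database")
>>>>>           .stringType()
>>>>>           .defaultValue("default_database");
>>>>>
>>>>>   @Override
>>>>>   public String factoryIdentifier() {
>>>>>     return IDENTIFIER;
>>>>>   }
>>>>>
>>>>>   @Override
>>>>>   public Set<ConfigOption<?>> requiredOptions() {
>>>>>     final Set<ConfigOption<?>> options = new HashSet<>();
>>>>>     options.add(PATH);
>>>>>     return options;
>>>>>   }
>>>>>
>>>>>   @Override
>>>>>   public Set<ConfigOption<?>> optionalOptions() {
>>>>>     final Set<ConfigOption<?>> options = new HashSet<>();
>>>>>     options.add(DEFAULT_DATABASE);
>>>>>     return options;
>>>>>   }
>>>>>
>>>>>   @Override
>>>>>   public Catalog createCatalog(Context context) {
>>>>>     final FactoryUtil.CatalogFactoryHelper helper =
>>>>>         FactoryUtil.createCatalogFactoryHelper(this, context);
>>>>>     helper.validate();
>>>>>     // (path, catalog name, default database), matching the
>>>>>     // TestFileSystemCatalog constructor used elsewhere in this thread.
>>>>>     return new TestFileSystemCatalog(
>>>>>         helper.getOptions().get(PATH),
>>>>>         context.getName(),
>>>>>         helper.getOptions().get(DEFAULT_DATABASE));
>>>>>   }
>>>>> }
>>>>> ```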
>>>>>
>>>>>
>>>>> --
>>>>>     Best!
>>>>>     Xuyang
>>>>>
>>>>>
>>>>> On 2025-01-01 00:31:43, "Vinay Agarwal" <vink...@gmail.com> wrote:
>>>>>
>>>>> Thanks Xuyang for your answer. However, I can't find a way to create
>>>>> a filesystem catalog except by using Hive, which seems to require an
>>>>> external Hive installation. I was hoping to find a solution that doesn't
>>>>> require external dependencies. Is that possible?
>>>>>
>>>>> Vinay
>>>>>
>>>>> On Thu, Dec 26, 2024 at 10:30 PM Xuyang <xyzhong...@163.com> wrote:
>>>>>
>>>>>> Hi.
>>>>>>
>>>>>> In Java/Scala code, using `registerTable` and then querying in the
>>>>>> SQL client will not work, as registerTable only creates a temporary table.
>>>>>>
>>>>>> To fulfill your requirements, you may need to use a persistent
>>>>>> catalog[1] (such as a filesystem catalog) where you can create tables in
>>>>>> your Java/Scala code. Once the table is created under that catalog, you
>>>>>> can then query it from the SQL client within the same catalog[2]. For
>>>>>> example:
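>>>>>>
>>>>>> ```SQL
>>>>>> -- Sketch: create and use the same catalog in the SQL client, then
>>>>>> -- query the table. Catalog options follow the test-filesystem catalog;
>>>>>> -- the path is a placeholder.
>>>>>> CREATE CATALOG fscat WITH (
>>>>>>   'type' = 'test-filesystem',
>>>>>>   'path' = '/path/to/catalog/',
>>>>>>   'default-database' = 'fsdb'
>>>>>> );
>>>>>> USE CATALOG fscat;
>>>>>> SELECT * FROM RandomNumbers;
>>>>>> ```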
>>>>>>
>>>>>> To avoid creating and using the same catalog twice in both Java/Scala
>>>>>> code and the SQL client, you can leverage a catalog store (available in
>>>>>> newer versions only)[3], as sketched below.
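>>>>>>
>>>>>> A minimal sketch of the catalog-store setup in Java (API as of Flink
>>>>>> 1.20; the store path is a placeholder):
>>>>>>
>>>>>> ```Java
>>>>>> CatalogStore catalogStore = new FileCatalogStore("/path/to/store/");
>>>>>> EnvironmentSettings settings =
>>>>>>     EnvironmentSettings.newInstance()
>>>>>>         .inStreamingMode()
>>>>>>         // persists catalog definitions so the SQL client can see them
>>>>>>         .withCatalogStore(catalogStore)
>>>>>>         .build();
>>>>>> ```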
>>>>>>
>>>>>> [1]
>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#using-java-scala-or-python
>>>>>>
>>>>>> [2]
>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#using-sql-ddl
>>>>>>
>>>>>> [3]
>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#catalog-store
>>>>>>
>>>>>>
>>>>>> --
>>>>>>     Best!
>>>>>>     Xuyang
>>>>>>
>>>>>>
>>>>>> At 2024-12-27 01:28:38, "Vinay Agarwal" <vink...@gmail.com> wrote:
>>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I am looking for an elaboration of, or a workaround for, an old answer to
>>>>>> this question here (
>>>>>> https://stackoverflow.com/questions/56049472/create-sql-table-from-a-datastream-in-a-java-scala-program-and-query-it-from-sql)
>>>>>> which mentions that registering the table can make it visible to the SQL
>>>>>> client.
>>>>>>
>>>>>> I am using Flink 1.8.1 with Java 11. I downloaded the Flink 1.8.1 bundle,
>>>>>> added its `bin` folder to my PATH, and started the cluster using
>>>>>> `start-cluster.sh`.
>>>>>>
>>>>>> I created a simple Flink job that generates a stream of random integers
>>>>>> and creates a table as follows:
>>>>>>
>>>>>> ```Java
>>>>>> final StreamExecutionEnvironment env =
>>>>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>>>>>> // Create a DataStream of random integers
>>>>>> DataStream<Integer> intStream = env.addSource(new RandomIntegerSource());
>>>>>> // Convert DataStream to Table
>>>>>> Table table = tableEnv.fromDataStream(intStream, "number");
>>>>>> // Register the table
>>>>>> tableEnv.registerTable("RandomNumbers", table);
>>>>>> // Execute the job
>>>>>> env.execute("Random Integer Job");
>>>>>>
>>>>>> ```
>>>>>> Then, I started `sql-client.sh` and queried as follows
>>>>>> ```SQL
>>>>>> Flink SQL> show catalogs;
>>>>>> +-----------------+
>>>>>> |    catalog name |
>>>>>> +-----------------+
>>>>>> | default_catalog |
>>>>>> +-----------------+
>>>>>> 1 row in set
>>>>>>
>>>>>> Flink SQL> USE CATALOG default_catalog;
>>>>>> [INFO] Execute statement succeed.
>>>>>>
>>>>>> Flink SQL> SHOW databases;
>>>>>> +------------------+
>>>>>> |    database name |
>>>>>> +------------------+
>>>>>> | default_database |
>>>>>> +------------------+
>>>>>> 1 row in set
>>>>>>
>>>>>> Flink SQL> USE default_database;
>>>>>> [INFO] Execute statement succeed.
>>>>>>
>>>>>> Flink SQL> SHOW tables;
>>>>>> Empty set
>>>>>> ```
>>>>>> As can be seen, I don't see the `RandomNumbers` table.
>>>>>>
>>>>>> How can I make that table visible to SQL client?
>>>>>>
>>>>>>
