Still the same.

$ sql-client.sh -Dtable.catalog-store.kind=file -Dtable.catalog-store.file.path=/Users/vagarwal/tmp/flink/store/

    ______ _ _       _       _____  ____  _         _____ _ _            _  BETA
   |  ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | |
   | |__  | |_ _ __ | | __ | (___ | |  | | |      | |    | |_  ___ _ __ | |_
   |  __| | | | '_ \| |/ /  \___ \| |  | | |      | |    | | |/ _ \ '_ \| __|
   | |    | | | | | |   <   ____) | |__| | |____  | |____| | |  __/ | | | |_
   |_|    |_|_|_| |_|_|\_\ |_____/ \___\_\______|  \_____|_|_|\___|_| |_|\__|

        Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.

Command history file path: /Users/vagarwal/.flink-sql-history

Flink SQL> show catalogs;
+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
+-----------------+
1 row in set

Flink SQL> show databases;
+------------------+
|    database name |
+------------------+
| default_database |
+------------------+
1 row in set

Flink SQL> show tables;
Empty set

Flink SQL> show jobs;
+----------------------------------+----------------------------------------------------+---------+-------------------------+
|                           job id |                                           job name |  status |              start time |
+----------------------------------+----------------------------------------------------+---------+-------------------------+
| 663f71ec8246a7b7f8023dba3acfc5fe | Flink Random Sequence Generator environment: local | RUNNING | 2025-01-07T15:51:37.160 |
+----------------------------------+----------------------------------------------------+---------+-------------------------+
1 row in set


On Mon, Jan 6, 2025 at 6:00 PM Xuyang <xyzhong...@163.com> wrote:

> What about using "./bin/start-cluster.sh -Dtable.catalog-store.kind=file
> -Dtable.catalog-store.file.path=/Users/vagarwal/tmp/flink/store/" to start
> sql client?
>
>
> --
>     Best!
>     Xuyang
>
>
> At 2025-01-07 00:30:19, "Vinay Agarwal" <vink...@gmail.com> wrote:
>
> Still doesn't show.
>
> ```SQL
> Flink SQL> set 'table.catalog-store.kind' = 'file';
> [INFO] Execute statement succeeded.
>
> Flink SQL> set 'table.catalog-store.file.path' = '/Users/vagarwal/tmp/flink/store/';
> [INFO] Execute statement succeeded.
>
> Flink SQL> show catalogs;
> +-----------------+
> |    catalog name |
> +-----------------+
> | default_catalog |
> +-----------------+
> 1 row in set
>
> Flink SQL> show tables;
> Empty set
>
> Flink SQL> show databases;
> +------------------+
> |    database name |
> +------------------+
> | default_database |
> +------------------+
> 1 row in set
> ```
>
> Thanks for all your help.
> Vinay
>
> On Mon, Jan 6, 2025 at 2:05 AM Xuyang <xyzhong...@163.com> wrote:
>
>> Can you try using the `SET` command[1] in the SQL client to set the
>> configs related to the catalog store[2]?
>>
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/#sql-client-configuration
>>
>> [2]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#configure-catalog-store
>>
>>
>> --
>>     Best!
>>     Xuyang
>>
>>
>> At 2025-01-05 05:59:22, "Vinay Agarwal" <vink...@gmail.com> wrote:
>>
>> I got Flink 1.20.0 to work (tweaked my `build.gradle` based on the
>> documentation) and created a catalog store and a file system catalog. But
>> I still don't see my catalog or table in the SQL client. My code is:
>>
>> CatalogStore catalogStore = new FileCatalogStore(BASE_PATH + "store/");
>> final EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>    .inStreamingMode()
>>    .withCatalogStore(catalogStore)
>>    .build();
>> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
>> TestFileSystemCatalog fsCatalog = new TestFileSystemCatalog(BASE_PATH + "catalog/", "fscat", "fsdb");
>> tableEnv.registerCatalog("fscat", fsCatalog);
>> tableEnv.useCatalog("fscat");
>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS fsdb");
>>
>> DataStream<RandomNumber> randomSource = env.addSource(new RandomNumberSource());
>> Table table = tableEnv.fromDataStream(randomSource);
>> randomSource.print();
>> tableEnv.registerTable("fscat.fsdb.RandomNumbers", table);
>>
>> env.execute("Flink Random Sequence Generator environment: " + environment);
>>
>>
>> SQL Client
>> ```SQL
>> Flink SQL> show catalogs;
>> +-----------------+
>> |    catalog name |
>> +-----------------+
>> | default_catalog |
>> +-----------------+
>> 1 row in set
>>
>> Flink SQL> show databases;
>> +------------------+
>> |    database name |
>> +------------------+
>> | default_database |
>> +------------------+
>> 1 row in set
>>
>> Flink SQL> show tables;
>> Empty set
>> ```
>>
>> I thought setting the catalog store in the SQL client might help, but I
>> can't find a way to do that.
>>
>> Thanks,
>> Vinay
>>
>> On Thu, Jan 2, 2025 at 7:08 PM Xuyang <xyzhong...@163.com> wrote:
>>
>>> How about porting the `TestFileSystemCatalogFactory` back to 1.19 and
>>> rebuilding this catalog jar?
>>>
>>>
>>> --
>>>     Best!
>>>     Xuyang
>>>
>>>
>>> At 2025-01-03 06:23:25, "Vinay Agarwal" <vink...@gmail.com> wrote:
>>>
>>> Thanks again for your answer. I have to use Flink version 1.20.0 because
>>> `TestFileSystemCatalogFactory` doesn't exist in prior versions.
>>> Unfortunately, I am not able to run 1.20.0 due to the following error.
>>> (Version 1.19.1 works just fine.)
>>>
>>> ```
>>> 14:19:47.094 [main] INFO  org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils  - Actor system started at pekko://flink
>>> 14:19:47.104 [main] INFO  org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils  - Trying to start local actor system
>>> 14:19:47.113 [flink-metrics-8] INFO  org.apache.pekko.event.slf4j.Slf4jLogger  - Slf4jLogger started
>>> 14:19:47.119 [main] INFO  org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils  - Actor system started at pekko://flink-metrics
>>> 14:19:47.128 [main] INFO  org.apache.flink.runtime.rpc.pekko.PekkoRpcService  - Starting RPC endpoint for org.apache.flink.runtime.metrics.dump.MetricQueryService at pekko://flink-metrics/user/rpc/MetricQueryService .
>>> 14:19:47.142 [flink-metrics-pekko.actor.supervisor-dispatcher-9] WARN  org.apache.flink.runtime.rpc.pekko.SupervisorActor  - RpcActor pekko://flink-metrics/user/rpc/MetricQueryService has failed. Shutting it down now.
>>> java.lang.NullPointerException: null
>>>     at java.base/java.util.HashMap.putMapEntries(HashMap.java:497)
>>>     at java.base/java.util.HashMap.putAll(HashMap.java:781)
>>>     at java.base/java.util.Collections$SynchronizedMap.putAll(Collections.java:2604)
>>>     at ch.qos.logback.classic.util.LogbackMDCAdapter.setContextMap(LogbackMDCAdapter.java:197)
>>>     at org.slf4j.MDC.setContextMap(MDC.java:264)
>>>     at org.apache.flink.util.MdcUtils.lambda$withContext$0(MdcUtils.java:48)
>>>     at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleControlMessage(PekkoRpcActor.java:208)
>>>     at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
>>>     at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
>>>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
>>>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
>>>     at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
>>>     at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
>>>     at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
>>>     at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
>>>     at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
>>>     at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
>>>     at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
>>>     at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
>>>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>>     at java.base/java.lang.Thread.run(Thread.java:829)
>>> ```
>>> How can I get around this error?
>>>
>>> Thanks again.
>>> Vinay
>>>
>>> On Wed, Jan 1, 2025 at 5:58 PM Xuyang <xyzhong...@163.com> wrote:
>>>
>>>> As far as I know, it's not possible to do without using the Catalog.
>>>>
>>>> Oh... I forgot that the FileSystemCatalog is currently only used for
>>>> testing[1]. One approach is to implement and package a FileSystemCatalog by
>>>> referring to this testing catalog.
>>>>
>>>>
>>>> [1]
>>>> https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogFactory.java
>>>>
>>>>
>>>> --
>>>>     Best!
>>>>     Xuyang
>>>>
>>>>
>>>> At 2025-01-01 00:31:43, "Vinay Agarwal" <vink...@gmail.com> wrote:
>>>>
>>>> Thanks Xuyang for your answer. I, however, can't find a way to create
>>>> a filesystem catalog except by using Hive which seems to require an
>>>> external Hive installation. I was hoping to find a solution that doesn't
>>>> require external dependencies. Is that possible?
>>>>
>>>> Vinay
>>>>
>>>> On Thu, Dec 26, 2024 at 10:30 PM Xuyang <xyzhong...@163.com> wrote:
>>>>
>>>>> Hi.
>>>>>
>>>>> In Java/Scala code, using `registerTable` and then querying in the SQL
>>>>> client will not work, as `registerTable` only creates a temporary table.
>>>>>
>>>>> To fulfill your requirements, you may need to use a persistent
>>>>> catalog[1] (such as a filesystem catalog) where you can create tables in
>>>>> your Java/Scala code. Once the table is created under that catalog, you
>>>>> can then query it from the SQL client within the same catalog[2]. For example:
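>>>>>
>>>>> A minimal sketch of the SQL client side, under several assumptions: the
>>>>> catalog type `test-filesystem` and its `path` and `default-database`
>>>>> options are assumed to be what Flink's test filesystem catalog factory
>>>>> accepts, the jar providing that factory is assumed to be on the SQL
>>>>> client's classpath, and the catalog name, database name, and path are
>>>>> placeholders that should match whatever the Java/Scala job used.
>>>>>
>>>>> ```SQL
>>>>> -- Hypothetical names and path; the catalog directory and the
>>>>> -- default database sub-directory are assumed to exist already.
>>>>> CREATE CATALOG my_catalog WITH (
>>>>>   'type' = 'test-filesystem',
>>>>>   'path' = '/path/to/catalog',
>>>>>   'default-database' = 'my_db'
>>>>> );
>>>>>
>>>>> -- Switch to the catalog and list what the job created in it.
>>>>> USE CATALOG my_catalog;
>>>>> SHOW TABLES;
>>>>> ```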
>>>>>
>>>>> To avoid creating and using the same catalog twice in both Java/Scala
>>>>> code and the SQL client, you can leverage a catalog store (in higher
>>>>> versions only)[3].
>>>>>
>>>>> [1]
>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#using-java-scala-or-python
>>>>>
>>>>> [2]
>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#using-sql-ddl
>>>>>
>>>>> [3]
>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#catalog-store
>>>>>
>>>>>
>>>>> --
>>>>>     Best!
>>>>>     Xuyang
>>>>>
>>>>>
>>>>> At 2024-12-27 01:28:38, "Vinay Agarwal" <vink...@gmail.com> wrote:
>>>>>
>>>>> Hello,
>>>>>
>>>>> I am looking for an elaboration of, or a workaround for, an old answer to
>>>>> this question (
>>>>> https://stackoverflow.com/questions/56049472/create-sql-table-from-a-datastream-in-a-java-scala-program-and-query-it-from-sql),
>>>>> which mentions that registering the table can make it visible to the SQL
>>>>> client.
>>>>>
>>>>> I am using Flink 1.8.1 with Java 11. I downloaded the Flink 1.8.1 bundle,
>>>>> put its `bin` folder on the path, and started the cluster using
>>>>> `start-cluster.sh`.
>>>>>
>>>>> I created a simple Flink job that generates a stream of random integers
>>>>> and creates a table as follows:
>>>>>
>>>>> ```Java
>>>>>     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>     StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>>>>>     // Create a DataStream of random integers
>>>>>     DataStream<Integer> intStream = env.addSource(new RandomIntegerSource());
>>>>>     // Convert DataStream to Table
>>>>>     Table table = tableEnv.fromDataStream(intStream, "number");
>>>>>     // Register the table
>>>>>     tableEnv.registerTable("RandomNumbers", table);
>>>>>     // Execute the job
>>>>>     env.execute("Random Integer Job");
>>>>> ```
>>>>> Then, I started `sql-client.sh` and queried as follows
>>>>> ```SQL
>>>>> Flink SQL> show catalogs;
>>>>> +-----------------+
>>>>> |    catalog name |
>>>>> +-----------------+
>>>>> | default_catalog |
>>>>> +-----------------+
>>>>> 1 row in set
>>>>>
>>>>> Flink SQL> USE CATALOG default_catalog;
>>>>> [INFO] Execute statement succeed.
>>>>>
>>>>> Flink SQL> SHOW databases;
>>>>> +------------------+
>>>>> |    database name |
>>>>> +------------------+
>>>>> | default_database |
>>>>> +------------------+
>>>>> 1 row in set
>>>>>
>>>>> Flink SQL> USE default_database;
>>>>> [INFO] Execute statement succeed.
>>>>>
>>>>> Flink SQL> SHOW tables;
>>>>> Empty set
>>>>> ```
>>>>> As can be seen, the `RandomNumbers` table does not appear.
>>>>>
>>>>> How can I make that table visible to the SQL client?
>>>>>
>>>>>
