I got Flink 1.20.0 to work (tweaked my `build.gradle` based on the
documentation) and created a catalog store and a file system catalog. But
still don't see my catalog or table in SQL client. My code is

CatalogStore catalogStore = new FileCatalogStore(BASE_PATH + "store/");
final EnvironmentSettings settings = EnvironmentSettings.newInstance()
   .inStreamingMode()
   .withCatalogStore(catalogStore)
   .build();
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tableEnv =
StreamTableEnvironment.create(env, settings);
TestFileSystemCatalog fsCatalog = new TestFileSystemCatalog(BASE_PATH
+ "catalog/", "fscat", "fsdb");
tableEnv.registerCatalog("fscat", fsCatalog);
tableEnv.useCatalog("fscat");
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS fsdb");

DataStream<RandomNumber> randomSource = env.addSource(new RandomNumberSource());
Table table = tableEnv.fromDataStream(randomSource);
randomSource.print();
tableEnv.registerTable("fscat.fsdb.RandomNumbers", table);

env.execute("Flink Random Sequence Generator environment: " + environment);


SQL Client
```SQL
Flink SQL> show catalogs;
+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
+-----------------+
1 row in set

Flink SQL> show databases;
+------------------+
|    database name |
+------------------+
| default_database |
+------------------+
1 row in set

Flink SQL> show tables;
Empty set
```

I thought setting the catalogstore in the SQL client might help but I can't
find a way to do that.

Thanks,
Vinay

On Thu, Jan 2, 2025 at 7:08 PM Xuyang <xyzhong...@163.com> wrote:

> How about porting the `TestFileSystemCatalogFactory` back to 1.19 and
> rebuild this catalog jar?
>
>
> --
>     Best!
>     Xuyang
>
>
> 在 2025-01-03 06:23:25,"Vinay Agarwal" <vink...@gmail.com> 写道:
>
> Thanks again for your answer. I have to use Flink version 1.20.0 because
> `TestFileSystemCatalogFactory` doesn't exist in prior versions.
> Unfortunately, I am not able to run 1.20.0 due to the following error.
> (Version 1.91.1 works just fine.)
>
> ```
> 14:19:47.094 [main] INFO
>  org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils  - Actor system
> started at pekko://flink
> 14:19:47.104 [main] INFO
>  org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils  - Trying to start
> local actor system
> 14:19:47.113 [flink-metrics-8] INFO
>  org.apache.pekko.event.slf4j.Slf4jLogger  - Slf4jLogger started
> 14:19:47.119 [main] INFO
>  org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils  - Actor system
> started at pekko://flink-metrics
> 14:19:47.128 [main] INFO
>  org.apache.flink.runtime.rpc.pekko.PekkoRpcService  - Starting RPC
> endpoint for org.apache.flink.runtime.metrics.dump.MetricQueryService at
> pekko://flink-metrics/user/rpc/MetricQueryService .
> 14:19:47.142 [flink-metrics-pekko.actor.supervisor-dispatcher-9] WARN
>  org.apache.flink.runtime.rpc.pekko.SupervisorActor  - RpcActor
> pekko://flink-metrics/user/rpc/MetricQueryService has failed. Shutting it
> down now.
> java.lang.NullPointerException: null
> at java.base/java.util.HashMap.putMapEntries(HashMap.java:497)
> at java.base/java.util.HashMap.putAll(HashMap.java:781)
> at
> java.base/java.util.Collections$SynchronizedMap.putAll(Collections.java:2604)
> at
> ch.qos.logback.classic.util.LogbackMDCAdapter.setContextMap(LogbackMDCAdapter.java:197)
> at org.slf4j.MDC.setContextMap(MDC.java:264)
> at org.apache.flink.util.MdcUtils.lambda$withContext$0(MdcUtils.java:48)
> at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleControlMessage(PekkoRpcActor.java:208)
> at
> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
> at
> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
> at
> org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
> at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
> at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
> at
> org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
> at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
> at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
> at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
> at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at java.base/java.lang.Thread.run(Thread.java:829)
> ```
> How can I get around this error?
>
> Thanks again.
> Vinay
>
> On Wed, Jan 1, 2025 at 5:58 PM Xuyang <xyzhong...@163.com> wrote:
>
>> As far as I know, it's not possible to do without using the Catalog.
>>
>> Oh... I forgot that the FileSystemCatalog is currently only used for
>> testing[1]. One approach is to implement and package a FileSystemCatalog by
>> refering this testing catalog.
>>
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogFactory.java
>>
>>
>> --
>>     Best!
>>     Xuyang
>>
>>
>> 在 2025-01-01 00:31:43,"Vinay Agarwal" <vink...@gmail.com> 写道:
>>
>> Thanks Xuyang for your answer. I, however, can't find a way to create a
>> filesystem catalog except by using Hive which seems to require an external
>> Hive installation. I was hoping to find a solution that doesn't require
>> external dependencies. Is that possible?
>>
>> Vinay
>>
>> On Thu, Dec 26, 2024 at 10:30 PM Xuyang <xyzhong...@163.com> wrote:
>>
>>> Hi.
>>>
>>> In Java/Scala code, using `registerTable` and then querying in the SQL
>>> client will not work, as registerTable only creates a temporal table.
>>>
>>> To fulfill your requirements, you may need to use a persistent
>>> catalog[1] (such as a filesystem catalog) where you can create tables in
>>> your Java/Scala code. Once the table is created under that catalog, you can
>>> then query it from the SQL client within the same catalog[2]. For example:
>>>
>>> To avoid creating and using the same catalog twice in both Java/Scala
>>> code and the SQL client, you can leverage a catalog store(In higher
>>> versions only)[3].
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#using-java-scala-or-python
>>>
>>> [2]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#using-sql-ddl
>>>
>>> [3]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#catalog-store
>>>
>>>
>>> --
>>>     Best!
>>>     Xuyang
>>>
>>>
>>> At 2024-12-27 01:28:38, "Vinay Agarwal" <vink...@gmail.com> wrote:
>>>
>>> Hello,
>>>
>>> I am looking for an elaboration, or work around, of an old answer to
>>> this question here (
>>> https://stackoverflow.com/questions/56049472/create-sql-table-from-a-datastream-in-a-java-scala-program-and-query-it-from-sql)
>>> which mentions that registering the table can make it visible to the SQL
>>> client.
>>>
>>> I am using Flink 1.8.1 with Java 11. I downloaded Flink 1.8.1 bundle,
>>> put its `bin` folder in path, and started the cluster using
>>> `start-cluster.sh`.
>>>
>>> I created a simple Flink job that generates stream of random integers
>>> and creates a table as follows:
>>>
>>> ```Java
>>>     final StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>     StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>>>     // Create a DataStream of random integers
>>>     DataStream<Integer> intStream = env.addSource(new
>>> RandomIntegerSource());
>>>     // Convert DataStream to Table
>>>     Table table = tableEnv.fromDataStream(intStream, "number");
>>>     // Register the table
>>>     tableEnv.registerTable("RandomNumbers", table);
>>>     // Execute the job
>>>     env.execute("Random Integer Job");
>>>
>>> ```
>>> Then, I started `sql-client.sh` and queried as follows
>>> ```SQL
>>> Flink SQL> show catalogs;
>>> +-----------------+
>>> |    catalog name |
>>> +-----------------+
>>> | default_catalog |
>>> +-----------------+
>>> 1 row in set
>>>
>>> Flink SQL> USE CATALOG default_catalog;
>>> [INFO] Execute statement succeed.
>>>
>>> Flink SQL> SHOW databases;
>>> +------------------+
>>> |    database name |
>>> +------------------+
>>> | default_database |
>>> +------------------+
>>> 1 row in set
>>>
>>> Flink SQL> USE default_database;
>>> [INFO] Execute statement succeed.
>>>
>>> Flink SQL> SHOW tables;
>>> Empty set
>>> ```
>>> As can be seen, I don't see `RandomNumbers` table.
>>>
>>> How can I make that table visible to SQL client?
>>>
>>>

Reply via email to