Thanks again for your answer. I have to use Flink version 1.20.0 because `TestFileSystemCatalogFactory` doesn't exist in prior versions. Unfortunately, I am not able to run 1.20.0 due to the following error. (Version 1.91.1 works just fine.)
``` 14:19:47.094 [main] INFO org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils - Actor system started at pekko://flink 14:19:47.104 [main] INFO org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils - Trying to start local actor system 14:19:47.113 [flink-metrics-8] INFO org.apache.pekko.event.slf4j.Slf4jLogger - Slf4jLogger started 14:19:47.119 [main] INFO org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils - Actor system started at pekko://flink-metrics 14:19:47.128 [main] INFO org.apache.flink.runtime.rpc.pekko.PekkoRpcService - Starting RPC endpoint for org.apache.flink.runtime.metrics.dump.MetricQueryService at pekko://flink-metrics/user/rpc/MetricQueryService . 14:19:47.142 [flink-metrics-pekko.actor.supervisor-dispatcher-9] WARN org.apache.flink.runtime.rpc.pekko.SupervisorActor - RpcActor pekko://flink-metrics/user/rpc/MetricQueryService has failed. Shutting it down now. java.lang.NullPointerException: null at java.base/java.util.HashMap.putMapEntries(HashMap.java:497) at java.base/java.util.HashMap.putAll(HashMap.java:781) at java.base/java.util.Collections$SynchronizedMap.putAll(Collections.java:2604) at ch.qos.logback.classic.util.LogbackMDCAdapter.setContextMap(LogbackMDCAdapter.java:197) at org.slf4j.MDC.setContextMap(MDC.java:264) at org.apache.flink.util.MdcUtils.lambda$withContext$0(MdcUtils.java:48) at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleControlMessage(PekkoRpcActor.java:208) at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) ``` How can I get around this error? Thanks again. Vinay On Wed, Jan 1, 2025 at 5:58 PM Xuyang <xyzhong...@163.com> wrote: > As far as I know, it's not possible to do without using the Catalog. > > Oh... I forgot that the FileSystemCatalog is currently only used for > testing[1]. One approach is to implement and package a FileSystemCatalog by > refering this testing catalog. > > > [1] > https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogFactory.java > > > -- > Best! > Xuyang > > > 在 2025-01-01 00:31:43,"Vinay Agarwal" <vink...@gmail.com> 写道: > > Thanks Xuyang for your answer. I, however, can't find a way to create a > filesystem catalog except by using Hive which seems to require an external > Hive installation. I was hoping to find a solution that doesn't require > external dependencies. Is that possible? > > Vinay > > On Thu, Dec 26, 2024 at 10:30 PM Xuyang <xyzhong...@163.com> wrote: > >> Hi. >> >> In Java/Scala code, using `registerTable` and then querying in the SQL >> client will not work, as registerTable only creates a temporal table. >> >> To fulfill your requirements, you may need to use a persistent catalog[1] >> (such as a filesystem catalog) where you can create tables in your >> Java/Scala code. Once the table is created under that catalog, you can then >> query it from the SQL client within the same catalog[2]. For example: >> >> To avoid creating and using the same catalog twice in both Java/Scala >> code and the SQL client, you can leverage a catalog store(In higher >> versions only)[3]. >> >> [1] >> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#using-java-scala-or-python >> >> [2] >> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#using-sql-ddl >> >> [3] >> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#catalog-store >> >> >> -- >> Best! >> Xuyang >> >> >> At 2024-12-27 01:28:38, "Vinay Agarwal" <vink...@gmail.com> wrote: >> >> Hello, >> >> I am looking for an elaboration, or work around, of an old answer to this >> question here ( >> https://stackoverflow.com/questions/56049472/create-sql-table-from-a-datastream-in-a-java-scala-program-and-query-it-from-sql) >> which mentions that registering the table can make it visible to the SQL >> client. >> >> I am using Flink 1.8.1 with Java 11. I downloaded Flink 1.8.1 bundle, put >> its `bin` folder in path, and started the cluster using `start-cluster.sh`. >> >> I created a simple Flink job that generates stream of random integers and >> creates a table as follows: >> >> ```Java >> final StreamExecutionEnvironment env = >> StreamExecutionEnvironment.getExecutionEnvironment(); >> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); >> // Create a DataStream of random integers >> DataStream<Integer> intStream = env.addSource(new >> RandomIntegerSource()); >> // Convert DataStream to Table >> Table table = tableEnv.fromDataStream(intStream, "number"); >> // Register the table >> tableEnv.registerTable("RandomNumbers", table); >> // Execute the job >> env.execute("Random Integer Job"); >> >> ``` >> Then, I started `sql-client.sh` and queried as follows >> ```SQL >> Flink SQL> show catalogs; >> +-----------------+ >> | catalog name | >> +-----------------+ >> | default_catalog | >> +-----------------+ >> 1 row in set >> >> Flink SQL> USE CATALOG default_catalog; >> [INFO] Execute statement succeed. >> >> Flink SQL> SHOW databases; >> +------------------+ >> | database name | >> +------------------+ >> | default_database | >> +------------------+ >> 1 row in set >> >> Flink SQL> USE default_database; >> [INFO] Execute statement succeed. >> >> Flink SQL> SHOW tables; >> Empty set >> ``` >> As can be seen, I don't see `RandomNumbers` table. >> >> How can I make that table visible to SQL client? >> >>