I got Flink 1.20.0 to work (I tweaked my `build.gradle` based on the documentation) and created a catalog store and a file system catalog, but I still don't see my catalog or table in the SQL client. My code is:
```Java
// Back the table environment with a file-based catalog store
CatalogStore catalogStore = new FileCatalogStore(BASE_PATH + "store/");
final EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .inStreamingMode()
        .withCatalogStore(catalogStore)
        .build();
final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

// Register and use a file system catalog
TestFileSystemCatalog fsCatalog =
        new TestFileSystemCatalog(BASE_PATH + "catalog/", "fscat", "fsdb");
tableEnv.registerCatalog("fscat", fsCatalog);
tableEnv.useCatalog("fscat");
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS fsdb");

// Convert a stream of random numbers to a table and register it
DataStream<RandomNumber> randomSource = env.addSource(new RandomNumberSource());
Table table = tableEnv.fromDataStream(randomSource);
randomSource.print();
tableEnv.registerTable("fscat.fsdb.RandomNumbers", table);

env.execute("Flink Random Sequence Generator environment: " + environment);
```

SQL Client:

```SQL
Flink SQL> show catalogs;
+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
+-----------------+
1 row in set

Flink SQL> show databases;
+------------------+
|    database name |
+------------------+
| default_database |
+------------------+
1 row in set

Flink SQL> show tables;
Empty set
```

I thought setting the catalog store in the SQL client might help, but I
can't find a way to do that.

Thanks,
Vinay
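One candidate answer to the catalog-store question above: the file catalog store can also be configured through options rather than built in code. Below is a minimal sketch, assuming the documented `table.catalog-store.kind` and `table.catalog-store.file.path` keys; under the further assumption that `sql-client.sh` picks them up from the cluster configuration, putting the same two keys there should give the client an identical store:

```Java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;

// Same effect as new FileCatalogStore(BASE_PATH + "store/"), but driven by
// configuration. The two keys below are the documented catalog-store options;
// whether the SQL client honors them from the cluster config is an assumption.
Configuration conf = new Configuration();
conf.setString("table.catalog-store.kind", "file");
conf.setString("table.catalog-store.file.path", BASE_PATH + "store/");

EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .inStreamingMode()
        .withConfiguration(conf)
        .build();
```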
On Thu, Jan 2, 2025 at 7:08 PM Xuyang <xyzhong...@163.com> wrote:

> How about porting the `TestFileSystemCatalogFactory` back to 1.19 and
> rebuilding this catalog jar?
>
> --
> Best!
> Xuyang
>
> On 2025-01-03 06:23:25, "Vinay Agarwal" <vink...@gmail.com> wrote:
>
> Thanks again for your answer. I have to use Flink version 1.20.0 because
> `TestFileSystemCatalogFactory` doesn't exist in prior versions.
> Unfortunately, I am not able to run 1.20.0 due to the following error.
> (Version 1.19.1 works just fine.)
>
> ```
> 14:19:47.094 [main] INFO org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils - Actor system started at pekko://flink
> 14:19:47.104 [main] INFO org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils - Trying to start local actor system
> 14:19:47.113 [flink-metrics-8] INFO org.apache.pekko.event.slf4j.Slf4jLogger - Slf4jLogger started
> 14:19:47.119 [main] INFO org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils - Actor system started at pekko://flink-metrics
> 14:19:47.128 [main] INFO org.apache.flink.runtime.rpc.pekko.PekkoRpcService - Starting RPC endpoint for org.apache.flink.runtime.metrics.dump.MetricQueryService at pekko://flink-metrics/user/rpc/MetricQueryService .
> 14:19:47.142 [flink-metrics-pekko.actor.supervisor-dispatcher-9] WARN org.apache.flink.runtime.rpc.pekko.SupervisorActor - RpcActor pekko://flink-metrics/user/rpc/MetricQueryService has failed. Shutting it down now.
> java.lang.NullPointerException: null
>     at java.base/java.util.HashMap.putMapEntries(HashMap.java:497)
>     at java.base/java.util.HashMap.putAll(HashMap.java:781)
>     at java.base/java.util.Collections$SynchronizedMap.putAll(Collections.java:2604)
>     at ch.qos.logback.classic.util.LogbackMDCAdapter.setContextMap(LogbackMDCAdapter.java:197)
>     at org.slf4j.MDC.setContextMap(MDC.java:264)
>     at org.apache.flink.util.MdcUtils.lambda$withContext$0(MdcUtils.java:48)
>     at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleControlMessage(PekkoRpcActor.java:208)
>     at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
>     at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
>     at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
>     at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
>     at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
>     at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
>     at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
>     at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
>     at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
>     at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> ```
>
> How can I get around this error?
>
> Thanks again.
> Vinay
>
> On Wed, Jan 1, 2025 at 5:58 PM Xuyang <xyzhong...@163.com> wrote:
>
>> As far as I know, it's not possible to do that without using a catalog.
>>
>> Oh... I forgot that the FileSystemCatalog is currently only used for
>> testing[1]. One approach is to implement and package a FileSystemCatalog
>> by referring to this testing catalog.
>>
>> [1] https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogFactory.java
>>
>> --
>> Best!
>> Xuyang
>>
>> On 2025-01-01 00:31:43, "Vinay Agarwal" <vink...@gmail.com> wrote:
>>
>> Thanks, Xuyang, for your answer. However, I can't find a way to create a
>> filesystem catalog except by using Hive, which seems to require an
>> external Hive installation. I was hoping to find a solution that doesn't
>> require external dependencies. Is that possible?
>>
>> Vinay
>>
>> On Thu, Dec 26, 2024 at 10:30 PM Xuyang <xyzhong...@163.com> wrote:
>>
>>> Hi.
>>>
>>> In Java/Scala code, using `registerTable` and then querying in the SQL
>>> client will not work, as `registerTable` only creates a temporary table.
>>>
>>> To fulfill your requirements, you may need to use a persistent
>>> catalog[1] (such as a filesystem catalog) where you can create tables in
>>> your Java/Scala code. Once a table is created under that catalog, you
>>> can then query it from the SQL client within the same catalog[2]. For
>>> example, see the sketch after this message.
>>>
>>> To avoid creating and using the same catalog twice, in both Java/Scala
>>> code and the SQL client, you can leverage a catalog store (in higher
>>> versions only)[3].
>>>
>>> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#using-java-scala-or-python
>>>
>>> [2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#using-sql-ddl
>>>
>>> [3] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#catalog-store
>>>
>>> --
>>> Best!
>>> Xuyang
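A sketch of what Xuyang's persistent-catalog suggestion could look like, built on the code at the top of this thread; the table schema and the filesystem connector options are illustrative assumptions, not taken from the thread:

```Java
// Assumes the tableEnv and the registered "fscat" catalog from the code above.
tableEnv.useCatalog("fscat");
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS fsdb");
tableEnv.executeSql("USE fsdb");

// Unlike registerTable/createTemporaryView, DDL goes through the current
// catalog, so the table definition is persisted and visible to any session
// that registers the same catalog (e.g. the SQL client).
tableEnv.executeSql(
        "CREATE TABLE IF NOT EXISTS RandomNumbers (num INT) WITH ("
                + " 'connector' = 'filesystem',"
                + " 'path' = 'file:///tmp/random-numbers',"  // hypothetical path
                + " 'format' = 'csv')");

// Continuously write the stream into the catalog-backed table
// (assumes RandomNumber maps to a single INT column named num).
tableEnv.fromDataStream(randomSource).insertInto("RandomNumbers").execute();
```

After that, a SQL client session that registers the same catalog should list the table under `SHOW TABLES;`.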
>>> At 2024-12-27 01:28:38, "Vinay Agarwal" <vink...@gmail.com> wrote:
>>>
>>> Hello,
>>>
>>> I am looking for an elaboration of, or a workaround for, an old answer
>>> to this question
>>> (https://stackoverflow.com/questions/56049472/create-sql-table-from-a-datastream-in-a-java-scala-program-and-query-it-from-sql),
>>> which mentions that registering the table can make it visible to the
>>> SQL client.
>>>
>>> I am using Flink 1.18.1 with Java 11. I downloaded the Flink 1.18.1
>>> bundle, put its `bin` folder on my PATH, and started the cluster using
>>> `start-cluster.sh`.
>>>
>>> I created a simple Flink job that generates a stream of random integers
>>> and creates a table as follows:
>>>
>>> ```Java
>>> final StreamExecutionEnvironment env =
>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>>> // Create a DataStream of random integers
>>> DataStream<Integer> intStream = env.addSource(new RandomIntegerSource());
>>> // Convert the DataStream to a Table
>>> Table table = tableEnv.fromDataStream(intStream, "number");
>>> // Register the table
>>> tableEnv.registerTable("RandomNumbers", table);
>>> // Execute the job
>>> env.execute("Random Integer Job");
>>> ```
>>>
>>> Then, I started `sql-client.sh` and queried as follows:
>>>
>>> ```SQL
>>> Flink SQL> show catalogs;
>>> +-----------------+
>>> |    catalog name |
>>> +-----------------+
>>> | default_catalog |
>>> +-----------------+
>>> 1 row in set
>>>
>>> Flink SQL> USE CATALOG default_catalog;
>>> [INFO] Execute statement succeed.
>>>
>>> Flink SQL> SHOW databases;
>>> +------------------+
>>> |    database name |
>>> +------------------+
>>> | default_database |
>>> +------------------+
>>> 1 row in set
>>>
>>> Flink SQL> USE default_database;
>>> [INFO] Execute statement succeed.
>>>
>>> Flink SQL> SHOW tables;
>>> Empty set
>>> ```
>>>
>>> As can be seen, I don't see the `RandomNumbers` table.
>>>
>>> How can I make that table visible to the SQL client?
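Tying the thread together: the table above stays invisible because `registerTable` only creates a temporary, session-scoped object, so the persistent-catalog route discussed earlier is the way out. Besides the catalog store, a catalog can also be created with plain DDL on both sides. A hypothetical sketch; the `test-filesystem` identifier and the option names are assumptions based on the `TestFileSystemCatalogFactory` linked above, not verified:

```Java
// Run the equivalent CREATE CATALOG in both the job and the SQL client
// (e.g. via sql-client.sh -i init.sql) so each session resolves the same
// catalog definition without needing a shared catalog store.
tableEnv.executeSql(
        "CREATE CATALOG fscat WITH ("
                + " 'type' = 'test-filesystem',"      // assumed factory identifier
                + " 'path' = 'file:///tmp/catalog',"  // hypothetical location
                + " 'default-database' = 'fsdb')");   // assumed option name
tableEnv.useCatalog("fscat");
```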