I think I have identified the issue. As I mentioned earlier, the deprecated `TableEnvironment#registerTable` (equivalent to `TableEnvironment#createTemporaryView`) only creates a temporary table. You need to use the existing `createTable` method to create a persistent table instead. After creating the table in the file catalog from the Java/Scala code, you should be able to see it in the SQL client, and once you write a DataStream into this table from Java/Scala, you should be able to see the data in the SQL client as well.
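The temporary-vs-persistent distinction is the crux of this thread. The sketch below is a toy model in plain Java, not Flink code (the `Session` and `SharedCatalog` classes and their methods are invented names): a view registered the `registerTable`/`createTemporaryView` way lives only inside the session that created it, while a table created through a shared catalog is visible to a second, independent session such as the SQL client.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model: the shared "catalog" survives across sessions, while
// temporary views live only inside the session that created them.
class SharedCatalog {
    final Map<String, String> tables = new HashMap<>(); // table name -> schema
}

class Session {
    private final SharedCatalog catalog;                               // shared, persistent
    private final Map<String, String> temporaryViews = new HashMap<>(); // session-local

    Session(SharedCatalog catalog) { this.catalog = catalog; }

    // analogous to createTemporaryView / the deprecated registerTable
    void createTemporaryView(String name, String schema) { temporaryViews.put(name, schema); }

    // analogous to createTable with a TableDescriptor: persisted in the catalog
    void createTable(String name, String schema) { catalog.tables.put(name, schema); }

    boolean canSee(String name) {
        return temporaryViews.containsKey(name) || catalog.tables.containsKey(name);
    }
}

public class Demo {
    public static void main(String[] args) {
        SharedCatalog fscat = new SharedCatalog();
        Session javaJob = new Session(fscat);   // the Java/Scala program
        Session sqlClient = new Session(fscat); // a separate SQL client session

        javaJob.createTemporaryView("TempNumbers", "data INT");
        javaJob.createTable("RandomNumbers", "data INT");

        System.out.println(sqlClient.canSee("TempNumbers"));   // false: temporary, session-scoped
        System.out.println(sqlClient.canSee("RandomNumbers")); // true: persisted in the catalog
    }
}
```

The model only illustrates scoping; in real Flink the "shared" side is a persistent catalog backed by external storage, which is why the SQL client must point at the same catalog (or catalog store) to see the table.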
I have tested the code below:

```Java
CatalogStore catalogStore = new FileCatalogStore(BASE_PATH + "store/");
final EnvironmentSettings settings =
        EnvironmentSettings.newInstance().inStreamingMode().withCatalogStore(catalogStore).build();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

tableEnv.executeSql("drop catalog if exists fscat");

Map<String, String> catalogConf = new HashMap<>();
catalogConf.put(CommonCatalogOptions.CATALOG_TYPE.key(), TestFileSystemTableFactory.IDENTIFIER);
catalogConf.put(PATH.key(), BASE_PATH + "catalog/");
catalogConf.put(CommonCatalogOptions.DEFAULT_DATABASE_KEY, "fsdb");
tableEnv.createCatalog(
        "fscat", CatalogDescriptor.of("fscat", Configuration.fromMap(catalogConf)));
// or use DDL to create the catalog
// tableEnv.executeSql(
//         "CREATE CATALOG fscat with ("
//                 + "'type' = 'test-filesystem', "
//                 + "'path' = 'xxx', "
//                 + "'default-database' = 'fsdb')");

tableEnv.useCatalog("fscat");
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS fsdb");
tableEnv.executeSql("drop table if exists fscat.fsdb.RandomNumbers");

DataStream<Integer> randomSource =
        env.addSource(
                new SourceFunction<Integer>() {
                    @Override
                    public void run(SourceContext<Integer> sourceContext) throws Exception {
                        for (int i = 0; i < 100; i++) {
                            sourceContext.collect(i);
                        }
                    }

                    @Override
                    public void cancel() {}
                });
Table table = tableEnv.fromDataStream(randomSource);
randomSource.print();

// should not register a temporary table
// tableEnv.registerTable("fscat.fsdb.RandomNumbers", table);
tableEnv.createTable(
        "fscat.fsdb.RandomNumbers",
        TableDescriptor.forConnector(TestFileSystemTableFactory.IDENTIFIER)
                .schema(Schema.newBuilder().column("data", DataTypes.INT()).build())
                .option("path", BASE_PATH + "data/")
                .option(FactoryUtil.FORMAT.key(), "csv")
                .build());
// or use DDL to create the table
// tableEnv.executeSql(
//         "CREATE TABLE fscat.fsdb.RandomNumbers ("
//                 + "data INT"
//                 + ") WITH ("
//                 + " 'connector' = 'test-filesystem',"
//                 + " 'path' = 'xxx',"
//                 + " 'format' = 'csv'"
//                 + ")");

table.executeInsert("fscat.fsdb.RandomNumbers").await();

System.err.println("=======");
tableEnv.executeSql("select * from fscat.fsdb.RandomNumbers").print();

env.execute("Flink Random Sequence Generator environment: TEST");
```

When using `./sql-client.sh -Dtable.catalog-store.kind=file -Dtable.catalog-store.file.path=xxx` to start the SQL client, you can finally find the table:

```SQL
Flink SQL> show catalogs;
+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
|           fscat |
+-----------------+
2 rows in set

Flink SQL> use catalog fscat;
[INFO] Execute statement succeeded.

Flink SQL> show tables;
+---------------+
|    table name |
+---------------+
| RandomNumbers |
+---------------+
1 row in set
```

Please do not forget to package the TestFileSystemCatalogFactory, TestFileSystemCatalog and TestFileSystemTableFactory classes as a jar, add the SPI files for TestFileSystemCatalogFactory and TestFileSystemTableFactory, and then copy this jar into the `lib` directory.

--
Best!
Xuyang

At 2025-01-07 23:56:18, "Vinay Agarwal" <vink...@gmail.com> wrote:

Still the same.

```
$ sql-client.sh -Dtable.catalog-store.kind=file -Dtable.catalog-store.file.path=/Users/vagarwal/tmp/flink/store/

Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
```
```
Command history file path: /Users/vagarwal/.flink-sql-history

Flink SQL> show catalogs;
+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
+-----------------+
1 row in set

Flink SQL> show databases;
+------------------+
|    database name |
+------------------+
| default_database |
+------------------+
1 row in set

Flink SQL> show tables;
Empty set

Flink SQL> show jobs;
+----------------------------------+----------------------------------------------------+---------+-------------------------+
|                           job id |                                           job name |  status |              start time |
+----------------------------------+----------------------------------------------------+---------+-------------------------+
| 663f71ec8246a7b7f8023dba3acfc5fe | Flink Random Sequence Generator environment: local | RUNNING | 2025-01-07T15:51:37.160 |
+----------------------------------+----------------------------------------------------+---------+-------------------------+
1 row in set
```

On Mon, Jan 6, 2025 at 6:00 PM Xuyang <xyzhong...@163.com> wrote:

What about using `./bin/start-cluster.sh -Dtable.catalog-store.kind=file -Dtable.catalog-store.file.path=/Users/vagarwal/tmp/flink/store/` to start the cluster?

--
Best!
Xuyang

At 2025-01-07 00:30:19, "Vinay Agarwal" <vink...@gmail.com> wrote:

Still doesn't show.

```SQL
Flink SQL> set 'table.catalog-store.kind' = 'file';
[INFO] Execute statement succeeded.

Flink SQL> set 'table.catalog-store.file.path' = '/Users/vagarwal/tmp/flink/store/';
[INFO] Execute statement succeeded.

Flink SQL> show catalogs;
+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
+-----------------+
1 row in set

Flink SQL> show tables;
Empty set

Flink SQL> show databases;
+------------------+
|    database name |
+------------------+
| default_database |
+------------------+
1 row in set
```

Thanks for all your help.
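What a file catalog store ultimately provides is a directory of per-catalog descriptor files that every process pointed at the same path reads at startup, which is why the store path must be given to the SQL client when it is launched. The sketch below is a toy model in plain Java, not Flink (the `storeCatalog`/`listCatalogs` helpers and the one-file-per-catalog layout are invented for illustration; the real `FileCatalogStore` writes YAML descriptors): two independently started sessions only agree on which catalogs exist when one of them actually persisted a descriptor to the shared store directory.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Toy file-backed catalog store: one descriptor file per catalog under a
// store directory. Any process pointed at the same directory sees the same
// catalogs; a process pointed elsewhere sees nothing.
public class FileStoreDemo {
    static void storeCatalog(Path storeDir, String name, Map<String, String> options)
            throws IOException {
        Files.createDirectories(storeDir);
        String body = options.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("\n"));
        Files.writeString(storeDir.resolve(name + ".catalog"), body);
    }

    static List<String> listCatalogs(Path storeDir) throws IOException {
        if (!Files.isDirectory(storeDir)) {
            return List.of(); // wrong or missing store path => no catalogs visible
        }
        try (Stream<Path> files = Files.list(storeDir)) {
            return files.map(p -> p.getFileName().toString().replace(".catalog", ""))
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        Path store = Files.createTempDirectory("store");
        // The "Java job" process persists a catalog descriptor to the store.
        storeCatalog(store, "fscat", Map.of("type", "test-filesystem", "path", "/tmp/catalog"));
        // A "SQL client" process started with the same store path finds it...
        System.out.println(listCatalogs(store));                      // [fscat]
        // ...but one started with a different path finds nothing.
        System.out.println(listCatalogs(store.resolve("elsewhere"))); // []
    }
}
```

The same reasoning explains the session above: `SET` succeeding inside an already-running client does not retroactively re-read the store, and registering a catalog only in the job's JVM never writes anything into the directory for the client to discover.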
Vinay

On Mon, Jan 6, 2025 at 2:05 AM Xuyang <xyzhong...@163.com> wrote:

Can you try to use the `SET` command[1] in the SQL client to set the configs related to the catalog store[2]?

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/#sql-client-configuration
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#configure-catalog-store

--
Best!
Xuyang

At 2025-01-05 05:59:22, "Vinay Agarwal" <vink...@gmail.com> wrote:

I got Flink 1.20.0 to work (tweaked my `build.gradle` based on the documentation) and created a catalog store and a file system catalog, but I still don't see my catalog or table in the SQL client. My code is:

```Java
CatalogStore catalogStore = new FileCatalogStore(BASE_PATH + "store/");
final EnvironmentSettings settings =
        EnvironmentSettings.newInstance()
                .inStreamingMode()
                .withCatalogStore(catalogStore)
                .build();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

TestFileSystemCatalog fsCatalog =
        new TestFileSystemCatalog(BASE_PATH + "catalog/", "fscat", "fsdb");
tableEnv.registerCatalog("fscat", fsCatalog);
tableEnv.useCatalog("fscat");
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS fsdb");

DataStream<RandomNumber> randomSource = env.addSource(new RandomNumberSource());
Table table = tableEnv.fromDataStream(randomSource);
randomSource.print();
tableEnv.registerTable("fscat.fsdb.RandomNumbers", table);

env.execute("Flink Random Sequence Generator environment: " + environment);
```

SQL client:

```SQL
Flink SQL> show catalogs;
+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
+-----------------+
1 row in set

Flink SQL> show databases;
+------------------+
|    database name |
+------------------+
| default_database |
+------------------+
1 row in set

Flink SQL> show tables;
Empty set
```

I thought setting the catalog store in the SQL client might help but I can't find a way to do
that.

Thanks,
Vinay

On Thu, Jan 2, 2025 at 7:08 PM Xuyang <xyzhong...@163.com> wrote:

How about porting the `TestFileSystemCatalogFactory` back to 1.19 and rebuilding this catalog jar?

--
Best!
Xuyang

At 2025-01-03 06:23:25, "Vinay Agarwal" <vink...@gmail.com> wrote:

Thanks again for your answer. I have to use Flink version 1.20.0 because `TestFileSystemCatalogFactory` doesn't exist in prior versions. Unfortunately, I am not able to run 1.20.0 due to the following error. (Version 1.19.1 works just fine.)

```
14:19:47.094 [main] INFO org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils - Actor system started at pekko://flink
14:19:47.104 [main] INFO org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils - Trying to start local actor system
14:19:47.113 [flink-metrics-8] INFO org.apache.pekko.event.slf4j.Slf4jLogger - Slf4jLogger started
14:19:47.119 [main] INFO org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils - Actor system started at pekko://flink-metrics
14:19:47.128 [main] INFO org.apache.flink.runtime.rpc.pekko.PekkoRpcService - Starting RPC endpoint for org.apache.flink.runtime.metrics.dump.MetricQueryService at pekko://flink-metrics/user/rpc/MetricQueryService .
14:19:47.142 [flink-metrics-pekko.actor.supervisor-dispatcher-9] WARN org.apache.flink.runtime.rpc.pekko.SupervisorActor - RpcActor pekko://flink-metrics/user/rpc/MetricQueryService has failed. Shutting it down now.
java.lang.NullPointerException: null
	at java.base/java.util.HashMap.putMapEntries(HashMap.java:497)
	at java.base/java.util.HashMap.putAll(HashMap.java:781)
	at java.base/java.util.Collections$SynchronizedMap.putAll(Collections.java:2604)
	at ch.qos.logback.classic.util.LogbackMDCAdapter.setContextMap(LogbackMDCAdapter.java:197)
	at org.slf4j.MDC.setContextMap(MDC.java:264)
	at org.apache.flink.util.MdcUtils.lambda$withContext$0(MdcUtils.java:48)
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleControlMessage(PekkoRpcActor.java:208)
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
	at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
	at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
	at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
	at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
	at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
	at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
	at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
	at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
```

How can I get around this error? Thanks again.

Vinay

On Wed, Jan 1, 2025 at 5:58 PM Xuyang <xyzhong...@163.com> wrote:

As far as I know, it's not possible to do without using the Catalog. Oh...
I forgot that the FileSystemCatalog is currently only used for testing[1]. One approach is to implement and package a FileSystemCatalog by referring to this testing catalog.

[1] https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogFactory.java

--
Best!
Xuyang

At 2025-01-01 00:31:43, "Vinay Agarwal" <vink...@gmail.com> wrote:

Thanks Xuyang for your answer. I, however, can't find a way to create a filesystem catalog except by using Hive, which seems to require an external Hive installation. I was hoping to find a solution that doesn't require external dependencies. Is that possible?

Vinay

On Thu, Dec 26, 2024 at 10:30 PM Xuyang <xyzhong...@163.com> wrote:

Hi. In Java/Scala code, using `registerTable` and then querying in the SQL client will not work, as `registerTable` only creates a temporary table. To fulfill your requirements, you may need to use a persistent catalog[1] (such as a filesystem catalog) in which you can create tables from your Java/Scala code. Once the table is created under that catalog, you can then query it from the SQL client within the same catalog[2]. To avoid creating and using the same catalog twice in both the Java/Scala code and the SQL client, you can leverage a catalog store (available in newer versions only)[3].

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#using-java-scala-or-python
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#using-sql-ddl
[3] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#catalog-store

--
Best!
Xuyang

At 2024-12-27 01:28:38, "Vinay Agarwal" <vink...@gmail.com> wrote:

Hello,

I am looking for an elaboration, or workaround, of an old answer to this question here (https://stackoverflow.com/questions/56049472/create-sql-table-from-a-datastream-in-a-java-scala-program-and-query-it-from-sql), which mentions that registering the table can make it visible to the SQL client.

I am using Flink 1.18.1 with Java 11. I downloaded the Flink 1.18.1 bundle, put its `bin` folder on the path, and started the cluster using `start-cluster.sh`. I created a simple Flink job that generates a stream of random integers and creates a table as follows:

```Java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Create a DataStream of random integers
DataStream<Integer> intStream = env.addSource(new RandomIntegerSource());

// Convert DataStream to Table
Table table = tableEnv.fromDataStream(intStream, "number");

// Register the table
tableEnv.registerTable("RandomNumbers", table);

// Execute the job
env.execute("Random Integer Job");
```

Then, I started `sql-client.sh` and queried as follows:

```SQL
Flink SQL> show catalogs;
+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
+-----------------+
1 row in set

Flink SQL> USE CATALOG default_catalog;
[INFO] Execute statement succeed.

Flink SQL> SHOW databases;
+------------------+
|    database name |
+------------------+
| default_database |
+------------------+
1 row in set

Flink SQL> USE default_database;
[INFO] Execute statement succeed.

Flink SQL> SHOW tables;
Empty set
```

As can be seen, I don't see the `RandomNumbers` table. How can I make that table visible to the SQL client?