Thanks Xuyang, that shows the table in the SQL client correctly! Two follow-up questions.
1. When I query that table from the SQL client, it seems to just wait for
something. How can I make that query work?

```
Flink SQL> show tables;
+---------------+
|    table name |
+---------------+
| RandomNumbers |
+---------------+
1 row in set

Flink SQL> select * from RandomNumbers;
[ERROR] Could not execute SQL statement. Reason:
java.lang.InterruptedException: sleep interrupted
```

2. I tried modifying the number generator from a limit of 100 to a
never-ending sequence. The code is as follows:

```
CatalogStore catalogStore = new FileCatalogStore(BASE_PATH + "store/");
final EnvironmentSettings settings =
    EnvironmentSettings.newInstance().inStreamingMode().withCatalogStore(catalogStore).build();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

tableEnv.executeSql("drop catalog if exists fscat");
Map<String, String> catalogConf = new HashMap<>();
catalogConf.put(CommonCatalogOptions.CATALOG_TYPE.key(), TestFileSystemTableFactory.IDENTIFIER);
catalogConf.put(PATH.key(), BASE_PATH + "catalog/");
catalogConf.put(CommonCatalogOptions.DEFAULT_DATABASE_KEY, "fsdb");
tableEnv.createCatalog(
    "fscat", CatalogDescriptor.of("fscat", Configuration.fromMap(catalogConf)));

tableEnv.useCatalog("fscat");
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS fsdb");
tableEnv.executeSql("drop table if exists fscat.fsdb.RandomNumbers");

DataStream<Integer> randomSource =
    env.addSource(
        new SourceFunction<Integer>() {
            private static final int BOUND = 100;
            private Random rnd = new Random();

            @Override
            public void run(SourceContext<Integer> sourceContext) throws Exception {
                while (true) {
                    sourceContext.collect(rnd.nextInt(BOUND));
                    Thread.sleep(1000L);
                }
            }

            @Override
            public void cancel() {}
        });

Table table = tableEnv.fromDataStream(randomSource);
tableEnv.createTable(
    "fscat.fsdb.RandomNumbers",
    TableDescriptor.forConnector(TestFileSystemTableFactory.IDENTIFIER)
        .schema(Schema.newBuilder().column("data", DataTypes.INT()).build())
        .option("path", BASE_PATH + "data/")
        .option(FactoryUtil.FORMAT.key(), "csv")
        .build());
// table.executeInsert("fscat.fsdb.RandomNumbers");

System.err.println("=======");
// tableEnv.executeSql("select * from fscat.fsdb.RandomNumbers").print();
randomSource.print();

// execute program
env.execute("Flink Random Sequence Generator");
```

The logs show random numbers being generated, and the SQL client shows the
table. But when I try to query the `RandomNumbers` table in the SQL client,
I get

```
Flink SQL> select * from RandomNumbers;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.runtime.rest.util.RestClientException: [POST request not allowed]
```

and in the logs I see

```
2025-01-10 08:10:48,327 WARN  org.apache.flink.runtime.rest.FileUploadHandler [] - POST request not allowed
```

The BASE_PATH + "data/" directory does exist, but it is empty:

```
$ ls -l /Users/vagarwal/tmp/flink/data/
total 0
```

What am I doing wrong?
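For reference, this is roughly how I expect to re-enable the commented-out
write path once this works (a sketch only; I am assuming that with the
unbounded source the insert must not be awaited, unlike your bounded
example):

```
// Sketch (assumption): submit the continuous insert as its own job.
// executeInsert returns immediately with a TableResult; with a
// never-ending source, calling .await() here would block forever.
TableResult insertResult = table.executeInsert("fscat.fsdb.RandomNumbers");
insertResult.getJobClient()
        .ifPresent(client -> System.err.println("insert job: " + client.getJobID()));
```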
Thanks again for going out of your way to help out!

On Wed, Jan 8, 2025 at 10:18 PM Xuyang <xyzhong...@163.com> wrote:

> I think I have identified the issue. As I mentioned earlier, the
> deprecated `TableEnvironment#registerTable` (which is equivalent to
> `TableEnvironment#createTemporaryView`) only creates a *temporary* table.
> You need to use the existing createTable method to create a persistent
> table. After creating the table in the file catalog using the Java/Scala
> code, you should be able to see the table in the SQL client. Once you
> write a DataStream into this table from Java/Scala, you should be able to
> see the data in the SQL client as well.
>
> I have tested the code as below:
>
> ```
> CatalogStore catalogStore = new FileCatalogStore(BASE_PATH + "store/");
> final EnvironmentSettings settings =
>     EnvironmentSettings.newInstance().inStreamingMode().withCatalogStore(catalogStore).build();
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
>
> tableEnv.executeSql("drop catalog if exists fscat");
> Map<String, String> catalogConf = new HashMap<>();
> catalogConf.put(CommonCatalogOptions.CATALOG_TYPE.key(), TestFileSystemTableFactory.IDENTIFIER);
> catalogConf.put(PATH.key(), BASE_PATH + "catalog/");
> catalogConf.put(CommonCatalogOptions.DEFAULT_DATABASE_KEY, "fsdb");
> tableEnv.createCatalog(
>     "fscat", CatalogDescriptor.of("fscat", Configuration.fromMap(catalogConf)));
> // or use ddl to create catalog
> // tableEnv.executeSql(
> //     "CREATE CATALOG fscat with ("
> //         + "'type' = 'test-filesystem', "
> //         + "'path' = 'xxx', "
> //         + "'default-database' = 'fsdb')");
>
> tableEnv.useCatalog("fscat");
> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS fsdb");
>
> tableEnv.executeSql("drop table if exists fscat.fsdb.RandomNumbers");
>
> DataStream<Integer> randomSource =
>     env.addSource(
>         new SourceFunction<Integer>() {
>             @Override
>             public void run(SourceContext<Integer> sourceContext) throws Exception {
>                 for (int i = 0; i < 100; i++) {
>                     sourceContext.collect(i);
>                 }
>             }
>
>             @Override
>             public void cancel() {}
>         });
>
> Table table = tableEnv.fromDataStream(randomSource);
> randomSource.print();
> // should not register a temporary table
> // tableEnv.registerTable("fscat.fsdb.RandomNumbers", table);
> tableEnv.createTable(
>     "fscat.fsdb.RandomNumbers",
>     TableDescriptor.forConnector(TestFileSystemTableFactory.IDENTIFIER)
>         .schema(Schema.newBuilder().column("data", DataTypes.INT()).build())
>         .option("path", BASE_PATH + "data/")
>         .option(FactoryUtil.FORMAT.key(), "csv")
>         .build());
> // or use ddl to create table
> // tableEnv.executeSql(
> //     "CREATE TABLE fscat.fsdb.RandomNumbers ("
> //         + "data INT"
> //         + ") WITH ("
> //         + "  'connector' = 'test-filesystem',"
> //         + "  'path' = 'xxx',"
> //         + "  'format' = 'csv'"
> //         + ")");
> table.executeInsert("fscat.fsdb.RandomNumbers").await();
>
> System.err.println("=======");
> tableEnv.executeSql("select * from fscat.fsdb.RandomNumbers").print();
>
> env.execute("Flink Random Sequence Generator environment: TEST");
> ```
>
> When using `./sql-client.sh -Dtable.catalog-store.kind=file
> -Dtable.catalog-store.file.path=xxx` to start the sql client, you can
> finally find the table.
>
> ```
> Flink SQL> show catalogs;
> +-----------------+
> |    catalog name |
> +-----------------+
> | default_catalog |
> |           fscat |
> +-----------------+
> 2 rows in set
>
> Flink SQL> use catalog fscat;
> [INFO] Execute statement succeeded.
>
> Flink SQL> show tables;
> +---------------+
> |    table name |
> +---------------+
> | RandomNumbers |
> +---------------+
> 1 row in set
> ```
>
> Please do not forget to package all the classes around
> TestFileSystemCatalogFactory, TestFileSystemCatalog and
> TestFileSystemTableFactory as a jar, add the SPI file for
> TestFileSystemCatalogFactory and TestFileSystemTableFactory, and then copy
> this jar under the dir `lib`.
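>
> For instance, the jar layout would look roughly like this (a sketch; the
> exact package names depend on where you placed the classes, and the
> service file name assumes the factories are discovered through Flink's
> standard `org.apache.flink.table.factories.Factory` SPI):
>
> ```
> my-test-filesystem-catalog.jar
>   org/apache/flink/table/file/testutils/...                <- compiled classes
>   META-INF/services/org.apache.flink.table.factories.Factory
> ```
>
> with the service file listing one factory class per line:
>
> ```
> org.apache.flink.table.file.testutils.catalog.TestFileSystemCatalogFactory
> org.apache.flink.table.file.testutils.TestFileSystemTableFactory
> ```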
>
> --
> Best!
> Xuyang
>
>
> At 2025-01-07 23:56:18, "Vinay Agarwal" <vink...@gmail.com> wrote:
>
> Still the same.
>
> ```
> $ sql-client.sh -Dtable.catalog-store.kind=file -Dtable.catalog-store.file.path=/Users/vagarwal/tmp/flink/store/
>
>     (Flink SQL client ASCII-art banner, BETA)
>
> Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
>
> Command history file path: /Users/vagarwal/.flink-sql-history
>
> Flink SQL> show catalogs;
> +-----------------+
> |    catalog name |
> +-----------------+
> | default_catalog |
> +-----------------+
> 1 row in set
>
> Flink SQL> show databases;
> +------------------+
> |    database name |
> +------------------+
> | default_database |
> +------------------+
> 1 row in set
>
> Flink SQL> show tables;
> Empty set
>
> Flink SQL> show jobs;
> +----------------------------------+-----------------------------------------------------+---------+-------------------------+
> |                           job id |                                            job name |  status |              start time |
> +----------------------------------+-----------------------------------------------------+---------+-------------------------+
> | 663f71ec8246a7b7f8023dba3acfc5fe | Flink Random Sequence Generator environment: local  | RUNNING | 2025-01-07T15:51:37.160 |
> +----------------------------------+-----------------------------------------------------+---------+-------------------------+
> 1 row in set
> ```
>
> On Mon, Jan 6, 2025 at 6:00 PM Xuyang <xyzhong...@163.com> wrote:
>
>> What about using "./bin/start-cluster.sh -Dtable.catalog-store.kind=file
>> -Dtable.catalog-store.file.path=/Users/vagarwal/tmp/flink/store/" to
>> start the sql client?
>>
>>
>> --
>> Best!
>> Xuyang
>>
>>
>> At 2025-01-07 00:30:19, "Vinay Agarwal" <vink...@gmail.com> wrote:
>>
>> Still doesn't show.
>>
>> ```SQL
>> Flink SQL> set 'table.catalog-store.kind' = 'file';
>> [INFO] Execute statement succeeded.
>>
>> Flink SQL> set 'table.catalog-store.file.path' = '/Users/vagarwal/tmp/flink/store/';
>> [INFO] Execute statement succeeded.
>>
>> Flink SQL> show catalogs;
>> +-----------------+
>> |    catalog name |
>> +-----------------+
>> | default_catalog |
>> +-----------------+
>> 1 row in set
>>
>> Flink SQL> show tables;
>> Empty set
>>
>> Flink SQL> show databases;
>> +------------------+
>> |    database name |
>> +------------------+
>> | default_database |
>> +------------------+
>> 1 row in set
>> ```
>>
>> Thanks for all your help.
>> Vinay
>>
>> On Mon, Jan 6, 2025 at 2:05 AM Xuyang <xyzhong...@163.com> wrote:
>>
>>> Can you try to use the `SET` command[1] in the sql client to set the
>>> configs related to the catalog store?
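>>>
>>> For example, the same two settings from [2] could presumably also go
>>> into the cluster configuration instead (a sketch; the store path below
>>> is a placeholder):
>>>
>>> ```
>>> # conf/config.yaml (or flink-conf.yaml on older versions)
>>> table.catalog-store.kind: file
>>> table.catalog-store.file.path: /path/to/store/
>>> ```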
>>>
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/#sql-client-configuration
>>>
>>> [2]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#configure-catalog-store
>>>
>>>
>>> --
>>> Best!
>>> Xuyang
>>>
>>>
>>> At 2025-01-05 05:59:22, "Vinay Agarwal" <vink...@gmail.com> wrote:
>>>
>>> I got Flink 1.20.0 to work (tweaked my `build.gradle` based on the
>>> documentation) and created a catalog store and a file system catalog. But
>>> I still don't see my catalog or table in the SQL client. My code is
>>>
>>> ```
>>> CatalogStore catalogStore = new FileCatalogStore(BASE_PATH + "store/");
>>> final EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>     .inStreamingMode()
>>>     .withCatalogStore(catalogStore)
>>>     .build();
>>> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>> final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
>>> TestFileSystemCatalog fsCatalog = new TestFileSystemCatalog(BASE_PATH + "catalog/", "fscat", "fsdb");
>>> tableEnv.registerCatalog("fscat", fsCatalog);
>>> tableEnv.useCatalog("fscat");
>>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS fsdb");
>>>
>>> DataStream<RandomNumber> randomSource = env.addSource(new RandomNumberSource());
>>> Table table = tableEnv.fromDataStream(randomSource);
>>> randomSource.print();
>>> tableEnv.registerTable("fscat.fsdb.RandomNumbers", table);
>>>
>>> env.execute("Flink Random Sequence Generator environment: " + environment);
>>> ```
>>>
>>> SQL Client:
>>>
>>> ```SQL
>>> Flink SQL> show catalogs;
>>> +-----------------+
>>> |    catalog name |
>>> +-----------------+
>>> | default_catalog |
>>> +-----------------+
>>> 1 row in set
>>>
>>> Flink SQL> show databases;
>>> +------------------+
>>> |    database name |
>>> +------------------+
>>> | default_database |
>>> +------------------+
>>> 1 row in set
>>>
>>> Flink SQL> show tables;
>>> Empty set
>>> ```
>>>
>>> I thought setting the catalog store in the SQL client might help, but I
>>> can't find a way to do that.
>>>
>>> Thanks,
>>> Vinay
>>>
>>> On Thu, Jan 2, 2025 at 7:08 PM Xuyang <xyzhong...@163.com> wrote:
>>>
>>>> How about porting the `TestFileSystemCatalogFactory` back to 1.19 and
>>>> rebuilding this catalog jar?
>>>>
>>>>
>>>> --
>>>> Best!
>>>> Xuyang
>>>>
>>>>
>>>> At 2025-01-03 06:23:25, "Vinay Agarwal" <vink...@gmail.com> wrote:
>>>>
>>>> Thanks again for your answer. I have to use Flink version 1.20.0
>>>> because `TestFileSystemCatalogFactory` doesn't exist in prior versions.
>>>> Unfortunately, I am not able to run 1.20.0 due to the following error.
>>>> (Version 1.19.1 works just fine.)
>>>>
>>>> ```
>>>> 14:19:47.094 [main] INFO  org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils - Actor system started at pekko://flink
>>>> 14:19:47.104 [main] INFO  org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils - Trying to start local actor system
>>>> 14:19:47.113 [flink-metrics-8] INFO  org.apache.pekko.event.slf4j.Slf4jLogger - Slf4jLogger started
>>>> 14:19:47.119 [main] INFO  org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils - Actor system started at pekko://flink-metrics
>>>> 14:19:47.128 [main] INFO  org.apache.flink.runtime.rpc.pekko.PekkoRpcService - Starting RPC endpoint for org.apache.flink.runtime.metrics.dump.MetricQueryService at pekko://flink-metrics/user/rpc/MetricQueryService .
>>>> 14:19:47.142 [flink-metrics-pekko.actor.supervisor-dispatcher-9] WARN  org.apache.flink.runtime.rpc.pekko.SupervisorActor - RpcActor pekko://flink-metrics/user/rpc/MetricQueryService has failed. Shutting it down now.
>>>> java.lang.NullPointerException: null
>>>>     at java.base/java.util.HashMap.putMapEntries(HashMap.java:497)
>>>>     at java.base/java.util.HashMap.putAll(HashMap.java:781)
>>>>     at java.base/java.util.Collections$SynchronizedMap.putAll(Collections.java:2604)
>>>>     at ch.qos.logback.classic.util.LogbackMDCAdapter.setContextMap(LogbackMDCAdapter.java:197)
>>>>     at org.slf4j.MDC.setContextMap(MDC.java:264)
>>>>     at org.apache.flink.util.MdcUtils.lambda$withContext$0(MdcUtils.java:48)
>>>>     at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleControlMessage(PekkoRpcActor.java:208)
>>>>     at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
>>>>     at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
>>>>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
>>>>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
>>>>     at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
>>>>     at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
>>>>     at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
>>>>     at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
>>>>     at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
>>>>     at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
>>>>     at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
>>>>     at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
>>>>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>>>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>>>     at java.base/java.lang.Thread.run(Thread.java:829)
>>>> ```
>>>>
>>>> How can I get around this error?
>>>>
>>>> Thanks again.
>>>> Vinay
>>>>
>>>> On Wed, Jan 1, 2025 at 5:58 PM Xuyang <xyzhong...@163.com> wrote:
>>>>
>>>>> As far as I know, it's not possible to do without using the Catalog.
>>>>>
>>>>> Oh... I forgot that the FileSystemCatalog is currently only used for
>>>>> testing[1]. One approach is to implement and package a FileSystemCatalog
>>>>> by referring to this testing catalog.
>>>>>
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogFactory.java
>>>>>
>>>>>
>>>>> --
>>>>> Best!
>>>>> Xuyang
>>>>>
>>>>>
>>>>> At 2025-01-01 00:31:43, "Vinay Agarwal" <vink...@gmail.com> wrote:
>>>>>
>>>>> Thanks Xuyang for your answer. I, however, can't find a way to create
>>>>> a filesystem catalog except by using Hive, which seems to require an
>>>>> external Hive installation. I was hoping to find a solution that doesn't
>>>>> require external dependencies. Is that possible?
>>>>>
>>>>> Vinay
>>>>>
>>>>> On Thu, Dec 26, 2024 at 10:30 PM Xuyang <xyzhong...@163.com> wrote:
>>>>>
>>>>>> Hi.
>>>>>>
>>>>>> In Java/Scala code, using `registerTable` and then querying in the
>>>>>> SQL client will not work, as registerTable only creates a temporary
>>>>>> table.
>>>>>>
>>>>>> To fulfill your requirements, you may need to use a persistent
>>>>>> catalog[1] (such as a filesystem catalog) where you can create tables in
>>>>>> your Java/Scala code. Once the table is created under that catalog, you
>>>>>> can then query it from the SQL client within the same catalog[2]. For
>>>>>> example:
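>>>>>>
>>>>>> (A sketch only; the catalog type, names, and paths below are
>>>>>> placeholders for whatever persistent catalog implementation you use.)
>>>>>>
>>>>>> ```SQL
>>>>>> -- create the table under a persistent catalog in Java/Scala ...
>>>>>> CREATE CATALOG my_catalog WITH (
>>>>>>   'type' = '...'
>>>>>> );
>>>>>>
>>>>>> CREATE TABLE my_catalog.my_db.MyTable (
>>>>>>   data INT
>>>>>> ) WITH (
>>>>>>   'connector' = 'filesystem',
>>>>>>   'path' = '/path/to/data/',
>>>>>>   'format' = 'csv'
>>>>>> );
>>>>>>
>>>>>> -- ... then, in the SQL client, within the same catalog:
>>>>>> SELECT * FROM my_catalog.my_db.MyTable;
>>>>>> ```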
>>>>>>
>>>>>> To avoid creating and using the same catalog twice in both Java/Scala
>>>>>> code and the SQL client, you can leverage a catalog store (in higher
>>>>>> versions only)[3].
>>>>>>
>>>>>> [1]
>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#using-java-scala-or-python
>>>>>>
>>>>>> [2]
>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#using-sql-ddl
>>>>>>
>>>>>> [3]
>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#catalog-store
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best!
>>>>>> Xuyang
>>>>>>
>>>>>>
>>>>>> At 2024-12-27 01:28:38, "Vinay Agarwal" <vink...@gmail.com> wrote:
>>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I am looking for an elaboration, or workaround, of an old answer to
>>>>>> this question (
>>>>>> https://stackoverflow.com/questions/56049472/create-sql-table-from-a-datastream-in-a-java-scala-program-and-query-it-from-sql),
>>>>>> which mentions that registering the table can make it visible to the SQL
>>>>>> client.
>>>>>>
>>>>>> I am using Flink 1.8.1 with Java 11. I downloaded the Flink 1.8.1
>>>>>> bundle, put its `bin` folder in the path, and started the cluster using
>>>>>> `start-cluster.sh`.
>>>>>>
>>>>>> I created a simple Flink job that generates a stream of random
>>>>>> integers and creates a table as follows:
>>>>>>
>>>>>> ```Java
>>>>>> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>>>>>> // Create a DataStream of random integers
>>>>>> DataStream<Integer> intStream = env.addSource(new RandomIntegerSource());
>>>>>> // Convert DataStream to Table
>>>>>> Table table = tableEnv.fromDataStream(intStream, "number");
>>>>>> // Register the table
>>>>>> tableEnv.registerTable("RandomNumbers", table);
>>>>>> // Execute the job
>>>>>> env.execute("Random Integer Job");
>>>>>> ```
>>>>>>
>>>>>> Then, I started `sql-client.sh` and queried as follows:
>>>>>>
>>>>>> ```SQL
>>>>>> Flink SQL> show catalogs;
>>>>>> +-----------------+
>>>>>> |    catalog name |
>>>>>> +-----------------+
>>>>>> | default_catalog |
>>>>>> +-----------------+
>>>>>> 1 row in set
>>>>>>
>>>>>> Flink SQL> USE CATALOG default_catalog;
>>>>>> [INFO] Execute statement succeed.
>>>>>>
>>>>>> Flink SQL> SHOW databases;
>>>>>> +------------------+
>>>>>> |    database name |
>>>>>> +------------------+
>>>>>> | default_database |
>>>>>> +------------------+
>>>>>> 1 row in set
>>>>>>
>>>>>> Flink SQL> USE default_database;
>>>>>> [INFO] Execute statement succeed.
>>>>>>
>>>>>> Flink SQL> SHOW tables;
>>>>>> Empty set
>>>>>> ```
>>>>>>
>>>>>> As can be seen, I don't see the `RandomNumbers` table.
>>>>>>
>>>>>> How can I make that table visible to the SQL client?