Still the same.

$ sql-client.sh -Dtable.catalog-store.kind=file -Dtable.catalog-store.file.path=/Users/vagarwal/tmp/flink/store/
[Flink SQL Client ASCII-art banner (BETA)]

Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.

Command history file path: /Users/vagarwal/.flink-sql-history

Flink SQL> show catalogs;
+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
+-----------------+
1 row in set

Flink SQL> show databases;
+------------------+
|    database name |
+------------------+
| default_database |
+------------------+
1 row in set

Flink SQL> show tables;
Empty set

Flink SQL> show jobs;
+----------------------------------+----------------------------------------------------+---------+-------------------------+
|                           job id |                                           job name |  status |              start time |
+----------------------------------+----------------------------------------------------+---------+-------------------------+
| 663f71ec8246a7b7f8023dba3acfc5fe | Flink Random Sequence Generator environment: local | RUNNING | 2025-01-07T15:51:37.160 |
+----------------------------------+----------------------------------------------------+---------+-------------------------+
1 row in set

On Mon, Jan 6, 2025 at 6:00 PM Xuyang <xyzhong...@163.com> wrote:

> What about using "./bin/start-cluster.sh -Dtable.catalog-store.kind=file
> -Dtable.catalog-store.file.path=/Users/vagarwal/tmp/flink/store/" to start
> sql client?
>
> --
> Best!
> Xuyang
>
> At 2025-01-07 00:30:19, "Vinay Agarwal" <vink...@gmail.com> wrote:
>
> Still doesn't show.
>
> ```SQL
> Flink SQL> set 'table.catalog-store.kind' = 'file';
> [INFO] Execute statement succeeded.
>
> Flink SQL> set 'table.catalog-store.file.path' = '/Users/vagarwal/tmp/flink/store/';
> [INFO] Execute statement succeeded.
>
> Flink SQL> show catalogs;
> +-----------------+
> |    catalog name |
> +-----------------+
> | default_catalog |
> +-----------------+
> 1 row in set
>
> Flink SQL> show tables;
> Empty set
>
> Flink SQL> show databases;
> +------------------+
> |    database name |
> +------------------+
> | default_database |
> +------------------+
> 1 row in set
> ```
>
> Thanks for all your help.
> Vinay
>
> On Mon, Jan 6, 2025 at 2:05 AM Xuyang <xyzhong...@163.com> wrote:
>
>> Can you try using the `SET` command[1] in the SQL client to set the
>> configs related to the catalog store[2]?
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/#sql-client-configuration
>>
>> [2]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#configure-catalog-store
>>
>> --
>> Best!
>> Xuyang
>>
>> At 2025-01-05 05:59:22, "Vinay Agarwal" <vink...@gmail.com> wrote:
>>
>> I got Flink 1.20.0 to work (tweaked my `build.gradle` based on the
>> documentation) and created a catalog store and a file system catalog. But
>> I still don't see my catalog or table in the SQL client.
>> My code is
>>
>> CatalogStore catalogStore = new FileCatalogStore(BASE_PATH + "store/");
>> final EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>     .inStreamingMode()
>>     .withCatalogStore(catalogStore)
>>     .build();
>> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
>> TestFileSystemCatalog fsCatalog = new TestFileSystemCatalog(BASE_PATH + "catalog/", "fscat", "fsdb");
>> tableEnv.registerCatalog("fscat", fsCatalog);
>> tableEnv.useCatalog("fscat");
>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS fsdb");
>>
>> DataStream<RandomNumber> randomSource = env.addSource(new RandomNumberSource());
>> Table table = tableEnv.fromDataStream(randomSource);
>> randomSource.print();
>> tableEnv.registerTable("fscat.fsdb.RandomNumbers", table);
>>
>> env.execute("Flink Random Sequence Generator environment: " + environment);
>>
>> SQL Client
>> ```SQL
>> Flink SQL> show catalogs;
>> +-----------------+
>> |    catalog name |
>> +-----------------+
>> | default_catalog |
>> +-----------------+
>> 1 row in set
>>
>> Flink SQL> show databases;
>> +------------------+
>> |    database name |
>> +------------------+
>> | default_database |
>> +------------------+
>> 1 row in set
>>
>> Flink SQL> show tables;
>> Empty set
>> ```
>>
>> I thought setting the catalog store in the SQL client might help but I
>> can't find a way to do that.
>>
>> Thanks,
>> Vinay
>>
>> On Thu, Jan 2, 2025 at 7:08 PM Xuyang <xyzhong...@163.com> wrote:
>>
>>> How about porting the `TestFileSystemCatalogFactory` back to 1.19 and
>>> rebuilding this catalog jar?
>>>
>>> --
>>> Best!
>>> Xuyang
>>>
>>> At 2025-01-03 06:23:25, "Vinay Agarwal" <vink...@gmail.com> wrote:
>>>
>>> Thanks again for your answer. I have to use Flink version 1.20.0 because
>>> `TestFileSystemCatalogFactory` doesn't exist in prior versions.
>>> Unfortunately, I am not able to run 1.20.0 due to the following error.
>>> (Version 1.19.1 works just fine.)
>>>
>>> ```
>>> 14:19:47.094 [main] INFO org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils - Actor system started at pekko://flink
>>> 14:19:47.104 [main] INFO org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils - Trying to start local actor system
>>> 14:19:47.113 [flink-metrics-8] INFO org.apache.pekko.event.slf4j.Slf4jLogger - Slf4jLogger started
>>> 14:19:47.119 [main] INFO org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils - Actor system started at pekko://flink-metrics
>>> 14:19:47.128 [main] INFO org.apache.flink.runtime.rpc.pekko.PekkoRpcService - Starting RPC endpoint for org.apache.flink.runtime.metrics.dump.MetricQueryService at pekko://flink-metrics/user/rpc/MetricQueryService .
>>> 14:19:47.142 [flink-metrics-pekko.actor.supervisor-dispatcher-9] WARN org.apache.flink.runtime.rpc.pekko.SupervisorActor - RpcActor pekko://flink-metrics/user/rpc/MetricQueryService has failed. Shutting it down now.
>>> java.lang.NullPointerException: null
>>>     at java.base/java.util.HashMap.putMapEntries(HashMap.java:497)
>>>     at java.base/java.util.HashMap.putAll(HashMap.java:781)
>>>     at java.base/java.util.Collections$SynchronizedMap.putAll(Collections.java:2604)
>>>     at ch.qos.logback.classic.util.LogbackMDCAdapter.setContextMap(LogbackMDCAdapter.java:197)
>>>     at org.slf4j.MDC.setContextMap(MDC.java:264)
>>>     at org.apache.flink.util.MdcUtils.lambda$withContext$0(MdcUtils.java:48)
>>>     at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleControlMessage(PekkoRpcActor.java:208)
>>>     at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
>>>     at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
>>>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
>>>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
>>>     at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
>>>     at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
>>>     at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
>>>     at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
>>>     at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
>>>     at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
>>>     at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
>>>     at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
>>>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>>     at java.base/java.lang.Thread.run(Thread.java:829)
>>> ```
>>> How can I get around this error?
>>>
>>> Thanks again.
>>> Vinay
>>>
>>> On Wed, Jan 1, 2025 at 5:58 PM Xuyang <xyzhong...@163.com> wrote:
>>>
>>>> As far as I know, it's not possible without using a catalog.
>>>>
>>>> Oh... I forgot that the FileSystemCatalog is currently only used for
>>>> testing[1]. One approach is to implement and package a FileSystemCatalog
>>>> by referring to this testing catalog.
>>>>
>>>> [1]
>>>> https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogFactory.java
>>>>
>>>> --
>>>> Best!
>>>> Xuyang
>>>>
>>>> At 2025-01-01 00:31:43, "Vinay Agarwal" <vink...@gmail.com> wrote:
>>>>
>>>> Thanks Xuyang for your answer. I, however, can't find a way to create
>>>> a filesystem catalog except by using Hive, which seems to require an
>>>> external Hive installation. I was hoping to find a solution that doesn't
>>>> require external dependencies. Is that possible?
>>>>
>>>> Vinay
>>>>
>>>> On Thu, Dec 26, 2024 at 10:30 PM Xuyang <xyzhong...@163.com> wrote:
>>>>
>>>>> Hi.
>>>>>
>>>>> In Java/Scala code, using `registerTable` and then querying in the SQL
>>>>> client will not work, as `registerTable` only creates a temporary table.
>>>>>
>>>>> To fulfill your requirements, you may need to use a persistent
>>>>> catalog[1] (such as a filesystem catalog) where you can create tables in
>>>>> your Java/Scala code. Once the table is created under that catalog, you
>>>>> can then query it from the SQL client within the same catalog[2].
>>>>>
>>>>> To avoid creating and using the same catalog twice in both Java/Scala
>>>>> code and the SQL client, you can leverage a catalog store (in higher
>>>>> versions only)[3].
>>>>>
>>>>> [1]
>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#using-java-scala-or-python
>>>>>
>>>>> [2]
>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#using-sql-ddl
>>>>>
>>>>> [3]
>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#catalog-store
>>>>>
>>>>> --
>>>>> Best!
>>>>> Xuyang
>>>>>
>>>>> At 2024-12-27 01:28:38, "Vinay Agarwal" <vink...@gmail.com> wrote:
>>>>>
>>>>> Hello,
>>>>>
>>>>> I am looking for an elaboration, or work around, of an old answer to
>>>>> this question here (
>>>>> https://stackoverflow.com/questions/56049472/create-sql-table-from-a-datastream-in-a-java-scala-program-and-query-it-from-sql)
>>>>> which mentions that registering the table can make it visible to the SQL
>>>>> client.
>>>>>
>>>>> I am using Flink 1.8.1 with Java 11. I downloaded Flink 1.8.1 bundle,
>>>>> put its `bin` folder in path, and started the cluster using
>>>>> `start-cluster.sh`.
>>>>>
>>>>> I created a simple Flink job that generates a stream of random integers
>>>>> and creates a table as follows:
>>>>>
>>>>> ```Java
>>>>> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>>>>> // Create a DataStream of random integers
>>>>> DataStream<Integer> intStream = env.addSource(new RandomIntegerSource());
>>>>> // Convert DataStream to Table
>>>>> Table table = tableEnv.fromDataStream(intStream, "number");
>>>>> // Register the table
>>>>> tableEnv.registerTable("RandomNumbers", table);
>>>>> // Execute the job
>>>>> env.execute("Random Integer Job");
>>>>> ```
>>>>> Then, I started `sql-client.sh` and queried as follows:
>>>>> ```SQL
>>>>> Flink SQL> show catalogs;
>>>>> +-----------------+
>>>>> |    catalog name |
>>>>> +-----------------+
>>>>> | default_catalog |
>>>>> +-----------------+
>>>>> 1 row in set
>>>>>
>>>>> Flink SQL> USE CATALOG default_catalog;
>>>>> [INFO] Execute statement succeed.
>>>>>
>>>>> Flink SQL> SHOW databases;
>>>>> +------------------+
>>>>> |    database name |
>>>>> +------------------+
>>>>> | default_database |
>>>>> +------------------+
>>>>> 1 row in set
>>>>>
>>>>> Flink SQL> USE default_database;
>>>>> [INFO] Execute statement succeed.
>>>>>
>>>>> Flink SQL> SHOW tables;
>>>>> Empty set
>>>>> ```
>>>>> As can be seen, I don't see the `RandomNumbers` table.
>>>>>
>>>>> How can I make that table visible to the SQL client?
>>>>>
>>>>>
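
One possible reading of the catalog-store advice in this thread is that the catalog has to be created from a descriptor (SQL `CREATE CATALOG`, or `TableEnvironment#createCatalog`) for its definition to be written to the configured store, whereas `registerCatalog` with an already-built instance may only live in memory. That is an assumption, not something confirmed in the thread. Under it, a minimal sketch of the DDL route could look like the following. The `test-filesystem` type and its `path` and `default-database` options belong to `TestFileSystemCatalogFactory`; the path is a placeholder, and the test-utils jar is assumed to be on the SQL client's classpath.

```SQL
-- Sketch only: assumes the session was started with
--   table.catalog-store.kind=file
--   table.catalog-store.file.path=<store directory>
-- and that the jar providing the 'test-filesystem' factory is available.
CREATE CATALOG fscat WITH (
  'type' = 'test-filesystem',
  'path' = '/path/to/catalog/',      -- placeholder, not a path from the thread
  'default-database' = 'fsdb'
);

USE CATALOG fscat;
SHOW DATABASES;
SHOW TABLES;
```

Even if the catalog then shows up in a later session, a table registered through `registerTable` is still temporary, as noted earlier in the thread, so only tables created persistently in `fscat` (for example with `CREATE TABLE`) would be expected to appear in `SHOW TABLES`.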