How about porting the `TestFileSystemCatalogFactory` back to 1.19 and rebuilding
this catalog jar?
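For illustration, once such a rebuilt jar is on the classpath, the catalog could be registered like this. This is a minimal sketch, assuming the port keeps the `test-filesystem` identifier and `path` option of the test factory referenced below; adjust it to whatever your rebuilt jar actually exposes:
```Java
// Sketch only: catalog name and path are illustrative.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UsePortedCatalog {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // The same DDL can be issued from the SQL client once the rebuilt
        // jar is on its classpath (e.g. dropped into Flink's lib/ directory).
        tableEnv.executeSql(
                "CREATE CATALOG fs_catalog WITH ("
                        + "'type' = 'test-filesystem',"
                        + "'path' = 'file:///tmp/fs-catalog')");
        tableEnv.executeSql("USE CATALOG fs_catalog");
    }
}
```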
--
Best!
Xuyang
On 2025-01-03 06:23:25, "Vinay Agarwal" <[email protected]> wrote:
Thanks again for your answer. I have to use Flink version 1.20.0 because
`TestFileSystemCatalogFactory` doesn't exist in prior versions. Unfortunately,
I am not able to run 1.20.0 due to the following error. (Version 1.19.1 works
just fine.)
```
14:19:47.094 [main] INFO org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils - Actor system started at pekko://flink
14:19:47.104 [main] INFO org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils - Trying to start local actor system
14:19:47.113 [flink-metrics-8] INFO org.apache.pekko.event.slf4j.Slf4jLogger - Slf4jLogger started
14:19:47.119 [main] INFO org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils - Actor system started at pekko://flink-metrics
14:19:47.128 [main] INFO org.apache.flink.runtime.rpc.pekko.PekkoRpcService - Starting RPC endpoint for org.apache.flink.runtime.metrics.dump.MetricQueryService at pekko://flink-metrics/user/rpc/MetricQueryService .
14:19:47.142 [flink-metrics-pekko.actor.supervisor-dispatcher-9] WARN org.apache.flink.runtime.rpc.pekko.SupervisorActor - RpcActor pekko://flink-metrics/user/rpc/MetricQueryService has failed. Shutting it down now.
java.lang.NullPointerException: null
    at java.base/java.util.HashMap.putMapEntries(HashMap.java:497)
    at java.base/java.util.HashMap.putAll(HashMap.java:781)
    at java.base/java.util.Collections$SynchronizedMap.putAll(Collections.java:2604)
    at ch.qos.logback.classic.util.LogbackMDCAdapter.setContextMap(LogbackMDCAdapter.java:197)
    at org.slf4j.MDC.setContextMap(MDC.java:264)
    at org.apache.flink.util.MdcUtils.lambda$withContext$0(MdcUtils.java:48)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleControlMessage(PekkoRpcActor.java:208)
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
    at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
    at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
    at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
    at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
    at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
    at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
    at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
```
How can I get around this error?
Thanks again.
Vinay
On Wed, Jan 1, 2025 at 5:58 PM Xuyang <[email protected]> wrote:
As far as I know, it's not possible to do this without using a catalog.
Oh... I forgot that the FileSystemCatalog is currently only used for
testing[1]. One approach is to implement and package a FileSystemCatalog by
referring to this testing catalog.
[1]
https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogFactory.java
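A rough sketch of what such a factory could look like follows. All names here are hypothetical, and `GenericInMemoryCatalog` is only a compiling stand-in for a real filesystem-backed catalog ported from the test code:
```Java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.factories.CatalogFactory;

public class MyFileSystemCatalogFactory implements CatalogFactory {

    // Root directory under which the catalog keeps databases and tables.
    public static final ConfigOption<String> PATH =
            ConfigOptions.key("path").stringType().noDefaultValue();

    @Override
    public String factoryIdentifier() {
        // Used as 'type' in: CREATE CATALOG ... WITH ('type' = 'my-filesystem', ...)
        return "my-filesystem";
    }

    @Override
    public Catalog createCatalog(Context context) {
        String path = context.getOptions().get(PATH.key());
        // Stand-in only: a real port would return a filesystem-backed catalog
        // that persists metadata under `path`, as the test catalog does.
        return new GenericInMemoryCatalog(context.getName());
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(PATH);
        return options;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet();
    }
}
```
The factory class also needs to be listed in `META-INF/services/org.apache.flink.table.factories.Factory` inside the packaged jar so the SQL client can discover it.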
--
Best!
Xuyang
On 2025-01-01 00:31:43, "Vinay Agarwal" <[email protected]> wrote:
Thanks Xuyang for your answer. I, however, can't find a way to create a
filesystem catalog except by using Hive, which seems to require an external
Hive installation. I was hoping to find a solution that doesn't require
external dependencies. Is that possible?
Vinay
On Thu, Dec 26, 2024 at 10:30 PM Xuyang <[email protected]> wrote:
Hi.
In Java/Scala code, using `registerTable` and then querying in the SQL client
will not work, as `registerTable` only creates a temporary table.
To fulfill your requirements, you may need to use a persistent catalog[1] (such
as a filesystem catalog) where you can create tables in your Java/Scala code.
Once the table is created under that catalog, you can then query it from the
SQL client within the same catalog[2]. For example:
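A minimal sketch of that flow, using the Hive catalog as the persistent catalog; the catalog name, configuration directory, and datagen table below are illustrative only:
```Java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PersistentCatalogExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Unlike registerTable, objects created inside a persistent catalog
        // outlive this session and are visible to other clients.
        tableEnv.executeSql(
                "CREATE CATALOG my_catalog WITH ("
                        + "'type' = 'hive',"
                        + "'hive-conf-dir' = '/opt/hive-conf')"); // illustrative path
        tableEnv.executeSql("USE CATALOG my_catalog");
        tableEnv.executeSql(
                "CREATE TABLE RandomNumbers (num INT) WITH ('connector' = 'datagen')");
    }
}
```
From the SQL client, one would then declare the same catalog with the same `CREATE CATALOG` statement and query the table from within it.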
To avoid creating and using the same catalog twice in both Java/Scala code and
the SQL client, you can leverage a catalog store (available in newer versions
only)[3].
[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#using-java-scala-or-python
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#using-sql-ddl
[3]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#catalog-store
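For reference, a hedged sketch of pointing a job at a file-based catalog store, assuming Flink 1.18+ and the option keys from the catalog store documentation linked above:
```Java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CatalogStoreExample {
    public static void main(String[] args) {
        // Persist CREATE CATALOG definitions in a shared file-based store.
        Configuration config = new Configuration();
        config.setString("table.catalog-store.kind", "file");
        config.setString("table.catalog-store.file.path", "file:///tmp/catalog-store");

        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance()
                        .inStreamingMode()
                        .withConfiguration(config)
                        .build());
        // Catalogs created here are saved in the store and can be picked up
        // by a SQL client configured with the same two options.
    }
}
```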
--
Best!
Xuyang
At 2024-12-27 01:28:38, "Vinay Agarwal" <[email protected]> wrote:
Hello,
I am looking for an elaboration of, or a workaround for, an old answer to this
question here
(https://stackoverflow.com/questions/56049472/create-sql-table-from-a-datastream-in-a-java-scala-program-and-query-it-from-sql),
which mentions that registering the table can make it visible to the SQL
client.
I am using Flink 1.19.1 with Java 11. I downloaded the Flink 1.19.1 bundle, put
its `bin` folder on my path, and started the cluster using `start-cluster.sh`.
I created a simple Flink job that generates a stream of random integers and
creates a table as follows:
```Java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Create a DataStream of random integers (RandomIntegerSource is my own source)
DataStream<Integer> intStream = env.addSource(new RandomIntegerSource());

// Convert the DataStream to a Table with a single column named "number"
Table table = tableEnv.fromDataStream(intStream, "number");

// Register the table (deprecated in favor of createTemporaryView; either way
// this only creates a temporary view scoped to this session)
tableEnv.registerTable("RandomNumbers", table);

// Execute the job
env.execute("Random Integer Job");
```
Then, I started `sql-client.sh` and queried as follows:
```SQL
Flink SQL> show catalogs;
+-----------------+
| catalog name |
+-----------------+
| default_catalog |
+-----------------+
1 row in set
Flink SQL> USE CATALOG default_catalog;
[INFO] Execute statement succeed.
Flink SQL> SHOW databases;
+------------------+
| database name |
+------------------+
| default_database |
+------------------+
1 row in set
Flink SQL> USE default_database;
[INFO] Execute statement succeed.
Flink SQL> SHOW tables;
Empty set
```
As can be seen, I don't see the `RandomNumbers` table.
How can I make that table visible to the SQL client?