Hi all,

I'm trying to stitch together the pipeline below.

First I create a table in Flink, backed by a Hive catalog, with the data sourced from Kafka.

The CREATE TABLE below worked. For reference, ts is a BIGINT value representing the timestamp of the event.

CREATE OR REPLACE TABLE hive_catalog.iot.factory_iot_north (
    ts          BIGINT,
    metadata    ROW<
        siteId      INTEGER,
        deviceId    INTEGER,
        sensorId    INTEGER,
        unit        STRING,
        ts_human    STRING,
        location    ROW<
            latitude    DOUBLE,
            longitude   DOUBLE
        >,
        deviceType  STRING
    >,
    measurement DOUBLE,
    event_time  AS TO_TIMESTAMP_LTZ(ts, 3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'factory_iot_north',
    'properties.bootstrap.servers' = 'broker:9092',
    'properties.group.id' = 'devlab0',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);

Note: if I run a SELECT against this table, I'm NOT getting any data back.

It does show me a table view of the columns to be returned, namely: op, ts, metadata and event_time.
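One thing I plan to check on the empty SELECT: since 'json.ignore-parse-errors' is 'true', any rows that fail JSON parsing are dropped silently. An untested sketch using a dynamic table options hint to surface parse errors instead of swallowing them:

```sql
-- Untested sketch: re-run the query with JSON parse errors surfaced
-- instead of silently dropped, via a dynamic table options hint.
SELECT ts, metadata, measurement
FROM hive_catalog.iot.factory_iot_north
/*+ OPTIONS('json.ignore-parse-errors' = 'false') */;
```

If a malformed record is the problem, this should fail loudly instead of returning nothing.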

Next, I want to create an unnested version of the above.

My thinking at this stage is to keep the data inside Flink, as there is more I want to do with it.

I will also create an output table to stream the unnested table's data, reshaped, into a Prometheus TSDB. Part of that is to do some window-based aggregation on it and push that to Prometheus as well.

Lastly, both the source and the aggregated data will be pushed into Fluss and down into Paimon on HDFS.
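For the window aggregation step, I'm thinking along these lines (sketch only; the 1-minute tumble and the grouping keys are placeholders, reading from the unnested table defined below):

```sql
-- Sketch: 1-minute tumbling-window average per site/sensor, using the
-- windowing TVF syntax. Window size and grouping keys are placeholders.
SELECT
    window_start,
    window_end,
    siteId,
    sensorId,
    AVG(measurement) AS avg_measurement
FROM TABLE(
    TUMBLE(TABLE hive_catalog.iot.factory_iot_north_unnested,
           DESCRIPTOR(event_time), INTERVAL '1' MINUTE))
GROUP BY window_start, window_end, siteId, sensorId;
```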

SET 'pipeline.name' = 'Factory_iot_north Unnested';

CREATE OR REPLACE TABLE hive_catalog.iot.factory_iot_north_unnested (
    ts          BIGINT,
    siteId      INTEGER,
    deviceId    INTEGER,
    sensorId    INTEGER,
    unit        STRING,
    ts_human    STRING,
    latitude    DOUBLE,
    longitude   DOUBLE,
    deviceType  STRING,
    measurement DOUBLE,
    event_time  AS TO_TIMESTAMP_LTZ(ts, 3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
);

The above is failing; I think it's telling me it does not know where or how to store/manage the table.
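Reading the ValidationException at the bottom of the trace, I suspect the issue is that a table in a Hive catalog still needs an explicit 'connector' in its WITH clause; without one, Flink assumes a "managed table", and no managed-table factory is on the classpath. Something along these lines (untested; the topic name and format are my guesses) should avoid that code path:

```sql
-- Untested sketch: give the table an explicit connector so Flink does
-- not fall back to the (missing) managed-table factory.
CREATE OR REPLACE TABLE hive_catalog.iot.factory_iot_north_unnested (
    ts          BIGINT,
    siteId      INTEGER,
    deviceId    INTEGER,
    sensorId    INTEGER,
    unit        STRING,
    ts_human    STRING,
    latitude    DOUBLE,
    longitude   DOUBLE,
    deviceType  STRING,
    measurement DOUBLE,
    event_time  AS TO_TIMESTAMP_LTZ(ts, 3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'factory_iot_north_unnested',
    'properties.bootstrap.servers' = 'broker:9092',
    'format' = 'json'
);
```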

Once the above works, I insert into the unnested table all records from the original table:

INSERT INTO hive_catalog.iot.factory_iot_north_unnested
SELECT
ts,
metadata.siteId AS siteId,
metadata.deviceId AS deviceId,
metadata.sensorId AS sensorId,
metadata.unit AS unit,
metadata.ts_human AS ts_human,
metadata.location.latitude AS latitude,
metadata.location.longitude AS longitude,
metadata.deviceType AS deviceType,
measurement
FROM hive_catalog.iot.factory_iot_north;
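Alternatively, if the unnested shape is only needed as input to further Flink queries, a view would avoid having to pick a backing store at all (sketch; the `_v` name is just my placeholder):

```sql
-- Sketch: unnest via a view instead of a physical table.
CREATE VIEW hive_catalog.iot.factory_iot_north_unnested_v AS
SELECT
    ts,
    metadata.siteId             AS siteId,
    metadata.deviceId           AS deviceId,
    metadata.sensorId           AS sensorId,
    metadata.unit               AS unit,
    metadata.ts_human           AS ts_human,
    metadata.location.latitude  AS latitude,
    metadata.location.longitude AS longitude,
    metadata.deviceType         AS deviceType,
    measurement,
    event_time
FROM hive_catalog.iot.factory_iot_north;
```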


These will be my source tables; after this we will push the above tables, nested and unnested, into Fluss.

I might have to consider moving the unnested table, the aggregated tables, etc. to be Fluss-backed.

*Current errors...*

Flink SQL> CREATE OR REPLACE TABLE
hive_catalog.iot.factory_iot_north_unnested (
>     ts              BIGINT,
>     siteId          INTEGER,
>     deviceId        INTEGER,
>     sensorId        INTEGER,
>     unit            STRING,
>     ts_human        STRING,
>     latitude        DOUBLE,
>     longitude       DOUBLE,
>     deviceType      STRING,
>     measurement     DOUBLE,
>     event_time      AS TO_TIMESTAMP_LTZ(ts, 3),
>     WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
> ) ;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
org.apache.flink.table.gateway.api.utils.SqlGatewayException: org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to fetchResults.
    at org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler.handleRequest(FetchResultsHandler.java:91)
    at org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:84)
    at org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:52)
    at org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:196)
    at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:88)
    at java.base/java.util.Optional.ifPresent(Unknown Source)
    at org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)
    at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:85)
    at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:50)
    at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)
    at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)
    at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)
    at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:233)
    at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:70)
    at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
    at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
    at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
    at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to fetchResults.
    at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.fetchResults(SqlGatewayServiceImpl.java:231)
    at org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler.handleRequest(FetchResultsHandler.java:89)
    ... 48 more
Caused by: org.apache.flink.table.gateway.service.utils.SqlExecutionException: Failed to execute the operation 16b9e47b-badb-4a7c-93bb-975e192aa346.
    at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.processThrowable(OperationManager.java:414)
    at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:267)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    ... 1 more
Caused by: org.apache.flink.table.api.TableException: Could not execute CreateTable in path `hive_catalog`.`iot`.`factory_iot_north_unnested`
    at org.apache.flink.table.catalog.CatalogManager.execute(CatalogManager.java:1375)
    at org.apache.flink.table.catalog.CatalogManager.createTable(CatalogManager.java:1005)
    at org.apache.flink.table.operations.ddl.CreateTableOperation.execute(CreateTableOperation.java:86)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1102)
    at org.apache.flink.table.gateway.service.operation.OperationExecutor.callOperation(OperationExecutor.java:687)
    at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:522)
    at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:243)
    at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:199)
    at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:214)
    at org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119)
    at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258)
    ... 7 more
Caused by: org.apache.flink.table.api.ValidationException: Table options do not contain an option key 'connector' for discovering a connector. Therefore, Flink assumes a managed table. However, a managed table factory that implements org.apache.flink.table.factories.ManagedTableFactory is not in the classpath.
    at org.apache.flink.table.factories.FactoryUtil.discoverManagedTableFactory(FactoryUtil.java:860)
    at org.apache.flink.table.factories.ManagedTableFactory.discoverManagedTableFactory(ManagedTableFactory.java:66)
    at org.apache.flink.table.catalog.ManagedTableListener.enrichOptions(ManagedTableListener.java:167)
    at org.apache.flink.table.catalog.ManagedTableListener.notifyTableCreation(ManagedTableListener.java:66)
    at org.apache.flink.table.catalog.CatalogManager.lambda$createTable$18(CatalogManager.java:1009)
    at org.apache.flink.table.catalog.CatalogManager.execute(CatalogManager.java:1369)
    ... 17 more

End of exception on server side>]
    at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:644)
    at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$6(RestClient.java:628)
    at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)



-- 
You have the obligation to inform one honestly of the risk, and as a person
you are committed to educate yourself to the total risk in any activity!

Once informed & totally aware of the risk,
every fool has the right to kill or injure themselves as they see fit!
