Hi all Trying to stitch together the below.
I first create a table in flink, backed by a hive catalog, data sourced from kafka, as per below, The below create table worked... First, for reference ts is a bigint value representing the timestamp of the event. CREATE OR REPLACE TABLE hive_catalog.iot.factory_iot_north ( ts BIGINT, metadata ROW< siteId INTEGER, deviceId INTEGER, sensorId INTEGER, unit STRING, ts_human STRING, location ROW< latitude DOUBLE, longitude DOUBLE >, deviceType STRING >, measurement DOUBLE, event_time AS TO_TIMESTAMP_LTZ(ts, 3), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'factory_iot_north', 'properties.bootstrap.servers' = 'broker:9092', 'properties.group.id' = 'devlab0', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true' ); Note, if I run a select I'm NOT getting any data returned... It does give me a table view of the to be returned columns, namely: op, ts, metadata and event_time. Below I want to now create an unnested version of the above. My thinking at this stage was to keep the data inside flink, as there is more i want to do with it still. I will also be creating a output table to stream the below table data reshaped into a prometheus tsdb. part of that is to use the below to also do some aggregation, based on windows and puth that to prometheus, Then both/lastly the source and aggregated data will be pushed into fluss and down into paimon on HDFS. SET 'pipeline.name' = 'Factory_iot_north Unnested'; CREATE OR REPLACE TABLE hive_catalog.iot.factory_iot_north_unnested ( ts BIGINT, siteId INTEGER, deviceId INTEGER, sensorId INTEGER, unit STRING, ts_human STRING, latitude DOUBLE, longitude DOUBLE, deviceType STRING, measurement DOUBLE, event_time AS TO_TIMESTAMP_LTZ(ts, 3), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND ) ; The above is failing, because it's I think telling me it does not know how where to store/manage it. Once the above works, we insert into the unnested table all records from the original, first table created. INSERT INTO hive_catalog.iot.factory_iot_north_unnested SELECT ts, metadata.siteId AS siteId, metadata.deviceId AS deviceId, metadata.sensorId AS sensorId, metadata.unit AS unit, metadata.ts_human AS ts_human, metadata.location.latitude AS latitude, metadata.location.longitude AS longitude, metadata.deviceType AS deviceType, measurement FROM hive_catalog.iot.factory_iot_north; This will be my source tables, after this we will push the above 6 tables into fluss. nested and unnested. Might have to consider moving the unnested table and the aggregated tables etc all to fluss backed. *Current errors...* Flink SQL> CREATE OR REPLACE TABLE hive_catalog.iot.factory_iot_north_unnested ( > ts BIGINT, > siteId INTEGER, > deviceId INTEGER, > sensorId INTEGER, > unit STRING, > ts_human STRING, > latitude DOUBLE, > longitude DOUBLE, > deviceType STRING, > measurement DOUBLE, > event_time AS TO_TIMESTAMP_LTZ(ts, 3), > WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND > ) ; [ERROR] Could not execute SQL statement. Reason: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side: org.apache.flink.table.gateway.api.utils.SqlGatewayException: org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to fetchResults. at org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler.handleRequest(FetchResultsHandler.java:91) at org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:84) at org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:52) at org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:196) at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:88) at java.base/java.util.Optional.ifPresent(Unknown Source) at org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45) at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:85) at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:50) at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) at org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115) at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94) at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55) at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) at org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:233) at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:70) at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at java.base/java.lang.Thread.run(Unknown Source) Caused by: org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to fetchResults. at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.fetchResults(SqlGatewayServiceImpl.java:231) at org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler.handleRequest(FetchResultsHandler.java:89) ... 48 more Caused by: org.apache.flink.table.gateway.service.utils.SqlExecutionException: Failed to execute the operation 16b9e47b-badb-4a7c-93bb-975e192aa346. at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.processThrowable(OperationManager.java:414) at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:267) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) at java.base/java.util.concurrent.FutureTask.run(Unknown Source) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) at java.base/java.util.concurrent.FutureTask.run(Unknown Source) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ... 1 more Caused by: org.apache.flink.table.api.TableException: Could not execute CreateTable in path `hive_catalog`.`iot`.`factory_iot_north_unnested` at org.apache.flink.table.catalog.CatalogManager.execute(CatalogManager.java:1375) at org.apache.flink.table.catalog.CatalogManager.createTable(CatalogManager.java:1005) at org.apache.flink.table.operations.ddl.CreateTableOperation.execute(CreateTableOperation.java:86) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1102) at org.apache.flink.table.gateway.service.operation.OperationExecutor.callOperation(OperationExecutor.java:687) at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:522) at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:243) at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:199) at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:214) at org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119) at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258) ... 7 more Caused by: org.apache.flink.table.api.ValidationException: Table options do not contain an option key 'connector' for discovering a connector. Therefore, Flink assumes a managed table. However, a managed table factory that implements org.apache.flink.table.factories.ManagedTableFactory is not in the classpath. at org.apache.flink.table.factories.FactoryUtil.discoverManagedTableFactory(FactoryUtil.java:860) at org.apache.flink.table.factories.ManagedTableFactory.discoverManagedTableFactory(ManagedTableFactory.java:66) at org.apache.flink.table.catalog.ManagedTableListener.enrichOptions(ManagedTableListener.java:167) at org.apache.flink.table.catalog.ManagedTableListener.notifyTableCreation(ManagedTableListener.java:66) at org.apache.flink.table.catalog.CatalogManager.lambda$createTable$18(CatalogManager.java:1009) at org.apache.flink.table.catalog.CatalogManager.execute(CatalogManager.java:1369) ... 17 more End of exception on server side>] at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:644) at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$6(RestClient.java:628) at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source) at java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.base/java.lang.Thread.run(Unknown Source) -- You have the obligation to inform one honestly of the risk, and as a person you are committed to educate yourself to the total risk in any activity! Once informed & totally aware of the risk, every fool has the right to kill or injure themselves as they see fit!