I have a job that joins two streams using the Table API:

```java
DataStream<FluentdMessage> stream = env.fromSource(
        dataSource, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");
DataStream<LogDataTable> logTableStream = env.fromSource(
        logTableSource, WatermarkStrategy.forMonotonousTimestamps(), "Log Table Source");

final Table tableA = tableEnv.fromDataStream(stream);
tableEnv.createTemporaryView("LookupTable", logTableStream);

final Table result = tableEnv.sqlQuery(
        "SELECT a.logId, MAX(t.insertTime) AS insertTime, t.s3Path, a.message, a.hostname "
                + "FROM " + tableA + " AS a "
                + "JOIN LookupTable AS t "
                + "ON t.logId = a.logId "
                + "WHERE a.grokFailure = 'No grok pattern matched' "
                + "GROUP BY a.logId, t.s3Path, a.message, a.hostname");

DataStream<JoinedRow> joinedStream = tableEnv.toDataStream(result, JoinedRow.class);
```

The LookupTable gets periodic updates that tell the query the new s3Path location for each logId. However, it appears that Flink does not support grouping in this context: the job fails with the TableException below, because the GROUP BY produces update changes that the insert-only DataStream sink cannot consume. Does anyone know of a solution? One direction I've been considering is sketched below, followed by the full stack trace. Thanks!
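For what it's worth, the direction I've been considering (untested) is to convert the updating result with `toChangelogStream` (available since Flink 1.13) instead of `toDataStream`, since a changelog stream can carry the update changes that the GroupAggregate node emits. The row-kind filtering below is my assumption about how to handle the changelog downstream, not something I have working:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

// Consume the updating query result as a changelog instead of an
// insert-only stream; each Row is tagged with a RowKind.
DataStream<Row> changelog = tableEnv.toChangelogStream(result);

// Keep only the current values (inserts and the "after" side of updates),
// dropping retractions, so downstream operators see upsert-style records.
DataStream<Row> latest = changelog.filter(row ->
        row.getKind() == RowKind.INSERT || row.getKind() == RowKind.UPDATE_AFTER);
```

The older `toRetractStream(result, JoinedRow.class)` would presumably be another option; it yields `Tuple2<Boolean, JoinedRow>`, where the Boolean flag distinguishes inserts from retractions.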
```
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Table sink 'default_catalog.default_database.Unregistered_DataStream_Sink_6' doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[logId, s3Path, message, hostname], select=[logId, s3Path, message, hostname, MAX(insertTime) AS insertTime])
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:102) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) [?:1.8.0_302]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_302]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_302]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_302]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_302]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_302]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_302]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_302]
Caused by: org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.Unregistered_DataStream_Sink_6' doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[logId, s3Path, message, hostname], select=[logId, s3Path, message, hostname, MAX(insertTime) AS insertTime])
```

--
Robert Cullen
240-475-4490