Hi again, To be extra sure it isn't something related to Scala, I've created the same example in Java:
import java.math.BigDecimal; > import java.time.Instant; > import java.time.temporal.ChronoUnit; > import org.apache.flink.api.common.typeinfo.TypeInformation; > import org.apache.flink.streaming.api.datastream.DataStream; > import org.apache.flink.streaming.api.datastream.DataStreamSource; > import > org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; > import > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.table.annotation.DataTypeHint; > import org.apache.flink.table.api.*; > import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > import org.apache.flink.table.types.DataType; > > > public class ConvertBetweenDataStreamAndTable { > > public static class Transfer { > public String transferId; > @DataTypeHint(defaultDecimalScale = 18, defaultDecimalPrecision = > 38) > public BigDecimal amount; > public String customerId; > > public Transfer(String transferId, BigDecimal amount, String > customerId) { > this.transferId = transferId; > this.amount = amount; > this.customerId = customerId; > } > public Transfer() { } > } > > public static class Metadata { > public Instant createdAt; > > public Metadata(Instant createdAt) { > this.createdAt = createdAt; > } > > public Metadata() {} > } > > public static class TransferEvent { > public Transfer data; > public Metadata meta; > > public TransferEvent(Transfer data, Metadata meta) { > this.data = data; > this.meta = meta; > } > > public TransferEvent() { > } > } > > > public static void main(String[] args) throws Exception { > new ConvertBetweenDataStreamAndTable().workflow(); > } > > public void workflow() throws Exception { > Instant t1 = Instant.now().minus(1000, ChronoUnit.SECONDS); > Instant t2 = Instant.now().minus(500, ChronoUnit.SECONDS); > Instant t3 = Instant.now().minus(200, ChronoUnit.SECONDS); > > TransferEvent transfer1 = new TransferEvent(new > Transfer("transfer1", BigDecimal.valueOf(100), "customer1"), new > Metadata(t1)); 
> TransferEvent transfer2 = new TransferEvent(new > Transfer("transfer2", BigDecimal.valueOf(50), "customer2"), new > Metadata(t2)); > TransferEvent transfer3 = new TransferEvent(new > Transfer("transfer3", BigDecimal.valueOf(10), "customer1"), new > Metadata(t3)); > > DataType dataType = DataTypes.ROW( > DataTypes.FIELD("data", > DataTypes.ROW(DataTypes.FIELD("transferId", > DataTypes.STRING()), > DataTypes.FIELD("amount", > DataTypes.DECIMAL(38, 10)), > DataTypes.FIELD("customerId", > DataTypes.STRING())) > ), > DataTypes.FIELD("meta", DataTypes.ROW( > DataTypes.FIELD("createdAt", > DataTypes.TIMESTAMP_LTZ(3)) > )) > ); > > Schema schema = > Schema.newBuilder().fromRowDataType(dataType).build(); > > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > StreamTableEnvironment tableEnv = > StreamTableEnvironment.create(env); > > TypeInformation<TransferEvent> of = > TypeInformation.of(TransferEvent.class); > SingleOutputStreamOperator<TransferEvent> transfers = > env.fromElements(transfer1, transfer2, transfer3).returns(of); > Table table = tableEnv.fromDataStream(transfers, schema); > tableEnv.createTemporaryView("transfers", table); > > Table x = tableEnv.sqlQuery("select data, meta from transfers"); > > DataStream<TransferEvent> objectDataStream = > tableEnv.toDataStream(x, DataTypes.of(TransferEvent.class)); > objectDataStream.print(); > env.execute(); > } > } > Same exact exception: SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". > SLF4J: Defaulting to no-operation (NOP) logger implementation > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further > details. 
> WARNING: An illegal reflective access operation has occurred > WARNING: Illegal reflective access by > org.apache.flink.api.java.ClosureCleaner > (file:/Users/andre.midea/Library/Caches/Coursier/v1/https/ > repo1.maven.org/maven2/org/apache/flink/flink-core/1.15.2/flink-core-1.15.2.jar) > to field java.lang.String.value > WARNING: Please consider reporting this to the maintainers of > org.apache.flink.api.java.ClosureCleaner > WARNING: Use --illegal-access=warn to enable warnings of further illegal > reflective access operations > WARNING: All illegal access operations will be denied in a future release > /* 1 */public class > flink_examples$java$datastream_table$ConvertBetweenDataStreamAndTable$TransferEvent$1$Converter > implements org.apache.flink.table.data.conversion.DataStructureConverter { > /* 2 */ private final org.apache.flink.table.data.RowData.FieldGetter[] > fieldGetters; > /* 3 */ private final > org.apache.flink.table.data.conversion.DataStructureConverter[] > fieldConverters; > /* 4 */ public > flink_examples$java$datastream_table$ConvertBetweenDataStreamAndTable$TransferEvent$1$Converter(org.apache.flink.table.data.RowData.FieldGetter[] > fieldGetters, > org.apache.flink.table.data.conversion.DataStructureConverter[] > fieldConverters) { > /* 5 */ this.fieldGetters = fieldGetters; > /* 6 */ this.fieldConverters = fieldConverters; > /* 7 */ } > /* 8 */ public java.lang.Object toInternal(java.lang.Object o) { > /* 9 */ final > flink_examples.java.datastream_table.ConvertBetweenDataStreamAndTable.TransferEvent > external = > (flink_examples.java.datastream_table.ConvertBetweenDataStreamAndTable.TransferEvent) > o; > /* 10 */ final org.apache.flink.table.data.GenericRowData > genericRow = new org.apache.flink.table.data.GenericRowData(2); > /* 11 */ genericRow.setField(0, > fieldConverters[0].toInternalOrNull(((org.apache.flink.types.Row) > external.data))); > /* 12 */ genericRow.setField(1, > 
fieldConverters[1].toInternalOrNull(((org.apache.flink.types.Row) > external.meta))); > /* 13 */ return genericRow; > /* 14 */ } > /* 15 */ public java.lang.Object toExternal(java.lang.Object o) { > /* 16 */ final org.apache.flink.table.data.RowData internal = > (org.apache.flink.table.data.RowData) o; > /* 17 */ final > flink_examples.java.datastream_table.ConvertBetweenDataStreamAndTable.TransferEvent > structured = new > flink_examples.java.datastream_table.ConvertBetweenDataStreamAndTable.TransferEvent(); > /* 18 */ structured.data = > ((flink_examples.java.datastream_table.ConvertBetweenDataStreamAndTable.Transfer) > fieldConverters[0].toExternalOrNull(fieldGetters[0].getFieldOrNull(internal))); > /* 19 */ structured.meta = > ((flink_examples.java.datastream_table.ConvertBetweenDataStreamAndTable.Metadata) > fieldConverters[1].toExternalOrNull(fieldGetters[1].getFieldOrNull(internal))); > /* 20 */ return structured; > /* 21 */ } > /* 22 */} > > Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) > at > org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141) > at > java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) > at > java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > at > java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) > at > org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:259) > at > java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) > at > java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) > at > java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > at > java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) > at > org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389) > at > org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93) > at > org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) > at > org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92) > at > java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) > at > java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) > at > java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > at > java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) > at > org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47) > at 
akka.dispatch.OnComplete.internal(Future.scala:300) > at akka.dispatch.OnComplete.internal(Future.scala:297) > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224) > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221) > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) > at > org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65) > at > scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) > at > scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284) > at > scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284) > at > scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) > at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621) > at > akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24) > at > akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23) > at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532) > at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29) > at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29) > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) > at > akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63) > at > akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100) > at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12) > at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81) > at > akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) > at > 
java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) > at > java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020) > at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) > at > java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) > at > java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) > Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed > by NoRestartBackoffTimeStrategy > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:301) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:291) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:282) > at > org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:739) > at > org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78) > at > org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:443) > at jdk.internal.reflect.GeneratedMethodAccessor28.invoke(Unknown Source) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:304) > at > org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) > at > 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:302) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) > at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) > at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) > at akka.actor.Actor.aroundReceive(Actor.scala:537) > at akka.actor.Actor.aroundReceive$(Actor.scala:535) > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) > at akka.actor.ActorCell.invoke(ActorCell.scala:548) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) > at akka.dispatch.Mailbox.run(Mailbox.scala:231) > at akka.dispatch.Mailbox.exec(Mailbox.scala:243) > ... 5 more > Caused by: org.apache.flink.table.api.TableException: Error while > generating structured type converter. 
> at > org.apache.flink.table.data.conversion.StructuredObjectConverter.open(StructuredObjectConverter.java:89) > at > org.apache.flink.table.runtime.connector.source.DataStructureConverterWrapper.open(DataStructureConverterWrapper.java:46) > at > org.apache.flink.table.runtime.operators.source.InputConversionOperator.open(InputConversionOperator.java:76) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) > at java.base/java.lang.Thread.run(Thread.java:829) > Caused by: org.apache.flink.util.FlinkRuntimeException: > org.apache.flink.api.common.InvalidProgramException: Table program cannot > be compiled. This is a bug. Please file an issue. > at > org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94) > at > org.apache.flink.table.data.conversion.StructuredObjectConverter.open(StructuredObjectConverter.java:80) > ... 12 more > Caused by: > org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException: > org.apache.flink.api.common.InvalidProgramException: Table program cannot > be compiled. This is a bug. Please file an issue. 
> at > org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051) > at > org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache.get(LocalCache.java:3962) > at > org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4859) > at > org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:92) > ... 13 more > Caused by: org.apache.flink.api.common.InvalidProgramException: Table > program cannot be compiled. This is a bug. Please file an issue. > at > org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:107) > at > org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$0(CompileUtils.java:92) > at > org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4864) > at > org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529) > at > org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278) > at > org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155) > at > org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045) > ... 
16 more > Caused by: org.codehaus.commons.compiler.CompileException: Line 11, Column > 106: Cannot cast > "flink_examples.java.datastream_table.ConvertBetweenDataStreamAndTable$Transfer" > to "org.apache.flink.types.Row" > at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12211) > at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5051) > at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215) > at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4418) > at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4396) > at org.codehaus.janino.Java$Cast.accept(Java.java:4898) > at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396) > at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5057) > at org.codehaus.janino.UnitCompiler.access$8100(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4409) > at > org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4400) > at org.codehaus.janino.Java$ParenthesizedExpression.accept(Java.java:4924) > at org.codehaus.janino.UnitCompiler$16.visitLvalue(UnitCompiler.java:4400) > at org.codehaus.janino.UnitCompiler$16.visitLvalue(UnitCompiler.java:4396) > at org.codehaus.janino.Java$Lvalue.accept(Java.java:4148) > at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396) > at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662) > at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5182) > at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4423) > at > org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4396) > at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073) > at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396) > at 
org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662) > at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5182) > at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4423) > at > org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4396) > at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073) > at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396) > at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3783) > at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3762) > at > org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3734) > at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3734) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360) > at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494) > at > org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487) > at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2874) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487) > at > org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388) > at > org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357) > at > org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822) > at 
org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432) > at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411) > at > org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406) > at > org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406) > at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378) > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237) > at > org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465) > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216) > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207) > at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80) > at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75) > at > org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:104) > ... 22 more > > Process finished with exit code 1 > On Wed, Oct 19, 2022 at 11:08 AM André Midea Jasiskis <midea.an...@gmail.com> wrote: > Hi all, > > I would like to interchangeably convert between Tables and DataStreams > with the ability to specify what are the types used in the conversion from > a POJO to a Table and vice-versa. > > To convert from a POJO to a table I'm creating a Schema using a ROW-based > DataType, and to convert from a Table to a DataStream I'm using a > StructuredDataType. > > The issue is that I get an exception when converting the table back to a > datastream: > >> >> Caused by: org.apache.flink.api.common.InvalidProgramException: Table >> program cannot be compiled. This is a bug. Please file an issue. >> ... 
>> Caused by: org.codehaus.commons.compiler.CompileException: Line 11, >> Column 106: Cannot cast "flink_examples.datastream_table.Transfer" to >> "org.apache.flink.types.Row" >> > > I tried many different ways to get this to work, but I can never get a > solution that works for all cases. The example below works just fine if the > source for my table is a Table Connector (e.g: kafka) but not when creating > the table from a DataStream (which we use for testing purposes). > > The full code and exception are as follows: > > case class Transfer(var transferId: String, var amount: math.BigDecimal, >> var customerId: String) { >> def this() = this(null,null,null) >> } >> >> case class Metadata(var createdAt: Instant) { >> def this() = this(null) >> } >> >> case class TransferEvent(var data: Transfer, var meta: Metadata) { >> def this() = this(null, null) >> } >> object ConvertBetweenDataStreamAndTable: >> val t1 = Instant.now().minus(1000, ChronoUnit.SECONDS) >> val t2 = Instant.now().minus(500, ChronoUnit.SECONDS) >> val t3 = Instant.now().minus(200, ChronoUnit.SECONDS) >> >> val transfer1 = TransferEvent(Transfer("transfer1", >> math.BigDecimal.valueOf(100), "customer1"), Metadata(t1)) >> val transfer2 = TransferEvent(Transfer("transfer2", >> math.BigDecimal.valueOf(50), "customer2"), Metadata(t2)) >> val transfer3 = TransferEvent(Transfer("transfer3", >> math.BigDecimal.valueOf(10), "customer1"), Metadata(t3)) >> >> val dataTypeStructured = DataTypes.STRUCTURED( >> classOf[TransferEvent], >> DataTypes.FIELD("data", >> DataTypes.ROW(DataTypes.FIELD("transferId", DataTypes.STRING()), >> DataTypes.FIELD("amount", DataTypes.DECIMAL(38, 10)), >> DataTypes.FIELD("customerId", DataTypes.STRING())) >> ), >> DataTypes.FIELD("meta", DataTypes.ROW( >> DataTypes.FIELD("createdAt", DataTypes.TIMESTAMP_LTZ(3)) >> )) >> ) >> val dataTypeRow = DataTypes.ROW( >> DataTypes.FIELD("data", >> DataTypes.ROW(DataTypes.FIELD("transferId", DataTypes.STRING()), >> DataTypes.FIELD("amount", 
DataTypes.DECIMAL(38, 10)), >> DataTypes.FIELD("customerId", DataTypes.STRING())) >> ), >> DataTypes.FIELD("meta", DataTypes.ROW( >> DataTypes.FIELD("createdAt", DataTypes.TIMESTAMP_LTZ(3)) >> )) >> ) >> >> val schema = Schema.newBuilder().fromRowDataType(dataTypeRow).build() >> >> >> def workflow() = >> val env = StreamExecutionEnvironment.getExecutionEnvironment >> val tEnv = StreamTableEnvironment.create(env) >> >> val flinkType = TypeInformation.of(classOf[TransferEvent]) >> >> println(dataTypeRow) >> >> val ds: SingleOutputStreamOperator[TransferEvent] = >> env.fromElements(transfer1, transfer2, transfer3).returns(flinkType) >> val table = tEnv.fromDataStream(ds, schema) >> tEnv.createTemporaryView("transfers", table) >> >> tEnv.from("transfers").printSchema() >> >> val x = tEnv.sqlQuery("select data, meta from transfers") >> >> val tableAsDataStream: DataStream[TransferEvent] = >> tEnv.toDataStream(x, dataTypeStructured) >> tableAsDataStream.print() >> env.execute() >> >> end ConvertBetweenDataStreamAndTable >> >> @main >> def main() = >> ConvertBetweenDataStreamAndTable.workflow() > > > > Exception: > > ROW<`data` ROW<`transferId` STRING, `amount` DECIMAL(38, 10), `customerId` >> STRING>, `meta` ROW<`createdAt` TIMESTAMP_LTZ(3)>> >> ( >> `data` ROW<`transferId` STRING, `amount` DECIMAL(38, 10), `customerId` >> STRING>, >> `meta` ROW<`createdAt` TIMESTAMP_LTZ(3)> >> ) >> /* 1 */public class >> flink_examples$datastream_table$TransferEvent$3$Converter implements >> org.apache.flink.table.data.conversion.DataStructureConverter { >> /* 2 */ private final >> org.apache.flink.table.data.RowData.FieldGetter[] fieldGetters; >> /* 3 */ private final >> org.apache.flink.table.data.conversion.DataStructureConverter[] >> fieldConverters; >> /* 4 */ public >> flink_examples$datastream_table$TransferEvent$3$Converter(org.apache.flink.table.data.RowData.FieldGetter[] >> fieldGetters, >> org.apache.flink.table.data.conversion.DataStructureConverter[] >> fieldConverters) { 
>> /* 5 */ this.fieldGetters = fieldGetters; >> /* 6 */ this.fieldConverters = fieldConverters; >> /* 7 */ } >> /* 8 */ public java.lang.Object toInternal(java.lang.Object o) { >> /* 9 */ final flink_examples.datastream_table.TransferEvent >> external = (flink_examples.datastream_table.TransferEvent) o; >> /* 10 */ final org.apache.flink.table.data.GenericRowData >> genericRow = new org.apache.flink.table.data.GenericRowData(2); >> /* 11 */ genericRow.setField(0, >> fieldConverters[0].toInternalOrNull(((org.apache.flink.types.Row) >> external.data()))); >> /* 12 */ genericRow.setField(1, >> fieldConverters[1].toInternalOrNull(((org.apache.flink.types.Row) >> external.meta()))); >> /* 13 */ return genericRow; >> /* 14 */ } >> /* 15 */ public java.lang.Object toExternal(java.lang.Object o) { >> /* 16 */ final org.apache.flink.table.data.RowData internal = >> (org.apache.flink.table.data.RowData) o; >> /* 17 */ final flink_examples.datastream_table.TransferEvent >> structured = new flink_examples.datastream_table.TransferEvent(); >> /* 18 */ >> structured.data_$eq(((flink_examples.datastream_table.Transfer) >> fieldConverters[0].toExternalOrNull(fieldGetters[0].getFieldOrNull(internal)))); >> /* 19 */ >> structured.meta_$eq(((flink_examples.datastream_table.Metadata) >> fieldConverters[1].toExternalOrNull(fieldGetters[1].getFieldOrNull(internal)))); >> /* 20 */ return structured; >> /* 21 */ } >> /* 22 */} >> >> Exception in thread "main" org.apache.flink.table.api.TableException: >> Error while generating structured type converter. 
>> at >> org.apache.flink.table.data.conversion.StructuredObjectConverter.open(StructuredObjectConverter.java:89) >> at >> org.apache.flink.table.runtime.typeutils.ExternalSerializer.initializeConverter(ExternalSerializer.java:217) >> at >> org.apache.flink.table.runtime.typeutils.ExternalSerializer.<init>(ExternalSerializer.java:78) >> at >> org.apache.flink.table.runtime.typeutils.ExternalSerializer.of(ExternalSerializer.java:93) >> at >> org.apache.flink.table.runtime.typeutils.ExternalTypeInfo.createExternalTypeSerializer(ExternalTypeInfo.java:97) >> at >> org.apache.flink.table.runtime.typeutils.ExternalTypeInfo.of(ExternalTypeInfo.java:67) >> at >> org.apache.flink.table.planner.connectors.ExternalDynamicSink.lambda$getSinkRuntimeProvider$2(ExternalDynamicSink.java:115) >> at >> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.applySinkProvider(CommonExecSink.java:452) >> at >> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.createSinkTransformation(CommonExecSink.java:193) >> at >> org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:167) >> at >> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:158) >> at >> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:82) >> at >> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) >> at scala.collection.Iterator.foreach(Iterator.scala:937) >> at scala.collection.Iterator.foreach$(Iterator.scala:937) >> at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) >> at scala.collection.IterableLike.foreach(IterableLike.scala:70) >> at scala.collection.IterableLike.foreach$(IterableLike.scala:69) >> at scala.collection.AbstractIterable.foreach(Iterable.scala:54) >> at scala.collection.TraversableLike.map(TraversableLike.scala:233) >> at 
scala.collection.TraversableLike.map$(TraversableLike.scala:226) >> at scala.collection.AbstractTraversable.map(Traversable.scala:104) >> at >> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:81) >> at >> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:188) >> at >> org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:223) >> at >> org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:218) >> at >> org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:245) >> at >> flink_examples.datastream_table.ConvertBetweenDataStreamAndTable$.workflow(ConvertBetweenDataStreamAndTable.scala:81) >> at >> flink_examples.datastream_table.ConvertBetweenDataStreamAndTable$package$.main(ConvertBetweenDataStreamAndTable.scala:89) >> at >> flink_examples.datastream_table.main.main(ConvertBetweenDataStreamAndTable.scala:87) >> Caused by: org.apache.flink.util.FlinkRuntimeException: >> org.apache.flink.api.common.InvalidProgramException: Table program cannot >> be compiled. This is a bug. Please file an issue. >> at >> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94) >> at >> org.apache.flink.table.data.conversion.StructuredObjectConverter.open(StructuredObjectConverter.java:80) >> ... 29 more >> Caused by: >> org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException: >> org.apache.flink.api.common.InvalidProgramException: Table program cannot >> be compiled. This is a bug. Please file an issue. 
>> at >> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051) >> at >> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache.get(LocalCache.java:3962) >> at >> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4859) >> at >> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:92) >> ... 30 more >> Caused by: org.apache.flink.api.common.InvalidProgramException: Table >> program cannot be compiled. This is a bug. Please file an issue. >> at >> org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:107) >> at >> org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$0(CompileUtils.java:92) >> at >> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4864) >> at >> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529) >> at >> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278) >> at >> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155) >> at >> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045) >> ... 
33 more >> Caused by: org.codehaus.commons.compiler.CompileException: Line 11, >> Column 106: Cannot cast "flink_examples.datastream_table.Transfer" to >> "org.apache.flink.types.Row" >> at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12211) >> at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5051) >> at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215) >> at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4418) >> at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4396) >> at org.codehaus.janino.Java$Cast.accept(Java.java:4898) >> at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396) >> at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5057) >> at org.codehaus.janino.UnitCompiler.access$8100(UnitCompiler.java:215) >> at >> org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4409) >> at >> org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4400) >> at org.codehaus.janino.Java$ParenthesizedExpression.accept(Java.java:4924) >> at org.codehaus.janino.UnitCompiler$16.visitLvalue(UnitCompiler.java:4400) >> at org.codehaus.janino.UnitCompiler$16.visitLvalue(UnitCompiler.java:4396) >> at org.codehaus.janino.Java$Lvalue.accept(Java.java:4148) >> at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396) >> at >> org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662) >> at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5182) >> at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215) >> at >> org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4423) >> at >> org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4396) >> at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073) >> at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396) >> at >> 
org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662) >> at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5182) >> at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215) >> at >> org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4423) >> at >> org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4396) >> at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073) >> at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396) >> at >> org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662) >> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3783) >> at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215) >> at >> org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3762) >> at >> org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3734) >> at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073) >> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3734) >> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360) >> at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215) >> at >> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494) >> at >> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487) >> at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2874) >> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487) >> at >> org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567) >> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388) >> at >> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357) >> at >> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330) >> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822) >> at 
org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432) >> at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215) >> at >> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411) >> at >> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406) >> at >> org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414) >> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406) >> at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378) >> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237) >> at >> org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465) >> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216) >> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207) >> at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80) >> at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75) >> at >> org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:104) >> ... 39 more >> >> Process finished with exit code 1 >> > > > I appreciate the help. > > Best, > Andre >