Hi Dongwon,

Can you provide a bit more information:
- Which Flink version are you using?
- What does "sourceTable.getSchema().toRowType()" return?
- What does the line *".map(a -> a)"* do, and can you remove it?
- If I understand correctly, you are also using "time1" as the rowtime. Is it your intention to keep using it as the rowtime later as well?

As far as I know *".returns(sourceTable.getSchema().toRowType());"* only adds a type information hint about the return type of this operator. It is used in cases where Flink cannot determine automatically [1].

Thanks,
Rong

--
[1] https://github.com/apache/flink/blob/release-1.8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java#L351

On Wed, Jul 17, 2019 at 1:29 AM Dongwon Kim <eastcirc...@gmail.com> wrote:

> Hello,
>
> Consider the following snippet:
>
>> Table sourceTable = getKafkaSource0(tEnv);
>> DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class)
>>     .map(a -> a)
>>     .returns(sourceTable.getSchema().toRowType());
>> stream.print();
>
> where sourceTable.printSchema() shows:
>
>> root
>>  |-- time1: TimeIndicatorTypeInfo(rowtime)
>
> This program returns the following exception:
>
>> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>     at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
>>     at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
>>     at app.metatron.test.Main2.main(Main2.java:231)
>> Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.lang.Long
>>     at org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)
>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
>>     ...
>
> The row serializer seems to try to deep-copy an instance of java.sql.Timestamp using LongSerializer instead of SqlTimestampSerializer.
> Could anybody help me?
>
> Best,
>
> - Dongwon
>
> p.s. though removing .returns() makes everything okay, I need to do that as I want to convert DataStream<Row> into another table later.
> p.s. the source table is created as follows:
>
>> private static final Table getKafkaSource0(StreamTableEnvironment tEnv) {
>>     ConnectorDescriptor connectorDescriptor = new Kafka()
>>         .version("universal")
>>         .topic("mytopic")
>>         .property("bootstrap.servers", "localhost:9092")
>>         .property("group.id", "mygroup")
>>         .startFromEarliest();
>>     FormatDescriptor formatDescriptor = new Csv()
>>         .deriveSchema()
>>         .ignoreParseErrors()
>>         .fieldDelimiter(',');
>>     Schema schemaDescriptor = new Schema()
>>         .field("time1", SQL_TIMESTAMP())
>>         .rowtime(
>>             new Rowtime()
>>                 .timestampsFromField("rowTime")
>>                 .watermarksPeriodicBounded(100)
>>         );
>>     tEnv.connect(connectorDescriptor)
>>         .withFormat(formatDescriptor)
>>         .withSchema(schemaDescriptor)
>>         .inAppendMode()
>>         .registerTableSource("mysrc");
>>     return tEnv.scan("mysrc");
>> }
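
For reference, the ClassCastException in the quoted trace can be reproduced without Flink. The following is a minimal sketch, not Flink code: FieldCopier, LongCopier, TimestampCopier and RowCopyDemo are hypothetical names standing in for the per-field serializers that a row serializer delegates to. It assumes, as the trace suggests, that the declared row type selects a Long-based copier for a field that actually holds a java.sql.Timestamp at runtime.

```java
import java.sql.Timestamp;

// Hypothetical stand-in for a per-field serializer; this is not a Flink interface.
interface FieldCopier<T> {
    T copy(T from);
}

// Plays the role of a Long serializer: it only knows how to copy Long values.
final class LongCopier implements FieldCopier<Long> {
    @Override
    public Long copy(Long from) {
        return from; // Long is immutable, so returning the same instance is safe
    }
}

// Plays the role of a SQL timestamp serializer: it deep-copies java.sql.Timestamp values.
final class TimestampCopier implements FieldCopier<Timestamp> {
    @Override
    public Timestamp copy(Timestamp from) {
        Timestamp copy = new Timestamp(from.getTime());
        copy.setNanos(from.getNanos());
        return copy;
    }
}

final class RowCopyDemo {
    // Copies each field with the copier chosen from the DECLARED field type.
    // The raw FieldCopier type means a mismatch is only detected at runtime.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static Object[] copyRow(Object[] row, FieldCopier[] copiers) {
        Object[] result = new Object[row.length];
        for (int i = 0; i < row.length; i++) {
            result[i] = copiers[i].copy(row[i]);
        }
        return result;
    }

    public static void main(String[] args) {
        Object[] row = { new Timestamp(0L) };

        // Declared type matches the runtime value: the copy succeeds.
        copyRow(row, new FieldCopier[] { new TimestampCopier() });

        // Declared type says Long, but the value is a Timestamp: the copy fails.
        try {
            copyRow(row, new FieldCopier[] { new LongCopier() });
        } catch (ClassCastException e) {
            System.out.println("Type hint and runtime value disagree: " + e.getMessage());
        }
    }
}
```

If this assumption holds, the type hint passed to .returns() has to describe the values the stream actually carries; a hint that declares the rowtime field with a Long-backed type while the rows contain java.sql.Timestamp would fail in the same way.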