Hi Dongwon,

I have to dig deeper into the code to reproduce this error. This looks like a bug to me, and I will update the thread once I find anything.
However, from what you explained, if I understand correctly you can do all of your processing within the Table API scope without converting back and forth to DataStream. E.g., if your ".map(a -> a)" placeholder represents some map function that is simple enough, you can implement it and connect it with the Table API via a UserDefinedFunction [1]; a rough, untested sketch is at the end of this mail. As the Table API is becoming a first-class citizen [2,3,4], this would be a much cleaner implementation from my perspective.

--
Rong

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#scalar-functions
[2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Table-API-Enhancement-Outline-td25070.html
[3] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-36-Support-Interactive-Programming-in-Flink-Table-API-td27658.html
[4] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Embracing-Table-API-in-Flink-ML-td25368.html

On Thu, Jul 18, 2019 at 8:42 PM Dongwon Kim <eastcirc...@gmail.com> wrote:

> Hi Rong,
>
> Thank you for the reply :-)
>
>> which Flink version are you using?
>
> I'm using Flink 1.8.0.
>
>> what does "sourceTable.getSchema().toRowType()" return?
>
> Row(time1: TimeIndicatorTypeInfo(rowtime))
>
>> what does the line ".map(a -> a)" do and can you remove it?
>
> ".map(a -> a)" is just there to illustrate the problem.
> My actual code contains a process function (instead of the .map() in the
> snippet) which appends a new field containing the watermark to a row.
> If there were a way to get the watermark inside a scalar UDF, I wouldn't
> convert the table to a DataStream and back.
>
>> if I am understanding correctly, you are also using "time1" as the
>> rowtime; is it your intention to use it later as well?
>
> Yup :-)
>
>> As far as I know, ".returns(sourceTable.getSchema().toRowType());" only
>> adds a type information hint about the return type of this operator.
>> It is used in cases where Flink cannot determine it automatically [1].
>
> The reason why I specify ".returns(sourceTable.getSchema().toRowType());"
> is to give a type information hint, as you said.
> It is needed later when I create another table, like
> "Table anotherTable = tEnv.fromDataStream(stream);".
> Without the type information hint, I get the error
> "An input of GenericTypeInfo<Row> cannot be converted to Table.
> Please specify the type of the input with a RowTypeInfo."
> That's why I give the type information hint in that way.
>
> Best,
>
> Dongwon
>
> On Fri, Jul 19, 2019 at 12:39 AM Rong Rong <walter...@gmail.com> wrote:
>
>> Hi Dongwon,
>>
>> Can you provide a bit more information:
>> which Flink version are you using?
>> what does "sourceTable.getSchema().toRowType()" return?
>> what does the line ".map(a -> a)" do and can you remove it?
>> if I am understanding correctly, you are also using "time1" as the
>> rowtime; is it your intention to use it later as well?
>>
>> As far as I know, ".returns(sourceTable.getSchema().toRowType());" only
>> adds a type information hint about the return type of this operator.
>> It is used in cases where Flink cannot determine it automatically [1].
>>
>> Thanks,
>> Rong
>>
>> --
>> [1] https://github.com/apache/flink/blob/release-1.8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java#L351
>>
>> On Wed, Jul 17, 2019 at 1:29 AM Dongwon Kim <eastcirc...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> Consider the following snippet:
>>>
>>>> Table sourceTable = getKafkaSource0(tEnv);
>>>> DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class)
>>>>     .map(a -> a)
>>>>     .returns(sourceTable.getSchema().toRowType());
>>>> stream.print();
>>>
>>> where sourceTable.printSchema() shows:
>>>
>>>> root
>>>>  |-- time1: TimeIndicatorTypeInfo(rowtime)
>>>
>>> This program throws the following exception:
>>>
>>>> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>>   at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>>>   at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
>>>>   at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>>>>   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
>>>>   at app.metatron.test.Main2.main(Main2.java:231)
>>>> Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.lang.Long
>>>>   at org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)
>>>>   at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
>>>>   at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
>>>>   ...
>>>
>>> The row serializer seems to try to deep-copy an instance of
>>> java.sql.Timestamp using LongSerializer instead of SqlTimestampSerializer.
>>> Could anybody help me?
>>>
>>> Best,
>>>
>>> - Dongwon
>>>
>>> p.s. Though removing .returns() makes everything okay, I need it because
>>> I want to convert the DataStream<Row> into another table later.
>>> p.s. The source table is created as follows:
>>>
>>>> private static final Table getKafkaSource0(StreamTableEnvironment tEnv) {
>>>>   ConnectorDescriptor connectorDescriptor = new Kafka()
>>>>       .version("universal")
>>>>       .topic("mytopic")
>>>>       .property("bootstrap.servers", "localhost:9092")
>>>>       .property("group.id", "mygroup")
>>>>       .startFromEarliest();
>>>>   FormatDescriptor formatDescriptor = new Csv()
>>>>       .deriveSchema()
>>>>       .ignoreParseErrors()
>>>>       .fieldDelimiter(',');
>>>>   Schema schemaDescriptor = new Schema()
>>>>       .field("time1", SQL_TIMESTAMP())
>>>>       .rowtime(
>>>>           new Rowtime()
>>>>               .timestampsFromField("rowTime")
>>>>               .watermarksPeriodicBounded(100)
>>>>       );
>>>>   tEnv.connect(connectorDescriptor)
>>>>       .withFormat(formatDescriptor)
>>>>       .withSchema(schemaDescriptor)
>>>>       .inAppendMode()
>>>>       .registerTableSource("mysrc");
>>>>   return tEnv.scan("mysrc");
>>>> }
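
P.S. To make the UDF suggestion above concrete, here is a minimal, untested
sketch of what I mean. The function name "myTransform" and its use on the
"time1" field are made up for illustration; the point is only that simple
per-field logic can stay inside the Table API instead of a DataStream map:

import org.apache.flink.table.api.Table;
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical scalar function standing in for the logic of your map/process function.
public class MyTransform extends ScalarFunction {
    public java.sql.Timestamp eval(java.sql.Timestamp t) {
        // put the real per-field logic here; identity for the sketch
        return t;
    }
}

// Register the function and apply it without leaving the Table API:
tEnv.registerFunction("myTransform", new MyTransform());
Table result = sourceTable.select("myTransform(time1)");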
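
P.P.S. While I dig into the ClassCastException itself: since the rows coming
out of toAppendStream carry java.sql.Timestamp values, one thing you could
try (untested, just a sketch) is to build the type hint with a plain
SQL_TIMESTAMP instead of the TimeIndicatorTypeInfo taken from the table
schema, so that RowSerializer picks SqlTimestampSerializer rather than
LongSerializer:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;

DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class)
    .map(a -> a)
    // plain timestamp type instead of the rowtime indicator type
    .returns(Types.ROW_NAMED(new String[] {"time1"}, Types.SQL_TIMESTAMP));
Table anotherTable = tEnv.fromDataStream(stream);

Note that the rowtime attribute would not survive this round trip; if you
still need "time1" as rowtime in the new table, you would have to re-declare
it when calling fromDataStream.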