Hi Rong,

Thanks a lot for spending your time on this.

> However from what you explained, if I understand correctly you can do all
> of your processing within the TableAPI scope without converting it back and
> forth to DataStream.
> E.g. if your "map(a -> a)" placeholder represents some sort of map
> function that's simple enough, you can implement and connect with the table
> API via UserDefinedFunction[1].
> As TableAPI becoming the first class citizen [2,3,4], this would be much
> cleaner implementation from my perspective.

I also agree with you that making the Table API a first-class citizen will
make everything not only easier but also a lot cleaner. We do, however,
have some corner cases that force us to convert a Table to a DataStream
and back. One such case is appending to a Table a column that shows the
current watermark of each record; there is no way around the conversion
because a ScalarFunction doesn't give us access to runtime context
information the way a ProcessFunction does.
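Just to make that corner case concrete, here is a rough sketch of what we
do (the "wm" column name and the two-column type hint are simplified for
illustration; our actual pipeline has more fields):

  import org.apache.flink.api.common.typeinfo.Types;
  import org.apache.flink.api.java.typeutils.RowTypeInfo;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.functions.ProcessFunction;
  import org.apache.flink.table.api.Table;
  import org.apache.flink.types.Row;
  import org.apache.flink.util.Collector;

  DataStream<Row> input = tEnv.toAppendStream(sourceTable, Row.class);

  DataStream<Row> withWm = input
      .process(new ProcessFunction<Row, Row>() {
          @Override
          public void processElement(Row value, Context ctx, Collector<Row> out)
                  throws Exception {
              // A ProcessFunction can read the current watermark from its
              // timer service; a ScalarFunction has no equivalent hook.
              Row result = new Row(value.getArity() + 1);
              for (int i = 0; i < value.getArity(); i++) {
                  result.setField(i, value.getField(i));
              }
              result.setField(value.getArity(),
                      ctx.timerService().currentWatermark());
              out.collect(result);
          }
      })
      // Row erases its field types, so an explicit hint is needed here:
      // the original rowtime column plus the appended watermark column.
      .returns(new RowTypeInfo(Types.SQL_TIMESTAMP, Types.LONG));

  Table withWatermark = tEnv.fromDataStream(withWm, "time1, wm");

With the explicit RowTypeInfo hint, tEnv.fromDataStream() can map the
fields back into a table; without it we run into the GenericTypeInfo<Row>
error I mention below.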
I have a question regarding the conversion: do I have to worry about a
runtime performance penalty when I cannot help but convert back and forth
between Table and DataStream?

Best,

Dongwon

On Sat, Jul 20, 2019 at 12:41 AM Rong Rong <walter...@gmail.com> wrote:

> Hi Dongwon,
>
> I have to dig deeper into the code to reproduce this error. This seems to
> be a bug to me and will update once I find anything.
>
> However from what you explained, if I understand correctly you can do all
> of your processing within the TableAPI scope without converting it back and
> forth to DataStream.
> E.g. if your "map(a -> a)" placeholder represents some sort of map
> function that's simple enough, you can implement and connect with the table
> API via UserDefinedFunction[1].
> As TableAPI becoming the first class citizen [2,3,4], this would be much
> cleaner implementation from my perspective.
>
> --
> Rong
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#scalar-functions
> [2]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Table-API-Enhancement-Outline-td25070.html
> [3]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-36-Support-Interactive-Programming-in-Flink-Table-API-td27658.html
> [4]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Embracing-Table-API-in-Flink-ML-td25368.html
>
>
> On Thu, Jul 18, 2019 at 8:42 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
>
>> Hi Rong,
>>
>> Thank you for your reply :-)
>>
>>> which Flink version are you using?
>>
>> I'm using Flink 1.8.0.
>>
>>> what is the "sourceTable.getSchema().toRowType()" return?
>>
>> Row(time1: TimeIndicatorTypeInfo(rowtime))
>>
>>> what is the line ".map(a -> a)" do and can you remove it?
>>
>> ".map(a -> a)" is just to illustrate the problem.
>> My actual code contains a process function (instead of the .map() in the
>> snippet) which appends a new field containing the watermark to each row.
>> If there were a way to get the watermark inside a scalar UDF, I wouldn't
>> convert the table to a datastream and vice versa.
>>
>>> if I am understanding correctly, you are also using "time1" as the
>>> rowtime, is that what your intention is, to use it later as well?
>>
>> yup :-)
>>
>>> As far as I know ".returns(sourceTable.getSchema().toRowType());" only
>>> adds a type information hint about the return type of this operator. It is
>>> used in cases where Flink cannot determine automatically[1].
>>
>> The reason why I specify ".returns(sourceTable.getSchema().toRowType());"
>> is to give a type information hint as you said.
>> That is needed later when I make another table like
>> "Table anotherTable = tEnv.fromDataStream(stream);".
>> Without the type information hint, I got an error:
>> "An input of GenericTypeInfo<Row> cannot be converted to Table.
>> Please specify the type of the input with a RowTypeInfo."
>> That's why I give a type information hint in that way.
>>
>> Best,
>>
>> Dongwon
>>
>> On Fri, Jul 19, 2019 at 12:39 AM Rong Rong <walter...@gmail.com> wrote:
>>
>>> Hi Dongwon,
>>>
>>> Can you provide a bit more information:
>>> which Flink version are you using?
>>> what is the "sourceTable.getSchema().toRowType()" return?
>>> what is the line ".map(a -> a)" do and can you remove it?
>>> if I am understanding correctly, you are also using "time1" as the
>>> rowtime, is that what your intention is, to use it later as well?
>>>
>>> As far as I know ".returns(sourceTable.getSchema().toRowType());" only
>>> adds a type information hint about the return type of this operator.
>>> It is used in cases where Flink cannot determine automatically[1].
>>>
>>> Thanks,
>>> Rong
>>>
>>> --
>>> [1]
>>> https://github.com/apache/flink/blob/release-1.8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java#L351
>>>
>>>
>>> On Wed, Jul 17, 2019 at 1:29 AM Dongwon Kim <eastcirc...@gmail.com>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> Consider the following snippet:
>>>>
>>>>> Table sourceTable = getKafkaSource0(tEnv);
>>>>> DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class)
>>>>>     .map(a -> a)
>>>>>     .returns(sourceTable.getSchema().toRowType());
>>>>> stream.print();
>>>>
>>>> where sourceTable.printSchema() shows:
>>>>
>>>>> root
>>>>>  |-- time1: TimeIndicatorTypeInfo(rowtime)
>>>>
>>>> This program throws the following exception:
>>>>
>>>>> Exception in thread "main"
>>>>> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>>>   at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>>>>   at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
>>>>>   at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>>>>>   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
>>>>>   at app.metatron.test.Main2.main(Main2.java:231)
>>>>> Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.lang.Long
>>>>>   at org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)
>>>>>   at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
>>>>>   at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
>>>>>   ...
>>>>
>>>> The row serializer seems to try to deep-copy an instance of
>>>> java.sql.Timestamp using LongSerializer instead of SqlTimestampSerializer.
>>>> Could anybody help me?
>>>>
>>>> Best,
>>>>
>>>> - Dongwon
>>>>
>>>> p.s. though removing .returns() makes everything okay, I need to do
>>>> that as I want to convert DataStream<Row> into another table later.
>>>> p.s. the source table is created as follows:
>>>>
>>>>> private static final Table getKafkaSource0(StreamTableEnvironment tEnv) {
>>>>>     ConnectorDescriptor connectorDescriptor = new Kafka()
>>>>>         .version("universal")
>>>>>         .topic("mytopic")
>>>>>         .property("bootstrap.servers", "localhost:9092")
>>>>>         .property("group.id", "mygroup")
>>>>>         .startFromEarliest();
>>>>>     FormatDescriptor formatDescriptor = new Csv()
>>>>>         .deriveSchema()
>>>>>         .ignoreParseErrors()
>>>>>         .fieldDelimiter(',');
>>>>>     Schema schemaDescriptor = new Schema()
>>>>>         .field("time1", SQL_TIMESTAMP())
>>>>>         .rowtime(
>>>>>             new Rowtime()
>>>>>                 .timestampsFromField("rowTime")
>>>>>                 .watermarksPeriodicBounded(100)
>>>>>         );
>>>>>     tEnv.connect(connectorDescriptor)
>>>>>         .withFormat(formatDescriptor)
>>>>>         .withSchema(schemaDescriptor)
>>>>>         .inAppendMode()
>>>>>         .registerTableSource("mysrc");
>>>>>     return tEnv.scan("mysrc");
>>>>> }