Hi Dongwon,

Sorry for the late reply. I did try some experiments and it seems like you are right: setting the `.returns()` type actually alters the underlying type of the DataStream from a GenericType into a specific RowTypeInfo. Please see the JIRA ticket [1] for more info.
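For reference, the experiment boils down to something like this (a minimal
sketch, assuming the same sourceTable/tEnv as in your snippet below; the
types in the comments follow from your printed schema):

    DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class);

    // Flink cannot see through the Java lambda, so without a hint the
    // operator's result type falls back to a generic type:
    System.out.println(stream.map(a -> a).getType());
    // prints: GenericType<org.apache.flink.types.Row>

    // The hint swaps in the table's concrete RowTypeInfo instead:
    System.out.println(stream.map(a -> a)
        .returns(sourceTable.getSchema().toRowType())
        .getType());
    // prints: Row(time1: TimeIndicatorTypeInfo(rowtime))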
Regarding the approach: yes, I think you cannot access the timer service from the Table/SQL API at this moment, so that might be the best approach. And as Fabian suggested, I don't think there is much of a problem as long as you are not changing the underlying type info of your DataStream.

I will follow up on this in the JIRA ticket.

--
Rong

[1] https://issues.apache.org/jira/browse/FLINK-13389

On Tue, Jul 23, 2019 at 6:30 AM Dongwon Kim <eastcirc...@gmail.com> wrote:

> Hi Fabian,
>
> Thanks for the clarification :-)
> I could convert back and forth without worrying about it as I keep using
> the Row type during the conversion (even though fields are added).
>
> Best,
>
> Dongwon
>
> On Tue, Jul 23, 2019 at 8:15 PM Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Dongwon,
>>
>> regarding the question about the conversion: if you keep using the Row
>> type and are not adding/removing fields, the conversion is pretty much
>> for free right now.
>> It will be a MapFunction (sometimes even no function at all) that should
>> be chained with the other operators. Hence, it should boil down to a
>> function call.
>>
>> Best, Fabian
>>
>> On Sat, Jul 20, 2019 at 03:58, Dongwon Kim <eastcirc...@gmail.com> wrote:
>>
>>> Hi Rong,
>>>
>>>> I have to dig deeper into the code to reproduce this error. This seems
>>>> to be a bug to me and I will update once I find anything.
>>>
>>> Thanks a lot for spending your time on this.
>>>
>>>> However, from what you explained, if I understand correctly you can do
>>>> all of your processing within the Table API scope without converting it
>>>> back and forth to DataStream.
>>>> E.g. if your "map(a -> a)" placeholder represents some sort of map
>>>> function that's simple enough, you can implement it and connect with
>>>> the Table API via a UserDefinedFunction [1].
>>>> As the Table API is becoming a first-class citizen [2,3,4], this would
>>>> be a much cleaner implementation from my perspective.
>>>
>>> I also agree with you that a first-class-citizen Table API will make
>>> everything not only easier but also a lot cleaner.
>>> We do, however, have some corner cases that force us to convert between
>>> Table and DataStream.
>>> One such case is appending to a Table a column showing the current
>>> watermark of each record; there's no other way to do that, as
>>> ScalarFunction doesn't give us access to the runtime context information
>>> the way ProcessFunction does.
>>>
>>> I have a question regarding the conversion.
>>> Do I have to worry about a runtime performance penalty in cases where I
>>> cannot help but convert back and forth to DataStream?
>>>
>>> Best,
>>>
>>> Dongwon
>>>
>>> On Sat, Jul 20, 2019 at 12:41 AM Rong Rong <walter...@gmail.com> wrote:
>>>
>>>> Hi Dongwon,
>>>>
>>>> I have to dig deeper into the code to reproduce this error. This seems
>>>> to be a bug to me and I will update once I find anything.
>>>>
>>>> However, from what you explained, if I understand correctly you can do
>>>> all of your processing within the Table API scope without converting it
>>>> back and forth to DataStream.
>>>> E.g. if your "map(a -> a)" placeholder represents some sort of map
>>>> function that's simple enough, you can implement it and connect with
>>>> the Table API via a UserDefinedFunction [1].
>>>> As the Table API is becoming a first-class citizen [2,3,4], this would
>>>> be a much cleaner implementation from my perspective.
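>>>> E.g. something along these lines (just a sketch to show the shape; the
>>>> column name "someField" is made up for illustration):
>>>>
>>>> import org.apache.flink.table.functions.ScalarFunction;
>>>>
>>>> // A scalar UDF can replace a simple map step entirely within the
>>>> // Table API, avoiding the DataStream round trip:
>>>> public static class MyMapFn extends ScalarFunction {
>>>>     public String eval(String value) {
>>>>         return value;  // whatever per-field logic the map would do
>>>>     }
>>>> }
>>>>
>>>> tEnv.registerFunction("myMapFn", new MyMapFn());
>>>> Table result = sourceTable.select("myMapFn(someField), time1");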
>>>>
>>>> --
>>>> Rong
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#scalar-functions
>>>> [2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Table-API-Enhancement-Outline-td25070.html
>>>> [3] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-36-Support-Interactive-Programming-in-Flink-Table-API-td27658.html
>>>> [4] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Embracing-Table-API-in-Flink-ML-td25368.html
>>>>
>>>> On Thu, Jul 18, 2019 at 8:42 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
>>>>
>>>>> Hi Rong,
>>>>>
>>>>> Thank you for your reply :-)
>>>>>
>>>>>> which Flink version are you using?
>>>>>
>>>>> I'm using Flink 1.8.0.
>>>>>
>>>>>> what does "sourceTable.getSchema().toRowType()" return?
>>>>>
>>>>> Row(time1: TimeIndicatorTypeInfo(rowtime))
>>>>>
>>>>>> what does the line ".map(a -> a)" do, and can you remove it?
>>>>>
>>>>> ".map(a -> a)" is just to illustrate the problem.
>>>>> My actual code contains a process function (instead of .map() in the
>>>>> snippet) which appends a new field containing the watermark to a row.
>>>>> If there were a way to get the watermark inside a scalar UDF, I
>>>>> wouldn't convert the table to a datastream and vice versa.
>>>>>
>>>>>> if I am understanding correctly, you are also using "time1" as the
>>>>>> rowtime; is it your intention to use it later as well?
>>>>>
>>>>> yup :-)
>>>>>
>>>>>> As far as I know, ".returns(sourceTable.getSchema().toRowType());" only
>>>>>> adds a type information hint about the return type of this operator.
>>>>>> It is used in cases where Flink cannot determine the type
>>>>>> automatically [1].
>>>>>
>>>>> The reason why I specify
>>>>> ".returns(sourceTable.getSchema().toRowType());" is to give a type
>>>>> information hint as you said.
>>>>> That is needed later when I make another table like
>>>>> "Table anotherTable = tEnv.fromDataStream(stream);".
>>>>> Without the type information hint, I got the error
>>>>> "An input of GenericTypeInfo<Row> cannot be converted to Table.
>>>>> Please specify the type of the input with a RowTypeInfo."
>>>>> That's why I give a type information hint in that way.
>>>>>
>>>>> Best,
>>>>>
>>>>> Dongwon
>>>>>
>>>>> On Fri, Jul 19, 2019 at 12:39 AM Rong Rong <walter...@gmail.com> wrote:
>>>>>
>>>>>> Hi Dongwon,
>>>>>>
>>>>>> Can you provide a bit more information:
>>>>>> which Flink version are you using?
>>>>>> what does "sourceTable.getSchema().toRowType()" return?
>>>>>> what does the line ".map(a -> a)" do, and can you remove it?
>>>>>> if I am understanding correctly, you are also using "time1" as the
>>>>>> rowtime; is it your intention to use it later as well?
>>>>>>
>>>>>> As far as I know, ".returns(sourceTable.getSchema().toRowType());" only
>>>>>> adds a type information hint about the return type of this operator.
>>>>>> It is used in cases where Flink cannot determine the type
>>>>>> automatically [1].
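>>>>>> A quick illustration of what the hint is for (a sketch reusing the
>>>>>> names from your snippet):
>>>>>>
>>>>>> // Without the hint, Flink cannot see through the Java lambda, so the
>>>>>> // stream's type falls back to GenericTypeInfo<Row>, which cannot be
>>>>>> // registered as a Table again. The hint swaps in the table's concrete
>>>>>> // RowTypeInfo, after which the conversion back is possible:
>>>>>> DataStream<Row> mapped = stream.map(a -> a)
>>>>>>     .returns(sourceTable.getSchema().toRowType());
>>>>>> Table anotherTable = tEnv.fromDataStream(mapped);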
>>>>>>
>>>>>> Thanks,
>>>>>> Rong
>>>>>>
>>>>>> --
>>>>>> [1] https://github.com/apache/flink/blob/release-1.8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java#L351
>>>>>>
>>>>>> On Wed, Jul 17, 2019 at 1:29 AM Dongwon Kim <eastcirc...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> Consider the following snippet:
>>>>>>>
>>>>>>>> Table sourceTable = getKafkaSource0(tEnv);
>>>>>>>> DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class)
>>>>>>>>     .map(a -> a)
>>>>>>>>     .returns(sourceTable.getSchema().toRowType());
>>>>>>>> stream.print();
>>>>>>>
>>>>>>> where sourceTable.printSchema() shows:
>>>>>>>
>>>>>>>> root
>>>>>>>>  |-- time1: TimeIndicatorTypeInfo(rowtime)
>>>>>>>
>>>>>>> This program throws the following exception:
>>>>>>>
>>>>>>>> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>>>>>>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>>>>>>>     at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
>>>>>>>>     at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>>>>>>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
>>>>>>>>     at app.metatron.test.Main2.main(Main2.java:231)
>>>>>>>> Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.lang.Long
>>>>>>>>     at org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)
>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
>>>>>>>>     ...
>>>>>>>
>>>>>>> The row serializer seems to try to deep-copy an instance of
>>>>>>> java.sql.Timestamp using LongSerializer instead of
>>>>>>> SqlTimestampSerializer.
>>>>>>> Could anybody help me?
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> - Dongwon
>>>>>>>
>>>>>>> p.s. though removing .returns() makes everything okay, I need to keep
>>>>>>> it as I want to convert the DataStream<Row> into another table later.
>>>>>>> p.s. the source table is created as follows:
>>>>>>>
>>>>>>>> private static final Table getKafkaSource0(StreamTableEnvironment tEnv) {
>>>>>>>>     ConnectorDescriptor connectorDescriptor = new Kafka()
>>>>>>>>         .version("universal")
>>>>>>>>         .topic("mytopic")
>>>>>>>>         .property("bootstrap.servers", "localhost:9092")
>>>>>>>>         .property("group.id", "mygroup")
>>>>>>>>         .startFromEarliest();
>>>>>>>>     FormatDescriptor formatDescriptor = new Csv()
>>>>>>>>         .deriveSchema()
>>>>>>>>         .ignoreParseErrors()
>>>>>>>>         .fieldDelimiter(',');
>>>>>>>>     Schema schemaDescriptor = new Schema()
>>>>>>>>         .field("time1", SQL_TIMESTAMP())
>>>>>>>>         .rowtime(
>>>>>>>>             new Rowtime()
>>>>>>>>                 .timestampsFromField("rowTime")
>>>>>>>>                 .watermarksPeriodicBounded(100)
>>>>>>>>         );
>>>>>>>>     tEnv.connect(connectorDescriptor)
>>>>>>>>         .withFormat(formatDescriptor)
>>>>>>>>         .withSchema(schemaDescriptor)
>>>>>>>>         .inAppendMode()
>>>>>>>>         .registerTableSource("mysrc");
>>>>>>>>     return tEnv.scan("mysrc");
>>>>>>>> }
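>>>>>>>
>>>>>>> p.p.s. the .map(a -> a) above is only a placeholder; my actual code
>>>>>>> uses a process function which appends the current watermark to each
>>>>>>> row, roughly like this (a simplified sketch, not the exact code):
>>>>>>>
>>>>>>>> import org.apache.flink.streaming.api.functions.ProcessFunction;
>>>>>>>> import org.apache.flink.types.Row;
>>>>>>>> import org.apache.flink.util.Collector;
>>>>>>>>
>>>>>>>> // Simplified sketch: copies each incoming Row and appends the
>>>>>>>> // current watermark as an extra Long field at the end.
>>>>>>>> public class AppendWatermark extends ProcessFunction<Row, Row> {
>>>>>>>>     @Override
>>>>>>>>     public void processElement(Row in, Context ctx, Collector<Row> out) {
>>>>>>>>         Row result = new Row(in.getArity() + 1);
>>>>>>>>         for (int i = 0; i < in.getArity(); i++) {
>>>>>>>>             result.setField(i, in.getField(i));
>>>>>>>>         }
>>>>>>>>         // runtime context info that a ScalarFunction cannot reach
>>>>>>>>         result.setField(in.getArity(), ctx.timerService().currentWatermark());
>>>>>>>>         out.collect(result);
>>>>>>>>     }
>>>>>>>> }
>>>>>>>
>>>>>>> (the .returns() hint for this one then has to describe the widened
>>>>>>> row rather than the original schema, of course)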