Hi Rong,

> I have to dig deeper into the code to reproduce this error. This seems
> like a bug to me; I will update once I find anything.

Thanks a lot for spending your time on this.

> However, from what you explained, if I understand correctly, you can do
> all of your processing within the Table API scope without converting back
> and forth to DataStream.
> E.g., if your "map(a -> a)" placeholder represents some sort of map
> function that's simple enough, you can implement it and connect it with
> the Table API via a UserDefinedFunction [1].
> As the Table API is becoming a first-class citizen [2,3,4], this would be
> a much cleaner implementation from my perspective.

I agree with you that a first-class Table API will make everything not
only easier but also a lot cleaner.
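
For simple transforms your suggestion indeed works for us. Here is a
minimal sketch of what that looks like (the function name, field name,
and logic are placeholder stand-ins for our trivial "map(a -> a)"):

    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.functions.ScalarFunction;

    // hypothetical stand-in for a simple per-record transform
    public static class PassThrough extends ScalarFunction {
      public String eval(String s) {
        return s; // the logic stays entirely inside the Table API
      }
    }

    // register and use it without leaving the Table API:
    tEnv.registerFunction("passThrough", new PassThrough());
    Table result = sourceTable.select("passThrough(someField)");
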
We do, however, have some corner cases that force us to convert between
Table and DataStream.
One such case is appending a column to a Table that shows the current
watermark of each record; there's no way to do that other than with a
ProcessFunction, because a ScalarFunction doesn't give us access to the
runtime context the way a ProcessFunction does.
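
For reference, here is roughly what we do today (a minimal sketch against
the 1.8 DataStream API; the actual field layout is simplified):

    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.Collector;

    // appends the current watermark as an extra trailing field to each Row
    public class AppendWatermark extends ProcessFunction<Row, Row> {
      @Override
      public void processElement(Row value, Context ctx, Collector<Row> out) {
        Row result = new Row(value.getArity() + 1);
        for (int i = 0; i < value.getArity(); i++) {
          result.setField(i, value.getField(i));
        }
        // the watermark is only reachable via the runtime context here,
        // which is exactly what ScalarFunction does not expose
        result.setField(value.getArity(), ctx.timerService().currentWatermark());
        out.collect(result);
      }
    }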

I have a question regarding the conversion.
Do I have to worry about a runtime performance penalty in cases where I
cannot avoid converting back and forth to DataStream?

Best,

Dongwon

On Sat, Jul 20, 2019 at 12:41 AM Rong Rong <walter...@gmail.com> wrote:

> Hi Dongwon,
>
> I have to dig deeper into the code to reproduce this error. This seems
> like a bug to me; I will update once I find anything.
>
> However, from what you explained, if I understand correctly, you can do
> all of your processing within the Table API scope without converting back
> and forth to DataStream.
> E.g., if your "map(a -> a)" placeholder represents some sort of map
> function that's simple enough, you can implement it and connect it with
> the Table API via a UserDefinedFunction [1].
> As the Table API is becoming a first-class citizen [2,3,4], this would be
> a much cleaner implementation from my perspective.
>
> --
> Rong
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#scalar-functions
> [2]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Table-API-Enhancement-Outline-td25070.html
> [3]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-36-Support-Interactive-Programming-in-Flink-Table-API-td27658.html
> [4]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Embracing-Table-API-in-Flink-ML-td25368.html
>
>
> On Thu, Jul 18, 2019 at 8:42 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
>
>> Hi Rong,
>>
>> Thank you for your reply :-)
>>
>>> which Flink version are you using?
>>
>> I'm using Flink-1.8.0.
>>
>>> what does "sourceTable.getSchema().toRowType()" return?
>>
>> Row(time1: TimeIndicatorTypeInfo(rowtime))
>>
>>> what does the line *".map(a -> a)"* do and can you remove it?
>>
>> *".map(a->a)"* is just there to illustrate the problem.
>> My actual code contains a process function (instead of the .map() in the
>> snippet) which appends a new field containing the watermark to each row.
>> If there were a way to get the watermark inside a scalar UDF, I wouldn't
>> convert the table to a datastream and back.
>>
>>> if I am understanding correctly, you are also using "time1" as the
>>> rowtime; is your intention to use it later as well?
>>
>> yup :-)
>>
>>> As far as I know *".returns(sourceTable.getSchema().toRowType());"* only
>>> adds a type information hint about the return type of this operator. It is
>>> used in cases where Flink cannot determine it automatically [1].
>>
>> The reason why I specify
>> *".returns(sourceTable.getSchema().toRowType());"* is to give a type
>> information hint, as you said.
>> It is needed later when I make another table like
>>    *"Table anotherTable = tEnv.fromDataStream(stream);"*.
>> Without the type information hint, I get the error
>>    *"An input of GenericTypeInfo<Row> cannot be converted to Table.
>> Please specify the type of the input with a RowTypeInfo."*
>> That's why I give the type information hint in that way.
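>>
>> Condensed, the dependency looks like this (just a sketch of the same
>> code as in my first mail):
>>
>>     DataStream<Row> stream = tEnv
>>       .toAppendStream(sourceTable, Row.class)
>>       .map(a -> a)
>>       .returns(sourceTable.getSchema().toRowType()); // keeps RowTypeInfo
>>     // without the hint, the stream carries GenericTypeInfo<Row> and
>>     // the conversion below fails with the error quoted above:
>>     Table anotherTable = tEnv.fromDataStream(stream);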
>>
>> Best,
>>
>> Dongwon
>>
>> On Fri, Jul 19, 2019 at 12:39 AM Rong Rong <walter...@gmail.com> wrote:
>>
>>> Hi Dongwon,
>>>
>>> Can you provide a bit more information:
>>> which Flink version are you using?
>>> what does "sourceTable.getSchema().toRowType()" return?
>>> what does the line *".map(a -> a)"* do and can you remove it?
>>> if I am understanding correctly, you are also using "time1" as the
>>> rowtime; is your intention to use it later as well?
>>>
>>> As far as I know *".returns(sourceTable.getSchema().toRowType());"*
>>> only adds a type information hint about the return type of this operator.
>>> It is used in cases where Flink cannot determine it automatically [1].
>>>
>>> Thanks,
>>> Rong
>>>
>>> --
>>> [1]
>>> https://github.com/apache/flink/blob/release-1.8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java#L351
>>>
>>>
>>> On Wed, Jul 17, 2019 at 1:29 AM Dongwon Kim <eastcirc...@gmail.com>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> Consider the following snippet:
>>>>
>>>>>     Table sourceTable = getKafkaSource0(tEnv);
>>>>>     DataStream<Row> stream = tEnv
>>>>>       .toAppendStream(sourceTable, Row.class)
>>>>>       *.map(a -> a)*
>>>>>       *.returns(sourceTable.getSchema().toRowType());*
>>>>>     stream.print();
>>>>>
>>>> where sourceTable.printSchema() shows:
>>>>
>>>>> root
>>>>>  |-- time1: TimeIndicatorTypeInfo(rowtime)
>>>>
>>>>
>>>> This program fails with the following exception:
>>>>
>>>>> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>>>   at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>>>>   at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
>>>>>   at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>>>>>   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
>>>>>   at app.metatron.test.Main2.main(Main2.java:231)
>>>>> *Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.lang.Long*
>>>>> *  at org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)*
>>>>>   at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
>>>>>   at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
>>>>> ...
>>>>
>>>>
>>>> The row serializer seems to try to deep-copy an instance of
>>>> java.sql.Timestamp using LongSerializer instead of SqlTimestampSerializer.
>>>> Could anybody help me?
>>>>
>>>> Best,
>>>>
>>>> - Dongwon
>>>>
>>>> p.s. though removing .returns() makes everything okay, I need to keep
>>>> it because I want to convert the DataStream<Row> into another table later.
>>>> p.s. the source table is created as follows:
>>>>
>>>>> private static final Table getKafkaSource0(StreamTableEnvironment tEnv) {
>>>>>     ConnectorDescriptor connectorDescriptor = new Kafka()
>>>>>       .version("universal")
>>>>>       .topic("mytopic")
>>>>>       .property("bootstrap.servers", "localhost:9092")
>>>>>       .property("group.id", "mygroup")
>>>>>       .startFromEarliest();
>>>>>     FormatDescriptor formatDescriptor = new Csv()
>>>>>       .deriveSchema()
>>>>>       .ignoreParseErrors()
>>>>>       .fieldDelimiter(',');
>>>>>     Schema schemaDescriptor = new Schema()
>>>>>       .field("time1", SQL_TIMESTAMP())
>>>>>       .rowtime(
>>>>>         new Rowtime()
>>>>>           .timestampsFromField("rowTime")
>>>>>           .watermarksPeriodicBounded(100)
>>>>>       );
>>>>>     tEnv.connect(connectorDescriptor)
>>>>>       .withFormat(formatDescriptor)
>>>>>       .withSchema(schemaDescriptor)
>>>>>       .inAppendMode()
>>>>>       .registerTableSource("mysrc");
>>>>>     return tEnv.scan("mysrc");
>>>>>   }
>>>>
>>>>
