Hi Dongwon,

I have to dig deeper into the code to reproduce this error. This seems to
be a bug to me and will update once I find anything.

However from what you explained, if I understand correctly you can do all
of your processing within the TableAPI scope without converting it back and
forth to DataStream.
E.g. if your "map(a -> a)" placeholder represents some sort of map function
that's simple enough, you can implement and connect with the table API via
UserDefinedFunction[1].
As TableAPI becoming the first class citizen [2,3,4], this would be much
cleaner implementation from my perspective.

--
Rong

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#scalar-functions
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Table-API-Enhancement-Outline-td25070.html
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-36-Support-Interactive-Programming-in-Flink-Table-API-td27658.html
[4]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Embracing-Table-API-in-Flink-ML-td25368.html


On Thu, Jul 18, 2019 at 8:42 PM Dongwon Kim <eastcirc...@gmail.com> wrote:

> Hi Rong,
>
> Thank you for reply :-)
>
> which Flink version are you using?
>
> I'm using Flink-1.8.0.
>
> what is the "sourceTable.getSchema().toRowType()" return?
>
> Row(time1: TimeIndicatorTypeInfo(rowtime))
>
> what is the line *".map(a -> a)" *do and can you remove it?
>
> *".map(a->a)"* is just to illustrate a problem.
> My actual code contains a process function (instead of .map() in the
> snippet) which appends a new field containing watermark to a row.
> If there were ways to get watermark inside a scalar UDF, I wouldn't
> convert table to datastream and vice versa.
>
> if I am understanding correctly, you are also using "time1" as the
>> rowtime, is that want your intension is to use it later as well?
>
> yup :-)
>
> As far as I know *".returns(sourceTable.getSchema().toRowType());"* only
>> adds a type information hint about the return type of this operator. It is
>> used in cases where Flink cannot determine automatically[1].
>
> The reason why I specify
> *".returns(sourceTable.getSchema().toRowType());"* is to give a type
> information hint as you said.
> That is needed later when I need to make another table like
>    "*Table anotherTable = tEnv.fromDataStream(stream);"*,
> Without the type information hint, I've got an error
>    "*An input of GenericTypeInfo<Row> cannot be converted to Table.
> Please specify the type of the input with a RowTypeInfo."*
> That's why I give a type information hint in that way.
>
> Best,
>
> Dongwon
>
> On Fri, Jul 19, 2019 at 12:39 AM Rong Rong <walter...@gmail.com> wrote:
>
>> Hi Dongwon,
>>
>> Can you provide a bit more information:
>> which Flink version are you using?
>> what is the "sourceTable.getSchema().toRowType()" return?
>> what is the line *".map(a -> a)" *do and can you remove it?
>> if I am understanding correctly, you are also using "time1" as the
>> rowtime, is that want your intension is to use it later as well?
>>
>> As far as I know *".returns(sourceTable.getSchema().toRowType());"* only
>> adds a type information hint about the return type of this operator. It is
>> used in cases where Flink cannot determine automatically[1].
>>
>> Thanks,
>> Rong
>>
>> --
>> [1]
>> https://github.com/apache/flink/blob/release-1.8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java#L351
>>
>>
>> On Wed, Jul 17, 2019 at 1:29 AM Dongwon Kim <eastcirc...@gmail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> Consider the following snippet:
>>>
>>>>     Table sourceTable = getKafkaSource0(tEnv);
>>>>     DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class)
>>>>
>>>> *      .map(a -> a)      .returns(sourceTable.getSchema().toRowType());*
>>>>     stream.print();
>>>>
>>> where sourceTable.printSchema() shows:
>>>
>>>> root
>>>>  |-- time1: TimeIndicatorTypeInfo(rowtime)
>>>
>>>
>>>
>>>  This program returns the following exception:
>>>
>>>> Exception in thread "main"
>>>> org.apache.flink.runtime.client.JobExecutionException: Job execution 
>>>> failed.
>>>> at
>>>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>>> at
>>>> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
>>>> at
>>>> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>>>> at
>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
>>>> at app.metatron.test.Main2.main(Main2.java:231)
>>>> *Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be
>>>> cast to java.lang.Long*
>>>> * at
>>>> org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)*
>>>> at
>>>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
>>>> at
>>>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
>>>> ...
>>>
>>>
>>> The row serializer seems to try to deep-copy an instance of
>>> java.sql.Timestamp using LongSerializer instead of SqlTimestampSerializer.
>>> Could anybody help me?
>>>
>>> Best,
>>>
>>> - Dongwon
>>>
>>> p.s. though removing .returns() makes everything okay, I need to do that
>>> as I want to convert DataStream<Row> into another table later.
>>> p.s. the source table is created as follows:
>>>
>>> private static final Table getKafkaSource0(StreamTableEnvironment tEnv) {
>>>>     ConnectorDescriptor connectorDescriptor = new Kafka()
>>>>       .version("universal")
>>>>       .topic("mytopic")
>>>>       .property("bootstrap.servers", "localhost:9092")
>>>>       .property("group.id", "mygroup")
>>>>       .startFromEarliest();
>>>>     FormatDescriptor formatDescriptor = new Csv()
>>>>       .deriveSchema()
>>>>       .ignoreParseErrors()
>>>>       .fieldDelimiter(',');
>>>>     Schema schemaDescriptor = new Schema()
>>>>       .field("time1", SQL_TIMESTAMP())
>>>>       .rowtime(
>>>>         new Rowtime()
>>>>           .timestampsFromField("rowTime")
>>>>           .watermarksPeriodicBounded(100)
>>>>       );
>>>>     tEnv.connect(connectorDescriptor)
>>>>       .withFormat(formatDescriptor)
>>>>       .withSchema(schemaDescriptor)
>>>>       .inAppendMode()
>>>>       .registerTableSource("mysrc");
>>>>     return tEnv.scan("mysrc");
>>>>   }
>>>
>>>

Reply via email to