Hi Dongwon,

Can you provide a bit more information:
Which Flink version are you using?
What does "sourceTable.getSchema().toRowType()" return?
What does the line ".map(a -> a)" do, and can you remove it?
If I understand correctly, you are also using "time1" as the rowtime;
is your intention to use it later as well?

As far as I know, ".returns(sourceTable.getSchema().toRowType())" only
adds a type information hint about the return type of this operator. It is
used in cases where Flink cannot determine the type automatically [1].
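One thing worth checking: the "time1" column in your schema is a time
indicator type, and I believe its serializer handles the value as a long
internally, while toAppendStream emits java.sql.Timestamp values, which
would match the ClassCastException you see. As an untested workaround
sketch (the field name "time1" is taken from your printed schema), you
could build the type hint yourself with a plain SQL_TIMESTAMP instead of
reusing the table schema's row type:

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

public class ReturnTypeHint {
  // Sketch: a row type whose rowtime field uses a plain SQL_TIMESTAMP,
  // so the RowSerializer copies java.sql.Timestamp values instead of
  // trying to copy them with a long serializer.
  public static RowTypeInfo plainRowType() {
    return new RowTypeInfo(
        new TypeInformation<?>[] { Types.SQL_TIMESTAMP },
        new String[] { "time1" });
  }
}
```

and pass it as ".returns(ReturnTypeHint.plainRowType())". If you later
need "time1" as a rowtime attribute again, you should be able to
re-declare it when converting the stream back to a table (e.g. with a
".rowtime" field expression), but I have not verified that for your case.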

Thanks,
Rong

--
[1]
https://github.com/apache/flink/blob/release-1.8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java#L351


On Wed, Jul 17, 2019 at 1:29 AM Dongwon Kim <eastcirc...@gmail.com> wrote:

> Hello,
>
> Consider the following snippet:
>
>>     Table sourceTable = getKafkaSource0(tEnv);
>>     DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class)
>>       .map(a -> a)
>>       .returns(sourceTable.getSchema().toRowType());
>>     stream.print();
>>
> where sourceTable.printSchema() shows:
>
>> root
>>  |-- time1: TimeIndicatorTypeInfo(rowtime)
>
>
> This program throws the following exception:
>
>> Exception in thread "main"
>> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>> at
>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>> at
>> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
>> at
>> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
>> at app.metatron.test.Main2.main(Main2.java:231)
>> Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be
>> cast to java.lang.Long
>> at
>> org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)
>> at
>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
>> at
>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
>> ...
>
>
> The row serializer seems to try to deep-copy an instance of
> java.sql.Timestamp using LongSerializer instead of SqlTimestampSerializer.
> Could anybody help me?
>
> Best,
>
> - Dongwon
>
> p.s. Though removing .returns() makes everything okay, I need it because
> I want to convert the DataStream<Row> into another table later.
> p.s. the source table is created as follows:
>
> private static final Table getKafkaSource0(StreamTableEnvironment tEnv) {
>>     ConnectorDescriptor connectorDescriptor = new Kafka()
>>       .version("universal")
>>       .topic("mytopic")
>>       .property("bootstrap.servers", "localhost:9092")
>>       .property("group.id", "mygroup")
>>       .startFromEarliest();
>>     FormatDescriptor formatDescriptor = new Csv()
>>       .deriveSchema()
>>       .ignoreParseErrors()
>>       .fieldDelimiter(',');
>>     Schema schemaDescriptor = new Schema()
>>       .field("time1", SQL_TIMESTAMP())
>>       .rowtime(
>>         new Rowtime()
>>           .timestampsFromField("rowTime")
>>           .watermarksPeriodicBounded(100)
>>       );
>>     tEnv.connect(connectorDescriptor)
>>       .withFormat(formatDescriptor)
>>       .withSchema(schemaDescriptor)
>>       .inAppendMode()
>>       .registerTableSource("mysrc");
>>     return tEnv.scan("mysrc");
>>   }
>
>