Hi Xingcan,
thanks for looking into this. This definitely seems to be a bug,
possibly in org.apache.flink.table.calcite.RelTimeIndicatorConverter.
In any case, we should create an issue for it.
Regards,
Timo
On 3/8/18 at 7:27 AM, Yan Zhou [FDS Science] wrote:
Hi Xingcan,
Thanks for your help. Attached is sample code that reproduces the
problem.
While writing the sample, I noticed that if I remove the `distinct`
keyword from the select clause, the AssertionError doesn't occur.
String sql1 = "select distinct id, eventTs, count(*) over "
    + "(partition by id order by eventTs rows between 100 preceding and "
    + "current row) as cnt1 from myTable";
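For anyone trying to narrow this down, here is a minimal sketch that builds the query above with and without `distinct`, so that both variants can be run against the same table. The class name OverWindowQueries and the method build are hypothetical and not part of the attached sample; the snippet only assembles the SQL strings and does not call any Flink API.

```java
// Hypothetical helper (not from the attached sample): builds the over-window
// query quoted in this message, with or without the DISTINCT keyword.
final class OverWindowQueries {

    private OverWindowQueries() {}

    /** Returns the query text; only the presence of "distinct" varies. */
    static String build(boolean distinct) {
        return "select " + (distinct ? "distinct " : "")
            + "id, eventTs, count(*) over "
            + "(partition by id order by eventTs "
            + "rows between 100 preceding and current row) as cnt1 "
            + "from myTable";
    }

    public static void main(String[] args) {
        // Print both variants so they can be pasted into tableEnv.sqlQuery(...).
        System.out.println(build(true));
        System.out.println(build(false));
    }
}
```

Passing each returned string to the same sqlQuery call should show whether the AssertionError depends only on the keyword.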
Best
Yan
------------------------------------------------------------------------
*From:* xccui-foxmail <xingc...@gmail.com>
*Sent:* Wednesday, March 7, 2018 8:10 PM
*To:* Yan Zhou [FDS Science]
*Cc:* user@flink.apache.org
*Subject:* Re: flink sql timed-window join throw "mismatched type"
AssertionError on rowtime column
Hi Yan,
I’d like to look into this. Can you share more about your queries and
the full stack trace?
Thanks,
Xingcan
On 8 Mar 2018, at 11:28 AM, Yan Zhou [FDS Science]
<yz...@coupang.com> wrote:
Hi experts,
I am using the Flink Table API to join two tables that are backed by
DataStreams. However, I get the assertion error
"java.lang.AssertionError: mismatched type $1 TIMESTAMP(3)" on the
rowtime column. More details below:
There is only one Kafka data source, which is converted to a Table
and registered. One existing column is set as the rowtime (event time)
attribute. Two over-window aggregation queries are run against the
table, producing two result tables. Everything works fine up to this
point.
However, when I do a time-windowed join of the two result tables on
their inherited rowtime, Calcite throws "java.lang.AssertionError:
mismatched type $1 TIMESTAMP(3)". Can someone let me know what the
possible cause is? FYI, I renamed the rowtime column in one of the
result tables.
DataStream<MyObject> dataStream = env.addSource(kafkaConsumer);
Table table = tableEnv.fromDataStream(dataStream, "col1", "col2",
    ...);
tableEnv.registerTable(tableName, table);
Table left = tableEnv.sqlQuery(
    "select id, eventTime, count(*) over ... from ...");
Table right = tableEnv.sqlQuery(
    "select id as r_id, eventTime as r_event_time, count(*) over ... from ...");
left.join(right).where("id = r_id && eventTime === r_event_time")
    .addSink(...); // here Calcite throws the exception:
                   // java.lang.AssertionError: mismatched type $1 TIMESTAMP(3)
source table
|-- id: Long
|-- eventTime: TimeIndicatorTypeInfo(rowtime)
|-- ...
|-- ...
result_1 table
|-- id: Long
|-- eventTime: TimeIndicatorTypeInfo(rowtime)
|-- ...
|-- ...
result_2 table
|-- r_id: Long
|-- r_event_time: TimeIndicatorTypeInfo(rowtime)
|-- ...
Best
Yan