Thanks for the clarification, we can live with this restriction. I just wanted to make sure that I fully understand why we are getting these errors and whether there is any reasonable workaround.
Thanks again :)
Gyula

On Mon, Apr 20, 2020 at 4:21 PM Kurt Young <ykt...@gmail.com> wrote:

> According to the current implementation, yes, you are right: the Hive
> table source will always be bounded. But conceptually, we can't make this
> assumption. For example, we might further improve the Hive table source
> to also support unbounded cases, e.g. monitoring Hive tables and always
> reading newly appeared data. So right now, Flink relies on the "global
> flag" to distinguish whether the table should be treated as static or
> dynamically changing.
>
> The "global flag" is whether you are using `BatchTableEnvironment` or
> `StreamTableEnvironment` (old versions), or EnvironmentSettings's
> batchMode or streamingMode (newer versions).
>
> But we should admit that Flink hasn't finished the unification work. Your
> case will also be considered in the future when we want to further unify
> and simplify these concepts and usages.
>
> Best,
> Kurt
>
> On Mon, Apr 20, 2020 at 10:09 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>
>> The HiveTableSource (and many others) return isBounded() -> true. In
>> this case it is not even possible for it to change over time, so I am a
>> bit confused.
>>
>> To me it sounds like you should always be able to join a stream against
>> a bounded table; temporal or not, it is pretty well defined. Maybe there
>> is some fundamental concept that I don't understand; I don't have much
>> experience with this, to be fair.
>>
>> Gyula
>>
>> On Mon, Apr 20, 2020 at 4:03 PM Kurt Young <ykt...@gmail.com> wrote:
>>
>>> The reason here is that Flink doesn't know the Hive table is static.
>>> After you create these two tables and try to join them, Flink will
>>> assume both tables will be changing with time.
>>>
>>> Best,
>>> Kurt
>>>
>>> On Mon, Apr 20, 2020 at 9:48 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>
>>>> Hi!
>>>>
>>>> The problem here is that I don't have a temporal table.
>>>>
>>>> I have a regular stream from Kafka (with an event time attribute) and
>>>> a static table in Hive. The Hive table is static, it doesn't change.
>>>> It doesn't have any time attribute; it's not temporal.
>>>>
>>>> Gyula
>>>>
>>>> On Mon, Apr 20, 2020 at 3:43 PM godfrey he <godfre...@gmail.com> wrote:
>>>>
>>>>> Hi Gyula,
>>>>>
>>>>> Can you convert the regular join to a lookup join (temporal join)
>>>>> [1]? Then you can use the window aggregate.
>>>>>
>>>>> > I understand that the problem is that we cannot join with the Hive
>>>>> > table and still maintain the watermark/event time column. But why
>>>>> > is this?
>>>>> A regular join can't maintain the time attribute as an increasing
>>>>> trend (one record may be joined with a very old record), which means
>>>>> the watermark is also not guaranteed to increase.
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#join-with-a-temporal-table
>>>>>
>>>>> Best,
>>>>> Godfrey
>>>>>
>>>>> Gyula Fóra <gyula.f...@gmail.com> wrote on Mon, Apr 20, 2020 at 4:46 PM:
>>>>>
>>>>>> Hi All!
>>>>>>
>>>>>> We hit the following problem with SQL and are trying to understand
>>>>>> if there is a valid workaround.
>>>>>>
>>>>>> We have 2 tables:
>>>>>>
>>>>>> *Kafka*
>>>>>> timestamp (ROWTIME)
>>>>>> item
>>>>>> quantity
>>>>>>
>>>>>> *Hive*
>>>>>> item
>>>>>> price
>>>>>>
>>>>>> So we basically have incoming (ts, id, quantity) and we want to join
>>>>>> it with the Hive table to get the total price (price * quantity) for
>>>>>> the current item.
>>>>>>
>>>>>> After this we want to create a window aggregate on quantity * price,
>>>>>> windowed on timestamp (the event time attribute).
>>>>>>
>>>>>> However we formulate this query, we hit the following error:
>>>>>> org.apache.flink.table.api.TableException: Rowtime attributes must
>>>>>> not be in the input rows of a regular join. As a workaround you can
>>>>>> cast the time attributes of input tables to TIMESTAMP before.
>>>>>>
>>>>>> I understand that the problem is that we cannot join with the Hive
>>>>>> table and still maintain the watermark/event time column. But why is
>>>>>> this?
>>>>>>
>>>>>> In the DataStream world I would just assign the Max watermark to my
>>>>>> enrichment input, and the join outputs would get the ts of the input
>>>>>> record. Can I achieve something similar in the SQL/Table API?
>>>>>>
>>>>>> Thank you!
>>>>>> Gyula
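For readers landing on this thread: a minimal sketch of the lookup-join workaround godfrey suggests, in Flink SQL. All table and column names are illustrative, connector properties are elided, and whether the Hive source supports lookup joins in your Flink version needs checking against the docs; the key point is that the lookup join uses a processing-time attribute and so preserves the stream's rowtime `ts` for windowing, which a regular join does not.

```sql
-- Illustrative fact table over Kafka; connector properties elided.
CREATE TABLE orders (
  ts        TIMESTAMP(3),
  item      STRING,
  quantity  INT,
  proctime AS PROCTIME(),                        -- processing-time attribute, needed for the lookup join
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND   -- event-time attribute
) WITH (
  -- Kafka connector properties go here
);

-- `prices` is the static dimension table (item, price), e.g. resolved
-- through the Hive catalog.

-- Lookup (temporal) join: enrich each order with the current price.
-- Unlike a regular join, this keeps `ts` a valid rowtime attribute,
-- so the event-time window aggregate below is allowed.
SELECT
  TUMBLE_START(o.ts, INTERVAL '1' MINUTE) AS window_start,
  SUM(o.quantity * p.price)               AS total
FROM orders AS o
JOIN prices FOR SYSTEM_TIME AS OF o.proctime AS p
  ON o.item = p.item
GROUP BY TUMBLE(o.ts, INTERVAL '1' MINUTE);
```

If a lookup join is not available for the dimension source, the other route is the one the exception message itself suggests: `CAST(ts AS TIMESTAMP)` before the regular join, at the cost of losing the watermark and therefore event-time windowing downstream.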