Hi Chris,

If you are only interested in the latest data of the dimension table, you
could try the temporal table join:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#operations
(see "Join with Temporal Table").
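
For illustration, here is a rough, untested sketch of how the query from this
thread might look with that syntax. It assumes that Orders exposes a
processing-time attribute called proctime, and that LatestFxRates and
LatestPrices are registered as lookup-capable dimension tables; all three
names are made up for this example. As far as I know, the 1.9 docs list this
join as Blink planner only and processing time only, so please check that
against your setup.

```sql
-- Sketch only. Assumed (hypothetical) names: the proctime attribute on Orders,
-- and the dimension tables LatestFxRates and LatestPrices.
SELECT o.id AS orderId, o.productId, o.currency, o.quantity, f.rate, p.price
  FROM Orders AS o
  -- look up the FX rate that is current at the order's processing time
  JOIN LatestFxRates FOR SYSTEM_TIME AS OF o.proctime AS f
    ON o.currency = f.currency
  -- look up the product price that is current at the order's processing time
  JOIN LatestPrices FOR SYSTEM_TIME AS OF o.proctime AS p
    ON o.productId = p.productId
```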

Best,
Kurt


On Fri, Dec 6, 2019 at 11:13 PM Fabian Hueske <fhue...@gmail.com> wrote:

> Thank you!
> Please let me know if the workaround works for you.
>
> Best, Fabian
>
> On Fri, Dec 6, 2019 at 4:11 PM Chris Miller <chris...@gmail.com> wrote:
>
>> Hi Fabian,
>>
>> Thanks for confirming the issue and suggesting a workaround - I'll give
>> that a try. I've created a JIRA issue as you suggested,
>> https://issues.apache.org/jira/browse/FLINK-15112
>>
>> Many thanks,
>> Chris
>>
>>
>> ------ Original Message ------
>> From: "Fabian Hueske" <fhue...@gmail.com>
>> To: "Chris Miller" <chris...@gmail.com>
>> Cc: "user@flink.apache.org" <user@flink.apache.org>
>> Sent: 06/12/2019 14:52:16
>> Subject: Re: Joining multiple temporal tables
>>
>> Hi Chris,
>>
>> Your query looks OK to me.
>> Moreover, you would get a SQLParseException (or something similar) if it
>> weren't valid SQL.
>>
>> Hence, I assume you are running into a bug in one of the optimizer rules.
>> I tried to reproduce the problem in the SQL training environment and
>> couldn't write a working query that joins two temporal tables.
>> What did work, though, was to first create a view of a query that joins the
>> stream with one temporal table and then join the view with the second one.
>> Maybe that workaround also works for you?
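>>
>> A rough sketch of that two-step workaround, using the table and function
>> names from the original query. The view name OrdersWithFx is made up for
>> this example, and the sketch assumes that rowtime keeps its time-attribute
>> property when it is passed through the view. It is untested against the
>> attached program.
>>
>> ```sql
>> -- Step 1: join the stream with the first temporal table and expose the
>> -- result as a view (OrdersWithFx is a hypothetical name).
>> CREATE VIEW OrdersWithFx AS
>> SELECT o.id, o.productId, o.currency, o.quantity, o.rowtime, f.rate
>>   FROM Orders AS o,
>>   LATERAL TABLE (FxRateLookup(o.rowtime)) AS f
>>   WHERE o.currency = f.currency;
>>
>> -- Step 2: join the view with the second temporal table.
>> SELECT v.id AS orderId, v.productId, v.currency, v.quantity, v.rate, p.price
>>   FROM OrdersWithFx AS v,
>>   LATERAL TABLE (PriceLookup(v.rowtime)) AS p
>>   WHERE v.productId = p.productId;
>> ```
>>
>> In a Java program, the equivalent of step 1 would presumably be to run the
>> first SELECT with sqlQuery and register the resulting Table under a name
>> before running the second query.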
>>
>> It would be great if you could open a Jira issue for this bug including
>> your program to reproduce the bug.
>>
>> Thank you,
>> Fabian
>>
>> On Thu, Dec 5, 2019 at 4:47 PM Chris Miller <chris...@gmail.com> wrote:
>>
>>> I want to decorate/enrich a stream by joining it with "lookups" to the
>>> most recent data available in other streams. For example, suppose I have a
>>> stream of product orders. For each order, I want to add price and FX rate
>>> information based on the order's product ID and order currency.
>>>
>>> Is it possible to join a table with two other temporal tables to achieve
>>> this? I'm trying but getting a NullPointerException deep inside Flink's
>>> Calcite code. I've attached some sample code that demonstrates the problem.
>>> Is my SQL incorrect/invalid (in which case Flink ideally should detect the
>>> problem and provide a better error message), or is the SQL correct and this
>>> a bug/limitation in Flink? If it's the latter, how do I achieve a similar
>>> result?
>>>
>>> The SQL I'm trying to run:
>>>
>>> SELECT o.id AS orderId, o.productId, o.currency, o.quantity, f.rate, p.price
>>>   FROM Orders AS o,
>>>   LATERAL TABLE (FxRateLookup(o.rowtime)) AS f,
>>>   LATERAL TABLE (PriceLookup(o.rowtime)) AS p
>>>   WHERE o.currency = f.currency
>>>   AND o.productId = p.productId
>>>
>>> The exception I get:
>>>
>>> Exception in thread "main" java.lang.NullPointerException
>>>     at org.apache.flink.table.calcite.FlinkRelBuilder$.of(FlinkRelBuilder.scala:129)
>>>     at org.apache.flink.table.plan.rules.logical.LogicalCorrelateToTemporalTableJoinRule.onMatch(LogicalCorrelateToTemporalTableJoinRule.scala:91)
>>>     at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
>>>     at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
>>>     at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
>>>     at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:284)
>>>     at org.apache.calcite.plan.hep.HepInstruction$RuleCollection.execute(HepInstruction.java:74)
>>>     at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
>>>     at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)
>>>     at org.apache.flink.table.plan.Optimizer.runHepPlanner(Optimizer.scala:228)
>>>     at org.apache.flink.table.plan.Optimizer.runHepPlannerSimultaneously(Optimizer.scala:212)
>>>     at org.apache.flink.table.plan.Optimizer.optimizeExpandPlan(Optimizer.scala:138)
>>>     at org.apache.flink.table.plan.StreamOptimizer.optimize(StreamOptimizer.scala:61)
>>>     at org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:410)
>>>     at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:187)
>>>     at org.apache.flink.table.planner.StreamPlanner.$anonfun$translate$1(StreamPlanner.scala:127)
>>>     at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>>>     at scala.collection.Iterator.foreach(Iterator.scala:937)
>>>     at scala.collection.Iterator.foreach$(Iterator.scala:937)
>>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>>>     at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>>>     at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>>>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>     at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>>>     at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>>>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>>     at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:127)
>>>     at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:319)
>>>     at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:227)
>>>     at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:218)
>>>     at test.PojoTest.run(PojoTest.java:96)
>>>     at test.PojoTest.main(PojoTest.java:23)
>>>
>>
