Hi Chris,

If you are only interested in the latest data from the dimension tables, you could try a temporal table join:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#operations
(see "Join with Temporal Table").
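For illustration, here is a minimal sketch of what that could look like for your query. It assumes that FxRates and Prices are registered as lookup-capable tables (both names are hypothetical) and that Orders exposes a processing-time attribute called proctime, which your sample may not have. As far as I know, in 1.9 this syntax is only available with the Blink planner and only for processing time:

```sql
-- Sketch only: FxRates, Prices and o.proctime are assumed names, not taken from your program.
SELECT o.id AS orderId, o.productId, o.currency, o.quantity, f.rate, p.price
FROM Orders AS o
  -- look up the FX rate that is current when the order is processed
  JOIN FxRates FOR SYSTEM_TIME AS OF o.proctime AS f
    ON o.currency = f.currency
  -- look up the product price that is current when the order is processed
  JOIN Prices FOR SYSTEM_TIME AS OF o.proctime AS p
    ON o.productId = p.productId
```

Unlike the LATERAL TABLE version, this reads whatever the dimension row is at processing time rather than the version that was valid at o.rowtime, so it only fits if the latest value is really what you want.
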
Best,
Kurt

On Fri, Dec 6, 2019 at 11:13 PM Fabian Hueske <fhue...@gmail.com> wrote:

> Thank you!
> Please let me know if the workaround works for you.
>
> Best, Fabian
>
> On Fri, Dec 6, 2019 at 16:11, Chris Miller <chris...@gmail.com> wrote:
>
>> Hi Fabian,
>>
>> Thanks for confirming the issue and suggesting a workaround - I'll give
>> that a try. I've created a JIRA issue as you suggested,
>> https://issues.apache.org/jira/browse/FLINK-15112
>>
>> Many thanks,
>> Chris
>>
>>
>> ------ Original Message ------
>> From: "Fabian Hueske" <fhue...@gmail.com>
>> To: "Chris Miller" <chris...@gmail.com>
>> Cc: "user@flink.apache.org" <user@flink.apache.org>
>> Sent: 06/12/2019 14:52:16
>> Subject: Re: Joining multiple temporal tables
>>
>> Hi Chris,
>>
>> Your query looks OK to me.
>> Moreover, you should get a SQLParseException (or something similar) if it
>> weren't valid SQL.
>>
>> Hence, I assume you are running into a bug in one of the optimizer rules.
>> I tried to reproduce the problem on the SQL training environment and
>> couldn't write a query that joins two temporal tables.
>> What worked though was to first create a view of a query that joins the
>> stream with one temporal table and then join the view with the second one.
>> Maybe that workaround also works for you?
>>
>> It would be great if you could open a Jira issue for this bug including
>> your program to reproduce the bug.
>>
>> Thank you,
>> Fabian
>>
>> On Thu, Dec 5, 2019 at 16:47, Chris Miller <chris...@gmail.com> wrote:
>>
>>> I want to decorate/enrich a stream by joining it with "lookups" to the
>>> most recent data available in other streams. For example, suppose I have a
>>> stream of product orders. For each order, I want to add price and FX rate
>>> information based on the order's product ID and order currency.
>>>
>>> Is it possible to join a table with two other temporal tables to achieve
>>> this? I'm trying but getting a NullPointerException deep inside Flink's
>>> Calcite code. I've attached some sample code that demonstrates the problem.
>>> Is my SQL incorrect/invalid (in which case Flink ideally should detect the
>>> problem and provide a better error message), or is the SQL correct and this
>>> a bug/limitation in Flink? If it's the latter, how do I achieve a similar
>>> result?
>>>
>>> The SQL I'm trying to run:
>>>
>>> SELECT o.id AS orderId, o.productId, o.currency, o.quantity, f.rate, p.price
>>> FROM Orders AS o,
>>> LATERAL TABLE (FxRateLookup(o.rowtime)) AS f,
>>> LATERAL TABLE (PriceLookup(o.rowtime)) AS p
>>> WHERE o.currency = f.currency
>>> AND o.productId = p.productId
>>>
>>> The exception I get:
>>>
>>> Exception in thread "main" java.lang.NullPointerException
>>>     at org.apache.flink.table.calcite.FlinkRelBuilder$.of(FlinkRelBuilder.scala:129)
>>>     at org.apache.flink.table.plan.rules.logical.LogicalCorrelateToTemporalTableJoinRule.onMatch(LogicalCorrelateToTemporalTableJoinRule.scala:91)
>>>     at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
>>>     at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
>>>     at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
>>>     at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:284)
>>>     at org.apache.calcite.plan.hep.HepInstruction$RuleCollection.execute(HepInstruction.java:74)
>>>     at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
>>>     at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)
>>>     at org.apache.flink.table.plan.Optimizer.runHepPlanner(Optimizer.scala:228)
>>>     at org.apache.flink.table.plan.Optimizer.runHepPlannerSimultaneously(Optimizer.scala:212)
>>>     at org.apache.flink.table.plan.Optimizer.optimizeExpandPlan(Optimizer.scala:138)
>>>     at org.apache.flink.table.plan.StreamOptimizer.optimize(StreamOptimizer.scala:61)
>>>     at org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:410)
>>>     at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:187)
>>>     at org.apache.flink.table.planner.StreamPlanner.$anonfun$translate$1(StreamPlanner.scala:127)
>>>     at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>>>     at scala.collection.Iterator.foreach(Iterator.scala:937)
>>>     at scala.collection.Iterator.foreach$(Iterator.scala:937)
>>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>>>     at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>>>     at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>>>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>     at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>>>     at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>>>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>>     at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:127)
>>>     at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:319)
>>>     at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:227)
>>>     at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:218)
>>>     at test.PojoTest.run(PojoTest.java:96)
>>>     at test.PojoTest.main(PojoTest.java:23)
>>>
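
For reference, the workaround Fabian describes in the quoted thread (join the stream with one temporal table in a view, then join that view with the second one) could be sketched roughly as follows. OrdersWithFx is a hypothetical view name, CREATE VIEW is assumed to be available as it is in the SQL Client, and the sketch assumes that o.rowtime is still usable as a time attribute after the first join; none of this is confirmed in the thread:

```sql
-- Step 1 (sketch): join the order stream with the first temporal table function only.
-- OrdersWithFx is a made-up name; rowtime is carried along for the second join.
CREATE VIEW OrdersWithFx AS
SELECT o.id AS orderId, o.productId, o.currency, o.quantity, o.rowtime, f.rate
FROM Orders AS o,
  LATERAL TABLE (FxRateLookup(o.rowtime)) AS f
WHERE o.currency = f.currency;

-- Step 2 (sketch): join the view with the second temporal table function.
SELECT v.orderId, v.productId, v.currency, v.quantity, v.rate, p.price
FROM OrdersWithFx AS v,
  LATERAL TABLE (PriceLookup(v.rowtime)) AS p
WHERE v.productId = p.productId;
```

In a Java program the intermediate step would presumably be registered from the result of tableEnv.sqlQuery(...) instead of a CREATE VIEW statement.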