Hi,

The problem is that you are using processing time, which is non-deterministic. Both inputs are consumed at the same time and joined based on whichever record happens to arrive first, so the result depends on a race condition.

If you change the input tables to have event-time attributes and use those to register the temporal table function, the results should become stable.
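Roughly something like this (an untested sketch against the 1.7 APIs; it assumes you extend the rate and order tuples with an explicit epoch-millis timestamp field, and the names ratesWithTs, ordersWithTs, r_rowtime and o_rowtime are just placeholders):

    // needs: org.apache.flink.streaming.api.TimeCharacteristic,
    //        org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    // Assign event-time timestamps and watermarks from the timestamp carried in each record.
    // The rate tuples are assumed to be Tuple3<Currency, Rate, epochMillis> here.
    DataStream<Tuple3<String, Double, Long>> ratesWithTs = rateStream
            .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<String, Double, Long>>() {
                @Override
                public long extractAscendingTimestamp(Tuple3<String, Double, Long> rate) {
                    return rate.f2; // epoch millis
                }
            });

    // ".rowtime" replaces the Long timestamp field with an event-time attribute
    Table rateHistory = tableEnv.fromDataStream(ratesWithTs, "Currency, Rate, r_rowtime.rowtime");
    TemporalTableFunction rates = rateHistory.createTemporalTableFunction("r_rowtime", "Currency");
    tableEnv.registerFunction("FxRates", rates);

    // ordersWithTs is built the same way from the order stream (Tuple4 with a timestamp field)
    Table orders = tableEnv.fromDataStream(ordersWithTs, "OrderId, o_Currency, Amount, o_rowtime.rowtime");
    Table usdOrders = orders
            .join(new Table(tableEnv, "FxRates(o_rowtime)"), "o_Currency = Currency")
            .select("OrderId, Amount, Currency, Rate, (Amount * Rate) as UsdAmount");

With event time, each order is joined against the rates that were valid at the order's own timestamp, so the result no longer depends on which stream happens to arrive first.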
Best,
Fabian

On Mon, 21 Jan 2019 at 15:45, Chris Miller <chris...@gmail.com> wrote:

> Hi all,
>
> I'm new to Flink so am probably missing something simple. I'm using Flink 1.7.1 and am trying to use temporal table functions but am not getting the results I expect. With the example code below, I would expect 4 records to be output (one for each order), but instead I'm only seeing a (random) subset of these records (it varies on each run). To compound my confusion further, the CSV output often shows a different subset of results than those written to the console. I assume there's a race condition of some sort but I can't figure out where it is. Any ideas what I'm doing wrong?
>
> import java.time.LocalDateTime;
> import java.time.format.DateTimeFormatter;
> import java.util.Arrays;
> import java.util.Iterator;
> import java.util.List;
>
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.tuple.Tuple3;
> import org.apache.flink.core.fs.FileSystem;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
> import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableEnvironment;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.table.functions.TemporalTableFunction;
> import org.apache.flink.table.sinks.CsvTableSink;
> import org.apache.flink.types.Row;
>
> public class Test {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
>
>         List<Tuple2<String, Double>> rateData = Arrays.asList(
>                 new Tuple2<>("GBP", 1.29),
>                 new Tuple2<>("EUR", 1.14),
>                 new Tuple2<>("EUR", 1.15),
>                 new Tuple2<>("GBP", 1.30));
>         DataStreamSource<Tuple2<String, Double>> rateStream = env.addSource(new DelayedSource<>(rateData, 1L));
>         rateStream.returns(new TypeHint<Tuple2<String, Double>>() {});
>
>         Table rateHistory = tableEnv.fromDataStream(rateStream, "Currency, Rate, FxRates_ProcTime.proctime");
>         TemporalTableFunction rates = rateHistory.createTemporalTableFunction("FxRates_ProcTime", "Currency");
>         tableEnv.registerFunction("FxRates", rates);
>
>         List<Tuple3<Integer, String, Double>> orderData = Arrays.asList(
>                 new Tuple3<>(1, "GBP", 4.51),
>                 new Tuple3<>(2, "GBP", 23.68),
>                 new Tuple3<>(3, "EUR", 2.99),
>                 new Tuple3<>(4, "EUR", 14.76));
>
>         DataStreamSource<Tuple3<Integer, String, Double>> orderStream = env.addSource(new DelayedSource<>(orderData, 100L));
>         orderStream.returns(new TypeHint<Tuple3<Integer, String, Double>>() {});
>
>         Table orders = tableEnv.fromDataStream(orderStream, "OrderId, o_Currency, Amount, Order_ProcTime.proctime");
>         Table usdOrders = orders.join(new Table(tableEnv, "FxRates(Order_ProcTime)"), "o_Currency = Currency")
>                 .select("OrderId, Amount, Currency, Rate, (Amount * Rate) as UsdAmount");
>
>         String[] fields = usdOrders.getSchema().getFieldNames();
>         TypeInformation<?>[] types = usdOrders.getSchema().getFieldTypes();
>         DataStream<Row> usdStream = tableEnv.toAppendStream(usdOrders, usdOrders.getSchema().toRowType());
>         CsvTableSink csvTableSink = new CsvTableSink("C:\\tmp\\test.csv", ",", 1, FileSystem.WriteMode.OVERWRITE);
>         tableEnv.registerTableSink("csvSink", fields, types, csvTableSink);
>         usdOrders.insertInto("csvSink");
>         usdStream.addSink(new PrintSink());
>         env.execute();
>         System.out.println("Test completed at " + time());
>     }
>
>     public static String time() {
>         return LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_TIME);
>     }
>
>     private static class DelayedSource<T> extends RichSourceFunction<T> {
>         private final List<T> data;
>         private final long initialDelay;
>         private volatile boolean shutdown;
>
>         private DelayedSource(List<T> data, long initialDelay) {
>             this.data = data;
>             this.initialDelay = initialDelay;
>         }
>
>         @Override
>         public void run(SourceContext<T> ctx) throws Exception {
>             Iterator<T> iterator = data.iterator();
>             Thread.sleep(initialDelay);
>             while (!shutdown && iterator.hasNext()) {
>                 T next = iterator.next();
>                 System.out.println(time() + " - producing " + next);
>                 ctx.collect(next);
>             }
>         }
>
>         @Override
>         public void cancel() {
>             shutdown = true;
>         }
>     }
>
>     private static class PrintSink extends RichSinkFunction<Row> {
>         @Override
>         public void invoke(Row value, Context context) {
>             Integer orderId = (Integer) value.getField(0);
>             Double amount = (Double) value.getField(1);
>             String currency = (String) value.getField(2);
>             Double rate = (Double) value.getField(3);
>             Double usdAmount = (Double) value.getField(4);
>             System.out.println(time() + " - order " + orderId + " was for " + usdAmount + " USD (" + amount + ' ' + currency + " @ " + rate + ')');
>         }
>     }
> }