Hi,

The problem is that you are using processing time, which is
non-deterministic.
Both inputs are consumed at the same time, and records are joined based on
which record arrived first. The result therefore depends on a race condition.

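If you change the input tables to have event-time attributes (i.e., assign
timestamps and watermarks to both streams and declare .rowtime instead of
.proctime attributes) and use these to register the time-versioned table,
the results should become stable.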

Best, Fabian

On Mon, 21 Jan 2019 at 15:45, Chris Miller <chris...@gmail.com> wrote:

> Hi all,
>
> I'm new to Flink, so I'm probably missing something simple. I'm using Flink
> 1.7.1 and am trying to use temporal table functions, but I'm not getting the
> results I expect. With the example code below, I would expect 4 records to
> be output (one for each order), but instead I'm only seeing a (random)
> subset of these records (it varies on each run). To compound my confusion,
> the CSV output often shows a different subset of results than
> those written to the console. I assume there's a race condition of some
> sort, but I can't figure out where it is. Any ideas what I'm doing wrong?
>
>
> import java.time.LocalDateTime;
> import java.time.format.DateTimeFormatter;
> import java.util.Arrays;
> import java.util.Iterator;
> import java.util.List;
>
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.tuple.Tuple3;
> import org.apache.flink.core.fs.FileSystem;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
> import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableEnvironment;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.table.functions.TemporalTableFunction;
> import org.apache.flink.table.sinks.CsvTableSink;
> import org.apache.flink.types.Row;
>
> /**
>  * Reproduction case: joins a stream of orders against a temporal table
>  * function over FX rates, then writes the converted orders both to a CSV
>  * file and to stdout.
>  *
>  * <p>NOTE(review): both time attributes below are processing-time
>  * ({@code .proctime}), so which rate version an order is joined with depends
>  * on which record arrives first; the output is race-dependent and can vary
>  * between runs.
>  */
> public class Test {
>   /**
>    * Builds and runs the job: a rate stream and an order stream, each fed by a
>    * {@link DelayedSource}, joined on currency via the {@code FxRates}
>    * temporal table function.
>    */
>   public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>     StreamTableEnvironment tableEnv =
> TableEnvironment.getTableEnvironment(env);
>
>     // Rate history as (currency, rate); EUR and GBP each get two versions.
>     List<Tuple2<String, Double>> rateData = Arrays.asList(
>         new Tuple2<>("GBP", 1.29),
>         new Tuple2<>("EUR", 1.14),
>         new Tuple2<>("EUR", 1.15),
>         new Tuple2<>("GBP", 1.30));
>     DataStreamSource<Tuple2<String, Double>> rateStream =
> env.addSource(new DelayedSource<>(rateData, 1L));
>     // Element type declared explicitly; presumably Flink cannot infer T of
>     // the generic DelayedSource on its own (type erasure) - confirm.
>     rateStream.returns(new TypeHint<Tuple2<String, Double>>() {});
>
>     // NOTE(review): FxRates_ProcTime is a processing-time attribute, so rate
>     // versions are ordered by arrival, not by any timestamp in the data.
>     Table rateHistory = tableEnv.fromDataStream(rateStream, "Currency,
> Rate, FxRates_ProcTime.proctime");
>     // Temporal table function versioned by FxRates_ProcTime, keyed by Currency.
>     TemporalTableFunction rates =
> rateHistory.createTemporalTableFunction("FxRates_ProcTime", "Currency");
>     tableEnv.registerFunction("FxRates", rates);
>
>     // Orders as (orderId, currency, amount).
>     List<Tuple3<Integer, String, Double>> orderData = Arrays.asList(
>         new Tuple3<>(1, "GBP", 4.51),
>         new Tuple3<>(2, "GBP", 23.68),
>         new Tuple3<>(3, "EUR", 2.99),
>         new Tuple3<>(4, "EUR", 14.76));
>
>     // Orders are delayed 100 ms (rates only 1 ms). This wall-clock head start
>     // is the only thing trying to make rates arrive first; because both
>     // sources run concurrently, it does not guarantee that ordering.
>     DataStreamSource<Tuple3<Integer, String, Double>> orderStream =
> env.addSource(new DelayedSource<>(orderData, 100L));
>     orderStream.returns(new TypeHint<Tuple3<Integer, String, Double>>()
> {});
>
>     // Each order looks up the FxRates version valid at its processing time.
>     // An order whose currency has no rate yet presumably yields no row
>     // (inner join), which would explain the missing output - confirm.
>     Table orders = tableEnv.fromDataStream(orderStream, "OrderId,
> o_Currency, Amount, Order_ProcTime.proctime");
>     Table usdOrders = orders.join(new Table(tableEnv,
> "FxRates(Order_ProcTime)"), "o_Currency = Currency")
>                             .select("OrderId, Amount, Currency, Rate,
> (Amount * Rate) as UsdAmount");
>
>     // NOTE(review): usdOrders is emitted twice (toAppendStream for stdout,
>     // insertInto for CSV). If each emission is translated into its own join,
>     // the two sinks race independently and can disagree - confirm.
>     String[] fields = usdOrders.getSchema().getFieldNames();
>     TypeInformation<?>[] types = usdOrders.getSchema().getFieldTypes();
>     DataStream<Row> usdStream = tableEnv.toAppendStream(usdOrders,
> usdOrders.getSchema().toRowType());
>     // Hard-coded Windows path; the file is overwritten on each run.
>     CsvTableSink csvTableSink = new CsvTableSink("C:\\tmp\\test.csv", ",",
> 1, FileSystem.WriteMode.OVERWRITE);
>     tableEnv.registerTableSink("csvSink", fields, types, csvTableSink);
>     usdOrders.insertInto("csvSink");
>     usdStream.addSink(new PrintSink());
>     env.execute();
>     System.out.println("Test completed at " + time());
>   }
>
>   /**
>    * Returns the current local time formatted with ISO_LOCAL_TIME; used to
>    * prefix console output.
>    */
>   public static String time() {
>     return LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_TIME);
>   }
>
>   /**
>    * Source that sleeps {@code initialDelay} milliseconds, then emits every
>    * element of {@code data} in list order, stopping early once cancelled.
>    */
>   private static class DelayedSource<T> extends RichSourceFunction<T> {
>     private final List<T> data;
>     private final long initialDelay;
>     // Written by cancel(), read by the emitting loop in run().
>     private volatile boolean shutdown;
>
>     private DelayedSource(List<T> data, long initialDelay) {
>       this.data = data;
>       this.initialDelay = initialDelay;
>     }
>
>     @Override
>     public void run(SourceContext<T> ctx) throws Exception {
>       Iterator<T> iterator = data.iterator();
>       // The cancel flag is only checked between elements; it does not cut
>       // this initial sleep short.
>       Thread.sleep(initialDelay);
>       while (!shutdown && iterator.hasNext()) {
>         T next = iterator.next();
>         System.out.println(time() + " - producing " + next);
>         ctx.collect(next);
>       }
>     }
>
>     /** Requests the emitting loop in run() to stop after the current element. */
>     @Override
>     public void cancel() {
>       shutdown = true;
>     }
>   }
>
>   /**
>    * Prints each joined row to stdout. Field positions follow the select list:
>    * OrderId, Amount, Currency, Rate, UsdAmount.
>    */
>   private static class PrintSink extends RichSinkFunction<Row> {
>     @Override
>     public void invoke(Row value, Context context) {
>       Integer orderId = (Integer) value.getField(0);
>       Double amount = (Double) value.getField(1);
>       String currency = (String) value.getField(2);
>       Double rate = (Double) value.getField(3);
>       Double usdAmount = (Double) value.getField(4);
>       System.out.println(time() + " - order " + orderId + " was for " +
> usdAmount + " USD (" + amount + ' ' + currency + " @ " + rate + ')');
>     }
>   }
> }
>

Reply via email to