Hi Niklas,

The workaround that you described should work fine.
However, you don't need a custom sink.
Converting the Table into a DataSet and registering the DataSet again as a
Table is currently the way to solve this issue.
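
A minimal sketch of that round trip, assuming the Java batch Table API as it
looked around Flink 1.6/1.7 (TableEnvironment.getTableEnvironment, toDataSet,
registerDataSet). The body of Time, the class name and the table name
"t1_materialized" are made up for illustration and are not from your program:

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;

import java.util.Arrays;

public class MaterializeTableExample {

    // Hypothetical stand-in for the Time UDF from the original mail.
    public static class Time extends ScalarFunction {
        public long eval() {
            return System.nanoTime();
        }

        @Override
        public boolean isDeterministic() {
            // Tell the planner not to pre-compute the call as a constant.
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tenv = TableEnvironment.getTableEnvironment(env);

        DataSet<Long> longDataSet = env.fromCollection(Arrays.asList(1L, 2L, 3L, 4L, 5L));
        tenv.registerDataSet("longs", longDataSet, "l");
        tenv.registerFunction("time", new Time());

        Table t1 = tenv.scan("longs").select("l, time() as t");

        // Convert the Table into a DataSet and register that DataSet again.
        // Both branches of the self-join below then read from this one DataSet
        // instead of each expanding the definition of t1 on its own.
        DataSet<Row> t1Rows = tenv.toDataSet(t1, Row.class);
        tenv.registerDataSet("t1_materialized", t1Rows, "l, t");
        Table t1Once = tenv.scan("t1_materialized");

        Table t2 = t1Once.as("l1, id1");
        Table t3 = t1Once.as("l2, id2");
        Table t4 = t2.join(t3).where("l1 == l2");

        // Printing a DataSet triggers execution, so no custom sink is needed.
        tenv.toDataSet(t4, Row.class).print();
    }
}
```

If this behaves as intended, id1 and id2 carry the same value in every joined
row, because time() is only called once per input record.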

Best, Fabian

On Tue, 20 Nov 2018 at 17:13, Niklas Teichmann <
mai11...@studserv.uni-leipzig.de> wrote:

> Hi everybody,
>
> I have a question concerning the Flink Table API, more precisely the
> way the results of table statements are evaluated. In the following
> code example, the statement defining the table t1 is evaluated twice,
> an effect that leads to some issues of performance and logic in the
> program I am trying to write.
>
> List<Long> longList = Arrays.asList(1L, 2L, 3L, 4L, 5L);
> DataSet<Long> longDataSet =
> getExecutionEnvironment().fromCollection(longList);
>
> tenv.registerDataSet("longs", longDataSet, "l");
> // an example UDF that evaluates the current time
> tenv.registerFunction("time", new Time());
>
> Table t1 = tenv.scan("longs");
> t1 = t1.select("l, time() as t");
>
> Table t2 = t1.as("l1, id1");
> Table t3 = t1.as("l2, id2");
>
> Table t4 = t2.join(t3).where("l1 == l2");
>
> // a sink that prints the content of the table
> t4.writeToSink(new PrintTableSink());
>
> I realize that this behaviour is defined in the documentation ("A
> registered Table is treated similarly to a VIEW ...") and probably
> stems from the DataStream API. But is there a preferred way to avoid
> this?
>
> Currently I'm using a workaround that defines a TableSink which in
> turn registers its output as a new table. That seems extremely hacky
> though.
>
> Sorry if I missed something obvious!
>
> All the best,
> Niklas
