Hi Niklas,

The workaround you described should work fine, but you don't need a custom sink. Converting the Table into a DataSet and registering that DataSet again as a Table is currently the way to solve this issue.
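For completeness, a minimal sketch of that approach against your example (non-runnable; `tenv` is your BatchTableEnvironment from the snippet below, and the table name "longsWithTime" is made up):

```java
// Materialize t1 by converting it into a DataSet of Rows ...
DataSet<Row> materialized = tenv.toDataSet(t1, Row.class);

// ... and register that DataSet under a new name. Queries against
// "longsWithTime" read the materialized data, so the UDF in t1 is
// evaluated only once instead of once per downstream use.
tenv.registerDataSet("longsWithTime", materialized, "l, t");

Table t2 = tenv.scan("longsWithTime").as("l1, id1");
Table t3 = tenv.scan("longsWithTime").as("l2, id2");
```

Note that this forces an extra serialization boundary between the two Table API programs, which is exactly what makes the result reusable.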
Best, Fabian

On Tue, Nov 20, 2018 at 5:13 PM Niklas Teichmann <mai11...@studserv.uni-leipzig.de> wrote:

> Hi everybody,
>
> I have a question concerning the Flink Table API, more precisely the
> way the results of table statements are evaluated. In the following
> code example, the statement defining the table t1 is evaluated twice,
> an effect that leads to some issues of performance and logic in the
> program I am trying to write.
>
> List<Long> longList = Arrays.asList(1L, 2L, 3L, 4L, 5L);
> DataSet<Long> longDataSet =
>     getExecutionEnvironment().fromCollection(longList);
>
> tenv.registerDataSet("longs", longDataSet, "l");
> tenv.registerFunction("time", new Time()); // an example UDF that
>                                            // evaluates the current time
>
> Table t1 = tenv.scan("longs");
> t1 = t1.select("l, time() as t");
>
> Table t2 = t1.as("l1, id1");
> Table t3 = t1.as("l2, id2");
>
> Table t4 = t2.join(t3).where("l1 == l2");
>
> t4.writeToSink(new PrintTableSink()); // a sink that prints the
>                                       // content of the table
>
> I realize that this behaviour is defined in the documentation ("A
> registered Table is treated similarly to a VIEW ...") and probably
> stems from the DataStream API. But is there a preferred way to avoid
> this?
>
> Currently I'm using a workaround that defines a TableSink which in
> turn registers its output as a new table. That seems extremely hacky
> though.
>
> Sorry if I missed something obvious!
>
> All the best,
> Niklas