Hi everybody,

I have a question about the Flink Table API, more precisely about how table statements are evaluated. In the following code example, the statement that defines the table t1 is evaluated twice, which causes both performance and logic problems in the program I am trying to write.

List<Long> longList = Arrays.asList(1L, 2L, 3L, 4L, 5L);
DataSet<Long> longDataSet = getExecutionEnvironment().fromCollection(longList);

tenv.registerDataSet("longs", longDataSet, "l");
tenv.registerFunction("time", new Time()); // an example UDF that returns the current time

Table t1 = tenv.scan("longs");
t1 = t1.select("l, time() as t");

Table t2 = t1.as("l1, id1");
Table t3 = t1.as("l2, id2");

Table t4 = t2.join(t3).where("l1 == l2");

t4.writeToSink(new PrintTableSink()); // a sink that prints the content of the table

I realize that this behaviour is described in the documentation ("A registered Table is treated similarly to a VIEW ...") and probably stems from the DataStream API. Is there a preferred way to avoid it, so that t1 is evaluated only once?
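
To make the effect concrete without a Flink setup, here is a minimal plain-Java sketch of what I think is happening. It is only an analogy and does not use the Flink API: the class ViewVsMaterialized, the methods selectWithUdf and consistentJoinRows, and the counter udfCalls are all hypothetical names. The counter stands in for the time() UDF, and a Supplier stands in for a table that is treated like a view.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

public class ViewVsMaterialized {
    // Hypothetical stand-in for the time() UDF: every call yields a new value.
    static final AtomicLong udfCalls = new AtomicLong();

    // Stand-in for "select l, time() as t": builds rows of the form {l, t}.
    static List<long[]> selectWithUdf(List<Long> longs) {
        List<long[]> rows = new ArrayList<>();
        for (long l : longs) {
            rows.add(new long[] {l, udfCalls.incrementAndGet()});
        }
        return rows;
    }

    // Self-join on l; counts joined rows whose t values agree on both sides.
    static int consistentJoinRows(Supplier<List<long[]>> left,
                                  Supplier<List<long[]>> right) {
        List<long[]> leftRows = left.get();   // evaluates the left input once
        List<long[]> rightRows = right.get(); // evaluates the right input once
        int consistent = 0;
        for (long[] a : leftRows) {
            for (long[] b : rightRows) {
                if (a[0] == b[0] && a[1] == b[1]) {
                    consistent++;
                }
            }
        }
        return consistent;
    }

    public static void main(String[] args) {
        List<Long> longs = Arrays.asList(1L, 2L, 3L, 4L, 5L);

        // View-like: each join input re-runs the select, so the UDF runs twice per row.
        Supplier<List<long[]>> view = () -> selectWithUdf(longs);
        int viewRows = consistentJoinRows(view, view);
        long viewCalls = udfCalls.getAndSet(0);

        // Materialized: the select runs once and both join inputs share the result.
        List<long[]> once = selectWithUdf(longs);
        int materializedRows = consistentJoinRows(() -> once, () -> once);
        long materializedCalls = udfCalls.get();

        System.out.println("view: udfCalls=" + viewCalls + ", consistentRows=" + viewRows);
        System.out.println("materialized: udfCalls=" + materializedCalls
                + ", consistentRows=" + materializedRows);
    }
}
```

In the view-like case the two sides of the join see different values for t, which is the logic problem I run into. What I am looking for is the Table API counterpart of the materialized case.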

Currently I'm using a workaround: a custom TableSink that registers its output as a new table. That seems extremely hacky, though.

Sorry if I missed something obvious!

All the best,
Niklas

