Hi guys! I have one input (from MongoDB) and I split the incoming data into multiple datasets, each created dynamically from configuration. Before I write the result back I want to merge everything into one dataset, because there is a common transformation at the end.
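To make that concrete, here is a simplified, self-contained sketch of the job. The MongoDB source is replaced by a tiny in-memory collection and the per-mapper logic by a trivial filter/map, just so the skeleton compiles on its own; the names Event, Result and the transformations are placeholders, not my real code.

import org.apache.flink.api.scala._

// Placeholder types standing in for the real MongoDB documents and mapper output.
case class Event(kind: Int, payload: String)
case class Result(kind: Int, value: String)

object SplitAndUnionJob {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Stand-in for the MongoDB input.
    val source: DataSet[Event] = env.fromElements(
      Event(0, "a"), Event(1, "b"), Event(2, "c"))

    // One dataset per configuration entry (74 in the real job),
    // each with its own filter and mapping.
    val mapperCount = 74
    val mapped: List[DataSet[Result]] = (0 until mapperCount).toList.map { i =>
      source
        .filter(_.kind == i)                 // custom filter per mapper
        .map(e => Result(e.kind, e.payload)) // custom mapping per mapper
    }

    // Union everything back into one dataset, apply the common transformation, count.
    // With 74 mappers, this count() is what triggers the CompilerException described below.
    val total = mapped
      .reduce(_ union _)
      .map(r => r.copy(value = r.value.toUpperCase)) // placeholder for the common transformation
      .count()

    println(total)
  }
}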
So the flow is:

DataSet from MongoDB => create mappers dynamically (currently 74), so I have 74 DataSets => custom filter and mapping on each dataset => union them dynamically into one (every mapper result has the same type) => some further common transformation => count the result

But when I union more than 64 datasets I get this exception:

Exception in thread "main" org.apache.flink.optimizer.CompilerException: Cannot currently handle nodes with more than 64 outputs.
        at org.apache.flink.optimizer.dag.OptimizerNode.addOutgoingConnection(OptimizerNode.java:348)
        at org.apache.flink.optimizer.dag.SingleInputNode.setInput(SingleInputNode.java:202)
        at org.apache.flink.optimizer.traversals.GraphCreatingVisitor.postVisit(GraphCreatingVisitor.java:268)
        at org.apache.flink.optimizer.traversals.GraphCreatingVisitor.postVisit(GraphCreatingVisitor.java:82)

I tried to split the incoming list of 74 datasets into groups of 60 + 14, union each group, put an identity mapper behind it, and then union the group results, but with no success:

val listOfDataSet: List[DataSet[...]] = ...

listOfDataSet
  .sliding(60, 60)
  // union each group and map it through an identity mapper; this yields an Iterator of DataSet
  .map(dsg => dsg.reduce((ds1, ds2) => ds1.union(ds2)).map(new IdMapper()))
  .reduce((dsg1, dsg2) => dsg1.union(dsg2)) // here I get the exception
  .map(finalDataSet => ... some transformation ...)
  .count()

Is there any solution to this?

Thanks,
b0c1
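P.S. To make the workaround attempt concrete, here is a compilable version of the idea. IdMapper is just an identity MapFunction; the helper name unionInGroups and its generic signature are mine, simplified from the real code. For me this still runs into the same exception.

import scala.reflect.ClassTag

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._

// Identity mapper, only there to put an extra operator behind each partial union.
class IdMapper[T] extends MapFunction[T, T] {
  override def map(value: T): T = value
}

object GroupedUnion {
  // Union the datasets in groups of `groupSize`, map each partial union through
  // an IdMapper, and finally union the partial results.
  def unionInGroups[T: TypeInformation: ClassTag](datasets: List[DataSet[T]],
                                                  groupSize: Int): DataSet[T] =
    datasets
      .grouped(groupSize)
      .map(group => group.reduce(_ union _).map(new IdMapper[T]()))
      .reduce(_ union _)
}

Calling unionInGroups(listOfDataSet, 60) is the same construction as the sliding(60, 60) code above, just written as a reusable function.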