Dear Fabian,

Thanks for your answer (I think you gave the same one on StackOverflow), but as you can see in my code, your solution does not work for me.
Here is the code: it splits the datasets into lists (each list contains at most 60 datasets). After that, I reduce each list using union, map it with an IdMapper and return the id-mapped dataset. But in the next reduce (where I want to merge the id-mapped datasets) Flink says I have reached the limit. Maybe my IdMapper is wrong... Can you show a correct, working IdMapper?

b0c1

ps: Here is the code segment:

listOfDataSet
  .sliding(60,60)
  .map(dsg => dsg.reduce((ds1,ds2) => ds1.union(ds2)).map(new IdMapper())) // There is an iterator of DataSet
  .reduce((dsg1,dsg2) => dsg1.union(dsg2)) // Here I got the exception
  .map(finalDataSet => ... some transformation ...)
  .count()

On Wed, 30 Aug 2017 at 15:44 Fabian Hueske <fhue...@gmail.com> wrote:

> Hi b0c1,
>
> This is a limitation in Flink's optimizer.
> Internally, all binary unions are merged into a single n-ary union. The
> optimizer restricts the number of inputs for an operator to 64.
>
> You can work around this limitation with an identity mapper which prevents
> the union operators from merging:
>
> in1  ----\
> in2  ------ Id-Map --- NextOp
> ...       /          /
> in14 ----/          /
>                    /
> in15 -------------/
> ...              /
> in74 -----------/
>
> This is not a super nice solution, but it is the only way that comes to my mind.
>
> Cheers, Fabian
>
> 2017-08-28 23:29 GMT+02:00 boci <boci.b...@gmail.com>:
>
>> Hi guys!
>>
>> I have one input (from Mongo) and I split the incoming data into multiple
>> datasets (each created dynamically from configuration), and before I write
>> back the result I want to merge them into one dataset (there is some common
>> transformation).
>>
>> So the flow:
>>
>> DataSet from MongoDB =>
>> Create mappers dynamically (currently 74), so I have 74 DataSets =>
>> Custom filter and mapping on each dataset =>
>> Union dynamically into one (every mapper result is the same type) =>
>> Some other common transformation =>
>> Count the result
>>
>> But when I want to union more than 64 datasets I get this exception:
>>
>> Exception in thread "main" org.apache.flink.optimizer.CompilerException:
>> Cannot currently handle nodes with more than 64 outputs.
>>     at org.apache.flink.optimizer.dag.OptimizerNode.addOutgoingConnection(OptimizerNode.java:348)
>>     at org.apache.flink.optimizer.dag.SingleInputNode.setInput(SingleInputNode.java:202)
>>     at org.apache.flink.optimizer.traversals.GraphCreatingVisitor.postVisit(GraphCreatingVisitor.java:268)
>>     at org.apache.flink.optimizer.traversals.GraphCreatingVisitor.postVisit(GraphCreatingVisitor.java:82)
>>
>> I tried to split the incoming list of 74 datasets into 60 + 14 datasets,
>> create an id mapper and union the resulting datasets, but with no success:
>>
>> val listOfDataSet: List[DataSet[...]] = ....
>>
>> listOfDataSet
>>   .sliding(60,60)
>>   .map(dsg => dsg.reduce((ds1,ds2) => ds1.union(ds2)).map(new IdMapper())) // There is an iterator of DataSet
>>   .reduce((dsg1,dsg2) => dsg1.union(dsg2)) // Here I got the exception
>>   .map(finalDataSet => ... some transformation ...)
>>   .count()
>>
>> Is there any solution to this?
>>
>> Thanks
>> b0c1
>>
>
>
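
For reference, one plausible shape for the identity mapper Fabian describes is sketched below. The element type MyType, the chunk size, and the helper unionInChunks are illustrative assumptions, not code from this thread; the point is only that the extra map operator sits between the unions so no single union collects more than 64 inputs.

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.scala._

// Placeholder element type -- in the real job this would be whatever the 74 datasets contain.
case class MyType(id: String, value: Long)

// Identity mapper: forwards every record unchanged. Its only purpose is to put an
// operator between the unions so the optimizer cannot merge them into one n-ary
// union with more than 64 inputs.
class IdMapper extends MapFunction[MyType, MyType] {
  override def map(value: MyType): MyType = value
}

// Union the datasets in chunks of at most 60, put an IdMapper behind each chunk,
// then union the handful of chunk results.
def unionInChunks(listOfDataSet: List[DataSet[MyType]]): DataSet[MyType] =
  listOfDataSet
    .grouped(60)                                         // chunks of at most 60 DataSets
    .map(chunk => chunk.reduce(_ union _).map(new IdMapper()))
    .reduce(_ union _)                                   // with 74 inputs this final union has only 2 inputs

Whether this removes the error in the original job depends on what the real IdMapper and element types look like; the sketch only mirrors the structure of Fabian's diagram (groups of unions separated by identity maps).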