Hi, the following code should do what you want. I included an implementation of an IdMapper. At the end, I print the execution plan, which is generated after optimization (so the pipeline works at least up to that point).
Best, Fabian

// imports needed to run this snippet
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// 314 small example DataSets
val data: Seq[Seq[Int]] = (1 until 315).map(i => Seq(1, 2, 3))
val dataSets: Seq[DataSet[Int]] = data.map(env.fromCollection(_))

dataSets
  .sliding(60, 60) // non-overlapping groups of at most 60 DataSets
  .map(dsg => dsg
    .reduce((ds1: DataSet[Int], ds2: DataSet[Int]) => ds1.union(ds2))
    .map(new IdMapper[Int]())) // identity map keeps each union below the 64-input limit
  .reduce((dsg1: DataSet[Int], dsg2: DataSet[Int]) => dsg1.union(dsg2))
  .map(x => x * 2) // do something with the union result
  .output(new DiscardingOutputFormat[Int])

println(env.getExecutionPlan())

class IdMapper[T] extends MapFunction[T, T] {
  override def map(value: T): T = value
}

2017-08-31 12:30 GMT+02:00 boci <boci.b...@gmail.com>:

> Dear Fabian,
>
> Thanks for your answer (I think you said the same on StackOverflow), but
> as you can see in my code, your solution still does not work:
>
> The code below splits the datasets into lists (each list contains at
> most 60 datasets).
> After that, I reduce each list via union, map it with an IdMapper, and
> return the id-mapped dataset.
> But at the next reduce (where I want to merge the id-mapped streams),
> Flink says I reached the limit.
>
> Maybe my IdMapper is wrong... Can you show a correct, working IdMapper?
>
> b0c1
>
> ps:
> Here is the code segment:
>
> listOfDataSet
>   .sliding(60, 60)
>   .map(dsg => dsg.reduce((ds1, ds2) => ds1.union(ds2)).map(new IdMapper()))
>   // There is an iterator of DataSets
>   .reduce((dsg1, dsg2) => dsg1.union(dsg2)) // Here I got the exception
>   .map(finalDataSet => ... some transformation ...)
>   .count()
>
>
> On Wed, 30 Aug 2017 at 15:44 Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi b0c1,
>>
>> This is a limitation in Flink's optimizer.
>> Internally, all binary unions are merged into a single n-ary union, and
>> the optimizer restricts the number of inputs of an operator to 64.
>>
>> You can work around this limitation with an identity mapper, which
>> prevents the union operators from being merged:
>>
>> in1  ----\
>> in2  ------ Id-Map --- NextOp
>> ...      /          /
>> in14 ---/          /
>>                   /
>> in15 ------------/
>> ...             /
>> in74 ----------/
>>
>> This is not a super nice solution, but it is the only way that comes to
>> my mind.
>>
>> Cheers, Fabian
>>
>> 2017-08-28 23:29 GMT+02:00 boci <boci.b...@gmail.com>:
>>
>>> Hi guys!
>>>
>>> I have one input (from Mongo) and I split the incoming data into
>>> multiple datasets (each created dynamically from configuration), and
>>> before I write back the result I want to merge them into one dataset
>>> (there is some common transformation).
>>> So the flow is:
>>>
>>> DataSet from Mongo =>
>>> Create mappers dynamically (currently 74), so I have 74 DataSets =>
>>> Custom filter and mapping on each dataset =>
>>> Union dynamically into one (every mapper result has the same type) =>
>>> Some more common transformations =>
>>> Count the result
>>>
>>> But when I want to union more than 64 datasets I get this exception:
>>>
>>> Exception in thread "main" org.apache.flink.optimizer.CompilerException:
>>> Cannot currently handle nodes with more than 64 outputs.
>>> at org.apache.flink.optimizer.dag.OptimizerNode.addOutgoingConnection(OptimizerNode.java:348)
>>> at org.apache.flink.optimizer.dag.SingleInputNode.setInput(SingleInputNode.java:202)
>>> at org.apache.flink.optimizer.traversals.GraphCreatingVisitor.postVisit(GraphCreatingVisitor.java:268)
>>> at org.apache.flink.optimizer.traversals.GraphCreatingVisitor.postVisit(GraphCreatingVisitor.java:82)
>>>
>>> I tried to split the incoming list of 74 datasets into 60 + 14, create
>>> an id mapper, and union the resulting datasets, but without success:
>>>
>>> val listOfDataSet: List[DataSet[...]] = ....
>>>
>>> listOfDataSet
>>>   .sliding(60, 60)
>>>   .map(dsg => dsg.reduce((ds1, ds2) => ds1.union(ds2)).map(new IdMapper()))
>>>   // There is an iterator of DataSets
>>>   .reduce((dsg1, dsg2) => dsg1.union(dsg2)) // Here I got the exception
>>>   .map(finalDataSet => ... some transformation ...)
>>>   .count()
>>>
>>> Is there any solution to this?
>>>
>>> Thanks
>>> b0c1
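
For input lists of arbitrary length, the chunk-union-IdMap pattern from this thread can be generalized into a small recursive helper. The sketch below is not from the thread: unionMany is a hypothetical name, not part of the Flink API, and it assumes the Scala DataSet API. It unions at most chunkSize inputs at a time and puts an identity map behind each chunk, so no merged union operator ever exceeds the optimizer's 64-input limit.

import scala.reflect.ClassTag
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._

// Identity mapper: breaks up the n-ary union the optimizer would
// otherwise build from consecutive binary unions.
class IdMapper[T] extends MapFunction[T, T] {
  override def map(value: T): T = value
}

// Hypothetical helper (not part of the Flink API): unions an arbitrarily
// long list of DataSets; chunkSize must stay below the 64-input limit.
def unionMany[T: TypeInformation: ClassTag](
    inputs: Seq[DataSet[T]],
    chunkSize: Int = 60): DataSet[T] = {
  require(inputs.nonEmpty, "need at least one DataSet")
  val chunks: Seq[DataSet[T]] = inputs
    .sliding(chunkSize, chunkSize)                    // non-overlapping groups of <= chunkSize
    .map(group => group
      .reduce((ds1, ds2) => ds1.union(ds2))           // at most a chunkSize-ary union
      .map(new IdMapper[T]()))                        // identity map stops further merging
    .toSeq
  if (chunks.size > chunkSize) unionMany(chunks, chunkSize) // recurse for very long lists
  else chunks.reduce((ds1, ds2) => ds1.union(ds2))
}

With b0c1's 74 inputs this produces two id-mapped chunks (60 + 14) followed by a final two-input union, which stays well below the limit.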