Dear Fabian,

Thanks to your answer (I think you said same in StackOverflow) but as you
see in my code your solution does not work anymore:

Here is the code, it's split the datasets to list (each list contains
maximum 60 datasets)
After that, I  reduce the dataset using union and map with an IdMapper and
return the id mapped data set.
But when the next reduce (where I want to merge the id mapped stream) the
flink said I reached the limit.

Maybe my IdMapper is wrong... Can you show a correct working IdMapper?

b0c1

ps:
Here is the code segment:
listOfDataSet
.sliding(60,60)
.map(dsg => dsg.reduce((ds1,ds2) => ds1.union(ds2)).map(new IdMapper()))
//There is an iterator of DataSet
.reduce((dsg1,dsg2) => dsg1.union(dsg2)) // Here I got the exception
.map(finalDataSet => ... some transformation ...)
.count()



On Wed, 30 Aug 2017 at 15:44 Fabian Hueske <fhue...@gmail.com> wrote:

> Hi b0c1,
>
> This is an limitation in Flink's optimizer.
> Internally, all binary unions are merged into a single n-ary union. The
> optimizer restricts the number of inputs for an operator to 64.
>
> You can work around this limitation with an identity mapper which prevents
> the union operators from merging:
>
> in1----\
> in2------ Id-Map--- NextOp
> ...       /             / /
> in14--/             / /
>                       / /
> in15------------/ /
> ...                   /
> in74------------/
>
> This is not a super nice solution, but the only way that comes to my mind.
>
> Cheers, Fabian
>
> 2017-08-28 23:29 GMT+02:00 boci <boci.b...@gmail.com>:
>
>> Hi guys!
>>
>> I have one input (from mongo) and I split the incoming data to multiple
>> datasets (each created dynamically from configuration) and before I write
>> back the result I want to merge it to one dataset (there is some common
>> transformation).
>> so the flow:
>>
>> DataSet from Mongod =>
>> Create Mappers dynamically (currently 74) so I have 74 DataSet =>
>> Custom filter and mapping on each dataset =>
>> Union dynamically to one (every mapper result is same type) =>
>> Some another common transformation =>
>> Count the result
>>
>> but when I want to union more than 64 dataset I got these exception:
>>
>> Exception in thread "main" org.apache.flink.optimizer.CompilerException:
>> Cannot currently handle nodes with more than 64 outputs.
>> at
>> org.apache.flink.optimizer.dag.OptimizerNode.addOutgoingConnection(OptimizerNode.java:348)
>> at
>> org.apache.flink.optimizer.dag.SingleInputNode.setInput(SingleInputNode.java:202)
>> at
>> org.apache.flink.optimizer.traversals.GraphCreatingVisitor.postVisit(GraphCreatingVisitor.java:268)
>> at
>> org.apache.flink.optimizer.traversals.GraphCreatingVisitor.postVisit(GraphCreatingVisitor.java:82)
>>
>> I try to split the incoming (74) list of dataset to split to 60 + 14
>>  dataset and create an id mapper and union the result datasets but no
>> success:
>>
>> val listOfDataSet: List[DataSet[...]] = ....
>>
>> listOfDataSet
>> .sliding(60,60)
>> .map(dsg => dsg.reduce((ds1,ds2) => ds1.union(ds2)),map(new IdMapper()))
>> //There is an iterator of DataSet
>> .reduce((dsg1,dsg2) => dsg1.union(dsg2)) // Here I got the exception
>> .map(finalDataSet => ... some transformation ...)
>> .count()
>>
>> There is any solution to solve this?
>>
>> Thanks
>> b0c1
>>
>
>

Reply via email to