Hi,

The code is part of a bigger project, so I'll try to outline the
methods used and their order, with a rough code sketch after each step:

# Step 1
- Reading a Wikipedia XML dump into a DataSet of <page>-tag delimited
strings using XmlInputFormat.
- A .distinct() operation removes all duplicates based on the content.
- .map() is used to parse the XML data into a DataSet of Page POJOs:
-- Each Page has an ArrayList of extracted Revision POJOs.
-- A Revision has text content.
-- A Revision also has a Contributor POJO.
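
For reference, here is a minimal sketch of Step 1. It assumes a
mapred-API XmlInputFormat emitting <LongWritable, Text> records
(matching the HadoopInputFormat in the trace below); the config key
names, the input path, and parsePageXml() are placeholders for what the
project actually does:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.JobConf;

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Split the dump into one string per <page>...</page> block.
    JobConf jobConf = new JobConf();
    jobConf.set("xmlinput.start", "<page>");   // key names depend on the XmlInputFormat used
    jobConf.set("xmlinput.end", "</page>");
    FileInputFormat.addInputPath(jobConf, new Path("hdfs:///path/to/dump.xml"));

    DataSet<Tuple2<LongWritable, Text>> rawPages = env.createInput(
            new HadoopInputFormat<LongWritable, Text>(
                    new XmlInputFormat(), LongWritable.class, Text.class, jobConf));

    // Keep only the page string and drop duplicates by full content.
    DataSet<String> pageStrings = rawPages
            .map(new MapFunction<Tuple2<LongWritable, Text>, String>() {
                @Override
                public String map(Tuple2<LongWritable, Text> value) {
                    return value.f1.toString();
                }
            })
            .distinct();

    // Parse each page string into a Page POJO: a Page holds an
    // ArrayList of Revisions, each Revision has text and a Contributor.
    DataSet<Page> pages = pageStrings.map(new MapFunction<String, Page>() {
        @Override
        public Page map(String xml) {
            return parsePageXml(xml);   // placeholder for the project's XML parser
        }
    });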

# Step 2
- Obtaining three separate DataSets with a .flatMap()
-- DataSet of Page objects
-- DataSet of Revision objects
-- DataSet of Contributor objects
- .map() on the Revisions to obtain Score objects. A Score has two
fields: a reference to an object and a double value.
-- Each Revision is assigned a value of 1.0.
- .groupBy() with a custom KeySelector, followed by .reduce() to get the
accumulated scores.
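
And a similarly rough sketch of Step 2, continuing from the snippet
above. The Page/Revision/Contributor/Score types are the project's own,
so all getter and constructor names below are assumptions; the extra
classes used are FlatMapFunction, ReduceFunction, KeySelector and
Collector from the Flink API:

    // One of the three flatMaps; the Page and Contributor DataSets are
    // produced by analogous flatMaps.
    DataSet<Revision> revisions = pages.flatMap(new FlatMapFunction<Page, Revision>() {
        @Override
        public void flatMap(Page page, Collector<Revision> out) {
            for (Revision rev : page.getRevisions()) {   // getter name assumed
                out.collect(rev);
            }
        }
    });

    // One Score per Revision, initialised to 1.0.
    DataSet<Score> scores = revisions.map(new MapFunction<Revision, Score>() {
        @Override
        public Score map(Revision rev) {
            return new Score(rev, 1.0);                  // constructor shape assumed
        }
    });

    // Accumulate the scores per key chosen by the custom KeySelector.
    DataSet<Score> accumulated = scores
            .groupBy(new KeySelector<Score, String>() {
                @Override
                public String getKey(Score score) {
                    return score.getKeyString();         // placeholder for the project's key logic
                }
            })
            .reduce(new ReduceFunction<Score>() {
                @Override
                public Score reduce(Score a, Score b) {
                    return new Score(a.getTarget(), a.getValue() + b.getValue());  // getters assumed
                }
            });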

I'm not sure if that helps much, but that is it in essence. Looking
more closely at the traces, I believe the .distinct() could be the
culprit?

> 2017-06-12 21:33:14,016 ERROR org.apache.flink.runtime.operators.BatchTask    
>               - Error in task code:  CHAIN DataSource (at 
> createInput(ExecutionEnvironment.java:552) 
> (org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat)) -> 
> Combine(Distinct at parseDumpData(SkipDumpParser.java:43)) (28/40)
> java.io.IOException: Cannot write record to fresh sort buffer. Record too 
> large.

Best,
Sebastian
