Hi,

the code is part of a bigger project, so I'll try to outline the methods used and their order:

# Step 1
- Reading a Wikipedia XML dump into a DataSet of <page>-tag delimited strings using XmlInputFormat.
- A .distinct() operation removes all duplicates based on the content.
- .map() is used to parse the XML data into a DataSet of Page POJOs:
-- Each Page has an ArrayList of extracted Revision POJOs.
-- A Revision has text content.
-- A Revision also has a Contributor POJO.

# Step 2
- Obtaining three separate DataSets with a .flatMap():
-- DataSet of Page objects
-- DataSet of Revision objects
-- DataSet of Contributor objects
- .map() on the Revisions to obtain Score objects. A Score object has two fields: a pointer to an object and a double value.
-- Assign a value of "1.0" for each Revision.
- .groupBy() with a custom KeySelector, followed by .reduce() to get the accumulated scores.

I'm not sure if that helps much, but that is it in essence (a condensed sketch of the pipeline is appended at the end of this mail). Looking more closely at the traces, I believe the .distinct() could be the culprit?

> 2017-06-12 21:33:14,016 ERROR org.apache.flink.runtime.operators.BatchTask
>   - Error in task code: CHAIN DataSource (at
> createInput(ExecutionEnvironment.java:552)
> (org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat)) ->
> Combine(Distinct at parseDumpData(SkipDumpParser.java:43)) (28/40)
> java.io.IOException: Cannot write record to fresh sort buffer. Record too
> large.

Best,
Sebastian
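
P.S. In case it helps to see the shape of the job, here is a condensed, self-contained Java sketch of the pipeline as outlined above. It is an illustration only, not the actual project code: the class names (Page, Revision, Contributor, Score, ScorePipelineSketch), the fromElements() stand-in for the XmlInputFormat/HadoopInputFormat source, the toy parsing, and keying the Score on the contributor name instead of an object reference are all simplifications; the Page and Contributor DataSets from Step 2 are omitted.

import java.util.ArrayList;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.util.Collector;

public class ScorePipelineSketch {

    // Minimal stand-ins for the project's POJOs (names assumed from the outline above).
    public static class Contributor {
        public String name;
        public Contributor() {}
        public Contributor(String name) { this.name = name; }
    }

    public static class Revision {
        public String text;
        public Contributor contributor;
        public Revision() {}
        public Revision(String text, Contributor contributor) { this.text = text; this.contributor = contributor; }
    }

    public static class Page {
        public String title;
        public ArrayList<Revision> revisions = new ArrayList<>();
        public Page() {}
    }

    public static class Score {
        public String key;     // simplified: the real Score holds a reference to an object
        public double value;
        public Score() {}
        public Score(String key, double value) { this.key = key; this.value = value; }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Step 1: <page>-delimited strings. Faked here with fromElements(); the real
        // job reads them via env.createInput(...) with the Hadoop XmlInputFormat.
        DataSet<String> rawPages = env.fromElements(
                "<page><title>A</title><revision>r1</revision></page>",
                "<page><title>A</title><revision>r1</revision></page>",  // duplicate, removed by distinct()
                "<page><title>B</title><revision>r2</revision></page>");

        DataSet<Page> pages = rawPages
                .distinct()  // de-duplicate by content; each record is a whole <page> string
                .map(new MapFunction<String, Page>() {
                    @Override
                    public Page map(String xml) {
                        Page p = new Page();  // placeholder parsing; the real code
                        p.title = xml;        // extracts revisions and contributors
                        p.revisions.add(new Revision("text", new Contributor("user-" + xml.hashCode())));
                        return p;
                    }
                });

        // Step 2 (Revision branch only; Page and Contributor DataSets omitted here):
        DataSet<Revision> revisions = pages.flatMap(new FlatMapFunction<Page, Revision>() {
            @Override
            public void flatMap(Page page, Collector<Revision> out) {
                for (Revision r : page.revisions) {
                    out.collect(r);
                }
            }
        });

        // One Score of 1.0 per Revision, grouped by a custom key and summed.
        DataSet<Score> scores = revisions.map(new MapFunction<Revision, Score>() {
            @Override
            public Score map(Revision r) { return new Score(r.contributor.name, 1.0); }
        });

        DataSet<Score> accumulated = scores
                .groupBy(new KeySelector<Score, String>() {
                    @Override
                    public String getKey(Score s) { return s.key; }
                })
                .reduce(new ReduceFunction<Score>() {
                    @Override
                    public Score reduce(Score a, Score b) { return new Score(a.key, a.value + b.value); }
                });

        accumulated.print();
    }
}

Note that the .distinct() here, like in the real job, runs on the full <page> string, so every record that goes into the Combine(Distinct ...) sort buffer is an entire page including all of its revisions.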
# Step 1 - Reading a Wikipedia XML Dump into a DataSet of <page>-tag delimited strings using XmlInputFormat. - A .distinct() operations removes all duplicates based on the content. - .map() is used to parse the XML data into a DataSet of Page-POJOs: -- It has an ArrayList of extracted Revision POJOs. -- A Revision has text content. -- A Revision also has a Contributor POJO. # Step 2 - Obtaining three separate DataSets with a .flatMap() -- DataSet of Page objects -- DataSet of Revision objects -- DataSet of Contributor objects - .map() on the Revisions to obtain Score-Objects. Score-Objects have two fields: A pointer to an object and a double value. -- Assign value of "1.0" for each Revision. - .groupBy() with a custom KeySelector, followed by .reduce() to get the accumulated scores. I'm not sure, if that helps much, but that is it in essence. Looking more closely on the traces, I believe that the .distinct() could be the culprit? > 2017-06-12 21:33:14,016 ERROR org.apache.flink.runtime.operators.BatchTask > - Error in task code: CHAIN DataSource (at > createInput(ExecutionEnvironment.java:552) > (org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat)) -> > Combine(Distinct at parseDumpData(SkipDumpParser.java:43)) (28/40) > java.io.IOException: Cannot write record to fresh sort buffer. Record too > large. Best, Sebastian