Hi Robert, in this specific case the classes involved are: - Tuple3<String, String, IndexAttributeToExpand> (IndexAttributeToExpand is a POJO extending another class, and neither of them implements equals and hashCode) - Tuple3<String,String, TreeNode<String, Map<String, Set<String>>>> (TreeNode is a POJO containing other TreeNodes, and it doesn't implement equals and hashCode) - Now I've added hashCode and equals to both classes in order to be aligned with POJO policies; however, the job finished correctly after stopping and restarting the cluster...usually when I have a strange serialization exception I stop and restart the cluster and everything works.
The TreeNode class (the recursive one) is actually the following:

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;

import com.fasterxml.jackson.annotation.JsonIgnore;

/**
 * A generic n-ary tree node. Every node of a tree shares one lookup map
 * (key -> list of all nodes carrying that key) so any node can resolve
 * other nodes by key without walking the tree.
 *
 * NOTE(review): this class still does not override equals()/hashCode(), so
 * nodes compare by identity — which is exactly the POJO-policy issue being
 * discussed in this thread.
 */
public class TreeNode<K, V> implements Serializable {

    private static final long serialVersionUID = 1L;

    private int level = 0;
    private String uuid;
    private K key;
    private V value;
    @JsonIgnore
    private TreeNode<K, V> parent;
    private List<TreeNode<K, V>> children;
    // Shared across the whole tree: maps a key to every node that carries it.
    @JsonIgnore
    private HashMap<K, List<TreeNode<K, V>>> lookup;

    /** Creates a root node (level 0) and initializes the shared lookup index. */
    public TreeNode(K key, V value) {
        this.level = 0;
        this.key = key;
        this.uuid = UUID.randomUUID().toString();
        this.value = value;
        this.children = new LinkedList<TreeNode<K, V>>();
        List<TreeNode<K, V>> thisAsList = new ArrayList<TreeNode<K, V>>();
        thisAsList.add(this);
        this.lookup = new HashMap<K, List<TreeNode<K, V>>>();
        this.lookup.put(key, thisAsList);
    }

    /** Adds a child under this node, indexes it in the shared lookup, and returns it. */
    public TreeNode<K, V> addChild(K key, V value) {
        TreeNode<K, V> childNode = new TreeNode<K, V>(key, value);
        childNode.level = level + 1;
        childNode.parent = this;
        childNode.lookup = lookup; // the child joins this tree's shared index
        childNode.uuid = UUID.randomUUID().toString();
        this.children.add(childNode);
        List<TreeNode<K, V>> l = lookup.get(childNode.key);
        if (l == null) {
            l = new ArrayList<TreeNode<K, V>>();
            lookup.put(childNode.key, l);
        }
        l.add(childNode);
        return childNode;
    }

    public boolean isLeaf() {
        return children.isEmpty();
    }

    public int getLevel() {
        return level;
    }

    public TreeNode<K, V> getParent() {
        return parent;
    }

    public V getValue() {
        return value;
    }

    public String getUuid() {
        return uuid;
    }

    public void setUuid(String uuid) {
        this.uuid = uuid;
    }

    public List<TreeNode<K, V>> getChildren() {
        return children;
    }

    /** Returns every node in the tree carrying the given key, or null if none. */
    public List<TreeNode<K, V>> getNodesByKey(K key) {
        return lookup.get(key);
    }

    public K getKey() {
        return key;
    }

    /** Collects all leaves of the subtree rooted at this node. */
    public List<TreeNode<K, V>> getLeafs() {
        List<TreeNode<K, V>> ret = new ArrayList<TreeNode<K, V>>();
        getLeafs(ret);
        return ret;
    }

    private void getLeafs(List<TreeNode<K, V>> ret) {
        if (children.isEmpty()) {
            ret.add(this);
        }
        for (TreeNode<K, V> child : children) {
            child.getLeafs(ret);
        }
    }

    @Override
    public String toString() {
        return toString(true);
    }

    /** Level-indented rendering of the key; optionally recurses into children. */
    public String toString(boolean withChildren) {
        if (key == null) {
            return super.toString();
        }
        StringBuffer ret = new StringBuffer();
        for (int i = 0; i < level; i++) {
            ret.append(" >");
        }
        ret.append(" " + key.toString());
        if (withChildren) {
            for (TreeNode<K, V> child : children) {
                ret.append("\n").append(child.toString());
            }
        }
        return ret.toString();
    }

    public void setValue(V value) {
        this.value = value;
    }

    /** Detaches the given nodes (and their whole subtrees) from the tree and the lookup. */
    public void remove(List<TreeNode<K, V>> nodes) {
        for (TreeNode<K, V> n : nodes) {
            removeChildren(n);
        }
        for (TreeNode<K, V> n : nodes) {
            TreeNode<K, V> parent = n.getParent();
            if (parent == null) {
                continue;
            }
            parent.children.remove(n);
        }
    }

    private void removeChildren(TreeNode<K, V> node) {
        // FIX: the lookup map is keyed by K (the node key), not by UUID string,
        // so the original lookup.remove(node.getUuid()) never matched an entry
        // and left stale nodes in the index after removal. De-index the node
        // from its key's bucket instead, pruning the bucket once empty.
        List<TreeNode<K, V>> bucket = lookup.get(node.getKey());
        if (bucket != null) {
            bucket.remove(node);
            if (bucket.isEmpty()) {
                lookup.remove(node.getKey());
            }
        }
        if (node.children.isEmpty()) {
            return;
        }
        Iterator<TreeNode<K, V>> it = node.children.iterator();
        while (it.hasNext()) {
            removeChildren(it.next());
            it.remove();
        }
    }

    /** Nulls out this subtree's state and empties the children list and the lookup index. */
    public void clear() {
        this.key = null;
        this.value = null;
        this.uuid = null;
        for (TreeNode<K, V> child : children) {
            child.clear();
        }
        this.children.clear();
        this.lookup.clear();
    }

    /** Finds the node with the given key and UUID, or null if absent. */
    public TreeNode<K, V> getNodeById(K key, String uuid) {
        List<TreeNode<K, V>> nodes = getNodesByKey(key);
        if (nodes == null) { // FIX: was an unguarded NPE when no node carries this key
            return null;
        }
        for (TreeNode<K, V> treeNode : nodes) {
            if (uuid.equals(treeNode.getUuid())) {
                return treeNode;
            }
        }
        return null;
    }

    public HashMap<K, List<TreeNode<K, V>>> getLookup() {
        return lookup;
    }

    public void setLookup(HashMap<K, List<TreeNode<K, V>>> lookup) {
        this.lookup = lookup;
    }

    public void setLevel(int level) {
        this.level = level;
    }

    public void setKey(K key) {
        this.key = key;
    }

    public void setParent(TreeNode<K, V> parent) {
        this.parent = parent;
    }

    public void setChildren(List<TreeNode<K, V>> children) {
        this.children = children;
    }
}

On Tue, May 17, 2016 at 12:00 PM, Robert Metzger
<rmetz...@apache.org> wrote: > Hi Flavio, > > which datatype are you using? > > On Tue, May 17, 2016 at 11:42 AM, Flavio Pompermaier <pomperma...@okkam.it > > wrote: > >> Hi to all, >> during these days we've run a lot of Flink jobs and from time to time >> (apparently randomly) a different Exception arise during their executions... >> I hope one of them could help in finding the source of the problem..This >> time the exception is: >> >> An error occurred while reading the next record. >> at >> org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:148) >> at >> org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.access$300(NonReusingKeyGroupedIterator.java:32) >> at >> org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator.next(NonReusingKeyGroupedIterator.java:192) >> at >> org.okkam.entitons.mapping.flink.IndexMappingExecutor$TupleToEntitonJsonNode.reduce(IndexMappingExecutor.java:61) >> at >> org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:125) >> at >> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480) >> at >> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345) >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) >> at java.lang.Thread.run(Thread.java:745) >> Caused by: java.io.UTFDataFormatException: malformed input around byte 42 >> at >> org.apache.flink.runtime.memory.AbstractPagedInputView.readUTF(AbstractPagedInputView.java:488) >> at >> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:403) >> at >> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135) >> at >> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30) >> at >> org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:100) >> at >> 
org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:161) >> at >> org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:113) >> at >> org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:130 >> >> Could this error be caused by a missing implementation of hashCode() and >> equals()? >> >> Thanks in advance, >> Flavio >> >