Hi Flavio,

thank you for providing additional details. I don't think that missing hashCode()/equals() implementations cause such an error. They can cause wrong sorting or partitioning of the data, but serialization should still work properly. I suspect the issue is somewhere in the serialization stack.
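As a quick illustration of the difference (a minimal sketch with a hypothetical Key class, not your actual types): without equals()/hashCode(), hash-based grouping treats two logically equal objects as distinct, but nothing in this goes through the serializers.

```java
import java.util.HashSet;
import java.util.Set;

public class KeyDemo {

    static class Key {
        final String id;
        Key(String id) { this.id = id; }
        // equals()/hashCode() deliberately NOT overridden:
        // Object's identity-based versions are inherited.
    }

    // Returns how many entries a HashSet keeps for two logically equal keys.
    static int distinctCount() {
        Set<Key> keys = new HashSet<>();
        keys.add(new Key("a"));
        keys.add(new Key("a"));
        return keys.size();
    }

    public static void main(String[] args) {
        // Hash-based grouping sees two distinct keys, so both survive.
        System.out.println(distinctCount()); // prints 2
    }
}
```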
Are you registering any custom Kryo serializers?

From your past emails, you saw the following different exceptions:

A) Caused by: java.io.UTFDataFormatException: malformed input around byte 42
B) Caused by: java.lang.ClassNotFoundException: java.ttil.HashSet
C) Caused by: java.lang.ArrayIndexOutOfBoundsException: -2

Were they all caused by the same job, or by different ones?

On Tue, May 17, 2016 at 12:12 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

> Hi Robert,
> in this specific case the classes involved are:
>
> - Tuple3<String, String, IndexAttributeToExpand>
>   (IndexAttributeToExpand is a POJO extending another class, and neither of
>   them implements equals() and hashCode())
> - Tuple3<String, String, TreeNode<String, Map<String, Set<String>>>>
>   (TreeNode is a POJO containing other TreeNodes, and it doesn't implement
>   equals() and hashCode())
>
> I've now added hashCode() and equals() to both classes in order to be
> aligned with the POJO policies. However, the job finished correctly after
> stopping and restarting the cluster... usually, when I get a strange
> serialization exception, I stop and restart the cluster and everything
> works.
>
> The TreeNode class (the recursive one) is actually the following:
>
> import java.io.Serializable;
> import java.util.ArrayList;
> import java.util.HashMap;
> import java.util.Iterator;
> import java.util.LinkedList;
> import java.util.List;
> import java.util.UUID;
>
> import com.fasterxml.jackson.annotation.JsonIgnore;
>
> public class TreeNode<K, V> implements Serializable {
>
>     private static final long serialVersionUID = 1L;
>     private int level = 0;
>
>     private String uuid;
>     private K key;
>     private V value;
>     @JsonIgnore
>     private TreeNode<K, V> parent;
>     private List<TreeNode<K, V>> children;
>     @JsonIgnore
>     private HashMap<K, List<TreeNode<K, V>>> lookup;
>
>     public TreeNode(K key, V value) {
>         this.level = 0;
>         this.key = key;
>         this.uuid = UUID.randomUUID().toString();
>         this.value = value;
>         this.children = new LinkedList<TreeNode<K, V>>();
>         List<TreeNode<K, V>> thisAsList = new ArrayList<TreeNode<K, V>>();
>         thisAsList.add(this);
>         this.lookup = new HashMap<K, List<TreeNode<K, V>>>();
>         this.lookup.put(key, thisAsList);
>     }
>
>     public TreeNode<K, V> addChild(K key, V value) {
>         TreeNode<K, V> childNode = new TreeNode<K, V>(key, value);
>         childNode.level = level + 1;
>         childNode.parent = this;
>         childNode.lookup = lookup;
>         childNode.uuid = UUID.randomUUID().toString();
>         this.children.add(childNode);
>         List<TreeNode<K, V>> l = lookup.get(childNode.key);
>         if (l == null) {
>             l = new ArrayList<TreeNode<K, V>>();
>             lookup.put(childNode.key, l);
>         }
>         l.add(childNode);
>         return childNode;
>     }
>
>     public boolean isLeaf() {
>         return children.isEmpty();
>     }
>
>     public int getLevel() {
>         return level;
>     }
>
>     public TreeNode<K, V> getParent() {
>         return parent;
>     }
>
>     public V getValue() {
>         return value;
>     }
>
>     public String getUuid() {
>         return uuid;
>     }
>
>     public void setUuid(String uuid) {
>         this.uuid = uuid;
>     }
>
>     public List<TreeNode<K, V>> getChildren() {
>         return children;
>     }
>
>     public List<TreeNode<K, V>> getNodesByKey(K key) {
>         return lookup.get(key);
>     }
>
>     public K getKey() {
>         return key;
>     }
>
>     public List<TreeNode<K, V>> getLeafs() {
>         List<TreeNode<K, V>> ret = new ArrayList<TreeNode<K, V>>();
>         getLeafs(ret);
>         return ret;
>     }
>
>     private void getLeafs(List<TreeNode<K, V>> ret) {
>         if (children.isEmpty())
>             ret.add(this);
>         for (TreeNode<K, V> child : children) {
>             child.getLeafs(ret);
>         }
>     }
>
>     @Override
>     public String toString() {
>         return toString(true);
>     }
>
>     public String toString(boolean withChildren) {
>         if (key == null)
>             return super.toString();
>         StringBuffer ret = new StringBuffer();
>         for (int i = 0; i < level; i++) {
>             ret.append(" >");
>         }
>         ret.append(" " + key.toString());
>         if (withChildren) {
>             for (TreeNode<K, V> child : children) {
>                 ret.append("\n").append(child.toString());
>             }
>         }
>         return ret.toString();
>     }
>
>     public void setValue(V value) {
>         this.value = value;
>     }
>
>     public void remove(List<TreeNode<K, V>> nodes) {
>         for (TreeNode<K, V> n : nodes) {
>             removeChildren(n);
>         }
>         for (TreeNode<K, V> n : nodes) {
>             TreeNode<K, V> parent = n.getParent();
>             if (parent == null)
>                 continue;
>             parent.children.remove(n);
>         }
>     }
>
>     private void removeChildren(TreeNode<K, V> node) {
>         lookup.remove(node.getUuid());
>         if (node.children.isEmpty())
>             return;
>         Iterator<TreeNode<K, V>> it = node.children.iterator();
>         while (it.hasNext()) {
>             TreeNode<K, V> child = it.next();
>             removeChildren(child);
>             it.remove();
>         }
>     }
>
>     public void clear() {
>         this.key = null;
>         this.value = null;
>         this.uuid = null;
>         for (TreeNode<K, V> child : children) {
>             child.clear();
>         }
>         this.children.clear();
>         this.lookup.clear();
>     }
>
>     public TreeNode<K, V> getNodeById(K key, String uuid) {
>         List<TreeNode<K, V>> nodes = getNodesByKey(key);
>         for (TreeNode<K, V> treeNode : nodes) {
>             if (uuid.equals(treeNode.getUuid()))
>                 return treeNode;
>         }
>         return null;
>     }
>
>     public HashMap<K, List<TreeNode<K, V>>> getLookup() {
>         return lookup;
>     }
>
>     public void setLookup(HashMap<K, List<TreeNode<K, V>>> lookup) {
>         this.lookup = lookup;
>     }
>
>     public void setLevel(int level) {
>         this.level = level;
>     }
>
>     public void setKey(K key) {
>         this.key = key;
>     }
>
>     public void setParent(TreeNode<K, V> parent) {
>         this.parent = parent;
>     }
>
>     public void setChildren(List<TreeNode<K, V>> children) {
>         this.children = children;
>     }
> }
>
> On Tue, May 17, 2016 at 12:00 PM, Robert Metzger <rmetz...@apache.org> wrote:
>
>> Hi Flavio,
>>
>> which datatype are you using?
>>
>> On Tue, May 17, 2016 at 11:42 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>
>>> Hi to all,
>>> during these days we've run a lot of Flink jobs, and from time to time
>>> (apparently at random) a different exception arises during their
>>> execution... I hope one of them could help in finding the source of the
>>> problem. This time the exception is:
>>>
>>> An error occurred while reading the next record.
>>>     at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:148)
>>>     at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.access$300(NonReusingKeyGroupedIterator.java:32)
>>>     at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator.next(NonReusingKeyGroupedIterator.java:192)
>>>     at org.okkam.entitons.mapping.flink.IndexMappingExecutor$TupleToEntitonJsonNode.reduce(IndexMappingExecutor.java:61)
>>>     at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:125)
>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>     at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.io.UTFDataFormatException: malformed input around byte 42
>>>     at org.apache.flink.runtime.memory.AbstractPagedInputView.readUTF(AbstractPagedInputView.java:488)
>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:403)
>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>     at org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:100)
>>>     at org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:161)
>>>     at org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:113)
>>>     at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:130)
>>>
>>> Could this error be caused by a missing implementation of hashCode() and
>>> equals()?
>>>
>>> Thanks in advance,
>>> Flavio
>>>
>>
>
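P.S. The UTFDataFormatException itself points at a corrupted byte stream rather than at equals()/hashCode(): a single flipped byte in a serialized string is enough to trigger it. A minimal sketch using plain java.io (which I assume fails in the same way as Flink's internal readUTF, since both read modified UTF-8):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UTFDataFormatException;

public class Utf8CorruptionDemo {

    // Serializes a string with writeUTF(), flips one payload bit, and reports
    // the exception class readUTF() throws on the corrupted bytes.
    static String readCorrupted() throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new DataOutputStream(bos).writeUTF("HashSet");
        byte[] bytes = bos.toByteArray();
        bytes[4] ^= (byte) 0x80; // corrupt one byte of the UTF-8 payload
        try {
            new DataInputStream(new ByteArrayInputStream(bytes)).readUTF();
            return "no error";
        } catch (UTFDataFormatException e) {
            // e.getMessage() is "malformed input ..." just like in your trace
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(readCorrupted()); // prints UTFDataFormatException
    }
}
```

Note that your exception B ("java.ttil.HashSet") looks like the same phenomenon hitting a class-name string, which is why I'd look for the corruption source rather than at the POJO contracts.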