Ah sorry, I forgot to mention that I don't use any custom kryo serializers..
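Just to make that concrete: none of the jobs contains anything like the
following (MyPojo and MyPojoKryoSerializer are made-up names, shown only to
illustrate the kind of registration you are asking about):

import org.apache.flink.api.java.ExecutionEnvironment;

public class KryoRegistrationExample {
    public static void main(String[] args) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // explicit Kryo serializer registration for a single type...
        env.getConfig().registerTypeWithKryoSerializer(MyPojo.class, MyPojoKryoSerializer.class);
        // ...or a default Kryo serializer for a type and its subclasses;
        // neither call appears anywhere in our code
        env.getConfig().addDefaultKryoSerializer(MyPojo.class, MyPojoKryoSerializer.class);
    }
}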
On Tue, May 17, 2016 at 12:39 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

> I got those exceptions running 3 different types of jobs..I could have
> tracked which job produced which error...my bad!
> However, the most problematic job is the last one, where I run a series of
> jobs one after the other (calling env.execute() in a for loop)..
> If you want I can share my code with you (in private for the moment, because
> it's not public yet) or show you the dashboard screen via skype while the
> jobs are running..
>
> On Tue, May 17, 2016 at 12:31 PM, Robert Metzger <rmetz...@apache.org> wrote:
>
>> Hi Flavio,
>> thank you for providing additional details.
>> I don't think that missing hashCode() / equals() implementations cause such
>> an error. They can cause wrong sorting or partitioning of the data, but
>> serialization should still work properly.
>> I suspect the issue is somewhere in the serialization stack.
>>
>> Are you registering any custom Kryo serializers?
>>
>> From your past emails, you saw the following different exceptions:
>>
>> A) Caused by: java.io.UTFDataFormatException: malformed input around byte 42
>> B) Caused by: java.lang.ClassNotFoundException: java.ttil.HashSet
>> C) Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>>
>> Were they all caused by the same job, or by different ones?
>>
>> On Tue, May 17, 2016 at 12:12 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>
>>> Hi Robert,
>>> in this specific case the classes involved are:
>>>
>>> - Tuple3<String, String, IndexAttributeToExpand>
>>>   (IndexAttributeToExpand is a POJO extending another class; neither of
>>>   them implements equals() and hashCode())
>>> - Tuple3<String, String, TreeNode<String, Map<String, Set<String>>>>
>>>   (TreeNode is a POJO containing other TreeNodes; it doesn't implement
>>>   equals() and hashCode() either)
>>>
>>> I have now added hashCode() and equals() to both classes in order to be
>>> aligned with the POJO policies. However, the job only finished correctly
>>> after stopping and restarting the cluster...usually when I get a strange
>>> serialization exception, I stop and restart the cluster and everything
>>> works.
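>>>
>>> What I added is just a plain value-based pair, roughly along these lines
>>> (a sketch only; e.g. for TreeNode, comparing uuid, key and value):
>>>
>>> @Override
>>> public boolean equals(Object o) {
>>>     if (this == o) return true;
>>>     if (!(o instanceof TreeNode)) return false;
>>>     TreeNode<?, ?> other = (TreeNode<?, ?>) o;
>>>     // compare only local fields, deliberately not parent/children,
>>>     // to avoid infinite recursion through the tree
>>>     return java.util.Objects.equals(uuid, other.uuid)
>>>             && java.util.Objects.equals(key, other.key)
>>>             && java.util.Objects.equals(value, other.value);
>>> }
>>>
>>> @Override
>>> public int hashCode() {
>>>     return java.util.Objects.hash(uuid, key, value);
>>> }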
>>>
>>> The TreeNode class (the recursive one) is actually the following:
>>>
>>> import java.io.Serializable;
>>> import java.util.ArrayList;
>>> import java.util.HashMap;
>>> import java.util.Iterator;
>>> import java.util.LinkedList;
>>> import java.util.List;
>>> import java.util.UUID;
>>>
>>> import com.fasterxml.jackson.annotation.JsonIgnore;
>>>
>>> public class TreeNode<K, V> implements Serializable {
>>>
>>>     private static final long serialVersionUID = 1L;
>>>
>>>     private int level = 0;
>>>     private String uuid;
>>>     private K key;
>>>     private V value;
>>>     @JsonIgnore
>>>     private TreeNode<K, V> parent;
>>>     private List<TreeNode<K, V>> children;
>>>     // shared index from key to all nodes with that key; one map per tree
>>>     @JsonIgnore
>>>     private HashMap<K, List<TreeNode<K, V>>> lookup;
>>>
>>>     public TreeNode(K key, V value) {
>>>         this.level = 0;
>>>         this.key = key;
>>>         this.uuid = UUID.randomUUID().toString();
>>>         this.value = value;
>>>         this.children = new LinkedList<TreeNode<K, V>>();
>>>         List<TreeNode<K, V>> thisAsList = new ArrayList<TreeNode<K, V>>();
>>>         thisAsList.add(this);
>>>         this.lookup = new HashMap<K, List<TreeNode<K, V>>>();
>>>         this.lookup.put(key, thisAsList);
>>>     }
>>>
>>>     public TreeNode<K, V> addChild(K key, V value) {
>>>         TreeNode<K, V> childNode = new TreeNode<K, V>(key, value);
>>>         childNode.level = level + 1;
>>>         childNode.parent = this;
>>>         childNode.lookup = lookup;
>>>         childNode.uuid = UUID.randomUUID().toString();
>>>         this.children.add(childNode);
>>>         List<TreeNode<K, V>> l = lookup.get(childNode.key);
>>>         if (l == null) {
>>>             l = new ArrayList<TreeNode<K, V>>();
>>>             lookup.put(childNode.key, l);
>>>         }
>>>         l.add(childNode);
>>>         return childNode;
>>>     }
>>>
>>>     public boolean isLeaf() {
>>>         return children.isEmpty();
>>>     }
>>>
>>>     public int getLevel() {
>>>         return level;
>>>     }
>>>
>>>     public TreeNode<K, V> getParent() {
>>>         return parent;
>>>     }
>>>
>>>     public V getValue() {
>>>         return value;
>>>     }
>>>
>>>     public String getUuid() {
>>>         return uuid;
>>>     }
>>>
>>>     public void setUuid(String uuid) {
>>>         this.uuid = uuid;
>>>     }
>>>
>>>     public List<TreeNode<K, V>> getChildren() {
>>>         return children;
>>>     }
>>>
>>>     public List<TreeNode<K, V>> getNodesByKey(K key) {
>>>         return lookup.get(key);
>>>     }
>>>
>>>     public K getKey() {
>>>         return key;
>>>     }
>>>
>>>     public List<TreeNode<K, V>> getLeafs() {
>>>         List<TreeNode<K, V>> ret = new ArrayList<TreeNode<K, V>>();
>>>         getLeafs(ret);
>>>         return ret;
>>>     }
>>>
>>>     private void getLeafs(List<TreeNode<K, V>> ret) {
>>>         if (children.isEmpty())
>>>             ret.add(this);
>>>         for (TreeNode<K, V> child : children) {
>>>             child.getLeafs(ret);
>>>         }
>>>     }
>>>
>>>     @Override
>>>     public String toString() {
>>>         return toString(true);
>>>     }
>>>
>>>     public String toString(boolean withChildren) {
>>>         if (key == null)
>>>             return super.toString();
>>>         StringBuffer ret = new StringBuffer();
>>>         for (int i = 0; i < level; i++) {
>>>             ret.append(" >");
>>>         }
>>>         ret.append(" " + key.toString());
>>>         if (withChildren) {
>>>             for (TreeNode<K, V> child : children) {
>>>                 ret.append("\n").append(child.toString());
>>>             }
>>>         }
>>>         return ret.toString();
>>>     }
>>>
>>>     public void setValue(V value) {
>>>         this.value = value;
>>>     }
>>>
>>>     public void remove(List<TreeNode<K, V>> nodes) {
>>>         for (TreeNode<K, V> n : nodes) {
>>>             removeChildren(n);
>>>         }
>>>         for (TreeNode<K, V> n : nodes) {
>>>             TreeNode<K, V> parent = n.getParent();
>>>             if (parent == null)
>>>                 continue;
>>>             parent.children.remove(n);
>>>         }
>>>     }
>>>
>>>     private void removeChildren(TreeNode<K, V> node) {
>>>         // drop the node from the shared lookup (which is keyed by K, not by uuid)
>>>         List<TreeNode<K, V>> byKey = lookup.get(node.getKey());
>>>         if (byKey != null) {
>>>             byKey.remove(node);
>>>         }
>>>         if (node.children.isEmpty())
>>>             return;
>>>         Iterator<TreeNode<K, V>> it = node.children.iterator();
>>>         while (it.hasNext()) {
>>>             TreeNode<K, V> child = it.next();
>>>             removeChildren(child);
>>>             it.remove();
>>>         }
>>>     }
>>>
>>>     public void clear() {
>>>         this.key = null;
>>>         this.value = null;
>>>         this.uuid = null;
>>>         for (TreeNode<K, V> child : children) {
>>>             child.clear();
>>>         }
>>>         this.children.clear();
>>>         this.lookup.clear();
>>>     }
>>>
>>>     public TreeNode<K, V> getNodeById(K key, String uuid) {
>>>         List<TreeNode<K, V>> nodes = getNodesByKey(key);
>>>         for (TreeNode<K, V> treeNode : nodes) {
>>>             if (uuid.equals(treeNode.getUuid()))
>>>                 return treeNode;
>>>         }
>>>         return null;
>>>     }
>>>
>>>     public HashMap<K, List<TreeNode<K, V>>> getLookup() {
>>>         return lookup;
>>>     }
>>>
>>>     public void setLookup(HashMap<K, List<TreeNode<K, V>>> lookup) {
>>>         this.lookup = lookup;
>>>     }
>>>
>>>     public void setLevel(int level) {
>>>         this.level = level;
>>>     }
>>>
>>>     public void setKey(K key) {
>>>         this.key = key;
>>>     }
>>>
>>>     public void setParent(TreeNode<K, V> parent) {
>>>         this.parent = parent;
>>>     }
>>>
>>>     public void setChildren(List<TreeNode<K, V>> children) {
>>>         this.children = children;
>>>     }
>>> }
>>>
>>> On Tue, May 17, 2016 at 12:00 PM, Robert Metzger <rmetz...@apache.org> wrote:
>>>
>>>> Hi Flavio,
>>>>
>>>> which datatype are you using?
>>>>
>>>> On Tue, May 17, 2016 at 11:42 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>
>>>>> Hi to all,
>>>>> during these days we've run a lot of Flink jobs, and from time to time
>>>>> (apparently at random) a different exception arises during their
>>>>> execution...
>>>>> I hope one of them can help in finding the source of the problem..This
>>>>> time the exception is:
>>>>>
>>>>> An error occurred while reading the next record.
>>>>>     at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:148)
>>>>>     at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.access$300(NonReusingKeyGroupedIterator.java:32)
>>>>>     at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator.next(NonReusingKeyGroupedIterator.java:192)
>>>>>     at org.okkam.entitons.mapping.flink.IndexMappingExecutor$TupleToEntitonJsonNode.reduce(IndexMappingExecutor.java:61)
>>>>>     at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:125)
>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>> Caused by: java.io.UTFDataFormatException: malformed input around byte 42
>>>>>     at org.apache.flink.runtime.memory.AbstractPagedInputView.readUTF(AbstractPagedInputView.java:488)
>>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:403)
>>>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
>>>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>>     at org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:100)
>>>>>     at org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:161)
>>>>>     at org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:113)
>>>>>     at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:130)
>>>>>
>>>>> Could this error be caused by a missing implementation of hashCode()
>>>>> and equals()?
>>>>>
>>>>> Thanks in advance,
>>>>> Flavio