Hi Flavio,
thank you for providing additional details.
I don't think missing hashCode() / equals() implementations cause such
an error. They can lead to incorrect sorting or partitioning of the data,
but serialization itself should still work properly.
I suspect the issue is somewhere in the serialization stack.
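
Just for reference, here is a minimal sketch of a Flink-compatible POJO used
as a grouping key (the class and field names are made up for illustration):

import java.io.Serializable;
import java.util.Objects;

// Illustrative sketch of a POJO key type following Flink's POJO rules:
// a public no-arg constructor plus public fields (or getters/setters).
public class MyKey implements Serializable {

    public String id;
    public int partition;

    public MyKey() {}

    public MyKey(String id, int partition) {
        this.id = id;
        this.partition = partition;
    }

    // Consistent equals()/hashCode(): without them, hash partitioning and
    // sorting can group records wrongly, but (de)serialization still works.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof MyKey)) return false;
        MyKey other = (MyKey) o;
        return partition == other.partition && Objects.equals(id, other.id);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, partition);
    }
}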

Are you registering any custom kryo serializers?
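
To be concrete, by custom Kryo serializers I mean registrations like the
following sketch (MyType and MyTypeSerializer are just placeholders):

import org.apache.flink.api.java.ExecutionEnvironment;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Use MyTypeSerializer whenever Kryo has to (de)serialize MyType:
env.getConfig().registerTypeWithKryoSerializer(MyType.class, MyTypeSerializer.class);

// Or register it as a default serializer, also covering subclasses of MyType:
env.getConfig().addDefaultKryoSerializer(MyType.class, MyTypeSerializer.class);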

From your past emails, you saw the following different exceptions:

A)  Caused by: java.io.UTFDataFormatException: malformed input around byte 42
B)  Caused by: java.lang.ClassNotFoundException: java.ttil.HashSet
C)  Caused by: java.lang.ArrayIndexOutOfBoundsException: -2

Were they all caused by the same job, or different ones?


On Tue, May 17, 2016 at 12:12 PM, Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> Hi Robert,
> in this specific case the interested classes are:
>
>    - Tuple3<String, String, IndexAttributeToExpand>
>    (IndexAttributeToExpand is a POJO extending another class, and neither of
>    them implements equals() and hashCode())
>    - Tuple3<String, String, TreeNode<String, Map<String, Set<String>>>>
>    (TreeNode is a POJO containing other TreeNodes, and it doesn't
>    implement equals() and hashCode())
>    - Now I've added hashCode() and equals() to both classes in order to be
>    aligned with the POJO policies (a sketch of what I added is shown after
>    this list); however, the job finished correctly after stopping and
>    restarting the cluster... usually when I get a strange serialization
>    exception I stop and restart the cluster and everything works.
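>
> To give an idea, the equals()/hashCode() pair I added to TreeNode is based
> on the uuid field, along these lines (an illustrative sketch, not the exact
> code):
>
> // Illustrative sketch only: node identity is based on the uuid assigned
> // at construction time.
> @Override
> public boolean equals(Object o) {
>     if (this == o) return true;
>     if (!(o instanceof TreeNode)) return false;
>     return uuid != null && uuid.equals(((TreeNode<?, ?>) o).uuid);
> }
>
> @Override
> public int hashCode() {
>     return uuid == null ? 0 : uuid.hashCode();
> }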
>
> The TreeNode class (the recursive one) is actually the following:
>
> import java.io.Serializable;
> import java.util.ArrayList;
> import java.util.HashMap;
> import java.util.Iterator;
> import java.util.LinkedList;
> import java.util.List;
> import java.util.UUID;
>
> import com.fasterxml.jackson.annotation.JsonIgnore;
>
> public class TreeNode<K, V> implements Serializable {
>
>     private static final long serialVersionUID = 1L;
>
>     private int level = 0;
>     private String uuid;
>     private K key;
>     private V value;
>     @JsonIgnore
>     private TreeNode<K, V> parent;
>     private List<TreeNode<K, V>> children;
>     @JsonIgnore
>     private HashMap<K, List<TreeNode<K, V>>> lookup;
>
>     public TreeNode(K key, V value) {
>         this.level = 0;
>         this.key = key;
>         this.uuid = UUID.randomUUID().toString();
>         this.value = value;
>         this.children = new LinkedList<TreeNode<K, V>>();
>         List<TreeNode<K, V>> thisAsList = new ArrayList<TreeNode<K, V>>();
>         thisAsList.add(this);
>         this.lookup = new HashMap<K, List<TreeNode<K, V>>>();
>         this.lookup.put(key, thisAsList);
>     }
>
>     public TreeNode<K, V> addChild(K key, V value) {
>         TreeNode<K, V> childNode = new TreeNode<K, V>(key, value);
>         childNode.level = level + 1;
>         childNode.parent = this;
>         childNode.lookup = lookup;
>         childNode.uuid = UUID.randomUUID().toString();
>         this.children.add(childNode);
>         List<TreeNode<K, V>> l = lookup.get(childNode.key);
>         if (l == null) {
>             l = new ArrayList<TreeNode<K, V>>();
>             lookup.put(childNode.key, l);
>         }
>         l.add(childNode);
>         return childNode;
>     }
>
>     public boolean isLeaf() {
>         return children.isEmpty();
>     }
>
>     public int getLevel() {
>         return level;
>     }
>
>     public TreeNode<K, V> getParent() {
>         return parent;
>     }
>
>     public V getValue() {
>         return value;
>     }
>
>     public String getUuid() {
>         return uuid;
>     }
>
>     public void setUuid(String uuid) {
>         this.uuid = uuid;
>     }
>
>     public List<TreeNode<K, V>> getChildren() {
>         return children;
>     }
>
>     public List<TreeNode<K, V>> getNodesByKey(K key) {
>         return lookup.get(key);
>     }
>
>     public K getKey() {
>         return key;
>     }
>
>     public List<TreeNode<K, V>> getLeafs() {
>         List<TreeNode<K, V>> ret = new ArrayList<TreeNode<K, V>>();
>         getLeafs(ret);
>         return ret;
>     }
>
>     private void getLeafs(List<TreeNode<K, V>> ret) {
>         if (children.isEmpty())
>             ret.add(this);
>         for (TreeNode<K, V> child : children) {
>             child.getLeafs(ret);
>         }
>     }
>
>     @Override
>     public String toString() {
>         return toString(true);
>     }
>
>     public String toString(boolean withChildren) {
>         if (key == null)
>             return super.toString();
>         StringBuffer ret = new StringBuffer();
>         for (int i = 0; i < level; i++) {
>             ret.append(" >");
>         }
>         ret.append(" " + key.toString());
>         if (withChildren) {
>             for (TreeNode<K, V> child : children) {
>                 ret.append("\n").append(child.toString());
>             }
>         }
>         return ret.toString();
>     }
>
>     public void setValue(V value) {
>         this.value = value;
>     }
>
>     public void remove(List<TreeNode<K, V>> nodes) {
>         for (TreeNode<K, V> n : nodes) {
>             removeChildren(n);
>         }
>         for (TreeNode<K, V> n : nodes) {
>             TreeNode<K, V> parent = n.getParent();
>             if (parent == null)
>                 continue;
>             parent.children.remove(n);
>         }
>     }
>
>     private void removeChildren(TreeNode<K, V> node) {
>         lookup.remove(node.getUuid());
>         if (node.children.isEmpty())
>             return;
>         Iterator<TreeNode<K, V>> it = node.children.iterator();
>         while (it.hasNext()) {
>             TreeNode<K, V> child = it.next();
>             removeChildren(child);
>             it.remove();
>         }
>     }
>
>     public void clear() {
>         this.key = null;
>         this.value = null;
>         this.uuid = null;
>         for (TreeNode<K, V> child : children) {
>             child.clear();
>         }
>         this.children.clear();
>         this.lookup.clear();
>     }
>
>     public TreeNode<K, V> getNodeById(K key, String uuid) {
>         List<TreeNode<K, V>> nodes = getNodesByKey(key);
>         for (TreeNode<K, V> treeNode : nodes) {
>             if (uuid.equals(treeNode.getUuid()))
>                 return treeNode;
>         }
>         return null;
>     }
>
>     public HashMap<K, List<TreeNode<K, V>>> getLookup() {
>         return lookup;
>     }
>
>     public void setLookup(HashMap<K, List<TreeNode<K, V>>> lookup) {
>         this.lookup = lookup;
>     }
>
>     public void setLevel(int level) {
>         this.level = level;
>     }
>
>     public void setKey(K key) {
>         this.key = key;
>     }
>
>     public void setParent(TreeNode<K, V> parent) {
>         this.parent = parent;
>     }
>
>     public void setChildren(List<TreeNode<K, V>> children) {
>         this.children = children;
>     }
> }
>
> On Tue, May 17, 2016 at 12:00 PM, Robert Metzger <rmetz...@apache.org>
> wrote:
>
>> Hi Flavio,
>>
>> Which data type are you using?
>>
>> On Tue, May 17, 2016 at 11:42 AM, Flavio Pompermaier <
>> pomperma...@okkam.it> wrote:
>>
>>> Hi to all,
>>> during these days we've run a lot of Flink jobs, and from time to time
>>> (apparently at random) a different exception arises during their execution...
>>> I hope one of them could help in finding the source of the problem.. This
>>> time the exception is:
>>>
>>> An error occurred while reading the next record.
>>>      at
>>> org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:148)
>>>      at
>>> org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.access$300(NonReusingKeyGroupedIterator.java:32)
>>>      at
>>> org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator.next(NonReusingKeyGroupedIterator.java:192)
>>>      at
>>> org.okkam.entitons.mapping.flink.IndexMappingExecutor$TupleToEntitonJsonNode.reduce(IndexMappingExecutor.java:61)
>>>      at
>>> org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:125)
>>>      at
>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>>      at
>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>      at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.io.UTFDataFormatException: malformed input around byte 42
>>>      at
>>> org.apache.flink.runtime.memory.AbstractPagedInputView.readUTF(AbstractPagedInputView.java:488)
>>>      at
>>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:403)
>>>      at
>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
>>>      at
>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>      at
>>> org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:100)
>>>      at
>>> org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:161)
>>>      at
>>> org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:113)
>>>      at
>>> org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:130
>>>
>>> Could this error be caused by a missing implementation of hashCode() and
>>> equals()?
>>>
>>> Thanks in advance,
>>> Flavio
>>>
>>
>
