Ah sorry, I forgot to mention that I don't use any custom kryo serializers..

On Tue, May 17, 2016 at 12:39 PM, Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> I got those exceptions running 3 different types of jobs..I could have
> tracked the job and the error...my bad!
> However, the most problematic job is the last one, where I run a series of
> jobs one after the other (calling env.execute() in a for loop)..
> I you want I can share with you my code (in private for the moment because
> it's not public yet) or the dashboard screen via skype while the jobs are
> running..
>
>
> On Tue, May 17, 2016 at 12:31 PM, Robert Metzger <rmetz...@apache.org>
> wrote:
>
>> Hi Flavio,
>> thank you for providing additional details.
>> I don't think that missing hashCode / equals() implementations cause such
>> an error. They can cause wrong sorting or partitioning of the data, but the
>> serialization should still work properly.
>> I suspect the issue somewhere in the serialization stack.
>>
>> Are you registering any custom kryo serializers?
>>
>>
>>
>> From your past emails, you saw the following different exceptions:
>>
>> A)  Caused by: java.io.UTFDataFormatException: malformed input around
>> byte 42
>> B)  Caused by: java.lang.ClassNotFoundException: java.ttil.HashSet
>> C)  Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>>
>> Were they all caused by the same job, or different ones?
>>
>>
>> On Tue, May 17, 2016 at 12:12 PM, Flavio Pompermaier <
>> pomperma...@okkam.it> wrote:
>>
>>> Hi Robert,
>>> in this specific case the interested classes are:
>>>
>>>    - Tuple3<String, String, IndexAttributeToExpand>
>>>    (IndexAttributeToExpand is a POJO extending another class and both of 
>>> them
>>>    doesn't implement equals and hashcode)
>>>    - Tuple3<String,String, TreeNode<String, Map<String, Set<String>>>>
>>>    (TreeNode is a POJO containing other TreeNode and it doesn't
>>>    implement  equals and hashcode)
>>>    - Now I've added to both classes hashCode and equals in order to be
>>>    aligned with POJO policies, however the job finished correctly after
>>>    stopping and restarting the cluster...usually when I have strange
>>>    serialization exception I stop and restart the cluster and everything 
>>> works.
>>>
>>> The TreeNode class (the recursive one) is actually the following:
>>>
>>> import java.io.Serializable;
>>> import java.util.ArrayList;
>>> import java.util.HashMap;
>>> import java.util.Iterator;
>>> import java.util.LinkedList;
>>> import java.util.List;
>>> import java.util.UUID;
>>>
>>> import com.fasterxml.jackson.annotation.JsonIgnore;
>>>
>>> public class TreeNode<K,V> implements Serializable {
>>>
>>> private static final long serialVersionUID = 1L;
>>> private int level = 0;
>>>
>>> private String uuid;
>>> private K key;
>>> private V value;
>>> @JsonIgnore
>>> private TreeNode<K,V> parent;
>>> private List<TreeNode<K,V>> children;
>>> @JsonIgnore
>>> private HashMap<K, List<TreeNode<K,V>>> lookup;
>>>
>>>     public TreeNode(K key, V value) {
>>>     this.level = 0;
>>>     this.key = key;
>>>     this.uuid = UUID.randomUUID().toString();
>>>         this.value = value;
>>>         this.children = new LinkedList<TreeNode<K,V>>();
>>>         List<TreeNode<K, V>> thisAsList = new ArrayList<TreeNode<K,V>>();
>>>         thisAsList.add(this);
>>>         this.lookup = new HashMap<K, List<TreeNode<K,V>>>();
>>>         this.lookup.put(key, thisAsList);
>>>     }
>>>
>>>     public TreeNode<K,V> addChild(K key, V value) {
>>>     TreeNode<K,V> childNode = new TreeNode<K,V>(key, value);
>>>     childNode.level = level +1;
>>>         childNode.parent = this;
>>>         childNode.lookup = lookup;
>>>         childNode.uuid = UUID.randomUUID().toString();
>>>         this.children.add(childNode);
>>>         List<TreeNode<K, V>> l = lookup.get(childNode.key);
>>>         if(l==null){
>>>         l = new ArrayList<TreeNode<K,V>>();
>>>         lookup.put(childNode.key, l);
>>>         }
>>>         l.add(childNode);
>>>         return childNode;
>>>     }
>>>
>>>     public boolean isLeaf() {
>>> return children.isEmpty() ;
>>> }
>>> public int getLevel() {
>>> return level;
>>> }
>>> public TreeNode<K,V> getParent() {
>>> return parent;
>>> }
>>> public V getValue() {
>>> return value;
>>> }
>>> public String getUuid() {
>>> return uuid;
>>> }
>>> public void setUuid(String uuid) {
>>> this.uuid = uuid;
>>> }
>>> public List<TreeNode<K,V>> getChildren() {
>>> return children;
>>> }
>>> public List<TreeNode<K, V>> getNodesByKey(K key) {
>>> return lookup.get(key);
>>> }
>>> public K getKey() {
>>> return key;
>>> }
>>> public List<TreeNode<K,V>> getLeafs() {
>>> List<TreeNode<K,V>> ret = new ArrayList<TreeNode<K,V>>();
>>> getLeafs(ret);
>>> return ret;
>>> }
>>> private void getLeafs(List<TreeNode<K, V>> ret) {
>>> if(children.isEmpty())
>>> ret.add(this);
>>> for (TreeNode<K, V> child : children) {
>>> child.getLeafs(ret);
>>> }
>>> }
>>>
>>> @Override
>>> public String toString() {
>>> return toString(true);
>>> }
>>> public String toString(boolean withChildren) {
>>> if(key==null)
>>> return super.toString();
>>> StringBuffer ret = new StringBuffer();
>>> for (int i = 0; i < level; i++) {
>>> ret.append(" >");
>>> }
>>> ret.append(" " +key.toString());
>>> if(withChildren){
>>> for (TreeNode<K, V> child : children) {
>>> ret.append("\n").append(child.toString());
>>> }
>>> }
>>> return ret.toString();
>>> }
>>>
>>> public void setValue(V value) {
>>> this.value = value;
>>> }
>>>
>>> public void remove(List<TreeNode<K, V>> nodes) {
>>> for (TreeNode<K, V> n : nodes) {
>>> removeChildren(n);
>>> }
>>> for (TreeNode<K, V> n : nodes) {
>>> TreeNode<K, V> parent = n.getParent();
>>> if(parent==null)
>>> continue;
>>> parent.children.remove(n);
>>> }
>>> }
>>>
>>> private void removeChildren(TreeNode<K, V> node) {
>>> lookup.remove(node.getUuid());
>>> if(node.children.isEmpty())
>>> return;
>>> Iterator<TreeNode<K, V>> it = node.children.iterator();
>>> while (it.hasNext()) {
>>> TreeNode<K, V> child = (TreeNode<K, V>) it.next();
>>> removeChildren(child);
>>> it.remove();
>>> }
>>> }
>>>
>>> public void clear() {
>>>     this.key = null;
>>>         this.value = null;
>>>         this.uuid = null;
>>>         for (TreeNode<K, V> child : children) {
>>>         child.clear();
>>> }
>>>         this.children.clear();
>>>         this.lookup.clear();
>>> }
>>>
>>> public TreeNode<K, V> getNodeById(K key, String uuid) {
>>> List<TreeNode<K, V>> nodes = getNodesByKey(key);
>>> for (TreeNode<K, V> treeNode : nodes) {
>>> if(uuid.equals(treeNode.getUuid()))
>>> return treeNode;
>>> }
>>> return null;
>>> }
>>>
>>> public HashMap<K, List<TreeNode<K, V>>> getLookup() {
>>> return lookup;
>>> }
>>>
>>> public void setLookup(HashMap<K, List<TreeNode<K, V>>> lookup) {
>>> this.lookup = lookup;
>>> }
>>>
>>> public void setLevel(int level) {
>>> this.level = level;
>>> }
>>>
>>> public void setKey(K key) {
>>> this.key = key;
>>> }
>>>
>>> public void setParent(TreeNode<K, V> parent) {
>>> this.parent = parent;
>>> }
>>>
>>> public void setChildren(List<TreeNode<K, V>> children) {
>>> this.children = children;
>>> }
>>> }
>>>
>>> On Tue, May 17, 2016 at 12:00 PM, Robert Metzger <rmetz...@apache.org>
>>> wrote:
>>>
>>>> Hi Flavio,
>>>>
>>>> which datatype are you using?
>>>>
>>>> On Tue, May 17, 2016 at 11:42 AM, Flavio Pompermaier <
>>>> pomperma...@okkam.it> wrote:
>>>>
>>>>> Hi to all,
>>>>> during these days we've run a lot of Flink jobs and from time to time
>>>>> (apparently randomly) a different Exception arise during their 
>>>>> executions...
>>>>> I hope one of them could help in finding the source of the
>>>>> problem..This time the exception is:
>>>>>
>>>>> An error occurred while reading the next record.
>>>>>      at
>>>>> org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:148)
>>>>>      at
>>>>> org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.access$300(NonReusingKeyGroupedIterator.java:32)
>>>>>      at
>>>>> org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator.next(NonReusingKeyGroupedIterator.java:192)
>>>>>      at
>>>>> org.okkam.entitons.mapping.flink.IndexMappingExecutor$TupleToEntitonJsonNode.reduce(IndexMappingExecutor.java:61)
>>>>>      at
>>>>> org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:125)
>>>>>      at
>>>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>>>>      at
>>>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>      at java.lang.Thread.run(Thread.java:745)
>>>>> Caused by: java.io.UTFDataFormatException: malformed input around byte
>>>>> 42
>>>>>      at
>>>>> org.apache.flink.runtime.memory.AbstractPagedInputView.readUTF(AbstractPagedInputView.java:488)
>>>>>      at
>>>>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:403)
>>>>>      at
>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
>>>>>      at
>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>>      at
>>>>> org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:100)
>>>>>      at
>>>>> org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:161)
>>>>>      at
>>>>> org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:113)
>>>>>      at
>>>>> org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:130
>>>>>
>>>>> Could this error be cause by a missing implementation of hashCode()
>>>>> and equals()?
>>>>>
>>>>> Thanks in advance,
>>>>> Flavio
>>>>>
>>>>
>>>
>>
>

Reply via email to