Are you using 1.0.2 on the cluster as well?

On Tue, May 17, 2016 at 3:40 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
> I tried to debug my application from Eclipse and I got an infinite recursive call in the TypeExtractor during the analysis of TreeNode (I'm using Flink 1.0.2):
>
> Exception in thread "main" java.lang.StackOverflowError
>     at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1482)
>     at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1464)
>     at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:736)
>     at org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1678)
>     at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1559)
>     at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:732)
>     at org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1678)
>     at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1559)
>     at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:732)
>
> Why doesn't this happen on the cluster?
>
> On Tue, May 17, 2016 at 2:23 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
>> Don't worry Robert, I know how hard it is to debug such errors :)
>> I hope that maybe the combination of these 3 errors is somehow related... However, these are the answers:
>>
>>  - The job (composed of 16 sub-jobs) fails randomly but, usually, the first sub-job after the restart runs successfully
>>  - In this job I saw A, B and C (but after changing the parallelism)
>>  - Yes, the error behaves differently depending on the input data (actually on the default parallelism and the number of slots in the cluster)
>>
>> One more interesting thing I fixed in my code that could (maybe?) be the cause of B and C (but not A, because that happened after this problem): I'm reading and writing data from some Parquet-thrift directories (using the Hadoop IF/OF ParquetThriftOutputFormat and ParquetThriftInputFormat).
>> In one of the 3 jobs I output a dataset to a Parquet-thrift directory, after decreasing the parallelism and the number of slots from M to N, using ds.output().
>> The first N parquet files were overwritten (as expected) but the last M-N were not removed (I was expecting the Parquet-thrift directory to be managed as a single dir).
>> Then, when I read (in the next job) from that directory, I discovered that the job was actually reading all the files in that folder (I was convinced that, despite the M-N leftover files in that dir, there was some index file, e.g. _metadata, acting as the entry point for the files in that folder).
>> I don't know whether this could be a cause of such errors, but I report it anyway for the sake of completeness, hoping that this real-life debugging story could be helpful to someone else using Parquet on Flink :)
>>
>> Thanks for the support,
>> Flavio
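One possible workaround for the leftover part files described above is to clean the output directory before the lower-parallelism job writes into it, so that part files from an earlier run with parallelism M cannot survive a later run with parallelism N and silently become part of the next job's input. The sketch below is only an illustration and assumes the directory is reachable through the Hadoop FileSystem API; the class name and the cleanOutputDir() helper are made up for the example.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ParquetOutputDirCleaner {

        /** Recursively deletes the previous output directory (if any) before it is re-written. */
        public static void cleanOutputDir(String outputPath) throws IOException {
            Path dir = new Path(outputPath);
            FileSystem fs = dir.getFileSystem(new Configuration());
            if (fs.exists(dir)) {
                fs.delete(dir, true); // removes all old part files, including the extra M-N ones
            }
        }
    }

Calling something like cleanOutputDir() right before the ds.output(...) call would ensure that the reading job only ever sees the N files written by the last run.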
>> On Tue, May 17, 2016 at 1:50 PM, Robert Metzger <rmetz...@apache.org> wrote:
>>
>>> The last one is C or A?
>>>
>>> How often is it failing (every nth run)? Is it always failing at the same execute() call, or at different ones?
>>> Is it always the exact same exception, or different ones?
>>> Does the error behave differently depending on the input data?
>>>
>>> Sorry for asking so many questions, but these errors can have many causes and just searching the code for potential issues can take a lot of time ;)
>>>
>>> On Tue, May 17, 2016 at 12:47 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>
>>>> Ah sorry, I forgot to mention that I don't use any custom Kryo serializers..
>>>>
>>>> On Tue, May 17, 2016 at 12:39 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>
>>>>> I got those exceptions running 3 different types of jobs.. I could have tracked which job produced which error... my bad!
>>>>> However, the most problematic job is the last one, where I run a series of jobs one after the other (calling env.execute() in a for loop)..
>>>>> If you want I can share my code with you (privately for the moment, because it's not public yet) or share the dashboard screen via Skype while the jobs are running..
>>>>>
>>>>> On Tue, May 17, 2016 at 12:31 PM, Robert Metzger <rmetz...@apache.org> wrote:
>>>>>
>>>>>> Hi Flavio,
>>>>>> thank you for providing additional details.
>>>>>> I don't think that missing hashCode() / equals() implementations cause such an error. They can cause wrong sorting or partitioning of the data, but the serialization should still work properly.
>>>>>> I suspect the issue is somewhere in the serialization stack.
>>>>>>
>>>>>> Are you registering any custom Kryo serializers?
>>>>>>
>>>>>> From your past emails, you saw the following different exceptions:
>>>>>>
>>>>>> A) Caused by: java.io.UTFDataFormatException: malformed input around byte 42
>>>>>> B) Caused by: java.lang.ClassNotFoundException: java.ttil.HashSet
>>>>>> C) Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>>>>>>
>>>>>> Were they all caused by the same job, or by different ones?
>>>>>>
>>>>>> On Tue, May 17, 2016 at 12:12 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>
>>>>>>> Hi Robert,
>>>>>>> in this specific case the classes involved are:
>>>>>>>
>>>>>>>  - Tuple3<String, String, IndexAttributeToExpand> (IndexAttributeToExpand is a POJO extending another class, and neither of them implements equals() and hashCode())
>>>>>>>  - Tuple3<String, String, TreeNode<String, Map<String, Set<String>>>> (TreeNode is a POJO containing other TreeNodes, and it doesn't implement equals() and hashCode())
>>>>>>>  - Now I've added hashCode() and equals() to both classes in order to be aligned with the POJO policies; however, the job finished correctly after stopping and restarting the cluster... usually, when I get a strange serialization exception, I stop and restart the cluster and everything works.
>>>>>>> The TreeNode class (the recursive one) is actually the following:
>>>>>>>
>>>>>>> import java.io.Serializable;
>>>>>>> import java.util.ArrayList;
>>>>>>> import java.util.HashMap;
>>>>>>> import java.util.Iterator;
>>>>>>> import java.util.LinkedList;
>>>>>>> import java.util.List;
>>>>>>> import java.util.UUID;
>>>>>>>
>>>>>>> import com.fasterxml.jackson.annotation.JsonIgnore;
>>>>>>>
>>>>>>> public class TreeNode<K,V> implements Serializable {
>>>>>>>
>>>>>>>     private static final long serialVersionUID = 1L;
>>>>>>>     private int level = 0;
>>>>>>>
>>>>>>>     private String uuid;
>>>>>>>     private K key;
>>>>>>>     private V value;
>>>>>>>     @JsonIgnore
>>>>>>>     private TreeNode<K,V> parent;
>>>>>>>     private List<TreeNode<K,V>> children;
>>>>>>>     @JsonIgnore
>>>>>>>     private HashMap<K, List<TreeNode<K,V>>> lookup;
>>>>>>>
>>>>>>>     public TreeNode(K key, V value) {
>>>>>>>         this.level = 0;
>>>>>>>         this.key = key;
>>>>>>>         this.uuid = UUID.randomUUID().toString();
>>>>>>>         this.value = value;
>>>>>>>         this.children = new LinkedList<TreeNode<K,V>>();
>>>>>>>         List<TreeNode<K, V>> thisAsList = new ArrayList<TreeNode<K,V>>();
>>>>>>>         thisAsList.add(this);
>>>>>>>         this.lookup = new HashMap<K, List<TreeNode<K,V>>>();
>>>>>>>         this.lookup.put(key, thisAsList);
>>>>>>>     }
>>>>>>>
>>>>>>>     public TreeNode<K,V> addChild(K key, V value) {
>>>>>>>         TreeNode<K,V> childNode = new TreeNode<K,V>(key, value);
>>>>>>>         childNode.level = level + 1;
>>>>>>>         childNode.parent = this;
>>>>>>>         childNode.lookup = lookup;
>>>>>>>         childNode.uuid = UUID.randomUUID().toString();
>>>>>>>         this.children.add(childNode);
>>>>>>>         List<TreeNode<K, V>> l = lookup.get(childNode.key);
>>>>>>>         if (l == null) {
>>>>>>>             l = new ArrayList<TreeNode<K,V>>();
>>>>>>>             lookup.put(childNode.key, l);
>>>>>>>         }
>>>>>>>         l.add(childNode);
>>>>>>>         return childNode;
>>>>>>>     }
>>>>>>>
>>>>>>>     public boolean isLeaf() {
>>>>>>>         return children.isEmpty();
>>>>>>>     }
>>>>>>>     public int getLevel() {
>>>>>>>         return level;
>>>>>>>     }
>>>>>>>     public TreeNode<K,V> getParent() {
>>>>>>>         return parent;
>>>>>>>     }
>>>>>>>     public V getValue() {
>>>>>>>         return value;
>>>>>>>     }
>>>>>>>     public String getUuid() {
>>>>>>>         return uuid;
>>>>>>>     }
>>>>>>>     public void setUuid(String uuid) {
>>>>>>>         this.uuid = uuid;
>>>>>>>     }
>>>>>>>     public List<TreeNode<K,V>> getChildren() {
>>>>>>>         return children;
>>>>>>>     }
>>>>>>>     public List<TreeNode<K, V>> getNodesByKey(K key) {
>>>>>>>         return lookup.get(key);
>>>>>>>     }
>>>>>>>     public K getKey() {
>>>>>>>         return key;
>>>>>>>     }
>>>>>>>     public List<TreeNode<K,V>> getLeafs() {
>>>>>>>         List<TreeNode<K,V>> ret = new ArrayList<TreeNode<K,V>>();
>>>>>>>         getLeafs(ret);
>>>>>>>         return ret;
>>>>>>>     }
>>>>>>>     private void getLeafs(List<TreeNode<K, V>> ret) {
>>>>>>>         if (children.isEmpty())
>>>>>>>             ret.add(this);
>>>>>>>         for (TreeNode<K, V> child : children) {
>>>>>>>             child.getLeafs(ret);
>>>>>>>         }
>>>>>>>     }
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public String toString() {
>>>>>>>         return toString(true);
>>>>>>>     }
>>>>>>>     public String toString(boolean withChildren) {
>>>>>>>         if (key == null)
>>>>>>>             return super.toString();
>>>>>>>         StringBuffer ret = new StringBuffer();
>>>>>>>         for (int i = 0; i < level; i++) {
>>>>>>>             ret.append(" >");
>>>>>>>         }
>>>>>>>         ret.append(" " + key.toString());
>>>>>>>         if (withChildren) {
>>>>>>>             for (TreeNode<K, V> child : children) {
>>>>>>>                 ret.append("\n").append(child.toString());
>>>>>>>             }
>>>>>>>         }
>>>>>>>         return ret.toString();
>>>>>>>     }
>>>>>>>
>>>>>>>     public void setValue(V value) {
>>>>>>>         this.value = value;
>>>>>>>     }
>>>>>>>     public void remove(List<TreeNode<K, V>> nodes) {
>>>>>>>         for (TreeNode<K, V> n : nodes) {
>>>>>>>             removeChildren(n);
>>>>>>>         }
>>>>>>>         for (TreeNode<K, V> n : nodes) {
>>>>>>>             TreeNode<K, V> parent = n.getParent();
>>>>>>>             if (parent == null)
>>>>>>>                 continue;
>>>>>>>             parent.children.remove(n);
>>>>>>>         }
>>>>>>>     }
>>>>>>>
>>>>>>>     private void removeChildren(TreeNode<K, V> node) {
>>>>>>>         lookup.remove(node.getUuid());
>>>>>>>         if (node.children.isEmpty())
>>>>>>>             return;
>>>>>>>         Iterator<TreeNode<K, V>> it = node.children.iterator();
>>>>>>>         while (it.hasNext()) {
>>>>>>>             TreeNode<K, V> child = it.next();
>>>>>>>             removeChildren(child);
>>>>>>>             it.remove();
>>>>>>>         }
>>>>>>>     }
>>>>>>>
>>>>>>>     public void clear() {
>>>>>>>         this.key = null;
>>>>>>>         this.value = null;
>>>>>>>         this.uuid = null;
>>>>>>>         for (TreeNode<K, V> child : children) {
>>>>>>>             child.clear();
>>>>>>>         }
>>>>>>>         this.children.clear();
>>>>>>>         this.lookup.clear();
>>>>>>>     }
>>>>>>>
>>>>>>>     public TreeNode<K, V> getNodeById(K key, String uuid) {
>>>>>>>         List<TreeNode<K, V>> nodes = getNodesByKey(key);
>>>>>>>         for (TreeNode<K, V> treeNode : nodes) {
>>>>>>>             if (uuid.equals(treeNode.getUuid()))
>>>>>>>                 return treeNode;
>>>>>>>         }
>>>>>>>         return null;
>>>>>>>     }
>>>>>>>
>>>>>>>     public HashMap<K, List<TreeNode<K, V>>> getLookup() {
>>>>>>>         return lookup;
>>>>>>>     }
>>>>>>>
>>>>>>>     public void setLookup(HashMap<K, List<TreeNode<K, V>>> lookup) {
>>>>>>>         this.lookup = lookup;
>>>>>>>     }
>>>>>>>
>>>>>>>     public void setLevel(int level) {
>>>>>>>         this.level = level;
>>>>>>>     }
>>>>>>>
>>>>>>>     public void setKey(K key) {
>>>>>>>         this.key = key;
>>>>>>>     }
>>>>>>>
>>>>>>>     public void setParent(TreeNode<K, V> parent) {
>>>>>>>         this.parent = parent;
>>>>>>>     }
>>>>>>>
>>>>>>>     public void setChildren(List<TreeNode<K, V>> children) {
>>>>>>>         this.children = children;
>>>>>>>     }
>>>>>>> }
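For reference, a minimal sketch of what the hashCode()/equals() pair discussed above could look like inside TreeNode, based only on the node's own key, uuid and value; parent and lookup are deliberately left out because they would recurse back up the tree. This is only an illustration, not necessarily the implementation that was actually added.

    @Override
    public int hashCode() {
        // identity of a node: its key, its generated uuid and its payload
        return java.util.Objects.hash(key, uuid, value);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof TreeNode)) {
            return false;
        }
        TreeNode<?, ?> other = (TreeNode<?, ?>) obj;
        return java.util.Objects.equals(key, other.key)
                && java.util.Objects.equals(uuid, other.uuid)
                && java.util.Objects.equals(value, other.value);
    }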
>>>>>>> On Tue, May 17, 2016 at 12:00 PM, Robert Metzger <rmetz...@apache.org> wrote:
>>>>>>>
>>>>>>>> Hi Flavio,
>>>>>>>>
>>>>>>>> which datatype are you using?
>>>>>>>>
>>>>>>>> On Tue, May 17, 2016 at 11:42 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>>
>>>>>>>>> Hi to all,
>>>>>>>>> during these days we've run a lot of Flink jobs and from time to time (apparently randomly) a different exception arises during their execution...
>>>>>>>>> I hope one of them could help in finding the source of the problem.. This time the exception is:
>>>>>>>>>
>>>>>>>>> An error occurred while reading the next record.
>>>>>>>>>     at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:148)
>>>>>>>>>     at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.access$300(NonReusingKeyGroupedIterator.java:32)
>>>>>>>>>     at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator.next(NonReusingKeyGroupedIterator.java:192)
>>>>>>>>>     at org.okkam.entitons.mapping.flink.IndexMappingExecutor$TupleToEntitonJsonNode.reduce(IndexMappingExecutor.java:61)
>>>>>>>>>     at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:125)
>>>>>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>>>>>>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>>>> Caused by: java.io.UTFDataFormatException: malformed input around byte 42
>>>>>>>>>     at org.apache.flink.runtime.memory.AbstractPagedInputView.readUTF(AbstractPagedInputView.java:488)
>>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:403)
>>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
>>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>>>>>>     at org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:100)
>>>>>>>>>     at org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:161)
>>>>>>>>>     at org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:113)
>>>>>>>>>     at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:130)
>>>>>>>>>
>>>>>>>>> Could this error be caused by a missing implementation of hashCode() and equals()?
>>>>>>>>>
>>>>>>>>> Thanks in advance,
>>>>>>>>> Flavio
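Coming back to the StackOverflowError at the top of the thread: one option the Flink documentation describes for types that the TypeExtractor cannot (or should not) analyze is to let the user function provide its own type information through the ResultTypeQueryable interface. The sketch below declares the recursive TreeNode field as a generic (Kryo-serialized) type, so the extractor does not have to walk its self-referencing fields; the BuildTree function, the sample input and the simplified TreeNode<String, String> value type are made up for the example and are not taken from the original job.

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.api.java.typeutils.GenericTypeInfo;
    import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
    import org.apache.flink.api.java.typeutils.TupleTypeInfo;

    public class TreeNodeTypeInfoExample {

        /**
         * Hypothetical UDF that builds a small tree per input key. By implementing
         * ResultTypeQueryable it hands Flink an explicit TypeInformation in which the
         * TreeNode field is a generic (Kryo-serialized) type instead of a recursively
         * analyzed POJO.
         */
        public static class BuildTree implements
                MapFunction<String, Tuple3<String, String, TreeNode<String, String>>>,
                ResultTypeQueryable<Tuple3<String, String, TreeNode<String, String>>> {

            @Override
            public Tuple3<String, String, TreeNode<String, String>> map(String key) {
                // Stand-in for whatever builds the tree in the real job.
                TreeNode<String, String> root = new TreeNode<String, String>(key, "root");
                root.addChild(key + "-child", "leaf");
                return new Tuple3<String, String, TreeNode<String, String>>(key, key, root);
            }

            @Override
            @SuppressWarnings({ "unchecked", "rawtypes" })
            public TypeInformation<Tuple3<String, String, TreeNode<String, String>>> getProducedType() {
                return new TupleTypeInfo(
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        new GenericTypeInfo(TreeNode.class));
            }
        }

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<String> keys = env.fromElements("a", "b", "c");
            keys.map(new BuildTree()).print();
        }
    }

Passing the same explicit TypeInformation to the operator via .returns(...) is the other common variant of this idea.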