I tried to debug my application from Eclipse and I ran into infinite recursion in the TypeExtractor during the analysis of TreeNode (I'm using Flink 1.0.2):
Exception in thread "main" java.lang.StackOverflowError
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1482)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1464)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:736)
    at org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1678)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1559)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:732)
    at org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1678)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1559)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:732)

Why doesn't this happen on the cluster?

On Tue, May 17, 2016 at 2:23 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

> Don't worry Robert,
> I know how hard it is to debug such errors :)
> I hope that maybe the combination of these 3 errors is somehow
> related... However, here are the answers:
>
>    - The job (composed of 16 sub-jobs) fails randomly but, usually, the
>    first sub-job after the cluster start/restart runs successfully
>    - In this job I saw A, B and C (but after changing the parallelism)
>    - Yes, the errors behave differently depending on the input data
>    (actually on the default parallelism and the number of slots in the cluster)
>
> One more interesting thing I fixed in my code that could (maybe?) be the
> cause of B and C (but not A, because that happened after this problem):
> I'm reading and writing data from a Parquet-thrift directory (using the
> Hadoop IF/OF ParquetThriftOutputFormat and ParquetThriftInputFormat).
> In one of the 3 jobs I output a dataset to a Parquet-thrift directory
> after decreasing the parallelism and the number of slots from M to N, using
> ds.output().
> The first N Parquet files were overwritten (as expected) but the last M-N
> were not removed (I was expecting the Parquet-thrift directory to be managed
> as a single dir).
> Then, when I read (in the next job) from that directory, I discovered
> that the job was actually reading all the files in that folder (I was convinced
> that, even though the M-N files were left in that dir, there was some index
> file, e.g. _metadata, acting as the entry point for the files in that folder).
> I don't know whether this could be a cause of such errors, but I
> report it anyway for the sake of completeness, hoping that
> this real-life debugging story can be helpful to someone else using
> Parquet on Flink :)
>
> Thanks for the support,
>
> Flavio
>
> On Tue, May 17, 2016 at 1:50 PM, Robert Metzger <rmetz...@apache.org> wrote:
>
>> Is the last one C or A?
>>
>> How often is it failing (every nth run)? Is it always failing at the same
>> execute() call, or at different ones?
>> Is it always the exact same exception or is it different ones?
>> Does the error behave differently depending on the input data?
>>
>> Sorry for asking so many questions, but these errors can have many causes
>> and just searching the code for potential issues can take a lot of time ;)
>>
>> On Tue, May 17, 2016 at 12:47 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>
>>> Ah sorry, I forgot to mention that I don't use any custom Kryo
>>> serializers..
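To isolate the TypeExtractor recursion from the job itself, a minimal sketch like the one
below (hypothetical, not part of the original job; it assumes the TreeNode class posted
later in this thread is on the classpath) can be run directly from Eclipse. On Flink 1.0.x
it is expected to walk TreeNode's own TreeNode-typed fields (parent, children, the lookup
values) and end in the same StackOverflowError shown above.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;

public class TreeNodeTypeCheck {

    public static void main(String[] args) {
        // Ask Flink to analyze TreeNode exactly as it would while building the job plan.
        // If this recurses, the class itself is the trigger, independently of the job.
        TypeInformation<?> info = TypeExtractor.createTypeInfo(TreeNode.class);
        System.out.println(info);
    }
}

If the recursion is confirmed, one possible workaround is to keep the extractor away from
the recursive type altogether, e.g. by supplying an explicit TypeInformation via
returns(...) on the operators that produce TreeNode values (using a GenericTypeInfo, i.e.
Kryo, for the TreeNode field); whether that is acceptable depends on the job.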
>>>
>>> On Tue, May 17, 2016 at 12:39 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>
>>>> I got those exceptions running 3 different types of jobs.. I could have
>>>> tracked which job produced which error... my bad!
>>>> However, the most problematic job is the last one, where I run a series
>>>> of jobs one after the other (calling env.execute() in a for loop)..
>>>> If you want, I can share my code with you (in private for the moment,
>>>> because it's not public yet) or the dashboard screen via Skype while the
>>>> jobs are running..
>>>>
>>>>
>>>> On Tue, May 17, 2016 at 12:31 PM, Robert Metzger <rmetz...@apache.org> wrote:
>>>>
>>>>> Hi Flavio,
>>>>> thank you for providing additional details.
>>>>> I don't think that missing hashCode() / equals() implementations cause
>>>>> such an error. They can cause wrong sorting or partitioning of the data,
>>>>> but the serialization should still work properly.
>>>>> I suspect the issue is somewhere in the serialization stack.
>>>>>
>>>>> Are you registering any custom Kryo serializers?
>>>>>
>>>>> From your past emails, you saw the following different exceptions:
>>>>>
>>>>> A) Caused by: java.io.UTFDataFormatException: malformed input around byte 42
>>>>> B) Caused by: java.lang.ClassNotFoundException: java.ttil.HashSet
>>>>> C) Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>>>>>
>>>>> Were they all caused by the same job, or by different ones?
>>>>>
>>>>> On Tue, May 17, 2016 at 12:12 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>
>>>>>> Hi Robert,
>>>>>> in this specific case the classes involved are:
>>>>>>
>>>>>>    - Tuple3<String, String, IndexAttributeToExpand>
>>>>>>    (IndexAttributeToExpand is a POJO extending another class, and neither of
>>>>>>    them implements equals and hashCode)
>>>>>>    - Tuple3<String, String, TreeNode<String, Map<String, Set<String>>>>
>>>>>>    (TreeNode is a POJO containing other TreeNodes, and it doesn't
>>>>>>    implement equals and hashCode)
>>>>>>    - Now I've added hashCode and equals to both classes in order to be
>>>>>>    aligned with the POJO policies (a sketch of such an implementation follows
>>>>>>    right after this list); however, the job finished correctly after stopping
>>>>>>    and restarting the cluster... usually, when I get a strange serialization
>>>>>>    exception, I stop and restart the cluster and everything works.
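As a concrete illustration of the hashCode/equals mentioned in the last point (this exact
code is not from the original message, just a plausible sketch), an implementation for the
TreeNode class shown right below could be based on the node's uuid, so that comparing
nodes never follows the recursive parent/children/lookup references:

// Hypothetical sketch, to be added inside the TreeNode class shown below:
// node identity is taken from the uuid field only.
@Override
public int hashCode() {
    return uuid == null ? 0 : uuid.hashCode();
}

@Override
public boolean equals(Object obj) {
    if (this == obj) {
        return true;
    }
    if (!(obj instanceof TreeNode)) {
        return false;
    }
    TreeNode<?, ?> other = (TreeNode<?, ?>) obj;
    return uuid != null && uuid.equals(other.uuid);
}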
>>>>>>
>>>>>> The TreeNode class (the recursive one) is actually the following:
>>>>>>
>>>>>> import java.io.Serializable;
>>>>>> import java.util.ArrayList;
>>>>>> import java.util.HashMap;
>>>>>> import java.util.Iterator;
>>>>>> import java.util.LinkedList;
>>>>>> import java.util.List;
>>>>>> import java.util.UUID;
>>>>>>
>>>>>> import com.fasterxml.jackson.annotation.JsonIgnore;
>>>>>>
>>>>>> public class TreeNode<K,V> implements Serializable {
>>>>>>
>>>>>>     private static final long serialVersionUID = 1L;
>>>>>>     private int level = 0;
>>>>>>
>>>>>>     private String uuid;
>>>>>>     private K key;
>>>>>>     private V value;
>>>>>>     @JsonIgnore
>>>>>>     private TreeNode<K,V> parent;
>>>>>>     private List<TreeNode<K,V>> children;
>>>>>>     @JsonIgnore
>>>>>>     private HashMap<K, List<TreeNode<K,V>>> lookup;
>>>>>>
>>>>>>     public TreeNode(K key, V value) {
>>>>>>         this.level = 0;
>>>>>>         this.key = key;
>>>>>>         this.uuid = UUID.randomUUID().toString();
>>>>>>         this.value = value;
>>>>>>         this.children = new LinkedList<TreeNode<K,V>>();
>>>>>>         List<TreeNode<K, V>> thisAsList = new ArrayList<TreeNode<K,V>>();
>>>>>>         thisAsList.add(this);
>>>>>>         this.lookup = new HashMap<K, List<TreeNode<K,V>>>();
>>>>>>         this.lookup.put(key, thisAsList);
>>>>>>     }
>>>>>>
>>>>>>     public TreeNode<K,V> addChild(K key, V value) {
>>>>>>         TreeNode<K,V> childNode = new TreeNode<K,V>(key, value);
>>>>>>         childNode.level = level + 1;
>>>>>>         childNode.parent = this;
>>>>>>         childNode.lookup = lookup;
>>>>>>         childNode.uuid = UUID.randomUUID().toString();
>>>>>>         this.children.add(childNode);
>>>>>>         List<TreeNode<K, V>> l = lookup.get(childNode.key);
>>>>>>         if (l == null) {
>>>>>>             l = new ArrayList<TreeNode<K,V>>();
>>>>>>             lookup.put(childNode.key, l);
>>>>>>         }
>>>>>>         l.add(childNode);
>>>>>>         return childNode;
>>>>>>     }
>>>>>>
>>>>>>     public boolean isLeaf() {
>>>>>>         return children.isEmpty();
>>>>>>     }
>>>>>>     public int getLevel() {
>>>>>>         return level;
>>>>>>     }
>>>>>>     public TreeNode<K,V> getParent() {
>>>>>>         return parent;
>>>>>>     }
>>>>>>     public V getValue() {
>>>>>>         return value;
>>>>>>     }
>>>>>>     public String getUuid() {
>>>>>>         return uuid;
>>>>>>     }
>>>>>>     public void setUuid(String uuid) {
>>>>>>         this.uuid = uuid;
>>>>>>     }
>>>>>>     public List<TreeNode<K,V>> getChildren() {
>>>>>>         return children;
>>>>>>     }
>>>>>>     public List<TreeNode<K, V>> getNodesByKey(K key) {
>>>>>>         return lookup.get(key);
>>>>>>     }
>>>>>>     public K getKey() {
>>>>>>         return key;
>>>>>>     }
>>>>>>     public List<TreeNode<K,V>> getLeafs() {
>>>>>>         List<TreeNode<K,V>> ret = new ArrayList<TreeNode<K,V>>();
>>>>>>         getLeafs(ret);
>>>>>>         return ret;
>>>>>>     }
>>>>>>     private void getLeafs(List<TreeNode<K, V>> ret) {
>>>>>>         if (children.isEmpty())
>>>>>>             ret.add(this);
>>>>>>         for (TreeNode<K, V> child : children) {
>>>>>>             child.getLeafs(ret);
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public String toString() {
>>>>>>         return toString(true);
>>>>>>     }
>>>>>>     public String toString(boolean withChildren) {
>>>>>>         if (key == null)
>>>>>>             return super.toString();
>>>>>>         StringBuffer ret = new StringBuffer();
>>>>>>         for (int i = 0; i < level; i++) {
>>>>>>             ret.append(" >");
>>>>>>         }
>>>>>>         ret.append(" " + key.toString());
>>>>>>         if (withChildren) {
>>>>>>             for (TreeNode<K, V> child : children) {
>>>>>>                 ret.append("\n").append(child.toString());
>>>>>>             }
>>>>>>         }
>>>>>>         return ret.toString();
>>>>>>     }
>>>>>>
>>>>>>     public void setValue(V value) {
>>>>>>         this.value = value;
>>>>>>     }
>>>>>>
>>>>>>     public void remove(List<TreeNode<K, V>> nodes) {
>>>>>>         for (TreeNode<K, V> n : nodes) {
>>>>>>             removeChildren(n);
>>>>>>         }
>>>>>>         for (TreeNode<K, V> n : nodes) {
>>>>>>             TreeNode<K, V> parent = n.getParent();
>>>>>>             if (parent == null)
>>>>>>                 continue;
>>>>>>             parent.children.remove(n);
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>>     private void removeChildren(TreeNode<K, V> node) {
>>>>>>         lookup.remove(node.getUuid());
>>>>>>         if (node.children.isEmpty())
>>>>>>             return;
>>>>>>         Iterator<TreeNode<K, V>> it = node.children.iterator();
>>>>>>         while (it.hasNext()) {
>>>>>>             TreeNode<K, V> child = (TreeNode<K, V>) it.next();
>>>>>>             removeChildren(child);
>>>>>>             it.remove();
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>>     public void clear() {
>>>>>>         this.key = null;
>>>>>>         this.value = null;
>>>>>>         this.uuid = null;
>>>>>>         for (TreeNode<K, V> child : children) {
>>>>>>             child.clear();
>>>>>>         }
>>>>>>         this.children.clear();
>>>>>>         this.lookup.clear();
>>>>>>     }
>>>>>>
>>>>>>     public TreeNode<K, V> getNodeById(K key, String uuid) {
>>>>>>         List<TreeNode<K, V>> nodes = getNodesByKey(key);
>>>>>>         for (TreeNode<K, V> treeNode : nodes) {
>>>>>>             if (uuid.equals(treeNode.getUuid()))
>>>>>>                 return treeNode;
>>>>>>         }
>>>>>>         return null;
>>>>>>     }
>>>>>>
>>>>>>     public HashMap<K, List<TreeNode<K, V>>> getLookup() {
>>>>>>         return lookup;
>>>>>>     }
>>>>>>
>>>>>>     public void setLookup(HashMap<K, List<TreeNode<K, V>>> lookup) {
>>>>>>         this.lookup = lookup;
>>>>>>     }
>>>>>>
>>>>>>     public void setLevel(int level) {
>>>>>>         this.level = level;
>>>>>>     }
>>>>>>
>>>>>>     public void setKey(K key) {
>>>>>>         this.key = key;
>>>>>>     }
>>>>>>
>>>>>>     public void setParent(TreeNode<K, V> parent) {
>>>>>>         this.parent = parent;
>>>>>>     }
>>>>>>
>>>>>>     public void setChildren(List<TreeNode<K, V>> children) {
>>>>>>         this.children = children;
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> On Tue, May 17, 2016 at 12:00 PM, Robert Metzger <rmetz...@apache.org> wrote:
>>>>>>
>>>>>>> Hi Flavio,
>>>>>>>
>>>>>>> which datatype are you using?
>>>>>>>
>>>>>>> On Tue, May 17, 2016 at 11:42 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>
>>>>>>>> Hi to all,
>>>>>>>> during these days we've run a lot of Flink jobs and, from time to
>>>>>>>> time (apparently randomly), a different exception arises during their
>>>>>>>> execution...
>>>>>>>> I hope one of them can help in finding the source of the
>>>>>>>> problem.. This time the exception is:
>>>>>>>>
>>>>>>>> An error occurred while reading the next record.
>>>>>>>> at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:148)
>>>>>>>> at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.access$300(NonReusingKeyGroupedIterator.java:32)
>>>>>>>> at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator.next(NonReusingKeyGroupedIterator.java:192)
>>>>>>>> at org.okkam.entitons.mapping.flink.IndexMappingExecutor$TupleToEntitonJsonNode.reduce(IndexMappingExecutor.java:61)
>>>>>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:125)
>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>> Caused by: java.io.UTFDataFormatException: malformed input around byte 42
>>>>>>>> at org.apache.flink.runtime.memory.AbstractPagedInputView.readUTF(AbstractPagedInputView.java:488)
>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:403)
>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>>>>> at org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:100)
>>>>>>>> at org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:161)
>>>>>>>> at org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:113)
>>>>>>>> at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:130
>>>>>>>>
>>>>>>>> Could this error be caused by a missing implementation of hashCode()
>>>>>>>> and equals()?
>>>>>>>>
>>>>>>>> Thanks in advance,
>>>>>>>> Flavio
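One way to test the suspicion in the question above (purely a sketch, not something
discussed in the thread): take the PojoSerializer out of the equation by forcing Kryo for
all POJO types and re-running the failing job. If the malformed-input error then
disappears, the POJO serialization path of the involved types deserves a closer look,
independently of hashCode()/equals(). The class and pipeline below are placeholders.

import org.apache.flink.api.java.ExecutionEnvironment;

public class ForceKryoDebugRun {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Serialize POJO types with Kryo instead of Flink's PojoSerializer for this run.
        env.getConfig().enableForceKryo();

        // Placeholder pipeline; the real (failing) job would be built here as usual.
        env.fromElements("just", "a", "placeholder").print();
    }
}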