I found that on the cluster I was using a release version of a dependency that has since changed, so now I get the error on the cluster as well :)
This is caused by the addition of the setParent() method to TreeNode:

public void setParent(TreeNode<K, V> parent) { this.parent = parent; }

Without it Flink doesn't complain, at least... but then I'm not sure whether things are working correctly...

On Tue, May 17, 2016 at 3:53 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

> Yes I am
>
> On Tue, May 17, 2016 at 3:45 PM, Robert Metzger <rmetz...@apache.org> wrote:
>
>> Are you using 1.0.2 on the cluster as well?
>>
>> On Tue, May 17, 2016 at 3:40 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>
>>> I tried to debug my application from Eclipse and I got an infinite
>>> recursive call in the TypeExtractor during the analysis of TreeNode (I'm
>>> using Flink 1.0.2):
>>>
>>> Exception in thread "main" java.lang.StackOverflowError
>>> at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1482)
>>> at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1464)
>>> at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:736)
>>> at org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1678)
>>> at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1559)
>>> at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:732)
>>> at org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1678)
>>> at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1559)
>>> at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:732)
>>>
>>> Why doesn't this happen on the cluster?
>>>
>>> On Tue, May 17, 2016 at 2:23 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>
>>>> Don't worry Robert,
>>>> I know how hard it is to debug such errors :)
>>>> I hope that maybe these 3 errors are somehow related... However, these
>>>> are the answers:
>>>>
>>>> - The job (composed of 16 sub-jobs) fails randomly but, usually, the
>>>>   first sub-job after a restart runs successfully
>>>> - In this job I saw A, B and C (but after changing the parallelism)
>>>> - Yes, the errors behave differently depending on the input data
>>>>   (actually on the default parallelism and the number of slots in the
>>>>   cluster)
>>>>
>>>> One more interesting thing I fixed in my code that could be (maybe?) the
>>>> cause of B and C (but not A, because that happened after this problem):
>>>> I'm reading and writing data from some Parquet-thrift directories (using
>>>> the Hadoop IF/OF ParquetThriftOutputFormat and ParquetThriftInputFormat).
>>>> In one of the 3 jobs I write a dataset into a Parquet-thrift directory,
>>>> using ds.output(), after decreasing the parallelism and the number of
>>>> slots from M to N.
>>>> The first N Parquet files were overwritten (as expected) but the last
>>>> M-N were not removed (I was expecting the Parquet-thrift directory to be
>>>> managed as a single dir).
>>>> Then, when I read (in the next job) from that directory, I discovered
>>>> that the job was actually reading all the files in that folder (I was
>>>> convinced that, even though the M-N files were left in that dir, there
>>>> was some index file, e.g. _metadata, acting as the entry point for the
>>>> files in that folder).
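A minimal sketch of one way to avoid the stale-file problem described above: recursively delete the output directory before re-writing it with a lower parallelism, so the "last M-N" part files from the previous run cannot be picked up by ParquetThriftInputFormat in the next job. This is not taken from the original job; the class name, directory path and plain Hadoop Configuration are assumptions, and only standard Hadoop FileSystem calls are used.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ParquetOutputCleanup {

    // Recursively deletes the target directory (if it exists) so that part
    // files left over from a previous run with higher parallelism are gone
    // before the new files are written.
    public static void deleteIfExists(String outputDir) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path(outputDir);
        FileSystem fs = path.getFileSystem(conf);
        if (fs.exists(path)) {
            fs.delete(path, true); // true = recursive delete
        }
    }
}

Calling something like ParquetOutputCleanup.deleteIfExists(outputDir) right before ds.output(...) would guarantee that the directory contains only the N files produced by the current run.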
>>>> I don't know however if this could be a cause of such errors, but I
>>>> reported it anyway for the sake of completeness, hoping that this
>>>> real-life debugging story could be helpful to someone else using
>>>> Parquet on Flink :)
>>>>
>>>> Thanks for the support,
>>>>
>>>> Flavio
>>>>
>>>> On Tue, May 17, 2016 at 1:50 PM, Robert Metzger <rmetz...@apache.org> wrote:
>>>>
>>>>> The last one is C or A?
>>>>>
>>>>> How often is it failing (every nth run)? Is it always failing at the
>>>>> same execute() call, or at different ones?
>>>>> Is it always the exact same exception or is it different ones?
>>>>> Does the error behave differently depending on the input data?
>>>>>
>>>>> Sorry for asking so many questions, but these errors can have many
>>>>> causes and just searching the code for potential issues can take a lot
>>>>> of time ;)
>>>>>
>>>>> On Tue, May 17, 2016 at 12:47 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>
>>>>>> Ah sorry, I forgot to mention that I don't use any custom Kryo
>>>>>> serializers..
>>>>>>
>>>>>> On Tue, May 17, 2016 at 12:39 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>
>>>>>>> I got those exceptions running 3 different types of jobs.. I should
>>>>>>> have tracked which job produced which error... my bad!
>>>>>>> However, the most problematic job is the last one, where I run a
>>>>>>> series of jobs one after the other (calling env.execute() in a for
>>>>>>> loop)..
>>>>>>> If you want I can share my code with you (in private for the moment
>>>>>>> because it's not public yet) or the dashboard screen via Skype while
>>>>>>> the jobs are running..
>>>>>>>
>>>>>>> On Tue, May 17, 2016 at 12:31 PM, Robert Metzger <rmetz...@apache.org> wrote:
>>>>>>>
>>>>>>>> Hi Flavio,
>>>>>>>> thank you for providing additional details.
>>>>>>>> I don't think that missing hashCode() / equals() implementations
>>>>>>>> cause such an error. They can cause wrong sorting or partitioning of
>>>>>>>> the data, but the serialization should still work properly.
>>>>>>>> I suspect the issue is somewhere in the serialization stack.
>>>>>>>>
>>>>>>>> Are you registering any custom Kryo serializers?
>>>>>>>>
>>>>>>>> From your past emails, you saw the following different exceptions:
>>>>>>>>
>>>>>>>> A) Caused by: java.io.UTFDataFormatException: malformed input around byte 42
>>>>>>>> B) Caused by: java.lang.ClassNotFoundException: java.ttil.HashSet
>>>>>>>> C) Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>>>>>>>>
>>>>>>>> Were they all caused by the same job, or different ones?
>>>>>>>> On Tue, May 17, 2016 at 12:12 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>>
>>>>>>>>> Hi Robert,
>>>>>>>>> in this specific case the classes involved are:
>>>>>>>>>
>>>>>>>>> - Tuple3<String, String, IndexAttributeToExpand>
>>>>>>>>>   (IndexAttributeToExpand is a POJO extending another class, and
>>>>>>>>>   neither of them implements equals and hashCode)
>>>>>>>>> - Tuple3<String, String, TreeNode<String, Map<String, Set<String>>>>
>>>>>>>>>   (TreeNode is a POJO containing other TreeNodes, and it doesn't
>>>>>>>>>   implement equals and hashCode)
>>>>>>>>> - Now I've added hashCode and equals to both classes in order to be
>>>>>>>>>   aligned with the POJO policies; however, the job finished correctly
>>>>>>>>>   after stopping and restarting the cluster... usually when I have a
>>>>>>>>>   strange serialization exception I stop and restart the cluster and
>>>>>>>>>   everything works.
>>>>>>>>>
>>>>>>>>> The TreeNode class (the recursive one) is actually the following:
>>>>>>>>>
>>>>>>>>> import java.io.Serializable;
>>>>>>>>> import java.util.ArrayList;
>>>>>>>>> import java.util.HashMap;
>>>>>>>>> import java.util.Iterator;
>>>>>>>>> import java.util.LinkedList;
>>>>>>>>> import java.util.List;
>>>>>>>>> import java.util.UUID;
>>>>>>>>>
>>>>>>>>> import com.fasterxml.jackson.annotation.JsonIgnore;
>>>>>>>>>
>>>>>>>>> public class TreeNode<K,V> implements Serializable {
>>>>>>>>>
>>>>>>>>>     private static final long serialVersionUID = 1L;
>>>>>>>>>     private int level = 0;
>>>>>>>>>
>>>>>>>>>     private String uuid;
>>>>>>>>>     private K key;
>>>>>>>>>     private V value;
>>>>>>>>>     @JsonIgnore
>>>>>>>>>     private TreeNode<K,V> parent;
>>>>>>>>>     private List<TreeNode<K,V>> children;
>>>>>>>>>     @JsonIgnore
>>>>>>>>>     private HashMap<K, List<TreeNode<K,V>>> lookup;
>>>>>>>>>
>>>>>>>>>     public TreeNode(K key, V value) {
>>>>>>>>>         this.level = 0;
>>>>>>>>>         this.key = key;
>>>>>>>>>         this.uuid = UUID.randomUUID().toString();
>>>>>>>>>         this.value = value;
>>>>>>>>>         this.children = new LinkedList<TreeNode<K,V>>();
>>>>>>>>>         List<TreeNode<K, V>> thisAsList = new ArrayList<TreeNode<K,V>>();
>>>>>>>>>         thisAsList.add(this);
>>>>>>>>>         this.lookup = new HashMap<K, List<TreeNode<K,V>>>();
>>>>>>>>>         this.lookup.put(key, thisAsList);
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public TreeNode<K,V> addChild(K key, V value) {
>>>>>>>>>         TreeNode<K,V> childNode = new TreeNode<K,V>(key, value);
>>>>>>>>>         childNode.level = level + 1;
>>>>>>>>>         childNode.parent = this;
>>>>>>>>>         childNode.lookup = lookup;
>>>>>>>>>         childNode.uuid = UUID.randomUUID().toString();
>>>>>>>>>         this.children.add(childNode);
>>>>>>>>>         List<TreeNode<K, V>> l = lookup.get(childNode.key);
>>>>>>>>>         if (l == null) {
>>>>>>>>>             l = new ArrayList<TreeNode<K,V>>();
>>>>>>>>>             lookup.put(childNode.key, l);
>>>>>>>>>         }
>>>>>>>>>         l.add(childNode);
>>>>>>>>>         return childNode;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public boolean isLeaf() {
>>>>>>>>>         return children.isEmpty();
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public int getLevel() {
>>>>>>>>>         return level;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public TreeNode<K,V> getParent() {
>>>>>>>>>         return parent;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public V getValue() {
>>>>>>>>>         return value;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public String getUuid() {
>>>>>>>>>         return uuid;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public void setUuid(String uuid) {
>>>>>>>>>         this.uuid = uuid;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public List<TreeNode<K,V>> getChildren() {
>>>>>>>>>         return children;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public List<TreeNode<K, V>> getNodesByKey(K key) {
>>>>>>>>>         return lookup.get(key);
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public K getKey() {
>>>>>>>>>         return key;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public List<TreeNode<K,V>> getLeafs() {
>>>>>>>>>         List<TreeNode<K,V>> ret = new ArrayList<TreeNode<K,V>>();
>>>>>>>>>         getLeafs(ret);
>>>>>>>>>         return ret;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     private void getLeafs(List<TreeNode<K, V>> ret) {
>>>>>>>>>         if (children.isEmpty())
>>>>>>>>>             ret.add(this);
>>>>>>>>>         for (TreeNode<K, V> child : children) {
>>>>>>>>>             child.getLeafs(ret);
>>>>>>>>>         }
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public String toString() {
>>>>>>>>>         return toString(true);
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public String toString(boolean withChildren) {
>>>>>>>>>         if (key == null)
>>>>>>>>>             return super.toString();
>>>>>>>>>         StringBuffer ret = new StringBuffer();
>>>>>>>>>         for (int i = 0; i < level; i++) {
>>>>>>>>>             ret.append(" >");
>>>>>>>>>         }
>>>>>>>>>         ret.append(" " + key.toString());
>>>>>>>>>         if (withChildren) {
>>>>>>>>>             for (TreeNode<K, V> child : children) {
>>>>>>>>>                 ret.append("\n").append(child.toString());
>>>>>>>>>             }
>>>>>>>>>         }
>>>>>>>>>         return ret.toString();
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public void setValue(V value) {
>>>>>>>>>         this.value = value;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public void remove(List<TreeNode<K, V>> nodes) {
>>>>>>>>>         for (TreeNode<K, V> n : nodes) {
>>>>>>>>>             removeChildren(n);
>>>>>>>>>         }
>>>>>>>>>         for (TreeNode<K, V> n : nodes) {
>>>>>>>>>             TreeNode<K, V> parent = n.getParent();
>>>>>>>>>             if (parent == null)
>>>>>>>>>                 continue;
>>>>>>>>>             parent.children.remove(n);
>>>>>>>>>         }
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     private void removeChildren(TreeNode<K, V> node) {
>>>>>>>>>         lookup.remove(node.getUuid());
>>>>>>>>>         if (node.children.isEmpty())
>>>>>>>>>             return;
>>>>>>>>>         Iterator<TreeNode<K, V>> it = node.children.iterator();
>>>>>>>>>         while (it.hasNext()) {
>>>>>>>>>             TreeNode<K, V> child = it.next();
>>>>>>>>>             removeChildren(child);
>>>>>>>>>             it.remove();
>>>>>>>>>         }
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public void clear() {
>>>>>>>>>         this.key = null;
>>>>>>>>>         this.value = null;
>>>>>>>>>         this.uuid = null;
>>>>>>>>>         for (TreeNode<K, V> child : children) {
>>>>>>>>>             child.clear();
>>>>>>>>>         }
>>>>>>>>>         this.children.clear();
>>>>>>>>>         this.lookup.clear();
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public TreeNode<K, V> getNodeById(K key, String uuid) {
>>>>>>>>>         List<TreeNode<K, V>> nodes = getNodesByKey(key);
>>>>>>>>>         for (TreeNode<K, V> treeNode : nodes) {
>>>>>>>>>             if (uuid.equals(treeNode.getUuid()))
>>>>>>>>>                 return treeNode;
>>>>>>>>>         }
>>>>>>>>>         return null;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public HashMap<K, List<TreeNode<K, V>>> getLookup() {
>>>>>>>>>         return lookup;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public void setLookup(HashMap<K, List<TreeNode<K, V>>> lookup) {
>>>>>>>>>         this.lookup = lookup;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public void setLevel(int level) {
>>>>>>>>>         this.level = level;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public void setKey(K key) {
>>>>>>>>>         this.key = key;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public void setParent(TreeNode<K, V> parent) {
>>>>>>>>>         this.parent = parent;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public void setChildren(List<TreeNode<K, V>> children) {
>>>>>>>>>         this.children = children;
>>>>>>>>>     }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> On Tue, May 17, 2016 at 12:00 PM, Robert Metzger <rmetz...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Flavio,
>>>>>>>>>>
>>>>>>>>>> which datatype are you using?
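For reference, a minimal sketch (not the author's code) of one way to stop the TypeExtractor recursion reported at the top of this thread: Flink's POJO field analysis ignores transient and static fields, so marking the self-referencing parent field transient breaks the TreeNode -> parent -> TreeNode cycle. Whether it is acceptable to lose the parent links during serialization and rebuild them afterwards is an assumption about the job's semantics; only the fields relevant to the recursion are shown.

import java.io.Serializable;
import java.util.LinkedList;
import java.util.List;

public class TreeNodeSketch<K, V> implements Serializable {

    private static final long serialVersionUID = 1L;

    // transient: skipped by Flink's POJO field analysis, so the extractor
    // never follows the TreeNodeSketch -> parent -> TreeNodeSketch cycle.
    private transient TreeNodeSketch<K, V> parent;
    private List<TreeNodeSketch<K, V>> children = new LinkedList<TreeNodeSketch<K, V>>();

    public TreeNodeSketch() {
        // public no-argument constructor, as required for Flink POJO types
    }

    public TreeNodeSketch<K, V> getParent() {
        return parent;
    }

    public void setParent(TreeNodeSketch<K, V> parent) {
        this.parent = parent;
    }

    public List<TreeNodeSketch<K, V>> getChildren() {
        return children;
    }

    public void setChildren(List<TreeNodeSketch<K, V>> children) {
        this.children = children;
    }

    // After deserialization the transient parent references are null; they
    // can be restored from the children lists in a single top-down pass.
    public void relinkParents() {
        for (TreeNodeSketch<K, V> child : children) {
            child.parent = this;
            child.relinkParents();
        }
    }
}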
>>>>>>>>>> On Tue, May 17, 2016 at 11:42 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi to all,
>>>>>>>>>>> during these days we've run a lot of Flink jobs and from time to
>>>>>>>>>>> time (apparently randomly) a different exception arises during
>>>>>>>>>>> their execution...
>>>>>>>>>>> I hope one of them can help in finding the source of the
>>>>>>>>>>> problem.. This time the exception is:
>>>>>>>>>>>
>>>>>>>>>>> An error occurred while reading the next record.
>>>>>>>>>>> at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:148)
>>>>>>>>>>> at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.access$300(NonReusingKeyGroupedIterator.java:32)
>>>>>>>>>>> at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator.next(NonReusingKeyGroupedIterator.java:192)
>>>>>>>>>>> at org.okkam.entitons.mapping.flink.IndexMappingExecutor$TupleToEntitonJsonNode.reduce(IndexMappingExecutor.java:61)
>>>>>>>>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:125)
>>>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>> Caused by: java.io.UTFDataFormatException: malformed input around byte 42
>>>>>>>>>>> at org.apache.flink.runtime.memory.AbstractPagedInputView.readUTF(AbstractPagedInputView.java:488)
>>>>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:403)
>>>>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
>>>>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>>>>>>>> at org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:100)
>>>>>>>>>>> at org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:161)
>>>>>>>>>>> at org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:113)
>>>>>>>>>>> at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:130)
>>>>>>>>>>>
>>>>>>>>>>> Could this error be caused by a missing implementation of
>>>>>>>>>>> hashCode() and equals()?
>>>>>>>>>>>
>>>>>>>>>>> Thanks in advance,
>>>>>>>>>>> Flavio
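Closing the loop on that last question with a sketch: as Robert notes further up in the thread, missing hashCode()/equals() implementations affect grouping, sorting and partitioning correctness rather than serialization, so they are unlikely to explain the UTFDataFormatException. A minimal pair for TreeNode might look like the following; choosing the uuid field as the identity is an assumption, not something stated in the thread.

import java.util.Objects;

// Hypothetical identity methods for TreeNode, keyed on the uuid field only.
public class TreeNodeIdentity {

    private String uuid;

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof TreeNodeIdentity)) {
            return false;
        }
        return Objects.equals(uuid, ((TreeNodeIdentity) o).uuid);
    }

    @Override
    public int hashCode() {
        return Objects.hash(uuid);
    }
}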