Yes, I am.

On Tue, May 17, 2016 at 3:45 PM, Robert Metzger <rmetz...@apache.org> wrote:
> Are you using 1.0.2 on the cluster as well?
>
> On Tue, May 17, 2016 at 3:40 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
>> I tried to debug my application from Eclipse and I got an infinite
>> recursive call in the TypeExtractor during the analysis of TreeNode (I'm
>> using Flink 1.0.2):
>>
>> Exception in thread "main" java.lang.StackOverflowError
>>     at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1482)
>>     at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1464)
>>     at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:736)
>>     at org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1678)
>>     at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1559)
>>     at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:732)
>>     at org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1678)
>>     at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1559)
>>     at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:732)
>>
>> Why doesn't this happen on the cluster?
>>
>> On Tue, May 17, 2016 at 2:23 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>
>>> Don't worry Robert,
>>> I know how hard it is to debug such errors :)
>>> I hope that maybe the combination of these 3 errors is somehow
>>> related... However, these are the answers:
>>>
>>> - The job (composed of 16 sub-jobs) fails randomly but, usually, the
>>>   first sub-job after the restart runs successfully
>>> - In this job I saw A, B and C (but after changing the parallelism)
>>> - Yes, the error behaves differently depending on the input data
>>>   (actually on the default parallelism and the number of slots in the
>>>   cluster)
>>>
>>> One more interesting thing I fixed in my code that could (maybe?) be
>>> the cause of B and C (but not A, because that happened after this
>>> problem): I'm reading and writing data from a Parquet-thrift directory
>>> (using the Hadoop IF/OF ParquetThriftOutputFormat and
>>> ParquetThriftInputFormat).
>>> In one of the 3 jobs I output a dataset to a Parquet-thrift directory
>>> after decreasing the parallelism and the number of slots from M to N,
>>> using ds.output().
>>> The first N Parquet files were overwritten (as expected) but the last
>>> M-N were not removed (I was expecting the Parquet-thrift directory to
>>> be managed as a single dir).
>>> Then, when I read (in the next job) from that directory, I discovered
>>> that the job was actually reading all the files in that folder (I was
>>> convinced that, despite the M-N files left in that dir, there was some
>>> index file, e.g. _metadata, acting as the entry point for the files in
>>> that folder).
>>> I don't know whether this could be a cause of such errors but I report
>>> it anyway for the sake of completeness, hoping that this real-life
>>> debugging story could be helpful to someone else using Parquet on
>>> Flink :)
>>>
>>> Thanks for the support,
>>>
>>> Flavio
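A side note for readers hitting the same Parquet pitfall: a minimal sketch of one way to avoid the leftover files described above, namely deleting the output directory before writing, so that stale part files from an earlier run with higher parallelism cannot be picked up by the next job. This assumes the standard Hadoop FileSystem API; the path and class name are illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ParquetDirCleanup {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Illustrative path: the Parquet-thrift directory written by ds.output().
        Path outputDir = new Path("hdfs:///data/parquet-out");
        FileSystem fs = outputDir.getFileSystem(conf);
        // Recursively delete the directory so that the "last M-N" part files
        // left over from a run with higher parallelism cannot be read by the
        // next job, which scans the whole folder.
        if (fs.exists(outputDir)) {
            fs.delete(outputDir, true); // true = recursive
        }
        // ... then run the Flink job that writes with ParquetThriftOutputFormat.
    }
}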
>>> On Tue, May 17, 2016 at 1:50 PM, Robert Metzger <rmetz...@apache.org> wrote:
>>>
>>>> The last one is C or A?
>>>>
>>>> How often is it failing (every nth run)? Is it always failing at the
>>>> same execute() call, or at different ones?
>>>> Is it always the exact same exception or is it different ones?
>>>> Does the error behave differently depending on the input data?
>>>>
>>>> Sorry for asking so many questions, but these errors can have many
>>>> causes and just searching the code for potential issues can take a lot
>>>> of time ;)
>>>>
>>>> On Tue, May 17, 2016 at 12:47 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>
>>>>> Ah sorry, I forgot to mention that I don't use any custom Kryo
>>>>> serializers..
>>>>>
>>>>> On Tue, May 17, 2016 at 12:39 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>
>>>>>> I got those exceptions running 3 different types of jobs.. I could
>>>>>> have tracked the job and the error... my bad!
>>>>>> However, the most problematic job is the last one, where I run a
>>>>>> series of jobs one after the other (calling env.execute() in a for
>>>>>> loop)..
>>>>>> If you want I can share my code with you (in private for the moment,
>>>>>> because it's not public yet) or the dashboard screen via Skype while
>>>>>> the jobs are running..
>>>>>>
>>>>>> On Tue, May 17, 2016 at 12:31 PM, Robert Metzger <rmetz...@apache.org> wrote:
>>>>>>
>>>>>>> Hi Flavio,
>>>>>>> thank you for providing additional details.
>>>>>>> I don't think that missing hashCode() / equals() implementations
>>>>>>> cause such an error. They can cause wrong sorting or partitioning
>>>>>>> of the data, but the serialization should still work properly.
>>>>>>> I suspect the issue is somewhere in the serialization stack.
>>>>>>>
>>>>>>> Are you registering any custom Kryo serializers?
>>>>>>>
>>>>>>> From your past emails, you saw the following different exceptions:
>>>>>>>
>>>>>>> A) Caused by: java.io.UTFDataFormatException: malformed input
>>>>>>>    around byte 42
>>>>>>> B) Caused by: java.lang.ClassNotFoundException: java.ttil.HashSet
>>>>>>> C) Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>>>>>>>
>>>>>>> Were they all caused by the same job, or by different ones?
>>>>>>>
>>>>>>> On Tue, May 17, 2016 at 12:12 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>
>>>>>>>> Hi Robert,
>>>>>>>> in this specific case the classes involved are:
>>>>>>>>
>>>>>>>> - Tuple3<String, String, IndexAttributeToExpand>
>>>>>>>>   (IndexAttributeToExpand is a POJO extending another class, and
>>>>>>>>   neither of them implements equals() and hashCode())
>>>>>>>> - Tuple3<String, String, TreeNode<String, Map<String, Set<String>>>>
>>>>>>>>   (TreeNode is a POJO containing other TreeNodes, and it doesn't
>>>>>>>>   implement equals() and hashCode())
>>>>>>>>
>>>>>>>> Now I've added hashCode() and equals() to both classes in order to
>>>>>>>> be aligned with the POJO policies; however, the job finished
>>>>>>>> correctly after stopping and restarting the cluster... usually
>>>>>>>> when I have a strange serialization exception I stop and restart
>>>>>>>> the cluster and everything works.
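A minimal sketch of the kind of equals()/hashCode() pair added above, using java.util.Objects; the class and field names here are illustrative, not the actual ones from IndexAttributeToExpand. As Robert points out, these methods matter for correct grouping and partitioning, not for serialization itself:

import java.util.Objects;

public class IndexAttribute {

    private String name;
    private String value;

    // Flink POJOs need a public no-arg constructor.
    public IndexAttribute() {
    }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getValue() { return value; }
    public void setValue(String value) { this.value = value; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof IndexAttribute)) return false;
        IndexAttribute other = (IndexAttribute) o;
        return Objects.equals(name, other.name)
                && Objects.equals(value, other.value);
    }

    @Override
    public int hashCode() {
        // Must be consistent with equals(): same fields, same order.
        return Objects.hash(name, value);
    }
}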
>>>>>>>>
>>>>>>>> The TreeNode class (the recursive one) is actually the following:
>>>>>>>>
>>>>>>>> import java.io.Serializable;
>>>>>>>> import java.util.ArrayList;
>>>>>>>> import java.util.HashMap;
>>>>>>>> import java.util.Iterator;
>>>>>>>> import java.util.LinkedList;
>>>>>>>> import java.util.List;
>>>>>>>> import java.util.UUID;
>>>>>>>>
>>>>>>>> import com.fasterxml.jackson.annotation.JsonIgnore;
>>>>>>>>
>>>>>>>> public class TreeNode<K, V> implements Serializable {
>>>>>>>>
>>>>>>>>     private static final long serialVersionUID = 1L;
>>>>>>>>
>>>>>>>>     private int level = 0;
>>>>>>>>     private String uuid;
>>>>>>>>     private K key;
>>>>>>>>     private V value;
>>>>>>>>     @JsonIgnore
>>>>>>>>     private TreeNode<K, V> parent;
>>>>>>>>     private List<TreeNode<K, V>> children;
>>>>>>>>     // Index from key to all the nodes with that key; shared by
>>>>>>>>     // every node of the same tree.
>>>>>>>>     @JsonIgnore
>>>>>>>>     private HashMap<K, List<TreeNode<K, V>>> lookup;
>>>>>>>>
>>>>>>>>     // No-arg constructor (required for POJO-style (de)serialization).
>>>>>>>>     public TreeNode() {
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public TreeNode(K key, V value) {
>>>>>>>>         this.level = 0;
>>>>>>>>         this.key = key;
>>>>>>>>         this.uuid = UUID.randomUUID().toString();
>>>>>>>>         this.value = value;
>>>>>>>>         this.children = new LinkedList<TreeNode<K, V>>();
>>>>>>>>         List<TreeNode<K, V>> thisAsList = new ArrayList<TreeNode<K, V>>();
>>>>>>>>         thisAsList.add(this);
>>>>>>>>         this.lookup = new HashMap<K, List<TreeNode<K, V>>>();
>>>>>>>>         this.lookup.put(key, thisAsList);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public TreeNode<K, V> addChild(K key, V value) {
>>>>>>>>         TreeNode<K, V> childNode = new TreeNode<K, V>(key, value);
>>>>>>>>         childNode.level = level + 1;
>>>>>>>>         childNode.parent = this;
>>>>>>>>         // The child shares the lookup map of the whole tree.
>>>>>>>>         childNode.lookup = lookup;
>>>>>>>>         childNode.uuid = UUID.randomUUID().toString();
>>>>>>>>         this.children.add(childNode);
>>>>>>>>         List<TreeNode<K, V>> l = lookup.get(childNode.key);
>>>>>>>>         if (l == null) {
>>>>>>>>             l = new ArrayList<TreeNode<K, V>>();
>>>>>>>>             lookup.put(childNode.key, l);
>>>>>>>>         }
>>>>>>>>         l.add(childNode);
>>>>>>>>         return childNode;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public boolean isLeaf() {
>>>>>>>>         return children.isEmpty();
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public int getLevel() {
>>>>>>>>         return level;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public TreeNode<K, V> getParent() {
>>>>>>>>         return parent;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public V getValue() {
>>>>>>>>         return value;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public String getUuid() {
>>>>>>>>         return uuid;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public void setUuid(String uuid) {
>>>>>>>>         this.uuid = uuid;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public List<TreeNode<K, V>> getChildren() {
>>>>>>>>         return children;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public List<TreeNode<K, V>> getNodesByKey(K key) {
>>>>>>>>         return lookup.get(key);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public K getKey() {
>>>>>>>>         return key;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public List<TreeNode<K, V>> getLeafs() {
>>>>>>>>         List<TreeNode<K, V>> ret = new ArrayList<TreeNode<K, V>>();
>>>>>>>>         getLeafs(ret);
>>>>>>>>         return ret;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     private void getLeafs(List<TreeNode<K, V>> ret) {
>>>>>>>>         if (children.isEmpty())
>>>>>>>>             ret.add(this);
>>>>>>>>         for (TreeNode<K, V> child : children) {
>>>>>>>>             child.getLeafs(ret);
>>>>>>>>         }
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public String toString() {
>>>>>>>>         return toString(true);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public String toString(boolean withChildren) {
>>>>>>>>         if (key == null)
>>>>>>>>             return super.toString();
>>>>>>>>         StringBuffer ret = new StringBuffer();
>>>>>>>>         for (int i = 0; i < level; i++) {
>>>>>>>>             ret.append(" >");
>>>>>>>>         }
>>>>>>>>         ret.append(" " + key.toString());
>>>>>>>>         if (withChildren) {
>>>>>>>>             for (TreeNode<K, V> child : children) {
>>>>>>>>                 ret.append("\n").append(child.toString());
>>>>>>>>             }
>>>>>>>>         }
>>>>>>>>         return ret.toString();
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public void setValue(V value) {
>>>>>>>>         this.value = value;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public void remove(List<TreeNode<K, V>> nodes) {
>>>>>>>>         for (TreeNode<K, V> n : nodes) {
>>>>>>>>             removeChildren(n);
>>>>>>>>         }
>>>>>>>>         for (TreeNode<K, V> n : nodes) {
>>>>>>>>             TreeNode<K, V> parent = n.getParent();
>>>>>>>>             if (parent == null)
>>>>>>>>                 continue;
>>>>>>>>             parent.children.remove(n);
>>>>>>>>         }
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     private void removeChildren(TreeNode<K, V> node) {
>>>>>>>>         // De-register the node from the shared lookup: the map is
>>>>>>>>         // keyed by K, so remove the node from the list stored
>>>>>>>>         // under its key.
>>>>>>>>         List<TreeNode<K, V>> byKey = lookup.get(node.getKey());
>>>>>>>>         if (byKey != null) {
>>>>>>>>             byKey.remove(node);
>>>>>>>>         }
>>>>>>>>         if (node.children.isEmpty())
>>>>>>>>             return;
>>>>>>>>         Iterator<TreeNode<K, V>> it = node.children.iterator();
>>>>>>>>         while (it.hasNext()) {
>>>>>>>>             TreeNode<K, V> child = it.next();
>>>>>>>>             removeChildren(child);
>>>>>>>>             it.remove();
>>>>>>>>         }
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public void clear() {
>>>>>>>>         this.key = null;
>>>>>>>>         this.value = null;
>>>>>>>>         this.uuid = null;
>>>>>>>>         for (TreeNode<K, V> child : children) {
>>>>>>>>             child.clear();
>>>>>>>>         }
>>>>>>>>         this.children.clear();
>>>>>>>>         this.lookup.clear();
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public TreeNode<K, V> getNodeById(K key, String uuid) {
>>>>>>>>         List<TreeNode<K, V>> nodes = getNodesByKey(key);
>>>>>>>>         for (TreeNode<K, V> treeNode : nodes) {
>>>>>>>>             if (uuid.equals(treeNode.getUuid()))
>>>>>>>>                 return treeNode;
>>>>>>>>         }
>>>>>>>>         return null;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public HashMap<K, List<TreeNode<K, V>>> getLookup() {
>>>>>>>>         return lookup;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public void setLookup(HashMap<K, List<TreeNode<K, V>>> lookup) {
>>>>>>>>         this.lookup = lookup;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public void setLevel(int level) {
>>>>>>>>         this.level = level;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public void setKey(K key) {
>>>>>>>>         this.key = key;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public void setParent(TreeNode<K, V> parent) {
>>>>>>>>         this.parent = parent;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public void setChildren(List<TreeNode<K, V>> children) {
>>>>>>>>         this.children = children;
>>>>>>>>     }
>>>>>>>> }
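To make the behavior of the shared lookup map concrete, here is a short usage sketch of the TreeNode class above (plain Java, nothing Flink-specific; expected results noted in the comments):

public class TreeNodeDemo {

    public static void main(String[] args) {
        TreeNode<String, Integer> root = new TreeNode<String, Integer>("root", 0);
        TreeNode<String, Integer> a = root.addChild("a", 1);
        a.addChild("leaf", 2);
        root.addChild("leaf", 3);

        // The lookup map is shared by all nodes, so any node can resolve
        // a key to every node carrying it:
        System.out.println(root.getNodesByKey("leaf").size()); // 2
        System.out.println(root.getLeafs().size());            // 2
        System.out.println(root);                              // indented tree dump
    }
}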
>>>>>>>>
>>>>>>>> On Tue, May 17, 2016 at 12:00 PM, Robert Metzger <rmetz...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Hi Flavio,
>>>>>>>>>
>>>>>>>>> which datatype are you using?
>>>>>>>>>
>>>>>>>>> On Tue, May 17, 2016 at 11:42 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>>>
>>>>>>>>>> Hi to all,
>>>>>>>>>> during these days we've run a lot of Flink jobs, and from time
>>>>>>>>>> to time (apparently randomly) a different exception arises
>>>>>>>>>> during their execution...
>>>>>>>>>> I hope one of them could help in finding the source of the
>>>>>>>>>> problem.. This time the exception is:
>>>>>>>>>>
>>>>>>>>>> An error occurred while reading the next record.
>>>>>>>>>>     at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:148)
>>>>>>>>>>     at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.access$300(NonReusingKeyGroupedIterator.java:32)
>>>>>>>>>>     at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator.next(NonReusingKeyGroupedIterator.java:192)
>>>>>>>>>>     at org.okkam.entitons.mapping.flink.IndexMappingExecutor$TupleToEntitonJsonNode.reduce(IndexMappingExecutor.java:61)
>>>>>>>>>>     at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:125)
>>>>>>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>>>>>>>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>> Caused by: java.io.UTFDataFormatException: malformed input around byte 42
>>>>>>>>>>     at org.apache.flink.runtime.memory.AbstractPagedInputView.readUTF(AbstractPagedInputView.java:488)
>>>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:403)
>>>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
>>>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>>>>>>>     at org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:100)
>>>>>>>>>>     at org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:161)
>>>>>>>>>>     at org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:113)
>>>>>>>>>>     at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:130)
>>>>>>>>>>
>>>>>>>>>> Could this error be caused by a missing implementation of
>>>>>>>>>> hashCode() and equals()?
>>>>>>>>>>
>>>>>>>>>> Thanks in advance,
>>>>>>>>>> Flavio
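For context on the job structure discussed in this thread (16 sub-jobs submitted one after the other by calling env.execute() in a for loop), a minimal sketch of that driver pattern; the source, sink, and paths are illustrative, not from the actual job:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class JobChainDemo {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        for (int i = 0; i < 16; i++) {
            // Each iteration builds one sub-job; the sinks registered on env
            // are consumed by the following execute() call, so every
            // iteration submits an independent batch job.
            DataSet<String> data = env.fromElements("a", "b", "c"); // illustrative source
            data.writeAsText("/tmp/subjob-" + i);                   // illustrative sink
            env.execute("sub-job " + i);
        }
    }
}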