Don't worry Robert, I know how hard it is to debug such errors :) I hope that maybe the combination of these 3 errors is somehow related... However, these are the answers:
- The job (composed of 16 sub-jobs) fails randomly but, usually, the first sub-job after a cluster (re)start runs successfully
- In this job I saw all of A, B and C (but only after changing the parallelism)
- Yes, the error behaves differently depending on the input data (actually on the default parallelism and the number of slots in the cluster)

One more interesting thing I fixed in my code that could (maybe?) be the cause of B and C (but not A, because that one happened after this problem): I'm reading and writing data from a Parquet-thrift directory (using the Hadoop IF/OF ParquetThriftOutputFormat and ParquetThriftInputFormat). In one of the 3 jobs I write a dataset to a Parquet-thrift directory with ds.output() after decreasing the parallelism and the number of slots from M to N. The first N parquet files were overwritten (as expected) but the remaining M-N were not removed (I was expecting the Parquet-thrift directory to be managed as a single dir). Then, when I read from that directory in the next job, I discovered that the job was actually reading all files in that folder (I was convinced that, even though the M-N files were left in that dir, there was some index file, e.g. _metadata, acting as the entry point for the files in that folder). I don't know whether this could be a cause of such errors, but I report it anyway for the sake of completeness, hoping that this real-life debugging story can be helpful to someone else using Parquet on Flink :)
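For reference, the kind of cleanup that avoids this (wiping the output directory before re-writing it, so a lower-parallelism run cannot leave stale part files behind) would look roughly like the sketch below. This is not my actual code: the path is illustrative and I'm assuming the directory lives on HDFS and is reachable through the Hadoop FileSystem API.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ParquetOutputCleanup {

    // Recursively delete the Parquet-thrift output directory (part files and
    // _metadata) before the job writes it again, so that part files left over
    // from a previous run with higher parallelism cannot be picked up by the
    // next ParquetThriftInputFormat read.
    public static void deleteStaleOutput(String outputDir) throws Exception {
        Path dir = new Path(outputDir); // e.g. "hdfs:///data/my-parquet-dir" (illustrative)
        FileSystem fs = dir.getFileSystem(new Configuration());
        if (fs.exists(dir)) {
            fs.delete(dir, true); // true = recursive
        }
    }
}

Called right before the env.execute() that produces the directory, this makes sure the next job only sees the files of the latest run. A second sketch at the very bottom of this mail (after the quoted thread) shows how one can check which serializer Flink actually picks for the Tuple3/TreeNode types discussed below.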
Thanks for the support,
Flavio

On Tue, May 17, 2016 at 1:50 PM, Robert Metzger <rmetz...@apache.org> wrote:

> The last one is C or A?
>
> How often is it failing (every nth run?) Is it always failing at the same execute() call, or at different ones?
> Is it always the exact same exception or is it different ones?
> Does the error behave differently depending on the input data?
>
> Sorry for asking so many questions, but these errors can have many causes and just searching the code for potential issues can take a lot of time ;)
>
> On Tue, May 17, 2016 at 12:47 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
>> Ah sorry, I forgot to mention that I don't use any custom kryo serializers..
>>
>> On Tue, May 17, 2016 at 12:39 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>
>>> I got those exceptions running 3 different types of jobs..I could have tracked the job and the error...my bad!
>>> However, the most problematic job is the last one, where I run a series of jobs one after the other (calling env.execute() in a for loop)..
>>> I you want I can share with you my code (in private for the moment because it's not public yet) or the dashboard screen via skype while the jobs are running..
>>>
>>>
>>> On Tue, May 17, 2016 at 12:31 PM, Robert Metzger <rmetz...@apache.org> wrote:
>>>
>>>> Hi Flavio,
>>>> thank you for providing additional details.
>>>> I don't think that missing hashCode / equals() implementations cause such an error. They can cause wrong sorting or partitioning of the data, but the serialization should still work properly.
>>>> I suspect the issue somewhere in the serialization stack.
>>>>
>>>> Are you registering any custom kryo serializers?
>>>>
>>>> From your past emails, you saw the following different exceptions:
>>>>
>>>> A) Caused by: java.io.UTFDataFormatException: malformed input around byte 42
>>>> B) Caused by: java.lang.ClassNotFoundException: java.ttil.HashSet
>>>> C) Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>>>>
>>>> Were they all caused by the same job, or different ones?
>>>>
>>>>
>>>> On Tue, May 17, 2016 at 12:12 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>
>>>>> Hi Robert,
>>>>> in this specific case the interested classes are:
>>>>>
>>>>>    - Tuple3<String, String, IndexAttributeToExpand> (IndexAttributeToExpand is a POJO extending another class and both of them doesn't implement equals and hashcode)
>>>>>    - Tuple3<String,String, TreeNode<String, Map<String, Set<String>>>> (TreeNode is a POJO containing other TreeNode and it doesn't implement equals and hashcode)
>>>>>    - Now I've added to both classes hashCode and equals in order to be aligned with POJO policies, however the job finished correctly after stopping and restarting the cluster...usually when I have strange serialization exception I stop and restart the cluster and everything works.
>>>>>
>>>>> The TreeNode class (the recursive one) is actually the following:
>>>>>
>>>>> import java.io.Serializable;
>>>>> import java.util.ArrayList;
>>>>> import java.util.HashMap;
>>>>> import java.util.Iterator;
>>>>> import java.util.LinkedList;
>>>>> import java.util.List;
>>>>> import java.util.UUID;
>>>>>
>>>>> import com.fasterxml.jackson.annotation.JsonIgnore;
>>>>>
>>>>> public class TreeNode<K,V> implements Serializable {
>>>>>
>>>>>     private static final long serialVersionUID = 1L;
>>>>>     private int level = 0;
>>>>>
>>>>>     private String uuid;
>>>>>     private K key;
>>>>>     private V value;
>>>>>     @JsonIgnore
>>>>>     private TreeNode<K,V> parent;
>>>>>     private List<TreeNode<K,V>> children;
>>>>>     @JsonIgnore
>>>>>     private HashMap<K, List<TreeNode<K,V>>> lookup;
>>>>>
>>>>>     public TreeNode(K key, V value) {
>>>>>         this.level = 0;
>>>>>         this.key = key;
>>>>>         this.uuid = UUID.randomUUID().toString();
>>>>>         this.value = value;
>>>>>         this.children = new LinkedList<TreeNode<K,V>>();
>>>>>         List<TreeNode<K, V>> thisAsList = new ArrayList<TreeNode<K,V>>();
>>>>>         thisAsList.add(this);
>>>>>         this.lookup = new HashMap<K, List<TreeNode<K,V>>>();
>>>>>         this.lookup.put(key, thisAsList);
>>>>>     }
>>>>>
>>>>>     public TreeNode<K,V> addChild(K key, V value) {
>>>>>         TreeNode<K,V> childNode = new TreeNode<K,V>(key, value);
>>>>>         childNode.level = level +1;
>>>>>         childNode.parent = this;
>>>>>         childNode.lookup = lookup;
>>>>>         childNode.uuid = UUID.randomUUID().toString();
>>>>>         this.children.add(childNode);
>>>>>         List<TreeNode<K, V>> l = lookup.get(childNode.key);
>>>>>         if(l==null){
>>>>>             l = new ArrayList<TreeNode<K,V>>();
>>>>>             lookup.put(childNode.key, l);
>>>>>         }
>>>>>         l.add(childNode);
>>>>>         return childNode;
>>>>>     }
>>>>>
>>>>>     public boolean isLeaf() {
>>>>>         return children.isEmpty();
>>>>>     }
>>>>>     public int getLevel() {
>>>>>         return level;
>>>>>     }
>>>>>     public TreeNode<K,V> getParent() {
>>>>>         return parent;
>>>>>     }
>>>>>     public V getValue() {
>>>>>         return value;
>>>>>     }
>>>>>     public String getUuid() {
>>>>>         return uuid;
>>>>>     }
>>>>>     public void setUuid(String uuid) {
>>>>>         this.uuid = uuid;
>>>>>     }
>>>>>     public List<TreeNode<K,V>> getChildren() {
>>>>>         return children;
>>>>>     }
>>>>>     public List<TreeNode<K, V>> getNodesByKey(K key) {
>>>>>         return lookup.get(key);
>>>>>     }
>>>>>     public K getKey() {
>>>>>         return key;
>>>>>     }
>>>>>     public List<TreeNode<K,V>> getLeafs() {
>>>>>         List<TreeNode<K,V>> ret = new ArrayList<TreeNode<K,V>>();
>>>>>         getLeafs(ret);
>>>>>         return ret;
>>>>>     }
>>>>>     private void getLeafs(List<TreeNode<K, V>> ret) {
>>>>>         if(children.isEmpty())
>>>>>             ret.add(this);
>>>>>         for (TreeNode<K, V> child : children) {
>>>>>             child.getLeafs(ret);
>>>>>         }
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public String toString() {
>>>>>         return toString(true);
>>>>>     }
>>>>>     public String toString(boolean withChildren) {
>>>>>         if(key==null)
>>>>>             return super.toString();
>>>>>         StringBuffer ret = new StringBuffer();
>>>>>         for (int i = 0; i < level; i++) {
>>>>>             ret.append(" >");
>>>>>         }
>>>>>         ret.append(" " +key.toString());
>>>>>         if(withChildren){
>>>>>             for (TreeNode<K, V> child : children) {
>>>>>                 ret.append("\n").append(child.toString());
>>>>>             }
>>>>>         }
>>>>>         return ret.toString();
>>>>>     }
>>>>>
>>>>>     public void setValue(V value) {
>>>>>         this.value = value;
>>>>>     }
>>>>>
>>>>>     public void remove(List<TreeNode<K, V>> nodes) {
>>>>>         for (TreeNode<K, V> n : nodes) {
>>>>>             removeChildren(n);
>>>>>         }
>>>>>         for (TreeNode<K, V> n : nodes) {
>>>>>             TreeNode<K, V> parent = n.getParent();
>>>>>             if(parent==null)
>>>>>                 continue;
>>>>>             parent.children.remove(n);
>>>>>         }
>>>>>     }
>>>>>
>>>>>     private void removeChildren(TreeNode<K, V> node) {
>>>>>         lookup.remove(node.getUuid());
>>>>>         if(node.children.isEmpty())
>>>>>             return;
>>>>>         Iterator<TreeNode<K, V>> it = node.children.iterator();
>>>>>         while (it.hasNext()) {
>>>>>             TreeNode<K, V> child = (TreeNode<K, V>) it.next();
>>>>>             removeChildren(child);
>>>>>             it.remove();
>>>>>         }
>>>>>     }
>>>>>
>>>>>     public void clear() {
>>>>>         this.key = null;
>>>>>         this.value = null;
>>>>>         this.uuid = null;
>>>>>         for (TreeNode<K, V> child : children) {
>>>>>             child.clear();
>>>>>         }
>>>>>         this.children.clear();
>>>>>         this.lookup.clear();
>>>>>     }
>>>>>
>>>>>     public TreeNode<K, V> getNodeById(K key, String uuid) {
>>>>>         List<TreeNode<K, V>> nodes = getNodesByKey(key);
>>>>>         for (TreeNode<K, V> treeNode : nodes) {
>>>>>             if(uuid.equals(treeNode.getUuid()))
>>>>>                 return treeNode;
>>>>>         }
>>>>>         return null;
>>>>>     }
>>>>>
>>>>>     public HashMap<K, List<TreeNode<K, V>>> getLookup() {
>>>>>         return lookup;
>>>>>     }
>>>>>
>>>>>     public void setLookup(HashMap<K, List<TreeNode<K, V>>> lookup) {
>>>>>         this.lookup = lookup;
>>>>>     }
>>>>>
>>>>>     public void setLevel(int level) {
>>>>>         this.level = level;
>>>>>     }
>>>>>
>>>>>     public void setKey(K key) {
>>>>>         this.key = key;
>>>>>     }
>>>>>
>>>>>     public void setParent(TreeNode<K, V> parent) {
>>>>>         this.parent = parent;
>>>>>     }
>>>>>
>>>>>     public void setChildren(List<TreeNode<K, V>> children) {
>>>>>         this.children = children;
>>>>>     }
>>>>> }
>>>>>
>>>>> On Tue, May 17, 2016 at 12:00 PM, Robert Metzger <rmetz...@apache.org> wrote:
>>>>>
>>>>>> Hi Flavio,
>>>>>>
>>>>>> which datatype are you using?
>>>>>>
>>>>>> On Tue, May 17, 2016 at 11:42 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>
>>>>>>> Hi to all,
>>>>>>> during these days we've run a lot of Flink jobs and from time to time (apparently randomly) a different Exception arise during their executions...
>>>>>>> I hope one of them could help in finding the source of the problem..This time the exception is:
>>>>>>>
>>>>>>> An error occurred while reading the next record.
>>>>>>> at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:148)
>>>>>>> at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.access$300(NonReusingKeyGroupedIterator.java:32)
>>>>>>> at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator.next(NonReusingKeyGroupedIterator.java:192)
>>>>>>> at org.okkam.entitons.mapping.flink.IndexMappingExecutor$TupleToEntitonJsonNode.reduce(IndexMappingExecutor.java:61)
>>>>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:125)
>>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>>>>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>> Caused by: java.io.UTFDataFormatException: malformed input around byte 42
>>>>>>> at org.apache.flink.runtime.memory.AbstractPagedInputView.readUTF(AbstractPagedInputView.java:488)
>>>>>>> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:403)
>>>>>>> at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
>>>>>>> at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>>>> at org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:100)
>>>>>>> at org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:161)
>>>>>>> at org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:113)
>>>>>>> at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:130
>>>>>>>
>>>>>>> Could this error be cause by a missing implementation of hashCode() and equals()?
>>>>>>>
>>>>>>> Thanks in advance,
>>>>>>> Flavio
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>
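P.S. Regarding the serializer discussion above: below is a minimal, unverified sketch of how one can ask Flink which TypeInformation and serializer it picks for TreeNode (class names come from flink-core/flink-java; the raw-class check is only an approximation, because in the real job the full Tuple3<String, String, TreeNode<...>> type is extracted from the user function's signature).

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;

public class SerializerCheck {

    public static void main(String[] args) {
        // Assumes the TreeNode class quoted above is on the classpath.
        // How does Flink classify the (raw) TreeNode class?
        // PojoTypeInfo    -> POJO serialization path (as in the stack trace above),
        // GenericTypeInfo -> fallback to Kryo.
        TypeInformation<TreeNode> info = TypeExtractor.createTypeInfo(TreeNode.class);
        System.out.println("TypeInformation: " + info.getClass().getSimpleName());

        // The serializer that would actually be used at runtime,
        // e.g. PojoSerializer vs. KryoSerializer.
        TypeSerializer<TreeNode> serializer = info.createSerializer(new ExecutionConfig());
        System.out.println("Serializer: " + serializer.getClass().getSimpleName());
    }
}

If this prints PojoSerializer for the types involved, the PojoSerializer/TupleSerializer frames in the stack trace above are expected, and the corruption would have to come from somewhere else in the serialization stack rather than from a Kryo fallback.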