The first time I hash my data is in the reading phase: each line gets one extra field containing the hash of its source file name. I do this with a custom reader that extends DelimitedInputFormat and overrides the open, nextRecord and readRecord methods:
/* … */

private var id : Long = 0L

override def open(split : FileInputSplit) = {
  super.open(split)
  //TODO hasher problem: guava fails, java hashcode works
  //val hf : HashFunction = Hashing.sha256()
  //id = hf.newHasher.putString(split.getPath.getName.toString, Charsets.UTF_8).hash.asLong
  id = (split.getPath.getName.toString).hashCode.toLong
}

override def readRecord(reusable : FlinkRegionType, bytes : Array[Byte], offset : Int, numBytes : Int) : FlinkRegionType = {
  parser(id, new String(bytes.slice(offset, offset + numBytes), Charset.forName(charsetName)))
}

override def nextRecord(record : FlinkRegionType) : FlinkRegionType = {
  try {
    super.nextRecord(record)
  } catch {
    case e : ParsingException => {
      logger.info("Region Data format error in the tuple: " + e.getMessage)
      nextRecord(record)
    }
  }
}

/* … */

Then, every time I join two datasets or want to aggregate (groupBy) by several fields of the tuple, I create a new hash from the concatenation of the respective ids:

val joinResult : DataSet[(Long, String, Long, Long, Char, Array[GValue], List[Array[GValue]], Int, Long)] =
  ref.joinWithHuge(exp).where(0, 2).equalTo(0, 2) {
    (r : (String, Int, Int, Long, String, Long, Long, Char, Array[GValue]),
     x : (String, Int, Int, Long, String, Long, Long, Char, Array[GValue]),
     out : Collector[(Long, String, Long, Long, Char, Array[GValue], List[Array[GValue]], Int, Long)]) => {
      if (/* regions cross */) {
        //TODO hasher problem: guava fails, java hashcode works
        //val hashId = hf.newHasher().putString(r._4.toString + x._4.toString, Charsets.UTF_8).hash.asLong
        val hashId = (r._4.toString + x._4.toString).hashCode.toLong

        //TODO hasher problem: guava fails, java hashcode works
        //val aggregationId = hf.newHasher().putString(hashId.toString + r._5.toString + r._6.toString + r._7.toString + r._8.toString + r._9.map((g) => g.toString).sorted.reduce(_ + _).toString, Charsets.UTF_8).hash.asLong
        val aggregationId = (hashId.toString + r._5.toString + r._6.toString + r._7.toString + r._8.toString + r._9.map((g) => g.toString).sorted.reduce(_ + _).toString).hashCode.toLong

        out.collect((hashId, r._5, r._6, r._7, r._8, r._9, List(x._9), 1, aggregationId))
      }
    }
  }

This is just an example. I have two kinds of data: what I showed is the core data, and then there is the metadata, associated to the core via the same hash of the original file name. On the meta side I have similar join/group/re-hash functionality. Again, with the Java hashCode (see above) everything seems to work.
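For completeness, here is a minimal sketch of the exact Guava calls, pulled out of the operators. The helper object Sha256Key is only for illustration, it does not exist in my real code, where the calls are inlined as in the commented lines above:

import com.google.common.base.Charsets
import com.google.common.hash.{HashFunction, Hashing}

// illustrative helper only: isolates the Guava sha256-as-Long computation
object Sha256Key extends Serializable {
  // build the HashFunction lazily on each JVM instead of shipping it inside a closure
  @transient private lazy val hf : HashFunction = Hashing.sha256()

  def asLong(s : String) : Long =
    hf.newHasher().putString(s, Charsets.UTF_8).hash().asLong()
}

// e.g. in open():  id = Sha256Key.asLong(split.getPath.getName)
// in the join:     val hashId = Sha256Key.asLong(r._4.toString + x._4.toString)

If even this isolated version returned different values for the same string on different task managers, the problem would not be in how I mix it with the Flink code.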
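And to double-check the behaviour I describe below (a different hash for the same id when running non-locally), a tiny standalone check, nothing Flink-specific, that I could run on each machine of the cluster. The default file name "test.gtf" is just a made-up example:

import com.google.common.base.Charsets
import com.google.common.hash.Hashing

// hypothetical determinism check: the same string must hash to the same long
// on every invocation, every JVM and every machine
object HashCheck {
  def main(args : Array[String]) : Unit = {
    val key = if (args.nonEmpty) args(0) else "test.gtf"
    val values = (1 to 5).map { _ =>
      Hashing.sha256().newHasher().putString(key, Charsets.UTF_8).hash().asLong()
    }
    println(key + " -> " + values.mkString(", ") + " (all equal: " + (values.distinct.size == 1) + ")")
  }
}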
On 16 May 2015, at 12:00, Fabian Hueske <fhue...@gmail.com> wrote:

Invalid hash values can certainly cause non-deterministic results. Can you provide a code snippet that shows how and where you used the Guava Hasher?

2015-05-16 11:52 GMT+02:00 Michele Bertoni <michele1.bert...@mail.polimi.it>:

Is it possible that it is due to the hasher? Inside my code I was using the Google Guava hasher (SHA-256 as a Long hash). Sometimes I got errors from it (an ArrayIndexOutOfBoundsException), and sometimes I just got a different hash for the same id, especially when running on a non-local execution environment. I removed it everywhere and started using the Java hashCode instead; now it seems to work.

> On 16 May 2015, at 09:15, Michele Bertoni <michele1.bert...@mail.polimi.it> wrote:
>
> Hi,
> I have been going mad with a problem for two days: every time I run the code (on the same dataset) I get a different result.
>
> While debugging I found this. I have this code:
>
> val aggregationResult = // something that creates the dataset and uses join, group, reduce and map
> logger.error("res count " + aggregationResult.count)
> aggregationResult.print
>
> The logger prints a dataset size of 7, while the printed output is made of 6 elements.
>
> This happens randomly: sometimes the result is larger than the count, and sometimes they are both correct at 10.
>
> Flink version: 0.9-milestone-1
>
> Any idea of what can make it "not deterministic"?
> Thanks for the help