The first time I hash my data is in the reading phase: each line gets one 
extra field, the hash of its file name. I do this with a custom reader 
that extends DelimitedInputFormat and overrides the open, nextRecord and 
readRecord methods:

/* … */
private var id : Long = 0L

override def open(split : FileInputSplit) = {
    super.open (split)
    //TODO hasher problem: guava fails, java hashcode works
    //val hf : HashFunction = Hashing.sha256()
    //id = hf.newHasher.putString(split.getPath.getName.toString,
    //  Charsets.UTF_8).hash.asLong
    id = (split.getPath.getName.toString).hashCode.toLong
  }

override def readRecord(reusable : (FlinkRegionType), bytes : Array[Byte], 
offset : Int, numBytes : Int) : (FlinkRegionType) = {
    (parser(id, new String(bytes.slice(offset,offset+numBytes), 
Charset.forName(charsetName))))
  }

override def nextRecord(record : FlinkRegionType) : FlinkRegionType = {
    try{
      super.nextRecord(record)
    } catch {
       case e : ParsingException => {
         logger.info("Region Data format error in the tuple: " + e.getMessage)
         nextRecord(record)
       }
    }
  }

/* … */
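
For comparison, here is a minimal sketch of a 64-bit id derived from SHA-256 using only the JDK, with no Guava dependency. It is not the reader code above: `StableId` and `sha256AsLong` are hypothetical names, and taking the first 8 bytes of the digest is an assumption about what "SHA-256 as a Long" should mean. The resulting values are not guaranteed to match Guava's `asLong`.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

object StableId {
  // Hypothetical helper: 64-bit id taken from the first 8 bytes of a SHA-256 digest.
  def sha256AsLong(s: String): Long = {
    // MessageDigest instances are stateful and not thread-safe,
    // so a fresh one is created on every call instead of sharing one.
    val md = MessageDigest.getInstance("SHA-256")
    val digest = md.digest(s.getBytes(StandardCharsets.UTF_8))
    // ByteBuffer reads big-endian by default; getLong consumes the first 8 bytes.
    ByteBuffer.wrap(digest).getLong
  }
}
```

Inside `open` this would be called as `id = StableId.sha256AsLong(split.getPath.getName)`. Since no hasher object is kept in a field, nothing has to be serialized or shared between parallel tasks.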



Then, every time I join two datasets or want to aggregate (groupBy) on 
several fields of the tuple, I create a new hash from the concatenation of the 
respective ids:

val joinResult : DataSet[(Long, String, Long, Long, Char, Array[GValue], 
List[Array[GValue]], Int, Long)] =
      ref
        .joinWithHuge(exp).where(0,2).equalTo(0,2){
          (r : (String, Int, Int, Long, String, Long, Long, Char, 
Array[GValue]), x : (String, Int, Int, Long, String, Long, Long, Char, 
Array[GValue]), out : Collector[(Long, String, Long, Long, Char, Array[GValue], 
List[Array[GValue]], Int, Long)]) => {
            if(/* regions cross */) {

              //TODO hasher problem: guava fails, java hashcode works
              //val hashId = hf.newHasher().putString(r._4.toString +
              //  x._4.toString, Charsets.UTF_8).hash.asLong
              val hashId = (r._4.toString + x._4.toString).hashCode.toLong

              //TODO hasher problem: guava fails, java hashcode works
              //val aggregationId = hf.newHasher().putString(hashId.toString +
              //  r._5.toString + r._6.toString + r._7.toString + r._8.toString +
              //  r._9.map((g) => g.toString).sorted.reduce(_ + _).toString,
              //  Charsets.UTF_8).hash.asLong
              val aggregationId = (hashId.toString + r._5.toString + 
r._6.toString + r._7.toString + r._8.toString + r._9.map((g) => 
g.toString).sorted.reduce(_ + _).toString).hashCode.toLong
              out.collect(hashId, r._5, r._6, r._7, r._8, r._9, List(x._9), 1, 
aggregationId)
            }
        }
    }
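
One thing worth noting about the concatenation in the join above: gluing ids together without a separator is ambiguous, because different id pairs can produce the same string before any hashing happens. A minimal sketch of the issue and one possible workaround follows. `CompositeKey` and `compositeKey` are hypothetical names, and the separator is assumed never to occur inside the parts.

```scala
object CompositeKey {
  // Hypothetical helper: joins the parts with a separator that is
  // assumed never to appear in any part, so field boundaries stay visible.
  def compositeKey(parts: Any*): String = parts.mkString("\u0001")

  // Plain concatenation, as in the join above, for comparison.
  def plainConcat(a: Long, b: Long): String = a.toString + b.toString
}
```

With plain concatenation the pairs (1, 23) and (12, 3) both yield "123", so they receive the same hash whichever hash function is used; with the separator they stay distinct. `mkString` also returns an empty string for an empty collection, whereas `reduce(_ + _)` throws on one.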


This is just an example. I have two kinds of data: the one I showed is the core 
data, and then I have the metadata, associated with the core data via the same 
hash of the original file name.
On the metadata I have similar functionality for joining, grouping and re-hashing.
Again, with the Java hashCode (see above) everything seems to work.




On 16 May 2015, at 12:00, Fabian Hueske <fhue...@gmail.com> wrote:

Invalid hash values can certainly cause non-deterministic results.

Can you provide a code snippet that shows how and where you used the Guava 
Hasher?

2015-05-16 11:52 GMT+02:00 Michele Bertoni <michele1.bert...@mail.polimi.it>:
Is it possible that this is due to the hasher?

Inside my code I was using the Google Guava hasher (SHA-256 as a Long hash).
Sometimes I got errors from it (ArrayIndexOutOfBoundsException), sometimes I just 
got a different hash for the same id, especially when running on a non-local 
execution environment.

I removed it everywhere and started using the Java hashCode; now it seems to 
work.


> On 16 May 2015, at 09:15, Michele Bertoni 
> <michele1.bert...@mail.polimi.it> wrote:
>
> Hi,
> for 2 days I have been going mad over a problem: every time I run the code 
> (on the same dataset) I get a different result.
>
> While I was debugging I found this.
>
> I have this code:
>
> val aggregationResult  = //something that creates the dataset and uses join,
>                          //group, reduce and map
> logger.error("res count " + aggregationResult.count)
> aggregationResult.print
>
>
>
> The logger prints a dataset size of 7,
> but the printed output has 6 elements.
>
> This happens randomly: sometimes the result is larger than the count, and 
> sometimes both are correct at 10.
>
>
>
> Flink version: 0.9 milestone 1
>
> Any idea what can make it “non-deterministic”?
> Thanks for your help.


