Hello everyone,
I am having a problem with some simple Scala/Spark code in which I am
trying to replace certain fields in a CSV file with their hashes:
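import java.security.MessageDigest
// Hex.valueOf comes from a Hex helper whose import was not shown

class DSV(var line: String = "", fieldsList: Seq[Int], var delimiter: String = ",") extends Serializable {

  // Hash s with SHA ("sha" is SHA-1) and convert the digest with Hex.valueOf
  def hash(s: String): String = {
    val md = MessageDigest.getInstance("sha")
    md.update(s.getBytes("UTF-8"))
    val digest = md.digest()
    val string: Option[String] = Option(digest).map(Hex.valueOf)
    println("Returning " + string)
    string.getOrElse("")
  }

  // Replace the fields whose index is in fieldsList with their hashes
  def anonymizeFields(l: String): String = {
    // split treats delimiter as a regex; mkString below inserts it literally
    l.split(delimiter, -1).zipWithIndex
      .map {
        case (str, index) if fieldsList.contains(index) => hash(str)
        case other => other._1
      }.mkString(delimiter)
  }
}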
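If the Hex helper is in doubt, the hash can be computed with the JDK alone. A minimal sketch, assuming SHA-1 with lowercase hex output is what is wanted (the object name Sha1Hex is hypothetical, not part of the original code):

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

object Sha1Hex {
  // Hash a field with SHA-1 ("SHA"/"sha" is an alias for it) and
  // hex-encode the digest using only the JDK, no external Hex helper.
  def hash(s: String): String = {
    // A fresh MessageDigest per call, since instances are not thread-safe
    val md = MessageDigest.getInstance("SHA-1")
    val digest = md.digest(s.getBytes(StandardCharsets.UTF_8))
    // Two lowercase hex characters per byte; & 0xff avoids sign extension
    digest.map(b => f"${b & 0xff}%02x").mkString
  }
}
```

For example, Sha1Hex.hash("abc") gives the well-known SHA-1 test vector a9993e364706816aba3e25717850c26c9cd0d89d.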
I am calling the anonymize function like this, but anonData seems to be
the same as the original dsvData:
val dsvData = sc.textFile(inputPath + inputFileName).map(
  line => new DSV(line, List(1, 2), "\\|")
)
println("Lines Processed=" + dsvData.count())
val anonData = dsvData.map(l => l.anonymizeFields(l.line))
println("DSVs Processed=" + anonData.count())
anonData.saveAsTextFile(outputPath + outputFileName)
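One thing worth checking in the code above: the delimiter "\\|" is used as a regex by split but inserted literally by mkString, so the output lines would contain \| between fields rather than |. That alone would not leave the hashed fields unchanged, but it does alter the output. A minimal sketch that takes the raw delimiter and quotes it only for splitting (FieldAnonymizer and its hash parameter are hypothetical names, not part of the original code):

```scala
import java.util.regex.Pattern

object FieldAnonymizer {
  // Replace the fields at the given indices with hash(field).
  // `delimiter` is the raw separator (e.g. "|"), not a regex:
  // it is quoted for split and used verbatim for mkString.
  def anonymizeFields(line: String, fields: Set[Int], delimiter: String, hash: String => String): String =
    line
      .split(Pattern.quote(delimiter), -1) // -1 keeps trailing empty fields
      .zipWithIndex
      .map { case (field, i) => if (fields.contains(i)) hash(field) else field }
      .mkString(delimiter)
}
```

In the job it would be used along the lines of sc.textFile(inputPath + inputFileName).map(line => FieldAnonymizer.anonymizeFields(line, Set(1, 2), "|", someHash)), with someHash standing in for whichever hash function is used.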
I have tried running it through the shell as well, but the problem persists.
The job does finish, but the worker log shows the following error message:
14/07/09 11:30:20 ERROR EndpointWriter: AssociationError
[akka.tcp://sparkWorker@host:60593] -> [akka.tcp://sparkExecutor@host:51397]:
Error [Association failed with [akka.tcp://sparkExecutor@host:51397]] [
Regards
MRK