I think the vals you defined are only valid in the driver. You can try a
broadcast variable.
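
A minimal sketch of what I mean, assuming a compiled application (spark-submit) rather than code pasted into spark-shell, and assuming messages is a DStream[ConsumerRecord[String, String]] as in the Kafka 0.10 direct stream. LogParams, LogParsing, parse and extract are hypothetical names I made up for illustration; the offsets are the ones from your post.

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

// Hypothetical holder for the offsets from the original post.
// Case classes are Serializable, so an instance can be broadcast.
case class LogParams(datetimeIdx: Int, datetimeLength: Int, logLevelMaxLength: Int) {
  val logLevelBeginIdx: Int = datetimeLength + 2 - 1

  // Same substring arithmetic as the original map body.
  def parse(topic: String, value: String): (String, String, String, String) = {
    val datetime = value.substring(datetimeIdx, datetimeLength - 1)
    val level = value.substring(logLevelBeginIdx, logLevelBeginIdx + logLevelMaxLength - 1)
    (topic, value, datetime, level)
  }
}

object LogParsing {
  def extract(
      ssc: StreamingContext,
      messages: DStream[ConsumerRecord[String, String]]
  ): DStream[(String, String, String, String)] = {
    // Local val: the closures below capture only this Broadcast handle,
    // not the StreamingContext or whatever object encloses it.
    val params = ssc.sparkContext.broadcast(LogParams(0, 23, 5))
    messages
      .filter(record => record.value().matches("\\d{4}.*"))
      .map(record => params.value.parse(record.topic(), record.value()))
  }
}
```

You would then call it as val lines = LogParsing.extract(streamingContext, messages). For a few small Ints a broadcast is probably more than you need; copying them into local vals inside a method should have the same effect. In spark-shell the trace suggests the REPL wrapper ($iw) that holds streamingContext is being pulled into the closure, so declaring it as @transient val streamingContext = ... may also be needed there, but I have not tested that against your setup.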

---Original---
From: "lk_spark"<lk_sp...@163.com>
Date: 2017/2/27 11:14:23
To: "user.spark"<user@spark.apache.org>;
Subject: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext


  Hi all,
        I want to extract some info from Kafka using Spark Streaming. My code
looks like this:
        
    val keyword = ""
    val system = "dmp"
    val datetime_idx = 0
    val datetime_length = 23
    val logLevelBeginIdx = datetime_length + 2 - 1
    val logLevelMaxLenght = 5

    val lines = messages.filter(record => record.value().matches("\\d{4}.*")).map(record => {
      val assembly = record.topic()
      val value = record.value
      val datatime = value.substring(datetime_idx, datetime_length - 1)
      val level = value.substring(logLevelBeginIdx, logLevelBeginIdx + logLevelMaxLenght - 1)
      (assembly, value, datatime, level)
    })
     
     I get this error:
     Caused by: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
     Serialization stack:
      - object not serializable (class: org.apache.spark.streaming.StreamingContext, value: org.apache.spark.streaming.StreamingContext@5a457aa1)
      - field (class: $iw, name: streamingContext, type: class org.apache.spark.streaming.StreamingContext)
      - object (class $iw, $iw@38eb2140)
      - field (class: $iw, name: $iw, type: class $iw)
      - object (class $iw, $iw@2a3ced3d)
      - field (class: $iw, name: $iw, type: class $iw)
      - object (class $iw, $iw@7c5dbca5)
     ....
 
==================================================================================
   If I change the parameters to constants, I do not get the error:
   
    val lines = messages.filter(record => record.value().matches("\\d{4}.*")).map(record => {
      val assembly = record.topic()
      val value = record.value
      val datatime = value.substring(0, 22)
      val level = value.substring(24, 27)
      (assembly, value, datatime, level)
    })
  
 How can I pass parameters to the map function?
  
  2017-02-27
 
 lk_spark
