Hi,

The error message indicates that a Streaming Context object end up in the
fields of the closure that Spark tries to serialize.

Could you show us the enclosing function and component ?

The workarounds proposed in the following stack overflow reply might help
you to fix the problem:
  http://stackoverflow.com/a/30094847



On Sat, Mar 11, 2017 at 3:10 AM, 萝卜丝炒饭 <1427357...@qq.com> wrote:

> i think the val you defined are only valid in the driver,
> you can  try boardcast variable.
>
> ---Original---
> *From:* "lk_spark"<lk_sp...@163.com>
> *Date:* 2017/2/27 11:14:23
> *To:* "user.spark"<user@spark.apache.org>;
> *Subject:* java.io.NotSerializableException: org.apache.spark.streaming.
> StreamingContext
>
> hi,all:
>        I want to extract some info from kafka useing sparkstream,my code
> like :
>
>     val keyword = ""
>     val system = "dmp"
>     val datetime_idx = 0
>     val datetime_length = 23
>     val logLevelBeginIdx = datetime_length + 2 - 1
>     val logLevelMaxLenght = 5
>
>     val lines = messages.filter(record => record.value().matches("\\d{4}
> .*")).map(record => {
>       val assembly = record.topic()
>       val value = record.value
>       val datatime = value.substring(datetime_idx, datetime_length - 1)
>       val level = value.substring(logLevelBeginIdx, logLevelBeginIdx +
> logLevelMaxLenght - 1)
>       (assembly,value,datatime,level)
>     })
>
>     I will get error :
>     Caused by: java.io.NotSerializableException:
> org.apache.spark.streaming.StreamingContext
> Serialization stack:
>  - object not serializable (class: 
> org.apache.spark.streaming.StreamingContext,
> value: org.apache.spark.streaming.StreamingContext@5a457aa1)
>  - field (class: $iw, name: streamingContext, type: class
> org.apache.spark.streaming.StreamingContext)
>  - object (class $iw, $iw@38eb2140)
>  - field (class: $iw, name: $iw, type: class $iw)
>  - object (class $iw, $iw@2a3ced3d)
>  - field (class: $iw, name: $iw, type: class $iw)
>  - object (class $iw, $iw@7c5dbca5)
> ....
> ============================================================
> ======================
>   if I change the parameter to constant I will not got error :
>
>   val lines = messages.filter(record => record.value().matches("\\d{4}
> .*")).map(record => {
>       val assembly = record.topic()
>       val value = record.value
>       val datatime = value.substring(0, 22)
>       val level = value.substring(24, 27)
>       (assembly,value,datatime,level)
>
>     })
>
> how can I pass parameter to the map function.
>
> 2017-02-27
> ------------------------------
> lk_spark
>

Reply via email to