Hi, The error message indicates that a Streaming Context object end up in the fields of the closure that Spark tries to serialize.
Could you show us the enclosing function and component ? The workarounds proposed in the following stack overflow reply might help you to fix the problem: http://stackoverflow.com/a/30094847 On Sat, Mar 11, 2017 at 3:10 AM, 萝卜丝炒饭 <1427357...@qq.com> wrote: > i think the val you defined are only valid in the driver, > you can try boardcast variable. > > ---Original--- > *From:* "lk_spark"<lk_sp...@163.com> > *Date:* 2017/2/27 11:14:23 > *To:* "user.spark"<user@spark.apache.org>; > *Subject:* java.io.NotSerializableException: org.apache.spark.streaming. > StreamingContext > > hi,all: > I want to extract some info from kafka useing sparkstream,my code > like : > > val keyword = "" > val system = "dmp" > val datetime_idx = 0 > val datetime_length = 23 > val logLevelBeginIdx = datetime_length + 2 - 1 > val logLevelMaxLenght = 5 > > val lines = messages.filter(record => record.value().matches("\\d{4} > .*")).map(record => { > val assembly = record.topic() > val value = record.value > val datatime = value.substring(datetime_idx, datetime_length - 1) > val level = value.substring(logLevelBeginIdx, logLevelBeginIdx + > logLevelMaxLenght - 1) > (assembly,value,datatime,level) > }) > > I will get error : > Caused by: java.io.NotSerializableException: > org.apache.spark.streaming.StreamingContext > Serialization stack: > - object not serializable (class: > org.apache.spark.streaming.StreamingContext, > value: org.apache.spark.streaming.StreamingContext@5a457aa1) > - field (class: $iw, name: streamingContext, type: class > org.apache.spark.streaming.StreamingContext) > - object (class $iw, $iw@38eb2140) > - field (class: $iw, name: $iw, type: class $iw) > - object (class $iw, $iw@2a3ced3d) > - field (class: $iw, name: $iw, type: class $iw) > - object (class $iw, $iw@7c5dbca5) > .... > ============================================================ > ====================== > if I change the parameter to constant I will not got error : > > val lines = messages.filter(record => record.value().matches("\\d{4} > .*")).map(record => { > val assembly = record.topic() > val value = record.value > val datatime = value.substring(0, 22) > val level = value.substring(24, 27) > (assembly,value,datatime,level) > > }) > > how can I pass parameter to the map function. > > 2017-02-27 > ------------------------------ > lk_spark >