Re: Task not Serializable Exception

2017-01-02 Thread Prashant Sharma
Can you minimize the code snippet with which we can reproduce this
`NotSerializableException`?

Thanks,


-
Prashant Sharma
Spark Technology Center
http://www.spark.tc/
--


On Sun, Jan 1, 2017 at 9:36 AM, khyati  wrote:

> Getting error for the following code snippet:
>
> object SparkTaskTry extends Logging {
>   /**
>    * Extends the normal Try constructor to allow TaskKilledExceptions to propagate
>    */
>   def apply[T](r: => T): Try[T] =
>     try scala.util.Success(r) catch {
>       case e: TaskKilledException => throw e
>       case NonFatal(e) =>
>         logInfo("Caught and Ignored Exception: " + e.toString)
>         e.printStackTrace()
>         Failure(e)
>     }
> }
>
> override def buildScan(
>     requiredColumns: Array[String],
>     filters: Array[Filter],
>     inputFiles: Array[FileStatus],
>     broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
>   val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA)
>   val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
>   val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
>   val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
>   val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec
>
>   // When merging schemas is enabled and the column of the given filter does not exist,
>   // Parquet emits an exception which is an issue of Parquet (PARQUET-389).
>   val safeParquetFilterPushDown = !shouldMergeSchemas && parquetFilterPushDown
>
>   // Parquet row group size. We will use this value as the value for
>   // mapreduce.input.fileinputformat.split.minsize and mapred.min.split.size if the value
>   // of these flags are smaller than the parquet row group size.
>   val parquetBlockSize = ParquetOutputFormat.getLongBlockSize(broadcastedConf.value.value)
>
>   // Create the function to set variable Parquet confs at both driver and executor side.
>   val initLocalJobFuncOpt =
>     ParquetRelation.initializeLocalJobFunc(
>       requiredColumns,
>       filters,
>       dataSchema,
>       parquetBlockSize,
>       useMetadataCache,
>       safeParquetFilterPushDown,
>       assumeBinaryIsString,
>       assumeInt96IsTimestamp,
>       followParquetFormatSpec) _
>
>   // Create the function to set input paths at the driver side.
>   val setInputPaths =
>     ParquetRelation.initializeDriverSideJobFunc(inputFiles, parquetBlockSize) _
>
>   Utils.withDummyCallSite(sqlContext.sparkContext) {
>     new RDD[Try[InternalRow]](sqlContext.sparkContext, Nil) with Logging {
>
>       override def getPartitions: Array[SparkPartition] = internalRDD.getPartitions
>
>       override def getPreferredLocations(split: SparkPartition): Seq[String] =
>         internalRDD.getPreferredLocations(split)
>
>       override def checkpoint() {
>         // Do nothing. Hadoop RDD should not be checkpointed.
>       }
>
>       override def persist(storageLevel: StorageLevel): this.type = {
>         super.persist(storageLevel)
>       }
>
>       val internalRDD: SqlNewHadoopRDD[InternalRow] = new SqlNewHadoopRDD(
>         sc = sqlContext.sparkContext,
>         broadcastedConf = broadcastedConf,
>         initDriverSideJobFuncOpt = Some(setInputPaths),
>         initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
>         inputFormatClass = if (isSplittable) {
>           classOf[ParquetInputFormat[InternalRow]]
>         } else {
>           classOf[ParquetRowInputFormatIndivisible]
>         },
>         valueClass = classOf[InternalRow]) {
>
>         val cacheMetadata = useMetadataCache
>
>         @transient val cachedStatuses = inputFiles.map { f =>
>           // In order to encode the authority of a Path containing special characters such as '/'
>           // (which does happen in some S3N credentials), we need to use the string returned by the
>           // URI of the path to create a new Path.
>           val pathWithEscapedAuthority = escapePathUserInfo(f.getPath)
>           new FileStatus(
>             f.getLen, f.isDirectory, f.getReplication, f.getBlockSize, f.getModificationTime,
>             f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithEscapedAuthority)
>         }.toSeq
>
>         private def escapePathUserInfo(path: Path): Path = {
>           val uri = path.toUri
>           new Path(new URI(
>             uri.getScheme, uri.getRawUserInf
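Since the quoted snippet above is long and truncated, here is a minimal, hypothetical sketch (not taken from the thread; all names are made up) of the most common way a Task not serializable error arises: a closure shipped to executors captures a non-serializable driver-side object, much like the anonymous RDD subclass above can capture its enclosing scope.

import org.apache.spark.sql.SparkSession

object NotSerializableRepro {
  class Helper {                                  // does not extend Serializable
    def tag(s: String): String = s + "!"
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("repro").getOrCreate()
    val helper = new Helper                       // lives only on the driver
    val rdd = spark.sparkContext.parallelize(Seq("a", "b"))
    // The closure passed to map() captures `helper`, so Spark throws
    // org.apache.spark.SparkException: Task not serializable, caused by
    // java.io.NotSerializableException: NotSerializableRepro$Helper.
    rdd.map(s => helper.tag(s)).collect()
    spark.stop()
  }
}

Making Helper extend Serializable, or creating it inside the closure, makes the job run.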

Re: Kafka Spark structured streaming latency benchmark.

2017-01-02 Thread Prashant Sharma
This issue was fixed in https://issues.apache.org/jira/browse/SPARK-18991.

--Prashant


On Tue, Dec 20, 2016 at 6:16 PM, Prashant Sharma 
wrote:

> Hi Shixiong,
>
> Thanks for taking a look. I am trying to run and see whether making
> ContextCleaner run more frequently and/or making it non-blocking will help.
>
> --Prashant
>
>
> On Tue, Dec 20, 2016 at 4:05 AM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Hey Prashant. Thanks for your code. I did some investigation and it
>> turned out that ContextCleaner is too slow and its "referenceQueue" keeps
>> growing. My hunch is that cleaning broadcasts is very slow since it's a
>> blocking call.
>>
>> On Mon, Dec 19, 2016 at 12:50 PM, Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>>> Hey, Prashant. Could you track the GC root of byte arrays in the heap?
>>>
>>> On Sat, Dec 17, 2016 at 10:04 PM, Prashant Sharma 
>>> wrote:
>>>
 Furthermore, I ran the same thing with 26 GB of memory, which would mean
 about 1.3 GB per thread. My jmap and jstat results, collected after running
 the job for more than 11 hours, again show a memory constraint: the same
 gradual slowdown, though a bit more gradual since memory is considerably
 larger than in the previous runs.


 This situation sounds like a memory leak, as the byte array objects take up
 more than 13 GB and are not garbage collected.

 --Prashant


 On Sun, Dec 18, 2016 at 8:49 AM, Prashant Sharma 
 wrote:

> Hi,
>
> The goal of my benchmark is to achieve an end-to-end latency lower than
> 100 ms and sustain it over time, by consuming from a Kafka topic and
> writing back to another Kafka topic using Spark. Since the job does no
> aggregation and does constant-time processing on each message, this
> appeared to be an achievable target. But there are some surprising and
> interesting patterns to observe.
>
> Basically, the setup has four components:
> 1) Kafka
> 2) A long-running Kafka producer, rate limited to 1000 msgs/sec, with
> each message about 1 KB.
> 3) A Spark job subscribed to the `test` topic that writes out to another
> topic, `output`.
> 4) A Kafka consumer reading from the `output` topic.
>
> How was the latency measured?
>
> While sending messages from the Kafka producer, each message embeds the
> timestamp at which it is pushed to the Kafka `test` topic. Spark receives
> each message and writes it out to the `output` topic as is. When these
> messages arrive at the Kafka consumer, their embedded timestamp is
> subtracted from the arrival time at the consumer, and a scatter plot of
> the results is attached (a sketch of this measurement follows the quoted
> message below).
>
> The scatter plots sample only 10 minutes of data received during the
> initial hour, and then another 10 minutes of data received after 2 hours
> of running.
>
>
>
> These plots indicate a significant slowdown in latency: the later scatter
> plot shows that almost all messages were received with a delay larger than
> 2 seconds, whereas the first plot shows that most messages arrived with
> less than 100 ms latency. The two samples were taken approximately 2 hours
> apart.
>
> After running the test for 24 hours, the jstat and jmap output for the job
> indicates a possible memory constraint. To be clear, the job was run with
> local[20] and 5 GB of memory (spark.driver.memory). The job is
> straightforward and located here: https://github.com/ScrapCodes/KafkaProducer/blob/master/src/main/scala/com/github/scrapcodes/kafka/SparkSQLKafkaConsumer.scala
>
>
> What is causing the gradual slowdown? I need help in diagnosing the
> problem.
>
> Thanks,
>
> --Prashant
>
>

>>>
>>
>
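The measurement described in the quoted message could look roughly like the following consumer-side sketch. It is an assumption, not the code linked in the thread: the broker address, group id, topic name, and the convention that the message value is simply the epoch-millis send timestamp are all hypothetical.

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

object LatencyProbe {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("group.id", "latency-probe")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("output"))   // topic written by the Spark job

    while (true) {
      // poll(long) is the older kafka-clients signature; newer clients take a java.time.Duration
      val records = consumer.poll(500L)
      for (record <- records.asScala) {
        val sentAt = record.value().toLong                    // timestamp embedded by the producer
        val latencyMs = System.currentTimeMillis() - sentAt
        println(s"end-to-end latency: $latencyMs ms")
      }
    }
  }
}

Collecting these latencies into buckets over time is what produces the scatter plots mentioned above.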


Re: Spark Improvement Proposals

2017-01-02 Thread Cody Koeninger
I'm bumping this one more time for the new year, and then I'm giving up.

Please, fix your process, even if it isn't exactly the way I suggested.

On Tue, Nov 8, 2016 at 11:14 AM, Ryan Blue  wrote:
> On lazy consensus as opposed to voting:
>
> First, why lazy consensus? The proposal was for consensus, which is at least
> three +1 votes and no vetoes. Consensus has no losing side; it requires
> getting to a point where there is agreement. Isn't that agreement what we
> want to achieve with these proposals?
>
> Second, lazy consensus only removes the requirement for three +1 votes. Why
> would we not want at least three committers to think something is a good
> idea before adopting the proposal?
>
> rb
>
> On Tue, Nov 8, 2016 at 8:13 AM, Cody Koeninger  wrote:
>>
>> So there are some minor things (the Where section heading appears to
>> be dropped; wherever this document is posted it needs to actually link
>> to a jira filter showing current / past SIPs) but it doesn't look like
>> I can comment on the google doc.
>>
>> The major substantive issue that I have is that this version is
>> significantly less clear as to the outcome of an SIP.
>>
>> The apache example of lazy consensus at
>> http://apache.org/foundation/voting.html#LazyConsensus involves an
>> explicit announcement of an explicit deadline, which I think are
>> necessary for clarity.
>>
>>
>>
>> On Mon, Nov 7, 2016 at 1:55 PM, Reynold Xin  wrote:
>> > It turned out suggested edits (trackable) don't show up for non-owners,
>> > so
>> > I've just merged all the edits in place. It should be visible now.
>> >
>> > On Mon, Nov 7, 2016 at 10:10 AM, Reynold Xin 
>> > wrote:
>> >>
>> >> Oops. Let me try figure that out.
>> >>
>> >>
>> >> On Monday, November 7, 2016, Cody Koeninger  wrote:
>> >>>
>> >>> Thanks for picking up on this.
>> >>>
>> >>> Maybe I fail at google docs, but I can't see any edits on the document
>> >>> you linked.
>> >>>
>> >>> Regarding lazy consensus, if the board in general has less of an issue
>> >>> with that, sure.  As long as it is clearly announced, lasts at least
>> >>> 72 hours, and has a clear outcome.
>> >>>
>> >>> The other points are hard to comment on without being able to see the
>> >>> text in question.
>> >>>
>> >>>
>> >>> On Mon, Nov 7, 2016 at 3:11 AM, Reynold Xin 
>> >>> wrote:
>> >>> > I just looked through the entire thread again tonight - there are a
>> >>> > lot
>> >>> > of
>> >>> > great ideas being discussed. Thanks Cody for taking the first crack
>> >>> > at
>> >>> > the
>> >>> > proposal.
>> >>> >
>> >>> > I want to first comment on the context. Spark is one of the most
>> >>> > innovative
>> >>> > and important projects in (big) data -- overall technical decisions
>> >>> > made in
>> >>> > Apache Spark are sound. But of course, a project as large and active as
>> >>> > Spark always has room for improvement, and we as a community should strive
>> >>> > to take it to the next level.
>> >>> >
>> >>> > To that end, the two biggest areas for improvement, in my opinion, are:
>> >>> >
>> >>> > 1. Visibility: There is so much happening that it is difficult to know what
>> >>> > really is going on. For people who don't follow closely, it is difficult to
>> >>> > know what the important initiatives are. Even for people who do follow, it
>> >>> > is difficult to know which specific things require their attention, since the
>> >>> > number of pull requests and JIRA tickets is high and it's difficult to
>> >>> > extract signal from noise.
>> >>> >
>> >>> > 2. Solicit user (broadly defined, including developers themselves)
>> >>> > input
>> >>> > more proactively: At the end of the day the project provides value
>> >>> > because
>> >>> > users use it. Users can't tell us exactly what to build, but it is
>> >>> > important
>> >>> > to get their inputs.
>> >>> >
>> >>> >
>> >>> > I've taken Cody's doc and edited it:
>> >>> >
>> >>> >
>> >>> > https://docs.google.com/document/d/1-Zdi_W-wtuxS9hTK0P9qb2x-nRanvXmnZ7SUi4qMljg/edit#heading=h.36ut37zh7w2b
>> >>> > (I've made all my modifications trackable)
>> >>> >
>> >>> > There are a couple of high-level changes I made:
>> >>> >
>> >>> > 1. I've consulted a board member and he recommended lazy consensus as
>> >>> > opposed to voting, the reason being that in voting there can easily be a
>> >>> > "loser" that gets outvoted.
>> >>> >
>> >>> > 2. I made it lighter weight, and renamed "strategy" to "optional design
>> >>> > sketch". Echoing one of the earlier emails: "IMHO so far aside from tagging
>> >>> > things and linking them elsewhere simply having design docs and prototypes
>> >>> > implementations in PRs is not something that has not worked so far".
>> >>> >
>> >>> > 3. I made some language tweaks to focus more on visibility. For example,
>> >>> > "The purpose of an SIP is to inform and involve", rather than just
>> >>> > "involve". SIPs should

Re: What is mainly different from a UDT and a spark internal type that ExpressionEncoder recognized?

2017-01-02 Thread Shuai Lin
Disclaimer: I'm not a Spark guru, and what's written below are some notes I
took while reading the Spark source code, so I could be wrong, in which case
I'd appreciate it a lot if someone could correct me.


> Let me rephrase this. How does the SparkSQL engine call the codegen APIs to
> do the job of producing RDDs?


IIUC, physical operators like `ProjectExec` implement doProduce/doConsume to
support codegen. When whole-stage codegen is enabled, a subtree is collapsed
into a WholeStageCodegenExec wrapper tree, and the root node of the wrapper
tree calls the doProduce/doConsume methods of each operator to generate the
Java source code that is compiled into Java bytecode by Janino.

In contrast, when whole-stage codegen is disabled (e.g. by passing "--conf
spark.sql.codegen.wholeStage=false" to spark-submit), the doExecute methods
of the physical operators are called, so no code generation happens.
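
For example (a small sketch of my own, assuming a spark-shell session with a SparkSession named `spark`), you can see the difference by toggling the flag and inspecting the plan and the generated source:

import org.apache.spark.sql.execution.debug._   // adds debug()/debugCodegen() on Datasets

// Whole-stage codegen enabled (the default): operators are collapsed under
// WholeStageCodegen nodes, and debugCodegen() prints the Java source Janino compiles.
val df = spark.range(100).filter("id % 2 = 0").selectExpr("id * 2 AS doubled")
df.explain()
df.debugCodegen()

// Whole-stage codegen disabled: each operator's doExecute() runs instead.
spark.conf.set("spark.sql.codegen.wholeStage", "false")
val df2 = spark.range(100).filter("id % 2 = 0").selectExpr("id * 2 AS doubled")
df2.explain()

A fresh Dataset is built after changing the conf because each Dataset caches its query execution, so re-planning requires a new Dataset.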

Producing the RDDs is a post-order evaluation of the SparkPlan tree. The
leaf node would be some data source: either a file-based HadoopFsRelation,
an external data source like JdbcRelation, or an in-memory LocalRelation
created by "spark.range(100)". In any case, the leaf nodes can produce rows
on their own. The evaluation then proceeds in a bottom-up manner, applying
filter/limit/project etc. along the way. Either the generated code or the
various doExecute methods are called, depending on whether codegen is
enabled (the default) or not.

> What are those eval methods in Expressions for, given there's already a
> doGenCode next to it?


AFAIK the `eval` method of Expression is used to do static evaluation when
the expression is foldable, e.g.:

   select map('a', 1, 'b', 2, 'a', 3) as m

Regards,
Shuai


On Wed, Dec 28, 2016 at 1:05 PM, dragonly  wrote:

> Thanks for your reply!
>
> Here's my *understanding*:
> basic types that ScalaReflection understands are encoded into the Tungsten
> binary format, while UDTs are encoded into GenericInternalRow, which stores
> the JVM objects in an Array[Any] under the hood and thus loses the memory
> footprint and CPU cache efficiency provided by Tungsten encoding.
>
> If the above is correct, then here are my *further questions*:
> Are SparkPlan nodes (those ending with Exec) all code-generated before
> actually running the toRdd logic? I know there are some non-codegenable
> nodes which implement the trait CodegenFallback, but there's also a
> doGenCode method in the trait, so the actual calling convention really
> puzzles me. And I've tried to trace the calling flow for a few days but
> found it scattered everywhere. I cannot make a big graph of the method
> calling order even with the help of IntelliJ.
>
> Let me rephrase this. How does the SparkSQL engine call the codegen APIs to
> do the job of producing RDDs? What are those eval methods in Expressions
> for, given there's already a doGenCode next to it?
>
>
>
> --
> View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/What-is-mainly-different-from-a-UDT-and-a-spark-internal-type-that-ExpressionEncoder-recognized-tp20370p20376.html
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


Re: mllib metrics vs ml evaluators and how to improve apis for users

2017-01-02 Thread Joseph Bradley
Hi Ilya,

Thanks for your thoughts.  Here's my understanding of where we are headed:
* We will want to move the *Metrics functionality to the spark.ml package,
as part of *Evaluator or related classes such as model/result summaries.
* It has not yet been decided if or when the spark.mllib package will be
removed.  This cannot happen until spark.ml has complete feature parity and
has been separated from spark.mllib internally for a few releases, and it
will require a community vote and significant QA.
* You're correct that Evaluators are meant for model tuning.  IMO, multiple
metrics are more naturally handled by model/result summaries, though I
could see good arguments for separating the metric computation from
models.  This is an issue which has not yet been discussed properly.  There
have also been questions about Evaluators maintaining multiple metrics
along the way during model tuning (SPARK-18704).

I created a JIRA for discussing this further:
https://issues.apache.org/jira/browse/SPARK-19053

Thanks!
Joseph

On Thu, Dec 29, 2016 at 8:36 PM, Ilya Matiach  wrote:

> Hi ML/MLLib developers,
>
> I’m trying to add a weights column to the Spark ML evaluators
> (RegressionEvaluator, BinaryClassificationEvaluator,
> MulticlassClassificationEvaluator) that use mllib metrics, and I have a
> few questions (JIRA SPARK-18693).
> I didn’t see any similar question on the forums or Stack Overflow.
>
> Moving forward, will we keep the mllib metrics (RegressionMetrics,
> MulticlassMetrics, BinaryClassificationMetrics) as something separate from
> the evaluators, or will we remove them when mllib is removed in Spark 3.0?
>
> The mllib metrics seem very useful because they are able to compute/expose
> many metrics on one dataset, whereas with the evaluators it is not
> performant to re-evaluate the entire dataset for a single different metric.
>
> For example, if I calculate the RMSE and then the MSE using the ML
> RegressionEvaluator, I will be redoing most of the work twice, so the ML
> API doesn't make sense to use in this scenario (see the sketch after this
> quoted message).
>
> Also, the ml evaluators expose a lot fewer metrics than the mllib metrics
> classes, so it seems like the ml evaluators are not at parity with the
> mllib metrics classes.
>
> I can see how the ml evaluators are useful in CrossValidator, but for
> exploring all metrics from a scored dataset it doesn’t really make sense to
> use them.
>
> From the viewpoint of exploring all metrics for a scored model, does this
> mean that the mllib metrics classes should be moved to ml?
>
> That would solve my issue if that is what is planned for the future.
> However, that doesn’t make sense to me, because it may cause some confusion
> for ml users to see both metrics and evaluator classes.
>
>
>
> Instead, it seems like the ml evaluators need to be changed at the API
> layer to:
>
> 1. Allow the user to retrieve a single value
> 2. Allow the user to retrieve all metrics or a set of metrics
>
> One possibility would be to overload evaluate so that we would have
> something like:
>
>
>
> override def evaluate(dataset: Dataset[_]): Double
>
> override def evaluate(dataset: Dataset[_], metrics: Array[String]): Dataset[_]
>
>
>
> But for some metrics, like the confusion matrix, you couldn’t really fit the
> data into the result of the second API in addition to the single-value
> metrics.
>
> The format of the mllib metrics classes was much more convenient, as you
> could retrieve them directly.
>
> Following this line of thought, maybe the APIs could be:
>
>
>
> override def evaluate(dataset: Dataset[_]): Double
>
> def evaluateMetrics(dataset: Dataset[_]): RegressionEvaluation (or
> classification/multiclass etc)
>
>
>
> where the evaluation class returned will have very similar fields to the
> corresponding mllib RegressionMetrics class that can be called by the user.
>
>
>
> Any thoughts/ideas about spark ml evaluators/mllib metrics apis, coding
> suggestions for the api proposed, or a general roadmap would be really
> appreciated.
>
>
>
> Thank you, Ilya
>
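
Here is the kind of contrast being described, as a sketch. The DataFrame name `scored` and its Double-typed "label"/"prediction" columns are assumptions for illustration, not part of either API proposal above.

import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.mllib.evaluation.RegressionMetrics

// spark.ml: one full pass over the data per metric.
val evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
val rmse = evaluator.setMetricName("rmse").evaluate(scored)
val mse  = evaluator.setMetricName("mse").evaluate(scored)   // re-evaluates the whole dataset

// spark.mllib: a single pass exposes many metrics at once.
val metrics = new RegressionMetrics(
  scored.select("prediction", "label").rdd.map(r => (r.getDouble(0), r.getDouble(1))))
println((metrics.rootMeanSquaredError, metrics.meanSquaredError, metrics.r2))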



-- 

Joseph Bradley

Software Engineer - Machine Learning

Databricks, Inc.



StateStoreSaveExec / StateStoreRestoreExec

2017-01-02 Thread Jeremy Smith
I have a question about state tracking in Structured Streaming.

First let me briefly explain my use case: Given a mutable data source (i.e.
an RDBMS) in which we assume we can retrieve a set of newly created row
versions (being a row that was created or updated between two given
`Offset`s, whatever those are), we can create a Structured Streaming
`Source` which retrieves the new row versions. Further assuming that every
logical row has some primary key, then as long as we can track the current
offset for each primary key, we can differentiate between new and updated
rows. Then, when a row is updated, we can record that the previous version
of that row expired at some particular time. That's essentially what I'm
trying to do. This would effectively give you an "event-sourcing" type of
historical/immutable log of changes out of a mutable data source.
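
(To make that concrete, here is a minimal, non-streaming sketch of the per-key tracking I mean; every name in it is made up for illustration.)

case class RowVersion(pk: Long, payload: String, offset: Long)
case class Expiry(pk: Long, expiredOffset: Long, expiredAt: Long)

// Given the current state (primary key -> offset of its latest version) and a batch of
// new row versions, return the updated state plus "expired" records for updated keys.
def applyBatch(
    state: Map[Long, Long],
    batch: Seq[RowVersion],
    now: Long): (Map[Long, Long], Seq[Expiry]) = {
  batch.foldLeft((state, Seq.empty[Expiry])) { case ((st, expiries), rv) =>
    st.get(rv.pk) match {
      case Some(prevOffset) =>               // updated row: the previous version expires now
        (st + (rv.pk -> rv.offset), expiries :+ Expiry(rv.pk, prevOffset, now))
      case None =>                           // newly created row
        (st + (rv.pk -> rv.offset), expiries)
    }
  }
}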

I noticed that in Spark 2.0.1 there was a concept of a StateStore, which
seemed like it would allow me to do exactly the tracking that I needed, so
I decided to try and use that built-in functionality rather than some
external key/value store for storing the current "version number" of each
primary key. There were a lot of hard-coded hoops I had to jump through,
but I eventually made it work by implementing some custom LogicalPlans and
SparkPlans around StateStore[Save/Restore]Exec.

Now, in Spark 2.1.0 it seems to have gotten even further away from what I
was using it for - the keyExpressions of StateStoreSaveExec must include a
timestamp column, which means that those expressions are not really keys
(at least not for a logical row). So it appears I can't use it that way
anymore (I can't blame Spark for this, as I knew what I was getting into
when leveraging developer APIs). There are also several hard-coded checks
which now make it clear that StateStore functionality is only to be used
for streaming aggregates, which is not really what I'm doing.

My question is - is there a good way to accomplish the above use case
within Structured Streaming? Or is this the wrong use case for the state
tracking functionality (which increasingly seems to be targeted toward
aggregates only)? Is there a plan for any kind of generalized
`mapWithState`-type functionality for Structured Streaming, or should I
just give up on that and use an external key/value store for my state
tracking?

Thanks,
Jeremy