Spark streaming Processing time keeps increasing

2015-07-15 Thread N B
Hello, We have a Spark streaming application and the problem that we are encountering is that the batch processing time keeps on increasing and eventually causes the application to start lagging. I am hoping that someone here can point me to any underlying cause of why this might happen. The batc

Re: Possible to combine all RDDs from a DStream batch into one?

2015-07-15 Thread N B
Hi Jon, In Spark streaming, 1 batch = 1 RDD. Essentially, the terms are used interchangeably. If you are trying to collect multiple batches across a DStream into a single RDD, look at the window() operations. Hope this helps Nikunj On Wed, Jul 15, 2015 at 7:00 PM, Jon Chase wrote: > I should
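
A minimal Java sketch of the window() approach mentioned above, assuming a JavaDStream<String> named lines and a 10-second batch interval (names and durations are illustrative):

    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;

    // With a 10-second batch interval, a 60-second tumbling window collapses
    // 6 consecutive batches into a single RDD per window evaluation.
    JavaDStream<String> windowed = lines.window(Durations.seconds(60), Durations.seconds(60));
    // Each RDD of 'windowed' is the union of the batch RDDs that fell inside the window.
    windowed.count().print();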

Re: Spark streaming Processing time keeps increasing

2015-07-16 Thread N B
g SSD disks as this behavior is mainly due > to the IO wait. > > Thanks > Best Regards > > On Thu, Jul 16, 2015 at 8:43 AM, N B wrote: > >> Hello, >> >> We have a Spark streaming application and the problem that we are >> encountering is that the batch

Re: Spark streaming Processing time keeps increasing

2015-07-16 Thread N B
; that case make sure you are having SSD disks as this behavior is mainly due >> to the IO wait. >> >> Thanks >> Best Regards >> >> On Thu, Jul 16, 2015 at 8:43 AM, N B wrote: >> >>> Hello, >>> >>> We have a Spark streaming applic

Re: Spark streaming Processing time keeps increasing

2015-07-17 Thread N B
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) On Fri, Jul 17, 2015 at 12:39 AM, Tathagata Das wrote: > Responses inline. > > On Thu, Jul 16, 2015

Re: Spark streaming Processing time keeps increasing

2015-07-19 Thread N B
transformations only) and will pose that question on this mailing list separately. Thanks Nikunj On Fri, Jul 17, 2015 at 2:45 AM, N B wrote: > Hi TD, > > Thanks for the response. I do believe I understand the concept and the > need for the filterfunction now. I made the requisite code

Counting distinct values for a key?

2015-07-19 Thread N B
Hello, How do I go about performing the equivalent of the following SQL clause in Spark Streaming? I will be using this on a Windowed DStream. SELECT key, count(distinct(value)) from table group by key; so for example, given the following dataset in the table: key | value -+--- k1 |
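
One way to express this, sketched in Java under the assumption of a windowed JavaPairDStream<String, String> of (key, value) pairs named windowedPairs; the names are illustrative:

    import java.util.HashSet;
    import java.util.Set;
    import org.apache.spark.streaming.api.java.JavaPairDStream;

    // Group all values for a key within the window, then count the distinct ones.
    JavaPairDStream<String, Long> distinctCounts = windowedPairs
        .groupByKey()
        .mapValues(values -> {
            Set<String> distinct = new HashSet<>();
            for (String v : values) {
                distinct.add(v);
            }
            return (long) distinct.size();
        });

    distinctCounts.print();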

Re: Spark Kafka stream processing time increasing gradually

2016-06-16 Thread N B
We had this same issue with the reduceByKeyAndWindow API that you are using. For fixing this issue, you have to use a different flavor of that API, specifically the 2 versions that allow you to give a 'Filter function' to them. Putting in the filter functions helped stabilize our application too. H
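
For reference, a hedged Java sketch of the filter-function flavor being described; the source stream (a JavaPairDStream<String, Long> named pairs), durations and partition count are illustrative:

    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaPairDStream;

    int numPartitions = 8;  // illustrative

    JavaPairDStream<String, Long> windowedCounts = pairs.reduceByKeyAndWindow(
        (a, b) -> a + b,            // reduce: fold in values entering the window
        (a, b) -> a - b,            // inverse reduce: remove values leaving the window
        Durations.minutes(10),      // window duration
        Durations.seconds(30),      // slide duration
        numPartitions,
        tuple -> tuple._2() > 0);   // filter function: drop keys whose count has fallen to 0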

Re: Neither previous window has value for key, nor new values found.

2016-06-16 Thread N B
That post from TD that you reference has a good explanation of the issue you are encountering. The issue in my view here is that the reduce and the inverseReduce function that you have specified are not perfect opposites of each other. Consider the following strings: "a" "b" "a" forward reduce wi
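
A small plain-Java illustration of why a set-style reduce is not invertible when values repeat, assuming the forward reduce is a set union and the inverse reduce is a set difference:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    Set<String> window = new HashSet<>(Arrays.asList("a"));   // batch 1: "a"
    window.addAll(Collections.singleton("b"));                // batch 2: "b" enters
    window.addAll(Collections.singleton("a"));                // batch 3: another "a" enters
    window.removeAll(Collections.singleton("a"));             // batch 1 slides out
    System.out.println(window);  // prints [b], yet the window logically still contains an "a"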

Re: Spark Kafka stream processing time increasing gradually

2016-06-20 Thread N B
ng.DStream.reduceByKeyAndWindow), > filerFunc can be used to retain expiring keys. I do not want to retain any > expiring key, so I do not understand how can this help me stabilize it. > Please correct me if this is not the case. > > I am also specifying both reduceFunc and invReduceF

Spark streaming and ThreadLocal

2016-01-28 Thread N B
Hello, Does anyone know if there are any potential pitfalls associated with using ThreadLocal variables in a Spark streaming application? One thing I have seen mentioned in the context of app servers that use thread pools is that ThreadLocals can leak memory. Could this happen in Spark streaming

Re: Spark streaming and ThreadLocal

2016-01-29 Thread N B
ely a memory leak. > > On Thu, Jan 28, 2016 at 9:31 PM, N B wrote: > >> Hello, >> >> Does anyone know if there are any potential pitfalls associated with >> using ThreadLocal variables in a Spark streaming application? One things I >> have seen mentioned in the

Re: Spark streaming and ThreadLocal

2016-01-29 Thread N B
reaming uses threadpools so you need to remove ThreadLocal when > it's not used. > > On Fri, Jan 29, 2016 at 12:55 PM, N B wrote: > >> Thanks for the response Ryan. So I would say that it is in fact the >> purpose of a ThreadLocal i.e. to have a copy of the variable as long as t

Re: Spark streaming and ThreadLocal

2016-01-29 Thread N B
Fri, Jan 29, 2016 at 4:32 PM, N B wrote: > So this use of ThreadLocal will be inside the code of a function executing > on the workers i.e. within a call from one of the lambdas. Would it just > look like this then: > > dstream.map( p -> { ThreadLocal d = new ThreadLocal<>

Re: Spark streaming and ThreadLocal

2016-01-29 Thread N B
hanks NB On Fri, Jan 29, 2016 at 5:09 PM, Shixiong(Ryan) Zhu wrote: > It looks weird. Why don't you just pass "new SomeClass()" to "somefunc"? > You don't need to use ThreadLocal if there are no multiple threads in your > codes. > > On Fri, Jan 29, 201

Re: Spark streaming and ThreadLocal

2016-02-01 Thread N B
Is each partition guaranteed to execute in a single thread in a worker? Thanks N B On Fri, Jan 29, 2016 at 6:53 PM, Shixiong(Ryan) Zhu wrote: > I see. Then you should use `mapPartitions` rather than using ThreadLocal. > E.g., > > dstream.mapPartitions( iter -> > val
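
A hedged Java sketch of the mapPartitions suggestion, using a per-partition SimpleDateFormat (the classic ThreadLocal use case) purely as an illustration; the input stream (a JavaDStream<String> named lines) and the date pattern are assumptions:

    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.List;
    import org.apache.spark.streaming.api.java.JavaDStream;

    JavaDStream<Date> parsed = lines.mapPartitions(iter -> {
        // One instance per partition; the partition is processed by a single task,
        // so no ThreadLocal is needed here.
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        List<Date> out = new ArrayList<>();
        while (iter.hasNext()) {
            out.add(fmt.parse(iter.next()));
        }
        return out;  // the Spark 1.x Java API expects an Iterable from mapPartitions
    });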

Overriding toString and hashCode with Spark streaming

2016-02-02 Thread N B
Hello, In our Spark streaming application, we are forming DStreams made of objects of a rather large composite class. I have discovered that operations like RDD.subtract() are only successful for complex objects such as these after overriding the toString() and hashCode() methods f

Re: Weird worker usage

2015-09-25 Thread N B
Bryan, By any chance, are you calling SparkConf.setMaster("local[*]") inside your application code? Nikunj On Fri, Sep 25, 2015 at 9:56 AM, Bryan Jeffrey wrote: > Looking at this further, it appears that my Spark Context is not correctly > setting the Master name. I see the following in logs:
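
For context, leaving the master out of the application code avoids this class of problem; a minimal sketch (the app name is illustrative):

    import org.apache.spark.SparkConf;

    // Do not call setMaster() here; let the --master passed to spark-submit
    // (e.g. a spark://host:7077 URL) take effect instead of a hard-coded local[*].
    SparkConf conf = new SparkConf().setAppName("streaming-app");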

Re: Spark Streaming with Tachyon : Data Loss on Receiver Failure due to WAL error

2015-09-25 Thread N B
Hi Dibyendu, How does one go about configuring Spark Streaming to use Tachyon as its place for storing checkpoints? Also, can one do this with Tachyon running on a completely different node than where the Spark processes are running? Thanks Nikunj On Thu, May 21, 2015 at 8:35 PM, Dibyendu Bhattacha
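
For reference, a hedged sketch of pointing the streaming checkpoint directory at Tachyon; the host, port and path are assumptions, and the Tachyon master only needs to be network-reachable from the Spark nodes:

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    SparkConf conf = new SparkConf().setAppName("streaming-app");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(30));

    // The tachyon:// URI below is illustrative; it can point at a node
    // separate from the ones running the Spark processes.
    jssc.checkpoint("tachyon://tachyon-master:19998/spark/checkpoints");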

Re: Spark Streaming with Tachyon : Data Loss on Receiver Failure due to WAL error

2015-09-26 Thread N B
> > String checkpointDirectory = "hdfs:// > 10.252.5.113:9000/user/hadoop/spark/wal"; > > jsc.checkpoint(checkpointDirectory); > > > //I am using the My Receiver Based Consumer ( > https://github.com/dibbhatt/kafka-spark-consumer) . But > KafkaUtil.CreateSt

Re: Spark Streaming with Tachyon : Data Loss on Receiver Failure due to WAL error

2015-09-26 Thread N B
is the issue of storing meta data in Tachyon . > That needs a different JIRA I guess. > > Let me know I am able to explain the current scenario around Spark > Streaming and Tachyon . > > Regards, > Dibyendu > > > > > On Sat, Sep 26, 2015 at 1:04 PM, N B wrote: > >

Re: Spark Streaming with Tachyon : Data Loss on Receiver Failure due to WAL error

2015-09-26 Thread N B
I wanted to add that we are not configuring the WAL in our scenario. Thanks again, Nikunj On Sat, Sep 26, 2015 at 11:35 AM, N B wrote: > Hi Dibyendu, > > Thanks. I believe I understand why it has been an issue using S3 for > checkpoints based on your explanation. But does thi

Re: Weird worker usage

2015-09-26 Thread N B
Hello, Does anyone have an insight into what could be the issue here? Thanks Nikunj On Fri, Sep 25, 2015 at 10:44 AM, N B wrote: > Hi Akhil, > > I do have 25 partitions being created. I have set > the spark.default.parallelism property to 25. Batch size is 30 seconds and > bl

Re: Counting distinct values for a key?

2015-07-19 Thread N B
key, count(distinct(value)) from table group by key Thanks Nikunj On Sun, Jul 19, 2015 at 2:28 PM, Jerry Lam wrote: > You mean this does not work? > > SELECT key, count(value) from table group by key > > > > On Sun, Jul 19, 2015 at 2:28 PM, N B wrote: > >> Hello

Re: Counting distinct values for a key?

2015-07-19 Thread N B
duceByKey((c1, c2) -> c1+c2 ); > > List> output = rdd2.collect(); > > for (Tuple2 tuple : output) { > > System.out.println( tuple._1() + " : " + tuple._2() ); > > } > > } > > On Sun, Jul 19, 2015 at 2:28 PM, Jerry Lam wrote: &g

Re: Counting distinct values for a key?

2015-07-20 Thread N B
all values of the same key > together), then follow by mapValues (probably put the values into a set and > then take the size of it because you want a distinct count) > > HTH, > > Jerry > > Sent from my iPhone > > On 19 Jul, 2015, at 8:48 pm, N B wrote: > > Hi

Dynamic lookup table

2015-08-28 Thread N B
Hi all, I have the following use case that I wanted to get some insight on how to go about doing in Spark Streaming. Every batch is processed through the pipeline and at the end, it has to update some statistics information. This updated info should be reusable in the next batch of this DStream e
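
One common way to carry per-key statistics from one batch into the next is updateStateByKey; a hedged Java sketch assuming a JavaPairDStream<String, Long> of per-batch statistics named batchStats (stateful operations also require a checkpoint directory):

    import java.util.List;
    import com.google.common.base.Optional;  // the Spark 1.x Java API uses Guava's Optional here
    import org.apache.spark.streaming.api.java.JavaPairDStream;

    JavaPairDStream<String, Long> runningStats = batchStats.updateStateByKey(
        (List<Long> newValues, Optional<Long> state) -> {
            long total = state.or(0L);
            for (Long v : newValues) {
                total += v;
            }
            return Optional.of(total);  // the updated value is visible when the next batch runs
        });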

Re: Dynamic lookup table

2015-08-28 Thread N B
tion problems. > > Jason > > On Fri, Aug 28, 2015 at 11:39 AM N B wrote: > >> Hi all, >> >> I have the following use case that I wanted to get some insight on how to >> go about doing in Spark Streaming. >> >> Every batch is processed through t

Is HDFS required for Spark streaming?

2015-09-04 Thread N B
Hello, We have a Spark Streaming program that is currently running on a single node in "local[n]" master mode. We currently give it local directories for Spark's own state management etc. The input is streaming from network/flume and output is also to network/kafka etc, so the process as such does

Re: Is HDFS required for Spark streaming?

2015-09-05 Thread N B
k and Spark Streaming checkpoint info WILL NEED HDFS for > fault-tolerance. So that stuff can be recovered even if the spark cluster > nodes go down. > > TD > > On Fri, Sep 4, 2015 at 2:45 PM, N B wrote: > >> Hello, >> >> We have a Spark Streaming program that

Re: Is HDFS required for Spark streaming?

2015-09-09 Thread N B
a well-tested code path (so I have no idea what can happen). > > On Tue, Sep 8, 2015 at 6:59 AM, Cody Koeninger wrote: > >> Yes, local directories will be sufficient >> >> On Sat, Sep 5, 2015 at 10:44 AM, N B wrote: >> >>> Hi TD, >>> >>> Th

Tungsten and Spark Streaming

2015-09-09 Thread N B
Hello, How can we start taking advantage of the performance gains made under Project Tungsten in Spark 1.5 for a Spark Streaming program? From what I understand, this is available by default for DataFrames. But for a program written using Spark Streaming, would we see any potential gains "out of

Weird worker usage

2015-09-25 Thread N B
Hello all, I have a Spark streaming application that reads from a Flume Stream, does quite a few maps/filters in addition to a few reduceByKeyAndWindow and join operations before writing the analyzed output to ElasticSearch inside a foreachRDD()... I recently started to run this on a 2 node clust

Re: Weird worker usage

2015-09-25 Thread N B
s that you are having, > if you are not receiving sufficient partitions (partitions > total # cores) > then try to do a .repartition. > > Thanks > Best Regards > > On Fri, Sep 25, 2015 at 1:44 PM, N B wrote: > >> Hello all, >> >> I have a Spark streaming

Spark Streaming user function exceptions causing hangs

2016-08-24 Thread N B
Hello, We have a Spark streaming application (running Spark 1.6.1) that consumes data from a message queue. The application is running in local[*] mode so driver and executors are in a single JVM. The issue that we are dealing with these days is that if any of our lambda functions throw any Excep

Kryo (with Spark 1.6.3) class registration slows down processing

2017-01-20 Thread N B
Hello, Here is something I am unable to explain, and it goes against Kryo's documentation, numerous suggestions on the web and on this list, as well as pure intuition. Our Spark application runs in a single JVM (perhaps this is relevant, hence mentioning it). We have been using Kryo serialization with
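
For reference, a minimal sketch of the registration being described; MyEvent and MyAggregate are placeholders for the application's own classes:

    import org.apache.spark.SparkConf;

    SparkConf conf = new SparkConf()
        .setAppName("streaming-app")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        // Optional: fail fast if any serialized class was not explicitly registered.
        .set("spark.kryo.registrationRequired", "true")
        // MyEvent and MyAggregate stand in for the application's composite classes.
        .registerKryoClasses(new Class<?>[] { MyEvent.class, MyAggregate.class });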

Flume DStream produces 0 records after HDFS node killed

2017-06-19 Thread N B
where to look in order to find the root cause will be greatly appreciated. Thanks N B

Re: Flume DStream produces 0 records after HDFS node killed

2017-06-20 Thread N B
he receive receive metadata but not write to the WAL after an HDFS node is lost and restarted? HDFS replication factor is at its default of 2. Thanks N B On Mon, Jun 19, 2017 at 6:23 PM, N B wrote: > Hi all, > > We are running a Standalone Spark Cluster for running a streaming > applic

Re: Flume DStream produces 0 records after HDFS node killed

2017-06-20 Thread N B
(DFSOutputStream.java:823) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:475) polygraph-engine 2017-06-20 22:38:11,302 WARN JobGenerator ReceivedBlockTracker.logWarning - Failed to acknowledge batch clean up in the Write Ahead Log. Thanks N B On Tue, Jun 20, 2017 at 10:24 AM, N B

Spark 2.1.1 and Hadoop version 2.2 or 2.7?

2017-06-20 Thread N B
;s the root cause of the issues I have been seeing. Can someone please confirm if the package mentioned above was indeed compiled with Hadoop 2.7? Or should I fall back on an HDFS Server 2.2 instead? Thanks N B

Re: Flume DStream produces 0 records after HDFS node killed

2017-06-20 Thread N B
Hadoop version 2.7.3 On Tue, Jun 20, 2017 at 11:12 PM, yohann jardin wrote: > Which version of Hadoop are you running on? > > *Yohann Jardin* > On 6/21/2017 at 1:06 AM, N B wrote: > > Ok some more info about this issue to see if someone can shine a light on > what could

Re: Flume DStream produces 0 records after HDFS node killed

2017-06-22 Thread N B
ible in terms of both APIs and protocols. " Thanks N B On Tue, Jun 20, 2017 at 11:36 PM, N B wrote: > Hadoop version 2.7.3 > > On Tue, Jun 20, 2017 at 11:12 PM, yohann jardin > wrote: > >> Which version of Hadoop are you running on? >> >> *Yohann Jardin

Re: Exception which using ReduceByKeyAndWindow in Spark Streaming.

2017-06-26 Thread N B
consideration. We achieved it using a HashMap that maintains counts instead of a Set. Hope this helps, N B On Thu, Jun 22, 2017 at 4:07 PM, swetha kasireddy wrote: > Hi TD, > > I am still seeing this issue with any immuatble DataStructure. Any idea > why this ha
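
A hedged sketch of that idea: the window state is a HashMap of value -> count, the forward reduce adds counts and the inverse reduce subtracts them, dropping entries that reach zero, which keeps the two functions true inverses of each other:

    import java.util.HashMap;
    import org.apache.spark.api.java.function.Function2;

    Function2<HashMap<String, Long>, HashMap<String, Long>, HashMap<String, Long>> add =
        (current, entering) -> {
            HashMap<String, Long> out = new HashMap<>(current);
            entering.forEach((k, v) -> out.merge(k, v, Long::sum));
            return out;
        };

    Function2<HashMap<String, Long>, HashMap<String, Long>, HashMap<String, Long>> subtract =
        (current, leaving) -> {
            HashMap<String, Long> out = new HashMap<>(current);
            leaving.forEach((k, v) -> {
                long remaining = out.getOrDefault(k, 0L) - v;
                if (remaining > 0) {
                    out.put(k, remaining);
                } else {
                    out.remove(k);  // forget values whose count has reached zero
                }
            });
            return out;
        };

    // 'add' and 'subtract' would then be passed as the reduce and inverse-reduce
    // functions of reduceByKeyAndWindow on a JavaPairDStream<K, HashMap<String, Long>>.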

Implementing Dynamic Sampling in a Spark Streaming Application

2017-07-12 Thread N B
Hi all, Spark has had a backpressure implementation since 1.5 that helps to stabilize a Spark Streaming application in terms of keeping the processing time/batch under control and less than the batch interval. This implementation leaves excess records in the source (Kafka, Flume etc) and they get
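
The backpressure behavior referred to here is enabled through configuration; a minimal sketch (the max-rate value is illustrative):

    import org.apache.spark.SparkConf;

    SparkConf conf = new SparkConf()
        .setAppName("streaming-app")
        // Let Spark adapt the ingestion rate to the observed processing rate (Spark 1.5+).
        .set("spark.streaming.backpressure.enabled", "true")
        // Optional upper bound per receiver while the rate estimator warms up.
        .set("spark.streaming.receiver.maxRate", "10000");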

Re: Timeout errors from Akka in Spark 1.2.1

2015-04-08 Thread N B
Hi TD, Thanks for the response. Since you mentioned GC, this got me thinking. Given that we are running in local mode (all in a single JVM) for now, does the option "spark.executor.extraJavaOptions" set to "-XX:+UseConcMarkSweepGC" inside the SparkConf object take effect at all before we use it to cr

Re: Timeout errors from Akka in Spark 1.2.1

2015-04-08 Thread N B
ht I would clarify. > > TD > > On Wed, Apr 8, 2015 at 1:23 PM, N B wrote: > >> Hi TD, >> >> Thanks for the response. Since you mentioned GC, this got me thinking. >> >> Given that we are running in local mode (all in a single JVM) for n

Re: Timeout errors from Akka in Spark 1.2.1

2015-04-08 Thread N B
tor will be in the same process. > And in that case the Java options in SparkConf configuration will not > work. > > On Wed, Apr 8, 2015 at 1:44 PM, N B wrote: > >> Since we are running in local mode, won't all the executors be in the >> same JVM as the driver? >

Re: Timeout errors from Akka in Spark 1.2.1

2015-04-16 Thread N B
pr 8, 2015 at 6:08 PM, Tathagata Das wrote: > >> Yes, in local mode the driver and executor will be in the same process. >> And in that case the Java options in SparkConf configuration will not >> work. >> >> On Wed, Apr 8, 2015 at 1:44 PM, N B wrote: &g

Shuffle files not cleaned up (Spark 1.2.1)

2015-04-20 Thread N B
Hi all, I had posed this query as part of a different thread but did not get a response there. So creating a new thread hoping to catch someone's attention. We are experiencing this issue of shuffle files being left behind and not being cleaned up by Spark. Since this is a Spark streaming applica

Re: Shuffle files not cleaned up (Spark 1.2.1)

2015-04-21 Thread N B
h 1 -type d -cmin > +1440 -name "spark-*-*-*" -prune -exec rm -rf {} \+ > > > On 20 April 2015 at 23:12, N B wrote: > >> Hi all, >> >> I had posed this query as part of a different thread but did not get a >> response there. So creating a new thread hoping

Re: Shuffle files not cleaned up (Spark 1.2.1)

2015-04-23 Thread N B
.unpersist=true. > > > Those together cleaned up the shuffle files for us. > > > -Conor > > On Tue, Apr 21, 2015 at 8:18 AM, N B wrote: > >> We already do have a cron job in place to clean just the shuffle files. >> However, what I would really like to know is
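
For reference, a sketch of the settings discussed in this thread; the TTL value is illustrative and spark.cleaner.ttl applies to the Spark 1.x line:

    import org.apache.spark.SparkConf;

    SparkConf conf = new SparkConf()
        .setAppName("streaming-app")
        // Unpersist RDDs generated by the streaming computation once they are no longer needed.
        .set("spark.streaming.unpersist", "true")
        // Periodic TTL-based cleanup of old metadata and shuffle data, in seconds (Spark 1.x).
        .set("spark.cleaner.ttl", "3600");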

Re: Shuffle files not cleaned up (Spark 1.2.1)

2015-04-24 Thread N B
ing application? Was it falling behind > with a large increasing scheduling delay? > > TD > > On Thu, Apr 23, 2015 at 11:31 AM, N B wrote: > >> Thanks for the response, Conor. I tried with those settings and for a >> while it seemed like it was cleaning up shuffle files after

Re: Shuffle files not cleaned up (Spark 1.2.1)

2015-04-24 Thread N B
sure how to proceed from here. Any suggestions on how to avoid these errors? Thanks NB On Fri, Apr 24, 2015 at 12:57 AM, N B wrote: > Hi TD, > > That may very well have been the case. There may be some delay on our > output side. I have made a change just for testing that send

Re: Broadcast variables can be rebroadcast?

2015-05-15 Thread N B
Thanks Ilya. Does one have to call broadcast again once the underlying data is updated in order to get the changes visible on all nodes? Thanks NB On Fri, May 15, 2015 at 5:29 PM, Ilya Ganelin wrote: > The broadcast variable is like a pointer. If the underlying data changes > then the changes

Re: Broadcast variables can be rebroadcast?

2015-05-16 Thread N B
tatins: > > In addition, the object v should not be modified after it is broadcast > in order to ensure that all nodes get the same value of the broadcast > variable > > > On Sat, May 16, 2015 at 10:39 AM, N B wrote: > >> Thanks Ilya. Does one have to call broadcast ag

Re: Broadcast variables can be rebroadcast?

2015-05-19 Thread N B
myRDD: RDD[...], myBroadcastVar: Broadcast[...]): Unit = { > ... > } > > var myBroadcast = sc.broadcast(...) > (0 to 20).foreach { iteration => > oneIteration(myRDD, myBroadcast) > var myBroadcast = sc.broadcast(...) // create a NEW broadcast here, with > whatever y

Re: Broadcast variables can be rebroadcast?

2015-05-19 Thread N B
on space > myBroadcast.unpersist() > > // now we create a new broadcast which has the updated data in our > mutable data structure > myBroadcast = sc.broadcast(myMutableDataStructure) > } > > > hope this clarifies things! > > Imran > > On Tue, May 19, 2
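
A completed Java sketch of the quoted pattern; jsc is an existing JavaSparkContext, and buildLookupTable(), runOneIteration() and inputRdd are hypothetical stand-ins for loading the fresh data and running the job that uses it:

    import java.util.Map;
    import org.apache.spark.broadcast.Broadcast;

    Broadcast<Map<String, String>> lookup = jsc.broadcast(buildLookupTable());

    for (int i = 0; i < 20; i++) {
        runOneIteration(inputRdd, lookup);          // hypothetical job that reads the broadcast
        lookup.unpersist();                         // release the stale copies on the executors
        lookup = jsc.broadcast(buildLookupTable()); // create a NEW broadcast with the updated data
    }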