Hello,
We have a Spark streaming application and the problem that we are
encountering is that the batch processing time keeps on increasing and
eventually causes the application to start lagging. I am hoping that
someone here can point me to any underlying cause of why this might happen.
The batc
Hi Jon,
In Spark streaming, 1 batch = 1 RDD. Essentially, the terms are used
interchangeably. If you are trying to collect multiple batches across a
DStream into a single RDD, look at the window() operations.
Hope this helps
Nikunj
On Wed, Jul 15, 2015 at 7:00 PM, Jon Chase wrote:
> I should
g SSD disks as this behavior is mainly due
> to the IO wait.
>
> Thanks
> Best Regards
>
> On Thu, Jul 16, 2015 at 8:43 AM, N B wrote:
>
>> Hello,
>>
>> We have a Spark streaming application and the problem that we are
>> encountering is that the batch
>> In that case, make sure you are using SSD disks, as this behavior is mainly due
>> to the IO wait.
>>
>> Thanks
>> Best Regards
>>
>> On Thu, Jul 16, 2015 at 8:43 AM, N B wrote:
>>
>>> Hello,
>>>
>>> We have a Spark streaming applic
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
On Fri, Jul 17, 2015 at 12:39 AM, Tathagata Das wrote:
> Responses inline.
>
> On Thu, Jul 16, 2015
transformations
only) and will pose that question on this mailing list separately.
Thanks
Nikunj
On Fri, Jul 17, 2015 at 2:45 AM, N B wrote:
> Hi TD,
>
> Thanks for the response. I do believe I understand the concept and the
> need for the filter function now. I made the requisite code
Hello,
How do I go about performing the equivalent of the following SQL clause in
Spark Streaming? I will be using this on a Windowed DStream.
SELECT key, count(distinct(value)) from table group by key;
so for example, given the following dataset in the table:
key | value
----+------
k1 |
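The grouping logic behind that query can be sketched in plain Java collections (no Spark needed to illustrate the shape; the class, method, and value names here are made up): group rows by key into a Set per key, then take each set's size.

```java
import java.util.*;

// Plain-Java sketch of SELECT key, count(distinct(value)) ... GROUP BY key.
// One Set per key collapses duplicate values; the distinct count is its size.
class DistinctCountSketch {
    static Map<String, Integer> countDistinct(List<Map.Entry<String, String>> rows) {
        Map<String, Set<String>> sets = new HashMap<>();
        for (Map.Entry<String, String> row : rows) {
            // group by key, collecting values into a Set to de-duplicate
            sets.computeIfAbsent(row.getKey(), k -> new HashSet<>()).add(row.getValue());
        }
        Map<String, Integer> counts = new HashMap<>();
        sets.forEach((k, v) -> counts.put(k, v.size()));
        return counts;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> rows = List.of(
                Map.entry("k1", "v1"), Map.entry("k1", "v1"),
                Map.entry("k1", "v2"), Map.entry("k2", "v1"));
        // counts: k1 -> 2, k2 -> 1 (map iteration order may vary)
        System.out.println(countDistinct(rows));
    }
}
```

In Spark terms this is the same shape as a groupByKey followed by a mapValues that builds a set and takes its size, as suggested later in the thread.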
We had this same issue with the reduceByKeyAndWindow API that you are
using. To fix it, you have to use a different flavor of that
API, specifically the 2 versions that allow you to give a 'filter function'
to them. Putting in the filter functions helped stabilize our application
too.
H
That post from TD that you reference has a good explanation of the issue
you are encountering. The issue in my view here is that the reduce and the
inverseReduce function that you have specified are not perfect opposites of
each other. Consider the following strings:
"a"
"b"
"a"
forward reduce wi
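That mismatch can be demonstrated with a small plain-Java sketch (hypothetical reduce/inverse functions, not code from the thread): a concatenation reduce paired with a naive "drop from the end" inverse drifts away from what recomputing the window from scratch yields.

```java
// For the window ["a", "b", "a"], sliding the first "a" out incrementally via
// the inverse gives a different result than recomputing the remaining window,
// because the two functions are not perfect opposites of each other.
class InverseReduceSketch {
    static String reduce(String acc, String v) { return acc + v; }

    static String invReduce(String acc, String leaving) {
        // naive inverse: drop one trailing occurrence of the leaving value
        return acc.endsWith(leaving)
                ? acc.substring(0, acc.length() - leaving.length())
                : acc;
    }

    public static void main(String[] args) {
        String window = reduce(reduce("a", "b"), "a");   // "aba"
        String incremental = invReduce(window, "a");     // "ab"  <- stale state
        String recomputed = reduce("b", "a");            // "ba"  <- ground truth
        System.out.println(incremental + " vs " + recomputed);
    }
}
```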
ng.DStream.reduceByKeyAndWindow),
> filterFunc can be used to retain expiring keys. I do not want to retain any
> expiring key, so I do not understand how can this help me stabilize it.
> Please correct me if this is not the case.
>
> I am also specifying both reduceFunc and invReduceF
Hello,
Does anyone know if there are any potential pitfalls associated with using
ThreadLocal variables in a Spark streaming application? One thing I have
seen mentioned in the context of app servers that use thread pools is that
ThreadLocals can leak memory. Could this happen in Spark streaming
ely a memory leak.
>
> On Thu, Jan 28, 2016 at 9:31 PM, N B wrote:
>
>> Hello,
>>
>> Does anyone know if there are any potential pitfalls associated with
>> using ThreadLocal variables in a Spark streaming application? One thing I
>> have seen mentioned in the
reaming uses thread pools, so you need to remove the ThreadLocal when
> it's not used.
>
> On Fri, Jan 29, 2016 at 12:55 PM, N B wrote:
>
>> Thanks for the response, Ryan. So I would say that it is in fact the
>> purpose of a ThreadLocal, i.e. to have a copy of the variable as long as t
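The leak mechanism being discussed is easy to reproduce with a plain JDK thread pool (nothing Spark-specific; names are illustrative): because the pool reuses threads, a value set in a ThreadLocal by one task remains visible to a later, unrelated task unless remove() is called.

```java
import java.util.concurrent.*;

// A one-thread pool guarantees both tasks run on the same reused thread.
// Task 1 sets the ThreadLocal and never removes it; task 2 then observes
// task 1's stale value instead of null -- the "leak" across tasks.
class ThreadLocalLeakSketch {
    static final ThreadLocal<String> CTX = new ThreadLocal<>();

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        pool.submit(() -> CTX.set("task-1 state")).get();  // sets, no remove()
        String leaked = pool.submit(() -> CTX.get()).get(); // different task, same thread
        System.out.println("second task saw: " + leaked);   // task-1's value, not null
        pool.shutdown();
    }
}
```

Calling `CTX.remove()` in a finally block at the end of each task avoids this, which is the advice given above.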
Fri, Jan 29, 2016 at 4:32 PM, N B wrote:
> So this use of ThreadLocal will be inside the code of a function executing
> on the workers i.e. within a call from one of the lambdas. Would it just
> look like this then:
>
> dstream.map( p -> { ThreadLocal d = new ThreadLocal<>
hanks
NB
On Fri, Jan 29, 2016 at 5:09 PM, Shixiong(Ryan) Zhu wrote:
> It looks weird. Why don't you just pass "new SomeClass()" to "somefunc"?
> You don't need to use ThreadLocal if there are no multiple threads in your
> codes.
>
> On Fri, Jan 29, 201
Is each partition guaranteed to execute in a single thread in a worker?
Thanks
N B
On Fri, Jan 29, 2016 at 6:53 PM, Shixiong(Ryan) Zhu wrote:
> I see. Then you should use `mapPartitions` rather than using ThreadLocal.
> E.g.,
>
> dstream.mapPartitions( iter ->
> val
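The mapPartitions suggestion boils down to creating one helper object per partition iterator instead of stashing it in a ThreadLocal. A plain-Java analogue of that shape (illustrative names, no Spark dependency):

```java
import java.util.*;

// Instead of a ThreadLocal, instantiate the (non-thread-safe) helper once per
// partition and reuse it for every element of that partition's iterator.
class PerPartitionHelperSketch {
    static class Helper {                 // stands in for some non-thread-safe utility
        String format(int x) { return "v" + x; }
    }

    static List<String> mapPartition(Iterator<Integer> partition) {
        Helper helper = new Helper();     // one instance per partition, no ThreadLocal
        List<String> out = new ArrayList<>();
        while (partition.hasNext()) {
            out.add(helper.format(partition.next()));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(mapPartition(List.of(1, 2, 3).iterator())); // [v1, v2, v3]
    }
}
```

In actual Spark code the same idea sits inside the function passed to `dstream.mapPartitions(...)`, so the helper's lifetime is scoped to the partition rather than to a pooled thread.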
Hello,
In our Spark streaming application, we are forming DStreams made of objects
of a rather large composite class. I have discovered that some
operations, like RDD.subtract(), are only successful for complex
objects such as these after overriding the toString() and hashCode() methods f
Bryan,
By any chance, are you calling SparkConf.setMaster("local[*]") inside your
application code?
Nikunj
On Fri, Sep 25, 2015 at 9:56 AM, Bryan Jeffrey
wrote:
> Looking at this further, it appears that my Spark Context is not correctly
> setting the Master name. I see the following in logs:
Hi Dibyendu,
How does one go about configuring Spark Streaming to use Tachyon as its
checkpoint store? Also, can one do this with Tachyon running
on a completely different node than where the Spark processes are running?
Thanks
Nikunj
On Thu, May 21, 2015 at 8:35 PM, Dibyendu Bhattacha
>
> String checkpointDirectory = "hdfs://10.252.5.113:9000/user/hadoop/spark/wal";
>
> jsc.checkpoint(checkpointDirectory);
>
>
> //I am using the My Receiver Based Consumer (
> https://github.com/dibbhatt/kafka-spark-consumer) . But
> KafkaUtil.CreateSt
is the issue of storing meta data in Tachyon .
> That needs a different JIRA I guess.
>
> Let me know I am able to explain the current scenario around Spark
> Streaming and Tachyon .
>
> Regards,
> Dibyendu
>
>
>
>
> On Sat, Sep 26, 2015 at 1:04 PM, N B wrote:
>
>
I wanted to add that we are not configuring the WAL in our scenario.
Thanks again,
Nikunj
On Sat, Sep 26, 2015 at 11:35 AM, N B wrote:
> Hi Dibyendu,
>
> Thanks. I believe I understand why it has been an issue using S3 for
> checkpoints based on your explanation. But does thi
Hello,
Does anyone have an insight into what could be the issue here?
Thanks
Nikunj
On Fri, Sep 25, 2015 at 10:44 AM, N B wrote:
> Hi Akhil,
>
> I do have 25 partitions being created. I have set
> the spark.default.parallelism property to 25. Batch size is 30 seconds and
> bl
key, count(distinct(value)) from table group by key
Thanks
Nikunj
On Sun, Jul 19, 2015 at 2:28 PM, Jerry Lam wrote:
> You mean this does not work?
>
> SELECT key, count(value) from table group by key
>
>
>
> On Sun, Jul 19, 2015 at 2:28 PM, N B wrote:
>
>> Hello
duceByKey((c1, c2) -> c1+c2 );
>
> List<Tuple2<String, Integer>> output = rdd2.collect();
>
> for (Tuple2<String, Integer> tuple : output) {
>
> System.out.println( tuple._1() + " : " + tuple._2() );
>
> }
>
> }
>
> On Sun, Jul 19, 2015 at 2:28 PM, Jerry Lam wrote:
>
all values of the same key
> together), then follow by mapValues (probably put the values into a set and
> then take the size of it because you want a distinct count)
>
> HTH,
>
> Jerry
>
> Sent from my iPhone
>
> On 19 Jul, 2015, at 8:48 pm, N B wrote:
>
> Hi
Hi all,
I have the following use case that I wanted to get some insight on how to
go about doing in Spark Streaming.
Every batch is processed through the pipeline and at the end, it has to
update some statistics information. This updated info should be reusable in
the next batch of this DStream e
tion problems.
>
> Jason
>
> On Fri, Aug 28, 2015 at 11:39 AM N B wrote:
>
>> Hi all,
>>
>> I have the following use case that I wanted to get some insight on how to
>> go about doing in Spark Streaming.
>>
>> Every batch is processed through t
Hello,
We have a Spark Streaming program that is currently running on a single
node in "local[n]" master mode. We currently give it local directories for
Spark's own state management etc. The input is streaming from network/flume
and output is also to network/kafka etc, so the process as such does
k and Spark Streaming checkpoint info WILL NEED HDFS for
> fault-tolerance. So that stuff can be recovered even if the spark cluster
> nodes go down.
>
> TD
>
> On Fri, Sep 4, 2015 at 2:45 PM, N B wrote:
>
>> Hello,
>>
>> We have a Spark Streaming program that
a well-tested code path (so I have no idea what can happen).
>
> On Tue, Sep 8, 2015 at 6:59 AM, Cody Koeninger wrote:
>
>> Yes, local directories will be sufficient
>>
>> On Sat, Sep 5, 2015 at 10:44 AM, N B wrote:
>>
>>> Hi TD,
>>>
>>> Th
Hello,
How can we start taking advantage of the performance gains made under
Project Tungsten in Spark 1.5 for a Spark Streaming program?
From what I understand, this is available by default for Dataframes. But
for a program written using Spark Streaming, would we see any potential
gains "out of
Hello all,
I have a Spark streaming application that reads from a Flume Stream, does
quite a few maps/filters in addition to a few reduceByKeyAndWindow and join
operations before writing the analyzed output to ElasticSearch inside a
foreachRDD()...
I recently started to run this on a 2 node clust
s that you are having,
> if you are not receiving sufficient partitions (partitions > total # cores)
> then try to do a .repartition.
>
> Thanks
> Best Regards
>
> On Fri, Sep 25, 2015 at 1:44 PM, N B wrote:
>
>> Hello all,
>>
>> I have a Spark streaming
Hello,
We have a Spark streaming application (running Spark 1.6.1) that consumes
data from a message queue. The application is running in local[*] mode, so
the driver and executors are in a single JVM.
The issue that we are dealing with these days is that if any of our lambda
functions throw any Excep
Hello,
Here is something I am unable to explain and goes against Kryo's
documentation, numerous suggestions on the web and on this list as well as
pure intuition.
Our Spark application runs in a single JVM (perhaps this is relevant, hence
mentioning it). We have been using Kryo serialization with
where to look in order to find the root cause will be
greatly appreciated.
Thanks
N B
he receiver receive metadata but not write to the WAL after an
HDFS node is lost and restarted? The HDFS replication factor is at its default
of 2.
Thanks
N B
On Mon, Jun 19, 2017 at 6:23 PM, N B wrote:
> Hi all,
>
> We are running a Standalone Spark Cluster for running a streaming
> applic
(DFSOutputStream.java:823)
at
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:475)
polygraph-engine 2017-06-20 22:38:11,302 WARN JobGenerator
ReceivedBlockTracker.logWarning - Failed to acknowledge batch clean up in
the Write Ahead Log.
Thanks
N B
On Tue, Jun 20, 2017 at 10:24 AM, N B
the root cause of the issues I have been seeing.
Can someone please confirm if the package mentioned above was indeed
compiled with Hadoop 2.7? Or should I fall back on an HDFS Server 2.2
instead?
Thanks
N B
Hadoop version 2.7.3
On Tue, Jun 20, 2017 at 11:12 PM, yohann jardin
wrote:
> Which version of Hadoop are you running on?
>
> *Yohann Jardin*
> Le 6/21/2017 à 1:06 AM, N B a écrit :
>
> Ok some more info about this issue to see if someone can shine a light on
> what could
ible in terms
of both APIs and protocols.
"
Thanks
N B
On Tue, Jun 20, 2017 at 11:36 PM, N B wrote:
> Hadoop version 2.7.3
>
> On Tue, Jun 20, 2017 at 11:12 PM, yohann jardin
> wrote:
>
>> Which version of Hadoop are you running on?
>>
>> *Yohann Jardin
consideration.
We achieved it using a HashMap that maintains counts instead of a Set.
Hope this helps,
N B
On Thu, Jun 22, 2017 at 4:07 PM, swetha kasireddy wrote:
> Hi TD,
>
> I am still seeing this issue with any immutable DataStructure. Any idea
> why this ha
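The counts-instead-of-a-Set idea mentioned above can be sketched as follows (a minimal plain-Java illustration with made-up names): increments and decrements are exact inverses, so the structure cooperates with an invertible reduce, and the distinct count is simply the map's size.

```java
import java.util.*;

// A HashMap of value -> occurrence count. add() and remove() are perfect
// opposites (increment/decrement), unlike Set.add/Set.remove, which lose
// information when the same value appears more than once in the window.
class DistinctWithCounts {
    final Map<String, Integer> counts = new HashMap<>();

    void add(String v)    { counts.merge(v, 1, Integer::sum); }

    void remove(String v) {
        // decrement, and drop the key entirely when its count reaches zero
        counts.computeIfPresent(v, (k, c) -> c == 1 ? null : c - 1);
    }

    int distinct() { return counts.size(); }

    public static void main(String[] args) {
        DistinctWithCounts d = new DistinctWithCounts();
        d.add("a"); d.add("b"); d.add("a");   // window holds a, b, a
        d.remove("a");                        // the first "a" slides out
        System.out.println(d.distinct());     // 2 -- "a" is still present once
    }
}
```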
Hi all,
Spark has had a backpressure implementation since 1.5 that helps to
stabilize a Spark Streaming application by keeping the per-batch processing
time under control and below the batch interval. This
implementation leaves excess records in the source (Kafka, Flume etc) and
they get
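For reference, the backpressure mechanism discussed here is driven by configuration properties along these lines (property names as given in the Spark configuration docs; the values are placeholders, so verify against the version you run):

```properties
# enable the rate controller introduced in Spark 1.5
spark.streaming.backpressure.enabled=true
# optional cap on the very first batch, before the controller has any feedback
spark.streaming.backpressure.initialRate=1000
# hard per-receiver upper bound that still applies on top of backpressure
spark.streaming.receiver.maxRate=10000
```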
Hi TD,
Thanks for the response. Since you mentioned GC, this got me thinking.
Given that we are running in local mode (all in a single JVM) for now, does
the option "spark.executor.extraJavaOptions" set to
"-XX:+UseConcMarkSweepGC" inside SparkConf object take effect at all before
we use it to cr
ht I would clarify.
>
> TD
>
> On Wed, Apr 8, 2015 at 1:23 PM, N B wrote:
>
>> Hi TD,
>>
>> Thanks for the response. Since you mentioned GC, this got me thinking.
>>
>> Given that we are running in local mode (all in a single JVM) for n
tor will be the same process.
> And in that case the Java options in SparkConf configuration will not
> work.
>
> On Wed, Apr 8, 2015 at 1:44 PM, N B wrote:
>
>> Since we are running in local mode, won't all the executors be in the
>> same JVM as the driver?
>
pr 8, 2015 at 6:08 PM, Tathagata Das wrote:
>
>> Yes, in local mode the driver and executor will be the same process.
>> And in that case the Java options in SparkConf configuration will not
>> work.
>>
>> On Wed, Apr 8, 2015 at 1:44 PM, N B wrote:
>
Hi all,
I had posed this query as part of a different thread but did not get a
response there. So creating a new thread hoping to catch someone's
attention.
We are experiencing this issue of shuffle files being left behind and not
being cleaned up by Spark. Since this is a Spark streaming applica
h 1 -type d -cmin
> +1440 -name "spark-*-*-*" -prune -exec rm -rf {} \+
>
>
> On 20 April 2015 at 23:12, N B wrote:
>
>> Hi all,
>>
>> I had posed this query as part of a different thread but did not get a
>> response there. So creating a new thread hoping
.unpersist=true.
>
>
> Those together cleaned up the shuffle files for us.
>
>
> -Conor
>
> On Tue, Apr 21, 2015 at 8:18 AM, N B wrote:
>
>> We already do have a cron job in place to clean just the shuffle files.
>> However, what I would really like to know is
ing application? Was it falling behind
> with a large increasing scheduling delay?
>
> TD
>
> On Thu, Apr 23, 2015 at 11:31 AM, N B wrote:
>
>> Thanks for the response, Conor. I tried with those settings and for a
>> while it seemed like it was cleaning up shuffle files after
sure how to proceed from here. Any suggestions on how to
avoid these errors?
Thanks
NB
On Fri, Apr 24, 2015 at 12:57 AM, N B wrote:
> Hi TD,
>
> That may very well have been the case. There may be some delay on our
> output side. I have made a change just for testing that send
Thanks Ilya. Does one have to call broadcast again once the underlying data
is updated, in order to make the changes visible on all nodes?
Thanks
NB
On Fri, May 15, 2015 at 5:29 PM, Ilya Ganelin wrote:
> The broadcast variable is like a pointer. If the underlying data changes
> then the changes
tains:
>
> In addition, the object v should not be modified after it is broadcast
> in order to ensure that all nodes get the same value of the broadcast
> variable
>
>
> On Sat, May 16, 2015 at 10:39 AM, N B wrote:
>
>> Thanks Ilya. Does one have to call broadcast ag
myRDD: RDD[...], myBroadcastVar: Broadcast[...]): Unit = {
> ...
> }
>
> var myBroadcast = sc.broadcast(...)
> (0 to 20).foreach { iteration =>
> oneIteration(myRDD, myBroadcast)
> myBroadcast = sc.broadcast(...) // create a NEW broadcast here, with
> whatever y
on space
> myBroadcast.unpersist()
>
> // now we create a new broadcast which has the updated data in our
> mutable data structure
> myBroadcast = sc.broadcast(myMutableDataStructure)
> }
>
>
> hope this clarifies things!
>
> Imran
>
> On Tue, May 19, 2