Hi,
I just switched from "createStream" to the "createDirectStream" API for
kafka and while things otherwise seem happy, the first thing I noticed is
that stream/receiver stats are gone from the Spark UI :( Those stats were
very handy for keeping an eye on the health of the app.
What's the best way t
us you get a fancy new streaming UI with more awesome
> stats. :)
>
> On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith wrote:
>
>> Hi,
>>
>> I just switched from "createStream" to the "createDirectStream" API for
>> kafka and while things oth
of my app. Yes, for the record, this is with
CDH 5.4.1 and Spark 1.3.
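For reference, a minimal sketch of the direct-stream setup in question (Spark 1.3 API; the broker list, topic name and batch interval below are placeholders, not from this thread):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(new SparkConf().setAppName("direct-example"), Seconds(10))

// The direct API reads Kafka metadata itself instead of running receivers,
// which is why the receiver stats disappear from the 1.3 streaming UI.
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics = Set("mytopic")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)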
On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith wrote:
> Thanks for the super-fast response, TD :)
>
> I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4. Cloudera,
> are you listening? :D
>
>
>
>
>> Is there any more info you can provide / relevant code?
>>
>> On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith wrote:
>>
>>> Update on performance of the new API: the new code using the
>>> createDirectStream API ran overnight and when I checked the app stat
ition
// Union all the Kafka input streams, transform each record, then write out per partition.
// Note: rec here is the partition's Iterator of records.
val k = ssc.union(kInStreams)
val dataout = k.map(x => myFunc(x._2, someParams))
dataout.foreachRDD(rdd => rdd.foreachPartition(rec => {
  myOutputFunc.write(rec) }))
Thanks,
Tim
>
>> If that's the case I'd try direct stream without the repartitioning.
>>
>>
>>
> there are kafka partitions.
>
> I'd remove the repartition. If you weren't doing any shuffles in the old
> job, and are doing a shuffle in the new job, it's not really comparable.
>
> On Fri, Jun 19, 2015 at 8:16 PM, Tim Smith wrote:
>
>> On Fri, Jun 19, 2015
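A sketch of that suggestion, assuming stream is the direct stream and reusing myFunc, someParams and myOutputFunc from the snippet above:

// The direct stream already has one RDD partition per Kafka partition,
// so process and write per partition without a repartition/shuffle step.
val dataout = stream.map(x => myFunc(x._2, someParams))
dataout.foreachRDD { rdd =>
  rdd.foreachPartition { recs =>
    myOutputFunc.write(recs)
  }
}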
I am curious too - is there any comparison between the two? Looks like one is
DataStax-sponsored and the other is Cloudera. Other than that, any
major/core differences in design/approach?
Thanks,
Tim
On Mon, Sep 28, 2015 at 8:32 AM, Ramirez Quetzal wrote:
> Anyone has feedback on using Hue / Spark Job
Hi,
I am using Spark 1.3 (CDH 5.4.4). What's the recipe for setting a minimum
output file size when writing out from SparkSQL? So far, I have tried:
--x-
import sqlContext.implicits._
sc.hadoopConfiguration.setBoolean("fs.hdfs.impl.disable.cache",true)
sc.hadoopConfiguration.setLon
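For what it's worth, I'm not aware of a direct "minimum output file size" knob in Spark SQL 1.3; the usual workaround is to shrink the number of output partitions before writing so each file comes out larger. A sketch (table name from above; the partition count and output path are placeholders):

// Fewer partitions at write time -> fewer, larger output files.
val df = sqlContext.sql("SELECT * FROM allactivitydata")
df.repartition(16).saveAsParquetFile("/user/tim/allactivitydata_out")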
Spark 1.3.0 (CDH 5.4.4)
scala> sqlContext.sql("SHOW TABLES").collect
res18: Array[org.apache.spark.sql.Row] = Array([allactivitydata,true],
[sample_07,false], [sample_08,false])
sqlContext.sql("SELECT COUNT(*) from allactivitydata").collect
res19: Array[org.apache.spark.sql.Row] = Array([1227230])
I am sharing this code snippet since I spent quite some time figuring it
out and I couldn't find any examples online. Between the Kinesis
documentation, the tutorial on the AWS site, and other code snippets on the
Internet, I was confused about the structure/format of the messages that Spark
fetches from Kinesis
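For context, the records the Kinesis receiver hands to Spark arrive as raw byte arrays, so decoding is up to you. A minimal sketch, assuming kinesisStream is the DStream[Array[Byte]] returned by KinesisUtils.createStream and the payload is UTF-8 text:

import java.nio.charset.StandardCharsets
import org.apache.spark.streaming.dstream.DStream

// Decode each Kinesis record from bytes into a String (e.g. a JSON document).
def decode(kinesisStream: DStream[Array[Byte]]): DStream[String] =
  kinesisStream.map(bytes => new String(bytes, StandardCharsets.UTF_8))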
http://stackoverflow.com/questions/37231616/add-a-new-column-to-a-dataframe-new-column-i-want-it-to-be-a-uuid-generator
On Fri, Apr 7, 2017 at 3:56 PM, Everett Anderson
wrote:
> Hi,
>
> What's the best way to assign a truly unique row ID (rather than a hash)
> to a DataFrame/Dataset?
>
> I orig
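One common approach is a UUID-generating UDF rather than a hash of the row's contents; a sketch (df is a placeholder DataFrame, and note the UDF is non-deterministic, so persist the result if the IDs must stay stable across recomputation):

import java.util.UUID
import org.apache.spark.sql.functions.udf

// Generate a fresh random UUID per row.
val uuidUdf = udf(() => UUID.randomUUID().toString)
val withId = df.withColumn("row_id", uuidUdf())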
Hi,
I am trying to figure out the API to initialize a Gaussian mixture model
using either centroids created by K-means or a previously calculated GMM
model (I am aware that you can "save" a model and "load" it later, but I am
not interested in saving a model to a filesystem).
The Spark MLlib API let
t this feature in Spark 2.3.
>
> Thanks
> Yanbo
>
> On Fri, Apr 28, 2017 at 1:46 AM, Tim Smith wrote:
>
>> Hi,
>>
>> I am trying to figure out the API to initialize a gaussian mixture model
>> using either centroids created by K-means or previously calculat
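For reference, the RDD-based MLlib API does allow seeding: GaussianMixture has a setInitialModel(...) method, so you can build a GaussianMixtureModel from K-means centroids and pass it in. A sketch under that assumption (equal weights and identity covariances; kmeansModel, dim, km and data are placeholders):

import org.apache.spark.mllib.clustering.{GaussianMixture, GaussianMixtureModel, KMeansModel}
import org.apache.spark.mllib.linalg.Matrices
import org.apache.spark.mllib.stat.distribution.MultivariateGaussian

// One Gaussian per K-means centroid: mean = centroid, covariance = identity.
def gmmFromKMeans(kmeansModel: KMeansModel, dim: Int): GaussianMixtureModel = {
  val k = kmeansModel.clusterCenters.length
  val weights = Array.fill(k)(1.0 / k)
  val gaussians = kmeansModel.clusterCenters.map(c => new MultivariateGaussian(c, Matrices.eye(dim)))
  new GaussianMixtureModel(weights, gaussians)
}

// val gmm = new GaussianMixture().setK(k).setInitialModel(gmmFromKMeans(km, dim)).run(data)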
One, I think you should take this to the Spark developer list.
Two, I suspect broadcast variables aren't the best solution for the use
case you describe. Maybe an in-memory data/object/file store like Tachyon
is a better fit.
Thanks,
Tim
On Tue, May 2, 2017 at 11:56 AM, Nipun Arora
wrote:
On Spark 1.2 (have been seeing this behaviour since 1.0), I have a
streaming app that consumes data from Kafka and writes it back to Kafka
(different topic). My big problem has been Total Delay. While execution
time is usually minutes to
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apac
1, 2015 at 11:16 PM, Tim Smith wrote:
> On Spark 1.2 (have been seeing this behaviour since 1.0), I have a
> streaming app that consumes data from Kafka and writes it back to Kafka
> (different topic). My big problem has been Total Delay. While execution
> time is usually minutes to
Spark to process on time.
> Could you try some of the 'knobs' I describe here to see if that would
> help?
>
> http://www.virdata.com/tuning-spark/
>
> -kr, Gerard.
>
> On Thu, Feb 12, 2015 at 8:44 AM, Tim Smith wrote:
>
>> Just read the thread "Are th
0 partitions?
>
> Regarding the block exception, could you give me a trace of info level
> logging that leads to this error? Basically I want to trace the lifecycle of
> the block.
>
> TD
>
>
>
> On Thu, Feb 12, 2015 at 6:29 PM, Tim Smith wrote:
>
>> Hi Gerard,
>
ution: 3.596 s)
15/02/13 06:27:03 INFO JobScheduler: Total delay: 3.905 s for time
142380882 ms (execution: 3.861 s)
15/02/13 06:27:24 INFO JobScheduler: Total delay: 4.068 s for time
142380884 ms (execution: 4.026 s)
On Thu, Feb 12, 2015 at 9:54 PM, Tim Smith wrote:
> TD - I will t
cause each Kafka partition mapping to Spark
> partition.
>
>
> Besides, does "set partition count to 1 for each dStream" mean
> dstream.repartition(1)? If so I think it will still introduce a shuffle and
> move all the data into one partition.
>
> Thanks
> Saisai
>
> 2015-02-1
+1 for writing the Spark output to Kafka. You can then hang multiple
compute/storage frameworks off Kafka. I am using a similar pipeline to feed
ElasticSearch and HDFS in parallel. It allows modularity: you can take down
ElasticSearch or HDFS for maintenance without losing (except for some edge
ca
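A sketch of the write-to-Kafka leg of that pipeline, assuming the org.apache.kafka.clients producer is on the classpath (broker list and topic are placeholders):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.streaming.dstream.DStream

// Write each batch to Kafka, creating one producer per partition/task so nothing
// non-serializable is shipped from the driver.
def writeToKafka(out: DStream[String]): Unit = {
  out.foreachRDD { rdd =>
    rdd.foreachPartition { recs =>
      val props = new Properties()
      props.put("bootstrap.servers", "broker1:9092")
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      val producer = new KafkaProducer[String, String](props)
      recs.foreach(rec => producer.send(new ProducerRecord[String, String]("out-topic", rec)))
      producer.close()
    }
  }
}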
My streaming app runs fine for a few hours and then starts spewing "Could
not compute split, block input-xx-xxx not found" errors. After this,
jobs start to fail and batches start to pile up.
My question isn't so much about why this error occurs but rather, how do I trace
what leads to it? I
On Spark 1.2:
I am trying to capture # records read from a kafka topic:
val inRecords = ssc.sparkContext.accumulator(0, "InRecords")
..
// Add each batch's record count to the accumulator; rdd.count() runs on the driver per batch.
kInStreams.foreach( k => {
  k.foreachRDD ( rdd => inRecords += rdd.count().toInt )
})
inRecords.value
Questi
2
> scala> rdd.foreach(x => acc += 1)
> scala> acc.value
> res1: Int = 1000
>
> The Stage details page shows:
>
>
>
>
> On 20.2.2015. 9:25, Tim Smith wrote:
>
> On Spark 1.2:
>
> I am trying to capture # records read from a kafka
Hi,
I am writing some Scala code to normalize a stream of logs using an
input configuration file (multiple regex patterns). To avoid
re-starting the job, I can read in a new config file using fileStream
and then turn the config file into a map. But I am unsure how to
update a shared map (since
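One pattern that fits here (a sketch; ssc, logStream, normalizeLog and parseConfig are placeholders for the app's own context and functions): keep the parsed config in a driver-side variable, refresh it whenever a new config file shows up, and capture it per batch via transform so every batch ships the current map.

// Hypothetical parser for key=value config lines.
def parseConfig(lines: Array[String]): Map[String, String] =
  lines.map { l => val Array(k, v) = l.split("=", 2); k -> v }.toMap

@volatile var configMap: Map[String, String] = Map.empty

// textFileStream picks up new config files dropped into the directory.
val configStream = ssc.textFileStream("/path/to/config/dir")
configStream.foreachRDD { rdd =>
  val lines = rdd.collect()
  if (lines.nonEmpty) configMap = parseConfig(lines)   // updated on the driver
}

// transform's body runs on the driver for every batch, so each batch's closure
// captures the config map that is current at that point in time.
val normalized = logStream.transform { rdd =>
  val cfg = configMap
  rdd.map(line => normalizeLog(line, cfg))
}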
Hi,
I have Spark (1.0.0 on CDH5) running with Kafka 0.8.1.1.
I have a streaming jobs that reads from a kafka topic and writes
output to another kafka topic. The job starts fine but after a while
the input stream stops getting any data. I think these messages show
no incoming data on the stream:
1
Hi,
In my streaming app, I receive from Kafka, where I have tried setting the
partitions when calling "createStream" or later, by calling repartition -
in both cases, the number of nodes running the tasks seems to be stubbornly
stuck at 2. Since I have 11 nodes in my cluster, I was hoping to use mo
Hi,
Have a Spark-1.0.0 (CDH5) streaming job reading from kafka that died with:
14/08/28 22:28:15 INFO DAGScheduler: Failed to run runJob at
ReceiverTracker.scala:275
Exception in thread "Thread-59" 14/08/28 22:28:15 INFO
YarnClientClusterScheduler: Cancelling stage 2
14/08/28 22:28:15 INFO DAGSch
lease try to take a look
> at the executor logs of the lost executor to find what is the root cause
> that caused the executor to fail.
>
> TD
>
>
> On Thu, Aug 28, 2014 at 3:54 PM, Tim Smith wrote:
>
>> Hi,
>>
>> Have a Spark-1.0.0 (CDH5) streaming job read
e are sufficient number of partitions (try setting it to 2x the number
> of cores given to the application).
>
> Yeah, in 1.0.0, ttl should be unnecessary.
>
>
>
> On Thu, Aug 28, 2014 at 5:17 PM, Tim Smith wrote:
>
>> On Thu, Aug 28, 2014 at 4:19 PM, Tathagata Das <
>
many open files is a sign that you need increase the
> system-wide limit of open files.
> Try adding ulimit -n 16000 to your conf/spark-env.sh.
>
> TD
>
>
> On Thu, Aug 28, 2014 at 5:29 PM, Tim Smith wrote:
>>
>> Appeared after running for a while. I re-ran the jo
I set partitions to 64:
//
kInMsg.repartition(64)
val outdata = kInMsg.map(x=>normalizeLog(x._2,configMap))
//
Still see all activity only on the two nodes that seem to be receiving
from Kafka.
On Thu, Aug 28, 2014 at 5:47 PM, Tim Smith wrote:
> TD - Apologies, didn't realize I w
resource pool usage in YARN, this app is assigned 252.5GB of
memory, 128 VCores and 9 containers. Am I missing something here?
Thanks,
Tim
On Thu, Aug 28, 2014 at 11:55 PM, Tim Smith wrote:
> I set partitions to 64:
>
> //
> kInMsg.repartition(64)
> val outdata
is timestamped "19:04:51" that tells
me the executor was killed for some reason right before the driver noticed
that executor/task failure.
How come my task failed after only 4 attempts although my config says the failure
threshold is 64?
On Fri, Aug 29, 2014 at 12:00 PM, Tim Smith w
Good to see I am not the only one who cannot get incoming DStreams to
repartition. I tried repartition(512) but still no luck - the app
stubbornly runs only on two nodes. Now this is 1.0.0 but looking at
release notes for 1.0.1 and 1.0.2, I don't see anything that says this
was an issue and has bee
many receivers you have. It's on 2 nodes for each receiver. You need
> multiple partitions in the queue, each consumed by a DStream, if you
> mean to parallelize consuming the queue.
>
> On Fri, Aug 29, 2014 at 11:08 PM, Tim Smith wrote:
> > Good to see I am not the only one who
er. Also, four nodes are actively processing tasks (vs only two
earlier) now, which actually has me confused. If "Streams" are active only
on 3 nodes then how/why did a 4th node get work? If a 4th got work why
aren't more nodes getting work?
On Fri, Aug 29, 2014 at 4:11
I'd be interested to understand this mechanism as well. But this is the
error recovery part of the equation. Consuming from Kafka has two aspects -
parallelism and error recovery - and I am not sure how either works. For
error recovery, I would like to understand how:
- A failed receiver gets re-spaw
I'd be interested in finding the answer too. Right now, I do:
val kafkaOutMsgs = kafkInMessages.map(x=>myFunc(x._2,someParam))
kafkaOutMsgs.foreachRDD((rdd,time) => { rdd.foreach(rec => {
writer.output(rec) }) } ) //where writer.output is a method that takes a
string and writer is an instance of a
I am seeing similar errors in my job's logs.
TD - Are you still waiting for debug logs? If yes, can you please let me
know how to generate debug logs? I am using Spark/Yarn and setting
"NodeManager" logs to "DEBUG" level doesn't seem to produce anything but
INFO logs.
Thanks,
Tim
>Aaah sorry, I
Thanks TD. Someone already pointed out to me that calling repartition(...) on its
own isn't the right way. You have to assign it: val partedStream = repartition(...).
Would be nice to have it fixed in the docs.
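For anyone who hits this later, a sketch of the working form, reusing the names from my earlier snippet:

// repartition() returns a new DStream; it does not modify the receiver's DStream in place.
val partedStream = kInMsg.repartition(64)
val outdata = partedStream.map(x => normalizeLog(x._2, configMap))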
On Fri, Sep 5, 2014 at 10:44 AM, Tathagata Das
wrote:
> Some thoughts on this thread to clarify the doubts.
How are you creating your kafka streams in Spark?
If you have 10 partitions for a topic, you can call "createStream" ten
times to create 10 parallel receivers/executors and then use "union" to
combine all the DStreams.
On Wed, Sep 10, 2014 at 7:16 AM, richiesgr wrote:
> Hi (my previous post a
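A sketch of that receiver-per-partition pattern (ZooKeeper quorum, consumer group and topic name are placeholders):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// One receiver per createStream call, then union them into a single DStream.
val kInStreams = (1 to 10).map { _ =>
  KafkaUtils.createStream(ssc, "zk1:2181", "my-consumer-group", Map("mytopic" -> 1),
    StorageLevel.MEMORY_AND_DISK_SER)
}
val k = ssc.union(kInStreams)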
I am using Spark 1.0.0 (on CDH 5.1) and have a similar issue. In my case,
the receivers die within an hour because Yarn kills the containers for high
memory usage. I set ttl.cleaner to 30 seconds but that didn't help. So I
don't think stale RDDs are an issue here. I did a "jmap -histo" on a couple
http://www.slideshare.net/spark-project/deep-divewithsparkstreaming-tathagatadassparkmeetup20130617
Slide 39 covers it.
On Tue, Sep 9, 2014 at 9:23 PM, qihong wrote:
> Hi Mayur,
>
> Thanks for your response. I did write a simple test that set up a DStream
> with
> 5 batches; The batch duration
I had a similar issue and many others - all were basically symptoms of
YARN killing the container for high memory usage. Haven't gotten to the root
cause yet.
On Tue, Sep 9, 2014 at 3:18 PM, Marcelo Vanzin wrote:
> Your executor is exiting or crashing unexpectedly:
>
> On Tue, Sep 9, 2014 at 3:13 P
heapdump only
> contains a very large byte array consuming about 66%(the second link
> contains a picture of my heap -- I ran with a small heap to be able to get
> the failure quickly)
>
> I don't have solutions but wanted to affirm that I've observed a similar
> situa
"jmap histo" on
the executor process.
They should be about the same, right?
Also, in the heap dump, 99% of the heap seems to be occupied with
"unreachable objects" (and most of it is byte arrays).
On Wed, Sep 10, 2014 at 12:06 PM, Tim Smith wrote:
> Actually, I am not d
I noticed that, by default, in CDH-5.1 (Spark 1.0.0), in both,
StandAlone and Yarn mode - no GC options are set when an executor is
launched. The only options passed in StandAlone mode are
"-XX:MaxPermSize=128m -Xms16384M -Xmx16384M" (when I give each
executor 16G).
In Yarn mode, even fewer JVM op
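For the record, the way to pass extra JVM/GC flags to executors is spark.executor.extraJavaOptions (spark.driver.extraJavaOptions for the driver). A sketch; the specific flags here are only illustrative, not a recommendation:

import org.apache.spark.SparkConf

// GC flags set here are appended to whatever the cluster manager passes by default.
val conf = new SparkConf()
  .setAppName("my-streaming-app")
  .set("spark.executor.extraJavaOptions",
    "-XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+PrintGCDetails")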
Thanks for all the good work. Very excited about seeing more features and
better stability in the framework.
On Thu, Sep 11, 2014 at 5:12 PM, Patrick Wendell wrote:
> I am happy to announce the availability of Spark 1.1.0! Spark 1.1.0 is
> the second release on the API-compatible 1.X line. It i
Hi,
Anyone have a stable streaming app running in "production"? Can you
share some overview of the app and setup, like number of nodes, events
per second, broad stream processing workflow, config highlights, etc.?
Thanks,
Tim
Similar issue (Spark 1.0.0). Streaming app runs for a few seconds
before these errors start to pop all over the driver logs:
14/09/12 17:30:23 WARN TaskSetManager: Loss was due to java.lang.Exception
java.lang.Exception: Could not compute split, block
input-4-1410542878200 not found
at org
Spark 1.0.0
I write logs out from my app using this object:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.Logging

object LogService extends Logging {
  /** Set reasonable logging levels for streaming if the user has not configured log4j. */
  def setStreamingLogLevels() {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) Logger.getRootLogger.setLevel(Level.WARN)
  }
}
Hi,
Anyone setting any explicit GC options for the executor jvm? If yes,
what and how did you arrive at them?
Thanks,
- Tim
Hi Dibyendu,
I am a little confused about the need for rate limiting input from
Kafka. If the stream coming in from Kafka has a higher messages/second
rate than what a Spark job can process, then it should simply build a
backlog in Spark if the RDDs are cached on disk using persist().
Right?
Thanks,
duct based on Spark Streaming.
>
> I am exploring Spark Streaming for enterprise software and am cautiously
> optimistic about it. I see huge potential to improve debuggability of Spark.
>
> - Original Message -
> From: "Tim Smith"
> To: "spark users&qu
ever seen any issues with Spark caching your app jar
> between runs even if it changes?
Not that I can tell, but maybe because I use Yarn I am shielded from
some jar distribution bugs in Spark?
>
> On Wed, Sep 17, 2014 at 1:11 PM, Tim Smith wrote:
>> I don't hav
I believe this is a known bug:
https://issues.apache.org/jira/browse/SPARK-1719
On Wed, Sep 17, 2014 at 5:40 PM, rogthefrog wrote:
> I have a HDFS cluster managed with CDH Manager. Version is CDH 5.1 with
> matching GPLEXTRAS parcel. LZO works with Hive and Pig, but I can't make it
> work with Sp
5:50 PM, Dibyendu Bhattacharya
wrote:
> Hi Tim
>
> Just curious to know ; Which Kafka Consumer you have used ?
>
> Dib
>
> On Sep 18, 2014 4:40 AM, "Tim Smith" wrote:
>>
>> Thanks :)
>>
>> On Wed, Sep 17, 2014 at 2:10 PM, P
Curious - if you have a 1:1 mapping between Stream1:topic1 and
Stream2:topic2, then why not run different instances of the app for
each, passing the input source and output topic as arguments to each
instance?
On Thu, Sep 18, 2014 at 8:07 AM, Padmanabhan, Mahesh (contractor)
wrote:
> Hi all,
>
> I am
What kafka receiver are you using? Did you build a new jar for your
app with the latest streaming-kafka code for 1.1?
On Thu, Sep 18, 2014 at 11:47 AM, JiajiaJing wrote:
> Hi Spark Users,
>
> We just upgrade our spark version from 1.0 to 1.1. And we are trying to
> re-run all the written and tes
Posting your code would be really helpful in figuring out gotchas.
On Tue, Sep 23, 2014 at 9:19 AM, Matt Narrell wrote:
> Hey,
>
> Spark 1.1.0
> Kafka 0.8.1.1
> Hadoop (YARN/HDFS) 2.5.1
>
> I have a five partition Kafka topic. I can create a single Kafka receiver
> via KafkaUtils.createStream wi
Collections.singletonMap(topic,
> 5),
> // StorageLevel.MEMORY_ONLY_SER());
>
> final JavaPairDStream<String, Integer> tuples = stream.mapToPair(
>     new PairFunction<Tuple2<String, String>, String, Integer>() {
>       @Override
>
f anyone else can see a glaring issue in the
> Java approach that would be appreciated.
>
> Thanks,
> Matt
>
> On Sep 23, 2014, at 4:13 PM, Tim Smith wrote:
>
>> Sorry, I am almost Java illiterate but here's my Scala code to do the
>> eq
> don’t receive any messages.
>
> I’ll dig through the logs, but at first glance yesterday I didn’t see
> anything suspect. I’ll have to look closer.
>
> mn
>
> On Sep 23, 2014, at 6:14 PM, Tim Smith wrote:
>
>> Maybe post the before-code as in what was the code befo
literally coded like the examples (that are
> purported to work), but no luck.
>
> mn
>
> On Sep 24, 2014, at 11:27 AM, Tim Smith wrote:
>
> Maybe differences between JavaPairDStream and JavaPairReceiverInputDStream?
>
> On Wed, Sep 24, 2014 at 7:46 AM, Matt Narrell
> w