Dear Spark folks,
Is there a guideline somewhere on the density tipping point at which a Spark
ML dense vector makes more sense than a sparse vector, with regard to memory
usage on fairly large (image-processing) vectors?
My google-fu didn't turn up anything useful.
Thanks in advance
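For what it's worth, a back-of-the-envelope estimate (a rule of thumb, not an
official Spark guideline): a dense vector costs about 8 bytes per element, a
sparse one about 12 bytes per non-zero (4-byte index + 8-byte value), so sparse
wins below roughly 2/3 density. A minimal sketch with the Spark ML API:

import org.apache.spark.ml.linalg.{Vector, Vectors}

// Rough per-vector memory, ignoring JVM object headers:
//   dense  ~ 8 bytes * size
//   sparse ~ 12 bytes * numNonZeros  (4-byte index + 8-byte value)
def denseBytes(size: Int): Long = 8L * size
def sparseBytes(nnz: Int): Long = 12L * nnz

val size = 1000000
val nnz  = 100000 // 10% density
println(s"dense ~ ${denseBytes(size)} B, sparse ~ ${sparseBytes(nnz)} B")

// Building both representations:
val d: Vector = Vectors.dense(Array.fill(10)(1.0))
val s: Vector = Vectors.sparse(10, Array(0, 5), Array(1.0, 2.0))

// 'compressed' picks whichever encoding is smaller for you:
val smallest: Vector = d.compressed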
Hi Srinivas,
Reading from different brokers is possible but you need to connect to each
Kafka cluster separately.
Trying to mix connections to two different Kafka clusters in one subscriber
is not supported. (I'm sure it would produce all kinds of weird errors.)
The "kafka.bootstrap.servers" opti
Hi,
Could you share the code that you're using to configure the connection to
the Kafka broker?
This is a bread-and-butter feature. My first thought is that there's
something in your particular setup that prevents this from working.
kind regards, Gerard.
On Fri, Apr 10, 2020 at 7:34 PM Debabrat
Hi Hrishi,
When using the Direct Kafka stream approach, processing tasks will be
distributed to the cluster.
The level of parallelism depends on how many partitions the consumed
topics have.
Why do you think that the processing is not happening in parallel?
I would advise you to get the base
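A quick way to verify the parallelism you are actually getting (a minimal
sketch; `dstream` stands in for your direct Kafka stream):

// Each Kafka partition becomes one Spark partition, processed as one task,
// so the partition count of each batch RDD is the effective parallelism.
dstream.foreachRDD { rdd =>
  println(s"partitions in this batch: ${rdd.getNumPartitions}")
}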
Oops, I linked the wrong JIRA ticket (that other one is related):
https://issues.apache.org/jira/browse/SPARK-28025
On Wed, Jun 12, 2019 at 1:21 PM Gerard Maas wrote:
> Hi!
> I would like to socialize this issue we are currently facing:
> The Structured Streaming default CheckpointFi
Hi!
I would like to socialize this issue we are currently facing:
The Structured Streaming default CheckpointFileManager leaks .crc files by
leaving them behind after users of this class (like
HDFSBackedStateStoreProvider) apply their cleanup methods.
This results in an unbounded creation of tiny
Hi,
I'm afraid you sent this email to the wrong Mailing list.
This is the Spark users mailing list. We could probably tell you how to do
this with Spark, but I think that's not your intention :)
kr, Gerard.
On Thu, May 9, 2019 at 11:03 AM Balakumar iyer S
wrote:
> Hi All,
>
> I am trying to r
James,
How do you create an instance of `RDD[Iterable[MyCaseClass]]`?
Is it in that first code snippet: `new SparkContext(sc).parallelize(seq)`?
kr, Gerard
On Fri, Nov 30, 2018 at 3:02 PM James Starks
wrote:
> When processing data, I create an instance of RDD[Iterable[MyCaseClass]]
> and I
"2018-08-27 09:50:00" not "2018-08-27 09:53:00"?
> When I define the window, the start time is not set.
> 2. Why is the agg result of time "2018-08-27 09:53:00" not output when
> the batch1 data is coming?
>
> Thanks a lot!
>
>
>
> --
Hi Aakash,
In Spark Streaming, foreachRDD provides you access to the data in
each micro-batch.
You can transform that RDD into a DataFrame and implement the flow you
describe.
eg.:
var historyRDD:RDD[mytype] = sparkContext.emptyRDD
// create Kafka Dstream ...
dstream.foreachRDD{ rdd =>
val a
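The snippet above is cut off in the archive; a minimal sketch of the idea,
assuming a case class `MyType`, a SparkSession named `spark`, and a
`DStream[MyType]` named `dstream`:

import org.apache.spark.rdd.RDD

case class MyType(key: String, value: Long)

var historyRDD: RDD[MyType] = spark.sparkContext.emptyRDD[MyType]

dstream.foreachRDD { rdd =>
  // keep an accumulated view of everything seen so far
  historyRDD = historyRDD.union(rdd)

  // transform the current micro-batch into a DataFrame
  import spark.implicits._
  val df = rdd.toDF()
  df.groupBy("key").sum("value").show()
}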
Hi Chris,
Could you show the code you are using? When you mention "I like to use a
static datasource (JDBC) in the state function" do you refer to a DataFrame
from a JDBC source or an independent JDBC connection?
The key point to consider is that the flatMapGroupsWithState function must
be serial
Hi,
In Structured Streaming lingo, "ReduceByKeyAndWindow" would be a window
aggregation with a composite key.
Something like:
stream.groupBy($"key", window($"timestamp", "5 minutes"))
.agg(sum($"value") as "total")
The aggregate could be any supported SQL function.
Is this what you are
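A slightly fuller sketch of that query (assuming a streaming DataFrame
`stream` with columns `key`, `value` and an event-time column `timestamp`,
and `spark.implicits._` in scope):

import org.apache.spark.sql.functions.{sum, window}

val totals = stream
  // bounding the state with a watermark is optional but recommended
  .withWatermark("timestamp", "10 minutes")
  .groupBy($"key", window($"timestamp", "5 minutes"))
  .agg(sum($"value") as "total")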
Hi Daniele,
A pragmatic approach to do that would be to execute the computations in the
scope of a foreachRDD, surrounded by wall-clock timers.
For example:
dstream.foreachRDD{ rdd =>
val t0 = System.currentTimeMillis()
val aggregates = rdd.
// make sure you get a result here, not another
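Completing that sketch (the count() is a stand-in for whatever computation is
being measured; the essential point is that an action must force the work,
otherwise the timer measures nothing):

dstream.foreachRDD { rdd =>
  val t0 = System.currentTimeMillis()
  // an action, not a lazy transformation, so the computation runs here
  val result = rdd.count()
  val t1 = System.currentTimeMillis()
  println(s"batch of $result records processed in ${t1 - t0} ms")
}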
Dhaval,
Which Streaming API are you using?
In Structured Streaming, you are able to start several streaming queries
within the same context.
kind regards, Gerard.
On Sun, May 6, 2018 at 7:59 PM, Dhaval Modi wrote:
> Hi Susan,
>
> Thanks for your response.
>
> Will try configuration as suggeste
Aakash,
There are two issues here.
The issue with the code on the first question is that the first query
blocks and the code for the second does not get executed. Panagiotis
pointed this out correctly.
In the updated code, the issue is related to netcat (nc) and the way
Structured Streaming works.
Hi,
I'm looking into the Parquet format support for the File source in
Structured Streaming.
The docs mention the use of the option 'mergeSchema' to merge the schemas
of the part files found.[1]
What would be the practical use of that in a streaming context?
In its batch counterpart, `mergeSchem
This is a good start:
https://github.com/deanwampler/JustEnoughScalaForSpark
And the corresponding talk:
https://www.youtube.com/watch?v=LBoSgiLV_NQ
There are many more resources if you search for them.
-kr, Gerard.
On Sun, Mar 18, 2018 at 11:15 AM, Mahender Sarangam <
mahender.bigd...@outlook.com
Hi,
You can run as many jobs in your cluster as you want, provided you have
enough capacity.
The one-streaming-context constraint applies per job.
You can submit several jobs for Flume and some others for Twitter, Kafka,
etc...
If you are getting started with Streaming with Spark, I'd recommend you to
Hi,
You can monitor a filesystem directory as a streaming source as long as the
files placed there are atomically copied/moved into the directory.
Updating the files in place is not supported.
kr, Gerard.
On Mon, Jan 15, 2018 at 11:41 PM, kant kodali wrote:
> Hi All,
>
> I am wondering if HDFS can be a s
Hi Arkadiusz,
Try 'rooting' your import. It looks like the import is being interpreted as
relative to the previous one.
'Rooting' is done by adding the '_root_' prefix to your import:
import org.apache.spark.streaming.kafka.KafkaUtils
import _root_.io.confluent.kafka.serializers.KafkaAvroDeco
Can you show us the code?
On Tue, Dec 12, 2017 at 9:02 AM, Vikash Pareek
wrote:
> Hi All,
>
> I am unioning 2 RDDs (each of them having 2 records) but this union is
> hanging.
> I found a solution to this that is caching both the rdds before performing
> union but I could not figure out t
un again to
> make sure.
>
> @Gerard Thanks much! but would your answer on .collect() change depending
> on running the spark app in client vs cluster mode?
>
> Thanks!
>
> On Tue, Dec 5, 2017 at 1:54 PM, Gerard Maas wrote:
>
>> The general answer to your i
The general answer to your initial question is that "it depends". If the
operation in the rdd.foreach() closure can be parallelized, then you don't
need to collect first. If it needs some local context (e.g. a socket
connection), then you need to do rdd.collect first to bring the data
locally, whic
Hi Arpan,
The error suggests that the streaming context has been started with
streamingContext.start() and after that statement, some other
dstream operations have been attempted.
A suggested pattern to manage the offsets is the following:
var offsetRanges: Array[OffsetRange] = _
//create strea
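The canonical shape of that pattern, as a hedged sketch based on the
direct-stream docs (`directStream` and the processing and offset store are
placeholders):

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

var offsetRanges: Array[OffsetRange] = Array.empty

directStream.transform { rdd =>
  // capture the offsets first, while the RDD is still the KafkaRDD
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.foreachRDD { rdd =>
  rdd.foreach(record => println(record)) // your processing here
  // persist offsetRanges (e.g. to ZooKeeper) only after the batch succeeds
  offsetRanges.foreach { o =>
    println(s"${o.topic}/${o.partition}: ${o.fromOffset} -> ${o.untilOffset}")
  }
}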
Hammad,
The recommended way to implement this logic would be to:
1. Create a SparkSession.
2. Create a StreamingContext using the SparkContext embedded in the
SparkSession.
3. Use the single SparkSession instance for the SQL operations within the
foreachRDD.
It's important to note that spark operations c
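A minimal sketch of that setup (Spark 2.x; the DStream creation is elided and
`dstream` is assumed to be a DStream[(String, Long)]):

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

val spark = SparkSession.builder().appName("streaming-sql").getOrCreate()

// the StreamingContext reuses the SparkContext embedded in the session
val ssc = new StreamingContext(spark.sparkContext, Seconds(10))

dstream.foreachRDD { rdd =>
  import spark.implicits._
  // all SQL work goes through the single shared SparkSession
  rdd.toDF("word", "count").groupBy("word").sum("count").show()
}

ssc.start()
ssc.awaitTermination()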
y at 2).
>>
>> An alternative to the socket source issue would be to open a new free
>> socket, but then the user has to figure out where the source is listening.
>>
>> I second Gerard's request for additional information, and confirmation of
>> my theories!
Hi,
I've been investigating this SO question:
https://stackoverflow.com/questions/45618489/executing-separate-streaming-queries-in-spark-structured-streaming
TL;DR: when using the Socket source, trying to create multiple queries does
not work properly; only the first query in the start order
Also, read Holden's newest book, High Performance Spark:
http://shop.oreilly.com/product/0636920046967.do
On Fri, Jun 9, 2017 at 5:38 PM, Alonso Isidoro Roman
wrote:
> a quick search on google:
>
> https://www.cloudera.com/documentation/enterprise/5-9-
> x/topics/admin_spark_tuning.html
It looks like the cleanup should go into the foreachRDD function:
stateUpdateStream.foreachRDD { rdd =>
  // do stuff with the rdd
  stateUpdater.cleanupExternalService // should work in this position
}
Code within the foreachRDD(*) executes on the driver, so you can keep the
state of the
This question seems to deserve an escalation from Stack Overflow:
http://stackoverflow.com/questions/40803969/spark-size-exceeds-integer-max-value-when-joining-2-large-dfs
Looks like an important limitation.
-kr, Gerard.
Meta/PS: What do you think would be the best way to escalate from SO? Should
ult Derby metastore in the same location. Also, if you want them to
>> be able to persist permanent table metadata for SparkSQL then you’ll want
>> to set up a true metastore.
>>
>>
>>
>> The other thing it could be is Hive dependency collisions from the
>
Hi,
I'm helping some folks setting up an analytics cluster with Spark.
They want to use the HiveContext to enable the Window functions on
DataFrames(*) but they don't have any Hive installation, nor do they need one
at the moment (if it's not necessary for this feature).
When we try to create a Hive cont
Hi Manas,
The approach is correct, with one caveat: you may have several tasks
executing in parallel in one executor. Having one single connection per JVM
will either fail, if the connection is not thread-safe, or become a
bottleneck, because all tasks will be competing for the same resource.
The best ap
It sounds like another window operation on top of the 30-min window will
achieve the desired objective.
Just keep in mind that you'll need to set the cleaner TTL (spark.cleaner.ttl)
to a long enough value and you will require enough resources (mem & disk)
to keep the required data.
-kr, Gerard.
O
Hi,
We're facing a situation where simple queries to parquet files stored in
Swift through a Hive Metastore sometimes fail with this exception:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 6
in stage 58.0 failed 4 times, most recent failure: Lost task 6.3 in stage
58.0
NoSuchMethodError usually indicates a version conflict. Probably your job
was built against a higher version of the Cassandra connector than what's
available at runtime.
Check that the versions are aligned.
-kr, Gerard.
On Wed, Feb 3, 2016 at 1:37 PM, Madabhattula Rajesh Kumar <
mrajaf...@gm
ide those to direct API.
>
> So my question is should i consider passing all the partition from command
> line and query kafka and find and provide , what is the correct approach.
>
> Ashish
>
> On Mon, Jan 25, 2016 at 11:38 AM, Gerard Maas
> wrote:
>
>> What are
What are you trying to achieve?
Looks like you want to provide offsets but you're not managing them and I'm
assuming you're using the direct stream approach.
In that case, use the simpler constructor that takes the kafka config and
the topics. Let it figure out the offsets (it will contact kaf
Hi Padma,
Have you considered reducing the dataset before writing it to Cassandra? Looks
like this consistency problem could be avoided by cleaning the dataset of
unnecessary records before persisting it:
val onlyMax = rddByPrimaryKey.reduceByKey{case (x,y) => Max(x,y)} // your max
function he
http://stackoverflow.com/search?q=%5Bapache-spark%5D+flatmap
-kr, Gerard.
On Tue, Dec 8, 2015 at 12:04 PM, Sateesh Karuturi <
sateesh.karutu...@gmail.com> wrote:
> Guys... I am new to Spark.
> Could anyone please explain how the flatMap function works, with a little
> sample example...
> Thanks
dstream.count()
See: http://spark.apache.org/docs/latest/programming-guide.html#actions
-kr, Gerard.
On Tue, Dec 1, 2015 at 6:32 PM, patcharee wrote:
> Hi,
>
> In Spark Streaming, how do I count the total number of messages (from Socket)
> in one batch?
>
> Thanks,
> Patcharee
>
>
Spark Streaming will consume and process data in parallel. So the order of
the output will depend not only on the order of the input but also on the
time it takes for each task to process. Different options, like
repartitions, sorts and shuffles at the Spark level, will also affect
ordering, so the bes
Andy,
Using the rdd.saveAsTextFile(...) will overwrite the data if your target
is the same file.
If you want to save to HDFS, DStream offers dstream.saveAsTextFiles(prefix,
suffix) where a new file will be written at each streaming interval.
Note that this will result in a saved file for each s
As TD mentions, there's no such thing as an 'empty DStream'. Some intervals
of a DStream could be empty, in which case the related RDD will be empty.
This means that you should express such condition based on the RDD's of the
DStream. Translated in code:
dstream.foreachRDD{ rdd =>
if (!rdd.isEmpt
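Completing the snippet (`process` is a placeholder for whatever should happen
on non-empty intervals):

dstream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // only do work (connections, writes, ...) when this interval has data
    process(rdd)
  }
}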
You can create as many functional derivates of your original stream by
using transformations. That's exactly the model that Spark Streaming offers.
In your example, that would become something like:
val stream = ssc.socketTextStream("localhost", )
val stream1 = stream.map(fun1)
val stream2 =
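Filling in the shape of that example (the port number and `fun1`/`fun2` are
placeholders):

val stream  = ssc.socketTextStream("localhost", 9999)
val stream1 = stream.map(fun1)
val stream2 = stream.map(fun2)

// each derived DStream is an independent functional view of the source;
// attach a separate output to each one
stream1.print()
stream2.saveAsTextFiles("out/stream2")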
and fake this, you can override
> getPreferredLocations and set spark.locality.wait to a high value.
>
>
>
> On Wed, Oct 14, 2015 at 2:45 PM, Gerard Maas
> wrote:
>
>> Hi Cody,
>>
>> I think that I misused the term 'data locality'. I think I should b
e, regardless of
> what consumer you use. Even if you have locality preferences, and locality
> wait turned up really high, you still have to account for losing executors.
>
> On Wed, Oct 14, 2015 at 8:23 AM, Gerard Maas
> wrote:
>
>> Thanks Saisai, Mishra,
>>
>>
osted
>> probably this will work. If not, I am not sure how to get data locality for
>> a partition.
>> Others,
>> correct me if there is a way.
>>
>> On Wed, Oct 14, 2015 at 3:08 PM, Gerard Maas
>> wrote:
>>
>>> In the receiver-based kafka st
In the receiver-based kafka streaming model, given that each receiver
starts as a long-running task, one can rely on a certain degree of data
locality based on the kafka partitioning: Data published on a given
topic/partition will land on the same spark streaming receiving node until
the receiver
stantiated,
>>>>
>>>> log.info(s"Computing topic ${part.topic}, partition
>>>> ${part.partition} " +
>>>>
>>>> s"offsets ${part.fromOffset} -> ${part.untilOffset}")
>>>>
>>>>
istent between the two situations.
>
> On Tue, Oct 6, 2015 at 11:45 AM, Gerard Maas
> wrote:
>
>> Hi,
>>
>> We recently migrated our streaming jobs to the direct kafka receiver. Our
>> initial migration went quite fine but now we are seeing a weird zig-zag
Hi,
We recently migrated our streaming jobs to the direct kafka receiver. Our
initial migration went quite fine but now we are seeing a weird zig-zag
performance pattern we cannot explain.
In alternating fashion, one task takes about 1 second to finish and the
next takes 7sec for a stable streamin
rdd.collect { case (t, data) if t == topic => data }
> CassandraHelper.saveDataToCassandra(topic, filteredRdd)
> }
> updateOffsetsinZk(rdd)
> }
>
> }
>
> On Fri, Oct 2, 2015 at 11:58 PM, Gerard Maas
> wrote:
>
>> Something like this?
>>
>
Something like this?
I'm making the assumption that your topic name equals your keyspace for
this filtering example.
dstream.foreachRDD{rdd =>
val topics = rdd.map(_._1).distinct.collect
topics.foreach{topic =>
val filteredRdd = rdd.collect{case (t, data) if t == topic => data}.
filt
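The complete shape of the pattern, reconstructed as a sketch from the quoted
follow-up above (`CassandraHelper` and `updateOffsetsinZk` are the original
poster's helpers):

dstream.foreachRDD { rdd =>
  rdd.cache() // the source RDD is traversed once per topic
  val topics = rdd.map(_._1).distinct.collect
  topics.foreach { topic =>
    // RDD.collect(PartialFunction) filters and maps in a single pass
    val filteredRdd = rdd.collect { case (t, data) if t == topic => data }
    CassandraHelper.saveDataToCassandra(topic, filteredRdd)
  }
  updateOffsetsinZk(rdd)
  rdd.unpersist(true)
}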
How many cores are you assigning to your spark streaming job?
On Mon, Sep 14, 2015 at 10:33 PM, Василец Дмитрий
wrote:
> hello
> I have 4 streams from kafka and streaming not working.
> without any errors or logs
> but with 3 streams everything perfect.
> make sense only amount of streams , diff
You need to take into consideration 'where' things are executing. The
closure of the 'foreachRDD' executes in the driver. Therefore, the log
statements printed during the execution of that part will be found in the
driver logs.
In contrast, the foreachPartition closure executes on the worker nodes
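A small sketch that makes the distinction visible (the println calls land in
different logs):

dstream.foreachRDD { rdd =>
  // runs on the driver: this line shows up in the driver's logs
  println(s"batch with ${rdd.getNumPartitions} partitions")

  rdd.foreachPartition { partition =>
    // runs on an executor: this line shows up in that worker's logs
    // (note: .size consumes the iterator; fine for a demo)
    println(s"partition with ${partition.size} elements")
  }
}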
(removing dev from the to: as not relevant)
it would be good to see some sample data and the cassandra schema to have a
more concrete idea of the problem space.
Some thoughts: reduceByKey could still be used to 'pick' one element.
example of arbitrarily choosing the first one: reduceByKey{case (e
A side question: any reason why you're using window(Seconds(10), Seconds(10))
instead of new StreamingContext(conf, Seconds(10))?
Making the micro-batch interval 10 seconds instead of 1 will provide you
the same 10-second window with less complexity. Of course, this might just
be a test for the w
liding window and adding them to the Batch RDD
>
>
>
> This should be defined as the Frequency of Updates to the Batch RDD and
> then using dstream.window() equal to that frequency
>
>
>
> Can you also elaborate why you consider the dstream.window approach more
> “reli
Anand,
AFAIK, you will need to change two settings:
spark.streaming.unpersist = false // in order for Spark Streaming to not drop
the raw RDD data
spark.cleaner.ttl =
Also be aware that the lineage of your union RDD will grow with each batch
interval. You will need to break lineage often with cache(
Are you sharing the SimpleDateFormat instance? This looks a lot more like
the non-thread-safe behaviour of SimpleDateFormat (which has claimed many
unsuspecting victims over the years) than anything ugly in Spark Streaming.
Try writing the timestamps in millis to Kafka and compare.
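If the formatted strings have to stay, a common workaround is one
SimpleDateFormat instance per partition instead of a shared one; a sketch
(assuming an RDD of epoch-millis timestamps):

import java.text.SimpleDateFormat
import java.util.Date

rdd.mapPartitions { records =>
  // a fresh instance per task: no shared mutable state, no corruption
  val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  records.map(ts => fmt.format(new Date(ts)))
}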
-kr, Gerard.
On Fri,
Would using the socketTextStream and `yourApp | nc -lk ` work?? Not
sure how resilient the socket receiver is though. I've been playing with it
for a little demo and I don't understand yet its reconnection behavior.
Although I would think that putting some elastic buffer in between would be
a good
On Mon, Jun 8, 2015 at 10:12 AM, Yasemin Kaya wrote:
> Hi ,
>
> How can I find spark.cassandra.connection.host? And what should I change ?
> Should I change cassandra.yaml ?
>
> Error says me *"Exception in thread "main" java.io.IOException: Failed to
> open native connection to Cassand
8, 2015 at 7:50 PM, Tim Chen wrote:
>
> -- Forwarded message --
> From: Tim Chen
> Date: Thu, May 28, 2015 at 10:49 AM
> Subject: Re: [Streaming] Configure executor logging on Mesos
> To: Gerard Maas
>
>
> Hi Gerard,
>
> The log line you referred to is not Spa
Hi,
I'm trying to control the verbosity of the logs on the Mesos executors with
no luck so far. The default behaviour is an INFO-level dump to stderr, with
unbounded growth that gets too big at some point.
I noticed that when the executor is instantiated, it locates a default log
configuration in the sp
Hi,
tl;dr At the moment (with a BIG disclaimer *) elastic scaling of Spark
Streaming processes is not supported.
*Longer version.*
I assume that you are talking about Spark Streaming, as the discussion is
about handling Kafka streaming data.
Then you have two things to consider: the Streaming re
rows)
>
>
>
> Another confirmation of this hypothesis is the phrase “error during
> Transport Initialization” – so all these stuff points out in the direction
> of Infrastructure or Configuration issues – check you Casandra service and
> how you connect to it etc mate
>
>
>
.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
> ... 3 more
> 15/05/10 12:20:08 ERROR ControlConnection: [Control connection] Cannot
> connect to any host, scheduling retry in 1000 milliseconds
>
> Thanks!
>
> 2015-05-10 0:58 GMT+02:00 Gerard Maas :
>
Hola Sergio,
It would help if you added the error message + stack trace.
-kr, Gerard.
On Sat, May 9, 2015 at 11:32 PM, Sergio Jiménez Barrio <
drarse.a...@gmail.com> wrote:
> I am trying save some data in Cassandra in app with spark Streaming:
>
> Messages.foreachRDD {
> . . .
> CassandraRDD.s
Hi Bill,
I just found it weird that one would use parallel threads to 'filter', as
filter is lazy in Spark, and multithreading wouldn't have any effect unless
the action triggering the execution of the lineage containing such a filter
is executed on a separate thread. One must have a very specific
reason
Hi Bill,
Could you show a snippet of code to illustrate your choice?
-Gerard.
On Thu, May 7, 2015 at 5:55 PM, Bill Q wrote:
> Thanks for the replies. We decided to use concurrency in Scala to do the
> two mappings using the same source RDD in parallel. So far, it seems to be
> working. Any com
Hi,
Where could I find good documentation on the DataFrame DSL?
I'm struggling to combine selects, groupBy and aggregations.
A language definition would also help.
I perused these resources, but still have some gaps in my understanding and
things are not doing what I'd expect:
https://spa
I'm not familiar with the Solr API but provided that ' SolrIndexerDriver'
is a singleton, I guess that what's going on when running on a cluster is
that the call to:
SolrIndexerDriver.solrInputDocumentList.add(elem)
is happening on different singleton instances of the SolrIndexerDriver on
diffe
same time and has to be
> processed sequentially is a BAD thing
>
>
>
> So the key is whether it is about 1 or 2 and if it is about 1, whether it
> leads to e.g. Higher Throughput and Lower Latency or not
>
>
>
> Regards,
>
> Evo Eftimov
>
>
>
> *Fro
From experience, I'd recommend using the dstream.foreachRDD method and
doing the filtering within that context. Extending the example of TD,
something like this:
dstream.foreachRDD { rdd =>
rdd.cache()
messageType.foreach (msgTyp =>
val selection = rdd.filter(msgTyp.match(_))
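Completing that sketch (assuming `messageType` is a local collection of type
descriptors; since `match` is a reserved word in Scala, the predicate is
renamed `matches` here, and `save` is a placeholder action):

dstream.foreachRDD { rdd =>
  rdd.cache() // the batch is scanned once per message type
  messageType.foreach { msgTyp =>
    val selection = rdd.filter(msgTyp.matches(_))
    msgTyp.save(selection)
  }
  rdd.unpersist(true)
}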
Try writing this Spark Streaming idiom in Java and you'll choose Scala soon
enough:
dstream.foreachRDD{rdd =>
rdd.foreachPartition( partition => ... )
}
When deciding between Java and Scala for Spark, IMHO Scala has the
upperhand. If you're concerned with readability, have a look at the Scal
In spark-streaming, the consumers will fetch data and put it into 'blocks'.
Each block becomes a partition of the rdd generated during that batch
interval.
The size of each block is controlled by the conf:
'spark.streaming.blockInterval'. That is, the amount of data the consumer
can collect in that
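A quick worked example of the resulting partition count (the numbers are
assumptions for illustration):

val batchIntervalMs = 2000 // micro-batch interval
val blockIntervalMs = 200  // spark.streaming.blockInterval
val receivers       = 2

// one block per receiver per block interval; one partition per block
val partitionsPerBatch = receivers * (batchIntervalMs / blockIntervalMs)
// = 2 * 10 = 20 partitions in each batch RDD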
This: "java.lang.NoSuchMethodError" almost always indicates a version
conflict somewhere.
It looks like you are using Spark 1.1.1 with the cassandra-spark connector
1.2.0. Try aligning those. Those metrics were introduced recently in the
1.2.0 branch of the cassandra connector.
Either upgrade you
+1 for TypeSafe config
Our practice is to include all spark properties under a 'spark' entry in
the config file alongside job-specific configuration:
A config file would look like:
spark {
master = ""
cleaner.ttl = 123456
...
}
job {
context {
src = "foo"
action
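Loading that file and pushing the spark section into a SparkConf is a few
lines with the Typesafe Config library; a minimal sketch (the flattening loop
is our convention, not part of the library):

import com.typesafe.config.ConfigFactory
import org.apache.spark.SparkConf
import scala.collection.JavaConverters._

val config = ConfigFactory.load()

// restore the 'spark.' prefix that the nesting removed
val sparkConf = new SparkConf()
config.getConfig("spark").entrySet.asScala.foreach { entry =>
  sparkConf.set(s"spark.${entry.getKey}", entry.getValue.unwrapped.toString)
}

val jobSrc = config.getString("job.context.src") // job-specific settings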
Hi Tim,
From this: "There are 5 kafka receivers and each incoming stream is split
into 40 partitions" I suspect that you're creating too many tasks for
Spark to process on time.
Could you try some of the 'knobs' I describe here to see if that would help?
http://www.virdata.com/tuning-spark/
-
This is more of a Scala question, so next time you may want to ask in a
Scala forum, e.g. http://stackoverflow.com/questions/tagged/scala
val optArrStr:Option[Array[String]] = ???
optArrStr.map(arr => arr.mkString(",")).getOrElse("") // empty string or
whatever default value you have for th
>> isn't this the same issueas this?
>> https://issues.apache.org/jira/browse/MESOS-1688
>>
>> On Mon, Jan 26, 2015 at 9:17 PM, Gerard Maas
>> wrote:
>>
>>> Hi,
>>>
>>> We are observing with certain regularity that our Spark jo
Hi,
Did you try asking this on StackOverflow?
http://stackoverflow.com/questions/tagged/apache-spark
I'd also suggest adding some sample data to help others understand your
logic.
-kr, Gerard.
On Tue, Jan 27, 2015 at 1:14 PM, 老赵 wrote:
> Hello All,
>
> I am writing a simple Spark applica
t be Spark at all.
>
> Thank you.
>
> Le 26 janv. 2015 22:28, "Gerard Maas" a écrit :
>
> >
> > (looks like the list didn't like an HTML table in the previous email. My
> excuses for any duplicates)
> >
> > Hi,
> >
> > We are observ
(looks like the list didn't like an HTML table in the previous email. My
excuses for any duplicates)
Hi,
We are observing with certain regularity that our Spark jobs, as a Mesos
framework, are hoarding resources and not releasing them, resulting in
resource starvation to all jobs running on the Mes
Hi,
We are observing with certain regularity that our Spark jobs, as a Mesos
framework, are hoarding resources and not releasing them, resulting in
resource starvation to all jobs running on the Mesos cluster.
For example:
This is a job that has spark.cores.max = 4 and spark.executor.memory="3g"
+1
On Fri, Jan 23, 2015 at 5:58 PM, Nicholas Chammas <
nicholas.cham...@gmail.com> wrote:
> That sounds good to me. Shall I open a JIRA / PR about updating the site
> community page?
> On Fri, Jan 23, 2015 at 4:37 AM Patrick Wendell
> wrote:
>
>> Hey Nick,
>>
>> So I think what we can do is encou
ls here:
http://www.virdata.com/tuning-spark/#Partitions)
-kr, Gerard.
On Thu, Jan 22, 2015 at 4:28 PM, Gerard Maas wrote:
> So the system has gone from 7msg in 4.961 secs (median) to 106msgs in
> 4,761 seconds.
> I think there's evidence that setup costs are quite high in thi
So the system has gone from 7 msgs in 4.961 secs (median) to 106 msgs in
4.761 seconds.
I think there's evidence that setup costs are quite high in this case and
increasing the batch interval is helping.
On Thu, Jan 22, 2015 at 4:12 PM, Sudipta Banerjee <
asudipta.baner...@gmail.com> wrote:
> Hi Ash
and post the code (if possible).
In a nutshell, your processing time > batch interval, resulting in an
ever-increasing delay that will end up in a crash.
3 secs to process 14 messages looks like a lot. Curious what the job logic
is.
-kr, Gerard.
On Thu, Jan 22, 2015 at 12:15 PM, Tathagata Das w
I've been contributing to SO for a while now. Here are a few
observations I'd like to contribute to the discussion:
The level of questions on SO is often more entry-level. "Harder"
questions (that require expertise in a certain area) remain unanswered for
a while. Same questions here on the
ong the same line, that is to fix the number of streams
> and change the input and output channels dynamically.
>
> But could not make it work (seems that the receiver is not allowing any
> change in the config after it started).
>
> thanks,
>
> On Wed, Jan 21, 2015 at 10:49 A
One possible workaround could be to orchestrate launch/stopping of
Streaming jobs on demand as long as the number of jobs/streams stays within
the boundaries of the resources (cores) you have available.
e.g. if you're using Mesos, Marathon offers a REST interface to manage job
lifecycle. You will stil
Hi Mukesh,
How are you creating your receivers? Could you post the (relevant) code?
-kr, Gerard.
On Wed, Jan 21, 2015 at 9:42 AM, Mukesh Jha wrote:
> Hello Guys,
>
> I've repartitioned my kafkaStream so that it gets evenly distributed
> among the executors and the results are better.
> Still
Spark will use the number of cores available in the cluster. If your
cluster is 1 node with 4 cores, Spark will execute up to 4 tasks in
parallel.
Setting your #of partitions to 4 will ensure an even load across cores.
Note that this is different from saying "threads" - Internally Spark uses
many t
You are looking for dstream.transform(rdd => rdd.<operation>(otherRdd))
The docs contain an example on how to use transform.
https://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations-on-dstreams
-kr, Gerard.
On Thu, Jan 8, 2015 at 5:50 PM, Asim Jalis wrote:
> Is there a way t
t;> stream
>> .map { input =>
>> val bytes = produce(input)
>> // metricRegistry.meter("some.metrics").mark(bytes.length)
>> bytes
>> }
>> .saveAsTextFile("text")
>>
>>
pics) {
>> topicMap.put(topic, numStreams);
>> }
>>
>> List<JavaPairDStream<byte[], byte[]>> kafkaStreams = new
>> ArrayList<>(numStreams);
>> for (int i = 0; i < numStreams; i++) {
>> kafkaStreams.add(KafkaUtils.createStream(sc, byte[].class,
>> byte[].
Hi,
Could you add the code where you create the Kafka consumer?
-kr, Gerard.
On Wed, Jan 7, 2015 at 3:43 PM, wrote:
> Hi Mukesh,
>
> If my understanding is correct, each Stream only has a single Receiver.
> So, if you have each receiver consuming 9 partitions, you need 10 input
> DStreams to c