I agree that getting Cassandra out of the picture is a good first step.

But if you just do foreachRDD { _.count }, recent versions of the direct
stream shouldn't do any work at all on the executors (since the number of
messages in the RDD is already known). Instead, do a foreachPartition and
println or count the iterator manually.
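
A minimal sketch of that manual count, in case it helps. PartitionProbe,
Stats and drain are hypothetical names made up for illustration, not Spark
APIs, and `stream` in the comment is assumed to be the direct Kafka DStream.
The helper walks the iterator itself, so the executor has to actually fetch
the messages, and it reports how long that took:

```scala
// Hypothetical helper (not part of Spark): drains an iterator, counting
// the elements and measuring the elapsed wall-clock time.
object PartitionProbe {
  final case class Stats(records: Long, elapsedMs: Long)

  def drain[T](it: Iterator[T]): Stats = {
    val start = System.nanoTime()
    var n = 0L
    // Consume every element so the underlying source is really read.
    while (it.hasNext) { it.next(); n += 1 }
    Stats(n, (System.nanoTime() - start) / 1000000L)
  }
}

// Intended use inside the streaming job (assumes `stream` is the direct
// Kafka DStream):
//
//   stream.foreachRDD { rdd =>
//     rdd.foreachPartition { it =>
//       val s = PartitionProbe.drain(it)
//       println(s"records=${s.records} elapsedMs=${s.elapsedMs}")
//     }
//   }
```

The println output ends up in each executor's stdout, not on the driver.
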
On Tue, Oct 6, 2015 at 6:02 PM, Tathagata Das <[email protected]> wrote:
> Are you sure that this is not related to the Cassandra inserts? Could you
> just do foreachRDD { _.count } instead to keep Cassandra out of the
> picture and then test this again?
>
> On Tue, Oct 6, 2015 at 12:33 PM, Adrian Tanase <[email protected]> wrote:
>
>> Also check if the Kafka cluster is still balanced. Maybe one of the
>> brokers manages too many partitions; in that case all the work will stay
>> on that executor unless you repartition right after Kafka (and I'm not
>> saying you should).
>>
>> On 06 Oct 2015, at 22:17, Cody Koeninger <[email protected]> wrote:
>>
>> I'm not clear on what you're measuring. Can you post relevant code
>> snippets including the measurement code?
>>
>> As far as kafka metrics, nothing currently. There is an info-level log
>> message every time a kafka rdd iterator is instantiated,
>>
>>   log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
>>     s"offsets ${part.fromOffset} -> ${part.untilOffset}")
>>
>>
>> If you log once you're done with an iterator you should be able to see
>> the delta.
>>
>> The other thing to try is reduce the number of parts involved in the job
>> to isolate it ... first thing I'd do there is take cassandra out of the
>> equation.
>>
>>
>>
>> On Tue, Oct 6, 2015 at 2:00 PM, Gerard Maas <[email protected]>
>> wrote:
>>
>>> Hi Cody,
>>>
>>> The job is doing ETL from Kafka records to Cassandra. After a
>>> single filtering stage on Spark, the 'TL' part is done using the
>>> dstream.foreachRDD { rdd => rdd.foreachPartition { ...TL... } } pattern.
>>>
>>> We have metrics on the executor work which we collect and add together,
>>> indicated here by 'local computation'. As you can see, we also measure how
>>> much it cost us to measure :-)
>>> See how 'local work' times are comparable. What's not visible is the
>>> task scheduling and consuming the data from Kafka which becomes part of the
>>> 'spark computation' part.
>>>
>>> The pattern we see is 1 fast, 1 slow, 1 fast,... zig...zag...
>>>
>>> Are there metrics available somehow on the Kafka reading time?
>>>
>>>                            Slow Task   Fast Task
>>> local computation          347.6       281.53
>>> spark computation          6930        263
>>> metric collection          70          138
>>> wall clock process         7000        401
>>> total records processed    4297        5002
>>>
>>> (time in ms)
>>>
>>> kr, Gerard.
>>>
>>>
>>> On Tue, Oct 6, 2015 at 8:01 PM, Cody Koeninger <[email protected]>
>>> wrote:
>>>
>>>> Can you say anything more about what the job is doing?
>>>>
>>>> First thing I'd do is try to get some metrics on the time taken by your
>>>> code on the executors (e.g. when processing the iterator) to see if it's
>>>> consistent between the two situations.
>>>>
>>>> On Tue, Oct 6, 2015 at 11:45 AM, Gerard Maas <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> We recently migrated our streaming jobs to the direct Kafka receiver.
>>>>> Our initial migration went quite well, but now we are seeing a weird
>>>>> zig-zag performance pattern we cannot explain.
>>>>> In alternating fashion, one task takes about 1 second to finish and
>>>>> the next takes 7 seconds, at a stable streaming rate.
>>>>>
>>>>> Here are comparable metrics for two successive tasks:
>>>>> *Slow*:
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Executor ID                               Address                     Task Time  Total Tasks  Failed Tasks  Succeeded Tasks
>>>>> 20151006-044141-2408867082-5050-21047-S0  dnode-3.hdfs.private:36863  22 s       3            0             3
>>>>> 20151006-044141-2408867082-5050-21047-S1  dnode-0.hdfs.private:43812  40 s       11           0             11
>>>>> 20151006-044141-2408867082-5050-21047-S4  dnode-5.hdfs.private:59945  49 s       10           0             10
>>>>>
>>>>> *Fast*:
>>>>>
>>>>>
>>>>>
>>>>> Executor ID                               Address                     Task Time  Total Tasks  Failed Tasks  Succeeded Tasks
>>>>> 20151006-044141-2408867082-5050-21047-S0  dnode-3.hdfs.private:36863  0.6 s      4            0             4
>>>>> 20151006-044141-2408867082-5050-21047-S1  dnode-0.hdfs.private:43812  1 s        9            0             9
>>>>> 20151006-044141-2408867082-5050-21047-S4  dnode-5.hdfs.private:59945  1 s        11           0             11
>>>>>
>>>>> We have some custom metrics that measure wall-clock time of execution
>>>>> of certain blocks of the job, like the time it takes to do the local
>>>>> computations (RDD.foreachPartition closure) vs total time.
>>>>> The difference between the slow and fast tasks is in the
>>>>> 'spark computation time', which is the wall-clock time for the task
>>>>> scheduling (DStream.foreachRDD closure).
>>>>>
>>>>> e.g.
>>>>> Slow task:
>>>>>
>>>>> local computation time: 347.60968499999996, *spark computation time:
>>>>> 6930*, metric collection: 70, total process: 7000, total_records: 4297
>>>>>
>>>>> Fast task:
>>>>> local computation time: 281.539042,* spark computation time: 263*,
>>>>> metric collection: 138, total process: 401, total_records: 5002
>>>>>
>>>>> We are currently running Spark 1.4.1. The load and the work to be done
>>>>> are stable; this is on a dev environment where those factors are under
>>>>> control.
>>>>>
>>>>> Any ideas what this behavior could be?
>>>>>
>>>>> thanks in advance, Gerard.