Preferred locations are only advisory; you can still get tasks scheduled on
other executors. You can try bumping up the size of the cache to see if
that is causing the issue you're seeing.
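(For anyone looking for the knob being referred to: the executor-side consumer cache in the 0-10 integration is sized by `spark.streaming.kafka.consumer.cache.maxCapacity`. A minimal sketch; the value is illustrative, not a recommendation.)
```scala
import org.apache.spark.SparkConf

// Sketch: raise the capacity of the per-executor cache of Kafka consumers
// used by spark-streaming-kafka-0-10. 128 is an arbitrary example value.
val conf = new SparkConf()
  .setAppName("kafka-cache-tuning")
  .set("spark.streaming.kafka.consumer.cache.maxCapacity", "128")
```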
On Nov 13, 2016 12:47, "Ivan von Nagy" wrote:
As the code iterates through the parallel list, it is processing up to 8
KafkaRDDs at a time. Each has its own unique topic and consumer group now.
Every topic has 4 partitions, so technically there should never be more
than 32 CachedKafkaConsumers. However, this seems to not be the case, as we
are
You should not be getting consumer churn on executors at all; that's
the whole point of the cache. How many partitions are you trying to
process per executor?
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#locationstrategies
gives instructions on the default size of th
Hi Sean,
Thanks for responding. We have run our jobs with internal parallel
processing for well over a year (Spark 1.5, 1.6 and Kafka 0.8.2.2) and did
not encounter any of these issues until upgrading to Spark 2.0.1 and Kafka
0.10 clients. If we process serially, then we sometimes get the errors,
How are you iterating through your RDDs in parallel? In the past (Spark 1.5.2)
when I've had actions being performed on multiple RDDs concurrently using
futures, I've encountered some pretty bad behavior in Spark, especially during
job retries. Very difficult-to-explain things, like records from
The code was changed to use a unique group for each KafkaRDD that was
created (see Nov 10 post). There is no KafkaRDD being reused. The basic
logic (see Nov 10 post for example) is: get a list of channels, iterate
through them in parallel, load a KafkaRDD using a given topic and a
consumer group tha
It is already documented that you must use a different group id, which as
far as I can tell you are still not doing.
On Nov 10, 2016 7:43 PM, "Shixiong(Ryan) Zhu"
wrote:
Yeah, the KafkaRDD cannot be reused. It's better to document it.
On Thu, Nov 10, 2016 at 8:26 AM, Ivan von Nagy wrote:
Ok, I have split the KafkaRDD logic so that each uses its own group and bumped
the poll.ms to 10 seconds. Anything less than 2 seconds on the poll.ms ends
up with a timeout and exception, so I am still perplexed on that one. The
new error I am getting now is a `ConcurrentModificationException` when
Spark
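(For readers following along: the poll.ms being tuned here appears to be the 0-10 integration's Spark-side poll timeout. A minimal sketch of setting it to the 10 seconds mentioned above.)
```scala
import org.apache.spark.SparkConf

// Sketch: raise the timeout used when the executor-side cached consumer
// polls Kafka for records; 10000 ms matches the value discussed above.
val conf = new SparkConf()
  .set("spark.streaming.kafka.consumer.poll.ms", "10000")
```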
There definitely is Kafka documentation indicating that you should use
a different consumer group for logically different subscribers; this
is really basic to Kafka:
http://kafka.apache.org/documentation#intro_consumers
As for your comment that "commit async after each RDD, which is not
really vi
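(The commit-async mechanism under discussion is the `CanCommitOffsets` API of the 0-10 direct stream. A minimal sketch; broker, topic, and group names are purely illustrative.)
```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

// local[2] and the 10 second batch interval are just for this sketch
val conf = new SparkConf().setMaster("local[2]").setAppName("commit-async-sketch")
val ssc = new StreamingContext(conf, Seconds(10))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "storage-group-stream",              // unique to this stream
  "enable.auto.commit" -> (false: java.lang.Boolean) // commits happen explicitly below
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("storage-channel1"), kafkaParams))

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the batch ...
  // Commit back to Kafka only after the batch's work has finished.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

ssc.start()
ssc.awaitTermination()
```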
With our stream version, we update the offsets for only the partition we
are operating on. We even break down the partition into smaller batches and
then update the offsets after each batch within the partition. With Spark
1.6 and Kafka 0.8.x this was not an issue, and as Sean pointed out, this is
not
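(A sketch of that per-partition pattern, reusing the `stream` and imports from the sketch above; the batch size and the offset-persisting step are placeholders.)
```scala
import org.apache.spark.TaskContext
import org.apache.spark.streaming.kafka010.HasOffsetRanges

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { records =>
    // Each Spark partition of a KafkaRDD corresponds to exactly one offset range.
    val range = offsetRanges(TaskContext.get.partitionId)
    records.grouped(1000).foreach { batch =>
      // ... process this smaller batch ...
      // ... then persist progress for range.topic / range.partition only,
      //     e.g. up to batch.last.offset + 1 (storage call omitted) ...
    }
  }
}
```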
Someone can correct me, but I'm pretty sure Spark dstreams (in
general, not just kafka) have been progressing on to the next batch
after a given batch aborts for quite some time now. Yet another
reason I put offsets in my database transactionally. My jobs throw
exceptions if the offset in the DB
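(Not the poster's actual code, but a sketch of that transactional pattern, again reusing `stream` from the earlier sketch; the JDBC URL and the kafka_offsets table are hypothetical.)
```scala
import java.sql.DriverManager
import org.apache.spark.TaskContext
import org.apache.spark.streaming.kafka010.HasOffsetRanges

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { records =>
    val range = offsetRanges(TaskContext.get.partitionId)
    val conn = DriverManager.getConnection("jdbc:postgresql://db-host/app") // hypothetical
    conn.setAutoCommit(false)
    try {
      // Fail fast if the stored offset does not line up with where this
      // batch starts -- the "throw if the offset in the DB..." check.
      val check = conn.prepareStatement(
        "SELECT until_offset FROM kafka_offsets WHERE topic = ? AND kafka_partition = ?")
      check.setString(1, range.topic)
      check.setInt(2, range.partition)
      val rs = check.executeQuery()
      if (rs.next() && rs.getLong(1) != range.fromOffset) {
        sys.error(s"Offset mismatch for ${range.topic}-${range.partition}")
      }

      // ... write the processed results from `records` using the same connection ...

      val update = conn.prepareStatement(
        "UPDATE kafka_offsets SET until_offset = ? WHERE topic = ? AND kafka_partition = ?")
      update.setLong(1, range.untilOffset)
      update.setString(2, range.topic)
      update.setInt(3, range.partition)
      update.executeUpdate()

      conn.commit() // results and offsets become visible atomically
    } catch {
      case e: Throwable => conn.rollback(); throw e
    } finally {
      conn.close()
    }
  }
}
```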
I've been encountering the same kinds of timeout issues as Ivan, using the
"Kafka Stream" approach that he is using, except I'm storing my offsets
manually from the driver to Zookeeper in the Kafka 8 format. I haven't yet
implemented the KafkaRDD approach, and therefore don't have the concurrenc
So basically what I am saying is:
- increase poll.ms
- use a separate group id everywhere
- stop committing offsets under the covers
That should eliminate all of those as possible causes, and then we can
see if there are still issues.
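(A sketch of what the second and third points might look like in the consumer parameters; the first point is the Spark-side poll timeout shown in an earlier sketch. Broker and naming scheme are illustrative.)
```scala
import org.apache.kafka.common.serialization.StringDeserializer

// Hypothetical helper: a group.id unique to each logical consumer, and
// auto-commit disabled so nothing commits offsets "under the covers".
def kafkaParams(logicalName: String): Map[String, Object] = Map(
  "bootstrap.servers" -> "broker1:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> s"storage-group-$logicalName",       // separate group id everywhere
  "enable.auto.commit" -> (false: java.lang.Boolean) // stop committing under the covers
)
```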
As far as 0.8 vs 0.10, Spark doesn't require you to assign or
Yes, the parallel KafkaRDD uses the same consumer group, but each RDD uses
a single distinct topic. For example, the group would be something like
"storage-group", and the topics would be "storage-channel1", and
"storage-channel2". In each thread a KafkaConsumer is started, assigned the
partitions
So just to be clear, the answers to my questions are
- you are not using different group ids, you're using the same group
id everywhere
- you are committing offsets manually
Right?
If you want to eliminate network or kafka misbehavior as a source,
tune poll.ms upwards even higher.
You must use
Here are some examples and details of the scenarios. The KafkaRDD is the most
error-prone to polling timeouts and concurrent modification errors.
*Using KafkaRDD* - This takes a list of channels and processes them in
parallel using the KafkaRDD directly. They all use the same consumer group
('st
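(A compressed sketch of that scenario: one KafkaRDD per channel, processed in parallel via futures. It includes the per-channel group id that the rest of the thread converges on; offsets, broker, and channel names are placeholders, and `rdd.count()` stands in for the real processing.)
```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkContext
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies, OffsetRange}

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

def processChannels(sc: SparkContext, channels: Seq[String]): Unit = {
  val work = channels.map { channel =>
    Future {
      val params = new java.util.HashMap[String, Object]()
      params.put("bootstrap.servers", "broker1:9092")
      params.put("key.deserializer", classOf[StringDeserializer])
      params.put("value.deserializer", classOf[StringDeserializer])
      params.put("group.id", s"storage-group-$channel") // unique per channel

      // One OffsetRange per partition; the 0..100 range is illustrative.
      val ranges = (0 until 4).map(p => OffsetRange(channel, p, 0L, 100L)).toArray

      val rdd = KafkaUtils.createRDD[String, String](
        sc, params, ranges, LocationStrategies.PreferConsistent)
      rdd.count() // stand-in for the real per-channel processing
    }
  }
  work.foreach(f => Await.result(f, Duration.Inf))
}
```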
- are you using different group ids for the different streams?
- are you manually committing offsets?
- what are the values of your kafka-related settings?
On Fri, Nov 4, 2016 at 12:20 PM, vonnagy wrote:
> I am getting the issues using Spark 2.0.1 and Kafka 0.10. I have two jobs,
> one that uses