Hi TD,

Thanks a lot for offering at the conference in NYC to look at our PR (if we
file one).

As we briefly discussed, we've been hitting issues with unbalanced and
under-distributed Kafka partitions when developing Spark Streaming
applications in Mobius (C# for Spark). We're trying the option of
repartitioning within DirectKafkaInputDStream instead of using the
DStream.repartition API, which introduces extra network cost and doesn't
really address the root cause.

However, instead of filing a JIRA with a PR directly, we decided to create
customized Kafka RDD/DStream classes (to start with, and to contribute back
later if successful) - DynamicPartitionKafkaRDD and
DynamicPartitionKafkaInputDStream, built via inheritance - and to expose a
new API, KafkaUtils.CreateDirectStreamWithRepartition, with one additional
parameter: numPartitions (a hint for the number of RDD partitions to create).

It would be great if you could take a look at the code and share your comments:

https://github.com/Microsoft/Mobius/tree/master/scala/src/main/org/apache/spark/streaming/api/kafka

The major relevant change is in DynamicPartitionKafkaRDD.getPartitions,
where an average RDD partition size is calculated first (total size of the
topic divided by numPartitions) and then used to split Kafka partitions (so
the final number of RDD partitions will be greater than or equal to
numPartitions).
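To make the splitting idea concrete, here is a rough sketch (not the actual Mobius code): OffsetRange below is a simplified stand-in for Spark's streaming-kafka OffsetRange, and splitOffsetRanges is a hypothetical helper illustrating how each Kafka partition's offset range could be sliced by the computed average size.

```scala
// Simplified stand-in for Spark's streaming-kafka OffsetRange.
case class OffsetRange(topic: String, partition: Int,
                       fromOffset: Long, untilOffset: Long) {
  def count: Long = untilOffset - fromOffset
}

// Hypothetical helper: split each Kafka partition's offset range into
// chunks of roughly `avg` messages, so the resulting RDD ends up with
// at least numPartitions partitions.
def splitOffsetRanges(ranges: Seq[OffsetRange],
                      numPartitions: Int): Seq[OffsetRange] = {
  val total = ranges.map(_.count).sum
  if (numPartitions <= 0 || total == 0) return ranges
  // Average size of an RDD partition: total topic size / numPartitions.
  val avg = math.max(1L, total / numPartitions)
  ranges.flatMap { r =>
    // Number of slices for this Kafka partition, rounded up.
    val slices = math.max(1L, (r.count + avg - 1) / avg)
    (0L until slices).map { i =>
      val from = r.fromOffset + i * avg
      val until = math.min(r.untilOffset, from + avg)
      r.copy(fromOffset = from, untilOffset = until)
    }
  }
}
```

Because each Kafka partition is split with a ceiling division, the sum of slice counts is at least ceil(total / avg), which is at least numPartitions, matching the guarantee above.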

There's a concern that Kafka partition[i] no longer maps to task[i], which
might break existing applications. Here's our thinking:

a. OffsetRanges in the original implementation may cover multiple topics,
meaning "partition i maps to task i" is generally a false statement.

b. Even if only one topic is involved, the partition sequence in
offsetRanges comes from the Kafka topic metadata response, which doesn't
necessarily guarantee ordering; and even if it does, applications should not
take a dependency on it.

c. Topic partition splitting happens only when explicitly configured.


There are some other, more complicated changes related to fault tolerance
which are irrelevant here (though you're more than welcome to comment on
them too); they were introduced to unblock scenarios we're hitting on a
daily basis:

1. Temporarily redirect the Kafka read to the C# worker by passing it
metadata instead of actual Kafka messages; in the C# worker, a C# Kafka
client is used, which makes debugging much easier.

2. Bypass metadata request exceptions on the driver side and let the next
batch retry.

3. Bypass some read errors on the worker side.


Note that all of the above is at a very early stage; your comments will be
much valued and appreciated.


Thanks a lot,

Reny.
