Glad it was helpful :)

As for executors, my expectation is that if you have multiple executors running and one of them crashes, the failed task will be resubmitted on a different executor. That is typically what I observe in Spark apps; if that's not what you're seeing, I'd try to get help on that specific issue.

As for Kafka offsets per partition, you need to atomically write your offsets and results in the same place. If that place is a filesystem, you need to use atomic operations (I try to stay away from HDFS, but I believe renames are atomic, for instance). If you're doing that in normal code, i.e. with an iterator instead of an RDD or DataFrame, you may have to do some of that work yourself.

As for using partitionBy, even after repartitioning you can still use the general idea of writing results and offsets in the same place. The major difference is that you need to write all offsets in each partition (because things have been shuffled), and you need to be more careful on startup after a failure. On startup, you'd see which partitions were incomplete, start the job from the offsets in the incomplete partitions, do the work for all partitions, but ignore the writes when they get to the complete partitions.

I realize that's kind of a complicated description. If it doesn't make sense, ask; I may also be able to put up some publicly visible code at some point in the future.

On Mon, Oct 10, 2016 at 12:12 PM, Samy Dindane <s...@dindane.com> wrote:
> I just noticed that you're the author of the code I linked in my previous
> email. :) It's helpful.
>
> When using `foreachPartition` or `mapPartitions`, I noticed I can't ask
> Spark to write the data to disk using `df.write()`; I need to use the
> iterator to do so, which means losing the ability to use partitionBy().
> Do you know a workaround? Or will I be forced to partition data manually?
>
> I think I understand why the job crashes when a single executor does:
> `df.write()....save()` writes all the partitions at the same time, which
> fails if one of them has died.
> Is that right?
>
> Thank you.
>
> Samy
>
>
> On 10/10/2016 04:58 PM, Samy Dindane wrote:
>>
>> Hi Cody,
>>
>> I am writing a Spark job that reads records from a Kafka topic and writes
>> them to the file system.
>> This would be straightforward if it weren't for the custom checkpointing
>> logic I want to have; Spark's checkpointing doesn't suit us as it doesn't
>> permit code updates.
>>
>> The custom checkpointing would be similar to this:
>> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerPartition.scala
>> I am trying to understand how this would work if an executor crashes, so I
>> tried making one crash manually, but I noticed it kills the whole job
>> instead of creating another executor to resume the task.
>> Is that expected? Is there anything wrong with my approach?
>>
>> Thank you for your time.
>>
>>
>> On 10/10/2016 04:29 PM, Cody Koeninger wrote:
>>>
>>> What is it you're actually trying to accomplish?
>>>
>>> On Mon, Oct 10, 2016 at 5:26 AM, Samy Dindane <s...@dindane.com> wrote:
>>>>
>>>> I managed to make a specific executor crash by using
>>>> TaskContext.get.partitionId and throwing an exception for a specific
>>>> executor.
>>>>
>>>> The issue I have now is that the whole job stops when a single executor
>>>> crashes.
>>>> Do I need to explicitly tell Spark to start a new executor and keep the
>>>> other ones running?
>>>>
>>>>
>>>> On 10/10/2016 11:19 AM, Samy Dindane wrote:
>>>>>
>>>>>
>>>>> Hi,
>>>>>
>>>>> I am writing a streaming job that reads a Kafka topic.
>>>>> As far as I understand, Spark does a 1:1 mapping between its executors
>>>>> and Kafka partitions.
>>>>>
>>>>> In order to correctly implement my checkpoint logic, I'd like to know
>>>>> what exactly happens when an executor crashes.
>>>>> Also, is it possible to kill an executor manually for testing purposes?
>>>>>
>>>>> Thank you.
>>>>>
>>>>> Samy
>>>>>
>>>>> ---------------------------------------------------------------------
>>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
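
Following up on the "write offsets and results in the same place" point at the top of this message, here is a rough sketch of what that could look like on a plain local filesystem. It is not the code from the linked repo. Every name in it (OffsetRange as a local case class, AtomicPartitionWriter, commit, resumeOffset, the file naming) is made up for illustration, and it assumes one output file per offset range, with ranges for a given partition committed in order. The offsets go into the first line of the same file as the records, and the rename is the commit point.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Path, StandardCopyOption}

// Hypothetical stand-in for a Kafka offset range; not the Spark class of the same name.
final case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

object AtomicPartitionWriter {
  private val TmpPrefix = ".tmp-"

  // One file per offset range, so a replayed range always maps to the same name.
  private def target(dir: Path, r: OffsetRange): Path =
    dir.resolve(s"${r.topic}_${r.partition}_${r.fromOffset}_${r.untilOffset}.txt")

  /** Writes the offsets (header line) and the records to a temp file in the
    * same directory, then renames it into place. Returns false if this range
    * was already committed, in which case the write is skipped. */
  def commit(dir: Path, r: OffsetRange, records: Iterator[String]): Boolean = {
    val dest = target(dir, r)
    if (Files.exists(dest)) false
    else {
      val tmp = Files.createTempFile(dir, TmpPrefix, ".part")
      val out = Files.newBufferedWriter(tmp, UTF_8)
      try {
        out.write(s"${r.topic} ${r.partition} ${r.fromOffset} ${r.untilOffset}")
        out.newLine()
        records.foreach { rec => out.write(rec); out.newLine() }
      } finally out.close()
      // The rename is the commit: a reader sees either no file or the whole file.
      Files.move(tmp, dest, StandardCopyOption.ATOMIC_MOVE)
      true
    }
  }

  /** Highest committed untilOffset for a topic/partition, i.e. where to resume.
    * Leftover temp files from crashed tasks are ignored. */
  def resumeOffset(dir: Path, topic: String, partition: Int): Option[Long] = {
    val files = Option(dir.toFile.listFiles()).getOrElse(Array.empty[java.io.File])
    val untils = files.toSeq
      .filter(f => f.isFile && !f.getName.startsWith(TmpPrefix))
      .flatMap { f =>
        val in = Files.newBufferedReader(f.toPath, UTF_8)
        try Option(in.readLine()) finally in.close()
      }
      .map(_.split(' '))
      .collect {
        case Array(t, p, _, until) if t == topic && p.toInt == partition => until.toLong
      }
    if (untils.isEmpty) None else Some(untils.max)
  }
}
```

The idea would be to call something like commit from inside foreachPartition, passing the partition's iterator and its offset range, and to call resumeOffset on startup to decide where each Kafka partition should restart. Because commit returns false for a range that is already in place, replayed work for complete partitions gets ignored, which is the "ignore the writes" step described above. On HDFS the same pattern would presumably use the Hadoop FileSystem rename instead of Files.move, but I haven't tested that here.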