Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r20270965
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala ---
    @@ -0,0 +1,266 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +import java.util.concurrent.{ThreadPoolExecutor, ConcurrentHashMap}
    +
    +import scala.collection.{Map, mutable}
    +import scala.reflect.{ClassTag, classTag}
    +
    +import kafka.common.TopicAndPartition
    +import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, 
KafkaStream}
    +import kafka.message.MessageAndMetadata
    +import kafka.serializer.Decoder
    +import kafka.utils.{VerifiableProperties, ZKGroupTopicDirs, 
ZKStringSerializer, ZkUtils}
    +import org.I0Itec.zkclient.ZkClient
    +
    +import org.apache.spark.{Logging, SparkEnv}
    +import org.apache.spark.storage.{StorageLevel, StreamBlockId}
    +import org.apache.spark.streaming.receiver.{BlockGenerator, 
BlockGeneratorListener, Receiver}
    +import org.apache.spark.util.Utils
    +
    +/**
    + * ReliableKafkaReceiver offers the ability to reliably store data into 
BlockManager without loss.
    + * It is turned off by default and will be enabled when
    + * spark.streaming.receiver.writeAheadLog.enable is true. The difference 
compared to KafkaReceiver
    + * is that this receiver manages topic-partition/offset itself and updates 
the offset information
    + * after data is reliably stored in the write-ahead log. Offsets will only be updated when data
    + * is reliably stored, so the potential data loss problem of KafkaReceiver can be eliminated.
    + *
    + * Note: ReliableKafkaReceiver will set auto.commit.enable to false to 
turn off automatic offset
    + * commit mechanism in Kafka consumer. So setting this configuration 
manually within kafkaParams
    + * will not take effect.
    + */
    +private[streaming]
    +class ReliableKafkaReceiver[
    +  K: ClassTag,
    +  V: ClassTag,
    +  U <: Decoder[_]: ClassTag,
    +  T <: Decoder[_]: ClassTag](
    +    kafkaParams: Map[String, String],
    +    topics: Map[String, Int],
    +    storageLevel: StorageLevel)
    +    extends Receiver[(K, V)](storageLevel) with Logging {
    +
    +  private val groupId = kafkaParams("group.id")
    +  private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
    +  private def conf = SparkEnv.get.conf
    +
    +  /** High level consumer to connect to Kafka. */
    +  private var consumerConnector: ConsumerConnector = null
    +
    +  /** zkClient to connect to Zookeeper to commit the offsets. */
    +  private var zkClient: ZkClient = null
    +
    +  /**
    +   * A HashMap to manage the offset for each topic/partition. This HashMap is accessed in
    +   * synchronized blocks, so using a mutable HashMap will not cause concurrency issues.
    +   */
    +  private var topicPartitionOffsetMap: mutable.HashMap[TopicAndPartition, 
Long] = null
    +
    +  /** A concurrent HashMap to store the stream block id and related offset 
snapshot. */
    +  private var blockOffsetMap: ConcurrentHashMap[StreamBlockId, 
Map[TopicAndPartition, Long]] = null
    +
    +  /**
    +   * Manage the BlockGenerator in receiver itself for better managing 
block store and offset
    +   * commit.
    +   */
    +  private var blockGenerator: BlockGenerator = null
    +
    +  /** Threadpool running the handlers for receiving message from multiple 
topics and partitions. */
    +  private var messageHandlerThreadPool: ThreadPoolExecutor = null
    +
    +  override def onStart(): Unit = {
    +    logInfo(s"Starting Kafka Consumer Stream with group: $groupId")
    +
    +    // Initialize the topic-partition / offset hash map.
    +    topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]
    +
    +    // Initialize the stream block id / offset snapshot hash map.
    +    blockOffsetMap = new ConcurrentHashMap[StreamBlockId, 
Map[TopicAndPartition, Long]]()
    +
    +    // Initialize the block generator for storing Kafka message.
    +    blockGenerator = new BlockGenerator(new GeneratedBlockHandler, 
streamId, conf)
    +
    +    if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && 
kafkaParams(AUTO_OFFSET_COMMIT) == "true") {
    +      logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in 
ReliableKafkaReceiver, " +
    --- End diff --
    
    Good addition!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to