Nishkam Ravi created KAFKA-5528:
-----------------------------------

             Summary: Error while reading topic, offset, partition info from process method
                 Key: KAFKA-5528
                 URL: https://issues.apache.org/jira/browse/KAFKA-5528
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 0.10.2.0
            Reporter: Nishkam Ravi


We are encountering an IllegalStateException while trying to access 
context.topic() from the process method. The code is written in Scala and 
launched with sbt (Spring is not involved). Here is a sketch of the code:

import org.apache.kafka.streams.processor.{AbstractProcessor, ProcessorContext}

class GenericProcessor[T <: ThriftStruct](serDe: ThriftStructSerDe[T], decrypt: Boolean, config: Config)
    extends AbstractProcessor[Array[Byte], Array[Byte]] with LazyLogging {

  private var hsmClient: HSMClient = _

  override def init(processorContext: ProcessorContext): Unit = {
    super.init(processorContext) // let AbstractProcessor store the context
    hsmClient = HSMClient(config).getOrElse(null)
  }

  override def process(key: Array[Byte], value: Array[Byte]): Unit = {
    // Record metadata is read from the context; context.topic() is where
    // the IllegalStateException surfaces.
    val topic: String = this.context.topic()
    val partition: Int = this.context.partition()
    val offset: Long = this.context.offset()
    val timestamp: Long = this.context.timestamp()
    // business logic
  }
}

The exception is thrown only in the multi-consumer case (when the topic has 
more than one partition and parallelism > 1).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
