[ https://issues.apache.org/jira/browse/KAFKA-5528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax reopened KAFKA-5528:
------------------------------------

> Error while reading topic, offset, partition info from process method
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-5528
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5528
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0
>            Reporter: Nishkam Ravi
>
> We are encountering an {{IllegalStateException}} while trying to access
> {{context.topic()}} from the process function. The code is written in Scala
> and is launched using sbt (Spring isn't involved). Here's the code sketch:
> {noformat}
> class GenericProcessor[T <: ThriftStruct](serDe: ThriftStructSerDe[T],
>     decrypt: Boolean, config: Config)
>   extends AbstractProcessor[Array[Byte], Array[Byte]] with LazyLogging {
>
>   private var hsmClient: HSMClient = _
>
>   override def init(processorContext: ProcessorContext): Unit = {
>     super.init(processorContext)
>     hsmClient = HSMClient(config).getOrElse(null)
>   }
>
>   override def process(key: Array[Byte], value: Array[Byte]): Unit = {
>     val topic: String = this.context.topic()
>     val partition: Int = this.context.partition()
>     val offset: Long = this.context.offset()
>     val timestamp: Long = this.context.timestamp()
>     // business logic
>   }
> }
> {noformat}
> The exception is thrown only in the multi-consumer case (when the number of
> partitions for a topic is > 1 and parallelism is > 1).

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)