[ https://issues.apache.org/jira/browse/KAFKA-9005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Bruno Cadonna resolved KAFKA-9005. ---------------------------------- Resolution: Not A Bug > Kafka stream: “TopicAuthorizationException: Not authorized to access topics” > for an internal state store > -------------------------------------------------------------------------------------------------------- > > Key: KAFKA-9005 > URL: https://issues.apache.org/jira/browse/KAFKA-9005 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 2.3.0 > Reporter: Arnaud Villevieille > Priority: Blocker > > Java: OpenJdk 11 > Java: OpenJdk 11 > Kafka server: 2.2.0 > Kafka streams lib: 2.3.0 > i have created a stackoverflow query > [here|[https://stackoverflow.com/questions/58299827/kafka-stream-topicauthorizationexception-not-authorized-to-access-topics-for]] > > I am trying to deploy my Kafka streams application in a *docker* container > and it fails while trying to create an internal state store with a > TopicAuthorizationException.It works well locally. The main difference > between locally and on the server is that there it connects to a server > deployed Kafka and authenticates using the usual *Kerberos* auth.I fail to > understand the link between authentication and the *local stores*. > My stream looks like that: > {code:java} > StreamsBuilder builder = new StreamsBuilder(); > //We stream from the source topic > KStream<String, EnrichedMessagePayload> sourceMessagesStream = > builder.stream(sourceTopic, Consumed > .with(Serdes.serdeFrom(String.class), INPUT_SERDE)); > //We group per room and window > TimeWindowedKStream<String, EnrichedMessagePayload> windowed = > sourceMessagesStream > > .groupByKey().windowedBy(TimeWindows.of(Duration.ofMillis(windowSize)).grace(Duration.ZERO)); > //We make them a list > KStream<Windowed<String>, WindowedMessages> grouped = windowed > .aggregate(WindowedMessages::new, > (key, value, aggregate) -> aggregate.add(value), > Materialized.with(Serdes.String(), > Serdes.serdeFrom(windowSerializer, windowSerializer))) > .suppress(Suppressed.untilWindowCloses(unbounded())) > .toStream(); > //Filter > KStream<Windowed<String>, FilterResult> filtered = grouped > .mapValues((readOnlyKey, value) -> > filterWindow(value.getMessages())); > //Re map to its original form > KStream<String, OutputPayload> reduced = filtered > .flatMap((KeyValueMapper<Windowed<String>, WindowedMessages, > Iterable<KeyValue<String, OutputPayload>>>) (key, value) -> value > .getMessages() > .stream().map(payload -> new KeyValue<>(key.key(), > payload)) > .collect(toList())); > //Target topic > reduced.to(sinkTopic, Produced > .with(Serdes.serdeFrom(String.class), SERDE)); > return builder.build(); > {code} > It receives a stream of messages, windows it, aggregates all the messages per > window, keeps only the last version of the list with a 'Suppressed' and then > flatMaps the whole to forward it to another topic. > Every time i get that kind of exception: > > {quote}org.apache.kafka.common.errors.TopicAuthorizationException: Not > authorized to access topics: [Topic authorization failed.]> Error message > was: org.apache.kafka.common.errors.TopicAuthorizationException: Not > authorized to access topics: [Topic authorization failed.]2019-10-09 > 06:44:03.255 +0000 ERROR > [filterer-d83f2f60-b2bd-40b2-a314-4b20f32918f7-StreamThread-1] > [StreamThread.java:777] - stream-thread > [filterer-d83f2f60-b2bd-40b2-a314-4b20f32918f7-StreamThread-1] Encountered > the following unexpected Kafka exception during processing, this usually > indicate Streams internal errors: - > [rapid_r-live-message-filterer-0-0-1-snapshot-10.1e842f1a-ea60-11e9-9c7d-024298932744] > - [] - []org.apache.kafka.streams.errors.StreamsException: Could not create > topic filterer-KTABLE-SUPPRESS-STATE-STORE-0000000005-changelog. at > org.apache.kafka.streams.processor.internals.InternalTopicManager.getNumPartitions(InternalTopicManager.java:212) > at > org.apache.kafka.streams.processor.internals.InternalTopicManager.validateTopics(InternalTopicManager.java:226) > at > org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:104) > at > org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareTopic(StreamsPartitionAssignor.java:971) > at > org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:618) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:424) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:622) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:107) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:544) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:527) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:978) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:958) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:578) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:388) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:415) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:353) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1251) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201) > at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:941) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:846) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)Caused > by: org.apache.kafka.common.errors.TopicAuthorizationException: Not > authorized to access topics: [Topic authorization failed.] > {quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)