[jira] [Commented] (KAFKA-12963) Improve error message for Class cast exception
[ https://issues.apache.org/jira/browse/KAFKA-12963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17406600#comment-17406600 ] Rasmus Helbig Hansen commented on KAFKA-12963: -- [~ableegoldman] first of all thanks for looking into this and providing a patch. I do understand that the data flowing into a processor doesn't need to have originated from a topic. A punctuator could be sending messages, as an example. However, in my particular case, where we (mostly) use the high level API, I was naively thinking that e.g. {code:java} .selectKey((k, v) -> new MyKey(v.getLocation()) .leftJoin(businessCentres, ...) {code} should be able to say something about the underlying topic, rather than the programmer inspecting the low level topology, in order to make a qualified guess at where the type misalignment might be. The inclusion of the processor name is fine for me. I'm happy with that. But I can imagine that a class cast exception of types A and B for processor KSTREAM-TRANSFORM-16 would be a bit of a head-scratcher for programmers getting started with Streams. > Improve error message for Class cast exception > -- > > Key: KAFKA-12963 > URL: https://issues.apache.org/jira/browse/KAFKA-12963 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.7.0 >Reporter: Rasmus Helbig Hansen >Assignee: Andrew Lapidas >Priority: Minor > Fix For: 3.1.0 > > > After a topology change and starting the application again, we got this type > of error message: > [g9z-StreamThread-1] ERROR > org.apache.kafka.streams.processor.internals.TaskManager - stream-thread > [g9z-StreamThread-1] Failed to process stream task 1_12 due to the following > error: > org.apache.kafka.streams.errors.StreamsException: ClassCastException > invoking Processor. Do the Processor's input types match the deserialized > types? Check the Serde setup and change the default Serdes in StreamConfig or > provide correct Serdes via method parameters. Make sure the Processor can > accept the deserialized input of type key: org.acme.SomeKey, and value: > org.acme.SomeValue. > Note that although incorrect Serdes are a common cause of error, the cast > exception might have another cause (in user code, for example). For example, > if a processor wires in a store, but casts the generics incorrectly, a class > cast exception could be raised during processing, but the cause would not be > wrong Serdes. 
> at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:185) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172) > at > org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:55) > at > org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) > at > org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86) > at > org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:703) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883) > at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:703) > at > org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1105) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:647) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) > at
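For readers hitting the ClassCastException above, here is a minimal sketch of the fix the error message itself suggests: provide Serdes via method parameters instead of relying on the defaults in StreamsConfig. MyKey, MyValue, BusinessCentre and their Serdes are placeholders taken from the discussion, not real Kafka classes, and the snippet also adds the closing parenthesis missing from the selectKey() line quoted above.
{code:java}
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class ExplicitSerdesExample {

    // MyKey, MyValue, BusinessCentre and their Serdes are hypothetical domain types.
    static void build(final StreamsBuilder builder,
                      final Serde<MyKey> myKeySerde,
                      final Serde<MyValue> myValueSerde,
                      final Serde<BusinessCentre> businessCentreSerde) {

        final KStream<String, MyValue> events =
            builder.stream("events", Consumed.with(Serdes.String(), myValueSerde));

        final KTable<MyKey, BusinessCentre> businessCentres =
            builder.table("business-centres", Consumed.with(myKeySerde, businessCentreSerde));

        events
            // selectKey() marks the stream for repartitioning before the join
            .selectKey((k, v) -> new MyKey(v.getLocation()))
            .leftJoin(
                businessCentres,
                (v, bc) -> v,
                // Explicit Serdes for the repartition topic; falling back to the
                // default Serdes in StreamsConfig here is the usual cause of the
                // ClassCastException quoted in this issue.
                Joined.with(myKeySerde, myValueSerde, businessCentreSerde));
    }
}
{code}
With explicit Serdes on the operation, the topology no longer depends on the default key/value Serdes matching every intermediate type, which is exactly the situation the improved error message tries to point at.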
[GitHub] [kafka] jlprat commented on a change in pull request #11228: KAFKA-12887 Skip some RuntimeExceptions from exception handler
jlprat commented on a change in pull request #11228: URL: https://github.com/apache/kafka/pull/11228#discussion_r698268258 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -509,7 +526,7 @@ private void handleStreamsUncaughtException(final Throwable throwable, break; case SHUTDOWN_CLIENT: log.error("Encountered the following exception during processing " + -"and the registered exception handler opted to " + action + "." + +"and Kafka Streams opted to " + action + "." + Review comment: Changed the error log line as mentioned. ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -495,9 +498,25 @@ private void replaceStreamThread(final Throwable throwable) { } } +private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse getActionForThrowable(final Throwable throwable, + final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) { +final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action; +// Exception might we wrapped within a StreamsException one Review comment: Updated now, the `if` statement now only checks the wrapped exception if it exists, and decides upon that. See next line -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a change in pull request #11228: KAFKA-12887 Skip some RuntimeExceptions from exception handler
cadonna commented on a change in pull request #11228: URL: https://github.com/apache/kafka/pull/11228#discussion_r698280849 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -495,9 +498,23 @@ private void replaceStreamThread(final Throwable throwable) { } } +private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse getActionForThrowable(final Throwable throwable, + final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) { Review comment: ```suggestion private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse getActionForThrowable(final Throwable throwable, final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) { ``` ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -495,9 +498,23 @@ private void replaceStreamThread(final Throwable throwable) { } } +private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse getActionForThrowable(final Throwable throwable, + final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) { +final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action; +// Exception might we wrapped within a StreamsException one +if (throwable.getCause() != null && EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS.contains(throwable.getCause().getClass())) { Review comment: Could you please put this check in a method with a meaningful name like `wrappedExceptionIsIn()`? Then the check would be ``` if (wrappedExceptionIsIn(EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS)) ``` and you can remove all other inline comments because the code per se contains all needed info. ## File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java ## @@ -163,6 +164,27 @@ public void shouldShutdownClient() throws InterruptedException { } } + +@Test +public void shouldShutdownClientWhenIllegalStateException() throws InterruptedException { Review comment: Could you please also add a test for the `IllegalArgumentException`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
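For context, a rough sketch of how the check could look after the refactor suggested above. The set and method names follow the quoted diff; the body and the SHUTDOWN_CLIENT fallback are assumptions about the PR's intent, not the merged code.
```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

class UncaughtExceptionDispatchSketch {

    // Exceptions that Kafka Streams should act on itself rather than hand to the user handler.
    private static final Set<Class<? extends Throwable>> EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS =
        new HashSet<>(Arrays.asList(IllegalStateException.class, IllegalArgumentException.class));

    private static boolean wrappedExceptionIsIn(final Throwable throwable,
                                                final Set<Class<? extends Throwable>> exceptions) {
        return throwable.getCause() != null && exceptions.contains(throwable.getCause().getClass());
    }

    static StreamThreadExceptionResponse getActionForThrowable(
            final Throwable throwable,
            final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
        if (wrappedExceptionIsIn(throwable, EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS)) {
            // Kafka Streams decides on its own instead of asking the registered handler.
            return StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
        }
        return streamsUncaughtExceptionHandler.handle(throwable);
    }
}
```
Putting the cause check behind `wrappedExceptionIsIn()` keeps the dispatch readable without inline comments, which is the point of the review comment above.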
[GitHub] [kafka] wangxianghu closed pull request #11210: KAFKA-13199: Make Task extends Versioned
wangxianghu closed pull request #11210: URL: https://github.com/apache/kafka/pull/11210 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jlprat commented on pull request #11228: KAFKA-12887 Skip some RuntimeExceptions from exception handler
jlprat commented on pull request #11228: URL: https://github.com/apache/kafka/pull/11228#issuecomment-908160013 @cadonna Applied your feedback as well, thanks for the comments. I also realized the `AtomicBoolean` variables in the test could be final and added that as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-13250) 2.4.0
chaos created KAFKA-13250: - Summary: 2.4.0 Key: KAFKA-13250 URL: https://issues.apache.org/jira/browse/KAFKA-13250 Project: Kafka Issue Type: Bug Affects Versions: 2.4.0 Environment: linux 4.1.0 Reporter: chaos [2021-08-26 20:20:55,640] ERROR [ReplicaManager broker=42] Error processing fetch with max size 1048576 from consumer on partition topic_xx-123: (fetchOffset=11061228956, logStartOffset=-1, maxBytes=1048576, currentLeaderEpoch=Optional.empty) (kafka.server.ReplicaManager) org.apache.kafka.common.errors.CorruptRecordException: Found record size 0 smaller than minimum record overhead (14) in file /data4/kafka-logs/topic_xx-123/011060934646.log. [2021-08-26 20:20:55,640] ERROR Error while appending records to topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) [2021-08-26 20:20:55,645] ERROR Error while deleting segments for topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) java.nio.file.FileSystemException: /data4/kafka-logs/topic_xx-123/011040402299.log -> /data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only file system Suppressed: java.nio.file.FileSystemException: /data4/kafka-logs/topic_xx-123/011040402299.log -> /data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only file system [2021-08-26 20:20:55,644] ERROR Error while appending records to topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) [2021-08-26 20:20:55,644] ERROR Error while appending records to topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) [2021-08-26 20:20:55,644] ERROR Error while appending records to topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) [2021-08-26 20:20:55,652] INFO [Partition topic_xx-123 broker=42] Shrinking ISR from 55,42 to 42. Leader: (highWatermark: 11061228956, endOffset: 11061228965). Out of sync replicas: (brokerId: 55, endOffset: 11061228956). (kafka.cluster.Partition) disk errors are: -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13251) 2.4.0
chaos created KAFKA-13251: - Summary: 2.4.0 Key: KAFKA-13251 URL: https://issues.apache.org/jira/browse/KAFKA-13251 Project: Kafka Issue Type: Bug Affects Versions: 2.4.0 Environment: linux 4.1.0 Reporter: chaos [2021-08-26 20:20:55,640] ERROR [ReplicaManager broker=42] Error processing fetch with max size 1048576 from consumer on partition topic_xx-123: (fetchOffset=11061228956, logStartOffset=-1, maxBytes=1048576, currentLeaderEpoch=Optional.empty) (kafka.server.ReplicaManager) org.apache.kafka.common.errors.CorruptRecordException: Found record size 0 smaller than minimum record overhead (14) in file /data4/kafka-logs/topic_xx-123/011060934646.log. [2021-08-26 20:20:55,640] ERROR Error while appending records to topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) [2021-08-26 20:20:55,645] ERROR Error while deleting segments for topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) java.nio.file.FileSystemException: /data4/kafka-logs/topic_xx-123/011040402299.log -> /data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only file system Suppressed: java.nio.file.FileSystemException: /data4/kafka-logs/topic_xx-123/011040402299.log -> /data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only file system [2021-08-26 20:20:55,644] ERROR Error while appending records to topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) [2021-08-26 20:20:55,644] ERROR Error while appending records to topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) [2021-08-26 20:20:55,644] ERROR Error while appending records to topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) [2021-08-26 20:20:55,652] INFO [Partition topic_xx-123 broker=42] Shrinking ISR from 55,42 to 42. Leader: (highWatermark: 11061228956, endOffset: 11061228965). Out of sync replicas: (brokerId: 55, endOffset: 11061228956). (kafka.cluster.Partition) disk errors are: -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] cadonna commented on a change in pull request #11149: KAFKA-13229: add total blocked time metric to streams (KIP-761)
cadonna commented on a change in pull request #11149: URL: https://github.com/apache/kafka/pull/11149#discussion_r698327147 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java ## @@ -178,12 +184,48 @@ public void resetProducer() { throw new IllegalStateException("Expected eos-v2 to be enabled, but the processing mode was " + processingMode); } +oldProducerTotalBlockedTime += totalBlockedTime(producer); +final long start = time.nanoseconds(); producer.close(); +final long closeTime = time.nanoseconds() - start; +oldProducerTotalBlockedTime += closeTime; producer = clientSupplier.getProducer(eosV2ProducerConfigs); transactionInitialized = false; } +private double getMetricValue(final Map metrics, + final String name) { +final List found = metrics.keySet().stream() +.filter(n -> n.name().equals(name)) +.collect(Collectors.toList()); +if (found.isEmpty()) { +return 0.0; +} +if (found.size() > 1) { Review comment: @rodesai I see your point here. However, the downside of not throwing is that we will also not notice the bad behavior in our tests like the soak tests. I personally prefer to improve tests instead of downgrading the reaction to bad behavior. Assume in future somebody makes a change that breaks the assumption of the non-shared metrics registry, we would find this bug immediately during development instead of during production. Another option that comes to my mind is to classify exceptions that originate from the metrics framework differently in the uncaught exception handler, but that would probably need some more work. ## File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ## @@ -2262,6 +2352,14 @@ public void testListOffsetShouldUpateSubscriptions() { return newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId); } +private KafkaConsumer consumerWithPendingAuthenticationError() { +return consumerWithPendingAuthenticationError(new MockTime()); +} + +private KafkaConsumer consumerWithPendingError(final Time time) { +return consumerWithPendingAuthenticationError(time); +} Review comment: Fair enough! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
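For readers following the thread, this is roughly the stricter variant being argued for above: fail fast when the (supposedly non-shared) metrics registry contains more than one metric with the requested name. The method shape follows the quoted diff; the exact exception type and message are assumptions.
```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

final class BlockedTimeMetricSketch {

    static double getMetricValue(final Map<MetricName, ? extends Metric> metrics,
                                 final String name) {
        final List<MetricName> found = metrics.keySet().stream()
            .filter(n -> n.name().equals(name))
            .collect(Collectors.toList());
        if (found.isEmpty()) {
            return 0.0;
        }
        if (found.size() > 1) {
            // Surfacing this instead of silently picking one metric makes a broken
            // "non-shared registry" assumption visible in tests such as the soaks.
            throw new IllegalStateException("found more than one metric named " + name + ": " + found);
        }
        return (Double) metrics.get(found.get(0)).metricValue();
    }
}
```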
[jira] [Updated] (KAFKA-13251) 2.4.0
[ https://issues.apache.org/jira/browse/KAFKA-13251?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] chaos updated KAFKA-13251: -- Description: Disk error occurred in broker(=42),and then Shrinking ISR, why Shrinking ISR to error broker ? kafka logs: broker42: [2021-08-26 20:20:55,640] ERROR [ReplicaManager broker=42] Error processing fetch with max size 1048576 from consumer on partition topic_xx-123: (fetchOffset=11061228956, logStartOffset=-1, maxBytes=1048576, currentLeaderEpoch=Optional.empty) (kafka.server.ReplicaManager) org.apache.kafka.common.errors.CorruptRecordException: Found record size 0 smaller than minimum record overhead (14) in file /data4/kafka-logs/topic_xx-123/011060934646.log. [2021-08-26 20:20:55,640] ERROR Error while appending records to topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) [2021-08-26 20:20:55,645] ERROR Error while deleting segments for topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) java.nio.file.FileSystemException: /data4/kafka-logs/topic_xx-123/011040402299.log -> /data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only file system Suppressed: java.nio.file.FileSystemException: /data4/kafka-logs/topic_xx-123/011040402299.log -> /data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only file system [2021-08-26 20:20:55,644] ERROR Error while appending records to topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) [2021-08-26 20:20:55,652] INFO [Partition topic_xx-123 broker=42] Shrinking ISR from 55,42 to 42. Leader: (highWatermark: 11061228956, endOffset: 11061228965). Out of sync replicas: (brokerId: 55, endOffset: 11061228956). (kafka.cluster.Partition) disk error is: Aug 26 20:20:55 kafka-zk-19 kernel: sd 0:2:5:0: [sdf] tag#33 FAILED Result: hostbyte=DID_BAD_TARGET driverbyte=DRIVER_OK was: [2021-08-26 20:20:55,640] ERROR [ReplicaManager broker=42] Error processing fetch with max size 1048576 from consumer on partition topic_xx-123: (fetchOffset=11061228956, logStartOffset=-1, maxBytes=1048576, currentLeaderEpoch=Optional.empty) (kafka.server.ReplicaManager) org.apache.kafka.common.errors.CorruptRecordException: Found record size 0 smaller than minimum record overhead (14) in file /data4/kafka-logs/topic_xx-123/011060934646.log. [2021-08-26 20:20:55,640] ERROR Error while appending records to topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) [2021-08-26 20:20:55,645] ERROR Error while deleting segments for topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) java.nio.file.FileSystemException: /data4/kafka-logs/topic_xx-123/011040402299.log -> /data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only file system Suppressed: java.nio.file.FileSystemException: /data4/kafka-logs/topic_xx-123/011040402299.log -> /data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only file system [2021-08-26 20:20:55,644] ERROR Error while appending records to topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) [2021-08-26 20:20:55,644] ERROR Error while appending records to topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) [2021-08-26 20:20:55,644] ERROR Error while appending records to topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) [2021-08-26 20:20:55,652] INFO [Partition topic_xx-123 broker=42] Shrinking ISR from 55,42 to 42. Leader: (highWatermark: 11061228956, endOffset: 11061228965). 
Out of sync replicas: (brokerId: 55, endOffset: 11061228956). (kafka.cluster.Partition) disk errors are: > 2.4.0 > - > > Key: KAFKA-13251 > URL: https://issues.apache.org/jira/browse/KAFKA-13251 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.4.0 > Environment: linux 4.1.0 >Reporter: chaos >Priority: Major > > Disk error occurred in broker(=42),and then Shrinking ISR, > why Shrinking ISR to error broker ? > > kafka logs: > broker42: > [2021-08-26 20:20:55,640] ERROR [ReplicaManager broker=42] Error processing > fetch with max size 1048576 from consumer on partition topic_xx-123: > (fetchOffset=11061228956, logStartOffset=-1, maxBytes=1048576, > currentLeaderEpoch=Optional.empty) (kafka.server.ReplicaManager) > org.apache.kafka.common.errors.CorruptRecordException: Found record size 0 > smaller than minimum record overhead (14) in file > /data4/kafka-logs/topic_xx-123/011060934646.log. > [2021-08-26 20:20:55,640] ERROR Error while appending records to topic_xx-123 > in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) > [2021-08-26 20:20:55,645] ERROR Error while deleting segments for > topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureC
[jira] [Updated] (KAFKA-13251) 2.4.0
[ https://issues.apache.org/jira/browse/KAFKA-13251?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] chaos updated KAFKA-13251: -- Description: Disk error occurred in broker(=42),and then Shrinking ISR to itself. so why Shrinking ISR to an error broker? i.e. why not "Shrinking ISR from 55,42 to 55" but "Shrinking ISR from 55,42 to 42". kafka logs: broker42: [2021-08-26 20:20:55,640] ERROR [ReplicaManager broker=42] Error processing fetch with max size 1048576 from consumer on partition topic_xx-123: (fetchOffset=11061228956, logStartOffset=-1, maxBytes=1048576, currentLeaderEpoch=Optional.empty) (kafka.server.ReplicaManager) org.apache.kafka.common.errors.CorruptRecordException: Found record size 0 smaller than minimum record overhead (14) in file /data4/kafka-logs/topic_xx-123/011060934646.log. [2021-08-26 20:20:55,640] ERROR Error while appending records to topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) [2021-08-26 20:20:55,645] ERROR Error while deleting segments for topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) java.nio.file.FileSystemException: /data4/kafka-logs/topic_xx-123/011040402299.log -> /data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only file system Suppressed: java.nio.file.FileSystemException: /data4/kafka-logs/topic_xx-123/011040402299.log -> /data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only file system [2021-08-26 20:20:55,644] ERROR Error while appending records to topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) [2021-08-26 20:20:55,652] INFO [Partition topic_xx-123 broker=42] Shrinking ISR from 55,42 to 42. Leader: (highWatermark: 11061228956, endOffset: 11061228965). Out of sync replicas: (brokerId: 55, endOffset: 11061228956). (kafka.cluster.Partition) broker55: [2021-08-26 20:20:32,456] WARN [ReplicaFetcher replicaId=55, leaderId=42, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=55, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=830774713, epoch=1562806014), rackId=) (kafka.server.ReplicaFetcherThread) disk error on broker42 is: Aug 26 20:20:55 kernel: sd 0:2:5:0: [sdf] tag#33 FAILED Result: hostbyte=DID_BAD_TARGET driverbyte=DRIVER_OK was: Disk error occurred in broker(=42),and then Shrinking ISR, why Shrinking ISR to error broker ? kafka logs: broker42: [2021-08-26 20:20:55,640] ERROR [ReplicaManager broker=42] Error processing fetch with max size 1048576 from consumer on partition topic_xx-123: (fetchOffset=11061228956, logStartOffset=-1, maxBytes=1048576, currentLeaderEpoch=Optional.empty) (kafka.server.ReplicaManager) org.apache.kafka.common.errors.CorruptRecordException: Found record size 0 smaller than minimum record overhead (14) in file /data4/kafka-logs/topic_xx-123/011060934646.log. 
[2021-08-26 20:20:55,640] ERROR Error while appending records to topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) [2021-08-26 20:20:55,645] ERROR Error while deleting segments for topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) java.nio.file.FileSystemException: /data4/kafka-logs/topic_xx-123/011040402299.log -> /data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only file system Suppressed: java.nio.file.FileSystemException: /data4/kafka-logs/topic_xx-123/011040402299.log -> /data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only file system [2021-08-26 20:20:55,644] ERROR Error while appending records to topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) [2021-08-26 20:20:55,652] INFO [Partition topic_xx-123 broker=42] Shrinking ISR from 55,42 to 42. Leader: (highWatermark: 11061228956, endOffset: 11061228965). Out of sync replicas: (brokerId: 55, endOffset: 11061228956). (kafka.cluster.Partition) disk error is: Aug 26 20:20:55 kafka-zk-19 kernel: sd 0:2:5:0: [sdf] tag#33 FAILED Result: hostbyte=DID_BAD_TARGET driverbyte=DRIVER_OK > 2.4.0 > - > > Key: KAFKA-13251 > URL: https://issues.apache.org/jira/browse/KAFKA-13251 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.4.0 > Environment: linux 4.1.0 >Reporter: chaos >Priority: Major > > Disk error occurred in broker(=42),and then Shrinking ISR to itself. > so why Shrinking ISR to an error broker? > i.e. why not "Shrinking ISR from 55,42 to 55" but "Shrinking ISR from 55,42 > to 42". > kafka logs: > broker42: > [2021-08-26 20:20:55,640] ERROR [ReplicaManager broker=42] Error processing > fetch with max size 1048576 from consumer on partition topic_xx-123: > (fetchOffset=11061228956, logStartOffset=-1, maxBytes=1048576,
[jira] [Updated] (KAFKA-13251) 2.4.0
[ https://issues.apache.org/jira/browse/KAFKA-13251?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] chaos updated KAFKA-13251: -- Component/s: core > 2.4.0 > - > > Key: KAFKA-13251 > URL: https://issues.apache.org/jira/browse/KAFKA-13251 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.4.0 > Environment: linux 4.1.0 >Reporter: chaos >Priority: Major > > Disk error occurred in broker(=42),and then Shrinking ISR to itself. > so why Shrinking ISR to an error broker? > i.e. why not "Shrinking ISR from 55,42 to 55" but "Shrinking ISR from 55,42 > to 42". > kafka logs: > broker42: > [2021-08-26 20:20:55,640] ERROR [ReplicaManager broker=42] Error processing > fetch with max size 1048576 from consumer on partition topic_xx-123: > (fetchOffset=11061228956, logStartOffset=-1, maxBytes=1048576, > currentLeaderEpoch=Optional.empty) (kafka.server.ReplicaManager) > org.apache.kafka.common.errors.CorruptRecordException: Found record size 0 > smaller than minimum record overhead (14) in file > /data4/kafka-logs/topic_xx-123/011060934646.log. > [2021-08-26 20:20:55,640] ERROR Error while appending records to > topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) > [2021-08-26 20:20:55,645] ERROR Error while deleting segments for > topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) > java.nio.file.FileSystemException: > /data4/kafka-logs/topic_xx-123/011040402299.log -> > /data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only > file system > Suppressed: java.nio.file.FileSystemException: > /data4/kafka-logs/topic_xx-123/011040402299.log -> > /data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only > file system > [2021-08-26 20:20:55,644] ERROR Error while appending records to > topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) > [2021-08-26 20:20:55,652] INFO [Partition topic_xx-123 broker=42] Shrinking > ISR from 55,42 to 42. Leader: (highWatermark: 11061228956, endOffset: > 11061228965). Out of sync replicas: (brokerId: 55, endOffset: 11061228956). > (kafka.cluster.Partition) > > broker55: > [2021-08-26 20:20:32,456] WARN [ReplicaFetcher replicaId=55, leaderId=42, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=55, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, > isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=830774713, > epoch=1562806014), rackId=) (kafka.server.ReplicaFetcherThread) > disk error on broker42 is: > Aug 26 20:20:55 kernel: sd 0:2:5:0: [sdf] tag#33 FAILED Result: > hostbyte=DID_BAD_TARGET driverbyte=DRIVER_OK -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13251) 2.4.0
[ https://issues.apache.org/jira/browse/KAFKA-13251?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] chaos updated KAFKA-13251: -- Description: Disk error occurred in broker(=42),and then Shrinking ISR to itself. so why Shrinking ISR to an error broker? i.e. why not "Shrinking ISR from 55,42 to 55" but "Shrinking ISR from 55,42 to 42". note: other partition(110) shrink correctly. kafka logs: broker42: [2021-08-26 20:20:55,640] ERROR [ReplicaManager broker=42] Error processing fetch with max size 1048576 from consumer on partition topic_xx-123: (fetchOffset=11061228956, logStartOffset=-1, maxBytes=1048576, currentLeaderEpoch=Optional.empty) (kafka.server.ReplicaManager) org.apache.kafka.common.errors.CorruptRecordException: Found record size 0 smaller than minimum record overhead (14) in file /data4/kafka-logs/topic_xx-123/011060934646.log. [2021-08-26 20:20:55,640] ERROR Error while appending records to topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) [2021-08-26 20:20:55,645] ERROR Error while deleting segments for topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) java.nio.file.FileSystemException: /data4/kafka-logs/topic_xx-123/011040402299.log -> /data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only file system Suppressed: java.nio.file.FileSystemException: /data4/kafka-logs/topic_xx-123/011040402299.log -> /data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only file system [2021-08-26 20:20:55,644] ERROR Error while appending records to topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) [2021-08-26 20:20:55,652] INFO [Partition topic_xx-123 broker=42] Shrinking ISR from 55,42 to 42. Leader: (highWatermark: 11061228956, endOffset: 11061228965). Out of sync replicas: (brokerId: 55, endOffset: 11061228956). (kafka.cluster.Partition) broker55: [2021-08-26 20:20:32,456] WARN [ReplicaFetcher replicaId=55, leaderId=42, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=55, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=830774713, epoch=1562806014), rackId=) (kafka.server.ReplicaFetcherThread) [2021-08-26 20:20:43,503] INFO [Partition topic_xxx-110 broker=55] Shrinking ISR from 55,42 to 55. Leader: (highWatermark: 11061384367, endOffset: 11061388788). Out of sync replicas: (brokerId: 42, endOffset: 11061384367). (kafka.cluster.Partition) disk error on broker42 is: Aug 26 20:20:55 kernel: sd 0:2:5:0: [sdf] tag#33 FAILED Result: hostbyte=DID_BAD_TARGET driverbyte=DRIVER_OK was: Disk error occurred in broker(=42),and then Shrinking ISR to itself. so why Shrinking ISR to an error broker? i.e. why not "Shrinking ISR from 55,42 to 55" but "Shrinking ISR from 55,42 to 42". kafka logs: broker42: [2021-08-26 20:20:55,640] ERROR [ReplicaManager broker=42] Error processing fetch with max size 1048576 from consumer on partition topic_xx-123: (fetchOffset=11061228956, logStartOffset=-1, maxBytes=1048576, currentLeaderEpoch=Optional.empty) (kafka.server.ReplicaManager) org.apache.kafka.common.errors.CorruptRecordException: Found record size 0 smaller than minimum record overhead (14) in file /data4/kafka-logs/topic_xx-123/011060934646.log. 
[2021-08-26 20:20:55,640] ERROR Error while appending records to topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) [2021-08-26 20:20:55,645] ERROR Error while deleting segments for topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) java.nio.file.FileSystemException: /data4/kafka-logs/topic_xx-123/011040402299.log -> /data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only file system Suppressed: java.nio.file.FileSystemException: /data4/kafka-logs/topic_xx-123/011040402299.log -> /data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only file system [2021-08-26 20:20:55,644] ERROR Error while appending records to topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) [2021-08-26 20:20:55,652] INFO [Partition topic_xx-123 broker=42] Shrinking ISR from 55,42 to 42. Leader: (highWatermark: 11061228956, endOffset: 11061228965). Out of sync replicas: (brokerId: 55, endOffset: 11061228956). (kafka.cluster.Partition) broker55: [2021-08-26 20:20:32,456] WARN [ReplicaFetcher replicaId=55, leaderId=42, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=55, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=830774713, epoch=1562806014), rackId=) (kafka.server.ReplicaFetcherThread) disk error on broker42 is: Aug 26 20:20:55 kernel: sd 0:2:5:0: [sdf] tag#33 FAILED Result: hostbyte=DID_BAD_T
[jira] [Created] (KAFKA-13252) ISR shrink to an error broker
chaos created KAFKA-13252: - Summary: ISR shrink to an error broker Key: KAFKA-13252 URL: https://issues.apache.org/jira/browse/KAFKA-13252 Project: Kafka Issue Type: Bug Reporter: chaos -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13251) ISR shrink to an error broker
[ https://issues.apache.org/jira/browse/KAFKA-13251?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] chaos updated KAFKA-13251: -- Summary: ISR shrink to an error broker (was: 2.4.0) > ISR shrink to an error broker > - > > Key: KAFKA-13251 > URL: https://issues.apache.org/jira/browse/KAFKA-13251 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.4.0 > Environment: linux 4.1.0 >Reporter: chaos >Priority: Major > > Disk error occurred in broker(=42),and then Shrinking ISR to itself. > so why Shrinking ISR to an error broker? > i.e. why not "Shrinking ISR from 55,42 to 55" but "Shrinking ISR from 55,42 > to 42". > note: > other partition(110) shrink correctly. > > kafka logs: > broker42: > [2021-08-26 20:20:55,640] ERROR [ReplicaManager broker=42] Error processing > fetch with max size 1048576 from consumer on partition topic_xx-123: > (fetchOffset=11061228956, logStartOffset=-1, maxBytes=1048576, > currentLeaderEpoch=Optional.empty) (kafka.server.ReplicaManager) > org.apache.kafka.common.errors.CorruptRecordException: Found record size 0 > smaller than minimum record overhead (14) in file > /data4/kafka-logs/topic_xx-123/011060934646.log. > [2021-08-26 20:20:55,640] ERROR Error while appending records to > topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) > [2021-08-26 20:20:55,645] ERROR Error while deleting segments for > topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) > java.nio.file.FileSystemException: > /data4/kafka-logs/topic_xx-123/011040402299.log -> > /data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only > file system > Suppressed: java.nio.file.FileSystemException: > /data4/kafka-logs/topic_xx-123/011040402299.log -> > /data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only > file system > [2021-08-26 20:20:55,644] ERROR Error while appending records to > topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) > [2021-08-26 20:20:55,652] INFO [Partition topic_xx-123 broker=42] Shrinking > ISR from 55,42 to 42. Leader: (highWatermark: 11061228956, endOffset: > 11061228965). Out of sync replicas: (brokerId: 55, endOffset: 11061228956). > (kafka.cluster.Partition) > > broker55: > [2021-08-26 20:20:32,456] WARN [ReplicaFetcher replicaId=55, leaderId=42, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=55, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, > isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=830774713, > epoch=1562806014), rackId=) (kafka.server.ReplicaFetcherThread) > [2021-08-26 20:20:43,503] INFO [Partition topic_xxx-110 broker=55] Shrinking > ISR from 55,42 to 55. Leader: (highWatermark: 11061384367, endOffset: > 11061388788). Out of sync replicas: (brokerId: 42, endOffset: 11061384367). > (kafka.cluster.Partition) > > disk error on broker42 is: > Aug 26 20:20:55 kernel: sd 0:2:5:0: [sdf] tag#33 FAILED Result: > hostbyte=DID_BAD_TARGET driverbyte=DRIVER_OK -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-13250) 2.4.0
[ https://issues.apache.org/jira/browse/KAFKA-13250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] chaos resolved KAFKA-13250. --- Resolution: Duplicate > 2.4.0 > - > > Key: KAFKA-13250 > URL: https://issues.apache.org/jira/browse/KAFKA-13250 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.4.0 > Environment: linux 4.1.0 >Reporter: chaos >Priority: Major > > > > [2021-08-26 20:20:55,640] ERROR [ReplicaManager broker=42] Error processing > fetch with max size 1048576 from consumer on partition topic_xx-123: > (fetchOffset=11061228956, logStartOffset=-1, maxBytes=1048576, > currentLeaderEpoch=Optional.empty) (kafka.server.ReplicaManager) > org.apache.kafka.common.errors.CorruptRecordException: Found record size 0 > smaller than minimum record overhead (14) in file > /data4/kafka-logs/topic_xx-123/011060934646.log. > [2021-08-26 20:20:55,640] ERROR Error while appending records to topic_xx-123 > in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) > [2021-08-26 20:20:55,645] ERROR Error while deleting segments for > topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) > java.nio.file.FileSystemException: > /data4/kafka-logs/topic_xx-123/011040402299.log -> > /data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only > file system > Suppressed: java.nio.file.FileSystemException: > /data4/kafka-logs/topic_xx-123/011040402299.log -> > /data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only > file system > [2021-08-26 20:20:55,644] ERROR Error while appending records to topic_xx-123 > in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) > [2021-08-26 20:20:55,644] ERROR Error while appending records to topic_xx-123 > in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) > [2021-08-26 20:20:55,644] ERROR Error while appending records to topic_xx-123 > in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) > [2021-08-26 20:20:55,652] INFO [Partition topic_xx-123 broker=42] Shrinking > ISR from 55,42 to 42. Leader: (highWatermark: 11061228956, endOffset: > 11061228965). Out of sync replicas: (brokerId: 55, endOffset: 11061228956). > (kafka.cluster.Partition) > > disk errors are: -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-13252) ISR shrink to an error broker
[ https://issues.apache.org/jira/browse/KAFKA-13252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] chaos resolved KAFKA-13252. --- Resolution: Duplicate > ISR shrink to an error broker > - > > Key: KAFKA-13252 > URL: https://issues.apache.org/jira/browse/KAFKA-13252 > Project: Kafka > Issue Type: Bug >Reporter: chaos >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] vamossagar12 commented on a change in pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…
vamossagar12 commented on a change in pull request #11211: URL: https://github.com/apache/kafka/pull/11211#discussion_r698416259 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/PersistentSessionStore.java ## @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.streams.state.SessionStore; + +public interface PersistentSessionStore extends SessionStore { Review comment: done ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/PersistentWindowStore.java ## @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.streams.state.WindowStore; + +public interface PersistentWindowStore extends WindowStore { Review comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on a change in pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…
vamossagar12 commented on a change in pull request #11211: URL: https://github.com/apache/kafka/pull/11211#discussion_r698417779 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java ## @@ -217,6 +230,24 @@ public V fetch(final K key, final long timeFrom, final long timeTo) { Objects.requireNonNull(key, "key cannot be null"); + +if (wrapped().persistent()) { + +final long actualWindowStartTime = getActualWindowStartTime(timeFrom); + +if (timeTo < actualWindowStartTime) { +return KeyValueIterators.emptyWindowStoreIterator(); +} + +return new MeteredWindowStoreIterator<>( +wrapped().fetch(keyBytes(key), actualWindowStartTime, timeTo), +fetchSensor, +streamsMetrics, +serdes, +time +); Review comment: For the most part, only getting the actual start time and returning empty iterator are duplicate. The MeteredWindowStoreIterator object is built using a different method call every time. Do you think the first part needs to be abstracted out to a different method? That might also differ across different methods.. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
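One possible shape for pulling the duplicated prefix into a single helper, for the sake of the discussion above. `wrapped()` and `getActualWindowStartTime()` come from the quoted diff; everything else (the helper name, the use of OptionalLong) is illustrative rather than the actual MeteredWindowStore code.
```java
import java.util.OptionalLong;

// Call sites would then read:
//   final OptionalLong from = boundedTimeFrom(timeFrom, timeTo);
//   if (!from.isPresent()) {
//       return KeyValueIterators.emptyWindowStoreIterator();
//   }
//   ... wrapped().fetch(keyBytes(key), from.getAsLong(), timeTo) ...
private OptionalLong boundedTimeFrom(final long timeFrom, final long timeTo) {
    if (!wrapped().persistent()) {
        return OptionalLong.of(timeFrom);       // no strict-retention bound for non-persistent stores
    }
    final long actualWindowStartTime = getActualWindowStartTime(timeFrom);
    return timeTo < actualWindowStartTime
        ? OptionalLong.empty()                  // the whole requested range has already expired
        : OptionalLong.of(actualWindowStartTime);
}
```
Only the guard is shared this way; each fetch variant still builds its own metered iterator, so the per-method differences mentioned above stay where they are.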
[GitHub] [kafka] vamossagar12 commented on pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…
vamossagar12 commented on pull request #11211: URL: https://github.com/apache/kafka/pull/11211#issuecomment-908275012 @showuon, is there an existing integration test for the Metered classes? I tried to find one to add the relevant tests but couldn't find one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org

[jira] [Closed] (KAFKA-13250) 2.4.0
[ https://issues.apache.org/jira/browse/KAFKA-13250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] chaos closed KAFKA-13250. - duplicated > 2.4.0 > - > > Key: KAFKA-13250 > URL: https://issues.apache.org/jira/browse/KAFKA-13250 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.4.0 > Environment: linux 4.1.0 >Reporter: chaos >Priority: Major > > > > [2021-08-26 20:20:55,640] ERROR [ReplicaManager broker=42] Error processing > fetch with max size 1048576 from consumer on partition topic_xx-123: > (fetchOffset=11061228956, logStartOffset=-1, maxBytes=1048576, > currentLeaderEpoch=Optional.empty) (kafka.server.ReplicaManager) > org.apache.kafka.common.errors.CorruptRecordException: Found record size 0 > smaller than minimum record overhead (14) in file > /data4/kafka-logs/topic_xx-123/011060934646.log. > [2021-08-26 20:20:55,640] ERROR Error while appending records to topic_xx-123 > in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) > [2021-08-26 20:20:55,645] ERROR Error while deleting segments for > topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) > java.nio.file.FileSystemException: > /data4/kafka-logs/topic_xx-123/011040402299.log -> > /data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only > file system > Suppressed: java.nio.file.FileSystemException: > /data4/kafka-logs/topic_xx-123/011040402299.log -> > /data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only > file system > [2021-08-26 20:20:55,644] ERROR Error while appending records to topic_xx-123 > in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) > [2021-08-26 20:20:55,644] ERROR Error while appending records to topic_xx-123 > in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) > [2021-08-26 20:20:55,644] ERROR Error while appending records to topic_xx-123 > in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel) > [2021-08-26 20:20:55,652] INFO [Partition topic_xx-123 broker=42] Shrinking > ISR from 55,42 to 42. Leader: (highWatermark: 11061228956, endOffset: > 11061228965). Out of sync replicas: (brokerId: 55, endOffset: 11061228956). > (kafka.cluster.Partition) > > disk errors are: -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Closed] (KAFKA-13252) ISR shrink to an error broker
[ https://issues.apache.org/jira/browse/KAFKA-13252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] chaos closed KAFKA-13252. - duplicated > ISR shrink to an error broker > - > > Key: KAFKA-13252 > URL: https://issues.apache.org/jira/browse/KAFKA-13252 > Project: Kafka > Issue Type: Bug >Reporter: chaos >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13253) Kafka Connect losing task (re)configuration when connector name has special characters
David Dufour created KAFKA-13253: Summary: Kafka Connect losing task (re)configuration when connector name has special characters Key: KAFKA-13253 URL: https://issues.apache.org/jira/browse/KAFKA-13253 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 2.7.1 Reporter: David Dufour When not leader, DistributedHerder.reconfigureConnector() forwards the task configuration to the leader as follow: {quote}String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/" + connName + "/tasks"); log.trace("Forwarding task configurations for connector {} to leader", connName); RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, config, sessionKey, requestSignatureAlgorithm); {quote} The problem is that if the connector name contains some special characters, such as '<', '>'... they need to be 'URLEncoded' appropriately, otherwise an uncatched exception is raised in RestClient and the forward is lost. Here is the kind of exception we can catch by adding the necessary code in RestClient: {quote}java.lang.IllegalArgumentException: Illegal character in path at index 51: http://10.224.0.15:8083/connectors/mirror1-cluster->mirror2-cluster.MirrorHeartbeatConnector/tasks {quote} An additional catch() should be added in RestClient.httpRequest(), here: {quote}} catch (IOException | InterruptedException | TimeoutException | ExecutionException e) { log.error("IO error forwarding REST request: ", e); throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "IO Error trying to forward REST request: " + e.getMessage(), } finally { {quote} to catch all other exceptions because without, this kind of problem is completly silent. To reproduce: * start 2 kafka clusters * start a kafka connect (distributed) with at least 2 nodes * start an HeartbeatConnector with name "cluster1->cluster2" If the node which generated the task is not the leader (not systematic), it will forward the creation to the leader and it will be lost. As a result, the connector will stay in RUNNING state but without any task. Problem not easy to reproduce, it is important to start with empty connect topics to reproduce more easily -- This message was sent by Atlassian Jira (v8.3.4#803005)
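To make the suggestion concrete, here is a small sketch of encoding the connector name before it is joined into the forward URL. This is illustrative only (plain string concatenation instead of RestServer.urlJoin(), and the method name is made up), not the patch that eventually landed.
{code:java}
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public final class ForwardUrlSketch {

    // Encode the connector name so that names such as "cluster1->cluster2"
    // survive the forward to the leader instead of failing URI validation.
    public static String reconfigUrl(final String leaderUrl, final String connName) {
        final String encodedConnName;
        try {
            encodedConnName = URLEncoder.encode(connName, StandardCharsets.UTF_8.name());
        } catch (final UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
        final String base = leaderUrl.endsWith("/")
            ? leaderUrl.substring(0, leaderUrl.length() - 1)
            : leaderUrl;
        return base + "/connectors/" + encodedConnName + "/tasks";
    }
}
{code}
Note that URLEncoder is a form encoder (it turns spaces into '+'), so a dedicated path-segment encoder may be preferable, but it already escapes the '>' that triggers the IllegalArgumentException quoted in this report.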
[jira] [Updated] (KAFKA-13253) Kafka Connect losing task (re)configuration when connector name has special characters
[ https://issues.apache.org/jira/browse/KAFKA-13253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Dufour updated KAFKA-13253: - Description: When not leader, DistributedHerder.reconfigureConnector() forwards the task configuration to the leader as follow: {quote}String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/" + connName + "/tasks"); log.trace("Forwarding task configurations for connector {} to leader", connName); RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, config, sessionKey, requestSignatureAlgorithm); {quote} The problem is that if the connector name contains some special characters, such as '<', '>'... they need to be 'URLEncoded' appropriately, otherwise an uncatched exception is raised in RestClient and the forward is lost. Here is the kind of exception we can catch by adding the necessary code in RestClient: {quote}java.lang.IllegalArgumentException: Illegal character in path at index 51: [http://10.224.0.15:8083/connectors/mirror1-cluster-]>mirror2-cluster.MirrorHeartbeatConnector/tasks {quote} An additional catch() should be added in RestClient.httpRequest(), here: {quote}catch (IOException | InterruptedException | TimeoutException | ExecutionException e) { log.error("IO error forwarding REST request: ", e); throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "IO Error trying to forward REST request: " + e.getMessage(), } finally { {quote} to catch all other exceptions because without, this kind of problem is completly silent. To reproduce: * start 2 kafka clusters * start a kafka connect (distributed) with at least 2 nodes * start an HeartbeatConnector with name "cluster1->cluster2" If the node which generated the task is not the leader (not systematic), it will forward the creation to the leader and it will be lost. As a result, the connector will stay in RUNNING state but without any task. Problem not easy to reproduce, it is important to start with empty connect topics to reproduce more easily was: When not leader, DistributedHerder.reconfigureConnector() forwards the task configuration to the leader as follow: {quote}String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/" + connName + "/tasks"); log.trace("Forwarding task configurations for connector {} to leader", connName); RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, config, sessionKey, requestSignatureAlgorithm); {quote} The problem is that if the connector name contains some special characters, such as '<', '>'... they need to be 'URLEncoded' appropriately, otherwise an uncatched exception is raised in RestClient and the forward is lost. Here is the kind of exception we can catch by adding the necessary code in RestClient: {quote}java.lang.IllegalArgumentException: Illegal character in path at index 51: http://10.224.0.15:8083/connectors/mirror1-cluster->mirror2-cluster.MirrorHeartbeatConnector/tasks {quote} An additional catch() should be added in RestClient.httpRequest(), here: {quote}} catch (IOException | InterruptedException | TimeoutException | ExecutionException e) { log.error("IO error forwarding REST request: ", e); throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "IO Error trying to forward REST request: " + e.getMessage(), } finally { {quote} to catch all other exceptions because without, this kind of problem is completly silent. 
To reproduce: * start 2 kafka clusters * start a kafka connect (distributed) with at least 2 nodes * start an HeartbeatConnector with name "cluster1->cluster2" If the node which generated the task is not the leader (not systematic), it will forward the creation to the leader and it will be lost. As a result, the connector will stay in RUNNING state but without any task. Problem not easy to reproduce, it is important to start with empty connect topics to reproduce more easily > Kafka Connect losing task (re)configuration when connector name has special > characters > -- > > Key: KAFKA-13253 > URL: https://issues.apache.org/jira/browse/KAFKA-13253 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.7.1 >Reporter: David Dufour >Priority: Major > > When not leader, DistributedHerder.reconfigureConnector() forwards the task > configuration to the leader as follow: > {quote}String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/" + > connName + "/tasks"); > log.trace("Forwarding task configurations for connector {} to leader", > connName); > RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, config, > sessionKey, requestSignatureAlgorithm); > {quote} > The problem is that if the connector name contains some special characters,
[jira] [Updated] (KAFKA-13253) Kafka Connect losing task (re)configuration when connector name has special characters
[ https://issues.apache.org/jira/browse/KAFKA-13253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Dufour updated KAFKA-13253: - Description: When not leader, DistributedHerder.reconfigureConnector() forwards the task configuration to the leader as follow: {quote}String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/" + connName + "/tasks"); log.trace("Forwarding task configurations for connector {} to leader", connName); RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, config, sessionKey, requestSignatureAlgorithm); {quote} The problem is that if the connector name contains some special characters, such as '<', '>'... they need to be 'URLEncoded' appropriately, otherwise an uncatched exception is raised in RestClient and the forward is lost. Here is the kind of exception we can catch by adding the necessary code in RestClient: {quote}java.lang.IllegalArgumentException: Illegal character in path at index 51: [http://10.224.0.15:8083/connectors/mirror1-cluster-]>mirror2-cluster.MirrorHeartbeatConnector/tasks {quote} An additional catch() should be added in RestClient.httpRequest(), here: {quote}catch (IOException | InterruptedException | TimeoutException | ExecutionException e) { log.error("IO error forwarding REST request: ", e); throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "IO Error trying to forward REST request: " + e.getMessage(), e); } finally { {quote} to catch all other exceptions because without, this kind of problem is completly silent. To reproduce: * start 2 kafka clusters * start a kafka connect (distributed) with at least 2 nodes * start an HeartbeatConnector with name "cluster1->cluster2" If the node which generated the task is not the leader (not systematic), it will forward the creation to the leader and it will be lost. As a result, the connector will stay in RUNNING state but without any task. Problem not easy to reproduce, it is important to start with empty connect topics to reproduce more easily was: When not leader, DistributedHerder.reconfigureConnector() forwards the task configuration to the leader as follow: {quote}String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/" + connName + "/tasks"); log.trace("Forwarding task configurations for connector {} to leader", connName); RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, config, sessionKey, requestSignatureAlgorithm); {quote} The problem is that if the connector name contains some special characters, such as '<', '>'... they need to be 'URLEncoded' appropriately, otherwise an uncatched exception is raised in RestClient and the forward is lost. Here is the kind of exception we can catch by adding the necessary code in RestClient: {quote}java.lang.IllegalArgumentException: Illegal character in path at index 51: [http://10.224.0.15:8083/connectors/mirror1-cluster-]>mirror2-cluster.MirrorHeartbeatConnector/tasks {quote} An additional catch() should be added in RestClient.httpRequest(), here: {quote}catch (IOException | InterruptedException | TimeoutException | ExecutionException e) { log.error("IO error forwarding REST request: ", e); throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "IO Error trying to forward REST request: " + e.getMessage(), } finally { {quote} to catch all other exceptions because without, this kind of problem is completly silent. 
To reproduce: * start 2 Kafka clusters * start a Kafka Connect (distributed) cluster with at least 2 nodes * start a HeartbeatConnector with name "cluster1->cluster2" If the node that generated the task is not the leader (this does not happen systematically), it forwards the task creation to the leader and the request is lost. As a result, the connector will stay in RUNNING state but without any task. The problem is not easy to reproduce; starting with empty Connect topics makes it easier to trigger. > Kafka Connect losing task (re)configuration when connector name has special > characters > -- > > Key: KAFKA-13253 > URL: https://issues.apache.org/jira/browse/KAFKA-13253 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.7.1 >Reporter: David Dufour >Priority: Major > > When not the leader, DistributedHerder.reconfigureConnector() forwards the task > configuration to the leader as follows: > {quote}String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/" + > connName + "/tasks"); > log.trace("Forwarding task configurations for connector {} to leader", > connName); > RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, config, > sessionKey, requestSignatureAlgorithm); > {quote} > The problem is that if the connector name contains some special cha
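As a side note for readers of this report, the shape of the encoding change the reporter is asking for can be sketched as below. This is illustrative only and is not the patch that eventually fixed the issue (see the resolution further down); the class and method names are invented, and only connName, leaderUrl and RestServer.urlJoin come from the snippet above.
{code:java}
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Illustrative sketch only: percent-encode the connector name before it is used as a
// URL path segment, so a name such as "cluster1->cluster2" cannot produce an invalid URI.
public final class ConnectorUrls {
    static String tasksPath(String connName) {
        // URLEncoder targets form encoding, so swap '+' back to "%20" for a path segment.
        String encoded = URLEncoder.encode(connName, StandardCharsets.UTF_8).replace("+", "%20");
        return "/connectors/" + encoded + "/tasks";
    }
}
{code}
With such a helper, the forwarding code would build the URL as RestServer.urlJoin(leaderUrl, ConnectorUrls.tasksPath(connName)), and the IllegalArgumentException shown above should no longer be raised for names like the one in this report.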
[GitHub] [kafka] dajac commented on pull request #11073: KAFKA-13092: Perf regression in LISR requests
dajac commented on pull request #11073: URL: https://github.com/apache/kafka/pull/11073#issuecomment-908364578 @lbradstreet @junrao Could you take a look at this one as you both reviewed the original PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jlprat commented on pull request #11228: KAFKA-12887 Skip some RuntimeExceptions from exception handler
jlprat commented on pull request #11228: URL: https://github.com/apache/kafka/pull/11228#issuecomment-908365423 The test failure is related to https://issues.apache.org/jira/browse/KAFKA-8138 but not the same; in this particular instance it looks more like a failure in the test itself (the topic already exists). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao merged pull request #11060: MINOR Refactored the existing CheckpointFile in core module, moved to server-common module and introduced it as SnapshotFile.
junrao merged pull request #11060: URL: https://github.com/apache/kafka/pull/11060 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-13253) Kafka Connect losing task (re)configuration when connector name has special characters
[ https://issues.apache.org/jira/browse/KAFKA-13253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban resolved KAFKA-13253. -- Resolution: Duplicate Same issue as KAFKA-9747 - that one already has a fix under review > Kafka Connect losing task (re)configuration when connector name has special > characters > -- > > Key: KAFKA-13253 > URL: https://issues.apache.org/jira/browse/KAFKA-13253 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.7.1 >Reporter: David Dufour >Priority: Major > > When not the leader, DistributedHerder.reconfigureConnector() forwards the task > configuration to the leader as follows: > {quote}String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/" + > connName + "/tasks"); > log.trace("Forwarding task configurations for connector {} to leader", > connName); > RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, config, > sessionKey, requestSignatureAlgorithm); > {quote} > The problem is that if the connector name contains some special characters, > such as '<', '>'..., they need to be URL-encoded appropriately, otherwise an > uncaught exception is raised in RestClient and the forward is lost. > Here is the kind of exception we can catch by adding the necessary code in > RestClient: > {quote}java.lang.IllegalArgumentException: Illegal character in path at index > 51: > [http://10.224.0.15:8083/connectors/mirror1-cluster-]>mirror2-cluster.MirrorHeartbeatConnector/tasks > {quote} > An additional catch() should be added in RestClient.httpRequest(), here: > {quote}{{catch (IOException | InterruptedException | TimeoutException | > ExecutionException e) {}} > log.error("IO error forwarding REST request: ", e); > {{throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, > "IO Error trying to forward REST request: " + e.getMessage(), e);}} > {{} finally {}} > {quote} > to catch all other exceptions, because without it this kind of problem is > completely silent. > To reproduce: > * start 2 Kafka clusters > * start a Kafka Connect (distributed) cluster with at least 2 nodes > * start a HeartbeatConnector with name "cluster1->cluster2" > If the node that generated the task is not the leader (this does not happen > systematically), it forwards the task creation to the leader and the request is lost. As a result, the > connector will stay in RUNNING state but without any task. > The problem is not easy to reproduce; starting with empty Connect topics makes it > easier to trigger. -- This message was sent by Atlassian Jira (v8.3.4#803005)
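For readers who want the suggested change spelled out: below is a hedged sketch of the broader catch the reporter describes, written against the fragment quoted above with the unchanged parts elided. This is not the patch that shipped; the issue was resolved as a duplicate of KAFKA-9747.
{code:java}
catch (IOException | InterruptedException | TimeoutException | ExecutionException e) {
    log.error("IO error forwarding REST request: ", e);
    throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR,
            "IO Error trying to forward REST request: " + e.getMessage(), e);
} catch (RuntimeException e) {
    // Without a catch-all, an IllegalArgumentException thrown while building the URI
    // escapes silently and the task configuration is never forwarded to the leader.
    log.error("Unexpected error forwarding REST request: ", e);
    throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR,
            "Unexpected error trying to forward REST request: " + e.getMessage(), e);
} finally {
    // existing cleanup unchanged
{code}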
[GitHub] [kafka] junrao commented on a change in pull request #11255: KAFKA-13225: Controller skips sending UpdateMetadataRequest when no change in partition state.
junrao commented on a change in pull request #11255: URL: https://github.com/apache/kafka/pull/11255#discussion_r698652526 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -628,9 +628,9 @@ class KafkaController(val config: KafkaConfig, topicDeletionManager.failReplicaDeletion(newOfflineReplicasForDeletion) } -// If replica failure did not require leader re-election, inform brokers of the offline brokers +// If no partitions are affected, inform brokers of the offline brokers // Note that during leader re-election, brokers update their metadata -if (partitionsWithOfflineLeader.isEmpty) { +if (newOfflineReplicas.isEmpty || (newOfflineReplicasNotForDeletion.isEmpty && partitionsWithOfflineLeader.isEmpty)) { Review comment: On second thought, I think this change still doesn't cover all the cases. The main issue is that in partitionStateMachine.triggerOnlinePartitionStateChange(), we only send updateMetadataRequest for partitions with successful leader change. If no partition has successful leader change, no updateMetadataRequest will be sent. The reason could be that newOfflineReplicas is empty, but it could also be that the leader can't be elected for other reasons (e.g. no replica is live). In PartitionStateMachine.doElectLeaderForPartitions(), we have 3 categories of partitions, finishedUpdates, failedElections and updatesToRetry. Perhaps we could return finishedUpdates all the way to partitionStateMachine.triggerOnlinePartitionStateChange() and force a send of updateMetadataRequest if finishedUpdates is empty. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #11255: KAFKA-13225: Controller skips sending UpdateMetadataRequest when no change in partition state.
splett2 commented on a change in pull request #11255: URL: https://github.com/apache/kafka/pull/11255#discussion_r698693107 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -628,9 +628,9 @@ class KafkaController(val config: KafkaConfig, topicDeletionManager.failReplicaDeletion(newOfflineReplicasForDeletion) } -// If replica failure did not require leader re-election, inform brokers of the offline brokers +// If no partitions are affected, inform brokers of the offline brokers // Note that during leader re-election, brokers update their metadata -if (partitionsWithOfflineLeader.isEmpty) { +if (newOfflineReplicas.isEmpty || (newOfflineReplicasNotForDeletion.isEmpty && partitionsWithOfflineLeader.isEmpty)) { Review comment: I think that makes sense. Do we need to do something similar for both `partitionStateMachine` calls? `handleStateChanges` already returns a `Map[TopicPartition, Either[Throwable, LeaderAndIsr]]` so it seems like we could just look to see if there are any values with `LeaderAndIsr` and that would imply that we sent out `updateMetadataRequest` as part of Online/Offline state changes. and likewise, we can have `triggerOnlinePartitionStateChange` return the same results. I am thinking something along the lines of: ``` // trigger OfflinePartition state for all partitions whose current leader is one amongst the newOfflineReplicas val offlineStateChangeResults = partitionStateMachine.handleStateChanges(partitionsWithOfflineLeader.toSeq, OfflinePartition) // trigger OnlinePartition state changes for offline or new partitions val onlineStateChangeResults = partitionStateMachine.triggerOnlinePartitionStateChange() ... val leaderElectionSucceeded = (offlineStateChangeResults ++ onlineStateChangeResults).find(_) // (!leaderElectionSucceeded && newOfflineReplicasForDeletion.isEmpty) { sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty) } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue opened a new pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue opened a new pull request #11284: URL: https://github.com/apache/kafka/pull/11284 This task is to provide a concrete implementation of the interfaces defined in KIP-255 to allow Kafka to connect to an OAuth/OIDC identity provider for authentication and token retrieval. While KIP-255 provides an unsecured JWT example for development, this will fill in the gap and provide a production-grade implementation. The OAuth/OIDC work will allow out-of-the-box configuration by any Apache Kafka users to connect to an external identity provider service (e.g. Okta, Auth0, Azure, etc.). The code will implement the standard OAuth client credentials grant type. The proposed change is largely composed of a pair of AuthenticateCallbackHandler implementations: one to log in on the client and one to validate on the broker. See the following for more detail: * [KIP-768](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575) * [KAFKA-13202](https://issues.apache.org/jira/browse/KAFKA-13202) *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
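To make the shape of the proposed work a bit more concrete, here is a minimal, hypothetical login-side callback handler. Only AuthenticateCallbackHandler, OAuthBearerTokenCallback and OAuthBearerToken are existing Kafka interfaces; the class name and the fetchToken() helper are invented for illustration, and the real KIP-768 implementation adds token parsing, validation, caching and retry logic.
```java
import java.util.List;
import java.util.Map;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;

// Hypothetical sketch of a client-side (login) handler in the spirit of KIP-768.
public class ExampleOidcLoginCallbackHandler implements AuthenticateCallbackHandler {
    private Map<String, ?> configs;

    @Override
    public void configure(Map<String, ?> configs, String saslMechanism,
                          List<AppConfigurationEntry> jaasConfigEntries) {
        // A real handler would read the token endpoint URL and client credentials here.
        this.configs = configs;
    }

    @Override
    public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
        for (Callback callback : callbacks) {
            if (callback instanceof OAuthBearerTokenCallback) {
                // Perform the client_credentials exchange and hand the token to Kafka.
                ((OAuthBearerTokenCallback) callback).token(fetchToken());
            } else {
                throw new UnsupportedCallbackException(callback);
            }
        }
    }

    @Override
    public void close() { }

    // Invented helper: a real implementation would POST to the identity provider's
    // token endpoint and wrap the returned JWT in an OAuthBearerToken.
    private OAuthBearerToken fetchToken() {
        throw new UnsupportedOperationException("illustrative sketch only");
    }
}
```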
[GitHub] [kafka] junrao commented on a change in pull request #11255: KAFKA-13225: Controller skips sending UpdateMetadataRequest when no change in partition state.
junrao commented on a change in pull request #11255: URL: https://github.com/apache/kafka/pull/11255#discussion_r698849242 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -628,9 +628,9 @@ class KafkaController(val config: KafkaConfig, topicDeletionManager.failReplicaDeletion(newOfflineReplicasForDeletion) } -// If replica failure did not require leader re-election, inform brokers of the offline brokers +// If no partitions are affected, inform brokers of the offline brokers // Note that during leader re-election, brokers update their metadata -if (partitionsWithOfflineLeader.isEmpty) { +if (newOfflineReplicas.isEmpty || (newOfflineReplicasNotForDeletion.isEmpty && partitionsWithOfflineLeader.isEmpty)) { Review comment: I think we only need to check the return result of `partitionStateMachine.triggerOnlinePartitionStateChange()`. `partitionStateMachine.handleStateChanges(partitionsWithOfflineLeader.toSeq, OfflinePartition)` doesn't generate any updateMetadataRequest. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file
ableegoldman commented on pull request #11283: URL: https://github.com/apache/kafka/pull/11283#issuecomment-908748576 Hey @hutchiko , thanks for digging into this bug so thoroughly and providing a patch! One quick question before I review -- does this only affect version 2.8 or below specifically, or could this be present on trunk/3.0 as well? Unless you already checked this, my guess would be the latter. If you can verify that is true, then can you please retarget this PR against the `trunk` branch? Once it gets merged then we can cherrypick back to 2.8 from there. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-13249) Checkpoints do not contain latest offsets on shutdown when using EOS
[ https://issues.apache.org/jira/browse/KAFKA-13249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-13249: --- Affects Version/s: (was: 2.7.1) > Checkpoints do not contain latest offsets on shutdown when using EOS > > > Key: KAFKA-13249 > URL: https://issues.apache.org/jira/browse/KAFKA-13249 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.7.0, 2.8.0 >Reporter: Oliver Hutchison >Priority: Major > > When using EOS the {{.checkpoint}} file created when a stateful streams app > is shut down does not always contain changelog offsets which represent the > latest state of the state store. The offsets can often be behind the end of > the changelog - sometimes quite significantly. > This leads to a state restore being required when the streams app restarts > after shutting down cleanly, as streams thinks (based on the incorrect offsets > in the checkpoint) that the state store is not up to date with the changelog. > This is increasing the time we see it takes to do a clean restart of a single > instance streams app from around 10 seconds to sometimes over 2 minutes in our > case. > I suspect the bug appears because of an assumption about the {{commitNeeded}} > field in the following method in {{StreamTask}}: > {code:java} > protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) { > // commitNeeded indicates we may have processed some records since last > commit > // and hence we need to refresh checkpointable offsets regardless whether > we should checkpoint or not > if (commitNeeded) { > stateMgr.updateChangelogOffsets(checkpointableOffsets()); > } > super.maybeWriteCheckpoint(enforceCheckpoint); > } > {code} > In a steady state case for a simple single instance single thread stream app > where an app simply starts, runs and then shuts down, the {{if > (commitNeeded)}} test always fails when running with EOS, which results in the > latest checkpoint offsets never getting updated into the {{stateMgr}}. > Tracing back to the callers of {{maybeWriteCheckpoint}} it's easy to see this > is the case as there's only 1 place in the code which calls > {{maybeWriteCheckpoint}} during this steady state: the {{postCommit(final > boolean enforceCheckpoint)}} method, specifically the call in the {{RUNNING}} > state. > {code:java} > case RUNNING: > if (enforceCheckpoint || !eosEnabled) { > maybeWriteCheckpoint(enforceCheckpoint); > } > log.debug("Finalized commit for {} task with eos {} enforce checkpoint {}", > state(), eosEnabled, enforceCheckpoint); > break; > {code} > We can see from this code that {{maybeWriteCheckpoint}} will only ever be > called if {{enforceCheckpoint=true}}, because we know {{eosEnabled=true}} as > we're running with EOS. > So then where does {{postCommit}} get called with {{enforceCheckpoint=true}}? > Again looking only at the steady state case we find that it's only called > from {{TaskManager.tryCloseCleanAllActiveTasks}} which is only called from > {{TaskManager.shutdown}}. > The thing about the call in {{tryCloseCleanAllActiveTasks}} is that it > happens *after* all active tasks have committed. Which means that > {{StreamTask.commitNeeded=false}} for all tasks, so it follows that the test > back in {{maybeWriteCheckpoint}} always fails and we don't end up getting the > latest offsets stored into the state manager. 
> I think the fix is to simply change the test in {{maybeWriteCheckpoint}} to > be {{if (commitNeeded || enforceCheckpoint) { ...}} as we know we must always > update the changelog offsets before we write the checkpoint. -- This message was sent by Atlassian Jira (v8.3.4#803005)
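Spelled out, the reporter's proposed change to the method quoted above would look roughly like this (the PR discussion below also considers dropping the condition entirely):
{code:java}
protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
    // Refresh the checkpointable offsets not only when records were processed since the
    // last commit, but also when a checkpoint is forced, e.g. on clean shutdown with EOS.
    if (commitNeeded || enforceCheckpoint) {
        stateMgr.updateChangelogOffsets(checkpointableOffsets());
    }
    super.maybeWriteCheckpoint(enforceCheckpoint);
}
{code}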
[GitHub] [kafka] guozhangwang commented on a change in pull request #11149: KAFKA-13229: add total blocked time metric to streams (KIP-761)
guozhangwang commented on a change in pull request #11149: URL: https://github.com/apache/kafka/pull/11149#discussion_r698854001 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java ## @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.producer.internals; + +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.CumulativeSum; + +import java.util.Map; + +public class KafkaProducerMetrics implements AutoCloseable { + +public static final String GROUP = "producer-metrics"; +private static final String FLUSH = "flush"; +private static final String TXN_INIT = "txn-init"; +private static final String TXN_BEGIN = "txn-begin"; +private static final String TXN_SEND_OFFSETS = "txn-send-offsets"; +private static final String TXN_COMMIT = "txn-commit"; +private static final String TXN_ABORT = "txn-abort"; +private static final String TOTAL_TIME_SUFFIX = "-time-ns-total"; + +private final Map tags; +private final Metrics metrics; +private final Sensor initTimeSensor; +private final Sensor beginTxnTimeSensor; +private final Sensor flushTimeSensor; +private final Sensor sendOffsetsSensor; +private final Sensor commitTxnSensor; +private final Sensor abortTxnSensor; + +public KafkaProducerMetrics(Metrics metrics) { +this.metrics = metrics; +tags = this.metrics.config().tags(); +flushTimeSensor = newLatencySensor( +FLUSH, +"Total time producer has spent in flush in nanoseconds." +); +initTimeSensor = newLatencySensor( +TXN_INIT, +"Total time producer has spent in initTransactions in nanoseconds." +); +beginTxnTimeSensor = newLatencySensor( +TXN_BEGIN, +"Total time producer has spent in beginTransaction in nanoseconds." +); +sendOffsetsSensor = newLatencySensor( +TXN_SEND_OFFSETS, +"Total time producer has spent in sendOffsetsToTransaction." Review comment: nit: also add ` in nanoseconds`? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java ## @@ -91,6 +94,10 @@ private ThreadMetrics() {} "The fraction of time the thread spent on polling records from consumer"; private static final String COMMIT_RATIO_DESCRIPTION = "The fraction of time the thread spent on committing all tasks"; +private static final String BLOCKED_TIME_DESCRIPTION = +"The total time the thread spent blocked on kafka"; Review comment: ` in nanoseconds`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
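The excerpt above only shows how the sensors are defined; the record path is cut off. As a rough sketch of how such a cumulative-time sensor would be fed (the recordFlush() method and the time and producerMetrics fields are assumed here, they are not visible in the diff):
```java
// Assumed usage inside KafkaProducer.flush(): measure the blocked time in nanoseconds
// and add it to the cumulative flush-time sensor (KIP-761 exposes it as *-time-ns-total).
long start = time.nanoseconds();
try {
    accumulator.awaitFlushCompletion();
} catch (InterruptedException e) {
    throw new InterruptException("Flush interrupted.", e);
} finally {
    producerMetrics.recordFlush(time.nanoseconds() - start);
}
```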
[GitHub] [kafka] guozhangwang merged pull request #11149: KAFKA-13229: add total blocked time metric to streams (KIP-761)
guozhangwang merged pull request #11149: URL: https://github.com/apache/kafka/pull/11149 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13249) Checkpoints do not contain latest offsets on shutdown when using EOS
[ https://issues.apache.org/jira/browse/KAFKA-13249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17406988#comment-17406988 ] A. Sophie Blee-Goldman commented on KAFKA-13249: Thanks for the detailed report! I agree with your analysis, it does appear that we can end up writing stale offsets if the thread is shut down immediately following a "normal" commit that occurs during active processing. And given the small commit interval typical of EOS applications, we almost always perform this kind of commit before breaking out of the StreamThread's processing loop, meaning most of the time we will indeed have just committed all tasks when we get to checking the shutdown signal and entering {{TaskManager.shutdown()}} I'll give your PR a pass -- thanks also for submitting a patch with the bug report :) > Checkpoints do not contain latest offsets on shutdown when using EOS > > > Key: KAFKA-13249 > URL: https://issues.apache.org/jira/browse/KAFKA-13249 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.7.0, 2.8.0 >Reporter: Oliver Hutchison >Priority: Major > > When using EOS the {{.checkpoint}} file created when a stateful streams app > is shutdown does not always contain changelog offsets which represent the > latest state of the state store. The offsets can often be behind the end of > the changelog - sometimes quite significantly. > This leads to a state restore being required when the streams app restarts > after shutting down cleanly as streams thinks (based on the incorrect offsets > in the checkpoint) that the state store is not up to date with the changelog. > This is increasing the time we see it takes to do a clean restart of a single > instance streams app from around 10 second to sometime over 2 minutes in our > case. > I suspect the bug appears because an assumption about the {{commitNeeded}} > field in the following method in {{StreamTask}}: > {code:java} > protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) { > // commitNeeded indicates we may have processed some records since last > commit > // and hence we need to refresh checkpointable offsets regardless whether > we should checkpoint or not > if (commitNeeded) { > stateMgr.updateChangelogOffsets(checkpointableOffsets()); > } > super.maybeWriteCheckpoint(enforceCheckpoint); > } > {code} > In a steady state case for a simple single instance single thread stream app > where an app simply starts, runs and then shuts down the {{if > (commitNeeded)}} test always fails when running with EOS which results in the > latest checkpoint offsets never getting updated into the {{stateMgr}}. > Tracing back to the callers of {{maybeWriteCheckpoint}} it's easy to see this > is the case as there's only 1 place in the code which calls > {{maybeWriteCheckpoint}} during this steady state. The {{postCommit(final > boolean enforceCheckpoint)}} method, specifically the call in the {{RUNNING}} > state. > {code:java} > case RUNNING: > if (enforceCheckpoint || !eosEnabled) { > maybeWriteCheckpoint(enforceCheckpoint); > } > log.debug("Finalized commit for {} task with eos {} enforce checkpoint {}", > state(), eosEnabled, enforceCheckpoint); > break; > {code} > We can see from this code that {{maybeWriteCheckpoint}} will only ever to > called if {{enforceCheckpoint=true}} because we know {{eosEnabled=true}} as > we're running with EOS. > So then where does {{postCommit}} get called with {{enforceCheckpoint=true}}? 
> Again looking only at the steady state case we find that it's only called > from {{TaskManager.tryCloseCleanAllActiveTasks}} which is only called from > {{TaskManager.shutdown}}. > The thing about the call in {{tryCloseCleanAllActiveTasks}} is that it > happens *after* all active tasks have commited. Which means that > {{StreamTask.commitNeeded=false}} for all tasks so it follows that the test > back in {{maybeWriteCheckpoint}} always fails and we don't end up getting the > latest offsets stored into the state manager. > I think the fix is to simply change the test in {{maybeWriteCheckpoint}} to > be {{if (commitNeeded || enforceCheckpoint) { ...}} as we know we must always > update the changelog offserts before we write the checkpoint. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman commented on a change in pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file
ableegoldman commented on a change in pull request #11283: URL: https://github.com/apache/kafka/pull/11283#discussion_r698868805 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -565,7 +565,7 @@ public void closeCleanAndRecycleState() { protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) { // commitNeeded indicates we may have processed some records since last commit // and hence we need to refresh checkpointable offsets regardless whether we should checkpoint or not -if (commitNeeded) { +if (commitNeeded || enforceCheckpoint) { Review comment: What if we just removed the check altogether? It's not like updating the changelog offsets is a particularly "heavy" call, we may as well future-proof things even more by just updating the offsets any time. In fact, why do we even have this weird split brain logic to begin with...it would make more sense to just update the offsets inside the `StreamTask#maybeWriteCheckpoint` and `stateMgr.checkpoint()` methods, no? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gitlw opened a new pull request #11285: KAFKA-10548: Implement topic deletion logic with the LeaderAndIsr in KIP-516
gitlw opened a new pull request #11285: URL: https://github.com/apache/kafka/pull/11285 This PR includes the following changes 1. Adding the type field to the LeaderAndIsr request as proposed in KIP-516 2. Letting the controller set the type of LeaderAndIsr requests to be either FULL or INCREMENTAL 3. Allowing topic deletion to complete even with offline brokers 4. Schedule the deletion of replicas with inconsistent topic IDs or not present in the full LeaderAndIsr request Testing Strategy: This PR added two tests 1. testTopicDeletionWithOfflineBrokers: to ensure that topic deletion can proceed even with offline brokers 2. testDeletionOfStrayPartitions: to ensure stray replicas whose topic has been deleted will be removed upon broker startup ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
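As background for point 1 of the summary above, the request-type marker from KIP-516 can be pictured as a small enum carried on the LeaderAndIsr request. The names and byte values below are assumed for illustration and are not copied from the PR:
```java
// Hypothetical sketch of the LeaderAndIsr request type described in KIP-516.
// A FULL request lists every partition the receiving broker should host, which is what
// lets the broker delete stray replicas that are missing or carry a mismatched topic ID.
public enum LeaderAndIsrRequestType {
    UNKNOWN((byte) 0),
    INCREMENTAL((byte) 1),
    FULL((byte) 2);

    private final byte code;

    LeaderAndIsrRequestType(byte code) {
        this.code = code;
    }

    public byte code() {
        return code;
    }
}
```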
[jira] [Commented] (KAFKA-13248) Class name mismatch for LoggerFactory.getLogger method in TimeOrderedWindowStoreBuilder.java
[ https://issues.apache.org/jira/browse/KAFKA-13248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17406991#comment-17406991 ] A. Sophie Blee-Goldman commented on KAFKA-13248: Nice find, thanks for the report – would you be interested in submitting a patch for this? > Class name mismatch for LoggerFactory.getLogger method in > TimeOrderedWindowStoreBuilder.java > > > Key: KAFKA-13248 > URL: https://issues.apache.org/jira/browse/KAFKA-13248 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: S. B. Hunter >Priority: Minor > Attachments: KAFKA-13248.1.patch > > > I have noticed a mismatch with the class name passed to the > LoggerFactory.getLogger method. This would make it hard to track the source > of log messages. > public class {color:#00875a}TimeOrderedWindowStoreBuilder{color} > extends AbstractStoreBuilder> { > private final Logger log = > LoggerFactory.getLogger({color:#de350b}WindowStoreBuilder{color}.class); > private final WindowBytesStoreSupplier storeSupplier; > public {color:#00875a}TimeOrderedWindowStoreBuilder{color}(final > WindowBytesStoreSupplier storeSupplier, > final Serde keySerde, -- This message was sent by Atlassian Jira (v8.3.4#803005)
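For anyone picking this up, the fix is a one-line change to the declaration quoted above, so that log output is attributed to the right class:
{code:java}
// Reference the enclosing class instead of WindowStoreBuilder.
private final Logger log = LoggerFactory.getLogger(TimeOrderedWindowStoreBuilder.class);
{code}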
[jira] [Created] (KAFKA-13254) Deadlock when expanding ISR
Jason Gustafson created KAFKA-13254: --- Summary: Deadlock when expanding ISR Key: KAFKA-13254 URL: https://issues.apache.org/jira/browse/KAFKA-13254 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson Assignee: Jason Gustafson Found this when debugging downgrade system test failures. The patch for https://issues.apache.org/jira/browse/KAFKA-13091 introduced a deadlock. Here are the jstack details: {code} "data-plane-kafka-request-handler-4": waiting for ownable synchronizer 0xfcc00020, (a java.util.concurrent.locks.ReentrantLock$NonfairSync), which is held by "data-plane-kafka-request-handler-5" "data-plane-kafka-request-handler-5": waiting for ownable synchronizer 0xc9161b20, (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync), which is held by "data-plane-kafka-request-handler-4" "data-plane-kafka-request-handler-4": at sun.misc.Unsafe.park(Native Method) - parking to wait for <0xfcc00020> (a java.util.concurrent.locks.ReentrantLock$NonfairSync) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199) at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209) at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285) at kafka.server.DelayedOperation.safeTryComplete(DelayedOperation.scala:121) at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:362) at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:264) at kafka.cluster.DelayedOperations.checkAndCompleteAll(Partition.scala:59) at kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:907) at kafka.cluster.Partition.handleAlterIsrResponse(Partition.scala:1421) at kafka.cluster.Partition.$anonfun$sendAlterIsrRequest$1(Partition.scala:1340) at kafka.cluster.Partition.$anonfun$sendAlterIsrRequest$1$adapted(Partition.scala:1340) at kafka.cluster.Partition$$Lambda$1496/2055478409.apply(Unknown Source) at kafka.server.ZkIsrManager.submit(ZkIsrManager.scala:74) at kafka.cluster.Partition.sendAlterIsrRequest(Partition.scala:1345) at kafka.cluster.Partition.expandIsr(Partition.scala:1312) at kafka.cluster.Partition.$anonfun$maybeExpandIsr$2(Partition.scala:755) at kafka.cluster.Partition.maybeExpandIsr(Partition.scala:754) at kafka.cluster.Partition.updateFollowerFetchState(Partition.scala:672) at kafka.server.ReplicaManager.$anonfun$updateFollowerFetchState$1(ReplicaManager.scala:1806) at kafka.server.ReplicaManager$$Lambda$1075/1996432270.apply(Unknown Source) at scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:99) at scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:86) at scala.collection.mutable.ArrayBuffer.map(ArrayBuffer.scala:42) at kafka.server.ReplicaManager.updateFollowerFetchState(ReplicaManager.scala:1790) at kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:1025) at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:1029) at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:970) at kafka.server.KafkaApis.handle(KafkaApis.scala:173) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75) at java.lang.Thread.run(Thread.java:748) 
"data-plane-kafka-request-handler-5": at sun.misc.Unsafe.park(Native Method) - parking to wait for <0xc9161b20> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:967) at java.util.concurren
[jira] [Updated] (KAFKA-13254) Deadlock when expanding ISR
[ https://issues.apache.org/jira/browse/KAFKA-13254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-13254: Issue Type: Bug (was: Improvement) > Deadlock when expanding ISR > --- > > Key: KAFKA-13254 > URL: https://issues.apache.org/jira/browse/KAFKA-13254 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > Found this when debugging downgrade system test failures. The patch for > https://issues.apache.org/jira/browse/KAFKA-13091 introduced a deadlock. Here > are the jstack details: > {code} > "data-plane-kafka-request-handler-4": > > > waiting for ownable synchronizer 0xfcc00020, (a > java.util.concurrent.locks.ReentrantLock$NonfairSync), > > which is held by "data-plane-kafka-request-handler-5" > > > "data-plane-kafka-request-handler-5": > waiting for ownable synchronizer 0xc9161b20, (a > java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync), > which is held by "data-plane-kafka-request-handler-4" > "data-plane-kafka-request-handler-4": > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0xfcc00020> (a > java.util.concurrent.locks.ReentrantLock$NonfairSync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199) > at > java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209) > at > java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285) > at > kafka.server.DelayedOperation.safeTryComplete(DelayedOperation.scala:121) > at > kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:362) > at > kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:264) > at > kafka.cluster.DelayedOperations.checkAndCompleteAll(Partition.scala:59) > at > kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:907) > at > kafka.cluster.Partition.handleAlterIsrResponse(Partition.scala:1421) > at > kafka.cluster.Partition.$anonfun$sendAlterIsrRequest$1(Partition.scala:1340) > at > kafka.cluster.Partition.$anonfun$sendAlterIsrRequest$1$adapted(Partition.scala:1340) > at kafka.cluster.Partition$$Lambda$1496/2055478409.apply(Unknown > Source) > at kafka.server.ZkIsrManager.submit(ZkIsrManager.scala:74) > at kafka.cluster.Partition.sendAlterIsrRequest(Partition.scala:1345) > at kafka.cluster.Partition.expandIsr(Partition.scala:1312) > at > kafka.cluster.Partition.$anonfun$maybeExpandIsr$2(Partition.scala:755) > at kafka.cluster.Partition.maybeExpandIsr(Partition.scala:754) > at > kafka.cluster.Partition.updateFollowerFetchState(Partition.scala:672) > at > kafka.server.ReplicaManager.$anonfun$updateFollowerFetchState$1(ReplicaManager.scala:1806) > at kafka.server.ReplicaManager$$Lambda$1075/1996432270.apply(Unknown > Source) > at > scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:99) > at > scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:86) > at scala.collection.mutable.ArrayBuffer.map(ArrayBuffer.scala:42) > at > kafka.server.ReplicaManager.updateFollowerFetchState(ReplicaManager.scala:1790) > at > kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:1025) > at 
> kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:1029) > at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:970) > at kafka.server.KafkaApis.handle(KafkaApis.scala:173) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75) > at java.lang.Thread.run(Thread.java:748) > "data-plane-kafka-request-handler-5": > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0xc9161b20> (a > java.util.concurrent.locks.ReentrantReadWrit
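The two stack traces above show a classic lock-order inversion: one request-handler thread holds the partition's read-write lock while expanding the ISR and then tries to acquire a DelayedOperation's ReentrantLock, while the other thread holds that ReentrantLock and needs the read-write lock to complete the delayed operation. A stripped-down, generic Java illustration of the pattern (not Kafka code, names invented):
{code:java}
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Generic demonstration of the deadlock pattern in the jstack output above:
// thread A takes rwLock then lock, thread B takes lock then rwLock, and both block forever.
public class LockOrderInversion {
    private static final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    private static final ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {
        Thread a = new Thread(() -> {
            rwLock.writeLock().lock();      // like holding the partition lock while expanding the ISR
            try {
                pause();
                lock.lock();                // like DelayedOperation.safeTryComplete()
                lock.unlock();
            } finally {
                rwLock.writeLock().unlock();
            }
        });
        Thread b = new Thread(() -> {
            lock.lock();                    // holds the delayed-operation lock
            try {
                pause();
                rwLock.readLock().lock();   // needs the partition lock to complete the operation
                rwLock.readLock().unlock();
            } finally {
                lock.unlock();
            }
        });
        a.start();
        b.start();
    }

    private static void pause() {
        try { Thread.sleep(100); } catch (InterruptedException ignored) { }
    }
}
{code}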
[jira] [Updated] (KAFKA-13091) Increment HW after shrinking ISR through AlterIsr
[ https://issues.apache.org/jira/browse/KAFKA-13091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-13091: Fix Version/s: (was: 3.0.1) > Increment HW after shrinking ISR through AlterIsr > - > > Key: KAFKA-13091 > URL: https://issues.apache.org/jira/browse/KAFKA-13091 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > After we have shrunk the ISR, we have an opportunity to advance the high > watermark. We do this currently in `maybeShrinkIsr` after the synchronous > update through ZK. For the AlterIsr path, however, we cannot rely on this > call since the request is sent asynchronously. Instead we should attempt to > advance the high watermark in the callback when the AlterIsr response returns > successfully. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] hutchiko commented on pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file
hutchiko commented on pull request #11283: URL: https://github.com/apache/kafka/pull/11283#issuecomment-908804447 @ableegoldman I did not test against 3.0 just 2.7 and 2.8. I'll rebase onto trunk/3.0 as see how that goes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hutchiko commented on a change in pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file
hutchiko commented on a change in pull request #11283: URL: https://github.com/apache/kafka/pull/11283#discussion_r698898574 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -565,7 +565,7 @@ public void closeCleanAndRecycleState() { protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) { // commitNeeded indicates we may have processed some records since last commit // and hence we need to refresh checkpointable offsets regardless whether we should checkpoint or not -if (commitNeeded) { +if (commitNeeded || enforceCheckpoint) { Review comment: I thought the same thing but not knowing enough about all the streams internals I thought I'd just go with the most minimal change possible. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hutchiko commented on pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file
hutchiko commented on pull request #11283: URL: https://github.com/apache/kafka/pull/11283#issuecomment-908826996 Yeah I've rebased and verified the issue is there in `trunk` too. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-13249) Checkpoints do not contain latest offsets on shutdown when using EOS
[ https://issues.apache.org/jira/browse/KAFKA-13249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oliver Hutchison updated KAFKA-13249: - Affects Version/s: 3.0.0 > Checkpoints do not contain latest offsets on shutdown when using EOS > > > Key: KAFKA-13249 > URL: https://issues.apache.org/jira/browse/KAFKA-13249 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.0.0, 2.7.0, 2.8.0 >Reporter: Oliver Hutchison >Priority: Major > > When using EOS the {{.checkpoint}} file created when a stateful streams app > is shut down does not always contain changelog offsets which represent the > latest state of the state store. The offsets can often be behind the end of > the changelog - sometimes quite significantly. > This leads to a state restore being required when the streams app restarts > after shutting down cleanly, as streams thinks (based on the incorrect offsets > in the checkpoint) that the state store is not up to date with the changelog. > This increases the time it takes to do a clean restart of a single-instance > streams app from around 10 seconds to sometimes over 2 minutes in our > case. > I suspect the bug appears because of an assumption about the {{commitNeeded}} > field in the following method in {{StreamTask}}: > {code:java} > protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) { > // commitNeeded indicates we may have processed some records since last > commit > // and hence we need to refresh checkpointable offsets regardless whether > we should checkpoint or not > if (commitNeeded) { > stateMgr.updateChangelogOffsets(checkpointableOffsets()); > } > super.maybeWriteCheckpoint(enforceCheckpoint); > } > {code} > In a steady state case for a simple single-instance, single-thread streams app > where the app simply starts, runs and then shuts down, the {{if (commitNeeded)}} > test always fails when running with EOS, which results in the > latest checkpoint offsets never getting updated into the {{stateMgr}}. > Tracing back to the callers of {{maybeWriteCheckpoint}} it's easy to see this > is the case, as there's only one place in the code which calls > {{maybeWriteCheckpoint}} during this steady state: the {{postCommit(final > boolean enforceCheckpoint)}} method, specifically the call in the {{RUNNING}} > state. > {code:java} > case RUNNING: > if (enforceCheckpoint || !eosEnabled) { > maybeWriteCheckpoint(enforceCheckpoint); > } > log.debug("Finalized commit for {} task with eos {} enforce checkpoint {}", > state(), eosEnabled, enforceCheckpoint); > break; > {code} > We can see from this code that {{maybeWriteCheckpoint}} will only ever be > called if {{enforceCheckpoint=true}}, because we know {{eosEnabled=true}} as > we're running with EOS. > So then where does {{postCommit}} get called with {{enforceCheckpoint=true}}? > Again looking only at the steady state case, we find that it's only called > from {{TaskManager.tryCloseCleanAllActiveTasks}}, which is only called from > {{TaskManager.shutdown}}. > The thing about the call in {{tryCloseCleanAllActiveTasks}} is that it > happens *after* all active tasks have committed, which means that > {{StreamTask.commitNeeded=false}} for all tasks, so it follows that the test > back in {{maybeWriteCheckpoint}} always fails and we don't end up getting the > latest offsets stored into the state manager.
> I think the fix is to simply change the test in {{maybeWriteCheckpoint}} to > be {{if (commitNeeded || enforceCheckpoint) { ...}}, as we know we must always > update the changelog offsets before we write the checkpoint. -- This message was sent by Atlassian Jira (v8.3.4#803005)
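For reference, this is the method quoted in the description with that one-line change applied; it mirrors the diff in PR #11283 shown above, and only the condition differs from the original code.
{code:java}
protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
    // commitNeeded indicates we may have processed some records since last commit
    // and hence we need to refresh checkpointable offsets regardless whether we should checkpoint or not
    if (commitNeeded || enforceCheckpoint) {
        stateMgr.updateChangelogOffsets(checkpointableOffsets());
    }
    super.maybeWriteCheckpoint(enforceCheckpoint);
}
{code}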
[GitHub] [kafka] hutchiko edited a comment on pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file
hutchiko edited a comment on pull request #11283: URL: https://github.com/apache/kafka/pull/11283#issuecomment-908804447 @ableegoldman I did not test against 3.0, just 2.7 and 2.8. I'll rebase onto `trunk` and see how that goes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13248) Class name mismatch for LoggerFactory.getLogger method in TimeOrderedWindowStoreBuilder.java
[ https://issues.apache.org/jira/browse/KAFKA-13248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17407070#comment-17407070 ] S. B. Hunter commented on KAFKA-13248: -- I would love to. I have already changed the code in my forked repository. Can I create the PR? https://github.com/sider-bughunter/kafka/commit/fbca00c7656378b6dc43320fc9ce4d3d83b58b4a > Class name mismatch for LoggerFactory.getLogger method in > TimeOrderedWindowStoreBuilder.java > > > Key: KAFKA-13248 > URL: https://issues.apache.org/jira/browse/KAFKA-13248 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: S. B. Hunter >Priority: Minor > Attachments: KAFKA-13248.1.patch > > > I have noticed a mismatch in the class name passed to the > LoggerFactory.getLogger method. This would make it hard to track the source > of log messages. > public class {color:#00875a}TimeOrderedWindowStoreBuilder{color} > extends AbstractStoreBuilder> { > private final Logger log = > LoggerFactory.getLogger({color:#de350b}WindowStoreBuilder{color}.class); > private final WindowBytesStoreSupplier storeSupplier; > public {color:#00875a}TimeOrderedWindowStoreBuilder{color}(final > WindowBytesStoreSupplier storeSupplier, > final Serde keySerde, -- This message was sent by Atlassian Jira (v8.3.4#803005)
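A minimal sketch of what the fix amounts to (the linked commit and attached patch are the authoritative change; the snippet below only swaps the class literal so the logger name matches the enclosing class):
{code:java}
// Before (as reported): log output is attributed to WindowStoreBuilder.
//   private final Logger log = LoggerFactory.getLogger(WindowStoreBuilder.class);

// After: the logger is named for the class that actually emits the messages.
private final Logger log = LoggerFactory.getLogger(TimeOrderedWindowStoreBuilder.class);
{code}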
[GitHub] [kafka] rodesai opened a new pull request #11286: add units to metrics descriptions + test fix
rodesai opened a new pull request #11286: URL: https://github.com/apache/kafka/pull/11286 Fix some review feedback and test flakiness from #11149: - add units to metrics descriptions for total blocked time metrics - fix producer metrics test checks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rodesai commented on pull request #11286: add units to metrics descriptions + test fix
rodesai commented on pull request #11286: URL: https://github.com/apache/kafka/pull/11286#issuecomment-908931988 @guozhangwang -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-13255) Mirrormaker config property config.properties.exclude is not working as expected
Anamika Nadkarni created KAFKA-13255: Summary: Mirrormaker config property config.properties.exclude is not working as expected Key: KAFKA-13255 URL: https://issues.apache.org/jira/browse/KAFKA-13255 Project: Kafka Issue Type: Bug Components: mirrormaker Affects Versions: 2.8.0 Reporter: Anamika Nadkarni Objective - Use MM2 (Kafka Connect in a distributed cluster) for data migration between a cluster hosted in a private data center and an AWS MSK cluster. Steps performed - # Started the kafka-connect service. # Created 3 MM2 connectors (i.e. source connector, checkpoint connector and heartbeat connector). The curl commands used to create the connectors are in the attached file. To exclude certain config properties during topic replication, we are using the 'config.properties.exclude' property in the MM2 source connector. Expected - The source topic 'dev.portlandDc.anamika.helloMsk' should be successfully created in the destination cluster. Actual - Creation of the source topic 'dev.portlandDc.anamika.helloMsk' in the destination cluster fails with an error: {code:java} [2021-08-06 06:13:40,944] WARN [mm2-msc|worker] Could not create topic dev.portlandDc.anamika.helloMsk. (org.apache.kafka.connect.mirror.MirrorSourceConnector:371) org.apache.kafka.common.errors.InvalidConfigurationException: Unknown topic config name: confluent.value.schema.validation{code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-13255) Mirrormaker config property config.properties.exclude is not working as expected
[ https://issues.apache.org/jira/browse/KAFKA-13255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17407116#comment-17407116 ] Anamika Nadkarni commented on KAFKA-13255: -- I keep getting an HTTP 403 error when trying to attach the file with the curl commands to this Jira. As a workaround, I am pasting the file contents below. {code:java} curl -X POST http://10.xx.xx.xx:8083/connectors -H "Content-Type: application/json" -d '{ "name":"mm2-msc", "config":{ "connector.class":"org.apache.kafka.connect.mirror.MirrorSourceConnector", "clusters":"mysource,mydestination", "source.cluster.alias":"mysource", "target.cluster.alias":"mydestination", "replication.policy.class":"com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy", "target.cluster.bootstrap.servers":"b-2.mydestination.amazonaws.com:9096,b-3.mydestination.amazonaws.com:9096,b-1.mydestination.amazonaws.com:9096", "source.cluster.bootstrap.servers":"10.xx.xx.xx:9093,10.xx.xx.xx:9093,10.xx.xx.xx:9093,10.xx.xx.xx:9093,10.xx.xx.xx:9093", "topics":"dev.portlandDc.anamika.helloMsk", "config.properties.exclude":"^confluent\\..*$", "tasks.max":"1", "key.converter":" org.apache.kafka.connect.converters.ByteArrayConverter", "value.converter":"org.apache.kafka.connect.converters.ByteArrayConverter", "replication.factor":"3", "offset-syncs.topic.replication.factor":"3", "sync.topic.acls.interval.seconds":"20", "sync.topic.configs.interval.seconds":"20", "refresh.topics.interval.seconds":"20", "refresh.groups.interval.seconds":"20", "consumer.group.id":"mm2-msc-cons", "producer.enable.idempotence":"true", "source.cluster.security.protocol":"SASL_SSL", "source.cluster.sasl.mechanism":"PLAIN", "source.cluster.ssl.truststore.location":"/home/ec2-user/dc/kafka.client.truststore.jks", "source.cluster.ssl.truststore.password":"x", "source.cluster.ssl.endpoint.identification.algorithm":"", "source.cluster.sasl.jaas.config":"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"x\" password=\"x\";", "target.cluster.security.protocol":"SASL_SSL", "target.cluster.sasl.mechanism":"SCRAM-SHA-512", "target.cluster.ssl.truststore.location":"/home/ec2-user/kafka_2.13-2.8.0/config/msk/kafka.client.truststore.jks", "target.cluster.sasl.jaas.config":"org.apache.kafka.common.security.scram.ScramLoginModule required username=\"x\" password=\"x\";" } }' | jq . 
curl -X POST http://10.xx.xx.xx:8083/connectors -H "Content-Type: application/json" -d '{ "name":"mm2-cpc", "config":{ "connector.class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector", "clusters":"mysource,mydestination", "source.cluster.alias":"mysource", "target.cluster.alias":"mydestination", "config.properties.exclude":"confluent.value.schema.validation", "config.properties.blacklist":"confluent.value.schema.validation", "target.cluster.bootstrap.servers":"b-2.mydestination.amazonaws.com:9096,b-3.mydestination.amazonaws.com:9096,b-1.mydestination.amazonaws.com:9096", "source.cluster.bootstrap.servers":"10.xx.xx.xx:9093,10.xx.xx.xx:9093,10.xx.xx.xx:9093,10.xx.xx.xx:9093,10.xx.xx.xx:9093", "tasks.max":"1", "key.converter":" org.apache.kafka.connect.converters.ByteArrayConverter", "value.converter":"org.apache.kafka.connect.converters.ByteArrayConverter", "replication.policy.class":"com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy", "replication.factor":"3", "sync.topic.configs.enabled":"false", "checkpoints.topic.replication.factor":"3", "emit.checkpoints.interval.seconds":"20", "source.cluster.security.protocol":"SASL_SSL", "source.cluster.sasl.mechanism":"PLAIN", "source.cluster.ssl.truststore.location":"/home/ec2-user/dc/kafka.client.truststore.jks", "source.cluster.ssl.truststore.password":"x", "source.cluster.ssl.endpoint.identification.algorithm":"", "source.cluster.sasl.jaas.config":"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"x\" password=\"x\";", "target.cluster.security.protocol":"SASL_SSL", "target.cluster.sasl.mechanism":"SCRAM-SHA-512", "target.cluster.ssl.truststore.location":"/home/ec2-user/kafka_2.13-2.8.0/config/msk/kafka.client.truststore.jks", "target.cluster.sasl.jaas.config":"org.apache.kafka.common.security.scram.ScramLoginModule required username=\"x\" password=\"x\";" } }' | jq . curl -X POST http://10.xx.xx.xx:8083/connectors -H "Content-Type: application/json" -d '{ "name":"mm2-hbc", "config":{ "connector.class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector", "clusters":"mysource,mydestination", "source.
[jira] [Commented] (KAFKA-13255) Mirrormaker config property config.properties.exclude is not working as expected
[ https://issues.apache.org/jira/browse/KAFKA-13255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17407122#comment-17407122 ] Anamika Nadkarni commented on KAFKA-13255: -- To replicate the source topic in the destination cluster with no prefix, I am using a custom replication policy while creating the source connector. You can generate the custom replication policy jar from the link below. I am getting 'HTTP - 403 Forbidden' when I try to attach the jar. https://github.com/aws-samples/mirrormaker2-msk-migration/tree/master/CustomMM2ReplicationPolicy > Mirrormaker config property config.properties.exclude is not working as > expected > - > > Key: KAFKA-13255 > URL: https://issues.apache.org/jira/browse/KAFKA-13255 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.8.0 >Reporter: Anamika Nadkarni >Priority: Major > > Objective - Use MM2 (Kafka Connect in a distributed cluster) for data migration > between a cluster hosted in a private data center and an AWS MSK cluster. > Steps performed - > # Started the kafka-connect service. > # Created 3 MM2 connectors (i.e. source connector, checkpoint connector and > heartbeat connector). The curl commands used to create the connectors are in the > attached file. To exclude certain config properties during topic replication, > we are using the 'config.properties.exclude' property in the MM2 source > connector. > Expected - > The source topic 'dev.portlandDc.anamika.helloMsk' should be successfully created > in the destination cluster. > Actual - > Creation of the source topic 'dev.portlandDc.anamika.helloMsk' in the destination > cluster fails with an error: > {code:java} > [2021-08-06 06:13:40,944] WARN [mm2-msc|worker] Could not create topic > dev.portlandDc.anamika.helloMsk. > (org.apache.kafka.connect.mirror.MirrorSourceConnector:371) > org.apache.kafka.common.errors.InvalidConfigurationException: Unknown topic > config name: confluent.value.schema.validation{code} -- This message was sent by Atlassian Jira (v8.3.4#803005)