[jira] [Commented] (KAFKA-12963) Improve error message for Class cast exception

2021-08-30 Thread Rasmus Helbig Hansen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17406600#comment-17406600
 ] 

Rasmus Helbig Hansen commented on KAFKA-12963:
--

[~ableegoldman] first of all thanks for looking into this and providing a patch.

I do understand that the data flowing into a processor doesn't need to have 
originated from a topic. A punctuator could be sending messages, as an example.

However, in my particular case, where we (mostly) use the high level API, I was 
naively thinking that e.g.
{code:java}
.selectKey((k, v) -> new MyKey(v.getLocation()))
.leftJoin(businessCentres, ...)
{code}
 

should be able to say something about the underlying topic, rather than 
requiring the programmer to inspect the low-level topology in order to make a 
qualified guess at where the type misalignment might be.

The inclusion of the processor name is fine for me. I'm happy with that. But I 
can imagine that a class cast exception of types A and B for processor 
KSTREAM-TRANSFORM-16 would be a bit of a head-scratcher for programmers 
getting started with Streams.
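
For illustration, a minimal sketch of what this looks like on the application side (MyKey, MyValue, BcValue, EnrichedValue and the three serdes are placeholder names, not from this ticket): passing the Serdes explicitly at the join takes the guesswork out, because the repartition topic created by selectKey() otherwise falls back to the default Serdes and a mismatch only surfaces later as the ClassCastException quoted below.
{code:java}
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

// Placeholder types and serdes; only the shape of the topology matters here.
KStream<MyKey, EnrichedValue> build(final StreamsBuilder builder,
                                    final Serde<MyKey> myKeySerde,
                                    final Serde<MyValue> myValueSerde,
                                    final Serde<BcValue> bcValueSerde) {
    final KStream<String, MyValue> input = builder.stream("input-topic");
    final KTable<MyKey, BcValue> businessCentres = builder.table("business-centres");

    return input
        .selectKey((k, v) -> new MyKey(v.getLocation()))
        // Explicit Serdes for the repartition topic backing the join; without them
        // the defaults from StreamsConfig are used and a mismatch shows up only at
        // processing time, naming an internal processor such as KSTREAM-TRANSFORM-16.
        .leftJoin(businessCentres,
                  (value, bc) -> new EnrichedValue(value, bc),
                  Joined.with(myKeySerde, myValueSerde, bcValueSerde));
}
{code}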

 

 

> Improve error message for Class cast exception
> --
>
> Key: KAFKA-12963
> URL: https://issues.apache.org/jira/browse/KAFKA-12963
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: Rasmus Helbig Hansen
>Assignee: Andrew Lapidas
>Priority: Minor
> Fix For: 3.1.0
>
>
> After a topology change and starting the application again, we got this type 
> of error message:
>  [g9z-StreamThread-1] ERROR 
> org.apache.kafka.streams.processor.internals.TaskManager  - stream-thread 
> [g9z-StreamThread-1] Failed to process stream task 1_12 due to the following 
> error:
>  org.apache.kafka.streams.errors.StreamsException: ClassCastException 
> invoking Processor. Do the Processor's input types match the deserialized 
> types? Check the Serde setup and change the default Serdes in StreamConfig or 
> provide correct Serdes via method parameters. Make sure the Processor can 
> accept the deserialized input of type key: org.acme.SomeKey, and value: 
> org.acme.SomeValue.
>  Note that although incorrect Serdes are a common cause of error, the cast 
> exception might have another cause (in user code, for example). For example, 
> if a processor wires in a store, but casts the generics incorrectly, a class 
> cast exception could be raised during processing, but the cause would not be 
> wrong Serdes.
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:185)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
>  at 
> org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:55)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>  at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
>  at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:703)
>  at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:703)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1105)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:647)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>  at 

[GitHub] [kafka] jlprat commented on a change in pull request #11228: KAFKA-12887 Skip some RuntimeExceptions from exception handler

2021-08-30 Thread GitBox


jlprat commented on a change in pull request #11228:
URL: https://github.com/apache/kafka/pull/11228#discussion_r698268258



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -509,7 +526,7 @@ private void handleStreamsUncaughtException(final Throwable throwable,
 break;
 case SHUTDOWN_CLIENT:
 log.error("Encountered the following exception during processing " +
-"and the registered exception handler opted to " + action + "." +
+"and Kafka Streams opted to " + action + "." +

Review comment:
   Changed the error log line as mentioned.

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -495,9 +498,25 @@ private void replaceStreamThread(final Throwable 
throwable) {
 }
 }
 
+private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse getActionForThrowable(final Throwable throwable,
+                                                                                            final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action;
+// Exception might be wrapped within a StreamsException one

Review comment:
   Updated now: the `if` statement only checks the wrapped exception if it 
exists, and decides based on that. See the next line.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #11228: KAFKA-12887 Skip some RuntimeExceptions from exception handler

2021-08-30 Thread GitBox


cadonna commented on a change in pull request #11228:
URL: https://github.com/apache/kafka/pull/11228#discussion_r698280849



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -495,9 +498,23 @@ private void replaceStreamThread(final Throwable 
throwable) {
 }
 }
 
+private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse getActionForThrowable(final Throwable throwable,
+                                                                                            final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {

Review comment:
   ```suggestion
    private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse getActionForThrowable(final Throwable throwable,
                                                                                                 final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
   ```

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -495,9 +498,23 @@ private void replaceStreamThread(final Throwable 
throwable) {
 }
 }
 
+private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse getActionForThrowable(final Throwable throwable,
+                                                                                            final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action;
+// Exception might be wrapped within a StreamsException one
+if (throwable.getCause() != null && EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS.contains(throwable.getCause().getClass())) {

Review comment:
   Could you please put this check in a method with a meaningful name like 
`wrappedExceptionIsIn()`? Then the check would be 
   ```
   if (wrappedExceptionIsIn(EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS))
   ``` 
   and you can remove all other inline comments because the code per se 
contains all needed info.
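
For readers following along, a rough sketch of the extraction suggested here (the set contents, the extra `throwable` parameter and the surrounding class are assumptions, not the merged code):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Exceptions that should bypass the user-registered handler (contents assumed).
private static final Set<Class<? extends RuntimeException>> EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS =
    new HashSet<>(Arrays.asList(IllegalStateException.class, IllegalArgumentException.class));

// Returns true if the throwable wraps one of the given exception types.
private boolean wrappedExceptionIsIn(final Throwable throwable,
                                     final Set<Class<? extends RuntimeException>> exceptions) {
    return throwable.getCause() != null && exceptions.contains(throwable.getCause().getClass());
}

// The call site then reads:
// if (wrappedExceptionIsIn(throwable, EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS)) { ... }
```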

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
##
@@ -163,6 +164,27 @@ public void shouldShutdownClient() throws 
InterruptedException {
 }
 }
 
+
+@Test
+public void shouldShutdownClientWhenIllegalStateException() throws 
InterruptedException {

Review comment:
   Could you please also add a test for the `IllegalArgumentException`? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wangxianghu closed pull request #11210: KAFKA-13199: Make Task extends Versioned

2021-08-30 Thread GitBox


wangxianghu closed pull request #11210:
URL: https://github.com/apache/kafka/pull/11210


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on pull request #11228: KAFKA-12887 Skip some RuntimeExceptions from exception handler

2021-08-30 Thread GitBox


jlprat commented on pull request #11228:
URL: https://github.com/apache/kafka/pull/11228#issuecomment-908160013


   @cadonna Applied your feedback as well, thanks for the comments. I also 
realized the `AtomicBoolean` variables in the test could be final and added 
that as well.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13250) 2.4.0

2021-08-30 Thread chaos (Jira)
chaos created KAFKA-13250:
-

 Summary: 2.4.0
 Key: KAFKA-13250
 URL: https://issues.apache.org/jira/browse/KAFKA-13250
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.4.0
 Environment: linux 4.1.0
Reporter: chaos


 

 

[2021-08-26 20:20:55,640] ERROR [ReplicaManager broker=42] Error processing 
fetch with max size 1048576 from consumer on partition topic_xx-123: 
(fetchOffset=11061228956, logStartOffset=-1, maxBytes=1048576, 
currentLeaderEpoch=Optional.empty) (kafka.server.ReplicaManager)
org.apache.kafka.common.errors.CorruptRecordException: Found record size 0 
smaller than minimum record overhead (14) in file 
/data4/kafka-logs/topic_xx-123/011060934646.log.
[2021-08-26 20:20:55,640] ERROR Error while appending records to topic_xx-123 
in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
[2021-08-26 20:20:55,645] ERROR Error while deleting segments for topic_xx-123 
in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
java.nio.file.FileSystemException: 
/data4/kafka-logs/topic_xx-123/011040402299.log -> 
/data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only file 
system
 Suppressed: java.nio.file.FileSystemException: 
/data4/kafka-logs/topic_xx-123/011040402299.log -> 
/data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only file 
system
[2021-08-26 20:20:55,644] ERROR Error while appending records to topic_xx-123 
in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
[2021-08-26 20:20:55,644] ERROR Error while appending records to topic_xx-123 
in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
[2021-08-26 20:20:55,644] ERROR Error while appending records to topic_xx-123 
in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
[2021-08-26 20:20:55,652] INFO [Partition topic_xx-123 broker=42] Shrinking ISR 
from 55,42 to 42. Leader: (highWatermark: 11061228956, endOffset: 11061228965). 
Out of sync replicas: (brokerId: 55, endOffset: 11061228956). 
(kafka.cluster.Partition)

 

disk errors are:



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13251) 2.4.0

2021-08-30 Thread chaos (Jira)
chaos created KAFKA-13251:
-

 Summary: 2.4.0
 Key: KAFKA-13251
 URL: https://issues.apache.org/jira/browse/KAFKA-13251
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.4.0
 Environment: linux 4.1.0
Reporter: chaos


 

 

[2021-08-26 20:20:55,640] ERROR [ReplicaManager broker=42] Error processing 
fetch with max size 1048576 from consumer on partition topic_xx-123: 
(fetchOffset=11061228956, logStartOffset=-1, maxBytes=1048576, 
currentLeaderEpoch=Optional.empty) (kafka.server.ReplicaManager)
org.apache.kafka.common.errors.CorruptRecordException: Found record size 0 
smaller than minimum record overhead (14) in file 
/data4/kafka-logs/topic_xx-123/011060934646.log.
[2021-08-26 20:20:55,640] ERROR Error while appending records to topic_xx-123 
in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
[2021-08-26 20:20:55,645] ERROR Error while deleting segments for topic_xx-123 
in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
java.nio.file.FileSystemException: 
/data4/kafka-logs/topic_xx-123/011040402299.log -> 
/data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only file 
system
 Suppressed: java.nio.file.FileSystemException: 
/data4/kafka-logs/topic_xx-123/011040402299.log -> 
/data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only file 
system
[2021-08-26 20:20:55,644] ERROR Error while appending records to topic_xx-123 
in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
[2021-08-26 20:20:55,644] ERROR Error while appending records to topic_xx-123 
in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
[2021-08-26 20:20:55,644] ERROR Error while appending records to topic_xx-123 
in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
[2021-08-26 20:20:55,652] INFO [Partition topic_xx-123 broker=42] Shrinking ISR 
from 55,42 to 42. Leader: (highWatermark: 11061228956, endOffset: 11061228965). 
Out of sync replicas: (brokerId: 55, endOffset: 11061228956). 
(kafka.cluster.Partition)

 

disk errors are:



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cadonna commented on a change in pull request #11149: KAFKA-13229: add total blocked time metric to streams (KIP-761)

2021-08-30 Thread GitBox


cadonna commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r698327147



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
##
@@ -178,12 +184,48 @@ public void resetProducer() {
 throw new IllegalStateException("Expected eos-v2 to be enabled, 
but the processing mode was " + processingMode);
 }
 
+oldProducerTotalBlockedTime += totalBlockedTime(producer);
+final long start = time.nanoseconds();
 producer.close();
+final long closeTime = time.nanoseconds() - start;
+oldProducerTotalBlockedTime += closeTime;
 
 producer = clientSupplier.getProducer(eosV2ProducerConfigs);
 transactionInitialized = false;
 }
 
+private double getMetricValue(final Map metrics,
+                              final String name) {
+final List found = metrics.keySet().stream()
+.filter(n -> n.name().equals(name))
+.collect(Collectors.toList());
+if (found.isEmpty()) {
+return 0.0;
+}
+if (found.size() > 1) {

Review comment:
   @rodesai I see your point here. However, the downside of not throwing is 
that we would also not notice the bad behavior in our tests, like the soak 
tests. I personally prefer to improve tests instead of downgrading the reaction 
to bad behavior. If somebody in the future makes a change that breaks the 
assumption of the non-shared metrics registry, we would find the bug 
immediately during development instead of in production.
   Another option that comes to my mind is to classify exceptions that 
originate from the metrics framework differently in the uncaught exception 
handler, but that would probably need some more work. 
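
For readers skimming the thread, a hedged completion of the `getMetricValue()` helper quoted above (the generics were stripped by the mail formatting, and the exact exception message is an assumption): return 0.0 when the metric is absent, and fail fast when the registry unexpectedly holds more than one metric with the requested name.

```java
// Assumes java.util.{List, Map}, java.util.stream.Collectors and
// org.apache.kafka.common.{Metric, MetricName} are imported.
private double getMetricValue(final Map<MetricName, ? extends Metric> metrics,
                              final String name) {
    final List<MetricName> found = metrics.keySet().stream()
        .filter(n -> n.name().equals(name))
        .collect(Collectors.toList());
    if (found.isEmpty()) {
        return 0.0;
    }
    if (found.size() > 1) {
        // Throwing here, rather than silently picking one, surfaces a broken
        // "non-shared metrics registry" assumption immediately, as argued above.
        throw new IllegalStateException("Found more than one metric named " + name + ": " + found);
    }
    return (double) metrics.get(found.get(0)).metricValue();
}
```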

##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##
@@ -2262,6 +2352,14 @@ public void testListOffsetShouldUpateSubscriptions() {
 return newConsumer(time, client, subscription, metadata, assignor, 
false, groupInstanceId);
 }
 
+private KafkaConsumer consumerWithPendingAuthenticationError() {
+    return consumerWithPendingAuthenticationError(new MockTime());
+}
+
+private KafkaConsumer consumerWithPendingError(final Time time) {
+    return consumerWithPendingAuthenticationError(time);
+}

Review comment:
   Fair enough!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13251) 2.4.0

2021-08-30 Thread chaos (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13251?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chaos updated KAFKA-13251:
--
Description: 
A disk error occurred on broker 42, and the ISR was then shrunk.
Why was the ISR shrunk to the failing broker?


 
kafka logs:
broker42:
[2021-08-26 20:20:55,640] ERROR [ReplicaManager broker=42] Error processing 
fetch with max size 1048576 from consumer on partition topic_xx-123: 
(fetchOffset=11061228956, logStartOffset=-1, maxBytes=1048576, 
currentLeaderEpoch=Optional.empty) (kafka.server.ReplicaManager)
org.apache.kafka.common.errors.CorruptRecordException: Found record size 0 
smaller than minimum record overhead (14) in file 
/data4/kafka-logs/topic_xx-123/011060934646.log.
[2021-08-26 20:20:55,640] ERROR Error while appending records to topic_xx-123 
in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
[2021-08-26 20:20:55,645] ERROR Error while deleting segments for topic_xx-123 
in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
java.nio.file.FileSystemException: 
/data4/kafka-logs/topic_xx-123/011040402299.log -> 
/data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only file 
system
Suppressed: java.nio.file.FileSystemException: 
/data4/kafka-logs/topic_xx-123/011040402299.log -> 
/data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only file 
system
[2021-08-26 20:20:55,644] ERROR Error while appending records to topic_xx-123 
in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
[2021-08-26 20:20:55,652] INFO [Partition topic_xx-123 broker=42] Shrinking ISR 
from 55,42 to 42. Leader: (highWatermark: 11061228956, endOffset: 11061228965). 
Out of sync replicas: (brokerId: 55, endOffset: 11061228956). 
(kafka.cluster.Partition)
 




disk error is:

Aug 26 20:20:55 kafka-zk-19 kernel: sd 0:2:5:0: [sdf] tag#33 FAILED Result: 
hostbyte=DID_BAD_TARGET driverbyte=DRIVER_OK

  was:
 

 

[2021-08-26 20:20:55,640] ERROR [ReplicaManager broker=42] Error processing 
fetch with max size 1048576 from consumer on partition topic_xx-123: 
(fetchOffset=11061228956, logStartOffset=-1, maxBytes=1048576, 
currentLeaderEpoch=Optional.empty) (kafka.server.ReplicaManager)
org.apache.kafka.common.errors.CorruptRecordException: Found record size 0 
smaller than minimum record overhead (14) in file 
/data4/kafka-logs/topic_xx-123/011060934646.log.
[2021-08-26 20:20:55,640] ERROR Error while appending records to topic_xx-123 
in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
[2021-08-26 20:20:55,645] ERROR Error while deleting segments for topic_xx-123 
in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
java.nio.file.FileSystemException: 
/data4/kafka-logs/topic_xx-123/011040402299.log -> 
/data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only file 
system
 Suppressed: java.nio.file.FileSystemException: 
/data4/kafka-logs/topic_xx-123/011040402299.log -> 
/data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only file 
system
[2021-08-26 20:20:55,644] ERROR Error while appending records to topic_xx-123 
in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
[2021-08-26 20:20:55,644] ERROR Error while appending records to topic_xx-123 
in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
[2021-08-26 20:20:55,644] ERROR Error while appending records to topic_xx-123 
in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
[2021-08-26 20:20:55,652] INFO [Partition topic_xx-123 broker=42] Shrinking ISR 
from 55,42 to 42. Leader: (highWatermark: 11061228956, endOffset: 11061228965). 
Out of sync replicas: (brokerId: 55, endOffset: 11061228956). 
(kafka.cluster.Partition)

 

disk errors are:


> 2.4.0
> -
>
> Key: KAFKA-13251
> URL: https://issues.apache.org/jira/browse/KAFKA-13251
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0
> Environment: linux 4.1.0
>Reporter: chaos
>Priority: Major
>
> A disk error occurred on broker 42, and the ISR was then shrunk.
> Why was the ISR shrunk to the failing broker?
>  
> kafka logs:
> broker42:
> [2021-08-26 20:20:55,640] ERROR [ReplicaManager broker=42] Error processing 
> fetch with max size 1048576 from consumer on partition topic_xx-123: 
> (fetchOffset=11061228956, logStartOffset=-1, maxBytes=1048576, 
> currentLeaderEpoch=Optional.empty) (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.CorruptRecordException: Found record size 0 
> smaller than minimum record overhead (14) in file 
> /data4/kafka-logs/topic_xx-123/011060934646.log.
> [2021-08-26 20:20:55,640] ERROR Error while appending records to topic_xx-123 
> in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
> [2021-08-26 20:20:55,645] ERROR Error while deleting segments for 
> topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureC

[jira] [Updated] (KAFKA-13251) 2.4.0

2021-08-30 Thread chaos (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13251?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chaos updated KAFKA-13251:
--
Description: 
A disk error occurred on broker 42, and the broker then shrank the ISR to itself.
So why was the ISR shrunk to the failing broker?

i.e. why "Shrinking ISR from 55,42 to 42" rather than "Shrinking ISR from 55,42 
to 55"?


 kafka logs:
 broker42:

[2021-08-26 20:20:55,640] ERROR [ReplicaManager broker=42] Error processing 
fetch with max size 1048576 from consumer on partition topic_xx-123: 
(fetchOffset=11061228956, logStartOffset=-1, maxBytes=1048576, 
currentLeaderEpoch=Optional.empty) (kafka.server.ReplicaManager)
 org.apache.kafka.common.errors.CorruptRecordException: Found record size 0 
smaller than minimum record overhead (14) in file 
/data4/kafka-logs/topic_xx-123/011060934646.log.
 [2021-08-26 20:20:55,640] ERROR Error while appending records to topic_xx-123 
in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
 [2021-08-26 20:20:55,645] ERROR Error while deleting segments for topic_xx-123 
in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
 java.nio.file.FileSystemException: 
/data4/kafka-logs/topic_xx-123/011040402299.log -> 
/data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only file 
system
 Suppressed: java.nio.file.FileSystemException: 
/data4/kafka-logs/topic_xx-123/011040402299.log -> 
/data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only file 
system
 [2021-08-26 20:20:55,644] ERROR Error while appending records to topic_xx-123 
in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
 [2021-08-26 20:20:55,652] INFO [Partition topic_xx-123 broker=42] Shrinking 
ISR from 55,42 to 42. Leader: (highWatermark: 11061228956, endOffset: 
11061228965). Out of sync replicas: (brokerId: 55, endOffset: 11061228956). 
(kafka.cluster.Partition)
  

broker55:
 [2021-08-26 20:20:32,456] WARN [ReplicaFetcher replicaId=55, leaderId=42, 
fetcherId=0] Error in response for fetch request (type=FetchRequest, 
replicaId=55, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, 
isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=830774713, 
epoch=1562806014), rackId=) (kafka.server.ReplicaFetcherThread)

disk error on broker42 is:
 Aug 26 20:20:55 kernel: sd 0:2:5:0: [sdf] tag#33 FAILED Result: 
hostbyte=DID_BAD_TARGET driverbyte=DRIVER_OK

  was:
Disk error occurred in broker(=42),and then Shrinking ISR,
why Shrinking ISR to error broker ?


 
kafka logs:
broker42:
[2021-08-26 20:20:55,640] ERROR [ReplicaManager broker=42] Error processing 
fetch with max size 1048576 from consumer on partition topic_xx-123: 
(fetchOffset=11061228956, logStartOffset=-1, maxBytes=1048576, 
currentLeaderEpoch=Optional.empty) (kafka.server.ReplicaManager)
org.apache.kafka.common.errors.CorruptRecordException: Found record size 0 
smaller than minimum record overhead (14) in file 
/data4/kafka-logs/topic_xx-123/011060934646.log.
[2021-08-26 20:20:55,640] ERROR Error while appending records to topic_xx-123 
in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
[2021-08-26 20:20:55,645] ERROR Error while deleting segments for topic_xx-123 
in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
java.nio.file.FileSystemException: 
/data4/kafka-logs/topic_xx-123/011040402299.log -> 
/data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only file 
system
Suppressed: java.nio.file.FileSystemException: 
/data4/kafka-logs/topic_xx-123/011040402299.log -> 
/data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only file 
system
[2021-08-26 20:20:55,644] ERROR Error while appending records to topic_xx-123 
in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
[2021-08-26 20:20:55,652] INFO [Partition topic_xx-123 broker=42] Shrinking ISR 
from 55,42 to 42. Leader: (highWatermark: 11061228956, endOffset: 11061228965). 
Out of sync replicas: (brokerId: 55, endOffset: 11061228956). 
(kafka.cluster.Partition)
 




disk error is:

Aug 26 20:20:55 kafka-zk-19 kernel: sd 0:2:5:0: [sdf] tag#33 FAILED Result: 
hostbyte=DID_BAD_TARGET driverbyte=DRIVER_OK


> 2.4.0
> -
>
> Key: KAFKA-13251
> URL: https://issues.apache.org/jira/browse/KAFKA-13251
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0
> Environment: linux 4.1.0
>Reporter: chaos
>Priority: Major
>
> A disk error occurred on broker 42, and the broker then shrank the ISR to itself.
> So why was the ISR shrunk to the failing broker?
> i.e. why "Shrinking ISR from 55,42 to 42" rather than "Shrinking ISR from 55,42 
> to 55"?
>  kafka logs:
>  broker42:
> [2021-08-26 20:20:55,640] ERROR [ReplicaManager broker=42] Error processing 
> fetch with max size 1048576 from consumer on partition topic_xx-123: 
> (fetchOffset=11061228956, logStartOffset=-1, maxBytes=1048576,

[jira] [Updated] (KAFKA-13251) 2.4.0

2021-08-30 Thread chaos (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13251?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chaos updated KAFKA-13251:
--
Component/s: core

> 2.4.0
> -
>
> Key: KAFKA-13251
> URL: https://issues.apache.org/jira/browse/KAFKA-13251
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.4.0
> Environment: linux 4.1.0
>Reporter: chaos
>Priority: Major
>
> A disk error occurred on broker 42, and the broker then shrank the ISR to itself.
> So why was the ISR shrunk to the failing broker?
> i.e. why "Shrinking ISR from 55,42 to 42" rather than "Shrinking ISR from 55,42 
> to 55"?
>  kafka logs:
>  broker42:
> [2021-08-26 20:20:55,640] ERROR [ReplicaManager broker=42] Error processing 
> fetch with max size 1048576 from consumer on partition topic_xx-123: 
> (fetchOffset=11061228956, logStartOffset=-1, maxBytes=1048576, 
> currentLeaderEpoch=Optional.empty) (kafka.server.ReplicaManager)
>  org.apache.kafka.common.errors.CorruptRecordException: Found record size 0 
> smaller than minimum record overhead (14) in file 
> /data4/kafka-logs/topic_xx-123/011060934646.log.
>  [2021-08-26 20:20:55,640] ERROR Error while appending records to 
> topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
>  [2021-08-26 20:20:55,645] ERROR Error while deleting segments for 
> topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
>  java.nio.file.FileSystemException: 
> /data4/kafka-logs/topic_xx-123/011040402299.log -> 
> /data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only 
> file system
>  Suppressed: java.nio.file.FileSystemException: 
> /data4/kafka-logs/topic_xx-123/011040402299.log -> 
> /data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only 
> file system
>  [2021-08-26 20:20:55,644] ERROR Error while appending records to 
> topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
>  [2021-08-26 20:20:55,652] INFO [Partition topic_xx-123 broker=42] Shrinking 
> ISR from 55,42 to 42. Leader: (highWatermark: 11061228956, endOffset: 
> 11061228965). Out of sync replicas: (brokerId: 55, endOffset: 11061228956). 
> (kafka.cluster.Partition)
>   
> broker55:
>  [2021-08-26 20:20:32,456] WARN [ReplicaFetcher replicaId=55, leaderId=42, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=55, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, 
> isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=830774713, 
> epoch=1562806014), rackId=) (kafka.server.ReplicaFetcherThread)
> disk error on broker42 is:
>  Aug 26 20:20:55 kernel: sd 0:2:5:0: [sdf] tag#33 FAILED Result: 
> hostbyte=DID_BAD_TARGET driverbyte=DRIVER_OK



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13251) 2.4.0

2021-08-30 Thread chaos (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13251?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chaos updated KAFKA-13251:
--
Description: 
Disk error occurred in broker(=42),and then Shrinking ISR to itself.
 so why Shrinking ISR to an error broker?

i.e.  why not "Shrinking ISR from 55,42 to 55" but "Shrinking ISR from 55,42 to 
42".

note:

other partition(110) shrink correctly.

 

kafka logs:
 broker42:

[2021-08-26 20:20:55,640] ERROR [ReplicaManager broker=42] Error processing 
fetch with max size 1048576 from consumer on partition topic_xx-123: 
(fetchOffset=11061228956, logStartOffset=-1, maxBytes=1048576, 
currentLeaderEpoch=Optional.empty) (kafka.server.ReplicaManager)
 org.apache.kafka.common.errors.CorruptRecordException: Found record size 0 
smaller than minimum record overhead (14) in file 
/data4/kafka-logs/topic_xx-123/011060934646.log.
 [2021-08-26 20:20:55,640] ERROR Error while appending records to topic_xx-123 
in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
 [2021-08-26 20:20:55,645] ERROR Error while deleting segments for topic_xx-123 
in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
 java.nio.file.FileSystemException: 
/data4/kafka-logs/topic_xx-123/011040402299.log -> 
/data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only file 
system
 Suppressed: java.nio.file.FileSystemException: 
/data4/kafka-logs/topic_xx-123/011040402299.log -> 
/data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only file 
system
 [2021-08-26 20:20:55,644] ERROR Error while appending records to topic_xx-123 
in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
 [2021-08-26 20:20:55,652] INFO [Partition topic_xx-123 broker=42] Shrinking 
ISR from 55,42 to 42. Leader: (highWatermark: 11061228956, endOffset: 
11061228965). Out of sync replicas: (brokerId: 55, endOffset: 11061228956). 
(kafka.cluster.Partition)
  

broker55:
 [2021-08-26 20:20:32,456] WARN [ReplicaFetcher replicaId=55, leaderId=42, 
fetcherId=0] Error in response for fetch request (type=FetchRequest, 
replicaId=55, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, 
isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=830774713, 
epoch=1562806014), rackId=) (kafka.server.ReplicaFetcherThread)

[2021-08-26 20:20:43,503] INFO [Partition topic_xxx-110 broker=55] Shrinking 
ISR from 55,42 to 55. Leader: (highWatermark: 11061384367, endOffset: 
11061388788). Out of sync replicas: (brokerId: 42, endOffset: 11061384367). 
(kafka.cluster.Partition)

 

disk error on broker42 is:
 Aug 26 20:20:55 kernel: sd 0:2:5:0: [sdf] tag#33 FAILED Result: 
hostbyte=DID_BAD_TARGET driverbyte=DRIVER_OK

  was:
Disk error occurred in broker(=42),and then Shrinking ISR to itself.
 so why Shrinking ISR to an error broker?

i.e.  why not "Shrinking ISR from 55,42 to 55" but "Shrinking ISR from 55,42 to 
42".


 kafka logs:
 broker42:

[2021-08-26 20:20:55,640] ERROR [ReplicaManager broker=42] Error processing 
fetch with max size 1048576 from consumer on partition topic_xx-123: 
(fetchOffset=11061228956, logStartOffset=-1, maxBytes=1048576, 
currentLeaderEpoch=Optional.empty) (kafka.server.ReplicaManager)
 org.apache.kafka.common.errors.CorruptRecordException: Found record size 0 
smaller than minimum record overhead (14) in file 
/data4/kafka-logs/topic_xx-123/011060934646.log.
 [2021-08-26 20:20:55,640] ERROR Error while appending records to topic_xx-123 
in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
 [2021-08-26 20:20:55,645] ERROR Error while deleting segments for topic_xx-123 
in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
 java.nio.file.FileSystemException: 
/data4/kafka-logs/topic_xx-123/011040402299.log -> 
/data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only file 
system
 Suppressed: java.nio.file.FileSystemException: 
/data4/kafka-logs/topic_xx-123/011040402299.log -> 
/data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only file 
system
 [2021-08-26 20:20:55,644] ERROR Error while appending records to topic_xx-123 
in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
 [2021-08-26 20:20:55,652] INFO [Partition topic_xx-123 broker=42] Shrinking 
ISR from 55,42 to 42. Leader: (highWatermark: 11061228956, endOffset: 
11061228965). Out of sync replicas: (brokerId: 55, endOffset: 11061228956). 
(kafka.cluster.Partition)
  

broker55:
 [2021-08-26 20:20:32,456] WARN [ReplicaFetcher replicaId=55, leaderId=42, 
fetcherId=0] Error in response for fetch request (type=FetchRequest, 
replicaId=55, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, 
isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=830774713, 
epoch=1562806014), rackId=) (kafka.server.ReplicaFetcherThread)

disk error on broker42 is:
 Aug 26 20:20:55 kernel: sd 0:2:5:0: [sdf] tag#33 FAILED Result: 
hostbyte=DID_BAD_T

[jira] [Created] (KAFKA-13252) ISR shrink to an error broker

2021-08-30 Thread chaos (Jira)
chaos created KAFKA-13252:
-

 Summary: ISR shrink to an error broker
 Key: KAFKA-13252
 URL: https://issues.apache.org/jira/browse/KAFKA-13252
 Project: Kafka
  Issue Type: Bug
Reporter: chaos






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13251) ISR shrink to an error broker

2021-08-30 Thread chaos (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13251?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chaos updated KAFKA-13251:
--
Summary: ISR shrink to an error broker  (was: 2.4.0)

> ISR shrink to an error broker
> -
>
> Key: KAFKA-13251
> URL: https://issues.apache.org/jira/browse/KAFKA-13251
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.4.0
> Environment: linux 4.1.0
>Reporter: chaos
>Priority: Major
>
> A disk error occurred on broker 42, and the broker then shrank the ISR to itself.
> So why was the ISR shrunk to the failing broker?
> i.e. why "Shrinking ISR from 55,42 to 42" rather than "Shrinking ISR from 55,42 
> to 55"?
> Note: another partition (110) shrank its ISR correctly.
>  
> kafka logs:
>  broker42:
> [2021-08-26 20:20:55,640] ERROR [ReplicaManager broker=42] Error processing 
> fetch with max size 1048576 from consumer on partition topic_xx-123: 
> (fetchOffset=11061228956, logStartOffset=-1, maxBytes=1048576, 
> currentLeaderEpoch=Optional.empty) (kafka.server.ReplicaManager)
>  org.apache.kafka.common.errors.CorruptRecordException: Found record size 0 
> smaller than minimum record overhead (14) in file 
> /data4/kafka-logs/topic_xx-123/011060934646.log.
>  [2021-08-26 20:20:55,640] ERROR Error while appending records to 
> topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
>  [2021-08-26 20:20:55,645] ERROR Error while deleting segments for 
> topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
>  java.nio.file.FileSystemException: 
> /data4/kafka-logs/topic_xx-123/011040402299.log -> 
> /data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only 
> file system
>  Suppressed: java.nio.file.FileSystemException: 
> /data4/kafka-logs/topic_xx-123/011040402299.log -> 
> /data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only 
> file system
>  [2021-08-26 20:20:55,644] ERROR Error while appending records to 
> topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
>  [2021-08-26 20:20:55,652] INFO [Partition topic_xx-123 broker=42] Shrinking 
> ISR from 55,42 to 42. Leader: (highWatermark: 11061228956, endOffset: 
> 11061228965). Out of sync replicas: (brokerId: 55, endOffset: 11061228956). 
> (kafka.cluster.Partition)
>   
> broker55:
>  [2021-08-26 20:20:32,456] WARN [ReplicaFetcher replicaId=55, leaderId=42, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=55, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, 
> isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=830774713, 
> epoch=1562806014), rackId=) (kafka.server.ReplicaFetcherThread)
> [2021-08-26 20:20:43,503] INFO [Partition topic_xxx-110 broker=55] Shrinking 
> ISR from 55,42 to 55. Leader: (highWatermark: 11061384367, endOffset: 
> 11061388788). Out of sync replicas: (brokerId: 42, endOffset: 11061384367). 
> (kafka.cluster.Partition)
>  
> disk error on broker42 is:
>  Aug 26 20:20:55 kernel: sd 0:2:5:0: [sdf] tag#33 FAILED Result: 
> hostbyte=DID_BAD_TARGET driverbyte=DRIVER_OK



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13250) 2.4.0

2021-08-30 Thread chaos (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chaos resolved KAFKA-13250.
---
Resolution: Duplicate

> 2.4.0
> -
>
> Key: KAFKA-13250
> URL: https://issues.apache.org/jira/browse/KAFKA-13250
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0
> Environment: linux 4.1.0
>Reporter: chaos
>Priority: Major
>
>  
>  
> [2021-08-26 20:20:55,640] ERROR [ReplicaManager broker=42] Error processing 
> fetch with max size 1048576 from consumer on partition topic_xx-123: 
> (fetchOffset=11061228956, logStartOffset=-1, maxBytes=1048576, 
> currentLeaderEpoch=Optional.empty) (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.CorruptRecordException: Found record size 0 
> smaller than minimum record overhead (14) in file 
> /data4/kafka-logs/topic_xx-123/011060934646.log.
> [2021-08-26 20:20:55,640] ERROR Error while appending records to topic_xx-123 
> in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
> [2021-08-26 20:20:55,645] ERROR Error while deleting segments for 
> topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
> java.nio.file.FileSystemException: 
> /data4/kafka-logs/topic_xx-123/011040402299.log -> 
> /data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only 
> file system
>  Suppressed: java.nio.file.FileSystemException: 
> /data4/kafka-logs/topic_xx-123/011040402299.log -> 
> /data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only 
> file system
> [2021-08-26 20:20:55,644] ERROR Error while appending records to topic_xx-123 
> in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
> [2021-08-26 20:20:55,644] ERROR Error while appending records to topic_xx-123 
> in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
> [2021-08-26 20:20:55,644] ERROR Error while appending records to topic_xx-123 
> in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
> [2021-08-26 20:20:55,652] INFO [Partition topic_xx-123 broker=42] Shrinking 
> ISR from 55,42 to 42. Leader: (highWatermark: 11061228956, endOffset: 
> 11061228965). Out of sync replicas: (brokerId: 55, endOffset: 11061228956). 
> (kafka.cluster.Partition)
>  
> disk errors are:



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13252) ISR shrink to an error broker

2021-08-30 Thread chaos (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chaos resolved KAFKA-13252.
---
Resolution: Duplicate

> ISR shrink to an error broker
> -
>
> Key: KAFKA-13252
> URL: https://issues.apache.org/jira/browse/KAFKA-13252
> Project: Kafka
>  Issue Type: Bug
>Reporter: chaos
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vamossagar12 commented on a change in pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…

2021-08-30 Thread GitBox


vamossagar12 commented on a change in pull request #11211:
URL: https://github.com/apache/kafka/pull/11211#discussion_r698416259



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/PersistentSessionStore.java
##
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.state.SessionStore;
+
+public interface PersistentSessionStore  extends SessionStore {

Review comment:
   done

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/PersistentWindowStore.java
##
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.state.WindowStore;
+
+public interface PersistentWindowStore  extends WindowStore  {

Review comment:
   done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vamossagar12 commented on a change in pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…

2021-08-30 Thread GitBox


vamossagar12 commented on a change in pull request #11211:
URL: https://github.com/apache/kafka/pull/11211#discussion_r698417779



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
##
@@ -217,6 +230,24 @@ public V fetch(final K key,
 final long timeFrom,
 final long timeTo) {
 Objects.requireNonNull(key, "key cannot be null");
+
+if (wrapped().persistent()) {
+
+final long actualWindowStartTime = getActualWindowStartTime(timeFrom);
+
+if (timeTo < actualWindowStartTime) {
+return KeyValueIterators.emptyWindowStoreIterator();
+}
+
+return new MeteredWindowStoreIterator<>(
+wrapped().fetch(keyBytes(key), actualWindowStartTime, timeTo),
+fetchSensor,
+streamsMetrics,
+serdes,
+time
+);

Review comment:
   For the most part, only getting the actual start time and returning 
empty iterator are duplicate. The MeteredWindowStoreIterator object is built 
using a different method call every time. Do you think the first part needs to 
be abstracted out to a different method? That might also differ across 
different methods..
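
A sketch of one possible shape (names and structure are illustrative, not the PR's code; it reuses the helpers visible in the diff above and leaves only the iterator construction per method):

```java
@Override
public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
    Objects.requireNonNull(key, "key cannot be null");
    // Shared prefix: clamp to the enforced retention window, but only for persistent stores.
    final long from = wrapped().persistent() ? getActualWindowStartTime(timeFrom) : timeFrom;
    if (timeTo < from) {
        return KeyValueIterators.emptyWindowStoreIterator();
    }
    // Per-method part: each fetch variant still issues its own wrapped() call.
    return new MeteredWindowStoreIterator<>(
        wrapped().fetch(keyBytes(key), from, timeTo),
        fetchSensor,
        streamsMetrics,
        serdes,
        time);
}
```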




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vamossagar12 commented on pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…

2021-08-30 Thread GitBox


vamossagar12 commented on pull request #11211:
URL: https://github.com/apache/kafka/pull/11211#issuecomment-908275012


   @showuon, is there an existing integration test for the Metered classes? I 
tried to find one to add the relevant tests but couldn't find one.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (KAFKA-13250) 2.4.0

2021-08-30 Thread chaos (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chaos closed KAFKA-13250.
-

duplicated

> 2.4.0
> -
>
> Key: KAFKA-13250
> URL: https://issues.apache.org/jira/browse/KAFKA-13250
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0
> Environment: linux 4.1.0
>Reporter: chaos
>Priority: Major
>
>  
>  
> [2021-08-26 20:20:55,640] ERROR [ReplicaManager broker=42] Error processing 
> fetch with max size 1048576 from consumer on partition topic_xx-123: 
> (fetchOffset=11061228956, logStartOffset=-1, maxBytes=1048576, 
> currentLeaderEpoch=Optional.empty) (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.CorruptRecordException: Found record size 0 
> smaller than minimum record overhead (14) in file 
> /data4/kafka-logs/topic_xx-123/011060934646.log.
> [2021-08-26 20:20:55,640] ERROR Error while appending records to topic_xx-123 
> in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
> [2021-08-26 20:20:55,645] ERROR Error while deleting segments for 
> topic_xx-123 in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
> java.nio.file.FileSystemException: 
> /data4/kafka-logs/topic_xx-123/011040402299.log -> 
> /data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only 
> file system
>  Suppressed: java.nio.file.FileSystemException: 
> /data4/kafka-logs/topic_xx-123/011040402299.log -> 
> /data4/kafka-logs/topic_xx-123/011040402299.log.deleted: Read-only 
> file system
> [2021-08-26 20:20:55,644] ERROR Error while appending records to topic_xx-123 
> in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
> [2021-08-26 20:20:55,644] ERROR Error while appending records to topic_xx-123 
> in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
> [2021-08-26 20:20:55,644] ERROR Error while appending records to topic_xx-123 
> in dir /data4/kafka-logs (kafka.server.LogDirFailureChannel)
> [2021-08-26 20:20:55,652] INFO [Partition topic_xx-123 broker=42] Shrinking 
> ISR from 55,42 to 42. Leader: (highWatermark: 11061228956, endOffset: 
> 11061228965). Out of sync replicas: (brokerId: 55, endOffset: 11061228956). 
> (kafka.cluster.Partition)
>  
> disk errors are:



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (KAFKA-13252) ISR shrink to an error broker

2021-08-30 Thread chaos (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chaos closed KAFKA-13252.
-

duplicated

> ISR shrink to an error broker
> -
>
> Key: KAFKA-13252
> URL: https://issues.apache.org/jira/browse/KAFKA-13252
> Project: Kafka
>  Issue Type: Bug
>Reporter: chaos
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13253) Kafka Connect losing task (re)configuration when connector name has special characters

2021-08-30 Thread David Dufour (Jira)
David Dufour created KAFKA-13253:


 Summary: Kafka Connect losing task (re)configuration when 
connector name has special characters
 Key: KAFKA-13253
 URL: https://issues.apache.org/jira/browse/KAFKA-13253
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.7.1
Reporter: David Dufour


When it is not the leader, DistributedHerder.reconfigureConnector() forwards the 
task configuration to the leader as follows:
{quote}String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/" + 
connName + "/tasks");

log.trace("Forwarding task configurations for connector {} to leader", 
connName);

RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, config, 
sessionKey, requestSignatureAlgorithm);
{quote}
The problem is that if the connector name contains special characters such as 
'<' or '>', they need to be URL-encoded appropriately; otherwise an uncaught 
exception is raised in RestClient and the forward is lost.

Here is the kind of exception we can catch by adding the necessary code in 
RestClient:
{quote}java.lang.IllegalArgumentException: Illegal character in path at index 
51: 
http://10.224.0.15:8083/connectors/mirror1-cluster->mirror2-cluster.MirrorHeartbeatConnector/tasks
{quote}
An additional catch() should be added in RestClient.httpRequest(), here:
{quote}} catch (IOException | InterruptedException | TimeoutException | ExecutionException e) {
    log.error("IO error forwarding REST request: ", e);
    throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "IO Error trying to forward REST request: " + e.getMessage(), e);
} finally {
{quote}
to catch all other exceptions, because without it this kind of problem is 
completely silent.

To reproduce:
 * start 2 kafka clusters
 * start a kafka connect (distributed) with at least 2 nodes
 * start a HeartbeatConnector with name "cluster1->cluster2"

If the node which generated the task is not the leader (not systematic), it 
will forward the creation to the leader and it will be lost. As a result, the 
connector will stay in RUNNING state but without any task.

The problem is not easy to reproduce; it is important to start with empty 
connect topics to reproduce it more easily.
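
For illustration only, a hedged sketch of the two changes implied above (placement and surrounding code are assumptions, not an actual patch): percent-encode the connector name before building the forward URL, and catch unexpected runtime exceptions in RestClient.httpRequest() so the failure is logged instead of being silently dropped.
{code:java}
// 1) In DistributedHerder.reconfigureConnector(): encode the connector name so that
//    names such as "cluster1->cluster2" yield a legal URL path.
//    (URLEncoder.encode(String, Charset) is the Java 10+ overload; it form-encodes,
//    so a dedicated path-segment encoder may be preferable.)
String reconfigUrl = RestServer.urlJoin(leaderUrl,
    "/connectors/" + URLEncoder.encode(connName, StandardCharsets.UTF_8) + "/tasks");

// 2) In RestClient.httpRequest(): a catch for unexpected runtime exceptions
//    (e.g. the IllegalArgumentException above) next to the existing one.
} catch (IOException | InterruptedException | TimeoutException | ExecutionException e) {
    log.error("IO error forwarding REST request: ", e);
    throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR,
        "IO Error trying to forward REST request: " + e.getMessage(), e);
} catch (RuntimeException e) {
    log.error("Error forwarding REST request: ", e);
    throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR,
        "Error trying to forward REST request: " + e.getMessage(), e);
} finally {
{code}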



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13253) Kafka Connect losing task (re)configuration when connector name has special characters

2021-08-30 Thread David Dufour (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Dufour updated KAFKA-13253:
-
Description: 
When it is not the leader, DistributedHerder.reconfigureConnector() forwards the 
task configuration to the leader as follows:
{quote}String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/" + 
connName + "/tasks");

log.trace("Forwarding task configurations for connector {} to leader", 
connName);

RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, config, 
sessionKey, requestSignatureAlgorithm);
{quote}
The problem is that if the connector name contains special characters such as 
'<' or '>', they need to be URL-encoded appropriately; otherwise an uncaught 
exception is raised in RestClient and the forward is lost.

Here is the kind of exception we can catch by adding the necessary code in 
RestClient:
{quote}java.lang.IllegalArgumentException: Illegal character in path at index 
51: 
http://10.224.0.15:8083/connectors/mirror1-cluster->mirror2-cluster.MirrorHeartbeatConnector/tasks
{quote}
An additional catch() should be added in RestClient.httpRequest(), here:
{quote}catch (IOException | InterruptedException | TimeoutException | 
ExecutionException e)

{ log.error("IO error forwarding REST request: ", e); throw new 
ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "IO Error trying to 
forward REST request: " + e.getMessage(), }

finally {
{quote}
to catch all other exceptions, because without it this kind of problem is 
completely silent.

To reproduce:
 * start 2 kafka clusters
 * start a kafka connect (distributed) with at least 2 nodes
 * start a HeartbeatConnector with name "cluster1->cluster2"

If the node which generated the task is not the leader (not systematic), it 
will forward the creation to the leader and it will be lost. As a result, the 
connector will stay in RUNNING state but without any task.

The problem is not easy to reproduce; it is important to start with empty 
connect topics to reproduce it more easily.

  was:
When not leader, DistributedHerder.reconfigureConnector() forwards the task 
configuration to the leader as follow:
{quote}String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/" + 
connName + "/tasks");

log.trace("Forwarding task configurations for connector {} to leader", 
connName);

RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, config, 
sessionKey, requestSignatureAlgorithm);
{quote}
The problem is that if the connector name contains some special characters, 
such as '<', '>'... they need to be 'URLEncoded' appropriately, otherwise an 
uncatched exception is raised in RestClient and the forward is lost.

Here is the kind of exception we can catch by adding the necessary code in 
RestClient:
{quote}java.lang.IllegalArgumentException: Illegal character in path at index 
51: 
http://10.224.0.15:8083/connectors/mirror1-cluster->mirror2-cluster.MirrorHeartbeatConnector/tasks
{quote}
An additional catch() should be added in RestClient.httpRequest(), here:
{quote}} catch (IOException | InterruptedException | TimeoutException | 
ExecutionException e) {

log.error("IO error forwarding REST request: ", e);

throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "IO Error 
trying to forward REST request: " + e.getMessage(), } finally {
{quote}
to catch all other exceptions because without, this kind of problem is 
completly silent.

To reproduce:
 * start 2 kafka clusters
 * start a kafka connect (distributed) with at least 2 nodes
 * start an HeartbeatConnector with name "cluster1->cluster2"

If the node which generated the task is not the leader (not systematic), it 
will forward the creation to the leader and it will be lost. As a result, the 
connector will stay in RUNNING state but without any task.

Problem not easy to reproduce, it is important to start with empty connect 
topics to reproduce more easily


> Kafka Connect losing task (re)configuration when connector name has special 
> characters
> --
>
> Key: KAFKA-13253
> URL: https://issues.apache.org/jira/browse/KAFKA-13253
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.7.1
>Reporter: David Dufour
>Priority: Major
>
> When it is not the leader, DistributedHerder.reconfigureConnector() forwards the 
> task configuration to the leader as follows:
> {quote}String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/" + 
> connName + "/tasks");
> log.trace("Forwarding task configurations for connector {} to leader", 
> connName);
> RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, config, 
> sessionKey, requestSignatureAlgorithm);
> {quote}
> The problem is that if the connector name contains some special characters, 

[jira] [Updated] (KAFKA-13253) Kafka Connect losing task (re)configuration when connector name has special characters

2021-08-30 Thread David Dufour (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Dufour updated KAFKA-13253:
-
Description: 
When not leader, DistributedHerder.reconfigureConnector() forwards the task 
configuration to the leader as follows:
{quote}String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/" + 
connName + "/tasks");

log.trace("Forwarding task configurations for connector {} to leader", 
connName);

RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, config, 
sessionKey, requestSignatureAlgorithm);
{quote}
The problem is that if the connector name contains some special characters, 
such as '<', '>'..., they need to be URL-encoded appropriately, otherwise an 
uncaught exception is raised in RestClient and the forward is lost.
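
For illustration, a minimal sketch (not the actual Connect code) of encoding the 
connector name before it is joined into the forward URL; note that 
java.net.URLEncoder does form-style encoding, so the real fix may prefer a 
path-segment encoder:
{code:java}
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class ForwardUrlSketch {
    public static void main(String[] args) throws Exception {
        String leaderUrl = "http://10.224.0.15:8083/";
        String connName = "mirror1-cluster->mirror2-cluster.MirrorHeartbeatConnector";
        // '>' becomes %3E, so the resulting path contains only legal URI characters
        String encoded = URLEncoder.encode(connName, StandardCharsets.UTF_8.name());
        String reconfigUrl = leaderUrl + "connectors/" + encoded + "/tasks";
        System.out.println(reconfigUrl);
    }
}
{code}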

Here is the kind of exception we can catch by adding the necessary code in 
RestClient:
{quote}java.lang.IllegalArgumentException: Illegal character in path at index 
51: 
http://10.224.0.15:8083/connectors/mirror1-cluster->mirror2-cluster.MirrorHeartbeatConnector/tasks
{quote}
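The failure itself is easy to reproduce outside Connect; a minimal sketch, assuming 
the HTTP client builds a java.net.URI from the raw string, which is what the message 
suggests:
{code:java}
public class IllegalUriSketch {
    public static void main(String[] args) {
        // Index 51 is the '>' after "mirror1-cluster-"; URI.create rejects it with
        // java.lang.IllegalArgumentException: Illegal character in path at index 51: ...
        java.net.URI.create(
            "http://10.224.0.15:8083/connectors/mirror1-cluster->mirror2-cluster.MirrorHeartbeatConnector/tasks");
    }
}
{code}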
An additional catch() should be added in RestClient.httpRequest(), here:
{quote}catch (IOException | InterruptedException | TimeoutException | 
ExecutionException e) {
    log.error("IO error forwarding REST request: ", e);
    throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "IO 
Error trying to forward REST request: " + e.getMessage(), e);
} finally {
{quote}
to catch all other exceptions, because without it this kind of problem is 
completely silent.
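
A sketch of what such a catch-all could look like (a fragment extending the 
try/catch quoted above, not the actual patch):
{code:java}
} catch (IOException | InterruptedException | TimeoutException | ExecutionException e) {
    log.error("IO error forwarding REST request: ", e);
    throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR,
            "IO Error trying to forward REST request: " + e.getMessage(), e);
} catch (RuntimeException e) {
    // e.g. the IllegalArgumentException raised for an un-encoded connector name
    log.error("Unexpected error forwarding REST request: ", e);
    throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR,
            "Unexpected error trying to forward REST request: " + e.getMessage(), e);
} finally {
{code}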

To reproduce:
 * start 2 kafka clusters
 * start a kafka connect (distributed) with at least 2 nodes
 * start an HeartbeatConnector with name "cluster1->cluster2"

If the node which generated the task is not the leader (this is not systematic), it 
will forward the creation to the leader and the request will be lost. As a result, 
the connector will stay in the RUNNING state but without any task.

The problem is not easy to reproduce; it is important to start with empty Connect 
topics to reproduce it more easily.

  was:
When not leader, DistributedHerder.reconfigureConnector() forwards the task 
configuration to the leader as follows:
{quote}String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/" + 
connName + "/tasks");

log.trace("Forwarding task configurations for connector {} to leader", 
connName);

RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, config, 
sessionKey, requestSignatureAlgorithm);
{quote}
The problem is that if the connector name contains some special characters, 
such as '<', '>'..., they need to be URL-encoded appropriately, otherwise an 
uncaught exception is raised in RestClient and the forward is lost.

Here is the kind of exception we can catch by adding the necessary code in 
RestClient:
{quote}java.lang.IllegalArgumentException: Illegal character in path at index 
51: 
http://10.224.0.15:8083/connectors/mirror1-cluster->mirror2-cluster.MirrorHeartbeatConnector/tasks
{quote}
An additional catch() should be added in RestClient.httpRequest(), here:
{quote}catch (IOException | InterruptedException | TimeoutException | 
ExecutionException e) {

log.error("IO error forwarding REST request: ", e);

throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "IO Error trying to 
forward REST request: " + e.getMessage(), e);

} finally {
{quote}
to catch all other exceptions, because without it this kind of problem is 
completely silent.

To reproduce:
 * start 2 kafka clusters
 * start a kafka connect (distributed) with at least 2 nodes
 * start an HeartbeatConnector with name "cluster1->cluster2"

If the node which generated the task is not the leader (this is not systematic), it 
will forward the creation to the leader and the request will be lost. As a result, 
the connector will stay in the RUNNING state but without any task.

The problem is not easy to reproduce; it is important to start with empty Connect 
topics to reproduce it more easily.


> Kafka Connect losing task (re)configuration when connector name has special 
> characters
> --
>
> Key: KAFKA-13253
> URL: https://issues.apache.org/jira/browse/KAFKA-13253
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.7.1
>Reporter: David Dufour
>Priority: Major
>
> When not leader, DistributedHerder.reconfigureConnector() forwards the task 
> configuration to the leader as follows:
> {quote}String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/" + 
> connName + "/tasks");
> log.trace("Forwarding task configurations for connector {} to leader", 
> connName);
> RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, config, 
> sessionKey, requestSignatureAlgorithm);
> {quote}
> The problem is that if the connector name contains some special cha

[jira] [Updated] (KAFKA-13253) Kafka Connect losing task (re)configuration when connector name has special characters

2021-08-30 Thread David Dufour (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Dufour updated KAFKA-13253:
-
Description: 
When not leader, DistributedHerder.reconfigureConnector() forwards the task 
configuration to the leader as follows:
{quote}String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/" + 
connName + "/tasks");

log.trace("Forwarding task configurations for connector {} to leader", 
connName);

RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, config, 
sessionKey, requestSignatureAlgorithm);
{quote}
The problem is that if the connector name contains some special characters, 
such as '<', '>'..., they need to be URL-encoded appropriately, otherwise an 
uncaught exception is raised in RestClient and the forward is lost.

Here is the kind of exception we can catch by adding the necessary code in 
RestClient:
{quote}java.lang.IllegalArgumentException: Illegal character in path at index 
51: 
http://10.224.0.15:8083/connectors/mirror1-cluster->mirror2-cluster.MirrorHeartbeatConnector/tasks
{quote}
An additional catch() should be added in RestClient.httpRequest(), here:
{quote}catch (IOException | InterruptedException | TimeoutException | 
ExecutionException e) {
    log.error("IO error forwarding REST request: ", e);
    throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "IO 
Error trying to forward REST request: " + e.getMessage(), e);
} finally {
{quote}
to catch all other exceptions, because without it this kind of problem is 
completely silent.

To reproduce:
 * start 2 kafka clusters
 * start a kafka connect (distributed) with at least 2 nodes
 * start an HeartbeatConnector with name "cluster1->cluster2"

If the node which generated the task is not the leader (this is not systematic), it 
will forward the creation to the leader and the request will be lost. As a result, 
the connector will stay in the RUNNING state but without any task.

The problem is not easy to reproduce; it is important to start with empty Connect 
topics to reproduce it more easily.

  was:
When not leader, DistributedHerder.reconfigureConnector() forwards the task 
configuration to the leader as follows:
{quote}String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/" + 
connName + "/tasks");

log.trace("Forwarding task configurations for connector {} to leader", 
connName);

RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, config, 
sessionKey, requestSignatureAlgorithm);
{quote}
The problem is that if the connector name contains some special characters, 
such as '<', '>'..., they need to be URL-encoded appropriately, otherwise an 
uncaught exception is raised in RestClient and the forward is lost.

Here is the kind of exception we can catch by adding the necessary code in 
RestClient:
{quote}java.lang.IllegalArgumentException: Illegal character in path at index 
51: 
http://10.224.0.15:8083/connectors/mirror1-cluster->mirror2-cluster.MirrorHeartbeatConnector/tasks
{quote}
An additional catch() should be added in RestClient.httpRequest(), here:
{quote}catch (IOException | InterruptedException | TimeoutException | 
ExecutionException e) {
    log.error("IO error forwarding REST request: ", e);
    throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, 
"IO Error trying to forward REST request: " + e.getMessage(), e);
} finally {
{quote}
to catch all other exceptions, because without it this kind of problem is 
completely silent.

To reproduce:
 * start 2 kafka clusters
 * start a kafka connect (distributed) with at least 2 nodes
 * start an HeartbeatConnector with name "cluster1->cluster2"

If the node which generated the task is not the leader (this is not systematic), it 
will forward the creation to the leader and the request will be lost. As a result, 
the connector will stay in the RUNNING state but without any task.

The problem is not easy to reproduce; it is important to start with empty Connect 
topics to reproduce it more easily.


> Kafka Connect losing task (re)configuration when connector name has special 
> characters
> --
>
> Key: KAFKA-13253
> URL: https://issues.apache.org/jira/browse/KAFKA-13253
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.7.1
>Reporter: David Dufour
>Priority: Major
>
> When not leader, DistributedHerder.reconfigureConnector() forwards the task 
> configuration to the leader as follows:
> {quote}String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/" + 
> connName + "/tasks");
> log.trace("Forwarding task configurations for connector {} to leader", 
> connName);
> RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, config, 
> sessionKey, requestSignatureAlgorithm);
> {quote}
> The problem is that if the con

[jira] [Updated] (KAFKA-13253) Kafka Connect losing task (re)configuration when connector name has special characters

2021-08-30 Thread David Dufour (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Dufour updated KAFKA-13253:
-
Description: 
When not leader, DistributedHerder.reconfigureConnector() forwards the task 
configuration to the leader as follows:
{quote}String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/" + 
connName + "/tasks");

log.trace("Forwarding task configurations for connector {} to leader", 
connName);

RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, config, 
sessionKey, requestSignatureAlgorithm);
{quote}
The problem is that if the connector name contains some special characters, 
such as '<', '>'..., they need to be URL-encoded appropriately, otherwise an 
uncaught exception is raised in RestClient and the forward is lost.

Here is the kind of exception we can catch by adding the necessary code in 
RestClient:
{quote}java.lang.IllegalArgumentException: Illegal character in path at index 
51: 
http://10.224.0.15:8083/connectors/mirror1-cluster->mirror2-cluster.MirrorHeartbeatConnector/tasks
{quote}
An additional catch() should be added in RestClient.httpRequest(), here:
{quote}catch (IOException | InterruptedException | TimeoutException | 
ExecutionException e) {
    log.error("IO error forwarding REST request: ", e);
    throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, 
"IO Error trying to forward REST request: " + e.getMessage(), e);
} finally {
{quote}
to catch all other exceptions, because without it this kind of problem is 
completely silent.

To reproduce:
 * start 2 kafka clusters
 * start a kafka connect (distributed) with at least 2 nodes
 * start an HeartbeatConnector with name "cluster1->cluster2"

If the node which generated the task is not the leader (this is not systematic), it 
will forward the creation to the leader and the request will be lost. As a result, 
the connector will stay in the RUNNING state but without any task.

The problem is not easy to reproduce; it is important to start with empty Connect 
topics to reproduce it more easily.

  was:
When not leader, DistributedHerder.reconfigureConnector() forwards the task 
configuration to the leader as follows:
{quote}String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/" + 
connName + "/tasks");

log.trace("Forwarding task configurations for connector {} to leader", 
connName);

RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, config, 
sessionKey, requestSignatureAlgorithm);
{quote}
The problem is that if the connector name contains some special characters, 
such as '<', '>'..., they need to be URL-encoded appropriately, otherwise an 
uncaught exception is raised in RestClient and the forward is lost.

Here is the kind of exception we can catch by adding the necessary code in 
RestClient:
{quote}java.lang.IllegalArgumentException: Illegal character in path at index 
51: 
http://10.224.0.15:8083/connectors/mirror1-cluster->mirror2-cluster.MirrorHeartbeatConnector/tasks
{quote}
An additional catch() should be added in RestClient.httpRequest(), here:
{quote}catch (IOException | InterruptedException | TimeoutException | 
ExecutionException e) {
    log.error("IO error forwarding REST request: ", e);
    throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "IO 
Error trying to forward REST request: " + e.getMessage(), e);
} finally {
{quote}
to catch all other exceptions, because without it this kind of problem is 
completely silent.

To reproduce:
 * start 2 kafka clusters
 * start a kafka connect (distributed) with at least 2 nodes
 * start an HeartbeatConnector with name "cluster1->cluster2"

If the node which generated the task is not the leader (this is not systematic), it 
will forward the creation to the leader and the request will be lost. As a result, 
the connector will stay in the RUNNING state but without any task.

The problem is not easy to reproduce; it is important to start with empty Connect 
topics to reproduce it more easily.


> Kafka Connect losing task (re)configuration when connector name has special 
> characters
> --
>
> Key: KAFKA-13253
> URL: https://issues.apache.org/jira/browse/KAFKA-13253
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.7.1
>Reporter: David Dufour
>Priority: Major
>
> When not leader, DistributedHerder.reconfigureConnector() forwards the task 
> configuration to the leader as follows:
> {quote}String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/" + 
> connName + "/tasks");
> log.trace("Forwarding task configurations for connector {} to leader", 
> connName);
> RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, config, 
> sessionKey, requestSignatureAlgorithm);
> {quote}
> The problem is that if the connector name c

[GitHub] [kafka] dajac commented on pull request #11073: KAFKA-13092: Perf regression in LISR requests

2021-08-30 Thread GitBox


dajac commented on pull request #11073:
URL: https://github.com/apache/kafka/pull/11073#issuecomment-908364578


   @lbradstreet @junrao Could you take a look at this one as you both reviewed 
the original PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on pull request #11228: KAFKA-12887 Skip some RuntimeExceptions from exception handler

2021-08-30 Thread GitBox


jlprat commented on pull request #11228:
URL: https://github.com/apache/kafka/pull/11228#issuecomment-908365423


   Test failure is related to https://issues.apache.org/jira/browse/KAFKA-8138 
but not the same, as in this particular instance it seems more like a failure 
in the test itself (topic already exists)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao merged pull request #11060: MINOR Refactored the existing CheckpointFile in core module, moved to server-common module and introduced it as SnapshotFile.

2021-08-30 Thread GitBox


junrao merged pull request #11060:
URL: https://github.com/apache/kafka/pull/11060


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-13253) Kafka Connect losing task (re)configuration when connector name has special characters

2021-08-30 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban resolved KAFKA-13253.
--
Resolution: Duplicate

Same issue as KAFKA-9747 - that one already has a fix under review

> Kafka Connect losing task (re)configuration when connector name has special 
> characters
> --
>
> Key: KAFKA-13253
> URL: https://issues.apache.org/jira/browse/KAFKA-13253
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.7.1
>Reporter: David Dufour
>Priority: Major
>
> When not leader, DistributedHerder.reconfigureConnector() forwards the task 
> configuration to the leader as follows:
> {quote}String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/" + 
> connName + "/tasks");
> log.trace("Forwarding task configurations for connector {} to leader", 
> connName);
> RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, config, 
> sessionKey, requestSignatureAlgorithm);
> {quote}
> The problem is that if the connector name contains some special characters, 
> such as '<', '>'..., they need to be URL-encoded appropriately, otherwise an 
> uncaught exception is raised in RestClient and the forward is lost.
> Here is the kind of exception we can catch by adding the necessary code in 
> RestClient:
> {quote}java.lang.IllegalArgumentException: Illegal character in path at index 
> 51: 
> http://10.224.0.15:8083/connectors/mirror1-cluster->mirror2-cluster.MirrorHeartbeatConnector/tasks
> {quote}
> An additional catch() should be added in RestClient.httpRequest(), here:
> {quote}catch (IOException | InterruptedException | TimeoutException | 
> ExecutionException e) {
>     log.error("IO error forwarding REST request: ", e);
>     throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, 
> "IO Error trying to forward REST request: " + e.getMessage(), e);
> } finally {
> {quote}
> to catch all other exceptions, because without it this kind of problem is 
> completely silent.
> To reproduce:
>  * start 2 kafka clusters
>  * start a kafka connect (distributed) with at least 2 nodes
>  * start an HeartbeatConnector with name "cluster1->cluster2"
> If the node which generated the task is not the leader (this is not systematic), it 
> will forward the creation to the leader and the request will be lost. As a result, 
> the connector will stay in the RUNNING state but without any task.
> The problem is not easy to reproduce; it is important to start with empty Connect 
> topics to reproduce it more easily.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] junrao commented on a change in pull request #11255: KAFKA-13225: Controller skips sending UpdateMetadataRequest when no change in partition state.

2021-08-30 Thread GitBox


junrao commented on a change in pull request #11255:
URL: https://github.com/apache/kafka/pull/11255#discussion_r698652526



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -628,9 +628,9 @@ class KafkaController(val config: KafkaConfig,
   topicDeletionManager.failReplicaDeletion(newOfflineReplicasForDeletion)
 }
 
-// If replica failure did not require leader re-election, inform brokers 
of the offline brokers
+// If no partitions are affected, inform brokers of the offline brokers
 // Note that during leader re-election, brokers update their metadata
-if (partitionsWithOfflineLeader.isEmpty) {
+if (newOfflineReplicas.isEmpty || 
(newOfflineReplicasNotForDeletion.isEmpty && 
partitionsWithOfflineLeader.isEmpty)) {

Review comment:
   On second thought, I think this change still doesn't cover all the 
cases. The main issue is that in 
partitionStateMachine.triggerOnlinePartitionStateChange(), we only send 
updateMetadataRequest for partitions with successful leader change. If no 
partition has successful leader change, no updateMetadataRequest will be sent. 
The reason could be that newOfflineReplicas is empty, but it could also be that 
the leader can't be elected for other reasons (e.g. no replica is live). 
   
   In PartitionStateMachine.doElectLeaderForPartitions(), we have 3 categories 
of partitions, finishedUpdates, failedElections and updatesToRetry. Perhaps we 
could return finishedUpdates all the way to 
partitionStateMachine.triggerOnlinePartitionStateChange() and force a send of 
updateMetadataRequest if finishedUpdates is empty.





-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] splett2 commented on a change in pull request #11255: KAFKA-13225: Controller skips sending UpdateMetadataRequest when no change in partition state.

2021-08-30 Thread GitBox


splett2 commented on a change in pull request #11255:
URL: https://github.com/apache/kafka/pull/11255#discussion_r698693107



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -628,9 +628,9 @@ class KafkaController(val config: KafkaConfig,
   topicDeletionManager.failReplicaDeletion(newOfflineReplicasForDeletion)
 }
 
-// If replica failure did not require leader re-election, inform brokers 
of the offline brokers
+// If no partitions are affected, inform brokers of the offline brokers
 // Note that during leader re-election, brokers update their metadata
-if (partitionsWithOfflineLeader.isEmpty) {
+if (newOfflineReplicas.isEmpty || 
(newOfflineReplicasNotForDeletion.isEmpty && 
partitionsWithOfflineLeader.isEmpty)) {

Review comment:
   I think that makes sense. Do we need to do something similar for both 
`partitionStateMachine` calls?
   `handleStateChanges` already returns a `Map[TopicPartition, 
Either[Throwable, LeaderAndIsr]]` so it seems like we could just look to see if 
there are any values with `LeaderAndIsr` and that would imply that we sent out 
`updateMetadataRequest` as part of Online/Offline state changes. and likewise, 
we can have `triggerOnlinePartitionStateChange` return the same results.
   
   I am thinking something along the lines of:
   ```
   // trigger OfflinePartition state for all partitions whose current 
leader is one amongst the newOfflineReplicas
   val offlineStateChangeResults = 
partitionStateMachine.handleStateChanges(partitionsWithOfflineLeader.toSeq, 
OfflinePartition)
   // trigger OnlinePartition state changes for offline or new partitions
   val onlineStateChangeResults = 
partitionStateMachine.triggerOnlinePartitionStateChange()
   
   ...
   val leaderElectionSucceeded = (offlineStateChangeResults ++ 
onlineStateChangeResults).find(_)
   // (!leaderElectionSucceeded && newOfflineReplicasForDeletion.isEmpty) {
 
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, 
Set.empty)
   }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] splett2 commented on a change in pull request #11255: KAFKA-13225: Controller skips sending UpdateMetadataRequest when no change in partition state.

2021-08-30 Thread GitBox


splett2 commented on a change in pull request #11255:
URL: https://github.com/apache/kafka/pull/11255#discussion_r698693107



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -628,9 +628,9 @@ class KafkaController(val config: KafkaConfig,
   topicDeletionManager.failReplicaDeletion(newOfflineReplicasForDeletion)
 }
 
-// If replica failure did not require leader re-election, inform brokers 
of the offline brokers
+// If no partitions are affected, inform brokers of the offline brokers
 // Note that during leader re-election, brokers update their metadata
-if (partitionsWithOfflineLeader.isEmpty) {
+if (newOfflineReplicas.isEmpty || 
(newOfflineReplicasNotForDeletion.isEmpty && 
partitionsWithOfflineLeader.isEmpty)) {

Review comment:
   I think that makes sense. Do we need to do something similar for both 
`partitionStateMachine` calls?
   `handleStateChanges` already returns a `Map[TopicPartition, 
Either[Throwable, LeaderAndIsr]]` so it seems like we could just look to see if 
there are any values with `LeaderAndIsr` and that would imply that we sent out 
`updateMetadataRequest` as part of Online/Offline state changes. and likewise, 
we can have `triggerOnlinePartitionStateChange` return the same results.
   
   I am thinking something along the lines of:
   ```
   // trigger OfflinePartition state for all partitions whose current 
leader is one amongst the newOfflineReplicas
   val offlineStateChangeResults = 
partitionStateMachine.handleStateChanges(partitionsWithOfflineLeader.toSeq, 
OfflinePartition)
   // trigger OnlinePartition state changes for offline or new partitions
   val onlineStateChangeResults = 
partitionStateMachine.triggerOnlinePartitionStateChange()
   
   ...
   val leaderElectionSucceeded = (offlineStateChangeResults ++ 
onlineStateChangeResults).find(etc)
   // (!leaderElectionSucceeded && newOfflineReplicasForDeletion.isEmpty) {
 
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, 
Set.empty)
   }
   ```

##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -628,9 +628,9 @@ class KafkaController(val config: KafkaConfig,
   topicDeletionManager.failReplicaDeletion(newOfflineReplicasForDeletion)
 }
 
-// If replica failure did not require leader re-election, inform brokers 
of the offline brokers
+// If no partitions are affected, inform brokers of the offline brokers
 // Note that during leader re-election, brokers update their metadata
-if (partitionsWithOfflineLeader.isEmpty) {
+if (newOfflineReplicas.isEmpty || 
(newOfflineReplicasNotForDeletion.isEmpty && 
partitionsWithOfflineLeader.isEmpty)) {

Review comment:
   I think that makes sense. Do we need to do something similar for both 
`partitionStateMachine` calls?
   `handleStateChanges` already returns a `Map[TopicPartition, 
Either[Throwable, LeaderAndIsr]]` so it seems like we could just look to see if 
there are any values with `LeaderAndIsr` and that would imply that we sent out 
`updateMetadataRequest` as part of Online/Offline state changes. and likewise, 
we can have `triggerOnlinePartitionStateChange` return the same results.
   
   I am thinking something along the lines of:
   ```
   // trigger OfflinePartition state for all partitions whose current 
leader is one amongst the newOfflineReplicas
   val offlineStateChangeResults = 
partitionStateMachine.handleStateChanges(partitionsWithOfflineLeader.toSeq, 
OfflinePartition)
   // trigger OnlinePartition state changes for offline or new partitions
   val onlineStateChangeResults = 
partitionStateMachine.triggerOnlinePartitionStateChange()
   
   ...
   val leaderElectionSucceeded = (offlineStateChangeResults ++ 
onlineStateChangeResults).find(etc, etc)
   // (!leaderElectionSucceeded && newOfflineReplicasForDeletion.isEmpty) {
 
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, 
Set.empty)
   }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kirktrue opened a new pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC

2021-08-30 Thread GitBox


kirktrue opened a new pull request #11284:
URL: https://github.com/apache/kafka/pull/11284


   This task is to provide a concrete implementation of the interfaces defined 
in KIP-255 to allow Kafka to connect to an OAuth/OIDC identity provider for 
authentication and token retrieval. While KIP-255 provides an unsecured JWT 
example for development, this will fill in the gap and provide a 
production-grade implementation.
   
   The OAuth/OIDC work will allow out-of-the-box configuration by any Apache 
Kafka users to connect to an external identity provider service (e.g. Okta, 
Auth0, Azure, etc.). The code will implement the standard OAuth 
client credentials grant type.
   
   The proposed change is largely composed of a pair of 
AuthenticateCallbackHandler implementations: one to login on the client and one 
to validate on the broker.
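   
   For context, a client configured against such a provider would presumably be set 
up along these lines; the property and class names follow the KIP-768 proposal and 
may differ in the merged implementation, and the endpoint URL, client id and secret 
are placeholders:
   
   ```java
   import java.util.Properties;
   
   Properties props = new Properties();
   props.put("security.protocol", "SASL_SSL");
   props.put("sasl.mechanism", "OAUTHBEARER");
   // token endpoint of the external identity provider (placeholder URL)
   props.put("sasl.oauthbearer.token.endpoint.url",
           "https://idp.example.com/oauth2/default/v1/token");
   // client-side login callback handler proposed by KIP-768
   props.put("sasl.login.callback.handler.class",
           "org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler");
   props.put("sasl.jaas.config",
           "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required "
                   + "clientId=\"my-client\" clientSecret=\"my-secret\";");
   ```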
   
   See the following for more detail:
   
   * 
[KIP-768](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575)
   * [KAFKA-13202](https://issues.apache.org/jira/browse/KAFKA-13202)
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #11255: KAFKA-13225: Controller skips sending UpdateMetadataRequest when no change in partition state.

2021-08-30 Thread GitBox


junrao commented on a change in pull request #11255:
URL: https://github.com/apache/kafka/pull/11255#discussion_r698849242



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -628,9 +628,9 @@ class KafkaController(val config: KafkaConfig,
   topicDeletionManager.failReplicaDeletion(newOfflineReplicasForDeletion)
 }
 
-// If replica failure did not require leader re-election, inform brokers 
of the offline brokers
+// If no partitions are affected, inform brokers of the offline brokers
 // Note that during leader re-election, brokers update their metadata
-if (partitionsWithOfflineLeader.isEmpty) {
+if (newOfflineReplicas.isEmpty || 
(newOfflineReplicasNotForDeletion.isEmpty && 
partitionsWithOfflineLeader.isEmpty)) {

Review comment:
   I think we only need to check the return result of 
`partitionStateMachine.triggerOnlinePartitionStateChange()`. 
`partitionStateMachine.handleStateChanges(partitionsWithOfflineLeader.toSeq, 
OfflinePartition)` doesn't generate any  updateMetadataRequest.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file

2021-08-30 Thread GitBox


ableegoldman commented on pull request #11283:
URL: https://github.com/apache/kafka/pull/11283#issuecomment-908748576


   Hey @hutchiko , thanks for digging into this bug so thoroughly and providing 
a patch! One quick question before I review -- does this only affect version 
2.8 or below specifically, or could this be present on trunk/3.0 as well? 
Unless you already checked this, my guess would be the latter. If you can 
verify that is true, then can you please retarget this PR against the `trunk` 
branch? Once it gets merged then we can cherrypick back to 2.8 from there. 
Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13249) Checkpoints do not contain latest offsets on shutdown when using EOS

2021-08-30 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-13249:
---
Affects Version/s: (was: 2.7.1)

> Checkpoints do not contain latest offsets on shutdown when using EOS
> 
>
> Key: KAFKA-13249
> URL: https://issues.apache.org/jira/browse/KAFKA-13249
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0, 2.8.0
>Reporter: Oliver Hutchison
>Priority: Major
>
> When using EOS the {{.checkpoint}} file created when a stateful streams app 
> is shut down does not always contain changelog offsets which represent the 
> latest state of the state store. The offsets can often be behind the end of 
> the changelog - sometimes quite significantly.
> This leads to a state restore being required when the streams app restarts 
> after shutting down cleanly as streams thinks (based on the incorrect offsets 
> in the checkpoint) that the state store is not up to date with the changelog. 
> This is increasing the time we see it takes to do a clean restart of a single 
> instance streams app from around 10 seconds to sometimes over 2 minutes in our 
> case.
> I suspect the bug appears because of an assumption about the {{commitNeeded}} 
> field in the following method in {{StreamTask}}:
> {code:java}
> protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
>   // commitNeeded indicates we may have processed some records since last 
> commit
>   // and hence we need to refresh checkpointable offsets regardless whether 
> we should checkpoint or not
>   if (commitNeeded) {
> stateMgr.updateChangelogOffsets(checkpointableOffsets());
>   }
>   super.maybeWriteCheckpoint(enforceCheckpoint);
> }
> {code}
> In a steady state case for a simple single instance, single thread streams app 
> where the app simply starts, runs and then shuts down, the {{if 
> (commitNeeded)}} test always fails when running with EOS, which results in the 
> latest checkpoint offsets never getting updated into the {{stateMgr}}.
> Tracing back to the callers of {{maybeWriteCheckpoint}} it's easy to see this 
> is the case as there's only 1 place in the code which calls 
> {{maybeWriteCheckpoint}} during this steady state. The {{postCommit(final 
> boolean enforceCheckpoint)}} method, specifically the call in the {{RUNNING}} 
> state.
> {code:java}
> case RUNNING:
>   if (enforceCheckpoint || !eosEnabled) {
> maybeWriteCheckpoint(enforceCheckpoint);
>   }
>   log.debug("Finalized commit for {} task with eos {} enforce checkpoint {}", 
> state(), eosEnabled, enforceCheckpoint);
>   break;
> {code}
> We can see from this code that {{maybeWriteCheckpoint}} will only ever be 
> called if {{enforceCheckpoint=true}} because we know {{eosEnabled=true}} as 
> we're running with EOS.
> So then where does {{postCommit}} get called with {{enforceCheckpoint=true}}? 
> Again looking only at the steady state case we find that it's only called 
> from {{TaskManager.tryCloseCleanAllActiveTasks}} which is only called from 
> {{TaskManager.shutdown}}.
> The thing about the call in {{tryCloseCleanAllActiveTasks}} is that it 
> happens *after* all active tasks have committed, which means that 
> {{StreamTask.commitNeeded=false}} for all tasks so it follows that the test 
> back in {{maybeWriteCheckpoint}} always fails and we don't end up getting the 
> latest offsets stored into the state manager.
> I think the fix is to simply change the test in {{maybeWriteCheckpoint}} to 
> be {{if (commitNeeded || enforceCheckpoint) { ...}} as we know we must always 
> update the changelog offsets before we write the checkpoint.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on a change in pull request #11149: KAFKA-13229: add total blocked time metric to streams (KIP-761)

2021-08-30 Thread GitBox


guozhangwang commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r698854001



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java
##
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+
+import java.util.Map;
+
+public class KafkaProducerMetrics implements AutoCloseable {
+
+public static final String GROUP = "producer-metrics";
+private static final String FLUSH = "flush";
+private static final String TXN_INIT = "txn-init";
+private static final String TXN_BEGIN = "txn-begin";
+private static final String TXN_SEND_OFFSETS = "txn-send-offsets";
+private static final String TXN_COMMIT = "txn-commit";
+private static final String TXN_ABORT = "txn-abort";
+private static final String TOTAL_TIME_SUFFIX = "-time-ns-total";
+
+private final Map tags;
+private final Metrics metrics;
+private final Sensor initTimeSensor;
+private final Sensor beginTxnTimeSensor;
+private final Sensor flushTimeSensor;
+private final Sensor sendOffsetsSensor;
+private final Sensor commitTxnSensor;
+private final Sensor abortTxnSensor;
+
+public KafkaProducerMetrics(Metrics metrics) {
+this.metrics = metrics;
+tags = this.metrics.config().tags();
+flushTimeSensor = newLatencySensor(
+FLUSH,
+"Total time producer has spent in flush in nanoseconds."
+);
+initTimeSensor = newLatencySensor(
+TXN_INIT,
+"Total time producer has spent in initTransactions in nanoseconds."
+);
+beginTxnTimeSensor = newLatencySensor(
+TXN_BEGIN,
+"Total time producer has spent in beginTransaction in nanoseconds."
+);
+sendOffsetsSensor = newLatencySensor(
+TXN_SEND_OFFSETS,
+"Total time producer has spent in sendOffsetsToTransaction."

Review comment:
   nit: also add ` in nanoseconds`?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
##
@@ -91,6 +94,10 @@ private ThreadMetrics() {}
 "The fraction of time the thread spent on polling records from 
consumer";
 private static final String COMMIT_RATIO_DESCRIPTION =
 "The fraction of time the thread spent on committing all tasks";
+private static final String BLOCKED_TIME_DESCRIPTION =
+"The total time the thread spent blocked on kafka";

Review comment:
   ` in nanoseconds`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang merged pull request #11149: KAFKA-13229: add total blocked time metric to streams (KIP-761)

2021-08-30 Thread GitBox


guozhangwang merged pull request #11149:
URL: https://github.com/apache/kafka/pull/11149


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13249) Checkpoints do not contain latest offsets on shutdown when using EOS

2021-08-30 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17406988#comment-17406988
 ] 

A. Sophie Blee-Goldman commented on KAFKA-13249:


Thanks for the detailed report! I agree with your analysis, it does appear that 
we can end up writing stale offsets if the thread is shut down immediately 
following a "normal" commit that occurs during active processing. And given the 
small commit interval typical of EOS applications, we almost always perform 
this kind of commit before breaking out of the StreamThread's processing loop, 
meaning most of the time we will indeed have just committed all tasks when we 
get to checking the shutdown signal and entering {{TaskManager.shutdown()}}

I'll give your PR a pass -- thanks also for submitting a patch with the bug 
report :) 

> Checkpoints do not contain latest offsets on shutdown when using EOS
> 
>
> Key: KAFKA-13249
> URL: https://issues.apache.org/jira/browse/KAFKA-13249
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0, 2.8.0
>Reporter: Oliver Hutchison
>Priority: Major
>
> When using EOS the {{.checkpoint}} file created when a stateful streams app 
> is shut down does not always contain changelog offsets which represent the 
> latest state of the state store. The offsets can often be behind the end of 
> the changelog - sometimes quite significantly.
> This leads to a state restore being required when the streams app restarts 
> after shutting down cleanly as streams thinks (based on the incorrect offsets 
> in the checkpoint) that the state store is not up to date with the changelog. 
> This is increasing the time we see it takes to do a clean restart of a single 
> instance streams app from around 10 seconds to sometimes over 2 minutes in our 
> case.
> I suspect the bug appears because of an assumption about the {{commitNeeded}} 
> field in the following method in {{StreamTask}}:
> {code:java}
> protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
>   // commitNeeded indicates we may have processed some records since last 
> commit
>   // and hence we need to refresh checkpointable offsets regardless whether 
> we should checkpoint or not
>   if (commitNeeded) {
> stateMgr.updateChangelogOffsets(checkpointableOffsets());
>   }
>   super.maybeWriteCheckpoint(enforceCheckpoint);
> }
> {code}
> In a steady state case for a simple single instance, single thread streams app 
> where the app simply starts, runs and then shuts down, the {{if 
> (commitNeeded)}} test always fails when running with EOS, which results in the 
> latest checkpoint offsets never getting updated into the {{stateMgr}}.
> Tracing back to the callers of {{maybeWriteCheckpoint}} it's easy to see this 
> is the case as there's only 1 place in the code which calls 
> {{maybeWriteCheckpoint}} during this steady state. The {{postCommit(final 
> boolean enforceCheckpoint)}} method, specifically the call in the {{RUNNING}} 
> state.
> {code:java}
> case RUNNING:
>   if (enforceCheckpoint || !eosEnabled) {
> maybeWriteCheckpoint(enforceCheckpoint);
>   }
>   log.debug("Finalized commit for {} task with eos {} enforce checkpoint {}", 
> state(), eosEnabled, enforceCheckpoint);
>   break;
> {code}
> We can see from this code that {{maybeWriteCheckpoint}} will only ever be 
> called if {{enforceCheckpoint=true}} because we know {{eosEnabled=true}} as 
> we're running with EOS.
> So then where does {{postCommit}} get called with {{enforceCheckpoint=true}}? 
> Again looking only at the steady state case we find that it's only called 
> from {{TaskManager.tryCloseCleanAllActiveTasks}} which is only called from 
> {{TaskManager.shutdown}}.
> The thing about the call in {{tryCloseCleanAllActiveTasks}} is that it 
> happens *after* all active tasks have committed, which means that 
> {{StreamTask.commitNeeded=false}} for all tasks so it follows that the test 
> back in {{maybeWriteCheckpoint}} always fails and we don't end up getting the 
> latest offsets stored into the state manager.
> I think the fix is to simply change the test in {{maybeWriteCheckpoint}} to 
> be {{if (commitNeeded || enforceCheckpoint) { ...}} as we know we must always 
> update the changelog offsets before we write the checkpoint.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file

2021-08-30 Thread GitBox


ableegoldman commented on a change in pull request #11283:
URL: https://github.com/apache/kafka/pull/11283#discussion_r698868805



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -565,7 +565,7 @@ public void closeCleanAndRecycleState() {
 protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
 // commitNeeded indicates we may have processed some records since 
last commit
 // and hence we need to refresh checkpointable offsets regardless 
whether we should checkpoint or not
-if (commitNeeded) {
+if (commitNeeded || enforceCheckpoint) {

Review comment:
   What if we just removed the check altogether? It's not like updating the 
changelog offsets is a particularly "heavy" call, we may as well future-proof 
things even more by just updating the offsets any time.
   
   In fact, why do we even have this weird split brain logic to begin with...it 
would make more sense to just update the offsets inside the 
`StreamTask#maybeWriteCheckpoint` and `stateMgr.checkpoint()` methods, no?
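   
   Concretely, the suggestion would amount to something like this sketch against 
the method quoted in the ticket (not the final patch):
   
   ```java
   protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
       // always refresh the checkpointable offsets so that whenever a checkpoint
       // is actually written it reflects the latest changelog positions
       stateMgr.updateChangelogOffsets(checkpointableOffsets());
       super.maybeWriteCheckpoint(enforceCheckpoint);
   }
   ```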




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] gitlw opened a new pull request #11285: KAFKA-10548: Implement topic deletion logic with the LeaderAndIsr in KIP-516

2021-08-30 Thread GitBox


gitlw opened a new pull request #11285:
URL: https://github.com/apache/kafka/pull/11285


   This PR includes the following changes
   1. Adding the type field to the LeaderAndIsr request as proposed in KIP-516
   2. Letting the controller set the type of LeaderAndIsr requests to be either 
FULL or INCREMENTAL
   3. Allowing topic deletion to complete even with offline brokers
   4. Schedule the deletion of replicas with inconsistent topic IDs or not 
present in the full LeaderAndIsr request
   
   Testing Strategy:
   This PR added two tests 
   1. testTopicDeletionWithOfflineBrokers: to ensure that topic deletion can 
proceed even with offline brokers 
   2. testDeletionOfStrayPartitions: to ensure stray replicas whose topic has 
been deleted will be removed upon broker startup
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13248) Class name mismatch for LoggerFactory.getLogger method in TimeOrderedWindowStoreBuilder.java

2021-08-30 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17406991#comment-17406991
 ] 

A. Sophie Blee-Goldman commented on KAFKA-13248:


Nice find, thanks for the report – would you be interested in submitting a 
patch for this?
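
The fix would presumably be the one-line change the report points at, i.e. passing 
the enclosing class to the logger factory:
{code:java}
// in TimeOrderedWindowStoreBuilder: reference the enclosing class so log messages
// are attributed to the correct source
private final Logger log = LoggerFactory.getLogger(TimeOrderedWindowStoreBuilder.class);
{code}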

> Class name mismatch for LoggerFactory.getLogger method in 
> TimeOrderedWindowStoreBuilder.java
> 
>
> Key: KAFKA-13248
> URL: https://issues.apache.org/jira/browse/KAFKA-13248
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: S. B. Hunter
>Priority: Minor
> Attachments: KAFKA-13248.1.patch
>
>
> I have noticed a mismatch with the class name passed to the 
> LoggerFactory.getLogger method. This would make it hard to track the source 
> of log messages.
> public class {color:#00875a}TimeOrderedWindowStoreBuilder{color} 
> extends AbstractStoreBuilder> {
>  private final Logger log = 
> LoggerFactory.getLogger({color:#de350b}WindowStoreBuilder{color}.class);
> private final WindowBytesStoreSupplier storeSupplier;
> public {color:#00875a}TimeOrderedWindowStoreBuilder{color}(final 
> WindowBytesStoreSupplier storeSupplier,
>  final Serde keySerde,



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13254) Deadlock when expanding ISR

2021-08-30 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13254:
---

 Summary: Deadlock when expanding ISR
 Key: KAFKA-13254
 URL: https://issues.apache.org/jira/browse/KAFKA-13254
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson
Assignee: Jason Gustafson


Found this when debugging downgrade system test failures. The patch for 
https://issues.apache.org/jira/browse/KAFKA-13091 introduced a deadlock. Here 
are the jstack details:
{code}
"data-plane-kafka-request-handler-4":   

   
  waiting for ownable synchronizer 0xfcc00020, (a 
java.util.concurrent.locks.ReentrantLock$NonfairSync),  
 
  which is held by "data-plane-kafka-request-handler-5" 

   

"data-plane-kafka-request-handler-5":
  waiting for ownable synchronizer 0xc9161b20, (a 
java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync),
  which is held by "data-plane-kafka-request-handler-4"

"data-plane-kafka-request-handler-4":
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0xfcc00020> (a 
java.util.concurrent.locks.ReentrantLock$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
at 
java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
at 
kafka.server.DelayedOperation.safeTryComplete(DelayedOperation.scala:121)
at 
kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:362)
at 
kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:264)
at 
kafka.cluster.DelayedOperations.checkAndCompleteAll(Partition.scala:59)
at 
kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:907)
at kafka.cluster.Partition.handleAlterIsrResponse(Partition.scala:1421)
at 
kafka.cluster.Partition.$anonfun$sendAlterIsrRequest$1(Partition.scala:1340)
at 
kafka.cluster.Partition.$anonfun$sendAlterIsrRequest$1$adapted(Partition.scala:1340)
at kafka.cluster.Partition$$Lambda$1496/2055478409.apply(Unknown Source)
at kafka.server.ZkIsrManager.submit(ZkIsrManager.scala:74)
at kafka.cluster.Partition.sendAlterIsrRequest(Partition.scala:1345)
at kafka.cluster.Partition.expandIsr(Partition.scala:1312)
at 
kafka.cluster.Partition.$anonfun$maybeExpandIsr$2(Partition.scala:755)
at kafka.cluster.Partition.maybeExpandIsr(Partition.scala:754)
at kafka.cluster.Partition.updateFollowerFetchState(Partition.scala:672)
at 
kafka.server.ReplicaManager.$anonfun$updateFollowerFetchState$1(ReplicaManager.scala:1806)
at kafka.server.ReplicaManager$$Lambda$1075/1996432270.apply(Unknown 
Source)
at 
scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:99)
at 
scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:86)
at scala.collection.mutable.ArrayBuffer.map(ArrayBuffer.scala:42)
at 
kafka.server.ReplicaManager.updateFollowerFetchState(ReplicaManager.scala:1790)
at kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:1025)
at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:1029)
at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:970)
at kafka.server.KafkaApis.handle(KafkaApis.scala:173)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75)
at java.lang.Thread.run(Thread.java:748)

"data-plane-kafka-request-handler-5":
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0xc9161b20> (a 
java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:967)
at 
java.util.concurren

[jira] [Updated] (KAFKA-13254) Deadlock when expanding ISR

2021-08-30 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-13254:

Issue Type: Bug  (was: Improvement)

> Deadlock when expanding ISR
> ---
>
> Key: KAFKA-13254
> URL: https://issues.apache.org/jira/browse/KAFKA-13254
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> Found this when debugging downgrade system test failures. The patch for 
> https://issues.apache.org/jira/browse/KAFKA-13091 introduced a deadlock. Here 
> are the jstack details:
> {code}
> "data-plane-kafka-request-handler-4": 
>   
>
>   waiting for ownable synchronizer 0xfcc00020, (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync),
>
>   which is held by "data-plane-kafka-request-handler-5"   
>   
>
> "data-plane-kafka-request-handler-5":
>   waiting for ownable synchronizer 0xc9161b20, (a 
> java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync),
>   which is held by "data-plane-kafka-request-handler-4"
> "data-plane-kafka-request-handler-4":
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0xfcc00020> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
> at 
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
> at 
> java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
> at 
> kafka.server.DelayedOperation.safeTryComplete(DelayedOperation.scala:121)
> at 
> kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:362)
> at 
> kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:264)
> at 
> kafka.cluster.DelayedOperations.checkAndCompleteAll(Partition.scala:59)
> at 
> kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:907)
> at 
> kafka.cluster.Partition.handleAlterIsrResponse(Partition.scala:1421)
> at 
> kafka.cluster.Partition.$anonfun$sendAlterIsrRequest$1(Partition.scala:1340)
> at 
> kafka.cluster.Partition.$anonfun$sendAlterIsrRequest$1$adapted(Partition.scala:1340)
> at kafka.cluster.Partition$$Lambda$1496/2055478409.apply(Unknown 
> Source)
> at kafka.server.ZkIsrManager.submit(ZkIsrManager.scala:74)
> at kafka.cluster.Partition.sendAlterIsrRequest(Partition.scala:1345)
> at kafka.cluster.Partition.expandIsr(Partition.scala:1312)
> at 
> kafka.cluster.Partition.$anonfun$maybeExpandIsr$2(Partition.scala:755)
> at kafka.cluster.Partition.maybeExpandIsr(Partition.scala:754)
> at 
> kafka.cluster.Partition.updateFollowerFetchState(Partition.scala:672)
> at 
> kafka.server.ReplicaManager.$anonfun$updateFollowerFetchState$1(ReplicaManager.scala:1806)
> at kafka.server.ReplicaManager$$Lambda$1075/1996432270.apply(Unknown 
> Source)
> at 
> scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:99)
> at 
> scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:86)
> at scala.collection.mutable.ArrayBuffer.map(ArrayBuffer.scala:42)
> at 
> kafka.server.ReplicaManager.updateFollowerFetchState(ReplicaManager.scala:1790)
> at 
> kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:1025)
> at 
> kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:1029)
> at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:970)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:173)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75)
> at java.lang.Thread.run(Thread.java:748)
> "data-plane-kafka-request-handler-5":
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0xc9161b20> (a 
> java.util.concurrent.locks.ReentrantReadWrit
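
The jstack above shows a classic lock-ordering deadlock: handler-4 holds the partition's read-write lock and is blocked on the DelayedOperation ReentrantLock, while handler-5 holds that ReentrantLock and is blocked on the read-write lock. A minimal, self-contained Java sketch of the same pattern, using hypothetical lock names rather than the actual Partition/DelayedOperation code, is:
{code:java}
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustration only: two threads taking two locks in opposite order, mirroring
// the handler-4 / handler-5 stacks above. Not the real broker code.
public class LockOrderDeadlockSketch {

    private static final ReentrantReadWriteLock partitionLock = new ReentrantReadWriteLock();
    private static final ReentrantLock operationLock = new ReentrantLock();

    private static void sleepQuietly(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }

    public static void main(String[] args) {
        // Mirrors handler-4: holds the read-write lock, then wants the ReentrantLock.
        Thread handler4 = new Thread(() -> {
            partitionLock.writeLock().lock();
            try {
                sleepQuietly(100);
                operationLock.lock();            // blocks forever: handler-5 holds it
                operationLock.unlock();
            } finally {
                partitionLock.writeLock().unlock();
            }
        }, "handler-4");

        // Mirrors handler-5: holds the ReentrantLock, then wants the read-write lock.
        Thread handler5 = new Thread(() -> {
            operationLock.lock();
            try {
                sleepQuietly(100);
                partitionLock.readLock().lock(); // blocks forever: handler-4 holds the write lock
                partitionLock.readLock().unlock();
            } finally {
                operationLock.unlock();
            }
        }, "handler-5");

        handler4.start();
        handler5.start();   // with both sleeps in place, the two threads deadlock against each other
    }
}
{code}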

[jira] [Updated] (KAFKA-13091) Increment HW after shrinking ISR through AlterIsr

2021-08-30 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-13091:

Fix Version/s: (was: 3.0.1)

> Increment HW after shrinking ISR through AlterIsr
> -
>
> Key: KAFKA-13091
> URL: https://issues.apache.org/jira/browse/KAFKA-13091
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> After we have shrunk the ISR, we have an opportunity to advance the high 
> watermark. We do this currently in `maybeShrinkIsr` after the synchronous 
> update through ZK. For the AlterIsr path, however, we cannot rely on this 
> call since the request is sent asynchronously. Instead we should attempt to 
> advance the high watermark in the callback when the AlterIsr response returns 
> successfully.
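
Roughly, the change described above moves the high-watermark check from the synchronous shrink path into the asynchronous AlterIsr response handling. A hedged Java sketch of that shape, with placeholder names (sendAlterIsr, maybeIncrementHighWatermark) rather than the real Partition API, might look like:
{code:java}
import java.util.concurrent.CompletableFuture;

// Illustration only: placeholder names, not the actual kafka.cluster.Partition code.
public class AlterIsrHighWatermarkSketch {

    // Stands in for submitting an AlterIsr request; the response arrives asynchronously.
    static CompletableFuture<Boolean> sendAlterIsr(String shrunkIsr) {
        return CompletableFuture.supplyAsync(() -> true);
    }

    static void maybeIncrementHighWatermark() {
        System.out.println("attempt to advance the high watermark");
    }

    public static void main(String[] args) {
        // We cannot advance the HW right after sending the request (as the ZK path
        // does after its synchronous update); instead we do it in the response callback.
        sendAlterIsr("shrunk ISR")
                .thenAccept(success -> {
                    if (success) {
                        maybeIncrementHighWatermark();
                    }
                })
                .join();
    }
}
{code}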





[GitHub] [kafka] hutchiko commented on pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file

2021-08-30 Thread GitBox


hutchiko commented on pull request #11283:
URL: https://github.com/apache/kafka/pull/11283#issuecomment-908804447


   @ableegoldman I did not test against 3.0, just 2.7 and 2.8. I'll rebase onto 
trunk/3.0 and see how that goes.






[GitHub] [kafka] hutchiko commented on a change in pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file

2021-08-30 Thread GitBox


hutchiko commented on a change in pull request #11283:
URL: https://github.com/apache/kafka/pull/11283#discussion_r698898574



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -565,7 +565,7 @@ public void closeCleanAndRecycleState() {
 protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
 // commitNeeded indicates we may have processed some records since 
last commit
 // and hence we need to refresh checkpointable offsets regardless 
whether we should checkpoint or not
-if (commitNeeded) {
+if (commitNeeded || enforceCheckpoint) {

Review comment:
   I thought the same thing, but not knowing enough about all the streams 
internals I thought I'd just go with the most minimal change possible.








[GitHub] [kafka] hutchiko commented on pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file

2021-08-30 Thread GitBox


hutchiko commented on pull request #11283:
URL: https://github.com/apache/kafka/pull/11283#issuecomment-908826996


   Yeah I've rebased and verified the issue is there in `trunk` too.






[jira] [Updated] (KAFKA-13249) Checkpoints do not contain latest offsets on shutdown when using EOS

2021-08-30 Thread Oliver Hutchison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oliver Hutchison updated KAFKA-13249:
-
Affects Version/s: 3.0.0

> Checkpoints do not contain latest offsets on shutdown when using EOS
> 
>
> Key: KAFKA-13249
> URL: https://issues.apache.org/jira/browse/KAFKA-13249
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.0.0, 2.7.0, 2.8.0
>Reporter: Oliver Hutchison
>Priority: Major
>
> When using EOS, the {{.checkpoint}} file created when a stateful streams app 
> is shut down does not always contain changelog offsets which represent the 
> latest state of the state store. The offsets can often be behind the end of 
> the changelog - sometimes quite significantly.
> This leads to a state restore being required when the streams app restarts 
> after shutting down cleanly, as streams thinks (based on the incorrect offsets 
> in the checkpoint) that the state store is not up to date with the changelog. 
> This increases the time we see for a clean restart of a single-instance 
> streams app from around 10 seconds to sometimes over 2 minutes in our case.
> I suspect the bug appears because of an incorrect assumption about the 
> {{commitNeeded}} field in the following method in {{StreamTask}}:
> {code:java}
> protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
>   // commitNeeded indicates we may have processed some records since last 
> commit
>   // and hence we need to refresh checkpointable offsets regardless whether 
> we should checkpoint or not
>   if (commitNeeded) {
> stateMgr.updateChangelogOffsets(checkpointableOffsets());
>   }
>   super.maybeWriteCheckpoint(enforceCheckpoint);
> }
> {code}
> In the steady state case for a simple, single-instance, single-thread streams 
> app that simply starts, runs and then shuts down, the {{if (commitNeeded)}} 
> test always fails when running with EOS, which results in the latest checkpoint 
> offsets never getting updated into the {{stateMgr}}.
> Tracing back to the callers of {{maybeWriteCheckpoint}}, it's easy to see this 
> is the case, as there's only one place in the code which calls 
> {{maybeWriteCheckpoint}} during this steady state: the {{postCommit(final 
> boolean enforceCheckpoint)}} method, specifically the call in the {{RUNNING}} 
> state.
> {code:java}
> case RUNNING:
>   if (enforceCheckpoint || !eosEnabled) {
> maybeWriteCheckpoint(enforceCheckpoint);
>   }
>   log.debug("Finalized commit for {} task with eos {} enforce checkpoint {}", 
> state(), eosEnabled, enforceCheckpoint);
>   break;
> {code}
> We can see from this code that {{maybeWriteCheckpoint}} will only ever be 
> called if {{enforceCheckpoint=true}}, because we know {{eosEnabled=true}} as 
> we're running with EOS.
> So then where does {{postCommit}} get called with {{enforceCheckpoint=true}}? 
> Again looking only at the steady state case, we find that it's only called 
> from {{TaskManager.tryCloseCleanAllActiveTasks}}, which is only called from 
> {{TaskManager.shutdown}}.
> The thing about the call in {{tryCloseCleanAllActiveTasks}} is that it 
> happens *after* all active tasks have committed, which means that 
> {{StreamTask.commitNeeded=false}} for all tasks, so it follows that the test 
> back in {{maybeWriteCheckpoint}} always fails and we don't end up getting the 
> latest offsets stored into the state manager.
> I think the fix is to simply change the test in {{maybeWriteCheckpoint}} to 
> be {{if (commitNeeded || enforceCheckpoint)}}, as we know we must always 
> update the changelog offsets before we write the checkpoint.
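
Combined with the diff in PR #11283 above, the proposed version of {{StreamTask.maybeWriteCheckpoint}} would read as follows (a sketch of the described fix, not necessarily the final merged code):
{code:java}
// Also refresh the checkpointable offsets when a checkpoint is being enforced
// (e.g. on clean shutdown), not only when commitNeeded is true.
protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
    if (commitNeeded || enforceCheckpoint) {
        stateMgr.updateChangelogOffsets(checkpointableOffsets());
    }
    super.maybeWriteCheckpoint(enforceCheckpoint);
}
{code}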





[GitHub] [kafka] hutchiko edited a comment on pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file

2021-08-30 Thread GitBox


hutchiko edited a comment on pull request #11283:
URL: https://github.com/apache/kafka/pull/11283#issuecomment-908804447


   @ableegoldman I did not test against 3.0, just 2.7 and 2.8. I'll rebase onto 
`trunk` and see how that goes.






[jira] [Commented] (KAFKA-13248) Class name mismatch for LoggerFactory.getLogger method in TimeOrderedWindowStoreBuilder.java

2021-08-30 Thread S. B. Hunter (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17407070#comment-17407070
 ] 

S. B. Hunter commented on KAFKA-13248:
--

I would love to. I have already changed the code in my forked repository. Can I 
create the PR?
https://github.com/sider-bughunter/kafka/commit/fbca00c7656378b6dc43320fc9ce4d3d83b58b4a

> Class name mismatch for LoggerFactory.getLogger method in 
> TimeOrderedWindowStoreBuilder.java
> 
>
> Key: KAFKA-13248
> URL: https://issues.apache.org/jira/browse/KAFKA-13248
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: S. B. Hunter
>Priority: Minor
> Attachments: KAFKA-13248.1.patch
>
>
> I have noticed a mismatch with the class name passed to the 
> LoggerFactory.getLogger method. This would make it hard to track the source 
> of log messages.
> public class TimeOrderedWindowStoreBuilder extends AbstractStoreBuilder> {
>   private final Logger log = LoggerFactory.getLogger(WindowStoreBuilder.class);
>   private final WindowBytesStoreSupplier storeSupplier;
>   public TimeOrderedWindowStoreBuilder(final WindowBytesStoreSupplier storeSupplier,
>   final Serde keySerde,
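
Based on the comment above and the linked commit, the fix is a one-line change, roughly:
{code:java}
// Pass the enclosing class to getLogger so log messages are attributed to
// TimeOrderedWindowStoreBuilder instead of WindowStoreBuilder.
private final Logger log = LoggerFactory.getLogger(TimeOrderedWindowStoreBuilder.class);
{code}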





[GitHub] [kafka] rodesai opened a new pull request #11286: add units to metrics descriptions + test fix

2021-08-30 Thread GitBox


rodesai opened a new pull request #11286:
URL: https://github.com/apache/kafka/pull/11286


   Fixes some review feedback and test flakiness from #11149:
   - adds units to the metrics descriptions for the total blocked time metrics
   - fixes the producer metrics test checks






[GitHub] [kafka] rodesai commented on pull request #11286: add units to metrics descriptions + test fix

2021-08-30 Thread GitBox


rodesai commented on pull request #11286:
URL: https://github.com/apache/kafka/pull/11286#issuecomment-908931988


   @guozhangwang 






[jira] [Created] (KAFKA-13255) Mirrormaker config property config.properties.exclude is not working as expected

2021-08-30 Thread Anamika Nadkarni (Jira)
Anamika Nadkarni created KAFKA-13255:


 Summary: Mirrormaker config property config.properties.exclude is 
not working as expected 
 Key: KAFKA-13255
 URL: https://issues.apache.org/jira/browse/KAFKA-13255
 Project: Kafka
  Issue Type: Bug
  Components: mirrormaker
Affects Versions: 2.8.0
Reporter: Anamika Nadkarni


Objective - Use MM2 (Kafka Connect in a distributed cluster) for data migration 
between a cluster hosted in a private data center and an AWS MSK cluster.

Steps performed -
 # Started the kafka-connect service.
 # Created 3 MM2 connectors (i.e. source connector, checkpoint connector and 
heartbeat connector). The curl commands used to create the connectors are in the 
attached file. To exclude certain config properties during topic replication, 
we are using the 'config.properties.exclude' property in the MM2 source 
connector.

Expected -

Source topic 'dev.portlandDc.anamika.helloMsk' should be successfully created 
in the destination cluster.

Actual -

Creation of the source topic 'dev.portlandDc.anamika.helloMsk' in the destination 
cluster fails with the following error:
{code:java}
[2021-08-06 06:13:40,944] WARN [mm2-msc|worker] Could not create topic 
dev.portlandDc.anamika.helloMsk. 
(org.apache.kafka.connect.mirror.MirrorSourceConnector:371)
org.apache.kafka.common.errors.InvalidConfigurationException: Unknown topic 
config name: confluent.value.schema.validation{code}
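
The exclusion is expressed as a regex over topic config names; the relevant part of the source-connector config (abbreviated from the full curl command in the comments below) is:
{code:java}
{
   "name":"mm2-msc",
   "config":{
      "connector.class":"org.apache.kafka.connect.mirror.MirrorSourceConnector",
      "topics":"dev.portlandDc.anamika.helloMsk",
      "config.properties.exclude":"^confluent\\..*$"
   }
}
{code}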





[jira] [Commented] (KAFKA-13255) Mirrormaker config property config.properties.exclude is not working as expected

2021-08-30 Thread Anamika Nadkarni (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17407116#comment-17407116
 ] 

Anamika Nadkarni commented on KAFKA-13255:
--

I keep getting an HTTP 403 error when trying to attach the file with the curl 
commands to this Jira. As a workaround, I am pasting the file contents below.
{code:java}
curl -X POST http://10.xx.xx.xx:8083/connectors -H "Content-Type: 
application/json" -d '{
   "name":"mm2-msc",
   "config":{
  "connector.class":"org.apache.kafka.connect.mirror.MirrorSourceConnector",
  "clusters":"mysource,mydestination",
  "source.cluster.alias":"mysource",
  "target.cluster.alias":"mydestination",
  
"replication.policy.class":"com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy",
  
"target.cluster.bootstrap.servers":"b-2.mydestination.amazonaws.com:9096,b-3.mydestination.amazonaws.com:9096,b-1.mydestination.amazonaws.com:9096",
  
"source.cluster.bootstrap.servers":"10.xx.xx.xx:9093,10.xx.xx.xx:9093,10.xx.xx.xx:9093,10.xx.xx.xx:9093,10.xx.xx.xx:9093",
  "topics":"dev.portlandDc.anamika.helloMsk",
  "config.properties.exclude":"^confluent\\..*$",
  "tasks.max":"1",
  "key.converter":" org.apache.kafka.connect.converters.ByteArrayConverter",
  
"value.converter":"org.apache.kafka.connect.converters.ByteArrayConverter",
  "replication.factor":"3",
  "offset-syncs.topic.replication.factor":"3",
  "sync.topic.acls.interval.seconds":"20",
  "sync.topic.configs.interval.seconds":"20",
  "refresh.topics.interval.seconds":"20",
  "refresh.groups.interval.seconds":"20",
  "consumer.group.id":"mm2-msc-cons",
  "producer.enable.idempotence":"true",
  "source.cluster.security.protocol":"SASL_SSL",
  "source.cluster.sasl.mechanism":"PLAIN",
  
"source.cluster.ssl.truststore.location":"/home/ec2-user/dc/kafka.client.truststore.jks",
  "source.cluster.ssl.truststore.password":"x",
  "source.cluster.ssl.endpoint.identification.algorithm":"",
  
"source.cluster.sasl.jaas.config":"org.apache.kafka.common.security.plain.PlainLoginModule
 required username=\"x\" password=\"x\";",
  "target.cluster.security.protocol":"SASL_SSL",
  "target.cluster.sasl.mechanism":"SCRAM-SHA-512",
  
"target.cluster.ssl.truststore.location":"/home/ec2-user/kafka_2.13-2.8.0/config/msk/kafka.client.truststore.jks",
  
"target.cluster.sasl.jaas.config":"org.apache.kafka.common.security.scram.ScramLoginModule
 required username=\"x\" password=\"x\";"
   }
}' | jq .
curl -X POST http://10.xx.xx.xx:8083/connectors -H "Content-Type: 
application/json" -d '{
   "name":"mm2-cpc",
   "config":{
  
"connector.class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
  "clusters":"mysource,mydestination",
  "source.cluster.alias":"mysource",
  "target.cluster.alias":"mydestination",
  "config.properties.exclude":"confluent.value.schema.validation",
  "config.properties.blacklist":"confluent.value.schema.validation",
  
"target.cluster.bootstrap.servers":"b-2.mydestination.amazonaws.com:9096,b-3.mydestination.amazonaws.com:9096,b-1.mydestination.amazonaws.com:9096",
  
"source.cluster.bootstrap.servers":"10.xx.xx.xx:9093,10.xx.xx.xx:9093,10.xx.xx.xx:9093,10.xx.xx.xx:9093,10.xx.xx.xx:9093",
  "tasks.max":"1",
  "key.converter":" org.apache.kafka.connect.converters.ByteArrayConverter",
  
"value.converter":"org.apache.kafka.connect.converters.ByteArrayConverter",
  
"replication.policy.class":"com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy",
  "replication.factor":"3",
  "sync.topic.configs.enabled":"false",
  "checkpoints.topic.replication.factor":"3",
  "emit.checkpoints.interval.seconds":"20",
  "source.cluster.security.protocol":"SASL_SSL",
  "source.cluster.sasl.mechanism":"PLAIN",
  
"source.cluster.ssl.truststore.location":"/home/ec2-user/dc/kafka.client.truststore.jks",
  "source.cluster.ssl.truststore.password":"x",
  "source.cluster.ssl.endpoint.identification.algorithm":"",
  
"source.cluster.sasl.jaas.config":"org.apache.kafka.common.security.plain.PlainLoginModule
 required username=\"x\" password=\"x\";",
  "target.cluster.security.protocol":"SASL_SSL",
  "target.cluster.sasl.mechanism":"SCRAM-SHA-512",
  
"target.cluster.ssl.truststore.location":"/home/ec2-user/kafka_2.13-2.8.0/config/msk/kafka.client.truststore.jks",
  
"target.cluster.sasl.jaas.config":"org.apache.kafka.common.security.scram.ScramLoginModule
 required username=\"x\" password=\"x\";"
   }
}' | jq .
curl -X POST http://10.xx.xx.xx:8083/connectors -H "Content-Type: 
application/json" -d '{
   "name":"mm2-hbc",
   "config":{
  
"connector.class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
  "clusters":"mysource,mydestination",
  "source.

[jira] [Commented] (KAFKA-13255) Mirrormaker config property config.properties.exclude is not working as expected

2021-08-30 Thread Anamika Nadkarni (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17407122#comment-17407122
 ] 

Anamika Nadkarni commented on KAFKA-13255:
--

To replicate the source topic in the destination cluster with no prefix 
(), I am using a custom replication policy while creating the source 
connector. You can generate the custom replication policy jar from the link below. 
I am getting 'HTTP 403 Forbidden' when trying to attach the jar.

https://github.com/aws-samples/mirrormaker2-msk-migration/tree/master/CustomMM2ReplicationPolicy

 

> Mirrormaker config property config.properties.exclude is not working as 
> expected 
> -
>
> Key: KAFKA-13255
> URL: https://issues.apache.org/jira/browse/KAFKA-13255
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.8.0
>Reporter: Anamika Nadkarni
>Priority: Major
>
> Objective - Use MM2 (Kafka Connect in a distributed cluster) for data migration 
> between a cluster hosted in a private data center and an AWS MSK cluster.
> Steps performed -
>  # Started the kafka-connect service.
>  # Created 3 MM2 connectors (i.e. source connector, checkpoint connector and 
> heartbeat connector). The curl commands used to create the connectors are in the 
> attached file. To exclude certain config properties during topic replication, 
> we are using the 'config.properties.exclude' property in the MM2 source 
> connector.
> Expected -
> Source topic 'dev.portlandDc.anamika.helloMsk' should be successfully created 
> in the destination cluster.
> Actual -
> Creation of the source topic 'dev.portlandDc.anamika.helloMsk' in the destination 
> cluster fails with the following error:
> {code:java}
> [2021-08-06 06:13:40,944] WARN [mm2-msc|worker] Could not create topic 
> dev.portlandDc.anamika.helloMsk. 
> (org.apache.kafka.connect.mirror.MirrorSourceConnector:371)
> org.apache.kafka.common.errors.InvalidConfigurationException: Unknown topic 
> config name: confluent.value.schema.validation{code}


