exceptionfactory commented on code in PR #9812:
URL: https://github.com/apache/nifi/pull/9812#discussion_r2004090276
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRaw.java:
##########
@@ -33,9 +34,9 @@ public class KinesisRecordProcessorRaw extends
AbstractKinesisRecordProcessor {
public KinesisRecordProcessorRaw(final ProcessSessionFactory
sessionFactory, final ComponentLog log, final String streamName,
final String endpointPrefix, final String
kinesisEndpoint,
final long checkpointIntervalMillis,
final long retryWaitMillis,
- final int numRetries, final
DateTimeFormatter dateTimeFormatter) {
+ final int numRetries, final
DateTimeFormatter dateTimeFormatter, SwitchableRecordProcessorBlocker
consumeHalter) {
Review Comment:
```suggestion
final int numRetries, final
DateTimeFormatter dateTimeFormatter, RecordProcessorBlocker
recordProcessorBlocker) {
```
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/pause/TestSwitchableRecordProcessorBlocker.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.stream.pause;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.function.Supplier;
+
+public class TestSwitchableRecordProcessorBlocker {
+
+ @Test
+ public void testResumeAfterPause() {
Review Comment:
It looks like these test methods should be renamed to align with the new
method names.
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/pause/SwitchableRecordProcessorBlocker.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.stream.pause;
+
+import java.util.concurrent.CountDownLatch;
+
+public class SwitchableRecordProcessorBlocker implements
RecordProcessorBlocker {
+
+ private final SwitchableRecordProcessorBlockerInspector
switchableRecordProcessorBlockerInspector;
+ private CountDownLatch isPaused = new CountDownLatch(0);
+
+ public static SwitchableRecordProcessorBlocker createNonBlocking() {
Review Comment:
This appears to be used only in tests. If so, recommend extracting any
non-runtime code from this class.
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/pause/SwitchableRecordProcessorBlocker.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.stream.pause;
+
+import java.util.concurrent.CountDownLatch;
+
+public class SwitchableRecordProcessorBlocker implements
RecordProcessorBlocker {
+
+ private final SwitchableRecordProcessorBlockerInspector
switchableRecordProcessorBlockerInspector;
+ private CountDownLatch isPaused = new CountDownLatch(0);
Review Comment:
```suggestion
private CountDownLatch blocker = new CountDownLatch(0);
```
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java:
##########
@@ -61,9 +62,9 @@ public KinesisRecordProcessorRecord(final
ProcessSessionFactory sessionFactory,
final long checkpointIntervalMillis,
final long retryWaitMillis,
final int numRetries, final
DateTimeFormatter dateTimeFormatter,
final RecordReaderFactory
readerFactory, final RecordSetWriterFactory writerFactory,
- final RecordConverter recordConverter)
{
+ final RecordConverter recordConverter,
SwitchableRecordProcessorBlocker consumeHalter) {
Review Comment:
```suggestion
final RecordConverter
recordConverter, final RecordProcessorBlocker recordProcessorBlocker) {
```
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/pause/TestSwitchableRecordProcessorBlocker.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.stream.pause;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.function.Supplier;
+
+public class TestSwitchableRecordProcessorBlocker {
+
+ @Test
+ public void testResumeAfterPause() {
+ final TestSwitchableRecordProcessorBlockerInspector blockerInspector =
new TestSwitchableRecordProcessorBlockerInspector();
+
+ final SwitchableRecordProcessorBlocker recordProcessorBlocker =
SwitchableRecordProcessorBlocker.create(blockerInspector);
+
+ recordProcessorBlocker.block();
+ final Thread thread = new
Thread(createPauseRunnable(recordProcessorBlocker));
+ thread.start();
+
+ blockerInspector.awaitAwaited();
+ Assertions.assertTrue(thread.isAlive());
+
+ recordProcessorBlocker.unblock();
+ blockerInspector.awaitFinished();
+ }
+
+ @Test
+ public void testNoPause() {
+ final TestSwitchableRecordProcessorBlockerInspector blockerInspector =
new TestSwitchableRecordProcessorBlockerInspector();
+
+ final SwitchableRecordProcessorBlocker consumeHalter =
SwitchableRecordProcessorBlocker.create(blockerInspector);
+
+ consumeHalter.unblock();
+ final Thread thread = new Thread(createPauseRunnable(consumeHalter));
+ thread.start();
+
+ blockerInspector.awaitFinished();
+ }
+
+ @Test
+ public void testPause() {
+ final TestSwitchableRecordProcessorBlockerInspector blockerInspector =
new TestSwitchableRecordProcessorBlockerInspector();
+
+ final SwitchableRecordProcessorBlocker consumeHalter =
SwitchableRecordProcessorBlocker.create(blockerInspector);
+ consumeHalter.block();
+ final Thread thread = new Thread(createPauseRunnable(consumeHalter));
+ thread.start();
+
+ blockerInspector.awaitAwaited();
+ Assertions.assertTrue(thread.isAlive());
+ thread.interrupt();
+ }
+
+ private static Runnable createPauseRunnable(final
SwitchableRecordProcessorBlocker consumeHalter) {
+ return () -> {
+ try {
+ consumeHalter.await();
Review Comment:
```suggestion
private static Runnable createPauseRunnable(final
SwitchableRecordProcessorBlocker recordProcessorBlocker) {
return () -> {
try {
recordProcessorBlocker.await();
```
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/pause/TestSwitchableRecordProcessorBlocker.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.stream.pause;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.function.Supplier;
+
+public class TestSwitchableRecordProcessorBlocker {
+
+ @Test
+ public void testResumeAfterPause() {
+ final TestSwitchableRecordProcessorBlockerInspector blockerInspector =
new TestSwitchableRecordProcessorBlockerInspector();
+
+ final SwitchableRecordProcessorBlocker recordProcessorBlocker =
SwitchableRecordProcessorBlocker.create(blockerInspector);
+
+ recordProcessorBlocker.block();
+ final Thread thread = new
Thread(createPauseRunnable(recordProcessorBlocker));
+ thread.start();
+
+ blockerInspector.awaitAwaited();
+ Assertions.assertTrue(thread.isAlive());
Review Comment:
The general convention is to use `import static` for assert methods.
```suggestion
assertTrue(thread.isAlive());
```
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/pause/SwitchableRecordProcessorBlockerInspector.java:
##########
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.stream.pause;
+
+interface SwitchableRecordProcessorBlockerInspector {
Review Comment:
Is this interface only necessary for testing? It seems like it could be
removed. If not, then also needs some naming attention.
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/pause/SwitchableRecordProcessorBlocker.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.stream.pause;
+
+import java.util.concurrent.CountDownLatch;
+
+public class SwitchableRecordProcessorBlocker implements
RecordProcessorBlocker {
Review Comment:
As this is the regular implementation, recommend using `Standard` for the
name:
```suggestion
public class StandardRecordProcessorBlocker implements
RecordProcessorBlocker {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]