This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 4c23acbb54 Conditional writer creation now always includes the client
properties (#4732)
4c23acbb54 is described below
commit 4c23acbb54b4cffffbadc7a57e079a9476b82276
Author: Kevin Rathbun <[email protected]>
AuthorDate: Wed Jul 10 15:04:31 2024 -0400
Conditional writer creation now always includes the client properties
(#4732)
Closes #4708
- Fixed ClientContext.createConditionalWriter(String) to include client
props
- Added testCreateConditionalWriterUsesClientProps to ConditionalWriterIT
---
.../accumulo/core/clientImpl/ClientContext.java | 4 +--
.../core/clientImpl/ConditionalWriterImpl.java | 11 +++++++-
.../apache/accumulo/test/ConditionalWriterIT.java | 33 ++++++++++++++++++++++
3 files changed, 44 insertions(+), 4 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
index e4ba0028c9..75b63940ce 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
@@ -786,9 +786,7 @@ public class ClientContext implements AccumuloClient {
@Override
public ConditionalWriter createConditionalWriter(String tableName) throws
TableNotFoundException {
- ensureOpen();
- return new ConditionalWriterImpl(this,
requireNotOffline(getTableId(tableName), tableName),
- tableName, new ConditionalWriterConfig());
+ return createConditionalWriter(tableName, null);
}
@Override
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
index 3beedd49bf..cb7675196c 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
@@ -90,7 +90,9 @@ import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-class ConditionalWriterImpl implements ConditionalWriter {
+import com.google.common.annotations.VisibleForTesting;
+
+public class ConditionalWriterImpl implements ConditionalWriter {
private static final Logger log =
LoggerFactory.getLogger(ConditionalWriterImpl.class);
@@ -106,6 +108,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
private long timeout;
private final Durability durability;
private final String classLoaderContext;
+ private final ConditionalWriterConfig config;
private static class ServerQueue {
BlockingQueue<TabletServerMutations<QCMutation>> queue = new
LinkedBlockingQueue<>();
@@ -371,6 +374,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
ConditionalWriterImpl(ClientContext context, TableId tableId, String
tableName,
ConditionalWriterConfig config) {
+ this.config = config;
this.context = context;
this.auths = config.getAuthorizations();
this.ve = new VisibilityEvaluator(config.getAuthorizations());
@@ -825,6 +829,11 @@ class ConditionalWriterImpl implements ConditionalWriter {
}
}
+ @VisibleForTesting
+ public ConditionalWriterConfig getConfig() {
+ return config;
+ }
+
@Override
public Result write(ConditionalMutation mutation) {
return write(Collections.singleton(mutation).iterator()).next();
diff --git
a/test/src/main/java/org/apache/accumulo/test/ConditionalWriterIT.java
b/test/src/main/java/org/apache/accumulo/test/ConditionalWriterIT.java
index 4e1ad83086..853269c390 100644
--- a/test/src/main/java/org/apache/accumulo/test/ConditionalWriterIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ConditionalWriterIT.java
@@ -58,6 +58,7 @@ import org.apache.accumulo.core.client.ConditionalWriter;
import org.apache.accumulo.core.client.ConditionalWriter.Result;
import org.apache.accumulo.core.client.ConditionalWriter.Status;
import org.apache.accumulo.core.client.ConditionalWriterConfig;
+import org.apache.accumulo.core.client.Durability;
import org.apache.accumulo.core.client.IsolatedScanner;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.RowIterator;
@@ -68,6 +69,8 @@ import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.TableOfflineException;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.clientImpl.ConditionalWriterImpl;
+import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Condition;
@@ -1536,4 +1539,34 @@ public class ConditionalWriterIT extends
SharedMiniClusterBase {
}
}
+ @Test
+ public void testCreateConditionalWriterUsesClientProps() throws Exception {
+ // Tests that creating a conditional writer includes the client properties
that were set
+ String tableName = getUniqueNames(1)[0];
+ var clientProps = getClientProps();
+ // Set non-default values for all conditional writer props
+
clientProps.setProperty(ClientProperty.CONDITIONAL_WRITER_TIMEOUT_MAX.getKey(),
"99");
+
clientProps.setProperty(ClientProperty.CONDITIONAL_WRITER_THREADS_MAX.getKey(),
"101");
+
clientProps.setProperty(ClientProperty.CONDITIONAL_WRITER_DURABILITY.getKey(),
+ Durability.NONE.name());
+ try (AccumuloClient client =
Accumulo.newClient().from(clientProps).build()) {
+ client.tableOperations().create(tableName);
+
+ try (
+ ConditionalWriterImpl cw1 =
+ (ConditionalWriterImpl)
client.createConditionalWriter(tableName);
+ ConditionalWriterImpl cw2 =
+ (ConditionalWriterImpl) client.createConditionalWriter(tableName,
+ new ConditionalWriterConfig().setMaxWriteThreads(200))) {
+ // verify we see the non-default prop values
+ assertEquals(99, cw1.getConfig().getTimeout(TimeUnit.SECONDS));
+ assertEquals(101, cw1.getConfig().getMaxWriteThreads());
+ assertEquals(Durability.NONE, cw1.getConfig().getDurability());
+ assertEquals(99, cw2.getConfig().getTimeout(TimeUnit.SECONDS));
+ assertEquals(200, cw2.getConfig().getMaxWriteThreads());
+ assertEquals(Durability.NONE, cw2.getConfig().getDurability());
+ }
+ }
+ }
+
}