This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluo.git
The following commit(s) were added to refs/heads/main by this push:
new b5d51229 Removes use of most deprecated code. (#1126)
b5d51229 is described below
commit b5d51229dcecff7e36bf2b837f8a7453432c7abc
Author: Keith Turner <[email protected]>
AuthorDate: Mon Nov 28 21:30:33 2022 +0000
Removes use of most deprecated code. (#1126)
---
.../test/java/org/apache/fluo/command/FluoProgramTest.java | 2 +-
.../java/org/apache/fluo/core/client/FluoAdminImpl.java | 2 +-
.../main/java/org/apache/fluo/core/impl/Environment.java | 4 ++--
.../main/java/org/apache/fluo/core/util/AccumuloUtil.java | 8 ++++++++
modules/mapreduce/pom.xml | 8 ++++++++
.../org/apache/fluo/mapreduce/FluoEntryInputFormat.java | 13 +++++--------
.../org/apache/fluo/mapreduce/FluoKeyValueGenerator.java | 2 +-
.../org/apache/fluo/mapreduce/FluoMutationGenerator.java | 2 +-
.../java/org/apache/fluo/mapreduce/FluoRowInputFormat.java | 13 +++++--------
.../apache/fluo/mapreduce/it/FluoFileOutputFormatIT.java | 4 ++--
pom.xml | 5 +++++
11 files changed, 39 insertions(+), 24 deletions(-)
diff --git
a/modules/command/src/test/java/org/apache/fluo/command/FluoProgramTest.java
b/modules/command/src/test/java/org/apache/fluo/command/FluoProgramTest.java
index e3586a40..8fbbf9a0 100644
--- a/modules/command/src/test/java/org/apache/fluo/command/FluoProgramTest.java
+++ b/modules/command/src/test/java/org/apache/fluo/command/FluoProgramTest.java
@@ -64,7 +64,7 @@ public class FluoProgramTest {
outPS = System.out;
errPS = System.err;
// This will hide usage and error logs when running tests
- try (PrintStream ps = new PrintStream(new NullOutputStream())) {
+ try (PrintStream ps = new
PrintStream(NullOutputStream.NULL_OUTPUT_STREAM)) {
System.setOut(ps);
System.setErr(ps);
}
diff --git
a/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
index 29308381..92e149b2 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
@@ -282,7 +282,7 @@ public class FluoAdminImpl implements FluoAdmin {
final String accumuloInstanceName =
client.properties().getProperty(AccumuloProps.CLIENT_INSTANCE_NAME);
- final String accumuloInstanceID =
client.instanceOperations().getInstanceID();
+ final String accumuloInstanceID =
client.instanceOperations().getInstanceId().canonical();
final String fluoApplicationID = UUID.randomUUID().toString();
// Create node specified by chroot suffix of Zookeeper connection string
(if it doesn't exist)
diff --git
a/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java
b/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java
index 5e4988cd..f9a23162 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java
@@ -97,9 +97,9 @@ public class Environment implements AutoCloseable {
"unexpected accumulo instance name " + instanceName + " != " +
accumuloInstance);
}
- if
(!client.instanceOperations().getInstanceID().equals(accumuloInstanceID)) {
+ if
(!client.instanceOperations().getInstanceId().canonical().equals(accumuloInstanceID))
{
throw new IllegalArgumentException("unexpected accumulo instance id "
- + client.instanceOperations().getInstanceID() + " != " +
accumuloInstanceID);
+ + client.instanceOperations().getInstanceId() + " != " +
accumuloInstanceID);
}
try {
diff --git
a/modules/core/src/main/java/org/apache/fluo/core/util/AccumuloUtil.java
b/modules/core/src/main/java/org/apache/fluo/core/util/AccumuloUtil.java
index b6eaeb6a..3ccad872 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/AccumuloUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/AccumuloUtil.java
@@ -15,6 +15,8 @@
package org.apache.fluo.core.util;
+import java.util.Properties;
+
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.fluo.api.config.FluoConfiguration;
@@ -32,4 +34,10 @@ public class AccumuloUtil {
return Accumulo.newClient().to(config.getAccumuloInstance(),
config.getAccumuloZookeepers())
.as(config.getAccumuloUser(), config.getAccumuloPassword()).build();
}
+
+ public static Properties getClientProps(FluoConfiguration config) {
+ return Accumulo.newClientProperties()
+ .to(config.getAccumuloInstance(), config.getAccumuloZookeepers())
+ .as(config.getAccumuloUser(), config.getAccumuloPassword()).build();
+ }
}
diff --git a/modules/mapreduce/pom.xml b/modules/mapreduce/pom.xml
index 771ea3a3..bd8d65f5 100644
--- a/modules/mapreduce/pom.xml
+++ b/modules/mapreduce/pom.xml
@@ -26,6 +26,10 @@
<description>This module provides utility code for MapReduce jobs that read
from or write to a
Apache Fluo table.</description>
<properties>
+ <!-- The checkstyle config comes from the Fluo parent pom and is outdated.
+ It disallows use of the newer Accumulo mapreduce APIs.
Disabling for
+ now so Fluo can use new APIs and build. -->
+ <checkstyle.skip>true</checkstyle.skip>
<!-- accumulo2-maven-plugin requires the accumulo version of thrift -->
<thrift.version>0.17.0</thrift.version>
</properties>
@@ -34,6 +38,10 @@
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-core</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-hadoop-mapreduce</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.fluo</groupId>
<artifactId>fluo-accumulo</artifactId>
diff --git
a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoEntryInputFormat.java
b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoEntryInputFormat.java
index 9d58a867..3eb701f5 100644
---
a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoEntryInputFormat.java
+++
b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoEntryInputFormat.java
@@ -23,9 +23,9 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
-import org.apache.accumulo.core.client.mapreduce.RangeInputSplit;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.hadoopImpl.mapred.RangeInputSplit;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.config.SimpleConfiguration;
import org.apache.fluo.api.data.Bytes;
@@ -35,6 +35,7 @@ import org.apache.fluo.api.data.RowColumnValue;
import org.apache.fluo.api.data.Span;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.TransactionImpl;
+import org.apache.fluo.core.util.AccumuloUtil;
import org.apache.fluo.core.util.SpanUtil;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -154,12 +155,8 @@ public class FluoEntryInputFormat extends
InputFormat<RowColumn, Bytes> {
conf.getConfiguration().set(PROPS_CONF_KEY,
new String(baos.toByteArray(), StandardCharsets.UTF_8));
- AccumuloInputFormat.setZooKeeperInstance(conf,
fconfig.getAccumuloInstance(),
- fconfig.getAccumuloZookeepers());
- AccumuloInputFormat.setConnectorInfo(conf, fconfig.getAccumuloUser(),
- new PasswordToken(fconfig.getAccumuloPassword()));
- AccumuloInputFormat.setInputTableName(conf, env.getTable());
- AccumuloInputFormat.setScanAuthorizations(conf,
env.getAuthorizations());
+
AccumuloInputFormat.configure().clientProperties(AccumuloUtil.getClientProps(fconfig))
+ .table(env.getTable()).auths(env.getAuthorizations()).store(conf);
}
} catch (Exception e) {
throw new RuntimeException(e);
diff --git
a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoKeyValueGenerator.java
b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoKeyValueGenerator.java
index 158bed90..898eb2f4 100644
---
a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoKeyValueGenerator.java
+++
b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoKeyValueGenerator.java
@@ -19,9 +19,9 @@ import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
-import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloFileOutputFormat;
import org.apache.fluo.accumulo.summarizer.FluoSummarizer;
import org.apache.fluo.accumulo.util.ColumnType;
import org.apache.fluo.accumulo.values.WriteValue;
diff --git
a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoMutationGenerator.java
b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoMutationGenerator.java
index 9293c2c0..a07d4a03 100644
---
a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoMutationGenerator.java
+++
b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoMutationGenerator.java
@@ -18,8 +18,8 @@ package org.apache.fluo.mapreduce;
import java.nio.charset.StandardCharsets;
import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
import org.apache.fluo.accumulo.util.ColumnType;
import org.apache.fluo.accumulo.values.WriteValue;
import org.apache.fluo.api.data.Bytes;
diff --git
a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoRowInputFormat.java
b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoRowInputFormat.java
index 1c5616d7..710e31d1 100644
---
a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoRowInputFormat.java
+++
b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoRowInputFormat.java
@@ -23,9 +23,9 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
-import org.apache.accumulo.core.client.mapreduce.RangeInputSplit;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.hadoopImpl.mapreduce.RangeInputSplit;
import org.apache.fluo.api.client.scanner.ColumnScanner;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.config.SimpleConfiguration;
@@ -35,6 +35,7 @@ import org.apache.fluo.api.data.ColumnValue;
import org.apache.fluo.api.data.Span;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.TransactionImpl;
+import org.apache.fluo.core.util.AccumuloUtil;
import org.apache.fluo.core.util.SpanUtil;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -155,12 +156,8 @@ public class FluoRowInputFormat extends InputFormat<Bytes,
Iterator<ColumnValue>
conf.getConfiguration().set(PROPS_CONF_KEY,
new String(baos.toByteArray(), StandardCharsets.UTF_8));
- AccumuloInputFormat.setZooKeeperInstance(conf,
fconfig.getAccumuloInstance(),
- fconfig.getAccumuloZookeepers());
- AccumuloInputFormat.setConnectorInfo(conf, fconfig.getAccumuloUser(),
- new PasswordToken(fconfig.getAccumuloPassword()));
- AccumuloInputFormat.setInputTableName(conf, env.getTable());
- AccumuloInputFormat.setScanAuthorizations(conf,
env.getAuthorizations());
+
AccumuloInputFormat.configure().clientProperties(AccumuloUtil.getClientProps(fconfig))
+ .table(env.getTable()).auths(env.getAuthorizations()).store(conf);
}
} catch (Exception e) {
throw new RuntimeException(e);
diff --git
a/modules/mapreduce/src/test/java/org/apache/fluo/mapreduce/it/FluoFileOutputFormatIT.java
b/modules/mapreduce/src/test/java/org/apache/fluo/mapreduce/it/FluoFileOutputFormatIT.java
index aa118a6b..5aed6082 100644
---
a/modules/mapreduce/src/test/java/org/apache/fluo/mapreduce/it/FluoFileOutputFormatIT.java
+++
b/modules/mapreduce/src/test/java/org/apache/fluo/mapreduce/it/FluoFileOutputFormatIT.java
@@ -20,9 +20,9 @@ import java.io.IOException;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
-import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloFileOutputFormat;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.integration.ITBaseImpl;
import org.apache.fluo.integration.TestTransaction;
@@ -96,7 +96,7 @@ public class FluoFileOutputFormatIT extends ITBaseImpl {
Assert.assertTrue(job.waitForCompletion(false));
// bulk import rfiles
- aClient.tableOperations().importDirectory(table, outDir.toString(),
failDir.toString(), false);
+
aClient.tableOperations().importDirectory(outDir.toString()).to(table).threads(3).load();
// read and update data using transactions
TestTransaction tx1 = new TestTransaction(env);
diff --git a/pom.xml b/pom.xml
index 5e5c9b4c..561ceb02 100644
--- a/pom.xml
+++ b/pom.xml
@@ -121,6 +121,11 @@
<artifactId>accumulo-core</artifactId>
<version>${accumulo.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-hadoop-mapreduce</artifactId>
+ <version>${accumulo.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-minicluster</artifactId>