This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 35538a67c46 CAMEL-18828: camel-kudu - Add DELETE, UPDATE, and UPSERT
support to the producer (#8921)
35538a67c46 is described below
commit 35538a67c46235be159b00ffef0d5d2068997d51
Author: Kengo Seki <[email protected]>
AuthorDate: Mon Dec 19 19:09:31 2022 +0900
CAMEL-18828: camel-kudu - Add DELETE, UPDATE, and UPSERT support to the
producer (#8921)
* CAMEL-18828: camel-kudu - Add DELETE, UPDATE, and UPSERT support to the
producer
* Remove an unnecessary blank line
---
.../camel/component/kudu/KuduOperations.java | 3 +
.../apache/camel/component/kudu/KuduProducer.java | 75 +++++++++-
.../camel/component/kudu/KuduProducerTest.java | 153 +++++++++++++++++++++
3 files changed, 230 insertions(+), 1 deletion(-)
diff --git
a/components/camel-kudu/src/main/java/org/apache/camel/component/kudu/KuduOperations.java
b/components/camel-kudu/src/main/java/org/apache/camel/component/kudu/KuduOperations.java
index cee6b63763a..88b29ed21a2 100644
---
a/components/camel-kudu/src/main/java/org/apache/camel/component/kudu/KuduOperations.java
+++
b/components/camel-kudu/src/main/java/org/apache/camel/component/kudu/KuduOperations.java
@@ -18,6 +18,9 @@ package org.apache.camel.component.kudu;
public enum KuduOperations {
INSERT,
+ DELETE, // dispatched to KuduProducer.doDelete
+ UPDATE, // dispatched to KuduProducer.doUpdate
+ UPSERT, // dispatched to KuduProducer.doUpsert
CREATE_TABLE,
SCAN
}
diff --git
a/components/camel-kudu/src/main/java/org/apache/camel/component/kudu/KuduProducer.java
b/components/camel-kudu/src/main/java/org/apache/camel/component/kudu/KuduProducer.java
index 1b64586bd3a..4a01d40c869 100644
---
a/components/camel-kudu/src/main/java/org/apache/camel/component/kudu/KuduProducer.java
+++
b/components/camel-kudu/src/main/java/org/apache/camel/component/kudu/KuduProducer.java
@@ -20,13 +20,17 @@ import java.util.Map;
import org.apache.camel.Exchange;
import org.apache.camel.support.DefaultProducer;
+import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.Delete;
import org.apache.kudu.client.Insert;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.Update;
+import org.apache.kudu.client.Upsert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,6 +61,15 @@ public class KuduProducer extends DefaultProducer {
case INSERT:
doInsert(exchange, table);
break;
+ case DELETE:
+ doDelete(exchange, table);
+ break;
+ case UPDATE:
+ doUpdate(exchange, table);
+ break;
+ case UPSERT:
+ doUpsert(exchange, table);
+ break;
case CREATE_TABLE:
doCreateTable(exchange, table);
break;
@@ -90,6 +103,67 @@ public class KuduProducer extends DefaultProducer {
connection.newSession().apply(insert);
}
+ private void doDelete(Exchange exchange, String tableName) throws
KuduException { // Deletes the row identified by the key columns found in the Map body.
+ LOG.trace("Delete on table {}", tableName);
+ KuduClient connection = endpoint.getKuduClient();
+ KuduTable table = connection.openTable(tableName);
+
+ Delete delete = table.newDelete();
+ PartialRow row = delete.getRow();
+
+ Map<?, ?> rows = exchange.getIn().getBody(Map.class);
+ for (Map.Entry<?, ?> entry : rows.entrySet()) {
+ final String colName = entry.getKey().toString();
+ final Object value = entry.getValue(); // Schema.getColumn() throws on unknown names, so probe with hasColumn()
+ ColumnSchema column = table.getSchema().hasColumn(colName) ? table.getSchema().getColumn(colName) : null;
+ if (column != null && column.isKey()) { // unknown and non-key entries are ignored
+ row.addObject(colName, value);
+ }
+ }
+
+ connection.newSession().apply(delete); // NOTE(review): session is never closed and the apply() result is discarded - confirm
+ }
+
+ private void doUpdate(Exchange exchange, String tableName) throws
KuduException { // Applies a Kudu Update built from the Map body (column name -> value).
+ LOG.trace("Update on table {}", tableName);
+ KuduClient connection = endpoint.getKuduClient();
+ KuduTable table = connection.openTable(tableName);
+
+ Update update = table.newUpdate();
+ PartialRow row = update.getRow();
+
+ Map<?, ?> rows = exchange.getIn().getBody(Map.class);
+ for (Map.Entry<?, ?> entry : rows.entrySet()) {
+ final String colName = entry.getKey().toString();
+ final Object value = entry.getValue();
+ if (table.getSchema().hasColumn(colName)) { // getColumn() throws on unknown names instead of returning null
+ row.addObject(colName, value);
+ }
+ }
+
+ connection.newSession().apply(update); // NOTE(review): session is never closed and the apply() result is discarded - confirm
+ }
+
+ private void doUpsert(Exchange exchange, String tableName) throws
KuduException { // Applies a Kudu Upsert built from the Map body (column name -> value).
+ LOG.trace("Upsert on table {}", tableName);
+ KuduClient connection = endpoint.getKuduClient();
+ KuduTable table = connection.openTable(tableName);
+
+ Upsert upsert = table.newUpsert();
+ PartialRow row = upsert.getRow();
+
+ Map<?, ?> rows = exchange.getIn().getBody(Map.class);
+ for (Map.Entry<?, ?> entry : rows.entrySet()) {
+ final String colName = entry.getKey().toString();
+ final Object value = entry.getValue();
+ if (table.getSchema().hasColumn(colName)) { // getColumn() throws on unknown names instead of returning null
+ row.addObject(colName, value);
+ }
+ }
+
+ connection.newSession().apply(upsert); // NOTE(review): session is never closed and the apply() result is discarded - confirm
+ }
+
private void doCreateTable(Exchange exchange, String tableName) throws
KuduException {
LOG.trace("Creating table {}", tableName);
KuduClient connection = endpoint.getKuduClient();
@@ -98,7 +172,6 @@ public class KuduProducer extends DefaultProducer {
CreateTableOptions builder = (CreateTableOptions) exchange.getIn()
.getHeader(KuduConstants.CAMEL_KUDU_TABLE_OPTIONS);
connection.createTable(tableName, schema, builder);
-
}
private void doScan(Exchange exchange, String tableName) throws
KuduException {
diff --git
a/components/camel-kudu/src/test/java/org/apache/camel/component/kudu/KuduProducerTest.java
b/components/camel-kudu/src/test/java/org/apache/camel/component/kudu/KuduProducerTest.java
index cdfcc2e745e..d318fb59cae 100644
---
a/components/camel-kudu/src/test/java/org/apache/camel/component/kudu/KuduProducerTest.java
+++
b/components/camel-kudu/src/test/java/org/apache/camel/component/kudu/KuduProducerTest.java
@@ -30,9 +30,17 @@ import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.RowResult;
+import org.apache.kudu.client.RowResultIterator;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
public class KuduProducerTest extends AbstractKuduTest {
@EndpointInject(value = "mock:result")
@@ -61,6 +69,18 @@ public class KuduProducerTest extends AbstractKuduTest {
from("direct:data")
.to("kudu:localhost:7051/TestTable?operation=insert")
.to("mock:result");
+
+ from("direct:delete")
+ .to("kudu:localhost:7051/TestTable?operation=delete")
+ .to("mock:result");
+
+ from("direct:update")
+ .to("kudu:localhost:7051/TestTable?operation=update")
+ .to("mock:result");
+
+ from("direct:upsert")
+ .to("kudu:localhost:7051/TestTable?operation=upsert")
+ .to("mock:result");
}
};
}
@@ -140,4 +160,137 @@ public class KuduProducerTest extends AbstractKuduTest {
errorEndpoint.assertIsSatisfied();
successEndpoint.assertIsSatisfied();
}
+
+ @Test
+ public void insertAndDelete() throws InterruptedException, KuduException { // insert row id=5, delete it, expect an empty table
+ createTestTable("TestTable");
+
+ errorEndpoint.expectedMessageCount(0);
+ successEndpoint.expectedMessageCount(2); // two sends below: insert + delete
+
+ // Create a sample row that can be inserted in the test table
+ Map<String, Object> row = new HashMap<>();
+ row.put("id", 5);
+ row.put("title", "Mr.");
+ row.put("name", "Samuel");
+ row.put("lastname", "Smith");
+ row.put("address", "4359 Plainfield Avenue");
+ sendBody("direct:insert", row);
+
+ // Then delete it
+ row = new HashMap<>();
+ row.put("id", 5);
+ sendBody("direct:delete", row); // body carries only "id"
+
+ errorEndpoint.assertIsSatisfied();
+ successEndpoint.assertIsSatisfied();
+
+ KuduClient client = ikc.getClient(); // scan the table directly to count what is stored
+ KuduScanner scanner =
client.newScannerBuilder(client.openTable("TestTable")).build();
+ // The table shouldn't have any row
+ int rows = 0;
+ while (scanner.hasMoreRows()) {
+ RowResultIterator iterator = scanner.nextRows();
+ while (iterator.hasNext()) {
+ iterator.next();
+ rows++;
+ }
+ }
+ assertEquals(0, rows);
+ }
+
+ @Test
+ public void insertAndUpdate() throws InterruptedException, KuduException { // insert row id=5, update its name, check the stored value
+ createTestTable("TestTable");
+
+ errorEndpoint.expectedMessageCount(0);
+ successEndpoint.expectedMessageCount(2); // two sends below: insert + update
+
+ // Create a sample row that can be inserted in the test table
+ Map<String, Object> row = new HashMap<>();
+ row.put("id", 5);
+ row.put("title", "Mr.");
+ row.put("name", "Samuel");
+ row.put("lastname", "Smith");
+ row.put("address", "4359 Plainfield Avenue");
+ sendBody("direct:insert", row);
+
+ // Then update it
+ row = new HashMap<>();
+ row.put("id", 5);
+ row.put("name", "John");
+ sendBody("direct:update", row); // body holds only "id" and the new "name"
+
+ errorEndpoint.assertIsSatisfied();
+ successEndpoint.assertIsSatisfied();
+
+ KuduClient client = ikc.getClient(); // scan the table directly to verify the stored row
+ KuduScanner scanner =
client.newScannerBuilder(client.openTable("TestTable")).build();
+ int rows = 0;
+ while (scanner.hasMoreRows()) {
+ RowResultIterator iterator = scanner.nextRows();
+ while (iterator.hasNext()) {
+ RowResult result = iterator.next();
+ if (result.getInt("id") == 5) {
+ assertEquals("John", result.getString("name"));
+ }
+ rows++;
+ }
+ }
+ assertEquals(1, rows); // exactly one row is expected after the update
+ }
+
+ @Test
+ public void insertAndUpsert() throws InterruptedException, KuduException { // upsert an existing id (5) and a new id (6), check both rows
+ createTestTable("TestTable");
+
+ errorEndpoint.expectedMessageCount(0);
+ successEndpoint.expectedMessageCount(3); // three sends below: insert + two upserts
+
+ // Create a sample row that can be inserted in the test table
+ Map<String, Object> row = new HashMap<>();
+ row.put("id", 5);
+ row.put("title", "Mr.");
+ row.put("name", "Samuel");
+ row.put("lastname", "Smith");
+ row.put("address", "4359 Plainfield Avenue");
+ sendBody("direct:insert", row);
+
+ // Then update by upsert
+ row.put("id", 5); // same map is reused here, so "id" is already 5
+ row.put("name", "John");
+ sendBody("direct:upsert", row);
+
+ // And insert by upsert
+ row = new HashMap<>();
+ row.put("id", 6);
+ row.put("title", "Dr.");
+ row.put("name", "Mary");
+ row.put("lastname", "Smith");
+ row.put("address", "4359 Plainfield Avenue");
+ sendBody("direct:upsert", row);
+
+ errorEndpoint.assertIsSatisfied();
+ successEndpoint.assertIsSatisfied();
+
+ KuduClient client = ikc.getClient(); // scan the table directly to verify both rows
+ KuduScanner scanner =
client.newScannerBuilder(client.openTable("TestTable")).build();
+ int rows = 0;
+ while (scanner.hasMoreRows()) {
+ RowResultIterator iterator = scanner.nextRows();
+ while (iterator.hasNext()) {
+ RowResult result = iterator.next();
+ int id = result.getInt("id");
+ if (id == 5) {
+ assertEquals("John", result.getString("name"));
+ } else if (id == 6) {
+ assertEquals("Mary", result.getString("name"));
+ } else {
+ fail("Unexpected record found");
+ }
+ rows++;
+ }
+ }
+ assertEquals(2, rows); // ids 5 and 6 only
+ }
}