This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 35538a67c46 CAMEL-18828: camel-kudu - Add DELETE, UPDATE, and UPSERT support to the producer (#8921)
35538a67c46 is described below

commit 35538a67c46235be159b00ffef0d5d2068997d51
Author: Kengo Seki <[email protected]>
AuthorDate: Mon Dec 19 19:09:31 2022 +0900

    CAMEL-18828: camel-kudu - Add DELETE, UPDATE, and UPSERT support to the producer (#8921)
    
    * CAMEL-18828: camel-kudu - Add DELETE, UPDATE, and UPSERT support to the producer
    
    * Remove an unnecessary blank line
---
 .../camel/component/kudu/KuduOperations.java       |   3 +
 .../apache/camel/component/kudu/KuduProducer.java  |  75 +++++++++-
 .../camel/component/kudu/KuduProducerTest.java     | 153 +++++++++++++++++++++
 3 files changed, 230 insertions(+), 1 deletion(-)

diff --git a/components/camel-kudu/src/main/java/org/apache/camel/component/kudu/KuduOperations.java b/components/camel-kudu/src/main/java/org/apache/camel/component/kudu/KuduOperations.java
index cee6b63763a..88b29ed21a2 100644
--- a/components/camel-kudu/src/main/java/org/apache/camel/component/kudu/KuduOperations.java
+++ b/components/camel-kudu/src/main/java/org/apache/camel/component/kudu/KuduOperations.java
@@ -18,6 +18,9 @@ package org.apache.camel.component.kudu;
 
 public enum KuduOperations {
     INSERT,
+    DELETE,
+    UPDATE,
+    UPSERT,
     CREATE_TABLE,
     SCAN
 }
diff --git a/components/camel-kudu/src/main/java/org/apache/camel/component/kudu/KuduProducer.java b/components/camel-kudu/src/main/java/org/apache/camel/component/kudu/KuduProducer.java
index 1b64586bd3a..4a01d40c869 100644
--- a/components/camel-kudu/src/main/java/org/apache/camel/component/kudu/KuduProducer.java
+++ b/components/camel-kudu/src/main/java/org/apache/camel/component/kudu/KuduProducer.java
@@ -20,13 +20,17 @@ import java.util.Map;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.support.DefaultProducer;
+import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
 import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.Delete;
 import org.apache.kudu.client.Insert;
 import org.apache.kudu.client.KuduClient;
 import org.apache.kudu.client.KuduException;
 import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.Update;
+import org.apache.kudu.client.Upsert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,6 +61,15 @@ public class KuduProducer extends DefaultProducer {
             case INSERT:
                 doInsert(exchange, table);
                 break;
+            case DELETE:
+                doDelete(exchange, table);
+                break;
+            case UPDATE:
+                doUpdate(exchange, table);
+                break;
+            case UPSERT:
+                doUpsert(exchange, table);
+                break;
             case CREATE_TABLE:
                 doCreateTable(exchange, table);
                 break;
@@ -90,6 +103,67 @@ public class KuduProducer extends DefaultProducer {
         connection.newSession().apply(insert);
     }
 
+    private void doDelete(Exchange exchange, String tableName) throws KuduException {
+        LOG.trace("Delete on table {}", tableName);
+        KuduClient connection = endpoint.getKuduClient();
+        KuduTable table = connection.openTable(tableName);
+
+        Delete delete = table.newDelete();
+        PartialRow row = delete.getRow();
+
+        Map<?, ?> rows = exchange.getIn().getBody(Map.class);
+        for (Map.Entry<?, ?> entry : rows.entrySet()) {
+            final String colName = entry.getKey().toString();
+            final Object value = entry.getValue();
+            ColumnSchema column = table.getSchema().getColumn(colName);
+            if (column != null && column.isKey()) {
+                row.addObject(colName, value);
+            }
+        }
+
+        connection.newSession().apply(delete);
+    }
+
+    private void doUpdate(Exchange exchange, String tableName) throws KuduException {
+        LOG.trace("Update on table {}", tableName);
+        KuduClient connection = endpoint.getKuduClient();
+        KuduTable table = connection.openTable(tableName);
+
+        Update update = table.newUpdate();
+        PartialRow row = update.getRow();
+
+        Map<?, ?> rows = exchange.getIn().getBody(Map.class);
+        for (Map.Entry<?, ?> entry : rows.entrySet()) {
+            final String colName = entry.getKey().toString();
+            final Object value = entry.getValue();
+            if (table.getSchema().getColumn(colName) != null) {
+                row.addObject(colName, value);
+            }
+        }
+
+        connection.newSession().apply(update);
+    }
+
+    private void doUpsert(Exchange exchange, String tableName) throws KuduException {
+        LOG.trace("Upsert on table {}", tableName);
+        KuduClient connection = endpoint.getKuduClient();
+        KuduTable table = connection.openTable(tableName);
+
+        Upsert upsert = table.newUpsert();
+        PartialRow row = upsert.getRow();
+
+        Map<?, ?> rows = exchange.getIn().getBody(Map.class);
+        for (Map.Entry<?, ?> entry : rows.entrySet()) {
+            final String colName = entry.getKey().toString();
+            final Object value = entry.getValue();
+            if (table.getSchema().getColumn(colName) != null) {
+                row.addObject(colName, value);
+            }
+        }
+
+        connection.newSession().apply(upsert);
+    }
+
     private void doCreateTable(Exchange exchange, String tableName) throws KuduException {
         LOG.trace("Creating table {}", tableName);
         KuduClient connection = endpoint.getKuduClient();
@@ -98,7 +172,6 @@ public class KuduProducer extends DefaultProducer {
         CreateTableOptions builder = (CreateTableOptions) exchange.getIn()
                 .getHeader(KuduConstants.CAMEL_KUDU_TABLE_OPTIONS);
         connection.createTable(tableName, schema, builder);
-
     }
 
     private void doScan(Exchange exchange, String tableName) throws KuduException {
diff --git a/components/camel-kudu/src/test/java/org/apache/camel/component/kudu/KuduProducerTest.java b/components/camel-kudu/src/test/java/org/apache/camel/component/kudu/KuduProducerTest.java
index cdfcc2e745e..d318fb59cae 100644
--- a/components/camel-kudu/src/test/java/org/apache/camel/component/kudu/KuduProducerTest.java
+++ b/components/camel-kudu/src/test/java/org/apache/camel/component/kudu/KuduProducerTest.java
@@ -30,9 +30,17 @@ import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
 import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.RowResult;
+import org.apache.kudu.client.RowResultIterator;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
 public class KuduProducerTest extends AbstractKuduTest {
 
     @EndpointInject(value = "mock:result")
@@ -61,6 +69,18 @@ public class KuduProducerTest extends AbstractKuduTest {
                 from("direct:data")
                         .to("kudu:localhost:7051/TestTable?operation=insert")
                         .to("mock:result");
+
+                from("direct:delete")
+                        .to("kudu:localhost:7051/TestTable?operation=delete")
+                        .to("mock:result");
+
+                from("direct:update")
+                        .to("kudu:localhost:7051/TestTable?operation=update")
+                        .to("mock:result");
+
+                from("direct:upsert")
+                        .to("kudu:localhost:7051/TestTable?operation=upsert")
+                        .to("mock:result");
             }
         };
     }
@@ -140,4 +160,137 @@ public class KuduProducerTest extends AbstractKuduTest {
         errorEndpoint.assertIsSatisfied();
         successEndpoint.assertIsSatisfied();
     }
+
+    @Test
+    public void insertAndDelete() throws InterruptedException, KuduException {
+        createTestTable("TestTable");
+
+        errorEndpoint.expectedMessageCount(0);
+        successEndpoint.expectedMessageCount(2);
+
+        // Create a sample row that can be inserted in the test table
+        Map<String, Object> row = new HashMap<>();
+        row.put("id", 5);
+        row.put("title", "Mr.");
+        row.put("name", "Samuel");
+        row.put("lastname", "Smith");
+        row.put("address", "4359  Plainfield Avenue");
+        sendBody("direct:insert", row);
+
+        // Then delete it
+        row = new HashMap<>();
+        row.put("id", 5);
+        sendBody("direct:delete", row);
+
+        errorEndpoint.assertIsSatisfied();
+        successEndpoint.assertIsSatisfied();
+
+        KuduClient client = ikc.getClient();
+        KuduScanner scanner = client.newScannerBuilder(client.openTable("TestTable")).build();
+        // The table shouldn't have any row
+        int rows = 0;
+        while (scanner.hasMoreRows()) {
+            RowResultIterator iterator = scanner.nextRows();
+            while (iterator.hasNext()) {
+                iterator.next();
+                rows++;
+            }
+        }
+        assertEquals(0, rows);
+    }
+
+    @Test
+    public void insertAndUpdate() throws InterruptedException, KuduException {
+        createTestTable("TestTable");
+
+        errorEndpoint.expectedMessageCount(0);
+        successEndpoint.expectedMessageCount(2);
+
+        // Create a sample row that can be inserted in the test table
+        Map<String, Object> row = new HashMap<>();
+        row.put("id", 5);
+        row.put("title", "Mr.");
+        row.put("name", "Samuel");
+        row.put("lastname", "Smith");
+        row.put("address", "4359  Plainfield Avenue");
+        sendBody("direct:insert", row);
+
+        // Then update it
+        row = new HashMap<>();
+        row.put("id", 5);
+        row.put("name", "John");
+        sendBody("direct:update", row);
+
+        errorEndpoint.assertIsSatisfied();
+        successEndpoint.assertIsSatisfied();
+
+        KuduClient client = ikc.getClient();
+        KuduScanner scanner = client.newScannerBuilder(client.openTable("TestTable")).build();
+        int rows = 0;
+        while (scanner.hasMoreRows()) {
+            RowResultIterator iterator = scanner.nextRows();
+            while (iterator.hasNext()) {
+                RowResult result = iterator.next();
+                if (result.getInt("id") == 5) {
+                    assertEquals("John", result.getString("name"));
+                }
+                rows++;
+            }
+        }
+        assertEquals(1, rows);
+    }
+
+    @Test
+    public void insertAndUpsert() throws InterruptedException, KuduException {
+        createTestTable("TestTable");
+
+        errorEndpoint.expectedMessageCount(0);
+        successEndpoint.expectedMessageCount(3);
+
+        // Create a sample row that can be inserted in the test table
+        Map<String, Object> row = new HashMap<>();
+        row.put("id", 5);
+        row.put("title", "Mr.");
+        row.put("name", "Samuel");
+        row.put("lastname", "Smith");
+        row.put("address", "4359 Plainfield Avenue");
+        sendBody("direct:insert", row);
+
+        // Then update by upsert
+        row.put("id", 5);
+        row.put("name", "John");
+        sendBody("direct:upsert", row);
+
+        // And insert by upsert
+        row = new HashMap<>();
+        row.put("id", 6);
+        row.put("title", "Dr.");
+        row.put("name", "Mary");
+        row.put("lastname", "Smith");
+        row.put("address", "4359 Plainfield Avenue");
+        sendBody("direct:upsert", row);
+
+        errorEndpoint.assertIsSatisfied();
+        successEndpoint.assertIsSatisfied();
+
+        KuduClient client = ikc.getClient();
+        KuduScanner scanner = client.newScannerBuilder(client.openTable("TestTable")).build();
+        int rows = 0;
+        while (scanner.hasMoreRows()) {
+            RowResultIterator iterator = scanner.nextRows();
+            while (iterator.hasNext()) {
+                RowResult result = iterator.next();
+                int id = result.getInt("id");
+                if (id == 5) {
+                    assertEquals("John", result.getString("name"));
+                } else if (id == 6) {
+                    assertEquals("Mary", result.getString("name"));
+                } else {
+                    fail("Unexpected record found");
+                }
+                rows++;
+            }
+        }
+        assertEquals(2, rows);
+    }
 }

Reply via email to