This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new c4f820bf30 [INLONG-9225][Audit] Automatically create audit topic after service startup (#9226)
c4f820bf30 is described below

commit c4f820bf3092726cb6905086d7f15570118bc1e7
Author: LiJie20190102 <53458004+lijie20190...@users.noreply.github.com>
AuthorDate: Tue Nov 28 11:19:41 2023 +0800

    [INLONG-9225][Audit] Automatically create audit topic after service startup (#9226)
    
    Co-authored-by: lijie0203 <li...@qishudi.com>
---
 .../org/apache/inlong/audit/sink/KafkaSink.java    | 65 ++++++++++++++++++++
 .../inlong/audit/config/MessageQueueConfig.java    |  6 ++
 .../inlong/audit/service/consume/KafkaConsume.java | 70 +++++++++++++++++++++-
 inlong-audit/conf/application.properties           |  3 +
 inlong-audit/conf/audit-proxy-kafka.conf           |  6 ++
 5 files changed, 148 insertions(+), 2 deletions(-)

diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/KafkaSink.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/KafkaSink.java
index 82a3d7fb8b..dc2c7c154d 100644
--- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/KafkaSink.java
+++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/KafkaSink.java
@@ -34,19 +34,30 @@ import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.sink.AbstractSink;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.DescribeClusterResult;
+import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
@@ -62,6 +73,10 @@ public class KafkaSink extends AbstractSink implements Configurable {
     private String kafkaServerUrl;
     private static final String BOOTSTRAP_SERVER = "bootstrap_servers";
     private static final String TOPIC = "topic";
+
+    private static final String TOPIC_REPLICATIONS = "topic_replications";
+
+    private static final String TOPIC_PARTITIONS = "topic_partitions";
     private static final String RETRIES = "retries";
     private static final String BATCH_SIZE = "batch_size";
     private static final String LINGER_MS = "linger_ms";
@@ -81,6 +96,12 @@ public class KafkaSink extends AbstractSink implements Configurable {
     public Map<String, KafkaProducer<String, byte[]>> producerMap;
     private SinkCounter sinkCounter;
     private String topic;
+
+    private int topicReplications;
+    private int topicPartitions;
+    private static final int DEFAULT_TOPIC_REPLICATIONS = 2;
+    private static final int DEFAULT_TOPIC_PARTITIONS = 3;
+
     private volatile boolean canSend = false;
     private volatile boolean canTake = false;
     private int threadNum;
@@ -227,6 +248,10 @@ public class KafkaSink extends AbstractSink implements Configurable {
         topic = context.getString(TOPIC);
         Preconditions.checkState(StringUtils.isNotEmpty(topic), "No topic specified");
 
+        // topic config
+        topicPartitions = context.getInteger(TOPIC_PARTITIONS, DEFAULT_TOPIC_PARTITIONS);
+        topicReplications = context.getInteger(TOPIC_REPLICATIONS, DEFAULT_TOPIC_REPLICATIONS);
+
         producerMap = new HashMap<>();
 
         logEveryNEvents = context.getInteger(LOG_EVERY_N_EVENTS, DEFAULT_LOG_EVERY_N_EVENTS);
@@ -274,6 +299,9 @@ public class KafkaSink extends AbstractSink implements Configurable {
             logger.error("topic is empty");
         }
 
+        // create the topic if it does not exist yet
+        createTopic();
+
         if (producer == null) {
             producer = new KafkaProducer<>(properties, new StringSerializer(), new ByteArraySerializer());
         }
@@ -282,6 +310,43 @@ public class KafkaSink extends AbstractSink implements Configurable {
         logger.info(getName() + " success create producer");
     }
 
+    /**
+     * create the topic if it does not exist yet
+     */
+    private void createTopic() {
+
+        try (AdminClient adminClient = AdminClient.create(properties)) {
+            ListTopicsResult topicList = adminClient.listTopics();
+            KafkaFuture<Set<String>> kafkaFuture = topicList.names();
+            Set<String> topicSet = kafkaFuture.get();
+
+            if (topicSet.contains(topic)) {
+                // topic already exists, nothing to do
+                logger.info("The audit topic:{} already exists.", topic);
+                return;
+            }
+
+            DescribeClusterResult describeClusterResult = adminClient.describeCluster();
+            Collection<Node> nodes = describeClusterResult.nodes().get();
+            if (nodes.isEmpty()) {
+                throw new IllegalArgumentException("kafka server not found");
+            }
+
+            int partition = Math.min(topicPartitions, nodes.size());
+            int factor = Math.min(topicReplications, nodes.size());
+
+            NewTopic needCreateTopic = new NewTopic(topic, partition, (short) factor);
+
+            CreateTopicsResult createTopicsResult =
+                    adminClient.createTopics(Collections.singletonList(needCreateTopic));
+            createTopicsResult.all().get();
+
+        } catch (ExecutionException | InterruptedException e) {
+            throw new RuntimeException(
+                    String.format("create audit topic:%s error with config:%s", topic, properties), e);
+        }
+    }
+
     private KafkaProducer<String, byte[]> getProducer(String topic) {
         if (!producerMap.containsKey(topic)) {
             synchronized (this) {
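
For reference, the create-if-missing flow added to KafkaSink above (and
mirrored in KafkaConsume below) can be exercised on its own. A minimal
standalone sketch follows; the bootstrap address "localhost:9092" and the
hardcoded topic name are assumptions for illustration, not taken from
this commit:

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;

    import java.util.Collections;
    import java.util.Properties;
    import java.util.Set;

    public class AuditTopicSketch {

        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumed broker
            try (AdminClient admin = AdminClient.create(props)) {
                // skip creation when the topic is already there,
                // mirroring the early return in the patch
                Set<String> existing = admin.listTopics().names().get();
                if (existing.contains("inlong-audit")) {
                    return;
                }
                // cap partitions and replication at the broker count,
                // as the patch does with Math.min
                int brokers = admin.describeCluster().nodes().get().size();
                NewTopic topic = new NewTopic("inlong-audit",
                        Math.min(3, brokers), (short) Math.min(2, brokers));
                admin.createTopics(Collections.singletonList(topic)).all().get();
            }
        }
    }

Note the effect of the capping on small clusters: with the shipped
defaults (3 partitions, replication factor 2) and a single broker, the
topic is created with 1 partition and replication factor 1, avoiding the
InvalidReplicationFactorException that requesting a replication factor
above the broker count would raise.
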
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/MessageQueueConfig.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/MessageQueueConfig.java
index 05036ff552..2cfa4b1d34 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/MessageQueueConfig.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/MessageQueueConfig.java
@@ -72,6 +72,12 @@ public class MessageQueueConfig {
     @Value("${audit.kafka.topic:}")
     private String kafkaTopic;
 
+    @Value("${audit.kafka.topic.numPartitions:3}")
+    private String numPartitions;
+
+    @Value("${audit.kafka.topic.replicationFactor:2}")
+    private String replicationFactor;
+
     @Value("${audit.kafka.consumer.name:}")
     private String kafkaConsumerName;
 
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/KafkaConsume.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/KafkaConsume.java
index 421147ef73..cc0d8cac21 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/KafkaConsume.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/KafkaConsume.java
@@ -23,10 +23,17 @@ import org.apache.inlong.audit.service.InsertData;
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.DescribeClusterResult;
+import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.slf4j.Logger;
@@ -34,9 +41,12 @@ import org.slf4j.LoggerFactory;
 
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
 
 public class KafkaConsume extends BaseConsume {
 
@@ -45,6 +55,9 @@ public class KafkaConsume extends BaseConsume {
     private String serverUrl;
     private String topic;
 
+    private static final int DEFAULT_NUM_PARTITIONS = 3;
+    private static final int DEFAULT_REPLICATION_FACTOR = 2;
+
     /**
      * Constructor
      *
@@ -67,6 +80,9 @@ public class KafkaConsume extends BaseConsume {
         Preconditions.checkArgument(StringUtils.isNotEmpty(mqConfig.getKafkaConsumerName()),
                 "no kafka consume name specified");
 
+        // create the topic if it does not exist yet
+        createTopic();
+
         initConsumer(mqConfig);
 
         Thread thread = new Thread(new Fetcher(consumer, topic, isAutoCommit, mqConfig.getFetchWaitMs()),
@@ -74,9 +90,60 @@ public class KafkaConsume extends BaseConsume {
         thread.start();
     }
 
+    /**
+     * create the topic if it does not exist yet
+     */
+    private void createTopic() {
+        int numPartitions = DEFAULT_NUM_PARTITIONS;
+        if (StringUtils.isNotEmpty(mqConfig.getNumPartitions())) {
+            numPartitions = Integer.parseInt(mqConfig.getNumPartitions());
+        }
+
+        int replicationFactor = DEFAULT_REPLICATION_FACTOR;
+        if (StringUtils.isNotEmpty(mqConfig.getReplicationFactor())) {
+            replicationFactor = Integer.parseInt(mqConfig.getReplicationFactor());
+        }
+
+        try (AdminClient adminClient = AdminClient.create(getProperties(mqConfig))) {
+            ListTopicsResult topicList = adminClient.listTopics();
+            KafkaFuture<Set<String>> kafkaFuture = topicList.names();
+            Set<String> topicSet = kafkaFuture.get();
+
+            if (topicSet.contains(topic)) {
+                // topic already exists, nothing to do
+                LOG.info("The audit topic:{} already exists.", topic);
+                return;
+            }
+
+            DescribeClusterResult describeClusterResult = adminClient.describeCluster();
+            Collection<Node> nodes = describeClusterResult.nodes().get();
+            if (nodes.isEmpty()) {
+                throw new IllegalArgumentException("kafka server not found");
+            }
+
+            int partition = Math.min(numPartitions, nodes.size());
+            int factor = Math.min(replicationFactor, nodes.size());
+
+            NewTopic needCreateTopic = new NewTopic(topic, partition, (short) factor);
+
+            CreateTopicsResult createTopicsResult =
+                    adminClient.createTopics(Collections.singletonList(needCreateTopic));
+            createTopicsResult.all().get();
+
+        } catch (ExecutionException | InterruptedException e) {
+            throw new RuntimeException(
+                    String.format("create audit topic:%s error with config:%s", topic, getProperties(mqConfig)), e);
+        }
+    }
+
     protected void initConsumer(MessageQueueConfig mqConfig) {
         LOG.info("init kafka consumer, topic:{}, serverUrl:{}", topic, serverUrl);
+        Properties properties = getProperties(mqConfig);
+        consumer = new KafkaConsumer<>(properties);
+        consumer.subscribe(Collections.singleton(topic));
+    }
 
+    private Properties getProperties(MessageQueueConfig mqConfig) {
         Properties properties = new Properties();
         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
         properties.put(ConsumerConfig.GROUP_ID_CONFIG, mqConfig.getKafkaGroupId());
@@ -85,8 +152,7 @@ public class KafkaConsume extends BaseConsume {
         properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, mqConfig.getAutoOffsetReset());
         properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-        consumer = new KafkaConsumer<>(properties);
-        consumer.subscribe(Collections.singleton(topic));
+        return properties;
     }
 
     public class Fetcher implements Runnable {
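
A side note on the refactor above: the Properties built by the extracted
getProperties() carries consumer-only keys (group id, deserializers) into
AdminClient.create(). Kafka clients ignore keys they do not recognize
(logging "was supplied but isn't a known config" warnings), so this works
as-is; handing the admin client a trimmed copy would keep the logs
quieter. A sketch of that alternative, with a hypothetical helper name
not present in this commit:

    // Hypothetical helper, not part of this commit; requires
    // import org.apache.kafka.clients.admin.AdminClientConfig;
    private Properties adminProperties(MessageQueueConfig mqConfig) {
        Properties adminProps = new Properties();
        // bootstrap.servers is the only setting the admin client
        // strictly needs for the topic-creation check
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
        return adminProps;
    }
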
diff --git a/inlong-audit/conf/application.properties b/inlong-audit/conf/application.properties
index 41fd4e1e28..c76942da90 100644
--- a/inlong-audit/conf/application.properties
+++ b/inlong-audit/conf/application.properties
@@ -64,6 +64,9 @@ audit.tube.consumer.group.name=inlong-audit-consumer
 
 # kafka config
 audit.kafka.topic=inlong-audit
+# create the topic automatically if it does not exist.
+audit.kafka.topic.numPartitions=3
+audit.kafka.topic.replicationFactor=2
 audit.kafka.consumer.name=inlong-audit-consumer
 audit.kafka.group.id=audit-consumer-group
 
diff --git a/inlong-audit/conf/audit-proxy-kafka.conf b/inlong-audit/conf/audit-proxy-kafka.conf
index df14a6e6d1..8e700c31bf 100644
--- a/inlong-audit/conf/audit-proxy-kafka.conf
+++ b/inlong-audit/conf/audit-proxy-kafka.conf
@@ -59,6 +59,9 @@ agent1.channels.ch-msg2.fsyncInterval = 10
 agent1.sinks.kafka-sink-msg1.channel = ch-msg1
 agent1.sinks.kafka-sink-msg1.type =  org.apache.inlong.audit.sink.KafkaSink
 agent1.sinks.kafka-sink-msg1.topic = inlong-audit
+# create the topic automatically if it does not exist.
+agent1.sinks.kafka-sink-msg1.topic_replications = 2
+agent1.sinks.kafka-sink-msg1.topic_partitions = 3
 agent1.sinks.kafka-sink-msg1.retries = 0
 agent1.sinks.kafka-sink-msg1.batch_size = 16384
 agent1.sinks.kafka-sink-msg1.linger_ms = 0
@@ -67,6 +70,9 @@ agent1.sinks.kafka-sink-msg1.buffer_memory = 33554432
 agent1.sinks.kafka-sink-msg2.channel = ch-msg1
 agent1.sinks.kafka-sink-msg2.type =  org.apache.inlong.audit.sink.KafkaSink
 agent1.sinks.kafka-sink-msg2.topic = inlong-audit
+# create the topic automatically if it does not exist.
+agent1.sinks.kafka-sink-msg2.topic_replications = 2
+agent1.sinks.kafka-sink-msg2.topic_partitions = 3
 agent1.sinks.kafka-sink-msg2.retries = 0
 agent1.sinks.kafka-sink-msg2.batch_size = 16384
 agent1.sinks.kafka-sink-msg2.linger_ms = 0
