This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new c4f820bf30 [INLONG-9225][Audit] Automatically create audit topic after service startup (#9226)
c4f820bf30 is described below

commit c4f820bf3092726cb6905086d7f15570118bc1e7
Author: LiJie20190102 <53458004+lijie20190...@users.noreply.github.com>
AuthorDate: Tue Nov 28 11:19:41 2023 +0800

    [INLONG-9225][Audit] Automatically create audit topic after service startup (#9226)

    Co-authored-by: lijie0203 <li...@qishudi.com>
---
 .../org/apache/inlong/audit/sink/KafkaSink.java    | 65 ++++++++++++++++++++
 .../inlong/audit/config/MessageQueueConfig.java    |  6 ++
 .../inlong/audit/service/consume/KafkaConsume.java | 70 +++++++++++++++++++++-
 inlong-audit/conf/application.properties           |  3 +
 inlong-audit/conf/audit-proxy-kafka.conf           |  6 ++
 5 files changed, 148 insertions(+), 2 deletions(-)

diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/KafkaSink.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/KafkaSink.java
index 82a3d7fb8b..dc2c7c154d 100644
--- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/KafkaSink.java
+++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/KafkaSink.java
@@ -34,19 +34,30 @@ import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.sink.AbstractSink;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.DescribeClusterResult;
+import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
@@ -62,6 +73,10 @@ public class KafkaSink extends AbstractSink implements Configurable {
     private String kafkaServerUrl;
     private static final String BOOTSTRAP_SERVER = "bootstrap_servers";
     private static final String TOPIC = "topic";
+
+    private static final String TOPIC_REPLICATIONS = "topic_replications";
+
+    private static final String TOPIC_PARTITIONS = "topic_partitions";
     private static final String RETRIES = "retries";
     private static final String BATCH_SIZE = "batch_size";
     private static final String LINGER_MS = "linger_ms";
@@ -81,6 +96,12 @@ public class KafkaSink extends AbstractSink implements Configurable {
     public Map<String, KafkaProducer<String, byte[]>> producerMap;
     private SinkCounter sinkCounter;
     private String topic;
+
+    private int topicReplications;
+    private int topicPartitions;
+    private static final int DEFAULT_TOPIC_REPLICATIONS = 2;
+    private static final int DEFAULT_TOPIC_PARTITIONS = 3;
+
     private volatile boolean canSend = false;
     private volatile boolean canTake = false;
     private int threadNum;
@@ -227,6 +248,10 @@ public class KafkaSink extends AbstractSink implements Configurable {
         topic = context.getString(TOPIC);
         Preconditions.checkState(StringUtils.isNotEmpty(topic), "No topic specified");
 
+        // topic config
+        topicPartitions = context.getInteger(TOPIC_PARTITIONS, DEFAULT_TOPIC_PARTITIONS);
+        topicReplications = context.getInteger(TOPIC_REPLICATIONS, DEFAULT_TOPIC_REPLICATIONS);
+
         producerMap = new HashMap<>();
 
         logEveryNEvents = context.getInteger(LOG_EVERY_N_EVENTS, DEFAULT_LOG_EVERY_N_EVENTS);
@@ -274,6 +299,9 @@ public class KafkaSink extends AbstractSink implements Configurable {
             logger.error("topic is empty");
         }
 
+        // create topic if needed
+        createTopic();
+
         if (producer == null) {
             producer = new KafkaProducer<>(properties, new StringSerializer(), new ByteArraySerializer());
         }
@@ -282,6 +310,43 @@ public class KafkaSink extends AbstractSink implements Configurable {
         logger.info(getName() + " success create producer");
     }
 
+    /**
+     * create topic if needed
+     */
+    private void createTopic() {
+
+        try (AdminClient adminClient = AdminClient.create(properties)) {
+            ListTopicsResult topicList = adminClient.listTopics();
+            KafkaFuture<Set<String>> kafkaFuture = topicList.names();
+            Set<String> topicSet = kafkaFuture.get();
+
+            if (topicSet.contains(topic)) {
+                // no need to create
+                logger.info("The audit topic:{} already exists.", topic);
+                return;
+            }
+
+            DescribeClusterResult describeClusterResult = adminClient.describeCluster();
+            Collection<Node> nodes = describeClusterResult.nodes().get();
+            if (nodes.isEmpty()) {
+                throw new IllegalArgumentException("kafka server not found");
+            }
+
+            int partition = Math.min(topicPartitions, nodes.size());
+            int factor = Math.min(topicReplications, nodes.size());
+
+            NewTopic needCreateTopic = new NewTopic(topic, partition, (short) factor);
+
+            CreateTopicsResult createTopicsResult =
+                    adminClient.createTopics(Collections.singletonList(needCreateTopic));
+            createTopicsResult.all().get();
+
+        } catch (ExecutionException | InterruptedException e) {
+            throw new RuntimeException(
+                    String.format("create audit topic:%s error with config:%s", topic, properties), e);
+        }
+    }
+
     private KafkaProducer<String, byte[]> getProducer(String topic) {
         if (!producerMap.containsKey(topic)) {
             synchronized (this) {
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/MessageQueueConfig.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/MessageQueueConfig.java
index 05036ff552..2cfa4b1d34 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/MessageQueueConfig.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/MessageQueueConfig.java
@@ -72,6 +72,12 @@ public class MessageQueueConfig {
     @Value("${audit.kafka.topic:}")
     private String kafkaTopic;
 
+    @Value("${audit.kafka.topic.numPartitions:3}")
+    private String numPartitions;
+
+    @Value("${audit.kafka.topic.replicationFactor:2}")
+    private String replicationFactor;
+
     @Value("${audit.kafka.consumer.name:}")
     private String kafkaConsumerName;
 
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/KafkaConsume.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/KafkaConsume.java
index 421147ef73..cc0d8cac21 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/KafkaConsume.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/KafkaConsume.java
@@ -23,10 +23,17 @@ import org.apache.inlong.audit.service.InsertData;
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.DescribeClusterResult;
+import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.slf4j.Logger;
@@ -34,9 +41,12 @@ import org.slf4j.LoggerFactory;
 
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
 
 public class KafkaConsume extends BaseConsume {
 
@@ -45,6 +55,9 @@ public class KafkaConsume extends BaseConsume {
     private String serverUrl;
     private String topic;
 
+    private static final int DEFAULT_NUM_PARTITIONS = 3;
+    private static final int DEFAULT_REPLICATION_FACTOR = 2;
+
     /**
      * Constructor
      *
@@ -67,6 +80,9 @@ public class KafkaConsume extends BaseConsume {
         Preconditions.checkArgument(StringUtils.isNotEmpty(mqConfig.getKafkaConsumerName()),
                 "no kafka consume name specified");
 
+        // create topic if needed
+        createTopic();
+
         initConsumer(mqConfig);
 
         Thread thread = new Thread(new Fetcher(consumer, topic, isAutoCommit, mqConfig.getFetchWaitMs()),
@@ -74,9 +90,60 @@ public class KafkaConsume extends BaseConsume {
         thread.start();
     }
 
+    /**
+     * create topic if needed
+     */
+    private void createTopic() {
+        int numPartitions = DEFAULT_NUM_PARTITIONS;
+        if (StringUtils.isNotEmpty(mqConfig.getNumPartitions())) {
+            numPartitions = Integer.parseInt(mqConfig.getNumPartitions());
+        }
+
+        int replicationFactor = DEFAULT_REPLICATION_FACTOR;
+        if (StringUtils.isNotEmpty(mqConfig.getReplicationFactor())) {
+            replicationFactor = Integer.parseInt(mqConfig.getReplicationFactor());
+        }
+
+        try (AdminClient adminClient = AdminClient.create(getProperties(mqConfig))) {
+            ListTopicsResult topicList = adminClient.listTopics();
+            KafkaFuture<Set<String>> kafkaFuture = topicList.names();
+            Set<String> topicSet = kafkaFuture.get();
+
+            if (topicSet.contains(topic)) {
+                // no need to create
+                LOG.info("The audit topic:{} already exists.", topic);
+                return;
+            }
+
+            DescribeClusterResult describeClusterResult = adminClient.describeCluster();
+            Collection<Node> nodes = describeClusterResult.nodes().get();
+            if (nodes.isEmpty()) {
+                throw new IllegalArgumentException("kafka server not found");
+            }
+
+            int partition = Math.min(numPartitions, nodes.size());
+            int factor = Math.min(replicationFactor, nodes.size());
+
+            NewTopic needCreateTopic = new NewTopic(topic, partition, (short) factor);
+
+            CreateTopicsResult createTopicsResult =
+                    adminClient.createTopics(Collections.singletonList(needCreateTopic));
+            createTopicsResult.all().get();
+
+        } catch (ExecutionException | InterruptedException e) {
+            throw new RuntimeException(
+                    String.format("create audit topic:%s error with config:%s", topic, getProperties(mqConfig)), e);
+        }
+    }
+
     protected void initConsumer(MessageQueueConfig mqConfig) {
         LOG.info("init kafka consumer, topic:{}, serverUrl:{}", topic, serverUrl);
+        Properties properties = getProperties(mqConfig);
+        consumer = new KafkaConsumer<>(properties);
+        consumer.subscribe(Collections.singleton(topic));
+    }
+
+    private Properties getProperties(MessageQueueConfig mqConfig) {
         Properties properties = new Properties();
         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
         properties.put(ConsumerConfig.GROUP_ID_CONFIG, mqConfig.getKafkaGroupId());
@@ -85,8 +152,7 @@ public class KafkaConsume extends BaseConsume {
         properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, mqConfig.getAutoOffsetReset());
         properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-        consumer = new KafkaConsumer<>(properties);
-        consumer.subscribe(Collections.singleton(topic));
+        return properties;
     }
 
     public class Fetcher implements Runnable {
diff --git a/inlong-audit/conf/application.properties b/inlong-audit/conf/application.properties
index 41fd4e1e28..c76942da90 100644
--- a/inlong-audit/conf/application.properties
+++ b/inlong-audit/conf/application.properties
@@ -64,6 +64,9 @@ audit.tube.consumer.group.name=inlong-audit-consumer
 
 # kafka config
 audit.kafka.topic=inlong-audit
+# create a topic if the topic does not exist.
+audit.kafka.topic.numPartitions=3
+audit.kafka.topic.replicationFactor=2
 audit.kafka.consumer.name=inlong-audit-consumer
 audit.kafka.group.id=audit-consumer-group
 
diff --git a/inlong-audit/conf/audit-proxy-kafka.conf b/inlong-audit/conf/audit-proxy-kafka.conf
index df14a6e6d1..8e700c31bf 100644
--- a/inlong-audit/conf/audit-proxy-kafka.conf
+++ b/inlong-audit/conf/audit-proxy-kafka.conf
@@ -59,6 +59,9 @@ agent1.channels.ch-msg2.fsyncInterval = 10
 agent1.sinks.kafka-sink-msg1.channel = ch-msg1
 agent1.sinks.kafka-sink-msg1.type = org.apache.inlong.audit.sink.KafkaSink
 agent1.sinks.kafka-sink-msg1.topic = inlong-audit
+# create a topic if the topic does not exist.
+agent1.sinks.kafka-sink-msg1.topic_replications = 2
+agent1.sinks.kafka-sink-msg1.topic_partitions = 3
 agent1.sinks.kafka-sink-msg1.retries = 0
 agent1.sinks.kafka-sink-msg1.batch_size = 16384
 agent1.sinks.kafka-sink-msg1.linger_ms = 0
@@ -67,6 +70,9 @@ agent1.sinks.kafka-sink-msg1.buffer_memory = 33554432
 agent1.sinks.kafka-sink-msg2.channel = ch-msg1
 agent1.sinks.kafka-sink-msg2.type = org.apache.inlong.audit.sink.KafkaSink
 agent1.sinks.kafka-sink-msg2.topic = inlong-audit
+# create a topic if the topic does not exist.
+agent1.sinks.kafka-sink-msg2.topic_replications = 2
+agent1.sinks.kafka-sink-msg2.topic_partitions = 3
 agent1.sinks.kafka-sink-msg2.retries = 0
 agent1.sinks.kafka-sink-msg2.batch_size = 16384
 agent1.sinks.kafka-sink-msg2.linger_ms = 0
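
Both createTopic() implementations in this commit follow the same AdminClient pattern: list the existing topics, return early when the audit topic is already present, and otherwise create it with the partition and replication counts capped at the broker count. The following is a minimal standalone sketch of that pattern; the class name, bootstrap address, and main() driver are illustrative assumptions, not part of the patch.

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.common.Node;

    import java.util.Collection;
    import java.util.Collections;
    import java.util.Properties;
    import java.util.Set;
    import java.util.concurrent.ExecutionException;

    public class AuditTopicCreator {

        // Create the topic only if it does not exist, capping sizing at the broker count.
        public static void createTopicIfAbsent(Properties props, String topic,
                int partitions, int replications) {
            try (AdminClient admin = AdminClient.create(props)) {
                Set<String> existing = admin.listTopics().names().get();
                if (existing.contains(topic)) {
                    return; // topic already exists, nothing to do
                }
                Collection<Node> nodes = admin.describeCluster().nodes().get();
                if (nodes.isEmpty()) {
                    throw new IllegalStateException("no kafka brokers found");
                }
                int partition = Math.min(partitions, nodes.size());
                short factor = (short) Math.min(replications, nodes.size());
                admin.createTopics(Collections.singletonList(new NewTopic(topic, partition, factor)))
                        .all().get(); // block until the broker confirms creation
            } catch (ExecutionException | InterruptedException e) {
                throw new RuntimeException(
                        String.format("create audit topic:%s error with config:%s", topic, props), e);
            }
        }

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder address
            createTopicIfAbsent(props, "inlong-audit", 3, 2);
        }
    }

Note that list-then-create is not atomic: two services starting at the same time can both pass the existence check, and the loser of the race will see a TopicExistsException wrapped in the ExecutionException.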