[
https://issues.apache.org/jira/browse/MINIFICPP-274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16251468#comment-16251468
]
ASF GitHub Bot commented on MINIFICPP-274:
------------------------------------------
Github user phrocker commented on a diff in the pull request:
https://github.com/apache/nifi-minifi-cpp/pull/182#discussion_r150850075
--- Diff: extensions/librdkafka/PutKafka.h ---
@@ -0,0 +1,182 @@
+/**
+ * @file PutKafka.h
+ * PutKafka class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PUT_KAFKA_H__
+#define __PUT_KAFKA_H__
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/Property.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "rdkafka.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+#define COMPRESSION_CODEC_NONE "none"
+#define COMPRESSION_CODEC_GZIP "gzip"
+#define COMPRESSION_CODEC_SNAPPY "snappy"
+#define ROUND_ROBIN_PARTITIONING "Round Robin"
+#define RANDOM_PARTITIONING "Random Robin"
+#define USER_DEFINED_PARTITIONING "User-Defined"
+#define DELIVERY_REPLICATED "all"
+#define DELIVERY_ONE_NODE "1"
+#define DELIVERY_BEST_EFFORT "0"
+#define SECURITY_PROTOCOL_PLAINTEXT "plaintext"
+#define SECURITY_PROTOCOL_SSL "ssl"
+#define SECURITY_PROTOCOL_SASL_PLAINTEXT "sasl_plaintext"
+#define SECURITY_PROTOCOL_SASL_SSL "sasl_ssl"
+
+// PutKafka Class
+class PutKafka: public core::Processor {
+public:
+ // Constructor
+ /*!
+ * Create a new processor
+ */
+ explicit PutKafka(std::string name, uuid_t uuid = NULL) :
+      core::Processor(name, uuid), logger_(logging::LoggerFactory<PutKafka>::getLogger()) {
+ conf_ = nullptr;
+ rk_ = nullptr;
+ topic_conf_ = nullptr;
+ rkt_ = nullptr;
+ }
+ // Destructor
+ virtual ~PutKafka() {
+ if (rk_)
+ rd_kafka_flush(rk_, 10*1000); /* wait for max 10 seconds */
+ if (rkt_)
+ rd_kafka_topic_destroy(rkt_);
+ if (rk_)
+ rd_kafka_destroy(rk_);
+ }
+ // Processor Name
+ static constexpr char const* ProcessorName = "PutKafka";
+ // Supported Properties
+ static core::Property SeedBrokers;
+ static core::Property Topic;
+ static core::Property DeliveryGuarantee;
+ static core::Property MaxMessageSize;
+ static core::Property RequestTimeOut;
+ static core::Property ClientName;
+ static core::Property BatchSize;
+ static core::Property QueueBufferMaxTime;
+ static core::Property QueueBufferMaxSize;
+ static core::Property QueueBufferMaxMessage;
+ static core::Property CompressCodec;
+ static core::Property MaxFlowSegSize;
+ static core::Property SecurityProtocol;
+ static core::Property SecurityCA;
+ static core::Property SecurityCert;
+ static core::Property SecurityPrivateKey;
+ static core::Property SecurityPrivateKeyPassWord;
+
+ // Supported Relationships
+ static core::Relationship Failure;
+ static core::Relationship Success;
+
+ // Nest Callback Class for read stream
+ class ReadCallback: public InputStreamCallback {
+ public:
+    ReadCallback(uint64_t flow_size, uint64_t max_seg_size, const std::string &key, rd_kafka_topic_t *rkt) :
+        flow_size_(flow_size), max_seg_size_(max_seg_size), key_(key), rkt_(rkt) {
+ status_ = 0;
+ read_size_ = 0;
+ }
+ ~ReadCallback() {
+ }
+ int64_t process(std::shared_ptr<io::BaseStream> stream) {
+ if (flow_size_ < max_seg_size_)
+ max_seg_size_ = flow_size_;
+      std::unique_ptr<unsigned char[]> buffer(new unsigned char[max_seg_size_]());
--- End diff ---
why not use a vector without the need for new?
> add kafka producer
> ------------------
>
> Key: MINIFICPP-274
> URL: https://issues.apache.org/jira/browse/MINIFICPP-274
> Project: NiFi MiNiFi C++
> Issue Type: New Feature
> Affects Versions: 0.3.0
> Reporter: bqiu
> Assignee: bqiu
> Priority: Minor
>
> Add kafka producer for minifi C++
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)