fgerlits commented on a change in pull request #921:
URL: https://github.com/apache/nifi-minifi-cpp/pull/921#discussion_r503848429
##########
File path: extensions/civetweb/processors/ListenHTTP.cpp
##########
@@ -212,51 +235,80 @@ void ListenHTTP::onSchedule(core::ProcessContext
*context, core::ProcessSessionF
ListenHTTP::~ListenHTTP() = default;
void ListenHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession
*session) {
- std::shared_ptr<core::FlowFile> flow_file = session->get();
+ logger_->log_debug("OnTrigger ListenHTTP");
+ processIncomingFlowFile(session);
+ processRequestBuffer(session);
+}
- // Do nothing if there are no incoming files
+void ListenHTTP::processIncomingFlowFile(core::ProcessSession *session) {
+ std::shared_ptr<core::FlowFile> flow_file = session->get();
if (!flow_file) {
return;
}
std::string type;
flow_file->getAttribute("http.type", type);
- if (type == "response_body") {
-
- if (handler_) {
- struct response_body response { "", "", "" };
- ResponseBodyReadCallback cb(&response.body);
- flow_file->getAttribute("filename", response.uri);
- flow_file->getAttribute("mime.type", response.mime_type);
- if (response.mime_type.empty()) {
- logger_->log_warn("Using default mime type of application/octet-stream
for response body file: %s", response.uri);
- response.mime_type = "application/octet-stream";
- }
- session->read(flow_file, &cb);
- handler_->set_response_body(std::move(response));
+ if (type == "response_body" && handler_) {
+ response_body response;
+ ResponseBodyReadCallback cb(&response.body);
+ flow_file->getAttribute("filename", response.uri);
+ flow_file->getAttribute("mime.type", response.mime_type);
+ if (response.mime_type.empty()) {
+ logger_->log_warn("Using default mime type of application/octet-stream
for response body file: %s", response.uri);
+ response.mime_type = "application/octet-stream";
}
+ session->read(flow_file, &cb);
+ handler_->setResponseBody(std::move(response));
}
session->remove(flow_file);
}
-ListenHTTP::Handler::Handler(std::string base_uri, core::ProcessContext
*context, core::ProcessSessionFactory *session_factory, std::string
&&auth_dn_regex, std::string &&header_as_attrs_regex)
+void ListenHTTP::processRequestBuffer(core::ProcessSession *session) {
+ std::size_t flow_file_count = 0;
+ for (; batch_size_ == 0 || batch_size_ > flow_file_count; ++flow_file_count)
{
+ FlowFileBufferPair flow_file_buffer_pair;
+ if (!handler_->request_buffer.tryDequeue(flow_file_buffer_pair)) {
+ break;
Review comment:
When can this happen (i.e. `tryDequeue` returning false)? Should we log an error?
##########
File path: extensions/civetweb/processors/ListenHTTP.cpp
##########
@@ -191,7 +206,15 @@ void ListenHTTP::onSchedule(core::ProcessContext *context,
core::ProcessSessionF
}
server_.reset(new CivetServer(options, &callbacks_, &logger_));
- handler_.reset(new Handler(basePath, context, sessionFactory,
std::move(authDNPattern), std::move(headersAsAttributesPattern)));
+
+ context->getProperty(BatchSize.getName(), batch_size_);
+ logger_->log_debug("ListenHTTP using %s: %d", BatchSize.getName(),
batch_size_);
+
+ std::size_t buffer_size;
+ context->getProperty(BufferSize.getName(), buffer_size);
+ logger_->log_debug("ListenHTTP using %s: %d", BufferSize.getName(),
buffer_size);
+
+ handler_.reset(new Handler(basePath, context, std::move(authDNPattern),
std::move(headersAsAttributesPattern), buffer_size));
Review comment:
why does the `Handler` need the `buffer_size` parameter? it is already
set in the context
##########
File path: extensions/civetweb/processors/ListenHTTP.cpp
##########
@@ -273,6 +325,34 @@ void ListenHTTP::Handler::set_header_attributes(const
mg_request_info *req_info,
}
}
+bool ListenHTTP::Handler::enqueueRequest(mg_connection *conn, const
mg_request_info *req_info, std::unique_ptr<io::BufferStream> content_buffer) {
+ auto flow_file = std::make_shared<FlowFileRecord>();
+ auto flow_version =
process_context_->getProcessorNode()->getFlowIdentifier();
+ if (flow_version != nullptr) {
+ flow_file->setAttribute(core::SpecialFlowAttribute::FLOW_ID,
flow_version->getFlowId());
+ }
+
+ if (!flow_file) {
+ sendHttp500(conn);
Review comment:
it would be good to log an error or warning here
##########
File path: extensions/civetweb/processors/ListenHTTP.cpp
##########
@@ -273,6 +325,34 @@ void ListenHTTP::Handler::set_header_attributes(const
mg_request_info *req_info,
}
}
+bool ListenHTTP::Handler::enqueueRequest(mg_connection *conn, const
mg_request_info *req_info, std::unique_ptr<io::BufferStream> content_buffer) {
+ auto flow_file = std::make_shared<FlowFileRecord>();
+ auto flow_version =
process_context_->getProcessorNode()->getFlowIdentifier();
+ if (flow_version != nullptr) {
+ flow_file->setAttribute(core::SpecialFlowAttribute::FLOW_ID,
flow_version->getFlowId());
+ }
+
+ if (!flow_file) {
Review comment:
this should be checked earlier, before `flow_file` is dereferenced in
line 332
##########
File path: extensions/civetweb/processors/ListenHTTP.cpp
##########
@@ -456,19 +473,28 @@ int64_t
ListenHTTP::WriteCallback::process(std::shared_ptr<io::BaseStream> strea
}
// Read a buffer of data from client
- rlen = mg_read(conn_, &buf[0], (size_t) rlen);
+ rlen = mg_read(conn, &buf[0], (size_t) rlen);
if (rlen <= 0) {
break;
}
// Transfer buffer data to the output stream
- stream->write(&buf[0], gsl::narrow<int>(rlen));
+ content_buffer->write(&buf[0], gsl::narrow<int>(rlen));
nlen += rlen;
}
- return nlen;
+ return content_buffer;
+}
+
+ListenHTTP::WriteCallback::WriteCallback(std::unique_ptr<io::BufferStream>
request_content)
+ : logger_(logging::LoggerFactory<ListenHTTP::WriteCallback>::getLogger())
+ , request_content_(std::move(request_content)) {
+}
+
+int64_t ListenHTTP::WriteCallback::process(std::shared_ptr<io::BaseStream>
stream) {
Review comment:
the parameter type could be `const std::shared_ptr<io::BaseStream> &`
(or `io::BaseStream &`)
##########
File path: extensions/civetweb/processors/ListenHTTP.cpp
##########
@@ -212,51 +235,80 @@ void ListenHTTP::onSchedule(core::ProcessContext
*context, core::ProcessSessionF
ListenHTTP::~ListenHTTP() = default;
void ListenHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession
*session) {
- std::shared_ptr<core::FlowFile> flow_file = session->get();
+ logger_->log_debug("OnTrigger ListenHTTP");
+ processIncomingFlowFile(session);
+ processRequestBuffer(session);
+}
- // Do nothing if there are no incoming files
+void ListenHTTP::processIncomingFlowFile(core::ProcessSession *session) {
+ std::shared_ptr<core::FlowFile> flow_file = session->get();
if (!flow_file) {
return;
}
std::string type;
flow_file->getAttribute("http.type", type);
- if (type == "response_body") {
-
- if (handler_) {
- struct response_body response { "", "", "" };
- ResponseBodyReadCallback cb(&response.body);
- flow_file->getAttribute("filename", response.uri);
- flow_file->getAttribute("mime.type", response.mime_type);
- if (response.mime_type.empty()) {
- logger_->log_warn("Using default mime type of application/octet-stream
for response body file: %s", response.uri);
- response.mime_type = "application/octet-stream";
- }
- session->read(flow_file, &cb);
- handler_->set_response_body(std::move(response));
+ if (type == "response_body" && handler_) {
+ response_body response;
+ ResponseBodyReadCallback cb(&response.body);
+ flow_file->getAttribute("filename", response.uri);
+ flow_file->getAttribute("mime.type", response.mime_type);
+ if (response.mime_type.empty()) {
+ logger_->log_warn("Using default mime type of application/octet-stream
for response body file: %s", response.uri);
+ response.mime_type = "application/octet-stream";
}
+ session->read(flow_file, &cb);
+ handler_->setResponseBody(std::move(response));
}
session->remove(flow_file);
}
-ListenHTTP::Handler::Handler(std::string base_uri, core::ProcessContext
*context, core::ProcessSessionFactory *session_factory, std::string
&&auth_dn_regex, std::string &&header_as_attrs_regex)
+void ListenHTTP::processRequestBuffer(core::ProcessSession *session) {
+ std::size_t flow_file_count = 0;
+ for (; batch_size_ == 0 || batch_size_ > flow_file_count; ++flow_file_count)
{
+ FlowFileBufferPair flow_file_buffer_pair;
+ if (!handler_->request_buffer.tryDequeue(flow_file_buffer_pair)) {
+ break;
+ }
+
+ auto flow_file = flow_file_buffer_pair.first;
+ session->add(flow_file);
+
+ if (flow_file_buffer_pair.second) {
+ WriteCallback callback(std::move(flow_file_buffer_pair.second));
+ session->write(flow_file, &callback);
+ }
+
+ session->transfer(flow_file, Success);
+ }
+
+ logger_->log_debug("ListenHTTP transferred %d flow files from HTTP request
buffer", flow_file_count);
+}
+
+ListenHTTP::Handler::Handler(std::string base_uri, core::ProcessContext
*context, std::string &&auth_dn_regex, std::string &&header_as_attrs_regex,
std::size_t buffer_size)
: base_uri_(std::move(base_uri)),
auth_dn_regex_(std::move(auth_dn_regex)),
headers_as_attrs_regex_(std::move(header_as_attrs_regex)),
- logger_(logging::LoggerFactory<ListenHTTP::Handler>::getLogger()) {
- process_context_ = context;
- session_factory_ = session_factory;
+ process_context_(context),
+ logger_(logging::LoggerFactory<ListenHTTP::Handler>::getLogger()),
+ buffer_size_(buffer_size) {
}
-void ListenHTTP::Handler::send_error_response(struct mg_connection *conn) {
+void ListenHTTP::Handler::sendHttp500(mg_connection* const conn) {
mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n"
- "Content-Type: text/html\r\n"
- "Content-Length: 0\r\n\r\n");
+ "Content-Type: text/html\r\n"
+ "Content-Length: 0\r\n\r\n");
}
-void ListenHTTP::Handler::set_header_attributes(const mg_request_info
*req_info, const std::shared_ptr<core::FlowFile> &flow_file) const {
+void ListenHTTP::Handler::sendHttp503(mg_connection* const conn) {
+ mg_printf(conn, "HTTP/1.1 503 Service Unavailable\r\n"
+ "Content-Type: text/html\r\n"
+ "Content-Length: 0\r\n\r\n");
Review comment:
just out of curiosity, why do we use Windows line endings here?
##########
File path: extensions/civetweb/processors/ListenHTTP.cpp
##########
@@ -273,6 +325,34 @@ void ListenHTTP::Handler::set_header_attributes(const
mg_request_info *req_info,
}
}
+bool ListenHTTP::Handler::enqueueRequest(mg_connection *conn, const
mg_request_info *req_info, std::unique_ptr<io::BufferStream> content_buffer) {
+ auto flow_file = std::make_shared<FlowFileRecord>();
+ auto flow_version =
process_context_->getProcessorNode()->getFlowIdentifier();
+ if (flow_version != nullptr) {
+ flow_file->setAttribute(core::SpecialFlowAttribute::FLOW_ID,
flow_version->getFlowId());
+ }
+
+ if (!flow_file) {
+ sendHttp500(conn);
+ return true;
Review comment:
this function always returns true; either the error cases could return
false, or the function could be void
##########
File path: extensions/civetweb/processors/ListenHTTP.cpp
##########
@@ -212,51 +235,80 @@ void ListenHTTP::onSchedule(core::ProcessContext
*context, core::ProcessSessionF
ListenHTTP::~ListenHTTP() = default;
void ListenHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession
*session) {
- std::shared_ptr<core::FlowFile> flow_file = session->get();
+ logger_->log_debug("OnTrigger ListenHTTP");
+ processIncomingFlowFile(session);
+ processRequestBuffer(session);
+}
- // Do nothing if there are no incoming files
+void ListenHTTP::processIncomingFlowFile(core::ProcessSession *session) {
+ std::shared_ptr<core::FlowFile> flow_file = session->get();
if (!flow_file) {
return;
}
std::string type;
flow_file->getAttribute("http.type", type);
- if (type == "response_body") {
-
- if (handler_) {
- struct response_body response { "", "", "" };
- ResponseBodyReadCallback cb(&response.body);
- flow_file->getAttribute("filename", response.uri);
- flow_file->getAttribute("mime.type", response.mime_type);
- if (response.mime_type.empty()) {
- logger_->log_warn("Using default mime type of application/octet-stream
for response body file: %s", response.uri);
- response.mime_type = "application/octet-stream";
- }
- session->read(flow_file, &cb);
- handler_->set_response_body(std::move(response));
+ if (type == "response_body" && handler_) {
+ response_body response;
+ ResponseBodyReadCallback cb(&response.body);
+ flow_file->getAttribute("filename", response.uri);
+ flow_file->getAttribute("mime.type", response.mime_type);
+ if (response.mime_type.empty()) {
+ logger_->log_warn("Using default mime type of application/octet-stream
for response body file: %s", response.uri);
+ response.mime_type = "application/octet-stream";
}
+ session->read(flow_file, &cb);
+ handler_->setResponseBody(std::move(response));
}
session->remove(flow_file);
}
-ListenHTTP::Handler::Handler(std::string base_uri, core::ProcessContext
*context, core::ProcessSessionFactory *session_factory, std::string
&&auth_dn_regex, std::string &&header_as_attrs_regex)
+void ListenHTTP::processRequestBuffer(core::ProcessSession *session) {
+ std::size_t flow_file_count = 0;
+ for (; batch_size_ == 0 || batch_size_ > flow_file_count; ++flow_file_count)
{
+ FlowFileBufferPair flow_file_buffer_pair;
+ if (!handler_->request_buffer.tryDequeue(flow_file_buffer_pair)) {
+ break;
+ }
+
+ auto flow_file = flow_file_buffer_pair.first;
+ session->add(flow_file);
+
+ if (flow_file_buffer_pair.second) {
+ WriteCallback callback(std::move(flow_file_buffer_pair.second));
+ session->write(flow_file, &callback);
+ }
+
+ session->transfer(flow_file, Success);
+ }
+
+ logger_->log_debug("ListenHTTP transferred %d flow files from HTTP request
buffer", flow_file_count);
Review comment:
minor, but `%d` should be `%zu`
##########
File path: extensions/civetweb/processors/ListenHTTP.cpp
##########
@@ -273,6 +325,34 @@ void ListenHTTP::Handler::set_header_attributes(const
mg_request_info *req_info,
}
}
+bool ListenHTTP::Handler::enqueueRequest(mg_connection *conn, const
mg_request_info *req_info, std::unique_ptr<io::BufferStream> content_buffer) {
+ auto flow_file = std::make_shared<FlowFileRecord>();
+ auto flow_version =
process_context_->getProcessorNode()->getFlowIdentifier();
+ if (flow_version != nullptr) {
+ flow_file->setAttribute(core::SpecialFlowAttribute::FLOW_ID,
flow_version->getFlowId());
+ }
+
+ if (!flow_file) {
+ sendHttp500(conn);
+ return true;
+ }
+
+ setHeaderAttributes(req_info, flow_file);
+
+ if (buffer_size_ == 0 || request_buffer.size() < buffer_size_) {
+ request_buffer.enqueue(std::make_pair(std::move(flow_file),
std::move(content_buffer)));
+ } else {
+ logger_->log_warn("ListenHTTP buffer is full");
Review comment:
Can we add some info about the message we're dropping? E.g. `"ListenHTTP
buffer is full, message %s is dropped", req_info->something()`
##########
File path: extensions/civetweb/processors/ListenHTTP.h
##########
@@ -112,20 +115,22 @@ class ListenHTTP : public core::Processor {
}
}
+ std::size_t buffer_size_;
+ utils::ConcurrentQueue<FlowFileBufferPair> request_buffer;
Review comment:
member variable names should end with an underscore
EDIT: not sure if that's true if they are public
EDIT2: but is it a good idea to have public member variables?
##########
File path: libminifi/src/io/FileStream.cpp
##########
@@ -116,7 +116,12 @@ int FileStream::read(uint8_t *buf, int buflen) {
file_stream_->clear();
file_stream_->seekg(0, file_stream_->end);
file_stream_->seekp(0, file_stream_->end);
- size_t len = gsl::narrow<size_t>(file_stream_->tellg());
+ auto tellg_result = file_stream_->tellg();
+ if (tellg_result < 0) {
+ logging::LOG_ERROR(logger_) << "Tellg call on file stream failed.";
+ return -1;
Review comment:
:+1:
##########
File path: extensions/civetweb/processors/ListenHTTP.h
##########
@@ -155,18 +160,15 @@ class ListenHTTP : public core::Processor {
// Write callback for transferring data from HTTP request to content repo
class WriteCallback : public OutputStreamCallback {
public:
- WriteCallback(struct mg_connection *conn, const struct mg_request_info
*reqInfo);
+ WriteCallback(std::unique_ptr<io::BufferStream>);
int64_t process(std::shared_ptr<io::BaseStream> stream);
private:
- // Logger
std::shared_ptr<logging::Logger> logger_;
-
- struct mg_connection *conn_;
- const struct mg_request_info *req_info_;
+ std::shared_ptr<io::BufferStream> request_content_;
Review comment:
could this be a `unique_ptr` instead of a `shared_ptr`?
##########
File path: extensions/civetweb/processors/ListenHTTP.cpp
##########
@@ -456,19 +473,28 @@ int64_t
ListenHTTP::WriteCallback::process(std::shared_ptr<io::BaseStream> strea
}
// Read a buffer of data from client
- rlen = mg_read(conn_, &buf[0], (size_t) rlen);
+ rlen = mg_read(conn, &buf[0], (size_t) rlen);
if (rlen <= 0) {
break;
}
// Transfer buffer data to the output stream
- stream->write(&buf[0], gsl::narrow<int>(rlen));
+ content_buffer->write(&buf[0], gsl::narrow<int>(rlen));
nlen += rlen;
}
- return nlen;
+ return content_buffer;
+}
+
+ListenHTTP::WriteCallback::WriteCallback(std::unique_ptr<io::BufferStream>
request_content)
+ : logger_(logging::LoggerFactory<ListenHTTP::WriteCallback>::getLogger())
Review comment:
(I know it is old code, but) this `logger_` member doesn't seem to be
used anywhere, so it could be removed.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]