szaszm commented on code in PR #1875: URL: https://github.com/apache/nifi-minifi-cpp/pull/1875#discussion_r1791798482
########## NOTICE: ########## @@ -73,6 +73,9 @@ This software includes third party software subject to the following copyrights: - RE2 - Copyright (c) 2009 The RE2 Authors. All rights reserved. - c-ares - Copyright (c) 1998 Massachusetts Institute of Technology Copyright (c) 2007 - 2023 Daniel Stenberg with many contributors, see AUTHORS file. - lua - Copyright (c) 1994–2023 Lua.org, PUC-Rio. +- couchbase-cxx-client - Copyright 2023-Present Couchbase, Inc. +- snappy - Copyright 2011, Google Inc. Review Comment: I don't see snappy mentioned in the LICENSE file, where does it come from? ########## docker/test/integration/cluster/containers/CouchbaseServerContainer.py: ########## @@ -0,0 +1,55 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from .Container import Container +from utils import retry_check + + +class CouchbaseServerContainer(Container): + def __init__(self, feature_context, name, vols, network, image_store, command=None): + super().__init__(feature_context, name, 'couchbase-server', vols, network, image_store, command) + + def get_startup_finished_log_entry(self): + return "logs available in" + + @retry_check(15, 2) + def run_post_startup_commands(self): + if self.post_startup_commands_finished: + return True + + commands = [ + ["couchbase-cli", "cluster-init", "-c", "localhost", "--cluster-username", "Administrator", "--cluster-password", "password123", "--services", "data,index,query", + "--cluster-ramsize", "2048", "--cluster-index-ramsize", "256"], + ["couchbase-cli", "bucket-create", "-c", "localhost", "--username", "Administrator", "--password", "password123", "--bucket", "test_bucket", "--bucket-type", "couchbase", + "--bucket-ramsize", "1024"] + ] + for command in commands: + (code, _) = self.client.containers.get(self.name).exec_run(command) + if code != 0: + return False + self.post_startup_commands_finished = True + return True + + def deploy(self): + if not self.set_deployed(): + return + + port_list = [*range(8091, 8098), 9123, 11207, 11210, 11280, *range(18091, 18097)] Review Comment: why so many ports? ########## extensions/couchbase/controllerservices/CouchbaseClusterService.cpp: ########## @@ -0,0 +1,122 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "CouchbaseClusterService.h" + +#include "core/Resource.h" + +namespace org::apache::nifi::minifi::couchbase { + +nonstd::expected<CouchbaseUpsertResult, std::error_code> RemoteCouchbaseCollection::upsert(const std::string& document_id, const std::vector<std::byte>& buffer, + const ::couchbase::upsert_options& options) { + auto [err, resp] = collection_.upsert<::couchbase::codec::raw_binary_transcoder>(document_id, buffer, options).get(); + if (err.ec()) { + client_.setConnectionError(); + return nonstd::make_unexpected(err.ec()); + } else { + const uint64_t partition_uuid = (resp.mutation_token().has_value() ? resp.mutation_token()->partition_uuid() : 0); + const uint64_t sequence_number = (resp.mutation_token().has_value() ? resp.mutation_token()->sequence_number() : 0); + const uint16_t partition_id = (resp.mutation_token().has_value() ? resp.mutation_token()->partition_id() : 0); + return CouchbaseUpsertResult { + collection_.bucket_name(), + resp.cas().value(), + partition_uuid, + sequence_number, + partition_id + }; + } +} + +std::unique_ptr<CouchbaseCollection> CouchBaseClient::getCollection(std::string_view bucket_name, std::string_view scope_name, std::string_view collection_name) { + if (!establishConnection()) { + return nullptr; + } + return std::make_unique<RemoteCouchbaseCollection>(cluster_.bucket(bucket_name).scope(scope_name).collection(collection_name), *this); +} + +void CouchBaseClient::setConnectionError() { + std::lock_guard<std::mutex> lock(state_mutex_); + state_ = State::UNKNOWN; +} + +void CouchBaseClient::close() { + std::lock_guard<std::mutex> lock(state_mutex_); + if (state_ == State::CONNECTED || state_ == State::UNKNOWN) { + cluster_.close().wait(); + state_ = State::DISCONNECTED; + } +} + +bool CouchBaseClient::establishConnection() { + std::lock_guard<std::mutex> lock(state_mutex_); + if (state_ == State::CONNECTED) { + return true; + } + + if (state_ == State::UNKNOWN) { + auto [err, resp] = cluster_.ping().get(); Review Comment: Do they really call connection opening as "ping"? :sweat_smile: If not, I think we shouldn't set the state to connected. ########## docker/test/integration/cluster/containers/CouchbaseServerContainer.py: ########## @@ -0,0 +1,55 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from .Container import Container +from utils import retry_check + + +class CouchbaseServerContainer(Container): + def __init__(self, feature_context, name, vols, network, image_store, command=None): + super().__init__(feature_context, name, 'couchbase-server', vols, network, image_store, command) + + def get_startup_finished_log_entry(self): + return "logs available in" Review Comment: It would be nice to add some context to this, to see how this message makes sense as a "startup finished" message. ########## extensions/couchbase/controllerservices/CouchbaseClusterService.cpp: ########## @@ -0,0 +1,122 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "CouchbaseClusterService.h" + +#include "core/Resource.h" + +namespace org::apache::nifi::minifi::couchbase { + +nonstd::expected<CouchbaseUpsertResult, std::error_code> RemoteCouchbaseCollection::upsert(const std::string& document_id, const std::vector<std::byte>& buffer, + const ::couchbase::upsert_options& options) { + auto [err, resp] = collection_.upsert<::couchbase::codec::raw_binary_transcoder>(document_id, buffer, options).get(); + if (err.ec()) { + client_.setConnectionError(); + return nonstd::make_unexpected(err.ec()); + } else { + const uint64_t partition_uuid = (resp.mutation_token().has_value() ? resp.mutation_token()->partition_uuid() : 0); + const uint64_t sequence_number = (resp.mutation_token().has_value() ? resp.mutation_token()->sequence_number() : 0); + const uint16_t partition_id = (resp.mutation_token().has_value() ? resp.mutation_token()->partition_id() : 0); + return CouchbaseUpsertResult { + collection_.bucket_name(), + resp.cas().value(), + partition_uuid, + sequence_number, + partition_id + }; + } +} + +std::unique_ptr<CouchbaseCollection> CouchBaseClient::getCollection(std::string_view bucket_name, std::string_view scope_name, std::string_view collection_name) { + if (!establishConnection()) { + return nullptr; + } + return std::make_unique<RemoteCouchbaseCollection>(cluster_.bucket(bucket_name).scope(scope_name).collection(collection_name), *this); +} + +void CouchBaseClient::setConnectionError() { + std::lock_guard<std::mutex> lock(state_mutex_); + state_ = State::UNKNOWN; +} + +void CouchBaseClient::close() { + std::lock_guard<std::mutex> lock(state_mutex_); + if (state_ == State::CONNECTED || state_ == State::UNKNOWN) { + cluster_.close().wait(); + state_ = State::DISCONNECTED; + } +} + +bool CouchBaseClient::establishConnection() { + std::lock_guard<std::mutex> lock(state_mutex_); + if (state_ == State::CONNECTED) { + return true; + } + + if (state_ == State::UNKNOWN) { + auto [err, resp] = cluster_.ping().get(); + if (err.ec()) { + close(); + } else { + state_ = State::CONNECTED; + return true; + } + } + + auto options = ::couchbase::cluster_options(username_, password_); + auto [connect_err, cluster] = ::couchbase::cluster::connect(connection_string_, options).get(); + if (connect_err.ec()) { + logger_->log_error("Failed to connect to Couchbase cluster: {}", connect_err.message()); + return false; + } + cluster_ = std::move(cluster); + state_ = State::CONNECTED; + return true; +} + +namespace controllers { + +void CouchbaseClusterService::initialize() { + setSupportedProperties(Properties); +} + +void CouchbaseClusterService::onEnable() { + std::string connection_string; + getProperty(ConnectionString, connection_string); + std::string username; + getProperty(UserName, username); + std::string password; + getProperty(UserPassword, password); + client_ = std::make_unique<CouchBaseClient>(connection_string, username, password, logger_); Review Comment: It is ensured anywhere that obviously wrong values, like empty, are checked and result in errors? ########## thirdparty/bustache/add-append.patch: ########## Review Comment: You could add some context above the diff. Normal text should be ignored by `patch` there, there is a similar patch somewhere in the tree. ########## libminifi/test/unit/LoggerTests.cpp: ########## @@ -384,7 +384,7 @@ TEST_CASE("fmt formatting works with the logger") { logger->log_critical("{} in hex is {:#x}", 13, 13); logger->log_critical("Unix epoch: {}", std::chrono::system_clock::time_point()); logger->log_critical("{:%Q %q} equals to {:%Q %q}", 2h, std::chrono::duration_cast<std::chrono::seconds>(2h)); - CHECK(LogTestController::getInstance().contains("[org::apache::nifi::minifi::core::logging::Logger] [critical] 1m equals to 60s")); + CHECK(LogTestController::getInstance().contains("[org::apache::nifi::minifi::core::logging::Logger] [critical] 1min equals to 60s")); Review Comment: There was a similar change in #1775 , it might be related -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
