adamdebreceni commented on code in PR #1945:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1945#discussion_r2041888227


##########
libminifi/src/core/repository/VolatileContentRepository.cpp:
##########
@@ -17,142 +16,133 @@
  */
 
 #include "core/repository/VolatileContentRepository.h"
+#include "core/logging/LoggerFactory.h"
 
-#include <cstdio>
-#include <memory>
-#include <string>
-#include <thread>
+namespace org::apache::nifi::minifi::core::repository {
 
-#include "core/expect.h"
-#include "io/FileStream.h"
-#include "utils/StringUtils.h"
+namespace {
 
-using namespace std::literals::chrono_literals;
+class StringRefStream : public io::BaseStream {
+ public:
+  StringRefStream(std::shared_ptr<std::string> data, std::mutex& data_store_mtx, std::shared_ptr<std::string>& data_store, std::atomic<size_t>& total_size)
+      : data_(std::move(data)), data_store_mtx_(data_store_mtx), data_store_(data_store), total_size_(total_size) {}
 
-namespace org::apache::nifi::minifi::core::repository {
+  [[nodiscard]] size_t size() const override {
+    return data_->size();
+  }
+
+  size_t read(std::span<std::byte> out_buffer) override {
+    auto read_size = std::min(data_->size() - read_offset_, out_buffer.size());
+    std::copy_n(reinterpret_cast<const std::byte*>(data_->data()) + read_offset_, read_size, out_buffer.data());
+    read_offset_ += read_size;
+    return read_size;
+  }
 
-const char *VolatileContentRepository::minimal_locking = "minimal.locking";
+  size_t write(const uint8_t *value, size_t len) override {
+    data_ = std::make_shared<std::string>(*data_);
+    data_->append(reinterpret_cast<const char*>(value), len);
+    total_size_ += len;
+    {
+      std::lock_guard lock(data_store_mtx_);
+      data_store_ = data_;
+    }
+    return len;
+  }
 
-bool VolatileContentRepository::initialize(const std::shared_ptr<Configure> &configure) {
-  repo_data_.initialize(configure, getName());
+  void close() override {}
 
-  logger_->log_info("Resizing repo_data_.value_vector for {} count is {}", getName(), repo_data_.max_count);
-  logger_->log_info("Using a maximum size for {} of {}", getName(), repo_data_.max_size);
+  void seek(size_t offset) override {
+    read_offset_ = std::min(offset, data_->size());
+  }
 
-  if (configure != nullptr) {
-    std::string value;
-    std::stringstream strstream;
-    strstream << Configure::nifi_volatile_repository_options << getName() << "." << minimal_locking;
-    if (configure->get(strstream.str(), value)) {
-      minimize_locking_ =  utils::string::toBool(value).value_or(true);
-    }
+  size_t tell() const override {
+    return read_offset_;
   }
-  if (!minimize_locking_) {
-    repo_data_.clear();
+
+  int initialize() override {
+    return 1;
+  }
+
+  std::span<const std::byte> getBuffer() const override {
+    return as_bytes(std::span{*data_});
   }
 
+ private:
+  size_t read_offset_{0};
+  std::shared_ptr<std::string> data_;
+  std::mutex& data_store_mtx_;
+  std::shared_ptr<std::string>& data_store_;
+  std::atomic<size_t>& total_size_;
+};
+
+}  // namespace
+
+VolatileContentRepository::VolatileContentRepository(std::string_view name)
+  : ContentRepositoryImpl(name),
+    logger_(logging::LoggerFactory<VolatileContentRepository>::getLogger()) {}
+
+uint64_t VolatileContentRepository::getRepositorySize() const {
+  return total_size_.load();
+}
+
+uint64_t VolatileContentRepository::getMaxRepositorySize() const {
+  return std::numeric_limits<uint64_t>::max();
+}
+
+uint64_t VolatileContentRepository::getRepositoryEntryCount() const {
+  std::lock_guard lock(data_mtx_);
+  return data_.size();
+}
+
+bool VolatileContentRepository::isFull() const {
+  return false;
+}
+
+bool VolatileContentRepository::initialize(const std::shared_ptr<Configure>& /*configure*/) {
   return true;
 }
 
-std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const minifi::ResourceClaim &claim, bool /*append*/) {
-  logger_->log_info("enter write for {}", claim.getContentFullPath());
-  {
-    std::lock_guard<std::mutex> lock(map_mutex_);
-    auto claim_check = master_list_.find(claim.getContentFullPath());
-    if (claim_check != master_list_.end()) {
-      logger_->log_info("Creating copy of atomic entry");
-      auto ent = claim_check->second->takeOwnership();
-      if (ent == nullptr) {
-        return nullptr;
-      }
-      return std::make_shared<io::AtomicEntryStream<ResourceClaim::Path>>(claim.getContentFullPath(), ent);
-    }
+std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const minifi::ResourceClaim &claim, bool append) {
+  std::lock_guard lock(data_mtx_);
+  auto& value_ref = data_[claim.getContentFullPath()];
+  if (!value_ref) {
+    value_ref = std::make_shared<std::string>();
+  } else if (!append) {
+    total_size_ -= value_ref->size();
+    value_ref = std::make_shared<std::string>();
   }
+  return std::make_shared<StringRefStream>(value_ref, data_mtx_, value_ref, total_size_);
+}
 
-  int size = 0;
-  if (LIKELY(minimize_locking_ == true)) {
-    for (auto ent : repo_data_.value_vector) {
-      if (ent->testAndSetKey(claim.getContentFullPath())) {
-        std::lock_guard<std::mutex> lock(map_mutex_);
-        master_list_[claim.getContentFullPath()] = ent;
-        logger_->log_info("Minimize locking, return stream for {}", claim.getContentFullPath());
-        return std::make_shared<io::AtomicEntryStream<ResourceClaim::Path>>(claim.getContentFullPath(), ent);
-      }
-      size++;
-    }
-  } else {
-    std::lock_guard<std::mutex> lock(map_mutex_);
-    auto claim_check = master_list_.find(claim.getContentFullPath());
-    if (claim_check != master_list_.end()) {
-      return std::make_shared<io::AtomicEntryStream<ResourceClaim::Path>>(claim.getContentFullPath(), claim_check->second);
-    } else {
-      auto *ent = new AtomicEntry<ResourceClaim::Path>(&repo_data_.current_size, &repo_data_.max_size);  // NOLINT(cppcoreguidelines-owning-memory)
-      if (ent->testAndSetKey(claim.getContentFullPath())) {
-        master_list_[claim.getContentFullPath()] = ent;
-        return std::make_shared<io::AtomicEntryStream<ResourceClaim::Path>>(claim.getContentFullPath(), ent);
-      }
-    }
+std::shared_ptr<io::BaseStream> VolatileContentRepository::read(const minifi::ResourceClaim &claim) {
+  std::lock_guard lock(data_mtx_);
+  if (auto it = data_.find(claim.getContentFullPath()); it != data_.end()) {
+    return std::make_shared<StringRefStream>(it->second, data_mtx_, it->second, total_size_);
   }
-  logger_->log_info("Cannot write {} {}, returning nullptr to roll back session. Repo is either full or locked", claim.getContentFullPath(), size);
   return nullptr;
 }
 
 bool VolatileContentRepository::exists(const minifi::ResourceClaim &claim) {
-  std::lock_guard<std::mutex> lock(map_mutex_);
-  auto claim_check = master_list_.find(claim.getContentFullPath());
-  if (claim_check != master_list_.end()) {
-    auto ent = claim_check->second->takeOwnership();
-    return ent != nullptr;
-  }
-
-  return false;
+  std::lock_guard lock(data_mtx_);
+  return data_.contains(claim.getContentFullPath());
 }
 
-std::shared_ptr<io::BaseStream> VolatileContentRepository::read(const minifi::ResourceClaim &claim) {
-  std::lock_guard<std::mutex> lock(map_mutex_);
-  auto claim_check = master_list_.find(claim.getContentFullPath());
-  if (claim_check != master_list_.end()) {
-    auto ent = claim_check->second->takeOwnership();
-    if (ent == nullptr) {
-      return nullptr;
-    }
-    return std::make_shared<io::AtomicEntryStream<ResourceClaim::Path>>(claim.getContentFullPath(), ent);
-  }
+bool VolatileContentRepository::close(const minifi::ResourceClaim &claim) {
+  return remove(claim);
+}
 
-  return nullptr;
+void VolatileContentRepository::clearOrphans() {
+  // there are no persisted orphans to delete
 }
 
 bool VolatileContentRepository::removeKey(const std::string& content_path) {
-  if (LIKELY(minimize_locking_ == true)) {
-    std::lock_guard<std::mutex> lock(map_mutex_);
-    auto ent = master_list_.find(content_path);
-    if (ent != master_list_.end()) {
-      auto ptr = ent->second;
-      // if we cannot remove the entry we will let the owner's destructor
-      // decrement the reference count and free it
-      master_list_.erase(content_path);
-      // because of the test and set we need to decrement ownership
-      ptr->decrementOwnership();
-      if (ptr->freeValue(content_path)) {
-        logger_->log_info("Deleting resource {}", content_path);
-      } else {
-        logger_->log_info("free failed for {}", content_path);
-      }
-    } else {
-      logger_->log_info("Could not remove {}", content_path);
-    }
-  } else {
-    std::lock_guard<std::mutex> lock(map_mutex_);
-    auto claim_item = master_list_.find(content_path);
-    if (claim_item != master_list_.end()) {
-      auto size = claim_item->second->getLength();
-      delete claim_item->second;  // NOLINT(cppcoreguidelines-owning-memory)
-      master_list_.erase(content_path);
-      repo_data_.current_size -= size;
-    }
+  std::lock_guard lock(data_mtx_);
+  if (auto it = data_.find(content_path); it != data_.end()) {
+    total_size_ -= it->second->size();
+    data_.erase(it);
+    logger_->log_info("Deleting resource {}", content_path);
   }

Review Comment:
   added



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to