Github user jacksontj commented on a diff in the pull request:

    https://github.com/apache/trafficserver/pull/653#discussion_r68604157
  
    --- Diff: iocore/hostdb/P_RefCountCache.h ---
    @@ -0,0 +1,896 @@
    +/** @file
    +
    +  A cache (with map-esque interface) for RefCountObjs
    +
    +  @section license License
    +
    +  Licensed to the Apache Software Foundation (ASF) under one
    +  or more contributor license agreements.  See the NOTICE file
    +  distributed with this work for additional information
    +  regarding copyright ownership.  The ASF licenses this file
    +  to you under the Apache License, Version 2.0 (the
    +  "License"); you may not use this file except in compliance
    +  with the License.  You may obtain a copy of the License at
    +
    +      http://www.apache.org/licenses/LICENSE-2.0
    +
    +  Unless required by applicable law or agreed to in writing, software
    +  distributed under the License is distributed on an "AS IS" BASIS,
    +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  See the License for the specific language governing permissions and
    +  limitations under the License.
    + */
    +#ifndef _P_RefCountCache_h_
    +#define _P_RefCountCache_h_
    +
    +#include "I_EventSystem.h"
    +#include "P_EventSystem.h" // TODO: less? just need ET_TASK
    +
    +#include <ts/Map.h>
    +#include "ts/PriorityQueue.h"
    +
    +#include <ts/List.h>
    +#include <ts/ink_hrtime.h>
    +
    +#include "ts/Vec.h"
    +#include "ts/I_Version.h"
    +#include <unistd.h>
    +
    +#define REFCOUNT_CACHE_EVENT_SYNC REFCOUNT_CACHE_EVENT_EVENTS_START
    +
    +#define REFCOUNTCACHE_MAGIC_NUMBER 0x0BAD2D9
    +#define REFCOUNTCACHE_MAJOR_VERSION 1
    +#define REFCOUNTCACHE_MINOR_VERSION 0
    +
    +// Stats
    +enum RefCountCache_Stats {
    +  refcountcache_current_items_stat,        // current number of items
    +  refcountcache_current_size_stat,         // current size of cache
    +  refcountcache_total_inserts_stat,        // total items inserted
    +  refcountcache_total_failed_inserts_stat, // total items unable to insert
    +  refcountcache_total_lookups_stat,        // total get() calls
    +  refcountcache_total_hits_stat,           // total hits
    +
    +  // Persistence metrics
    +  refcountcache_last_sync_time,   // seconds since epoch of last 
successful sync
    +  refcountcache_last_total_items, // number of items sync last time
    +  refcountcache_last_total_size,  // total size at last sync
    +
    +  RefCountCache_Stat_Count
    +};
    +
    +struct RefCountCacheItemMeta {
    +  uint64_t key;
    +  unsigned int size;
    +  ink_time_t expiry_time; // expire time as seconds since epoch
    +  RefCountCacheItemMeta(uint64_t key, unsigned int size, int expire_time = 
-1) : key(key), size(size), expiry_time(expire_time) {}
    +};
    +
    +// Layer of indirection for the hashmap-- since it needs lots of things 
inside of it
    +// We'll also use this as the item header, for persisting objects to disk
    +class RefCountCacheHashEntry
    +{
    +public:
    +  Ptr<RefCountObj> item;
    +  LINK(RefCountCacheHashEntry, item_link);
    +
    +  PriorityQueueEntry<RefCountCacheHashEntry *> *expiry_entry;
    +
    +  RefCountCacheItemMeta meta;
    +  void
    +  set(RefCountObj *i, uint64_t key, unsigned int size, int expire_time)
    +  {
    +    this->item = make_ptr(i);
    +    this->meta = RefCountCacheItemMeta(key, size, expire_time);
    +  };
    +  // Need a no-argument constructor to use the classAllocator
    +  RefCountCacheHashEntry() : expiry_entry(NULL), meta(0, 0){};
    +
    +  // make these values comparable -- so we can sort them
    +  bool
    +  operator<(const RefCountCacheHashEntry &v2) const
    +  {
    +    return this->meta.expiry_time < v2.meta.expiry_time;
    +  };
    +};
    +// Since the hashing values are all fixed size, we can simply use a 
classAllocator to avoid mallocs
    +extern ClassAllocator<RefCountCacheHashEntry> 
refCountCacheHashingValueAllocator;
    +extern ClassAllocator<PriorityQueueEntry<RefCountCacheHashEntry *>> 
expiryQueueEntry;
    +
    +struct RefCountCacheHashing {
    +  typedef uint64_t ID;
    +  typedef uint64_t const Key;
    +  typedef RefCountCacheHashEntry Value;
    +  typedef DList(RefCountCacheHashEntry, item_link) ListHead;
    +
    +  static ID
    +  hash(Key key)
    +  {
    +    return key;
    +  }
    +  static Key
    +  key(Value const *value)
    +  {
    +    return value->meta.key;
    +  }
    +  static bool
    +  equal(Key lhs, Key rhs)
    +  {
    +    return lhs == rhs;
    +  }
    +};
    +
    +// The RefCountCachePartition is simply a map of key -> Ptr<YourClass>
    +// We partition the cache to reduce lock contention
    +template <class C> class RefCountCachePartition
    +{
    +public:
    +  RefCountCachePartition(unsigned int part_num, uint64_t max_size, 
unsigned int max_items, RecRawStatBlock *rsb = NULL);
    +  Ptr<C> get(uint64_t key);
    +  void put(uint64_t key, C *item, int size = 0, int expire_time = 0);
    +  void erase(uint64_t key, int expiry_time = -1);
    +
    +  void clear();
    +  bool is_full();
    +  bool reserve(unsigned int);
    +
    +  const size_t count();
    +  void copy(Vec<RefCountCacheHashEntry *> &items);
    +
    +  typedef typename TSHashTable<RefCountCacheHashing>::iterator 
iterator_type;
    +  typedef typename TSHashTable<RefCountCacheHashing>::self hash_type;
    +  typedef typename TSHashTable<RefCountCacheHashing>::Location 
location_type;
    +  TSHashTable<RefCountCacheHashing> *get_map();
    +
    +  Ptr<ProxyMutex> lock; // Lock
    +
    +private:
    +  void metric_inc(RefCountCache_Stats metric_enum, int64_t data);
    +
    +  unsigned int part_num;
    +
    +  uint64_t max_size;
    +  unsigned int max_items;
    +
    +  uint64_t size;
    +  unsigned int items;
    +
    +  hash_type item_map;
    +
    +  PriorityQueue<RefCountCacheHashEntry *> expiry_queue;
    +  RecRawStatBlock *rsb;
    +};
    +
    +template <class C>
    +RefCountCachePartition<C>::RefCountCachePartition(unsigned int part_num, 
uint64_t max_size, unsigned int max_items,
    +                                                  RecRawStatBlock *rsb)
    +{
    +  this->part_num  = part_num;
    +  this->max_size  = max_size;
    +  this->max_items = max_items;
    +  this->size      = 0;
    +  this->items     = 0;
    +  this->rsb       = rsb;
    +
    +  // Initialize lock
    +  this->lock = new_ProxyMutex();
    +}
    +
    +template <class C>
    +Ptr<C>
    +RefCountCachePartition<C>::get(uint64_t key)
    +{
    +  this->metric_inc(refcountcache_total_lookups_stat, 1);
    +  location_type l = this->item_map.find(key);
    +  if (l.isValid()) {
    +    // found
    +    this->metric_inc(refcountcache_total_hits_stat, 1);
    +    return make_ptr((C *)l.m_value->item.get());
    +  } else {
    +    return Ptr<C>();
    +  }
    +}
    +
    +template <class C>
    +void
    +RefCountCachePartition<C>::put(uint64_t key, C *item, int size, int 
expire_time)
    +{
    +  this->metric_inc(refcountcache_total_inserts_stat, 1);
    +  size += sizeof(C);
    +  // Remove any colliding entries
    +  this->erase(key);
    +
    +  // if we are full, and can't make space-- then don't store the item
    +  if (this->is_full() && !this->reserve(size)) {
    +    Debug("refcountcache", "partition %d is full-- not storing item 
key=%ld", this->part_num, key);
    +    this->metric_inc(refcountcache_total_failed_inserts_stat, 1);
    +    return;
    +  }
    +
    +  // Create our value-- which has a ref to the `item`
    +  RefCountCacheHashEntry *val = refCountCacheHashingValueAllocator.alloc();
    +  val->set(item, key, size, expire_time);
    +
    +  // add expiry_entry to expiry queue, if the expire time is positive 
(otherwise it means don't expire)
    +  if (expire_time >= 0) {
    +    Debug("refcountcache", "partition %d adding entry with 
expire_time=%d\n", this->part_num, expire_time);
    +    PriorityQueueEntry<RefCountCacheHashEntry *> *expiry_entry = 
expiryQueueEntry.alloc();
    +    new ((void *)expiry_entry) PriorityQueueEntry<RefCountCacheHashEntry 
*>(val);
    +    expiry_queue.push(expiry_entry);
    +    val->expiry_entry = expiry_entry;
    +  }
    +
    +  // add the item to the map
    +  this->item_map.insert(val);
    +  this->size += val->meta.size;
    +  this->items++;
    +  this->metric_inc(refcountcache_current_size_stat, 
(int64_t)val->meta.size);
    +  this->metric_inc(refcountcache_current_items_stat, 1);
    +}
    +
    +template <class C>
    +void
    +RefCountCachePartition<C>::erase(uint64_t key, int expiry_time)
    +{
    +  location_type l = this->item_map.find(key);
    +  if (l.isValid()) {
    +    if (expiry_time >= 0 && l.m_value->meta.expiry_time != expiry_time) {
    +      return;
    +    }
    +    // found
    +    this->item_map.remove(l);
    +
    +    // decrement usage counters
    +    this->size -= l.m_value->meta.size;
    +    this->items--;
    +
    +    this->metric_inc(refcountcache_current_size_stat, 
-((int64_t)l.m_value->meta.size));
    +    this->metric_inc(refcountcache_current_items_stat, -1);
    +
    +    // remove from expiry queue
    +    if (l.m_value->expiry_entry != NULL) {
    +      Debug("refcountcache", "partition %d deleting item from expiry_queue 
idx=%d\n", this->part_num,
    +            l.m_value->expiry_entry->index);
    +      this->expiry_queue.erase(l.m_value->expiry_entry);
    +      expiryQueueEntry.free(l.m_value->expiry_entry);
    +      l.m_value->expiry_entry = NULL; // To avoid the destruction of `l` 
calling the destructor again-- and causing issues
    +    }
    +    // Since the Value is actually RefCountObj-- when this gets deleted 
normally it calls the wrong
    +    // `free` method, this forces the delete/decr to happen with the right 
type
    +    Ptr<C> *tmp = (Ptr<C> *)&l.m_value->item;
    +    tmp->clear();
    +    refCountCacheHashingValueAllocator.free(l.m_value);
    +  }
    +}
    +
    +template <class C>
    +void
    +RefCountCachePartition<C>::clear()
    +{
    +  this->metric_inc(refcountcache_current_size_stat, -this->size);
    +  this->metric_inc(refcountcache_current_items_stat, -this->items);
    +  // Clear the in memory hashmap
    +  // TODO: delete all items (not sure if clear calls delete on all items)
    +  this->item_map.clear();
    +  // clear the queue
    +  PriorityQueueEntry<RefCountCacheHashEntry *> *tmp_entry;
    +  while (!this->expiry_queue.empty()) {
    +    tmp_entry = this->expiry_queue.top();
    +    this->expiry_queue.pop();
    +    expiryQueueEntry.free(tmp_entry);
    +  }
    +
    +  this->items = 0;
    +  this->size  = 0;
    +}
    +
    +// Are we full?
    +template <class C>
    +bool
    +RefCountCachePartition<C>::is_full()
    +{
    +  Debug("refcountcache", "partition %d is full? items %d/%d size 
%ld/%ld\n\n", this->part_num, this->items, this->max_items,
    +        this->size, this->max_size);
    +  return (this->max_items > 0 && this->items >= this->max_items) || 
(this->max_size > 0 && this->size >= this->max_size);
    +}
    +
    +// Attempt to make space for item of `size`
    +template <class C>
    +bool
    +RefCountCachePartition<C>::reserve(unsigned int size)
    +{
    +  int curr_time = Thread::get_hrtime() / HRTIME_SECOND;
    +  while (this->is_full() || (size > 0 && this->size + size > 
this->max_size)) {
    +    PriorityQueueEntry<RefCountCacheHashEntry *> *top_item = 
expiry_queue.top();
    +    // if there is nothing in the expiry queue, then we can't make space
    +    if (top_item == NULL) {
    +      return false;
    +    }
    +
    +    // If the first item has expired, lets evict it, and then go around 
again
    +    if (top_item->node->meta.expiry_time < curr_time) {
    +      this->erase(top_item->node->meta.key);
    +      expiry_queue.pop();
    +    } else { // if the first item isn't expired-- the rest won't be either 
(queue is sorted)
    +      return false;
    +    }
    +  }
    +  return true;
    +}
    +
    +template <class C>
    +const size_t
    +RefCountCachePartition<C>::count()
    +{
    +  return this->items;
    +}
    +
    +template <class C>
    +void
    +RefCountCachePartition<C>::copy(Vec<RefCountCacheHashEntry *> &items)
    +{
    +  for (RefCountCachePartition<C>::iterator_type i = 
this->item_map.begin(); i != this->item_map.end(); ++i) {
    +    RefCountCacheHashEntry *val = 
refCountCacheHashingValueAllocator.alloc();
    +    val->set(i.m_value->item.get(), i.m_value->meta.key, 
i.m_value->meta.size, i.m_value->meta.expiry_time);
    +    items.push_back(val);
    +  }
    +}
    +
    +template <class C>
    +void
    +RefCountCachePartition<C>::metric_inc(RefCountCache_Stats metric_enum, 
int64_t data)
    +{
    +  if (this->rsb) {
    +    RecIncrGlobalRawStatCount(this->rsb, metric_enum, data);
    +  }
    +}
    +
    +template <class C>
    +TSHashTable<RefCountCacheHashing> *
    +RefCountCachePartition<C>::get_map()
    +{
    +  return &this->item_map;
    +}
    +
    +// The header for the cache, this is used to check if the serialized cache 
is compatible
    +// The implementation of this class must be in here, as this is used by 
template
    +// classes, and c++ rocks that way
    +class RefCountCacheHeader
    +{
    +public:
    +  unsigned int magic;
    +  VersionNumber version;
    +  VersionNumber object_version; // version passed in of whatever it is we 
are caching
    +
    +  RefCountCacheHeader(VersionNumber object_version = VersionNumber())
    +    : magic(REFCOUNTCACHE_MAGIC_NUMBER), object_version(object_version)
    +  {
    +    this->version.ink_major = REFCOUNTCACHE_MAJOR_VERSION;
    +    this->version.ink_minor = REFCOUNTCACHE_MINOR_VERSION;
    +  };
    +
    +  bool
    +  operator==(const RefCountCacheHeader other) const
    +  {
    +    return (this->magic == other.magic && this->version.ink_major == 
other.version.ink_major &&
    +            this->version.ink_minor == other.version.ink_minor &&
    +            this->object_version.ink_major == 
other.object_version.ink_major &&
    +            this->object_version.ink_minor == 
other.object_version.ink_minor);
    +  }
    +
    +  bool
    +  compatible(RefCountCacheHeader *other)
    +  {
    +    return (this->magic == other->magic && this->version.ink_major == 
other->version.ink_major &&
    +            this->object_version.ink_major == other->version.ink_major);
    +  };
    +};
    +
    +// This continuation is responsible for persisting RefCountCache to disk
    +// To avoid locking the partitions for a long time we'll do the following 
per-partition:
    +//    - lock
    +//    - copy ptrs (bump refcount)
    +//    - unlock
    +//    - persist
    +//    - remove ptrs (drop refcount)
    +// This way we only have to hold the lock on the partition for the time it 
takes to get Ptr<>s to all items in the partition
    +template <class C> class RefCountCacheSync : public Continuation
    +{
    +public:
    +  typedef int (RefCountCacheSync::*CacheSyncHandler)(int, void *);
    +
    +  size_t partition; // Current partition
    +  C *cc;            // Pointer to the entire cache
    +  Continuation *cont;
    +
    +  int copy_partition(int event, Event *e);
    +  int write_partition(int event, Event *e);
    +  int pause_event(int event, Event *e);
    +
    +  // Create the tmp file on disk we'll be writing to
    +  int initialize_storage(int event, Event *e);
    +  // do the final mv and close of file handle
    +  int finalize_sync();
    +
    +  // helper method to spin on writes to disk
    +  int write_to_disk(char *i, int size);
    +
    +  RefCountCacheSync(Continuation *acont, C *cc, int frequency, std::string 
dirname, std::string filename);
    +
    +private:
    +  Vec<RefCountCacheHashEntry *> partition_items;
    +
    +  int fd; // fd for the file we are writing to
    +
    +  std::string dirname;
    +  std::string filename;
    +  std::string tmp_filename;
    +
    +  ink_hrtime time_per_partition;
    +  ink_hrtime start;
    +
    +  int total_items;
    +  int64_t total_size;
    +
    +  RecRawStatBlock *rsb;
    +
    +  SocketManager socket_manager;
    +};
    +
    +template <class C>
    +int
    +RefCountCacheSync<C>::copy_partition(int event, Event *e)
    +{
    +  (void)event;
    +  if (partition >= cc->partition_count()) {
    +    int sync_ret = this->finalize_sync();
    +    if (sync_ret != 0) {
    +      Warning("Unable to finalize sync of cache to disk %s: %d", 
this->filename.c_str(), sync_ret);
    +    }
    +    cont->handleEvent(REFCOUNT_CACHE_EVENT_SYNC, 0);
    +    Debug("refcountcache", "RefCountCacheSync done");
    +    delete this;
    +    return EVENT_DONE;
    +  }
    +  Debug("refcountcache", "sync partition=%ld/%ld", partition, 
cc->partition_count());
    +  // copy the partition into our buffer, then we'll let `pauseEvent` write 
it out
    +  this->partition_items.reserve(cc->get_partition(partition).count());
    +  cc->get_partition(partition).copy(this->partition_items);
    +  partition++;
    +  SET_HANDLER((CacheSyncHandler)&RefCountCacheSync::write_partition);
    +  mutex = e->ethread->mutex;
    +  e->schedule_imm(ET_TASK);
    +
    +  return EVENT_CONT;
    +}
    +
    +template <class C>
    +int
    +RefCountCacheSync<C>::write_partition(int event, Event *e)
    +{
    +  (void)event; // unused
    +  int curr_time = Thread::get_hrtime() / HRTIME_SECOND;
    +  // write the partition to disk
    +  // for item in this->partitionItems
    +  // write to disk with headers per item
    +  RefCountCacheHashEntry *it;
    +  for (unsigned int i = 0; i < this->partition_items.length(); i++) {
    +    it = this->partition_items[i];
    +
    +    // check if the item has expired, if so don't persist it to disk
    +    if (it->meta.expiry_time < curr_time) {
    +      continue;
    +    }
    +
    +    // Write the RefCountCacheItemMeta (as our header)
    +    int ret = this->write_to_disk((char *)&it->meta, sizeof(it->meta));
    +    if (ret < 0) {
    +      Warning("Error writing cache item header to %s: %d", 
this->tmp_filename.c_str(), ret);
    +      cont->handleEvent(REFCOUNT_CACHE_EVENT_SYNC, 0);
    +      delete this;
    +      return EVENT_DONE;
    +    }
    +    // write the actual object now
    +    ret = this->write_to_disk((char *)it->item.get(), it->meta.size);
    +    if (ret < 0) {
    +      Warning("Error writing cache item to %s: %d", 
this->tmp_filename.c_str(), ret);
    +      cont->handleEvent(REFCOUNT_CACHE_EVENT_SYNC, 0);
    +      delete this;
    +      return EVENT_DONE;
    +    }
    +
    +    this->total_items++;
    +    this->total_size += it->meta.size;
    +    refCountCacheHashingValueAllocator.free(it);
    +  }
    +
    +  // Clear partition-- for the next user
    +  this->partition_items.clear();
    +
    +  SET_HANDLER((CacheSyncHandler)&RefCountCacheSync::pause_event);
    +
    +  // Figure out how much time we spent
    +  ink_hrtime elapsed          = Thread::get_hrtime() - this->start;
    +  ink_hrtime expected_elapsed = (this->partition * 
this->time_per_partition);
    +
    +  // If we were quicker than our pace-- lets reschedule in the future
    +  if (elapsed < expected_elapsed) {
    +    e->schedule_in(expected_elapsed - elapsed, ET_TASK);
    +  } else { // Otherwise we were too slow-- and need to go now!
    +    e->schedule_imm(ET_TASK);
    +  }
    +  return EVENT_CONT;
    +}
    +
    +template <class C>
    +int
    +RefCountCacheSync<C>::pause_event(int event, Event *e)
    +{
    +  (void)event;
    +  (void)e;
    +
    +  // Schedule up the next partition
    +  if (partition < cc->partition_count())
    +    mutex = cc->get_partition(partition).lock.get();
    +  else
    +    mutex = cont->mutex;
    +  SET_HANDLER((CacheSyncHandler)&RefCountCacheSync::copy_partition);
    +  e->schedule_imm(ET_TASK);
    +  return EVENT_CONT;
    +}
    +
    +// Open the tmp file, etc.
    +template <class C>
    +int
    +RefCountCacheSync<C>::initialize_storage(int event, Event *e)
    +{
    +  (void)event;                                                             
                           // unused
    +  this->fd = this->socket_manager.open(this->tmp_filename.c_str(), O_TRUNC 
| O_RDWR | O_CREAT, 0644); // TODO: configurable perms
    +
    +  if (this->fd <= 0) {
    +    Warning("Unable to create temporary file %s, unable to persist hostdb: 
%d\n", this->tmp_filename.c_str(), this->fd);
    +    cont->handleEvent(REFCOUNT_CACHE_EVENT_SYNC, 0);
    +    delete this;
    +    return EVENT_DONE;
    +  }
    +
    +  // Write out the header
    +  int ret = this->write_to_disk((char *)&this->cc->get_header(), 
sizeof(RefCountCacheHeader));
    +  if (ret < 0) {
    +    Warning("Error writing cache header to %s: %d", 
this->tmp_filename.c_str(), ret);
    +    cont->handleEvent(REFCOUNT_CACHE_EVENT_SYNC, 0);
    +    delete this;
    +    return EVENT_DONE;
    +  }
    +
    +  SET_HANDLER((CacheSyncHandler)&RefCountCacheSync::copy_partition);
    +  e->schedule_imm(ET_TASK);
    +  return EVENT_CONT;
    +}
    +
    +// do the final mv and close of file handle
    +template <class C>
    +int
    +RefCountCacheSync<C>::finalize_sync()
    +{
    +  // fsync the fd we have
    +  int fsync_ret = this->socket_manager.fsync(this->fd);
    +  if (fsync_ret != 0) {
    +    return fsync_ret;
    +  }
    +
    +  int dir_fd = this->socket_manager.open(this->dirname.c_str(), 
O_DIRECTORY); // Correct permissions?
    +  if (dir_fd <= 0) {
    +    return dir_fd;
    +  }
    +  // move the file
    +  int ret = rename(this->tmp_filename.c_str(), this->filename.c_str());
    +
    +  if (ret != 0) {
    +    return ret;
    +  }
    +
    +  // fsync the dir
    +  int fsync_dir_ret = this->socket_manager.fsync(dir_fd);
    +  if (fsync_dir_ret != 0) {
    +    return fsync_dir_ret;
    +  }
    +
    +  int dir_close_ret = this->socket_manager.close(dir_fd);
    +  if (dir_close_ret != 0) {
    +    return dir_close_ret;
    +  }
    +
    +  int close_ret = this->socket_manager.close(this->fd);
    +  if (close_ret != 0) {
    +    return close_ret;
    +  }
    +
    +  if (this->rsb) {
    +    RecSetRawStatCount(this->rsb, refcountcache_last_sync_time, 
Thread::get_hrtime() / HRTIME_SECOND);
    +    RecSetRawStatCount(this->rsb, refcountcache_last_total_items, 
this->total_items);
    +    RecSetRawStatCount(this->rsb, refcountcache_last_total_size, 
this->total_size);
    +  }
    +
    +  return 0;
    +}
    +
    +// Write *i to this->fd, if there is an error we'll just stop this 
continuation
    +// TODO: reschedule the continuation if the disk was busy?
    +template <class C>
    +int
    +RefCountCacheSync<C>::write_to_disk(char *i, int size)
    +{
    +  int written = 0;
    +  while (written < size) {
    +    int ret = this->socket_manager.write(this->fd, i + written, size - 
written);
    +    if (ret <= 0) {
    +      return -1;
    +    } else {
    +      written += ret;
    +    }
    +  }
    +  return 0;
    +}
    +
    +template <class C>
    +RefCountCacheSync<C>::RefCountCacheSync(Continuation *acont, C *cc, int 
frequency, std::string dirname, std::string filename)
    +  : Continuation(NULL),
    +    partition(0),
    +    cc(cc),
    +    cont(acont),
    +    fd(0),
    +    dirname(dirname),
    +    filename(filename),
    +    time_per_partition(HRTIME_SECONDS(frequency) / cc->partition_count()),
    +    start(Thread::get_hrtime()),
    +    total_items(0),
    +    total_size(0),
    +    rsb(cc->get_rsb())
    +
    +{
    +  eventProcessor.schedule_imm(this, ET_TASK);
    +  mutex              = cc->get_partition(partition).lock.get();
    +  this->tmp_filename = this->filename + ".syncing"; // TODO tmp file 
extension configurable?
    +
    +  SET_HANDLER((CacheSyncHandler)&RefCountCacheSync::initialize_storage);
    +}
    +
    +// RefCountCache is a ref-counted key->value map to store classes that 
inherit from RefCountObj.
    +// Once an item is `put` into the cache, the cache will maintain a Ptr<> 
to that object until erase
    +// or clear is called-- which will remove the cache's Ptr<> to the object.
    +// This cache may be Persisted (RefCountCacheSync) as well as loaded from 
disk (LoadRefCountCacheFromPath).
    +// This class will optionally emit metrics at the given `metrics_prefix`.
    +// Note: although this cache does allow you to set expiry times this cache 
does not actively GC itself-- meaning
    +// it will only remove expired items once the space is required. So to 
ensure that the cache is bounded either a
    +// size or an item limit must be set-- otherwise the cache will not GC. 
Also note, that if keys collide the previous
    +// entry for a given key will be removed, so this "leak" concern is 
assuming you don't have sufficient space to store
    +// an item for each possible key
    +template <class C> class RefCountCache
    +{
    +public:
    +  // Constructor
    +  RefCountCache(unsigned int num_partitions, int size = -1, int items = 
-1, VersionNumber object_version = VersionNumber(),
    +                std::string metrics_prefix = "");
    +  // Destructor
    +  ~RefCountCache();
    +
    +  // User interface to the cache
    +  Ptr<C> get(uint64_t key);
    +  void put(uint64_t key, C *item, int size = 0, int expiry_time = -1);
    +  void erase(uint64_t key);
    +  void clear();
    +
    +  // Some methods to get some internal state
    +  int partition_for_key(uint64_t key);
    +  ProxyMutex *lock_for_key(uint64_t key);
    +  const size_t partition_count();
    +  RefCountCachePartition<C> &get_partition(int pnum);
    +  const size_t count();
    +  RefCountCacheHeader &get_header();
    +  RecRawStatBlock *get_rsb();
    +
    +private:
    +  int max_size;  // Total size
    +  int max_items; // Total number of items allowed
    +
    +  unsigned int num_partitions;
    +
    +  std::string filepath;
    +  std::string dirname;
    --- End diff --
    
    Correct, must have missed them in the cleanup ;)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to