Github user JamesPeach commented on a diff in the pull request: https://github.com/apache/trafficserver/pull/653#discussion_r68601178 --- Diff: iocore/hostdb/P_RefCountCache.h --- @@ -0,0 +1,896 @@ +/** @file + + A cache (with map-esque interface) for RefCountObjs + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +#ifndef _P_RefCountCache_h_ +#define _P_RefCountCache_h_ + +#include "I_EventSystem.h" +#include "P_EventSystem.h" // TODO: less? 
just need ET_TASK + +#include <ts/Map.h> +#include "ts/PriorityQueue.h" + +#include <ts/List.h> +#include <ts/ink_hrtime.h> + +#include "ts/Vec.h" +#include "ts/I_Version.h" +#include <unistd.h> + +#define REFCOUNT_CACHE_EVENT_SYNC REFCOUNT_CACHE_EVENT_EVENTS_START + +#define REFCOUNTCACHE_MAGIC_NUMBER 0x0BAD2D9 +#define REFCOUNTCACHE_MAJOR_VERSION 1 +#define REFCOUNTCACHE_MINOR_VERSION 0 + +// Stats +enum RefCountCache_Stats { + refcountcache_current_items_stat, // current number of items + refcountcache_current_size_stat, // current size of cache + refcountcache_total_inserts_stat, // total items inserted + refcountcache_total_failed_inserts_stat, // total items unable to insert + refcountcache_total_lookups_stat, // total get() calls + refcountcache_total_hits_stat, // total hits + + // Persistence metrics + refcountcache_last_sync_time, // seconds since epoch of last successful sync + refcountcache_last_total_items, // number of items sync last time + refcountcache_last_total_size, // total size at last sync + + RefCountCache_Stat_Count +}; + +struct RefCountCacheItemMeta { + uint64_t key; + unsigned int size; + ink_time_t expiry_time; // expire time as seconds since epoch + RefCountCacheItemMeta(uint64_t key, unsigned int size, int expire_time = -1) : key(key), size(size), expiry_time(expire_time) {} +}; + +// Layer of indirection for the hashmap-- since it needs lots of things inside of it +// We'll also use this as the item header, for persisting objects to disk +class RefCountCacheHashEntry +{ +public: + Ptr<RefCountObj> item; + LINK(RefCountCacheHashEntry, item_link); + + PriorityQueueEntry<RefCountCacheHashEntry *> *expiry_entry; + + RefCountCacheItemMeta meta; + void + set(RefCountObj *i, uint64_t key, unsigned int size, int expire_time) + { + this->item = make_ptr(i); + this->meta = RefCountCacheItemMeta(key, size, expire_time); + }; + // Need a no-argument constructor to use the classAllocator + RefCountCacheHashEntry() : expiry_entry(NULL), meta(0, 
0){}; + + // make these values comparable -- so we can sort them + bool + operator<(const RefCountCacheHashEntry &v2) const + { + return this->meta.expiry_time < v2.meta.expiry_time; + }; +}; +// Since the hashing values are all fixed size, we can simply use a classAllocator to avoid mallocs +extern ClassAllocator<RefCountCacheHashEntry> refCountCacheHashingValueAllocator; +extern ClassAllocator<PriorityQueueEntry<RefCountCacheHashEntry *>> expiryQueueEntry; + +struct RefCountCacheHashing { + typedef uint64_t ID; + typedef uint64_t const Key; + typedef RefCountCacheHashEntry Value; + typedef DList(RefCountCacheHashEntry, item_link) ListHead; + + static ID + hash(Key key) + { + return key; + } + static Key + key(Value const *value) + { + return value->meta.key; + } + static bool + equal(Key lhs, Key rhs) + { + return lhs == rhs; + } +}; + +// The RefCountCachePartition is simply a map of key -> Ptr<YourClass> +// We partition the cache to reduce lock contention +template <class C> class RefCountCachePartition +{ +public: + RefCountCachePartition(unsigned int part_num, uint64_t max_size, unsigned int max_items, RecRawStatBlock *rsb = NULL); + Ptr<C> get(uint64_t key); + void put(uint64_t key, C *item, int size = 0, int expire_time = 0); + void erase(uint64_t key, int expiry_time = -1); + + void clear(); + bool is_full(); + bool reserve(unsigned int); + + const size_t count(); + void copy(Vec<RefCountCacheHashEntry *> &items); + + typedef typename TSHashTable<RefCountCacheHashing>::iterator iterator_type; + typedef typename TSHashTable<RefCountCacheHashing>::self hash_type; + typedef typename TSHashTable<RefCountCacheHashing>::Location location_type; + TSHashTable<RefCountCacheHashing> *get_map(); + + Ptr<ProxyMutex> lock; // Lock + +private: + void metric_inc(RefCountCache_Stats metric_enum, int64_t data); + + unsigned int part_num; + + uint64_t max_size; + unsigned int max_items; + + uint64_t size; + unsigned int items; + + hash_type item_map; + + 
PriorityQueue<RefCountCacheHashEntry *> expiry_queue; + RecRawStatBlock *rsb; +}; + +template <class C> +RefCountCachePartition<C>::RefCountCachePartition(unsigned int part_num, uint64_t max_size, unsigned int max_items, + RecRawStatBlock *rsb) +{ + this->part_num = part_num; + this->max_size = max_size; + this->max_items = max_items; + this->size = 0; + this->items = 0; + this->rsb = rsb; + + // Initialize lock + this->lock = new_ProxyMutex(); +} + +template <class C> +Ptr<C> +RefCountCachePartition<C>::get(uint64_t key) +{ + this->metric_inc(refcountcache_total_lookups_stat, 1); + location_type l = this->item_map.find(key); + if (l.isValid()) { + // found + this->metric_inc(refcountcache_total_hits_stat, 1); + return make_ptr((C *)l.m_value->item.get()); + } else { + return Ptr<C>(); + } +} + +template <class C> +void +RefCountCachePartition<C>::put(uint64_t key, C *item, int size, int expire_time) +{ + this->metric_inc(refcountcache_total_inserts_stat, 1); + size += sizeof(C); + // Remove any colliding entries + this->erase(key); + + // if we are full, and can't make space-- then don't store the item + if (this->is_full() && !this->reserve(size)) { + Debug("refcountcache", "partition %d is full-- not storing item key=%ld", this->part_num, key); + this->metric_inc(refcountcache_total_failed_inserts_stat, 1); + return; + } + + // Create our value-- which has a ref to the `item` + RefCountCacheHashEntry *val = refCountCacheHashingValueAllocator.alloc(); + val->set(item, key, size, expire_time); + + // add expiry_entry to expiry queue, if the expire time is positive (otherwise it means don't expire) + if (expire_time >= 0) { + Debug("refcountcache", "partition %d adding entry with expire_time=%d\n", this->part_num, expire_time); + PriorityQueueEntry<RefCountCacheHashEntry *> *expiry_entry = expiryQueueEntry.alloc(); + new ((void *)expiry_entry) PriorityQueueEntry<RefCountCacheHashEntry *>(val); + expiry_queue.push(expiry_entry); + val->expiry_entry = expiry_entry; + 
} + + // add the item to the map + this->item_map.insert(val); + this->size += val->meta.size; + this->items++; + this->metric_inc(refcountcache_current_size_stat, (int64_t)val->meta.size); + this->metric_inc(refcountcache_current_items_stat, 1); +} + +template <class C> +void +RefCountCachePartition<C>::erase(uint64_t key, int expiry_time) +{ + location_type l = this->item_map.find(key); + if (l.isValid()) { + if (expiry_time >= 0 && l.m_value->meta.expiry_time != expiry_time) { + return; + } + // found + this->item_map.remove(l); + + // decrement usage counters + this->size -= l.m_value->meta.size; + this->items--; + + this->metric_inc(refcountcache_current_size_stat, -((int64_t)l.m_value->meta.size)); + this->metric_inc(refcountcache_current_items_stat, -1); + + // remove from expiry queue + if (l.m_value->expiry_entry != NULL) { + Debug("refcountcache", "partition %d deleting item from expiry_queue idx=%d\n", this->part_num, + l.m_value->expiry_entry->index); + this->expiry_queue.erase(l.m_value->expiry_entry); + expiryQueueEntry.free(l.m_value->expiry_entry); + l.m_value->expiry_entry = NULL; // To avoid the destruction of `l` calling the destructor again-- and causing issues + } + // Since the Value is actually RefCountObj-- when this gets deleted normally it calls the wrong + // `free` method, this forces the delete/decr to happen with the right type + Ptr<C> *tmp = (Ptr<C> *)&l.m_value->item; + tmp->clear(); + refCountCacheHashingValueAllocator.free(l.m_value); + } +} + +template <class C> +void +RefCountCachePartition<C>::clear() +{ + this->metric_inc(refcountcache_current_size_stat, -this->size); + this->metric_inc(refcountcache_current_items_stat, -this->items); + // Clear the in memory hashmap + // TODO: delete all items (not sure if clear calls delete on all items) + this->item_map.clear(); + // clear the queue + PriorityQueueEntry<RefCountCacheHashEntry *> *tmp_entry; + while (!this->expiry_queue.empty()) { + tmp_entry = this->expiry_queue.top(); + 
this->expiry_queue.pop(); + expiryQueueEntry.free(tmp_entry); + } + + this->items = 0; + this->size = 0; +} + +// Are we full? +template <class C> +bool +RefCountCachePartition<C>::is_full() +{ + Debug("refcountcache", "partition %d is full? items %d/%d size %ld/%ld\n\n", this->part_num, this->items, this->max_items, + this->size, this->max_size); + return (this->max_items > 0 && this->items >= this->max_items) || (this->max_size > 0 && this->size >= this->max_size); +} + +// Attempt to make space for item of `size` +template <class C> +bool +RefCountCachePartition<C>::reserve(unsigned int size) +{ + int curr_time = Thread::get_hrtime() / HRTIME_SECOND; + while (this->is_full() || (size > 0 && this->size + size > this->max_size)) { + PriorityQueueEntry<RefCountCacheHashEntry *> *top_item = expiry_queue.top(); + // if there is nothing in the expiry queue, then we can't make space + if (top_item == NULL) { + return false; + } + + // If the first item has expired, lets evict it, and then go around again + if (top_item->node->meta.expiry_time < curr_time) { + this->erase(top_item->node->meta.key); + expiry_queue.pop(); + } else { // if the first item isn't expired-- the rest won't be either (queue is sorted) + return false; + } + } + return true; +} + +template <class C> +const size_t +RefCountCachePartition<C>::count() +{ + return this->items; +} + +template <class C> +void +RefCountCachePartition<C>::copy(Vec<RefCountCacheHashEntry *> &items) +{ + for (RefCountCachePartition<C>::iterator_type i = this->item_map.begin(); i != this->item_map.end(); ++i) { + RefCountCacheHashEntry *val = refCountCacheHashingValueAllocator.alloc(); + val->set(i.m_value->item.get(), i.m_value->meta.key, i.m_value->meta.size, i.m_value->meta.expiry_time); + items.push_back(val); + } +} + +template <class C> +void +RefCountCachePartition<C>::metric_inc(RefCountCache_Stats metric_enum, int64_t data) +{ + if (this->rsb) { + RecIncrGlobalRawStatCount(this->rsb, metric_enum, data); + } +} + 
+template <class C> +TSHashTable<RefCountCacheHashing> * +RefCountCachePartition<C>::get_map() +{ + return &this->item_map; +} + +// The header for the cache, this is used to check if the serialized cache is compatible +// The implementation of this class must be in here, as this is used by template +// classes, and c++ rocks that way +class RefCountCacheHeader +{ +public: + unsigned int magic; + VersionNumber version; + VersionNumber object_version; // version passed in of whatever it is we are caching + + RefCountCacheHeader(VersionNumber object_version = VersionNumber()) + : magic(REFCOUNTCACHE_MAGIC_NUMBER), object_version(object_version) + { + this->version.ink_major = REFCOUNTCACHE_MAJOR_VERSION; + this->version.ink_minor = REFCOUNTCACHE_MINOR_VERSION; + }; + + bool + operator==(const RefCountCacheHeader other) const + { + return (this->magic == other.magic && this->version.ink_major == other.version.ink_major && + this->version.ink_minor == other.version.ink_minor && + this->object_version.ink_major == other.object_version.ink_major && + this->object_version.ink_minor == other.object_version.ink_minor); + } + + bool + compatible(RefCountCacheHeader *other) + { + return (this->magic == other->magic && this->version.ink_major == other->version.ink_major && + this->object_version.ink_major == other->version.ink_major); + }; +}; + +// This continuation is responsible for persisting RefCountCache to disk +// To avoid locking the partitions for a long time we'll do the following per-partition: +// - lock +// - copy ptrs (bump refcount) +// - unlock +// - persist +// - remove ptrs (drop refcount) +// This way we only have to hold the lock on the partition for the time it takes to get Ptr<>s to all items in the partition +template <class C> class RefCountCacheSync : public Continuation +{ +public: + typedef int (RefCountCacheSync::*CacheSyncHandler)(int, void *); + + size_t partition; // Current partition + C *cc; // Pointer to the entire cache + Continuation *cont; 
+ + int copy_partition(int event, Event *e); + int write_partition(int event, Event *e); + int pause_event(int event, Event *e); + + // Create the tmp file on disk we'll be writing to + int initialize_storage(int event, Event *e); + // do the final mv and close of file handle + int finalize_sync(); + + // helper method to spin on writes to disk + int write_to_disk(char *i, int size); + + RefCountCacheSync(Continuation *acont, C *cc, int frequency, std::string dirname, std::string filename); + +private: + Vec<RefCountCacheHashEntry *> partition_items; + + int fd; // fd for the file we are writing to + + std::string dirname; + std::string filename; + std::string tmp_filename; + + ink_hrtime time_per_partition; + ink_hrtime start; + + int total_items; + int64_t total_size; + + RecRawStatBlock *rsb; + + SocketManager socket_manager; +}; + +template <class C> +int +RefCountCacheSync<C>::copy_partition(int event, Event *e) +{ + (void)event; + if (partition >= cc->partition_count()) { + int sync_ret = this->finalize_sync(); + if (sync_ret != 0) { + Warning("Unable to finalize sync of cache to disk %s: %d", this->filename.c_str(), sync_ret); + } + cont->handleEvent(REFCOUNT_CACHE_EVENT_SYNC, 0); + Debug("refcountcache", "RefCountCacheSync done"); + delete this; + return EVENT_DONE; + } + Debug("refcountcache", "sync partition=%ld/%ld", partition, cc->partition_count()); + // copy the partition into our buffer, then we'll let `pauseEvent` write it out + this->partition_items.reserve(cc->get_partition(partition).count()); + cc->get_partition(partition).copy(this->partition_items); + partition++; + SET_HANDLER((CacheSyncHandler)&RefCountCacheSync::write_partition); + mutex = e->ethread->mutex; + e->schedule_imm(ET_TASK); + + return EVENT_CONT; +} + +template <class C> +int +RefCountCacheSync<C>::write_partition(int event, Event *e) +{ + (void)event; // unused + int curr_time = Thread::get_hrtime() / HRTIME_SECOND; --- End diff -- Suggestion: use ``ink_time()`` here instead of ``Thread::get_hrtime() / HRTIME_SECOND``.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it to be, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---