Hi Petr,

I've made changes to the patch that hopefully align with what you are
looking for. I would appreciate it if you could go over it and see if
the changes are in the right direction. And if so, you should decide
whether I should make these kinds of changes for the whole series and
submit a v2 before you continue with the review.

The list of changes:

- Added comments everywhere I think they could be useful. Is it too
  much?

- Renamed struct prb_handle to prb_reserved_entry (more appropriate).

- Fixed up macros as you requested.

- The implementation from prb_commit() has been moved to a new
  prb_commit_all_reserved(). This should resolve the confusion in the
  "failed to push_tail()" code.

- I tried moving calc_next() into prb_reserve(), but it was pure
  insanity. I played with refactoring for a while until I found
  something that I think looks nice. I moved the implementation of
  calc_next() along with its containing loop into a new function
  find_res_ptrs(). This function does what calc_next() and push_tail()
  did. With this solution, I think prb_reserve() looks pretty
  clean. However, the optimization of communicating about the wrap is
  gone. So even though find_res_ptrs() knew if a wrap occurred,
  prb_reserve() figures it out again for itself. If we want the
  optimization, I still think the best approach is the -1,0,1 return
  value of find_res_ptrs().

I'm looking forward to your response.

John Ogness


diff --git a/include/linux/printk_ringbuffer.h 
b/include/linux/printk_ringbuffer.h
index 4239dc86e029..ab6177c9fe0a 100644
--- a/include/linux/printk_ringbuffer.h
+++ b/include/linux/printk_ringbuffer.h
@@ -25,6 +25,23 @@ struct printk_ringbuffer {
        atomic_t                ctx;
 };
 
+/*
+ * struct prb_reserved_entry: Reserved but not yet committed entry.
+ * @rb: The printk_ringbuffer where the entry was reserved.
+ *
+ * This is a handle used by the writer to represent an entry that has been
+ * reserved but not yet committed.
+ *
+ * The structure does not actually store any information about the entry that
+ * has been reserved because this information is not required by the
+ * implementation. The struct could prove useful if extra tracking or even
+ * fundamental changes to the ringbuffer were to be implemented. And as such
+ * would not require changes to writers.
+ */
+struct prb_reserved_entry {
+       struct printk_ringbuffer        *rb;
+};
+
 #define DECLARE_STATIC_PRINTKRB_CPULOCK(name)                          \
 static struct prb_cpulock name = {                                     \
        .owner          = ATOMIC_INIT(-1),                              \
@@ -46,6 +63,11 @@ static struct printk_ringbuffer name = {                     
        \
        .ctx            = ATOMIC_INIT(0),                               \
 }
 
+/* writer interface */
+char *prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
+                 unsigned int size);
+void prb_commit(struct prb_reserved_entry *e);
+
 /* utility functions */
 void prb_lock(struct prb_cpulock *cpu_lock);
 void prb_unlock(struct prb_cpulock *cpu_lock);
diff --git a/lib/printk_ringbuffer.c b/lib/printk_ringbuffer.c
index 54c750092810..fbe1d92b9b60 100644
--- a/lib/printk_ringbuffer.c
+++ b/lib/printk_ringbuffer.c
@@ -2,6 +2,59 @@
 #include <linux/smp.h>
 #include <linux/printk_ringbuffer.h>
 
+/*
+ * struct prb_entry: An entry within the ringbuffer.
+ * @size: The size in bytes of the entry or -1 if terminating.
+ * @seq: The unique sequence number of the entry.
+ * @data: The data bytes of the entry.
+ *
+ * The struct is typecasted directly into the ringbuffer data array to access
+ * an entry. The @size specifies the complete size of the entry including any
+ * padding. The next entry will be located at &this_entry + this_entry.size.
+ * The only exception is if the entry is terminating (size = -1). In this case
+ * @seq and @data are invalid and the next entry is at the beginning of the
+ * ringbuffer data array.
+ */
+struct prb_entry {
+       unsigned int    size;
+       u64             seq;
+       char            data[0];
+};
+
+/* the size and size bitmask of the ringbuffer data array */
+#define PRB_SIZE(rb) (1L << rb->size_bits)
+#define PRB_SIZE_BITMASK(rb) (PRB_SIZE(rb) - 1)
+
+/* given a logical position, return its index in the ringbuffer data array */
+#define PRB_INDEX(rb, lpos) (lpos & PRB_SIZE_BITMASK(rb))
+
+/*
+ * given a logical position, return how many times the data buffer has
+ * wrapped, where logical position 0 begins at index 0 with no wraps
+ */
+#define PRB_WRAPS(rb, lpos) (lpos >> rb->size_bits)
+
+/*
+ * given a logical position, return the logical position that represents the
+ * beginning of the ringbuffer data array for this wrap
+ */
+#define PRB_THIS_WRAP_START_LPOS(rb, lpos) \
+       (PRB_WRAPS(rb, lpos) << rb->size_bits)
+
+/*
+ * given a logical position, return the logical position that represents the
+ * beginning of the ringbuffer data array for the next wrap
+ */
+#define PRB_NEXT_WRAP_START_LPOS(rb, lpos) \
+       ((PRB_WRAPS(rb, lpos) + 1) << rb->size_bits)
+
+/*
+ * entries are aligned to allow direct typecasts as struct prb_entry
+ */
+#define PRB_ENTRY_ALIGN sizeof(long)
+#define PRB_ENTRY_ALIGN_SIZE(sz) \
+       ((sz + (PRB_ENTRY_ALIGN - 1)) & ~(PRB_ENTRY_ALIGN - 1))
+
 /*
  * prb_lock: Perform a processor-reentrant spin lock.
  * @cpu_lock: A pointer to the lock object.
@@ -58,3 +111,257 @@ void prb_unlock(struct prb_cpulock *cpu_lock)
        }
        put_cpu();
 }
+
+/* translate a logical position to an entry in the data array */
+static struct prb_entry *to_entry(struct printk_ringbuffer *rb,
+                                 unsigned long lpos)
+{
+       char *buffer = rb->buffer;
+
+       buffer += PRB_INDEX(rb, lpos);
+       return (struct prb_entry *)buffer;
+}
+
+/* try to move the tail pointer forward, thus removing the oldest entry */
+static bool push_tail(struct printk_ringbuffer *rb, unsigned long tail)
+{
+       unsigned long new_tail, head;
+       struct prb_entry *e;
+
+       /* maybe another context already pushed the tail */
+       if (tail != atomic_long_read(&rb->tail))
+               return true;
+
+       /*
+        * Determine what the new tail should be. If the tail is a
+        * terminating entry, the new tail will be beyond the entry
+        * at the beginning of the data array.
+        */
+       e = to_entry(rb, tail);
+       if (e->size != -1)
+               new_tail = tail + e->size;
+       else
+               new_tail = PRB_NEXT_WRAP_START_LPOS(rb, tail);
+
+       /* make sure the new tail does not overtake the head */
+       head = atomic_long_read(&rb->head);
+       if (head - new_tail > PRB_SIZE(rb))
+               return false;
+
+       /*
+        * The result of this cmpxchg does not matter. If it succeeds,
+        * this context pushed the tail. If it fails, some other context
+        * pushed the tail. Either way, the tail was pushed.
+        */
+       atomic_long_cmpxchg(&rb->tail, tail, new_tail);
+       return true;
+}
+
+/*
+ * If this context incremented rb->ctx to 1, move the head pointer
+ * beyond all reserved entries.
+ */
+static void prb_commit_all_reserved(struct printk_ringbuffer *rb)
+{
+       unsigned long head, res;
+       struct prb_entry *e;
+
+       for (;;) {
+               if (atomic_read(&rb->ctx) > 1) {
+                       /* another context will adjust the head pointer */
+                       atomic_dec(&rb->ctx);
+                       break;
+               }
+
+               /*
+                * This is the only context that will adjust the head pointer.
+                * If NMIs interrupt at any time, they can reserve/commit new
+                * entries, but they will not adjust the head pointer.
+                */
+
+               /* assign sequence numbers before moving the head pointer */
+               head = atomic_long_read(&rb->head);
+               res = atomic_long_read(&rb->reserve);
+               while (head != res) {
+                       e = to_entry(rb, head);
+                       if (e->size == -1) {
+                               head = PRB_NEXT_WRAP_START_LPOS(rb, head);
+                               continue;
+                       }
+                       e->seq = ++rb->seq;
+                       head += e->size;
+               }
+
+               /*
+                * move the head pointer, thus making all reserved entries
+                * visible to any readers
+                */
+               atomic_long_set_release(&rb->head, res);
+
+               atomic_dec(&rb->ctx);
+               if (atomic_long_read(&rb->reserve) == res)
+                       break;
+               /*
+                * The reserve pointer is different than previously read. New
+                * entries were reserve/committed by NMI contexts, possibly
+                * before ctx was decremented by this context. Go back and move
+                * the head pointer beyond those entries as well.
+                */
+               atomic_inc(&rb->ctx);
+       }
+
+       /* Enable interrupts and allow other CPUs to reserve/commit. */
+       prb_unlock(rb->cpulock);
+}
+
+/*
+ * prb_commit: Commit a reserved entry to the ring buffer.
+ * @e: A structure referencing a the reserved entry to commit.
+ *
+ * Commit data that has been reserved using prb_reserve(). Once the entry
+ * has been committed, it can be invalidated at any time. If a writer is
+ * interested in using the data after committing, the writer should make
+ * its own copy first or use the prb_iter_ reader functions to access the
+ * data in the ring buffer.
+ *
+ * It is safe to call this function from any context and state.
+ */
+void prb_commit(struct prb_reserved_entry *e)
+{
+       prb_commit_all_reserved(e->rb);
+}
+
+/* given the size to reserve, determine current and next reserve pointers */
+static bool find_res_ptrs(struct printk_ringbuffer *rb, unsigned long *res_old,
+                         unsigned long *res_new, unsigned int size)
+{
+       unsigned long tail, entry_begin;
+
+       /*
+        * The reserve pointer is not allowed to overtake the index of the
+        * tail pointer. If this would happen, the tail pointer must be
+        * pushed, thus removing the oldest entry.
+        */
+       for (;;) {
+               tail = atomic_long_read(&rb->tail);
+               *res_old = atomic_long_read(&rb->reserve);
+
+               /*
+                * If the new reserve pointer wraps, the new entry will
+                * begin at the beginning of the data array. This loop
+                * exists only to handle the wrap.
+                */
+               for (entry_begin = *res_old;;) {
+
+                       *res_new = entry_begin + size;
+
+                       if (*res_new - tail > PRB_SIZE(rb)) {
+                               /* would overtake tail, push tail */
+
+                               if (!push_tail(rb, tail)) {
+                                       /* couldn't push tail, can't reserve */
+                                       return false;
+                               }
+
+                               /* tail pushed, try again */
+                               break;
+                       }
+
+                       if (PRB_WRAPS(rb, entry_begin) ==
+                           PRB_WRAPS(rb, *res_new)) {
+                               /* reserve pointer values determined */
+                               return true;
+                       }
+
+                       /*
+                        * The new entry will wrap. Calculate the new reserve
+                        * pointer based on the beginning of the data array
+                        * for the wrap of the new reserve pointer.
+                        */
+                       entry_begin = PRB_THIS_WRAP_START_LPOS(rb, *res_new);
+               }
+       }
+}
+
+/*
+ * prb_reserve: Reserve an entry within a ring buffer.
+ * @e: A structure to be setup and reference a reserved entry.
+ * @rb: A ring buffer to reserve data within.
+ * @size: The number of bytes to reserve.
+ *
+ * Reserve an entry of at least @size bytes to be used by the caller. If
+ * successful, the data region of the entry belongs to the caller and cannot
+ * be invalidated by any other task/context. For this reason, the caller
+ * should call prb_commit() as quickly as possible in order to avoid preventing
+ * other tasks/contexts from reserving data in the case that the ring buffer
+ * has wrapped.
+ *
+ * It is safe to call this function from any context and state.
+ *
+ * Returns a pointer to the reserved data (and @e is setup to reference the
+ * entry containing that data) or NULL if it was not possible to reserve data.
+ */
+char *prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
+                 unsigned int size)
+{
+       unsigned long res_old, res_new;
+       struct prb_entry *entry;
+
+       if (size == 0)
+               return NULL;
+
+       /* add entry header to size and align for the following entry */
+       size = PRB_ENTRY_ALIGN_SIZE(sizeof(struct prb_entry) + size);
+
+       if (size >= PRB_SIZE(rb))
+               return NULL;
+
+       /*
+        * Lock out all other CPUs and disable interrupts. Only NMIs will
+        * be able to interrupt. It will stay this way until the matching
+        * commit is called.
+        */
+       prb_lock(rb->cpulock);
+
+       /*
+        * Clarify the responsibility of this context. If this context
+        * increments ctx to 1, this context is responsible for pushing
+        * the head pointer beyond all reserved entries on commit.
+        */
+       atomic_inc(&rb->ctx);
+
+       /*
+        * Move the reserve pointer forward. Since NMIs can interrupt at any
+        * time, modifying the reserve pointer is done in a cmpxchg loop.
+        */
+       do {
+               if (!find_res_ptrs(rb, &res_old, &res_new, size)) {
+                       /*
+                        * Not possible to move the reserve pointer. Try to
+                        * commit all reserved entries because this context
+                        * might have that responsibility (if it incremented
+                        * ctx to 1).
+                        */
+                       prb_commit_all_reserved(rb);
+                       return NULL;
+               }
+       } while (!atomic_long_try_cmpxchg_acquire(&rb->reserve,
+                                                 &res_old, res_new));
+
+       entry = to_entry(rb, res_old);
+       if (PRB_WRAPS(rb, res_old) != PRB_WRAPS(rb, res_new)) {
+               /*
+                * The reserve wraps. Create the terminating entry and get the
+                * pointer to the actually reserved entry at the beginning of
+                * the data array on the wrap of the new reserve pointer.
+                */
+               entry->size = -1;
+               entry = to_entry(rb, PRB_THIS_WRAP_START_LPOS(rb, res_new));
+       }
+
+       /* The size is set now. The seq is set later, on commit. */
+       entry->size = size;
+
+       e->rb = rb;
+       return &entry->data[0];
+}

Reply via email to