This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new eac19012 Support bthread_once and bthread singleton (#2520)
eac19012 is described below

commit eac190125d19eee150d886cea6e6dfc7482f8874
Author: Bright Chen <chenguangmin...@foxmail.com>
AuthorDate: Mon Feb 26 10:46:24 2024 +0800

    Support bthread_once and bthread singleton (#2520)
---
 src/bthread/bthread.h                   |   9 +++
 src/bthread/bthread_once.cpp            |  81 +++++++++++++++++++
 src/bthread/singleton_on_bthread_once.h |  61 ++++++++++++++
 src/bthread/types.h                     |  23 ++++++
 test/bthread_once_unittest.cpp          | 136 ++++++++++++++++++++++++++++++++
 5 files changed, 310 insertions(+)

diff --git a/src/bthread/bthread.h b/src/bthread/bthread.h
index f91bc9af..a4c05867 100644
--- a/src/bthread/bthread.h
+++ b/src/bthread/bthread.h
@@ -336,6 +336,15 @@ extern void* bthread_getspecific(bthread_key_t key);
 // Return current bthread tag
 extern bthread_tag_t bthread_self_tag(void);
 
+// The first call to bthread_once() by any thread in a process, with a given
+// once_control, will call the init_routine() with no arguments. Subsequent
+// calls of bthread_once() with the same once_control will not call the
+// init_routine(). On return from bthread_once(), it is guaranteed that
+// init_routine() has completed. The once_control parameter is used to
+// determine whether the associated initialisation routine has been called.
+// Returns 0 on success, error code otherwise.
+extern int bthread_once(bthread_once_t* once_control, void (*init_routine)());
+
 __END_DECLS
 
 #endif  // BTHREAD_BTHREAD_H
diff --git a/src/bthread/bthread_once.cpp b/src/bthread/bthread_once.cpp
new file mode 100644
index 00000000..a5751bc7
--- /dev/null
+++ b/src/bthread/bthread_once.cpp
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "bthread/types.h"
+#include "bthread/butex.h"
+
+// Obtains a butex from butex_create_checked() to back this once-control and
+// puts it in the UNINITIALIZED state.
+bthread_once_t::bthread_once_t()
+    : _butex(bthread::butex_create_checked<butil::atomic<int>>())  {
+    // Sets the starting state that bthread_once_impl() later inspects.
+    _butex->store(UNINITIALIZED, butil::memory_order_relaxed);
+}
+
+// Hands the butex obtained in the constructor to butex_destroy().
+bthread_once_t::~bthread_once_t() {
+    bthread::butex_destroy(_butex);
+}
+
+namespace bthread {
+
+// Runs `init_routine' through `once_control' at most once, however many
+// (b)threads call in concurrently. The caller that moves the state from
+// UNINITIALIZED to INPROGRESS runs `init_routine'; every other caller waits
+// on the butex until the state becomes INITIALIZED.
+//
+// Returns 0 once the initialization is known to have finished, or the errno
+// left by a failed butex_wait() (anything other than EWOULDBLOCK/EINTR).
+int bthread_once_impl(bthread_once_t* once_control, void (*init_routine)()) {
+    butil::atomic<int>* butex = once_control->_butex;
+    // We need acquire memory order for this load because if the value
+    // signals that initialization has finished, we need to see any
+    // data modifications done during initialization.
+    int val = butex->load(butil::memory_order_acquire);
+    if (BAIDU_LIKELY(val == bthread_once_t::INITIALIZED)) {
+        // The initialization has already been done.
+        return 0;
+    }
+    val = bthread_once_t::UNINITIALIZED;
+    if (butex->compare_exchange_strong(val, bthread_once_t::INPROGRESS,
+                                       butil::memory_order_relaxed,
+                                       butil::memory_order_relaxed)) {
+        // This (b)thread is the first and the only one here. Do the
+        // initialization.
+        init_routine();
+        // Mark *once_control as having finished the initialization. We need
+        // release memory order here because we need to synchronize with other
+        // (b)threads that want to use the initialized data.
+        butex->store(bthread_once_t::INITIALIZED, butil::memory_order_release);
+        // Wake up all other (b)threads.
+        bthread::butex_wake_all(butex);
+        return 0;
+    }
+
+    while (true) {
+        // Same as above, we need acquire memory order.
+        val = butex->load(butil::memory_order_acquire);
+        if (BAIDU_LIKELY(val == bthread_once_t::INITIALIZED)) {
+            // The initialization has already been done.
+            return 0;
+        }
+        // Another (b)thread is still inside init_routine(). Unless
+        // init_routine() is very time consuming, it is very unlikely to hit
+        // this race. When it does happen, wait on the butex until the
+        // initialization has finished.
+        // EWOULDBLOCK and EINTR are not treated as failures: the loop just
+        // re-reads the state and waits again if necessary.
+        if (bthread::butex_wait(butex, val, NULL) < 0 &&
+            errno != EWOULDBLOCK && errno != EINTR/*note*/) {
+            return errno;
+        }
+    }
+}
+
+} // namespace bthread
+
+__BEGIN_DECLS
+
+// Entry point placed between __BEGIN_DECLS/__END_DECLS; forwards its
+// arguments unchanged to bthread::bthread_once_impl() and returns its result.
+int bthread_once(bthread_once_t* once_control, void (*init_routine)()) {
+    return bthread::bthread_once_impl(once_control, init_routine);
+}
+
+__END_DECLS
\ No newline at end of file
diff --git a/src/bthread/singleton_on_bthread_once.h b/src/bthread/singleton_on_bthread_once.h
new file mode 100644
index 00000000..9ea507d7
--- /dev/null
+++ b/src/bthread/singleton_on_bthread_once.h
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_SINGLETON_ON_BTHREAD_ONCE_H
+#define BRPC_SINGLETON_ON_BTHREAD_ONCE_H
+
+#include "bthread/bthread.h"
+
+namespace bthread {
+
+// Per-type static storage used by get_leaky_singleton<T>().
+template <typename T>
+class GetLeakySingleton {
+public:
+    // The singleton object; assigned by create_leaky_singleton().
+    static T* _instance;
+    // Once-control handed to bthread_once() by get_leaky_singleton<T>().
+    static bthread_once_t* g_create_leaky_singleton_once;
+    // Init routine handed to bthread_once(); assigns `_instance'.
+    static void create_leaky_singleton();
+};
+
+// Starts out NULL; set by create_leaky_singleton().
+template <typename T>
+T* GetLeakySingleton<T>::_instance = NULL;
+
+// The once-control is heap-allocated and never deleted here.
+// NOTE(review): this pointer is set by a `new' expression, i.e. by dynamic
+// initialization. Confirm that no caller can reach get_leaky_singleton<T>()
+// (for example from another static initializer) before it has been set.
+template <typename T>
+bthread_once_t* GetLeakySingleton<T>::g_create_leaky_singleton_once
+    = new bthread_once_t;
+
+// Allocates the T instance and publishes it through `_instance'. The object
+// is never deleted in this header.
+template <typename T>
+void GetLeakySingleton<T>::create_leaky_singleton() {
+    _instance = new T;
+}
+
+// Returns the process-wide, never-deleted instance of T, creating it on the
+// first call. Creation goes through bthread_once(), so callers that arrive
+// while T's constructor is still running wait on the once-control instead
+// of constructing a second instance. This makes the function usable from
+// bthreads even when T's constructor suspends the calling bthread, and it
+// suits daemon (b)threads or objects that must stay alive for the whole
+// process lifetime.
+// The return value of bthread_once() is not checked here.
+// NOTE(review): the once-control pointer is itself filled in by a `new'
+// expression; confirm that calls made before main() cannot observe it
+// before it has been set.
+template <typename T>
+inline T* get_leaky_singleton() {
+    typedef GetLeakySingleton<T> Holder;
+    bthread_once(Holder::g_create_leaky_singleton_once,
+                 Holder::create_leaky_singleton);
+    return Holder::_instance;
+}
+
+} // namespace bthread
+
+#endif // BRPC_SINGLETON_ON_BTHREAD_ONCE_H
diff --git a/src/bthread/types.h b/src/bthread/types.h
index cb39ae3c..d91b85aa 100644
--- a/src/bthread/types.h
+++ b/src/bthread/types.h
@@ -192,6 +192,29 @@ typedef struct {
 typedef struct {
 } bthread_barrierattr_t;
 
+#if defined(__cplusplus)
+class bthread_once_t;
+namespace bthread {
+extern int bthread_once_impl(bthread_once_t* once_control, void (*init_routine)());
+}
+
+// Control object for bthread_once(). Each instance owns one butex whose
+// value records how far the one-time initialization has progressed.
+class bthread_once_t {
+public:
+// bthread_once_impl() drives the state machine stored in `_butex'.
+friend int bthread::bthread_once_impl(bthread_once_t* once_control, void (*init_routine)());
+    enum State {
+        // No caller has claimed the initialization yet.
+        UNINITIALIZED = 0,
+        // One caller is currently running the init routine.
+        INPROGRESS,
+        // The init routine has returned.
+        INITIALIZED,
+    };
+
+    bthread_once_t();
+    ~bthread_once_t();
+
+    // Non-copyable: the destructor destroys `_butex', so a copy sharing the
+    // same pointer would destroy the butex a second time.
+    bthread_once_t(const bthread_once_t&) = delete;
+    bthread_once_t& operator=(const bthread_once_t&) = delete;
+
+private:
+    // Created in the constructor, destroyed in the destructor.
+    butil::atomic<int>* _butex;
+};
+#endif
+
 typedef struct {
     uint64_t value;
 } bthread_id_t;
diff --git a/test/bthread_once_unittest.cpp b/test/bthread_once_unittest.cpp
new file mode 100644
index 00000000..618798e8
--- /dev/null
+++ b/test/bthread_once_unittest.cpp
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include "bthread/bthread.h"
+#include "bthread/singleton_on_bthread_once.h"
+#include "bthread/task_control.h"
+
+namespace bthread {
+extern TaskControl* g_task_control;
+}
+
+namespace {
+
+// Once-control shared by every participant of BthreadOnceTest.once.
+bthread_once_t g_bthread_once_control;
+// Set by the first bthread just before it calls bthread_once(). Atomic
+// because the test body polls it while that bthread writes it.
+butil::atomic<bool> g_bthread_once_started(false);
+// Number of times init_routine() has run; expected to be exactly 1.
+butil::atomic<int> g_bthread_once_count(0);
+
+// Deliberately slow init routine: sleeps for two seconds so that the other
+// callers have time to arrive while the initialization is still running.
+void init_routine() {
+    bthread_usleep(2000 * 1000);
+    g_bthread_once_count.fetch_add(1, butil::memory_order_relaxed);
+}
+
+// Calls bthread_once() and checks that, on return, the init routine has run
+// exactly once.
+void bthread_once_task() {
+    ASSERT_EQ(0, bthread_once(&g_bthread_once_control, init_routine));
+    // `init_routine' must have been called exactly once.
+    ASSERT_EQ(1, g_bthread_once_count.load(butil::memory_order_relaxed));
+}
+
+// Entry of the first bthread: announces that it has started, then runs the
+// common task.
+void* first_bthread_once_task(void*) {
+    g_bthread_once_started.store(true, butil::memory_order_release);
+    bthread_once_task();
+    return NULL;
+}
+
+
+// Entry of every other bthread.
+void* other_bthread_once_task(void*) {
+    bthread_once_task();
+    return NULL;
+}
+
+// Starts one bthread first, then floods bthread_once() with
+// `concurrency * 100' more bthreads plus the test thread itself, and expects
+// every caller to observe a single completed initialization.
+TEST(BthreadOnceTest, once) {
+    bthread_t bid;
+    ASSERT_EQ(0, bthread_start_background(
+        &bid, NULL, first_bthread_once_task, NULL));
+    // Wait until the first bthread is running before starting the others.
+    while (!g_bthread_once_started.load(butil::memory_order_acquire)) {
+        bthread_usleep(1000);
+    }
+    ASSERT_NE(nullptr, bthread::g_task_control);
+    int concurrency = bthread::g_task_control->concurrency();
+    LOG(INFO) << "concurrency: " << concurrency;
+    ASSERT_GT(concurrency, 0);
+    std::vector<bthread_t> bids(concurrency * 100);
+    for (auto& id : bids) {
+        ASSERT_EQ(0, bthread_start_background(
+            &id, NULL, other_bthread_once_task, NULL));
+    }
+    bthread_once_task();
+
+    for (auto& id : bids) {
+        bthread_join(id, NULL);
+    }
+    bthread_join(bid, NULL);
+}
+
+// Set by the first bthread just before it asks for the singleton. Atomic
+// because the test body polls it while that bthread writes it.
+butil::atomic<bool> g_bthread_started(false);
+// Number of BthreadSingleton objects constructed; expected to be exactly 1.
+butil::atomic<int> g_bthread_singleton_count(0);
+
+// Singleton type whose constructor sleeps for two seconds on the calling
+// bthread, so that other callers arrive while construction is in progress.
+class BthreadSingleton {
+public:
+    BthreadSingleton() {
+        bthread_usleep(2000 * 1000);
+        g_bthread_singleton_count.fetch_add(1, butil::memory_order_relaxed);
+    }
+};
+
+// Fetches the singleton and checks that it exists and was built only once.
+void get_bthread_singleton() {
+    auto instance = bthread::get_leaky_singleton<BthreadSingleton>();
+    ASSERT_NE(nullptr, instance);
+    // Only one BthreadSingleton instance has been created.
+    ASSERT_EQ(1, g_bthread_singleton_count.load(butil::memory_order_relaxed));
+}
+
+// Entry of the first bthread: announces that it has started, then fetches
+// the singleton.
+void* first_get_bthread_singleton(void*) {
+    g_bthread_started.store(true, butil::memory_order_release);
+    get_bthread_singleton();
+    return NULL;
+}
+
+
+// Entry of every other bthread (overload taking the bthread argument).
+void* get_bthread_singleton(void*) {
+    get_bthread_singleton();
+    return NULL;
+}
+
+// Singleton will definitely not cause deadlock,
+// even if constructor of T will hang the bthread.
+TEST(BthreadOnceTest, singleton) {
+    bthread_t bid;
+    ASSERT_EQ(0, bthread_start_background(
+        &bid, NULL, first_get_bthread_singleton, NULL));
+    // Wait until the first bthread is running before starting the others.
+    while (!g_bthread_started.load(butil::memory_order_acquire)) {
+        bthread_usleep(1000);
+    }
+    ASSERT_NE(nullptr, bthread::g_task_control);
+    int concurrency = bthread::g_task_control->concurrency();
+    LOG(INFO) << "concurrency: " << concurrency;
+    ASSERT_GT(concurrency, 0);
+    std::vector<bthread_t> bids(concurrency * 100);
+    for (auto& id : bids) {
+        ASSERT_EQ(0, bthread_start_background(
+            &id, NULL, get_bthread_singleton, NULL));
+    }
+    get_bthread_singleton();
+
+    for (auto& id : bids) {
+        bthread_join(id, NULL);
+    }
+    bthread_join(bid, NULL);
+}
+
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org

Reply via email to