This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 24fc31ea span for new bthread (#2519)
24fc31ea is described below

commit 24fc31eaa56d8b522975ca41ed012faf6cf77438
Author: Yang,Liming <liming.y...@139.com>
AuthorDate: Mon Feb 26 10:46:43 2024 +0800

    span for new bthread (#2519)
---
 example/rpcz_echo_c++/CMakeLists.txt | 141 +++++++++++++++++++++++++++++
 example/rpcz_echo_c++/client.cpp     |  96 ++++++++++++++++++++
 example/rpcz_echo_c++/echo.proto     |  33 +++++++
 example/rpcz_echo_c++/server.cpp     | 171 +++++++++++++++++++++++++++++++++++
 src/brpc/builtin/rpcz_service.cpp    |  19 +++-
 src/brpc/global.cpp                  |   8 ++
 src/brpc/span.cpp                    |  80 +++++++++++++---
 src/brpc/span.h                      |  14 ++-
 src/brpc/span.proto                  |   1 +
 src/bthread/bthread.cpp              |   9 ++
 src/bthread/task_group.cpp           |  13 ++-
 src/bthread/unstable.h               |   3 +
 test/brpc_alpn_protocol_unittest.cpp |   2 +-
 test/bthread_unittest.cpp            |  51 ++++++++++-
 14 files changed, 616 insertions(+), 25 deletions(-)

diff --git a/example/rpcz_echo_c++/CMakeLists.txt 
b/example/rpcz_echo_c++/CMakeLists.txt
new file mode 100644
index 00000000..53c16690
--- /dev/null
+++ b/example/rpcz_echo_c++/CMakeLists.txt
@@ -0,0 +1,141 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+cmake_minimum_required(VERSION 2.8.10)
+project(rpcz_echo_c++ C CXX)
+
+# OFF (default): libbrpc.a is searched before the plain "brpc" name.
+# ON: only the plain "brpc" name is searched.
+option(LINK_SO "Whether examples are linked dynamically" OFF)
+
+# Find the first directory ending in "output/include" two levels above this
+# project and take its parent directory. The command must stay on one line:
+# a newline inside the string would split the shell pipeline.
+execute_process(
+    COMMAND bash -c "find ${PROJECT_SOURCE_DIR}/../.. -type d -regex \".*output/include$\" | head -n1 | xargs dirname | tr -d '\n'"
+    OUTPUT_VARIABLE OUTPUT_PATH
+)
+
+# Use that directory as the search prefix for the find_* calls below.
+set(CMAKE_PREFIX_PATH ${OUTPUT_PATH})
+
+include(FindThreads)
+include(FindProtobuf)
+# Generate C++ sources/headers from echo.proto into the build directory.
+protobuf_generate_cpp(PROTO_SRC PROTO_HEADER echo.proto)
+# include PROTO_HEADER
+include_directories(${CMAKE_CURRENT_BINARY_DIR})
+
+# Search for libthrift* by best effort. If it is not found and brpc is
+# compiled with thrift protocol enabled, a link error would be reported.
+find_library(THRIFT_LIB NAMES thrift)
+if (NOT THRIFT_LIB)
+    set(THRIFT_LIB "")
+endif()
+
+# brpc headers and library are mandatory.
+find_path(BRPC_INCLUDE_PATH NAMES brpc/server.h)
+if(LINK_SO)
+    find_library(BRPC_LIB NAMES brpc)
+else()
+    find_library(BRPC_LIB NAMES libbrpc.a brpc)
+endif()
+if((NOT BRPC_INCLUDE_PATH) OR (NOT BRPC_LIB))
+    message(FATAL_ERROR "Fail to find brpc")
+endif()
+include_directories(${BRPC_INCLUDE_PATH})
+
+# gflags headers and library are mandatory.
+find_path(GFLAGS_INCLUDE_PATH gflags/gflags.h)
+find_library(GFLAGS_LIBRARY NAMES gflags libgflags)
+if((NOT GFLAGS_INCLUDE_PATH) OR (NOT GFLAGS_LIBRARY))
+    message(FATAL_ERROR "Fail to find gflags")
+endif()
+include_directories(${GFLAGS_INCLUDE_PATH})
+
+# Detect the C++ namespace of the installed gflags: take the second word of the
+# first "namespace <name> {" line in gflags_declare.h.
+execute_process(
+    COMMAND bash -c "grep \"namespace [_A-Za-z0-9]\\+ {\" ${GFLAGS_INCLUDE_PATH}/gflags/gflags_declare.h | head -1 | awk '{print $2}' | tr -d '\n'"
+    OUTPUT_VARIABLE GFLAGS_NS
+)
+# If the header spells the namespace through the GFLAGS_NAMESPACE macro, read
+# the macro's value instead. The expansion is quoted so that an empty result
+# (grep matched nothing) still forms a valid if() expression instead of
+# aborting configuration with a syntax error.
+if("${GFLAGS_NS}" STREQUAL "GFLAGS_NAMESPACE")
+    execute_process(
+        COMMAND bash -c "grep \"#define GFLAGS_NAMESPACE [_A-Za-z0-9]\\+\" ${GFLAGS_INCLUDE_PATH}/gflags/gflags_declare.h | head -1 | awk '{print $3}' | tr -d '\n'"
+        OUTPUT_VARIABLE GFLAGS_NS
+    )
+endif()
+# On macOS, define NO_CLOCK_GETTIME_IN_MAC when clock_gettime is unavailable.
+if(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
+    include(CheckFunctionExists)
+    CHECK_FUNCTION_EXISTS(clock_gettime HAVE_CLOCK_GETTIME)
+    if(NOT HAVE_CLOCK_GETTIME)
+        set(DEFINE_CLOCK_GETTIME "-DNO_CLOCK_GETTIME_IN_MAC")
+    endif()
+endif()
+
+# GFLAGS_NS is consumed by the sources as GFLAGS_NS::ParseCommandLineFlags.
+# The flag strings must stay on one line each: an embedded newline would end up
+# in the compiler command line.
+set(CMAKE_CPP_FLAGS "${DEFINE_CLOCK_GETTIME} -DGFLAGS_NS=${GFLAGS_NS}")
+set(CMAKE_CXX_FLAGS "${CMAKE_CPP_FLAGS} -DNDEBUG -O2 -D__const__=__unused__ -pipe -W -Wall -Wno-unused-parameter -fPIC -fno-omit-frame-pointer")
+
+# Request C++11: via an explicit -std flag on CMake older than 3.1.3,
+# via CMAKE_CXX_STANDARD otherwise.
+if(CMAKE_VERSION VERSION_LESS "3.1.3")
+    if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
+        set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
+    endif()
+    if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
+        set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
+    endif()
+else()
+    set(CMAKE_CXX_STANDARD 11)
+    set(CMAKE_CXX_STANDARD_REQUIRED ON)
+endif()
+
+# leveldb headers and library are mandatory.
+find_path(LEVELDB_INCLUDE_PATH NAMES leveldb/db.h)
+find_library(LEVELDB_LIB NAMES leveldb)
+if ((NOT LEVELDB_INCLUDE_PATH) OR (NOT LEVELDB_LIB))
+    message(FATAL_ERROR "Fail to find leveldb")
+endif()
+include_directories(${LEVELDB_INCLUDE_PATH})
+
+if(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
+    set(OPENSSL_ROOT_DIR
+        "/usr/local/opt/openssl"    # Homebrew installed OpenSSL
+        )
+endif()
+
+# OPENSSL_INCLUDE_DIR and the OpenSSL libraries are used unconditionally below,
+# so fail here with a clear message rather than later on NOTFOUND variables.
+find_package(OpenSSL REQUIRED)
+include_directories(${OPENSSL_INCLUDE_DIR})
+
+# Libraries linked into both example binaries in addition to brpc itself.
+# THRIFT_LIB may be empty (it is searched by best effort earlier in this file).
+set(DYNAMIC_LIB
+    ${CMAKE_THREAD_LIBS_INIT}
+    ${GFLAGS_LIBRARY}
+    ${PROTOBUF_LIBRARIES}
+    ${LEVELDB_LIB}
+    ${OPENSSL_CRYPTO_LIBRARY}
+    ${OPENSSL_SSL_LIBRARY}
+    ${THRIFT_LIB}
+    dl
+    )
+
+# macOS additionally links pthread and several system frameworks.
+# NOTE(review): the "-Wl,-U,<symbol>" entries presumably let the listed
+# tcmalloc/profiler symbols stay undefined at link time - confirm against ld(1).
+if(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
+    set(DYNAMIC_LIB ${DYNAMIC_LIB}
+        pthread
+        "-framework CoreFoundation"
+        "-framework CoreGraphics"
+        "-framework CoreData"
+        "-framework CoreText"
+        "-framework Security"
+        "-framework Foundation"
+        "-Wl,-U,_MallocExtension_ReleaseFreeMemory"
+        "-Wl,-U,_ProfilerStart"
+        "-Wl,-U,_ProfilerStop"
+        "-Wl,-U,__Z13GetStackTracePPvii")
+endif()
+
+# Both executables compile the sources generated from echo.proto.
+add_executable(rpcz_echo_client client.cpp ${PROTO_SRC} ${PROTO_HEADER})
+add_executable(rpcz_echo_server server.cpp ${PROTO_SRC} ${PROTO_HEADER})
+
+target_link_libraries(rpcz_echo_client ${BRPC_LIB} ${DYNAMIC_LIB})
+target_link_libraries(rpcz_echo_server ${BRPC_LIB} ${DYNAMIC_LIB})
+
diff --git a/example/rpcz_echo_c++/client.cpp b/example/rpcz_echo_c++/client.cpp
new file mode 100644
index 00000000..ad80de0f
--- /dev/null
+++ b/example/rpcz_echo_c++/client.cpp
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// A client sending requests to server every 1 second.
+
+#include <gflags/gflags.h>
+#include <butil/logging.h>
+#include <butil/time.h>
+#include <brpc/channel.h>
+#include "echo.pb.h"
+
+namespace brpc {
+DECLARE_bool(enable_rpcz);
+}
+DEFINE_string(attachment, "", "Carry this along with requests");
+DEFINE_string(protocol, "baidu_std", "Protocol type. Defined in 
src/brpc/options.proto");
+DEFINE_string(connection_type, "", "Connection type. Available values: single, 
pooled, short");
+DEFINE_string(server, "0.0.0.0:8000", "IP Address of server");
+DEFINE_string(load_balancer, "", "The algorithm for load balancing");
+DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
+DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)"); 
+DEFINE_int32(interval_ms, 1000, "Milliseconds between consecutive requests");
+
+// Entry point of the example client: builds a channel from the command-line
+// flags, then sends one synchronous Echo request per loop iteration, sleeping
+// FLAGS_interval_ms milliseconds in between, until asked to quit.
+// Returns -1 if the channel cannot be initialized, 0 otherwise.
+int main(int argc, char* argv[]) {
+    // Parse gflags. We recommend you to use gflags as well.
+    GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
+
+    // brpc::FLAGS_enable_rpcz = true;
+    // A Channel represents a communication line to a Server. Notice that 
+    // Channel is thread-safe and can be shared by all threads in your program.
+    brpc::Channel channel;
+    
+    // Fill the channel options from the flags and initialize the channel
+    // with them.
+    brpc::ChannelOptions options;
+    options.protocol = FLAGS_protocol;
+    options.connection_type = FLAGS_connection_type;
+    options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/;
+    options.max_retry = FLAGS_max_retry;
+    if (channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(), 
&options) != 0) {
+        LOG(ERROR) << "Fail to initialize channel";
+        return -1;
+    }
+
+    // Normally, you should not call a Channel directly, but instead construct
+    // a stub Service wrapping it. stub can be shared by all threads as well.
+    example::EchoService_Stub stub(&channel);
+
+    // Send a request and wait for the response, then sleep for
+    // FLAGS_interval_ms (1 second by default).
+    int log_id = 0;
+    while (!brpc::IsAskedToQuit()) {
+        // We will receive response synchronously, safe to put variables
+        // on stack.
+        example::EchoRequest request;
+        example::EchoResponse response;
+        brpc::Controller cntl;
+
+        request.set_message("hello world");
+
+        cntl.set_log_id(log_id ++);  // set by user
+        // Set attachment which is wired to network directly instead of 
+        // being serialized into protobuf messages.
+        cntl.request_attachment().append(FLAGS_attachment);
+
+        // Because `done'(last parameter) is NULL, this function waits until
+        // the response comes back or error occurs(including timedout).
+        stub.Echo(&cntl, &request, &response, NULL);
+        if (!cntl.Failed()) {
+            LOG(INFO) << "Received response from " << cntl.remote_side()
+                << " to " << cntl.local_side()
+                << ": " << response.message() << " (attached="
+                << cntl.response_attachment() << ")"
+                << " latency=" << cntl.latency_us() << "us";
+        } else {
+            LOG(WARNING) << cntl.ErrorText();
+        }
+        usleep(FLAGS_interval_ms * 1000L);
+    }
+
+    LOG(INFO) << "EchoClient is going to quit";
+    return 0;
+}
+
diff --git a/example/rpcz_echo_c++/echo.proto b/example/rpcz_echo_c++/echo.proto
new file mode 100644
index 00000000..2b39627f
--- /dev/null
+++ b/example/rpcz_echo_c++/echo.proto
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax="proto2";
+package example;
+
+// Generate the generic C++ service classes (EchoService, EchoService_Stub)
+// that client.cpp and server.cpp use.
+option cc_generic_services = true;
+
+// Request of EchoService.Echo: carries the text to be echoed.
+message EchoRequest {
+      required string message = 1;
+};
+
+// Response of EchoService.Echo: carries the echoed text.
+message EchoResponse {
+      required string message = 1;
+};
+
+// Service with a single unary Echo method.
+service EchoService {
+      rpc Echo(EchoRequest) returns (EchoResponse);
+};
diff --git a/example/rpcz_echo_c++/server.cpp b/example/rpcz_echo_c++/server.cpp
new file mode 100644
index 00000000..0f5f7f26
--- /dev/null
+++ b/example/rpcz_echo_c++/server.cpp
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// A server to receive EchoRequest and send back EchoResponse.
+
+#include <brpc/channel.h>
+#include <brpc/server.h>
+#include <butil/logging.h>
+#include <gflags/gflags.h>
+#include <json2pb/pb_to_json.h>
+#include "echo.pb.h"
+
+DEFINE_bool(echo_attachment, true, "Echo attachment as well");
+DEFINE_int32(port, 8000, "TCP Port of this server");
+DEFINE_string(listen_addr, "",
+              "Server listen address, may be IPV4/IPV6/UDS."
+              " If this is set, the flag port will be ignored");
+DEFINE_int32(idle_timeout_s, -1,
+             "Connection will be closed if there is no "
+             "read/write operations during the last `idle_timeout_s'");
+DEFINE_string(server, "0.0.0.0:8001", "IP Address of server");
+DEFINE_string(protocol, "baidu_std", "Protocol type. Defined in 
src/brpc/options.proto");
+DEFINE_string(connection_type, "", "Connection type. Available values: single, 
pooled, short");
+DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
+DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
+// Your implementation of example::EchoService
+// Notice that implementing brpc::Describable grants the ability to put
+// additional information in /status.
+namespace example {
+
+static const bthread_attr_t BTHREAD_ATTR_NORMAL_WITH_SPAN = {
+    BTHREAD_STACKTYPE_NORMAL, BTHREAD_INHERIT_SPAN, NULL, BTHREAD_TAG_INVALID};
+
+// Body of the helper bthreads started by EchoServiceImpl::Echo.
+// Records a trace annotation with the current bthread id, then sends one
+// synchronous Echo request to FLAGS_server and logs the outcome.
+// The argument is unused; always returns nullptr.
+void* RunThreadFunc(void*) {
+    TRACEPRINTF("RunThreadFunc %lu", bthread_self());
+    // brpc::FLAGS_enable_rpcz = true;
+    // A Channel represents a communication line to a Server. Notice that
+    // Channel is thread-safe and can be shared by all threads in your program.
+    brpc::Channel channel;
+    // Fill the channel options from the flags and initialize the channel
+    // with them (no load balancer is used here).
+    brpc::ChannelOptions options;
+    options.protocol = FLAGS_protocol;
+    options.connection_type = FLAGS_connection_type;
+    options.timeout_ms = FLAGS_timeout_ms /*milliseconds*/;
+    options.max_retry = FLAGS_max_retry;
+    if (channel.Init(FLAGS_server.c_str(), "", &options) != 0) {
+        LOG(ERROR) << "Fail to initialize channel";
+        return nullptr;
+    }
+    example::EchoService_Stub stub(&channel);
+    // We will receive response synchronously, safe to put variables
+    // on stack.
+    example::EchoRequest request;
+    example::EchoResponse response;
+    brpc::Controller cntl;
+    request.set_message("hello world");
+
+    // Because `done'(last parameter) is NULL, this function waits until
+    // the response comes back or error occurs(including timedout).
+    stub.Echo(&cntl, &request, &response, NULL);
+    if (!cntl.Failed()) {
+        LOG(INFO) << "Received response from " << cntl.remote_side() << " to " 
<< cntl.local_side()
+                  << ": " << response.message() << " (attached=" << 
cntl.response_attachment()
+                  << ")"
+                  << " latency=" << cntl.latency_us() << "us";
+    } else {
+        LOG(WARNING) << cntl.ErrorText();
+    }
+
+    return nullptr;
+}
+
+// Implementation of example::EchoService used by this example server.
+class EchoServiceImpl : public EchoService {
+public:
+    EchoServiceImpl() {}
+    virtual ~EchoServiceImpl() {}
+
+    // Handles one Echo request: starts two helper bthreads created with
+    // BTHREAD_ATTR_NORMAL_WITH_SPAN (each runs RunThreadFunc), waits for them,
+    // then copies the request message (and optionally the attachment) into
+    // the response. `done' is run when this function returns.
+    virtual void Echo(google::protobuf::RpcController* cntl_base, const EchoRequest* request,
+                      EchoResponse* response, google::protobuf::Closure* done) {
+        bthread_list_t list;
+        if (bthread_list_init(&list, 0, 0) == 0) {
+            for (int i = 0; i < 2; ++i) {
+                bthread_t tid;
+                const int rc = bthread_start_background(&tid, &BTHREAD_ATTR_NORMAL_WITH_SPAN,
+                                                        RunThreadFunc, nullptr);
+                if (rc != 0) {
+                    // `tid' is not valid on failure, so it must not be added
+                    // to the list.
+                    LOG(WARNING) << "Fail to start bthread, rc=" << rc;
+                    continue;
+                }
+                bthread_list_add(&list, tid);
+            }
+            bthread_list_join(&list);
+            // Release the resources acquired by bthread_list_init, otherwise
+            // they are leaked on every request.
+            bthread_list_destroy(&list);
+        } else {
+            LOG(WARNING) << "Fail to init bthread list";
+        }
+
+        TRACEPRINTF("Handle request");
+
+        // This object helps you to call done->Run() in RAII style. If you need
+        // to process the request asynchronously, pass done_guard.release().
+        brpc::ClosureGuard done_guard(done);
+
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+
+        // The purpose of following logs is to help you to understand
+        // how clients interact with servers more intuitively. You should
+        // remove these logs in performance-sensitive servers.
+        LOG(INFO) << "Received request[log_id=" << cntl->log_id() << "] from "
+                  << cntl->remote_side() << " to " << cntl->local_side() << ": "
+                  << request->message() << " (attached=" << cntl->request_attachment() << ")";
+
+        // Fill response.
+        response->set_message(request->message());
+
+        // You can compress the response by setting Controller, but be aware
+        // that compression may be costly, evaluate before turning on.
+        // cntl->set_response_compress_type(brpc::COMPRESS_TYPE_GZIP);
+
+        if (FLAGS_echo_attachment) {
+            // Set attachment which is wired to network directly instead of
+            // being serialized into protobuf messages.
+            cntl->response_attachment().append(cntl->request_attachment());
+        }
+    }
+};
+}  // namespace example
+
+// Entry point of the example server: registers EchoServiceImpl, listens on
+// FLAGS_listen_addr (or on any IP at FLAGS_port when that flag is empty) and
+// runs until asked to quit. Returns -1 on any setup failure, 0 otherwise.
+int main(int argc, char* argv[]) {
+    // Parse gflags. We recommend you to use gflags as well.
+    GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
+
+    // Generally you only need one Server.
+    brpc::Server server;
+
+    // Instance of your service.
+    example::EchoServiceImpl echo_service_impl;
+
+    // Add the service into server. Notice the second parameter, because the
+    // service is put on stack, we don't want server to delete it, otherwise
+    // use brpc::SERVER_OWNS_SERVICE.
+    if (server.AddService(&echo_service_impl, brpc::SERVER_DOESNT_OWN_SERVICE) 
!= 0) {
+        LOG(ERROR) << "Fail to add service";
+        return -1;
+    }
+
+    // An explicit listen address takes precedence over the port flag.
+    butil::EndPoint point;
+    if (!FLAGS_listen_addr.empty()) {
+        if (butil::str2endpoint(FLAGS_listen_addr.c_str(), &point) < 0) {
+            LOG(ERROR) << "Invalid listen address:" << FLAGS_listen_addr;
+            return -1;
+        }
+    } else {
+        point = butil::EndPoint(butil::IP_ANY, FLAGS_port);
+    }
+    // Start the server.
+    brpc::ServerOptions options;
+    options.idle_timeout_sec = FLAGS_idle_timeout_s;
+    if (server.Start(point, &options) != 0) {
+        LOG(ERROR) << "Fail to start EchoServer";
+        return -1;
+    }
+
+    // Wait until Ctrl-C is pressed, then Stop() and Join() the server.
+    server.RunUntilAskedToQuit();
+    return 0;
+}
diff --git a/src/brpc/builtin/rpcz_service.cpp 
b/src/brpc/builtin/rpcz_service.cpp
index 3625c068..b9e05637 100644
--- a/src/brpc/builtin/rpcz_service.cpp
+++ b/src/brpc/builtin/rpcz_service.cpp
@@ -309,6 +309,17 @@ static void PrintClientSpan(std::ostream& os,const 
RpczSpan& span,
     PrintClientSpan(os, span, &last_time, NULL, use_html);
 }
 
+// Prints the annotations recorded in a bthread span (`span.info()').
+// When `server_extr' is non-NULL it is passed to PrintAnnotations together
+// with the extractor of this span, server extractor first.
+// `last_time' is forwarded to PrintAnnotations. `use_html' is not used here.
+// NOTE(review): passing the maximum int64_t as the time argument presumably
+// means "print every remaining annotation" - confirm against PrintAnnotations.
+static void PrintBthreadSpan(std::ostream& os, const RpczSpan& span, int64_t* 
last_time,
+                             SpanInfoExtractor* server_extr, bool use_html) {
+    // Despite its name, this extractor reads the info of the bthread span.
+    SpanInfoExtractor client_extr(span.info().c_str());
+    int num_extr = 0;
+    SpanInfoExtractor* extr[2];
+    if (server_extr) {
+        extr[num_extr++] = server_extr;
+    }
+    extr[num_extr++] = &client_extr;
+    PrintAnnotations(os, std::numeric_limits<int64_t>::max(), last_time, extr, 
num_extr);
+}
 
 static void PrintServerSpan(std::ostream& os, const RpczSpan& span,
                             bool use_html) {
@@ -351,8 +362,12 @@ static void PrintServerSpan(std::ostream& os, const 
RpczSpan& span,
 
     const int nclient = span.client_spans_size();
     for (int i = 0; i < nclient; ++i) {
-        PrintClientSpan(os, span.client_spans(i), &last_time,
-                        &server_extr, use_html);
+        auto& client_span = span.client_spans(i);
+        if (client_span.type() == SPAN_TYPE_CLIENT) {
+            PrintClientSpan(os, client_span, &last_time, &server_extr, 
use_html);
+        } else {
+            PrintBthreadSpan(os, client_span, &last_time, &server_extr, 
use_html);
+        }
     }
 
     if (PrintAnnotationsAndRealTimeSpan(
diff --git a/src/brpc/global.cpp b/src/brpc/global.cpp
index fbd669e7..121b9d97 100644
--- a/src/brpc/global.cpp
+++ b/src/brpc/global.cpp
@@ -50,6 +50,11 @@
 #include "brpc/policy/hasher.h"
 #include "brpc/policy/dynpart_load_balancer.h"
 
+
+// Span
+#include "brpc/span.h"
+#include "bthread/unstable.h"
+
 // Compress handlers
 #include "brpc/compress.h"
 #include "brpc/policy/gzip_compress.h"
@@ -329,6 +334,9 @@ static void GlobalInitializeOrDieImpl() {
     // Make GOOGLE_LOG print to comlog device
     SetLogHandler(&BaiduStreamingLogHandler);
 
+    // Set bthread create span function
+    bthread_set_create_span_func(CreateBthreadSpan);
+
     // Setting the variable here does not work, the profiler probably check
     // the variable before main() for only once.
     // setenv("TCMALLOC_SAMPLE_PARAMETER", "524288", 0);
diff --git a/src/brpc/span.cpp b/src/brpc/span.cpp
index 6e0e64af..356a7cd0 100644
--- a/src/brpc/span.cpp
+++ b/src/brpc/span.cpp
@@ -121,6 +121,7 @@ Span* Span::CreateClientSpan(const std::string& 
full_method_name,
     span->_start_send_real_us = 0;
     span->_sent_real_us = 0;
     span->_next_client = NULL;
+    span->_client_list = NULL;
     span->_tls_next = NULL;
     span->_full_method_name = full_method_name;
     span->_info.clear();
@@ -129,8 +130,8 @@ Span* Span::CreateClientSpan(const std::string& 
full_method_name,
         span->_trace_id = parent->trace_id();
         span->_parent_span_id = parent->span_id();
         span->_local_parent = parent;
-        span->_next_client = parent->_next_client;
-        parent->_next_client = span;
+        span->_next_client = parent->_client_list;
+        parent->_client_list = span;
     } else {
         span->_trace_id = GenerateTraceId();
         span->_parent_span_id = 0;
@@ -140,6 +141,47 @@ Span* Span::CreateClientSpan(const std::string& 
full_method_name,
     return span;
 }
 
+// Creates a span of type SPAN_TYPE_BTHREAD as a child of the span currently
+// stored in bthread::tls_bls.rpcz_parent_span.
+// Returns NULL when there is no parent span in TLS or when no Span object can
+// be obtained from the object pool.
+// The new span takes the trace id of the parent, uses the parent's span id as
+// its parent span id, gets a freshly generated span id, and is pushed to the
+// front of the parent's _client_list.
+Span* Span::CreateBthreadSpan(const std::string& full_method_name, 
+                              int64_t base_real_us) {
+    Span* parent = (Span*)bthread::tls_bls.rpcz_parent_span;
+    if (parent == NULL) {
+        return NULL;
+    }
+    Span* span = butil::get_object<Span>(Forbidden());
+    if (__builtin_expect(span == NULL, 0)) {
+        return NULL;
+    }
+    // Reset every field explicitly before use.
+    span->_log_id = 0;
+    span->_base_cid = INVALID_BTHREAD_ID;
+    span->_ending_cid = INVALID_BTHREAD_ID;
+    span->_type = SPAN_TYPE_BTHREAD;
+    span->_async = false;
+    span->_protocol = PROTOCOL_UNKNOWN;
+    span->_error_code = 0;
+    span->_request_size = 0;
+    span->_response_size = 0;
+    span->_base_real_us = base_real_us;
+    span->_received_real_us = 0;
+    span->_start_parse_real_us = 0;
+    span->_start_callback_real_us = 0;
+    span->_start_send_real_us = 0;
+    span->_sent_real_us = 0;
+    span->_next_client = NULL;
+    span->_client_list = NULL;
+    span->_tls_next = NULL;
+    span->_full_method_name = full_method_name;
+    span->_info.clear();
+
+    // Link into the parent: the previous head of the parent's child list
+    // becomes the sibling of the new span (overwriting the NULL set above).
+    span->_trace_id = parent->trace_id();
+    span->_parent_span_id = parent->span_id();
+    span->_local_parent = parent;
+    span->_next_client = parent->_client_list;
+    parent->_client_list = span;
+
+    span->_span_id = GenerateSpanId();
+    return span;
+}
+
 inline const std::string& unknown_span_name() {
     // thread-safe in gcc.
     static std::string s_unknown_method_name = "unknown_method";
@@ -173,6 +215,7 @@ Span* Span::CreateServerSpan(
     span->_start_send_real_us = 0;
     span->_sent_real_us = 0;
     span->_next_client = NULL;
+    span->_client_list = NULL;
     span->_tls_next = NULL;
     span->_full_method_name = (!full_method_name.empty() ?
                                full_method_name : unknown_span_name());
@@ -195,15 +238,20 @@ void Span::ResetServerSpanName(const std::string& 
full_method_name) {
 
 void Span::destroy() {
     EndAsParent();
-    Span* p = _next_client;
-    while (p) {
-        Span* p_next = p->_next_client;
-        p->_info.clear();
-        butil::return_object(p);
-        p = p_next;
+    traversal(this, [](Span* r) {
+        r->_info.clear();
+        butil::return_object(r);
+    });
+}
+
+// Visits the span tree rooted at `r' in post-order: every span reachable
+// through _client_list (siblings are chained by _next_client) is passed to
+// `f' before `r' itself. Does nothing when `r' is NULL.
+void Span::traversal(Span* r, const std::function<void(Span*)>& f) const {
+    if (r == NULL) {
+        return;
     }
-    _info.clear();
-    butil::return_object(this);
+    for (Span* p = r->_client_list; p != NULL; ) {
+        // Read the sibling link before visiting `p': destroy() passes a
+        // callback that returns `p' to the object pool, after which `p' must
+        // not be dereferenced anymore.
+        Span* const next = p->_next_client;
+        traversal(p, f);
+        p = next;
+    }
+    f(r);
 }
 
 void Span::Annotate(const char* fmt, ...) {
@@ -243,8 +291,8 @@ void Span::AnnotateCStr(const char* info, size_t length) {
 
 size_t Span::CountClientSpans() const {
     size_t n = 0;
-    for (Span* p = _next_client; p; p = p->_next_client, ++n);
-    return n;
+    traversal(const_cast<Span*>(this), [&](Span*) { ++n; });
+    return n - 1;
 }
 
 int64_t Span::GetStartRealTimeUs() const {
@@ -577,9 +625,13 @@ leveldb::Status SpanDB::Index(const Span* span, 
std::string* value_buf) {
         value_proto.add_client_spans();
     }
     size_t i = 0;
-    for (const Span* p = span->_next_client; p; p = p->_next_client, ++i) {
+    span->traversal(const_cast<Span*>(span), [&](Span* p) {
+        if (span == p) {
+            return;
+        }
         Span2Proto(p, value_proto.mutable_client_spans(client_span_count - i - 
1));
-    }
+        ++i;
+    });
     if (!value_proto.SerializeToString(value_buf)) {
         return leveldb::Status::InvalidArgument(
             leveldb::Slice("Fail to serialize RpczSpan"));
diff --git a/src/brpc/span.h b/src/brpc/span.h
index 43ede3d5..44273405 100644
--- a/src/brpc/span.h
+++ b/src/brpc/span.h
@@ -69,6 +69,10 @@ public:
     static Span* CreateClientSpan(const std::string& full_method_name,
                                   int64_t base_real_us);
 
+    // Create a span to track start bthread
+    static Span* CreateBthreadSpan(const std::string& full_method_name, 
+                                   int64_t base_real_us);
+
     static void Submit(Span* span, int64_t cpuwide_time_us);
 
     // Set tls parent.
@@ -82,7 +86,7 @@ public:
     void Annotate(const std::string& info);
     // When length <= 0, use strlen instead.
     void AnnotateCStr(const char* cstr, size_t length);
-    
+
     // #child spans, Not O(1)
     size_t CountClientSpans() const;
 
@@ -142,6 +146,7 @@ private:
 
     void dump_and_destroy(size_t round_index);
     void destroy();
+    void traversal(Span*, const std::function<void(Span*)>&) const;
     bvar::CollectorSpeedLimit* speed_limit();
     bvar::CollectorPreprocessor* preprocessor();
 
@@ -179,6 +184,7 @@ private:
 
     Span* _local_parent;
     Span* _next_client;
+    Span* _client_list;
     Span* _tls_next;
 };
 
@@ -233,6 +239,12 @@ inline bool IsTraceable(bool is_upstream_traced) {
         (FLAGS_enable_rpcz && bvar::is_collectable(&g_span_sl));
 }
 
+// Creates a bthread span named "Bthread" via Span::CreateBthreadSpan and
+// returns it as an opaque pointer (the signature expected by
+// bthread_set_create_span_func). The base real time passed to the span is the
+// current offset between gettimeofday_us() and cpuwide_time_us().
+inline void* CreateBthreadSpan() {
+    // Despite the name, this is just the current cpu-wide timestamp.
+    const int64_t received_us = butil::cpuwide_time_us();
+    const int64_t base_realtime = butil::gettimeofday_us() - received_us;
+    return Span::CreateBthreadSpan("Bthread", base_realtime);
+}
+
 } // namespace brpc
 
 
diff --git a/src/brpc/span.proto b/src/brpc/span.proto
index a77a9cf6..d37a53e2 100644
--- a/src/brpc/span.proto
+++ b/src/brpc/span.proto
@@ -25,6 +25,7 @@ option java_outer_classname="Span";
 enum SpanType {
     SPAN_TYPE_SERVER = 0;
     SPAN_TYPE_CLIENT = 1;
+    SPAN_TYPE_BTHREAD = 2;
 }
 
 // We don't unify RpczSpan and TracingSpan as one because the former one needs
diff --git a/src/bthread/bthread.cpp b/src/bthread/bthread.cpp
index 608c1b58..07233e60 100644
--- a/src/bthread/bthread.cpp
+++ b/src/bthread/bthread.cpp
@@ -83,6 +83,7 @@ TaskControl* g_task_control = NULL;
 extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;
 extern void (*g_worker_startfn)();
 extern void (*g_tagged_worker_startfn)(bthread_tag_t);
+extern void* (*g_create_span_func)();
 
 inline TaskControl* get_task_control() {
     return g_task_control;
@@ -489,6 +490,14 @@ int bthread_set_tagged_worker_startfn(void 
(*start_fn)(bthread_tag_t)) {
     return 0;
 }
 
+// Registers `func' as the global create-span hook (bthread::g_create_span_func).
+// Returns EINVAL and leaves the current hook untouched when `func' is NULL,
+// 0 on success. The assignment is a plain store without synchronization.
+int bthread_set_create_span_func(void* (*func)()) {
+    if (func == NULL) {
+        return EINVAL;
+    }
+    bthread::g_create_span_func = func;
+    return 0;
+}
+
 void bthread_stop_world() {
     bthread::TaskControl* c = bthread::get_task_control();
     if (c != NULL) {
diff --git a/src/bthread/task_group.cpp b/src/bthread/task_group.cpp
index 46bd2460..45bd89ed 100644
--- a/src/bthread/task_group.cpp
+++ b/src/bthread/task_group.cpp
@@ -76,6 +76,15 @@ const size_t OFFSET_TABLE[] = {
 #include "bthread/offset_inl.list"
 };
 
+// Optional hook producing the span for a newly started bthread.
+// NULL until set through bthread_set_create_span_func().
+void* (*g_create_span_func)() = NULL;
+
+// Returns the value to store as rpcz_parent_span of a new bthread: the result
+// of the registered hook when there is one, otherwise the parent span of the
+// calling thread (tls_bls.rpcz_parent_span).
+void* run_create_span_func() {
+    if (g_create_span_func) {
+        return g_create_span_func();
+    }
+    return tls_bls.rpcz_parent_span;
+}
+
 int TaskGroup::get_attr(bthread_t tid, bthread_attr_t* out) {
     TaskMeta* const m = address_meta(tid);
     if (m != NULL) {
@@ -383,7 +392,7 @@ int TaskGroup::start_foreground(TaskGroup** pg,
     m->attr = using_attr;
     m->local_storage = LOCAL_STORAGE_INIT;
     if (using_attr.flags & BTHREAD_INHERIT_SPAN) {
-        m->local_storage.rpcz_parent_span = tls_bls.rpcz_parent_span;
+        m->local_storage.rpcz_parent_span = run_create_span_func();
     }
     m->cpuwide_start_ns = start_ns;
     m->stat = EMPTY_STAT;
@@ -443,7 +452,7 @@ int TaskGroup::start_background(bthread_t* __restrict th,
     m->attr = using_attr;
     m->local_storage = LOCAL_STORAGE_INIT;
     if (using_attr.flags & BTHREAD_INHERIT_SPAN) {
-        m->local_storage.rpcz_parent_span = tls_bls.rpcz_parent_span;
+        m->local_storage.rpcz_parent_span = run_create_span_func();
     }
     m->cpuwide_start_ns = start_ns;
     m->stat = EMPTY_STAT;
diff --git a/src/bthread/unstable.h b/src/bthread/unstable.h
index 5922cc2f..f5bdeecb 100644
--- a/src/bthread/unstable.h
+++ b/src/bthread/unstable.h
@@ -91,6 +91,9 @@ extern int bthread_set_worker_startfn(void (*start_fn)());
 // Add a startup function with tag
 extern int bthread_set_tagged_worker_startfn(void (*start_fn)(bthread_tag_t));
 
+// Add a create span function
+extern int bthread_set_create_span_func(void* (*func)());
+
 // Stop all bthread and worker pthreads.
 // You should avoid calling this function which may cause bthread after main()
 // suspend indefinitely.
diff --git a/test/brpc_alpn_protocol_unittest.cpp 
b/test/brpc_alpn_protocol_unittest.cpp
index 7884b3fe..3040355a 100644
--- a/test/brpc_alpn_protocol_unittest.cpp
+++ b/test/brpc_alpn_protocol_unittest.cpp
@@ -31,7 +31,7 @@ DEFINE_string(listen_addr, "0.0.0.0:8011", "Server listen 
address.");
 
 int main(int argc, char* argv[]) {
     testing::InitGoogleTest(&argc, argv);
-    google::ParseCommandLineFlags(&argc, &argv, true);    
+    GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);    
     return RUN_ALL_TESTS();
 }
 
diff --git a/test/bthread_unittest.cpp b/test/bthread_unittest.cpp
index 72b8da3f..5ed8aba8 100644
--- a/test/bthread_unittest.cpp
+++ b/test/bthread_unittest.cpp
@@ -513,7 +513,7 @@ TEST_F(BthreadTest, bthread_usleep) {
 }
 
 static const bthread_attr_t BTHREAD_ATTR_NORMAL_WITH_SPAN =
-{ BTHREAD_STACKTYPE_NORMAL, BTHREAD_INHERIT_SPAN, NULL };
+{ BTHREAD_STACKTYPE_NORMAL, BTHREAD_INHERIT_SPAN, NULL, BTHREAD_TAG_INVALID };
 
 void* test_parent_span(void* p) {
     uint64_t *q = (uint64_t *)p;
@@ -522,6 +522,32 @@ void* test_parent_span(void* p) {
     return NULL;
 }
 
+// Bthread body: stores the rpcz_parent_span seen by this bthread into the
+// uint64_t pointed to by `p' and logs it. Always returns NULL.
+void* test_grandson_parent_span(void* p) {
+    uint64_t* q = (uint64_t*)p;
+    *q = (uint64_t)(bthread::tls_bls.rpcz_parent_span);
+    LOG(INFO) << "parent span id in thread is " << *q;
+    return NULL;
+}
+
+// Bthread body: stores the rpcz_parent_span seen by this bthread into the
+// uint64_t pointed to by `p', then starts and joins a nested bthread with
+// BTHREAD_ATTR_NORMAL_WITH_SPAN. The value observed by the nested bthread is
+// written to a local and not checked. Always returns NULL.
+void* test_son_parent_span(void* p) {
+    uint64_t* q = (uint64_t*)p;
+    *q = (uint64_t)(bthread::tls_bls.rpcz_parent_span);
+    LOG(INFO) << "parent span id in thread is " << *q;
+    bthread_t th;
+    uint64_t multi_p;
+    bthread_start_urgent(&th, &BTHREAD_ATTR_NORMAL_WITH_SPAN, 
test_grandson_parent_span, &multi_p);
+    bthread_join(th, NULL);
+    return NULL;
+}
+
+// Fake span values handed out by create_span_func(), in call order.
+static uint64_t targets[] = {0xBADBEB0UL, 0xBADBEB1UL, 0xBADBEB2UL, 0xBADBEB3UL};
+
+// Test hook for bthread_set_create_span_func(): returns the next entry of
+// `targets' on every call. The hook stays registered after the test that
+// installs it, so it may be called more often than there are entries; the
+// index therefore wraps around instead of reading past the end of the array.
+void* create_span_func() {
+    static std::atomic<int> index(0);
+    const size_t count = sizeof(targets) / sizeof(targets[0]);
+    const size_t idx = static_cast<size_t>(index.fetch_add(1)) % count;
+    LOG(INFO) << "Bthread create span " << targets[idx];
+    return (void*)targets[idx];
+}
+
 TEST_F(BthreadTest, test_span) {
     uint64_t p1 = 0;
     uint64_t p2 = 0;
@@ -531,17 +557,32 @@ TEST_F(BthreadTest, test_span) {
 
     bthread::tls_bls.rpcz_parent_span = (void*)target;
     bthread_t th1;
-    ASSERT_EQ(0, bthread_start_urgent(&th1, &BTHREAD_ATTR_NORMAL_WITH_SPAN,
-                                      test_parent_span, &p1));
+    ASSERT_EQ(0, bthread_start_urgent(&th1, &BTHREAD_ATTR_NORMAL_WITH_SPAN, 
test_parent_span, &p1));
     ASSERT_EQ(0, bthread_join(th1, NULL));
 
     bthread_t th2;
-    ASSERT_EQ(0, bthread_start_background(&th2, NULL,
-                                      test_parent_span, &p2));
+    ASSERT_EQ(0, bthread_start_background(&th2, NULL, test_parent_span, &p2));
     ASSERT_EQ(0, bthread_join(th2, NULL));
 
     ASSERT_EQ(p1, target);
     ASSERT_NE(p2, target);
+
+    LOG(INFO) << "Test bthread create span";
+
+    bthread_set_create_span_func(create_span_func);
+
+    bthread_t multi_th1;
+    bthread_t multi_th2;
+    uint64_t multi_p1;
+    uint64_t multi_p2;
+    ASSERT_EQ(0, bthread_start_background(&multi_th1, 
&BTHREAD_ATTR_NORMAL_WITH_SPAN,
+                                          test_son_parent_span, &multi_p1));
+    ASSERT_EQ(0, bthread_start_background(&multi_th2, 
&BTHREAD_ATTR_NORMAL_WITH_SPAN,
+                                          test_son_parent_span, &multi_p2));
+    ASSERT_EQ(0, bthread_join(multi_th1, NULL));
+    ASSERT_EQ(0, bthread_join(multi_th2, NULL));
+    ASSERT_EQ(multi_p1, targets[0]);
+    ASSERT_EQ(multi_p2, targets[1]);
 }
 
 void* dummy_thread(void*) {


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org


Reply via email to