chewbranca commented on code in PR #5602:
URL: https://github.com/apache/couchdb/pull/5602#discussion_r2221237408


##########
src/couch_stats/src/csrt_logger.erl:
##########
@@ -0,0 +1,592 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(csrt_logger).
+
+%% Process lifetime logging api
+-export([
+    get_tracker/0,
+    log_process_lifetime_report/1,
+    put_tracker/1,
+    stop_tracker/0,
+    stop_tracker/1,
+    track/1,
+    tracker/1
+]).
+
+%% Raw API that bypasses is_enabled checks
+-export([
+    do_lifetime_report/1,
+    do_status_report/1,
+    do_report/2,
+    maybe_report/2,
+    should_truncate_reports/0
+]).
+
+%% gen_server callbacks
+-export([
+    start_link/0,
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2
+]).
+
+%% Config update subscription API
+-export([
+    subscribe_changes/0,
+    handle_config_change/5,
+    handle_config_terminate/3
+]).
+
+%% Matchers
+-export([
+    deregister_matcher/1,
+    find_all_matches/2,
+    find_matches/2,
+    get_matcher/1,
+    get_matchers/0,
+    get_registered_matchers/0,
+    is_match/1,
+    is_match/2,
+    matcher_on_dbname/1,
+    matcher_on_docs_read/1,
+    matcher_on_docs_written/1,
+    matcher_on_rows_read/1,
+    matcher_on_changes_processed/1,
+    matcher_on_ioq_calls/1,
+    matcher_on_nonce/1,
+    matcher_on_long_reqs/1,
+    register_matcher/2,
+    reload_matchers/0
+]).
+
+%% Recon API Ports of https://github.com/ferd/recon/releases/tag/2.5.6
+-export([
+    pid_ref_attrs/1,
+    pid_ref_matchspec/1,
+    proc_window/3
+]).
+
+-include_lib("stdlib/include/ms_transform.hrl").
+-include_lib("csrt.hrl").
+
+%% gen_server state for this module. registered_matchers is the map of
+%% matchers added through the {register, Name, MSpec} call; it starts empty.
+-record(st, {
+    registered_matchers = #{}
+}).
+
+%% Return the tracker pid stored in the calling process' dictionary. When no
+%% tracker is stored yet, spawn a tracker/1 process for the Rctx's PidRef,
+%% store it with put_tracker/1 and return the new pid.
+-spec track(Rctx :: rctx()) -> pid().
+track(#rctx{pid_ref = PidRef}) ->
+    case get_tracker() of
+        Existing when is_pid(Existing) ->
+            Existing;
+        undefined ->
+            Spawned = spawn(?MODULE, tracker, [PidRef]),
+            put_tracker(Spawned),
+            Spawned
+    end.
+
+%% Body of the tracker process. Monitors the pid in PidRef and blocks until
+%% either a `stop` message arrives or the monitored process goes down. In
+%% both cases it then logs the lifetime report and destroys the resource.
+-spec tracker(PidRef :: pid_ref()) -> ok.
+tracker({TrackedPid, _Ref} = PidRef) ->
+    MonRef = erlang:monitor(process, TrackedPid),
+    receive
+        stop ->
+            ok;
+        {'DOWN', MonRef, _Type, _DownPid, _Reason0} ->
+            %% TODO: should we pass reason to log_process_lifetime_report?
+            %% Reason = case Reason0 of
+            %%     {shutdown, Shutdown0} ->
+            %%         Shutdown = atom_to_binary(Shutdown0),
+            %%         <<"shutdown: ", Shutdown/binary>>;
+            %%     Reason0 ->
+            %%         Reason0
+            %% end,
+            %% TODO: should we send the induced work delta to the coordinator?
+            ok
+    end,
+    %% Both wake-up paths finish the same way.
+    log_process_lifetime_report(PidRef),
+    csrt_server:destroy_resource(PidRef),
+    ok.
+
+%% Register the match spec MSpec under Name with the csrt_logger server.
+%% Returns ok on success. The server's {register, ...} handler replies with
+%% {error, {invalid_ms, _, _}} when the matcher is rejected, so the spec
+%% advertises that shape rather than {error, badarg}.
+%% The call waits indefinitely for the server's reply.
+-spec register_matcher(Name, MSpec) -> ok | {error, {invalid_ms, term(), term()}} when
+    Name :: string(), MSpec :: ets:match_spec().
+register_matcher(Name, MSpec) ->
+    gen_server:call(?MODULE, {register, Name, MSpec}, infinity).
+
+%% Remove the matcher registered under Name from the csrt_logger server.
+%% Returns ok when the matcher existed and {error, missing_matcher} when no
+%% matcher is registered under that name.
+%% Uses the default gen_server:call/2 timeout.
+-spec deregister_matcher(Name :: string()) -> ok | {error, missing_matcher}.
+deregister_matcher(Name) ->
+    gen_server:call(?MODULE, {deregister, Name}).
+
+%% Emit the "csrt-pid-usage-lifetime" report for PidRef through
+%% maybe_report/2, but only when both csrt_util:is_enabled/0 and
+%% csrt_util:is_enabled_reporting/0 return true; otherwise do nothing.
+-spec log_process_lifetime_report(PidRef :: pid_ref()) -> ok.
+log_process_lifetime_report(PidRef) ->
+    ReportingOn = csrt_util:is_enabled() andalso csrt_util:is_enabled_reporting(),
+    case ReportingOn of
+        false ->
+            ok;
+        true ->
+            maybe_report("csrt-pid-usage-lifetime", PidRef)
+    end.
+
+%% Return the subset of Matchers that match at least one of Rctxs.
+%%
+%% When csrt_util:is_enabled_rpc_reporting/0 is false, only Rctxs whose type
+%% is a #coordinator{} are considered.
+%%
+%% A matcher whose compiled match spec makes ets:match_spec_run/2 raise is
+%% treated as not matching, which is how find_all_matches/2 handles the same
+%% failure. The old-style `catch` used here before turned such a failure into
+%% an {'EXIT', _} term, which is =/= [] and so counted as a match.
+-spec find_matches(Rctxs :: [rctx()], Matchers :: matchers()) -> matchers().
+find_matches(Rctxs, Matchers) when is_list(Rctxs) andalso is_map(Matchers) ->
+    Rctxs1 =
+        case csrt_util:is_enabled_rpc_reporting() of
+            true ->
+                Rctxs;
+            false ->
+                [Rctx || #rctx{type = #coordinator{}} = Rctx <- Rctxs]
+        end,
+    maps:filter(
+        fun(_Name, {_MSpec, CompMSpec}) ->
+            try ets:match_spec_run(Rctxs1, CompMSpec) of
+                [] -> false;
+                [_ | _] -> true
+            catch
+                %% Best effort: a broken matcher must not match everything.
+                _:_ ->
+                    false
+            end
+        end,
+        Matchers
+    ).
+
+%% For every MatcherName => Matcher in Matchers, return
+%% MatcherName => SRctxs, where SRctxs is the list of Rctxs selected by that
+%% Matcher's compiled match spec. A matcher whose match spec run raises maps
+%% to the empty list.
+-spec find_all_matches(Rctxs :: rctxs(), Matchers :: matchers()) -> matcher_matches().
+find_all_matches(Rctxs, Matchers) when is_list(Rctxs), is_map(Matchers) ->
+    RunMatcher = fun(_Name, {_MSpec, Compiled}) ->
+        try
+            ets:match_spec_run(Rctxs, Compiled)
+        catch
+            _:_ ->
+                []
+        end
+    end,
+    maps:map(RunMatcher, Matchers).
+
+%% Ask the csrt_logger server to re-initialize its matchers, waiting
+%% indefinitely for the reply. Crashes with badmatch on any reply other
+%% than ok.
+-spec reload_matchers() -> ok.
+reload_matchers() ->
+    ok = gen_server:call(?MODULE, reload_matchers, infinity).
+
+%% Read the matchers stored in persistent_term under ?MATCHERS_KEY,
+%% falling back to an empty map when the key is not set.
+-spec get_matchers() -> matchers().
+get_matchers() ->
+    persistent_term:get(?MATCHERS_KEY, #{}).
+
+%% Look up the matcher stored under Name in get_matchers/0; returns
+%% undefined when there is no such matcher.
+-spec get_matcher(Name :: matcher_name()) -> maybe_matcher().
+get_matcher(Name) ->
+    maps:get(Name, get_matchers(), undefined).
+
+%% Fetch the registered_matchers map held in the csrt_logger server state,
+%% waiting indefinitely for the reply.
+-spec get_registered_matchers() -> matchers().
+get_registered_matchers() ->
+    gen_server:call(?MODULE, get_registered_matchers, infinity).
+
+%% True when Rctx matches at least one of the matchers returned by
+%% get_matchers/0. An undefined Rctx never matches.
+-spec is_match(Rctx :: maybe_rctx()) -> boolean().
+is_match(#rctx{} = Rctx) ->
+    ActiveMatchers = get_matchers(),
+    is_match(Rctx, ActiveMatchers);
+is_match(undefined) ->
+    false.
+
+%% True when Rctx matches at least one matcher in Matchers. Returns false
+%% when either argument is undefined.
+-spec is_match(Rctx :: maybe_rctx(), Matchers :: matchers()) -> boolean().
+is_match(#rctx{} = Rctx, Matchers) when is_map(Matchers) ->
+    Matched = find_matches([Rctx], Matchers),
+    map_size(Matched) > 0;
+is_match(undefined, _Matchers) ->
+    false;
+is_match(_Rctx, undefined) ->
+    false.
+
+%% Look up the Rctx for PidRef and, if it matches an active matcher
+%% (is_match/1), emit a report named ReportName for it. Always returns ok.
+-spec maybe_report(ReportName :: string(), PidRef :: maybe_pid_ref()) -> ok.
+maybe_report(ReportName, PidRef) ->
+    Rctx = csrt_server:get_resource(PidRef),
+    %% The result of do_report/2 is deliberately discarded.
+    _ = is_match(Rctx) andalso do_report(ReportName, Rctx),
+    ok.
+
+%% Whether or not to remove zero value fields from reports to save on volume.
+%% Read from the "should_truncate_reports" key of the ?CSRT config section;
+%% defaults to true.
+-spec should_truncate_reports() -> boolean().
+should_truncate_reports() ->
+    config:get_boolean(?CSRT, "should_truncate_reports", true).
+
+%% Emit a "csrt-pid-usage-lifetime" report for Rctx via do_report/2. Part of
+%% the raw API: performs no is_enabled or matcher checks itself.
+-spec do_lifetime_report(Rctx :: rctx()) -> boolean().
+do_lifetime_report(Rctx) ->
+    do_report("csrt-pid-usage-lifetime", Rctx).
+
+%% Emit a "csrt-pid-usage-status" report for Rctx via do_report/2. Part of
+%% the raw API: performs no is_enabled or matcher checks itself.
+-spec do_status_report(Rctx :: rctx()) -> boolean().
+do_status_report(Rctx) ->
+    do_report("csrt-pid-usage-status", Rctx).
+
+%% Convert Rctx to its JSON map form and hand it to couch_log:report/2 under
+%% ReportName. When should_truncate_reports/0 is true, entries whose value
+%% does not compare greater than 0 are dropped first. Non-numeric values
+%% sort above numbers in Erlang term order, so they are kept.
+-spec do_report(ReportName :: string(), Rctx :: rctx()) -> boolean().
+do_report(ReportName, #rctx{} = Rctx) ->
+    Truncate = should_truncate_reports(),
+    Json = csrt_entry:to_json(Rctx),
+    Report =
+        case Truncate of
+            false ->
+                Json;
+            true ->
+                maps:filter(fun(_Key, Value) -> Value > 0 end, Json)
+        end,
+    couch_log:report(ReportName, Report).
+
+%%
+%% Process lifetime logging api
+%%
+
+%% Read the value stored under ?TRACKER_PID in the calling process'
+%% dictionary; get/1 yields undefined when nothing has been stored.
+-spec get_tracker() -> maybe_pid().
+get_tracker() ->
+    get(?TRACKER_PID).
+
+%% Store Pid under ?TRACKER_PID in the calling process' dictionary. Returns
+%% what put/2 returns: the previously stored value, or undefined if none.
+-spec put_tracker(Pid :: pid()) -> maybe_pid().
+put_tracker(Pid) when is_pid(Pid) ->
+    put(?TRACKER_PID, Pid).
+
+%% Stop the tracker recorded for the calling process, if any (see
+%% get_tracker/0 and stop_tracker/1).
+-spec stop_tracker() -> ok.
+stop_tracker() ->
+    stop_tracker(get_tracker()).
+
+%% Send the `stop` message to the tracker Pid. A no-op when the tracker is
+%% undefined. Always returns ok.
+-spec stop_tracker(Pid :: maybe_pid()) -> ok.
+stop_tracker(Pid) when is_pid(Pid) ->
+    _ = erlang:send(Pid, stop),
+    ok;
+stop_tracker(undefined) ->
+    ok.
+
+%% Start the csrt_logger gen_server linked to the caller and registered
+%% locally under the module name.
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+%% gen_server init callback: start from a default #st{}, initialize the
+%% matchers from its (empty) registered_matchers map and subscribe to config
+%% changes. Crashes with badmatch if either step does not return ok.
+init([]) ->
+    #st{registered_matchers = Registered} = InitialState = #st{},
+    ok = initialize_matchers(Registered),
+    ok = subscribe_changes(),
+    {ok, InitialState}.
+
+%% gen_server call handler.
+%%   {register, Name, MSpec}  - add a matcher; on success re-initialize the
+%%                              matchers and store the updated map, otherwise
+%%                              reply with the {error, {invalid_ms, _, _}}.
+%%   {deregister, Name}       - remove a matcher and re-initialize, or reply
+%%                              {error, missing_matcher} when Name is unknown.
+%%   reload_matchers          - log a warning and re-initialize the matchers
+%%                              from the current registered map.
+%%   get_registered_matchers  - reply with the registered matchers map.
+%%   anything else            - reply ok and leave the state untouched.
+handle_call({register, Name, MSpec}, _From, #st{registered_matchers = Current} = St) ->
+    case add_matcher(Name, MSpec, Current) of
+        {error, {invalid_ms, _, _}} = Invalid ->
+            {reply, Invalid, St};
+        {ok, Updated} ->
+            ok = initialize_matchers(Updated),
+            {reply, ok, St#st{registered_matchers = Updated}}
+    end;
+handle_call({deregister, Name}, _From, #st{registered_matchers = Current} = St) ->
+    case maps:take(Name, Current) of
+        error ->
+            {reply, {error, missing_matcher}, St};
+        {_Removed, Remaining} ->
+            ok = initialize_matchers(Remaining),
+            {reply, ok, St#st{registered_matchers = Remaining}}
+    end;
+handle_call(reload_matchers, _From, St) ->
+    couch_log:warning("Reloading persistent term matchers", []),
+    Registered = St#st.registered_matchers,
+    ok = initialize_matchers(Registered),
+    {reply, ok, St};
+handle_call(get_registered_matchers, _From, St) ->
+    Registered = St#st.registered_matchers,
+    {reply, Registered, St};
+handle_call(_Request, _From, St) ->
+    {reply, ok, St}.
+
+%% gen_server cast handler: no casts are defined, so every message is
+%% ignored and the state is returned unchanged.
+%%
+%% No timeout is returned. A 0 ms timeout would only queue a `timeout`
+%% message after each cast, and handle_info/2 has no clause that acts on it.
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+%% gen_server info handler. On restart_config_listener, re-subscribe to
+%% config changes (crashing with badmatch unless subscribe_changes/0 returns
+%% ok). Every other message is ignored.
+handle_info(restart_config_listener, State) ->
+    ok = subscribe_changes(),
+    {noreply, State};
+handle_info(_Msg, St) ->
+    {noreply, St}.
+
+%%
+%% Matchers
+%%
+
+%% Build a match spec selecting every #rctx{} whose dbname field is exactly
+%% equal to DbName. DbName must be a binary.
+-spec matcher_on_dbname(DbName :: dbname()) -> ets:match_spec().
+matcher_on_dbname(DbName) when is_binary(DbName) ->
+    ets:fun2ms(
+        fun(#rctx{dbname = RctxDbName} = Rctx) when RctxDbName =:= DbName ->
+            Rctx
+        end
+    ).
+
+%% Build a match spec selecting every #rctx{} whose dbname is exactly DbName
+%% and for which at least one of ioq_calls, get_kv_node, get_kp_node,
+%% docs_read or rows_read is >= Threshold. DbName must be a binary.
+-spec matcher_on_dbname_io_threshold(DbName, Threshold) -> ets:match_spec() when
+    DbName :: dbname(), Threshold :: pos_integer().
+matcher_on_dbname_io_threshold(DbName, Threshold) when is_binary(DbName) ->
+    ets:fun2ms(
+        fun(
+            #rctx{
+                dbname = RctxDbName,
+                ioq_calls = IoqCalls,
+                get_kv_node = KvNodes,
+                get_kp_node = KpNodes,
+                docs_read = DocsRead,
+                rows_read = RowsRead
+            } = Rctx
+        ) when
+            RctxDbName =:= DbName andalso
+                ((IoqCalls >= Threshold) or
+                    (KvNodes >= Threshold) or
+                    (KpNodes >= Threshold) or
+                    (DocsRead >= Threshold) or
+                    (RowsRead >= Threshold))
+        ->
+            Rctx
+        end
+    ).
+
+%% Build a match spec selecting every #rctx{} whose docs_read is >= Threshold.
+%% Threshold must be a positive integer.
+-spec matcher_on_docs_read(Threshold :: pos_integer()) -> ets:match_spec().
+matcher_on_docs_read(Threshold) when is_integer(Threshold), Threshold > 0 ->
+    %% Coordinator-only variant, kept for reference:
+    %%ets:fun2ms(fun(#rctx{type=#coordinator{}, docs_read=DocsRead} = R) when DocsRead >= Threshold -> R end).
+    ets:fun2ms(
+        fun(#rctx{docs_read = Count} = Rctx) when Count >= Threshold ->
+            Rctx
+        end
+    ).
+
+%% Build a match spec selecting every #rctx{} whose docs_written is
+%% >= Threshold. Threshold must be a positive integer.
+-spec matcher_on_docs_written(Threshold :: pos_integer()) -> ets:match_spec().
+matcher_on_docs_written(Threshold) when is_integer(Threshold), Threshold > 0 ->
+    %% Coordinator-only variant, kept for reference:
+    %%ets:fun2ms(fun(#rctx{type=#coordinator{}, docs_written=DocsWritten} = R) when DocsWritten >= Threshold -> R end).
+    ets:fun2ms(
+        fun(#rctx{docs_written = Count} = Rctx) when Count >= Threshold ->
+            Rctx
+        end
+    ).
+
+%% Build a match spec selecting every #rctx{} whose rows_read is >= Threshold.
+%% Threshold must be a positive integer.
+-spec matcher_on_rows_read(Threshold :: pos_integer()) -> ets:match_spec().
+matcher_on_rows_read(Threshold) when is_integer(Threshold), Threshold > 0 ->
+    ets:fun2ms(
+        fun(#rctx{rows_read = Count} = Rctx) when Count >= Threshold ->
+            Rctx
+        end
+    ).
+
+%% Build a match spec selecting every #rctx{} whose nonce field is exactly
+%% equal to Nonce.
+-spec matcher_on_nonce(Nonce :: nonce()) -> ets:match_spec().
+matcher_on_nonce(Nonce) ->
+    ets:fun2ms(
+        fun(#rctx{nonce = RctxNonce} = Rctx) when RctxNonce =:= Nonce ->
+            Rctx
+        end
+    ).
+
+%% Build a match spec selecting changes-feed coordinator contexts for which
+%% (rows_read - changes_returned) is >= Threshold. Threshold must be a
+%% positive integer.
+-spec matcher_on_changes_processed(Threshold :: pos_integer()) -> ets:match_spec().
+matcher_on_changes_processed(Threshold) when is_integer(Threshold), Threshold > 0 ->
+    %% HACK: because we overload the use of #rctx.rows_read for
+    %% changes_processed, we must specify a direct match against a changes
+    %% context. For now, just match on #coordinator's.
+    ets:fun2ms(
+        fun(
+            #rctx{
+                type = #coordinator{mod = chttpd_db, func = handle_changes_req},
+                rows_read = RowsProcessed,
+                changes_returned = RowsReturned
+            } = Rctx
+        ) when (RowsProcessed - RowsReturned) >= Threshold ->
+            Rctx
+        end
+    ).
+
+%% Matcher on requests taking longer than Threshold milliseconds
+-spec matcher_on_long_reqs(Threshold :: pos_integer()) -> ets:match_spec().
+matcher_on_long_reqs(Threshold) when is_integer(Threshold), Threshold > 0 ->
+    %% Threshold is in milliseconds, but we track erlang:monotonic_time/0
+    %% which is in native format, a machine dependent internal representation
+    %% but with the guarantee of monotonically increasing. Note that this
+    %% number is integer(), _not_ pos_integer(), so we must use abs/1 to get a
+    %% positive time delta; notably, abs/1 is a valid match_spec guard.
+    %%
+    %% Time warps and is relative and is complicated, so here's an example of
+    %% converting 10000 milliseconds into a native time format and back, then
+    %% using csrt_util:tnow/0 to accurately measure sleeping for 10000 ms.
+    %%
+    %% (node1@127.0.0.1)5> erlang:convert_time_unit(10000, millisecond, native).
+    %% 10000000000
+    %% (node1@127.0.0.1)6> erlang:convert_time_unit(10000000000, native, millisecond).
+    %% 10000
+    %% (node1@127.0.0.1)7> T0 = csrt_util:tnow(), timer:sleep(10000), T1 = csrt_util:tnow(),
+    %%     erlang:convert_time_unit(abs(T1 - T0), native, millisecond).
+    %% 10000
+
+    NativeThreshold = erlang:convert_time_unit(Threshold, millisecond, native),
+    ets:fun2ms(
+        fun(#rctx{started_at = StartedAt, updated_at = UpdatedAt} = Rctx) when
+            abs(UpdatedAt - StartedAt) >= NativeThreshold
+        ->
+            Rctx
+        end
+    ).
+
+%% Build a match spec selecting every #rctx{} whose ioq_calls is >= Threshold.
+%% Threshold must be a positive integer.
+-spec matcher_on_ioq_calls(Threshold :: pos_integer()) -> ets:match_spec().
+matcher_on_ioq_calls(Threshold) when is_integer(Threshold), Threshold > 0 ->
+    ets:fun2ms(
+        fun(#rctx{ioq_calls = Count} = Rctx) when Count >= Threshold ->
+            Rctx
+        end
+    ).
+
+%%-spec matcher_for_rctx_field(Field :: rctx_field()) -> ets:match_spec().
+%%matcher_for_rctx_field() ->
+%%    #{size := Size0, fields := Fields} = csrt_entry:record_info(),
+%%    %% Subtract 1 as record_info size includes tuple record name
+%%    %% erlang:list_to_tuple([rctx | lists:duplicate(maps:get(size, 
csrt_entry:record_info()), '_')])
+%%    Size = Size - 1,

Review Comment:
   I've deleted this now, but this was the start of my replacement for the 
`ets:fun2ms` parse transform, as the logic there would dynamically set up the 
matcher string on the appropriate field index, but I didn't have a chance to 
complete it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@couchdb.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to