On Mon, Jan 24, 2011 at 2:46 PM, <[email protected]> wrote: > Author: fdmanana > Date: Mon Jan 24 13:46:11 2011 > New Revision: 1062772 > > URL: http://svn.apache.org/viewvc?rev=1062772&view=rev > Log: > Refactoring of the replicator database listener > > Simpler implementation and more reliable behaviour when the replicator > database is deleted or changed on the fly. > > Modified: > couchdb/trunk/src/couchdb/couch_rep_db_listener.erl > > Modified: couchdb/trunk/src/couchdb/couch_rep_db_listener.erl > URL: > http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_db_listener.erl?rev=1062772&r1=1062771&r2=1062772&view=diff > ============================================================================== > --- couchdb/trunk/src/couchdb/couch_rep_db_listener.erl (original) > +++ couchdb/trunk/src/couchdb/couch_rep_db_listener.erl Mon Jan 24 13:46:11 > 2011 > @@ -18,98 +18,113 @@ > > -include("couch_db.hrl"). > > --define(DOC_TO_REP_ID_MAP, rep_doc_id_to_rep_id). > --define(REP_ID_TO_DOC_ID_MAP, rep_id_to_rep_doc_id). > +-define(DOC_ID_TO_REP_ID, rep_doc_id_to_rep_id). > +-define(REP_ID_TO_DOC_ID, rep_id_to_rep_doc_id). > > -record(state, { > changes_feed_loop = nil, > - changes_queue = nil, > - changes_processor = nil, > - db_notifier = nil > + db_notifier = nil, > + rep_db_name = nil, > + rep_start_pids = [] > }). > > +-import(couch_util, [ > + get_value/2, > + get_value/3 > +]). > + > > start_link() -> > gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). 
> > init(_) -> > process_flag(trap_exit, true), > - {ok, Queue} = couch_work_queue:new( > - [{max_size, 1024 * 1024}, {max_items, 1000}]), > - {ok, Processor} = changes_processor(Queue), > - {ok, Loop} = changes_feed_loop(Queue), > + ?DOC_ID_TO_REP_ID = ets:new(?DOC_ID_TO_REP_ID, [named_table, set, > private]), > + ?REP_ID_TO_DOC_ID = ets:new(?REP_ID_TO_DOC_ID, [named_table, set, > private]), > Server = self(), > ok = couch_config:register( > - fun("replicator", "db") -> > - ok = gen_server:cast(Server, rep_db_changed) > + fun("replicator", "db", NewName) -> > + ok = gen_server:cast(Server, {rep_db_changed, ?l2b(NewName)}) > end > ), > + {Loop, RepDbName} = changes_feed_loop(), > {ok, #state{ > changes_feed_loop = Loop, > - changes_queue = Queue, > - changes_processor = Processor, > + rep_db_name = RepDbName, > db_notifier = db_update_notifier()} > }. > > > +handle_call({rep_db_update, Change}, _From, State) -> > + {reply, ok, process_update(State, Change)}; > + > handle_call(Msg, From, State) -> > ?LOG_ERROR("Replicator DB listener received unexpected call ~p from ~p", > [Msg, From]), > {stop, {error, {unexpected_call, Msg}}, State}. 
> > > -handle_cast(rep_db_changed, State) -> > - #state{ > - changes_feed_loop = Loop, > - changes_queue = Queue > - } = State, > - catch unlink(Loop), > - catch exit(Loop, rep_db_changed), > - couch_work_queue:queue(Queue, stop_all_replications), > - {ok, NewLoop} = changes_feed_loop(Queue), > - {noreply, State#state{changes_feed_loop = NewLoop}}; > - > -handle_cast(rep_db_created, #state{changes_feed_loop = Loop} = State) -> > - catch unlink(Loop), > - catch exit(Loop, rep_db_changed), > - {ok, NewLoop} = changes_feed_loop(State#state.changes_queue), > - {noreply, State#state{changes_feed_loop = NewLoop}}; > +handle_cast({rep_db_changed, NewName}, > + #state{rep_db_name = NewName} = State) -> > + {noreply, State}; > + > +handle_cast({rep_db_changed, _NewName}, State) -> > + {noreply, restart(State)}; > + > +handle_cast({rep_db_created, NewName}, > + #state{rep_db_name = NewName} = State) -> > + {noreply, State}; > + > +handle_cast({rep_db_created, _NewName}, State) -> > + {noreply, restart(State)}; > > handle_cast(Msg, State) -> > ?LOG_ERROR("Replicator DB listener received unexpected cast ~p", [Msg]), > {stop, {error, {unexpected_cast, Msg}}, State}. > > + > handle_info({'EXIT', From, normal}, #state{changes_feed_loop = From} = > State) -> > % replicator DB deleted > - couch_work_queue:queue(State#state.changes_queue, stop_all_replications), > - {noreply, State#state{changes_feed_loop = nil}}; > + {noreply, State#state{changes_feed_loop = nil, rep_db_name = nil}}; > > handle_info({'EXIT', From, Reason}, #state{db_notifier = From} = State) -> > ?LOG_ERROR("Database update notifier died. Reason: ~p", [Reason]), > {stop, {db_update_notifier_died, Reason}, State}; > > -handle_info({'EXIT', From, Reason}, #state{changes_processor = From} = > State) -> > - ?LOG_ERROR("Replicator DB changes processor died. Reason: ~p", [Reason]), > - {stop, {rep_db_changes_processor_died, Reason}, State}. 
> +handle_info({'EXIT', From, normal}, #state{rep_start_pids = Pids} = State) -> > + % one of the replication start processes terminated successfully > + {noreply, State#state{rep_start_pids = Pids -- [From]}}; > + > +handle_info(Msg, State) -> > + ?LOG_ERROR("Replicator DB listener received unexpected message ~p", > [Msg]), > + {stop, {unexpected_msg, Msg}, State}. > > > terminate(_Reason, State) -> > #state{ > + rep_start_pids = StartPids, > changes_feed_loop = Loop, > - changes_queue = Queue > + db_notifier = Notifier > } = State, > - exit(Loop, stop), > - % closing the queue will cause changes_processor to shutdown > - couch_work_queue:close(Queue), > - ok. > + stop_all_replications(), > + lists:foreach( > + fun(Pid) -> > + catch unlink(Pid), > + catch exit(Pid, stop) > + end, > + [Loop | StartPids]), > + true = ets:delete(?REP_ID_TO_DOC_ID), > + true = ets:delete(?DOC_ID_TO_REP_ID), > + couch_db_update_notifier:stop(Notifier). > > > code_change(_OldVsn, State, _Extra) -> > {ok, State}. > > > -changes_feed_loop(ChangesQueue) -> > +changes_feed_loop() -> > {ok, RepDb} = couch_rep:ensure_rep_db_exists(), > + Server = self(), > Pid = spawn_link( > fun() -> > ChangesFeedFun = couch_changes:handle_changes( > @@ -126,7 +141,8 @@ changes_feed_loop(ChangesQueue) -> > fun({change, Change, _}, _) -> > case has_valid_rep_id(Change) of > true -> > - couch_work_queue:queue(ChangesQueue, Change); > + ok = gen_server:call( > + Server, {rep_db_update, Change}, infinity); > false -> > ok > end; > @@ -137,7 +153,15 @@ changes_feed_loop(ChangesQueue) -> > end > ), > couch_db:close(RepDb), > - {ok, Pid}. > + {Pid, couch_db:name(RepDb)}. > + > + > +has_valid_rep_id({Change}) -> > + has_valid_rep_id(get_value(<<"id">>, Change)); > +has_valid_rep_id(<<?DESIGN_DOC_PREFIX, _Rest/binary>>) -> > + false; > +has_valid_rep_id(_Else) -> > + true. 
> > > db_update_notifier() -> > @@ -146,121 +170,106 @@ db_update_notifier() -> > fun({created, DbName}) -> > case ?l2b(couch_config:get("replicator", "db", "_replicator")) of > DbName -> > - ok = gen_server:cast(Server, rep_db_created); > + ok = gen_server:cast(Server, {rep_db_created, DbName}); > _ -> > ok > end; > (_) -> > + % no need to handle the 'deleted' event - the changes feed loop > + % dies when the database is deleted > ok > end > ), > Notifier. > > > -changes_processor(ChangesQueue) -> > - Pid = spawn_link( > - fun() -> > - ets:new(?DOC_TO_REP_ID_MAP, [named_table, set, private]), > - ets:new(?REP_ID_TO_DOC_ID_MAP, [named_table, set, private]), > - consume_changes(ChangesQueue), > - true = ets:delete(?REP_ID_TO_DOC_ID_MAP), > - true = ets:delete(?DOC_TO_REP_ID_MAP) > - end > - ), > - {ok, Pid}. > - > - > -consume_changes(ChangesQueue) -> > - case couch_work_queue:dequeue(ChangesQueue) of > - closed -> > - ok; > - {ok, Changes} -> > - lists:foreach(fun process_change/1, Changes), > - consume_changes(ChangesQueue) > - end. > - > - > -has_valid_rep_id({Change}) -> > - has_valid_rep_id(couch_util:get_value(<<"id">>, Change)); > -has_valid_rep_id(<<?DESIGN_DOC_PREFIX, _Rest/binary>>) -> > - false; > -has_valid_rep_id(_Else) -> > - true. > +restart(#state{changes_feed_loop = Loop, rep_start_pids = StartPids} = > State) -> > + stop_all_replications(), > + lists:foreach( > + fun(Pid) -> > + catch unlink(Pid), > + catch exit(Pid, rep_db_changed) > + end, > + [Loop | StartPids]), > + {NewLoop, NewRepDbName} = changes_feed_loop(), > + State#state{ > + changes_feed_loop = NewLoop, > + rep_db_name = NewRepDbName, > + rep_start_pids = [] > + }. 
> > -process_change(stop_all_replications) -> > - ?LOG_INFO("Stopping all ongoing replications because the replicator DB " > - "was deleted or changed", []), > - stop_all_replications(); > > -process_change({Change}) -> > - {RepProps} = JsonRepDoc = couch_util:get_value(doc, Change), > - DocId = couch_util:get_value(<<"_id">>, RepProps), > - case couch_util:get_value(<<"deleted">>, Change, false) of > +process_update(State, {Change}) -> > + {RepProps} = JsonRepDoc = get_value(doc, Change), > + DocId = get_value(<<"_id">>, RepProps), > + case get_value(<<"deleted">>, Change, false) of > true -> > - rep_doc_deleted(DocId); > + rep_doc_deleted(DocId), > + State; > false -> > - case couch_util:get_value(<<"_replication_state">>, RepProps) of > + case get_value(<<"_replication_state">>, RepProps) of > <<"completed">> -> > - replication_complete(DocId); > + replication_complete(DocId), > + State; > <<"error">> -> > - stop_replication(DocId); > + stop_replication(DocId), > + State; > <<"triggered">> -> > - maybe_start_replication(DocId, JsonRepDoc); > + maybe_start_replication(State, DocId, JsonRepDoc); > undefined -> > - maybe_start_replication(DocId, JsonRepDoc); > - _ -> > - ?LOG_ERROR("Invalid value for the `_replication_state` property" > - " of the replication document `~s`", [DocId]) > + maybe_start_replication(State, DocId, JsonRepDoc) > end > - end, > - ok. > + end. > > > rep_user_ctx({RepDoc}) -> > - case couch_util:get_value(<<"user_ctx">>, RepDoc) of > + case get_value(<<"user_ctx">>, RepDoc) of > undefined -> > #user_ctx{roles = [<<"_admin">>]}; > {UserCtx} -> > #user_ctx{ > - name = couch_util:get_value(<<"name">>, UserCtx, null), > - roles = couch_util:get_value(<<"roles">>, UserCtx, []) > + name = get_value(<<"name">>, UserCtx, null), > + roles = get_value(<<"roles">>, UserCtx, []) > } > end. 
> > > -maybe_start_replication(DocId, JsonRepDoc) -> > +maybe_start_replication(State, DocId, JsonRepDoc) -> > UserCtx = rep_user_ctx(JsonRepDoc), > {BaseId, _} = RepId = couch_rep:make_replication_id(JsonRepDoc, UserCtx), > - case ets:lookup(?REP_ID_TO_DOC_ID_MAP, BaseId) of > + case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of > [] -> > - true = ets:insert(?REP_ID_TO_DOC_ID_MAP, {BaseId, DocId}), > - true = ets:insert(?DOC_TO_REP_ID_MAP, {DocId, RepId}), > - spawn_link(fun() -> start_replication(JsonRepDoc, RepId, UserCtx) > end); > + true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, DocId}), > + true = ets:insert(?DOC_ID_TO_REP_ID, {DocId, RepId}), > + Pid = spawn_link(fun() -> > + start_replication(JsonRepDoc, RepId, UserCtx) > + end), > + State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}; > [{BaseId, DocId}] -> > - ok; > + State; > [{BaseId, OtherDocId}] -> > - maybe_tag_rep_doc(DocId, JsonRepDoc, ?l2b(BaseId), OtherDocId) > + ?LOG_INFO("The replication specified by the document `~s` was > already" > + " triggered by the document `~s`", [DocId, OtherDocId]), > + maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)), > + State > end. > > > -maybe_tag_rep_doc(DocId, {Props} = JsonRepDoc, RepId, OtherDocId) -> > - case couch_util:get_value(<<"_replication_id">>, Props) of > +maybe_tag_rep_doc({Props} = JsonRepDoc, RepId) -> > + case get_value(<<"_replication_id">>, Props) of > RepId -> > ok; > _ -> > - ?LOG_INFO("The replication specified by the document `~s` was > already" > - " triggered by the document `~s`", [DocId, OtherDocId]), > couch_rep:update_rep_doc(JsonRepDoc, [{<<"_replication_id">>, RepId}]) > end. 
> > > - > -start_replication({RepProps} = RepDoc, {Base, Ext} = RepId, UserCtx) -> > +start_replication({RepProps} = RepDoc, {Base, _} = RepId, UserCtx) -> > case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of > - RepPid when is_pid(RepPid) -> > + Pid when is_pid(Pid) -> > ?LOG_INFO("Document `~s` triggered replication `~s`", > - [couch_util:get_value(<<"_id">>, RepProps), Base ++ Ext]), > - couch_rep:get_result(RepPid, RepId, RepDoc, UserCtx); > + [get_value(<<"_id">>, RepProps), pp_rep_id(RepId)]), > + couch_rep:get_result(Pid, RepId, RepDoc, UserCtx); > Error -> > couch_rep:update_rep_doc( > RepDoc, > @@ -269,43 +278,54 @@ start_replication({RepProps} = RepDoc, { > {<<"_replication_id">>, ?l2b(Base)} > ] > ), > - ?LOG_ERROR("Error starting replication `~s`: ~p", [Base ++ Ext, > Error]) > + ?LOG_ERROR("Error starting replication `~s`: ~p", [pp_rep_id(RepId), > Error]) > end. > > + > rep_doc_deleted(DocId) -> > case stop_replication(DocId) of > - {ok, {Base, Ext}} -> > + {ok, RepId} -> > ?LOG_INFO("Stopped replication `~s` because replication document `~s`" > - " was deleted", [Base ++ Ext, DocId]); > + " was deleted", [pp_rep_id(RepId), DocId]); > none -> > ok > end. > > + > replication_complete(DocId) -> > case stop_replication(DocId) of > - {ok, {Base, Ext}} -> > + {ok, RepId} -> > ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)", > - [Base ++ Ext, DocId]); > + [pp_rep_id(RepId), DocId]); > none -> > ok > end. > > + > stop_replication(DocId) -> > - case ets:lookup(?DOC_TO_REP_ID_MAP, DocId) of > + case ets:lookup(?DOC_ID_TO_REP_ID, DocId) of > [{DocId, {BaseId, _} = RepId}] -> > couch_rep:end_replication(RepId), > - true = ets:delete(?REP_ID_TO_DOC_ID_MAP, BaseId), > - true = ets:delete(?DOC_TO_REP_ID_MAP, DocId), > + true = ets:delete(?REP_ID_TO_DOC_ID, BaseId), > + true = ets:delete(?DOC_ID_TO_REP_ID, DocId), > {ok, RepId}; > [] -> > none > end. 
> > + > stop_all_replications() -> > + ?LOG_INFO("Stopping all ongoing replications because the replicator DB " > + "was deleted or changed", []), > ets:foldl( > fun({_, RepId}, _) -> couch_rep:end_replication(RepId) end, > ok, > - ?DOC_TO_REP_ID_MAP > + ?DOC_ID_TO_REP_ID > ), > - true = ets:delete_all_objects(?REP_ID_TO_DOC_ID_MAP), > - true = ets:delete_all_objects(?DOC_TO_REP_ID_MAP). > + true = ets:delete_all_objects(?REP_ID_TO_DOC_ID), > + true = ets:delete_all_objects(?DOC_ID_TO_REP_ID). > + > + > +% pretty-print replication id > +pp_rep_id({Base, Extension}) -> > + Base ++ Extension. > > >
Is there any reason you are using named tables here? Why not just keep the ETS table IDs returned by ets:new/2 in the #state{} record? Also, why use macros for the table names?
