Hi, You’re using internal details of the CRDT implementation, I’m not sure that is such a great idea. You always have your `Context` set to `undefined` but if your ops are all adds that shouldn’t matter in this case.
The issue is that you’re calling `riak_kv_crdt:update` that needs to be called from within the vnode. The `ThisNode` value is not a serial actor, so you may have many concurrent updates with the actor `ThisNode`, and that’s not how to do it. It is crucial that the actor updating the CRDT acts in serial issuing an increasing count of events. Thats why we put the CRDT code inside Riak, inside the vnode. You’re doing neither one thing nor the other, in that your using a datatype bucket but not the datatype API (it sends Ops from the client, you’re doing read/modify/write.) I can see the issue here is that the API is external only, if you _must_ use the internal API have a look at https://github.com/basho/riak_kv/blob/develop/src/riak_kv_pb_crdt.erl and you see that the CRDT_OP is all that is sent by the client. https://github.com/basho/riak_kv/blob/develop/src/riak_kv_pb_crdt.erl#L162 Those put options matter too, especially for counters, less so for sets. I guess it would be great if riak_client had some internal API functions that made this easier to do from a hook. If you open an issue on github.com/basho/riak_kv I can look into that and make a PR. Hope that helps Russell On 1 Mar 2017, at 09:30, 李明 <lmlmlmlal...@gmail.com> wrote: > Hi > I am new to erlang and riak. I started to use riak as a kv store couple > of months ago. Now i want to implement a commit hook to riak so that riak > could help me to make some statistics. > i read some docs and write a pre-hook scripts, which will fetch the object > key and store it into a set. > This hook works fine if there is only one client write to riak, but if i > increase the connection to riak writing, i found it lost some elements in the > set. Looks like the crdt_op did not do the merge operation.And there is no > obvious error in the log files. > > Could someone help me to finger out what happened or what i has missed. > > i am using the riak 2.1.3 > > Thanks all! > > > Here is the hook scripts: > > ------------------------------------------------------------------------------------------------------ > > -module(myhook). > -export([pretest/1]). > > now_to_local_string({MegaSecs, Secs, MicroSecs}) -> > LocalTime = calendar:now_to_local_time({MegaSecs, Secs, MicroSecs}), > {{Year, Month, Day}, {Hour, Minute, _}} = LocalTime, > TimeStr = lists:flatten(io_lib:format("~4..0w~2..0w~2..0w~2..0w~2..0w", > [Year, Month, Day, Hour, Minute])), > TimeStr. > > is_deleted(Object)-> > case dict:find(<<"X-Riak-Deleted">>,riak_object:get_metadata(Object)) of > {ok,_} -> > true; > _ -> > false > end. > > pretest(Object) -> > % timer:sleep(10000), > try > ObjBucket = riak_object:bucket(Object), > % riak_object:bucket(Obj). > % {<<"cn-archive">>,<<"local-test">>} > > Bucket = element(2, ObjBucket), > BucketType = element(1, ObjBucket), > > ObjKey = riak_object:key(Object), > % Key = binary_to_list(ObjKey), > % ObjData = riak_object:get_value(Object), > % Msg = binary_to_list(ObjData), > CommitItem = iolist_to_binary(mochijson2:encode({struct, [{b, > Bucket}, {k, ObjKey}, {t, BucketType}]})), > > case is_deleted(Object) of > true -> > KeyPrefix = "delete"; > _ -> > KeyPrefix = "update" > end, > > CurMin = now_to_local_string(os:timestamp()), > IndexKey = binary:list_to_bin(io_lib:format("~s-~s", [CurMin, > KeyPrefix])), > > %% Get a riak client > {ok, C} = riak:local_client(), > % get node obj > ThisNode = atom_to_binary(node(), latin1), > > % get index obj and set context > BType = <<"archive">>, > B = <<"local-test">>, > > {SetObj, Context} = case C:get({BType, B}, IndexKey) of > {error, notfound} -> > ThisSetObj = riak_kv_crdt:new({BType, B}, IndexKey, > riak_dt_orswot), > {ThisSetObj, undefined}; > {ok, ThisSetObj} -> > % The datatype update requires the context if the value > exists > {{Ctx, _}, _} = riak_kv_crdt:value(ThisSetObj, > riak_dt_orswot), > {ThisSetObj, Ctx} > end, > > UpdateIndex = [{add, CommitItem}], > % UpdateOp = {crdt_op, riak_dt_orswot, {update, UpdateIndex}, > Context}, > UpdateOp = {crdt_op, riak_dt_orswot, {update, UpdateIndex}, > undefined}, > NewObj = riak_kv_crdt:update(SetObj, ThisNode, UpdateOp), > > error_logger:info_msg("Updating index for ~s,to set ~s~n", > [binary:bin_to_list(CommitItem), IndexKey]), > > C:put(NewObj), > Object > catch > error:Error -> > {fail, > lists:flatten(io_lib:format("[PREHOOKEXCEPTION]~p",[Error]))} > end. > > ------------------------------------------------------------------------------------------------------ > > > This is the set bucket props > ------------------------------------------------------------------------------------------------------ > > active: true > allow_mult: true > basic_quorum: false > big_vclock: 50 > chash_keyfun: {riak_core_util,chash_std_keyfun} > claimant: 'riak@192.168.100.2' > datatype: set > dvv_enabled: true > dw: quorum > last_write_wins: false > linkfun: {modfun,riak_kv_wm_link_walker,mapreduce_linkfun} > n_val: 3 > notfound_ok: true > old_vclock: 86400 > postcommit: [] > pr: 0 > precommit: [] > pw: 0 > r: quorum > rw: quorum > small_vclock: 50 > w: quorum > young_vclock: 20 > > > _______________________________________________ > riak-users mailing list > riak-users@lists.basho.com > http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com _______________________________________________ riak-users mailing list riak-users@lists.basho.com http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com