Thanks Russell,

Yes, using the internal API has been a lot of trouble, especially since I know very little Erlang. I have read riak_kv_pb_crdt, but it confused me a lot. How do I define a serial actor, and is there a simple example of creating a `riak_client:put` options object?
The reason I need to use the internal API is that I believed it would cost less performance than creating a new client, which might be more convenient. We are running a very heavily loaded Riak cluster and I want to keep the hook lightweight.

2017-03-02 16:32 GMT+08:00 Russell Brown <russell.br...@mac.com>:

> Hi,
>
> You’re using internal details of the CRDT implementation, I’m not sure that is such a great idea. You always have your `Context` set to `undefined` but if your ops are all adds that shouldn’t matter in this case.
>
> The issue is that you’re calling `riak_kv_crdt:update` that needs to be called from within the vnode. The `ThisNode` value is not a serial actor, so you may have many concurrent updates with the actor `ThisNode`, and that’s not how to do it. It is crucial that the actor updating the CRDT acts in serial issuing an increasing count of events. That’s why we put the CRDT code inside Riak, inside the vnode.
>
> You’re doing neither one thing nor the other, in that you’re using a datatype bucket but not the datatype API (it sends Ops from the client, you’re doing read/modify/write.)
>
> I can see the issue here is that the API is external only, if you _must_ use the internal API have a look at https://github.com/basho/riak_kv/blob/develop/src/riak_kv_pb_crdt.erl and you see that the CRDT_OP is all that is sent by the client.
>
> https://github.com/basho/riak_kv/blob/develop/src/riak_kv_pb_crdt.erl#L162
>
> Those put options matter too, especially for counters, less so for sets.
>
> I guess it would be great if riak_client had some internal API functions that made this easier to do from a hook. If you open an issue on github.com/basho/riak_kv I can look into that and make a PR.
>
> Hope that helps
>
> Russell
>
> On 1 Mar 2017, at 09:30, 李明 <lmlmlmlal...@gmail.com> wrote:
>
> > Hi
> > I am new to Erlang and Riak. I started to use Riak as a KV store a couple of months ago.
> > Now I want to implement a commit hook in Riak so that Riak can collect some statistics for me.
> > I read some docs and wrote a pre-commit hook script, which fetches the object key and stores it in a set.
> > This hook works fine if only one client writes to Riak, but when I increase the number of connections writing to Riak, I found that some elements are lost from the set. It looks like the crdt_op did not do the merge operation. And there is no obvious error in the log files.
> >
> > Could someone help me figure out what happened or what I have missed?
> >
> > I am using Riak 2.1.3.
> >
> > Thanks all!
> >
> >
> > Here is the hook script:
> >
> > ------------------------------------------------------------------------------------------------------
> >
> > -module(myhook).
> > -export([pretest/1]).
> >
> > now_to_local_string({MegaSecs, Secs, MicroSecs}) ->
> >     LocalTime = calendar:now_to_local_time({MegaSecs, Secs, MicroSecs}),
> >     {{Year, Month, Day}, {Hour, Minute, _}} = LocalTime,
> >     TimeStr = lists:flatten(io_lib:format("~4..0w~2..0w~2..0w~2..0w~2..0w",
> >                                           [Year, Month, Day, Hour, Minute])),
> >     TimeStr.
> >
> > is_deleted(Object)->
> >     case dict:find(<<"X-Riak-Deleted">>,riak_object:get_metadata(Object)) of
> >         {ok,_} ->
> >             true;
> >         _ ->
> >             false
> >     end.
> >
> > pretest(Object) ->
> >     % timer:sleep(10000),
> >     try
> >         ObjBucket = riak_object:bucket(Object),
> >         % riak_object:bucket(Obj).
> >         % {<<"cn-archive">>,<<"local-test">>}
> >
> >         Bucket = element(2, ObjBucket),
> >         BucketType = element(1, ObjBucket),
> >
> >         ObjKey = riak_object:key(Object),
> >         % Key = binary_to_list(ObjKey),
> >         % ObjData = riak_object:get_value(Object),
> >         % Msg = binary_to_list(ObjData),
> >         CommitItem = iolist_to_binary(mochijson2:encode({struct, [{b, Bucket}, {k, ObjKey}, {t, BucketType}]})),
> >
> >         case is_deleted(Object) of
> >             true ->
> >                 KeyPrefix = "delete";
> >             _ ->
> >                 KeyPrefix = "update"
> >         end,
> >
> >         CurMin = now_to_local_string(os:timestamp()),
> >         IndexKey = binary:list_to_bin(io_lib:format("~s-~s", [CurMin, KeyPrefix])),
> >
> >         %% Get a riak client
> >         {ok, C} = riak:local_client(),
> >         % get node obj
> >         ThisNode = atom_to_binary(node(), latin1),
> >
> >         % get index obj and set context
> >         BType = <<"archive">>,
> >         B = <<"local-test">>,
> >
> >         {SetObj, Context} = case C:get({BType, B}, IndexKey) of
> >             {error, notfound} ->
> >                 ThisSetObj = riak_kv_crdt:new({BType, B}, IndexKey, riak_dt_orswot),
> >                 {ThisSetObj, undefined};
> >             {ok, ThisSetObj} ->
> >                 % The datatype update requires the context if the value exists
> >                 {{Ctx, _}, _} = riak_kv_crdt:value(ThisSetObj, riak_dt_orswot),
> >                 {ThisSetObj, Ctx}
> >         end,
> >
> >         UpdateIndex = [{add, CommitItem}],
> >         % UpdateOp = {crdt_op, riak_dt_orswot, {update, UpdateIndex}, Context},
> >         UpdateOp = {crdt_op, riak_dt_orswot, {update, UpdateIndex}, undefined},
> >         NewObj = riak_kv_crdt:update(SetObj, ThisNode, UpdateOp),
> >
> >         error_logger:info_msg("Updating index for ~s,to set ~s~n", [binary:bin_to_list(CommitItem), IndexKey]),
> >
> >         C:put(NewObj),
> >         Object
> >     catch
> >         error:Error ->
> >             {fail, lists:flatten(io_lib:format("[PREHOOKEXCEPTION]~p",[Error]))}
> >     end.
> >
> > ------------------------------------------------------------------------------------------------------
> >
> >
> > This is the set bucket props
> > ------------------------------------------------------------------------------------------------------
> >
> > active: true
> > allow_mult: true
> > basic_quorum: false
> > big_vclock: 50
> > chash_keyfun: {riak_core_util,chash_std_keyfun}
> > claimant: 'riak@192.168.100.2'
> > datatype: set
> > dvv_enabled: true
> > dw: quorum
> > last_write_wins: false
> > linkfun: {modfun,riak_kv_wm_link_walker,mapreduce_linkfun}
> > n_val: 3
> > notfound_ok: true
> > old_vclock: 86400
> > postcommit: []
> > pr: 0
> > precommit: []
> > pw: 0
> > r: quorum
> > rw: quorum
> > small_vclock: 50
> > w: quorum
> > young_vclock: 20
> >
> >
> > _______________________________________________
> > riak-users mailing list
> > riak-users@lists.basho.com
> > http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com