Hi,

You’re using internal details of the CRDT implementation, I’m not sure that is 
such a great idea. You always have your `Context` set to `undefined` but if 
your ops are all adds that shouldn’t matter in this case.

The issue is that you’re calling `riak_kv_crdt:update` that needs to be called 
from within the vnode. The `ThisNode` value is not a serial actor, so you may 
have many concurrent updates with the actor `ThisNode`, and that’s not how to 
do it. It is crucial that the actor updating the CRDT acts in serial issuing an 
increasing count of events. Thats why we put the CRDT code inside Riak, inside 
the vnode.

You’re doing neither one thing nor the other, in that your using a datatype 
bucket but not the datatype API (it sends Ops from the client, you’re doing 
read/modify/write.)

I can see the issue here is that the API is external only, if you _must_ use 
the internal API have a look at 
https://github.com/basho/riak_kv/blob/develop/src/riak_kv_pb_crdt.erl and you 
see that the CRDT_OP is all that is sent by the client.

https://github.com/basho/riak_kv/blob/develop/src/riak_kv_pb_crdt.erl#L162

Those put options matter too, especially for counters, less so for sets.

I guess it would be great if riak_client had some internal API functions that 
made this easier to do from a hook. If you open an issue on 
github.com/basho/riak_kv I can look into that and make a PR.

Hope that helps

Russell

On 1 Mar 2017, at 09:30, 李明 <lmlmlmlal...@gmail.com> wrote:

> Hi 
>    I am new to erlang and riak.  I started to use riak as a kv store couple 
> of months ago. Now i want to implement a commit hook to riak so that riak 
> could help me to make some statistics.
> i read some docs and write a pre-hook scripts, which will fetch the object 
> key and store it into a set.
>    This hook works fine if there is only one client write to riak, but if i 
> increase the connection to riak writing, i found it lost some elements in the 
> set. Looks like the crdt_op did not do the merge operation.And there is no 
> obvious error in the log files.
> 
>    Could someone help me to finger out what happened or what i has missed.
> 
> i am using the riak 2.1.3
> 
> Thanks all!
> 
> 
> Here is the hook scripts:
> 
> ------------------------------------------------------------------------------------------------------
> 
> -module(myhook).
> -export([pretest/1]).
> 
> now_to_local_string({MegaSecs, Secs, MicroSecs}) ->
>     LocalTime = calendar:now_to_local_time({MegaSecs, Secs, MicroSecs}),
>     {{Year, Month, Day}, {Hour, Minute, _}} = LocalTime,
>     TimeStr = lists:flatten(io_lib:format("~4..0w~2..0w~2..0w~2..0w~2..0w",
>                 [Year, Month, Day, Hour, Minute])),
>     TimeStr.
> 
> is_deleted(Object)->
>     case dict:find(<<"X-Riak-Deleted">>,riak_object:get_metadata(Object)) of
>         {ok,_} ->
>             true;
>         _ ->
>             false
>     end.
> 
> pretest(Object) ->
>     % timer:sleep(10000),
>     try
>       ObjBucket = riak_object:bucket(Object),
>   %           riak_object:bucket(Obj).
>               % {<<"cn-archive">>,<<"local-test">>}
> 
>               Bucket = element(2, ObjBucket),
>               BucketType = element(1, ObjBucket),
> 
>               ObjKey = riak_object:key(Object),
>               % Key = binary_to_list(ObjKey),
>               % ObjData = riak_object:get_value(Object),
>               % Msg = binary_to_list(ObjData),
>           CommitItem = iolist_to_binary(mochijson2:encode({struct, [{b, 
> Bucket}, {k, ObjKey}, {t, BucketType}]})),
> 
>           case is_deleted(Object) of
>               true ->
>                       KeyPrefix = "delete";
>                       _ ->
>                               KeyPrefix = "update"
>               end,
> 
>               CurMin = now_to_local_string(os:timestamp()),
>           IndexKey = binary:list_to_bin(io_lib:format("~s-~s", [CurMin, 
> KeyPrefix])),
> 
>           %% Get a riak client
>       {ok, C} = riak:local_client(),
>       % get node obj
>               ThisNode = atom_to_binary(node(), latin1),
> 
>               % get index obj and set context
>               BType = <<"archive">>,
>               B = <<"local-test">>,
>               
>               {SetObj, Context} = case C:get({BType, B}, IndexKey) of
>                   {error, notfound} -> 
>                       ThisSetObj = riak_kv_crdt:new({BType, B}, IndexKey, 
> riak_dt_orswot),
>                       {ThisSetObj, undefined};
>                   {ok, ThisSetObj} ->
>                       % The datatype update requires the context if the value 
> exists
>                       {{Ctx, _}, _} = riak_kv_crdt:value(ThisSetObj, 
> riak_dt_orswot),
>                       {ThisSetObj, Ctx}
>               end,
> 
>               UpdateIndex = [{add, CommitItem}],
>               % UpdateOp = {crdt_op, riak_dt_orswot, {update, UpdateIndex}, 
> Context},
>               UpdateOp = {crdt_op, riak_dt_orswot, {update, UpdateIndex}, 
> undefined},
>               NewObj = riak_kv_crdt:update(SetObj, ThisNode, UpdateOp),
> 
>               error_logger:info_msg("Updating index for ~s,to set ~s~n", 
> [binary:bin_to_list(CommitItem), IndexKey]),
> 
>               C:put(NewObj),
>               Object
>     catch
>       error:Error ->
>                       {fail, 
> lists:flatten(io_lib:format("[PREHOOKEXCEPTION]~p",[Error]))}
>       end.
> 
> ------------------------------------------------------------------------------------------------------
> 
> 
> This is the set bucket props
> ------------------------------------------------------------------------------------------------------
> 
> active: true
> allow_mult: true
> basic_quorum: false
> big_vclock: 50
> chash_keyfun: {riak_core_util,chash_std_keyfun}
> claimant: 'riak@192.168.100.2'
> datatype: set
> dvv_enabled: true
> dw: quorum
> last_write_wins: false
> linkfun: {modfun,riak_kv_wm_link_walker,mapreduce_linkfun}
> n_val: 3
> notfound_ok: true
> old_vclock: 86400
> postcommit: []
> pr: 0
> precommit: []
> pw: 0
> r: quorum
> rw: quorum
> small_vclock: 50
> w: quorum
> young_vclock: 20
> 
> 
> _______________________________________________
> riak-users mailing list
> riak-users@lists.basho.com
> http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com


_______________________________________________
riak-users mailing list
riak-users@lists.basho.com
http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com

Reply via email to