Thanks a lot, Russell. The problem is fixed now. I am now testing how fast the hook is with the external API compared to the internal API.
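For anyone reading this in the archive, here is a minimal sketch of the approach Russell describes in the quoted message below: the hook sends only the crdt_op in the put options and leaves the set update to the vnode. It is not necessarily the exact code that ended up in the hook. The module name `index_hook_sketch` and both function names are made up for illustration. The 4-tuple stands in for the `#crdt_op{}` record, written out by hand the same way as in the original hook, and the context is left as `undefined` on the assumption that every operation is an add.

```erlang
%% Sketch only: module and function names are illustrative, not part of Riak.
-module(index_hook_sketch).
-export([put_options/2, add_to_index/3]).

-define(BTYPE, <<"archive">>).       % bucket type used in the original hook
-define(BUCKET, <<"local-test">>).   % bucket used in the original hook
-define(SET_MOD, riak_dt_orswot).    % set CRDT module named in the thread

%% Build the put options that carry the operation to the vnode.
%% The tuple mirrors #crdt_op{mod, op, ctx}; Ctx may be undefined
%% when the operation only adds elements.
put_options(Item, Ctx) ->
    CrdtOp = {crdt_op, ?SET_MOD, {update, [{add, Item}]}, Ctx},
    [{crdt_op, CrdtOp},
     {retry_put_coordinator_failure, false}].

%% C is the client returned by riak:local_client().
%% The object is a fresh, empty CRDT object: the hook never modifies
%% a fetched set itself, it only ships the operation.
add_to_index(C, IndexKey, Item) ->
    O = riak_kv_crdt:new({?BTYPE, ?BUCKET}, IndexKey, ?SET_MOD),
    C:put(O, put_options(Item, undefined)).
```

From the hook this would be called roughly as `{ok, C} = riak:local_client(), index_hook_sketch:add_to_index(C, IndexKey, CommitItem)`. Wrapping that call in `timer:tc/1` is one simple way to compare its latency against the external client API.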

2017-03-02 18:05 GMT+08:00 <russell.br...@bet365.com>:

> Hi
>
> With regard to creating a serial actor, that is what vnodes are.
>
> For your hook, if you need the context, then keep the C:get, but for the
> PUT just send the crdt_op, almost exactly as per
> https://github.com/basho/riak_kv/blob/develop/src/riak_kv_pb_crdt.erl#L162
>
>     O = riak_kv_crdt:new({BType, B}, Key, Mod),
>     CrdtOp = #crdt_op{mod=Mod, op=Op, ctx=Ctx},
>     Options = [{crdt_op, CrdtOp},
>                {retry_put_coordinator_failure, false}],
>     Resp = C:put(O, Options),
>
> Where Mod is riak_dt_orswot. That should be enough for you to imitate a
> CRDT operation API call from your hook. DO NOT update the fetched CRDT,
> let Riak’s vnode do that.
>
> Let Riak handle the serial actor bit. You just send the crdt_op populated
> with the operation and context.
>
> Like I said in my last email, IF folk want to use CRDTs from hooks then
> there should be an internal API that enables it. If you create an issue
> for that on github I’d quite enjoy writing the code and raising a PR. I no
> longer work at Basho, but since Riak is open source and I’m very fond of
> the CRDT feature, I’m sure they’d appreciate the contribution.
>
> Cheers
>
> Russell
>
> *From:* riak-users [mailto:riak-users-boun...@lists.basho.com] *On Behalf Of* 李明
> *Sent:* 02 March 2017 09:50
> *To:* Russell Brown
> *Cc:* riak-users
> *Subject:* Re: [CRDT_OP-in-CommitHook]
>
> Thanks Russell,
>
> Yes, using the internal API was a lot of trouble, especially with very
> little knowledge of Erlang.
>
> I have read riak_kv_pb_crdt, but it confused me a lot. How do I define a
> serial actor, and is there a simple example of creating a
> `riak_client:put` option object?
>
> The reason I need to use the internal API is that I believed it would
> cost less performance than creating a new client, which might be more
> convenient. We are running a very heavily loaded Riak cluster and I want
> to keep the hook lightweight.
>
> 2017-03-02 16:32 GMT+08:00 Russell Brown <russell.br...@mac.com>:
>
> Hi,
>
> You’re using internal details of the CRDT implementation, I’m not sure
> that is such a great idea. You always have your `Context` set to
> `undefined` but if your ops are all adds that shouldn’t matter in this
> case.
>
> The issue is that you’re calling `riak_kv_crdt:update`, which needs to be
> called from within the vnode. The `ThisNode` value is not a serial actor,
> so you may have many concurrent updates with the actor `ThisNode`, and
> that’s not how to do it. It is crucial that the actor updating the CRDT
> acts in serial, issuing an increasing count of events. That’s why we put
> the CRDT code inside Riak, inside the vnode.
>
> You’re doing neither one thing nor the other, in that you’re using a
> datatype bucket but not the datatype API (it sends Ops from the client,
> you’re doing read/modify/write.)
>
> I can see the issue here is that the API is external only. If you _must_
> use the internal API have a look at
> https://github.com/basho/riak_kv/blob/develop/src/riak_kv_pb_crdt.erl
> and you will see that the CRDT_OP is all that is sent by the client.
>
> https://github.com/basho/riak_kv/blob/develop/src/riak_kv_pb_crdt.erl#L162
>
> Those put options matter too, especially for counters, less so for sets.
>
> I guess it would be great if riak_client had some internal API functions
> that made this easier to do from a hook. If you open an issue on
> github.com/basho/riak_kv I can look into that and make a PR.
>
> Hope that helps
>
> Russell
>
> > On 1 Mar 2017, at 09:30, 李明 <lmlmlmlal...@gmail.com> wrote:
> >
> > Hi
> >
> > I am new to Erlang and Riak. I started to use Riak as a KV store a
> > couple of months ago.
> > Now I want to add a commit hook to Riak so that Riak can help me
> > collect some statistics.
> >
> > I read some docs and wrote a pre-hook script, which fetches the object
> > key and stores it into a set.
> >
> > This hook works fine if there is only one client writing to Riak, but
> > if I increase the number of connections writing to Riak, I find that
> > some elements are lost from the set. It looks like the crdt_op did not
> > do the merge operation, and there is no obvious error in the log files.
> >
> > Could someone help me figure out what happened or what I have missed?
> >
> > I am using Riak 2.1.3.
> >
> > Thanks all!
> >
> > Here is the hook script:
> >
> > ------------------------------------------------------------
> >
> > -module(myhook).
> > -export([pretest/1]).
> >
> > now_to_local_string({MegaSecs, Secs, MicroSecs}) ->
> >     LocalTime = calendar:now_to_local_time({MegaSecs, Secs, MicroSecs}),
> >     {{Year, Month, Day}, {Hour, Minute, _}} = LocalTime,
> >     TimeStr = lists:flatten(io_lib:format("~4..0w~2..0w~2..0w~2..0w~2..0w",
> >                                           [Year, Month, Day, Hour, Minute])),
> >     TimeStr.
> >
> > is_deleted(Object)->
> >     case dict:find(<<"X-Riak-Deleted">>,riak_object:get_metadata(Object)) of
> >         {ok,_} ->
> >             true;
> >         _ ->
> >             false
> >     end.
> >
> > pretest(Object) ->
> >     % timer:sleep(10000),
> >     try
> >         ObjBucket = riak_object:bucket(Object),
> >         % riak_object:bucket(Obj).
> >         % {<<"cn-archive">>,<<"local-test">>}
> >
> >         Bucket = element(2, ObjBucket),
> >         BucketType = element(1, ObjBucket),
> >
> >         ObjKey = riak_object:key(Object),
> >         % Key = binary_to_list(ObjKey),
> >         % ObjData = riak_object:get_value(Object),
> >         % Msg = binary_to_list(ObjData),
> >         CommitItem = iolist_to_binary(mochijson2:encode({struct, [{b, Bucket}, {k, ObjKey}, {t, BucketType}]})),
> >
> >         case is_deleted(Object) of
> >             true ->
> >                 KeyPrefix = "delete";
> >             _ ->
> >                 KeyPrefix = "update"
> >         end,
> >
> >         CurMin = now_to_local_string(os:timestamp()),
> >         IndexKey = binary:list_to_bin(io_lib:format("~s-~s", [CurMin, KeyPrefix])),
> >
> >         %% Get a riak client
> >         {ok, C} = riak:local_client(),
> >         % get node obj
> >         ThisNode = atom_to_binary(node(), latin1),
> >
> >         % get index obj and set context
> >         BType = <<"archive">>,
> >         B = <<"local-test">>,
> >
> >         {SetObj, Context} = case C:get({BType, B}, IndexKey) of
> >             {error, notfound} ->
> >                 ThisSetObj = riak_kv_crdt:new({BType, B}, IndexKey, riak_dt_orswot),
> >                 {ThisSetObj, undefined};
> >             {ok, ThisSetObj} ->
> >                 % The datatype update requires the context if the value exists
> >                 {{Ctx, _}, _} = riak_kv_crdt:value(ThisSetObj, riak_dt_orswot),
> >                 {ThisSetObj, Ctx}
> >         end,
> >
> >         UpdateIndex = [{add, CommitItem}],
> >         % UpdateOp = {crdt_op, riak_dt_orswot, {update, UpdateIndex}, Context},
> >         UpdateOp = {crdt_op, riak_dt_orswot, {update, UpdateIndex}, undefined},
> >         NewObj = riak_kv_crdt:update(SetObj, ThisNode, UpdateOp),
> >
> >         error_logger:info_msg("Updating index for ~s,to set ~s~n", [binary:bin_to_list(CommitItem), IndexKey]),
> >
> >         C:put(NewObj),
> >         Object
> >     catch
> >         error:Error ->
> >             {fail, lists:flatten(io_lib:format("[PREHOOKEXCEPTION]~p",[Error]))}
> >     end.
> >
> > ------------------------------------------------------------
> >
> > These are the set bucket props:
> >
> > ------------------------------------------------------------
> >
> > active: true
> > allow_mult: true
> > basic_quorum: false
> > big_vclock: 50
> > chash_keyfun: {riak_core_util,chash_std_keyfun}
> > claimant: 'riak@192.168.100.2'
> > datatype: set
> > dvv_enabled: true
> > dw: quorum
> > last_write_wins: false
> > linkfun: {modfun,riak_kv_wm_link_walker,mapreduce_linkfun}
> > n_val: 3
> > notfound_ok: true
> > old_vclock: 86400
> > postcommit: []
> > pr: 0
> > precommit: []
> > pw: 0
> > r: quorum
> > rw: quorum
> > small_vclock: 50
> > w: quorum
> > young_vclock: 20
> >
> > _______________________________________________
> > riak-users mailing list
> > riak-users@lists.basho.com
> > http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com