./ct_report/coverage/mod_roster_riak.COVER.html

1 %%%----------------------------------------------------------------------
2 %%% File : mod_roster_riak.erl
3 %%% Author : Piotr Nosek <piotr.nosek@erlang-solutions.com>
4 %%% Purpose : mod_roster Riak backend with quasi-transaction support
5 %%% (see comment before transaction/1)
6 %%%
7 %%% MongooseIM, Copyright (C) 2015 Erlang Solutions Ltd.
8 %%%
9 %%%----------------------------------------------------------------------
10 -module(mod_roster_riak).
11
12 -include("mod_roster.hrl").
13 -include("jlib.hrl").
14 -include("mongoose.hrl").
15
16 -behaviour(mod_roster_backend).
17
18 %% API
19 -export([init/2,
20 transaction/2,
21 read_roster_version/3,
22 write_roster_version/5,
23 get_roster/3,
24 get_roster_entry/6,
25 get_subscription_lists/3,
26 roster_subscribe_t/2,
27 update_roster_t/2,
28 del_roster_t/4,
29 remove_user_t/3]).
30
%% Bucket tuple used for roster maps: the first element is the `bucket_type'
%% option of mod_roster (default <<"rosters">>), the second is LServer.
-define(ROSTER_BUCKET(HostType, LServer),
        {get_opt(HostType, bucket_type, <<"rosters">>), LServer}).

%% Bucket tuple used for roster versions: the first element is the
%% `version_bucket_type' option of mod_roster (default <<"roster_versions">>),
%% the second is LServer.
-define(VER_BUCKET(HostType, LServer),
        {get_opt(HostType, version_bucket_type, <<"roster_versions">>), LServer}).
35
%% Looks up option Opt of mod_roster for HostType through gen_mod, passing Def
%% as the default value.
get_opt(HostType, Opt, Def) ->
    gen_mod:get_module_opt(HostType, mod_roster, Opt, Def).
38
39 %% --------------------- mod_roster backend API -------------------------------
40
%% Backend initialisation callback. Both arguments are ignored: there is
%% nothing to set up because the common Riak pool is used.
-spec init(mongooseim:host_type(), list()) -> ok.
init(_HostType, _Opts) ->
    ok.
44
%% WARNING: Riak does not support *real* transactions. The changes accumulated
%% while F runs are simply applied one after another, with no rollback, so an
%% inconsistent state is possible, e.g. if Riak connectivity is lost in the
%% middle of the application phase.
%%
%% Pending changes live in the process dictionary:
%%   riak_roster_t         - list of {LUser, LServer} keys touched so far
%%   {riak_roster_t, Key}  - the pending roster map of that user
%%   riak_version_t        - pending roster versions
%%
%% Returns {atomic, Result} when F() returns Result, and {aborted, Reason} when
%% F() raises an exception of any class. The pending state is erased in both cases.
-spec transaction(mongooseim:host_type(), fun(() -> any())) ->
    {aborted, any()} | {atomic, any()}.
transaction(HostType, F) ->
    put(riak_roster_t, []),
    put(riak_version_t, []),
    try F() of
        Result ->
            flush_t(HostType),
            {atomic, Result}
    catch
        _:Reason -> {aborted, Reason}
    after
        discard_t()
    end.

%% Applies the accumulated changes: rosters first, then versions.
flush_t(HostType) ->
    apply_t_roster(HostType),
    apply_t_version(HostType),
    %% apply_t_roster/1 has already erased every per-user entry, so reset the
    %% key list and leave nothing for discard_t/0 to walk over.
    put(riak_roster_t, []).

%% Removes every piece of pending transaction state from the process dictionary.
discard_t() ->
    TouchedKeys = erase(riak_roster_t),
    lists:foreach(fun(Key) -> erase({riak_roster_t, Key}) end, TouchedKeys),
    erase(riak_version_t).
66
%% Reads the stored roster version of LUser from the version bucket of LServer.
%% Returns the value of the fetched object, or `error' when the lookup yields
%% anything other than {ok, Object}.
-spec read_roster_version(mongooseim:host_type(), jid:luser(), jid:lserver()) ->
    binary() | error.
read_roster_version(HostType, LUser, LServer) ->
    Bucket = ?VER_BUCKET(HostType, LServer),
    version_from_lookup(mongoose_riak:get(Bucket, LUser)).

%% Extracts the version from a successful lookup; any other result becomes `error'.
version_from_lookup({ok, VerObj}) -> riakc_obj:get_value(VerObj);
version_from_lookup(_Other) -> error.
74
%% Stores roster version Ver for the user LUser@LServer.
%%
%% no_transaction: the version object is fetched and updated, or created when
%% the lookup does not return {ok, Object}, and then written straight away.
%% The result is whatever mongoose_riak:put/1 returns.
%%
%% in_transaction: the version is only recorded in the process dictionary under
%% `riak_version_t', replacing any version already pending for the same user.
%% This clause returns `ok', as the spec promises, rather than the previous
%% pending list that put/2 hands back.
-spec write_roster_version(mongooseim:host_type(), jid:luser(), jid:lserver(),
                           mod_roster:transaction_state(), mod_roster:version()) -> ok.
write_roster_version(HostType, LUser, LServer, no_transaction, Ver) ->
    %% Resolve the bucket once; both the lookup and the "new object" path use it.
    Bucket = ?VER_BUCKET(HostType, LServer),
    VerObj = case mongoose_riak:get(Bucket, LUser) of
                 {ok, VerObj1} -> riakc_obj:update_value(VerObj1, Ver);
                 _ -> riakc_obj:new(Bucket, LUser, Ver)
             end,
    mongoose_riak:put(VerObj);
write_roster_version(_HostType, LUser, LServer, in_transaction, Ver) ->
    Key = {LUser, LServer},
    Versions1 = get(riak_version_t),
    put(riak_version_t, lists:keystore(Key, 1, Versions1, {Key, Ver})),
    ok.
86
%% Returns all roster items of LUser@LServer, read directly from Riak (no
%% transaction state involved). Every entry of the roster map is expected to be
%% a register; each register value is unpacked into a roster item.
-spec get_roster(mongooseim:host_type(), jid:luser(), jid:lserver()) -> [mod_roster:roster()].
get_roster(HostType, LUser, LServer) ->
    Collect = fun({_, register}, ItemReg, Items) ->
                      [unpack_item(ItemReg) | Items]
              end,
    riakc_map:fold(Collect, [], get_rostermap(HostType, LUser, LServer)).
94
%% Looks up the roster entry for contact LJID in the roster of LUser@LServer.
%% Inside a transaction the pending (process-dictionary) roster map is used;
%% otherwise the map is fetched from Riak. The format argument is ignored.
-spec get_roster_entry(mongooseim:host_type(), jid:luser(), jid:lserver(), mod_roster:contact(),
                       mod_roster:transaction_state(), mod_roster:entry_format()) ->
    mod_roster:roster() | does_not_exist.
get_roster_entry(HostType, LUser, LServer, LJID, TransactionState, _Format) ->
    FetchMap = case TransactionState of
                   in_transaction -> fun get_t_roster/3;
                   no_transaction -> fun get_rostermap/3
               end,
    find_in_rostermap(LJID, FetchMap(HostType, LUser, LServer)).
106
%% Returns the full roster of LUser@LServer; the host type is taken from the
%% accumulator.
-spec get_subscription_lists(mongoose_acc:t(), jid:luser(), jid:lserver()) -> [mod_roster:roster()].
get_subscription_lists(Acc, LUser, LServer) ->
    get_roster(mongoose_acc:host_type(Acc), LUser, LServer).
111
%% Records Item in the pending (transactional) roster of its owner, keyed by
%% the contact JID taken from the item itself.
%% NOTE(review): the return value is whatever set_t_roster/5 returns - confirm
%% that callers ignore it.
roster_subscribe_t(HostType, #roster{us = {LUser, LServer}, jid = LJID} = Item) ->
    set_t_roster(HostType, LUser, LServer, LJID, Item).
114
%% Stores Item in the pending (transactional) roster of its owner, keyed by the
%% contact JID taken from the item itself.
%% Always returns `ok', as declared in the spec; the value produced by
%% set_t_roster/5 (the result of a process-dictionary put/2) is discarded.
-spec update_roster_t(mongooseim:host_type(), mod_roster:roster()) -> ok.
update_roster_t(HostType, Item = #roster{us = {LUser, LServer}, jid = LJID}) ->
    set_t_roster(HostType, LUser, LServer, LJID, Item),
    ok.
118
%% Removes contact LJID from the pending (transactional) roster of LUser@LServer.
%% Always returns `ok', as declared in the spec; the value produced by
%% del_t_roster/4 (the result of a process-dictionary put/2) is discarded.
-spec del_roster_t(mongooseim:host_type(), jid:luser(), jid:lserver(), mod_roster:contact()) -> ok.
del_roster_t(HostType, LUser, LServer, LJID) ->
    del_t_roster(HostType, LUser, LServer, LJID),
    ok.
122
%% Deletes the roster version and then the roster map of LUser@LServer.
%% Both deletions go straight to Riak (nothing is queued in the pending
%% transaction state) and their results are ignored; `ok' is always returned.
-spec remove_user_t(mongooseim:host_type(), jid:luser(), jid:lserver()) -> ok.
remove_user_t(HostType, LUser, LServer) ->
    VersionBucket = ?VER_BUCKET(HostType, LServer),
    mongoose_riak:delete(VersionBucket, LUser),
    RosterBucket = ?ROSTER_BUCKET(HostType, LServer),
    mongoose_riak:delete(RosterBucket, LUser),
    ok.
127
128 %% --------------------- Helpers --------------------------------
129
%% Finds the register stored under the binary form of LJID in RosterMap.
%% Returns the unpacked roster item, or `does_not_exist' when there is no such key.
find_in_rostermap(LJID, RosterMap) ->
    Key = {jid:to_binary(LJID), register},
    case riakc_map:find(Key, RosterMap) of
        error -> does_not_exist;
        {ok, ItemReg} -> unpack_item(ItemReg)
    end.
135
%% Transaction-less counterpart of get_t_roster/3: fetches the roster map of
%% LUser@LServer from Riak without touching the process dictionary. When the
%% fetch does not return {ok, Map}, a fresh empty map is returned instead.
get_rostermap(HostType, LUser, LServer) ->
    Bucket = ?ROSTER_BUCKET(HostType, LServer),
    fetched_or_new(mongoose_riak:fetch_type(Bucket, LUser)).

%% Unwraps a successful fetch; anything else yields a new, empty map.
fetched_or_new({ok, RMap}) -> RMap;
fetched_or_new(_Other) -> riakc_map:new().
144
%% Decodes a roster item from the external term format in which it is stored
%% (the inverse of the term_to_binary/1 call made when an item is written).
%% NOTE(review): binary_to_term/1 is used without the `safe' option; this
%% presumes the register contents are trusted - confirm.
-spec unpack_item(ItemReg :: binary()) -> mod_roster:roster().
unpack_item(ItemReg) ->
    erlang:binary_to_term(ItemReg).
148
%% Returns the pending (transactional) roster map of LUser@LServer.
%% On first access within a transaction the map is fetched from Riak (or a new
%% empty map is used when the fetch does not return {ok, Map}), cached in the
%% process dictionary under {riak_roster_t, {LUser, LServer}}, and the user key
%% is prepended to the `riak_roster_t' list. Later calls return the cached map.
-spec get_t_roster(mongooseim:host_type(), jid:luser(), jid:lserver()) ->
    riakc_map:crdt_map().
get_t_roster(HostType, LUser, LServer) ->
    LUS = {LUser, LServer},
    case get({riak_roster_t, LUS}) of
        undefined ->
            Loaded = case mongoose_riak:fetch_type(?ROSTER_BUCKET(HostType, LServer), LUser) of
                         {ok, Fetched} -> Fetched;
                         _ -> riakc_map:new()
                     end,
            put({riak_roster_t, LUS}, Loaded),
            put(riak_roster_t, [LUS | get(riak_roster_t)]),
            Loaded;
        Cached ->
            Cached
    end.
163
%% Writes Item into the pending (transactional) roster map of LUser@LServer.
%% The item is stored as a register keyed by the binary form of LJID, holding
%% the item in external term format. The updated map replaces the cached one in
%% the process dictionary; the return value is that of put/2.
-spec set_t_roster(HostType :: mongooseim:host_type(),
                   LUser :: jid:luser(),
                   LServer :: jid:lserver(),
                   LJID :: jid:simple_jid(),
                   Item :: mod_roster:roster()) -> any().
set_t_roster(HostType, LUser, LServer, LJID, Item) ->
    Current = get_t_roster(HostType, LUser, LServer),
    Key = {jid:to_binary(LJID), register},
    Packed = term_to_binary(Item),
    Updated = riakc_map:update(Key,
                               fun(Reg) -> riakc_register:set(Packed, Reg) end,
                               Current),
    put({riak_roster_t, {LUser, LServer}}, Updated).
174
%% Removes the entry for LJID from the pending (transactional) roster map of
%% LUser@LServer and stores the resulting map back in the process dictionary.
%% The return value is that of put/2.
%%
%% riakc_map:erase/2 signals a map that carries no context by throwing
%% `context_required'; in that case the map is kept unchanged. Only that throw
%% is handled. Any other exception now propagates to the caller instead of
%% being turned into an {'EXIT', _} tuple and stored as if it were a roster map,
%% which is what the old-style `catch' did.
-spec del_t_roster(HostType :: mongooseim:host_type(),
                   LUser :: jid:luser(),
                   LServer :: jid:lserver(),
                   LJID :: jid:simple_jid()) -> any().
del_t_roster(HostType, LUser, LServer, LJID) ->
    RosterMap1 = get_t_roster(HostType, LUser, LServer),
    Key = {jid:to_binary(LJID), register},
    RosterMap = try
                    riakc_map:erase(Key, RosterMap1)
                catch
                    throw:context_required -> RosterMap1
                end,
    put({riak_roster_t, {LUser, LServer}}, RosterMap).
186
%% Pushes every pending roster map to Riak. For each user key listed under
%% `riak_roster_t' the cached map is removed from the process dictionary and,
%% if it yields an update operation, that operation is sent to Riak.
-spec apply_t_roster(mongooseim:host_type()) -> ok.
apply_t_roster(HostType) ->
    lists:foreach(fun(LUS) -> flush_user_roster(HostType, LUS) end,
                  get(riak_roster_t)).

%% Sends the pending changes of a single user, if there are any.
flush_user_roster(HostType, {LUser, LServer} = LUS) ->
    RosterMap = erase({riak_roster_t, LUS}),
    case riakc_map:to_op(RosterMap) of
        undefined ->
            %% No update operation for this map - nothing to send.
            ok;
        ToOp ->
            %% Best effort: a failure for one user must not stop the others.
            catch mongoose_riak:update_type(?ROSTER_BUCKET(HostType, LServer), LUser, ToOp)
    end.
198
%% Writes every pending roster version (kept under `riak_version_t') to Riak
%% using the non-transactional write path. Each write is best effort: an
%% exception raised for one user does not prevent the remaining writes.
-spec apply_t_version(mongooseim:host_type()) -> ok.
apply_t_version(HostType) ->
    WriteOne = fun({{LUser, LServer}, NewVer}) ->
                       catch write_roster_version(HostType, LUser, LServer,
                                                  no_transaction, NewVer)
               end,
    lists:foreach(WriteOne, get(riak_version_t)).
Line Hits Source