1 |
|
%%%---------------------------------------------------------------------- |
2 |
|
%%% File : mod_roster_riak.erl |
3 |
|
%%% Author : Piotr Nosek <piotr.nosek@erlang-solutions.com> |
4 |
|
%%% Purpose : mod_roster Riak backend with quasi-transaction support |
5 |
|
%%% (see comment before transaction/1) |
6 |
|
%%% |
7 |
|
%%% MongooseIM, Copyright (C) 2015 Erlang Solutions Ltd. |
8 |
|
%%% |
9 |
|
%%%---------------------------------------------------------------------- |
10 |
|
-module(mod_roster_riak). |
11 |
|
|
12 |
|
-include("mod_roster.hrl"). |
13 |
|
-include("jlib.hrl"). |
14 |
|
-include("mongoose.hrl"). |
15 |
|
|
16 |
|
-behaviour(mod_roster_backend). |
17 |
|
|
18 |
|
%% API |
19 |
|
-export([init/2, |
20 |
|
transaction/2, |
21 |
|
read_roster_version/3, |
22 |
|
write_roster_version/5, |
23 |
|
get_roster/3, |
24 |
|
get_roster_entry/6, |
25 |
|
get_subscription_lists/3, |
26 |
|
roster_subscribe_t/2, |
27 |
|
update_roster_t/2, |
28 |
|
del_roster_t/4, |
29 |
|
remove_user_t/3]). |
30 |
|
|
31 |
|
-define(ROSTER_BUCKET(HostType, LServer), |
32 |
|
{get_opt(HostType, bucket_type, <<"rosters">>), LServer}). |
33 |
|
-define(VER_BUCKET(HostType, LServer), |
34 |
|
{get_opt(HostType, version_bucket_type, <<"roster_versions">>), LServer}). |
35 |
|
|
36 |
|
get_opt(HostType, Opt, Def) -> |
37 |
:-( |
gen_mod:get_module_opt(HostType, mod_roster, Opt, Def). |
38 |
|
|
39 |
|
%% --------------------- mod_roster backend API ------------------------------- |
40 |
|
|
41 |
|
-spec init(mongooseim:host_type(), list()) -> ok. |
42 |
|
init(_HostType, _Opts) -> |
43 |
:-( |
ok. % Common Riak pool is used |
44 |
|
|
45 |
|
%% WARNING: Riak does not support *real* transactions, so we are in fact applying |
46 |
|
%% all accumulated changes with no rollback support so it is possible to end up with |
47 |
|
%% inconsistent state e.g. if Riak connectivity goes down in the middle of application. |
48 |
|
-spec transaction(mongooseim:host_type(), fun(() -> any())) -> |
49 |
|
{aborted, any()} | {atomic, any()}. |
50 |
|
transaction(HostType, F) -> |
51 |
:-( |
put(riak_roster_t, []), |
52 |
:-( |
put(riak_version_t, []), |
53 |
:-( |
try F() of |
54 |
|
Result -> |
55 |
|
%% Applying |
56 |
:-( |
apply_t_roster(HostType), |
57 |
:-( |
apply_t_version(HostType), |
58 |
:-( |
put(riak_roster_t, []), |
59 |
:-( |
{atomic, Result} |
60 |
|
catch |
61 |
:-( |
_:Reason -> {aborted, Reason} |
62 |
|
after |
63 |
:-( |
lists:foreach(fun(Key) -> erase({riak_roster_t, Key}) end, erase(riak_roster_t)), |
64 |
:-( |
erase(riak_version_t) |
65 |
|
end. |
66 |
|
|
67 |
|
-spec read_roster_version(mongooseim:host_type(), jid:luser(), jid:lserver()) -> |
68 |
|
binary() | error. |
69 |
|
read_roster_version(HostType, LUser, LServer) -> |
70 |
:-( |
case mongoose_riak:get(?VER_BUCKET(HostType, LServer), LUser) of |
71 |
:-( |
{ok, VerObj} -> riakc_obj:get_value(VerObj); |
72 |
:-( |
_ -> error |
73 |
|
end. |
74 |
|
|
75 |
|
-spec write_roster_version(mongooseim:host_type(), jid:luser(), jid:lserver(), |
76 |
|
mod_roster:transaction_state(), mod_roster:version()) -> ok. |
77 |
|
write_roster_version(HostType, LUser, LServer, no_transaction, Ver) -> |
78 |
:-( |
VerObj = case mongoose_riak:get(?VER_BUCKET(HostType, LServer), LUser) of |
79 |
:-( |
{ok, VerObj1} -> riakc_obj:update_value(VerObj1, Ver); |
80 |
:-( |
_ -> riakc_obj:new(?VER_BUCKET(HostType, LServer), LUser, Ver) |
81 |
|
end, |
82 |
:-( |
mongoose_riak:put(VerObj); |
83 |
|
write_roster_version(_HostType, LUser, LServer, in_transaction, Ver) -> |
84 |
:-( |
Versions1 = get(riak_version_t), |
85 |
:-( |
put(riak_version_t, lists:keystore({LUser, LServer}, 1, Versions1, {{LUser, LServer}, Ver})). |
86 |
|
|
87 |
|
-spec get_roster(mongooseim:host_type(), jid:luser(), jid:lserver()) -> [mod_roster:roster()]. |
88 |
|
get_roster(HostType, LUser, LServer) -> |
89 |
:-( |
RosterMap = get_rostermap(HostType, LUser, LServer), |
90 |
:-( |
riakc_map:fold(fun({_, register}, ItemReg, Acc) -> |
91 |
:-( |
[unpack_item(ItemReg) | Acc] |
92 |
|
end, |
93 |
|
[], RosterMap). |
94 |
|
|
95 |
|
-spec get_roster_entry(mongooseim:host_type(), jid:luser(), jid:lserver(), mod_roster:contact(), |
96 |
|
mod_roster:transaction_state(), mod_roster:entry_format()) -> |
97 |
|
mod_roster:roster() | does_not_exist. |
98 |
|
get_roster_entry(HostType, LUser, LServer, LJID, TransactionState, _Format) -> |
99 |
:-( |
RosterMap = case TransactionState of |
100 |
|
in_transaction -> |
101 |
:-( |
get_t_roster(HostType, LUser, LServer); |
102 |
|
no_transaction -> |
103 |
:-( |
get_rostermap(HostType, LUser, LServer) |
104 |
|
end, |
105 |
:-( |
find_in_rostermap(LJID, RosterMap). |
106 |
|
|
107 |
|
-spec get_subscription_lists(mongoose_acc:t(), jid:luser(), jid:lserver()) -> [mod_roster:roster()]. |
108 |
|
get_subscription_lists(Acc, LUser, LServer) -> |
109 |
:-( |
HostType = mongoose_acc:host_type(Acc), |
110 |
:-( |
get_roster(HostType, LUser, LServer). |
111 |
|
|
112 |
|
roster_subscribe_t(HostType, Item = #roster{us = {LUser, LServer}, jid = LJID}) -> |
113 |
:-( |
set_t_roster(HostType, LUser, LServer, LJID, Item). |
114 |
|
|
115 |
|
-spec update_roster_t(mongooseim:host_type(), mod_roster:roster()) -> ok. |
116 |
|
update_roster_t(HostType, Item = #roster{us = {LUser, LServer}, jid = LJID}) -> |
117 |
:-( |
set_t_roster(HostType, LUser, LServer, LJID, Item). |
118 |
|
|
119 |
|
-spec del_roster_t(mongooseim:host_type(), jid:luser(), jid:lserver(), mod_roster:contact()) -> ok. |
120 |
|
del_roster_t(HostType, LUser, LServer, LJID) -> |
121 |
:-( |
del_t_roster(HostType, LUser, LServer, LJID). |
122 |
|
|
123 |
|
remove_user_t(HostType, LUser, LServer) -> |
124 |
:-( |
mongoose_riak:delete(?VER_BUCKET(HostType, LServer), LUser), |
125 |
:-( |
mongoose_riak:delete(?ROSTER_BUCKET(HostType, LServer), LUser), |
126 |
:-( |
ok. |
127 |
|
|
128 |
|
%% --------------------- Helpers -------------------------------- |
129 |
|
|
130 |
|
find_in_rostermap(LJID, RosterMap) -> |
131 |
:-( |
case riakc_map:find({jid:to_binary(LJID), register}, RosterMap) of |
132 |
:-( |
{ok, ItemReg} -> unpack_item(ItemReg); |
133 |
:-( |
error -> does_not_exist |
134 |
|
end. |
135 |
|
|
136 |
|
%% this is a transaction-less equivalent of get_t_roster |
137 |
|
get_rostermap(HostType, LUser, LServer) -> |
138 |
:-( |
case mongoose_riak:fetch_type(?ROSTER_BUCKET(HostType, LServer), LUser) of |
139 |
|
{ok, RMap} -> |
140 |
:-( |
RMap; |
141 |
|
_ -> |
142 |
:-( |
riakc_map:new() |
143 |
|
end. |
144 |
|
|
145 |
|
-spec unpack_item(ItemReg :: binary()) -> mod_roster:roster(). |
146 |
|
unpack_item(ItemReg) -> |
147 |
:-( |
binary_to_term(ItemReg). |
148 |
|
|
149 |
|
-spec get_t_roster(mongooseim:host_type(), jid:luser(), jid:lserver()) -> |
150 |
|
riakc_map:crdt_map(). |
151 |
|
get_t_roster(HostType, LUser, LServer) -> |
152 |
:-( |
case get({riak_roster_t, {LUser, LServer}}) of |
153 |
|
undefined -> |
154 |
:-( |
case mongoose_riak:fetch_type(?ROSTER_BUCKET(HostType, LServer), LUser) of |
155 |
:-( |
{ok, RosterMap} -> put({riak_roster_t, {LUser, LServer}}, RosterMap); |
156 |
:-( |
_ -> put({riak_roster_t, {LUser, LServer}}, riakc_map:new()) |
157 |
|
end, |
158 |
:-( |
put(riak_roster_t, [{LUser, LServer} | get(riak_roster_t)]), |
159 |
:-( |
get({riak_roster_t, {LUser, LServer}}); |
160 |
|
RosterMap -> |
161 |
:-( |
RosterMap |
162 |
|
end. |
163 |
|
|
164 |
|
-spec set_t_roster(HostType :: mongooseim:host_type(), |
165 |
|
LUser :: jid:luser(), |
166 |
|
LServer :: jid:lserver(), |
167 |
|
LJID :: jid:simple_jid(), |
168 |
|
Item :: mod_roster:roster()) -> any(). |
169 |
|
set_t_roster(HostType, LUser, LServer, LJID, Item) -> |
170 |
:-( |
RosterMap1 = get_t_roster(HostType, LUser, LServer), |
171 |
:-( |
put({riak_roster_t, {LUser, LServer}}, |
172 |
|
riakc_map:update({jid:to_binary(LJID), register}, |
173 |
:-( |
fun(R) -> riakc_register:set(term_to_binary(Item), R) end, RosterMap1)). |
174 |
|
|
175 |
|
-spec del_t_roster(HostType :: mongooseim:host_type(), |
176 |
|
LUser :: jid:luser(), |
177 |
|
LServer :: jid:lserver(), |
178 |
|
LJID :: jid:simple_jid()) -> any(). |
179 |
|
del_t_roster(HostType, LUser, LServer, LJID) -> |
180 |
:-( |
RosterMap1 = get_t_roster(HostType, LUser, LServer), |
181 |
:-( |
RosterMap = case catch riakc_map:erase({jid:to_binary(LJID), register}, RosterMap1) of |
182 |
:-( |
context_required -> RosterMap1; |
183 |
:-( |
RosterMap2 -> RosterMap2 |
184 |
|
end, |
185 |
:-( |
put({riak_roster_t, {LUser, LServer}}, RosterMap). |
186 |
|
|
187 |
|
-spec apply_t_roster(mongooseim:host_type()) -> ok. |
188 |
|
apply_t_roster(HostType) -> |
189 |
:-( |
lists:foreach( |
190 |
|
fun({LUser, LServer} = LUS) -> |
191 |
:-( |
RosterMap = erase({riak_roster_t, LUS}), |
192 |
:-( |
case riakc_map:to_op(RosterMap) of |
193 |
:-( |
undefined -> ok; |
194 |
:-( |
ToOp -> catch mongoose_riak:update_type( |
195 |
|
?ROSTER_BUCKET(HostType, LServer), LUser, ToOp) |
196 |
|
end |
197 |
|
end, get(riak_roster_t)). |
198 |
|
|
199 |
|
-spec apply_t_version(mongooseim:host_type()) -> ok. |
200 |
|
apply_t_version(HostType) -> |
201 |
:-( |
lists:foreach( |
202 |
|
fun({{LUser, LServer}, NewVer}) -> |
203 |
:-( |
catch write_roster_version(HostType, LUser, LServer, no_transaction, NewVer) |
204 |
|
end, get(riak_version_t)). |