1 |
|
%%%---------------------------------------------------------------------- |
2 |
|
%%% File : mod_pubsub_db_mnesia.erl |
3 |
|
%%% Author : Piotr Nosek <piotr.nosek@erlang-solutions.com> |
4 |
|
%%% Purpose : PubSub Mnesia backend |
5 |
|
%%% Created : 26 Oct 2018 by Piotr Nosek <piotr.nosek@erlang-solutions.com> |
6 |
|
%%%---------------------------------------------------------------------- |
7 |
|
|
8 |
|
-module(mod_pubsub_db_mnesia). |
9 |
|
-author('piotr.nosek@erlang-solutions.com'). |
10 |
|
|
11 |
|
-behaviour(mod_pubsub_db). |
12 |
|
|
13 |
|
-include("pubsub.hrl"). |
14 |
|
-include("jlib.hrl"). |
15 |
|
-include("mongoose_logger.hrl"). |
16 |
|
-include_lib("stdlib/include/qlc.hrl"). |
17 |
|
|
18 |
|
-export([start/0, stop/0]). |
19 |
|
% Funs execution |
20 |
|
-export([transaction/2, dirty/2]). |
21 |
|
% Direct #pubsub_state access |
22 |
|
-export([del_node/1, get_state/2, |
23 |
|
get_states/1, get_states_by_lus/1, get_states_by_bare/1, |
24 |
|
get_states_by_bare_and_full/1, get_idxs_of_own_nodes_with_pending_subs/1]). |
25 |
|
% Node management |
26 |
|
-export([ |
27 |
|
create_node/2, |
28 |
|
set_node/1, |
29 |
|
find_node_by_id/1, |
30 |
|
find_nodes_by_key/1, |
31 |
|
find_node_by_name/2, |
32 |
|
delete_node/1, |
33 |
|
get_subnodes/2, |
34 |
|
get_parentnodes_tree/2, |
35 |
|
get_subnodes_tree/2 |
36 |
|
]). |
37 |
|
% Affiliations |
38 |
|
-export([ |
39 |
|
set_affiliation/3, |
40 |
|
get_affiliation/2 |
41 |
|
]). |
42 |
|
% Subscriptions |
43 |
|
-export([ |
44 |
|
add_subscription/5, |
45 |
|
set_subscription_opts/4, |
46 |
|
get_node_subscriptions/1, |
47 |
|
get_node_entity_subscriptions/2, |
48 |
|
delete_subscription/3, |
49 |
|
delete_all_subscriptions/2, |
50 |
|
update_subscription/4 |
51 |
|
]). |
52 |
|
% Item ids in state |
53 |
|
-export([ |
54 |
|
add_item/3, |
55 |
|
remove_items/3, |
56 |
|
remove_all_items/1 |
57 |
|
]). |
58 |
|
|
59 |
|
% Whole Items |
60 |
|
-export([ |
61 |
|
get_items/2, |
62 |
|
get_item/2, |
63 |
|
set_item/1, |
64 |
|
del_item/2, |
65 |
|
del_items/2 |
66 |
|
]). |
67 |
|
|
68 |
|
% GDPR |
69 |
|
-export([ |
70 |
|
get_user_payloads/2, |
71 |
|
get_user_nodes/2, |
72 |
|
get_user_subscriptions/2, |
73 |
|
delete_user_subscriptions/1, |
74 |
|
find_nodes_by_affiliated_user/1 |
75 |
|
]). |
76 |
|
|
77 |
|
%%==================================================================== |
78 |
|
%% Internal records definitions |
79 |
|
%%==================================================================== |
80 |
|
|
81 |
|
%% Per-subscription options, keyed by subscription id.
%% Rows of this record live in the `pubsub_subscription' Mnesia table
%% created in start/0; `options' defaults to an empty list.
-record(pubsub_subscription, {
          subid        :: mod_pubsub:subId(),
          options = [] :: mod_pubsub:subOptions()
         }).
85 |
|
|
86 |
|
%%==================================================================== |
87 |
|
%% Behaviour callbacks |
88 |
|
%%==================================================================== |
89 |
|
|
90 |
|
%% ------------------------ Backend start/stop ------------------------ |
91 |
|
|
92 |
|
%% Creates the Mnesia tables used by this backend, requests a local copy of
%% them and initialises the node index.
%%
%% Results of the individual Mnesia calls are ignored, so the function
%% returns `ok' whether or not a table already existed. Only the result of
%% creating `pubsub_subnode' is kept: it is handed to
%% maybe_fill_subnode_table/1, which populates a freshly created table from
%% the existing `pubsub_node' records.
%%
%% NOTE(review): `pubsub_node' is the only table here without an
%% `add_table_copy/3' call - confirm whether that is intentional.
-spec start() -> ok.
start() ->
    Self = node(),
    mnesia:create_table(pubsub_state,
                        [{disc_copies, [Self]},
                         {type, ordered_set},
                         {attributes, record_info(fields, pubsub_state)}]),
    mnesia:add_table_copy(pubsub_state, Self, disc_copies),

    mnesia:create_table(pubsub_item,
                        [{disc_only_copies, [Self]},
                         {attributes, record_info(fields, pubsub_item)}]),
    mnesia:add_table_copy(pubsub_item, Self, disc_only_copies),

    mnesia:create_table(pubsub_node,
                        [{disc_copies, [Self]},
                         {attributes, record_info(fields, pubsub_node)}]),
    mnesia:add_table_index(pubsub_node, id),

    mnesia:create_table(pubsub_subscription,
                        [{disc_copies, [Self]},
                         {attributes, record_info(fields, pubsub_subscription)},
                         {type, set}]),
    mnesia:add_table_copy(pubsub_subscription, Self, disc_copies),

    SubnodeTableResult =
        mnesia:create_table(pubsub_subnode,
                            [{disc_copies, [Self]},
                             {attributes, record_info(fields, pubsub_subnode)},
                             {type, bag}]),
    mnesia:add_table_copy(pubsub_subnode, Self, disc_copies),
    maybe_fill_subnode_table(SubnodeTableResult),

    pubsub_index:init(),
    ok.
120 |
|
|
121 |
|
%% No-op: this backend performs no work on stop and always returns `ok'.
-spec stop() -> ok.
stop() -> ok.
124 |
|
|
125 |
|
%% If the pubsub_subnode table was missing (i.e. it has just been created),
%% then fill it with data derived from the existing pubsub_node records.
%% Any other create_table result leaves the table untouched.
maybe_fill_subnode_table({atomic, ok}) ->
    mnesia:transaction(fun fill_subnode_table_t/0);
maybe_fill_subnode_table(_AlreadyExistsOrError) ->
    ok.

%% Transaction body: registers every node under each of its parents.
%% Passing [] as the old parents makes set_subnodes/2 treat all current
%% parents as newly added.
fill_subnode_table_t() ->
    Register = fun(#pubsub_node{} = NodeRec, Acc) ->
                       set_subnodes(NodeRec, []),
                       Acc
               end,
    mnesia:foldl(Register, ok, pubsub_node).
134 |
|
|
135 |
|
%% ------------------------ Fun execution ------------------------ |
136 |
|
|
137 |
|
%% Runs Fun in a Mnesia transaction, allowing up to 3 retries when the
%% transaction is aborted. ErrorDebug is only used for logging and for
%% building the error result.
-spec transaction(Fun :: fun(() -> {result | error, any()}),
                  ErrorDebug :: map()) ->
    {result | error, any()}.
transaction(Fun, ErrorDebug) ->
    MaxRetries = 3,
    transaction(Fun, ErrorDebug, MaxRetries).
142 |
|
|
143 |
|
%% Transaction runner with a retry budget.
%%  - committed: the fun's own result is returned as is;
%%  - aborted with retries left: log, wait 100 ms and run again;
%%  - aborted with no retries left: log and convert the abort reason into
%%    an error via mod_pubsub_db:db_error/3.
transaction(Fun, ErrorDebug, Retries) ->
    TxOutcome = mnesia:transaction(mod_pubsub_db:extra_debug_fun(Fun)),
    case TxOutcome of
        {atomic, Result} ->
            Result;
        {aborted, Reason} when Retries > 0 ->
            ?LOG_WARNING(#{what => pubsub_transaction_retry, retries => Retries,
                           reason => Reason, debug => ErrorDebug}),
            timer:sleep(100),
            transaction(Fun, ErrorDebug, Retries - 1);
        {aborted, Reason} ->
            ?LOG_WARNING(#{what => pubsub_transaction_failed, reason => Reason,
                           debug => ErrorDebug}),
            mod_pubsub_db:db_error(Reason, ErrorDebug, transaction_failed)
    end.
157 |
|
|
158 |
|
%% Runs Fun in a Mnesia `sync_dirty' context. Any exception raised while
%% wrapping or executing the fun is turned into an error result by
%% mod_pubsub_db:db_error/3 instead of propagating to the caller.
-spec dirty(Fun :: fun(() -> {result | error, any()}),
            ErrorDebug :: map()) ->
    {result | error, any()}.
dirty(Fun, ErrorDebug) ->
    try
        mnesia:sync_dirty(mod_pubsub_db:extra_debug_fun(Fun), [])
    catch
        _Class:Reason ->
            mod_pubsub_db:db_error(Reason, ErrorDebug, dirty_failed)
    end.
169 |
|
|
170 |
|
%% ------------------------ GDPR-related ------------------------ |
171 |
|
|
172 |
|
%% Returns one [NodeName, ItemId, Payload] row per item created by the user.
%% Crashes with badmatch if the transaction is aborted.
-spec get_user_payloads(LUser :: jid:luser(), LServer :: jid:lserver()) ->
    [NodeNameItemIDAndPayload :: [binary()]].
get_user_payloads(LUser, LServer) ->
    Query = fun() -> get_user_payloads_t(LUser, LServer) end,
    {atomic, Rows} = mnesia:transaction(Query),
    Rows.
177 |
|
|
178 |
|
%% Transaction body of get_user_payloads/2.
%% Selects items whose `creation' field is {_, {LUser, LServer, _}} and
%% serialises each payload element with exml:to_binary/1, concatenating the
%% results into a single binary.
get_user_payloads_t(LUser, LServer) ->
    CreatedByUser = {'_', {LUser, LServer, '_'}},
    Pattern = #pubsub_item{creation = CreatedByUser, _ = '_'},
    [[node_name(Nidx), ItemId, iolist_to_binary([exml:to_binary(El) || El <- Payload])]
     || #pubsub_item{itemid = {ItemId, Nidx}, payload = Payload}
            <- mnesia:match_object(Pattern)].
183 |
|
|
184 |
|
%% Returns [NodeName, Type] for the nodes owned by the bare JID of the user.
%%
%% NOTE(review): the match pattern is `owners = [LJID]', i.e. it only
%% matches nodes whose owner list is exactly this single JID - confirm that
%% nodes with several owners are meant to be excluded.
-spec get_user_nodes(LUser :: jid:luser(), LServer :: jid:lserver()) ->
    [NodeNameAndType :: [binary()]].
get_user_nodes(LUser, LServer) ->
    Pattern = #pubsub_node{owners = [{LUser, LServer, <<>>}], _ = '_'},
    Query = fun() ->
                    [[Name, Type]
                     || #pubsub_node{nodeid = {_, Name}, type = Type}
                            <- mnesia:match_object(Pattern)]
            end,
    {atomic, Rows} = mnesia:transaction(Query),
    Rows.
193 |
|
|
194 |
|
%% Returns one [NodeName] row per matching subscription state of the user.
%% Crashes with badmatch if the transaction is aborted.
-spec get_user_subscriptions(LUser :: jid:luser(), LServer :: jid:lserver()) ->
    [NodeName :: [binary()]].
get_user_subscriptions(LUser, LServer) ->
    Query = fun() -> get_user_subscriptions_t(LUser, LServer) end,
    {atomic, Rows} = mnesia:transaction(Query),
    Rows.
199 |
|
|
200 |
|
%% Transaction body of get_user_subscriptions/2.
%% Matches states of any resource of LUser@LServer and maps each one to the
%% name of its node.
%%
%% NOTE(review): the pattern `subscriptions = [{subscribed, '_'}]' matches
%% only states holding exactly one subscription, and only in the
%% `subscribed' state - confirm this is the intended selection.
get_user_subscriptions_t(LUser, LServer) ->
    Pattern = #pubsub_state{stateid = {{LUser, LServer, '_'}, '_'},
                            subscriptions = [{subscribed, '_'}],
                            _ = '_'},
    [[node_name(Nidx)]
     || #pubsub_state{stateid = {_, Nidx}} <- mnesia:match_object(Pattern)].
206 |
|
|
207 |
|
%% Resolves a node index to the node name; yields <<>> when the node
%% cannot be found.
node_name(Nidx) ->
    lookup_result_to_name(find_node_by_id(Nidx)).

%% Extracts the name part of `nodeid' from a successful lookup.
lookup_result_to_name({ok, #pubsub_node{nodeid = {_Key, Name}}}) -> Name;
lookup_result_to_name(_NotFound) -> <<>>.
212 |
|
|
213 |
|
%% Returns {Node, Affiliation} for every state stored for any resource of
%% the given user. Crashes with badmatch if a state refers to a node index
%% that can no longer be resolved.
-spec find_nodes_by_affiliated_user(JID :: jid:ljid()) ->
    [{mod_pubsub:pubsubNode(), mod_pubsub:affiliation()}].
find_nodes_by_affiliated_user(LJID) ->
    {ok, States} = get_states_by_lus(LJID),
    [state_to_node_and_affiliation(State) || State <- States].

%% Pairs the node record a state belongs to with the state's affiliation.
state_to_node_and_affiliation(#pubsub_state{stateid = {_LJID, Nidx},
                                            affiliation = Aff}) ->
    {ok, Node} = find_node_by_id(Nidx),
    {Node, Aff}.
221 |
|
|
222 |
|
%% ------------------------ Direct #pubsub_state access ------------------------ |
223 |
|
|
224 |
|
%% Reads the state of LJID on node Nidx with a read lock; see get_state/3
%% for the behaviour when no state is stored.
-spec get_state(Nidx :: mod_pubsub:nodeIdx(),
                LJID :: jid:ljid()) ->
    {ok, mod_pubsub:pubsubState()}.
get_state(Nidx, LJID) ->
    LockKind = read,
    get_state(Nidx, LJID, LockKind).
229 |
|
|
230 |
|
%% Returns every state record stored for node Nidx, for any JID.
-spec get_states(Nidx :: mod_pubsub:nodeIdx()) ->
    {ok, [mod_pubsub:pubsubState()]}.
get_states(Nidx) ->
    AnyJidOnNode = #pubsub_state{stateid = {'_', Nidx}, _ = '_'},
    NodeStates = mnesia:match_object(AnyJidOnNode),
    {ok, NodeStates}.
235 |
|
|
236 |
|
%% Returns the states of every resource of LUser@LServer on every node;
%% the resource part of the argument is ignored.
-spec get_states_by_lus(LJID :: jid:ljid()) ->
    {ok, [mod_pubsub:pubsubState()]}.
get_states_by_lus({LUser, LServer, _AnyResource}) ->
    AnyResource = {LUser, LServer, '_'},
    Pattern = #pubsub_state{stateid = {AnyResource, '_'}, _ = '_'},
    {ok, mnesia:match_object(Pattern)}.
240 |
|
|
241 |
|
%% Returns the states stored under the bare form of LJID, on every node.
-spec get_states_by_bare(LJID :: jid:ljid()) ->
    {ok, [mod_pubsub:pubsubState()]}.
get_states_by_bare(LJID) ->
    Pattern = #pubsub_state{stateid = {jid:to_bare(LJID), '_'}, _ = '_'},
    {ok, mnesia:match_object(Pattern)}.
246 |
|
|
247 |
|
%% Returns the states stored under LJID as given, followed by the states
%% stored under its bare form.
%%
%% NOTE(review): when LJID is already bare both patterns are presumably
%% identical, in which case every record is returned twice - verify that
%% callers cope with that.
-spec get_states_by_bare_and_full(LJID :: jid:ljid()) ->
    {ok, [mod_pubsub:pubsubState()]}.
get_states_by_bare_and_full(LJID) ->
    LBare = jid:to_bare(LJID),
    FullStates = mnesia:match_object(#pubsub_state{stateid = {LJID, '_'}, _ = '_'}),
    BareStates = mnesia:match_object(#pubsub_state{stateid = {LBare, '_'}, _ = '_'}),
    {ok, FullStates ++ BareStates}.
253 |
|
|
254 |
|
%% Returns indexes of nodes owned by the bare form of LJID that have at
%% least one pending subscription.
%% First collects the indexes of the nodes where the bare JID has the
%% `owner' affiliation, then folds over the whole pubsub_state table and
%% lets get_idxs_with_pending_subs/3 pick states on those nodes.
-spec get_idxs_of_own_nodes_with_pending_subs(LJID :: jid:ljid()) ->
    {ok, [mod_pubsub:nodeIdx()]}.
get_idxs_of_own_nodes_with_pending_subs(LJID) ->
    OwnerPattern = #pubsub_state{stateid = {jid:to_bare(LJID), '_'},
                                 affiliation = owner, _ = '_'},
    OwnedIdxs = [Nidx || #pubsub_state{stateid = {_, Nidx}}
                             <- mnesia:match_object(OwnerPattern)],
    Collect = fun(State, Acc) ->
                      get_idxs_with_pending_subs(OwnedIdxs, State, Acc)
              end,
    {ok, mnesia:foldl(Collect, [], pubsub_state)}.
264 |
|
|
265 |
|
%% ------------------------ Node management ------------------------ |
266 |
|
|
267 |
|
%% Backend part of node creation: records Owner as `owner' of node Nidx.
%% The node record itself is written separately by set_node/1.
-spec create_node(Nidx :: mod_pubsub:nodeIdx(),
                  Owner :: jid:ljid()) ->
    ok.
create_node(Nidx, Owner) ->
    OwnerAffiliation = owner,
    set_affiliation(Nidx, Owner, OwnerAffiliation).
272 |
|
|
273 |
|
%% Removes every state of node Nidx together with the items referenced by
%% those states, and returns the states as they were before deletion.
-spec del_node(Nidx :: mod_pubsub:nodeIdx()) ->
    {ok, [mod_pubsub:pubsubState()]}.
del_node(Nidx) ->
    {ok, States} = get_states(Nidx),
    Purge = fun(#pubsub_state{stateid = {LJID, _}, items = ItemIds}) ->
                    del_items(Nidx, ItemIds),
                    del_state_by_idx_and_ljid(Nidx, LJID)
            end,
    lists:foreach(Purge, States),
    {ok, States}.
282 |
|
|
283 |
|
%% Stores a node record and keeps the pubsub_subnode table in sync.
%%  - A record without an id is a new node: it gets a fresh index from
%%    pubsub_index and has no previous parents.
%%  - Otherwise the previously stored parents are read first so that only
%%    the parent changes are applied to pubsub_subnode.
-spec set_node(mod_pubsub:pubsubNode()) -> {ok, mod_pubsub:nodeIdx()}.
set_node(#pubsub_node{id = undefined} = Node) ->
    FreshIdx = pubsub_index:new(node),
    set_node(Node#pubsub_node{id = FreshIdx}, []);
set_node(Node) ->
    PreviousParents = get_parentnodes_names(Node),
    set_node(Node, PreviousParents).
291 |
|
|
292 |
|
%% Writes the node record, updates the subnode entries for the difference
%% between OldParents and the record's parents, and returns the node index.
set_node(#pubsub_node{id = Nidx} = NodeRec, OldParents) ->
    mnesia:write(NodeRec),
    set_subnodes(NodeRec, OldParents),
    {ok, Nidx}.
296 |
|
|
297 |
|
%% Returns the parents currently stored for the node, or [] when the node
%% is not stored yet. The record is read with a write lock.
get_parentnodes_names(#pubsub_node{nodeid = NodeId}) ->
    case mnesia:read(pubsub_node, NodeId, write) of
        [#pubsub_node{parents = StoredParents}] ->
            StoredParents;
        [] ->
            []
    end.
304 |
|
|
305 |
|
%% Synchronises pubsub_subnode with the parents of a node record.
%% Nothing is touched when the parents are unchanged.
set_subnodes(#pubsub_node{parents = Current}, Previous) when Current =:= Previous ->
    ok;
set_subnodes(#pubsub_node{nodeid = NodeId, parents = Current}, Previous) ->
    set_subnodes(NodeId, Current, Previous).
309 |
|
|
310 |
|
%% Applies a parent change of node {Key, Node} to pubsub_subnode:
%% entries for parents that disappeared are deleted and entries for newly
%% added parents are written. Parents present in both lists are untouched.
set_subnodes({Key, Node}, NewParents, OldParents) ->
    Old = sets:from_list(OldParents),
    New = sets:from_list(NewParents),
    Dropped = sets:to_list(sets:subtract(Old, New)),
    Introduced = sets:to_list(sets:subtract(New, Old)),
    ToDelete = names_to_subnode_records(Key, Node, Dropped),
    ToWrite = names_to_subnode_records(Key, Node, Introduced),
    lists:foreach(fun mnesia:delete_object/1, ToDelete),
    lists:foreach(fun mnesia:write/1, ToWrite),
    ok.
320 |
|
|
321 |
|
%% Builds one #pubsub_subnode{} per parent name, each linking the parent
%% {Key, Parent} to the child node name Node.
names_to_subnode_records(Key, Node, Parents) ->
    ToRecord = fun(Parent) ->
                       #pubsub_subnode{nodeid = {Key, Parent}, subnode = Node}
               end,
    lists:map(ToRecord, Parents).
323 |
|
|
324 |
|
%% Looks a node up by its index using the secondary index on the `id'
%% field of pubsub_node.
-spec find_node_by_id(Nidx :: mod_pubsub:nodeIdx()) ->
    {error, not_found} | {ok, mod_pubsub:pubsubNode()}.
find_node_by_id(Nidx) ->
    case mnesia:index_read(pubsub_node, Nidx, #pubsub_node.id) of
        [] ->
            {error, not_found};
        [#pubsub_node{} = NodeRec] ->
            {ok, NodeRec}
    end.
332 |
|
|
333 |
|
%% Looks a node up by its primary key {Key, Node}; returns the record or
%% `false' when no such node is stored.
-spec find_node_by_name(
        Key :: mod_pubsub:hostPubsub() | jid:ljid(),
        Node :: mod_pubsub:nodeId()) ->
    mod_pubsub:pubsubNode() | false.
find_node_by_name(Key, Node) ->
    NodeId = oid(Key, Node),
    case mnesia:read(pubsub_node, NodeId, read) of
        [Found] -> Found;
        [] -> false
    end.
342 |
|
|
343 |
|
%% Returns every node stored under the given key, whatever its name.
-spec find_nodes_by_key(Key :: mod_pubsub:hostPubsub() | jid:ljid()) ->
    [mod_pubsub:pubsubNode()].
find_nodes_by_key(Key) ->
    AnyNameUnderKey = #pubsub_node{nodeid = {Key, '_'}, _ = '_'},
    mnesia:match_object(AnyNameUnderKey).
347 |
|
|
348 |
1624 |
%% Builds the primary key used by the pubsub_node table.
oid(Key, Name) ->
    {Key, Name}.
349 |
|
|
350 |
|
|
351 |
|
%% Deletes the node record after removing the node from the subnode lists
%% of all its parents (new parents = [], old parents = current parents).
-spec delete_node(Node :: mod_pubsub:pubsubNode()) -> ok.
delete_node(#pubsub_node{nodeid = NodeId, parents = CurrentParents}) ->
    NoParents = [],
    set_subnodes(NodeId, NoParents, CurrentParents),
    mnesia:delete({pubsub_node, NodeId}).
355 |
|
|
356 |
|
|
357 |
|
%% Returns the direct children of Node as node records.
%% An empty node name (<<>>) selects the nodes under Key that have no
%% parents at all.
-spec get_subnodes(Key :: mod_pubsub:hostPubsub() | jid:ljid(), Node :: mod_pubsub:nodeId() | <<>>) ->
    [mod_pubsub:pubsubNode()].
get_subnodes(Key, <<>>) ->
    get_nodes_without_parents(Key);
get_subnodes(Key, Node) ->
    ChildNames = get_subnodes_names(Key, Node),
    find_nodes_by_names(Key, ChildNames).
364 |
|
|
365 |
|
%% Like get_subnodes/2, but returns node names instead of node records.
%% Names come from the pubsub_subnode entries stored under {Key, Node}.
get_subnodes_names(Key, Node) ->
    Entries = mnesia:read(pubsub_subnode, {Key, Node}),
    [Child || #pubsub_subnode{subnode = Child} <- Entries].
369 |
|
|
370 |
|
%% Returns the nodes under Key whose parents list is empty.
%% Warning: this is a full table scan and can return a lot of records.
get_nodes_without_parents(Key) ->
    RootPattern = #pubsub_node{nodeid = {Key, '_'}, parents = [], _ = '_'},
    mnesia:match_object(RootPattern).
373 |
|
|
374 |
|
%% Returns a list of {Depth, Nodes} where Nodes are parent nodes of the
%% nodes on the level below.
%% We start with {0, [NodeRecord]}, where NodeRecord is the node that
%% corresponds to the Node argument.
%% The next level is {1, Nodes1}, where Nodes1 are the parent nodes of Node.
%% The next level can be {2, Nodes2}, where Nodes2 are the parents of
%% Nodes1 (without duplicates).
%% The deepest level comes first in the result and level 0 comes last.
%%
%% Each node is returned only once by the function.
%% When the node does not exist the result is [{0, []}].
-spec get_parentnodes_tree(Key :: mod_pubsub:hostPubsub() | jid:ljid(), Node :: mod_pubsub:nodeId()) ->
    [{Depth::non_neg_integer(), Nodes::[mod_pubsub:pubsubNode(), ...]}].
get_parentnodes_tree(Key, Node) ->
    case find_node_by_name(Key, Node) of
        false ->
            [{0, []}];
        #pubsub_node{parents = Parents} = Record ->
            Level0 = [{0, [Record]}],
            case Parents of
                [] ->
                    Level0;
                _ ->
                    %% Track already visited names so that an accidental
                    %% cycle is never expanded twice.
                    Seen = sets:from_list([Node]),
                    extract_parents(Key, Node, Parents, 1, Seen) ++ Level0
            end
    end.
397 |
|
|
398 |
|
%% Each call resolves the Parents of one level and recurses into the next
%% level. KnownNodesSet holds the names that must not be expanded again;
%% parents-of-parents already in that set are reported as cyclic and
%% skipped. Higher levels are placed before {Depth, ParentRecords}.
extract_parents(Key, InitialNode, Parents, Depth, KnownNodesSet) ->
    ParentRecords = find_nodes_by_names(Key, Parents),
    Seen = sets:union(KnownNodesSet, sets:from_list(Parents)),
    %% Sorted, duplicate-free names of the parents of this level's parents
    GrandParents = lists:usort(
                     lists:append([PP || #pubsub_node{parents = PP} <- ParentRecords])),
    {Cyclic, ToVisit} =
        lists:partition(fun(Name) -> sets:is_element(Name, Seen) end, GrandParents),
    case Cyclic of
        [] ->
            ok;
        _ ->
            ?LOG_WARNING(#{what => pubsub_cyclic_nodes_detected,
                           pubsub_node => InitialNode, cyclic_names => Cyclic})
    end,
    HigherLevels =
        case ToVisit of
            [] -> [];
            _ -> extract_parents(Key, InitialNode, ToVisit, Depth + 1, Seen)
        end,
    HigherLevels ++ [{Depth, ParentRecords}].
418 |
|
|
419 |
|
%% Resolves node names to node records under Key, silently dropping the
%% names for which no node is stored. Result order follows the input order.
find_nodes_by_names(Key, Nodes) ->
    Resolve = fun(Name) ->
                      case find_node_by_name(Key, Name) of
                          #pubsub_node{} = Record -> {true, Record};
                          _Missing -> false
                      end
              end,
    lists:filtermap(Resolve, Nodes).
424 |
|
|
425 |
|
|
426 |
|
%% Returns a list of {Depth, Nodes} describing the subtree below Node:
%% level 0 holds the node itself, level 1 its direct subnodes, and so on.
%% The deepest level comes first and level 0 comes last.
%%
%% NOTE(review): for a missing node the result is [{1, []}, {0, [false]}];
%% the `false' element does not fit the declared spec - confirm that
%% callers rely on this shape before changing it.
-spec get_subnodes_tree(Key :: mod_pubsub:hostPubsub() | jid:ljid(), Node :: mod_pubsub:nodeId()) ->
    [{Depth::non_neg_integer(), Nodes::[mod_pubsub:pubsubNode(), ...]}].
get_subnodes_tree(Key, Node) ->
    case find_node_by_name(Key, Node) of
        #pubsub_node{} = Record ->
            Children = get_subnodes_names(Key, Node),
            %% Track already visited names so that an accidental cycle is
            %% never expanded twice.
            Seen = sets:from_list([Node]),
            extract_subnodes(Key, Node, Children, 1, Seen) ++ [{0, [Record]}];
        false ->
            [{1, []}, {0, [false]}]
    end.
441 |
|
|
442 |
|
%% Each call resolves the Subnodes of one level and recurses into the next
%% level. KnownNodesSet holds the names that must not be expanded again;
%% subnodes-of-subnodes already in that set are reported as cyclic and
%% skipped. When nothing is left to expand, an empty {Depth + 1, []} level
%% terminates the list. Deeper levels are placed before
%% {Depth, SubnodesRecords}.
extract_subnodes(Key, InitialNode, Subnodes, Depth, KnownNodesSet) ->
    SubnodesRecords = find_nodes_by_names(Key, Subnodes),
    Seen = sets:union(KnownNodesSet, sets:from_list(Subnodes)),
    %% Sorted, duplicate-free names of the subnodes of this level's subnodes
    GrandChildren = lists:usort(
                      lists:append([get_subnodes_names(Key, Name) || Name <- Subnodes])),
    {Cyclic, ToVisit} =
        lists:partition(fun(Name) -> sets:is_element(Name, Seen) end, GrandChildren),
    case Cyclic of
        [] ->
            ok;
        _ ->
            ?LOG_WARNING(#{what => pubsub_cyclic_nodes_detected,
                           pubsub_node => InitialNode, cyclic_names => Cyclic})
    end,
    DeeperLevels =
        case ToVisit of
            [] -> [{Depth + 1, []}];
            _ -> extract_subnodes(Key, InitialNode, ToVisit, Depth + 1, Seen)
        end,
    DeeperLevels ++ [{Depth, SubnodesRecords}].
460 |
|
|
461 |
|
%% ------------------------ Affiliations ------------------------ |
462 |
|
|
463 |
|
%% Sets the affiliation of the bare form of LJID on node Nidx.
%% A state that would end up with affiliation `none' and no subscriptions
%% carries no information, so it is deleted instead of being written.
-spec set_affiliation(Nidx :: mod_pubsub:nodeIdx(),
                      LJID :: jid:ljid(),
                      Affiliation :: mod_pubsub:affiliation()) -> ok.
set_affiliation(Nidx, LJID, Affiliation) ->
    BareLJID = jid:to_bare(LJID),
    {ok, State} = get_state(Nidx, BareLJID, write),
    Subs = State#pubsub_state.subscriptions,
    BecomesEmpty = Affiliation =:= none andalso Subs =:= [],
    case BecomesEmpty of
        true ->
            del_state_by_idx_and_ljid(Nidx, BareLJID);
        false ->
            mnesia:write(State#pubsub_state{affiliation = Affiliation})
    end.
473 |
|
|
474 |
|
%% Returns the affiliation stored for the bare form of LJID on node Nidx.
%% When no state is stored, the affiliation of a default #pubsub_state{}
%% record is returned (see get_state/3).
-spec get_affiliation(Nidx :: mod_pubsub:nodeIdx(),
                      LJID :: jid:ljid()) ->
    {ok, mod_pubsub:affiliation()}.
get_affiliation(Nidx, LJID) ->
    BareLJID = jid:to_bare(LJID),
    {ok, #pubsub_state{affiliation = Affiliation}} = get_state(Nidx, BareLJID, read),
    {ok, Affiliation}.
480 |
|
|
481 |
|
%% ------------------------ Subscriptions ------------------------ |
482 |
|
|
483 |
|
%% Prepends {Sub, SubId} to the subscriptions of LJID on node Nidx and
%% stores the subscription options under SubId.
-spec add_subscription(Nidx :: mod_pubsub:nodeIdx(),
                       LJID :: jid:ljid(),
                       Sub :: mod_pubsub:subscription(),
                       SubId :: mod_pubsub:subId(),
                       SubOpts :: mod_pubsub:subOptions()) -> ok.
add_subscription(Nidx, LJID, Sub, SubId, SubOpts) ->
    {ok, #pubsub_state{subscriptions = Existing} = State} =
        get_state(Nidx, LJID, write),
    Updated = State#pubsub_state{subscriptions = [{Sub, SubId} | Existing]},
    mnesia:write(Updated),
    set_subscription_opts(Nidx, LJID, SubId, SubOpts).
493 |
|
|
494 |
|
%% Stores (or overwrites) the options of subscription SubId.
%% The node index and JID are not used by this backend: options are keyed
%% by SubId alone.
-spec set_subscription_opts(Nidx :: mod_pubsub:nodeIdx(),
                            JID :: jid:ljid(),
                            SubId :: mod_pubsub:subId(),
                            Opts :: mod_pubsub:subOptions()) -> ok.
set_subscription_opts(_Nidx, _JID, SubId, Opts) ->
    OptsRecord = #pubsub_subscription{subid = SubId, options = Opts},
    mnesia:write(OptsRecord).
500 |
|
|
501 |
|
%% Returns {Entity, Sub, SubId, Opts} for every subscription held by any
%% entity on node Nidx.
-spec get_node_subscriptions(Nidx :: mod_pubsub:nodeIdx()) ->
    {ok, [{Entity :: jid:ljid(),
           Sub :: mod_pubsub:subscription(),
           SubId :: mod_pubsub:subId(),
           Opts :: mod_pubsub:subOptions()}]}.
get_node_subscriptions(Nidx) ->
    {ok, NodeStates} = get_states(Nidx),
    Subscriptions = states_to_subscriptions(NodeStates),
    {ok, Subscriptions}.
509 |
|
|
510 |
|
%% Returns {Sub, SubId, Opts} for every subscription LJID (used exactly as
%% given, not converted to bare) holds on node Nidx.
-spec get_node_entity_subscriptions(Nidx :: mod_pubsub:nodeIdx(),
                                    LJID :: jid:ljid()) ->
    {ok, [{Sub :: mod_pubsub:subscription(),
           SubId :: mod_pubsub:subId(),
           Opts :: mod_pubsub:subOptions()}]}.
get_node_entity_subscriptions(Nidx, LJID) ->
    {ok, #pubsub_state{subscriptions = Subs}} = get_state(Nidx, LJID, read),
    {ok, add_opts_to_subs(Subs)}.
518 |
|
|
519 |
|
%% Removes subscription SubId of LJID on node Nidx together with its
%% stored options. If the state is then left with affiliation `none' and
%% no subscriptions, the whole state is deleted.
-spec delete_subscription(
        Nidx :: mod_pubsub:nodeIdx(),
        LJID :: jid:ljid(),
        SubId :: mod_pubsub:subId()) ->
    ok.
delete_subscription(Nidx, LJID, SubId) ->
    {ok, #pubsub_state{subscriptions = Subs, affiliation = Aff} = State} =
        get_state(Nidx, LJID, write),
    %% Subscriptions are {Sub, SubId} pairs, hence key position 2
    Remaining = lists:keydelete(SubId, 2, Subs),
    mnesia:delete({pubsub_subscription, SubId}),
    StateIsEmpty = Aff =:= none andalso Remaining =:= [],
    case StateIsEmpty of
        true ->
            del_state_by_idx_and_ljid(Nidx, LJID);
        false ->
            mnesia:write(State#pubsub_state{subscriptions = Remaining})
    end.
532 |
|
|
533 |
|
%% Removes every subscription LJID holds on node Nidx
%% (see delete_all_subscriptions_by_state/1).
-spec delete_all_subscriptions(
        Nidx :: mod_pubsub:nodeIdx(),
        LJID :: jid:ljid()) ->
    ok.
delete_all_subscriptions(Nidx, LJID) ->
    {ok, CurrentState} = get_state(Nidx, LJID, write),
    delete_all_subscriptions_by_state(CurrentState).
540 |
|
|
541 |
|
%% Removes every subscription held by any resource of the given user, on
%% every node.
-spec delete_user_subscriptions(jid:ljid()) -> ok.
delete_user_subscriptions(LJID) ->
    {ok, UserStates} = get_states_by_lus(LJID),
    lists:foreach(fun(State) -> delete_all_subscriptions_by_state(State) end,
                  UserStates).
545 |
|
|
546 |
|
%% Drops the stored options of every subscription in the state, then
%% either deletes the state (affiliation `none') or writes it back with an
%% empty subscription list (any other affiliation).
-spec delete_all_subscriptions_by_state(mod_pubsub:pubsubState()) -> ok.
delete_all_subscriptions_by_state(State) ->
    DropOpts = fun({_Sub, SubId}) ->
                       mnesia:delete({pubsub_subscription, SubId})
               end,
    lists:foreach(DropOpts, State#pubsub_state.subscriptions),
    case State#pubsub_state.affiliation of
        none ->
            del_state(State);
        _Affiliated ->
            mnesia:write(State#pubsub_state{subscriptions = []})
    end.
554 |
|
|
555 |
|
%% Replaces the subscription entry with id SubId by {Subscription, SubId}.
%% The state is written back even when no entry with that id exists, in
%% which case the subscription list is unchanged.
-spec update_subscription(Nidx :: mod_pubsub:nodeIdx(),
                          LJID :: jid:ljid(),
                          Subscription :: mod_pubsub:subscription(),
                          SubId :: mod_pubsub:subId()) ->
    ok.
update_subscription(Nidx, LJID, Subscription, SubId) ->
    {ok, #pubsub_state{subscriptions = Subs} = State} =
        get_state(Nidx, LJID, write),
    %% Subscriptions are {Sub, SubId} pairs, hence key position 2
    Replaced = lists:keyreplace(SubId, 2, Subs, {Subscription, SubId}),
    mnesia:write(State#pubsub_state{subscriptions = Replaced}).
564 |
|
|
565 |
|
%% ------------------------ Items ------------------------ |
566 |
|
|
567 |
|
%% Removes the given item ids from the item list kept in the state of the
%% bare form of LJID on node Nidx. Only the id list in the state is
%% updated; the item records themselves are not touched here.
-spec remove_items(Nidx :: mod_pubsub:nodeIdx(),
                   LJID :: jid:ljid(),
                   ItemIds :: [mod_pubsub:itemId()]) ->
    ok.
remove_items(Nidx, LJID, ItemIds) ->
    {ok, #pubsub_state{items = Current} = State} =
        get_state(Nidx, jid:to_bare(LJID), write),
    Remaining = Current -- ItemIds,
    mnesia:write(State#pubsub_state{items = Remaining}).
575 |
|
|
576 |
|
%% Empties the item id list of every state on node Nidx. States whose list
%% is already empty are not rewritten.
-spec remove_all_items(Nidx :: mod_pubsub:nodeIdx()) ->
    ok.
remove_all_items(Nidx) ->
    {ok, States} = get_states(Nidx),
    WithItems = [S || #pubsub_state{items = Items} = S <- States, Items =/= []],
    lists:foreach(fun(S) -> mnesia:write(S#pubsub_state{items = []}) end,
                  WithItems).
585 |
|
|
586 |
|
%% Stores the item record and prepends its id to the item list kept in the
%% state of the bare form of JID on node Nidx.
-spec add_item(Nidx :: mod_pubsub:nodeIdx(),
               JID :: jid:ljid(),
               Item :: mod_pubsub:pubsubItem()) ->
    ok.
add_item(Nidx, JID, #pubsub_item{itemid = {ItemId, _}} = Item) ->
    set_item(Item),
    {ok, #pubsub_state{items = Known} = State} =
        get_state(Nidx, jid:to_bare(JID), write),
    mnesia:write(State#pubsub_state{items = [ItemId | Known]}).
595 |
|
|
596 |
|
%% ------------------------ Direct #pubsub_item access ------------------------ |
597 |
|
|
598 |
|
%% Returns the items of node Nidx ordered by the `modification' field,
%% highest first. Optional keys of Opts:
%%   item_ids  - keep only items whose id is in the list;
%%   max_items - keep at most that many items (applied after the id filter).
%% The second element of the result tuple is always `none'.
-spec get_items(Nidx :: mod_pubsub:nodeIdx(), gen_pubsub_node:get_item_options()) ->
    {ok, {[mod_pubsub:pubsubItem()], none}}.
get_items(Nidx, Opts) ->
    Pattern = #pubsub_item{itemid = {'_', Nidx}, _ = '_'},
    NodeItems = mnesia:match_object(Pattern),
    NewestFirst = lists:reverse(lists:keysort(#pubsub_item.modification, NodeItems)),
    WantedIds = maps:get(item_ids, Opts, undefined),
    ById = filter_items_by_item_ids(NewestFirst, WantedIds),
    MaxItems = maps:get(max_items, Opts, undefined),
    {ok, {limit_items(ById, MaxItems), none}}.
606 |
|
|
607 |
|
%% Keeps only the items whose id is listed in ItemIds, preserving order.
%% `undefined' means no filtering was requested.
filter_items_by_item_ids(Items, undefined) ->
    Items;
filter_items_by_item_ids(Items, ItemIds) ->
    IsWanted = fun(#pubsub_item{itemid = {ItemId, _Nidx}}) ->
                       lists:member(ItemId, ItemIds)
               end,
    [Item || Item <- Items, IsWanted(Item)].
613 |
|
|
614 |
|
%% Truncates Items to at most MaxItems elements; `undefined' means no limit.
limit_items(Items, MaxItems) ->
    case MaxItems of
        undefined -> Items;
        _ -> lists:sublist(Items, MaxItems)
    end.
618 |
|
|
619 |
|
%% Reads a single item by its {ItemId, Nidx} key.
-spec get_item(Nidx :: mod_pubsub:nodeIdx(), ItemId :: mod_pubsub:itemId()) ->
    {ok, mod_pubsub:pubsubItem()} | {error, item_not_found}.
get_item(Nidx, ItemId) ->
    case mnesia:read(pubsub_item, {ItemId, Nidx}) of
        [#pubsub_item{} = Item] ->
            {ok, Item};
        _NotFound ->
            {error, item_not_found}
    end.
626 |
|
|
627 |
|
%% Writes the item record to Mnesia; the state's item id list is not
%% updated here.
-spec set_item(Item :: mod_pubsub:pubsubItem()) -> ok.
set_item(ItemRecord) ->
    mnesia:write(ItemRecord).
630 |
|
|
631 |
|
%% Deletes the item record stored under {ItemId, Nidx}.
-spec del_item(Nidx :: mod_pubsub:nodeIdx(), ItemId :: mod_pubsub:itemId()) -> ok.
del_item(Nidx, ItemId) ->
    ItemKey = {ItemId, Nidx},
    mnesia:delete(pubsub_item, ItemKey, write).
634 |
|
|
635 |
|
%% Deletes the item records of node Nidx for every id in ItemIds.
-spec del_items(Nidx :: mod_pubsub:nodeIdx(), [ItemId :: mod_pubsub:itemId()]) -> ok.
del_items(Nidx, ItemIds) ->
    DeleteOne = fun(ItemId) -> del_item(Nidx, ItemId) end,
    lists:foreach(DeleteOne, ItemIds).
639 |
|
|
640 |
|
%%==================================================================== |
641 |
|
%% Internal functions |
642 |
|
%%==================================================================== |
643 |
|
|
644 |
|
%% Reads the state of LJID on node Nidx (write lock) and deletes it
%% together with the options of its subscriptions.
-spec del_state_by_idx_and_ljid(Nidx :: mod_pubsub:nodeIdx(),
                                LJID :: jid:ljid()) -> ok.
del_state_by_idx_and_ljid(Nidx, LJID) ->
    {ok, Stored} = get_state(Nidx, LJID, write),
    del_state(Stored).
649 |
|
|
650 |
|
%% Deletes the stored options of every subscription in the state and then
%% the state record itself.
-spec del_state(mod_pubsub:pubsubState()) -> ok.
del_state(#pubsub_state{stateid = {LJID, Nidx}, subscriptions = Subs}) ->
    DropOpts = fun({_Sub, SubId}) ->
                       mnesia:delete({pubsub_subscription, SubId})
               end,
    lists:foreach(DropOpts, Subs),
    mnesia:delete({pubsub_state, {LJID, Nidx}}).
654 |
|
|
655 |
|
%% Reads the state stored under {LJID, Nidx} with the requested lock.
%% Never fails for a missing state: a default #pubsub_state{} carrying only
%% the state id is returned instead (nothing is written).
-spec get_state(Nidx :: mod_pubsub:nodeIdx(),
                LJID :: jid:ljid(),
                LockKind :: write | read) ->
    {ok, mod_pubsub:pubsubState()}.
get_state(Nidx, LJID, LockKind) ->
    StateId = {LJID, Nidx},
    case mnesia:read(pubsub_state, StateId, LockKind) of
        [#pubsub_state{} = Stored] ->
            {ok, Stored};
        _Missing ->
            {ok, #pubsub_state{stateid = StateId}}
    end.
665 |
|
|
666 |
|
%% Flattens a list of states into {LJID, Sub, SubId, Opts} tuples, one per
%% subscription, keeping the order of states and of the subscriptions
%% within each state. Works in tandem with add_jid_and_opts_to_subs/3,
%% which continues with the remaining states once the subscriptions of the
%% current state are exhausted.
-spec states_to_subscriptions([mod_pubsub:pubsubState()]) ->
    [{jid:ljid(), mod_pubsub:subscription(), mod_pubsub:subId(), mod_pubsub:subOptions()}].
states_to_subscriptions([]) ->
    [];
states_to_subscriptions([#pubsub_state{subscriptions = []} | Rest]) ->
    %% Nothing to emit for a state without subscriptions
    states_to_subscriptions(Rest);
states_to_subscriptions([#pubsub_state{stateid = {LJID, _Nidx},
                                       subscriptions = Subs} | Rest]) ->
    add_jid_and_opts_to_subs(Subs, LJID, Rest).
674 |
|
|
675 |
|
%% Emits {LJID, Sub, SubId, Opts} for each subscription of one state and,
%% once they are exhausted, hands the remaining states back to
%% states_to_subscriptions/1.
-spec add_jid_and_opts_to_subs(Subs :: [{mod_pubsub:subscription(), mod_pubsub:subId()}],
                               LJID :: jid:ljid(),
                               RStates :: [mod_pubsub:pubsubState()]) ->
    [{jid:ljid(), mod_pubsub:subscription(), mod_pubsub:subId(), mod_pubsub:subOptions()}].
add_jid_and_opts_to_subs([{Sub, SubId} | MoreSubs], LJID, RStates) ->
    Row = {LJID, Sub, SubId, read_sub_options(SubId)},
    [Row | add_jid_and_opts_to_subs(MoreSubs, LJID, RStates)];
add_jid_and_opts_to_subs([], _LJID, RStates) ->
    states_to_subscriptions(RStates).
684 |
|
|
685 |
|
%% Extends every {Sub, SubId} pair with the options stored for SubId.
-spec add_opts_to_subs(Subs :: [{mod_pubsub:subscription(), mod_pubsub:subId()}]) ->
    [{mod_pubsub:subscription(), mod_pubsub:subId(), mod_pubsub:subOptions()}].
add_opts_to_subs(Subs) ->
    WithOpts = fun({Sub, SubId}) ->
                       {Sub, SubId, read_sub_options(SubId)}
               end,
    lists:map(WithOpts, Subs).
692 |
|
|
693 |
|
%% Returns the options stored for subscription SubId, or [] when nothing
%% is stored for it.
-spec read_sub_options(SubId :: mod_pubsub:subId()) -> mod_pubsub:subOptions().
read_sub_options(SubId) ->
    case mnesia:read(pubsub_subscription, SubId) of
        [#pubsub_subscription{options = StoredOpts}] ->
            StoredOpts;
        [] ->
            []
    end.
699 |
|
|
700 |
|
%% Fold step used by get_idxs_of_own_nodes_with_pending_subs/1.
%%
%% NodeIdxs    - indexes of the nodes of interest (the caller's own nodes);
%% PubsubState - the state record currently visited by the fold;
%% Acc         - node indexes collected so far.
%%
%% The state's node index is added to Acc when the node is one of NodeIdxs
%% and the state holds at least one pending subscription.
%% A node usually has one state per subscriber, so several states of the
%% same node may carry pending subscriptions; the index is added only once
%% to keep the result free of duplicates.
-spec get_idxs_with_pending_subs(NodeIdxs :: [mod_pubsub:nodeIdx()],
                                 PubsubState :: mod_pubsub:pubsubState(),
                                 Acc :: [mod_pubsub:nodeIdx()]) ->
    [mod_pubsub:nodeIdx()].
get_idxs_with_pending_subs(NodeIdxs,
                           #pubsub_state{stateid = {_, Nidx}, subscriptions = Subs},
                           Acc) ->
    ShouldAdd = lists:member(Nidx, NodeIdxs)
        andalso not lists:member(Nidx, Acc)
        andalso lists:any(fun is_pending_sub/1, Subs),
    case ShouldAdd of
        true -> [Nidx | Acc];
        false -> Acc
    end.
712 |
|
|
713 |
4 |
%% True for a {pending, SubId} subscription entry, false for anything else.
is_pending_sub(Sub) ->
    case Sub of
        {pending, _SubId} -> true;
        _ -> false
    end.
715 |
|
|