./ct_report/coverage/mod_pubsub_db_mnesia.COVER.html

1 %%%----------------------------------------------------------------------
2 %%% File : mod_pubsub_db_mnesia.erl
3 %%% Author : Piotr Nosek <piotr.nosek@erlang-solutions.com>
4 %%% Purpose : PubSub Mnesia backend
5 %%% Created : 26 Oct 2018 by Piotr Nosek <piotr.nosek@erlang-solutions.com>
6 %%%----------------------------------------------------------------------
7
8 -module(mod_pubsub_db_mnesia).
9 -author('piotr.nosek@erlang-solutions.com').
10
11 -behaviour(mod_pubsub_db).
12
13 -include("pubsub.hrl").
14 -include("jlib.hrl").
15 -include("mongoose_logger.hrl").
16 -include_lib("stdlib/include/qlc.hrl").
17
18 -export([start/0, stop/0]).
19 % Funs execution
20 -export([transaction/2, dirty/2]).
21 % Direct #pubsub_state access
22 -export([del_node/1, get_state/2,
23 get_states/1, get_states_by_lus/1, get_states_by_bare/1,
24 get_states_by_bare_and_full/1, get_idxs_of_own_nodes_with_pending_subs/1]).
25 % Node management
26 -export([
27 create_node/2,
28 set_node/1,
29 find_node_by_id/1,
30 find_nodes_by_key/1,
31 find_node_by_name/2,
32 delete_node/1,
33 get_subnodes/2,
34 get_parentnodes_tree/2,
35 get_subnodes_tree/2
36 ]).
37 % Affiliations
38 -export([
39 set_affiliation/3,
40 get_affiliation/2
41 ]).
42 % Subscriptions
43 -export([
44 add_subscription/5,
45 set_subscription_opts/4,
46 get_node_subscriptions/1,
47 get_node_entity_subscriptions/2,
48 delete_subscription/3,
49 delete_all_subscriptions/2,
50 update_subscription/4
51 ]).
52 % Item ids in state
53 -export([
54 add_item/3,
55 remove_items/3,
56 remove_all_items/1
57 ]).
58
59 % Whole Items
60 -export([
61 get_items/2,
62 get_item/2,
63 set_item/1,
64 del_item/2,
65 del_items/2
66 ]).
67
68 % GDPR
69 -export([
70 get_user_payloads/2,
71 get_user_nodes/2,
72 get_user_subscriptions/2,
73 delete_user_subscriptions/1,
74 find_nodes_by_affiliated_user/1
75 ]).
76
77 %%====================================================================
78 %% Internal records definitions
79 %%====================================================================
80
%% Row of the `pubsub_subscription` Mnesia table; start/0 creates that table
%% from this record's fields. Rows are written by set_subscription_opts/4 and
%% are read or deleted elsewhere in this module using the subid as the key.
%%   subid   - subscription id
%%   options - subscription options, an empty list by default
81 -record(pubsub_subscription, {
82 subid :: mod_pubsub:subId(),
83 options = [] :: mod_pubsub:subOptions()
84 }).
85
86 %%====================================================================
87 %% Behaviour callbacks
88 %%====================================================================
89
90 %% ------------------------ Backend start/stop ------------------------
91
%% Creates the Mnesia tables used by this backend, asks for a local replica of
%% each of them, back-fills the subnode table when it has just been created and
%% initialises pubsub_index.
%%
%% Results of the create_table/add_table_copy/add_table_index calls are
%% deliberately not matched, so the function can be called again when the
%% tables (or local copies) already exist.
-spec start() -> ok.
start() ->
    mnesia:create_table(pubsub_state,
                        [{disc_copies, [node()]},
                         {type, ordered_set},
                         {attributes, record_info(fields, pubsub_state)}]),
    mnesia:add_table_copy(pubsub_state, node(), disc_copies),
    mnesia:create_table(pubsub_item,
                        [{disc_only_copies, [node()]},
                         {attributes, record_info(fields, pubsub_item)}]),
    mnesia:add_table_copy(pubsub_item, node(), disc_only_copies),
    mnesia:create_table(pubsub_node,
                        [{disc_copies, [node()]},
                         {attributes, record_info(fields, pubsub_node)}]),
    %% pubsub_node used to be the only table without a local copy request;
    %% ask for one so it is replicated like the other tables.
    mnesia:add_table_copy(pubsub_node, node(), disc_copies),
    mnesia:add_table_index(pubsub_node, id),
    mnesia:create_table(pubsub_subscription,
                        [{disc_copies, [node()]},
                         {attributes, record_info(fields, pubsub_subscription)},
                         {type, set}]),
    mnesia:add_table_copy(pubsub_subscription, node(), disc_copies),
    CreateSubnodeTableResult = mnesia:create_table(pubsub_subnode,
                        [{disc_copies, [node()]},
                         {attributes, record_info(fields, pubsub_subnode)},
                         {type, bag}]),
    mnesia:add_table_copy(pubsub_subnode, node(), disc_copies),
    %% Only a freshly created subnode table ({atomic, ok}) gets filled.
    maybe_fill_subnode_table(CreateSubnodeTableResult),
    pubsub_index:init(),
    ok.
120
%% Backend shutdown hook. Nothing is torn down here; it only returns ok.
-spec stop() -> ok.
stop() -> ok.
124
%% If the pubsub_subnode table was missing (create_table returned {atomic, ok}),
%% then fill it from the existing pubsub_node records. Any other creation
%% result leaves the table untouched.
maybe_fill_subnode_table({atomic, ok}) ->
    %% Register every node under each of its parents; old parents are [].
    FillOne = fun(#pubsub_node{} = Node, Acc) ->
                      set_subnodes(Node, []),
                      Acc
              end,
    FillAll = fun() -> mnesia:foldl(FillOne, ok, pubsub_node) end,
    mnesia:transaction(FillAll);
maybe_fill_subnode_table(_Other) ->
    ok.
134
135 %% ------------------------ Fun execution ------------------------
136
%% Runs Fun (wrapped by mod_pubsub_db:extra_debug_fun/1) in a Mnesia
%% transaction. An aborted transaction is retried up to 3 more times with a
%% 100 ms pause between attempts; once retries run out the abort reason is
%% passed to mod_pubsub_db:db_error/3.
-spec transaction(Fun :: fun(() -> {result | error, any()}),
                  ErrorDebug :: map()) ->
    {result | error, any()}.
transaction(Fun, ErrorDebug) ->
    transaction(Fun, ErrorDebug, 3).

%% Same as transaction/2 with an explicit number of remaining retries.
transaction(Fun, ErrorDebug, RetriesLeft) ->
    case mnesia:transaction(mod_pubsub_db:extra_debug_fun(Fun)) of
        {aborted, Reason} when RetriesLeft > 0 ->
            ?LOG_WARNING(#{what => pubsub_transaction_retry, retries => RetriesLeft,
                           reason => Reason, debug => ErrorDebug}),
            timer:sleep(100),
            transaction(Fun, ErrorDebug, RetriesLeft - 1);
        {aborted, Reason} ->
            ?LOG_WARNING(#{what => pubsub_transaction_failed, reason => Reason,
                           debug => ErrorDebug}),
            mod_pubsub_db:db_error(Reason, ErrorDebug, transaction_failed);
        {atomic, Outcome} ->
            Outcome
    end.
157
%% Runs Fun (wrapped by mod_pubsub_db:extra_debug_fun/1) through
%% mnesia:sync_dirty/2 and returns its result. Any exception, regardless of
%% class, is converted with mod_pubsub_db:db_error/3.
-spec dirty(Fun :: fun(() -> {result | error, any()}),
            ErrorDebug :: map()) ->
    {result | error, any()}.
dirty(Fun, ErrorDebug) ->
    try
        mnesia:sync_dirty(mod_pubsub_db:extra_debug_fun(Fun), [])
    catch
        _Class:Reason ->
            mod_pubsub_db:db_error(Reason, ErrorDebug, dirty_failed)
    end.
169
170 %% ------------------------ GDPR-related ------------------------
171
%% Returns [NodeName, ItemId, Payload] triples for items created by the user,
%% collected inside a Mnesia transaction. Crashes (badmatch) if the
%% transaction is aborted.
-spec get_user_payloads(LUser :: jid:luser(), LServer :: jid:lserver()) ->
    [NodeNameItemIDAndPayload :: [binary()]].
get_user_payloads(LUser, LServer) ->
    Collect = fun() -> get_user_payloads_t(LUser, LServer) end,
    {atomic, Payloads} = mnesia:transaction(Collect),
    Payloads.
177
%% Transaction body of get_user_payloads/2: matches items whose `creation`
%% field holds the user's JID with any resource, and serialises each payload
%% element with exml:to_binary/1 into one binary.
get_user_payloads_t(LUser, LServer) ->
    CreatedByUser = {'_', {LUser, LServer, '_'}},
    ItemPattern = #pubsub_item{creation = CreatedByUser, _ = '_'},
    [[node_name(Nidx), ItemId, << <<(exml:to_binary(P))/binary>> || P <- Payload >>]
     || #pubsub_item{itemid = {ItemId, Nidx}, payload = Payload}
            <- mnesia:match_object(ItemPattern)].
183
%% Returns [NodeName, Type] pairs for nodes whose `owners` field is exactly
%% the one-element list holding the user's bare JID.
-spec get_user_nodes(LUser :: jid:luser(), LServer :: jid:lserver()) ->
    [NodeNameAndType :: [binary()]].
get_user_nodes(LUser, LServer) ->
    Owner = {LUser, LServer, <<>>},
    ListOwned =
        fun() ->
                Owned = mnesia:match_object(#pubsub_node{owners = [Owner], _ = '_'}),
                [[Name, Type] || #pubsub_node{nodeid = {_, Name}, type = Type} <- Owned]
        end,
    {atomic, NamesAndTypes} = mnesia:transaction(ListOwned),
    NamesAndTypes.
193
%% Returns a list of one-element lists [NodeName] for the user's
%% subscriptions, collected inside a Mnesia transaction.
-spec get_user_subscriptions(LUser :: jid:luser(), LServer :: jid:lserver()) ->
    [NodeName :: [binary()]].
get_user_subscriptions(LUser, LServer) ->
    Collect = fun() -> get_user_subscriptions_t(LUser, LServer) end,
    {atomic, NodeNames} = mnesia:transaction(Collect),
    NodeNames.
199
%% Transaction body of get_user_subscriptions/2. Matches states of the user
%% (any resource) whose subscription list is exactly one `subscribed` entry,
%% and maps each state to the name of its node.
get_user_subscriptions_t(LUser, LServer) ->
    AnyResource = {LUser, LServer, '_'},
    StatePattern = #pubsub_state{stateid = {AnyResource, '_'},
                                 subscriptions = [{subscribed, '_'}],
                                 _ = '_'},
    Subscribed = mnesia:match_object(StatePattern),
    [[node_name(Nidx)] || #pubsub_state{stateid = {_, Nidx}} <- Subscribed].
206
%% Name of the node with the given index, or <<>> when find_node_by_id/1 does
%% not return a node.
node_name(Nidx) ->
    Lookup = find_node_by_id(Nidx),
    case Lookup of
        {ok, #pubsub_node{nodeid = {_, Name}}} -> Name;
        _ -> <<>>
    end.
212
%% Returns {Node, Affiliation} for every state of the user (matched by user
%% and server, any resource). Crashes if a state points to a node index that
%% find_node_by_id/1 cannot resolve.
-spec find_nodes_by_affiliated_user(JID :: jid:ljid()) ->
    [{mod_pubsub:pubsubNode(), mod_pubsub:affiliation()}].
find_nodes_by_affiliated_user(LJID) ->
    {ok, UserStates} = get_states_by_lus(LJID),
    ToNodeAndAff =
        fun(#pubsub_state{stateid = {_, Nidx}, affiliation = Aff}) ->
                {ok, Node} = find_node_by_id(Nidx),
                {Node, Aff}
        end,
    lists:map(ToNodeAndAff, UserStates).
221
222 %% ------------------------ Direct #pubsub_state access ------------------------
223
%% Reads the state of LJID on node Nidx using a read lock; see get_state/3.
-spec get_state(Nidx :: mod_pubsub:nodeIdx(),
                LJID :: jid:ljid()) ->
    {ok, mod_pubsub:pubsubState()}.
get_state(Nidx, LJID) ->
    LockKind = read,
    get_state(Nidx, LJID, LockKind).
229
%% Returns all states stored for node Nidx, for any JID.
-spec get_states(Nidx :: mod_pubsub:nodeIdx()) ->
    {ok, [mod_pubsub:pubsubState()]}.
get_states(Nidx) ->
    NodePattern = #pubsub_state{stateid = {'_', Nidx}, _ = '_'},
    NodeStates = mnesia:match_object(NodePattern),
    {ok, NodeStates}.
235
%% Returns states on any node whose JID has the given user and server; the
%% resource part of the argument is ignored.
-spec get_states_by_lus(LJID :: jid:ljid()) ->
    {ok, [mod_pubsub:pubsubState()]}.
get_states_by_lus({LUser, LServer, _Resource}) ->
    AnyResource = {LUser, LServer, '_'},
    Pattern = #pubsub_state{stateid = {AnyResource, '_'}, _ = '_'},
    {ok, mnesia:match_object(Pattern)}.
240
%% Returns states on any node that are stored under the bare form of LJID.
-spec get_states_by_bare(LJID :: jid:ljid()) ->
    {ok, [mod_pubsub:pubsubState()]}.
get_states_by_bare(LJID) ->
    Pattern = #pubsub_state{stateid = {jid:to_bare(LJID), '_'}, _ = '_'},
    BareStates = mnesia:match_object(Pattern),
    {ok, BareStates}.
246
%% Returns states stored under LJID as given, followed by states stored under
%% its bare form. The two lists are simply concatenated, so when LJID is
%% already bare the same states appear twice.
-spec get_states_by_bare_and_full(LJID :: jid:ljid()) ->
    {ok, [mod_pubsub:pubsubState()]}.
get_states_by_bare_and_full(LJID) ->
    LBare = jid:to_bare(LJID),
    FullStates = mnesia:match_object(#pubsub_state{stateid = {LJID, '_'}, _ = '_'}),
    BareStates = mnesia:match_object(#pubsub_state{stateid = {LBare, '_'}, _ = '_'}),
    {ok, FullStates ++ BareStates}.
253
%% Returns the indexes of nodes owned by the bare form of LJID that have at
%% least one pending subscription.
%%
%% Each node index is returned once, in sorted order: the fold below adds an
%% index for every state row with a pending subscription, so a node with
%% several pending subscribers would otherwise be listed several times.
%% When the user owns no nodes the full scan of pubsub_state is skipped,
%% because no row could match.
-spec get_idxs_of_own_nodes_with_pending_subs(LJID :: jid:ljid()) ->
    {ok, [mod_pubsub:nodeIdx()]}.
get_idxs_of_own_nodes_with_pending_subs(LJID) ->
    LBare = jid:to_bare(LJID),
    MyStates = mnesia:match_object(#pubsub_state{stateid = {LBare, '_'},
                                                 affiliation = owner, _ = '_'}),
    case [Nidx || #pubsub_state{stateid = {_, Nidx}} <- MyStates] of
        [] ->
            {ok, []};
        NodeIdxs ->
            %% Warning: this folds over the whole pubsub_state table.
            ResultNidxs = mnesia:foldl(pa:bind(fun get_idxs_with_pending_subs/3, NodeIdxs),
                                       [], pubsub_state),
            {ok, lists:usort(ResultNidxs)}
    end.
264
265 %% ------------------------ Node management ------------------------
266
%% Registers Owner on node Nidx by giving it the `owner` affiliation.
-spec create_node(Nidx :: mod_pubsub:nodeIdx(),
                  Owner :: jid:ljid()) ->
    ok.
create_node(Nidx, Owner) ->
    Affiliation = owner,
    set_affiliation(Nidx, Owner, Affiliation).
272
%% Removes every state of node Nidx together with the items listed in each
%% state, and returns the states as they were before deletion.
-spec del_node(Nidx :: mod_pubsub:nodeIdx()) ->
    {ok, [mod_pubsub:pubsubState()]}.
del_node(Nidx) ->
    {ok, NodeStates} = get_states(Nidx),
    DropState =
        fun(#pubsub_state{stateid = {LJID, _}, items = ItemIds}) ->
                del_items(Nidx, ItemIds),
                del_state_by_idx_and_ljid(Nidx, LJID)
        end,
    lists:foreach(DropState, NodeStates),
    {ok, NodeStates}.
282
%% Stores a node record and keeps the subnode table in sync with its parents.
%% A node without an id gets a fresh one from pubsub_index and is treated as
%% having had no parents; otherwise the previously stored parents are looked
%% up first so the difference can be applied.
-spec set_node(mod_pubsub:pubsubNode()) -> {ok, mod_pubsub:nodeIdx()}.
set_node(#pubsub_node{id = undefined} = Node) ->
    NewNode = Node#pubsub_node{id = pubsub_index:new(node)},
    set_node(NewNode, []);
set_node(Node) ->
    PreviousParents = get_parentnodes_names(Node),
    set_node(Node, PreviousParents).

%% Writes the record, updates subnode entries and returns the node index.
set_node(#pubsub_node{id = Nidx} = Node, OldParents) ->
    mnesia:write(Node),
    set_subnodes(Node, OldParents),
    {ok, Nidx}.
296
%% Parents of the currently stored version of the node, or [] when no record
%% with this nodeid exists. The record is read with a write lock.
get_parentnodes_names(#pubsub_node{nodeid = NodeId}) ->
    Stored = mnesia:read(pubsub_node, NodeId, write),
    case Stored of
        [#pubsub_node{parents = Parents}] -> Parents;
        [] -> []
    end.
304
%% Brings pubsub_subnode entries in line with a node's parents.
%% Nothing is done when the new parents equal the old ones.
set_subnodes(#pubsub_node{parents = NewParents}, OldParents)
  when NewParents =:= OldParents ->
    ok;
set_subnodes(#pubsub_node{nodeid = NodeId, parents = NewParents}, OldParents) ->
    set_subnodes(NodeId, NewParents, OldParents).

%% Deletes the subnode entries for parents that were dropped, then writes
%% entries for parents that were added.
set_subnodes({Key, Node}, NewParents, OldParents) ->
    Old = sets:from_list(OldParents),
    New = sets:from_list(NewParents),
    Dropped = sets:to_list(sets:subtract(Old, New)),
    Gained = sets:to_list(sets:subtract(New, Old)),
    ToDelete = names_to_subnode_records(Key, Node, Dropped),
    ToWrite = names_to_subnode_records(Key, Node, Gained),
    lists:foreach(fun mnesia:delete_object/1, ToDelete),
    lists:foreach(fun mnesia:write/1, ToWrite),
    ok.
320
%% Builds one #pubsub_subnode{} per parent name, keyed by {Key, Parent} and
%% pointing at Node as the subnode.
names_to_subnode_records(Key, Node, Parents) ->
    ToRecord = fun(Parent) ->
                       #pubsub_subnode{nodeid = {Key, Parent}, subnode = Node}
               end,
    lists:map(ToRecord, Parents).
323
%% Looks a node up through the `id` index of the pubsub_node table.
-spec find_node_by_id(Nidx :: mod_pubsub:nodeIdx()) ->
    {error, not_found} | {ok, mod_pubsub:pubsubNode()}.
find_node_by_id(Nidx) ->
    Found = mnesia:index_read(pubsub_node, Nidx, #pubsub_node.id),
    case Found of
        [] -> {error, not_found};
        [#pubsub_node{} = Record] -> {ok, Record}
    end.
332
%% Reads the node stored under {Key, Node}; returns `false` when there is none.
-spec find_node_by_name(
        Key :: mod_pubsub:hostPubsub() | jid:ljid(),
        Node :: mod_pubsub:nodeId()) ->
    mod_pubsub:pubsubNode() | false.
find_node_by_name(Key, Node) ->
    Oid = oid(Key, Node),
    case mnesia:read(pubsub_node, Oid, read) of
        [NodeRec] -> NodeRec;
        [] -> false
    end.
342
%% Returns every node whose nodeid has Key as its first element.
-spec find_nodes_by_key(Key :: mod_pubsub:hostPubsub() | jid:ljid()) ->
    [mod_pubsub:pubsubNode()].
find_nodes_by_key(Key) ->
    KeyPattern = #pubsub_node{nodeid = {Key, '_'}, _ = '_'},
    mnesia:match_object(KeyPattern).
347
%% Builds the pubsub_node table key for a node name under Key.
oid(Key, Name) ->
    {Key, Name}.
349
350
%% Removes the node's entries from the subnode table (as if it lost all its
%% parents) and then deletes the node record itself.
-spec delete_node(Node :: mod_pubsub:pubsubNode()) -> ok.
delete_node(#pubsub_node{nodeid = NodeId, parents = CurrentParents}) ->
    NoParents = [],
    set_subnodes(NodeId, NoParents, CurrentParents),
    mnesia:delete({pubsub_node, NodeId}).
355
356
%% Direct children of Node under Key. An empty node name selects the nodes
%% that have no parents at all.
-spec get_subnodes(Key :: mod_pubsub:hostPubsub() | jid:ljid(), Node :: mod_pubsub:nodeId() | <<>>) ->
    [mod_pubsub:pubsubNode()].
get_subnodes(Key, <<>>) ->
    get_nodes_without_parents(Key);
get_subnodes(Key, Node) ->
    find_nodes_by_names(Key, get_subnodes_names(Key, Node)).
364
%% Like get_subnodes/2, but returns node names instead of node records.
%% Names come from the pubsub_subnode entries stored under {Key, Node}.
get_subnodes_names(Key, Node) ->
    Entries = mnesia:read({pubsub_subnode, {Key, Node}}),
    [Name || #pubsub_subnode{subnode = Name} <- Entries].
369
%% Nodes under Key whose parents list is empty.
%% Warning: this function is a full table scan and can return a lot of records.
get_nodes_without_parents(Key) ->
    RootPattern = #pubsub_node{nodeid = {Key, '_'}, parents = [], _ = '_'},
    mnesia:match_object(RootPattern).
373
%% Returns a list of {Depth, Nodes}, where Nodes are the parents of the nodes
%% on the previous (lower) level. Level 0 is the node named by Node, level 1
%% holds its parents, level 2 the parents of those (without duplicates), and
%% so on. The deepest level comes first in the list and {0, [NodeRecord]} last.
%%
%% Each node is returned only once. When the node does not exist the result
%% is [{0, []}].
-spec get_parentnodes_tree(Key :: mod_pubsub:hostPubsub() | jid:ljid(), Node :: mod_pubsub:nodeId()) ->
    [{Depth::non_neg_integer(), Nodes::[mod_pubsub:pubsubNode(), ...]}].
get_parentnodes_tree(Key, Node) ->
    case find_node_by_name(Key, Node) of
        #pubsub_node{parents = []} = Record ->
            [{0, [Record]}];
        #pubsub_node{parents = Parents} = Record ->
            %% Remember already visited names so that an accidental cycle
            %% is not expanded again.
            Visited = sets:from_list([Node]),
            UpperLevels = extract_parents(Key, Node, Parents, 1, Visited),
            UpperLevels ++ [{0, [Record]}];
        false ->
            %% node not found case
            [{0, []}]
    end.
397
%% Each call collects the parent records for one level and recurses into the
%% next level up. KnownNodesSet holds names that must not be expanded again;
%% names that would be revisited are logged and dropped.
extract_parents(Key, InitialNode, Parents, Depth, KnownNodesSet) ->
    ParentRecords = find_nodes_by_names(Key, Parents),
    Visited = sets:union(KnownNodesSet, sets:from_list(Parents)),
    %% Names of parents of parents, sorted and without duplicates.
    GrandparentNames = lists:usort(lists:flatmap(fun(#pubsub_node{parents = PP}) -> PP end,
                                                 ParentRecords)),
    %% Split into already visited names (usually none) and names to expand.
    {CyclicNames, NamesToExpand} =
        lists:partition(fun(Name) -> sets:is_element(Name, Visited) end,
                        GrandparentNames),
    case CyclicNames of
        [] -> ok;
        _ -> ?LOG_WARNING(#{what => pubsub_cyclic_nodes_detected,
                            pubsub_node => InitialNode, cyclic_names => CyclicNames})
    end,
    UpperLevels =
        case NamesToExpand of
            [] -> [];
            _ -> extract_parents(Key, InitialNode, NamesToExpand, Depth + 1, Visited)
        end,
    UpperLevels ++ [{Depth, ParentRecords}].
418
%% Looks every name up under Key and keeps only those that resolve to a node
%% record; names that do not resolve to one are skipped.
find_nodes_by_names(Key, Nodes) ->
    Lookup = fun(Name) ->
                     case find_node_by_name(Key, Name) of
                         #pubsub_node{} = Record -> {true, Record};
                         _ -> false
                     end
             end,
    lists:filtermap(Lookup, Nodes).
424
425
%% Returns a list of {Depth, Nodes} describing the descendants of Node:
%% level 0 is the node itself (last element), level 1 its subnodes, and so on
%% with deeper levels placed earlier in the list.
%% When the node does not exist the result is [{1, []}, {0, [false]}].
-spec get_subnodes_tree(Key :: mod_pubsub:hostPubsub() | jid:ljid(), Node :: mod_pubsub:nodeId()) ->
    [{Depth::non_neg_integer(), Nodes::[mod_pubsub:pubsubNode(), ...]}].
get_subnodes_tree(Key, Node) ->
    case find_node_by_name(Key, Node) of
        #pubsub_node{} = Record ->
            Children = get_subnodes_names(Key, Node),
            %% Remember already visited names so that an accidental cycle
            %% is not expanded again.
            Visited = sets:from_list([Node]),
            LowerLevels = extract_subnodes(Key, Node, Children, 1, Visited),
            LowerLevels ++ [{0, [Record]}];
        false ->
            %% node not found case
            [{1, []}, {0, [false]}]
    end.
441
%% Each call collects the subnode records for one level and recurses into the
%% next level down. KnownNodesSet holds names that must not be expanded again;
%% names that would be revisited are logged and dropped.
%% The deepest level is always followed by an extra, empty {Depth + 1, []}
%% entry at the head of the result.
extract_subnodes(Key, InitialNode, Subnodes, Depth, KnownNodesSet) ->
    SubnodesRecords = find_nodes_by_names(Key, Subnodes),
    Visited = sets:union(KnownNodesSet, sets:from_list(Subnodes)),
    %% Names of subnodes of subnodes, sorted and without duplicates.
    GrandchildNames = lists:usort(lists:flatmap(fun(Subnode) ->
                                                        get_subnodes_names(Key, Subnode)
                                                end, Subnodes)),
    %% Split into already visited names (usually none) and names to expand.
    {CyclicNames, NamesToExpand} =
        lists:partition(fun(Name) -> sets:is_element(Name, Visited) end,
                        GrandchildNames),
    case CyclicNames of
        [] -> ok;
        _ -> ?LOG_WARNING(#{what => pubsub_cyclic_nodes_detected,
                            pubsub_node => InitialNode, cyclic_names => CyclicNames})
    end,
    LowerLevels =
        case NamesToExpand of
            [] -> [{Depth + 1, []}];
            _ -> extract_subnodes(Key, InitialNode, NamesToExpand, Depth + 1, Visited)
        end,
    LowerLevels ++ [{Depth, SubnodesRecords}].
460
461 %% ------------------------ Affiliations ------------------------
462
%% Sets the affiliation of the bare form of LJID on node Nidx. When the new
%% affiliation is `none` and the state has no subscriptions, the state is
%% deleted instead of being written.
-spec set_affiliation(Nidx :: mod_pubsub:nodeIdx(),
                      LJID :: jid:ljid(),
                      Affiliation :: mod_pubsub:affiliation()) -> ok.
set_affiliation(Nidx, LJID, Affiliation) ->
    Bare = jid:to_bare(LJID),
    {ok, State} = get_state(Nidx, Bare, write),
    case State#pubsub_state.subscriptions of
        [] when Affiliation =:= none ->
            del_state_by_idx_and_ljid(Nidx, Bare);
        _ ->
            mnesia:write(State#pubsub_state{affiliation = Affiliation})
    end.
473
%% Affiliation stored for the bare form of LJID on node Nidx. For a missing
%% state this is whatever the #pubsub_state{} record defaults to.
-spec get_affiliation(Nidx :: mod_pubsub:nodeIdx(),
                      LJID :: jid:ljid()) ->
    {ok, mod_pubsub:affiliation()}.
get_affiliation(Nidx, LJID) ->
    Bare = jid:to_bare(LJID),
    {ok, #pubsub_state{affiliation = Affiliation}} = get_state(Nidx, Bare, read),
    {ok, Affiliation}.
480
481 %% ------------------------ Subscriptions ------------------------
482
%% Prepends {Sub, SubId} to the subscriptions of LJID (used as given, not
%% reduced to bare) on node Nidx and stores the subscription options.
-spec add_subscription(Nidx :: mod_pubsub:nodeIdx(),
                       LJID :: jid:ljid(),
                       Sub :: mod_pubsub:subscription(),
                       SubId :: mod_pubsub:subId(),
                       SubOpts :: mod_pubsub:subOptions()) -> ok.
add_subscription(Nidx, LJID, Sub, SubId, SubOpts) ->
    {ok, #pubsub_state{subscriptions = Existing} = State} = get_state(Nidx, LJID, write),
    Updated = State#pubsub_state{subscriptions = [{Sub, SubId} | Existing]},
    mnesia:write(Updated),
    set_subscription_opts(Nidx, LJID, SubId, SubOpts).
493
%% Stores the options of a subscription keyed by SubId only; the node index
%% and JID arguments are not used by this backend.
-spec set_subscription_opts(Nidx :: mod_pubsub:nodeIdx(),
                            JID :: jid:ljid(),
                            SubId :: mod_pubsub:subId(),
                            Opts :: mod_pubsub:subOptions()) -> ok.
set_subscription_opts(_Nidx, _JID, SubId, Opts) ->
    Row = #pubsub_subscription{subid = SubId, options = Opts},
    mnesia:write(Row).
500
%% All subscriptions on node Nidx as {Entity, Sub, SubId, Opts} tuples.
-spec get_node_subscriptions(Nidx :: mod_pubsub:nodeIdx()) ->
    {ok, [{Entity :: jid:ljid(),
           Sub :: mod_pubsub:subscription(),
           SubId :: mod_pubsub:subId(),
           Opts :: mod_pubsub:subOptions()}]}.
get_node_subscriptions(Nidx) ->
    {ok, NodeStates} = get_states(Nidx),
    Subscriptions = states_to_subscriptions(NodeStates),
    {ok, Subscriptions}.
509
%% Subscriptions of LJID (used as given) on node Nidx as {Sub, SubId, Opts}.
-spec get_node_entity_subscriptions(Nidx :: mod_pubsub:nodeIdx(),
                                    LJID :: jid:ljid()) ->
    {ok, [{Sub :: mod_pubsub:subscription(),
           SubId :: mod_pubsub:subId(),
           Opts :: mod_pubsub:subOptions()}]}.
get_node_entity_subscriptions(Nidx, LJID) ->
    {ok, #pubsub_state{subscriptions = Subs}} = get_state(Nidx, LJID, read),
    {ok, add_opts_to_subs(Subs)}.
518
%% Removes the subscription SubId of LJID on node Nidx along with its stored
%% options. If that leaves the state with no subscriptions and the `none`
%% affiliation, the whole state is deleted.
-spec delete_subscription(
        Nidx :: mod_pubsub:nodeIdx(),
        LJID :: jid:ljid(),
        SubId :: mod_pubsub:subId()) ->
    ok.
delete_subscription(Nidx, LJID, SubId) ->
    {ok, State} = get_state(Nidx, LJID, write),
    Remaining = lists:keydelete(SubId, 2, State#pubsub_state.subscriptions),
    mnesia:delete({pubsub_subscription, SubId}),
    Affiliation = State#pubsub_state.affiliation,
    case Remaining of
        [] when Affiliation =:= none ->
            del_state_by_idx_and_ljid(Nidx, LJID);
        _ ->
            mnesia:write(State#pubsub_state{subscriptions = Remaining})
    end.
532
%% Removes every subscription of LJID (used as given) on node Nidx.
-spec delete_all_subscriptions(
        Nidx :: mod_pubsub:nodeIdx(),
        LJID :: jid:ljid()) ->
    ok.
delete_all_subscriptions(Nidx, LJID) ->
    {ok, EntityState} = get_state(Nidx, LJID, write),
    delete_all_subscriptions_by_state(EntityState).
540
%% Removes the subscriptions held in every state of the user (matched by user
%% and server, any resource), on all nodes.
-spec delete_user_subscriptions(jid:ljid()) -> ok.
delete_user_subscriptions(LJID) ->
    {ok, UserStates} = get_states_by_lus(LJID),
    lists:foreach(fun(State) -> delete_all_subscriptions_by_state(State) end,
                  UserStates).
545
%% Deletes the stored options of every subscription in State, then either
%% deletes the state (affiliation `none`) or writes it back with an empty
%% subscription list.
-spec delete_all_subscriptions_by_state(mod_pubsub:pubsubState()) -> ok.
delete_all_subscriptions_by_state(State) ->
    Subs = State#pubsub_state.subscriptions,
    DropOpts = fun({_, SubId}) -> mnesia:delete({pubsub_subscription, SubId}) end,
    lists:foreach(DropOpts, Subs),
    case State#pubsub_state.affiliation of
        none -> del_state(State);
        _ -> mnesia:write(State#pubsub_state{subscriptions = []})
    end.
554
%% Replaces the entry with id SubId in the subscriptions of LJID on node Nidx
%% by {Subscription, SubId}. The list is left as is when no entry has that id;
%% the state is written in either case.
-spec update_subscription(Nidx :: mod_pubsub:nodeIdx(),
                          LJID :: jid:ljid(),
                          Subscription :: mod_pubsub:subscription(),
                          SubId :: mod_pubsub:subId()) ->
    ok.
update_subscription(Nidx, LJID, Subscription, SubId) ->
    {ok, #pubsub_state{subscriptions = Current} = State} = get_state(Nidx, LJID, write),
    Replaced = lists:keyreplace(SubId, 2, Current, {Subscription, SubId}),
    mnesia:write(State#pubsub_state{subscriptions = Replaced}).
564
565 %% ------------------------ Items ------------------------
566
%% Removes the given item ids from the item list kept in the state of the
%% bare form of LJID on node Nidx. The item records themselves are not
%% touched here.
-spec remove_items(Nidx :: mod_pubsub:nodeIdx(),
                   LJID :: jid:ljid(),
                   ItemIds :: [mod_pubsub:itemId()]) ->
    ok.
remove_items(Nidx, LJID, ItemIds) ->
    Bare = jid:to_bare(LJID),
    {ok, #pubsub_state{items = Current} = State} = get_state(Nidx, Bare, write),
    Kept = Current -- ItemIds,
    mnesia:write(State#pubsub_state{items = Kept}).
575
%% Clears the item list of every state of node Nidx. States whose list is
%% already empty are not rewritten.
-spec remove_all_items(Nidx :: mod_pubsub:nodeIdx()) ->
    ok.
remove_all_items(Nidx) ->
    {ok, NodeStates} = get_states(Nidx),
    HasItems = fun(#pubsub_state{items = Items}) -> Items =/= [] end,
    ClearItems = fun(#pubsub_state{} = S) ->
                         mnesia:write(S#pubsub_state{items = []})
                 end,
    lists:foreach(ClearItems, lists:filter(HasItems, NodeStates)).
585
%% Stores the item record and prepends its id to the item list kept in the
%% state of the bare form of JID on node Nidx.
-spec add_item(Nidx :: mod_pubsub:nodeIdx(),
               JID :: jid:ljid(),
               Item :: mod_pubsub:pubsubItem()) ->
    ok.
add_item(Nidx, JID, #pubsub_item{itemid = {ItemId, _}} = Item) ->
    set_item(Item),
    Bare = jid:to_bare(JID),
    {ok, #pubsub_state{items = Current} = State} = get_state(Nidx, Bare, write),
    mnesia:write(State#pubsub_state{items = [ItemId | Current]}).
595
596 %% ------------------------ Direct #pubsub_item access ------------------------
597
%% Returns the items of node Nidx in descending order of their `modification`
%% field, optionally restricted to the ids in the `item_ids` option and then
%% truncated to `max_items`. The second element of the result is always `none`.
-spec get_items(Nidx :: mod_pubsub:nodeIdx(), gen_pubsub_node:get_item_options()) ->
    {ok, {[mod_pubsub:pubsubItem()], none}}.
get_items(Nidx, Opts) ->
    NodeItems = mnesia:match_object(#pubsub_item{itemid = {'_', Nidx}, _ = '_'}),
    Descending = lists:reverse(lists:keysort(#pubsub_item.modification, NodeItems)),
    WantedIds = maps:get(item_ids, Opts, undefined),
    ById = filter_items_by_item_ids(Descending, WantedIds),
    MaxItems = maps:get(max_items, Opts, undefined),
    {ok, {limit_items(ById, MaxItems), none}}.
606
%% Keeps only the items whose id is listed in ItemIds; `undefined` means no
%% filtering. The order of Items is preserved.
filter_items_by_item_ids(Items, undefined) ->
    Items;
filter_items_by_item_ids(Items, ItemIds) ->
    IsWanted = fun(#pubsub_item{itemid = {ItemId, _}}) ->
                       lists:member(ItemId, ItemIds)
               end,
    lists:filter(IsWanted, Items).
613
%% Returns at most MaxItems leading elements of Items; `undefined` means no
%% limit.
limit_items(Items, MaxItems) ->
    case MaxItems of
        undefined -> Items;
        _ -> lists:sublist(Items, MaxItems)
    end.
618
%% Reads a single item of node Nidx by its id.
-spec get_item(Nidx :: mod_pubsub:nodeIdx(), ItemId :: mod_pubsub:itemId()) ->
    {ok, mod_pubsub:pubsubItem()} | {error, item_not_found}.
get_item(Nidx, ItemId) ->
    Key = {ItemId, Nidx},
    case mnesia:read({pubsub_item, Key}) of
        [#pubsub_item{} = Item] -> {ok, Item};
        _ -> {error, item_not_found}
    end.
626
%% Writes the item record to Mnesia as is.
-spec set_item(Item :: mod_pubsub:pubsubItem()) -> ok.
set_item(Item) -> mnesia:write(Item).
630
%% Deletes the item record stored under {ItemId, Nidx}.
-spec del_item(Nidx :: mod_pubsub:nodeIdx(), ItemId :: mod_pubsub:itemId()) -> ok.
del_item(Nidx, ItemId) ->
    Key = {ItemId, Nidx},
    mnesia:delete({pubsub_item, Key}).
634
%% Deletes the item records of node Nidx for every id in ItemIds.
-spec del_items(Nidx :: mod_pubsub:nodeIdx(), [ItemId :: mod_pubsub:itemId()]) -> ok.
del_items(Nidx, ItemIds) ->
    DeleteOne = fun(ItemId) -> del_item(Nidx, ItemId) end,
    lists:foreach(DeleteOne, ItemIds).
639
640 %%====================================================================
641 %% Internal functions
642 %%====================================================================
643
%% Reads the state of LJID on node Nidx with a write lock and deletes it
%% together with the options of its subscriptions (see del_state/1).
-spec del_state_by_idx_and_ljid(Nidx :: mod_pubsub:nodeIdx(),
                                LJID :: jid:ljid()) -> ok.
del_state_by_idx_and_ljid(Nidx, LJID) ->
    {ok, Current} = get_state(Nidx, LJID, write),
    del_state(Current).
649
%% Deletes the stored options of every subscription in the state and then the
%% state record itself.
-spec del_state(mod_pubsub:pubsubState()) -> ok.
del_state(#pubsub_state{stateid = {LJID, Nidx}, subscriptions = Subs}) ->
    DropOpts = fun({_, SubId}) -> mnesia:delete({pubsub_subscription, SubId}) end,
    lists:foreach(DropOpts, Subs),
    StateKey = {LJID, Nidx},
    mnesia:delete({pubsub_state, StateKey}).
654
%% Reads the state stored under {LJID, Nidx} with the requested lock kind.
%% When no single state record is found, a fresh #pubsub_state{} carrying only
%% the stateid is returned, so the result is always {ok, State}.
-spec get_state(Nidx :: mod_pubsub:nodeIdx(),
                LJID :: jid:ljid(),
                LockKind :: write | read) ->
    {ok, mod_pubsub:pubsubState()}.
get_state(Nidx, LJID, LockKind) ->
    StateId = {LJID, Nidx},
    Stored = mnesia:read(pubsub_state, StateId, LockKind),
    case Stored of
        [#pubsub_state{} = State] -> {ok, State};
        _ -> {ok, #pubsub_state{stateid = StateId}}
    end.
665
%% Flattens states into {JID, Subscription, SubId, Opts} tuples, one per
%% subscription, keeping the order of states and of subscriptions within a
%% state. States without subscriptions contribute nothing.
-spec states_to_subscriptions([mod_pubsub:pubsubState()]) ->
    [{jid:ljid(), mod_pubsub:subscription(), mod_pubsub:subId(), mod_pubsub:subOptions()}].
states_to_subscriptions([]) ->
    [];
states_to_subscriptions([#pubsub_state{subscriptions = Subs} = State | RStates]) ->
    case Subs of
        [] ->
            states_to_subscriptions(RStates);
        _ ->
            #pubsub_state{stateid = {J, _}} = State,
            add_jid_and_opts_to_subs(Subs, J, RStates)
    end.
674
%% Expands the subscriptions of one JID into {JID, Sub, SubId, Opts} tuples
%% and continues with the remaining states through states_to_subscriptions/1.
-spec add_jid_and_opts_to_subs(Subs :: [{mod_pubsub:subscription(), mod_pubsub:subId()}],
                               LJID :: jid:ljid(),
                               RStates :: [mod_pubsub:pubsubState()]) ->
    [{jid:ljid(), mod_pubsub:subscription(), mod_pubsub:subId(), mod_pubsub:subOptions()}].
add_jid_and_opts_to_subs(Subs, J, RStates) ->
    WithJidAndOpts = fun({S, SubId}) ->
                             {J, S, SubId, read_sub_options(SubId)}
                     end,
    ForThisJid = lists:map(WithJidAndOpts, Subs),
    ForThisJid ++ states_to_subscriptions(RStates).
684
%% Extends every {Sub, SubId} pair with the options stored for SubId.
-spec add_opts_to_subs(Subs :: [{mod_pubsub:subscription(), mod_pubsub:subId()}]) ->
    [{mod_pubsub:subscription(), mod_pubsub:subId(), mod_pubsub:subOptions()}].
add_opts_to_subs(Subs) ->
    WithOpts = fun({S, SubId}) -> {S, SubId, read_sub_options(SubId)} end,
    lists:map(WithOpts, Subs).
692
%% Options stored for the subscription SubId, or [] when nothing is stored.
-spec read_sub_options(SubId :: mod_pubsub:subId()) -> mod_pubsub:subOptions().
read_sub_options(SubId) ->
    Stored = mnesia:read({pubsub_subscription, SubId}),
    case Stored of
        [#pubsub_subscription{options = Opts}] -> Opts;
        [] -> []
    end.
699
%% Fold step: prepends the state's node index to Acc when that index is one
%% of NodeIdxs and the state has at least one pending subscription.
-spec get_idxs_with_pending_subs(NodeIdxs :: [mod_pubsub:nodeIdx()],
                                 PubsubState :: mod_pubsub:pubsubState(),
                                 Acc :: [mod_pubsub:nodeIdx()]) ->
    [mod_pubsub:nodeIdx()].
get_idxs_with_pending_subs(NodeIdxs,
                           #pubsub_state{stateid = {_, Nidx}, subscriptions = Subs},
                           Acc) ->
    %% The subscription list is only inspected for wanted nodes.
    HasPending = lists:member(Nidx, NodeIdxs)
                 andalso lists:any(fun is_pending_sub/1, Subs),
    case HasPending of
        true -> [Nidx | Acc];
        false -> Acc
    end.
712
%% True only for a {pending, SubId} subscription entry.
is_pending_sub(Sub) ->
    case Sub of
        {pending, _} -> true;
        _ -> false
    end.
715
Line Hits Source