./ct_report/coverage/mod_pubsub_db_mnesia.COVER.html

1 %%%----------------------------------------------------------------------
2 %%% File : mod_pubsub_db_mnesia.erl
3 %%% Author : Piotr Nosek <piotr.nosek@erlang-solutions.com>
4 %%% Purpose : PubSub Mnesia backend
5 %%% Created : 26 Oct 2018 by Piotr Nosek <piotr.nosek@erlang-solutions.com>
6 %%%----------------------------------------------------------------------
7
8 -module(mod_pubsub_db_mnesia).
9 -author('piotr.nosek@erlang-solutions.com').
10
11 -behaviour(mod_pubsub_db).
12
13 -include("pubsub.hrl").
14 -include("jlib.hrl").
15 -include("mongoose_logger.hrl").
16 -include_lib("stdlib/include/qlc.hrl").
17
18 -export([init/2, stop/0]).
19 % Funs execution
20 -export([transaction/2, dirty/2]).
21 % Direct #pubsub_state access
22 -export([del_node/1, get_state/2,
23 get_states/1, get_states_by_lus/1, get_states_by_bare/1,
24 get_states_by_bare_and_full/1, get_idxs_of_own_nodes_with_pending_subs/1]).
25 % Node management
26 -export([
27 create_node/2,
28 set_node/1,
29 find_node_by_id/1,
30 find_nodes_by_key/1,
31 find_node_by_name/2,
32 delete_node/1,
33 get_subnodes/2,
34 get_parentnodes_tree/2,
35 get_subnodes_tree/2
36 ]).
37 % Affiliations
38 -export([
39 set_affiliation/3,
40 get_affiliation/2
41 ]).
42 % Subscriptions
43 -export([
44 add_subscription/5,
45 set_subscription_opts/4,
46 get_node_subscriptions/1,
47 get_node_entity_subscriptions/2,
48 delete_subscription/3,
49 delete_all_subscriptions/2,
50 update_subscription/4
51 ]).
52 % Item ids in state
53 -export([
54 add_item/3,
55 remove_items/3,
56 remove_all_items/1
57 ]).
58
59 % Whole Items
60 -export([
61 get_items/2,
62 get_item/2,
63 set_item/1,
64 del_item/2,
65 del_items/2
66 ]).
67
68 % GDPR
69 -export([
70 get_user_payloads/2,
71 get_user_nodes/2,
72 get_user_subscriptions/2,
73 delete_user_subscriptions/1,
74 find_nodes_by_affiliated_user/1
75 ]).
76
77 %%====================================================================
78 %% Internal records definitions
79 %%====================================================================
80
81 -record(pubsub_subscription, {
82 subid :: mod_pubsub:subId(),
83 options = [] :: mod_pubsub:subOptions()
84 }).
85
86 %%====================================================================
87 %% Behaviour callbacks
88 %%====================================================================
89
90 %% ------------------------ Backend start/stop ------------------------
91
%% Creates the Mnesia tables used by this backend (states, items, nodes,
%% subscription options and the parent -> subnode mapping), adds the index on
%% #pubsub_node.id and initialises pubsub_index.
%% The result of creating pubsub_subnode is handed to
%% maybe_fill_subnode_table/1 so that the table can be back-filled.
init(_HostType, _Opts) ->
    ThisNode = [node()],
    StateOpts = [{disc_copies, ThisNode}, {type, ordered_set},
                 {attributes, record_info(fields, pubsub_state)}],
    mongoose_mnesia:create_table(pubsub_state, StateOpts),
    ItemOpts = [{disc_only_copies, ThisNode},
                {attributes, record_info(fields, pubsub_item)}],
    mongoose_mnesia:create_table(pubsub_item, ItemOpts),
    NodeOpts = [{disc_copies, ThisNode},
                {attributes, record_info(fields, pubsub_node)}],
    mongoose_mnesia:create_table(pubsub_node, NodeOpts),
    mnesia:add_table_index(pubsub_node, id),
    SubscriptionOpts = [{disc_copies, ThisNode}, {type, set},
                        {attributes, record_info(fields, pubsub_subscription)}],
    mongoose_mnesia:create_table(pubsub_subscription, SubscriptionOpts),
    SubnodeOpts = [{disc_copies, ThisNode}, {type, bag},
                   {attributes, record_info(fields, pubsub_subnode)}],
    SubnodeTableResult = mongoose_mnesia:create_table(pubsub_subnode, SubnodeOpts),
    maybe_fill_subnode_table(SubnodeTableResult),
    pubsub_index:init(),
    ok.
112
%% Nothing to tear down for the Mnesia backend.
-spec stop() -> ok.
stop() -> ok.
116
%% If the pubsub_subnode table was missing (create_table returned
%% {atomic, ok}), then fill it with data derived from the parents of every
%% stored node. Any other create_table result leaves the table untouched.
%% An aborted back-fill is logged rather than silently dropped; the
%% transaction result is still returned unchanged.
maybe_fill_subnode_table({atomic, ok}) ->
    FillFun = fun(#pubsub_node{} = Node, Acc) ->
                      %% No old parents: every current parent gets a subnode entry.
                      set_subnodes(Node, []),
                      Acc
              end,
    case mnesia:transaction(fun() -> mnesia:foldl(FillFun, ok, pubsub_node) end) of
        {aborted, Reason} = Aborted ->
            ?LOG_WARNING(#{what => pubsub_fill_subnode_table_failed,
                           reason => Reason}),
            Aborted;
        Result ->
            Result
    end;
maybe_fill_subnode_table(_Other) ->
    ok.
126
127 %% ------------------------ Fun execution ------------------------
128
%% Runs Fun in a Mnesia transaction, allowing up to three retries when the
%% transaction is aborted.
-spec transaction(Fun :: fun(() -> {result | error, any()}),
                  ErrorDebug :: map()) ->
    {result | error, any()}.
transaction(Fun, ErrorDebug) ->
    MaxRetries = 3,
    transaction(Fun, ErrorDebug, MaxRetries).
134
%% Transaction runner with a retry budget.
%% An aborted transaction is retried after a 100 ms pause while Retries > 0;
%% once the budget is exhausted the abort reason is turned into a db error.
transaction(Fun, ErrorDebug, Retries) ->
    Outcome = mnesia:transaction(mod_pubsub_db:extra_debug_fun(Fun)),
    case Outcome of
        {atomic, Result} ->
            Result;
        {aborted, ReasonData} ->
            case Retries > 0 of
                true ->
                    ?LOG_WARNING(#{what => pubsub_transaction_retry, retries => Retries,
                                   reason => ReasonData, debug => ErrorDebug}),
                    timer:sleep(100),
                    transaction(Fun, ErrorDebug, Retries - 1);
                false ->
                    ?LOG_WARNING(#{what => pubsub_transaction_failed, reason => ReasonData,
                                   debug => ErrorDebug}),
                    mod_pubsub_db:db_error(ReasonData, ErrorDebug, transaction_failed)
            end
    end.
149
%% Runs Fun as a synchronous dirty Mnesia activity. Any exception raised
%% while doing so is converted with mod_pubsub_db:db_error/3.
-spec dirty(Fun :: fun(() -> {result | error, any()}),
            ErrorDebug :: map()) ->
    {result | error, any()}.
dirty(Fun, ErrorDebug) ->
    try
        mnesia:sync_dirty(mod_pubsub_db:extra_debug_fun(Fun), [])
    catch
        _Class:ReasonData ->
            mod_pubsub_db:db_error(ReasonData, ErrorDebug, dirty_failed)
    end.
161
162 %% ------------------------ GDPR-related ------------------------
163
%% Collects, inside a transaction, the payloads of all items created by the
%% user. Crashes with badmatch if the transaction is aborted.
-spec get_user_payloads(LUser :: jid:luser(), LServer :: jid:lserver()) ->
    [NodeNameItemIDAndPayload :: [binary()]].
get_user_payloads(LUser, LServer) ->
    Query = fun() -> get_user_payloads_t(LUser, LServer) end,
    {atomic, Payloads} = mnesia:transaction(Query),
    Payloads.
169
%% Transaction body of get_user_payloads/2.
%% Matches items whose creation field holds the user's JID with any resource
%% and returns [NodeName, ItemId, SerialisedPayload] for each of them.
get_user_payloads_t(LUser, LServer) ->
    AnyResource = {LUser, LServer, '_'},
    Pattern = #pubsub_item{creation = {'_', AnyResource}, _ = '_'},
    Serialise = fun(Elements) ->
                        << <<(exml:to_binary(El))/binary>> || El <- Elements >>
                end,
    [[node_name(Nidx), ItemId, Serialise(Payload)] ||
        #pubsub_item{itemid = {ItemId, Nidx}, payload = Payload}
            <- mnesia:match_object(Pattern)].
175
%% Returns [NodeName, Type] for every node that lists the user's bare JID
%% among its owners.
%% The owners list is searched with lists:member/2, so nodes with several
%% owners are reported too; a pattern of `owners = [LJID]` would only match
%% nodes whose sole owner is the user.
%% Crashes with badmatch if the transaction is aborted.
-spec get_user_nodes(LUser :: jid:luser(), LServer :: jid:lserver()) ->
    [NodeNameAndType :: [binary()]].
get_user_nodes(LUser, LServer) ->
    LJID = {LUser, LServer, <<>>},
    CollectOwned =
        fun(#pubsub_node{nodeid = {_, NodeName}, type = Type, owners = Owners}, Acc)
              when is_list(Owners) ->
                case lists:member(LJID, Owners) of
                    true -> [[NodeName, Type] | Acc];
                    false -> Acc
                end;
           (_Other, Acc) ->
                %% Records of an unexpected shape are skipped.
                Acc
        end,
    {atomic, Recs} =
        mnesia:transaction(fun() -> mnesia:foldl(CollectOwned, [], pubsub_node) end),
    Recs.
185
%% Lists, inside a transaction, the names of the nodes the user is subscribed
%% to. Crashes with badmatch if the transaction is aborted.
-spec get_user_subscriptions(LUser :: jid:luser(), LServer :: jid:lserver()) ->
    [NodeName :: [binary()]].
get_user_subscriptions(LUser, LServer) ->
    Query = fun() -> get_user_subscriptions_t(LUser, LServer) end,
    {atomic, NodeNames} = mnesia:transaction(Query),
    NodeNames.
191
%% Transaction body of get_user_subscriptions/2.
%% Only states whose subscription list is exactly one `subscribed` entry
%% match the pattern; each result is wrapped in a single-element list.
get_user_subscriptions_t(LUser, LServer) ->
    AnyResource = {LUser, LServer, '_'},
    Pattern = #pubsub_state{stateid = {AnyResource, '_'},
                            subscriptions = [{subscribed, '_'}],
                            _ = '_'},
    [[node_name(Nidx)] ||
        #pubsub_state{stateid = {_, Nidx}} <- mnesia:match_object(Pattern)].
198
%% Resolves a node index to the node's name; returns <<>> when the lookup
%% does not yield a node.
node_name(Nidx) ->
    Lookup = find_node_by_id(Nidx),
    case Lookup of
        {ok, #pubsub_node{nodeid = {_Key, Name}}} ->
            Name;
        _ ->
            <<>>
    end.
204
%% Returns {Node, Affiliation} for every state stored for the user
%% (matched on user and server, any resource).
%% Crashes with badmatch when a state refers to a node that no longer exists.
-spec find_nodes_by_affiliated_user(JID :: jid:ljid()) ->
    [{mod_pubsub:pubsubNode(), mod_pubsub:affiliation()}].
find_nodes_by_affiliated_user(LJID) ->
    {ok, States} = get_states_by_lus(LJID),
    ToNodeAndAffiliation =
        fun(#pubsub_state{stateid = {_, Nidx}, affiliation = Affiliation}) ->
                {ok, Node} = find_node_by_id(Nidx),
                {Node, Affiliation}
        end,
    [ToNodeAndAffiliation(State) || State <- States].
213
214 %% ------------------------ Direct #pubsub_state access ------------------------
215
%% Reads the state of LJID on node Nidx using a read lock.
-spec get_state(Nidx :: mod_pubsub:nodeIdx(),
                LJID :: jid:ljid()) ->
    {ok, mod_pubsub:pubsubState()}.
get_state(Nidx, LJID) ->
    LockKind = read,
    get_state(Nidx, LJID, LockKind).
221
%% Returns every state stored for node Nidx, whatever the entity.
-spec get_states(Nidx :: mod_pubsub:nodeIdx()) ->
    {ok, [mod_pubsub:pubsubState()]}.
get_states(Nidx) ->
    AnyEntityOnNode = #pubsub_state{stateid = {'_', Nidx}, _ = '_'},
    {ok, mnesia:match_object(AnyEntityOnNode)}.
227
%% Returns the states of a user on any node, matching on user and server
%% only; the resource of the given JID is ignored.
-spec get_states_by_lus(LJID :: jid:ljid()) ->
    {ok, [mod_pubsub:pubsubState()]}.
get_states_by_lus({LUser, LServer, _Resource}) ->
    AnyResource = {LUser, LServer, '_'},
    Pattern = #pubsub_state{stateid = {AnyResource, '_'}, _ = '_'},
    {ok, mnesia:match_object(Pattern)}.
232
%% Returns the states stored under the bare form of LJID, on any node.
-spec get_states_by_bare(LJID :: jid:ljid()) ->
    {ok, [mod_pubsub:pubsubState()]}.
get_states_by_bare(LJID) ->
    Pattern = #pubsub_state{stateid = {jid:to_bare(LJID), '_'}, _ = '_'},
    {ok, mnesia:match_object(Pattern)}.
238
%% Returns the states stored under LJID exactly as given, followed by the
%% states stored under its bare form.
%% When LJID is already bare both lookups would be identical, so the second
%% one is skipped; otherwise every state would be returned twice.
-spec get_states_by_bare_and_full(LJID :: jid:ljid()) ->
    {ok, [mod_pubsub:pubsubState()]}.
get_states_by_bare_and_full(LJID) ->
    FullStates = mnesia:match_object(#pubsub_state{stateid = {LJID, '_'}, _ = '_'}),
    case jid:to_bare(LJID) of
        LJID ->
            %% LJID is its own bare form: a single query already covers it.
            {ok, FullStates};
        LBare ->
            BareStates = mnesia:match_object(#pubsub_state{stateid = {LBare, '_'},
                                                           _ = '_'}),
            {ok, FullStates ++ BareStates}
    end.
245
%% Returns the indexes of the nodes owned by the bare form of LJID that have
%% at least one pending subscription.
%% Each index is reported once, even when several states on the same node
%% hold pending subscriptions. The scan over pubsub_state is skipped
%% entirely when the user owns no nodes.
-spec get_idxs_of_own_nodes_with_pending_subs(LJID :: jid:ljid()) ->
    {ok, [mod_pubsub:nodeIdx()]}.
get_idxs_of_own_nodes_with_pending_subs(LJID) ->
    LBare = jid:to_bare(LJID),
    OwnedStates = mnesia:match_object(#pubsub_state{stateid = {LBare, '_'},
                                                    affiliation = owner, _ = '_'}),
    case [Nidx || #pubsub_state{stateid = {_, Nidx}} <- OwnedStates] of
        [] ->
            {ok, []};
        NodeIdxs ->
            %% Folds over the whole pubsub_state table.
            Found = mnesia:foldl(pa:bind(fun get_idxs_with_pending_subs/3, NodeIdxs),
                                 [], pubsub_state),
            {ok, lists:usort(Found)}
    end.
256
257 %% ------------------------ Node management ------------------------
258
%% Registers a new node in the state table by making Owner its owner.
-spec create_node(Nidx :: mod_pubsub:nodeIdx(),
                  Owner :: jid:ljid()) ->
    ok.
create_node(Nidx, Owner) ->
    Affiliation = owner,
    set_affiliation(Nidx, Owner, Affiliation).
264
%% Removes every state of node Nidx together with the items referenced by
%% those states. Returns the states as they were before deletion.
-spec del_node(Nidx :: mod_pubsub:nodeIdx()) ->
    {ok, [mod_pubsub:pubsubState()]}.
del_node(Nidx) ->
    {ok, States} = get_states(Nidx),
    DropState = fun(#pubsub_state{stateid = {LJID, _}, items = ItemIds}) ->
                        del_items(Nidx, ItemIds),
                        del_state_by_idx_and_ljid(Nidx, LJID)
                end,
    lists:foreach(DropState, States),
    {ok, States}.
274
%% Stores a node record.
%% A node without an id is new: it gets a fresh index from pubsub_index and
%% has no previous parents. Otherwise the previously stored parents are
%% looked up so that the subnode table can be updated.
-spec set_node(mod_pubsub:pubsubNode()) -> {ok, mod_pubsub:nodeIdx()}.
set_node(#pubsub_node{id = undefined} = Node) ->
    NewIdx = pubsub_index:new(node),
    set_node(Node#pubsub_node{id = NewIdx}, []);
set_node(Node) ->
    set_node(Node, get_parentnodes_names(Node)).
283
%% Writes the node record, then updates the subnode table from the
%% difference between OldParents and the node's current parents.
set_node(#pubsub_node{} = Node, OldParents) ->
    mnesia:write(Node),
    set_subnodes(Node, OldParents),
    {ok, Node#pubsub_node.id}.
288
%% Returns the parents currently stored for the node with the same nodeid,
%% or [] when no such node is stored. The record is read with a write lock.
get_parentnodes_names(#pubsub_node{nodeid = NodeId}) ->
    Stored = mnesia:read(pubsub_node, NodeId, write),
    case Stored of
        [#pubsub_node{parents = StoredParents}] ->
            StoredParents;
        [] ->
            []
    end.
296
%% Updates the subnode table for a node whose parents may have changed.
%% Nothing is touched when the parent list is exactly the old one.
set_subnodes(#pubsub_node{nodeid = NodeId, parents = NewParents}, OldParents) ->
    case NewParents =:= OldParents of
        true ->
            ok;
        false ->
            set_subnodes(NodeId, NewParents, OldParents)
    end.
301
%% Applies a change of parents to the pubsub_subnode table: entries for
%% parents that were dropped are deleted, entries for newly added parents
%% are written. Parents present in both lists are left alone.
set_subnodes({Key, Node}, NewParents, OldParents) ->
    OldSet = sets:from_list(OldParents),
    NewSet = sets:from_list(NewParents),
    Dropped = sets:to_list(sets:subtract(OldSet, NewSet)),
    Gained = sets:to_list(sets:subtract(NewSet, OldSet)),
    ToDelete = names_to_subnode_records(Key, Node, Dropped),
    ToWrite = names_to_subnode_records(Key, Node, Gained),
    lists:foreach(fun mnesia:delete_object/1, ToDelete),
    lists:foreach(fun mnesia:write/1, ToWrite),
    ok.
312
%% Builds one #pubsub_subnode{} record per parent name, each recording Node
%% as a subnode of {Key, Parent}.
names_to_subnode_records(Key, Node, Parents) ->
    ToRecord = fun(Parent) ->
                       #pubsub_subnode{nodeid = {Key, Parent}, subnode = Node}
               end,
    lists:map(ToRecord, Parents).
315
%% Looks a node up by its index through the secondary index on
%% #pubsub_node.id.
-spec find_node_by_id(Nidx :: mod_pubsub:nodeIdx()) ->
    {error, not_found} | {ok, mod_pubsub:pubsubNode()}.
find_node_by_id(Nidx) ->
    Found = mnesia:index_read(pubsub_node, Nidx, #pubsub_node.id),
    case Found of
        [] ->
            {error, not_found};
        [#pubsub_node{} = Record] ->
            {ok, Record}
    end.
324
%% Reads the node stored under {Key, Node}; returns false when there is none.
-spec find_node_by_name(
        Key :: mod_pubsub:hostPubsub() | jid:ljid(),
        Node :: mod_pubsub:nodeId()) ->
    mod_pubsub:pubsubNode() | false.
find_node_by_name(Key, Node) ->
    Stored = mnesia:read(pubsub_node, oid(Key, Node), read),
    case Stored of
        [NodeRec] -> NodeRec;
        [] -> false
    end.
334
%% Returns every node stored under Key, whatever its name.
-spec find_nodes_by_key(Key :: mod_pubsub:hostPubsub() | jid:ljid()) ->
    [mod_pubsub:pubsubNode()].
find_nodes_by_key(Key) ->
    AnyNameUnderKey = #pubsub_node{nodeid = {Key, '_'}, _ = '_'},
    mnesia:match_object(AnyNameUnderKey).
339
%% Builds the primary key of the pubsub_node table.
oid(Host, NodeName) ->
    {Host, NodeName}.
341
342
%% Deletes a node record after removing its entries from the subnode table
%% (all current parents are treated as dropped).
-spec delete_node(Node :: mod_pubsub:pubsubNode()) -> ok.
delete_node(#pubsub_node{nodeid = NodeId, parents = CurrentParents}) ->
    NoParents = [],
    set_subnodes(NodeId, NoParents, CurrentParents),
    mnesia:delete({pubsub_node, NodeId}).
347
348
%% Returns the direct subnodes of Node under Key.
%% An empty node name selects the nodes under Key that have no parents.
-spec get_subnodes(Key :: mod_pubsub:hostPubsub() | jid:ljid(), Node :: mod_pubsub:nodeId() | <<>>) ->
    [mod_pubsub:pubsubNode()].
get_subnodes(Key, <<>>) ->
    get_nodes_without_parents(Key);
get_subnodes(Key, Node) ->
    find_nodes_by_names(Key, get_subnodes_names(Key, Node)).
356
%% Like get_subnodes/2, but returns the names of the direct subnodes
%% (read from the pubsub_subnode table) instead of node records.
get_subnodes_names(Key, Node) ->
    Entries = mnesia:read({pubsub_subnode, {Key, Node}}),
    [Name || #pubsub_subnode{subnode = Name} <- Entries].
361
%% Returns the nodes under Key whose parents list is empty.
%% Warning: this is a full table scan and can return a lot of records.
get_nodes_without_parents(Key) ->
    RootsUnderKey = #pubsub_node{nodeid = {Key, '_'}, parents = [], _ = '_'},
    mnesia:match_object(RootsUnderKey).
365
%% Returns a list of {Depth, Nodes}, deepest level first, where the nodes at
%% each level are the parents of the nodes one level below.
%% Level 0 is [NodeRecord], the record found for the Node argument.
%% Level 1 holds the parents of Node, level 2 the parents of those (without
%% duplicates), and so on.
%%
%% Each node is returned only once by the function.
%% When Node does not exist the result is [{0, []}].
-spec get_parentnodes_tree(Key :: mod_pubsub:hostPubsub() | jid:ljid(), Node :: mod_pubsub:nodeId()) ->
    [{Depth::non_neg_integer(), Nodes::[mod_pubsub:pubsubNode(), ...]}].
get_parentnodes_tree(Key, Node) ->
    case find_node_by_name(Key, Node) of
        #pubsub_node{parents = Parents} = Record ->
            Ancestors =
                case Parents of
                    [] ->
                        [];
                    _ ->
                        %% Names already visited are tracked so that a cycle
                        %% in the parent graph is not expanded again.
                        Visited = sets:from_list([Node]),
                        extract_parents(Key, Node, Parents, 1, Visited)
                end,
            Ancestors ++ [{0, [Record]}];
        false ->
            %% node not found case
            [{0, []}]
    end.
389
%% Each call resolves the Parents of one level and recurses into the next
%% level. KnownNodesSet holds the names that must not be expanded again;
%% grandparent names already in it are logged as cyclic and dropped.
%% Deeper levels come first in the returned list.
extract_parents(Key, InitialNode, Parents, Depth, KnownNodesSet) ->
    ParentRecords = find_nodes_by_names(Key, Parents),
    Visited = sets:union(KnownNodesSet, sets:from_list(Parents)),
    %% Names of parents of parents, sorted and without duplicates.
    GrandParents = lists:usort(lists:append(
                                 [PP || #pubsub_node{parents = PP} <- ParentRecords])),
    %% The names are unique, so splitting them is the same as filtering the
    %% cyclic ones and subtracting them from the list.
    {CyclicNames, NamesToExpand} =
        lists:partition(fun(Name) -> sets:is_element(Name, Visited) end, GrandParents),
    case CyclicNames of
        [_ | _] ->
            ?LOG_WARNING(#{what => pubsub_cyclic_nodes_detected,
                           pubsub_node => InitialNode, cyclic_names => CyclicNames});
        [] ->
            ok
    end,
    HigherLevels =
        case NamesToExpand of
            [] -> [];
            _ -> extract_parents(Key, InitialNode, NamesToExpand, Depth + 1, Visited)
        end,
    HigherLevels ++ [{Depth, ParentRecords}].
410
%% Looks up each named node under Key, in order, and keeps only the ones
%% that exist (find_node_by_name/2 returns false for missing nodes).
find_nodes_by_names(Key, Nodes) ->
    KeepIfFound = fun(Name) ->
                          case find_node_by_name(Key, Name) of
                              #pubsub_node{} = Record -> {true, Record};
                              _ -> false
                          end
                  end,
    lists:filtermap(KeepIfFound, Nodes).
416
417
%% Returns a list of {Depth, Nodes}, deepest level first, describing the
%% subnodes of Node: level 0 is the node itself, level 1 its direct
%% subnodes, and so on.
%% When Node does not exist the result is [{1, []}, {0, [false]}].
-spec get_subnodes_tree(Key :: mod_pubsub:hostPubsub() | jid:ljid(), Node :: mod_pubsub:nodeId()) ->
    [{Depth::non_neg_integer(), Nodes::[mod_pubsub:pubsubNode(), ...]}].
get_subnodes_tree(Key, Node) ->
    case find_node_by_name(Key, Node) of
        #pubsub_node{} = Record ->
            DirectSubnodes = get_subnodes_names(Key, Node),
            %% Names already visited are tracked so that a cycle in the
            %% subnode graph is not expanded again.
            Visited = sets:from_list([Node]),
            Descendants = extract_subnodes(Key, Node, DirectSubnodes, 1, Visited),
            Descendants ++ [{0, [Record]}];
        false ->
            %% node not found case
            [{1, []}, {0, [false]}]
    end.
433
%% Each call resolves the Subnodes of one level and recurses into the next
%% level. KnownNodesSet holds the names that must not be expanded again;
%% second-level names already in it are logged as cyclic and dropped.
%% The deepest level reached is followed by an empty level {Depth + 1, []}.
extract_subnodes(Key, InitialNode, Subnodes, Depth, KnownNodesSet) ->
    SubnodesRecords = find_nodes_by_names(Key, Subnodes),
    Visited = sets:union(KnownNodesSet, sets:from_list(Subnodes)),
    %% Names of subnodes of subnodes, sorted and without duplicates.
    GrandChildren = lists:usort(lists:append(
                                  [get_subnodes_names(Key, Subnode) || Subnode <- Subnodes])),
    %% The names are unique, so splitting them is the same as filtering the
    %% cyclic ones and subtracting them from the list.
    {CyclicNames, NamesToExpand} =
        lists:partition(fun(Name) -> sets:is_element(Name, Visited) end, GrandChildren),
    case CyclicNames of
        [_ | _] ->
            ?LOG_WARNING(#{what => pubsub_cyclic_nodes_detected,
                           pubsub_node => InitialNode, cyclic_names => CyclicNames});
        [] ->
            ok
    end,
    DeeperLevels =
        case NamesToExpand of
            [] -> [{Depth + 1, []}];
            _ -> extract_subnodes(Key, InitialNode, NamesToExpand, Depth + 1, Visited)
        end,
    DeeperLevels ++ [{Depth, SubnodesRecords}].
452
453 %% ------------------------ Affiliations ------------------------
454
%% Sets the affiliation of the bare form of LJID on node Nidx.
%% A state that ends up with affiliation `none` and no subscriptions is
%% deleted instead of being written.
-spec set_affiliation(Nidx :: mod_pubsub:nodeIdx(),
                      LJID :: jid:ljid(),
                      Affiliation :: mod_pubsub:affiliation()) -> ok.
set_affiliation(Nidx, LJID, Affiliation) ->
    BareLJID = jid:to_bare(LJID),
    {ok, State} = get_state(Nidx, BareLJID, write),
    Subscriptions = State#pubsub_state.subscriptions,
    IsEmptyState = Affiliation =:= none andalso Subscriptions =:= [],
    case IsEmptyState of
        true ->
            del_state_by_idx_and_ljid(Nidx, BareLJID);
        false ->
            mnesia:write(State#pubsub_state{affiliation = Affiliation})
    end.
465
%% Returns the affiliation stored for the bare form of LJID on node Nidx.
-spec get_affiliation(Nidx :: mod_pubsub:nodeIdx(),
                      LJID :: jid:ljid()) ->
    {ok, mod_pubsub:affiliation()}.
get_affiliation(Nidx, LJID) ->
    {ok, #pubsub_state{affiliation = Affiliation}} =
        get_state(Nidx, jid:to_bare(LJID), read),
    {ok, Affiliation}.
472
473 %% ------------------------ Subscriptions ------------------------
474
%% Prepends {Sub, SubId} to the subscriptions of LJID on node Nidx and
%% stores the subscription options under SubId.
-spec add_subscription(Nidx :: mod_pubsub:nodeIdx(),
                       LJID :: jid:ljid(),
                       Sub :: mod_pubsub:subscription(),
                       SubId :: mod_pubsub:subId(),
                       SubOpts :: mod_pubsub:subOptions()) -> ok.
add_subscription(Nidx, LJID, Sub, SubId, SubOpts) ->
    {ok, #pubsub_state{subscriptions = Existing} = State} =
        get_state(Nidx, LJID, write),
    Updated = State#pubsub_state{subscriptions = [{Sub, SubId} | Existing]},
    mnesia:write(Updated),
    set_subscription_opts(Nidx, LJID, SubId, SubOpts).
485
%% Stores the options of a subscription. Options are keyed by SubId alone;
%% the node index and JID are not used.
-spec set_subscription_opts(Nidx :: mod_pubsub:nodeIdx(),
                            JID :: jid:ljid(),
                            SubId :: mod_pubsub:subId(),
                            Opts :: mod_pubsub:subOptions()) -> ok.
set_subscription_opts(_Nidx, _JID, SubId, Opts) ->
    Record = #pubsub_subscription{subid = SubId, options = Opts},
    mnesia:write(Record).
492
%% Returns every subscription on node Nidx together with its entity and
%% options.
-spec get_node_subscriptions(Nidx :: mod_pubsub:nodeIdx()) ->
    {ok, [{Entity :: jid:ljid(),
           Sub :: mod_pubsub:subscription(),
           SubId :: mod_pubsub:subId(),
           Opts :: mod_pubsub:subOptions()}]}.
get_node_subscriptions(Nidx) ->
    {ok, NodeStates} = get_states(Nidx),
    Subscriptions = states_to_subscriptions(NodeStates),
    {ok, Subscriptions}.
501
%% Returns the subscriptions of LJID (exactly as given) on node Nidx, each
%% with its options.
-spec get_node_entity_subscriptions(Nidx :: mod_pubsub:nodeIdx(),
                                    LJID :: jid:ljid()) ->
    {ok, [{Sub :: mod_pubsub:subscription(),
           SubId :: mod_pubsub:subId(),
           Opts :: mod_pubsub:subOptions()}]}.
get_node_entity_subscriptions(Nidx, LJID) ->
    {ok, #pubsub_state{subscriptions = Subs}} = get_state(Nidx, LJID, read),
    {ok, add_opts_to_subs(Subs)}.
510
%% Removes the subscription SubId of LJID on node Nidx along with its
%% stored options. A state left with affiliation `none` and no
%% subscriptions is deleted.
-spec delete_subscription(
        Nidx :: mod_pubsub:nodeIdx(),
        LJID :: jid:ljid(),
        SubId :: mod_pubsub:subId()) ->
    ok.
delete_subscription(Nidx, LJID, SubId) ->
    {ok, State} = get_state(Nidx, LJID, write),
    #pubsub_state{affiliation = Affiliation, subscriptions = OldSubs} = State,
    %% Subscriptions are {Subscription, SubId} pairs, hence key position 2.
    RemainingSubs = lists:keydelete(SubId, 2, OldSubs),
    mnesia:delete({pubsub_subscription, SubId}),
    IsEmptyState = Affiliation =:= none andalso RemainingSubs =:= [],
    case IsEmptyState of
        true ->
            del_state_by_idx_and_ljid(Nidx, LJID);
        false ->
            mnesia:write(State#pubsub_state{subscriptions = RemainingSubs})
    end.
524
%% Drops every subscription of LJID on node Nidx.
-spec delete_all_subscriptions(
        Nidx :: mod_pubsub:nodeIdx(),
        LJID :: jid:ljid()) ->
    ok.
delete_all_subscriptions(Nidx, LJID) ->
    {ok, EntityState} = get_state(Nidx, LJID, write),
    delete_all_subscriptions_by_state(EntityState).
532
%% Drops the subscriptions held by the user on every node, matching states
%% on user and server regardless of resource.
-spec delete_user_subscriptions(jid:ljid()) -> ok.
delete_user_subscriptions(LJID) ->
    {ok, UserStates} = get_states_by_lus(LJID),
    lists:foreach(fun(State) -> delete_all_subscriptions_by_state(State) end,
                  UserStates).
537
%% Deletes the stored options of every subscription in State, then either
%% deletes the state (affiliation `none`) or writes it back with an empty
%% subscription list.
-spec delete_all_subscriptions_by_state(mod_pubsub:pubsubState()) -> ok.
delete_all_subscriptions_by_state(State) ->
    DropOptions = fun({_Sub, SubId}) ->
                          mnesia:delete({pubsub_subscription, SubId})
                  end,
    lists:foreach(DropOptions, State#pubsub_state.subscriptions),
    case State#pubsub_state.affiliation =:= none of
        true ->
            del_state(State);
        false ->
            mnesia:write(State#pubsub_state{subscriptions = []})
    end.
546
%% Replaces the subscription stored under SubId for LJID on node Nidx with
%% {Subscription, SubId}. The list is unchanged if SubId is not present.
-spec update_subscription(Nidx :: mod_pubsub:nodeIdx(),
                          LJID :: jid:ljid(),
                          Subscription :: mod_pubsub:subscription(),
                          SubId :: mod_pubsub:subId()) ->
    ok.
update_subscription(Nidx, LJID, Subscription, SubId) ->
    {ok, #pubsub_state{subscriptions = OldSubs} = State} =
        get_state(Nidx, LJID, write),
    Replacement = {Subscription, SubId},
    %% Subscriptions are {Subscription, SubId} pairs, hence key position 2.
    UpdatedSubs = lists:keyreplace(SubId, 2, OldSubs, Replacement),
    mnesia:write(State#pubsub_state{subscriptions = UpdatedSubs}).
556
557 %% ------------------------ Items ------------------------
558
%% Removes the given item ids from the state of the bare form of LJID on
%% node Nidx. Only the ids kept in the state are affected here.
-spec remove_items(Nidx :: mod_pubsub:nodeIdx(),
                   LJID :: jid:ljid(),
                   ItemIds :: [mod_pubsub:itemId()]) ->
    ok.
remove_items(Nidx, LJID, ItemIds) ->
    {ok, #pubsub_state{items = CurrentIds} = State} =
        get_state(Nidx, jid:to_bare(LJID), write),
    RemainingIds = CurrentIds -- ItemIds,
    mnesia:write(State#pubsub_state{items = RemainingIds}).
567
%% Clears the item id list of every state on node Nidx. States whose list
%% is already empty are not rewritten.
-spec remove_all_items(Nidx :: mod_pubsub:nodeIdx()) ->
    ok.
remove_all_items(Nidx) ->
    {ok, States} = get_states(Nidx),
    WithItems = [State || #pubsub_state{items = ItemIds} = State <- States,
                          ItemIds =/= []],
    lists:foreach(fun(State) ->
                          mnesia:write(State#pubsub_state{items = []})
                  end, WithItems).
577
%% Stores Item and prepends its id to the item list kept in the state of
%% the bare form of JID on node Nidx.
-spec add_item(Nidx :: mod_pubsub:nodeIdx(),
               JID :: jid:ljid(),
               Item :: mod_pubsub:pubsubItem()) ->
    ok.
add_item(Nidx, JID, #pubsub_item{itemid = {ItemId, _}} = Item) ->
    set_item(Item),
    {ok, #pubsub_state{items = KnownIds} = State} =
        get_state(Nidx, jid:to_bare(JID), write),
    mnesia:write(State#pubsub_state{items = [ItemId | KnownIds]}).
587
588 %% ------------------------ Direct #pubsub_item access ------------------------
589
%% Returns the items of node Nidx ordered by the modification field,
%% highest first. The optional `item_ids` entry of Opts restricts the
%% result to those ids; the optional `max_items` entry caps its length.
-spec get_items(Nidx :: mod_pubsub:nodeIdx(), gen_pubsub_node:get_item_options()) ->
    {ok, {[mod_pubsub:pubsubItem()], none}}.
get_items(Nidx, Opts) ->
    NodeItems = mnesia:match_object(#pubsub_item{itemid = {'_', Nidx}, _ = '_'}),
    NewestFirst = lists:reverse(lists:keysort(#pubsub_item.modification, NodeItems)),
    WantedIds = maps:get(item_ids, Opts, undefined),
    Selected = filter_items_by_item_ids(NewestFirst, WantedIds),
    MaxItems = maps:get(max_items, Opts, undefined),
    {ok, {limit_items(Selected, MaxItems), none}}.
598
%% Keeps only the items whose id is listed in ItemIds, preserving order.
%% `undefined` means no restriction.
filter_items_by_item_ids(Items, undefined) ->
    Items;
filter_items_by_item_ids(Items, ItemIds) ->
    IsWanted = fun(#pubsub_item{itemid = {ItemId, _}}) ->
                       lists:member(ItemId, ItemIds)
               end,
    [Item || Item <- Items, IsWanted(Item)].
605
%% Truncates Items to at most MaxItems elements; `undefined` means no limit.
limit_items(Items, MaxItems) ->
    case MaxItems of
        undefined ->
            Items;
        _ ->
            lists:sublist(Items, MaxItems)
    end.
610
%% Reads a single item of node Nidx by its id.
-spec get_item(Nidx :: mod_pubsub:nodeIdx(), ItemId :: mod_pubsub:itemId()) ->
    {ok, mod_pubsub:pubsubItem()} | {error, item_not_found}.
get_item(Nidx, ItemId) ->
    Stored = mnesia:read({pubsub_item, {ItemId, Nidx}}),
    case Stored of
        [#pubsub_item{} = Item] ->
            {ok, Item};
        _ ->
            {error, item_not_found}
    end.
618
%% Writes an item record to the pubsub_item table.
-spec set_item(Item :: mod_pubsub:pubsubItem()) -> ok.
set_item(ItemRecord) ->
    mnesia:write(ItemRecord).
622
%% Deletes the item stored under {ItemId, Nidx}.
-spec del_item(Nidx :: mod_pubsub:nodeIdx(), ItemId :: mod_pubsub:itemId()) -> ok.
del_item(Nidx, ItemId) ->
    ItemKey = {ItemId, Nidx},
    mnesia:delete({pubsub_item, ItemKey}).
626
%% Deletes each of the given items of node Nidx, in order.
-spec del_items(Nidx :: mod_pubsub:nodeIdx(), [ItemId :: mod_pubsub:itemId()]) -> ok.
del_items(Nidx, ItemIds) ->
    DeleteOne = fun(ItemId) -> del_item(Nidx, ItemId) end,
    lists:foreach(DeleteOne, ItemIds).
631
632 %%====================================================================
633 %% Internal functions
634 %%====================================================================
635
%% Reads the state of LJID on node Nidx under a write lock and deletes it
%% together with the options of its subscriptions.
-spec del_state_by_idx_and_ljid(Nidx :: mod_pubsub:nodeIdx(),
                                LJID :: jid:ljid()) -> ok.
del_state_by_idx_and_ljid(Nidx, LJID) ->
    {ok, Current} = get_state(Nidx, LJID, write),
    del_state(Current).
641
%% Deletes the stored options of every subscription in the state, then the
%% state record itself.
-spec del_state(mod_pubsub:pubsubState()) -> ok.
del_state(#pubsub_state{stateid = {LJID, Nidx}, subscriptions = Subs}) ->
    DropOptions = fun({_Sub, SubId}) ->
                          mnesia:delete({pubsub_subscription, SubId})
                  end,
    lists:foreach(DropOptions, Subs),
    StateKey = {LJID, Nidx},
    mnesia:delete({pubsub_state, StateKey}).
646
%% Reads the state of LJID on node Nidx with the requested lock.
%% When nothing is stored, a #pubsub_state{} holding only the state id
%% (all other fields at their record defaults) is returned and nothing is
%% written.
-spec get_state(Nidx :: mod_pubsub:nodeIdx(),
                LJID :: jid:ljid(),
                LockKind :: write | read) ->
    {ok, mod_pubsub:pubsubState()}.
get_state(Nidx, LJID, LockKind) ->
    StateId = {LJID, Nidx},
    Stored = mnesia:read(pubsub_state, StateId, LockKind),
    case Stored of
        [#pubsub_state{} = State] ->
            {ok, State};
        _ ->
            {ok, #pubsub_state{stateid = StateId}}
    end.
657
%% Flattens a list of states into {Entity, Subscription, SubId, Options}
%% tuples, keeping the order of states and of subscriptions within each
%% state. States without subscriptions contribute nothing.
-spec states_to_subscriptions([mod_pubsub:pubsubState()]) ->
    [{jid:ljid(), mod_pubsub:subscription(), mod_pubsub:subId(), mod_pubsub:subOptions()}].
states_to_subscriptions(States) ->
    ExpandState =
        fun(#pubsub_state{stateid = {J, _}, subscriptions = Subs}) ->
                %% An empty remainder makes the helper return only this
                %% state's subscriptions.
                add_jid_and_opts_to_subs(Subs, J, [])
        end,
    lists:flatmap(ExpandState, States).
666
%% Expands the subscriptions of one entity into
%% {Entity, Subscription, SubId, Options} tuples and appends the
%% subscriptions derived from the remaining states.
-spec add_jid_and_opts_to_subs(Subs :: [{mod_pubsub:subscription(), mod_pubsub:subId()}],
                               LJID :: jid:ljid(),
                               RStates :: [mod_pubsub:pubsubState()]) ->
    [{jid:ljid(), mod_pubsub:subscription(), mod_pubsub:subId(), mod_pubsub:subOptions()}].
add_jid_and_opts_to_subs(Subs, J, RStates) ->
    WithJidAndOpts = fun({S, SubId}) ->
                             {J, S, SubId, read_sub_options(SubId)}
                     end,
    %% Bound first so that this entity's options are read before the
    %% remaining states are processed.
    Expanded = lists:map(WithJidAndOpts, Subs),
    Expanded ++ states_to_subscriptions(RStates).
676
%% Attaches the stored options to each {Subscription, SubId} pair.
-spec add_opts_to_subs(Subs :: [{mod_pubsub:subscription(), mod_pubsub:subId()}]) ->
    [{mod_pubsub:subscription(), mod_pubsub:subId(), mod_pubsub:subOptions()}].
add_opts_to_subs(Subs) ->
    WithOpts = fun({S, SubId}) ->
                       {S, SubId, read_sub_options(SubId)}
               end,
    lists:map(WithOpts, Subs).
684
%% Returns the options stored for SubId, or [] when none are stored.
-spec read_sub_options(SubId :: mod_pubsub:subId()) -> mod_pubsub:subOptions().
read_sub_options(SubId) ->
    Stored = mnesia:read({pubsub_subscription, SubId}),
    case Stored of
        [#pubsub_subscription{options = Options}] ->
            Options;
        [] ->
            []
    end.
691
%% Fold step over pubsub_state records: prepends the state's node index to
%% Acc when that node is one of NodeIdxs and the state holds at least one
%% pending subscription.
-spec get_idxs_with_pending_subs(NodeIdxs :: [mod_pubsub:nodeIdx()],
                                 PubsubState :: mod_pubsub:pubsubState(),
                                 Acc :: [mod_pubsub:nodeIdx()]) ->
    [mod_pubsub:nodeIdx()].
get_idxs_with_pending_subs(NodeIdxs, #pubsub_state{} = State, Acc) ->
    #pubsub_state{stateid = {_, Nidx}, subscriptions = Subs} = State,
    IsWantedNode = lists:member(Nidx, NodeIdxs),
    %% Subscriptions are only inspected for nodes of interest.
    HasPending = IsWantedNode andalso lists:any(fun is_pending_sub/1, Subs),
    case HasPending of
        true -> [Nidx | Acc];
        false -> Acc
    end.
704
%% True for a {pending, SubId} subscription entry, false for anything else.
is_pending_sub(Sub) ->
    case Sub of
        {pending, _SubId} -> true;
        _ -> false
    end.
707
Line Hits Source