./ct_report/coverage/mod_pubsub_db_mnesia.COVER.html

1 %%%----------------------------------------------------------------------
2 %%% File : mod_pubsub_db_mnesia.erl
3 %%% Author : Piotr Nosek <piotr.nosek@erlang-solutions.com>
4 %%% Purpose : PubSub Mnesia backend
5 %%% Created : 26 Oct 2018 by Piotr Nosek <piotr.nosek@erlang-solutions.com>
6 %%%----------------------------------------------------------------------
7
8 -module(mod_pubsub_db_mnesia).
9 -author('piotr.nosek@erlang-solutions.com').
10
11 -behaviour(mod_pubsub_db).
12
13 -include("pubsub.hrl").
14 -include("jlib.hrl").
15 -include("mongoose_logger.hrl").
16 -include_lib("stdlib/include/qlc.hrl").
17
18 -export([start/0, stop/0]).
19 % Funs execution
20 -export([transaction/2, dirty/2]).
21 % Direct #pubsub_state access
22 -export([del_node/1, get_state/2,
23 get_states/1, get_states_by_lus/1, get_states_by_bare/1,
24 get_states_by_bare_and_full/1, get_idxs_of_own_nodes_with_pending_subs/1]).
25 % Node management
26 -export([
27 create_node/2,
28 set_node/1,
29 find_node_by_id/1,
30 find_nodes_by_key/1,
31 find_node_by_name/2,
32 delete_node/1,
33 get_subnodes/2,
34 get_parentnodes_tree/2,
35 get_subnodes_tree/2
36 ]).
37 % Affiliations
38 -export([
39 set_affiliation/3,
40 get_affiliation/2
41 ]).
42 % Subscriptions
43 -export([
44 add_subscription/5,
45 set_subscription_opts/4,
46 get_node_subscriptions/1,
47 get_node_entity_subscriptions/2,
48 delete_subscription/3,
49 delete_all_subscriptions/2,
50 update_subscription/4
51 ]).
52 % Item ids in state
53 -export([
54 add_item/3,
55 remove_items/3,
56 remove_all_items/1
57 ]).
58
59 % Whole Items
60 -export([
61 get_items/2,
62 get_item/2,
63 set_item/1,
64 del_item/2,
65 del_items/2
66 ]).
67
68 % GDPR
69 -export([
70 get_user_payloads/2,
71 get_user_nodes/2,
72 get_user_subscriptions/2,
73 delete_user_subscriptions/1,
74 find_nodes_by_affiliated_user/1
75 ]).
76
77 %%====================================================================
78 %% Internal records definitions
79 %%====================================================================
80
%% Row of the pubsub_subscription Mnesia table: the options of one
%% subscription, keyed by its subscription id.
81 -record(pubsub_subscription, {
82 subid :: mod_pubsub:subId(),
83 options = [] :: mod_pubsub:subOptions()
84 }).
85
86 %%====================================================================
87 %% Behaviour callbacks
88 %%====================================================================
89
90 %% ------------------------ Backend start/stop ------------------------
91
%% Sets up the Mnesia tables used by this backend on the local node:
%% pubsub_state, pubsub_item, pubsub_node (plus an index on its id field),
%% pubsub_subscription and pubsub_subnode. Only the result of creating
%% pubsub_subnode is inspected; it decides whether the subnode table gets
%% back-filled. Finally pubsub_index is initialised.
-spec start() -> ok.
start() ->
    Self = node(),

    StateOpts = [{disc_copies, [Self]},
                 {type, ordered_set},
                 {attributes, record_info(fields, pubsub_state)}],
    mnesia:create_table(pubsub_state, StateOpts),
    mnesia:add_table_copy(pubsub_state, Self, disc_copies),

    ItemOpts = [{disc_only_copies, [Self]},
                {attributes, record_info(fields, pubsub_item)}],
    mnesia:create_table(pubsub_item, ItemOpts),
    mnesia:add_table_copy(pubsub_item, Self, disc_only_copies),

    NodeOpts = [{disc_copies, [Self]},
                {attributes, record_info(fields, pubsub_node)}],
    mnesia:create_table(pubsub_node, NodeOpts),
    mnesia:add_table_index(pubsub_node, id),

    SubscriptionOpts = [{disc_copies, [Self]},
                        {attributes, record_info(fields, pubsub_subscription)},
                        {type, set}],
    mnesia:create_table(pubsub_subscription, SubscriptionOpts),
    mnesia:add_table_copy(pubsub_subscription, Self, disc_copies),

    SubnodeOpts = [{disc_copies, [Self]},
                   {attributes, record_info(fields, pubsub_subnode)},
                   {type, bag}],
    SubnodeCreated = mnesia:create_table(pubsub_subnode, SubnodeOpts),
    mnesia:add_table_copy(pubsub_subnode, Self, disc_copies),
    %% Only a freshly created subnode table needs to be populated.
    maybe_fill_subnode_table(SubnodeCreated),

    pubsub_index:init(),
    ok.
120
%% Backend shutdown hook; nothing is done here.
-spec stop() -> ok.
stop() -> ok.
124
%% If the pubsub_subnode table was missing (i.e. its creation has just
%% succeeded), then fill it with data: every pubsub_node record is passed to
%% set_subnodes/2 with an empty list of old parents, inside one transaction.
%% Any other creation result leaves the table untouched.
maybe_fill_subnode_table({atomic, ok}) ->
    Populate = fun(#pubsub_node{} = NodeRec, Acc) ->
                       set_subnodes(NodeRec, []),
                       Acc
               end,
    FillAll = fun() -> mnesia:foldl(Populate, ok, pubsub_node) end,
    mnesia:transaction(FillAll);
maybe_fill_subnode_table(_Other) ->
    ok.
134
135 %% ------------------------ Fun execution ------------------------
136
%% Runs Fun (wrapped by mod_pubsub_db:extra_debug_fun/1) in a Mnesia
%% transaction. An aborted transaction is retried up to 3 more times with a
%% 100 ms pause before each retry; when no retries are left the abort reason
%% is passed to mod_pubsub_db:db_error/3.
-spec transaction(Fun :: fun(() -> {result | error, any()}),
                  ErrorDebug :: map()) ->
    {result | error, any()}.
transaction(Fun, ErrorDebug) ->
    transaction(Fun, ErrorDebug, 3).

%% Worker for transaction/2. Retries is the number of further attempts that
%% are still allowed after the current one.
transaction(Fun, ErrorDebug, Retries) ->
    Wrapped = mod_pubsub_db:extra_debug_fun(Fun),
    case mnesia:transaction(Wrapped) of
        {aborted, Reason} when Retries > 0 ->
            ?LOG_WARNING(#{what => pubsub_transaction_retry, retries => Retries,
                           reason => Reason, debug => ErrorDebug}),
            timer:sleep(100),
            transaction(Fun, ErrorDebug, Retries - 1);
        {aborted, Reason} ->
            ?LOG_WARNING(#{what => pubsub_transaction_failed, reason => Reason,
                           debug => ErrorDebug}),
            mod_pubsub_db:db_error(Reason, ErrorDebug, transaction_failed);
        {atomic, Result} ->
            Result
    end.
157
%% Runs Fun (wrapped by mod_pubsub_db:extra_debug_fun/1) through
%% mnesia:sync_dirty/2. Any exception raised by that call is converted with
%% mod_pubsub_db:db_error/3.
-spec dirty(Fun :: fun(() -> {result | error, any()}),
            ErrorDebug :: map()) ->
    {result | error, any()}.
dirty(Fun, ErrorDebug) ->
    Wrapped = mod_pubsub_db:extra_debug_fun(Fun),
    try
        mnesia:sync_dirty(Wrapped, [])
    catch
        _Class:Reason ->
            mod_pubsub_db:db_error(Reason, ErrorDebug, dirty_failed)
    end.
169
170 %% ------------------------ GDPR-related ------------------------
171
%% Transactional wrapper around get_user_payloads_t/2.
%% Crashes with badmatch if the transaction is aborted.
-spec get_user_payloads(LUser :: jid:luser(), LServer :: jid:lserver()) ->
    [NodeNameItemIDAndPayload :: [binary()]].
get_user_payloads(LUser, LServer) ->
    Collect = fun() -> get_user_payloads_t(LUser, LServer) end,
    {atomic, Payloads} = mnesia:transaction(Collect),
    Payloads.
177
%% Returns [NodeName, ItemId, PayloadBinary] for every pubsub_item whose
%% creation field matches {_, {LUser, LServer, _}}. The payload elements are
%% serialised with exml:to_binary/1 and concatenated into a single binary.
get_user_payloads_t(LUser, LServer) ->
    CreatedByUser = {'_', {LUser, LServer, '_'}},
    Pattern = #pubsub_item{creation = CreatedByUser, _ = '_'},
    [[node_name(Nidx), ItemId, serialize_item_payload(Elements)] ||
        #pubsub_item{itemid = {ItemId, Nidx}, payload = Elements}
            <- mnesia:match_object(Pattern)].

%% Concatenates the binary form of each payload element.
serialize_item_payload(Elements) ->
    << <<(exml:to_binary(El))/binary>> || El <- Elements >>.
183
%% Returns [NodeName, Type] for every pubsub_node whose owners field is
%% exactly the one-element list holding the user's bare JID.
%% Crashes with badmatch if the transaction is aborted.
-spec get_user_nodes(LUser :: jid:luser(), LServer :: jid:lserver()) ->
    [NodeNameAndType :: [binary()]].
get_user_nodes(LUser, LServer) ->
    BareJID = {LUser, LServer, <<>>},
    Pattern = #pubsub_node{owners = [BareJID], _ = '_'},
    Collect = fun() ->
                      [[Name, Type] ||
                          #pubsub_node{nodeid = {_, Name}, type = Type}
                              <- mnesia:match_object(Pattern)]
              end,
    {atomic, NamesAndTypes} = mnesia:transaction(Collect),
    NamesAndTypes.
193
%% Transactional wrapper around get_user_subscriptions_t/2.
%% Crashes with badmatch if the transaction is aborted.
-spec get_user_subscriptions(LUser :: jid:luser(), LServer :: jid:lserver()) ->
    [NodeName :: [binary()]].
get_user_subscriptions(LUser, LServer) ->
    Collect = fun() -> get_user_subscriptions_t(LUser, LServer) end,
    {atomic, NodeNames} = mnesia:transaction(Collect),
    NodeNames.
199
%% Returns [NodeName] for every pubsub_state that belongs to any resource of
%% the user and whose subscriptions field is exactly one `subscribed` entry.
get_user_subscriptions_t(LUser, LServer) ->
    AnyResource = {LUser, LServer, '_'},
    Pattern = #pubsub_state{stateid = {AnyResource, '_'},
                            subscriptions = [{subscribed, '_'}],
                            _ = '_'},
    Subscribed = mnesia:match_object(Pattern),
    [[node_name(Nidx)] || #pubsub_state{stateid = {_, Nidx}} <- Subscribed].
206
%% Resolves a node index to the node's name; yields an empty binary when
%% find_node_by_id/1 does not return a node.
node_name(Nidx) ->
    Lookup = find_node_by_id(Nidx),
    case Lookup of
        {ok, #pubsub_node{nodeid = {_Key, Name}}} ->
            Name;
        _NotFound ->
            <<>>
    end.
212
%% For every state stored for the user (any resource), returns the node
%% record together with the affiliation kept in that state.
%% Crashes with badmatch if a state refers to a node that cannot be found.
-spec find_nodes_by_affiliated_user(JID :: jid:ljid()) ->
    [{mod_pubsub:pubsubNode(), mod_pubsub:affiliation()}].
find_nodes_by_affiliated_user(LJID) ->
    {ok, UserStates} = get_states_by_lus(LJID),
    [state_to_node_and_affiliation(S) || S <- UserStates].

%% Pairs the node referenced by a state with the state's affiliation.
state_to_node_and_affiliation(#pubsub_state{stateid = {_, Nidx},
                                            affiliation = Affiliation}) ->
    {ok, NodeRec} = find_node_by_id(Nidx),
    {NodeRec, Affiliation}.
221
222 %% ------------------------ Direct #pubsub_state access ------------------------
223
%% Fetches the state of LJID on node Nidx using a read lock
%% (see get_state/3 for the lookup itself).
-spec get_state(Nidx :: mod_pubsub:nodeIdx(),
                LJID :: jid:ljid()) ->
    {ok, mod_pubsub:pubsubState()}.
get_state(Nidx, LJID) ->
    LockKind = read,
    get_state(Nidx, LJID, LockKind).
229
%% Returns every pubsub_state record stored for node Nidx.
-spec get_states(Nidx :: mod_pubsub:nodeIdx()) ->
    {ok, [mod_pubsub:pubsubState()]}.
get_states(Nidx) ->
    Pattern = #pubsub_state{stateid = {'_', Nidx}, _ = '_'},
    {ok, mnesia:match_object(Pattern)}.
235
%% Returns the states of every resource of the given user on every node;
%% the resource part of the argument is ignored.
-spec get_states_by_lus(LJID :: jid:ljid()) ->
    {ok, [mod_pubsub:pubsubState()]}.
get_states_by_lus({LUser, LServer, _Resource}) ->
    AnyResource = {LUser, LServer, '_'},
    Pattern = #pubsub_state{stateid = {AnyResource, '_'}, _ = '_'},
    {ok, mnesia:match_object(Pattern)}.
240
%% Returns the states stored under the bare form of LJID, on every node.
-spec get_states_by_bare(LJID :: jid:ljid()) ->
    {ok, [mod_pubsub:pubsubState()]}.
get_states_by_bare(LJID) ->
    Pattern = #pubsub_state{stateid = {jid:to_bare(LJID), '_'}, _ = '_'},
    {ok, mnesia:match_object(Pattern)}.
246
%% Returns the states stored under LJID exactly as given, followed by the
%% states stored under its bare form, on every node.
%% When LJID is already bare the two lookups would use the same pattern, so
%% the second one is skipped to avoid returning every state twice.
-spec get_states_by_bare_and_full(LJID :: jid:ljid()) ->
    {ok, [mod_pubsub:pubsubState()]}.
get_states_by_bare_and_full(LJID) ->
    FullMatches = mnesia:match_object(#pubsub_state{stateid = {LJID, '_'}, _ = '_'}),
    case jid:to_bare(LJID) of
        LJID ->
            %% LJID has no resource: the bare lookup is the same query.
            {ok, FullMatches};
        LBare ->
            BareMatches = mnesia:match_object(#pubsub_state{stateid = {LBare, '_'},
                                                            _ = '_'}),
            {ok, FullMatches ++ BareMatches}
    end.
253
%% Collects node indexes for which the bare JID holds an `owner` state and for
%% which some state carries a pending subscription.
%% The owned indexes come from a match on pubsub_state; the pending check
%% folds over the whole pubsub_state table (see get_idxs_with_pending_subs/3).
-spec get_idxs_of_own_nodes_with_pending_subs(LJID :: jid:ljid()) ->
    {ok, [mod_pubsub:nodeIdx()]}.
get_idxs_of_own_nodes_with_pending_subs(LJID) ->
    OwnerPattern = #pubsub_state{stateid = {jid:to_bare(LJID), '_'},
                                 affiliation = owner, _ = '_'},
    OwnedStates = mnesia:match_object(OwnerPattern),
    OwnedIdxs = [Nidx || #pubsub_state{stateid = {_, Nidx}} <- OwnedStates],
    Folder = pa:bind(fun get_idxs_with_pending_subs/3, OwnedIdxs),
    {ok, mnesia:foldl(Folder, [], pubsub_state)}.
264
265 %% ------------------------ Node management ------------------------
266
%% Registers a new node in the state table by giving Owner the `owner`
%% affiliation on it.
-spec create_node(Nidx :: mod_pubsub:nodeIdx(),
                  Owner :: jid:ljid()) ->
    ok.
create_node(Nidx, Owner) ->
    Affiliation = owner,
    set_affiliation(Nidx, Owner, Affiliation).
272
%% Removes every state of node Nidx together with the items listed in each
%% state. Returns the states as they were before deletion.
-spec del_node(Nidx :: mod_pubsub:nodeIdx()) ->
    {ok, [mod_pubsub:pubsubState()]}.
del_node(Nidx) ->
    {ok, NodeStates} = get_states(Nidx),
    Purge = fun(#pubsub_state{stateid = {LJID, _}, items = ItemIds}) ->
                    %% Items first, then the state that referenced them.
                    del_items(Nidx, ItemIds),
                    del_state_by_idx_and_ljid(Nidx, LJID)
            end,
    lists:foreach(Purge, NodeStates),
    {ok, NodeStates}.
282
%% Stores a node record and keeps the subnode table in sync with its parents.
%% A node without an id gets a fresh one from pubsub_index:new/1 and is
%% treated as having had no parents; otherwise the previously stored parents
%% are looked up first.
-spec set_node(mod_pubsub:pubsubNode()) -> {ok, mod_pubsub:nodeIdx()}.
set_node(#pubsub_node{id = undefined} = Node) ->
    NewIdx = pubsub_index:new(node),
    set_node(Node#pubsub_node{id = NewIdx}, []);
set_node(Node) ->
    PreviousParents = get_parentnodes_names(Node),
    set_node(Node, PreviousParents).

%% Writes the node, then updates subnode records relative to OldParents.
set_node(#pubsub_node{id = Nidx} = Node, OldParents) ->
    mnesia:write(Node),
    set_subnodes(Node, OldParents),
    {ok, Nidx}.
296
%% Returns the parents currently stored for the node with the same nodeid,
%% or [] when no such node is stored. The read takes a write lock.
get_parentnodes_names(#pubsub_node{nodeid = NodeId}) ->
    Stored = mnesia:read(pubsub_node, NodeId, write),
    case Stored of
        [#pubsub_node{parents = StoredParents}] ->
            StoredParents;
        [] ->
            []
    end.
304
%% Brings the pubsub_subnode table in line with a node's parents.
%% Nothing is done when the node's parents equal the old parents.
set_subnodes(#pubsub_node{parents = Unchanged}, Unchanged) ->
    ok;
set_subnodes(#pubsub_node{nodeid = NodeId, parents = Current}, Previous) ->
    set_subnodes(NodeId, Current, Previous).

%% Deletes the subnode records for parents that were dropped and writes
%% records for parents that were added.
set_subnodes({Key, Node}, NewParents, OldParents) ->
    OldSet = sets:from_list(OldParents),
    NewSet = sets:from_list(NewParents),
    Dropped = sets:to_list(sets:subtract(OldSet, NewSet)),
    Gained = sets:to_list(sets:subtract(NewSet, OldSet)),
    ToDelete = names_to_subnode_records(Key, Node, Dropped),
    ToWrite = names_to_subnode_records(Key, Node, Gained),
    lists:foreach(fun mnesia:delete_object/1, ToDelete),
    lists:foreach(fun mnesia:write/1, ToWrite),
    ok.
320
%% Builds one pubsub_subnode record per parent name, each recording Node as a
%% subnode of {Key, Parent}.
names_to_subnode_records(Key, Node, Parents) ->
    ToRecord = fun(ParentName) ->
                       #pubsub_subnode{nodeid = {Key, ParentName}, subnode = Node}
               end,
    lists:map(ToRecord, Parents).
323
%% Looks a node up through the index on the id field of pubsub_node.
-spec find_node_by_id(Nidx :: mod_pubsub:nodeIdx()) ->
    {error, not_found} | {ok, mod_pubsub:pubsubNode()}.
find_node_by_id(Nidx) ->
    Found = mnesia:index_read(pubsub_node, Nidx, #pubsub_node.id),
    case Found of
        [] ->
            {error, not_found};
        [#pubsub_node{} = NodeRec] ->
            {ok, NodeRec}
    end.
332
%% Reads the node stored under {Key, Node}; returns `false` when there is none.
-spec find_node_by_name(
        Key :: mod_pubsub:hostPubsub() | jid:ljid(),
        Node :: mod_pubsub:nodeId()) ->
    mod_pubsub:pubsubNode() | false.
find_node_by_name(Key, Node) ->
    NodeId = oid(Key, Node),
    case mnesia:read(pubsub_node, NodeId, read) of
        [Found] ->
            Found;
        [] ->
            false
    end.
342
%% Returns every node record whose nodeid has Key as its first element.
-spec find_nodes_by_key(Key :: mod_pubsub:hostPubsub() | jid:ljid()) ->
    [mod_pubsub:pubsubNode()].
find_nodes_by_key(Key) ->
    Pattern = #pubsub_node{nodeid = {Key, '_'}, _ = '_'},
    mnesia:match_object(Pattern).
347
348
:-(
%% Builds the pubsub_node table key from a host/owner key and a node name.
oid(Key, Name) ->
    {Key, Name}.
349
350
%% Removes the subnode records linking the node to its parents, then deletes
%% the node record itself.
-spec delete_node(Node :: mod_pubsub:pubsubNode()) -> ok.
delete_node(#pubsub_node{nodeid = NodeId, parents = CurrentParents}) ->
    NoParents = [],
    set_subnodes(NodeId, NoParents, CurrentParents),
    mnesia:delete({pubsub_node, NodeId}).
355
356
%% Returns the records of the direct subnodes of Node.
%% An empty node name selects the nodes under Key that have no parents.
-spec get_subnodes(Key :: mod_pubsub:hostPubsub() | jid:ljid(), Node :: mod_pubsub:nodeId() | <<>>) ->
    [mod_pubsub:pubsubNode()].
get_subnodes(Key, <<>>) ->
    get_nodes_without_parents(Key);
get_subnodes(Key, Node) ->
    find_nodes_by_names(Key, get_subnodes_names(Key, Node)).
364
%% Like get_subnodes/2, but returns node names instead of node records.
%% The names come from the pubsub_subnode records stored under {Key, Node}.
get_subnodes_names(Key, Node) ->
    [Name || #pubsub_subnode{subnode = Name}
                 <- mnesia:read({pubsub_subnode, {Key, Node}})].
369
%% Returns the nodes under Key whose parents list is empty.
%% Warning: this function is a full table scan and can return a lot of records.
get_nodes_without_parents(Key) ->
    Pattern = #pubsub_node{nodeid = {Key, '_'}, parents = [], _ = '_'},
    mnesia:match_object(Pattern).
373
%% Returns a list of {Depth, Nodes}, where Nodes are the parent nodes of the
%% nodes on the level below.
%% Level 0 is {0, [NodeRecord]}, NodeRecord being the node named by Node.
%% Level 1 is {1, Nodes1}, where Nodes1 are the parents of Node.
%% Level 2 can be {2, Nodes2}, where Nodes2 are the parents of Nodes1
%% (without duplicates), and so on. Deeper levels come first in the list.
%%
%% Each node is returned only once by this function.
%% When the node does not exist the result is [{0, []}].
-spec get_parentnodes_tree(Key :: mod_pubsub:hostPubsub() | jid:ljid(), Node :: mod_pubsub:nodeId()) ->
    [{Depth::non_neg_integer(), Nodes::[mod_pubsub:pubsubNode(), ...]}].
get_parentnodes_tree(Key, Node) ->
    parentnodes_tree_from(find_node_by_name(Key, Node), Key, Node).

%% Builds the tree from the result of looking the starting node up.
parentnodes_tree_from(false, _Key, _Node) ->
    %% node not found case
    [{0, []}];
parentnodes_tree_from(#pubsub_node{parents = []} = Root, _Key, _Node) ->
    [{0, [Root]}];
parentnodes_tree_from(#pubsub_node{parents = Parents} = Root, Key, Node) ->
    %% To avoid accidental cyclic issues, keep a set of known nodes
    %% which are not expanded again.
    Known = sets:from_list([Node]),
    FirstDepth = 1,
    extract_parents(Key, Node, Parents, FirstDepth, Known) ++ [{0, [Root]}].
397
%% Each call extracts the Parents of one level and recurses to the next level.
%% KnownNodesSet holds the names that must not be expanded again; parent names
%% already in it are reported as cyclic and dropped.
extract_parents(Key, InitialNode, Parents, Depth, KnownNodesSet) ->
    LevelRecords = find_nodes_by_names(Key, Parents),
    Known = sets:union(KnownNodesSet, sets:from_list(Parents)),
    %% Names of parents of parents, sorted and without duplicates.
    Grandparents = lists:usort(lists:flatmap(fun(#pubsub_node{parents = PP}) -> PP end,
                                             LevelRecords)),
    %% Grandparents has no duplicates, so partitioning it is equivalent to
    %% subtracting the cyclic names from it. CyclicNames is usually empty.
    {CyclicNames, NextLevel} =
        lists:partition(fun(Name) -> sets:is_element(Name, Known) end, Grandparents),
    case CyclicNames of
        [] -> [];
        _ -> ?LOG_WARNING(#{what => pubsub_cyclic_nodes_detected,
                            pubsub_node => InitialNode, cyclic_names => CyclicNames})
    end,
    HigherLevels =
        case NextLevel of
            [] -> [];
            _ -> extract_parents(Key, InitialNode, NextLevel, Depth + 1, Known)
        end,
    HigherLevels ++ [{Depth, LevelRecords}].
418
%% Looks every name up under Key and returns the records that exist,
%% in the order of the names; missing nodes are left out.
find_nodes_by_names(Key, Nodes) ->
    Lookups = lists:map(fun(Name) -> find_node_by_name(Key, Name) end, Nodes),
    %% find_node_by_name/2 yields `false` for a missing node.
    [Found || #pubsub_node{} = Found <- Lookups].
424
425
%% Returns a list of {Depth, Nodes} describing the subnodes of Node level by
%% level, with deeper levels first and {0, [NodeRecord]} last.
%% When the node does not exist the result is [{1, []}, {0, [false]}].
-spec get_subnodes_tree(Key :: mod_pubsub:hostPubsub() | jid:ljid(), Node :: mod_pubsub:nodeId()) ->
    [{Depth::non_neg_integer(), Nodes::[mod_pubsub:pubsubNode(), ...]}].
get_subnodes_tree(Key, Node) ->
    subnodes_tree_from(find_node_by_name(Key, Node), Key, Node).

%% Builds the tree from the result of looking the starting node up.
subnodes_tree_from(false, _Key, _Node) ->
    %% node not found case
    [{1, []}, {0, [false]}];
subnodes_tree_from(#pubsub_node{} = Root, Key, Node) ->
    Children = get_subnodes_names(Key, Node),
    %% To avoid accidental cyclic issues, keep a set of known nodes
    %% which are not expanded again.
    Known = sets:from_list([Node]),
    FirstDepth = 1,
    extract_subnodes(Key, Node, Children, FirstDepth, Known) ++ [{0, [Root]}].
441
%% Each call extracts the Subnodes of one level and recurses to the next level.
%% KnownNodesSet holds the names that must not be expanded again; subnode
%% names already in it are reported as cyclic and dropped.
%% The deepest level is always followed by an empty level {Depth + 1, []}.
extract_subnodes(Key, InitialNode, Subnodes, Depth, KnownNodesSet) ->
    LevelRecords = find_nodes_by_names(Key, Subnodes),
    Known = sets:union(KnownNodesSet, sets:from_list(Subnodes)),
    %% Names of subnodes of subnodes, sorted and without duplicates.
    Grandchildren = lists:usort(lists:flatmap(fun(Name) -> get_subnodes_names(Key, Name) end,
                                              Subnodes)),
    %% Grandchildren has no duplicates, so partitioning it is equivalent to
    %% subtracting the cyclic names from it.
    {CyclicNames, NextLevel} =
        lists:partition(fun(Name) -> sets:is_element(Name, Known) end, Grandchildren),
    case CyclicNames of
        [] -> [];
        _ -> ?LOG_WARNING(#{what => pubsub_cyclic_nodes_detected,
                            pubsub_node => InitialNode, cyclic_names => CyclicNames})
    end,
    DeeperLevels =
        case NextLevel of
            [] -> [{Depth + 1, []}];
            _ -> extract_subnodes(Key, InitialNode, NextLevel, Depth + 1, Known)
        end,
    DeeperLevels ++ [{Depth, LevelRecords}].
460
461 %% ------------------------ Affiliations ------------------------
462
%% Stores Affiliation in the state of the bare JID on node Nidx.
%% Setting `none` on a state that has no subscriptions deletes the state
%% instead of writing it.
-spec set_affiliation(Nidx :: mod_pubsub:nodeIdx(),
                      LJID :: jid:ljid(),
                      Affiliation :: mod_pubsub:affiliation()) -> ok.
set_affiliation(Nidx, LJID, Affiliation) ->
    Bare = jid:to_bare(LJID),
    {ok, Current} = get_state(Nidx, Bare, write),
    Subs = Current#pubsub_state.subscriptions,
    case {Affiliation, Subs} of
        {none, []} ->
            del_state_by_idx_and_ljid(Nidx, Bare);
        _ ->
            mnesia:write(Current#pubsub_state{affiliation = Affiliation})
    end.
473
%% Returns the affiliation stored in the state of the bare JID on node Nidx.
-spec get_affiliation(Nidx :: mod_pubsub:nodeIdx(),
                      LJID :: jid:ljid()) ->
    {ok, mod_pubsub:affiliation()}.
get_affiliation(Nidx, LJID) ->
    Bare = jid:to_bare(LJID),
    {ok, #pubsub_state{affiliation = Affiliation}} = get_state(Nidx, Bare, read),
    {ok, Affiliation}.
480
481 %% ------------------------ Subscriptions ------------------------
482
%% Prepends {Sub, SubId} to the subscriptions of LJID on node Nidx and then
%% stores the subscription options.
-spec add_subscription(Nidx :: mod_pubsub:nodeIdx(),
                       LJID :: jid:ljid(),
                       Sub :: mod_pubsub:subscription(),
                       SubId :: mod_pubsub:subId(),
                       SubOpts :: mod_pubsub:subOptions()) -> ok.
add_subscription(Nidx, LJID, Sub, SubId, SubOpts) ->
    {ok, #pubsub_state{subscriptions = Existing} = Current} =
        get_state(Nidx, LJID, write),
    Updated = Current#pubsub_state{subscriptions = [{Sub, SubId} | Existing]},
    mnesia:write(Updated),
    set_subscription_opts(Nidx, LJID, SubId, SubOpts).
493
%% Writes the options of subscription SubId; the node index and JID are not
%% used because the record is keyed by SubId alone.
-spec set_subscription_opts(Nidx :: mod_pubsub:nodeIdx(),
                            JID :: jid:ljid(),
                            SubId :: mod_pubsub:subId(),
                            Opts :: mod_pubsub:subOptions()) -> ok.
set_subscription_opts(_Nidx, _JID, SubId, Opts) ->
    Row = #pubsub_subscription{subid = SubId, options = Opts},
    mnesia:write(Row).
500
%% Lists every subscription on node Nidx together with its owner and options.
-spec get_node_subscriptions(Nidx :: mod_pubsub:nodeIdx()) ->
    {ok, [{Entity :: jid:ljid(),
           Sub :: mod_pubsub:subscription(),
           SubId :: mod_pubsub:subId(),
           Opts :: mod_pubsub:subOptions()}]}.
get_node_subscriptions(Nidx) ->
    {ok, NodeStates} = get_states(Nidx),
    Subscriptions = states_to_subscriptions(NodeStates),
    {ok, Subscriptions}.
509
%% Lists the subscriptions of LJID on node Nidx together with their options.
-spec get_node_entity_subscriptions(Nidx :: mod_pubsub:nodeIdx(),
                                    LJID :: jid:ljid()) ->
    {ok, [{Sub :: mod_pubsub:subscription(),
           SubId :: mod_pubsub:subId(),
           Opts :: mod_pubsub:subOptions()}]}.
get_node_entity_subscriptions(Nidx, LJID) ->
    {ok, #pubsub_state{subscriptions = Subs}} = get_state(Nidx, LJID, read),
    {ok, add_opts_to_subs(Subs)}.
518
%% Removes subscription SubId from the state of LJID on node Nidx and deletes
%% its stored options. When the state is left with affiliation `none` and no
%% subscriptions, the state itself is deleted.
-spec delete_subscription(
        Nidx :: mod_pubsub:nodeIdx(),
        LJID :: jid:ljid(),
        SubId :: mod_pubsub:subId()) ->
    ok.
delete_subscription(Nidx, LJID, SubId) ->
    {ok, #pubsub_state{affiliation = Affiliation, subscriptions = Subs} = Current} =
        get_state(Nidx, LJID, write),
    Remaining = lists:keydelete(SubId, 2, Subs),
    mnesia:delete({pubsub_subscription, SubId}),
    case {Affiliation, Remaining} of
        {none, []} ->
            del_state_by_idx_and_ljid(Nidx, LJID);
        _ ->
            mnesia:write(Current#pubsub_state{subscriptions = Remaining})
    end.
532
%% Drops every subscription of LJID on node Nidx
%% (see delete_all_subscriptions_by_state/1).
-spec delete_all_subscriptions(
        Nidx :: mod_pubsub:nodeIdx(),
        LJID :: jid:ljid()) ->
    ok.
delete_all_subscriptions(Nidx, LJID) ->
    {ok, Current} = get_state(Nidx, LJID, write),
    delete_all_subscriptions_by_state(Current).
540
%% Drops the subscriptions held in every state of the user (any resource).
-spec delete_user_subscriptions(jid:ljid()) -> ok.
delete_user_subscriptions(LJID) ->
    {ok, UserStates} = get_states_by_lus(LJID),
    lists:foreach(fun(S) -> delete_all_subscriptions_by_state(S) end, UserStates).
545
%% Deletes the stored options of every subscription in State. A state with
%% affiliation `none` is then deleted altogether; any other state is written
%% back with an empty subscription list.
-spec delete_all_subscriptions_by_state(mod_pubsub:pubsubState()) -> ok.
delete_all_subscriptions_by_state(State) ->
    #pubsub_state{subscriptions = Subs, affiliation = Affiliation} = State,
    DropOpts = fun({_Sub, SubId}) -> mnesia:delete({pubsub_subscription, SubId}) end,
    lists:foreach(DropOpts, Subs),
    case Affiliation of
        none ->
            del_state(State);
        _ ->
            mnesia:write(State#pubsub_state{subscriptions = []})
    end.
554
%% Replaces the entry for SubId in the subscriptions of LJID on node Nidx
%% with {Subscription, SubId} and writes the state back.
-spec update_subscription(Nidx :: mod_pubsub:nodeIdx(),
                          LJID :: jid:ljid(),
                          Subscription :: mod_pubsub:subscription(),
                          SubId :: mod_pubsub:subId()) ->
    ok.
update_subscription(Nidx, LJID, Subscription, SubId) ->
    {ok, #pubsub_state{subscriptions = Subs} = Current} =
        get_state(Nidx, LJID, write),
    Replacement = {Subscription, SubId},
    Updated = lists:keyreplace(SubId, 2, Subs, Replacement),
    mnesia:write(Current#pubsub_state{subscriptions = Updated}).
564
565 %% ------------------------ Items ------------------------
566
%% Removes ItemIds from the item id list kept in the state of the bare JID on
%% node Nidx. The item records themselves are not touched here.
-spec remove_items(Nidx :: mod_pubsub:nodeIdx(),
                   LJID :: jid:ljid(),
                   ItemIds :: [mod_pubsub:itemId()]) ->
    ok.
remove_items(Nidx, LJID, ItemIds) ->
    Bare = jid:to_bare(LJID),
    {ok, #pubsub_state{items = Known} = Current} = get_state(Nidx, Bare, write),
    Remaining = Known -- ItemIds,
    mnesia:write(Current#pubsub_state{items = Remaining}).
575
%% Empties the item id list of every state on node Nidx. States whose list is
%% already empty are not rewritten.
-spec remove_all_items(Nidx :: mod_pubsub:nodeIdx()) ->
    ok.
remove_all_items(Nidx) ->
    {ok, NodeStates} = get_states(Nidx),
    lists:foreach(fun clear_state_items/1, NodeStates).

%% Writes the state back without items, unless it has none already.
clear_state_items(#pubsub_state{items = []}) ->
    ok;
clear_state_items(#pubsub_state{} = State) ->
    mnesia:write(State#pubsub_state{items = []}).
585
%% Stores the item record, then prepends its id to the item id list kept in
%% the state of the bare JID on node Nidx.
-spec add_item(Nidx :: mod_pubsub:nodeIdx(),
               JID :: jid:ljid(),
               Item :: mod_pubsub:pubsubItem()) ->
    ok.
add_item(Nidx, JID, #pubsub_item{itemid = {ItemId, _}} = Item) ->
    set_item(Item),
    Bare = jid:to_bare(JID),
    {ok, #pubsub_state{items = Known} = Current} = get_state(Nidx, Bare, write),
    mnesia:write(Current#pubsub_state{items = [ItemId | Known]}).
595
596 %% ------------------------ Direct #pubsub_item access ------------------------
597
%% Returns the items of node Nidx, most recently modified first.
%% Opts may carry `item_ids` (keep only these ids) and `max_items` (keep at
%% most that many, applied after the id filter); both are optional.
-spec get_items(Nidx :: mod_pubsub:nodeIdx(), gen_pubsub_node:get_item_options()) ->
    {ok, {[mod_pubsub:pubsubItem()], none}}.
get_items(Nidx, Opts) ->
    WantedIds = maps:get(item_ids, Opts, undefined),
    Limit = maps:get(max_items, Opts, undefined),
    Stored = mnesia:match_object(#pubsub_item{itemid = {'_', Nidx}, _ = '_'}),
    OldestFirst = lists:keysort(#pubsub_item.modification, Stored),
    NewestFirst = lists:reverse(OldestFirst),
    Selected = limit_items(filter_items_by_item_ids(NewestFirst, WantedIds), Limit),
    {ok, {Selected, none}}.
606
%% Keeps the items whose id is listed in ItemIds, preserving their order.
%% `undefined` means "no filter".
filter_items_by_item_ids(Items, undefined) ->
    Items;
filter_items_by_item_ids(Items, ItemIds) ->
    [Item || #pubsub_item{itemid = {Id, _}} = Item <- Items,
             lists:member(Id, ItemIds)].
613
%% Keeps at most MaxItems leading elements of Items.
%% `undefined` means "no limit".
limit_items(Items, MaxItems) when MaxItems =/= undefined ->
    lists:sublist(Items, MaxItems);
limit_items(Items, undefined) ->
    Items.
618
%% Reads the item stored under {ItemId, Nidx}.
-spec get_item(Nidx :: mod_pubsub:nodeIdx(), ItemId :: mod_pubsub:itemId()) ->
    {ok, mod_pubsub:pubsubItem()} | {error, item_not_found}.
get_item(Nidx, ItemId) ->
    Key = {ItemId, Nidx},
    case mnesia:read({pubsub_item, Key}) of
        [#pubsub_item{} = Found] ->
            {ok, Found};
        _ ->
            {error, item_not_found}
    end.
626
%% Writes the item record to Mnesia.
-spec set_item(Item :: mod_pubsub:pubsubItem()) -> ok.
set_item(Item) -> mnesia:write(Item).
630
%% Deletes the item stored under {ItemId, Nidx}.
-spec del_item(Nidx :: mod_pubsub:nodeIdx(), ItemId :: mod_pubsub:itemId()) -> ok.
del_item(Nidx, ItemId) ->
    Key = {ItemId, Nidx},
    mnesia:delete({pubsub_item, Key}).
634
%% Deletes each of the listed items of node Nidx, in list order.
-spec del_items(Nidx :: mod_pubsub:nodeIdx(), [ItemId :: mod_pubsub:itemId()]) -> ok.
del_items(Nidx, ItemIds) ->
    _ = [del_item(Nidx, Id) || Id <- ItemIds],
    ok.
639
640 %%====================================================================
641 %% Internal functions
642 %%====================================================================
643
%% Reads the state of LJID on node Nidx under a write lock and deletes it
%% together with the options of its subscriptions.
-spec del_state_by_idx_and_ljid(Nidx :: mod_pubsub:nodeIdx(),
                                LJID :: jid:ljid()) -> ok.
del_state_by_idx_and_ljid(Nidx, LJID) ->
    {ok, Current} = get_state(Nidx, LJID, write),
    del_state(Current).
649
%% Deletes the stored options of every subscription in the state, then the
%% state record itself.
-spec del_state(mod_pubsub:pubsubState()) -> ok.
del_state(#pubsub_state{stateid = {LJID, Nidx}, subscriptions = Subs}) ->
    DropOpts = fun({_Sub, SubId}) -> mnesia:delete({pubsub_subscription, SubId}) end,
    lists:foreach(DropOpts, Subs),
    StateId = {LJID, Nidx},
    mnesia:delete({pubsub_state, StateId}).
654
%% Reads the state stored under {LJID, Nidx} with the requested lock kind.
%% When no single state record is found, a default pubsub_state carrying only
%% that stateid is returned; it is not written to the table.
-spec get_state(Nidx :: mod_pubsub:nodeIdx(),
                LJID :: jid:ljid(),
                LockKind :: write | read) ->
    {ok, mod_pubsub:pubsubState()}.
get_state(Nidx, LJID, LockKind) ->
    Key = {LJID, Nidx},
    Stored = mnesia:read(pubsub_state, Key, LockKind),
    case Stored of
        [#pubsub_state{} = Found] ->
            {ok, Found};
        _ ->
            {ok, #pubsub_state{stateid = Key}}
    end.
665
%% Flattens a list of states into {JID, Subscription, SubId, Options} tuples,
%% keeping the order of the states and of the subscriptions within each state.
%% States without subscriptions contribute nothing. The remaining states are
%% handed over to add_jid_and_opts_to_subs/3, which continues the traversal.
-spec states_to_subscriptions([mod_pubsub:pubsubState()]) ->
    [{jid:ljid(), mod_pubsub:subscription(), mod_pubsub:subId(), mod_pubsub:subOptions()}].
states_to_subscriptions([#pubsub_state{subscriptions = []} | Rest]) ->
    states_to_subscriptions(Rest);
states_to_subscriptions([#pubsub_state{stateid = {Owner, _}, subscriptions = Subs} | Rest]) ->
    add_jid_and_opts_to_subs(Subs, Owner, Rest);
states_to_subscriptions([]) ->
    [].
674
%% Emits one {JID, Subscription, SubId, Options} tuple per subscription of a
%% single state; once they are exhausted, continues with the remaining states
%% through states_to_subscriptions/1.
-spec add_jid_and_opts_to_subs(Subs :: [{mod_pubsub:subscription(), mod_pubsub:subId()}],
                               LJID :: jid:ljid(),
                               RStates :: [mod_pubsub:pubsubState()]) ->
    [{jid:ljid(), mod_pubsub:subscription(), mod_pubsub:subId(), mod_pubsub:subOptions()}].
add_jid_and_opts_to_subs([{Sub, SubId} | MoreSubs], Owner, RStates) ->
    Options = read_sub_options(SubId),
    Entry = {Owner, Sub, SubId, Options},
    [Entry | add_jid_and_opts_to_subs(MoreSubs, Owner, RStates)];
add_jid_and_opts_to_subs([], _Owner, RStates) ->
    states_to_subscriptions(RStates).
684
%% Extends each {Subscription, SubId} pair with the options stored for SubId.
-spec add_opts_to_subs(Subs :: [{mod_pubsub:subscription(), mod_pubsub:subId()}]) ->
    [{mod_pubsub:subscription(), mod_pubsub:subId(), mod_pubsub:subOptions()}].
add_opts_to_subs(Subs) ->
    lists:map(fun({Sub, SubId}) ->
                      Options = read_sub_options(SubId),
                      {Sub, SubId, Options}
              end, Subs).
692
%% Returns the options stored for subscription SubId, or [] when no
%% pubsub_subscription record exists for it.
-spec read_sub_options(SubId :: mod_pubsub:subId()) -> mod_pubsub:subOptions().
read_sub_options(SubId) ->
    Stored = mnesia:read({pubsub_subscription, SubId}),
    case Stored of
        [#pubsub_subscription{options = Options}] ->
            Options;
        [] ->
            []
    end.
699
%% Fold step: prepends the state's node index to Acc when that index is one
%% of NodeIdxs and the state holds at least one pending subscription.
-spec get_idxs_with_pending_subs(NodeIdxs :: [mod_pubsub:nodeIdx()],
                                 PubsubState :: mod_pubsub:pubsubState(),
                                 Acc :: [mod_pubsub:nodeIdx()]) ->
    [mod_pubsub:nodeIdx()].
get_idxs_with_pending_subs(NodeIdxs, PubsubState, Acc) ->
    #pubsub_state{stateid = {_, Nidx}, subscriptions = Subs} = PubsubState,
    Wanted = lists:member(Nidx, NodeIdxs)
             andalso lists:any(fun is_pending_sub/1, Subs),
    case Wanted of
        false -> Acc;
        true -> [Nidx | Acc]
    end.
712
713
:-(
%% True for a {pending, SubId} subscription entry, false for anything else.
is_pending_sub(Sub) ->
    case Sub of
        {pending, _SubId} -> true;
        _ -> false
    end.
715
Line Hits Source