./ct_report/coverage/mod_pubsub_db_mnesia.COVER.html

1 %%%----------------------------------------------------------------------
2 %%% File : mod_pubsub_db_mnesia.erl
3 %%% Author : Piotr Nosek <piotr.nosek@erlang-solutions.com>
4 %%% Purpose : PubSub Mnesia backend
5 %%% Created : 26 Oct 2018 by Piotr Nosek <piotr.nosek@erlang-solutions.com>
6 %%%----------------------------------------------------------------------
7
8 -module(mod_pubsub_db_mnesia).
9 -author('piotr.nosek@erlang-solutions.com').
10
11 -behaviour(mod_pubsub_db).
12
13 -include("pubsub.hrl").
14 -include("jlib.hrl").
15 -include("mongoose_logger.hrl").
16 -include_lib("stdlib/include/qlc.hrl").
17
18 -export([init/2, stop/0]).
19 % Funs execution
20 -export([transaction/2, dirty/2]).
21 % Direct #pubsub_state access
22 -export([del_node/1, get_state/2,
23 get_states/1, get_states_by_lus/1, get_states_by_bare/1,
24 get_states_by_bare_and_full/1, get_idxs_of_own_nodes_with_pending_subs/1]).
25 % Node management
26 -export([
27 create_node/2,
28 set_node/1,
29 find_node_by_id/1,
30 find_nodes_by_key/1,
31 find_node_by_name/2,
32 delete_node/1,
33 get_subnodes/2,
34 get_parentnodes_tree/2,
35 get_subnodes_tree/2
36 ]).
37 % Affiliations
38 -export([
39 set_affiliation/3,
40 get_affiliation/2
41 ]).
42 % Subscriptions
43 -export([
44 add_subscription/5,
45 set_subscription_opts/4,
46 get_node_subscriptions/1,
47 get_node_entity_subscriptions/2,
48 delete_subscription/3,
49 delete_all_subscriptions/2,
50 update_subscription/4
51 ]).
52 % Item ids in state
53 -export([
54 add_item/3,
55 remove_items/3,
56 remove_all_items/1
57 ]).
58
59 % Whole Items
60 -export([
61 get_items/2,
62 get_item/2,
63 set_item/1,
64 del_item/2,
65 del_items/2
66 ]).
67
68 % GDPR
69 -export([
70 get_user_payloads/2,
71 get_user_nodes/2,
72 get_user_subscriptions/2,
73 delete_user_subscriptions/1,
74 find_nodes_by_affiliated_user/1
75 ]).
76
77 %%====================================================================
78 %% Internal records definitions
79 %%====================================================================
80
81 -record(pubsub_subscription, {
82 subid :: mod_pubsub:subId(),
83 options = [] :: mod_pubsub:subOptions()
84 }).
85
86 %%====================================================================
87 %% Behaviour callbacks
88 %%====================================================================
89
90 %% ------------------------ Backend start/stop ------------------------
91
92 init(_HostType, _Opts) ->
93
:-(
mongoose_mnesia:create_table(pubsub_state,
94 [{disc_copies, [node()]}, {type, ordered_set},
95 {attributes, record_info(fields, pubsub_state)}]),
96
:-(
mongoose_mnesia:create_table(pubsub_item,
97 [{disc_only_copies, [node()]},
98 {attributes, record_info(fields, pubsub_item)}]),
99
:-(
mongoose_mnesia:create_table(pubsub_node,
100 [{disc_copies, [node()]},
101 {attributes, record_info(fields, pubsub_node)}]),
102
:-(
mnesia:add_table_index(pubsub_node, id),
103
:-(
mongoose_mnesia:create_table(pubsub_subscription,
104 [{disc_copies, [node()]}, {type, set},
105 {attributes, record_info(fields, pubsub_subscription)}]),
106
:-(
CreateSubnodeTableResult = mongoose_mnesia:create_table(pubsub_subnode,
107 [{disc_copies, [node()]}, {type, bag},
108 {attributes, record_info(fields, pubsub_subnode)}]),
109
:-(
maybe_fill_subnode_table(CreateSubnodeTableResult),
110
:-(
pubsub_index:init(),
111
:-(
ok.
112
113 -spec stop() -> ok.
114 stop() ->
115
:-(
ok.
116
117 %% If pubsub_subnode table was missing, than fill it with data
118 maybe_fill_subnode_table({atomic, ok}) ->
119
:-(
F = fun(#pubsub_node{} = Node, Acc) ->
120
:-(
set_subnodes(Node, []),
121
:-(
Acc
122 end,
123
:-(
mnesia:transaction(fun() -> mnesia:foldl(F, ok, pubsub_node) end);
124 maybe_fill_subnode_table(_Other) ->
125
:-(
ok.
126
127 %% ------------------------ Fun execution ------------------------
128
129 -spec transaction(Fun :: fun(() -> {result | error, any()}),
130 ErrorDebug :: map()) ->
131 {result | error, any()}.
132 transaction(Fun, ErrorDebug) ->
133
:-(
transaction(Fun, ErrorDebug, 3).
134
135 transaction(Fun, ErrorDebug, Retries) ->
136
:-(
case mnesia:transaction(mod_pubsub_db:extra_debug_fun(Fun)) of
137 {atomic, Result} ->
138
:-(
Result;
139 {aborted, ReasonData} when Retries > 0 ->
140
:-(
?LOG_WARNING(#{what => pubsub_transaction_retry, retries => Retries,
141
:-(
reason => ReasonData, debug => ErrorDebug}),
142
:-(
timer:sleep(100),
143
:-(
transaction(Fun, ErrorDebug, Retries - 1);
144 {aborted, ReasonData} ->
145
:-(
?LOG_WARNING(#{what => pubsub_transaction_failed, reason => ReasonData,
146
:-(
debug => ErrorDebug}),
147
:-(
mod_pubsub_db:db_error(ReasonData, ErrorDebug, transaction_failed)
148 end.
149
150 -spec dirty(Fun :: fun(() -> {result | error, any()}),
151 ErrorDebug :: map()) ->
152 {result | error, any()}.
153 dirty(Fun, ErrorDebug) ->
154
:-(
try mnesia:sync_dirty(mod_pubsub_db:extra_debug_fun(Fun), []) of
155 Result ->
156
:-(
Result
157 catch
158 _C:ReasonData ->
159
:-(
mod_pubsub_db:db_error(ReasonData, ErrorDebug, dirty_failed)
160 end.
161
162 %% ------------------------ GDPR-related ------------------------
163
164 -spec get_user_payloads(LUser :: jid:luser(), LServer :: jid:lserver()) ->
165 [NodeNameItemIDAndPayload :: [binary()]].
166 get_user_payloads(LUser, LServer) ->
167
:-(
{atomic, Recs} = mnesia:transaction(fun() -> get_user_payloads_t(LUser, LServer) end),
168
:-(
Recs.
169
170 get_user_payloads_t(LUser, LServer) ->
171
:-(
BareUserMatchSpec = {'_', {LUser, LServer, '_'}},
172
:-(
Items = mnesia:match_object(#pubsub_item{creation = BareUserMatchSpec, _ = '_'}),
173
:-(
[[node_name(Nidx), ItemId, << <<(exml:to_binary(P))/binary>> || P <- Payload >>] ||
174
:-(
#pubsub_item{itemid = {ItemId, Nidx}, payload = Payload} <- Items].
175
176 -spec get_user_nodes(LUser :: jid:luser(), LServer :: jid:lserver()) ->
177 [NodeNameAndType :: [binary()]].
178 get_user_nodes(LUser, LServer) ->
179
:-(
LJID = {LUser, LServer, <<>>},
180
:-(
{atomic, Recs} = mnesia:transaction(fun() ->
181
:-(
Nodes = mnesia:match_object(#pubsub_node{owners = [LJID], _ = '_'}),
182
:-(
[[NodeName, Type] || #pubsub_node{nodeid = {_, NodeName}, type = Type} <- Nodes]
183 end),
184
:-(
Recs.
185
186 -spec get_user_subscriptions(LUser :: jid:luser(), LServer :: jid:lserver()) ->
187 [NodeName :: [binary()]].
188 get_user_subscriptions(LUser, LServer) ->
189
:-(
{atomic, Recs} = mnesia:transaction(fun() -> get_user_subscriptions_t(LUser, LServer) end),
190
:-(
Recs.
191
192 get_user_subscriptions_t(LUser, LServer) ->
193
:-(
UserMatchSpec = {LUser, LServer, '_'},
194
:-(
SubscriptionStates
195 = mnesia:match_object(#pubsub_state{stateid = {UserMatchSpec, '_'},
196 subscriptions = [{subscribed, '_'}], _ = '_'}),
197
:-(
[ [node_name(Nidx)] || #pubsub_state{stateid = {_, Nidx}} <- SubscriptionStates].
198
199 node_name(Nidx) ->
200
:-(
case find_node_by_id(Nidx) of
201
:-(
{ok, #pubsub_node{ nodeid = {_, NodeName} }} -> NodeName;
202
:-(
_ -> <<>>
203 end.
204
205 -spec find_nodes_by_affiliated_user(JID :: jid:ljid()) ->
206 [{mod_pubsub:pubsubNode(), mod_pubsub:affiliation()}].
207 find_nodes_by_affiliated_user(LJID) ->
208
:-(
{ok, States} = get_states_by_lus(LJID),
209
:-(
lists:map(fun(#pubsub_state{ stateid = {_, Nidx}, affiliation = Aff }) ->
210
:-(
{ok, Node} = find_node_by_id(Nidx),
211
:-(
{Node, Aff}
212 end, States).
213
214 %% ------------------------ Direct #pubsub_state access ------------------------
215
216 -spec get_state(Nidx :: mod_pubsub:nodeIdx(),
217 LJID :: jid:ljid()) ->
218 {ok, mod_pubsub:pubsubState()}.
219 get_state(Nidx, LJID) ->
220
:-(
get_state(Nidx, LJID, read).
221
222 -spec get_states(Nidx :: mod_pubsub:nodeIdx()) ->
223 {ok, [mod_pubsub:pubsubState()]}.
224 get_states(Nidx) ->
225
:-(
States = mnesia:match_object(#pubsub_state{stateid = {'_', Nidx}, _ = '_'}),
226
:-(
{ok, States}.
227
228 -spec get_states_by_lus(LJID :: jid:ljid()) ->
229 {ok, [mod_pubsub:pubsubState()]}.
230 get_states_by_lus({ LUser, LServer, _ }) ->
231
:-(
{ok, mnesia:match_object(#pubsub_state{stateid = {{LUser, LServer, '_'}, '_'}, _ = '_'})}.
232
233 -spec get_states_by_bare(LJID :: jid:ljid()) ->
234 {ok, [mod_pubsub:pubsubState()]}.
235 get_states_by_bare(LJID) ->
236
:-(
LBare = jid:to_bare(LJID),
237
:-(
{ok, mnesia:match_object(#pubsub_state{stateid = {LBare, '_'}, _ = '_'})}.
238
239 -spec get_states_by_bare_and_full(LJID :: jid:ljid()) ->
240 {ok, [mod_pubsub:pubsubState()]}.
241 get_states_by_bare_and_full(LJID) ->
242
:-(
LBare = jid:to_bare(LJID),
243
:-(
{ok, mnesia:match_object(#pubsub_state{stateid = {LJID, '_'}, _ = '_'})
244 ++ mnesia:match_object(#pubsub_state{stateid = {LBare, '_'}, _ = '_'})}.
245
246 -spec get_idxs_of_own_nodes_with_pending_subs(LJID :: jid:ljid()) ->
247 {ok, [mod_pubsub:nodeIdx()]}.
248 get_idxs_of_own_nodes_with_pending_subs(LJID) ->
249
:-(
LBare = jid:to_bare(LJID),
250
:-(
MyStates = mnesia:match_object(#pubsub_state{stateid = {LBare, '_'},
251 affiliation = owner, _ = '_'}),
252
:-(
NodeIdxs = [Nidx || #pubsub_state{stateid = {_, Nidx}} <- MyStates],
253
:-(
ResultNidxs = mnesia:foldl(pa:bind(fun get_idxs_with_pending_subs/3, NodeIdxs),
254 [], pubsub_state),
255
:-(
{ok, ResultNidxs}.
256
257 %% ------------------------ Node management ------------------------
258
259 -spec create_node(Nidx :: mod_pubsub:nodeIdx(),
260 Owner :: jid:ljid()) ->
261 ok.
262 create_node(Nidx, Owner) ->
263
:-(
set_affiliation(Nidx, Owner, owner).
264
265 -spec del_node(Nidx :: mod_pubsub:nodeIdx()) ->
266 {ok, [mod_pubsub:pubsubState()]}.
267 del_node(Nidx) ->
268
:-(
{ok, States} = get_states(Nidx),
269
:-(
lists:foreach(fun (#pubsub_state{stateid = {LJID, _}, items = Items}) ->
270
:-(
del_items(Nidx, Items),
271
:-(
del_state_by_idx_and_ljid(Nidx, LJID)
272 end, States),
273
:-(
{ok, States}.
274
275 -spec set_node(mod_pubsub:pubsubNode()) -> {ok, mod_pubsub:nodeIdx()}.
276 set_node(#pubsub_node{id = undefined} = Node) ->
277
:-(
CreateNode = Node#pubsub_node{id = pubsub_index:new(node)},
278
:-(
OldParents = [],
279
:-(
set_node(CreateNode, OldParents);
280 set_node(Node) ->
281
:-(
OldParents = get_parentnodes_names(Node),
282
:-(
set_node(Node, OldParents).
283
284 set_node(#pubsub_node{id = Nidx} = Node, OldParents) ->
285
:-(
mnesia:write(Node),
286
:-(
set_subnodes(Node, OldParents),
287
:-(
{ok, Nidx}.
288
289 get_parentnodes_names(#pubsub_node{nodeid = NodeId}) ->
290
:-(
case mnesia:read(pubsub_node, NodeId, write) of
291 [] ->
292
:-(
[];
293 [#pubsub_node{parents = Parents}] ->
294
:-(
Parents
295 end.
296
297 set_subnodes(#pubsub_node{parents = Parents}, Parents) ->
298
:-(
ok;
299 set_subnodes(#pubsub_node{nodeid = NodeId, parents = NewParents}, OldParents) ->
300
:-(
set_subnodes(NodeId, NewParents, OldParents).
301
302 set_subnodes({Key, Node}, NewParents, OldParents) ->
303
:-(
OldParentsSet = sets:from_list(OldParents),
304
:-(
NewParentsSet = sets:from_list(NewParents),
305
:-(
Deleted = sets:to_list(sets:subtract(OldParentsSet, NewParentsSet)),
306
:-(
Added = sets:to_list(sets:subtract(NewParentsSet, OldParentsSet)),
307
:-(
DeletedObjects = names_to_subnode_records(Key, Node, Deleted),
308
:-(
AddedObjects = names_to_subnode_records(Key, Node, Added),
309
:-(
lists:foreach(fun(Object) -> mnesia:delete_object(Object) end, DeletedObjects),
310
:-(
lists:foreach(fun(Object) -> mnesia:write(Object) end, AddedObjects),
311
:-(
ok.
312
313 names_to_subnode_records(Key, Node, Parents) ->
314
:-(
[#pubsub_subnode{nodeid = {Key, Parent}, subnode = Node} || Parent <- Parents].
315
316 -spec find_node_by_id(Nidx :: mod_pubsub:nodeIdx()) ->
317 {error, not_found} | {ok, mod_pubsub:pubsubNode()}.
318 find_node_by_id(Nidx) ->
319
:-(
case mnesia:index_read(pubsub_node, Nidx, #pubsub_node.id) of
320
:-(
[#pubsub_node{} = Record] -> {ok, Record};
321 [] ->
322
:-(
{error, not_found}
323 end.
324
325 -spec find_node_by_name(
326 Key :: mod_pubsub:hostPubsub() | jid:ljid(),
327 Node :: mod_pubsub:nodeId()) ->
328 mod_pubsub:pubsubNode() | false.
329 find_node_by_name(Key, Node) ->
330
:-(
case mnesia:read(pubsub_node, oid(Key, Node), read) of
331
:-(
[] -> false;
332
:-(
[NodeRec] -> NodeRec
333 end.
334
335 -spec find_nodes_by_key(Key :: mod_pubsub:hostPubsub() | jid:ljid()) ->
336 [mod_pubsub:pubsubNode()].
337 find_nodes_by_key(Key) ->
338
:-(
mnesia:match_object(#pubsub_node{nodeid = {Key, '_'}, _ = '_'}).
339
340
:-(
oid(Key, Name) -> {Key, Name}.
341
342
343 -spec delete_node(Node :: mod_pubsub:pubsubNode()) -> ok.
344 delete_node(#pubsub_node{nodeid = NodeId, parents = Parents}) ->
345
:-(
set_subnodes(NodeId, [], Parents),
346
:-(
mnesia:delete({pubsub_node, NodeId}).
347
348
349 -spec get_subnodes(Key :: mod_pubsub:hostPubsub() | jid:ljid(), Node :: mod_pubsub:nodeId() | <<>>) ->
350 [mod_pubsub:pubsubNode()].
351 get_subnodes(Key, <<>>) ->
352
:-(
get_nodes_without_parents(Key);
353 get_subnodes(Key, Node) ->
354
:-(
Subnodes = get_subnodes_names(Key, Node),
355
:-(
find_nodes_by_names(Key, Subnodes).
356
357 %% Like get_subnodes, but returns node names instead of node records.
358 get_subnodes_names(Key, Node) ->
359
:-(
MnesiaRecords = mnesia:read({pubsub_subnode, {Key, Node}}),
360
:-(
[Subnode || #pubsub_subnode{subnode = Subnode} <- MnesiaRecords].
361
362 %% Warning: this function is full table scan and can return a lot of records
363 get_nodes_without_parents(Key) ->
364
:-(
mnesia:match_object(#pubsub_node{nodeid = {Key, '_'}, parents = [], _ = '_'}).
365
366 %% Return a list of {Depth, Nodes} where Nodes are parent nodes of nodes of the lower level.
367 %% So, we start with {0, [NodeRecord]}, where NodeRecord is node that corresponds to Node argument.
368 %% The next level is {1, Nodes1}, where Nodes1 are parent nodes of Node.
369 %% The next level can be {2, Nodes2}, where Nodes2 are parents of Nodes1 (without duplicates).
370 %%
371 %% Each node can be returned only ones by the function.
372 -spec get_parentnodes_tree(Key :: mod_pubsub:hostPubsub() | jid:ljid(), Node :: mod_pubsub:nodeId()) ->
373 [{Depth::non_neg_integer(), Nodes::[mod_pubsub:pubsubNode(), ...]}].
374 get_parentnodes_tree(Key, Node) ->
375
:-(
case find_node_by_name(Key, Node) of
376 false ->
377
:-(
[ {0, []} ]; %% node not found case
378
379 #pubsub_node{parents = []} = Record ->
380
:-(
[ {0, [Record]} ];
381
382 #pubsub_node{parents = Parents} = Record ->
383
:-(
Depth = 1,
384 %% To avoid accidental cyclic issues, let's maintain the list of known nodes
385 %% which we don't expand again
386
:-(
KnownNodesSet = sets:from_list([Node]),
387
:-(
extract_parents(Key, Node, Parents, Depth, KnownNodesSet) ++ [ {0, [Record]} ]
388 end.
389
390 %% Each call extract Parents on the level and recurse to the next level.
391 %% KnownNodesSet are nodes to be filtered out.
392 extract_parents(Key, InitialNode, Parents, Depth, KnownNodesSet) ->
393
:-(
ParentRecords = find_nodes_by_names(Key, Parents),
394
:-(
KnownNodesSet1 = sets:union(KnownNodesSet, sets:from_list(Parents)),
395 %% Names of parents of parents
396
:-(
PPNames = lists:usort(lists:flatmap(fun(#pubsub_node{parents = PP}) -> PP end, ParentRecords)),
397
:-(
CyclicNames = [Name || Name <- PPNames, sets:is_element(Name, KnownNodesSet1)],
398
:-(
case CyclicNames of
399
:-(
[] -> [];
400
:-(
_ -> ?LOG_WARNING(#{what => pubsub_cyclic_nodes_detected,
401
:-(
pubsub_node => InitialNode, cyclic_names => CyclicNames})
402 end,
403 %% PPNames is ordset, so we don't need to worry about having duplicates in it.
404 %% CyclicNames is usually an empty list.
405
:-(
PPNamesToGet = PPNames -- CyclicNames,
406 case PPNamesToGet of
407
:-(
[] -> [];
408
:-(
_ -> extract_parents(Key, InitialNode, PPNamesToGet, Depth + 1, KnownNodesSet1)
409
:-(
end ++ [ {Depth, ParentRecords} ].
410
411 find_nodes_by_names(Key, Nodes) ->
412 %% Contains false for missing nodes
413
:-(
MaybeRecords = [find_node_by_name(Key, Node) || Node <- Nodes],
414 %% Filter out false-s
415
:-(
[Record || Record = #pubsub_node{} <- MaybeRecords].
416
417
418 -spec get_subnodes_tree(Key :: mod_pubsub:hostPubsub() | jid:ljid(), Node :: mod_pubsub:nodeId()) ->
419 [{Depth::non_neg_integer(), Nodes::[mod_pubsub:pubsubNode(), ...]}].
420 get_subnodes_tree(Key, Node) ->
421
:-(
case find_node_by_name(Key, Node) of
422 false ->
423
:-(
[ {1, []}, {0, [false]} ]; %% node not found case
424
425 #pubsub_node{} = Record ->
426
:-(
Subnodes = get_subnodes_names(Key, Node),
427
:-(
Depth = 1,
428 %% To avoid accidental cyclic issues, let's maintain the list of known nodes
429 %% which we don't expand again
430
:-(
KnownNodesSet = sets:from_list([Node]),
431
:-(
extract_subnodes(Key, Node, Subnodes, Depth, KnownNodesSet) ++ [ {0, [Record]} ]
432 end.
433
434 %% Each call extract Subnodes on the level and recurse to the next level.
435 %% KnownNodesSet are nodes to be filtered out.
436 extract_subnodes(Key, InitialNode, Subnodes, Depth, KnownNodesSet) ->
437
:-(
SubnodesRecords = find_nodes_by_names(Key, Subnodes),
438
:-(
KnownNodesSet1 = sets:union(KnownNodesSet, sets:from_list(Subnodes)),
439 %% Names of subnodes of subnodes
440
:-(
SSNames = lists:usort(lists:flatmap(fun(Subnode) -> get_subnodes_names(Key, Subnode) end, Subnodes)),
441
:-(
CyclicNames = [Name || Name <- SSNames, sets:is_element(Name, KnownNodesSet1)],
442
:-(
case CyclicNames of
443
:-(
[] -> [];
444
:-(
_ -> ?LOG_WARNING(#{what => pubsub_cyclic_nodes_detected,
445
:-(
pubsub_node => InitialNode, cyclic_names => CyclicNames})
446 end,
447
:-(
SSNamesToGet = SSNames -- CyclicNames,
448 case SSNamesToGet of
449
:-(
[] -> [ {Depth + 1, []} ];
450
:-(
_ -> extract_subnodes(Key, InitialNode, SSNamesToGet, Depth + 1, KnownNodesSet1)
451
:-(
end ++ [ {Depth, SubnodesRecords} ].
452
453 %% ------------------------ Affiliations ------------------------
454
455 -spec set_affiliation(Nidx :: mod_pubsub:nodeIdx(),
456 LJID :: jid:ljid(),
457 Affiliation :: mod_pubsub:affiliation()) -> ok.
458 set_affiliation(Nidx, LJID, Affiliation) ->
459
:-(
BareLJID = jid:to_bare(LJID),
460
:-(
{ok, State} = get_state(Nidx, BareLJID, write),
461
:-(
case {Affiliation, State#pubsub_state.subscriptions} of
462
:-(
{none, []} -> del_state_by_idx_and_ljid(Nidx, BareLJID);
463
:-(
_ -> mnesia:write(State#pubsub_state{ affiliation = Affiliation })
464 end.
465
466 -spec get_affiliation(Nidx :: mod_pubsub:nodeIdx(),
467 LJID :: jid:ljid()) ->
468 {ok, mod_pubsub:affiliation()}.
469 get_affiliation(Nidx, LJID) ->
470
:-(
{ok, State} = get_state(Nidx, jid:to_bare(LJID), read),
471
:-(
{ok, State#pubsub_state.affiliation}.
472
473 %% ------------------------ Subscriptions ------------------------
474
475 -spec add_subscription(Nidx :: mod_pubsub:nodeIdx(),
476 LJID :: jid:ljid(),
477 Sub :: mod_pubsub:subscription(),
478 SubId :: mod_pubsub:subId(),
479 SubOpts :: mod_pubsub:subOptions()) -> ok.
480 add_subscription(Nidx, LJID, Sub, SubId, SubOpts) ->
481
:-(
{ok, State} = get_state(Nidx, LJID, write),
482
:-(
NSubscriptions = [{Sub, SubId} | State#pubsub_state.subscriptions ],
483
:-(
mnesia:write(State#pubsub_state{ subscriptions = NSubscriptions }),
484
:-(
set_subscription_opts(Nidx, LJID, SubId, SubOpts).
485
486 -spec set_subscription_opts(Nidx :: mod_pubsub:nodeIdx(),
487 JID :: jid:ljid(),
488 SubId :: mod_pubsub:subId(),
489 Opts :: mod_pubsub:subOptions()) -> ok.
490 set_subscription_opts(_Nidx, _JID, SubId, Opts) ->
491
:-(
mnesia:write(#pubsub_subscription{ subid = SubId, options = Opts }).
492
493 -spec get_node_subscriptions(Nidx :: mod_pubsub:nodeIdx()) ->
494 {ok, [{Entity :: jid:ljid(),
495 Sub :: mod_pubsub:subscription(),
496 SubId :: mod_pubsub:subId(),
497 Opts :: mod_pubsub:subOptions()}]}.
498 get_node_subscriptions(Nidx) ->
499
:-(
{ok, States} = get_states(Nidx),
500
:-(
{ok, states_to_subscriptions(States)}.
501
502 -spec get_node_entity_subscriptions(Nidx :: mod_pubsub:nodeIdx(),
503 LJID :: jid:ljid()) ->
504 {ok, [{Sub :: mod_pubsub:subscription(),
505 SubId :: mod_pubsub:subId(),
506 Opts :: mod_pubsub:subOptions()}]}.
507 get_node_entity_subscriptions(Nidx, LJID) ->
508
:-(
{ok, State} = get_state(Nidx, LJID, read),
509
:-(
{ok, add_opts_to_subs(State#pubsub_state.subscriptions)}.
510
511 -spec delete_subscription(
512 Nidx :: mod_pubsub:nodeIdx(),
513 LJID :: jid:ljid(),
514 SubId :: mod_pubsub:subId()) ->
515 ok.
516 delete_subscription(Nidx, LJID, SubId) ->
517
:-(
{ok, State} = get_state(Nidx, LJID, write),
518
:-(
NewSubs = lists:keydelete(SubId, 2, State#pubsub_state.subscriptions),
519
:-(
mnesia:delete({pubsub_subscription, SubId}),
520
:-(
case {State#pubsub_state.affiliation, NewSubs} of
521
:-(
{none, []} -> del_state_by_idx_and_ljid(Nidx, LJID);
522
:-(
_ -> mnesia:write(State#pubsub_state{subscriptions = NewSubs})
523 end.
524
525 -spec delete_all_subscriptions(
526 Nidx :: mod_pubsub:nodeIdx(),
527 LJID :: jid:ljid()) ->
528 ok.
529 delete_all_subscriptions(Nidx, LJID) ->
530
:-(
{ok, State} = get_state(Nidx, LJID, write),
531
:-(
delete_all_subscriptions_by_state(State).
532
533 -spec delete_user_subscriptions(jid:ljid()) -> ok.
534 delete_user_subscriptions(LJID) ->
535
:-(
{ok, States} = get_states_by_lus(LJID),
536
:-(
lists:foreach(fun delete_all_subscriptions_by_state/1, States).
537
538 -spec delete_all_subscriptions_by_state(mod_pubsub:pubsubState()) -> ok.
539 delete_all_subscriptions_by_state(State) ->
540
:-(
lists:foreach(fun({_, SubId}) -> mnesia:delete({pubsub_subscription, SubId}) end,
541 State#pubsub_state.subscriptions),
542
:-(
case State#pubsub_state.affiliation of
543
:-(
none -> del_state(State);
544
:-(
_ -> mnesia:write(State#pubsub_state{subscriptions = []})
545 end.
546
547 -spec update_subscription(Nidx :: mod_pubsub:nodeIdx(),
548 LJID :: jid:ljid(),
549 Subscription :: mod_pubsub:subscription(),
550 SubId :: mod_pubsub:subId()) ->
551 ok.
552 update_subscription(Nidx, LJID, Subscription, SubId) ->
553
:-(
{ok, State} = get_state(Nidx, LJID, write),
554
:-(
NewSubs = lists:keyreplace(SubId, 2, State#pubsub_state.subscriptions, {Subscription, SubId}),
555
:-(
mnesia:write(State#pubsub_state{ subscriptions = NewSubs }).
556
557 %% ------------------------ Items ------------------------
558
559 -spec remove_items(Nidx :: mod_pubsub:nodeIdx(),
560 LJID :: jid:ljid(),
561 ItemIds :: [mod_pubsub:itemId()]) ->
562 ok.
563 remove_items(Nidx, LJID, ItemIds) ->
564
:-(
{ok, State} = get_state(Nidx, jid:to_bare(LJID), write),
565
:-(
NewItems = State#pubsub_state.items -- ItemIds,
566
:-(
mnesia:write(State#pubsub_state{ items = NewItems }).
567
568 -spec remove_all_items(Nidx :: mod_pubsub:nodeIdx()) ->
569 ok.
570 remove_all_items(Nidx) ->
571
:-(
{ok, States} = get_states(Nidx),
572
:-(
lists:foreach(fun(#pubsub_state{ items = [] }) ->
573
:-(
ok;
574 (#pubsub_state{} = S) ->
575
:-(
mnesia:write(S#pubsub_state{ items = [] })
576 end, States).
577
578 -spec add_item(Nidx :: mod_pubsub:nodeIdx(),
579 JID :: jid:ljid(),
580 Item :: mod_pubsub:pubsubItem()) ->
581 ok.
582 add_item(Nidx, JID, #pubsub_item{itemid = {ItemId, _}} = Item) ->
583
:-(
set_item(Item),
584
:-(
{ok, State} = get_state(Nidx, jid:to_bare(JID), write),
585
:-(
NewItems = [ItemId | State#pubsub_state.items],
586
:-(
mnesia:write(State#pubsub_state{ items = NewItems }).
587
588 %% ------------------------ Direct #pubsub_item access ------------------------
589
590 -spec get_items(Nidx :: mod_pubsub:nodeIdx(), gen_pubsub_node:get_item_options()) ->
591 {ok, {[mod_pubsub:pubsubItem()], none}}.
592 get_items(Nidx, Opts) ->
593
:-(
Items = mnesia:match_object(#pubsub_item{itemid = {'_', Nidx}, _ = '_'}),
594
:-(
Sorted = lists:reverse(lists:keysort(#pubsub_item.modification, Items)),
595
:-(
ItemsLimitedByIds = filter_items_by_item_ids(Sorted, maps:get(item_ids, Opts, undefined)),
596
:-(
ItemsLimitedByMaxItems = limit_items(ItemsLimitedByIds, maps:get(max_items, Opts, undefined)),
597
:-(
{ok, {ItemsLimitedByMaxItems, none}}.
598
599 filter_items_by_item_ids(Items, undefined) ->
600
:-(
Items;
601 filter_items_by_item_ids(Items, ItemIds) ->
602
:-(
lists:filter(fun (#pubsub_item{itemid = {ItemId, _}}) ->
603
:-(
lists:member(ItemId, ItemIds)
604 end, Items).
605
606 limit_items(Items, undefined) ->
607
:-(
Items;
608 limit_items(Items, MaxItems) ->
609
:-(
lists:sublist(Items, MaxItems).
610
611 -spec get_item(Nidx :: mod_pubsub:nodeIdx(), ItemId :: mod_pubsub:itemId()) ->
612 {ok, mod_pubsub:pubsubItem()} | {error, item_not_found}.
613 get_item(Nidx, ItemId) ->
614
:-(
case mnesia:read({pubsub_item, {ItemId, Nidx}}) of
615
:-(
[Item] when is_record(Item, pubsub_item) -> {ok, Item};
616
:-(
_ -> {error, item_not_found}
617 end.
618
619 -spec set_item(Item :: mod_pubsub:pubsubItem()) -> ok.
620 set_item(Item) ->
621
:-(
mnesia:write(Item).
622
623 -spec del_item(Nidx :: mod_pubsub:nodeIdx(), ItemId :: mod_pubsub:itemId()) -> ok.
624 del_item(Nidx, ItemId) ->
625
:-(
mnesia:delete({pubsub_item, {ItemId, Nidx}}).
626
627 -spec del_items(Nidx :: mod_pubsub:nodeIdx(), [ItemId :: mod_pubsub:itemId()]) -> ok.
628 del_items(Nidx, ItemIds) ->
629
:-(
lists:foreach(fun (ItemId) -> del_item(Nidx, ItemId) end,
630 ItemIds).
631
632 %%====================================================================
633 %% Internal functions
634 %%====================================================================
635
636 -spec del_state_by_idx_and_ljid(Nidx :: mod_pubsub:nodeIdx(),
637 LJID :: jid:ljid()) -> ok.
638 del_state_by_idx_and_ljid(Nidx, LJID) ->
639
:-(
{ok, State} = get_state(Nidx, LJID, write),
640
:-(
del_state(State).
641
642 -spec del_state(mod_pubsub:pubsubState()) -> ok.
643 del_state(#pubsub_state{ stateid = {LJID, Nidx}, subscriptions = Subs }) ->
644
:-(
lists:foreach(fun({_, SubId}) -> mnesia:delete({pubsub_subscription, SubId}) end, Subs),
645
:-(
mnesia:delete({pubsub_state, {LJID, Nidx}}).
646
647 -spec get_state(Nidx :: mod_pubsub:nodeIdx(),
648 LJID :: jid:ljid(),
649 LockKind :: write | read) ->
650 {ok, mod_pubsub:pubsubState()}.
651 get_state(Nidx, LJID, LockKind) ->
652
:-(
StateId = {LJID, Nidx},
653
:-(
case mnesia:read(pubsub_state, StateId, LockKind) of
654
:-(
[#pubsub_state{} = State] -> {ok, State};
655
:-(
_ -> {ok, #pubsub_state{stateid = StateId}}
656 end.
657
658 -spec states_to_subscriptions([mod_pubsub:pubsubState()]) ->
659 [{jid:ljid(), mod_pubsub:subscription(), mod_pubsub:subId(), mod_pubsub:subOptions()}].
660 states_to_subscriptions([]) ->
661
:-(
[];
662 states_to_subscriptions([#pubsub_state{ subscriptions = [] } | RStates]) ->
663
:-(
states_to_subscriptions(RStates);
664 states_to_subscriptions([#pubsub_state{ stateid = {J, _}, subscriptions = Subs } | RStates]) ->
665
:-(
add_jid_and_opts_to_subs(Subs, J, RStates).
666
667 -spec add_jid_and_opts_to_subs(Subs :: [{mod_pubsub:subscription(), mod_pubsub:subId()}],
668 LJID :: jid:ljid(),
669 RStates :: [mod_pubsub:pubsubState()]) ->
670 [{jid:ljid(), mod_pubsub:subscription(), mod_pubsub:subId(), mod_pubsub:subOptions()}].
671 add_jid_and_opts_to_subs([], _J, RStates) ->
672
:-(
states_to_subscriptions(RStates);
673 add_jid_and_opts_to_subs([{S, SubId} | RSubs], J, RStates) ->
674
:-(
Opts = read_sub_options(SubId),
675
:-(
[ {J, S, SubId, Opts} | add_jid_and_opts_to_subs(RSubs, J, RStates) ].
676
677 -spec add_opts_to_subs(Subs :: [{mod_pubsub:subscription(), mod_pubsub:subId()}]) ->
678 [{mod_pubsub:subscription(), mod_pubsub:subId(), mod_pubsub:subOptions()}].
679 add_opts_to_subs([]) ->
680
:-(
[];
681 add_opts_to_subs([{S, SubId} | RSubs]) ->
682
:-(
Opts = read_sub_options(SubId),
683
:-(
[ {S, SubId, Opts} | add_opts_to_subs(RSubs) ].
684
685 -spec read_sub_options(SubId :: mod_pubsub:subId()) -> mod_pubsub:subOptions().
686 read_sub_options(SubId) ->
687
:-(
case mnesia:read({pubsub_subscription, SubId}) of
688
:-(
[] -> [];
689
:-(
[#pubsub_subscription{ options = Opts0 }] -> Opts0
690 end.
691
692 -spec get_idxs_with_pending_subs(NodeIdxs :: [mod_pubsub:nodeIdx()],
693 PubsubState :: mod_pubsub:pubsubState(),
694 Acc :: [mod_pubsub:nodeIdx()]) ->
695 [mod_pubsub:nodeIdx()].
696 get_idxs_with_pending_subs(NodeIdxs,
697 #pubsub_state{stateid = {_, Nidx}, subscriptions = Subs},
698 Acc) ->
699
:-(
case lists:member(Nidx, NodeIdxs)
700
:-(
andalso lists:any(fun is_pending_sub/1, Subs) of
701
:-(
true -> [Nidx | Acc];
702
:-(
false -> Acc
703 end.
704
705
:-(
is_pending_sub({pending, _}) -> true;
706
:-(
is_pending_sub(_) -> false.
707
Line Hits Source