1 |
|
%%%---------------------------------------------------------------------- |
2 |
|
%%% File : mod_pubsub_db_mnesia.erl |
3 |
|
%%% Author : Piotr Nosek <piotr.nosek@erlang-solutions.com> |
4 |
|
%%% Purpose : PubSub Mnesia backend |
5 |
|
%%% Created : 26 Oct 2018 by Piotr Nosek <piotr.nosek@erlang-solutions.com> |
6 |
|
%%%---------------------------------------------------------------------- |
7 |
|
|
8 |
|
-module(mod_pubsub_db_mnesia). |
9 |
|
-author('piotr.nosek@erlang-solutions.com'). |
10 |
|
|
11 |
|
-behaviour(mod_pubsub_db). |
12 |
|
|
13 |
|
-include("pubsub.hrl"). |
14 |
|
-include("jlib.hrl"). |
15 |
|
-include("mongoose_logger.hrl"). |
16 |
|
-include_lib("stdlib/include/qlc.hrl"). |
17 |
|
|
18 |
|
-export([init/2, stop/0]). |
19 |
|
% Funs execution |
20 |
|
-export([transaction/2, dirty/2]). |
21 |
|
% Direct #pubsub_state access |
22 |
|
-export([del_node/1, get_state/2, |
23 |
|
get_states/1, get_states_by_lus/1, get_states_by_bare/1, |
24 |
|
get_states_by_bare_and_full/1, get_idxs_of_own_nodes_with_pending_subs/1]). |
25 |
|
% Node management |
26 |
|
-export([ |
27 |
|
create_node/2, |
28 |
|
set_node/1, |
29 |
|
find_node_by_id/1, |
30 |
|
find_nodes_by_key/1, |
31 |
|
find_node_by_name/2, |
32 |
|
delete_node/1, |
33 |
|
get_subnodes/2, |
34 |
|
get_parentnodes_tree/2, |
35 |
|
get_subnodes_tree/2 |
36 |
|
]). |
37 |
|
% Affiliations |
38 |
|
-export([ |
39 |
|
set_affiliation/3, |
40 |
|
get_affiliation/2 |
41 |
|
]). |
42 |
|
% Subscriptions |
43 |
|
-export([ |
44 |
|
add_subscription/5, |
45 |
|
set_subscription_opts/4, |
46 |
|
get_node_subscriptions/1, |
47 |
|
get_node_entity_subscriptions/2, |
48 |
|
delete_subscription/3, |
49 |
|
delete_all_subscriptions/2, |
50 |
|
update_subscription/4 |
51 |
|
]). |
52 |
|
% Item ids in state |
53 |
|
-export([ |
54 |
|
add_item/3, |
55 |
|
remove_items/3, |
56 |
|
remove_all_items/1 |
57 |
|
]). |
58 |
|
|
59 |
|
% Whole Items |
60 |
|
-export([ |
61 |
|
get_items/2, |
62 |
|
get_item/2, |
63 |
|
set_item/1, |
64 |
|
del_item/2, |
65 |
|
del_items/2 |
66 |
|
]). |
67 |
|
|
68 |
|
% GDPR |
69 |
|
-export([ |
70 |
|
get_user_payloads/2, |
71 |
|
get_user_nodes/2, |
72 |
|
get_user_subscriptions/2, |
73 |
|
delete_user_subscriptions/1, |
74 |
|
find_nodes_by_affiliated_user/1 |
75 |
|
]). |
76 |
|
|
77 |
|
%%==================================================================== |
78 |
|
%% Internal records definitions |
79 |
|
%%==================================================================== |
80 |
|
|
81 |
|
%% Options attached to a single subscription. Rows of this record live in the
%% `pubsub_subscription' table, keyed by `subid'; `options' defaults to [].
-record(pubsub_subscription, {subid :: mod_pubsub:subId(),
                              options = [] :: mod_pubsub:subOptions()}).
85 |
|
|
86 |
|
%%==================================================================== |
87 |
|
%% Behaviour callbacks |
88 |
|
%%==================================================================== |
89 |
|
|
90 |
|
%% ------------------------ Backend start/stop ------------------------ |
91 |
|
|
92 |
|
%% Creates the Mnesia tables used by this backend (state, item, node,
%% subscription, subnode), adds the `id' index on pubsub_node, back-fills the
%% subnode table when needed and initialises pubsub_index. Always returns ok;
%% both arguments are ignored.
init(_HostType, _Opts) ->
    Here = [node()],
    StateOpts = [{disc_copies, Here}, {type, ordered_set},
                 {attributes, record_info(fields, pubsub_state)}],
    ItemOpts = [{disc_only_copies, Here},
                {attributes, record_info(fields, pubsub_item)}],
    NodeOpts = [{disc_copies, Here},
                {attributes, record_info(fields, pubsub_node)}],
    SubscriptionOpts = [{disc_copies, Here}, {type, set},
                        {attributes, record_info(fields, pubsub_subscription)}],
    SubnodeOpts = [{disc_copies, Here}, {type, bag},
                   {attributes, record_info(fields, pubsub_subnode)}],
    mongoose_mnesia:create_table(pubsub_state, StateOpts),
    mongoose_mnesia:create_table(pubsub_item, ItemOpts),
    mongoose_mnesia:create_table(pubsub_node, NodeOpts),
    mnesia:add_table_index(pubsub_node, id),
    mongoose_mnesia:create_table(pubsub_subscription, SubscriptionOpts),
    %% The creation result decides whether the subnode table gets populated.
    SubnodeResult = mongoose_mnesia:create_table(pubsub_subnode, SubnodeOpts),
    maybe_fill_subnode_table(SubnodeResult),
    pubsub_index:init(),
    ok.
112 |
|
|
113 |
|
%% Backend shutdown hook; there is nothing to release, so it just returns ok.
-spec stop() -> ok.
stop() -> ok.
116 |
|
|
117 |
|
%% If the pubsub_subnode table was missing, then fill it with data.
%% The argument is the result of creating that table: on {atomic, ok} every
%% pubsub_node record is folded over inside a transaction and handed to
%% set_subnodes/2 with an empty list of old parents; any other result is a no-op.
maybe_fill_subnode_table({atomic, ok}) ->
    Populate =
        fun() ->
            mnesia:foldl(fun(#pubsub_node{} = NodeRec, Acc) ->
                             set_subnodes(NodeRec, []),
                             Acc
                         end, ok, pubsub_node)
        end,
    mnesia:transaction(Populate);
maybe_fill_subnode_table(_Other) ->
    ok.
126 |
|
|
127 |
|
%% ------------------------ Fun execution ------------------------ |
128 |
|
|
129 |
|
%% Runs Fun in an Mnesia transaction, allowing up to 3 retries on abort.
%% ErrorDebug is carried along only for logging / error reporting.
-spec transaction(Fun :: fun(() -> {result | error, any()}),
                  ErrorDebug :: map()) ->
    {result | error, any()}.
transaction(Fun, ErrorDebug) ->
    MaxRetries = 3,
    transaction(Fun, ErrorDebug, MaxRetries).
134 |
|
|
135 |
|
%% Runs Fun (wrapped by mod_pubsub_db:extra_debug_fun/1) in an Mnesia
%% transaction. An abort while Retries > 0 is logged, followed by a 100 ms
%% sleep and another attempt with Retries - 1. Once no retries remain the abort
%% is logged and the result of mod_pubsub_db:db_error/3 is returned instead.
transaction(Fun, ErrorDebug, Retries) ->
    case mnesia:transaction(mod_pubsub_db:extra_debug_fun(Fun)) of
        {aborted, Why} when Retries =< 0 ->
            ?LOG_WARNING(#{what => pubsub_transaction_failed, reason => Why,
                           debug => ErrorDebug}),
            mod_pubsub_db:db_error(Why, ErrorDebug, transaction_failed);
        {aborted, Why} ->
            ?LOG_WARNING(#{what => pubsub_transaction_retry, retries => Retries,
                           reason => Why, debug => ErrorDebug}),
            timer:sleep(100),
            transaction(Fun, ErrorDebug, Retries - 1);
        {atomic, Result} ->
            Result
    end.
149 |
|
|
150 |
|
%% Runs Fun (wrapped by mod_pubsub_db:extra_debug_fun/1) through
%% mnesia:sync_dirty/2. Any exception, whatever its class, is converted into
%% the result of mod_pubsub_db:db_error/3 tagged with `dirty_failed'.
-spec dirty(Fun :: fun(() -> {result | error, any()}),
            ErrorDebug :: map()) ->
    {result | error, any()}.
dirty(Fun, ErrorDebug) ->
    try
        mnesia:sync_dirty(mod_pubsub_db:extra_debug_fun(Fun), [])
    catch
        _Class:Why ->
            mod_pubsub_db:db_error(Why, ErrorDebug, dirty_failed)
    end.
161 |
|
|
162 |
|
%% ------------------------ GDPR-related ------------------------ |
163 |
|
|
164 |
|
%% Collects, inside a transaction, the items created by the given user.
%% Crashes with badmatch if the transaction is aborted.
-spec get_user_payloads(LUser :: jid:luser(), LServer :: jid:lserver()) ->
    [NodeNameItemIDAndPayload :: [binary()]].
get_user_payloads(LUser, LServer) ->
    Query = fun() -> get_user_payloads_t(LUser, LServer) end,
    {atomic, Payloads} = mnesia:transaction(Query),
    Payloads.
169 |
|
|
170 |
|
%% Transaction body for get_user_payloads/2. Selects pubsub_item records whose
%% `creation' field is {_, {LUser, LServer, _}} and returns, per item,
%% [NodeName, ItemId, Payload], where Payload is the concatenation of
%% exml:to_binary/1 applied to each payload element.
get_user_payloads_t(LUser, LServer) ->
    Pattern = #pubsub_item{creation = {'_', {LUser, LServer, '_'}}, _ = '_'},
    Serialize = fun(Payload) ->
                    << <<(exml:to_binary(P))/binary>> || P <- Payload >>
                end,
    [[node_name(Nidx), ItemId, Serialize(Payload)]
     || #pubsub_item{itemid = {ItemId, Nidx}, payload = Payload}
            <- mnesia:match_object(Pattern)].
175 |
|
|
176 |
|
%% Returns [NodeName, Type] for every pubsub_node whose `owners' field is
%% exactly the one-element list holding the user's bare JID (the match pattern
%% does not cover nodes with additional owners).
%% Crashes with badmatch if the transaction is aborted.
-spec get_user_nodes(LUser :: jid:luser(), LServer :: jid:lserver()) ->
    [NodeNameAndType :: [binary()]].
get_user_nodes(LUser, LServer) ->
    OwnerPattern = #pubsub_node{owners = [{LUser, LServer, <<>>}], _ = '_'},
    Query = fun() ->
                [[NodeName, Type]
                 || #pubsub_node{nodeid = {_, NodeName}, type = Type}
                        <- mnesia:match_object(OwnerPattern)]
            end,
    {atomic, Result} = mnesia:transaction(Query),
    Result.
185 |
|
|
186 |
|
%% Collects, inside a transaction, the names of nodes the user is subscribed
%% to. Crashes with badmatch if the transaction is aborted.
-spec get_user_subscriptions(LUser :: jid:luser(), LServer :: jid:lserver()) ->
    [NodeName :: [binary()]].
get_user_subscriptions(LUser, LServer) ->
    Query = fun() -> get_user_subscriptions_t(LUser, LServer) end,
    {atomic, Subscriptions} = mnesia:transaction(Query),
    Subscriptions.
191 |
|
|
192 |
|
%% Transaction body for get_user_subscriptions/2. Matches states of the user
%% (any resource, any node) whose `subscriptions' field is exactly one
%% {subscribed, _} entry, and returns a one-element list with the node name
%% for each of them.
get_user_subscriptions_t(LUser, LServer) ->
    Pattern = #pubsub_state{stateid = {{LUser, LServer, '_'}, '_'},
                            subscriptions = [{subscribed, '_'}],
                            _ = '_'},
    [[node_name(Nidx)]
     || #pubsub_state{stateid = {_, Nidx}} <- mnesia:match_object(Pattern)].
198 |
|
|
199 |
|
%% Resolves a node index to the node name; yields <<>> when the lookup does
%% not return {ok, #pubsub_node{}}.
node_name(Nidx) ->
    node_name_from_lookup(find_node_by_id(Nidx)).

%% Picks the name out of a find_node_by_id/1 result.
node_name_from_lookup({ok, #pubsub_node{nodeid = {_, NodeName}}}) ->
    NodeName;
node_name_from_lookup(_) ->
    <<>>.
204 |
|
|
205 |
|
%% Returns {Node, Affiliation} for every state of the user (any resource).
%% Crashes with badmatch if a referenced node index cannot be resolved.
-spec find_nodes_by_affiliated_user(JID :: jid:ljid()) ->
    [{mod_pubsub:pubsubNode(), mod_pubsub:affiliation()}].
find_nodes_by_affiliated_user(LJID) ->
    {ok, States} = get_states_by_lus(LJID),
    ToNodeAndAff =
        fun(#pubsub_state{stateid = {_, Nidx}, affiliation = Aff}) ->
            {ok, Node} = find_node_by_id(Nidx),
            {Node, Aff}
        end,
    lists:map(ToNodeAndAff, States).
213 |
|
|
214 |
|
%% ------------------------ Direct #pubsub_state access ------------------------ |
215 |
|
|
216 |
|
%% Reads the state of LJID on node Nidx using a read lock.
-spec get_state(Nidx :: mod_pubsub:nodeIdx(),
                LJID :: jid:ljid()) ->
    {ok, mod_pubsub:pubsubState()}.
get_state(Nidx, LJID) ->
    LockKind = read,
    get_state(Nidx, LJID, LockKind).
221 |
|
|
222 |
|
%% Returns all states stored for node Nidx, regardless of the entity.
-spec get_states(Nidx :: mod_pubsub:nodeIdx()) ->
    {ok, [mod_pubsub:pubsubState()]}.
get_states(Nidx) ->
    Pattern = #pubsub_state{stateid = {'_', Nidx}, _ = '_'},
    {ok, mnesia:match_object(Pattern)}.
227 |
|
|
228 |
|
%% Returns the states of the given user and server on any node; the resource
%% part of the JID is ignored (any resource matches).
-spec get_states_by_lus(LJID :: jid:ljid()) ->
    {ok, [mod_pubsub:pubsubState()]}.
get_states_by_lus({LUser, LServer, _}) ->
    AnyResource = {LUser, LServer, '_'},
    States = mnesia:match_object(#pubsub_state{stateid = {AnyResource, '_'},
                                               _ = '_'}),
    {ok, States}.
232 |
|
|
233 |
|
%% Returns the states stored under the bare form of LJID, on any node.
-spec get_states_by_bare(LJID :: jid:ljid()) ->
    {ok, [mod_pubsub:pubsubState()]}.
get_states_by_bare(LJID) ->
    Pattern = #pubsub_state{stateid = {jid:to_bare(LJID), '_'}, _ = '_'},
    {ok, mnesia:match_object(Pattern)}.
238 |
|
|
239 |
|
%% Returns the states stored under LJID itself followed by the states stored
%% under its bare form, on any node.
%% When LJID is already bare both lookups would use the same pattern, so only
%% one lookup is performed; otherwise every state would be listed twice.
-spec get_states_by_bare_and_full(LJID :: jid:ljid()) ->
    {ok, [mod_pubsub:pubsubState()]}.
get_states_by_bare_and_full(LJID) ->
    FullStates = mnesia:match_object(#pubsub_state{stateid = {LJID, '_'}, _ = '_'}),
    case jid:to_bare(LJID) of
        LJID ->
            %% LJID is its own bare form: the second query would duplicate FullStates.
            {ok, FullStates};
        LBare ->
            BareStates = mnesia:match_object(#pubsub_state{stateid = {LBare, '_'},
                                                           _ = '_'}),
            {ok, FullStates ++ BareStates}
    end.
245 |
|
|
246 |
|
%% Finds node indexes where the bare JID has the `owner' affiliation and at
%% least one state on that node holds a pending subscription.
%% The second step folds over the whole pubsub_state table; an index is added
%% once per matching state, so it may occur more than once in the result.
-spec get_idxs_of_own_nodes_with_pending_subs(LJID :: jid:ljid()) ->
    {ok, [mod_pubsub:nodeIdx()]}.
get_idxs_of_own_nodes_with_pending_subs(LJID) ->
    OwnerPattern = #pubsub_state{stateid = {jid:to_bare(LJID), '_'},
                                 affiliation = owner, _ = '_'},
    OwnedIdxs = [Nidx || #pubsub_state{stateid = {_, Nidx}}
                             <- mnesia:match_object(OwnerPattern)],
    Collector = pa:bind(fun get_idxs_with_pending_subs/3, OwnedIdxs),
    {ok, mnesia:foldl(Collector, [], pubsub_state)}.
256 |
|
|
257 |
|
%% ------------------------ Node management ------------------------ |
258 |
|
|
259 |
|
%% Registers a new node in the state table by giving Owner the `owner'
%% affiliation on it.
-spec create_node(Nidx :: mod_pubsub:nodeIdx(),
                  Owner :: jid:ljid()) ->
    ok.
create_node(Nidx, Owner) ->
    Affiliation = owner,
    set_affiliation(Nidx, Owner, Affiliation).
264 |
|
|
265 |
|
%% Removes everything stored in the state and item tables for node Nidx: for
%% each state its items are deleted first, then the state itself. Returns the
%% states as they were before deletion.
-spec del_node(Nidx :: mod_pubsub:nodeIdx()) ->
    {ok, [mod_pubsub:pubsubState()]}.
del_node(Nidx) ->
    {ok, States} = get_states(Nidx),
    Purge = fun(#pubsub_state{stateid = {LJID, _}, items = Items}) ->
                del_items(Nidx, Items),
                del_state_by_idx_and_ljid(Nidx, LJID)
            end,
    lists:foreach(Purge, States),
    {ok, States}.
274 |
|
|
275 |
|
%% Stores a node record. A record without an id is treated as new: it gets a
%% fresh index from pubsub_index and has no previous parents. Otherwise the
%% previously stored parents are looked up so the subnode table can be updated.
-spec set_node(mod_pubsub:pubsubNode()) -> {ok, mod_pubsub:nodeIdx()}.
set_node(#pubsub_node{id = undefined} = Node) ->
    set_node(Node#pubsub_node{id = pubsub_index:new(node)}, []);
set_node(Node) ->
    set_node(Node, get_parentnodes_names(Node)).
283 |
|
|
284 |
|
%% Writes the node record, brings the subnode table in line with the change
%% from OldParents to the node's current parents, and returns the node index.
set_node(#pubsub_node{id = Nidx} = NodeRec, OldParents) ->
    mnesia:write(NodeRec),
    set_subnodes(NodeRec, OldParents),
    {ok, Nidx}.
288 |
|
|
289 |
|
%% Returns the parents currently stored for the node with the same nodeid, or
%% [] when no such node is stored. The read takes a write lock.
get_parentnodes_names(#pubsub_node{nodeid = NodeId}) ->
    case mnesia:read(pubsub_node, NodeId, write) of
        [#pubsub_node{parents = StoredParents}] ->
            StoredParents;
        [] ->
            []
    end.
296 |
|
|
297 |
|
%% Updates the subnode table for a node whose parents may have changed.
%% Nothing is done when the node's parents equal OldParents.
set_subnodes(#pubsub_node{nodeid = NodeId, parents = NewParents}, OldParents) ->
    case NewParents of
        OldParents -> ok;
        _ -> set_subnodes(NodeId, NewParents, OldParents)
    end.
301 |
|
|
302 |
|
%% Applies the difference between OldParents and NewParents to the subnode
%% table: entries for parents that disappeared are deleted, entries for newly
%% added parents are written. Parents present in both lists are left alone.
set_subnodes({Key, Node}, NewParents, OldParents) ->
    OldSet = sets:from_list(OldParents),
    NewSet = sets:from_list(NewParents),
    Removed = sets:to_list(sets:subtract(OldSet, NewSet)),
    Added = sets:to_list(sets:subtract(NewSet, OldSet)),
    ToDelete = names_to_subnode_records(Key, Node, Removed),
    ToWrite = names_to_subnode_records(Key, Node, Added),
    lists:foreach(fun mnesia:delete_object/1, ToDelete),
    lists:foreach(fun mnesia:write/1, ToWrite),
    ok.
312 |
|
|
313 |
|
%% Builds one pubsub_subnode record per parent name, each saying that Node is
%% a subnode of {Key, Parent}.
names_to_subnode_records(Key, Node, Parents) ->
    lists:map(fun(Parent) ->
                  #pubsub_subnode{nodeid = {Key, Parent}, subnode = Node}
              end, Parents).
315 |
|
|
316 |
|
%% Looks a node up by its index through the `id' secondary index.
%% Crashes with case_clause if more than one record carries the same index.
-spec find_node_by_id(Nidx :: mod_pubsub:nodeIdx()) ->
    {error, not_found} | {ok, mod_pubsub:pubsubNode()}.
find_node_by_id(Nidx) ->
    case mnesia:index_read(pubsub_node, Nidx, #pubsub_node.id) of
        [] ->
            {error, not_found};
        [#pubsub_node{} = Found] ->
            {ok, Found}
    end.
324 |
|
|
325 |
|
%% Looks a node up by its {Key, Node} identifier; returns the record, or
%% `false' when it is not stored.
-spec find_node_by_name(
        Key :: mod_pubsub:hostPubsub() | jid:ljid(),
        Node :: mod_pubsub:nodeId()) ->
    mod_pubsub:pubsubNode() | false.
find_node_by_name(Key, Node) ->
    NodeId = oid(Key, Node),
    case mnesia:read(pubsub_node, NodeId, read) of
        [Found] -> Found;
        [] -> false
    end.
334 |
|
|
335 |
|
%% Returns every node record whose nodeid has the given Key, whatever its name.
-spec find_nodes_by_key(Key :: mod_pubsub:hostPubsub() | jid:ljid()) ->
    [mod_pubsub:pubsubNode()].
find_nodes_by_key(Key) ->
    Pattern = #pubsub_node{nodeid = {Key, '_'}, _ = '_'},
    mnesia:match_object(Pattern).
339 |
|
|
340 |
1699 |
%% Builds the pubsub_node table key from a host/JID key and a node name.
oid(Key, Name) ->
    {Key, Name}.
341 |
|
|
342 |
|
|
343 |
|
%% Deletes a node record. Its subnode-table entries are removed first, by
%% treating all of its current parents as dropped.
-spec delete_node(Node :: mod_pubsub:pubsubNode()) -> ok.
delete_node(#pubsub_node{nodeid = NodeId, parents = CurrentParents}) ->
    NoParents = [],
    set_subnodes(NodeId, NoParents, CurrentParents),
    mnesia:delete({pubsub_node, NodeId}).
347 |
|
|
348 |
|
|
349 |
|
%% Returns the direct subnodes of Node under Key. An empty node name selects
%% the nodes under Key that have no parents at all.
-spec get_subnodes(Key :: mod_pubsub:hostPubsub() | jid:ljid(), Node :: mod_pubsub:nodeId() | <<>>) ->
    [mod_pubsub:pubsubNode()].
get_subnodes(Key, Node) ->
    case Node of
        <<>> ->
            get_nodes_without_parents(Key);
        _ ->
            find_nodes_by_names(Key, get_subnodes_names(Key, Node))
    end.
356 |
|
|
357 |
|
%% Like get_subnodes/2, but returns node names instead of node records.
%% The names are read from the pubsub_subnode table entry for {Key, Node}.
get_subnodes_names(Key, Node) ->
    Entries = mnesia:read({pubsub_subnode, {Key, Node}}),
    [Name || #pubsub_subnode{subnode = Name} <- Entries].
361 |
|
|
362 |
|
%% Warning: this function performs a full table scan and can return a lot of
%% records. It selects the nodes under Key whose `parents' list is empty.
get_nodes_without_parents(Key) ->
    Pattern = #pubsub_node{nodeid = {Key, '_'}, parents = [], _ = '_'},
    mnesia:match_object(Pattern).
365 |
|
|
366 |
|
%% Returns a list of {Depth, Nodes}, where Nodes are the parents of the nodes
%% on the level below.
%% Level 0 is {0, [NodeRecord]}, NodeRecord being the node named by Node.
%% Level 1 is {1, Nodes1}, where Nodes1 are the parent nodes of Node.
%% Level 2 can be {2, Nodes2}, where Nodes2 are parents of Nodes1 (without
%% duplicates), and so on. Deeper levels come first in the returned list.
%%
%% Each node is returned only once by the function.
%% When the node does not exist the result is [{0, []}].
-spec get_parentnodes_tree(Key :: mod_pubsub:hostPubsub() | jid:ljid(), Node :: mod_pubsub:nodeId()) ->
    [{Depth::non_neg_integer(), Nodes::[mod_pubsub:pubsubNode(), ...]}].
get_parentnodes_tree(Key, Node) ->
    case find_node_by_name(Key, Node) of
        false ->
            %% node not found case
            [{0, []}];
        #pubsub_node{parents = []} = Record ->
            [{0, [Record]}];
        #pubsub_node{parents = Parents} = Record ->
            %% Remember already visited names so that an accidental cycle in
            %% the parent links is not expanded again.
            Seen = sets:from_list([Node]),
            UpperLevels = extract_parents(Key, Node, Parents, 1, Seen),
            UpperLevels ++ [{0, [Record]}]
    end.
389 |
|
|
390 |
|
%% Each call extracts the parent records of one level and recurses into the
%% next level. KnownNodesSet holds the names that must not be expanded again;
%% names of grandparents found in it are reported as cyclic and skipped.
%% Returns the deeper levels followed by {Depth, ParentRecords}.
extract_parents(Key, InitialNode, Parents, Depth, KnownNodesSet) ->
    ParentRecords = find_nodes_by_names(Key, Parents),
    Seen = sets:union(KnownNodesSet, sets:from_list(Parents)),
    %% Names of parents of parents, sorted and free of duplicates.
    PPNames = lists:usort(lists:append([PP || #pubsub_node{parents = PP}
                                                  <- ParentRecords])),
    %% CyclicNames is usually an empty list. Because PPNames has no duplicates,
    %% splitting it is the same as subtracting the cyclic names from it.
    {CyclicNames, PPNamesToGet} =
        lists:partition(fun(Name) -> sets:is_element(Name, Seen) end, PPNames),
    case CyclicNames of
        [] ->
            ok;
        [_ | _] ->
            ?LOG_WARNING(#{what => pubsub_cyclic_nodes_detected,
                           pubsub_node => InitialNode, cyclic_names => CyclicNames})
    end,
    HigherLevels =
        case PPNamesToGet of
            [] -> [];
            _ -> extract_parents(Key, InitialNode, PPNamesToGet, Depth + 1, Seen)
        end,
    HigherLevels ++ [{Depth, ParentRecords}].
410 |
|
|
411 |
|
%% Resolves a list of node names under Key into node records, in the same
%% order; names without a stored node are dropped.
find_nodes_by_names(Key, Nodes) ->
    %% Each lookup gives a record or `false' for a missing node.
    Lookups = lists:map(fun(Name) -> find_node_by_name(Key, Name) end, Nodes),
    %% Keep the records only.
    lists:filter(fun(Result) -> is_record(Result, pubsub_node) end, Lookups).
416 |
|
|
417 |
|
|
418 |
|
%% Returns a list of {Depth, Nodes} describing the subtree below Node: level 0
%% holds the node itself, level 1 its direct subnodes, and so on. Deeper
%% levels come first in the returned list.
%% When the node does not exist the result is [{1, []}, {0, [false]}].
%% NOTE(review): that `false' entry does not fit the declared return type.
-spec get_subnodes_tree(Key :: mod_pubsub:hostPubsub() | jid:ljid(), Node :: mod_pubsub:nodeId()) ->
    [{Depth::non_neg_integer(), Nodes::[mod_pubsub:pubsubNode(), ...]}].
get_subnodes_tree(Key, Node) ->
    case find_node_by_name(Key, Node) of
        #pubsub_node{} = Record ->
            DirectSubnodes = get_subnodes_names(Key, Node),
            %% Remember already visited names so that an accidental cycle in
            %% the subnode links is not expanded again.
            Seen = sets:from_list([Node]),
            LowerLevels = extract_subnodes(Key, Node, DirectSubnodes, 1, Seen),
            LowerLevels ++ [{0, [Record]}];
        false ->
            %% node not found case
            [{1, []}, {0, [false]}]
    end.
433 |
|
|
434 |
|
%% Each call extracts the subnode records of one level and recurses into the
%% next level. KnownNodesSet holds the names that must not be expanded again;
%% names of sub-subnodes found in it are reported as cyclic and skipped.
%% Returns the deeper levels followed by {Depth, SubnodesRecords}; the deepest
%% level is always closed by an empty {Depth + 1, []} entry.
extract_subnodes(Key, InitialNode, Subnodes, Depth, KnownNodesSet) ->
    SubnodesRecords = find_nodes_by_names(Key, Subnodes),
    Seen = sets:union(KnownNodesSet, sets:from_list(Subnodes)),
    %% Names of subnodes of subnodes, sorted and free of duplicates.
    SSNames = lists:usort(lists:append([get_subnodes_names(Key, Subnode)
                                        || Subnode <- Subnodes])),
    %% Because SSNames has no duplicates, splitting it is the same as
    %% subtracting the cyclic names from it.
    {CyclicNames, SSNamesToGet} =
        lists:partition(fun(Name) -> sets:is_element(Name, Seen) end, SSNames),
    case CyclicNames of
        [] ->
            ok;
        [_ | _] ->
            ?LOG_WARNING(#{what => pubsub_cyclic_nodes_detected,
                           pubsub_node => InitialNode, cyclic_names => CyclicNames})
    end,
    DeeperLevels =
        case SSNamesToGet of
            [] -> [{Depth + 1, []}];
            _ -> extract_subnodes(Key, InitialNode, SSNamesToGet, Depth + 1, Seen)
        end,
    DeeperLevels ++ [{Depth, SubnodesRecords}].
452 |
|
|
453 |
|
%% ------------------------ Affiliations ------------------------ |
454 |
|
|
455 |
|
%% Sets the affiliation of the bare JID on node Nidx. When the affiliation
%% becomes `none' and the state has no subscriptions, the state is deleted
%% instead of being written.
-spec set_affiliation(Nidx :: mod_pubsub:nodeIdx(),
                      LJID :: jid:ljid(),
                      Affiliation :: mod_pubsub:affiliation()) -> ok.
set_affiliation(Nidx, LJID, Affiliation) ->
    BareLJID = jid:to_bare(LJID),
    {ok, State} = get_state(Nidx, BareLJID, write),
    Subs = State#pubsub_state.subscriptions,
    case Affiliation =:= none andalso Subs =:= [] of
        true ->
            del_state_by_idx_and_ljid(Nidx, BareLJID);
        false ->
            mnesia:write(State#pubsub_state{affiliation = Affiliation})
    end.
465 |
|
|
466 |
|
%% Returns the affiliation stored for the bare JID on node Nidx. When no state
%% is stored, the default of the state record's `affiliation' field is returned.
-spec get_affiliation(Nidx :: mod_pubsub:nodeIdx(),
                      LJID :: jid:ljid()) ->
    {ok, mod_pubsub:affiliation()}.
get_affiliation(Nidx, LJID) ->
    BareLJID = jid:to_bare(LJID),
    {ok, State} = get_state(Nidx, BareLJID, read),
    Affiliation = State#pubsub_state.affiliation,
    {ok, Affiliation}.
472 |
|
|
473 |
|
%% ------------------------ Subscriptions ------------------------ |
474 |
|
|
475 |
|
%% Prepends {Sub, SubId} to the subscriptions of LJID on node Nidx and stores
%% the subscription options under SubId.
-spec add_subscription(Nidx :: mod_pubsub:nodeIdx(),
                       LJID :: jid:ljid(),
                       Sub :: mod_pubsub:subscription(),
                       SubId :: mod_pubsub:subId(),
                       SubOpts :: mod_pubsub:subOptions()) -> ok.
add_subscription(Nidx, LJID, Sub, SubId, SubOpts) ->
    {ok, State} = get_state(Nidx, LJID, write),
    Existing = State#pubsub_state.subscriptions,
    Updated = State#pubsub_state{subscriptions = [{Sub, SubId} | Existing]},
    mnesia:write(Updated),
    set_subscription_opts(Nidx, LJID, SubId, SubOpts).
485 |
|
|
486 |
|
%% Stores (or overwrites) the options of subscription SubId. The node index
%% and JID are not used by this backend.
-spec set_subscription_opts(Nidx :: mod_pubsub:nodeIdx(),
                            JID :: jid:ljid(),
                            SubId :: mod_pubsub:subId(),
                            Opts :: mod_pubsub:subOptions()) -> ok.
set_subscription_opts(_Nidx, _JID, SubId, Opts) ->
    Entry = #pubsub_subscription{subid = SubId, options = Opts},
    mnesia:write(Entry).
492 |
|
|
493 |
|
%% Lists every subscription on node Nidx as {Entity, Sub, SubId, Opts}.
-spec get_node_subscriptions(Nidx :: mod_pubsub:nodeIdx()) ->
    {ok, [{Entity :: jid:ljid(),
           Sub :: mod_pubsub:subscription(),
           SubId :: mod_pubsub:subId(),
           Opts :: mod_pubsub:subOptions()}]}.
get_node_subscriptions(Nidx) ->
    {ok, NodeStates} = get_states(Nidx),
    Subscriptions = states_to_subscriptions(NodeStates),
    {ok, Subscriptions}.
501 |
|
|
502 |
|
%% Lists the subscriptions of LJID on node Nidx as {Sub, SubId, Opts}.
-spec get_node_entity_subscriptions(Nidx :: mod_pubsub:nodeIdx(),
                                    LJID :: jid:ljid()) ->
    {ok, [{Sub :: mod_pubsub:subscription(),
           SubId :: mod_pubsub:subId(),
           Opts :: mod_pubsub:subOptions()}]}.
get_node_entity_subscriptions(Nidx, LJID) ->
    {ok, State} = get_state(Nidx, LJID, read),
    Subs = State#pubsub_state.subscriptions,
    {ok, add_opts_to_subs(Subs)}.
510 |
|
|
511 |
|
%% Removes subscription SubId of LJID on node Nidx together with its stored
%% options. If the entity is left with affiliation `none' and no
%% subscriptions, its state is deleted altogether.
-spec delete_subscription(
        Nidx :: mod_pubsub:nodeIdx(),
        LJID :: jid:ljid(),
        SubId :: mod_pubsub:subId()) ->
    ok.
delete_subscription(Nidx, LJID, SubId) ->
    {ok, State} = get_state(Nidx, LJID, write),
    Remaining = lists:keydelete(SubId, 2, State#pubsub_state.subscriptions),
    mnesia:delete({pubsub_subscription, SubId}),
    Affiliation = State#pubsub_state.affiliation,
    case Affiliation =:= none andalso Remaining =:= [] of
        true ->
            del_state_by_idx_and_ljid(Nidx, LJID);
        false ->
            mnesia:write(State#pubsub_state{subscriptions = Remaining})
    end.
524 |
|
|
525 |
|
%% Removes every subscription of LJID on node Nidx.
-spec delete_all_subscriptions(
        Nidx :: mod_pubsub:nodeIdx(),
        LJID :: jid:ljid()) ->
    ok.
delete_all_subscriptions(Nidx, LJID) ->
    {ok, EntityState} = get_state(Nidx, LJID, write),
    delete_all_subscriptions_by_state(EntityState).
532 |
|
|
533 |
|
%% Removes every subscription held by the user (any resource) on any node.
-spec delete_user_subscriptions(jid:ljid()) -> ok.
delete_user_subscriptions(LJID) ->
    {ok, UserStates} = get_states_by_lus(LJID),
    lists:foreach(fun(State) -> delete_all_subscriptions_by_state(State) end,
                  UserStates).
537 |
|
|
538 |
|
%% Deletes the stored options of every subscription in State, then either
%% deletes the state (affiliation `none') or rewrites it with no subscriptions.
-spec delete_all_subscriptions_by_state(mod_pubsub:pubsubState()) -> ok.
delete_all_subscriptions_by_state(State) ->
    DropOptions = fun({_, SubId}) ->
                      mnesia:delete({pubsub_subscription, SubId})
                  end,
    lists:foreach(DropOptions, State#pubsub_state.subscriptions),
    case State#pubsub_state.affiliation of
        none ->
            del_state(State);
        _ ->
            mnesia:write(State#pubsub_state{subscriptions = []})
    end.
546 |
|
|
547 |
|
%% Replaces the entry with id SubId in the subscriptions of LJID on node Nidx
%% by {Subscription, SubId}. The state is written back even when no entry
%% with that id exists.
-spec update_subscription(Nidx :: mod_pubsub:nodeIdx(),
                          LJID :: jid:ljid(),
                          Subscription :: mod_pubsub:subscription(),
                          SubId :: mod_pubsub:subId()) ->
    ok.
update_subscription(Nidx, LJID, Subscription, SubId) ->
    {ok, State} = get_state(Nidx, LJID, write),
    Current = State#pubsub_state.subscriptions,
    Replaced = lists:keyreplace(SubId, 2, Current, {Subscription, SubId}),
    mnesia:write(State#pubsub_state{subscriptions = Replaced}).
556 |
|
|
557 |
|
%% ------------------------ Items ------------------------ |
558 |
|
|
559 |
|
%% Removes ItemIds from the item list kept in the state of the bare JID on
%% node Nidx. Only the state record is touched here.
-spec remove_items(Nidx :: mod_pubsub:nodeIdx(),
                   LJID :: jid:ljid(),
                   ItemIds :: [mod_pubsub:itemId()]) ->
    ok.
remove_items(Nidx, LJID, ItemIds) ->
    BareLJID = jid:to_bare(LJID),
    {ok, State} = get_state(Nidx, BareLJID, write),
    Remaining = State#pubsub_state.items -- ItemIds,
    mnesia:write(State#pubsub_state{items = Remaining}).
567 |
|
|
568 |
|
%% Empties the item list of every state on node Nidx. States that already
%% have no items are not rewritten.
-spec remove_all_items(Nidx :: mod_pubsub:nodeIdx()) ->
    ok.
remove_all_items(Nidx) ->
    {ok, States} = get_states(Nidx),
    Clear = fun(#pubsub_state{items = Items} = S) ->
                case Items of
                    [] -> ok;
                    _ -> mnesia:write(S#pubsub_state{items = []})
                end
            end,
    lists:foreach(Clear, States).
577 |
|
|
578 |
|
%% Stores the item record and prepends its id to the item list in the state
%% of the bare JID on node Nidx.
-spec add_item(Nidx :: mod_pubsub:nodeIdx(),
               JID :: jid:ljid(),
               Item :: mod_pubsub:pubsubItem()) ->
    ok.
add_item(Nidx, JID, #pubsub_item{itemid = {ItemId, _}} = Item) ->
    set_item(Item),
    BareJID = jid:to_bare(JID),
    {ok, State} = get_state(Nidx, BareJID, write),
    KnownItems = State#pubsub_state.items,
    mnesia:write(State#pubsub_state{items = [ItemId | KnownItems]}).
587 |
|
|
588 |
|
%% ------------------------ Direct #pubsub_item access ------------------------ |
589 |
|
|
590 |
|
%% Returns the items of node Nidx in descending order of their `modification'
%% field. The optional `item_ids' entry of Opts restricts the result to those
%% ids, and the optional `max_items' entry caps its length (applied after the
%% id filter). The second element of the result is always `none'.
-spec get_items(Nidx :: mod_pubsub:nodeIdx(), gen_pubsub_node:get_item_options()) ->
    {ok, {[mod_pubsub:pubsubItem()], none}}.
get_items(Nidx, Opts) ->
    Pattern = #pubsub_item{itemid = {'_', Nidx}, _ = '_'},
    Ascending = lists:keysort(#pubsub_item.modification,
                              mnesia:match_object(Pattern)),
    Descending = lists:reverse(Ascending),
    Selected = filter_items_by_item_ids(Descending,
                                        maps:get(item_ids, Opts, undefined)),
    Limited = limit_items(Selected, maps:get(max_items, Opts, undefined)),
    {ok, {Limited, none}}.
598 |
|
|
599 |
|
%% Keeps only the items whose id is a member of ItemIds; `undefined' means no
%% filtering at all.
filter_items_by_item_ids(Items, undefined) ->
    Items;
filter_items_by_item_ids(Items, ItemIds) ->
    IsWanted = fun(#pubsub_item{itemid = {ItemId, _}}) ->
                   lists:member(ItemId, ItemIds)
               end,
    lists:filter(IsWanted, Items).
605 |
|
|
606 |
|
%% Truncates Items to at most MaxItems elements; `undefined' means no limit.
limit_items(Items, MaxItems) ->
    case MaxItems of
        undefined -> Items;
        _ -> lists:sublist(Items, MaxItems)
    end.
610 |
|
|
611 |
|
%% Reads a single item of node Nidx by id.
-spec get_item(Nidx :: mod_pubsub:nodeIdx(), ItemId :: mod_pubsub:itemId()) ->
    {ok, mod_pubsub:pubsubItem()} | {error, item_not_found}.
get_item(Nidx, ItemId) ->
    ItemKey = {ItemId, Nidx},
    case mnesia:read({pubsub_item, ItemKey}) of
        [#pubsub_item{} = Item] ->
            {ok, Item};
        _ ->
            {error, item_not_found}
    end.
618 |
|
|
619 |
|
%% Writes the item record to the item table.
-spec set_item(Item :: mod_pubsub:pubsubItem()) -> ok.
set_item(ItemRecord) ->
    mnesia:write(ItemRecord).
622 |
|
|
623 |
|
%% Deletes the item with the given id on node Nidx from the item table.
-spec del_item(Nidx :: mod_pubsub:nodeIdx(), ItemId :: mod_pubsub:itemId()) -> ok.
del_item(Nidx, ItemId) ->
    ItemKey = {ItemId, Nidx},
    mnesia:delete({pubsub_item, ItemKey}).
626 |
|
|
627 |
|
%% Deletes each of the listed items on node Nidx from the item table.
-spec del_items(Nidx :: mod_pubsub:nodeIdx(), [ItemId :: mod_pubsub:itemId()]) -> ok.
del_items(Nidx, ItemIds) ->
    DeleteOne = fun(ItemId) -> del_item(Nidx, ItemId) end,
    lists:foreach(DeleteOne, ItemIds).
631 |
|
|
632 |
|
%%==================================================================== |
633 |
|
%% Internal functions |
634 |
|
%%==================================================================== |
635 |
|
|
636 |
|
%% Deletes the state of LJID on node Nidx. The state is read first (with a
%% write lock) so that its subscriptions' options can be deleted as well.
-spec del_state_by_idx_and_ljid(Nidx :: mod_pubsub:nodeIdx(),
                                LJID :: jid:ljid()) -> ok.
del_state_by_idx_and_ljid(Nidx, LJID) ->
    {ok, Current} = get_state(Nidx, LJID, write),
    del_state(Current).
641 |
|
|
642 |
|
%% Deletes the stored options of every subscription in the state and then the
%% state record itself.
-spec del_state(mod_pubsub:pubsubState()) -> ok.
del_state(#pubsub_state{stateid = {LJID, Nidx}, subscriptions = Subs}) ->
    DropOptions = fun({_, SubId}) ->
                      mnesia:delete({pubsub_subscription, SubId})
                  end,
    lists:foreach(DropOptions, Subs),
    StateKey = {LJID, Nidx},
    mnesia:delete({pubsub_state, StateKey}).
646 |
|
|
647 |
|
%% Reads the state of LJID on node Nidx with the requested lock. When no
%% single state record is found, a fresh record carrying only the state id
%% (all other fields at their defaults) is returned; nothing is written.
-spec get_state(Nidx :: mod_pubsub:nodeIdx(),
                LJID :: jid:ljid(),
                LockKind :: write | read) ->
    {ok, mod_pubsub:pubsubState()}.
get_state(Nidx, LJID, LockKind) ->
    StateId = {LJID, Nidx},
    State = case mnesia:read(pubsub_state, StateId, LockKind) of
                [#pubsub_state{} = Stored] -> Stored;
                _ -> #pubsub_state{stateid = StateId}
            end,
    {ok, State}.
657 |
|
|
658 |
|
%% Flattens a list of states into {JID, Sub, SubId, Opts} tuples, one per
%% subscription, preserving the order of states and of their subscriptions.
%% Works hand in hand with add_jid_and_opts_to_subs/3, which continues with
%% the remaining states once one state's subscriptions are exhausted.
-spec states_to_subscriptions([mod_pubsub:pubsubState()]) ->
    [{jid:ljid(), mod_pubsub:subscription(), mod_pubsub:subId(), mod_pubsub:subOptions()}].
states_to_subscriptions([]) ->
    [];
states_to_subscriptions([#pubsub_state{subscriptions = Subs} = State | RStates]) ->
    case Subs of
        [] ->
            states_to_subscriptions(RStates);
        _ ->
            #pubsub_state{stateid = {J, _}} = State,
            add_jid_and_opts_to_subs(Subs, J, RStates)
    end.
666 |
|
|
667 |
|
%% Emits {JID, Sub, SubId, Opts} for each subscription of one entity, looking
%% the options up per subscription id. Once the list is exhausted, processing
%% continues with the remaining states via states_to_subscriptions/1.
-spec add_jid_and_opts_to_subs(Subs :: [{mod_pubsub:subscription(), mod_pubsub:subId()}],
                               LJID :: jid:ljid(),
                               RStates :: [mod_pubsub:pubsubState()]) ->
    [{jid:ljid(), mod_pubsub:subscription(), mod_pubsub:subId(), mod_pubsub:subOptions()}].
add_jid_and_opts_to_subs([{S, SubId} | RSubs], J, RStates) ->
    Entry = {J, S, SubId, read_sub_options(SubId)},
    [Entry | add_jid_and_opts_to_subs(RSubs, J, RStates)];
add_jid_and_opts_to_subs([], _J, RStates) ->
    states_to_subscriptions(RStates).
676 |
|
|
677 |
|
%% Extends each {Sub, SubId} pair with the options stored for SubId.
-spec add_opts_to_subs(Subs :: [{mod_pubsub:subscription(), mod_pubsub:subId()}]) ->
    [{mod_pubsub:subscription(), mod_pubsub:subId(), mod_pubsub:subOptions()}].
add_opts_to_subs(Subs) ->
    lists:map(fun({S, SubId}) ->
                  {S, SubId, read_sub_options(SubId)}
              end, Subs).
684 |
|
|
685 |
|
%% Returns the options stored for subscription SubId, or [] when nothing is
%% stored under that id.
-spec read_sub_options(SubId :: mod_pubsub:subId()) -> mod_pubsub:subOptions().
read_sub_options(SubId) ->
    case mnesia:read({pubsub_subscription, SubId}) of
        [#pubsub_subscription{options = StoredOpts}] ->
            StoredOpts;
        [] ->
            []
    end.
691 |
|
|
692 |
|
%% Fold step over the state table: prepends the state's node index to Acc when
%% that index is one of NodeIdxs and the state has a pending subscription.
-spec get_idxs_with_pending_subs(NodeIdxs :: [mod_pubsub:nodeIdx()],
                                 PubsubState :: mod_pubsub:pubsubState(),
                                 Acc :: [mod_pubsub:nodeIdx()]) ->
    [mod_pubsub:nodeIdx()].
get_idxs_with_pending_subs(NodeIdxs,
                           #pubsub_state{stateid = {_, Nidx}, subscriptions = Subs},
                           Acc) ->
    IsWatched = lists:member(Nidx, NodeIdxs),
    %% Subscriptions are only inspected for watched node indexes.
    case IsWatched andalso lists:any(fun is_pending_sub/1, Subs) of
        true -> [Nidx | Acc];
        false -> Acc
    end.
704 |
|
|
705 |
4 |
%% True for a {pending, SubId} subscription entry, false for anything else.
is_pending_sub(Sub) ->
    case Sub of
        {pending, _} -> true;
        _ -> false
    end.
707 |
|
|