1 |
|
%%%---------------------------------------------------------------------- |
2 |
|
%%% File : mod_pubsub_db_rdbms.erl |
3 |
|
%%% Author : Piotr Nosek <piotr.nosek@erlang-solutions.com> |
4 |
|
%%% Purpose : PubSub RDBMS backend |
5 |
|
%%% Created : 2 Nov 2018 by Piotr Nosek <piotr.nosek@erlang-solutions.com> |
6 |
|
%%%---------------------------------------------------------------------- |
7 |
|
|
8 |
|
-module(mod_pubsub_db_rdbms). |
9 |
|
-author('piotr.nosek@erlang-solutions.com'). |
10 |
|
-author('michal.piotrowski@erlang-solutions.com'). |
11 |
|
|
12 |
|
-behaviour(mod_pubsub_db). |
13 |
|
|
14 |
|
-include("pubsub.hrl"). |
15 |
|
-include("mongoose_logger.hrl"). |
16 |
|
-include("jlib.hrl"). |
17 |
|
|
18 |
|
-export([init/2, stop/0]). |
19 |
|
% Funs execution |
20 |
|
-export([transaction/2, dirty/2]). |
21 |
|
% Direct #pubsub_state access |
22 |
|
-export([get_state/2, |
23 |
|
get_states/1, get_states_by_lus/1, get_states_by_bare/1, |
24 |
|
get_states_by_bare_and_full/1, get_idxs_of_own_nodes_with_pending_subs/1]). |
25 |
|
% Node management |
26 |
|
-export([ |
27 |
|
create_node/2, |
28 |
|
del_node/1, |
29 |
|
set_node/1, |
30 |
|
find_node_by_id/1, |
31 |
|
find_nodes_by_key/1, |
32 |
|
find_node_by_name/2, |
33 |
|
delete_node/1, |
34 |
|
get_subnodes/2, |
35 |
|
get_parentnodes_tree/2, |
36 |
|
get_subnodes_tree/2 |
37 |
|
]). |
38 |
|
% Affiliations |
39 |
|
-export([ |
40 |
|
set_affiliation/3, |
41 |
|
get_affiliation/2 |
42 |
|
]). |
43 |
|
% Subscriptions |
44 |
|
-export([ |
45 |
|
add_subscription/5, |
46 |
|
set_subscription_opts/4, |
47 |
|
get_node_subscriptions/1, |
48 |
|
get_node_entity_subscriptions/2, |
49 |
|
delete_subscription/3, |
50 |
|
delete_all_subscriptions/2, |
51 |
|
update_subscription/4 |
52 |
|
]). |
53 |
|
% Item ids in state |
54 |
|
-export([ |
55 |
|
add_item/3, |
56 |
|
remove_items/3, |
57 |
|
remove_all_items/1 |
58 |
|
]). |
59 |
|
% Whole items |
60 |
|
-export([ |
61 |
|
get_items/2, |
62 |
|
get_item/2, |
63 |
|
set_item/1, |
64 |
|
del_item/2, |
65 |
|
del_items/2 |
66 |
|
]). |
67 |
|
|
68 |
|
%% GDPR related |
69 |
|
-export([ |
70 |
|
get_user_payloads/2, |
71 |
|
get_user_nodes/2, |
72 |
|
get_user_subscriptions/2, |
73 |
|
delete_user_subscriptions/1, |
74 |
|
find_nodes_by_affiliated_user/1 |
75 |
|
]). |
76 |
|
|
77 |
|
%%==================================================================== |
78 |
|
%% Behaviour callbacks |
79 |
|
%%==================================================================== |
80 |
|
|
81 |
|
%% ------------------------ Backend start/stop ------------------------ |
82 |
|
|
83 |
|
init(_HostType, _Opts) -> |
84 |
|
% -------------------- State building ---------------------------- |
85 |
39 |
mongoose_rdbms:prepare(pubsub_get_item_rows_id, pubsub_items, [nidx], |
86 |
|
<<"SELECT nidx, created_luser, created_lserver, itemid " |
87 |
|
"FROM pubsub_items WHERE nidx = ?">>), |
88 |
39 |
mongoose_rdbms:prepare(pubsub_get_affiliation_rows_id, pubsub_affiliations, [nidx], |
89 |
|
<<"SELECT nidx, luser, lserver, aff FROM pubsub_affiliations WHERE nidx = ?">>), |
90 |
39 |
mongoose_rdbms:prepare(pubsub_get_subscriptions_rows_id, pubsub_subscriptions, [nidx], |
91 |
|
<<"SELECT nidx, luser, lserver, lresource, type, sub_id " |
92 |
|
"FROM pubsub_subscriptions WHERE nidx = ?">>), |
93 |
39 |
mongoose_rdbms:prepare(pubsub_get_item_rows, pubsub_items, [created_luser, created_lserver], |
94 |
|
<<"SELECT nidx, created_luser, created_lserver, itemid FROM pubsub_items" |
95 |
|
" WHERE created_luser = ? AND created_lserver = ?">>), |
96 |
39 |
mongoose_rdbms:prepare(pubsub_get_affiliation_rows, pubsub_affiliations, [luser, lserver], |
97 |
|
<<"SELECT nidx, luser, lserver, aff FROM pubsub_affiliations " |
98 |
|
"WHERE luser = ? AND lserver = ?">>), |
99 |
39 |
mongoose_rdbms:prepare(pubsub_get_subscriptions_rows, pubsub_subscriptions, [luser, lserver], |
100 |
|
<<"SELECT nidx, luser, lserver, lresource, type, sub_id FROM pubsub_subscriptions " |
101 |
|
"WHERE luser = ? AND lserver = ?">>), |
102 |
39 |
mongoose_rdbms:prepare(pubsub_get_subscriptions_rows_resource, pubsub_subscriptions, |
103 |
|
[luser, lserver, lresource], |
104 |
|
<<"SELECT nidx, luser, lserver, lresource, type, sub_id FROM pubsub_subscriptions " |
105 |
|
"WHERE luser = ? AND lserver = ? AND lresource = ?">>), |
106 |
39 |
mongoose_rdbms:prepare(pubsub_get_idxs_of_own_nodes_with_pending_subs, pubsub_affiliations, |
107 |
|
[aff, luser, lserver, 'pubsub_subscriptions.type'], |
108 |
|
<<"SELECT DISTINCT s.nidx FROM pubsub_affiliations AS a " |
109 |
|
"INNER JOIN pubsub_subscriptions s ON a.nidx = s.nidx " |
110 |
|
"WHERE a.aff = ? AND a.luser = ? AND a.lserver = ? AND s.type = ?">>), |
111 |
|
|
112 |
|
% ------------------- Affiliations -------------------------------- |
113 |
39 |
mongoose_rdbms:prepare(pubsub_get_affiliation, pubsub_affiliations, [nidx, luser, lserver], |
114 |
|
<<"SELECT aff FROM pubsub_affiliations WHERE nidx = ? AND luser = ? AND lserver = ?">>), |
115 |
39 |
mongoose_rdbms:prepare(pubsub_delete_affiliation, pubsub_affiliations, [nidx, luser, lserver], |
116 |
|
<<"DELETE FROM pubsub_affiliations WHERE nidx = ? AND luser = ? AND lserver = ?">>), |
117 |
39 |
mongoose_rdbms:prepare(pubsub_delete_all_affiliations, pubsub_affiliations, [nidx], |
118 |
|
<<"DELETE FROM pubsub_affiliations WHERE nidx = ?">>), |
119 |
|
|
120 |
|
% ------------------- Subscriptions -------------------------------- |
121 |
39 |
mongoose_rdbms:prepare(pubsub_insert_subscription, pubsub_subscriptions, |
122 |
|
[nidx, luser, lserver, lresource, type, sub_id, options], |
123 |
|
<<"INSERT INTO pubsub_subscriptions (nidx, luser, lserver, lresource, " |
124 |
|
"type, sub_id, options) VALUES (?, ?, ?, ?, ?, ?, ?)">>), |
125 |
39 |
mongoose_rdbms:prepare(pubsub_update_subscription_opts, pubsub_subscriptions, |
126 |
|
[options, nidx, luser, lserver, lresource, sub_id], |
127 |
|
<<"UPDATE pubsub_subscriptions SET options = ? WHERE nidx = ? " |
128 |
|
"AND luser = ? AND lserver = ? AND lresource = ? AND sub_id = ?">>), |
129 |
39 |
mongoose_rdbms:prepare(pubsub_get_node_subs, pubsub_subscriptions, [nidx], |
130 |
|
<<"SELECT luser, lserver, lresource, type, sub_id, options " |
131 |
|
"FROM pubsub_subscriptions WHERE nidx = ?">>), |
132 |
39 |
mongoose_rdbms:prepare(pubsub_get_node_entity_subs, pubsub_subscriptions, |
133 |
|
[nidx, luser, lserver, lresource], |
134 |
|
<<"SELECT type, sub_id, options FROM pubsub_subscriptions " |
135 |
|
"WHERE nidx = ? AND luser = ? AND lserver = ? AND lresource = ?">>), |
136 |
39 |
mongoose_rdbms:prepare(pubsub_delete_subscription, pubsub_subscriptions, |
137 |
|
[nidx, luser, lserver, lresource, sub_id], |
138 |
|
<<"DELETE FROM pubsub_subscriptions WHERE nidx = ? " |
139 |
|
"AND luser = ? AND lserver = ? AND lresource = ? AND sub_id = ?">>), |
140 |
39 |
mongoose_rdbms:prepare(pubsub_delete_all_subscriptions, pubsub_subscriptions, |
141 |
|
[nidx, luser, lserver, lresource], |
142 |
|
<<"DELETE FROM pubsub_subscriptions"" WHERE nidx = ? " |
143 |
|
"AND luser = ? AND lserver = ? AND lresource = ?">>), |
144 |
39 |
mongoose_rdbms:prepare(pubsub_delete_all_subscriptions_id, pubsub_subscriptions, [nidx], |
145 |
|
<<"DELETE FROM pubsub_subscriptions WHERE nidx = ?">>), |
146 |
39 |
mongoose_rdbms:prepare(pubsub_delete_user_subscriptions, |
147 |
|
pubsub_subscriptions, [luser, lserver], |
148 |
|
<<"DELETE FROM pubsub_subscriptions WHERE luser = ? AND lserver = ?">>), |
149 |
39 |
mongoose_rdbms:prepare(pubsub_update_subscription, pubsub_subscriptions, |
150 |
|
[type, nidx, luser, lserver, lresource, sub_id], |
151 |
|
<<"UPDATE pubsub_subscriptions SET type = ? WHERE nidx = ? " |
152 |
|
"AND luser = ? AND lserver = ? AND lresource = ? AND sub_id = ?">>), |
153 |
|
|
154 |
|
% ------------------- Items -------------------------------- |
155 |
39 |
mongoose_rdbms:prepare(pubsub_get_entity_items, pubsub_items, |
156 |
|
[nidx, created_luser, created_lserver], |
157 |
|
<<"SELECT itemid FROM pubsub_items WHERE nidx = ? " |
158 |
|
"AND created_luser = ? AND created_lserver = ?">>), |
159 |
39 |
mongoose_rdbms:prepare(pubsub_delete_item, pubsub_items, |
160 |
|
[nidx, created_luser, created_lserver, itemid], |
161 |
|
<<"DELETE FROM pubsub_items WHERE nidx = ? AND created_luser = ? ", |
162 |
|
"AND created_lserver = ? AND itemid = ?">>), |
163 |
39 |
mongoose_rdbms:prepare(pubsub_delete_all_items, pubsub_items, [nidx], |
164 |
|
<<"DELETE FROM pubsub_items WHERE nidx = ?">>), |
165 |
39 |
ItemColumns = item_columns(), |
166 |
39 |
mongoose_rdbms:prepare(pubsub_get_item, pubsub_items, [nidx, itemid], |
167 |
|
<<"SELECT ", ItemColumns/binary, " FROM pubsub_items WHERE nidx = ? AND itemid = ?">>), |
168 |
39 |
mongoose_rdbms:prepare(pubsub_get_items, pubsub_items, [nidx], |
169 |
|
<<"SELECT ", ItemColumns/binary, " FROM pubsub_items WHERE nidx = ? " |
170 |
|
"ORDER BY modified_at DESC">>), |
171 |
39 |
{LimitSQL, LimitMSSQL} = rdbms_queries:get_db_specific_limits_binaries(), |
172 |
39 |
mongoose_rdbms:prepare(pubsub_get_items_limit, pubsub_items, |
173 |
|
rdbms_queries:add_limit_arg(limit, [nidx]), |
174 |
|
<<"SELECT ", LimitMSSQL/binary, " ", ItemColumns/binary, |
175 |
|
" FROM pubsub_items WHERE nidx = ? ORDER BY modified_at DESC ", LimitSQL/binary>>), |
176 |
39 |
mongoose_rdbms:prepare(pubsub_del_item, pubsub_items, [nidx, itemid], |
177 |
|
<<"DELETE FROM pubsub_items WHERE nidx = ? AND itemid = ?">>), |
178 |
|
|
179 |
|
% ------------------- Nodes -------------------------------- |
180 |
39 |
mongoose_rdbms:prepare(pubsub_insert_node, pubsub_nodes, [p_key, name, type, owners, options], |
181 |
|
<<"INSERT INTO pubsub_nodes (p_key, name, type, owners, options) VALUES (?, ?, ?, ?, ?)">>), |
182 |
39 |
mongoose_rdbms:prepare(pubsub_insert_parent, pubsub_node_collections, [name, parent_name], |
183 |
|
<<"INSERT INTO pubsub_node_collections (name, parent_name) VALUES (?, ?)">>), |
184 |
39 |
mongoose_rdbms:prepare(pubsub_update_pubsub_node, pubsub_nodes, [type, owners, options, nidx], |
185 |
|
<<"UPDATE pubsub_nodes SET type = ?, owners = ?, options = ? WHERE nidx = ?">>), |
186 |
39 |
PubsubNodeFields = pubsub_node_fields(), |
187 |
39 |
mongoose_rdbms:prepare(pubsub_select_node_by_key_and_name, pubsub_nodes, [p_key, name], |
188 |
|
<<"SELECT ", PubsubNodeFields/binary, " from pubsub_nodes WHERE p_key = ? AND name = ?">>), |
189 |
39 |
mongoose_rdbms:prepare(pubsub_select_node_by_id, pubsub_nodes, [nidx], |
190 |
|
<<"SELECT ", PubsubNodeFields/binary, " from pubsub_nodes WHERE nidx = ?">>), |
191 |
39 |
mongoose_rdbms:prepare(pubsub_select_nodes_by_key, pubsub_nodes, [p_key], |
192 |
|
<<"SELECT ", PubsubNodeFields/binary, " from pubsub_nodes WHERE p_key = ?">>), |
193 |
39 |
mongoose_rdbms:prepare(pubsub_select_nodes_in_list_with_key, pubsub_nodes, [p_key, names], |
194 |
|
<<"SELECT ", PubsubNodeFields/binary, " from pubsub_nodes " |
195 |
|
"WHERE p_key = ? AND name IN (?)">>), |
196 |
39 |
PubsubNodeFieldsPrefixed = pubsub_node_fields_pn(), |
197 |
39 |
mongoose_rdbms:prepare(pubsub_select_nodes_by_affiliated_user, |
198 |
|
pubsub_affiliations, [luser, lserver], |
199 |
|
<<"SELECT aff, ", PubsubNodeFieldsPrefixed/binary, " FROM pubsub_affiliations AS pa " |
200 |
|
"INNER JOIN pubsub_nodes AS pn ON pa.nidx = pn.nidx WHERE luser = ? AND lserver = ?">>), |
201 |
39 |
mongoose_rdbms:prepare(pubsub_select_top_level_nodes, pubsub_nodes, [p_key], |
202 |
|
<<"SELECT ", PubsubNodeFieldsPrefixed/binary, " from pubsub_nodes as pn " |
203 |
|
"LEFT JOIN pubsub_node_collections as collection ON pn.name = collection.name " |
204 |
|
"WHERE p_key = ? AND collection.parent_name IS NULL">>), |
205 |
39 |
mongoose_rdbms:prepare(pubsub_select_subnodes, pubsub_nodes, |
206 |
|
['pubsub_node_collections.parent_name', p_key], |
207 |
|
<<"SELECT ", PubsubNodeFieldsPrefixed/binary, " from pubsub_nodes as pn " |
208 |
|
"INNER JOIN pubsub_node_collections as collection ON pn.name = collection.name AND " |
209 |
|
"collection.parent_name = ? WHERE p_key = ?">>), |
210 |
39 |
mongoose_rdbms:prepare(pubsub_select_node_with_parents, pubsub_nodes, |
211 |
|
[p_key, name], |
212 |
|
<<"SELECT pn.nidx, pn.name, collection.parent_name from pubsub_nodes as pn " |
213 |
|
"LEFT JOIN pubsub_node_collections as collection ON pn.name = collection.name " |
214 |
|
"WHERE pn.p_key = ? AND pn.name = ?">>), |
215 |
39 |
mongoose_rdbms:prepare(pubsub_select_node_with_children, pubsub_nodes, |
216 |
|
[p_key, name], |
217 |
|
<<"SELECT pn.nidx, pn.name, collection.name from pubsub_nodes as pn " |
218 |
|
"LEFT JOIN pubsub_node_collections as collection ON pn.name = collection.parent_name " |
219 |
|
"WHERE pn.p_key = ? AND pn.name = ?">>), |
220 |
39 |
mongoose_rdbms:prepare(pubsub_delete_node, pubsub_nodes, [p_key, name], |
221 |
|
<<"DELETE from pubsub_nodes WHERE p_key = ? AND name = ?">>), |
222 |
39 |
mongoose_rdbms:prepare(pubsub_del_parents, pubsub_node_collections, [name], |
223 |
|
<<"DELETE FROM pubsub_node_collections WHERE name = ?">>), |
224 |
|
|
225 |
|
% ------------------- GDPR -------------------------------- |
226 |
39 |
mongoose_rdbms:prepare(pubsub_get_user_items, pubsub_items, [created_luser, created_lserver], |
227 |
|
<<"SELECT name, itemid, payload FROM pubsub_items INNER JOIN pubsub_nodes " |
228 |
|
"ON pubsub_items.nidx = pubsub_nodes.nidx WHERE created_luser = ? " |
229 |
|
"AND created_lserver = ?">>), |
230 |
39 |
mongoose_rdbms:prepare(pubsub_get_user_subscriptions, pubsub_subscriptions, [luser, lserver], |
231 |
|
<<"SELECT name FROM pubsub_subscriptions INNER JOIN pubsub_nodes " |
232 |
|
"ON pubsub_subscriptions.nidx = pubsub_nodes.nidx WHERE luser = ? AND lserver = ?">>), |
233 |
39 |
prepare_select_nodes_by_owner(), |
234 |
|
|
235 |
39 |
rdbms_queries:prepare_upsert(global, pubsub_affiliation_upsert, pubsub_affiliations, |
236 |
|
[<<"nidx">>, <<"luser">>, <<"lserver">>, <<"aff">>], |
237 |
|
[<<"aff">>], |
238 |
|
[<<"nidx">>, <<"luser">>, <<"lserver">>]), |
239 |
39 |
ItemInsertFields = [<<"nidx">>, <<"itemid">>, |
240 |
|
<<"created_luser">>, <<"created_lserver">>, <<"created_at">>, |
241 |
|
<<"modified_luser">>, <<"modified_lserver">>, <<"modified_lresource">>, <<"modified_at">>, |
242 |
|
<<"publisher">>, <<"payload">>], |
243 |
39 |
ItemUpdateFields = [<<"modified_luser">>, <<"modified_lserver">>, <<"modified_lresource">>, <<"modified_at">>, |
244 |
|
<<"publisher">>, <<"payload">>], |
245 |
39 |
rdbms_queries:prepare_upsert(global, pubsub_item_upsert, pubsub_items, |
246 |
|
ItemInsertFields, |
247 |
|
ItemUpdateFields, |
248 |
|
[<<"nidx">>, <<"itemid">>]), |
249 |
39 |
ok. |
250 |
|
|
251 |
|
-spec stop() -> ok. |
252 |
|
stop() -> |
253 |
39 |
ok. |
254 |
|
|
255 |
|
%% ------------------------ Queries execution -------------------- |
256 |
|
prepare_select_nodes_by_owner() -> |
257 |
39 |
case {mongoose_rdbms:db_engine(global), mongoose_rdbms:db_type()} of |
258 |
|
{mysql, _} -> |
259 |
:-( |
mongoose_rdbms:prepare(pubsub_select_nodes_by_owner, pubsub_nodes, [owners], |
260 |
|
<<"SELECT name, type FROM pubsub_nodes WHERE owners = convert(?, JSON);">>); |
261 |
|
{pgsql, _} -> |
262 |
:-( |
mongoose_rdbms:prepare(pubsub_select_nodes_by_owner, pubsub_nodes, [owners], |
263 |
|
<<"SELECT name, type FROM pubsub_nodes WHERE owners ::json->>0 like ? " |
264 |
|
"AND JSON_ARRAY_LENGTH(owners) = 1">>); |
265 |
|
{odbc, mssql} -> |
266 |
39 |
mongoose_rdbms:prepare(pubsub_select_nodes_by_owner, pubsub_nodes, [owners], |
267 |
|
<<"SELECT name, type FROM pubsub_nodes WHERE cast(owners as nvarchar(max)) = ?;">>) |
268 |
|
end. |
269 |
|
% -------------------- State building ---------------------------- |
270 |
|
-spec execute_get_item_rows_id(Nidx :: mod_pubsub:nodeIdx()) -> |
271 |
|
mongoose_rdbms:query_result(). |
272 |
|
execute_get_item_rows_id(Nidx) -> |
273 |
166 |
mongoose_rdbms:execute_successfully(global, pubsub_get_item_rows_id, [Nidx]). |
274 |
|
|
275 |
|
-spec execute_get_affiliation_rows_id(Nidx :: mod_pubsub:nodeIdx()) -> |
276 |
|
mongoose_rdbms:query_result(). |
277 |
|
execute_get_affiliation_rows_id(Nidx) -> |
278 |
166 |
mongoose_rdbms:execute_successfully(global, pubsub_get_affiliation_rows_id, [Nidx]). |
279 |
|
|
280 |
|
-spec execute_get_subscriptions_rows_id(Nidx :: mod_pubsub:nodeIdx()) -> |
281 |
|
mongoose_rdbms:query_result(). |
282 |
|
execute_get_subscriptions_rows_id(Nidx) -> |
283 |
166 |
mongoose_rdbms:execute_successfully(global, pubsub_get_subscriptions_rows_id, [Nidx]). |
284 |
|
|
285 |
|
-spec execute_get_item_rows(LU :: jid:luser(), LS :: jid:lserver()) -> |
286 |
|
mongoose_rdbms:query_result(). |
287 |
|
execute_get_item_rows(LU, LS) -> |
288 |
988 |
mongoose_rdbms:execute_successfully(LS, pubsub_get_item_rows, [LU, LS]). |
289 |
|
|
290 |
|
-spec execute_get_affiliation_rows(LU :: jid:luser(), LS :: jid:lserver()) -> |
291 |
|
mongoose_rdbms:query_result(). |
292 |
|
execute_get_affiliation_rows(LU, LS) -> |
293 |
988 |
mongoose_rdbms:execute_successfully(global, pubsub_get_affiliation_rows, [LU, LS]). |
294 |
|
|
295 |
|
-spec execute_get_subscriptions_rows(LU :: jid:luser(), LS :: jid:lserver()) -> |
296 |
|
mongoose_rdbms:query_result(). |
297 |
|
execute_get_subscriptions_rows(LU, LS) -> |
298 |
15 |
mongoose_rdbms:execute_successfully(global, pubsub_get_subscriptions_rows, [LU, LS]). |
299 |
|
|
300 |
|
-spec execute_get_subscriptions_rows_resource(LU :: jid:luser(), LS :: jid:lserver(), |
301 |
|
LR :: jid:lresource()) -> mongoose_rdbms:query_result(). |
302 |
|
execute_get_subscriptions_rows_resource(LU, LS, LR) -> |
303 |
1525 |
mongoose_rdbms:execute_successfully(global, |
304 |
|
pubsub_get_subscriptions_rows_resource, [LU, LS, LR]). |
305 |
|
|
306 |
|
-spec execute_get_idxs_of_own_nodes_with_pending_subs(LS :: jid:lserver(), |
307 |
|
Aff :: integer(), |
308 |
|
LU :: jid:luser(), |
309 |
|
Sub :: integer()) -> |
310 |
|
mongoose_rdbms:query_result(). |
311 |
|
execute_get_idxs_of_own_nodes_with_pending_subs(LS, Aff, LU, Sub) -> |
312 |
2 |
mongoose_rdbms:execute_successfully(LS, pubsub_get_idxs_of_own_nodes_with_pending_subs, |
313 |
|
[Aff, LU, LS, Sub]). |
314 |
|
|
315 |
|
% ------------------- Affiliations -------------------------------- |
316 |
|
-spec execute_get_affiliation(Nidx :: mod_pubsub:nodeIdx(), LU :: jid:luser(), |
317 |
|
LS :: jid:lserver()) -> mongoose_rdbms:query_result(). |
318 |
|
execute_get_affiliation(Nidx, LU, LS) -> |
319 |
791 |
mongoose_rdbms:execute_successfully(LS, pubsub_get_affiliation, [Nidx, LU, LS]). |
320 |
|
|
321 |
|
-spec execute_delete_affiliation(Nidx :: mod_pubsub:nodeIdx(), LU :: jid:luser(), |
322 |
|
LS :: jid:lserver()) -> mongoose_rdbms:query_result(). |
323 |
|
execute_delete_affiliation(Nidx, LU, LS) -> |
324 |
19 |
mongoose_rdbms:execute_successfully(LS, pubsub_delete_affiliation, [Nidx, LU, LS]). |
325 |
|
|
326 |
|
-spec execute_delete_all_affiliations(Nidx :: mod_pubsub:nodeIdx()) -> |
327 |
|
mongoose_rdbms:query_result(). |
328 |
|
execute_delete_all_affiliations(Nidx) -> |
329 |
156 |
mongoose_rdbms:execute_successfully(global, pubsub_delete_all_affiliations, [Nidx]). |
330 |
|
|
331 |
|
% ------------------- Subscriptions -------------------------------- |
332 |
|
-spec execute_insert_subscription(Nidx :: mod_pubsub:nodeIdx(), |
333 |
|
LU :: jid:luser(), |
334 |
|
LS :: jid:lserver(), |
335 |
|
LR :: jid:lresource(), |
336 |
|
Sub :: integer(), |
337 |
|
SubId :: mod_pubsub:subId(), |
338 |
|
EncodedOpts :: iodata()) -> |
339 |
|
mongoose_rdbms:query_result(). |
340 |
|
execute_insert_subscription(Nidx, LU, LS, LR, Sub, SubId, EncodedOpts) -> |
341 |
94 |
mongoose_rdbms:execute_successfully(LS, pubsub_insert_subscription, |
342 |
|
[Nidx, LU, LS, LR, Sub, SubId, EncodedOpts]). |
343 |
|
|
344 |
|
-spec execute_update_subscription_opts(EncodedOpts :: iodata(), |
345 |
|
Nidx :: mod_pubsub:nodeIdx(), |
346 |
|
LU :: jid:luser(), |
347 |
|
LS :: jid:lserver(), |
348 |
|
LR :: jid:lresource(), |
349 |
|
SubId :: mod_pubsub:subId()) -> |
350 |
|
mongoose_rdbms:query_result(). |
351 |
|
execute_update_subscription_opts(EncodedOpts, Nidx, LU, LS, LR, SubId) -> |
352 |
4 |
mongoose_rdbms:execute_successfully(LS, pubsub_update_subscription_opts, |
353 |
|
[EncodedOpts, Nidx, LU, LS, LR, SubId]). |
354 |
|
|
355 |
|
-spec execute_get_node_subs(Nidx :: mod_pubsub:nodeIdx()) -> mongoose_rdbms:query_result(). |
356 |
|
execute_get_node_subs(Nidx) -> |
357 |
574 |
mongoose_rdbms:execute_successfully(global, pubsub_get_node_subs, [Nidx]). |
358 |
|
|
359 |
|
-spec execute_get_node_entity_subs(LS :: jid:lserver(), |
360 |
|
Nidx :: mod_pubsub:nodeIdx(), |
361 |
|
LU :: jid:luser(), |
362 |
|
LR :: jid:lresource()) -> |
363 |
|
mongoose_rdbms:query_result(). |
364 |
|
execute_get_node_entity_subs(LS, Nidx, LU, LR) -> |
365 |
487 |
mongoose_rdbms:execute_successfully(LS, pubsub_get_node_entity_subs, [Nidx, LU, LS, LR]). |
366 |
|
|
367 |
|
-spec execute_delete_subscription(LS :: jid:lserver(), |
368 |
|
Nidx :: mod_pubsub:nodeIdx(), |
369 |
|
LU :: jid:luser(), |
370 |
|
LR :: jid:lresource(), |
371 |
|
SubId :: mod_pubsub:subId()) -> |
372 |
|
mongoose_rdbms:query_result(). |
373 |
|
execute_delete_subscription(LS, Nidx, LU, LR, SubId) -> |
374 |
4 |
mongoose_rdbms:execute_successfully(LS, pubsub_delete_subscription, [Nidx, LU, LS, LR, SubId]). |
375 |
|
|
376 |
|
-spec execute_delete_all_subscriptions(Nidx :: mod_pubsub:nodeIdx(), |
377 |
|
LU :: jid:luser(), |
378 |
|
LS :: jid:lserver(), |
379 |
|
LR :: jid:lresource()) -> |
380 |
|
mongoose_rdbms:query_result(). |
381 |
|
execute_delete_all_subscriptions(Nidx, LU, LS, LR) -> |
382 |
19 |
mongoose_rdbms:execute_successfully(LS, pubsub_delete_all_subscriptions, [Nidx, LU, LS, LR]). |
383 |
|
|
384 |
|
-spec execute_delete_all_subscriptions_id(Nidx :: mod_pubsub:nodeIdx()) -> |
385 |
|
mongoose_rdbms:query_result(). |
386 |
|
execute_delete_all_subscriptions_id(Nidx) -> |
387 |
156 |
mongoose_rdbms:execute_successfully(global, pubsub_delete_all_subscriptions_id, [Nidx]). |
388 |
|
|
389 |
|
-spec execute_delete_user_subscriptions(LS :: jid:lserver(), LU :: jid:luser()) -> |
390 |
|
mongoose_rdbms:query_result(). |
391 |
|
execute_delete_user_subscriptions(LS, LU) -> |
392 |
78 |
mongoose_rdbms:execute_successfully(LS, pubsub_delete_user_subscriptions, [LU, LS]). |
393 |
|
|
394 |
|
-spec execute_update_subscription(Subscription :: integer(), |
395 |
|
Nidx :: mod_pubsub:nodeIdx(), |
396 |
|
LU :: jid:luser(), |
397 |
|
LS :: jid:lserver(), |
398 |
|
LR :: jid:lresource(), |
399 |
|
SubId :: mod_pubsub:subId()) -> |
400 |
|
mongoose_rdbms:query_result(). |
401 |
|
execute_update_subscription(Subscription, Nidx, LU, LS, LR, SubId) -> |
402 |
3 |
mongoose_rdbms:execute_successfully(LS, pubsub_update_subscription, |
403 |
|
[Subscription, Nidx, LU, LS, LR, SubId]). |
404 |
|
|
405 |
|
% ------------------- Items -------------------------------- |
406 |
|
-spec execute_get_entity_items(LS :: jid:lserver(), |
407 |
|
Nidx :: mod_pubsub:nodeIdx(), |
408 |
|
LU :: jid:lserver()) -> |
409 |
|
mongoose_rdbms:query_result(). |
410 |
|
execute_get_entity_items(LS, Nidx, LU) -> |
411 |
266 |
mongoose_rdbms:execute_successfully(LS, pubsub_get_entity_items, [Nidx, LU, LS]). |
412 |
|
|
413 |
|
-spec execute_delete_item(LS :: jid:lserver(), |
414 |
|
Nidx :: mod_pubsub:nodeIdx(), |
415 |
|
LU :: jid:luser(), |
416 |
|
ItemId :: mod_pubsub:itemId()) -> |
417 |
|
mongoose_rdbms:query_result(). |
418 |
|
execute_delete_item(LS, Nidx, LU, ItemId) -> |
419 |
14 |
mongoose_rdbms:execute_successfully(LS, pubsub_delete_item, [Nidx, LU, LS, ItemId]). |
420 |
|
|
421 |
|
-spec execute_delete_all_items(Nidx :: mod_pubsub:nodeIdx()) -> mongoose_rdbms:query_result(). |
422 |
|
execute_delete_all_items(Nidx) -> |
423 |
160 |
mongoose_rdbms:execute_successfully(global, pubsub_delete_all_items, [Nidx]). |
424 |
|
|
425 |
|
-spec execute_get_item(Nidx :: mod_pubsub:nodeIdx(), |
426 |
|
ItemId :: mod_pubsub:itemId()) -> |
427 |
|
mongoose_rdbms:query_result(). |
428 |
|
execute_get_item(Nidx, ItemId) -> |
429 |
135 |
mongoose_rdbms:execute_successfully(global, pubsub_get_item, [Nidx, ItemId]). |
430 |
|
|
431 |
|
-spec execute_get_items(Nidx :: mod_pubsub:nodeIdx()) -> mongoose_rdbms:query_result(). |
432 |
|
execute_get_items(Nidx) -> |
433 |
18 |
mongoose_rdbms:execute_successfully(global, pubsub_get_items, [Nidx]). |
434 |
|
|
435 |
|
-spec execute_get_items(Nidx :: mod_pubsub:nodeIdx(), Limit :: pos_integer()) -> |
436 |
|
mongoose_rdbms:query_result(). |
437 |
|
execute_get_items(Nidx, Limit) -> |
438 |
25 |
Args = rdbms_queries:add_limit_arg(Limit, [Nidx]), |
439 |
25 |
mongoose_rdbms:execute_successfully(global, pubsub_get_items_limit, Args). |
440 |
|
|
441 |
|
-spec execute_del_item(Nidx :: mod_pubsub:nodeIdx(), |
442 |
|
ItemId :: mod_pubsub:itemId()) -> |
443 |
|
mongoose_rdbms:query_result(). |
444 |
|
execute_del_item(Nidx, ItemId) -> |
445 |
22 |
mongoose_rdbms:execute_successfully(global, pubsub_del_item, [Nidx, ItemId]). |
446 |
|
|
447 |
|
% ------------------- Nodes -------------------------------- |
448 |
|
-spec execute_insert_pubsub_node(binary(), binary(), binary(), iodata(), iodata()) -> |
449 |
|
mongoose_rdbms:query_result(). |
450 |
|
execute_insert_pubsub_node(Key, Name, Type, Owners, Options) -> |
451 |
229 |
mongoose_rdbms:execute_successfully(global, pubsub_insert_node, |
452 |
|
[Key, Name, Type, Owners, Options]). |
453 |
|
|
454 |
|
-spec execute_insert_parent(binary(), binary()) -> mongoose_rdbms:query_result(). |
455 |
|
execute_insert_parent(Name, ParentName) -> |
456 |
28 |
mongoose_rdbms:execute_successfully(global, pubsub_insert_parent, |
457 |
|
[Name, ParentName]). |
458 |
|
|
459 |
|
-spec execute_update_pubsub_node(Type :: binary(), |
460 |
|
OwnersJid :: iodata(), |
461 |
|
Opts :: iodata(), |
462 |
|
Nidx :: mod_pubsub:nodeIdx()) -> |
463 |
|
mongoose_rdbms:query_result(). |
464 |
|
execute_update_pubsub_node(Type, OwnersJid, Opts, Nidx) -> |
465 |
11 |
mongoose_rdbms:execute_successfully(global, pubsub_update_pubsub_node, |
466 |
|
[Type, OwnersJid, Opts, Nidx]). |
467 |
|
|
468 |
|
-spec execute_select_node_by_key_and_name(Key :: binary() | jid:ljid(), |
469 |
|
Node :: mod_pubsub:nodeId()) -> |
470 |
|
mongoose_rdbms:query_result(). |
471 |
|
execute_select_node_by_key_and_name(Key, Node) -> |
472 |
1506 |
mongoose_rdbms:execute_successfully(global, pubsub_select_node_by_key_and_name, |
473 |
|
[Key, Node]). |
474 |
|
|
475 |
|
-spec execute_select_node_by_id(Nidx :: mod_pubsub:nodeIdx()) -> mongoose_rdbms:query_result(). |
476 |
|
execute_select_node_by_id(Nidx) -> |
477 |
273 |
mongoose_rdbms:execute_successfully(global, pubsub_select_node_by_id, [Nidx]). |
478 |
|
|
479 |
|
-spec execute_select_nodes_by_key(Key :: binary() | jid:ljid()) -> |
480 |
|
mongoose_rdbms:query_result(). |
481 |
|
execute_select_nodes_by_key(Key) -> |
482 |
30 |
mongoose_rdbms:execute_successfully(global, pubsub_select_nodes_by_key, [Key]). |
483 |
|
|
484 |
|
-spec execute_select_nodes_by_affiliated_user(LU :: jid:luser(), LS :: jid:lserver()) -> |
485 |
|
mongoose_rdbms:query_result(). |
486 |
|
execute_select_nodes_by_affiliated_user(LU, LS) -> |
487 |
78 |
mongoose_rdbms:execute_successfully(global, pubsub_select_nodes_by_affiliated_user, [LU, LS]). |
488 |
|
|
489 |
|
-spec execute_select_subnodes(Key :: binary(), |
490 |
|
Node :: mod_pubsub:nodeId() | <<>>) -> |
491 |
|
mongoose_rdbms:query_result(). |
492 |
|
execute_select_subnodes(Key, <<>>) -> |
493 |
8 |
mongoose_rdbms:execute_successfully(global, pubsub_select_top_level_nodes, [Key]); |
494 |
|
execute_select_subnodes(Key, Node) -> |
495 |
92 |
mongoose_rdbms:execute_successfully(global, pubsub_select_subnodes, [Node, Key]). |
496 |
|
|
497 |
|
-spec execute_select_node_with_parents(Key :: binary(), Node :: mod_pubsub:nodeId()) -> |
498 |
|
mongoose_rdbms:query_result(). |
499 |
|
execute_select_node_with_parents(Key, Name) -> |
500 |
366 |
mongoose_rdbms:execute_successfully(global, pubsub_select_node_with_parents, [Key, Name]). |
501 |
|
|
502 |
|
-spec execute_select_node_with_children(Key :: binary(), Node :: mod_pubsub:nodeId()) -> |
503 |
|
mongoose_rdbms:query_result(). |
504 |
|
execute_select_node_with_children(Key, Name) -> |
505 |
66 |
mongoose_rdbms:execute_successfully(global, pubsub_select_node_with_children, [Key, Name]). |
506 |
|
|
507 |
|
-spec execute_delete_node(Key :: binary(), Node :: mod_pubsub:nodeId()) -> |
508 |
|
mongoose_rdbms:query_result(). |
509 |
|
execute_delete_node(Key, Node) -> |
510 |
156 |
mongoose_rdbms:execute_successfully(global, pubsub_delete_node, [Key, Node]). |
511 |
|
|
512 |
|
-spec execute_del_parents(Name :: mod_pubsub:nodeId()) -> mongoose_rdbms:query_result(). |
513 |
|
execute_del_parents(Name) -> |
514 |
11 |
mongoose_rdbms:execute_successfully(global, pubsub_del_parents, [Name]). |
515 |
|
|
516 |
|
% ------------------- GDPR -------------------------------- |
517 |
|
-spec execute_get_user_items(LU :: jid:luser(), LS :: jid:lserver()) -> |
518 |
|
mongoose_rdbms:query_result(). |
519 |
|
execute_get_user_items(LU, LS) -> |
520 |
21 |
mongoose_rdbms:execute_successfully(global, pubsub_get_user_items, [LU, LS]). |
521 |
|
|
522 |
|
-spec execute_select_nodes_by_owner(LJID :: binary()) -> mongoose_rdbms:query_result(). |
523 |
|
execute_select_nodes_by_owner(LJID) -> |
524 |
21 |
case mongoose_rdbms:db_engine(global) of |
525 |
|
pgsql -> |
526 |
:-( |
mongoose_rdbms:execute_successfully(global, |
527 |
|
pubsub_select_nodes_by_owner, [LJID]); |
528 |
|
_ -> |
529 |
21 |
mongoose_rdbms:execute_successfully(global, |
530 |
|
pubsub_select_nodes_by_owner, [iolist_to_binary(["[\"", LJID, "\"]"])]) |
531 |
|
end. |
532 |
|
|
533 |
|
-spec execute_get_user_subscriptions(LU :: jid:luser(), LS :: jid:lserver()) -> |
534 |
|
mongoose_rdbms:query_result(). |
535 |
|
execute_get_user_subscriptions(LU, LS) -> |
536 |
21 |
mongoose_rdbms:execute_successfully(global, pubsub_get_user_subscriptions, [LU, LS]). |
537 |
|
|
538 |
|
%% ------------------------ Fun execution ------------------------ |
539 |
|
|
540 |
|
transaction(Fun, ErrorDebug) -> |
541 |
389 |
case mongoose_rdbms:sql_transaction(global, mod_pubsub_db:extra_debug_fun(Fun)) of |
542 |
|
{atomic, Result} -> |
543 |
389 |
Result; |
544 |
|
{aborted, ReasonData} -> |
545 |
:-( |
mod_pubsub_db:db_error(ReasonData, ErrorDebug, transaction_failed) |
546 |
|
end. |
547 |
|
|
548 |
|
dirty(Fun, ErrorDebug) -> |
549 |
1731 |
try mongoose_rdbms:sql_dirty(global, mod_pubsub_db:extra_debug_fun(Fun)) of |
550 |
|
Result -> |
551 |
1731 |
Result |
552 |
|
catch |
553 |
|
_C:ReasonData -> |
554 |
:-( |
mod_pubsub_db:db_error(ReasonData, ErrorDebug, dirty_failed) |
555 |
|
end. |
556 |
|
|
557 |
|
%% ------------------------ Direct #pubsub_state access ------------------------ |
558 |
|
%% TODO: Functions for direct #pubsub_access are currently inefficient for RDBMS |
559 |
|
%% - refactor them or remove as many of them as possible from the API at some point |
560 |
|
-spec get_state(Nidx :: mod_pubsub:nodeIdx(), |
561 |
|
LJID :: jid:ljid()) -> |
562 |
|
{ok, mod_pubsub:pubsubState()}. |
563 |
|
get_state(Nidx, LJID) -> |
564 |
266 |
{ok, ItemIds} = get_entity_items(Nidx, LJID), |
565 |
266 |
{ok, Affiliation} = get_affiliation(Nidx, LJID), |
566 |
266 |
{ok, Subscriptions} = get_node_entity_subscriptions(Nidx, LJID), |
567 |
266 |
{ok, #pubsub_state{ |
568 |
|
stateid = {LJID, Nidx}, |
569 |
|
items = ItemIds, |
570 |
|
affiliation = Affiliation, |
571 |
|
subscriptions = Subscriptions |
572 |
|
}}. |
573 |
|
|
574 |
|
-spec get_states(Nidx :: mod_pubsub:nodeIdx()) -> |
575 |
|
{ok, [mod_pubsub:pubsubState()]}. |
576 |
|
get_states(Nidx) -> |
577 |
166 |
{selected, ItemRows} = execute_get_item_rows_id(Nidx), |
578 |
166 |
{selected, AffiliationRows} = execute_get_affiliation_rows_id(Nidx), |
579 |
166 |
{selected, SubRows} = execute_get_subscriptions_rows_id(Nidx), |
580 |
166 |
States = build_states(ItemRows, AffiliationRows, SubRows), |
581 |
166 |
{ok, States}. |
582 |
|
|
583 |
|
-spec get_states_by_lus(LJID :: jid:ljid()) -> |
584 |
|
{ok, [mod_pubsub:pubsubState()]}. |
585 |
|
get_states_by_lus({ LU, LS, _ }) -> |
586 |
15 |
{selected, ItemRows} = execute_get_item_rows(LU, LS), |
587 |
15 |
{selected, AffiliationRows} = execute_get_affiliation_rows(LU, LS), |
588 |
15 |
{selected, SubRows} = execute_get_subscriptions_rows(LU, LS), |
589 |
15 |
States = build_states(ItemRows, AffiliationRows, SubRows), |
590 |
15 |
{ok, States}. |
591 |
|
|
592 |
|
-spec get_states_by_bare(LJID :: jid:ljid()) -> |
593 |
|
{ok, [mod_pubsub:pubsubState()]}. |
594 |
|
get_states_by_bare({ LU, LS, _ }) -> |
595 |
973 |
{selected, ItemRows} = execute_get_item_rows(LU, LS), |
596 |
973 |
{selected, AffiliationRows} = execute_get_affiliation_rows(LU, LS), |
597 |
973 |
{selected, SubRows} = execute_get_subscriptions_rows_resource(LU, LS, <<>>), |
598 |
973 |
States = build_states(ItemRows, AffiliationRows, SubRows), |
599 |
973 |
{ok, States}. |
600 |
|
|
601 |
|
-spec get_states_by_bare_and_full(LJID :: jid:ljid()) -> |
602 |
|
{ok, [mod_pubsub:pubsubState()]}. |
603 |
|
get_states_by_bare_and_full({ LU, LS, LR } = LJID) -> |
604 |
552 |
{ok, StatesBare} = get_states_by_bare(LJID), |
605 |
552 |
{selected, SubRows} = execute_get_subscriptions_rows_resource(LU, LS, LR), |
606 |
552 |
StatesFull = build_states([], [], SubRows), |
607 |
552 |
{ok, StatesFull ++ StatesBare}. |
608 |
|
|
609 |
|
-spec get_idxs_of_own_nodes_with_pending_subs(LJID :: jid:ljid()) -> |
610 |
|
{ok, [mod_pubsub:nodeIdx()]}. |
611 |
|
get_idxs_of_own_nodes_with_pending_subs({ LU, LS, _ }) -> |
612 |
2 |
{selected, Rows} = |
613 |
|
execute_get_idxs_of_own_nodes_with_pending_subs(LS, aff2int(owner), LU, sub2int(pending)), |
614 |
2 |
{ok, [ mongoose_rdbms:result_to_integer(Nidx) || {Nidx} <- Rows ]}. |
615 |
|
|
616 |
|
%% ------------------------ Direct #pubsub_item access ------------------------ |
617 |
|
|
618 |
|
-spec get_items(Nidx :: mod_pubsub:nodeIdx(), gen_pubsub_node:get_item_options()) -> |
619 |
|
{ok, {[mod_pubsub:pubsubItem()], none}}. |
620 |
|
get_items(Nidx, Opts) -> |
621 |
51 |
MaxItems = maps:get(max_items, Opts, undefined), |
622 |
51 |
ItemIds = maps:get(item_ids, Opts, undefined), |
623 |
51 |
Rows = get_item_rows(Nidx, MaxItems, ItemIds), |
624 |
51 |
Result = [item_to_record(Row) || Row <- Rows], |
625 |
51 |
{ok, {Result, none}}. |
626 |
|
|
627 |
|
-spec get_item_rows(Nidx :: mod_pubsub:nodeIdx(), |
628 |
|
MaxItems :: undefined | non_neg_integer(), |
629 |
|
ItemIds :: undefined | mod_pubsub:itemId()) -> [tuple()]. |
630 |
|
get_item_rows(Nidx, undefined, undefined) -> |
631 |
18 |
{selected, Rows} = execute_get_items(Nidx), |
632 |
18 |
Rows; |
633 |
|
get_item_rows(Nidx, MaxItems, undefined) -> |
634 |
25 |
{selected, Rows} = execute_get_items(Nidx, MaxItems), |
635 |
25 |
Rows; |
636 |
|
get_item_rows(Nidx, MaxItems, ItemIds) -> |
637 |
|
%% Returned items have same order as ItemIds |
638 |
8 |
get_item_rows_acc(Nidx, MaxItems, ItemIds, []). |
639 |
|
|
640 |
8 |
get_item_rows_acc(_Nidx, _MaxItems, [], AccRows) -> AccRows; |
641 |
|
get_item_rows_acc(Nidx, MaxItems, [ItemId | ItemIds], AccRows) -> |
642 |
8 |
case execute_get_item(Nidx, ItemId) of |
643 |
|
{selected, []} -> |
644 |
:-( |
get_item_rows_acc(Nidx, MaxItems, ItemIds, AccRows); |
645 |
|
{selected, [Item]} when MaxItems =:= undefined; |
646 |
|
length(AccRows) < MaxItems -> |
647 |
8 |
get_item_rows_acc(Nidx, MaxItems, ItemIds, [Item | AccRows]); |
648 |
|
{selected, [_]} -> |
649 |
:-( |
AccRows |
650 |
|
end. |
651 |
|
|
652 |
|
-spec get_item(Nidx :: mod_pubsub:nodeIdx(), ItemId :: mod_pubsub:itemId()) -> |
653 |
|
{ok, mod_pubsub:pubsubItem()} | {error, item_not_found}. |
654 |
|
get_item(Nidx, ItemId) -> |
655 |
127 |
case execute_get_item(Nidx, ItemId) of |
656 |
|
{selected, []} -> |
657 |
119 |
{error, item_not_found}; |
658 |
|
{selected, [Item]} -> |
659 |
8 |
{ok, item_to_record(Item)} |
660 |
|
end. |
661 |
|
|
662 |
|
-spec set_item(Item :: mod_pubsub:pubsubItem()) -> ok. |
663 |
|
set_item(#pubsub_item{itemid = {ItemId, NodeIdx}, |
664 |
|
creation = {CreatedAtNow, {CreatedLUser, CreatedLServer, _}}, |
665 |
|
modification = {ModifiedAtNow, {ModifiedLUser, ModifiedLServer, ModifiedLResource}}, |
666 |
|
publisher = PublisherIn, |
667 |
|
payload = Payload}) -> |
668 |
119 |
PayloadWrapped = #xmlel{name = <<"item">>, children = Payload}, |
669 |
119 |
PayloadXML = exml:to_binary(PayloadWrapped), |
670 |
119 |
CreatedAt = CreatedAtNow, |
671 |
119 |
ModifiedAt = ModifiedAtNow, |
672 |
119 |
Publisher = null_or_bin_jid(PublisherIn), |
673 |
119 |
InsertParams = [NodeIdx, ItemId, CreatedLUser, CreatedLServer, CreatedAt, |
674 |
|
ModifiedLUser, ModifiedLServer, ModifiedLResource, ModifiedAt, |
675 |
|
Publisher, PayloadXML], |
676 |
119 |
UpdateParams = [ModifiedLUser, ModifiedLServer, ModifiedLResource, ModifiedAt, |
677 |
|
Publisher, PayloadXML], |
678 |
119 |
UniqueKeyValues = [NodeIdx, ItemId], |
679 |
119 |
{updated, _} = rdbms_queries:execute_upsert(global, pubsub_item_upsert, |
680 |
|
InsertParams, UpdateParams, UniqueKeyValues), |
681 |
119 |
ok. |
682 |
|
|
683 |
|
-spec del_item(Nidx :: mod_pubsub:nodeIdx(), ItemId :: mod_pubsub:itemId()) -> ok. |
684 |
|
del_item(Nidx, ItemId) -> |
685 |
8 |
{updated, _} = execute_del_item(Nidx, ItemId), |
686 |
8 |
ok. |
687 |
|
|
688 |
|
-spec del_items(Nidx :: mod_pubsub:nodeIdx(), [ItemId :: mod_pubsub:itemId()]) -> ok. |
689 |
|
del_items(_, []) -> |
690 |
113 |
ok; |
691 |
|
del_items(Nidx, ItemIds) -> |
692 |
10 |
[{updated, _} = execute_del_item(Nidx, Item) || Item <- ItemIds], |
693 |
10 |
ok. |
694 |
|
|
695 |
|
% ------------------- Node management -------------------------------- |
696 |
|
|
697 |
|
-spec create_node(Nidx :: mod_pubsub:nodeIdx(), Owner :: jid:ljid()) -> ok. |
698 |
|
create_node(Nidx, LJID) -> |
699 |
229 |
set_affiliation(Nidx, LJID, owner). |
700 |
|
|
701 |
|
-spec del_node(Nidx :: mod_pubsub:nodeIdx()) -> |
702 |
|
{ok, [mod_pubsub:pubsubState()]}. |
703 |
|
del_node(Nidx) -> |
704 |
156 |
{ok, States} = get_states(Nidx), |
705 |
156 |
{updated, _} = execute_delete_all_subscriptions_id(Nidx), |
706 |
156 |
{updated, _} = execute_delete_all_items(Nidx), |
707 |
156 |
{updated, _} = execute_delete_all_affiliations(Nidx), |
708 |
156 |
{ok, States}. |
709 |
|
|
710 |
|
-spec set_node(Node :: mod_pubsub:pubsubNode()) -> {ok, mod_pubsub:nodeIdx()}. |
711 |
|
set_node(#pubsub_node{nodeid = {Key, Name}, id = undefined, type = Type, |
712 |
|
owners = Owners, options = Opts, parents = Parents}) -> |
713 |
229 |
ExtKey = encode_key(Key), |
714 |
229 |
ExtOwners = jiffy:encode([jid:to_binary(Owner) || Owner <- Owners]), |
715 |
229 |
ExtOpts = jiffy:encode({Opts}), |
716 |
229 |
{updated, 1} = execute_insert_pubsub_node(ExtKey, Name, Type, ExtOwners, ExtOpts), |
717 |
229 |
{selected, [Row]} = execute_select_node_by_key_and_name(ExtKey, Name), |
718 |
229 |
#pubsub_node{id = Nidx} = decode_pubsub_node_row(Row), |
719 |
229 |
set_parents(Name, Parents), |
720 |
229 |
{ok, Nidx}; |
721 |
|
|
722 |
|
set_node(#pubsub_node{nodeid = {_, Name}, id = Nidx, type = Type, |
723 |
|
owners = Owners, options = Opts, parents = Parents}) -> |
724 |
11 |
OwnersJid = [jid:to_binary(Owner) || Owner <- Owners], |
725 |
11 |
execute_update_pubsub_node(Type, jiffy:encode(OwnersJid), jiffy:encode({Opts}), Nidx), |
726 |
11 |
execute_del_parents(Name), |
727 |
11 |
set_parents(Name, Parents), |
728 |
11 |
{ok, Nidx}. |
729 |
|
|
730 |
|
-spec set_parents(mod_pubsub:nodeId(), [mod_pubsub:nodeId()]) -> ok. |
731 |
|
set_parents(Name, Parents) -> |
732 |
240 |
[execute_insert_parent(Name, ParentName) || ParentName <- Parents], |
733 |
240 |
ok. |
734 |
|
|
735 |
|
-spec find_node_by_id(Nidx :: mod_pubsub:nodeIdx()) -> |
736 |
|
{error, not_found} | {ok, mod_pubsub:pubsubNode()}. |
737 |
|
find_node_by_id(Nidx) -> |
738 |
273 |
case execute_select_node_by_id(Nidx) of |
739 |
|
{selected, []} -> |
740 |
:-( |
{error, not_found}; |
741 |
|
{selected, [Row]} -> |
742 |
273 |
{ok, decode_pubsub_node_row(Row)} |
743 |
|
end. |
744 |
|
|
745 |
|
-spec find_node_by_name(Key :: mod_pubsub:hostPubsub() | jid:ljid(), |
746 |
|
Node :: mod_pubsub:nodeId()) -> |
747 |
|
mod_pubsub:pubsubNode() | false. |
748 |
|
find_node_by_name(Key, Node) -> |
749 |
1277 |
case execute_select_node_by_key_and_name(encode_key(Key), Node) of |
750 |
|
{selected, [Row]} -> |
751 |
1025 |
decode_pubsub_node_row(Row); |
752 |
|
{selected, []} -> |
753 |
252 |
false |
754 |
|
end. |
755 |
|
|
756 |
|
decode_pubsub_node_row({Nidx, KeySQL, Name, Type, Owners, Options}) -> |
757 |
1672 |
Key = decode_key(KeySQL), |
758 |
1672 |
{DecodedOpts} = jiffy:decode(Options), |
759 |
1672 |
DecodedOptions = [maybe_option_value_to_atom(key_to_existing_atom(Item)) || |
760 |
1672 |
Item <- DecodedOpts], |
761 |
1672 |
DecodedOwners = [jid:to_lower(jid:from_binary(Owner)) || |
762 |
1672 |
Owner <- jiffy:decode(Owners)], |
763 |
1672 |
#pubsub_node{nodeid = {Key, Name}, |
764 |
|
id = mongoose_rdbms:result_to_integer(Nidx), |
765 |
|
type = Type, |
766 |
|
owners = DecodedOwners, |
767 |
|
options = DecodedOptions}. |
768 |
|
|
769 |
|
maybe_option_value_to_atom({access_model, Value}) -> |
770 |
1672 |
{access_model, binary_to_existing_atom(Value, utf8)}; |
771 |
|
maybe_option_value_to_atom({publish_model, Value}) -> |
772 |
1672 |
{publish_model, binary_to_existing_atom(Value, utf8)}; |
773 |
|
maybe_option_value_to_atom({notification_type, Value}) -> |
774 |
1672 |
{notification_type, binary_to_existing_atom(Value, utf8)}; |
775 |
|
maybe_option_value_to_atom({node_type, Value}) -> |
776 |
634 |
{node_type, binary_to_existing_atom(Value, utf8)}; |
777 |
|
maybe_option_value_to_atom({send_last_published_item, Value}) -> |
778 |
1672 |
{send_last_published_item, binary_to_existing_atom(Value, utf8)}; |
779 |
|
maybe_option_value_to_atom(Other) -> |
780 |
20206 |
Other. |
781 |
|
|
782 |
|
-spec find_nodes_by_key(Key :: mod_pubsub:hostPubsub() | jid:ljid()) -> |
783 |
|
[mod_pubsub:pubsubNode()]. |
784 |
|
find_nodes_by_key(Key) -> |
785 |
30 |
{selected, Rows} = |
786 |
|
execute_select_nodes_by_key(encode_key(Key)), |
787 |
30 |
[decode_pubsub_node_row(Row) || Row <- Rows]. |
788 |
|
|
789 |
|
-spec delete_node(Node :: mod_pubsub:pubsubNode()) -> ok. |
790 |
|
delete_node(#pubsub_node{nodeid = {Key, Node}}) -> |
791 |
156 |
{updated, _} = execute_delete_node(encode_key(Key), Node), |
792 |
156 |
ok. |
793 |
|
|
794 |
|
-spec get_subnodes(Key :: mod_pubsub:hostPubsub() | jid:ljid(), Node :: mod_pubsub:nodeId() | <<>>) -> |
795 |
|
[mod_pubsub:pubsubNode()]. |
796 |
|
get_subnodes(Key, Node) -> |
797 |
100 |
{selected, Rows} = execute_select_subnodes(encode_key(Key), Node), |
798 |
100 |
[decode_pubsub_node_row(Row) || Row <- Rows]. |
799 |
|
|
800 |
|
-spec get_parentnodes_tree(Key :: mod_pubsub:hostPubsub() | jid:ljid(), Node :: mod_pubsub:nodeId()) -> |
801 |
|
[{Depth::non_neg_integer(), Nodes::[mod_pubsub:pubsubNode(), ...]}]. |
802 |
|
get_parentnodes_tree(Key, Node) -> |
803 |
309 |
find_nodes_with_parents(Key, [Node], 0, []). |
804 |
|
|
805 |
|
find_nodes_with_parents(_, [], _, Acc) -> |
806 |
309 |
Acc; |
807 |
|
find_nodes_with_parents(_, _, 100, Acc) -> |
808 |
:-( |
?LOG_WARNING(#{what => pubsub_max_depth_reached, pubsub_nodes => Acc}), |
809 |
:-( |
Acc; |
810 |
|
find_nodes_with_parents(Key, Nodes, Depth, Acc) -> |
811 |
363 |
ExtKey = encode_key(Key), |
812 |
363 |
Rows = lists:flatmap(fun(Name) -> |
813 |
366 |
{selected, Rs} = execute_select_node_with_parents(ExtKey, Name), |
814 |
366 |
Rs |
815 |
|
end, Nodes), |
816 |
363 |
Map = lists:foldl(fun update_nodes_map/2, #{}, Rows), |
817 |
363 |
MapTransformer = fun(Nidx, #{name := NodeName, |
818 |
|
next_level := Parents}, {ParentsAcc, NodesAcc}) -> |
819 |
366 |
NewParents = [Parents | ParentsAcc], |
820 |
366 |
Node = #pubsub_node{id = Nidx, |
821 |
|
nodeid = {Key, NodeName}, |
822 |
|
parents = Parents}, |
823 |
366 |
NewNodes = [Node | NodesAcc], |
824 |
366 |
{NewParents, NewNodes} |
825 |
|
end, |
826 |
|
|
827 |
363 |
{Parents, NewNodes} = maps:fold(MapTransformer, {[], []}, Map), |
828 |
363 |
NewAcc = [{Depth, NewNodes} | Acc], |
829 |
363 |
find_nodes_with_parents(Key, lists:flatten(Parents), Depth + 1, NewAcc). |
830 |
|
|
831 |
|
update_nodes_map({NidxSQL, NodeName, NextLevelNodeName}, Map) -> |
832 |
433 |
Nidx = mongoose_rdbms:result_to_integer(NidxSQL), |
833 |
433 |
case maps:get(Nidx, Map, undefined) of |
834 |
|
undefined -> |
835 |
430 |
NewNode = #{name => NodeName, |
836 |
|
next_level => maybe_add_parent_to_list(NextLevelNodeName, [])}, |
837 |
430 |
Map#{Nidx => NewNode}; |
838 |
|
#{next_level := NextLevelNodes} = Node -> |
839 |
3 |
UpdatedNode = Node#{next_level := maybe_add_parent_to_list(NextLevelNodeName, NextLevelNodes)}, |
840 |
3 |
Map#{Nidx := UpdatedNode} |
841 |
|
end. |
842 |
|
|
843 |
|
maybe_add_parent_to_list(null, List) -> |
844 |
373 |
List; |
845 |
|
maybe_add_parent_to_list(ParentName, List) -> |
846 |
60 |
[ParentName | List]. |
847 |
|
|
848 |
|
-spec get_subnodes_tree(Key :: mod_pubsub:hostPubsub() | jid:ljid(), Node :: mod_pubsub:nodeId()) -> |
849 |
|
[{Depth::non_neg_integer(), Nodes::[mod_pubsub:pubsubNode(), ...]}]. |
850 |
|
get_subnodes_tree(Key, Node) -> |
851 |
63 |
find_subnodes(Key, [Node], 0, []). |
852 |
|
|
853 |
|
find_subnodes(_Key, [], _, Acc) -> |
854 |
63 |
Acc; |
855 |
|
find_subnodes(_, _, 100, Acc) -> |
856 |
:-( |
?LOG_WARNING(#{what => pubsub_max_depth_reached, pubsub_nodes => Acc}), |
857 |
:-( |
Acc; |
858 |
|
find_subnodes(Key, Nodes, Depth, Acc) -> |
859 |
66 |
ExtKey = encode_key(Key), |
860 |
66 |
Rows = lists:flatmap(fun(Name) -> |
861 |
66 |
{selected, Rs} = execute_select_node_with_children(ExtKey, Name), |
862 |
66 |
Rs |
863 |
|
end, Nodes), |
864 |
66 |
Map = lists:foldl(fun update_nodes_map/2, #{}, Rows), |
865 |
66 |
MapTransformer = fun(Nidx, #{name := NodeName, |
866 |
|
next_level := Subnodes}, {SubnodesAcc, NodesAcc}) -> |
867 |
64 |
NewSubnodes = [Subnodes | SubnodesAcc], |
868 |
64 |
Node = #pubsub_node{id = Nidx, |
869 |
|
nodeid = {Key, NodeName}}, |
870 |
64 |
NewNodes = [Node | NodesAcc], |
871 |
64 |
{NewSubnodes, NewNodes} |
872 |
|
end, |
873 |
66 |
{Subnodes, NewNodes} = maps:fold(MapTransformer, {[], []}, Map), |
874 |
66 |
NewAcc = [{Depth, NewNodes} | Acc], |
875 |
66 |
find_subnodes(Key, lists:flatten(Subnodes), Depth + 1, NewAcc). |
876 |
|
|
877 |
|
% ------------------- Affiliations -------------------------------- |
878 |
|
|
879 |
|
-spec set_affiliation(Nidx :: mod_pubsub:nodeIdx(), |
880 |
|
LJID :: jid:ljid(), |
881 |
|
Affiliation :: mod_pubsub:affiliation()) -> ok. |
882 |
|
set_affiliation(Nidx, { LU, LS, _ } = LJID, none) -> |
883 |
12 |
BareLJID = jid:to_bare(LJID), |
884 |
12 |
case get_node_entity_subscriptions(Nidx, BareLJID) of |
885 |
|
{ok, []} -> |
886 |
12 |
del_state(Nidx, BareLJID); |
887 |
|
_ -> |
888 |
:-( |
delete_affiliation_wo_subs_check(Nidx, LU, LS) |
889 |
|
end; |
890 |
|
set_affiliation(Nidx, { LU, LS, _ }, Affiliation) -> |
891 |
281 |
Aff = aff2int(Affiliation), |
892 |
281 |
InsertParams = [Nidx, LU, LS, Aff], |
893 |
281 |
UpdateParams = [Aff], |
894 |
281 |
UniqueKeyValues = [Nidx, LU, LS], |
895 |
281 |
{updated, _} = rdbms_queries:execute_upsert(global, pubsub_affiliation_upsert, |
896 |
|
InsertParams, UpdateParams, UniqueKeyValues), |
897 |
281 |
ok. |
898 |
|
|
899 |
|
-spec get_affiliation(Nidx :: mod_pubsub:nodeIdx(), |
900 |
|
LJID :: jid:ljid()) -> |
901 |
|
{ok, mod_pubsub:affiliation()}. |
902 |
|
get_affiliation(Nidx, { LU, LS, _ }) -> |
903 |
791 |
case execute_get_affiliation(Nidx, LU, LS) of |
904 |
|
{selected, [{AffInt}]} -> |
905 |
645 |
{ok, sql2aff(AffInt)}; |
906 |
|
{selected, []} -> |
907 |
146 |
{ok, none} |
908 |
|
end. |
909 |
|
|
910 |
|
% ------------------- Subscriptions -------------------------------- |
911 |
|
|
912 |
|
-spec add_subscription(Nidx :: mod_pubsub:nodeIdx(), |
913 |
|
LJID :: jid:ljid(), |
914 |
|
Sub :: mod_pubsub:subscription(), |
915 |
|
SubId :: mod_pubsub:subId(), |
916 |
|
SubOpts :: mod_pubsub:subOptions()) -> ok. |
917 |
|
add_subscription(Nidx, { LU, LS, LR }, Sub, SubId, SubOpts) -> |
918 |
94 |
EncodedOpts = jiffy:encode({SubOpts}), |
919 |
94 |
{updated, _} = execute_insert_subscription(Nidx, LU, LS, LR, sub2int(Sub), SubId, EncodedOpts), |
920 |
94 |
ok. |
921 |
|
|
922 |
|
-spec set_subscription_opts(Nidx :: mod_pubsub:nodeIdx(), |
923 |
|
LJID :: jid:ljid(), |
924 |
|
SubId :: mod_pubsub:subId(), |
925 |
|
Opts :: mod_pubsub:subOptions()) -> ok. |
926 |
|
set_subscription_opts(Nidx, { LU, LS, LR }, SubId, Opts) -> |
927 |
4 |
EncodedOpts = jiffy:encode({Opts}), |
928 |
4 |
{updated, _} = execute_update_subscription_opts(EncodedOpts, Nidx, LU, LS, LR, SubId), |
929 |
4 |
ok. |
930 |
|
|
931 |
|
-spec get_node_subscriptions(Nidx :: mod_pubsub:nodeIdx()) -> |
932 |
|
{ok, [{Entity :: jid:ljid(), |
933 |
|
Sub :: mod_pubsub:subscription(), |
934 |
|
SubId :: mod_pubsub:subId(), |
935 |
|
SubOpts :: mod_pubsub:subOptions()}]}. |
936 |
|
get_node_subscriptions(Nidx) -> |
937 |
574 |
{selected, QueryResult} = execute_get_node_subs(Nidx), |
938 |
574 |
{ok, [{{LU, LS, LR}, sql2sub(SubInt), SubId, sql_to_sub_opts(SubOpts)} |
939 |
574 |
|| {LU, LS, LR, SubInt, SubId, SubOpts} <- QueryResult ]}. |
940 |
|
|
941 |
|
-spec get_node_entity_subscriptions(Nidx :: mod_pubsub:nodeIdx(), |
942 |
|
LJID :: jid:ljid()) -> |
943 |
|
{ok, [{Sub :: mod_pubsub:subscription(), |
944 |
|
SubId :: mod_pubsub:subId(), |
945 |
|
SubOpts :: mod_pubsub:subOptions()}]}. |
946 |
|
get_node_entity_subscriptions(Nidx, { LU, LS, LR }) -> |
947 |
487 |
{selected, QueryResult} = execute_get_node_entity_subs(LS, Nidx, LU, LR), |
948 |
487 |
{ok, [{sql2sub(SubInt), SubId, sql_to_sub_opts(SubOpts)} |
949 |
487 |
|| {SubInt, SubId, SubOpts} <- QueryResult ]}. |
950 |
|
|
951 |
|
-spec delete_subscription(Nidx :: mod_pubsub:nodeIdx(), |
952 |
|
LJID :: jid:ljid(), |
953 |
|
SubId :: mod_pubsub:subId()) -> |
954 |
|
ok. |
955 |
|
delete_subscription(Nidx, { LU, LS, LR }, SubId) -> |
956 |
4 |
{updated, _} = execute_delete_subscription(LS, Nidx, LU, LR, SubId), |
957 |
4 |
ok. |
958 |
|
|
959 |
|
-spec delete_all_subscriptions(Nidx :: mod_pubsub:nodeIdx(), |
960 |
|
LJID :: jid:ljid()) -> |
961 |
|
ok. |
962 |
|
delete_all_subscriptions(Nidx, { LU, LS, LR } = LJID) -> |
963 |
7 |
case get_affiliation(Nidx, LJID) of |
964 |
|
{ok, none} -> |
965 |
7 |
del_state(Nidx, LJID); |
966 |
|
_ -> |
967 |
:-( |
delete_all_subscriptions_wo_aff_check(Nidx, LU, LS, LR) |
968 |
|
end, |
969 |
7 |
ok. |
970 |
|
|
971 |
|
-spec update_subscription(Nidx :: mod_pubsub:nodeIdx(), |
972 |
|
LJID :: jid:ljid(), |
973 |
|
Subscription :: mod_pubsub:subscription(), |
974 |
|
SubId :: mod_pubsub:subId()) -> |
975 |
|
ok. |
976 |
|
update_subscription(Nidx, { LU, LS, LR }, Subscription, SubId) -> |
977 |
3 |
{updated, _} = execute_update_subscription(sub2int(Subscription), Nidx, LU, LS, LR, SubId), |
978 |
3 |
ok. |
979 |
|
|
980 |
|
% ------------------- Items -------------------------------- |
981 |
|
|
982 |
|
-spec add_item(Nidx :: mod_pubsub:nodeIdx(), |
983 |
|
LJID :: jid:ljid(), |
984 |
|
Item :: mod_pubsub:pubsubItem()) -> |
985 |
|
ok. |
986 |
|
add_item(_Nidx, _, Item) -> |
987 |
119 |
set_item(Item), |
988 |
119 |
ok. |
989 |
|
|
990 |
|
%% TODO: Make public at some point |
991 |
|
-spec get_entity_items(Nidx :: mod_pubsub:nodeIdx(), |
992 |
|
LJID :: jid:ljid()) -> |
993 |
|
{ok, [mod_pubsub:itemId()]}. |
994 |
|
get_entity_items(Nidx, { LU, LS, _ }) -> |
995 |
266 |
{selected, ItemIds} = execute_get_entity_items(LS, Nidx, LU), |
996 |
266 |
{ok, [ ItemId || {ItemId} <- ItemIds]}. |
997 |
|
|
998 |
|
-spec remove_items(Nidx :: mod_pubsub:nodeIdx(), |
999 |
|
LJID :: jid:ljid(), |
1000 |
|
ItemIds :: [mod_pubsub:itemId()]) -> |
1001 |
|
ok. |
1002 |
|
remove_items(Nidx, { LU, LS, _ }, ItemIds) -> |
1003 |
127 |
lists:foreach(fun(ItemId) -> |
1004 |
14 |
{updated, _} = execute_delete_item(LS, Nidx, LU, ItemId) |
1005 |
|
end, ItemIds). |
1006 |
|
|
1007 |
|
-spec remove_all_items(Nidx :: mod_pubsub:nodeIdx()) -> |
1008 |
|
ok. |
1009 |
|
remove_all_items(Nidx) -> |
1010 |
4 |
{updated, _} = execute_delete_all_items(Nidx), |
1011 |
4 |
ok. |
1012 |
|
|
1013 |
|
% ------------------- GDPR-related -------------------------------- |
1014 |
|
|
1015 |
|
get_user_payloads(LUser, LServer) -> |
1016 |
21 |
case execute_get_user_items(LUser, LServer) of |
1017 |
|
{selected, Items} -> |
1018 |
21 |
[[NodeName, ItemId, strip_payload(PayloadDB)] || {NodeName, ItemId, PayloadDB} <- Items] |
1019 |
|
end. |
1020 |
|
|
1021 |
|
get_user_nodes(LUser, LServer) -> |
1022 |
21 |
LJID = jid:to_binary({LUser, LServer, <<>>}), |
1023 |
21 |
{selected, Nodes} = execute_select_nodes_by_owner(LJID), |
1024 |
21 |
lists:map(fun tuple_to_list/1, Nodes). |
1025 |
|
|
1026 |
|
get_user_subscriptions(LUser, LServer) -> |
1027 |
21 |
{selected, Nodes} = execute_get_user_subscriptions(LUser, LServer), |
1028 |
21 |
lists:map(fun tuple_to_list/1, Nodes). |
1029 |
|
|
1030 |
|
strip_payload(PayloadDB) -> |
1031 |
17 |
PayloadXML = mongoose_rdbms:unescape_binary(global, PayloadDB), |
1032 |
17 |
{ok, #xmlel{children = Payload}} = exml:parse(PayloadXML), |
1033 |
17 |
exml:to_binary(Payload). |
1034 |
|
|
1035 |
|
-spec delete_user_subscriptions(jid:ljid()) -> ok. |
1036 |
|
delete_user_subscriptions({ LU, LS, _ }) -> |
1037 |
78 |
{updated, _} = execute_delete_user_subscriptions(LS, LU), |
1038 |
78 |
ok. |
1039 |
|
|
1040 |
|
find_nodes_by_affiliated_user({ LU, LS, _ }) -> |
1041 |
78 |
{selected, NodesWithAffs} = execute_select_nodes_by_affiliated_user(LU, LS), |
1042 |
78 |
lists:map(fun decode_pubsub_node_with_aff_row/1, NodesWithAffs). |
1043 |
|
|
1044 |
|
decode_pubsub_node_with_aff_row(Row) -> |
1045 |
14 |
[Aff | NodeRow] = tuple_to_list(Row), |
1046 |
14 |
{decode_pubsub_node_row(list_to_tuple(NodeRow)), sql2aff(Aff)}. |
1047 |
|
|
1048 |
|
%%==================================================================== |
1049 |
|
%% Helpers |
1050 |
|
%%==================================================================== |
1051 |
|
|
1052 |
|
-spec item_columns() -> binary(). |
1053 |
|
item_columns() -> |
1054 |
39 |
<<"nidx, itemid, created_luser, created_lserver, created_at, " |
1055 |
|
"modified_luser, modified_lserver, modified_lresource, modified_at, " |
1056 |
|
"publisher, payload">>. |
1057 |
|
|
1058 |
|
-spec pubsub_node_fields() -> binary(). |
1059 |
|
pubsub_node_fields() -> |
1060 |
39 |
<<"nidx, p_key, name, type, owners, options">>. |
1061 |
|
|
1062 |
|
-spec pubsub_node_fields_pn() -> binary(). |
1063 |
|
pubsub_node_fields_pn() -> |
1064 |
39 |
<<"pn.nidx, pn.p_key, pn.name, pn.type, pn.owners, pn.options">>. |
1065 |
|
|
1066 |
|
-spec del_state(Nidx :: mod_pubsub:nodeIdx(), |
1067 |
|
LJID :: jid:ljid()) -> ok. |
1068 |
|
del_state(Nidx, {LU, LS, LR}) -> |
1069 |
19 |
delete_all_subscriptions_wo_aff_check(Nidx, LU, LS, LR), |
1070 |
19 |
delete_affiliation_wo_subs_check(Nidx, LU, LS), |
1071 |
19 |
ok. |
1072 |
|
|
1073 |
|
-spec delete_all_subscriptions_wo_aff_check(Nidx :: mod_pubsub:nodeIdx(), |
1074 |
|
LU :: jid:luser(), |
1075 |
|
LS :: jid:lserver(), |
1076 |
|
LR :: jid:lresource()) -> ok. |
1077 |
|
delete_all_subscriptions_wo_aff_check(Nidx, LU, LS, LR) -> |
1078 |
19 |
{updated, _} = execute_delete_all_subscriptions(Nidx, LU, LS, LR), |
1079 |
19 |
ok. |
1080 |
|
|
1081 |
|
-spec delete_affiliation_wo_subs_check(Nidx :: mod_pubsub:nodeIdx(), |
1082 |
|
LU :: jid:luser(), |
1083 |
|
LS :: jid:lserver()) -> ok. |
1084 |
|
delete_affiliation_wo_subs_check(Nidx, LU, LS) -> |
1085 |
19 |
{updated, _} = execute_delete_affiliation(Nidx, LU, LS), |
1086 |
19 |
ok. |
1087 |
|
|
1088 |
|
-type item_row() :: { NidxSql :: integer() | binary(), |
1089 |
|
LU :: binary(), |
1090 |
|
LS :: binary(), |
1091 |
|
ItemId :: binary() }. |
1092 |
|
-type aff_row() :: { NidxSql :: integer() | binary(), |
1093 |
|
LU :: binary(), |
1094 |
|
LS :: binary(), |
1095 |
|
AffInt :: integer() }. |
1096 |
|
-type sub_row() :: { NidxSql :: integer() | binary(), |
1097 |
|
LU :: binary(), |
1098 |
|
LS :: binary(), |
1099 |
|
LR :: binary(), |
1100 |
|
TypeInt :: integer(), |
1101 |
|
SubId :: binary() }. |
1102 |
|
|
1103 |
|
-spec build_states(ItemRows :: [item_row()], AffRows :: [aff_row()], SubRows :: [sub_row()]) -> |
1104 |
|
[mod_pubsub:pubsubState()]. |
1105 |
|
build_states(ItemRows, AffRows, SubRows) -> |
1106 |
1706 |
Result1 = item_rows_to_states(ItemRows, #{}), |
1107 |
1706 |
Result2 = aff_rows_to_states(AffRows, Result1), |
1108 |
1706 |
maps:values(sub_rows_to_states(SubRows, Result2)). |
1109 |
|
|
1110 |
|
item_rows_to_states([], Acc) -> |
1111 |
1706 |
Acc; |
1112 |
|
item_rows_to_states([{ NidxSql, LU, LS, ItemId } | RRows], Acc) -> |
1113 |
125 |
Nidx = mongoose_rdbms:result_to_integer(NidxSql), |
1114 |
125 |
LJID = { LU, LS, <<>> }, |
1115 |
125 |
PS = maps:get({LJID, Nidx}, Acc, #pubsub_state{ stateid = {LJID, Nidx} }), |
1116 |
125 |
#pubsub_state{ items = Items0 } = PS, |
1117 |
125 |
NAcc = Acc#{ {LJID, Nidx} => PS#pubsub_state{ items = [ItemId | Items0] } }, |
1118 |
125 |
item_rows_to_states(RRows, NAcc). |
1119 |
|
|
1120 |
|
aff_rows_to_states([], Acc) -> |
1121 |
1706 |
Acc; |
1122 |
|
aff_rows_to_states([{ NidxSql, LU, LS, AffInt } | RRows], Acc) -> |
1123 |
370 |
Nidx = mongoose_rdbms:result_to_integer(NidxSql), |
1124 |
370 |
LJID = { LU, LS, <<>> }, |
1125 |
370 |
PS = maps:get({LJID, Nidx}, Acc, #pubsub_state{ stateid = {LJID, Nidx} }), |
1126 |
370 |
NAcc = Acc#{ {LJID, Nidx} => PS#pubsub_state{ affiliation = sql2aff(AffInt) } }, |
1127 |
370 |
aff_rows_to_states(RRows, NAcc). |
1128 |
|
|
1129 |
|
sub_rows_to_states([], Acc) -> |
1130 |
1706 |
Acc; |
1131 |
|
sub_rows_to_states([{ NidxSql, LU, LS, LR, TypeInt, SubId } | RRows], Acc) -> |
1132 |
94 |
Nidx = mongoose_rdbms:result_to_integer(NidxSql), |
1133 |
94 |
LJID = { LU, LS, LR }, |
1134 |
94 |
PS = maps:get({LJID, Nidx}, Acc, #pubsub_state{ stateid = {LJID, Nidx} }), |
1135 |
94 |
#pubsub_state{ subscriptions = Subs0 } = PS, |
1136 |
94 |
NAcc = Acc#{ {LJID, Nidx} => PS#pubsub_state{ |
1137 |
|
subscriptions = [{sql2sub(TypeInt), SubId} | Subs0] } }, |
1138 |
94 |
sub_rows_to_states(RRows, NAcc). |
1139 |
|
|
1140 |
|
-spec aff2int(mod_pubsub:affiliation()) -> integer(). |
1141 |
:-( |
aff2int(none) -> 0; |
1142 |
233 |
aff2int(owner) -> 1; |
1143 |
2 |
aff2int(publisher) -> 2; |
1144 |
46 |
aff2int(publish_only) -> 3; |
1145 |
2 |
aff2int(member) -> 4; |
1146 |
:-( |
aff2int(outcast) -> 5. |
1147 |
|
|
1148 |
|
-spec sql2aff(integer() | binary()) -> mod_pubsub:affiliation(). |
1149 |
|
sql2aff(SqlInt) -> |
1150 |
1029 |
int2aff(mongoose_rdbms:result_to_integer(SqlInt)). |
1151 |
|
|
1152 |
|
-spec int2aff(integer()) -> mod_pubsub:affiliation(). |
1153 |
:-( |
int2aff(0) -> none; |
1154 |
923 |
int2aff(1) -> owner; |
1155 |
8 |
int2aff(2) -> publisher; |
1156 |
88 |
int2aff(3) -> publish_only; |
1157 |
10 |
int2aff(4) -> member; |
1158 |
:-( |
int2aff(5) -> outcast. |
1159 |
|
|
1160 |
|
-spec sub2int(mod_pubsub:subscription()) -> integer(). |
1161 |
:-( |
sub2int(none) -> 0; |
1162 |
11 |
sub2int(pending) -> 1; |
1163 |
88 |
sub2int(subscribed) -> 3. |
1164 |
|
|
1165 |
|
-spec sql2sub(integer() | binary()) -> mod_pubsub:subscription(). |
1166 |
|
sql2sub(SqlInt) -> |
1167 |
291 |
int2sub(mongoose_rdbms:result_to_integer(SqlInt)). |
1168 |
|
|
1169 |
|
-spec int2sub(integer()) -> mod_pubsub:subscription(). |
1170 |
:-( |
int2sub(0) -> none; |
1171 |
24 |
int2sub(1) -> pending; |
1172 |
267 |
int2sub(3) -> subscribed. |
1173 |
|
|
1174 |
|
sql_to_sub_opts(SqlOpts) -> |
1175 |
197 |
{Opts} = jiffy:decode(SqlOpts), |
1176 |
197 |
lists:map(fun key_to_existing_atom/1, Opts). |
1177 |
|
|
1178 |
|
item_to_record({NodeIdx, ItemId, CreatedLUser, CreatedLServer, CreatedAt, |
1179 |
|
ModifiedLUser, ModifiedLServer, ModifiedLResource, ModifiedAt, |
1180 |
|
PublisherIn, PayloadDB}) -> |
1181 |
56 |
PayloadXML = mongoose_rdbms:unescape_binary(global, PayloadDB), |
1182 |
56 |
{ok, #xmlel{children = Payload}} = exml:parse(PayloadXML), |
1183 |
56 |
ItemAndNodeId = {ItemId, mongoose_rdbms:result_to_integer(NodeIdx)}, |
1184 |
56 |
Creation = {mongoose_rdbms:result_to_integer(CreatedAt), |
1185 |
|
{CreatedLUser, CreatedLServer, <<>>}}, |
1186 |
56 |
Modification = {mongoose_rdbms:result_to_integer(ModifiedAt), |
1187 |
|
{ModifiedLUser, ModifiedLServer, ModifiedLResource}}, |
1188 |
56 |
Publisher = decode_publisher(PublisherIn), |
1189 |
56 |
#pubsub_item{itemid = ItemAndNodeId, |
1190 |
|
creation = Creation, |
1191 |
|
modification = Modification, |
1192 |
|
publisher = Publisher, |
1193 |
|
payload = Payload}. |
1194 |
|
|
1195 |
|
decode_publisher(null) -> |
1196 |
54 |
undefined; |
1197 |
|
decode_publisher(Binary) -> |
1198 |
|
%% Silently returns `error` if parsing fails. |
1199 |
2 |
jid:from_binary(Binary). |
1200 |
|
|
1201 |
|
encode_key(Key) when is_binary(Key) -> |
1202 |
2051 |
Key; |
1203 |
|
encode_key({_, _, _} = JID) -> |
1204 |
170 |
jid:to_binary(JID). |
1205 |
|
|
1206 |
|
decode_key(KeySQL) -> |
1207 |
1672 |
case jid:from_binary(KeySQL) of |
1208 |
|
#jid{luser = <<>>, lserver = Host, lresource = <<>>} -> |
1209 |
1564 |
Host; |
1210 |
|
#jid{luser = LUser, lserver = LServer, lresource = LResource} -> |
1211 |
108 |
{LUser, LServer, LResource} |
1212 |
|
end. |
1213 |
|
|
1214 |
|
null_or_bin_jid(undefined) -> |
1215 |
115 |
null; |
1216 |
|
null_or_bin_jid(Jid) -> |
1217 |
4 |
jid:to_binary(Jid). |
1218 |
|
|
1219 |
|
key_to_existing_atom({Key, Value}) when is_atom(Key)-> |
1220 |
:-( |
{Key, Value}; |
1221 |
|
key_to_existing_atom({Key, Value}) -> |
1222 |
27554 |
{binary_to_existing_atom(Key, utf8), Value}. |
1223 |
|
|