./ct_report/coverage/mod_pubsub_db_rdbms.COVER.html

1 %%%----------------------------------------------------------------------
2 %%% File : mod_pubsub_db_rdbms.erl
3 %%% Author : Piotr Nosek <piotr.nosek@erlang-solutions.com>
4 %%% Purpose : PubSub RDBMS backend
5 %%% Created : 2 Nov 2018 by Piotr Nosek <piotr.nosek@erlang-solutions.com>
6 %%%----------------------------------------------------------------------
7
8 -module(mod_pubsub_db_rdbms).
9 -author('piotr.nosek@erlang-solutions.com').
10 -author('michal.piotrowski@erlang-solutions.com').
11
12 -behaviour(mod_pubsub_db).
13
14 -include("pubsub.hrl").
15 -include("mongoose_logger.hrl").
16 -include("jlib.hrl").
17
18 -export([init/2, stop/0]).
19 % Funs execution
20 -export([transaction/2, dirty/2]).
21 % Direct #pubsub_state access
22 -export([get_state/2,
23 get_states/1, get_states_by_lus/1, get_states_by_bare/1,
24 get_states_by_bare_and_full/1, get_idxs_of_own_nodes_with_pending_subs/1]).
25 % Node management
26 -export([
27 create_node/2,
28 del_node/1,
29 set_node/1,
30 find_node_by_id/1,
31 find_nodes_by_key/1,
32 find_node_by_name/2,
33 delete_node/1,
34 get_subnodes/2,
35 get_parentnodes_tree/2,
36 get_subnodes_tree/2
37 ]).
38 % Affiliations
39 -export([
40 set_affiliation/3,
41 get_affiliation/2
42 ]).
43 % Subscriptions
44 -export([
45 add_subscription/5,
46 set_subscription_opts/4,
47 get_node_subscriptions/1,
48 get_node_entity_subscriptions/2,
49 delete_subscription/3,
50 delete_all_subscriptions/2,
51 update_subscription/4
52 ]).
53 % Item ids in state
54 -export([
55 add_item/3,
56 remove_items/3,
57 remove_all_items/1
58 ]).
59 % Whole items
60 -export([
61 get_items/2,
62 get_item/2,
63 set_item/1,
64 del_item/2,
65 del_items/2
66 ]).
67
68 %% GDPR related
69 -export([
70 get_user_payloads/2,
71 get_user_nodes/2,
72 get_user_subscriptions/2,
73 delete_user_subscriptions/1,
74 find_nodes_by_affiliated_user/1
75 ]).
76
77 %%====================================================================
78 %% Behaviour callbacks
79 %%====================================================================
80
81 %% ------------------------ Backend start/stop ------------------------
82
83 init(_HostType, _Opts) ->
84 % -------------------- State building ----------------------------
85 39 mongoose_rdbms:prepare(pubsub_get_item_rows_id, pubsub_items, [nidx],
86 <<"SELECT nidx, created_luser, created_lserver, itemid "
87 "FROM pubsub_items WHERE nidx = ?">>),
88 39 mongoose_rdbms:prepare(pubsub_get_affiliation_rows_id, pubsub_affiliations, [nidx],
89 <<"SELECT nidx, luser, lserver, aff FROM pubsub_affiliations WHERE nidx = ?">>),
90 39 mongoose_rdbms:prepare(pubsub_get_subscriptions_rows_id, pubsub_subscriptions, [nidx],
91 <<"SELECT nidx, luser, lserver, lresource, type, sub_id "
92 "FROM pubsub_subscriptions WHERE nidx = ?">>),
93 39 mongoose_rdbms:prepare(pubsub_get_item_rows, pubsub_items, [created_luser, created_lserver],
94 <<"SELECT nidx, created_luser, created_lserver, itemid FROM pubsub_items"
95 " WHERE created_luser = ? AND created_lserver = ?">>),
96 39 mongoose_rdbms:prepare(pubsub_get_affiliation_rows, pubsub_affiliations, [luser, lserver],
97 <<"SELECT nidx, luser, lserver, aff FROM pubsub_affiliations "
98 "WHERE luser = ? AND lserver = ?">>),
99 39 mongoose_rdbms:prepare(pubsub_get_subscriptions_rows, pubsub_subscriptions, [luser, lserver],
100 <<"SELECT nidx, luser, lserver, lresource, type, sub_id FROM pubsub_subscriptions "
101 "WHERE luser = ? AND lserver = ?">>),
102 39 mongoose_rdbms:prepare(pubsub_get_subscriptions_rows_resource, pubsub_subscriptions,
103 [luser, lserver, lresource],
104 <<"SELECT nidx, luser, lserver, lresource, type, sub_id FROM pubsub_subscriptions "
105 "WHERE luser = ? AND lserver = ? AND lresource = ?">>),
106 39 mongoose_rdbms:prepare(pubsub_get_idxs_of_own_nodes_with_pending_subs, pubsub_affiliations,
107 [aff, luser, lserver, 'pubsub_subscriptions.type'],
108 <<"SELECT DISTINCT s.nidx FROM pubsub_affiliations AS a "
109 "INNER JOIN pubsub_subscriptions s ON a.nidx = s.nidx "
110 "WHERE a.aff = ? AND a.luser = ? AND a.lserver = ? AND s.type = ?">>),
111
112 % ------------------- Affiliations --------------------------------
113 39 mongoose_rdbms:prepare(pubsub_get_affiliation, pubsub_affiliations, [nidx, luser, lserver],
114 <<"SELECT aff FROM pubsub_affiliations WHERE nidx = ? AND luser = ? AND lserver = ?">>),
115 39 mongoose_rdbms:prepare(pubsub_delete_affiliation, pubsub_affiliations, [nidx, luser, lserver],
116 <<"DELETE FROM pubsub_affiliations WHERE nidx = ? AND luser = ? AND lserver = ?">>),
117 39 mongoose_rdbms:prepare(pubsub_delete_all_affiliations, pubsub_affiliations, [nidx],
118 <<"DELETE FROM pubsub_affiliations WHERE nidx = ?">>),
119
120 % ------------------- Subscriptions --------------------------------
121 39 mongoose_rdbms:prepare(pubsub_insert_subscription, pubsub_subscriptions,
122 [nidx, luser, lserver, lresource, type, sub_id, options],
123 <<"INSERT INTO pubsub_subscriptions (nidx, luser, lserver, lresource, "
124 "type, sub_id, options) VALUES (?, ?, ?, ?, ?, ?, ?)">>),
125 39 mongoose_rdbms:prepare(pubsub_update_subscription_opts, pubsub_subscriptions,
126 [options, nidx, luser, lserver, lresource, sub_id],
127 <<"UPDATE pubsub_subscriptions SET options = ? WHERE nidx = ? "
128 "AND luser = ? AND lserver = ? AND lresource = ? AND sub_id = ?">>),
129 39 mongoose_rdbms:prepare(pubsub_get_node_subs, pubsub_subscriptions, [nidx],
130 <<"SELECT luser, lserver, lresource, type, sub_id, options "
131 "FROM pubsub_subscriptions WHERE nidx = ?">>),
132 39 mongoose_rdbms:prepare(pubsub_get_node_entity_subs, pubsub_subscriptions,
133 [nidx, luser, lserver, lresource],
134 <<"SELECT type, sub_id, options FROM pubsub_subscriptions "
135 "WHERE nidx = ? AND luser = ? AND lserver = ? AND lresource = ?">>),
136 39 mongoose_rdbms:prepare(pubsub_delete_subscription, pubsub_subscriptions,
137 [nidx, luser, lserver, lresource, sub_id],
138 <<"DELETE FROM pubsub_subscriptions WHERE nidx = ? "
139 "AND luser = ? AND lserver = ? AND lresource = ? AND sub_id = ?">>),
140 39 mongoose_rdbms:prepare(pubsub_delete_all_subscriptions, pubsub_subscriptions,
141 [nidx, luser, lserver, lresource],
142 <<"DELETE FROM pubsub_subscriptions"" WHERE nidx = ? "
143 "AND luser = ? AND lserver = ? AND lresource = ?">>),
144 39 mongoose_rdbms:prepare(pubsub_delete_all_subscriptions_id, pubsub_subscriptions, [nidx],
145 <<"DELETE FROM pubsub_subscriptions WHERE nidx = ?">>),
146 39 mongoose_rdbms:prepare(pubsub_delete_user_subscriptions,
147 pubsub_subscriptions, [luser, lserver],
148 <<"DELETE FROM pubsub_subscriptions WHERE luser = ? AND lserver = ?">>),
149 39 mongoose_rdbms:prepare(pubsub_update_subscription, pubsub_subscriptions,
150 [type, nidx, luser, lserver, lresource, sub_id],
151 <<"UPDATE pubsub_subscriptions SET type = ? WHERE nidx = ? "
152 "AND luser = ? AND lserver = ? AND lresource = ? AND sub_id = ?">>),
153
154 % ------------------- Items --------------------------------
155 39 mongoose_rdbms:prepare(pubsub_get_entity_items, pubsub_items,
156 [nidx, created_luser, created_lserver],
157 <<"SELECT itemid FROM pubsub_items WHERE nidx = ? "
158 "AND created_luser = ? AND created_lserver = ?">>),
159 39 mongoose_rdbms:prepare(pubsub_delete_item, pubsub_items,
160 [nidx, created_luser, created_lserver, itemid],
161 <<"DELETE FROM pubsub_items WHERE nidx = ? AND created_luser = ? ",
162 "AND created_lserver = ? AND itemid = ?">>),
163 39 mongoose_rdbms:prepare(pubsub_delete_all_items, pubsub_items, [nidx],
164 <<"DELETE FROM pubsub_items WHERE nidx = ?">>),
165 39 ItemColumns = item_columns(),
166 39 mongoose_rdbms:prepare(pubsub_get_item, pubsub_items, [nidx, itemid],
167 <<"SELECT ", ItemColumns/binary, " FROM pubsub_items WHERE nidx = ? AND itemid = ?">>),
168 39 mongoose_rdbms:prepare(pubsub_get_items, pubsub_items, [nidx],
169 <<"SELECT ", ItemColumns/binary, " FROM pubsub_items WHERE nidx = ? "
170 "ORDER BY modified_at DESC">>),
171 39 {LimitSQL, LimitMSSQL} = rdbms_queries:get_db_specific_limits_binaries(),
172 39 mongoose_rdbms:prepare(pubsub_get_items_limit, pubsub_items,
173 rdbms_queries:add_limit_arg(limit, [nidx]),
174 <<"SELECT ", LimitMSSQL/binary, " ", ItemColumns/binary,
175 " FROM pubsub_items WHERE nidx = ? ORDER BY modified_at DESC ", LimitSQL/binary>>),
176 39 mongoose_rdbms:prepare(pubsub_del_item, pubsub_items, [nidx, itemid],
177 <<"DELETE FROM pubsub_items WHERE nidx = ? AND itemid = ?">>),
178
179 % ------------------- Nodes --------------------------------
180 39 mongoose_rdbms:prepare(pubsub_insert_node, pubsub_nodes, [p_key, name, type, owners, options],
181 <<"INSERT INTO pubsub_nodes (p_key, name, type, owners, options) VALUES (?, ?, ?, ?, ?)">>),
182 39 mongoose_rdbms:prepare(pubsub_insert_parent, pubsub_node_collections, [name, parent_name],
183 <<"INSERT INTO pubsub_node_collections (name, parent_name) VALUES (?, ?)">>),
184 39 mongoose_rdbms:prepare(pubsub_update_pubsub_node, pubsub_nodes, [type, owners, options, nidx],
185 <<"UPDATE pubsub_nodes SET type = ?, owners = ?, options = ? WHERE nidx = ?">>),
186 39 PubsubNodeFields = pubsub_node_fields(),
187 39 mongoose_rdbms:prepare(pubsub_select_node_by_key_and_name, pubsub_nodes, [p_key, name],
188 <<"SELECT ", PubsubNodeFields/binary, " from pubsub_nodes WHERE p_key = ? AND name = ?">>),
189 39 mongoose_rdbms:prepare(pubsub_select_node_by_id, pubsub_nodes, [nidx],
190 <<"SELECT ", PubsubNodeFields/binary, " from pubsub_nodes WHERE nidx = ?">>),
191 39 mongoose_rdbms:prepare(pubsub_select_nodes_by_key, pubsub_nodes, [p_key],
192 <<"SELECT ", PubsubNodeFields/binary, " from pubsub_nodes WHERE p_key = ?">>),
193 39 mongoose_rdbms:prepare(pubsub_select_nodes_in_list_with_key, pubsub_nodes, [p_key, names],
194 <<"SELECT ", PubsubNodeFields/binary, " from pubsub_nodes "
195 "WHERE p_key = ? AND name IN (?)">>),
196 39 PubsubNodeFieldsPrefixed = pubsub_node_fields_pn(),
197 39 mongoose_rdbms:prepare(pubsub_select_nodes_by_affiliated_user,
198 pubsub_affiliations, [luser, lserver],
199 <<"SELECT aff, ", PubsubNodeFieldsPrefixed/binary, " FROM pubsub_affiliations AS pa "
200 "INNER JOIN pubsub_nodes AS pn ON pa.nidx = pn.nidx WHERE luser = ? AND lserver = ?">>),
201 39 mongoose_rdbms:prepare(pubsub_select_top_level_nodes, pubsub_nodes, [p_key],
202 <<"SELECT ", PubsubNodeFieldsPrefixed/binary, " from pubsub_nodes as pn "
203 "LEFT JOIN pubsub_node_collections as collection ON pn.name = collection.name "
204 "WHERE p_key = ? AND collection.parent_name IS NULL">>),
205 39 mongoose_rdbms:prepare(pubsub_select_subnodes, pubsub_nodes,
206 ['pubsub_node_collections.parent_name', p_key],
207 <<"SELECT ", PubsubNodeFieldsPrefixed/binary, " from pubsub_nodes as pn "
208 "INNER JOIN pubsub_node_collections as collection ON pn.name = collection.name AND "
209 "collection.parent_name = ? WHERE p_key = ?">>),
210 39 mongoose_rdbms:prepare(pubsub_select_node_with_parents, pubsub_nodes,
211 [p_key, name],
212 <<"SELECT pn.nidx, pn.name, collection.parent_name from pubsub_nodes as pn "
213 "LEFT JOIN pubsub_node_collections as collection ON pn.name = collection.name "
214 "WHERE pn.p_key = ? AND pn.name = ?">>),
215 39 mongoose_rdbms:prepare(pubsub_select_node_with_children, pubsub_nodes,
216 [p_key, name],
217 <<"SELECT pn.nidx, pn.name, collection.name from pubsub_nodes as pn "
218 "LEFT JOIN pubsub_node_collections as collection ON pn.name = collection.parent_name "
219 "WHERE pn.p_key = ? AND pn.name = ?">>),
220 39 mongoose_rdbms:prepare(pubsub_delete_node, pubsub_nodes, [p_key, name],
221 <<"DELETE from pubsub_nodes WHERE p_key = ? AND name = ?">>),
222 39 mongoose_rdbms:prepare(pubsub_del_parents, pubsub_node_collections, [name],
223 <<"DELETE FROM pubsub_node_collections WHERE name = ?">>),
224
225 % ------------------- GDPR --------------------------------
226 39 mongoose_rdbms:prepare(pubsub_get_user_items, pubsub_items, [created_luser, created_lserver],
227 <<"SELECT name, itemid, payload FROM pubsub_items INNER JOIN pubsub_nodes "
228 "ON pubsub_items.nidx = pubsub_nodes.nidx WHERE created_luser = ? "
229 "AND created_lserver = ?">>),
230 39 mongoose_rdbms:prepare(pubsub_get_user_subscriptions, pubsub_subscriptions, [luser, lserver],
231 <<"SELECT name FROM pubsub_subscriptions INNER JOIN pubsub_nodes "
232 "ON pubsub_subscriptions.nidx = pubsub_nodes.nidx WHERE luser = ? AND lserver = ?">>),
233 39 prepare_select_nodes_by_owner(),
234
235 39 rdbms_queries:prepare_upsert(global, pubsub_affiliation_upsert, pubsub_affiliations,
236 [<<"nidx">>, <<"luser">>, <<"lserver">>, <<"aff">>],
237 [<<"aff">>],
238 [<<"nidx">>, <<"luser">>, <<"lserver">>]),
239 39 ItemInsertFields = [<<"nidx">>, <<"itemid">>,
240 <<"created_luser">>, <<"created_lserver">>, <<"created_at">>,
241 <<"modified_luser">>, <<"modified_lserver">>, <<"modified_lresource">>, <<"modified_at">>,
242 <<"publisher">>, <<"payload">>],
243 39 ItemUpdateFields = [<<"modified_luser">>, <<"modified_lserver">>, <<"modified_lresource">>, <<"modified_at">>,
244 <<"publisher">>, <<"payload">>],
245 39 rdbms_queries:prepare_upsert(global, pubsub_item_upsert, pubsub_items,
246 ItemInsertFields,
247 ItemUpdateFields,
248 [<<"nidx">>, <<"itemid">>]),
249 39 ok.
250
251 -spec stop() -> ok.
252 stop() ->
253 39 ok.
254
255 %% ------------------------ Queries execution --------------------
256 prepare_select_nodes_by_owner() ->
257 39 case {mongoose_rdbms:db_engine(global), mongoose_rdbms:db_type()} of
258 {mysql, _} ->
259
:-(
mongoose_rdbms:prepare(pubsub_select_nodes_by_owner, pubsub_nodes, [owners],
260 <<"SELECT name, type FROM pubsub_nodes WHERE owners = convert(?, JSON);">>);
261 {pgsql, _} ->
262 39 mongoose_rdbms:prepare(pubsub_select_nodes_by_owner, pubsub_nodes, [owners],
263 <<"SELECT name, type FROM pubsub_nodes WHERE owners ::json->>0 like ? "
264 "AND JSON_ARRAY_LENGTH(owners) = 1">>);
265 {odbc, mssql} ->
266
:-(
mongoose_rdbms:prepare(pubsub_select_nodes_by_owner, pubsub_nodes, [owners],
267 <<"SELECT name, type FROM pubsub_nodes WHERE cast(owners as nvarchar(max)) = ?;">>)
268 end.
269 % -------------------- State building ----------------------------
270 -spec execute_get_item_rows_id(Nidx :: mod_pubsub:nodeIdx()) ->
271 mongoose_rdbms:query_result().
272 execute_get_item_rows_id(Nidx) ->
273 166 mongoose_rdbms:execute_successfully(global, pubsub_get_item_rows_id, [Nidx]).
274
275 -spec execute_get_affiliation_rows_id(Nidx :: mod_pubsub:nodeIdx()) ->
276 mongoose_rdbms:query_result().
277 execute_get_affiliation_rows_id(Nidx) ->
278 166 mongoose_rdbms:execute_successfully(global, pubsub_get_affiliation_rows_id, [Nidx]).
279
280 -spec execute_get_subscriptions_rows_id(Nidx :: mod_pubsub:nodeIdx()) ->
281 mongoose_rdbms:query_result().
282 execute_get_subscriptions_rows_id(Nidx) ->
283 166 mongoose_rdbms:execute_successfully(global, pubsub_get_subscriptions_rows_id, [Nidx]).
284
285 -spec execute_get_item_rows(LU :: jid:luser(), LS :: jid:lserver()) ->
286 mongoose_rdbms:query_result().
287 execute_get_item_rows(LU, LS) ->
288 988 mongoose_rdbms:execute_successfully(LS, pubsub_get_item_rows, [LU, LS]).
289
290 -spec execute_get_affiliation_rows(LU :: jid:luser(), LS :: jid:lserver()) ->
291 mongoose_rdbms:query_result().
292 execute_get_affiliation_rows(LU, LS) ->
293 988 mongoose_rdbms:execute_successfully(global, pubsub_get_affiliation_rows, [LU, LS]).
294
295 -spec execute_get_subscriptions_rows(LU :: jid:luser(), LS :: jid:lserver()) ->
296 mongoose_rdbms:query_result().
297 execute_get_subscriptions_rows(LU, LS) ->
298 15 mongoose_rdbms:execute_successfully(global, pubsub_get_subscriptions_rows, [LU, LS]).
299
300 -spec execute_get_subscriptions_rows_resource(LU :: jid:luser(), LS :: jid:lserver(),
301 LR :: jid:lresource()) -> mongoose_rdbms:query_result().
302 execute_get_subscriptions_rows_resource(LU, LS, LR) ->
303 1525 mongoose_rdbms:execute_successfully(global,
304 pubsub_get_subscriptions_rows_resource, [LU, LS, LR]).
305
306 -spec execute_get_idxs_of_own_nodes_with_pending_subs(LS :: jid:lserver(),
307 Aff :: integer(),
308 LU :: jid:luser(),
309 Sub :: integer()) ->
310 mongoose_rdbms:query_result().
311 execute_get_idxs_of_own_nodes_with_pending_subs(LS, Aff, LU, Sub) ->
312 2 mongoose_rdbms:execute_successfully(LS, pubsub_get_idxs_of_own_nodes_with_pending_subs,
313 [Aff, LU, LS, Sub]).
314
315 % ------------------- Affiliations --------------------------------
316 -spec execute_get_affiliation(Nidx :: mod_pubsub:nodeIdx(), LU :: jid:luser(),
317 LS :: jid:lserver()) -> mongoose_rdbms:query_result().
318 execute_get_affiliation(Nidx, LU, LS) ->
319 791 mongoose_rdbms:execute_successfully(LS, pubsub_get_affiliation, [Nidx, LU, LS]).
320
321 -spec execute_delete_affiliation(Nidx :: mod_pubsub:nodeIdx(), LU :: jid:luser(),
322 LS :: jid:lserver()) -> mongoose_rdbms:query_result().
323 execute_delete_affiliation(Nidx, LU, LS) ->
324 19 mongoose_rdbms:execute_successfully(LS, pubsub_delete_affiliation, [Nidx, LU, LS]).
325
326 -spec execute_delete_all_affiliations(Nidx :: mod_pubsub:nodeIdx()) ->
327 mongoose_rdbms:query_result().
328 execute_delete_all_affiliations(Nidx) ->
329 156 mongoose_rdbms:execute_successfully(global, pubsub_delete_all_affiliations, [Nidx]).
330
331 % ------------------- Subscriptions --------------------------------
332 -spec execute_insert_subscription(Nidx :: mod_pubsub:nodeIdx(),
333 LU :: jid:luser(),
334 LS :: jid:lserver(),
335 LR :: jid:lresource(),
336 Sub :: integer(),
337 SubId :: mod_pubsub:subId(),
338 EncodedOpts :: iodata()) ->
339 mongoose_rdbms:query_result().
340 execute_insert_subscription(Nidx, LU, LS, LR, Sub, SubId, EncodedOpts) ->
341 94 mongoose_rdbms:execute_successfully(LS, pubsub_insert_subscription,
342 [Nidx, LU, LS, LR, Sub, SubId, EncodedOpts]).
343
344 -spec execute_update_subscription_opts(EncodedOpts :: iodata(),
345 Nidx :: mod_pubsub:nodeIdx(),
346 LU :: jid:luser(),
347 LS :: jid:lserver(),
348 LR :: jid:lresource(),
349 SubId :: mod_pubsub:subId()) ->
350 mongoose_rdbms:query_result().
351 execute_update_subscription_opts(EncodedOpts, Nidx, LU, LS, LR, SubId) ->
352 4 mongoose_rdbms:execute_successfully(LS, pubsub_update_subscription_opts,
353 [EncodedOpts, Nidx, LU, LS, LR, SubId]).
354
355 -spec execute_get_node_subs(Nidx :: mod_pubsub:nodeIdx()) -> mongoose_rdbms:query_result().
356 execute_get_node_subs(Nidx) ->
357 574 mongoose_rdbms:execute_successfully(global, pubsub_get_node_subs, [Nidx]).
358
359 -spec execute_get_node_entity_subs(LS :: jid:lserver(),
360 Nidx :: mod_pubsub:nodeIdx(),
361 LU :: jid:luser(),
362 LR :: jid:lresource()) ->
363 mongoose_rdbms:query_result().
364 execute_get_node_entity_subs(LS, Nidx, LU, LR) ->
365 487 mongoose_rdbms:execute_successfully(LS, pubsub_get_node_entity_subs, [Nidx, LU, LS, LR]).
366
367 -spec execute_delete_subscription(LS :: jid:lserver(),
368 Nidx :: mod_pubsub:nodeIdx(),
369 LU :: jid:luser(),
370 LR :: jid:lresource(),
371 SubId :: mod_pubsub:subId()) ->
372 mongoose_rdbms:query_result().
373 execute_delete_subscription(LS, Nidx, LU, LR, SubId) ->
374 4 mongoose_rdbms:execute_successfully(LS, pubsub_delete_subscription, [Nidx, LU, LS, LR, SubId]).
375
376 -spec execute_delete_all_subscriptions(Nidx :: mod_pubsub:nodeIdx(),
377 LU :: jid:luser(),
378 LS :: jid:lserver(),
379 LR :: jid:lresource()) ->
380 mongoose_rdbms:query_result().
381 execute_delete_all_subscriptions(Nidx, LU, LS, LR) ->
382 19 mongoose_rdbms:execute_successfully(LS, pubsub_delete_all_subscriptions, [Nidx, LU, LS, LR]).
383
384 -spec execute_delete_all_subscriptions_id(Nidx :: mod_pubsub:nodeIdx()) ->
385 mongoose_rdbms:query_result().
386 execute_delete_all_subscriptions_id(Nidx) ->
387 156 mongoose_rdbms:execute_successfully(global, pubsub_delete_all_subscriptions_id, [Nidx]).
388
389 -spec execute_delete_user_subscriptions(LS :: jid:lserver(), LU :: jid:luser()) ->
390 mongoose_rdbms:query_result().
391 execute_delete_user_subscriptions(LS, LU) ->
392 78 mongoose_rdbms:execute_successfully(LS, pubsub_delete_user_subscriptions, [LU, LS]).
393
394 -spec execute_update_subscription(Subscription :: integer(),
395 Nidx :: mod_pubsub:nodeIdx(),
396 LU :: jid:luser(),
397 LS :: jid:lserver(),
398 LR :: jid:lresource(),
399 SubId :: mod_pubsub:subId()) ->
400 mongoose_rdbms:query_result().
401 execute_update_subscription(Subscription, Nidx, LU, LS, LR, SubId) ->
402 3 mongoose_rdbms:execute_successfully(LS, pubsub_update_subscription,
403 [Subscription, Nidx, LU, LS, LR, SubId]).
404
405 % ------------------- Items --------------------------------
406 -spec execute_get_entity_items(LS :: jid:lserver(),
407 Nidx :: mod_pubsub:nodeIdx(),
408 LU :: jid:lserver()) ->
409 mongoose_rdbms:query_result().
410 execute_get_entity_items(LS, Nidx, LU) ->
411 266 mongoose_rdbms:execute_successfully(LS, pubsub_get_entity_items, [Nidx, LU, LS]).
412
413 -spec execute_delete_item(LS :: jid:lserver(),
414 Nidx :: mod_pubsub:nodeIdx(),
415 LU :: jid:luser(),
416 ItemId :: mod_pubsub:itemId()) ->
417 mongoose_rdbms:query_result().
418 execute_delete_item(LS, Nidx, LU, ItemId) ->
419 14 mongoose_rdbms:execute_successfully(LS, pubsub_delete_item, [Nidx, LU, LS, ItemId]).
420
421 -spec execute_delete_all_items(Nidx :: mod_pubsub:nodeIdx()) -> mongoose_rdbms:query_result().
422 execute_delete_all_items(Nidx) ->
423 160 mongoose_rdbms:execute_successfully(global, pubsub_delete_all_items, [Nidx]).
424
425 -spec execute_get_item(Nidx :: mod_pubsub:nodeIdx(),
426 ItemId :: mod_pubsub:itemId()) ->
427 mongoose_rdbms:query_result().
428 execute_get_item(Nidx, ItemId) ->
429 135 mongoose_rdbms:execute_successfully(global, pubsub_get_item, [Nidx, ItemId]).
430
431 -spec execute_get_items(Nidx :: mod_pubsub:nodeIdx()) -> mongoose_rdbms:query_result().
432 execute_get_items(Nidx) ->
433 18 mongoose_rdbms:execute_successfully(global, pubsub_get_items, [Nidx]).
434
435 -spec execute_get_items(Nidx :: mod_pubsub:nodeIdx(), Limit :: pos_integer()) ->
436 mongoose_rdbms:query_result().
437 execute_get_items(Nidx, Limit) ->
438 25 Args = rdbms_queries:add_limit_arg(Limit, [Nidx]),
439 25 mongoose_rdbms:execute_successfully(global, pubsub_get_items_limit, Args).
440
441 -spec execute_del_item(Nidx :: mod_pubsub:nodeIdx(),
442 ItemId :: mod_pubsub:itemId()) ->
443 mongoose_rdbms:query_result().
444 execute_del_item(Nidx, ItemId) ->
445 22 mongoose_rdbms:execute_successfully(global, pubsub_del_item, [Nidx, ItemId]).
446
447 % ------------------- Nodes --------------------------------
448 -spec execute_insert_pubsub_node(binary(), binary(), binary(), iodata(), iodata()) ->
449 mongoose_rdbms:query_result().
450 execute_insert_pubsub_node(Key, Name, Type, Owners, Options) ->
451 229 mongoose_rdbms:execute_successfully(global, pubsub_insert_node,
452 [Key, Name, Type, Owners, Options]).
453
454 -spec execute_insert_parent(binary(), binary()) -> mongoose_rdbms:query_result().
455 execute_insert_parent(Name, ParentName) ->
456 28 mongoose_rdbms:execute_successfully(global, pubsub_insert_parent,
457 [Name, ParentName]).
458
459 -spec execute_update_pubsub_node(Type :: binary(),
460 OwnersJid :: iodata(),
461 Opts :: iodata(),
462 Nidx :: mod_pubsub:nodeIdx()) ->
463 mongoose_rdbms:query_result().
464 execute_update_pubsub_node(Type, OwnersJid, Opts, Nidx) ->
465 11 mongoose_rdbms:execute_successfully(global, pubsub_update_pubsub_node,
466 [Type, OwnersJid, Opts, Nidx]).
467
468 -spec execute_select_node_by_key_and_name(Key :: binary() | jid:ljid(),
469 Node :: mod_pubsub:nodeId()) ->
470 mongoose_rdbms:query_result().
471 execute_select_node_by_key_and_name(Key, Node) ->
472 1506 mongoose_rdbms:execute_successfully(global, pubsub_select_node_by_key_and_name,
473 [Key, Node]).
474
475 -spec execute_select_node_by_id(Nidx :: mod_pubsub:nodeIdx()) -> mongoose_rdbms:query_result().
476 execute_select_node_by_id(Nidx) ->
477 275 mongoose_rdbms:execute_successfully(global, pubsub_select_node_by_id, [Nidx]).
478
479 -spec execute_select_nodes_by_key(Key :: binary() | jid:ljid()) ->
480 mongoose_rdbms:query_result().
481 execute_select_nodes_by_key(Key) ->
482 30 mongoose_rdbms:execute_successfully(global, pubsub_select_nodes_by_key, [Key]).
483
484 -spec execute_select_nodes_by_affiliated_user(LU :: jid:luser(), LS :: jid:lserver()) ->
485 mongoose_rdbms:query_result().
486 execute_select_nodes_by_affiliated_user(LU, LS) ->
487 78 mongoose_rdbms:execute_successfully(global, pubsub_select_nodes_by_affiliated_user, [LU, LS]).
488
489 -spec execute_select_subnodes(Key :: binary(),
490 Node :: mod_pubsub:nodeId() | <<>>) ->
491 mongoose_rdbms:query_result().
492 execute_select_subnodes(Key, <<>>) ->
493 8 mongoose_rdbms:execute_successfully(global, pubsub_select_top_level_nodes, [Key]);
494 execute_select_subnodes(Key, Node) ->
495 92 mongoose_rdbms:execute_successfully(global, pubsub_select_subnodes, [Node, Key]).
496
497 -spec execute_select_node_with_parents(Key :: binary(), Node :: mod_pubsub:nodeId()) ->
498 mongoose_rdbms:query_result().
499 execute_select_node_with_parents(Key, Name) ->
500 366 mongoose_rdbms:execute_successfully(global, pubsub_select_node_with_parents, [Key, Name]).
501
502 -spec execute_select_node_with_children(Key :: binary(), Node :: mod_pubsub:nodeId()) ->
503 mongoose_rdbms:query_result().
504 execute_select_node_with_children(Key, Name) ->
505 66 mongoose_rdbms:execute_successfully(global, pubsub_select_node_with_children, [Key, Name]).
506
507 -spec execute_delete_node(Key :: binary(), Node :: mod_pubsub:nodeId()) ->
508 mongoose_rdbms:query_result().
509 execute_delete_node(Key, Node) ->
510 156 mongoose_rdbms:execute_successfully(global, pubsub_delete_node, [Key, Node]).
511
512 -spec execute_del_parents(Name :: mod_pubsub:nodeId()) -> mongoose_rdbms:query_result().
513 execute_del_parents(Name) ->
514 11 mongoose_rdbms:execute_successfully(global, pubsub_del_parents, [Name]).
515
516 % ------------------- GDPR --------------------------------
517 -spec execute_get_user_items(LU :: jid:luser(), LS :: jid:lserver()) ->
518 mongoose_rdbms:query_result().
519 execute_get_user_items(LU, LS) ->
520 21 mongoose_rdbms:execute_successfully(global, pubsub_get_user_items, [LU, LS]).
521
522 -spec execute_select_nodes_by_owner(LJID :: binary()) -> mongoose_rdbms:query_result().
523 execute_select_nodes_by_owner(LJID) ->
524 21 case mongoose_rdbms:db_engine(global) of
525 pgsql ->
526 21 mongoose_rdbms:execute_successfully(global,
527 pubsub_select_nodes_by_owner, [LJID]);
528 _ ->
529
:-(
mongoose_rdbms:execute_successfully(global,
530 pubsub_select_nodes_by_owner, [iolist_to_binary(["[\"", LJID, "\"]"])])
531 end.
532
533 -spec execute_get_user_subscriptions(LU :: jid:luser(), LS :: jid:lserver()) ->
534 mongoose_rdbms:query_result().
535 execute_get_user_subscriptions(LU, LS) ->
536 21 mongoose_rdbms:execute_successfully(global, pubsub_get_user_subscriptions, [LU, LS]).
537
538 %% ------------------------ Fun execution ------------------------
539
540 transaction(Fun, ErrorDebug) ->
541 389 case mongoose_rdbms:sql_transaction(global, mod_pubsub_db:extra_debug_fun(Fun)) of
542 {atomic, Result} ->
543 389 Result;
544 {aborted, ReasonData} ->
545
:-(
mod_pubsub_db:db_error(ReasonData, ErrorDebug, transaction_failed)
546 end.
547
548 dirty(Fun, ErrorDebug) ->
549 1731 try mongoose_rdbms:sql_dirty(global, mod_pubsub_db:extra_debug_fun(Fun)) of
550 Result ->
551 1731 Result
552 catch
553 _C:ReasonData ->
554
:-(
mod_pubsub_db:db_error(ReasonData, ErrorDebug, dirty_failed)
555 end.
556
557 %% ------------------------ Direct #pubsub_state access ------------------------
558 %% TODO: Functions for direct #pubsub_access are currently inefficient for RDBMS
559 %% - refactor them or remove as many of them as possible from the API at some point
560 -spec get_state(Nidx :: mod_pubsub:nodeIdx(),
561 LJID :: jid:ljid()) ->
562 {ok, mod_pubsub:pubsubState()}.
563 get_state(Nidx, LJID) ->
564 266 {ok, ItemIds} = get_entity_items(Nidx, LJID),
565 266 {ok, Affiliation} = get_affiliation(Nidx, LJID),
566 266 {ok, Subscriptions} = get_node_entity_subscriptions(Nidx, LJID),
567 266 {ok, #pubsub_state{
568 stateid = {LJID, Nidx},
569 items = ItemIds,
570 affiliation = Affiliation,
571 subscriptions = Subscriptions
572 }}.
573
574 -spec get_states(Nidx :: mod_pubsub:nodeIdx()) ->
575 {ok, [mod_pubsub:pubsubState()]}.
576 get_states(Nidx) ->
577 166 {selected, ItemRows} = execute_get_item_rows_id(Nidx),
578 166 {selected, AffiliationRows} = execute_get_affiliation_rows_id(Nidx),
579 166 {selected, SubRows} = execute_get_subscriptions_rows_id(Nidx),
580 166 States = build_states(ItemRows, AffiliationRows, SubRows),
581 166 {ok, States}.
582
583 -spec get_states_by_lus(LJID :: jid:ljid()) ->
584 {ok, [mod_pubsub:pubsubState()]}.
585 get_states_by_lus({ LU, LS, _ }) ->
586 15 {selected, ItemRows} = execute_get_item_rows(LU, LS),
587 15 {selected, AffiliationRows} = execute_get_affiliation_rows(LU, LS),
588 15 {selected, SubRows} = execute_get_subscriptions_rows(LU, LS),
589 15 States = build_states(ItemRows, AffiliationRows, SubRows),
590 15 {ok, States}.
591
592 -spec get_states_by_bare(LJID :: jid:ljid()) ->
593 {ok, [mod_pubsub:pubsubState()]}.
594 get_states_by_bare({ LU, LS, _ }) ->
595 973 {selected, ItemRows} = execute_get_item_rows(LU, LS),
596 973 {selected, AffiliationRows} = execute_get_affiliation_rows(LU, LS),
597 973 {selected, SubRows} = execute_get_subscriptions_rows_resource(LU, LS, <<>>),
598 973 States = build_states(ItemRows, AffiliationRows, SubRows),
599 973 {ok, States}.
600
601 -spec get_states_by_bare_and_full(LJID :: jid:ljid()) ->
602 {ok, [mod_pubsub:pubsubState()]}.
603 get_states_by_bare_and_full({ LU, LS, LR } = LJID) ->
604 552 {ok, StatesBare} = get_states_by_bare(LJID),
605 552 {selected, SubRows} = execute_get_subscriptions_rows_resource(LU, LS, LR),
606 552 StatesFull = build_states([], [], SubRows),
607 552 {ok, StatesFull ++ StatesBare}.
608
609 -spec get_idxs_of_own_nodes_with_pending_subs(LJID :: jid:ljid()) ->
610 {ok, [mod_pubsub:nodeIdx()]}.
611 get_idxs_of_own_nodes_with_pending_subs({ LU, LS, _ }) ->
612 2 {selected, Rows} =
613 execute_get_idxs_of_own_nodes_with_pending_subs(LS, aff2int(owner), LU, sub2int(pending)),
614 2 {ok, [ mongoose_rdbms:result_to_integer(Nidx) || {Nidx} <- Rows ]}.
615
616 %% ------------------------ Direct #pubsub_item access ------------------------
617
618 -spec get_items(Nidx :: mod_pubsub:nodeIdx(), gen_pubsub_node:get_item_options()) ->
619 {ok, {[mod_pubsub:pubsubItem()], none}}.
620 get_items(Nidx, Opts) ->
621 51 MaxItems = maps:get(max_items, Opts, undefined),
622 51 ItemIds = maps:get(item_ids, Opts, undefined),
623 51 Rows = get_item_rows(Nidx, MaxItems, ItemIds),
624 51 Result = [item_to_record(Row) || Row <- Rows],
625 51 {ok, {Result, none}}.
626
627 -spec get_item_rows(Nidx :: mod_pubsub:nodeIdx(),
628 MaxItems :: undefined | non_neg_integer(),
629 ItemIds :: undefined | mod_pubsub:itemId()) -> [tuple()].
630 get_item_rows(Nidx, undefined, undefined) ->
631 18 {selected, Rows} = execute_get_items(Nidx),
632 18 Rows;
633 get_item_rows(Nidx, MaxItems, undefined) ->
634 25 {selected, Rows} = execute_get_items(Nidx, MaxItems),
635 25 Rows;
636 get_item_rows(Nidx, MaxItems, ItemIds) ->
637 %% Returned items have same order as ItemIds
638 8 get_item_rows_acc(Nidx, MaxItems, ItemIds, []).
639
640 8 get_item_rows_acc(_Nidx, _MaxItems, [], AccRows) -> AccRows;
641 get_item_rows_acc(Nidx, MaxItems, [ItemId | ItemIds], AccRows) ->
642 8 case execute_get_item(Nidx, ItemId) of
643 {selected, []} ->
644
:-(
get_item_rows_acc(Nidx, MaxItems, ItemIds, AccRows);
645 {selected, [Item]} when MaxItems =:= undefined;
646 length(AccRows) < MaxItems ->
647 8 get_item_rows_acc(Nidx, MaxItems, ItemIds, [Item | AccRows]);
648 {selected, [_]} ->
649
:-(
AccRows
650 end.
651
652 -spec get_item(Nidx :: mod_pubsub:nodeIdx(), ItemId :: mod_pubsub:itemId()) ->
653 {ok, mod_pubsub:pubsubItem()} | {error, item_not_found}.
654 get_item(Nidx, ItemId) ->
655 127 case execute_get_item(Nidx, ItemId) of
656 {selected, []} ->
657 119 {error, item_not_found};
658 {selected, [Item]} ->
659 8 {ok, item_to_record(Item)}
660 end.
661
662 -spec set_item(Item :: mod_pubsub:pubsubItem()) -> ok.
663 set_item(#pubsub_item{itemid = {ItemId, NodeIdx},
664 creation = {CreatedAtNow, {CreatedLUser, CreatedLServer, _}},
665 modification = {ModifiedAtNow, {ModifiedLUser, ModifiedLServer, ModifiedLResource}},
666 publisher = PublisherIn,
667 payload = Payload}) ->
668 119 PayloadWrapped = #xmlel{name = <<"item">>, children = Payload},
669 119 PayloadXML = exml:to_binary(PayloadWrapped),
670 119 CreatedAt = CreatedAtNow,
671 119 ModifiedAt = ModifiedAtNow,
672 119 Publisher = null_or_bin_jid(PublisherIn),
673 119 InsertParams = [NodeIdx, ItemId, CreatedLUser, CreatedLServer, CreatedAt,
674 ModifiedLUser, ModifiedLServer, ModifiedLResource, ModifiedAt,
675 Publisher, PayloadXML],
676 119 UpdateParams = [ModifiedLUser, ModifiedLServer, ModifiedLResource, ModifiedAt,
677 Publisher, PayloadXML],
678 119 UniqueKeyValues = [NodeIdx, ItemId],
679 119 {updated, _} = rdbms_queries:execute_upsert(global, pubsub_item_upsert,
680 InsertParams, UpdateParams, UniqueKeyValues),
681 119 ok.
682
683 -spec del_item(Nidx :: mod_pubsub:nodeIdx(), ItemId :: mod_pubsub:itemId()) -> ok.
684 del_item(Nidx, ItemId) ->
685 8 {updated, _} = execute_del_item(Nidx, ItemId),
686 8 ok.
687
688 -spec del_items(Nidx :: mod_pubsub:nodeIdx(), [ItemId :: mod_pubsub:itemId()]) -> ok.
689 del_items(_, []) ->
690 113 ok;
691 del_items(Nidx, ItemIds) ->
692 10 [{updated, _} = execute_del_item(Nidx, Item) || Item <- ItemIds],
693 10 ok.
694
695 % ------------------- Node management --------------------------------
696
697 -spec create_node(Nidx :: mod_pubsub:nodeIdx(), Owner :: jid:ljid()) -> ok.
698 create_node(Nidx, LJID) ->
699 229 set_affiliation(Nidx, LJID, owner).
700
701 -spec del_node(Nidx :: mod_pubsub:nodeIdx()) ->
702 {ok, [mod_pubsub:pubsubState()]}.
703 del_node(Nidx) ->
704 156 {ok, States} = get_states(Nidx),
705 156 {updated, _} = execute_delete_all_subscriptions_id(Nidx),
706 156 {updated, _} = execute_delete_all_items(Nidx),
707 156 {updated, _} = execute_delete_all_affiliations(Nidx),
708 156 {ok, States}.
709
710 -spec set_node(Node :: mod_pubsub:pubsubNode()) -> {ok, mod_pubsub:nodeIdx()}.
711 set_node(#pubsub_node{nodeid = {Key, Name}, id = undefined, type = Type,
712 owners = Owners, options = Opts, parents = Parents}) ->
713 229 ExtKey = encode_key(Key),
714 229 ExtOwners = jiffy:encode([jid:to_binary(Owner) || Owner <- Owners]),
715 229 ExtOpts = jiffy:encode({Opts}),
716 229 {updated, 1} = execute_insert_pubsub_node(ExtKey, Name, Type, ExtOwners, ExtOpts),
717 229 {selected, [Row]} = execute_select_node_by_key_and_name(ExtKey, Name),
718 229 #pubsub_node{id = Nidx} = decode_pubsub_node_row(Row),
719 229 set_parents(Name, Parents),
720 229 {ok, Nidx};
721
722 set_node(#pubsub_node{nodeid = {_, Name}, id = Nidx, type = Type,
723 owners = Owners, options = Opts, parents = Parents}) ->
724 11 OwnersJid = [jid:to_binary(Owner) || Owner <- Owners],
725 11 execute_update_pubsub_node(Type, jiffy:encode(OwnersJid), jiffy:encode({Opts}), Nidx),
726 11 execute_del_parents(Name),
727 11 set_parents(Name, Parents),
728 11 {ok, Nidx}.
729
730 -spec set_parents(mod_pubsub:nodeId(), [mod_pubsub:nodeId()]) -> ok.
731 set_parents(Name, Parents) ->
732 240 [execute_insert_parent(Name, ParentName) || ParentName <- Parents],
733 240 ok.
734
735 -spec find_node_by_id(Nidx :: mod_pubsub:nodeIdx()) ->
736 {error, not_found} | {ok, mod_pubsub:pubsubNode()}.
737 find_node_by_id(Nidx) ->
738 275 case execute_select_node_by_id(Nidx) of
739 {selected, []} ->
740
:-(
{error, not_found};
741 {selected, [Row]} ->
742 275 {ok, decode_pubsub_node_row(Row)}
743 end.
744
745 -spec find_node_by_name(Key :: mod_pubsub:hostPubsub() | jid:ljid(),
746 Node :: mod_pubsub:nodeId()) ->
747 mod_pubsub:pubsubNode() | false.
748 find_node_by_name(Key, Node) ->
749 1277 case execute_select_node_by_key_and_name(encode_key(Key), Node) of
750 {selected, [Row]} ->
751 1025 decode_pubsub_node_row(Row);
752 {selected, []} ->
753 252 false
754 end.
755
756 decode_pubsub_node_row({Nidx, KeySQL, Name, Type, Owners, Options}) ->
757 1641 Key = decode_key(KeySQL),
758 1641 {DecodedOpts} = jiffy:decode(Options),
759 1641 DecodedOptions = [maybe_option_value_to_atom(key_to_existing_atom(Item)) ||
760 1641 Item <- DecodedOpts],
761 1641 DecodedOwners = [jid:to_lower(jid:from_binary(Owner)) ||
762 1641 Owner <- jiffy:decode(Owners)],
763 1641 #pubsub_node{nodeid = {Key, Name},
764 id = mongoose_rdbms:result_to_integer(Nidx),
765 type = Type,
766 owners = DecodedOwners,
767 options = DecodedOptions}.
768
769 maybe_option_value_to_atom({access_model, Value}) ->
770 1641 {access_model, binary_to_existing_atom(Value, utf8)};
771 maybe_option_value_to_atom({publish_model, Value}) ->
772 1641 {publish_model, binary_to_existing_atom(Value, utf8)};
773 maybe_option_value_to_atom({notification_type, Value}) ->
774 1641 {notification_type, binary_to_existing_atom(Value, utf8)};
775 maybe_option_value_to_atom({node_type, Value}) ->
776 621 {node_type, binary_to_existing_atom(Value, utf8)};
777 maybe_option_value_to_atom({send_last_published_item, Value}) ->
778 1641 {send_last_published_item, binary_to_existing_atom(Value, utf8)};
779 maybe_option_value_to_atom(Other) ->
780 19834 Other.
781
782 -spec find_nodes_by_key(Key :: mod_pubsub:hostPubsub() | jid:ljid()) ->
783 [mod_pubsub:pubsubNode()].
784 find_nodes_by_key(Key) ->
785 30 {selected, Rows} =
786 execute_select_nodes_by_key(encode_key(Key)),
787 30 [decode_pubsub_node_row(Row) || Row <- Rows].
788
789 -spec delete_node(Node :: mod_pubsub:pubsubNode()) -> ok.
790 delete_node(#pubsub_node{nodeid = {Key, Node}}) ->
791 156 {updated, _} = execute_delete_node(encode_key(Key), Node),
792 156 ok.
793
794 -spec get_subnodes(Key :: mod_pubsub:hostPubsub() | jid:ljid(), Node :: mod_pubsub:nodeId() | <<>>) ->
795 [mod_pubsub:pubsubNode()].
796 get_subnodes(Key, Node) ->
797 100 {selected, Rows} = execute_select_subnodes(encode_key(Key), Node),
798 100 [decode_pubsub_node_row(Row) || Row <- Rows].
799
800 -spec get_parentnodes_tree(Key :: mod_pubsub:hostPubsub() | jid:ljid(), Node :: mod_pubsub:nodeId()) ->
801 [{Depth::non_neg_integer(), Nodes::[mod_pubsub:pubsubNode(), ...]}].
802 get_parentnodes_tree(Key, Node) ->
803 309 find_nodes_with_parents(Key, [Node], 0, []).
804
805 find_nodes_with_parents(_, [], _, Acc) ->
806 309 Acc;
807 find_nodes_with_parents(_, _, 100, Acc) ->
808
:-(
?LOG_WARNING(#{what => pubsub_max_depth_reached, pubsub_nodes => Acc}),
809
:-(
Acc;
810 find_nodes_with_parents(Key, Nodes, Depth, Acc) ->
811 363 ExtKey = encode_key(Key),
812 363 Rows = lists:flatmap(fun(Name) ->
813 366 {selected, Rs} = execute_select_node_with_parents(ExtKey, Name),
814 366 Rs
815 end, Nodes),
816 363 Map = lists:foldl(fun update_nodes_map/2, #{}, Rows),
817 363 MapTransformer = fun(Nidx, #{name := NodeName,
818 next_level := Parents}, {ParentsAcc, NodesAcc}) ->
819 366 NewParents = [Parents | ParentsAcc],
820 366 Node = #pubsub_node{id = Nidx,
821 nodeid = {Key, NodeName},
822 parents = Parents},
823 366 NewNodes = [Node | NodesAcc],
824 366 {NewParents, NewNodes}
825 end,
826
827 363 {Parents, NewNodes} = maps:fold(MapTransformer, {[], []}, Map),
828 363 NewAcc = [{Depth, NewNodes} | Acc],
829 363 find_nodes_with_parents(Key, lists:flatten(Parents), Depth + 1, NewAcc).
830
831 update_nodes_map({NidxSQL, NodeName, NextLevelNodeName}, Map) ->
832 433 Nidx = mongoose_rdbms:result_to_integer(NidxSQL),
833 433 case maps:get(Nidx, Map, undefined) of
834 undefined ->
835 430 NewNode = #{name => NodeName,
836 next_level => maybe_add_parent_to_list(NextLevelNodeName, [])},
837 430 Map#{Nidx => NewNode};
838 #{next_level := NextLevelNodes} = Node ->
839 3 UpdatedNode = Node#{next_level := maybe_add_parent_to_list(NextLevelNodeName, NextLevelNodes)},
840 3 Map#{Nidx := UpdatedNode}
841 end.
842
843 maybe_add_parent_to_list(null, List) ->
844 373 List;
845 maybe_add_parent_to_list(ParentName, List) ->
846 60 [ParentName | List].
847
848 -spec get_subnodes_tree(Key :: mod_pubsub:hostPubsub() | jid:ljid(), Node :: mod_pubsub:nodeId()) ->
849 [{Depth::non_neg_integer(), Nodes::[mod_pubsub:pubsubNode(), ...]}].
850 get_subnodes_tree(Key, Node) ->
851 63 find_subnodes(Key, [Node], 0, []).
852
853 find_subnodes(_Key, [], _, Acc) ->
854 63 Acc;
855 find_subnodes(_, _, 100, Acc) ->
856
:-(
?LOG_WARNING(#{what => pubsub_max_depth_reached, pubsub_nodes => Acc}),
857
:-(
Acc;
858 find_subnodes(Key, Nodes, Depth, Acc) ->
859 66 ExtKey = encode_key(Key),
860 66 Rows = lists:flatmap(fun(Name) ->
861 66 {selected, Rs} = execute_select_node_with_children(ExtKey, Name),
862 66 Rs
863 end, Nodes),
864 66 Map = lists:foldl(fun update_nodes_map/2, #{}, Rows),
865 66 MapTransformer = fun(Nidx, #{name := NodeName,
866 next_level := Subnodes}, {SubnodesAcc, NodesAcc}) ->
867 64 NewSubnodes = [Subnodes | SubnodesAcc],
868 64 Node = #pubsub_node{id = Nidx,
869 nodeid = {Key, NodeName}},
870 64 NewNodes = [Node | NodesAcc],
871 64 {NewSubnodes, NewNodes}
872 end,
873 66 {Subnodes, NewNodes} = maps:fold(MapTransformer, {[], []}, Map),
874 66 NewAcc = [{Depth, NewNodes} | Acc],
875 66 find_subnodes(Key, lists:flatten(Subnodes), Depth + 1, NewAcc).
876
877 % ------------------- Affiliations --------------------------------
878
879 -spec set_affiliation(Nidx :: mod_pubsub:nodeIdx(),
880 LJID :: jid:ljid(),
881 Affiliation :: mod_pubsub:affiliation()) -> ok.
882 set_affiliation(Nidx, { LU, LS, _ } = LJID, none) ->
883 12 BareLJID = jid:to_bare(LJID),
884 12 case get_node_entity_subscriptions(Nidx, BareLJID) of
885 {ok, []} ->
886 12 del_state(Nidx, BareLJID);
887 _ ->
888
:-(
delete_affiliation_wo_subs_check(Nidx, LU, LS)
889 end;
890 set_affiliation(Nidx, { LU, LS, _ }, Affiliation) ->
891 281 Aff = aff2int(Affiliation),
892 281 InsertParams = [Nidx, LU, LS, Aff],
893 281 UpdateParams = [Aff],
894 281 UniqueKeyValues = [Nidx, LU, LS],
895 281 {updated, _} = rdbms_queries:execute_upsert(global, pubsub_affiliation_upsert,
896 InsertParams, UpdateParams, UniqueKeyValues),
897 281 ok.
898
899 -spec get_affiliation(Nidx :: mod_pubsub:nodeIdx(),
900 LJID :: jid:ljid()) ->
901 {ok, mod_pubsub:affiliation()}.
902 get_affiliation(Nidx, { LU, LS, _ }) ->
903 791 case execute_get_affiliation(Nidx, LU, LS) of
904 {selected, [{AffInt}]} ->
905 645 {ok, sql2aff(AffInt)};
906 {selected, []} ->
907 146 {ok, none}
908 end.
909
910 % ------------------- Subscriptions --------------------------------
911
912 -spec add_subscription(Nidx :: mod_pubsub:nodeIdx(),
913 LJID :: jid:ljid(),
914 Sub :: mod_pubsub:subscription(),
915 SubId :: mod_pubsub:subId(),
916 SubOpts :: mod_pubsub:subOptions()) -> ok.
917 add_subscription(Nidx, { LU, LS, LR }, Sub, SubId, SubOpts) ->
918 94 EncodedOpts = jiffy:encode({SubOpts}),
919 94 {updated, _} = execute_insert_subscription(Nidx, LU, LS, LR, sub2int(Sub), SubId, EncodedOpts),
920 94 ok.
921
922 -spec set_subscription_opts(Nidx :: mod_pubsub:nodeIdx(),
923 LJID :: jid:ljid(),
924 SubId :: mod_pubsub:subId(),
925 Opts :: mod_pubsub:subOptions()) -> ok.
926 set_subscription_opts(Nidx, { LU, LS, LR }, SubId, Opts) ->
927 4 EncodedOpts = jiffy:encode({Opts}),
928 4 {updated, _} = execute_update_subscription_opts(EncodedOpts, Nidx, LU, LS, LR, SubId),
929 4 ok.
930
931 -spec get_node_subscriptions(Nidx :: mod_pubsub:nodeIdx()) ->
932 {ok, [{Entity :: jid:ljid(),
933 Sub :: mod_pubsub:subscription(),
934 SubId :: mod_pubsub:subId(),
935 SubOpts :: mod_pubsub:subOptions()}]}.
936 get_node_subscriptions(Nidx) ->
937 574 {selected, QueryResult} = execute_get_node_subs(Nidx),
938 574 {ok, [{{LU, LS, LR}, sql2sub(SubInt), SubId, sql_to_sub_opts(SubOpts)}
939 574 || {LU, LS, LR, SubInt, SubId, SubOpts} <- QueryResult ]}.
940
941 -spec get_node_entity_subscriptions(Nidx :: mod_pubsub:nodeIdx(),
942 LJID :: jid:ljid()) ->
943 {ok, [{Sub :: mod_pubsub:subscription(),
944 SubId :: mod_pubsub:subId(),
945 SubOpts :: mod_pubsub:subOptions()}]}.
946 get_node_entity_subscriptions(Nidx, { LU, LS, LR }) ->
947 487 {selected, QueryResult} = execute_get_node_entity_subs(LS, Nidx, LU, LR),
948 487 {ok, [{sql2sub(SubInt), SubId, sql_to_sub_opts(SubOpts)}
949 487 || {SubInt, SubId, SubOpts} <- QueryResult ]}.
950
951 -spec delete_subscription(Nidx :: mod_pubsub:nodeIdx(),
952 LJID :: jid:ljid(),
953 SubId :: mod_pubsub:subId()) ->
954 ok.
955 delete_subscription(Nidx, { LU, LS, LR }, SubId) ->
956 4 {updated, _} = execute_delete_subscription(LS, Nidx, LU, LR, SubId),
957 4 ok.
958
959 -spec delete_all_subscriptions(Nidx :: mod_pubsub:nodeIdx(),
960 LJID :: jid:ljid()) ->
961 ok.
962 delete_all_subscriptions(Nidx, { LU, LS, LR } = LJID) ->
963 7 case get_affiliation(Nidx, LJID) of
964 {ok, none} ->
965 7 del_state(Nidx, LJID);
966 _ ->
967
:-(
delete_all_subscriptions_wo_aff_check(Nidx, LU, LS, LR)
968 end,
969 7 ok.
970
971 -spec update_subscription(Nidx :: mod_pubsub:nodeIdx(),
972 LJID :: jid:ljid(),
973 Subscription :: mod_pubsub:subscription(),
974 SubId :: mod_pubsub:subId()) ->
975 ok.
976 update_subscription(Nidx, { LU, LS, LR }, Subscription, SubId) ->
977 3 {updated, _} = execute_update_subscription(sub2int(Subscription), Nidx, LU, LS, LR, SubId),
978 3 ok.
979
980 % ------------------- Items --------------------------------
981
982 -spec add_item(Nidx :: mod_pubsub:nodeIdx(),
983 LJID :: jid:ljid(),
984 Item :: mod_pubsub:pubsubItem()) ->
985 ok.
986 add_item(_Nidx, _, Item) ->
987 119 set_item(Item),
988 119 ok.
989
990 %% TODO: Make public at some point
991 -spec get_entity_items(Nidx :: mod_pubsub:nodeIdx(),
992 LJID :: jid:ljid()) ->
993 {ok, [mod_pubsub:itemId()]}.
994 get_entity_items(Nidx, { LU, LS, _ }) ->
995 266 {selected, ItemIds} = execute_get_entity_items(LS, Nidx, LU),
996 266 {ok, [ ItemId || {ItemId} <- ItemIds]}.
997
998 -spec remove_items(Nidx :: mod_pubsub:nodeIdx(),
999 LJID :: jid:ljid(),
1000 ItemIds :: [mod_pubsub:itemId()]) ->
1001 ok.
1002 remove_items(Nidx, { LU, LS, _ }, ItemIds) ->
1003 127 lists:foreach(fun(ItemId) ->
1004 14 {updated, _} = execute_delete_item(LS, Nidx, LU, ItemId)
1005 end, ItemIds).
1006
1007 -spec remove_all_items(Nidx :: mod_pubsub:nodeIdx()) ->
1008 ok.
1009 remove_all_items(Nidx) ->
1010 4 {updated, _} = execute_delete_all_items(Nidx),
1011 4 ok.
1012
1013 % ------------------- GDPR-related --------------------------------
1014
1015 get_user_payloads(LUser, LServer) ->
1016 21 case execute_get_user_items(LUser, LServer) of
1017 {selected, Items} ->
1018 21 [[NodeName, ItemId, strip_payload(PayloadDB)] || {NodeName, ItemId, PayloadDB} <- Items]
1019 end.
1020
1021 get_user_nodes(LUser, LServer) ->
1022 21 LJID = jid:to_binary({LUser, LServer, <<>>}),
1023 21 {selected, Nodes} = execute_select_nodes_by_owner(LJID),
1024 21 lists:map(fun tuple_to_list/1, Nodes).
1025
1026 get_user_subscriptions(LUser, LServer) ->
1027 21 {selected, Nodes} = execute_get_user_subscriptions(LUser, LServer),
1028 21 lists:map(fun tuple_to_list/1, Nodes).
1029
1030 strip_payload(PayloadDB) ->
1031 17 PayloadXML = mongoose_rdbms:unescape_binary(global, PayloadDB),
1032 17 {ok, #xmlel{children = Payload}} = exml:parse(PayloadXML),
1033 17 exml:to_binary(Payload).
1034
1035 -spec delete_user_subscriptions(jid:ljid()) -> ok.
1036 delete_user_subscriptions({ LU, LS, _ }) ->
1037 78 {updated, _} = execute_delete_user_subscriptions(LS, LU),
1038 78 ok.
1039
1040 find_nodes_by_affiliated_user({ LU, LS, _ }) ->
1041 78 {selected, NodesWithAffs} = execute_select_nodes_by_affiliated_user(LU, LS),
1042 78 lists:map(fun decode_pubsub_node_with_aff_row/1, NodesWithAffs).
1043
1044 decode_pubsub_node_with_aff_row(Row) ->
1045 14 [Aff | NodeRow] = tuple_to_list(Row),
1046 14 {decode_pubsub_node_row(list_to_tuple(NodeRow)), sql2aff(Aff)}.
1047
1048 %%====================================================================
1049 %% Helpers
1050 %%====================================================================
1051
1052 -spec item_columns() -> binary().
1053 item_columns() ->
1054 39 <<"nidx, itemid, created_luser, created_lserver, created_at, "
1055 "modified_luser, modified_lserver, modified_lresource, modified_at, "
1056 "publisher, payload">>.
1057
1058 -spec pubsub_node_fields() -> binary().
1059 pubsub_node_fields() ->
1060 39 <<"nidx, p_key, name, type, owners, options">>.
1061
1062 -spec pubsub_node_fields_pn() -> binary().
1063 pubsub_node_fields_pn() ->
1064 39 <<"pn.nidx, pn.p_key, pn.name, pn.type, pn.owners, pn.options">>.
1065
1066 -spec del_state(Nidx :: mod_pubsub:nodeIdx(),
1067 LJID :: jid:ljid()) -> ok.
1068 del_state(Nidx, {LU, LS, LR}) ->
1069 19 delete_all_subscriptions_wo_aff_check(Nidx, LU, LS, LR),
1070 19 delete_affiliation_wo_subs_check(Nidx, LU, LS),
1071 19 ok.
1072
1073 -spec delete_all_subscriptions_wo_aff_check(Nidx :: mod_pubsub:nodeIdx(),
1074 LU :: jid:luser(),
1075 LS :: jid:lserver(),
1076 LR :: jid:lresource()) -> ok.
1077 delete_all_subscriptions_wo_aff_check(Nidx, LU, LS, LR) ->
1078 19 {updated, _} = execute_delete_all_subscriptions(Nidx, LU, LS, LR),
1079 19 ok.
1080
1081 -spec delete_affiliation_wo_subs_check(Nidx :: mod_pubsub:nodeIdx(),
1082 LU :: jid:luser(),
1083 LS :: jid:lserver()) -> ok.
1084 delete_affiliation_wo_subs_check(Nidx, LU, LS) ->
1085 19 {updated, _} = execute_delete_affiliation(Nidx, LU, LS),
1086 19 ok.
1087
1088 -type item_row() :: { NidxSql :: integer() | binary(),
1089 LU :: binary(),
1090 LS :: binary(),
1091 ItemId :: binary() }.
1092 -type aff_row() :: { NidxSql :: integer() | binary(),
1093 LU :: binary(),
1094 LS :: binary(),
1095 AffInt :: integer() }.
1096 -type sub_row() :: { NidxSql :: integer() | binary(),
1097 LU :: binary(),
1098 LS :: binary(),
1099 LR :: binary(),
1100 TypeInt :: integer(),
1101 SubId :: binary() }.
1102
1103 -spec build_states(ItemRows :: [item_row()], AffRows :: [aff_row()], SubRows :: [sub_row()]) ->
1104 [mod_pubsub:pubsubState()].
1105 build_states(ItemRows, AffRows, SubRows) ->
1106 1706 Result1 = item_rows_to_states(ItemRows, #{}),
1107 1706 Result2 = aff_rows_to_states(AffRows, Result1),
1108 1706 maps:values(sub_rows_to_states(SubRows, Result2)).
1109
1110 item_rows_to_states([], Acc) ->
1111 1706 Acc;
1112 item_rows_to_states([{ NidxSql, LU, LS, ItemId } | RRows], Acc) ->
1113 125 Nidx = mongoose_rdbms:result_to_integer(NidxSql),
1114 125 LJID = { LU, LS, <<>> },
1115 125 PS = maps:get({LJID, Nidx}, Acc, #pubsub_state{ stateid = {LJID, Nidx} }),
1116 125 #pubsub_state{ items = Items0 } = PS,
1117 125 NAcc = Acc#{ {LJID, Nidx} => PS#pubsub_state{ items = [ItemId | Items0] } },
1118 125 item_rows_to_states(RRows, NAcc).
1119
1120 aff_rows_to_states([], Acc) ->
1121 1706 Acc;
1122 aff_rows_to_states([{ NidxSql, LU, LS, AffInt } | RRows], Acc) ->
1123 374 Nidx = mongoose_rdbms:result_to_integer(NidxSql),
1124 374 LJID = { LU, LS, <<>> },
1125 374 PS = maps:get({LJID, Nidx}, Acc, #pubsub_state{ stateid = {LJID, Nidx} }),
1126 374 NAcc = Acc#{ {LJID, Nidx} => PS#pubsub_state{ affiliation = sql2aff(AffInt) } },
1127 374 aff_rows_to_states(RRows, NAcc).
1128
1129 sub_rows_to_states([], Acc) ->
1130 1706 Acc;
1131 sub_rows_to_states([{ NidxSql, LU, LS, LR, TypeInt, SubId } | RRows], Acc) ->
1132 92 Nidx = mongoose_rdbms:result_to_integer(NidxSql),
1133 92 LJID = { LU, LS, LR },
1134 92 PS = maps:get({LJID, Nidx}, Acc, #pubsub_state{ stateid = {LJID, Nidx} }),
1135 92 #pubsub_state{ subscriptions = Subs0 } = PS,
1136 92 NAcc = Acc#{ {LJID, Nidx} => PS#pubsub_state{
1137 subscriptions = [{sql2sub(TypeInt), SubId} | Subs0] } },
1138 92 sub_rows_to_states(RRows, NAcc).
1139
1140 -spec aff2int(mod_pubsub:affiliation()) -> integer().
1141
:-(
aff2int(none) -> 0;
1142 233 aff2int(owner) -> 1;
1143 2 aff2int(publisher) -> 2;
1144 46 aff2int(publish_only) -> 3;
1145 2 aff2int(member) -> 4;
1146
:-(
aff2int(outcast) -> 5.
1147
1148 -spec sql2aff(integer() | binary()) -> mod_pubsub:affiliation().
1149 sql2aff(SqlInt) ->
1150 1033 int2aff(mongoose_rdbms:result_to_integer(SqlInt)).
1151
1152 -spec int2aff(integer()) -> mod_pubsub:affiliation().
1153
:-(
int2aff(0) -> none;
1154 926 int2aff(1) -> owner;
1155 8 int2aff(2) -> publisher;
1156 89 int2aff(3) -> publish_only;
1157 10 int2aff(4) -> member;
1158
:-(
int2aff(5) -> outcast.
1159
1160 -spec sub2int(mod_pubsub:subscription()) -> integer().
1161
:-(
sub2int(none) -> 0;
1162 11 sub2int(pending) -> 1;
1163 88 sub2int(subscribed) -> 3.
1164
1165 -spec sql2sub(integer() | binary()) -> mod_pubsub:subscription().
1166 sql2sub(SqlInt) ->
1167 290 int2sub(mongoose_rdbms:result_to_integer(SqlInt)).
1168
1169 -spec int2sub(integer()) -> mod_pubsub:subscription().
1170
:-(
int2sub(0) -> none;
1171 24 int2sub(1) -> pending;
1172 266 int2sub(3) -> subscribed.
1173
1174 sql_to_sub_opts(SqlOpts) ->
1175 198 {Opts} = jiffy:decode(SqlOpts),
1176 198 lists:map(fun key_to_existing_atom/1, Opts).
1177
1178 item_to_record({NodeIdx, ItemId, CreatedLUser, CreatedLServer, CreatedAt,
1179 ModifiedLUser, ModifiedLServer, ModifiedLResource, ModifiedAt,
1180 PublisherIn, PayloadDB}) ->
1181 56 PayloadXML = mongoose_rdbms:unescape_binary(global, PayloadDB),
1182 56 {ok, #xmlel{children = Payload}} = exml:parse(PayloadXML),
1183 56 ItemAndNodeId = {ItemId, mongoose_rdbms:result_to_integer(NodeIdx)},
1184 56 Creation = {mongoose_rdbms:result_to_integer(CreatedAt),
1185 {CreatedLUser, CreatedLServer, <<>>}},
1186 56 Modification = {mongoose_rdbms:result_to_integer(ModifiedAt),
1187 {ModifiedLUser, ModifiedLServer, ModifiedLResource}},
1188 56 Publisher = decode_publisher(PublisherIn),
1189 56 #pubsub_item{itemid = ItemAndNodeId,
1190 creation = Creation,
1191 modification = Modification,
1192 publisher = Publisher,
1193 payload = Payload}.
1194
1195 decode_publisher(null) ->
1196 54 undefined;
1197 decode_publisher(Binary) ->
1198 %% Silently returns `error` if parsing fails.
1199 2 jid:from_binary(Binary).
1200
1201 encode_key(Key) when is_binary(Key) ->
1202 2051 Key;
1203 encode_key({_, _, _} = JID) ->
1204 170 jid:to_binary(JID).
1205
1206 decode_key(KeySQL) ->
1207 1641 case jid:from_binary(KeySQL) of
1208 #jid{luser = <<>>, lserver = Host, lresource = <<>>} ->
1209 1531 Host;
1210 #jid{luser = LUser, lserver = LServer, lresource = LResource} ->
1211 110 {LUser, LServer, LResource}
1212 end.
1213
1214 null_or_bin_jid(undefined) ->
1215 115 null;
1216 null_or_bin_jid(Jid) ->
1217 4 jid:to_binary(Jid).
1218
1219 key_to_existing_atom({Key, Value}) when is_atom(Key)->
1220
:-(
{Key, Value};
1221 key_to_existing_atom({Key, Value}) ->
1222 27045 {binary_to_existing_atom(Key, utf8), Value}.
1223
Line Hits Source