./ct_report/coverage/mod_pubsub_db_rdbms.COVER.html

1 %%%----------------------------------------------------------------------
2 %%% File : mod_pubsub_db_rdbms.erl
3 %%% Author : Piotr Nosek <piotr.nosek@erlang-solutions.com>
4 %%% Purpose : PubSub RDBMS backend
5 %%% Created : 2 Nov 2018 by Piotr Nosek <piotr.nosek@erlang-solutions.com>
6 %%%----------------------------------------------------------------------
7
8 -module(mod_pubsub_db_rdbms).
9 -author('piotr.nosek@erlang-solutions.com').
10 -author('michal.piotrowski@erlang-solutions.com').
11
12 -behaviour(mod_pubsub_db).
13
14 -include("pubsub.hrl").
15 -include("mongoose_logger.hrl").
16 -include("jlib.hrl").
17
18 -export([start/0, stop/0]).
19 % Funs execution
20 -export([transaction/2, dirty/2]).
21 % Direct #pubsub_state access
22 -export([get_state/2,
23 get_states/1, get_states_by_lus/1, get_states_by_bare/1,
24 get_states_by_bare_and_full/1, get_idxs_of_own_nodes_with_pending_subs/1]).
25 % Node management
26 -export([
27 create_node/2,
28 del_node/1,
29 set_node/1,
30 find_node_by_id/1,
31 find_nodes_by_key/1,
32 find_node_by_name/2,
33 delete_node/1,
34 get_subnodes/2,
35 get_parentnodes_tree/2,
36 get_subnodes_tree/2
37 ]).
38 % Affiliations
39 -export([
40 set_affiliation/3,
41 get_affiliation/2
42 ]).
43 % Subscriptions
44 -export([
45 add_subscription/5,
46 set_subscription_opts/4,
47 get_node_subscriptions/1,
48 get_node_entity_subscriptions/2,
49 delete_subscription/3,
50 delete_all_subscriptions/2,
51 update_subscription/4
52 ]).
53 % Item ids in state
54 -export([
55 add_item/3,
56 remove_items/3,
57 remove_all_items/1
58 ]).
59 % Whole items
60 -export([
61 get_items/2,
62 get_item/2,
63 set_item/1,
64 del_item/2,
65 del_items/2
66 ]).
67
68 %% GDPR related
69 -export([
70 get_user_payloads/2,
71 get_user_nodes/2,
72 get_user_subscriptions/2,
73 delete_user_subscriptions/1,
74 find_nodes_by_affiliated_user/1
75 ]).
76
77 %%====================================================================
78 %% Behaviour callbacks
79 %%====================================================================
80
81 %% ------------------------ Backend start/stop ------------------------
82
%% @doc Starts the backend.
%% Calls `mod_pubsub_db_mnesia:start/0' first and then registers every prepared
%% statement (and the two upsert statements) that the `execute_*' helpers in this
%% module refer to by name. Always returns `ok'; any failing call crashes the caller.
-spec start() -> ok.
start() ->
    mod_pubsub_db_mnesia:start(),
    % -------------------- State building ----------------------------
    mongoose_rdbms:prepare(pubsub_get_item_rows_id, pubsub_items, [nidx],
        <<"SELECT nidx, created_luser, created_lserver, itemid "
          "FROM pubsub_items WHERE nidx = ?">>),
    mongoose_rdbms:prepare(pubsub_get_affiliation_rows_id, pubsub_affiliations, [nidx],
        <<"SELECT nidx, luser, lserver, aff FROM pubsub_affiliations WHERE nidx = ?">>),
    mongoose_rdbms:prepare(pubsub_get_subscriptions_rows_id, pubsub_subscriptions, [nidx],
        <<"SELECT nidx, luser, lserver, lresource, type, sub_id "
          "FROM pubsub_subscriptions WHERE nidx = ?">>),
    mongoose_rdbms:prepare(pubsub_get_item_rows, pubsub_items, [created_luser, created_lserver],
        <<"SELECT nidx, created_luser, created_lserver, itemid FROM pubsub_items"
          " WHERE created_luser = ? AND created_lserver = ?">>),
    mongoose_rdbms:prepare(pubsub_get_affiliation_rows, pubsub_affiliations, [luser, lserver],
        <<"SELECT nidx, luser, lserver, aff FROM pubsub_affiliations "
          "WHERE luser = ? AND lserver = ?">>),
    mongoose_rdbms:prepare(pubsub_get_subscriptions_rows, pubsub_subscriptions, [luser, lserver],
        <<"SELECT nidx, luser, lserver, lresource, type, sub_id FROM pubsub_subscriptions "
          "WHERE luser = ? AND lserver = ?">>),
    mongoose_rdbms:prepare(pubsub_get_subscriptions_rows_resource, pubsub_subscriptions,
        [luser, lserver, lresource],
        <<"SELECT nidx, luser, lserver, lresource, type, sub_id FROM pubsub_subscriptions "
          "WHERE luser = ? AND lserver = ? AND lresource = ?">>),
    mongoose_rdbms:prepare(pubsub_get_idxs_of_own_nodes_with_pending_subs, pubsub_affiliations,
        [aff, luser, lserver, 'pubsub_subscriptions.type'],
        <<"SELECT DISTINCT s.nidx FROM pubsub_affiliations AS a "
          "INNER JOIN pubsub_subscriptions s ON a.nidx = s.nidx "
          "WHERE a.aff = ? AND a.luser = ? AND a.lserver = ? AND s.type = ?">>),

    % ------------------- Affiliations --------------------------------
    mongoose_rdbms:prepare(pubsub_get_affiliation, pubsub_affiliations, [nidx, luser, lserver],
        <<"SELECT aff FROM pubsub_affiliations WHERE nidx = ? AND luser = ? AND lserver = ?">>),
    mongoose_rdbms:prepare(pubsub_delete_affiliation, pubsub_affiliations, [nidx, luser, lserver],
        <<"DELETE FROM pubsub_affiliations WHERE nidx = ? AND luser = ? AND lserver = ?">>),
    mongoose_rdbms:prepare(pubsub_delete_all_affiliations, pubsub_affiliations, [nidx],
        <<"DELETE FROM pubsub_affiliations WHERE nidx = ?">>),

    % ------------------- Subscriptions --------------------------------
    mongoose_rdbms:prepare(pubsub_insert_subscription, pubsub_subscriptions,
        [nidx, luser, lserver, lresource, type, sub_id, options],
        <<"INSERT INTO pubsub_subscriptions (nidx, luser, lserver, lresource, "
          "type, sub_id, options) VALUES (?, ?, ?, ?, ?, ?, ?)">>),
    mongoose_rdbms:prepare(pubsub_update_subscription_opts, pubsub_subscriptions,
        [options, nidx, luser, lserver, lresource, sub_id],
        <<"UPDATE pubsub_subscriptions SET options = ? WHERE nidx = ? "
          "AND luser = ? AND lserver = ? AND lresource = ? AND sub_id = ?">>),
    mongoose_rdbms:prepare(pubsub_get_node_subs, pubsub_subscriptions, [nidx],
        <<"SELECT luser, lserver, lresource, type, sub_id, options "
          "FROM pubsub_subscriptions WHERE nidx = ?">>),
    mongoose_rdbms:prepare(pubsub_get_node_entity_subs, pubsub_subscriptions,
        [nidx, luser, lserver, lresource],
        <<"SELECT type, sub_id, options FROM pubsub_subscriptions "
          "WHERE nidx = ? AND luser = ? AND lserver = ? AND lresource = ?">>),
    mongoose_rdbms:prepare(pubsub_delete_subscription, pubsub_subscriptions,
        [nidx, luser, lserver, lresource, sub_id],
        <<"DELETE FROM pubsub_subscriptions WHERE nidx = ? "
          "AND luser = ? AND lserver = ? AND lresource = ? AND sub_id = ?">>),
    % The statement text used to be split as "...subscriptions"" WHERE ..." - two
    % string literals with no whitespace in between. It is now a single literal
    % producing exactly the same binary.
    mongoose_rdbms:prepare(pubsub_delete_all_subscriptions, pubsub_subscriptions,
        [nidx, luser, lserver, lresource],
        <<"DELETE FROM pubsub_subscriptions WHERE nidx = ? "
          "AND luser = ? AND lserver = ? AND lresource = ?">>),
    mongoose_rdbms:prepare(pubsub_delete_all_subscriptions_id, pubsub_subscriptions, [nidx],
        <<"DELETE FROM pubsub_subscriptions WHERE nidx = ?">>),
    mongoose_rdbms:prepare(pubsub_delete_user_subscriptions,
        pubsub_subscriptions, [luser, lserver],
        <<"DELETE FROM pubsub_subscriptions WHERE luser = ? AND lserver = ?">>),
    mongoose_rdbms:prepare(pubsub_update_subscription, pubsub_subscriptions,
        [type, nidx, luser, lserver, lresource, sub_id],
        <<"UPDATE pubsub_subscriptions SET type = ? WHERE nidx = ? "
          "AND luser = ? AND lserver = ? AND lresource = ? AND sub_id = ?">>),

    % ------------------- Items --------------------------------
    mongoose_rdbms:prepare(pubsub_get_entity_items, pubsub_items,
        [nidx, created_luser, created_lserver],
        <<"SELECT itemid FROM pubsub_items WHERE nidx = ? "
          "AND created_luser = ? AND created_lserver = ?">>),
    mongoose_rdbms:prepare(pubsub_delete_item, pubsub_items,
        [nidx, created_luser, created_lserver, itemid],
        <<"DELETE FROM pubsub_items WHERE nidx = ? AND created_luser = ? "
          "AND created_lserver = ? AND itemid = ?">>),
    mongoose_rdbms:prepare(pubsub_delete_all_items, pubsub_items, [nidx],
        <<"DELETE FROM pubsub_items WHERE nidx = ?">>),
    ItemColumns = item_columns(),
    mongoose_rdbms:prepare(pubsub_get_item, pubsub_items, [nidx, itemid],
        <<"SELECT ", ItemColumns/binary, " FROM pubsub_items WHERE nidx = ? AND itemid = ?">>),
    mongoose_rdbms:prepare(pubsub_get_items, pubsub_items, [nidx],
        <<"SELECT ", ItemColumns/binary, " FROM pubsub_items WHERE nidx = ? "
          "ORDER BY modified_at DESC">>),
    % The limit fragment is placed either after SELECT or at the end of the query,
    % depending on what rdbms_queries returns for the configured database.
    {LimitSQL, LimitMSSQL} = rdbms_queries:get_db_specific_limits_binaries(),
    mongoose_rdbms:prepare(pubsub_get_items_limit, pubsub_items,
        rdbms_queries:add_limit_arg(limit, [nidx]),
        <<"SELECT ", LimitMSSQL/binary, " ", ItemColumns/binary,
          " FROM pubsub_items WHERE nidx = ? ORDER BY modified_at DESC ", LimitSQL/binary>>),
    mongoose_rdbms:prepare(pubsub_del_item, pubsub_items, [nidx, itemid],
        <<"DELETE FROM pubsub_items WHERE nidx = ? AND itemid = ?">>),

    % ------------------- Nodes --------------------------------
    mongoose_rdbms:prepare(pubsub_insert_node, pubsub_nodes, [p_key, name, type, owners, options],
        <<"INSERT INTO pubsub_nodes (p_key, name, type, owners, options) VALUES (?, ?, ?, ?, ?)">>),
    mongoose_rdbms:prepare(pubsub_insert_parent, pubsub_node_collections, [name, parent_name],
        <<"INSERT INTO pubsub_node_collections (name, parent_name) VALUES (?, ?)">>),
    mongoose_rdbms:prepare(pubsub_update_pubsub_node, pubsub_nodes, [type, owners, options, nidx],
        <<"UPDATE pubsub_nodes SET type = ?, owners = ?, options = ? WHERE nidx = ?">>),
    PubsubNodeFields = pubsub_node_fields(),
    mongoose_rdbms:prepare(pubsub_select_node_by_key_and_name, pubsub_nodes, [p_key, name],
        <<"SELECT ", PubsubNodeFields/binary, " from pubsub_nodes WHERE p_key = ? AND name = ?">>),
    mongoose_rdbms:prepare(pubsub_select_node_by_id, pubsub_nodes, [nidx],
        <<"SELECT ", PubsubNodeFields/binary, " from pubsub_nodes WHERE nidx = ?">>),
    mongoose_rdbms:prepare(pubsub_select_nodes_by_key, pubsub_nodes, [p_key],
        <<"SELECT ", PubsubNodeFields/binary, " from pubsub_nodes WHERE p_key = ?">>),
    mongoose_rdbms:prepare(pubsub_select_nodes_in_list_with_key, pubsub_nodes, [p_key, names],
        <<"SELECT ", PubsubNodeFields/binary, " from pubsub_nodes "
          "WHERE p_key = ? AND name IN (?)">>),
    PubsubNodeFieldsPrefixed = pubsub_node_fields_pn(),
    mongoose_rdbms:prepare(pubsub_select_nodes_by_affiliated_user,
        pubsub_affiliations, [luser, lserver],
        <<"SELECT aff, ", PubsubNodeFieldsPrefixed/binary, " FROM pubsub_affiliations AS pa "
          "INNER JOIN pubsub_nodes AS pn ON pa.nidx = pn.nidx WHERE luser = ? AND lserver = ?">>),
    mongoose_rdbms:prepare(pubsub_select_top_level_nodes, pubsub_nodes, [p_key],
        <<"SELECT ", PubsubNodeFieldsPrefixed/binary, " from pubsub_nodes as pn "
          "LEFT JOIN pubsub_node_collections as collection ON pn.name = collection.name "
          "WHERE p_key = ? AND collection.parent_name IS NULL">>),
    mongoose_rdbms:prepare(pubsub_select_subnodes, pubsub_nodes,
        ['pubsub_node_collections.parent_name', p_key],
        <<"SELECT ", PubsubNodeFieldsPrefixed/binary, " from pubsub_nodes as pn "
          "INNER JOIN pubsub_node_collections as collection ON pn.name = collection.name AND "
          "collection.parent_name = ? WHERE p_key = ?">>),
    mongoose_rdbms:prepare(pubsub_select_node_with_parents, pubsub_nodes,
        [p_key, name],
        <<"SELECT pn.nidx, pn.name, collection.parent_name from pubsub_nodes as pn "
          "LEFT JOIN pubsub_node_collections as collection ON pn.name = collection.name "
          "WHERE pn.p_key = ? AND pn.name = ?">>),
    mongoose_rdbms:prepare(pubsub_select_node_with_children, pubsub_nodes,
        [p_key, name],
        <<"SELECT pn.nidx, pn.name, collection.name from pubsub_nodes as pn "
          "LEFT JOIN pubsub_node_collections as collection ON pn.name = collection.parent_name "
          "WHERE pn.p_key = ? AND pn.name = ?">>),
    mongoose_rdbms:prepare(pubsub_delete_node, pubsub_nodes, [p_key, name],
        <<"DELETE from pubsub_nodes WHERE p_key = ? AND name = ?">>),
    mongoose_rdbms:prepare(pubsub_del_parents, pubsub_node_collections, [name],
        <<"DELETE FROM pubsub_node_collections WHERE name = ?">>),

    % ------------------- GDPR --------------------------------
    mongoose_rdbms:prepare(pubsub_get_user_items, pubsub_items, [created_luser, created_lserver],
        <<"SELECT name, itemid, payload FROM pubsub_items INNER JOIN pubsub_nodes "
          "ON pubsub_items.nidx = pubsub_nodes.nidx WHERE created_luser = ? "
          "AND created_lserver = ?">>),
    mongoose_rdbms:prepare(pubsub_get_user_subscriptions, pubsub_subscriptions, [luser, lserver],
        <<"SELECT name FROM pubsub_subscriptions INNER JOIN pubsub_nodes "
          "ON pubsub_subscriptions.nidx = pubsub_nodes.nidx WHERE luser = ? AND lserver = ?">>),
    prepare_select_nodes_by_owner(),

    % ------------------- Upserts --------------------------------
    rdbms_queries:prepare_upsert(global, pubsub_affiliation_upsert, pubsub_affiliations,
        [<<"nidx">>, <<"luser">>, <<"lserver">>, <<"aff">>],
        [<<"aff">>],
        [<<"nidx">>, <<"luser">>, <<"lserver">>]),
    ItemInsertFields = [<<"nidx">>, <<"itemid">>,
                        <<"created_luser">>, <<"created_lserver">>, <<"created_at">>,
                        <<"modified_luser">>, <<"modified_lserver">>,
                        <<"modified_lresource">>, <<"modified_at">>,
                        <<"publisher">>, <<"payload">>],
    ItemUpdateFields = [<<"modified_luser">>, <<"modified_lserver">>,
                        <<"modified_lresource">>, <<"modified_at">>,
                        <<"publisher">>, <<"payload">>],
    rdbms_queries:prepare_upsert(global, pubsub_item_upsert, pubsub_items,
        ItemInsertFields,
        ItemUpdateFields,
        [<<"nidx">>, <<"itemid">>]),
    ok.
252
%% @doc Stops the backend by delegating to `mod_pubsub_db_mnesia:stop/0'.
%% Nothing is undone here for the statements prepared in `start/0'.
-spec stop() -> ok.
stop() ->
    mod_pubsub_db_mnesia:stop().
256
257 %% ------------------------ Queries execution --------------------
%% Prepares `pubsub_select_nodes_by_owner' with SQL text chosen by database
%% engine/type. Only MySQL, PostgreSQL and MSSQL-over-ODBC are handled; any other
%% combination crashes with `case_clause'.
prepare_select_nodes_by_owner() ->
    Engine = mongoose_rdbms:db_engine(global),
    Type = mongoose_rdbms:db_type(),
    Query =
        case {Engine, Type} of
            {mysql, _} ->
                <<"SELECT name, type FROM pubsub_nodes WHERE owners = convert(?, JSON);">>;
            {pgsql, _} ->
                <<"SELECT name, type FROM pubsub_nodes WHERE owners ::json->>0 like ? "
                  "AND JSON_ARRAY_LENGTH(owners) = 1">>;
            {odbc, mssql} ->
                <<"SELECT name, type FROM pubsub_nodes WHERE cast(owners as nvarchar(max)) = ?;">>
        end,
    mongoose_rdbms:prepare(pubsub_select_nodes_by_owner, pubsub_nodes, [owners], Query).
271 % -------------------- State building ----------------------------
%% Executes the `pubsub_get_item_rows_id' prepared statement for one node index.
-spec execute_get_item_rows_id(Nidx :: mod_pubsub:nodeIdx()) ->
    mongoose_rdbms:query_result().
execute_get_item_rows_id(Nidx) ->
    Params = [Nidx],
    mongoose_rdbms:execute_successfully(global, pubsub_get_item_rows_id, Params).
276
%% Executes the `pubsub_get_affiliation_rows_id' prepared statement for one node index.
-spec execute_get_affiliation_rows_id(Nidx :: mod_pubsub:nodeIdx()) ->
    mongoose_rdbms:query_result().
execute_get_affiliation_rows_id(Nidx) ->
    Params = [Nidx],
    mongoose_rdbms:execute_successfully(global, pubsub_get_affiliation_rows_id, Params).
281
%% Executes the `pubsub_get_subscriptions_rows_id' prepared statement for one node index.
-spec execute_get_subscriptions_rows_id(Nidx :: mod_pubsub:nodeIdx()) ->
    mongoose_rdbms:query_result().
execute_get_subscriptions_rows_id(Nidx) ->
    Params = [Nidx],
    mongoose_rdbms:execute_successfully(global, pubsub_get_subscriptions_rows_id, Params).
286
%% Executes `pubsub_get_item_rows' for the given creator user/server.
%% Note that the server name (not `global') is passed as the first argument here.
-spec execute_get_item_rows(LU :: jid:luser(), LS :: jid:lserver()) ->
    mongoose_rdbms:query_result().
execute_get_item_rows(LU, LS) ->
    Params = [LU, LS],
    mongoose_rdbms:execute_successfully(LS, pubsub_get_item_rows, Params).
291
%% Executes `pubsub_get_affiliation_rows' for the given user/server.
-spec execute_get_affiliation_rows(LU :: jid:luser(), LS :: jid:lserver()) ->
    mongoose_rdbms:query_result().
execute_get_affiliation_rows(LU, LS) ->
    Params = [LU, LS],
    mongoose_rdbms:execute_successfully(global, pubsub_get_affiliation_rows, Params).
296
%% Executes `pubsub_get_subscriptions_rows' for the given user/server (any resource).
-spec execute_get_subscriptions_rows(LU :: jid:luser(), LS :: jid:lserver()) ->
    mongoose_rdbms:query_result().
execute_get_subscriptions_rows(LU, LS) ->
    Params = [LU, LS],
    mongoose_rdbms:execute_successfully(global, pubsub_get_subscriptions_rows, Params).
301
%% Executes `pubsub_get_subscriptions_rows_resource' for an exact
%% user/server/resource triple.
-spec execute_get_subscriptions_rows_resource(LU :: jid:luser(), LS :: jid:lserver(),
                                              LR :: jid:lresource()) ->
    mongoose_rdbms:query_result().
execute_get_subscriptions_rows_resource(LU, LS, LR) ->
    Params = [LU, LS, LR],
    mongoose_rdbms:execute_successfully(global, pubsub_get_subscriptions_rows_resource, Params).
307
%% Executes `pubsub_get_idxs_of_own_nodes_with_pending_subs'.
%% The statement expects its parameters as affiliation, user, server, subscription
%% type, which differs from this function's argument order.
-spec execute_get_idxs_of_own_nodes_with_pending_subs(LS :: jid:lserver(),
                                                      Aff :: integer(),
                                                      LU :: jid:luser(),
                                                      Sub :: integer()) ->
    mongoose_rdbms:query_result().
execute_get_idxs_of_own_nodes_with_pending_subs(LS, Aff, LU, Sub) ->
    Params = [Aff, LU, LS, Sub],
    mongoose_rdbms:execute_successfully(LS, pubsub_get_idxs_of_own_nodes_with_pending_subs,
                                        Params).
316
317 % ------------------- Affiliations --------------------------------
%% Executes `pubsub_get_affiliation' for one node and one user/server pair.
-spec execute_get_affiliation(Nidx :: mod_pubsub:nodeIdx(), LU :: jid:luser(),
                              LS :: jid:lserver()) -> mongoose_rdbms:query_result().
execute_get_affiliation(Nidx, LU, LS) ->
    Params = [Nidx, LU, LS],
    mongoose_rdbms:execute_successfully(LS, pubsub_get_affiliation, Params).
322
%% Executes `pubsub_delete_affiliation' for one node and one user/server pair.
-spec execute_delete_affiliation(Nidx :: mod_pubsub:nodeIdx(), LU :: jid:luser(),
                                 LS :: jid:lserver()) -> mongoose_rdbms:query_result().
execute_delete_affiliation(Nidx, LU, LS) ->
    Params = [Nidx, LU, LS],
    mongoose_rdbms:execute_successfully(LS, pubsub_delete_affiliation, Params).
327
%% Executes `pubsub_delete_all_affiliations', removing every affiliation row of a node.
-spec execute_delete_all_affiliations(Nidx :: mod_pubsub:nodeIdx()) ->
    mongoose_rdbms:query_result().
execute_delete_all_affiliations(Nidx) ->
    Params = [Nidx],
    mongoose_rdbms:execute_successfully(global, pubsub_delete_all_affiliations, Params).
332
333 % ------------------- Subscriptions --------------------------------
%% Executes `pubsub_insert_subscription' with an already-encoded options value.
-spec execute_insert_subscription(Nidx :: mod_pubsub:nodeIdx(),
                                  LU :: jid:luser(),
                                  LS :: jid:lserver(),
                                  LR :: jid:lresource(),
                                  Sub :: integer(),
                                  SubId :: mod_pubsub:subId(),
                                  EncodedOpts :: iodata()) ->
    mongoose_rdbms:query_result().
execute_insert_subscription(Nidx, LU, LS, LR, Sub, SubId, EncodedOpts) ->
    Params = [Nidx, LU, LS, LR, Sub, SubId, EncodedOpts],
    mongoose_rdbms:execute_successfully(LS, pubsub_insert_subscription, Params).
345
%% Executes `pubsub_update_subscription_opts', replacing the stored options of the
%% subscription identified by node, full JID parts and subscription id.
-spec execute_update_subscription_opts(EncodedOpts :: iodata(),
                                       Nidx :: mod_pubsub:nodeIdx(),
                                       LU :: jid:luser(),
                                       LS :: jid:lserver(),
                                       LR :: jid:lresource(),
                                       SubId :: mod_pubsub:subId()) ->
    mongoose_rdbms:query_result().
execute_update_subscription_opts(EncodedOpts, Nidx, LU, LS, LR, SubId) ->
    Params = [EncodedOpts, Nidx, LU, LS, LR, SubId],
    mongoose_rdbms:execute_successfully(LS, pubsub_update_subscription_opts, Params).
356
%% Executes `pubsub_get_node_subs', selecting all subscription rows of a node.
-spec execute_get_node_subs(Nidx :: mod_pubsub:nodeIdx()) -> mongoose_rdbms:query_result().
execute_get_node_subs(Nidx) ->
    Params = [Nidx],
    mongoose_rdbms:execute_successfully(global, pubsub_get_node_subs, Params).
360
%% Executes `pubsub_get_node_entity_subs' for one node and one full JID.
%% The statement takes node, user, server, resource in that order.
-spec execute_get_node_entity_subs(LS :: jid:lserver(),
                                   Nidx :: mod_pubsub:nodeIdx(),
                                   LU :: jid:luser(),
                                   LR :: jid:lresource()) ->
    mongoose_rdbms:query_result().
execute_get_node_entity_subs(LS, Nidx, LU, LR) ->
    Params = [Nidx, LU, LS, LR],
    mongoose_rdbms:execute_successfully(LS, pubsub_get_node_entity_subs, Params).
368
%% Executes `pubsub_delete_subscription' for a single subscription id of an entity.
-spec execute_delete_subscription(LS :: jid:lserver(),
                                  Nidx :: mod_pubsub:nodeIdx(),
                                  LU :: jid:luser(),
                                  LR :: jid:lresource(),
                                  SubId :: mod_pubsub:subId()) ->
    mongoose_rdbms:query_result().
execute_delete_subscription(LS, Nidx, LU, LR, SubId) ->
    Params = [Nidx, LU, LS, LR, SubId],
    mongoose_rdbms:execute_successfully(LS, pubsub_delete_subscription, Params).
377
%% Executes `pubsub_delete_all_subscriptions', removing every subscription an
%% entity (user/server/resource) holds on one node.
-spec execute_delete_all_subscriptions(Nidx :: mod_pubsub:nodeIdx(),
                                       LU :: jid:luser(),
                                       LS :: jid:lserver(),
                                       LR :: jid:lresource()) ->
    mongoose_rdbms:query_result().
execute_delete_all_subscriptions(Nidx, LU, LS, LR) ->
    Params = [Nidx, LU, LS, LR],
    mongoose_rdbms:execute_successfully(LS, pubsub_delete_all_subscriptions, Params).
385
%% Executes `pubsub_delete_all_subscriptions_id', removing every subscription row of a node.
-spec execute_delete_all_subscriptions_id(Nidx :: mod_pubsub:nodeIdx()) ->
    mongoose_rdbms:query_result().
execute_delete_all_subscriptions_id(Nidx) ->
    Params = [Nidx],
    mongoose_rdbms:execute_successfully(global, pubsub_delete_all_subscriptions_id, Params).
390
%% Executes `pubsub_delete_user_subscriptions' for a user/server pair.
%% Arguments arrive as (server, user) but the statement takes (user, server).
-spec execute_delete_user_subscriptions(LS :: jid:lserver(), LU :: jid:luser()) ->
    mongoose_rdbms:query_result().
execute_delete_user_subscriptions(LS, LU) ->
    Params = [LU, LS],
    mongoose_rdbms:execute_successfully(LS, pubsub_delete_user_subscriptions, Params).
395
%% Executes `pubsub_update_subscription', setting the integer-encoded subscription
%% type of the subscription identified by node, full JID parts and subscription id.
-spec execute_update_subscription(Subscription :: integer(),
                                  Nidx :: mod_pubsub:nodeIdx(),
                                  LU :: jid:luser(),
                                  LS :: jid:lserver(),
                                  LR :: jid:lresource(),
                                  SubId :: mod_pubsub:subId()) ->
    mongoose_rdbms:query_result().
execute_update_subscription(Subscription, Nidx, LU, LS, LR, SubId) ->
    Params = [Subscription, Nidx, LU, LS, LR, SubId],
    mongoose_rdbms:execute_successfully(LS, pubsub_update_subscription, Params).
406
407 % ------------------- Items --------------------------------
%% Executes `pubsub_get_entity_items', selecting the ids of items created on a
%% node by the given user/server.
%% The statement takes node, user, server in that order.
-spec execute_get_entity_items(LS :: jid:lserver(),
                               Nidx :: mod_pubsub:nodeIdx(),
                               LU :: jid:luser()) ->
    mongoose_rdbms:query_result().
execute_get_entity_items(LS, Nidx, LU) ->
    mongoose_rdbms:execute_successfully(LS, pubsub_get_entity_items, [Nidx, LU, LS]).
414
%% Executes `pubsub_delete_item', deleting one item of a node that was created by
%% the given user/server.
-spec execute_delete_item(LS :: jid:lserver(),
                          Nidx :: mod_pubsub:nodeIdx(),
                          LU :: jid:luser(),
                          ItemId :: mod_pubsub:itemId()) ->
    mongoose_rdbms:query_result().
execute_delete_item(LS, Nidx, LU, ItemId) ->
    Params = [Nidx, LU, LS, ItemId],
    mongoose_rdbms:execute_successfully(LS, pubsub_delete_item, Params).
422
%% Executes `pubsub_delete_all_items', deleting every item row of a node.
-spec execute_delete_all_items(Nidx :: mod_pubsub:nodeIdx()) -> mongoose_rdbms:query_result().
execute_delete_all_items(Nidx) ->
    Params = [Nidx],
    mongoose_rdbms:execute_successfully(global, pubsub_delete_all_items, Params).
426
%% Executes `pubsub_get_item', selecting one item by node index and item id.
-spec execute_get_item(Nidx :: mod_pubsub:nodeIdx(),
                       ItemId :: mod_pubsub:itemId()) ->
    mongoose_rdbms:query_result().
execute_get_item(Nidx, ItemId) ->
    Params = [Nidx, ItemId],
    mongoose_rdbms:execute_successfully(global, pubsub_get_item, Params).
432
%% Executes `pubsub_get_items', selecting all items of a node
%% (the statement orders them by `modified_at' descending).
-spec execute_get_items(Nidx :: mod_pubsub:nodeIdx()) -> mongoose_rdbms:query_result().
execute_get_items(Nidx) ->
    Params = [Nidx],
    mongoose_rdbms:execute_successfully(global, pubsub_get_items, Params).
436
%% Executes `pubsub_get_items_limit', the limited variant of the item listing.
%% The position of the limit among the parameters is decided by
%% `rdbms_queries:add_limit_arg/2'.
-spec execute_get_items(Nidx :: mod_pubsub:nodeIdx(), Limit :: pos_integer()) ->
    mongoose_rdbms:query_result().
execute_get_items(Nidx, Limit) ->
    mongoose_rdbms:execute_successfully(global, pubsub_get_items_limit,
                                        rdbms_queries:add_limit_arg(Limit, [Nidx])).
442
%% Executes `pubsub_del_item', deleting one item by node index and item id
%% regardless of who created it.
-spec execute_del_item(Nidx :: mod_pubsub:nodeIdx(),
                       ItemId :: mod_pubsub:itemId()) ->
    mongoose_rdbms:query_result().
execute_del_item(Nidx, ItemId) ->
    Params = [Nidx, ItemId],
    mongoose_rdbms:execute_successfully(global, pubsub_del_item, Params).
448
449 % ------------------- Nodes --------------------------------
%% Executes `pubsub_insert_node' with already-encoded key, owners and options.
-spec execute_insert_pubsub_node(binary(), binary(), binary(), iodata(), iodata()) ->
    mongoose_rdbms:query_result().
execute_insert_pubsub_node(Key, Name, Type, Owners, Options) ->
    Params = [Key, Name, Type, Owners, Options],
    mongoose_rdbms:execute_successfully(global, pubsub_insert_node, Params).
455
%% Executes `pubsub_insert_parent', recording one (node name, parent name) pair.
-spec execute_insert_parent(binary(), binary()) -> mongoose_rdbms:query_result().
execute_insert_parent(Name, ParentName) ->
    Params = [Name, ParentName],
    mongoose_rdbms:execute_successfully(global, pubsub_insert_parent, Params).
460
%% Executes `pubsub_update_pubsub_node', overwriting type, owners and options of
%% the node with the given index. Owners and options must already be encoded.
-spec execute_update_pubsub_node(Type :: binary(),
                                 OwnersJid :: iodata(),
                                 Opts :: iodata(),
                                 Nidx :: mod_pubsub:nodeIdx()) ->
    mongoose_rdbms:query_result().
execute_update_pubsub_node(Type, OwnersJid, Opts, Nidx) ->
    mongoose_rdbms:execute_successfully(global, pubsub_update_pubsub_node,
                                        [Type, OwnersJid, Opts, Nidx]).
469
%% Executes `pubsub_select_node_by_key_and_name' for one (key, node name) pair.
-spec execute_select_node_by_key_and_name(Key :: binary() | jid:ljid(),
                                          Node :: mod_pubsub:nodeId()) ->
    mongoose_rdbms:query_result().
execute_select_node_by_key_and_name(Key, Node) ->
    Params = [Key, Node],
    mongoose_rdbms:execute_successfully(global, pubsub_select_node_by_key_and_name, Params).
476
%% Executes `pubsub_select_node_by_id', selecting the node row with the given index.
-spec execute_select_node_by_id(Nidx :: mod_pubsub:nodeIdx()) -> mongoose_rdbms:query_result().
execute_select_node_by_id(Nidx) ->
    mongoose_rdbms:execute_successfully(global, pubsub_select_node_by_id, [Nidx]).
480
%% Executes `pubsub_select_nodes_by_key', selecting every node stored under a key.
-spec execute_select_nodes_by_key(Key :: binary() | jid:ljid()) ->
    mongoose_rdbms:query_result().
execute_select_nodes_by_key(Key) ->
    Params = [Key],
    mongoose_rdbms:execute_successfully(global, pubsub_select_nodes_by_key, Params).
485
%% Executes `pubsub_select_nodes_by_affiliated_user', joining affiliations with
%% nodes for the given user/server.
-spec execute_select_nodes_by_affiliated_user(LU :: jid:luser(), LS :: jid:lserver()) ->
    mongoose_rdbms:query_result().
execute_select_nodes_by_affiliated_user(LU, LS) ->
    Params = [LU, LS],
    mongoose_rdbms:execute_successfully(global, pubsub_select_nodes_by_affiliated_user, Params).
490
%% Selects the children of a node under a key. An empty node name (`<<>>') selects
%% the top-level nodes of the key, i.e. the ones without a recorded parent.
-spec execute_select_subnodes(Key :: binary(),
                              Node :: mod_pubsub:nodeId() | <<>>) ->
    mongoose_rdbms:query_result().
execute_select_subnodes(Key, Node) ->
    case Node of
        <<>> ->
            mongoose_rdbms:execute_successfully(global, pubsub_select_top_level_nodes, [Key]);
        _ ->
            %% The statement takes the parent name first and the key second.
            mongoose_rdbms:execute_successfully(global, pubsub_select_subnodes, [Node, Key])
    end.
498
%% Executes `pubsub_select_node_with_parents': one row per parent of the node
%% (the statement uses a LEFT JOIN against the collections table).
-spec execute_select_node_with_parents(Key :: binary(), Node :: mod_pubsub:nodeId()) ->
    mongoose_rdbms:query_result().
execute_select_node_with_parents(Key, Name) ->
    Params = [Key, Name],
    mongoose_rdbms:execute_successfully(global, pubsub_select_node_with_parents, Params).
503
%% Executes `pubsub_select_node_with_children': one row per child of the node
%% (the statement uses a LEFT JOIN against the collections table).
-spec execute_select_node_with_children(Key :: binary(), Node :: mod_pubsub:nodeId()) ->
    mongoose_rdbms:query_result().
execute_select_node_with_children(Key, Name) ->
    Params = [Key, Name],
    mongoose_rdbms:execute_successfully(global, pubsub_select_node_with_children, Params).
508
%% Executes `pubsub_delete_node', deleting the node row matching key and name.
-spec execute_delete_node(Key :: binary(), Node :: mod_pubsub:nodeId()) ->
    mongoose_rdbms:query_result().
execute_delete_node(Key, Node) ->
    Params = [Key, Node],
    mongoose_rdbms:execute_successfully(global, pubsub_delete_node, Params).
513
%% Executes `pubsub_del_parents', deleting all parent links recorded for a node name.
-spec execute_del_parents(Name :: mod_pubsub:nodeId()) -> mongoose_rdbms:query_result().
execute_del_parents(Name) ->
    Params = [Name],
    mongoose_rdbms:execute_successfully(global, pubsub_del_parents, Params).
517
518 % ------------------- GDPR --------------------------------
%% Executes `pubsub_get_user_items', selecting node name, item id and payload of
%% every item created by the given user/server.
-spec execute_get_user_items(LU :: jid:luser(), LS :: jid:lserver()) ->
    mongoose_rdbms:query_result().
execute_get_user_items(LU, LS) ->
    Params = [LU, LS],
    mongoose_rdbms:execute_successfully(global, pubsub_get_user_items, Params).
523
%% Executes `pubsub_select_nodes_by_owner' for a single owner JID.
%% On PostgreSQL the bare JID binary is passed as the parameter; on every other
%% engine it is wrapped as a one-element JSON array text (`["jid"]').
-spec execute_select_nodes_by_owner(LJID :: binary()) -> mongoose_rdbms:query_result().
execute_select_nodes_by_owner(LJID) ->
    OwnerParam =
        case mongoose_rdbms:db_engine(global) of
            pgsql -> LJID;
            _ -> iolist_to_binary(["[\"", LJID, "\"]"])
        end,
    mongoose_rdbms:execute_successfully(global, pubsub_select_nodes_by_owner, [OwnerParam]).
534
%% Executes `pubsub_get_user_subscriptions', selecting the names of nodes the
%% given user/server is subscribed to.
-spec execute_get_user_subscriptions(LU :: jid:luser(), LS :: jid:lserver()) ->
    mongoose_rdbms:query_result().
execute_get_user_subscriptions(LU, LS) ->
    Params = [LU, LS],
    mongoose_rdbms:execute_successfully(global, pubsub_get_user_subscriptions, Params).
539
540 %% ------------------------ Fun execution ------------------------
541
%% @doc Runs `Fun' inside an SQL transaction.
%% `Fun' is first wrapped with `mod_pubsub_db:extra_debug_fun/1'. A committed
%% transaction yields the fun's result; an aborted one is handed to
%% `mod_pubsub_db:db_error/3' together with `ErrorDebug'.
transaction(Fun, ErrorDebug) ->
    WrappedFun = mod_pubsub_db:extra_debug_fun(Fun),
    case mongoose_rdbms:sql_transaction(global, WrappedFun) of
        {aborted, ReasonData} ->
            mod_pubsub_db:db_error(ReasonData, ErrorDebug, transaction_failed);
        {atomic, Result} ->
            Result
    end.
549
%% @doc Runs `Fun' through `mongoose_rdbms:sql_dirty/2' (no transaction).
%% Any exception of any class raised while wrapping or running the fun is passed
%% to `mod_pubsub_db:db_error/3' together with `ErrorDebug'.
dirty(Fun, ErrorDebug) ->
    try
        %% The wrapping call stays inside the protected section on purpose, so
        %% that its failures are reported the same way as failures of the fun.
        mongoose_rdbms:sql_dirty(global, mod_pubsub_db:extra_debug_fun(Fun))
    catch
        _Class:ReasonData ->
            mod_pubsub_db:db_error(ReasonData, ErrorDebug, dirty_failed)
    end.
558
559 %% ------------------------ Direct #pubsub_state access ------------------------
560 %% TODO: Functions for direct #pubsub_state access are currently inefficient for RDBMS
561 %% - refactor them or remove as many of them as possible from the API at some point
%% @doc Builds the `#pubsub_state{}' of one entity on one node.
%% Item ids, affiliation and subscriptions are fetched with three separate
%% lookups, in that order; each lookup must return `{ok, _}'.
-spec get_state(Nidx :: mod_pubsub:nodeIdx(),
                LJID :: jid:ljid()) ->
    {ok, mod_pubsub:pubsubState()}.
get_state(Nidx, LJID) ->
    {ok, Items} = get_entity_items(Nidx, LJID),
    {ok, Aff} = get_affiliation(Nidx, LJID),
    {ok, Subs} = get_node_entity_subscriptions(Nidx, LJID),
    State = #pubsub_state{stateid = {LJID, Nidx},
                          items = Items,
                          affiliation = Aff,
                          subscriptions = Subs},
    {ok, State}.
575
%% @doc Returns all states of a node, built from its item, affiliation and
%% subscription rows via `build_states/3'.
-spec get_states(Nidx :: mod_pubsub:nodeIdx()) ->
    {ok, [mod_pubsub:pubsubState()]}.
get_states(Nidx) ->
    {selected, Items} = execute_get_item_rows_id(Nidx),
    {selected, Affs} = execute_get_affiliation_rows_id(Nidx),
    {selected, Subs} = execute_get_subscriptions_rows_id(Nidx),
    {ok, build_states(Items, Affs, Subs)}.
584
%% @doc Returns all states of a user/server pair across nodes. The resource part
%% of the JID is ignored, so subscriptions of every resource are included.
-spec get_states_by_lus(LJID :: jid:ljid()) ->
    {ok, [mod_pubsub:pubsubState()]}.
get_states_by_lus({LU, LS, _Resource}) ->
    {selected, Items} = execute_get_item_rows(LU, LS),
    {selected, Affs} = execute_get_affiliation_rows(LU, LS),
    {selected, Subs} = execute_get_subscriptions_rows(LU, LS),
    {ok, build_states(Items, Affs, Subs)}.
593
%% @doc Returns the states of the bare JID: items and affiliations of the
%% user/server pair plus only those subscriptions stored with an empty resource.
-spec get_states_by_bare(LJID :: jid:ljid()) ->
    {ok, [mod_pubsub:pubsubState()]}.
get_states_by_bare({LU, LS, _Resource}) ->
    {selected, Items} = execute_get_item_rows(LU, LS),
    {selected, Affs} = execute_get_affiliation_rows(LU, LS),
    {selected, BareSubs} = execute_get_subscriptions_rows_resource(LU, LS, <<>>),
    {ok, build_states(Items, Affs, BareSubs)}.
602
%% @doc Returns the bare-JID states together with states built only from the
%% subscriptions of the exact resource. Full-JID states come first in the result.
-spec get_states_by_bare_and_full(LJID :: jid:ljid()) ->
    {ok, [mod_pubsub:pubsubState()]}.
get_states_by_bare_and_full({LU, LS, LR} = LJID) ->
    {ok, BareStates} = get_states_by_bare(LJID),
    {selected, FullSubs} = execute_get_subscriptions_rows_resource(LU, LS, LR),
    %% Items and affiliations are already covered by the bare states.
    FullStates = build_states([], [], FullSubs),
    {ok, FullStates ++ BareStates}.
610
%% @doc Returns the indexes of nodes on which the user/server pair is an owner
%% and at least one subscription is pending.
-spec get_idxs_of_own_nodes_with_pending_subs(LJID :: jid:ljid()) ->
    {ok, [mod_pubsub:nodeIdx()]}.
get_idxs_of_own_nodes_with_pending_subs({LU, LS, _Resource}) ->
    OwnerAff = aff2int(owner),
    PendingSub = sub2int(pending),
    {selected, Rows} =
        execute_get_idxs_of_own_nodes_with_pending_subs(LS, OwnerAff, LU, PendingSub),
    Idxs = [mongoose_rdbms:result_to_integer(RawIdx) || {RawIdx} <- Rows],
    {ok, Idxs}.
617
618 %% ------------------------ Direct #pubsub_item access ------------------------
619
%% @doc Returns the items of a node as records.
%% Recognised options are `max_items' and `item_ids'; both default to `undefined'.
%% The second element of the result is always `none'.
-spec get_items(Nidx :: mod_pubsub:nodeIdx(), gen_pubsub_node:get_item_options()) ->
    {ok, {[mod_pubsub:pubsubItem()], none}}.
get_items(Nidx, Opts) ->
    Limit = maps:get(max_items, Opts, undefined),
    Ids = maps:get(item_ids, Opts, undefined),
    Items = lists:map(fun item_to_record/1, get_item_rows(Nidx, Limit, Ids)),
    {ok, {Items, none}}.
628
%% Fetches raw item rows of a node.
%%  - no limit and no ids: all items of the node;
%%  - a limit but no ids: the limited listing;
%%  - ids given: one lookup per id, honouring the limit if there is one.
-spec get_item_rows(Nidx :: mod_pubsub:nodeIdx(),
                    MaxItems :: undefined | non_neg_integer(),
                    ItemIds :: undefined | [mod_pubsub:itemId()]) -> [tuple()].
get_item_rows(Nidx, undefined, undefined) ->
    {selected, Rows} = execute_get_items(Nidx),
    Rows;
get_item_rows(Nidx, MaxItems, undefined) ->
    {selected, Rows} = execute_get_items(Nidx, MaxItems),
    Rows;
get_item_rows(Nidx, MaxItems, ItemIds) ->
    %% Found rows are prepended to the accumulator and the accumulator is returned
    %% as is, so the result is in REVERSE order relative to ItemIds.
    get_item_rows_acc(Nidx, MaxItems, ItemIds, []).
641
642
:-(
%% Looks up the given item ids one by one and collects the rows that exist.
%% Ids without a row are skipped. Once `MaxItems' rows have been collected, the
%% next id that does have a row stops the traversal. Rows are prepended, so the
%% result is in reverse order relative to the id list.
get_item_rows_acc(Nidx, MaxItems, ItemIds, AccRows) ->
    get_item_rows_acc(Nidx, MaxItems, ItemIds, AccRows, length(AccRows)).

%% Same as get_item_rows_acc/4 but carries the accumulator length along, so that
%% the limit check does not have to walk the accumulator on every step.
get_item_rows_acc(_Nidx, _MaxItems, [], AccRows, _Count) ->
    AccRows;
get_item_rows_acc(Nidx, MaxItems, [ItemId | ItemIds], AccRows, Count) ->
    case execute_get_item(Nidx, ItemId) of
        {selected, []} ->
            get_item_rows_acc(Nidx, MaxItems, ItemIds, AccRows, Count);
        {selected, [Item]} when MaxItems =:= undefined;
                                Count < MaxItems ->
            get_item_rows_acc(Nidx, MaxItems, ItemIds, [Item | AccRows], Count + 1);
        {selected, [_]} ->
            AccRows
    end.
653
%% @doc Fetches one item of a node as a record, or `{error, item_not_found}' when
%% no row matches. More than one matching row is not expected and crashes.
-spec get_item(Nidx :: mod_pubsub:nodeIdx(), ItemId :: mod_pubsub:itemId()) ->
    {ok, mod_pubsub:pubsubItem()} | {error, item_not_found}.
get_item(Nidx, ItemId) ->
    case execute_get_item(Nidx, ItemId) of
        {selected, [Row]} ->
            {ok, item_to_record(Row)};
        {selected, []} ->
            {error, item_not_found}
    end.
663
%% @doc Inserts or updates one item through the `pubsub_item_upsert' statement.
%% The payload children are wrapped in an `<item/>' element and serialised before
%% being stored. The resource of the creator is not stored. An existing row
%% (same node index and item id) only gets its modification fields, publisher and
%% payload replaced.
-spec set_item(Item :: mod_pubsub:pubsubItem()) -> ok.
set_item(#pubsub_item{itemid = {ItemId, NodeIdx},
                      creation = {CreatedAt, {CreatedLUser, CreatedLServer, _}},
                      modification = {ModifiedAt,
                                      {ModifiedLUser, ModifiedLServer, ModifiedLResource}},
                      publisher = PublisherIn,
                      payload = Payload}) ->
    PayloadXML = exml:to_binary(#xmlel{name = <<"item">>, children = Payload}),
    Publisher = null_or_bin_jid(PublisherIn),
    UpdateParams = [ModifiedLUser, ModifiedLServer, ModifiedLResource, ModifiedAt,
                    Publisher, PayloadXML],
    %% The insert parameters are the key and creation fields followed by exactly
    %% the same values that an update would write.
    InsertParams = [NodeIdx, ItemId, CreatedLUser, CreatedLServer, CreatedAt | UpdateParams],
    {updated, _} = rdbms_queries:execute_upsert(global, pubsub_item_upsert,
                                                InsertParams, UpdateParams,
                                                [NodeIdx, ItemId]),
    ok.
684
%% @doc Deletes one item of a node by id. Succeeds for any `{updated, _}' result,
%% i.e. also when no row was actually removed.
-spec del_item(Nidx :: mod_pubsub:nodeIdx(), ItemId :: mod_pubsub:itemId()) -> ok.
del_item(Nidx, ItemId) ->
    Result = execute_del_item(Nidx, ItemId),
    {updated, _} = Result,
    ok.
689
%% @doc Deletes the listed items of a node, one statement execution per item id.
%% An empty list is a no-op.
-spec del_items(Nidx :: mod_pubsub:nodeIdx(), [ItemId :: mod_pubsub:itemId()]) -> ok.
del_items(_Nidx, []) ->
    ok;
del_items(Nidx, ItemIds) ->
    lists:foreach(fun(ItemId) ->
                          {updated, _} = execute_del_item(Nidx, ItemId)
                  end, ItemIds),
    ok.
696
697 % ------------------- Node management --------------------------------
698
%% @doc Registers the creator of a node: the given JID gets the `owner'
%% affiliation on the node. The node row itself is not written here.
-spec create_node(Nidx :: mod_pubsub:nodeIdx(), Owner :: jid:ljid()) -> ok.
create_node(Nidx, Owner) ->
    set_affiliation(Nidx, Owner, owner).
702
%% @doc Removes all subscriptions, items and affiliations of a node and returns
%% the states the node had before the removal. The node row itself is not deleted
%% by this function.
-spec del_node(Nidx :: mod_pubsub:nodeIdx()) ->
    {ok, [mod_pubsub:pubsubState()]}.
del_node(Nidx) ->
    %% States have to be read first - afterwards there is nothing left to build them from.
    {ok, _} = Snapshot = get_states(Nidx),
    {updated, _} = execute_delete_all_subscriptions_id(Nidx),
    {updated, _} = execute_delete_all_items(Nidx),
    {updated, _} = execute_delete_all_affiliations(Nidx),
    Snapshot.
711
%% @doc Stores a node.
%% A node without an id is inserted, read back to learn its index, and its parent
%% links are recorded. A node with an id has its type, owners and options
%% overwritten and its parent links replaced. Owners are stored as a JSON list of
%% JID binaries, options as a JSON object.
-spec set_node(Node :: mod_pubsub:pubsubNode()) -> {ok, mod_pubsub:nodeIdx()}.
set_node(#pubsub_node{nodeid = {Key, Name}, id = undefined, type = Type,
                      owners = Owners, options = Opts, parents = Parents}) ->
    EncodedKey = encode_key(Key),
    OwnersJson = jiffy:encode([jid:to_binary(Owner) || Owner <- Owners]),
    OptsJson = jiffy:encode({Opts}),
    {updated, 1} = execute_insert_pubsub_node(EncodedKey, Name, Type, OwnersJson, OptsJson),
    %% The index is assigned by the database, so the fresh row is selected back.
    {selected, [InsertedRow]} = execute_select_node_by_key_and_name(EncodedKey, Name),
    #pubsub_node{id = NewNidx} = decode_pubsub_node_row(InsertedRow),
    set_parents(Name, Parents),
    {ok, NewNidx};

set_node(#pubsub_node{nodeid = {_, Name}, id = Nidx, type = Type,
                      owners = Owners, options = Opts, parents = Parents}) ->
    OwnersJson = jiffy:encode([jid:to_binary(Owner) || Owner <- Owners]),
    OptsJson = jiffy:encode({Opts}),
    execute_update_pubsub_node(Type, OwnersJson, OptsJson, Nidx),
    %% Parent links are replaced wholesale: drop the old ones, then insert the new.
    execute_del_parents(Name),
    set_parents(Name, Parents),
    {ok, Nidx}.
731
%% Inserts one parent link per entry of Parents for the node called Name.
%% The results of the individual inserts are not inspected.
-spec set_parents(mod_pubsub:nodeId(), [mod_pubsub:nodeId()]) -> ok.
set_parents(Name, Parents) ->
    InsertLink = fun(ParentName) -> execute_insert_parent(Name, ParentName) end,
    lists:foreach(InsertLink, Parents).
736
%% Looks a node up by its index.
%% Returns `{ok, Node}` for exactly one matching row and `{error, not_found}`
%% when the query selects nothing.
-spec find_node_by_id(Nidx :: mod_pubsub:nodeIdx()) ->
    {error, not_found} | {ok, mod_pubsub:pubsubNode()}.
find_node_by_id(Nidx) ->
    case execute_select_node_by_id(Nidx) of
        {selected, [NodeRow]} ->
            {ok, decode_pubsub_node_row(NodeRow)};
        {selected, []} ->
            {error, not_found}
    end.
746
%% Looks a node up by its key (host or JID) and name.
%% Returns the decoded node, or `false` when no row matches.
-spec find_node_by_name(Key :: mod_pubsub:hostPubsub() | jid:ljid(),
                        Node :: mod_pubsub:nodeId()) ->
    mod_pubsub:pubsubNode() | false.
find_node_by_name(Key, Node) ->
    ExtKey = encode_key(Key),
    case execute_select_node_by_key_and_name(ExtKey, Node) of
        {selected, []} ->
            false;
        {selected, [NodeRow]} ->
            decode_pubsub_node_row(NodeRow)
    end.
757
%% Converts a `{nidx, p_key, name, type, owners, options}` row into a
%% #pubsub_node{} record.
%% Options are decoded from a JSON object; their keys become existing atoms and
%% selected values are atomised by maybe_option_value_to_atom/1.
%% Owners are decoded from a JSON array of binary JIDs.
%% The `parents` field is not filled in here.
decode_pubsub_node_row({NidxSQL, ExtKey, Name, Type, ExtOwners, ExtOptions}) ->
    Key = decode_key(ExtKey),
    {RawOptions} = jiffy:decode(ExtOptions),
    Options = lists:map(fun(RawOption) ->
                                maybe_option_value_to_atom(key_to_existing_atom(RawOption))
                        end, RawOptions),
    Owners = lists:map(fun(BinOwner) ->
                               jid:to_lower(jid:from_binary(BinOwner))
                       end, jiffy:decode(ExtOwners)),
    #pubsub_node{nodeid = {Key, Name},
                 id = mongoose_rdbms:result_to_integer(NidxSQL),
                 type = Type,
                 owners = Owners,
                 options = Options}.
770
%% Turns the binary value of the listed node options into an existing atom.
%% Any other term is returned untouched.
%% binary_to_existing_atom/2 is used, so an unknown value raises badarg rather
%% than creating a new atom.
maybe_option_value_to_atom({Key, Value})
  when Key =:= access_model;
       Key =:= publish_model;
       Key =:= notification_type;
       Key =:= node_type;
       Key =:= send_last_published_item ->
    {Key, binary_to_existing_atom(Value, utf8)};
maybe_option_value_to_atom(Other) ->
    Other.
783
%% Returns all nodes selected for the given key (host or JID), decoded.
-spec find_nodes_by_key(Key :: mod_pubsub:hostPubsub() | jid:ljid()) ->
    [mod_pubsub:pubsubNode()].
find_nodes_by_key(Key) ->
    ExtKey = encode_key(Key),
    {selected, NodeRows} = execute_select_nodes_by_key(ExtKey),
    lists:map(fun decode_pubsub_node_row/1, NodeRows).
790
%% Deletes the node row addressed by the key and name of the given record.
-spec delete_node(Node :: mod_pubsub:pubsubNode()) -> ok.
delete_node(#pubsub_node{nodeid = {Key, Name}}) ->
    ExtKey = encode_key(Key),
    {updated, _} = execute_delete_node(ExtKey, Name),
    ok.
795
%% Returns the decoded rows produced by the subnodes query for Key and Node.
-spec get_subnodes(Key :: mod_pubsub:hostPubsub() | jid:ljid(), Node :: mod_pubsub:nodeId() | <<>>) ->
    [mod_pubsub:pubsubNode()].
get_subnodes(Key, Node) ->
    ExtKey = encode_key(Key),
    {selected, SubnodeRows} = execute_select_subnodes(ExtKey, Node),
    lists:map(fun decode_pubsub_node_row/1, SubnodeRows).
801
%% Walks upwards from Node through its parents, level by level.
%% Depth 0 is Node itself; the walk is delegated to find_nodes_with_parents/4.
-spec get_parentnodes_tree(Key :: mod_pubsub:hostPubsub() | jid:ljid(), Node :: mod_pubsub:nodeId()) ->
    [{Depth::non_neg_integer(), Nodes::[mod_pubsub:pubsubNode(), ...]}].
get_parentnodes_tree(Key, Node) ->
    StartLevel = [Node],
    find_nodes_with_parents(Key, StartLevel, 0, []).
806
%% Breadth-first walk from a list of node names towards their parents.
%%
%% For every level the nodes named in Nodes are queried together with their
%% parents; the nodes found are accumulated as `{Depth, Nodes}` (deepest level
%% first) and the collected parent names form the next level.
%% The walk ends when a level has no names left or when depth 100 is reached,
%% in which case a warning is logged.
%%
%% Parent names are de-duplicated before descending. Without that, a node
%% reachable through several paths would be queried once per path, its parents
%% would be listed several times, and the number of queries could grow
%% exponentially with the depth.
find_nodes_with_parents(_, [], _, Acc) ->
    Acc;
find_nodes_with_parents(_, _, 100, Acc) ->
    ?LOG_WARNING(#{what => pubsub_max_depth_reached, pubsub_nodes => Acc}),
    Acc;
find_nodes_with_parents(Key, Nodes, Depth, Acc) ->
    ExtKey = encode_key(Key),
    Rows = lists:flatmap(fun(Name) ->
                                 {selected, Rs} = execute_select_node_with_parents(ExtKey, Name),
                                 Rs
                         end, Nodes),
    %% Group the rows by node index; each entry carries the node name and the
    %% names of its parents.
    Map = lists:foldl(fun update_nodes_map/2, #{}, Rows),
    MapTransformer = fun(Nidx, #{name := NodeName, next_level := NodeParents},
                         {ParentsAcc, NodesAcc}) ->
                             Node = #pubsub_node{id = Nidx,
                                                 nodeid = {Key, NodeName},
                                                 parents = NodeParents},
                             {[NodeParents | ParentsAcc], [Node | NodesAcc]}
                     end,
    {ParentLists, LevelNodes} = maps:fold(MapTransformer, {[], []}, Map),
    %% One flat, duplicate-free list of names for the next level.
    NextLevel = lists:usort(lists:append(ParentLists)),
    find_nodes_with_parents(Key, NextLevel, Depth + 1, [{Depth, LevelNodes} | Acc]).
832
%% Fold step that groups `{Nidx, NodeName, NextLevelNodeName}` rows by node index.
%% Each map value is `#{name => NodeName, next_level => [Name]}`; a `null`
%% next-level name contributes nothing to the list.
update_nodes_map({NidxSQL, NodeName, NextLevelNodeName}, Map) ->
    Nidx = mongoose_rdbms:result_to_integer(NidxSQL),
    case maps:find(Nidx, Map) of
        {ok, #{next_level := KnownNames} = Entry} ->
            Extended = maybe_add_parent_to_list(NextLevelNodeName, KnownNames),
            Map#{Nidx := Entry#{next_level := Extended}};
        error ->
            FirstNames = maybe_add_parent_to_list(NextLevelNodeName, []),
            Map#{Nidx => #{name => NodeName, next_level => FirstNames}}
    end.
844
%% Prepends Name to Names unless it is the atom `null`.
maybe_add_parent_to_list(null, Names) ->
    Names;
maybe_add_parent_to_list(Name, Names) ->
    [Name | Names].
849
%% Walks downwards from Node through its subnodes, level by level.
%% Depth 0 is Node itself; the walk is delegated to find_subnodes/4.
-spec get_subnodes_tree(Key :: mod_pubsub:hostPubsub() | jid:ljid(), Node :: mod_pubsub:nodeId()) ->
    [{Depth::non_neg_integer(), Nodes::[mod_pubsub:pubsubNode(), ...]}].
get_subnodes_tree(Key, Node) ->
    StartLevel = [Node],
    find_subnodes(Key, StartLevel, 0, []).
854
%% Breadth-first walk from a list of node names towards their children.
%%
%% For every level the nodes named in Nodes are queried together with their
%% children; the nodes found are accumulated as `{Depth, Nodes}` (deepest level
%% first) and the collected child names form the next level.
%% The walk ends when a level has no names left or when depth 100 is reached,
%% in which case a warning is logged.
%%
%% Child names are de-duplicated before descending. Without that, a node
%% reachable through several parents would be queried once per path and the
%% number of queries could grow exponentially with the depth.
find_subnodes(_Key, [], _, Acc) ->
    Acc;
find_subnodes(_, _, 100, Acc) ->
    ?LOG_WARNING(#{what => pubsub_max_depth_reached, pubsub_nodes => Acc}),
    Acc;
find_subnodes(Key, Nodes, Depth, Acc) ->
    ExtKey = encode_key(Key),
    Rows = lists:flatmap(fun(Name) ->
                                 {selected, Rs} = execute_select_node_with_children(ExtKey, Name),
                                 Rs
                         end, Nodes),
    %% Group the rows by node index; each entry carries the node name and the
    %% names of its children.
    Map = lists:foldl(fun update_nodes_map/2, #{}, Rows),
    MapTransformer = fun(Nidx, #{name := NodeName, next_level := NodeChildren},
                         {ChildrenAcc, NodesAcc}) ->
                             Node = #pubsub_node{id = Nidx,
                                                 nodeid = {Key, NodeName}},
                             {[NodeChildren | ChildrenAcc], [Node | NodesAcc]}
                     end,
    {ChildLists, LevelNodes} = maps:fold(MapTransformer, {[], []}, Map),
    %% One flat, duplicate-free list of names for the next level.
    NextLevel = lists:usort(lists:append(ChildLists)),
    find_subnodes(Key, NextLevel, Depth + 1, [{Depth, LevelNodes} | Acc]).
878
879 % ------------------- Affiliations --------------------------------
880
%% Sets the affiliation of a (bare) entity on node Nidx.
%%
%% Affiliation `none` removes the stored affiliation. When the bare JID has no
%% subscriptions on the node, the whole state is removed through del_state/2.
%% Any other affiliation is written with an upsert keyed on
%% `{Nidx, LU, LS}`.
-spec set_affiliation(Nidx :: mod_pubsub:nodeIdx(),
                      LJID :: jid:ljid(),
                      Affiliation :: mod_pubsub:affiliation()) -> ok.
set_affiliation(Nidx, {LU, LS, _} = LJID, none) ->
    BareLJID = jid:to_bare(LJID),
    HasNoSubscriptions = get_node_entity_subscriptions(Nidx, BareLJID) =:= {ok, []},
    case HasNoSubscriptions of
        true ->
            del_state(Nidx, BareLJID);
        false ->
            delete_affiliation_wo_subs_check(Nidx, LU, LS)
    end;
set_affiliation(Nidx, {LU, LS, _}, Affiliation) ->
    AffInt = aff2int(Affiliation),
    %% Argument order: insert parameters, update parameters, unique key values.
    {updated, _} = rdbms_queries:execute_upsert(global, pubsub_affiliation_upsert,
                                                [Nidx, LU, LS, AffInt],
                                                [AffInt],
                                                [Nidx, LU, LS]),
    ok.
900
%% Returns the affiliation stored for the user and server part of LJID on node
%% Nidx, or `none` when no row exists. The resource is ignored.
-spec get_affiliation(Nidx :: mod_pubsub:nodeIdx(),
                      LJID :: jid:ljid()) ->
    {ok, mod_pubsub:affiliation()}.
get_affiliation(Nidx, {LU, LS, _}) ->
    case execute_get_affiliation(Nidx, LU, LS) of
        {selected, []} ->
            {ok, none};
        {selected, [{AffSql}]} ->
            {ok, sql2aff(AffSql)}
    end.
911
912 % ------------------- Subscriptions --------------------------------
913
%% Inserts a subscription row for the full JID on node Nidx.
%% The options are stored as a JSON object, the subscription state as an integer.
-spec add_subscription(Nidx :: mod_pubsub:nodeIdx(),
                       LJID :: jid:ljid(),
                       Sub :: mod_pubsub:subscription(),
                       SubId :: mod_pubsub:subId(),
                       SubOpts :: mod_pubsub:subOptions()) -> ok.
add_subscription(Nidx, {LU, LS, LR}, Sub, SubId, SubOpts) ->
    JsonOpts = jiffy:encode({SubOpts}),
    SubInt = sub2int(Sub),
    {updated, _} = execute_insert_subscription(Nidx, LU, LS, LR, SubInt, SubId, JsonOpts),
    ok.
923
%% Replaces the options of the subscription identified by node, full JID and
%% subscription id. The options are stored as a JSON object.
-spec set_subscription_opts(Nidx :: mod_pubsub:nodeIdx(),
                            LJID :: jid:ljid(),
                            SubId :: mod_pubsub:subId(),
                            Opts :: mod_pubsub:subOptions()) -> ok.
set_subscription_opts(Nidx, {LU, LS, LR}, SubId, Opts) ->
    JsonOpts = jiffy:encode({Opts}),
    {updated, _} = execute_update_subscription_opts(JsonOpts, Nidx, LU, LS, LR, SubId),
    ok.
932
%% Returns every subscription of node Nidx as
%% `{LJID, Subscription, SubId, SubOpts}` with state and options decoded.
-spec get_node_subscriptions(Nidx :: mod_pubsub:nodeIdx()) ->
    {ok, [{Entity :: jid:ljid(),
           Sub :: mod_pubsub:subscription(),
           SubId :: mod_pubsub:subId(),
           SubOpts :: mod_pubsub:subOptions()}]}.
get_node_subscriptions(Nidx) ->
    {selected, SubRows} = execute_get_node_subs(Nidx),
    Subscriptions = [{{LU, LS, LR}, sql2sub(SubSql), SubId, sql_to_sub_opts(OptsSql)}
                     || {LU, LS, LR, SubSql, SubId, OptsSql} <- SubRows],
    {ok, Subscriptions}.
942
%% Returns the subscriptions that the given full JID holds on node Nidx as
%% `{Subscription, SubId, SubOpts}` with state and options decoded.
-spec get_node_entity_subscriptions(Nidx :: mod_pubsub:nodeIdx(),
                                    LJID :: jid:ljid()) ->
    {ok, [{Sub :: mod_pubsub:subscription(),
           SubId :: mod_pubsub:subId(),
           SubOpts :: mod_pubsub:subOptions()}]}.
get_node_entity_subscriptions(Nidx, {LU, LS, LR}) ->
    {selected, SubRows} = execute_get_node_entity_subs(LS, Nidx, LU, LR),
    Subscriptions = [{sql2sub(SubSql), SubId, sql_to_sub_opts(OptsSql)}
                     || {SubSql, SubId, OptsSql} <- SubRows],
    {ok, Subscriptions}.
952
%% Deletes the subscription identified by node, full JID and subscription id.
-spec delete_subscription(Nidx :: mod_pubsub:nodeIdx(),
                          LJID :: jid:ljid(),
                          SubId :: mod_pubsub:subId()) ->
    ok.
delete_subscription(Nidx, {LU, LS, LR}, SubId) ->
    {updated, _AffectedRows} = execute_delete_subscription(LS, Nidx, LU, LR, SubId),
    ok.
960
%% Deletes all subscriptions of LJID on node Nidx.
%% When the entity has no affiliation on the node (`{ok, none}`), the whole
%% state is removed through del_state/2 instead.
-spec delete_all_subscriptions(Nidx :: mod_pubsub:nodeIdx(),
                               LJID :: jid:ljid()) ->
    ok.
delete_all_subscriptions(Nidx, {LU, LS, LR} = LJID) ->
    HasAffiliation = get_affiliation(Nidx, LJID) =/= {ok, none},
    case HasAffiliation of
        true ->
            delete_all_subscriptions_wo_aff_check(Nidx, LU, LS, LR);
        false ->
            del_state(Nidx, LJID)
    end,
    ok.
972
%% Stores a new state for the subscription identified by node, full JID and
%% subscription id.
-spec update_subscription(Nidx :: mod_pubsub:nodeIdx(),
                          LJID :: jid:ljid(),
                          Subscription :: mod_pubsub:subscription(),
                          SubId :: mod_pubsub:subId()) ->
    ok.
update_subscription(Nidx, {LU, LS, LR}, Subscription, SubId) ->
    SubInt = sub2int(Subscription),
    {updated, _} = execute_update_subscription(SubInt, Nidx, LU, LS, LR, SubId),
    ok.
981
982 % ------------------- Items --------------------------------
983
%% Stores Item through set_item/1. The node index and JID arguments are not
%% used by this backend.
-spec add_item(Nidx :: mod_pubsub:nodeIdx(),
               LJID :: jid:ljid(),
               Item :: mod_pubsub:pubsubItem()) ->
    ok.
add_item(_Nidx, _LJID, Item) ->
    set_item(Item),
    ok.
991
992 %% TODO: Make public at some point
%% Returns the item ids selected for the user and server part of LJID on node
%% Nidx. The resource is ignored.
-spec get_entity_items(Nidx :: mod_pubsub:nodeIdx(),
                       LJID :: jid:ljid()) ->
    {ok, [mod_pubsub:itemId()]}.
get_entity_items(Nidx, {LU, LS, _}) ->
    {selected, IdRows} = execute_get_entity_items(LS, Nidx, LU),
    %% Each row is a one-element tuple holding the item id.
    ItemIds = [ItemId || {ItemId} <- IdRows],
    {ok, ItemIds}.
999
%% Deletes the listed items of the given user on node Nidx, one query per item.
%% Each deletion must report `{updated, _}`.
-spec remove_items(Nidx :: mod_pubsub:nodeIdx(),
                   LJID :: jid:ljid(),
                   ItemIds :: [mod_pubsub:itemId()]) ->
    ok.
remove_items(Nidx, {LU, LS, _}, ItemIds) ->
    DeleteOne = fun(ItemId) ->
                        {updated, _} = execute_delete_item(LS, Nidx, LU, ItemId)
                end,
    lists:foreach(DeleteOne, ItemIds).
1008
%% Runs the delete-all-items query for node Nidx.
-spec remove_all_items(Nidx :: mod_pubsub:nodeIdx()) ->
    ok.
remove_all_items(Nidx) ->
    {updated, _AffectedRows} = execute_delete_all_items(Nidx),
    ok.
1014
1015 % ------------------- GDPR-related --------------------------------
1016
%% GDPR: returns one `[NodeName, ItemId, Payload]` list per item selected for
%% the user, with the payload reduced by strip_payload/1 to the serialised
%% children of the stored element.
%% The query result is asserted with a plain match, like the other GDPR
%% getters, instead of a single-branch `case`.
get_user_payloads(LUser, LServer) ->
    {selected, Items} = execute_get_user_items(LUser, LServer),
    [[NodeName, ItemId, strip_payload(PayloadDB)] || {NodeName, ItemId, PayloadDB} <- Items].
1022
%% GDPR: returns the rows selected by owner for the user's bare JID, each row
%% converted from a tuple to a list.
get_user_nodes(LUser, LServer) ->
    BareJidBin = jid:to_binary({LUser, LServer, <<>>}),
    {selected, NodeRows} = execute_select_nodes_by_owner(BareJidBin),
    [tuple_to_list(NodeRow) || NodeRow <- NodeRows].
1027
%% GDPR: returns the rows selected for the user's subscriptions, each row
%% converted from a tuple to a list.
get_user_subscriptions(LUser, LServer) ->
    {selected, SubRows} = execute_get_user_subscriptions(LUser, LServer),
    [tuple_to_list(SubRow) || SubRow <- SubRows].
1031
%% Unescapes a stored payload, parses it as XML and serialises only the
%% children of the parsed root element.
strip_payload(PayloadDB) ->
    RawXML = mongoose_rdbms:unescape_binary(global, PayloadDB),
    {ok, #xmlel{children = Children}} = exml:parse(RawXML),
    exml:to_binary(Children).
1036
%% Runs the delete-user-subscriptions query for the user and server part of
%% the JID. The resource is ignored.
-spec delete_user_subscriptions(jid:ljid()) -> ok.
delete_user_subscriptions({LU, LS, _LR}) ->
    {updated, _AffectedRows} = execute_delete_user_subscriptions(LS, LU),
    ok.
1041
%% Returns `{Node, Affiliation}` for every row selected for the user and
%% server part of the JID. The resource is ignored.
find_nodes_by_affiliated_user({LU, LS, _LR}) ->
    {selected, Rows} = execute_select_nodes_by_affiliated_user(LU, LS),
    [decode_pubsub_node_with_aff_row(Row) || Row <- Rows].
1045
%% Decodes a row made of an affiliation followed by the six node columns
%% expected by decode_pubsub_node_row/1. Returns `{Node, Affiliation}`.
%% The row is destructured by pattern matching instead of a
%% tuple -> list -> tuple round trip.
decode_pubsub_node_with_aff_row({Aff, Nidx, Key, Name, Type, Owners, Options}) ->
    {decode_pubsub_node_row({Nidx, Key, Name, Type, Owners, Options}), sql2aff(Aff)}.
1049
1050 %%====================================================================
1051 %% Helpers
1052 %%====================================================================
1053
%% Comma-separated column list of an item row. The order matches the tuple
%% taken apart by item_to_record/1.
-spec item_columns() -> binary().
item_columns() ->
    <<"nidx, itemid, "
      "created_luser, created_lserver, created_at, "
      "modified_luser, modified_lserver, modified_lresource, modified_at, "
      "publisher, payload">>.
1059
%% Comma-separated column list of a node row. The order matches the tuple
%% taken apart by decode_pubsub_node_row/1.
-spec pubsub_node_fields() -> binary().
pubsub_node_fields() ->
    <<"nidx, p_key, name, type, "
      "owners, options">>.
1063
%% Same column list as pubsub_node_fields/0, with every column qualified by
%% the `pn` prefix.
-spec pubsub_node_fields_pn() -> binary().
pubsub_node_fields_pn() ->
    <<"pn.nidx, pn.p_key, pn.name, pn.type, "
      "pn.owners, pn.options">>.
1067
%% Removes the subscriptions of the full JID and the affiliation of its bare
%% part on node Nidx, without any cross-checks.
-spec del_state(Nidx :: mod_pubsub:nodeIdx(),
                LJID :: jid:ljid()) -> ok.
del_state(Nidx, {LUser, LServer, LResource}) ->
    delete_all_subscriptions_wo_aff_check(Nidx, LUser, LServer, LResource),
    delete_affiliation_wo_subs_check(Nidx, LUser, LServer),
    ok.
1074
%% Deletes all subscriptions of the full JID on node Nidx without looking at
%% the entity's affiliation.
-spec delete_all_subscriptions_wo_aff_check(Nidx :: mod_pubsub:nodeIdx(),
                                            LU :: jid:luser(),
                                            LS :: jid:lserver(),
                                            LR :: jid:lresource()) -> ok.
delete_all_subscriptions_wo_aff_check(Nidx, LU, LS, LR) ->
    {updated, _AffectedRows} = execute_delete_all_subscriptions(Nidx, LU, LS, LR),
    ok.
1082
%% Deletes the affiliation of the bare JID on node Nidx without looking at the
%% entity's subscriptions.
-spec delete_affiliation_wo_subs_check(Nidx :: mod_pubsub:nodeIdx(),
                                       LU :: jid:luser(),
                                       LS :: jid:lserver()) -> ok.
delete_affiliation_wo_subs_check(Nidx, LU, LS) ->
    {updated, _AffectedRows} = execute_delete_affiliation(Nidx, LU, LS),
    ok.
1089
%% Row shapes consumed by build_states/3.
-type item_row() :: {NidxSql :: integer() | binary(),
                     LU :: binary(),
                     LS :: binary(),
                     ItemId :: binary()}.
-type aff_row() :: {NidxSql :: integer() | binary(),
                    LU :: binary(),
                    LS :: binary(),
                    AffInt :: integer()}.
-type sub_row() :: {NidxSql :: integer() | binary(),
                    LU :: binary(),
                    LS :: binary(),
                    LR :: binary(),
                    TypeInt :: integer(),
                    SubId :: binary()}.

%% Merges item, affiliation and subscription rows into #pubsub_state{} records,
%% one per `{LJID, Nidx}` state id. Items and affiliations are keyed on the bare
%% JID, subscriptions on the full JID.
-spec build_states(ItemRows :: [item_row()], AffRows :: [aff_row()], SubRows :: [sub_row()]) ->
    [mod_pubsub:pubsubState()].
build_states(ItemRows, AffRows, SubRows) ->
    WithItems = item_rows_to_states(ItemRows, #{}),
    WithAffiliations = aff_rows_to_states(AffRows, WithItems),
    WithSubscriptions = sub_rows_to_states(SubRows, WithAffiliations),
    maps:values(WithSubscriptions).
1111
%% Folds item rows into the state map: each item id is prepended to the `items`
%% of the state keyed by the bare JID and node index, creating the state when
%% it is not in the map yet.
item_rows_to_states(ItemRows, States0) ->
    AddItem = fun({NidxSql, LU, LS, ItemId}, States) ->
                      Nidx = mongoose_rdbms:result_to_integer(NidxSql),
                      StateId = {{LU, LS, <<>>}, Nidx},
                      State = maps:get(StateId, States, #pubsub_state{stateid = StateId}),
                      Items = [ItemId | State#pubsub_state.items],
                      States#{StateId => State#pubsub_state{items = Items}}
              end,
    lists:foldl(AddItem, States0, ItemRows).
1121
%% Folds affiliation rows into the state map: the decoded affiliation is set on
%% the state keyed by the bare JID and node index, creating the state when it
%% is not in the map yet.
aff_rows_to_states(AffRows, States0) ->
    SetAffiliation = fun({NidxSql, LU, LS, AffInt}, States) ->
                             Nidx = mongoose_rdbms:result_to_integer(NidxSql),
                             StateId = {{LU, LS, <<>>}, Nidx},
                             State = maps:get(StateId, States,
                                              #pubsub_state{stateid = StateId}),
                             States#{StateId => State#pubsub_state{affiliation = sql2aff(AffInt)}}
                     end,
    lists:foldl(SetAffiliation, States0, AffRows).
1130
%% Folds subscription rows into the state map: `{Subscription, SubId}` is
%% prepended to the `subscriptions` of the state keyed by the full JID and node
%% index, creating the state when it is not in the map yet.
sub_rows_to_states(SubRows, States0) ->
    AddSubscription = fun({NidxSql, LU, LS, LR, TypeInt, SubId}, States) ->
                              Nidx = mongoose_rdbms:result_to_integer(NidxSql),
                              StateId = {{LU, LS, LR}, Nidx},
                              State = maps:get(StateId, States,
                                               #pubsub_state{stateid = StateId}),
                              Subs = [{sql2sub(TypeInt), SubId}
                                      | State#pubsub_state.subscriptions],
                              States#{StateId => State#pubsub_state{subscriptions = Subs}}
                      end,
    lists:foldl(AddSubscription, States0, SubRows).
1141
%% Maps an affiliation atom to the integer stored in the database.
%% Inverse of int2aff/1; any other term raises function_clause.
-spec aff2int(mod_pubsub:affiliation()) -> integer().
aff2int(outcast)      -> 5;
aff2int(member)       -> 4;
aff2int(publish_only) -> 3;
aff2int(publisher)    -> 2;
aff2int(owner)        -> 1;
aff2int(none)         -> 0.
1149
%% Decodes an affiliation as returned by the database driver (integer or
%% binary) into the affiliation atom.
-spec sql2aff(integer() | binary()) -> mod_pubsub:affiliation().
sql2aff(SqlValue) ->
    AffInt = mongoose_rdbms:result_to_integer(SqlValue),
    int2aff(AffInt).
1153
%% Maps a stored integer back to the affiliation atom.
%% Inverse of aff2int/1; any other value raises function_clause.
-spec int2aff(integer()) -> mod_pubsub:affiliation().
int2aff(5) -> outcast;
int2aff(4) -> member;
int2aff(3) -> publish_only;
int2aff(2) -> publisher;
int2aff(1) -> owner;
int2aff(0) -> none.
1161
%% Maps a subscription state atom to the integer stored in the database.
%% Inverse of int2sub/1; any other term raises function_clause.
%% NOTE(review): the value 2 is not produced by this mapping.
-spec sub2int(mod_pubsub:subscription()) -> integer().
sub2int(subscribed) -> 3;
sub2int(pending)    -> 1;
sub2int(none)       -> 0.
1166
%% Decodes a subscription state as returned by the database driver (integer or
%% binary) into the subscription atom.
-spec sql2sub(integer() | binary()) -> mod_pubsub:subscription().
sql2sub(SqlValue) ->
    SubInt = mongoose_rdbms:result_to_integer(SqlValue),
    int2sub(SubInt).
1170
%% Maps a stored integer back to the subscription state atom.
%% Inverse of sub2int/1; any other value (including 2) raises function_clause.
-spec int2sub(integer()) -> mod_pubsub:subscription().
int2sub(3) -> subscribed;
int2sub(1) -> pending;
int2sub(0) -> none.
1175
%% Decodes subscription options stored as a JSON object into a list of
%% `{Key, Value}` pairs whose keys are existing atoms.
sql_to_sub_opts(SqlOpts) ->
    {RawOpts} = jiffy:decode(SqlOpts),
    [key_to_existing_atom(RawOpt) || RawOpt <- RawOpts].
1179
%% Converts an item row (columns in item_columns/0 order) into a #pubsub_item{}.
%% The stored payload is unescaped and parsed; only the children of the parsed
%% root element are kept. The creator is recorded as a bare JID, the modifier
%% with its resource.
item_to_record({NidxSql, ItemId, CreatedLUser, CreatedLServer, CreatedAtSql,
                ModifiedLUser, ModifiedLServer, ModifiedLResource, ModifiedAtSql,
                PublisherSql, PayloadDB}) ->
    RawXML = mongoose_rdbms:unescape_binary(global, PayloadDB),
    {ok, #xmlel{children = Children}} = exml:parse(RawXML),
    Nidx = mongoose_rdbms:result_to_integer(NidxSql),
    CreatedAt = mongoose_rdbms:result_to_integer(CreatedAtSql),
    ModifiedAt = mongoose_rdbms:result_to_integer(ModifiedAtSql),
    Publisher = decode_publisher(PublisherSql),
    #pubsub_item{itemid = {ItemId, Nidx},
                 creation = {CreatedAt, {CreatedLUser, CreatedLServer, <<>>}},
                 modification = {ModifiedAt,
                                 {ModifiedLUser, ModifiedLServer, ModifiedLResource}},
                 publisher = Publisher,
                 payload = Children}.
1196
%% Decodes the stored publisher: `null` becomes `undefined`, anything else is
%% handed to jid:from_binary/1.
decode_publisher(null) ->
    undefined;
decode_publisher(PublisherBin) ->
    %% Silently returns `error` if parsing fails.
    jid:from_binary(PublisherBin).
1202
%% Encodes a node key for storage: a 3-tuple JID is serialised to a binary,
%% a binary is stored as is.
encode_key({_, _, _} = LJID) ->
    jid:to_binary(LJID);
encode_key(Host) when is_binary(Host) ->
    Host.
1207
%% Decodes a stored node key: a JID with empty user and resource is returned
%% as its server binary, any other JID as a `{LUser, LServer, LResource}` tuple.
decode_key(KeySQL) ->
    ParsedKey = jid:from_binary(KeySQL),
    case ParsedKey of
        %% Clause order matters: the host-only shape must be tried first.
        #jid{luser = <<>>, lserver = Host, lresource = <<>>} ->
            Host;
        #jid{luser = LU, lserver = LS, lresource = LR} ->
            {LU, LS, LR}
    end.
1215
%% Encodes an optional JID: `undefined` becomes `null`, anything else is
%% serialised with jid:to_binary/1.
null_or_bin_jid(undefined) ->
    null;
null_or_bin_jid(JID) ->
    jid:to_binary(JID).
1220
%% Ensures the key of a `{Key, Value}` pair is an atom. Atom keys pass through;
%% other keys are converted with binary_to_existing_atom/2, which raises badarg
%% for unknown atoms instead of creating new ones.
key_to_existing_atom({Key, _Value} = Pair) when is_atom(Key) ->
    Pair;
key_to_existing_atom({BinKey, Value}) ->
    {binary_to_existing_atom(BinKey, utf8), Value}.
1225
Line Hits Source