./ct_report/coverage/mod_pubsub_db_rdbms.COVER.html

1 %%%----------------------------------------------------------------------
2 %%% File : mod_pubsub_db_rdbms.erl
3 %%% Author : Piotr Nosek <piotr.nosek@erlang-solutions.com>
4 %%% Purpose : PubSub RDBMS backend
5 %%% Created : 2 Nov 2018 by Piotr Nosek <piotr.nosek@erlang-solutions.com>
6 %%%----------------------------------------------------------------------
7
8 -module(mod_pubsub_db_rdbms).
9 -author('piotr.nosek@erlang-solutions.com').
10 -author('michal.piotrowski@erlang-solutions.com').
11
12 -behaviour(mod_pubsub_db).
13
14 -include("pubsub.hrl").
15 -include("mongoose_logger.hrl").
16 -include("jlib.hrl").
17
18 -export([init/2, stop/0]).
19 % Funs execution
20 -export([transaction/2, dirty/2]).
21 % Direct #pubsub_state access
22 -export([get_state/2,
23 get_states/1, get_states_by_lus/1, get_states_by_bare/1,
24 get_states_by_bare_and_full/1, get_idxs_of_own_nodes_with_pending_subs/1]).
25 % Node management
26 -export([
27 create_node/2,
28 del_node/1,
29 set_node/1,
30 find_node_by_id/1,
31 find_nodes_by_key/1,
32 find_node_by_name/2,
33 delete_node/1,
34 get_subnodes/2,
35 get_parentnodes_tree/2,
36 get_subnodes_tree/2
37 ]).
38 % Affiliations
39 -export([
40 set_affiliation/3,
41 get_affiliation/2
42 ]).
43 % Subscriptions
44 -export([
45 add_subscription/5,
46 set_subscription_opts/4,
47 get_node_subscriptions/1,
48 get_node_entity_subscriptions/2,
49 delete_subscription/3,
50 delete_all_subscriptions/2,
51 update_subscription/4
52 ]).
53 % Item ids in state
54 -export([
55 add_item/3,
56 remove_items/3,
57 remove_all_items/1
58 ]).
59 % Whole items
60 -export([
61 get_items/2,
62 get_item/2,
63 set_item/1,
64 del_item/2,
65 del_items/2
66 ]).
67
68 %% GDPR related
69 -export([
70 get_user_payloads/2,
71 get_user_nodes/2,
72 get_user_subscriptions/2,
73 delete_user_subscriptions/1,
74 find_nodes_by_affiliated_user/1
75 ]).
76
77 %%====================================================================
78 %% Behaviour callbacks
79 %%====================================================================
80
81 %% ------------------------ Backend start/stop ------------------------
82
83 init(_HostType, _Opts) ->
84 % -------------------- State building ----------------------------
85
:-(
mongoose_rdbms:prepare(pubsub_get_item_rows_id, pubsub_items, [nidx],
86 <<"SELECT nidx, created_luser, created_lserver, itemid "
87 "FROM pubsub_items WHERE nidx = ?">>),
88
:-(
mongoose_rdbms:prepare(pubsub_get_affiliation_rows_id, pubsub_affiliations, [nidx],
89 <<"SELECT nidx, luser, lserver, aff FROM pubsub_affiliations WHERE nidx = ?">>),
90
:-(
mongoose_rdbms:prepare(pubsub_get_subscriptions_rows_id, pubsub_subscriptions, [nidx],
91 <<"SELECT nidx, luser, lserver, lresource, type, sub_id "
92 "FROM pubsub_subscriptions WHERE nidx = ?">>),
93
:-(
mongoose_rdbms:prepare(pubsub_get_item_rows, pubsub_items, [created_luser, created_lserver],
94 <<"SELECT nidx, created_luser, created_lserver, itemid FROM pubsub_items"
95 " WHERE created_luser = ? AND created_lserver = ?">>),
96
:-(
mongoose_rdbms:prepare(pubsub_get_affiliation_rows, pubsub_affiliations, [luser, lserver],
97 <<"SELECT nidx, luser, lserver, aff FROM pubsub_affiliations "
98 "WHERE luser = ? AND lserver = ?">>),
99
:-(
mongoose_rdbms:prepare(pubsub_get_subscriptions_rows, pubsub_subscriptions, [luser, lserver],
100 <<"SELECT nidx, luser, lserver, lresource, type, sub_id FROM pubsub_subscriptions "
101 "WHERE luser = ? AND lserver = ?">>),
102
:-(
mongoose_rdbms:prepare(pubsub_get_subscriptions_rows_resource, pubsub_subscriptions,
103 [luser, lserver, lresource],
104 <<"SELECT nidx, luser, lserver, lresource, type, sub_id FROM pubsub_subscriptions "
105 "WHERE luser = ? AND lserver = ? AND lresource = ?">>),
106
:-(
mongoose_rdbms:prepare(pubsub_get_idxs_of_own_nodes_with_pending_subs, pubsub_affiliations,
107 [aff, luser, lserver, 'pubsub_subscriptions.type'],
108 <<"SELECT DISTINCT s.nidx FROM pubsub_affiliations AS a "
109 "INNER JOIN pubsub_subscriptions s ON a.nidx = s.nidx "
110 "WHERE a.aff = ? AND a.luser = ? AND a.lserver = ? AND s.type = ?">>),
111
112 % ------------------- Affiliations --------------------------------
113
:-(
mongoose_rdbms:prepare(pubsub_get_affiliation, pubsub_affiliations, [nidx, luser, lserver],
114 <<"SELECT aff FROM pubsub_affiliations WHERE nidx = ? AND luser = ? AND lserver = ?">>),
115
:-(
mongoose_rdbms:prepare(pubsub_delete_affiliation, pubsub_affiliations, [nidx, luser, lserver],
116 <<"DELETE FROM pubsub_affiliations WHERE nidx = ? AND luser = ? AND lserver = ?">>),
117
:-(
mongoose_rdbms:prepare(pubsub_delete_all_affiliations, pubsub_affiliations, [nidx],
118 <<"DELETE FROM pubsub_affiliations WHERE nidx = ?">>),
119
120 % ------------------- Subscriptions --------------------------------
121
:-(
mongoose_rdbms:prepare(pubsub_insert_subscription, pubsub_subscriptions,
122 [nidx, luser, lserver, lresource, type, sub_id, options],
123 <<"INSERT INTO pubsub_subscriptions (nidx, luser, lserver, lresource, "
124 "type, sub_id, options) VALUES (?, ?, ?, ?, ?, ?, ?)">>),
125
:-(
mongoose_rdbms:prepare(pubsub_update_subscription_opts, pubsub_subscriptions,
126 [options, nidx, luser, lserver, lresource, sub_id],
127 <<"UPDATE pubsub_subscriptions SET options = ? WHERE nidx = ? "
128 "AND luser = ? AND lserver = ? AND lresource = ? AND sub_id = ?">>),
129
:-(
mongoose_rdbms:prepare(pubsub_get_node_subs, pubsub_subscriptions, [nidx],
130 <<"SELECT luser, lserver, lresource, type, sub_id, options "
131 "FROM pubsub_subscriptions WHERE nidx = ?">>),
132
:-(
mongoose_rdbms:prepare(pubsub_get_node_entity_subs, pubsub_subscriptions,
133 [nidx, luser, lserver, lresource],
134 <<"SELECT type, sub_id, options FROM pubsub_subscriptions "
135 "WHERE nidx = ? AND luser = ? AND lserver = ? AND lresource = ?">>),
136
:-(
mongoose_rdbms:prepare(pubsub_delete_subscription, pubsub_subscriptions,
137 [nidx, luser, lserver, lresource, sub_id],
138 <<"DELETE FROM pubsub_subscriptions WHERE nidx = ? "
139 "AND luser = ? AND lserver = ? AND lresource = ? AND sub_id = ?">>),
140
:-(
mongoose_rdbms:prepare(pubsub_delete_all_subscriptions, pubsub_subscriptions,
141 [nidx, luser, lserver, lresource],
142 <<"DELETE FROM pubsub_subscriptions"" WHERE nidx = ? "
143 "AND luser = ? AND lserver = ? AND lresource = ?">>),
144
:-(
mongoose_rdbms:prepare(pubsub_delete_all_subscriptions_id, pubsub_subscriptions, [nidx],
145 <<"DELETE FROM pubsub_subscriptions WHERE nidx = ?">>),
146
:-(
mongoose_rdbms:prepare(pubsub_delete_user_subscriptions,
147 pubsub_subscriptions, [luser, lserver],
148 <<"DELETE FROM pubsub_subscriptions WHERE luser = ? AND lserver = ?">>),
149
:-(
mongoose_rdbms:prepare(pubsub_update_subscription, pubsub_subscriptions,
150 [type, nidx, luser, lserver, lresource, sub_id],
151 <<"UPDATE pubsub_subscriptions SET type = ? WHERE nidx = ? "
152 "AND luser = ? AND lserver = ? AND lresource = ? AND sub_id = ?">>),
153
154 % ------------------- Items --------------------------------
155
:-(
mongoose_rdbms:prepare(pubsub_get_entity_items, pubsub_items,
156 [nidx, created_luser, created_lserver],
157 <<"SELECT itemid FROM pubsub_items WHERE nidx = ? "
158 "AND created_luser = ? AND created_lserver = ?">>),
159
:-(
mongoose_rdbms:prepare(pubsub_delete_item, pubsub_items,
160 [nidx, created_luser, created_lserver, itemid],
161 <<"DELETE FROM pubsub_items WHERE nidx = ? AND created_luser = ? ",
162 "AND created_lserver = ? AND itemid = ?">>),
163
:-(
mongoose_rdbms:prepare(pubsub_delete_all_items, pubsub_items, [nidx],
164 <<"DELETE FROM pubsub_items WHERE nidx = ?">>),
165
:-(
ItemColumns = item_columns(),
166
:-(
mongoose_rdbms:prepare(pubsub_get_item, pubsub_items, [nidx, itemid],
167 <<"SELECT ", ItemColumns/binary, " FROM pubsub_items WHERE nidx = ? AND itemid = ?">>),
168
:-(
mongoose_rdbms:prepare(pubsub_get_items, pubsub_items, [nidx],
169 <<"SELECT ", ItemColumns/binary, " FROM pubsub_items WHERE nidx = ? "
170 "ORDER BY modified_at DESC">>),
171
:-(
{LimitSQL, LimitMSSQL} = rdbms_queries:get_db_specific_limits_binaries(),
172
:-(
mongoose_rdbms:prepare(pubsub_get_items_limit, pubsub_items,
173 rdbms_queries:add_limit_arg(limit, [nidx]),
174 <<"SELECT ", LimitMSSQL/binary, " ", ItemColumns/binary,
175 " FROM pubsub_items WHERE nidx = ? ORDER BY modified_at DESC ", LimitSQL/binary>>),
176
:-(
mongoose_rdbms:prepare(pubsub_del_item, pubsub_items, [nidx, itemid],
177 <<"DELETE FROM pubsub_items WHERE nidx = ? AND itemid = ?">>),
178
179 % ------------------- Nodes --------------------------------
180
:-(
mongoose_rdbms:prepare(pubsub_insert_node, pubsub_nodes, [p_key, name, type, owners, options],
181 <<"INSERT INTO pubsub_nodes (p_key, name, type, owners, options) VALUES (?, ?, ?, ?, ?)">>),
182
:-(
mongoose_rdbms:prepare(pubsub_insert_parent, pubsub_node_collections, [name, parent_name],
183 <<"INSERT INTO pubsub_node_collections (name, parent_name) VALUES (?, ?)">>),
184
:-(
mongoose_rdbms:prepare(pubsub_update_pubsub_node, pubsub_nodes, [type, owners, options, nidx],
185 <<"UPDATE pubsub_nodes SET type = ?, owners = ?, options = ? WHERE nidx = ?">>),
186
:-(
PubsubNodeFields = pubsub_node_fields(),
187
:-(
mongoose_rdbms:prepare(pubsub_select_node_by_key_and_name, pubsub_nodes, [p_key, name],
188 <<"SELECT ", PubsubNodeFields/binary, " from pubsub_nodes WHERE p_key = ? AND name = ?">>),
189
:-(
mongoose_rdbms:prepare(pubsub_select_node_by_id, pubsub_nodes, [nidx],
190 <<"SELECT ", PubsubNodeFields/binary, " from pubsub_nodes WHERE nidx = ?">>),
191
:-(
mongoose_rdbms:prepare(pubsub_select_nodes_by_key, pubsub_nodes, [p_key],
192 <<"SELECT ", PubsubNodeFields/binary, " from pubsub_nodes WHERE p_key = ?">>),
193
:-(
mongoose_rdbms:prepare(pubsub_select_nodes_in_list_with_key, pubsub_nodes, [p_key, names],
194 <<"SELECT ", PubsubNodeFields/binary, " from pubsub_nodes "
195 "WHERE p_key = ? AND name IN (?)">>),
196
:-(
PubsubNodeFieldsPrefixed = pubsub_node_fields_pn(),
197
:-(
mongoose_rdbms:prepare(pubsub_select_nodes_by_affiliated_user,
198 pubsub_affiliations, [luser, lserver],
199 <<"SELECT aff, ", PubsubNodeFieldsPrefixed/binary, " FROM pubsub_affiliations AS pa "
200 "INNER JOIN pubsub_nodes AS pn ON pa.nidx = pn.nidx WHERE luser = ? AND lserver = ?">>),
201
:-(
mongoose_rdbms:prepare(pubsub_select_top_level_nodes, pubsub_nodes, [p_key],
202 <<"SELECT ", PubsubNodeFieldsPrefixed/binary, " from pubsub_nodes as pn "
203 "LEFT JOIN pubsub_node_collections as collection ON pn.name = collection.name "
204 "WHERE p_key = ? AND collection.parent_name IS NULL">>),
205
:-(
mongoose_rdbms:prepare(pubsub_select_subnodes, pubsub_nodes,
206 ['pubsub_node_collections.parent_name', p_key],
207 <<"SELECT ", PubsubNodeFieldsPrefixed/binary, " from pubsub_nodes as pn "
208 "INNER JOIN pubsub_node_collections as collection ON pn.name = collection.name AND "
209 "collection.parent_name = ? WHERE p_key = ?">>),
210
:-(
mongoose_rdbms:prepare(pubsub_select_node_with_parents, pubsub_nodes,
211 [p_key, name],
212 <<"SELECT pn.nidx, pn.name, collection.parent_name from pubsub_nodes as pn "
213 "LEFT JOIN pubsub_node_collections as collection ON pn.name = collection.name "
214 "WHERE pn.p_key = ? AND pn.name = ?">>),
215
:-(
mongoose_rdbms:prepare(pubsub_select_node_with_children, pubsub_nodes,
216 [p_key, name],
217 <<"SELECT pn.nidx, pn.name, collection.name from pubsub_nodes as pn "
218 "LEFT JOIN pubsub_node_collections as collection ON pn.name = collection.parent_name "
219 "WHERE pn.p_key = ? AND pn.name = ?">>),
220
:-(
mongoose_rdbms:prepare(pubsub_delete_node, pubsub_nodes, [p_key, name],
221 <<"DELETE from pubsub_nodes WHERE p_key = ? AND name = ?">>),
222
:-(
mongoose_rdbms:prepare(pubsub_del_parents, pubsub_node_collections, [name],
223 <<"DELETE FROM pubsub_node_collections WHERE name = ?">>),
224
225 % ------------------- GDPR --------------------------------
226
:-(
mongoose_rdbms:prepare(pubsub_get_user_items, pubsub_items, [created_luser, created_lserver],
227 <<"SELECT name, itemid, payload FROM pubsub_items INNER JOIN pubsub_nodes "
228 "ON pubsub_items.nidx = pubsub_nodes.nidx WHERE created_luser = ? "
229 "AND created_lserver = ?">>),
230
:-(
mongoose_rdbms:prepare(pubsub_get_user_subscriptions, pubsub_subscriptions, [luser, lserver],
231 <<"SELECT name FROM pubsub_subscriptions INNER JOIN pubsub_nodes "
232 "ON pubsub_subscriptions.nidx = pubsub_nodes.nidx WHERE luser = ? AND lserver = ?">>),
233
:-(
prepare_select_nodes_by_owner(),
234
235
:-(
rdbms_queries:prepare_upsert(global, pubsub_affiliation_upsert, pubsub_affiliations,
236 [<<"nidx">>, <<"luser">>, <<"lserver">>, <<"aff">>],
237 [<<"aff">>],
238 [<<"nidx">>, <<"luser">>, <<"lserver">>]),
239
:-(
ItemInsertFields = [<<"nidx">>, <<"itemid">>,
240 <<"created_luser">>, <<"created_lserver">>, <<"created_at">>,
241 <<"modified_luser">>, <<"modified_lserver">>, <<"modified_lresource">>, <<"modified_at">>,
242 <<"publisher">>, <<"payload">>],
243
:-(
ItemUpdateFields = [<<"modified_luser">>, <<"modified_lserver">>, <<"modified_lresource">>, <<"modified_at">>,
244 <<"publisher">>, <<"payload">>],
245
:-(
rdbms_queries:prepare_upsert(global, pubsub_item_upsert, pubsub_items,
246 ItemInsertFields,
247 ItemUpdateFields,
248 [<<"nidx">>, <<"itemid">>]),
249
:-(
ok.
250
251 -spec stop() -> ok.
252 stop() ->
253
:-(
ok.
254
255 %% ------------------------ Queries execution --------------------
256 prepare_select_nodes_by_owner() ->
257
:-(
case {mongoose_rdbms:db_engine(global), mongoose_rdbms:db_type()} of
258 {mysql, _} ->
259
:-(
mongoose_rdbms:prepare(pubsub_select_nodes_by_owner, pubsub_nodes, [owners],
260 <<"SELECT name, type FROM pubsub_nodes WHERE owners = convert(?, JSON);">>);
261 {pgsql, _} ->
262
:-(
mongoose_rdbms:prepare(pubsub_select_nodes_by_owner, pubsub_nodes, [owners],
263 <<"SELECT name, type FROM pubsub_nodes WHERE owners ::json->>0 like ? "
264 "AND JSON_ARRAY_LENGTH(owners) = 1">>);
265 {odbc, mssql} ->
266
:-(
mongoose_rdbms:prepare(pubsub_select_nodes_by_owner, pubsub_nodes, [owners],
267 <<"SELECT name, type FROM pubsub_nodes WHERE cast(owners as nvarchar(max)) = ?;">>)
268 end.
269 % -------------------- State building ----------------------------
270 -spec execute_get_item_rows_id(Nidx :: mod_pubsub:nodeIdx()) ->
271 mongoose_rdbms:query_result().
272 execute_get_item_rows_id(Nidx) ->
273
:-(
mongoose_rdbms:execute_successfully(global, pubsub_get_item_rows_id, [Nidx]).
274
275 -spec execute_get_affiliation_rows_id(Nidx :: mod_pubsub:nodeIdx()) ->
276 mongoose_rdbms:query_result().
277 execute_get_affiliation_rows_id(Nidx) ->
278
:-(
mongoose_rdbms:execute_successfully(global, pubsub_get_affiliation_rows_id, [Nidx]).
279
280 -spec execute_get_subscriptions_rows_id(Nidx :: mod_pubsub:nodeIdx()) ->
281 mongoose_rdbms:query_result().
282 execute_get_subscriptions_rows_id(Nidx) ->
283
:-(
mongoose_rdbms:execute_successfully(global, pubsub_get_subscriptions_rows_id, [Nidx]).
284
285 -spec execute_get_item_rows(LU :: jid:luser(), LS :: jid:lserver()) ->
286 mongoose_rdbms:query_result().
287 execute_get_item_rows(LU, LS) ->
288
:-(
mongoose_rdbms:execute_successfully(LS, pubsub_get_item_rows, [LU, LS]).
289
290 -spec execute_get_affiliation_rows(LU :: jid:luser(), LS :: jid:lserver()) ->
291 mongoose_rdbms:query_result().
292 execute_get_affiliation_rows(LU, LS) ->
293
:-(
mongoose_rdbms:execute_successfully(global, pubsub_get_affiliation_rows, [LU, LS]).
294
295 -spec execute_get_subscriptions_rows(LU :: jid:luser(), LS :: jid:lserver()) ->
296 mongoose_rdbms:query_result().
297 execute_get_subscriptions_rows(LU, LS) ->
298
:-(
mongoose_rdbms:execute_successfully(global, pubsub_get_subscriptions_rows, [LU, LS]).
299
300 -spec execute_get_subscriptions_rows_resource(LU :: jid:luser(), LS :: jid:lserver(),
301 LR :: jid:lresource()) -> mongoose_rdbms:query_result().
302 execute_get_subscriptions_rows_resource(LU, LS, LR) ->
303
:-(
mongoose_rdbms:execute_successfully(global,
304 pubsub_get_subscriptions_rows_resource, [LU, LS, LR]).
305
306 -spec execute_get_idxs_of_own_nodes_with_pending_subs(LS :: jid:lserver(),
307 Aff :: integer(),
308 LU :: jid:luser(),
309 Sub :: integer()) ->
310 mongoose_rdbms:query_result().
311 execute_get_idxs_of_own_nodes_with_pending_subs(LS, Aff, LU, Sub) ->
312
:-(
mongoose_rdbms:execute_successfully(LS, pubsub_get_idxs_of_own_nodes_with_pending_subs,
313 [Aff, LU, LS, Sub]).
314
315 % ------------------- Affiliations --------------------------------
316 -spec execute_get_affiliation(Nidx :: mod_pubsub:nodeIdx(), LU :: jid:luser(),
317 LS :: jid:lserver()) -> mongoose_rdbms:query_result().
318 execute_get_affiliation(Nidx, LU, LS) ->
319
:-(
mongoose_rdbms:execute_successfully(LS, pubsub_get_affiliation, [Nidx, LU, LS]).
320
321 -spec execute_delete_affiliation(Nidx :: mod_pubsub:nodeIdx(), LU :: jid:luser(),
322 LS :: jid:lserver()) -> mongoose_rdbms:query_result().
323 execute_delete_affiliation(Nidx, LU, LS) ->
324
:-(
mongoose_rdbms:execute_successfully(LS, pubsub_delete_affiliation, [Nidx, LU, LS]).
325
326 -spec execute_delete_all_affiliations(Nidx :: mod_pubsub:nodeIdx()) ->
327 mongoose_rdbms:query_result().
328 execute_delete_all_affiliations(Nidx) ->
329
:-(
mongoose_rdbms:execute_successfully(global, pubsub_delete_all_affiliations, [Nidx]).
330
331 % ------------------- Subscriptions --------------------------------
332 -spec execute_insert_subscription(Nidx :: mod_pubsub:nodeIdx(),
333 LU :: jid:luser(),
334 LS :: jid:lserver(),
335 LR :: jid:lresource(),
336 Sub :: integer(),
337 SubId :: mod_pubsub:subId(),
338 EncodedOpts :: iodata()) ->
339 mongoose_rdbms:query_result().
340 execute_insert_subscription(Nidx, LU, LS, LR, Sub, SubId, EncodedOpts) ->
341
:-(
mongoose_rdbms:execute_successfully(LS, pubsub_insert_subscription,
342 [Nidx, LU, LS, LR, Sub, SubId, EncodedOpts]).
343
344 -spec execute_update_subscription_opts(EncodedOpts :: iodata(),
345 Nidx :: mod_pubsub:nodeIdx(),
346 LU :: jid:luser(),
347 LS :: jid:lserver(),
348 LR :: jid:lresource(),
349 SubId :: mod_pubsub:subId()) ->
350 mongoose_rdbms:query_result().
351 execute_update_subscription_opts(EncodedOpts, Nidx, LU, LS, LR, SubId) ->
352
:-(
mongoose_rdbms:execute_successfully(LS, pubsub_update_subscription_opts,
353 [EncodedOpts, Nidx, LU, LS, LR, SubId]).
354
355 -spec execute_get_node_subs(Nidx :: mod_pubsub:nodeIdx()) -> mongoose_rdbms:query_result().
356 execute_get_node_subs(Nidx) ->
357
:-(
mongoose_rdbms:execute_successfully(global, pubsub_get_node_subs, [Nidx]).
358
359 -spec execute_get_node_entity_subs(LS :: jid:lserver(),
360 Nidx :: mod_pubsub:nodeIdx(),
361 LU :: jid:luser(),
362 LR :: jid:lresource()) ->
363 mongoose_rdbms:query_result().
364 execute_get_node_entity_subs(LS, Nidx, LU, LR) ->
365
:-(
mongoose_rdbms:execute_successfully(LS, pubsub_get_node_entity_subs, [Nidx, LU, LS, LR]).
366
367 -spec execute_delete_subscription(LS :: jid:lserver(),
368 Nidx :: mod_pubsub:nodeIdx(),
369 LU :: jid:luser(),
370 LR :: jid:lresource(),
371 SubId :: mod_pubsub:subId()) ->
372 mongoose_rdbms:query_result().
373 execute_delete_subscription(LS, Nidx, LU, LR, SubId) ->
374
:-(
mongoose_rdbms:execute_successfully(LS, pubsub_delete_subscription, [Nidx, LU, LS, LR, SubId]).
375
376 -spec execute_delete_all_subscriptions(Nidx :: mod_pubsub:nodeIdx(),
377 LU :: jid:luser(),
378 LS :: jid:lserver(),
379 LR :: jid:lresource()) ->
380 mongoose_rdbms:query_result().
381 execute_delete_all_subscriptions(Nidx, LU, LS, LR) ->
382
:-(
mongoose_rdbms:execute_successfully(LS, pubsub_delete_all_subscriptions, [Nidx, LU, LS, LR]).
383
384 -spec execute_delete_all_subscriptions_id(Nidx :: mod_pubsub:nodeIdx()) ->
385 mongoose_rdbms:query_result().
386 execute_delete_all_subscriptions_id(Nidx) ->
387
:-(
mongoose_rdbms:execute_successfully(global, pubsub_delete_all_subscriptions_id, [Nidx]).
388
389 -spec execute_delete_user_subscriptions(LS :: jid:lserver(), LU :: jid:luser()) ->
390 mongoose_rdbms:query_result().
391 execute_delete_user_subscriptions(LS, LU) ->
392
:-(
mongoose_rdbms:execute_successfully(LS, pubsub_delete_user_subscriptions, [LU, LS]).
393
394 -spec execute_update_subscription(Subscription :: integer(),
395 Nidx :: mod_pubsub:nodeIdx(),
396 LU :: jid:luser(),
397 LS :: jid:lserver(),
398 LR :: jid:lresource(),
399 SubId :: mod_pubsub:subId()) ->
400 mongoose_rdbms:query_result().
401 execute_update_subscription(Subscription, Nidx, LU, LS, LR, SubId) ->
402
:-(
mongoose_rdbms:execute_successfully(LS, pubsub_update_subscription,
403 [Subscription, Nidx, LU, LS, LR, SubId]).
404
405 % ------------------- Items --------------------------------
406 -spec execute_get_entity_items(LS :: jid:lserver(),
407 Nidx :: mod_pubsub:nodeIdx(),
408 LU :: jid:lserver()) ->
409 mongoose_rdbms:query_result().
410 execute_get_entity_items(LS, Nidx, LU) ->
411
:-(
mongoose_rdbms:execute_successfully(LS, pubsub_get_entity_items, [Nidx, LU, LS]).
412
413 -spec execute_delete_item(LS :: jid:lserver(),
414 Nidx :: mod_pubsub:nodeIdx(),
415 LU :: jid:luser(),
416 ItemId :: mod_pubsub:itemId()) ->
417 mongoose_rdbms:query_result().
418 execute_delete_item(LS, Nidx, LU, ItemId) ->
419
:-(
mongoose_rdbms:execute_successfully(LS, pubsub_delete_item, [Nidx, LU, LS, ItemId]).
420
421 -spec execute_delete_all_items(Nidx :: mod_pubsub:nodeIdx()) -> mongoose_rdbms:query_result().
422 execute_delete_all_items(Nidx) ->
423
:-(
mongoose_rdbms:execute_successfully(global, pubsub_delete_all_items, [Nidx]).
424
425 -spec execute_get_item(Nidx :: mod_pubsub:nodeIdx(),
426 ItemId :: mod_pubsub:itemId()) ->
427 mongoose_rdbms:query_result().
428 execute_get_item(Nidx, ItemId) ->
429
:-(
mongoose_rdbms:execute_successfully(global, pubsub_get_item, [Nidx, ItemId]).
430
431 -spec execute_get_items(Nidx :: mod_pubsub:nodeIdx()) -> mongoose_rdbms:query_result().
432 execute_get_items(Nidx) ->
433
:-(
mongoose_rdbms:execute_successfully(global, pubsub_get_items, [Nidx]).
434
435 -spec execute_get_items(Nidx :: mod_pubsub:nodeIdx(), Limit :: pos_integer()) ->
436 mongoose_rdbms:query_result().
437 execute_get_items(Nidx, Limit) ->
438
:-(
Args = rdbms_queries:add_limit_arg(Limit, [Nidx]),
439
:-(
mongoose_rdbms:execute_successfully(global, pubsub_get_items_limit, Args).
440
441 -spec execute_del_item(Nidx :: mod_pubsub:nodeIdx(),
442 ItemId :: mod_pubsub:itemId()) ->
443 mongoose_rdbms:query_result().
444 execute_del_item(Nidx, ItemId) ->
445
:-(
mongoose_rdbms:execute_successfully(global, pubsub_del_item, [Nidx, ItemId]).
446
447 % ------------------- Nodes --------------------------------
448 -spec execute_insert_pubsub_node(binary(), binary(), binary(), iodata(), iodata()) ->
449 mongoose_rdbms:query_result().
450 execute_insert_pubsub_node(Key, Name, Type, Owners, Options) ->
451
:-(
mongoose_rdbms:execute_successfully(global, pubsub_insert_node,
452 [Key, Name, Type, Owners, Options]).
453
454 -spec execute_insert_parent(binary(), binary()) -> mongoose_rdbms:query_result().
455 execute_insert_parent(Name, ParentName) ->
456
:-(
mongoose_rdbms:execute_successfully(global, pubsub_insert_parent,
457 [Name, ParentName]).
458
459 -spec execute_update_pubsub_node(Type :: binary(),
460 OwnersJid :: iodata(),
461 Opts :: iodata(),
462 Nidx :: mod_pubsub:nodeIdx()) ->
463 mongoose_rdbms:query_result().
464 execute_update_pubsub_node(Type, OwnersJid, Opts, Nidx) ->
465
:-(
mongoose_rdbms:execute_successfully(global, pubsub_update_pubsub_node,
466 [Type, OwnersJid, Opts, Nidx]).
467
468 -spec execute_select_node_by_key_and_name(Key :: binary() | jid:ljid(),
469 Node :: mod_pubsub:nodeId()) ->
470 mongoose_rdbms:query_result().
471 execute_select_node_by_key_and_name(Key, Node) ->
472
:-(
mongoose_rdbms:execute_successfully(global, pubsub_select_node_by_key_and_name,
473 [Key, Node]).
474
475 -spec execute_select_node_by_id(Nidx :: mod_pubsub:nodeIdx()) -> mongoose_rdbms:query_result().
476 execute_select_node_by_id(Nidx) ->
477
:-(
mongoose_rdbms:execute_successfully(global, pubsub_select_node_by_id, [Nidx]).
478
479 -spec execute_select_nodes_by_key(Key :: binary() | jid:ljid()) ->
480 mongoose_rdbms:query_result().
481 execute_select_nodes_by_key(Key) ->
482
:-(
mongoose_rdbms:execute_successfully(global, pubsub_select_nodes_by_key, [Key]).
483
484 -spec execute_select_nodes_by_affiliated_user(LU :: jid:luser(), LS :: jid:lserver()) ->
485 mongoose_rdbms:query_result().
486 execute_select_nodes_by_affiliated_user(LU, LS) ->
487
:-(
mongoose_rdbms:execute_successfully(global, pubsub_select_nodes_by_affiliated_user, [LU, LS]).
488
489 -spec execute_select_subnodes(Key :: binary(),
490 Node :: mod_pubsub:nodeId() | <<>>) ->
491 mongoose_rdbms:query_result().
492 execute_select_subnodes(Key, <<>>) ->
493
:-(
mongoose_rdbms:execute_successfully(global, pubsub_select_top_level_nodes, [Key]);
494 execute_select_subnodes(Key, Node) ->
495
:-(
mongoose_rdbms:execute_successfully(global, pubsub_select_subnodes, [Node, Key]).
496
497 -spec execute_select_node_with_parents(Key :: binary(), Node :: mod_pubsub:nodeId()) ->
498 mongoose_rdbms:query_result().
499 execute_select_node_with_parents(Key, Name) ->
500
:-(
mongoose_rdbms:execute_successfully(global, pubsub_select_node_with_parents, [Key, Name]).
501
502 -spec execute_select_node_with_children(Key :: binary(), Node :: mod_pubsub:nodeId()) ->
503 mongoose_rdbms:query_result().
504 execute_select_node_with_children(Key, Name) ->
505
:-(
mongoose_rdbms:execute_successfully(global, pubsub_select_node_with_children, [Key, Name]).
506
507 -spec execute_delete_node(Key :: binary(), Node :: mod_pubsub:nodeId()) ->
508 mongoose_rdbms:query_result().
509 execute_delete_node(Key, Node) ->
510
:-(
mongoose_rdbms:execute_successfully(global, pubsub_delete_node, [Key, Node]).
511
512 -spec execute_del_parents(Name :: mod_pubsub:nodeId()) -> mongoose_rdbms:query_result().
513 execute_del_parents(Name) ->
514
:-(
mongoose_rdbms:execute_successfully(global, pubsub_del_parents, [Name]).
515
516 % ------------------- GDPR --------------------------------
517 -spec execute_get_user_items(LU :: jid:luser(), LS :: jid:lserver()) ->
518 mongoose_rdbms:query_result().
519 execute_get_user_items(LU, LS) ->
520
:-(
mongoose_rdbms:execute_successfully(global, pubsub_get_user_items, [LU, LS]).
521
522 -spec execute_select_nodes_by_owner(LJID :: binary()) -> mongoose_rdbms:query_result().
523 execute_select_nodes_by_owner(LJID) ->
524
:-(
case mongoose_rdbms:db_engine(global) of
525 pgsql ->
526
:-(
mongoose_rdbms:execute_successfully(global,
527 pubsub_select_nodes_by_owner, [LJID]);
528 _ ->
529
:-(
mongoose_rdbms:execute_successfully(global,
530 pubsub_select_nodes_by_owner, [iolist_to_binary(["[\"", LJID, "\"]"])])
531 end.
532
533 -spec execute_get_user_subscriptions(LU :: jid:luser(), LS :: jid:lserver()) ->
534 mongoose_rdbms:query_result().
535 execute_get_user_subscriptions(LU, LS) ->
536
:-(
mongoose_rdbms:execute_successfully(global, pubsub_get_user_subscriptions, [LU, LS]).
537
538 %% ------------------------ Fun execution ------------------------
539
540 transaction(Fun, ErrorDebug) ->
541
:-(
case mongoose_rdbms:sql_transaction(global, mod_pubsub_db:extra_debug_fun(Fun)) of
542 {atomic, Result} ->
543
:-(
Result;
544 {aborted, ReasonData} ->
545
:-(
mod_pubsub_db:db_error(ReasonData, ErrorDebug, transaction_failed)
546 end.
547
548 dirty(Fun, ErrorDebug) ->
549
:-(
try mongoose_rdbms:sql_dirty(global, mod_pubsub_db:extra_debug_fun(Fun)) of
550 Result ->
551
:-(
Result
552 catch
553 _C:ReasonData ->
554
:-(
mod_pubsub_db:db_error(ReasonData, ErrorDebug, dirty_failed)
555 end.
556
557 %% ------------------------ Direct #pubsub_state access ------------------------
558 %% TODO: Functions for direct #pubsub_access are currently inefficient for RDBMS
559 %% - refactor them or remove as many of them as possible from the API at some point
560 -spec get_state(Nidx :: mod_pubsub:nodeIdx(),
561 LJID :: jid:ljid()) ->
562 {ok, mod_pubsub:pubsubState()}.
563 get_state(Nidx, LJID) ->
564
:-(
{ok, ItemIds} = get_entity_items(Nidx, LJID),
565
:-(
{ok, Affiliation} = get_affiliation(Nidx, LJID),
566
:-(
{ok, Subscriptions} = get_node_entity_subscriptions(Nidx, LJID),
567
:-(
{ok, #pubsub_state{
568 stateid = {LJID, Nidx},
569 items = ItemIds,
570 affiliation = Affiliation,
571 subscriptions = Subscriptions
572 }}.
573
574 -spec get_states(Nidx :: mod_pubsub:nodeIdx()) ->
575 {ok, [mod_pubsub:pubsubState()]}.
576 get_states(Nidx) ->
577
:-(
{selected, ItemRows} = execute_get_item_rows_id(Nidx),
578
:-(
{selected, AffiliationRows} = execute_get_affiliation_rows_id(Nidx),
579
:-(
{selected, SubRows} = execute_get_subscriptions_rows_id(Nidx),
580
:-(
States = build_states(ItemRows, AffiliationRows, SubRows),
581
:-(
{ok, States}.
582
583 -spec get_states_by_lus(LJID :: jid:ljid()) ->
584 {ok, [mod_pubsub:pubsubState()]}.
585 get_states_by_lus({ LU, LS, _ }) ->
586
:-(
{selected, ItemRows} = execute_get_item_rows(LU, LS),
587
:-(
{selected, AffiliationRows} = execute_get_affiliation_rows(LU, LS),
588
:-(
{selected, SubRows} = execute_get_subscriptions_rows(LU, LS),
589
:-(
States = build_states(ItemRows, AffiliationRows, SubRows),
590
:-(
{ok, States}.
591
592 -spec get_states_by_bare(LJID :: jid:ljid()) ->
593 {ok, [mod_pubsub:pubsubState()]}.
594 get_states_by_bare({ LU, LS, _ }) ->
595
:-(
{selected, ItemRows} = execute_get_item_rows(LU, LS),
596
:-(
{selected, AffiliationRows} = execute_get_affiliation_rows(LU, LS),
597
:-(
{selected, SubRows} = execute_get_subscriptions_rows_resource(LU, LS, <<>>),
598
:-(
States = build_states(ItemRows, AffiliationRows, SubRows),
599
:-(
{ok, States}.
600
601 -spec get_states_by_bare_and_full(LJID :: jid:ljid()) ->
602 {ok, [mod_pubsub:pubsubState()]}.
603 get_states_by_bare_and_full({ LU, LS, LR } = LJID) ->
604
:-(
{ok, StatesBare} = get_states_by_bare(LJID),
605
:-(
{selected, SubRows} = execute_get_subscriptions_rows_resource(LU, LS, LR),
606
:-(
StatesFull = build_states([], [], SubRows),
607
:-(
{ok, StatesFull ++ StatesBare}.
608
609 -spec get_idxs_of_own_nodes_with_pending_subs(LJID :: jid:ljid()) ->
610 {ok, [mod_pubsub:nodeIdx()]}.
611 get_idxs_of_own_nodes_with_pending_subs({ LU, LS, _ }) ->
612
:-(
{selected, Rows} =
613 execute_get_idxs_of_own_nodes_with_pending_subs(LS, aff2int(owner), LU, sub2int(pending)),
614
:-(
{ok, [ mongoose_rdbms:result_to_integer(Nidx) || {Nidx} <- Rows ]}.
615
616 %% ------------------------ Direct #pubsub_item access ------------------------
617
618 -spec get_items(Nidx :: mod_pubsub:nodeIdx(), gen_pubsub_node:get_item_options()) ->
619 {ok, {[mod_pubsub:pubsubItem()], none}}.
620 get_items(Nidx, Opts) ->
621
:-(
MaxItems = maps:get(max_items, Opts, undefined),
622
:-(
ItemIds = maps:get(item_ids, Opts, undefined),
623
:-(
Rows = get_item_rows(Nidx, MaxItems, ItemIds),
624
:-(
Result = [item_to_record(Row) || Row <- Rows],
625
:-(
{ok, {Result, none}}.
626
627 -spec get_item_rows(Nidx :: mod_pubsub:nodeIdx(),
628 MaxItems :: undefined | non_neg_integer(),
629 ItemIds :: undefined | mod_pubsub:itemId()) -> [tuple()].
630 get_item_rows(Nidx, undefined, undefined) ->
631
:-(
{selected, Rows} = execute_get_items(Nidx),
632
:-(
Rows;
633 get_item_rows(Nidx, MaxItems, undefined) ->
634
:-(
{selected, Rows} = execute_get_items(Nidx, MaxItems),
635
:-(
Rows;
636 get_item_rows(Nidx, MaxItems, ItemIds) ->
637 %% Returned items have same order as ItemIds
638
:-(
get_item_rows_acc(Nidx, MaxItems, ItemIds, []).
639
640
:-(
get_item_rows_acc(_Nidx, _MaxItems, [], AccRows) -> AccRows;
641 get_item_rows_acc(Nidx, MaxItems, [ItemId | ItemIds], AccRows) ->
642
:-(
case execute_get_item(Nidx, ItemId) of
643 {selected, []} ->
644
:-(
get_item_rows_acc(Nidx, MaxItems, ItemIds, AccRows);
645 {selected, [Item]} when MaxItems =:= undefined;
646 length(AccRows) < MaxItems ->
647
:-(
get_item_rows_acc(Nidx, MaxItems, ItemIds, [Item | AccRows]);
648 {selected, [_]} ->
649
:-(
AccRows
650 end.
651
652 -spec get_item(Nidx :: mod_pubsub:nodeIdx(), ItemId :: mod_pubsub:itemId()) ->
653 {ok, mod_pubsub:pubsubItem()} | {error, item_not_found}.
654 get_item(Nidx, ItemId) ->
655
:-(
case execute_get_item(Nidx, ItemId) of
656 {selected, []} ->
657
:-(
{error, item_not_found};
658 {selected, [Item]} ->
659
:-(
{ok, item_to_record(Item)}
660 end.
661
662 -spec set_item(Item :: mod_pubsub:pubsubItem()) -> ok.
663 set_item(#pubsub_item{itemid = {ItemId, NodeIdx},
664 creation = {CreatedAtNow, {CreatedLUser, CreatedLServer, _}},
665 modification = {ModifiedAtNow, {ModifiedLUser, ModifiedLServer, ModifiedLResource}},
666 publisher = PublisherIn,
667 payload = Payload}) ->
668
:-(
PayloadWrapped = #xmlel{name = <<"item">>, children = Payload},
669
:-(
PayloadXML = exml:to_binary(PayloadWrapped),
670
:-(
CreatedAt = CreatedAtNow,
671
:-(
ModifiedAt = ModifiedAtNow,
672
:-(
Publisher = null_or_bin_jid(PublisherIn),
673
:-(
InsertParams = [NodeIdx, ItemId, CreatedLUser, CreatedLServer, CreatedAt,
674 ModifiedLUser, ModifiedLServer, ModifiedLResource, ModifiedAt,
675 Publisher, PayloadXML],
676
:-(
UpdateParams = [ModifiedLUser, ModifiedLServer, ModifiedLResource, ModifiedAt,
677 Publisher, PayloadXML],
678
:-(
UniqueKeyValues = [NodeIdx, ItemId],
679
:-(
{updated, _} = rdbms_queries:execute_upsert(global, pubsub_item_upsert,
680 InsertParams, UpdateParams, UniqueKeyValues),
681
:-(
ok.
682
683 -spec del_item(Nidx :: mod_pubsub:nodeIdx(), ItemId :: mod_pubsub:itemId()) -> ok.
684 del_item(Nidx, ItemId) ->
685
:-(
{updated, _} = execute_del_item(Nidx, ItemId),
686
:-(
ok.
687
688 -spec del_items(Nidx :: mod_pubsub:nodeIdx(), [ItemId :: mod_pubsub:itemId()]) -> ok.
689 del_items(_, []) ->
690
:-(
ok;
691 del_items(Nidx, ItemIds) ->
692
:-(
[{updated, _} = execute_del_item(Nidx, Item) || Item <- ItemIds],
693
:-(
ok.
694
695 % ------------------- Node management --------------------------------
696
697 -spec create_node(Nidx :: mod_pubsub:nodeIdx(), Owner :: jid:ljid()) -> ok.
698 create_node(Nidx, LJID) ->
699
:-(
set_affiliation(Nidx, LJID, owner).
700
701 -spec del_node(Nidx :: mod_pubsub:nodeIdx()) ->
702 {ok, [mod_pubsub:pubsubState()]}.
703 del_node(Nidx) ->
704
:-(
{ok, States} = get_states(Nidx),
705
:-(
{updated, _} = execute_delete_all_subscriptions_id(Nidx),
706
:-(
{updated, _} = execute_delete_all_items(Nidx),
707
:-(
{updated, _} = execute_delete_all_affiliations(Nidx),
708
:-(
{ok, States}.
709
710 -spec set_node(Node :: mod_pubsub:pubsubNode()) -> {ok, mod_pubsub:nodeIdx()}.
711 set_node(#pubsub_node{nodeid = {Key, Name}, id = undefined, type = Type,
712 owners = Owners, options = Opts, parents = Parents}) ->
713
:-(
ExtKey = encode_key(Key),
714
:-(
ExtOwners = jiffy:encode([jid:to_binary(Owner) || Owner <- Owners]),
715
:-(
ExtOpts = jiffy:encode({Opts}),
716
:-(
{updated, 1} = execute_insert_pubsub_node(ExtKey, Name, Type, ExtOwners, ExtOpts),
717
:-(
{selected, [Row]} = execute_select_node_by_key_and_name(ExtKey, Name),
718
:-(
#pubsub_node{id = Nidx} = decode_pubsub_node_row(Row),
719
:-(
set_parents(Name, Parents),
720
:-(
{ok, Nidx};
721
722 set_node(#pubsub_node{nodeid = {_, Name}, id = Nidx, type = Type,
723 owners = Owners, options = Opts, parents = Parents}) ->
724
:-(
OwnersJid = [jid:to_binary(Owner) || Owner <- Owners],
725
:-(
execute_update_pubsub_node(Type, jiffy:encode(OwnersJid), jiffy:encode({Opts}), Nidx),
726
:-(
execute_del_parents(Name),
727
:-(
set_parents(Name, Parents),
728
:-(
{ok, Nidx}.
729
730 -spec set_parents(mod_pubsub:nodeId(), [mod_pubsub:nodeId()]) -> ok.
731 set_parents(Name, Parents) ->
732
:-(
[execute_insert_parent(Name, ParentName) || ParentName <- Parents],
733
:-(
ok.
734
735 -spec find_node_by_id(Nidx :: mod_pubsub:nodeIdx()) ->
736 {error, not_found} | {ok, mod_pubsub:pubsubNode()}.
737 find_node_by_id(Nidx) ->
738
:-(
case execute_select_node_by_id(Nidx) of
739 {selected, []} ->
740
:-(
{error, not_found};
741 {selected, [Row]} ->
742
:-(
{ok, decode_pubsub_node_row(Row)}
743 end.
744
745 -spec find_node_by_name(Key :: mod_pubsub:hostPubsub() | jid:ljid(),
746 Node :: mod_pubsub:nodeId()) ->
747 mod_pubsub:pubsubNode() | false.
748 find_node_by_name(Key, Node) ->
749
:-(
case execute_select_node_by_key_and_name(encode_key(Key), Node) of
750 {selected, [Row]} ->
751
:-(
decode_pubsub_node_row(Row);
752 {selected, []} ->
753
:-(
false
754 end.
755
756 decode_pubsub_node_row({Nidx, KeySQL, Name, Type, Owners, Options}) ->
757
:-(
Key = decode_key(KeySQL),
758
:-(
{DecodedOpts} = jiffy:decode(Options),
759
:-(
DecodedOptions = [maybe_option_value_to_atom(key_to_existing_atom(Item)) ||
760
:-(
Item <- DecodedOpts],
761
:-(
DecodedOwners = [jid:to_lower(jid:from_binary(Owner)) ||
762
:-(
Owner <- jiffy:decode(Owners)],
763
:-(
#pubsub_node{nodeid = {Key, Name},
764 id = mongoose_rdbms:result_to_integer(Nidx),
765 type = Type,
766 owners = DecodedOwners,
767 options = DecodedOptions}.
768
769 maybe_option_value_to_atom({access_model, Value}) ->
770
:-(
{access_model, binary_to_existing_atom(Value, utf8)};
771 maybe_option_value_to_atom({publish_model, Value}) ->
772
:-(
{publish_model, binary_to_existing_atom(Value, utf8)};
773 maybe_option_value_to_atom({notification_type, Value}) ->
774
:-(
{notification_type, binary_to_existing_atom(Value, utf8)};
775 maybe_option_value_to_atom({node_type, Value}) ->
776
:-(
{node_type, binary_to_existing_atom(Value, utf8)};
777 maybe_option_value_to_atom({send_last_published_item, Value}) ->
778
:-(
{send_last_published_item, binary_to_existing_atom(Value, utf8)};
779 maybe_option_value_to_atom(Other) ->
780
:-(
Other.
781
782 -spec find_nodes_by_key(Key :: mod_pubsub:hostPubsub() | jid:ljid()) ->
783 [mod_pubsub:pubsubNode()].
784 find_nodes_by_key(Key) ->
785
:-(
{selected, Rows} =
786 execute_select_nodes_by_key(encode_key(Key)),
787
:-(
[decode_pubsub_node_row(Row) || Row <- Rows].
788
789 -spec delete_node(Node :: mod_pubsub:pubsubNode()) -> ok.
790 delete_node(#pubsub_node{nodeid = {Key, Node}}) ->
791
:-(
{updated, _} = execute_delete_node(encode_key(Key), Node),
792
:-(
ok.
793
794 -spec get_subnodes(Key :: mod_pubsub:hostPubsub() | jid:ljid(), Node :: mod_pubsub:nodeId() | <<>>) ->
795 [mod_pubsub:pubsubNode()].
796 get_subnodes(Key, Node) ->
797
:-(
{selected, Rows} = execute_select_subnodes(encode_key(Key), Node),
798
:-(
[decode_pubsub_node_row(Row) || Row <- Rows].
799
800 -spec get_parentnodes_tree(Key :: mod_pubsub:hostPubsub() | jid:ljid(), Node :: mod_pubsub:nodeId()) ->
801 [{Depth::non_neg_integer(), Nodes::[mod_pubsub:pubsubNode(), ...]}].
802 get_parentnodes_tree(Key, Node) ->
803
:-(
find_nodes_with_parents(Key, [Node], 0, []).
804
805 find_nodes_with_parents(_, [], _, Acc) ->
806
:-(
Acc;
807 find_nodes_with_parents(_, _, 100, Acc) ->
808
:-(
?LOG_WARNING(#{what => pubsub_max_depth_reached, pubsub_nodes => Acc}),
809
:-(
Acc;
810 find_nodes_with_parents(Key, Nodes, Depth, Acc) ->
811
:-(
ExtKey = encode_key(Key),
812
:-(
Rows = lists:flatmap(fun(Name) ->
813
:-(
{selected, Rs} = execute_select_node_with_parents(ExtKey, Name),
814
:-(
Rs
815 end, Nodes),
816
:-(
Map = lists:foldl(fun update_nodes_map/2, #{}, Rows),
817
:-(
MapTransformer = fun(Nidx, #{name := NodeName,
818 next_level := Parents}, {ParentsAcc, NodesAcc}) ->
819
:-(
NewParents = [Parents | ParentsAcc],
820
:-(
Node = #pubsub_node{id = Nidx,
821 nodeid = {Key, NodeName},
822 parents = Parents},
823
:-(
NewNodes = [Node | NodesAcc],
824
:-(
{NewParents, NewNodes}
825 end,
826
827
:-(
{Parents, NewNodes} = maps:fold(MapTransformer, {[], []}, Map),
828
:-(
NewAcc = [{Depth, NewNodes} | Acc],
829
:-(
find_nodes_with_parents(Key, lists:flatten(Parents), Depth + 1, NewAcc).
830
831 update_nodes_map({NidxSQL, NodeName, NextLevelNodeName}, Map) ->
832
:-(
Nidx = mongoose_rdbms:result_to_integer(NidxSQL),
833
:-(
case maps:get(Nidx, Map, undefined) of
834 undefined ->
835
:-(
NewNode = #{name => NodeName,
836 next_level => maybe_add_parent_to_list(NextLevelNodeName, [])},
837
:-(
Map#{Nidx => NewNode};
838 #{next_level := NextLevelNodes} = Node ->
839
:-(
UpdatedNode = Node#{next_level := maybe_add_parent_to_list(NextLevelNodeName, NextLevelNodes)},
840
:-(
Map#{Nidx := UpdatedNode}
841 end.
842
843 maybe_add_parent_to_list(null, List) ->
844
:-(
List;
845 maybe_add_parent_to_list(ParentName, List) ->
846
:-(
[ParentName | List].
847
848 -spec get_subnodes_tree(Key :: mod_pubsub:hostPubsub() | jid:ljid(), Node :: mod_pubsub:nodeId()) ->
849 [{Depth::non_neg_integer(), Nodes::[mod_pubsub:pubsubNode(), ...]}].
850 get_subnodes_tree(Key, Node) ->
851
:-(
find_subnodes(Key, [Node], 0, []).
852
853 find_subnodes(_Key, [], _, Acc) ->
854
:-(
Acc;
855 find_subnodes(_, _, 100, Acc) ->
856
:-(
?LOG_WARNING(#{what => pubsub_max_depth_reached, pubsub_nodes => Acc}),
857
:-(
Acc;
858 find_subnodes(Key, Nodes, Depth, Acc) ->
859
:-(
ExtKey = encode_key(Key),
860
:-(
Rows = lists:flatmap(fun(Name) ->
861
:-(
{selected, Rs} = execute_select_node_with_children(ExtKey, Name),
862
:-(
Rs
863 end, Nodes),
864
:-(
Map = lists:foldl(fun update_nodes_map/2, #{}, Rows),
865
:-(
MapTransformer = fun(Nidx, #{name := NodeName,
866 next_level := Subnodes}, {SubnodesAcc, NodesAcc}) ->
867
:-(
NewSubnodes = [Subnodes | SubnodesAcc],
868
:-(
Node = #pubsub_node{id = Nidx,
869 nodeid = {Key, NodeName}},
870
:-(
NewNodes = [Node | NodesAcc],
871
:-(
{NewSubnodes, NewNodes}
872 end,
873
:-(
{Subnodes, NewNodes} = maps:fold(MapTransformer, {[], []}, Map),
874
:-(
NewAcc = [{Depth, NewNodes} | Acc],
875
:-(
find_subnodes(Key, lists:flatten(Subnodes), Depth + 1, NewAcc).
876
877 % ------------------- Affiliations --------------------------------
878
879 -spec set_affiliation(Nidx :: mod_pubsub:nodeIdx(),
880 LJID :: jid:ljid(),
881 Affiliation :: mod_pubsub:affiliation()) -> ok.
882 set_affiliation(Nidx, { LU, LS, _ } = LJID, none) ->
883
:-(
BareLJID = jid:to_bare(LJID),
884
:-(
case get_node_entity_subscriptions(Nidx, BareLJID) of
885 {ok, []} ->
886
:-(
del_state(Nidx, BareLJID);
887 _ ->
888
:-(
delete_affiliation_wo_subs_check(Nidx, LU, LS)
889 end;
890 set_affiliation(Nidx, { LU, LS, _ }, Affiliation) ->
891
:-(
Aff = aff2int(Affiliation),
892
:-(
InsertParams = [Nidx, LU, LS, Aff],
893
:-(
UpdateParams = [Aff],
894
:-(
UniqueKeyValues = [Nidx, LU, LS],
895
:-(
{updated, _} = rdbms_queries:execute_upsert(global, pubsub_affiliation_upsert,
896 InsertParams, UpdateParams, UniqueKeyValues),
897
:-(
ok.
898
899 -spec get_affiliation(Nidx :: mod_pubsub:nodeIdx(),
900 LJID :: jid:ljid()) ->
901 {ok, mod_pubsub:affiliation()}.
902 get_affiliation(Nidx, { LU, LS, _ }) ->
903
:-(
case execute_get_affiliation(Nidx, LU, LS) of
904 {selected, [{AffInt}]} ->
905
:-(
{ok, sql2aff(AffInt)};
906 {selected, []} ->
907
:-(
{ok, none}
908 end.
909
910 % ------------------- Subscriptions --------------------------------
911
912 -spec add_subscription(Nidx :: mod_pubsub:nodeIdx(),
913 LJID :: jid:ljid(),
914 Sub :: mod_pubsub:subscription(),
915 SubId :: mod_pubsub:subId(),
916 SubOpts :: mod_pubsub:subOptions()) -> ok.
917 add_subscription(Nidx, { LU, LS, LR }, Sub, SubId, SubOpts) ->
918
:-(
EncodedOpts = jiffy:encode({SubOpts}),
919
:-(
{updated, _} = execute_insert_subscription(Nidx, LU, LS, LR, sub2int(Sub), SubId, EncodedOpts),
920
:-(
ok.
921
922 -spec set_subscription_opts(Nidx :: mod_pubsub:nodeIdx(),
923 LJID :: jid:ljid(),
924 SubId :: mod_pubsub:subId(),
925 Opts :: mod_pubsub:subOptions()) -> ok.
926 set_subscription_opts(Nidx, { LU, LS, LR }, SubId, Opts) ->
927
:-(
EncodedOpts = jiffy:encode({Opts}),
928
:-(
{updated, _} = execute_update_subscription_opts(EncodedOpts, Nidx, LU, LS, LR, SubId),
929
:-(
ok.
930
931 -spec get_node_subscriptions(Nidx :: mod_pubsub:nodeIdx()) ->
932 {ok, [{Entity :: jid:ljid(),
933 Sub :: mod_pubsub:subscription(),
934 SubId :: mod_pubsub:subId(),
935 SubOpts :: mod_pubsub:subOptions()}]}.
936 get_node_subscriptions(Nidx) ->
937
:-(
{selected, QueryResult} = execute_get_node_subs(Nidx),
938
:-(
{ok, [{{LU, LS, LR}, sql2sub(SubInt), SubId, sql_to_sub_opts(SubOpts)}
939
:-(
|| {LU, LS, LR, SubInt, SubId, SubOpts} <- QueryResult ]}.
940
941 -spec get_node_entity_subscriptions(Nidx :: mod_pubsub:nodeIdx(),
942 LJID :: jid:ljid()) ->
943 {ok, [{Sub :: mod_pubsub:subscription(),
944 SubId :: mod_pubsub:subId(),
945 SubOpts :: mod_pubsub:subOptions()}]}.
946 get_node_entity_subscriptions(Nidx, { LU, LS, LR }) ->
947
:-(
{selected, QueryResult} = execute_get_node_entity_subs(LS, Nidx, LU, LR),
948
:-(
{ok, [{sql2sub(SubInt), SubId, sql_to_sub_opts(SubOpts)}
949
:-(
|| {SubInt, SubId, SubOpts} <- QueryResult ]}.
950
951 -spec delete_subscription(Nidx :: mod_pubsub:nodeIdx(),
952 LJID :: jid:ljid(),
953 SubId :: mod_pubsub:subId()) ->
954 ok.
955 delete_subscription(Nidx, { LU, LS, LR }, SubId) ->
956
:-(
{updated, _} = execute_delete_subscription(LS, Nidx, LU, LR, SubId),
957
:-(
ok.
958
959 -spec delete_all_subscriptions(Nidx :: mod_pubsub:nodeIdx(),
960 LJID :: jid:ljid()) ->
961 ok.
962 delete_all_subscriptions(Nidx, { LU, LS, LR } = LJID) ->
963
:-(
case get_affiliation(Nidx, LJID) of
964 {ok, none} ->
965
:-(
del_state(Nidx, LJID);
966 _ ->
967
:-(
delete_all_subscriptions_wo_aff_check(Nidx, LU, LS, LR)
968 end,
969
:-(
ok.
970
971 -spec update_subscription(Nidx :: mod_pubsub:nodeIdx(),
972 LJID :: jid:ljid(),
973 Subscription :: mod_pubsub:subscription(),
974 SubId :: mod_pubsub:subId()) ->
975 ok.
976 update_subscription(Nidx, { LU, LS, LR }, Subscription, SubId) ->
977
:-(
{updated, _} = execute_update_subscription(sub2int(Subscription), Nidx, LU, LS, LR, SubId),
978
:-(
ok.
979
980 % ------------------- Items --------------------------------
981
982 -spec add_item(Nidx :: mod_pubsub:nodeIdx(),
983 LJID :: jid:ljid(),
984 Item :: mod_pubsub:pubsubItem()) ->
985 ok.
986 add_item(_Nidx, _, Item) ->
987
:-(
set_item(Item),
988
:-(
ok.
989
990 %% TODO: Make public at some point
991 -spec get_entity_items(Nidx :: mod_pubsub:nodeIdx(),
992 LJID :: jid:ljid()) ->
993 {ok, [mod_pubsub:itemId()]}.
994 get_entity_items(Nidx, { LU, LS, _ }) ->
995
:-(
{selected, ItemIds} = execute_get_entity_items(LS, Nidx, LU),
996
:-(
{ok, [ ItemId || {ItemId} <- ItemIds]}.
997
998 -spec remove_items(Nidx :: mod_pubsub:nodeIdx(),
999 LJID :: jid:ljid(),
1000 ItemIds :: [mod_pubsub:itemId()]) ->
1001 ok.
1002 remove_items(Nidx, { LU, LS, _ }, ItemIds) ->
1003
:-(
lists:foreach(fun(ItemId) ->
1004
:-(
{updated, _} = execute_delete_item(LS, Nidx, LU, ItemId)
1005 end, ItemIds).
1006
1007 -spec remove_all_items(Nidx :: mod_pubsub:nodeIdx()) ->
1008 ok.
1009 remove_all_items(Nidx) ->
1010
:-(
{updated, _} = execute_delete_all_items(Nidx),
1011
:-(
ok.
1012
1013 % ------------------- GDPR-related --------------------------------
1014
1015 get_user_payloads(LUser, LServer) ->
1016
:-(
case execute_get_user_items(LUser, LServer) of
1017 {selected, Items} ->
1018
:-(
[[NodeName, ItemId, strip_payload(PayloadDB)] || {NodeName, ItemId, PayloadDB} <- Items]
1019 end.
1020
1021 get_user_nodes(LUser, LServer) ->
1022
:-(
LJID = jid:to_binary({LUser, LServer, <<>>}),
1023
:-(
{selected, Nodes} = execute_select_nodes_by_owner(LJID),
1024
:-(
lists:map(fun tuple_to_list/1, Nodes).
1025
1026 get_user_subscriptions(LUser, LServer) ->
1027
:-(
{selected, Nodes} = execute_get_user_subscriptions(LUser, LServer),
1028
:-(
lists:map(fun tuple_to_list/1, Nodes).
1029
1030 strip_payload(PayloadDB) ->
1031
:-(
PayloadXML = mongoose_rdbms:unescape_binary(global, PayloadDB),
1032
:-(
{ok, #xmlel{children = Payload}} = exml:parse(PayloadXML),
1033
:-(
exml:to_binary(Payload).
1034
1035 -spec delete_user_subscriptions(jid:ljid()) -> ok.
1036 delete_user_subscriptions({ LU, LS, _ }) ->
1037
:-(
{updated, _} = execute_delete_user_subscriptions(LS, LU),
1038
:-(
ok.
1039
1040 find_nodes_by_affiliated_user({ LU, LS, _ }) ->
1041
:-(
{selected, NodesWithAffs} = execute_select_nodes_by_affiliated_user(LU, LS),
1042
:-(
lists:map(fun decode_pubsub_node_with_aff_row/1, NodesWithAffs).
1043
1044 decode_pubsub_node_with_aff_row(Row) ->
1045
:-(
[Aff | NodeRow] = tuple_to_list(Row),
1046
:-(
{decode_pubsub_node_row(list_to_tuple(NodeRow)), sql2aff(Aff)}.
1047
1048 %%====================================================================
1049 %% Helpers
1050 %%====================================================================
1051
1052 -spec item_columns() -> binary().
1053 item_columns() ->
1054
:-(
<<"nidx, itemid, created_luser, created_lserver, created_at, "
1055 "modified_luser, modified_lserver, modified_lresource, modified_at, "
1056 "publisher, payload">>.
1057
1058 -spec pubsub_node_fields() -> binary().
1059 pubsub_node_fields() ->
1060
:-(
<<"nidx, p_key, name, type, owners, options">>.
1061
1062 -spec pubsub_node_fields_pn() -> binary().
1063 pubsub_node_fields_pn() ->
1064
:-(
<<"pn.nidx, pn.p_key, pn.name, pn.type, pn.owners, pn.options">>.
1065
1066 -spec del_state(Nidx :: mod_pubsub:nodeIdx(),
1067 LJID :: jid:ljid()) -> ok.
1068 del_state(Nidx, {LU, LS, LR}) ->
1069
:-(
delete_all_subscriptions_wo_aff_check(Nidx, LU, LS, LR),
1070
:-(
delete_affiliation_wo_subs_check(Nidx, LU, LS),
1071
:-(
ok.
1072
1073 -spec delete_all_subscriptions_wo_aff_check(Nidx :: mod_pubsub:nodeIdx(),
1074 LU :: jid:luser(),
1075 LS :: jid:lserver(),
1076 LR :: jid:lresource()) -> ok.
1077 delete_all_subscriptions_wo_aff_check(Nidx, LU, LS, LR) ->
1078
:-(
{updated, _} = execute_delete_all_subscriptions(Nidx, LU, LS, LR),
1079
:-(
ok.
1080
1081 -spec delete_affiliation_wo_subs_check(Nidx :: mod_pubsub:nodeIdx(),
1082 LU :: jid:luser(),
1083 LS :: jid:lserver()) -> ok.
1084 delete_affiliation_wo_subs_check(Nidx, LU, LS) ->
1085
:-(
{updated, _} = execute_delete_affiliation(Nidx, LU, LS),
1086
:-(
ok.
1087
1088 -type item_row() :: { NidxSql :: integer() | binary(),
1089 LU :: binary(),
1090 LS :: binary(),
1091 ItemId :: binary() }.
1092 -type aff_row() :: { NidxSql :: integer() | binary(),
1093 LU :: binary(),
1094 LS :: binary(),
1095 AffInt :: integer() }.
1096 -type sub_row() :: { NidxSql :: integer() | binary(),
1097 LU :: binary(),
1098 LS :: binary(),
1099 LR :: binary(),
1100 TypeInt :: integer(),
1101 SubId :: binary() }.
1102
1103 -spec build_states(ItemRows :: [item_row()], AffRows :: [aff_row()], SubRows :: [sub_row()]) ->
1104 [mod_pubsub:pubsubState()].
1105 build_states(ItemRows, AffRows, SubRows) ->
1106
:-(
Result1 = item_rows_to_states(ItemRows, #{}),
1107
:-(
Result2 = aff_rows_to_states(AffRows, Result1),
1108
:-(
maps:values(sub_rows_to_states(SubRows, Result2)).
1109
1110 item_rows_to_states([], Acc) ->
1111
:-(
Acc;
1112 item_rows_to_states([{ NidxSql, LU, LS, ItemId } | RRows], Acc) ->
1113
:-(
Nidx = mongoose_rdbms:result_to_integer(NidxSql),
1114
:-(
LJID = { LU, LS, <<>> },
1115
:-(
PS = maps:get({LJID, Nidx}, Acc, #pubsub_state{ stateid = {LJID, Nidx} }),
1116
:-(
#pubsub_state{ items = Items0 } = PS,
1117
:-(
NAcc = Acc#{ {LJID, Nidx} => PS#pubsub_state{ items = [ItemId | Items0] } },
1118
:-(
item_rows_to_states(RRows, NAcc).
1119
1120 aff_rows_to_states([], Acc) ->
1121
:-(
Acc;
1122 aff_rows_to_states([{ NidxSql, LU, LS, AffInt } | RRows], Acc) ->
1123
:-(
Nidx = mongoose_rdbms:result_to_integer(NidxSql),
1124
:-(
LJID = { LU, LS, <<>> },
1125
:-(
PS = maps:get({LJID, Nidx}, Acc, #pubsub_state{ stateid = {LJID, Nidx} }),
1126
:-(
NAcc = Acc#{ {LJID, Nidx} => PS#pubsub_state{ affiliation = sql2aff(AffInt) } },
1127
:-(
aff_rows_to_states(RRows, NAcc).
1128
1129 sub_rows_to_states([], Acc) ->
1130
:-(
Acc;
1131 sub_rows_to_states([{ NidxSql, LU, LS, LR, TypeInt, SubId } | RRows], Acc) ->
1132
:-(
Nidx = mongoose_rdbms:result_to_integer(NidxSql),
1133
:-(
LJID = { LU, LS, LR },
1134
:-(
PS = maps:get({LJID, Nidx}, Acc, #pubsub_state{ stateid = {LJID, Nidx} }),
1135
:-(
#pubsub_state{ subscriptions = Subs0 } = PS,
1136
:-(
NAcc = Acc#{ {LJID, Nidx} => PS#pubsub_state{
1137 subscriptions = [{sql2sub(TypeInt), SubId} | Subs0] } },
1138
:-(
sub_rows_to_states(RRows, NAcc).
1139
1140 -spec aff2int(mod_pubsub:affiliation()) -> integer().
1141
:-(
aff2int(none) -> 0;
1142
:-(
aff2int(owner) -> 1;
1143
:-(
aff2int(publisher) -> 2;
1144
:-(
aff2int(publish_only) -> 3;
1145
:-(
aff2int(member) -> 4;
1146
:-(
aff2int(outcast) -> 5.
1147
1148 -spec sql2aff(integer() | binary()) -> mod_pubsub:affiliation().
1149 sql2aff(SqlInt) ->
1150
:-(
int2aff(mongoose_rdbms:result_to_integer(SqlInt)).
1151
1152 -spec int2aff(integer()) -> mod_pubsub:affiliation().
1153
:-(
int2aff(0) -> none;
1154
:-(
int2aff(1) -> owner;
1155
:-(
int2aff(2) -> publisher;
1156
:-(
int2aff(3) -> publish_only;
1157
:-(
int2aff(4) -> member;
1158
:-(
int2aff(5) -> outcast.
1159
1160 -spec sub2int(mod_pubsub:subscription()) -> integer().
1161
:-(
sub2int(none) -> 0;
1162
:-(
sub2int(pending) -> 1;
1163
:-(
sub2int(subscribed) -> 3.
1164
1165 -spec sql2sub(integer() | binary()) -> mod_pubsub:subscription().
1166 sql2sub(SqlInt) ->
1167
:-(
int2sub(mongoose_rdbms:result_to_integer(SqlInt)).
1168
1169 -spec int2sub(integer()) -> mod_pubsub:subscription().
1170
:-(
int2sub(0) -> none;
1171
:-(
int2sub(1) -> pending;
1172
:-(
int2sub(3) -> subscribed.
1173
1174 sql_to_sub_opts(SqlOpts) ->
1175
:-(
{Opts} = jiffy:decode(SqlOpts),
1176
:-(
lists:map(fun key_to_existing_atom/1, Opts).
1177
1178 item_to_record({NodeIdx, ItemId, CreatedLUser, CreatedLServer, CreatedAt,
1179 ModifiedLUser, ModifiedLServer, ModifiedLResource, ModifiedAt,
1180 PublisherIn, PayloadDB}) ->
1181
:-(
PayloadXML = mongoose_rdbms:unescape_binary(global, PayloadDB),
1182
:-(
{ok, #xmlel{children = Payload}} = exml:parse(PayloadXML),
1183
:-(
ItemAndNodeId = {ItemId, mongoose_rdbms:result_to_integer(NodeIdx)},
1184
:-(
Creation = {mongoose_rdbms:result_to_integer(CreatedAt),
1185 {CreatedLUser, CreatedLServer, <<>>}},
1186
:-(
Modification = {mongoose_rdbms:result_to_integer(ModifiedAt),
1187 {ModifiedLUser, ModifiedLServer, ModifiedLResource}},
1188
:-(
Publisher = decode_publisher(PublisherIn),
1189
:-(
#pubsub_item{itemid = ItemAndNodeId,
1190 creation = Creation,
1191 modification = Modification,
1192 publisher = Publisher,
1193 payload = Payload}.
1194
1195 decode_publisher(null) ->
1196
:-(
undefined;
1197 decode_publisher(Binary) ->
1198 %% Silently returns `error` if parsing fails.
1199
:-(
jid:from_binary(Binary).
1200
1201 encode_key(Key) when is_binary(Key) ->
1202
:-(
Key;
1203 encode_key({_, _, _} = JID) ->
1204
:-(
jid:to_binary(JID).
1205
1206 decode_key(KeySQL) ->
1207
:-(
case jid:from_binary(KeySQL) of
1208 #jid{luser = <<>>, lserver = Host, lresource = <<>>} ->
1209
:-(
Host;
1210 #jid{luser = LUser, lserver = LServer, lresource = LResource} ->
1211
:-(
{LUser, LServer, LResource}
1212 end.
1213
1214 null_or_bin_jid(undefined) ->
1215
:-(
null;
1216 null_or_bin_jid(Jid) ->
1217
:-(
jid:to_binary(Jid).
1218
1219 key_to_existing_atom({Key, Value}) when is_atom(Key)->
1220
:-(
{Key, Value};
1221 key_to_existing_atom({Key, Value}) ->
1222
:-(
{binary_to_existing_atom(Key, utf8), Value}.
1223
Line Hits Source