./ct_report/coverage/mod_inbox_rdbms.COVER.html

1 %%%-------------------------------------------------------------------
2 %%% @copyright (C) 2018, Erlang Solutions Ltd.
3 %%% @doc
4 %%%
5 %%% @end
6 %%% Created : 30. Jan 2018 16:59
7 %%%-------------------------------------------------------------------
8 -module(mod_inbox_rdbms).
9
10 -include("mod_inbox.hrl").
11
12 -behaviour(mod_inbox_backend).
13
14 %% API
15 -export([get_inbox/4,
16 init/2,
17 set_inbox/6,
18 set_inbox_incr_unread/5,
19 reset_unread/3,
20 remove_inbox_row/2,
21 remove_domain/2,
22 clear_inbox/3,
23 get_inbox_unread/2,
24 get_entry_properties/2,
25 set_entry_properties/3]).
26 -export([check_result/1]).
27
28 -type archived() :: binary().
29 -type muted_until() :: binary().
30 -type msg_content() :: binary().
31 -type db_return() :: {jid:luser(),
32 msg_content(),
33 count_bin(),
34 binary(),
35 non_neg_integer() | binary(),
36 archived(),
37 muted_until()}.
38
39 %% ----------------------------------------------------------------------
40 %% API
41 %% ----------------------------------------------------------------------
42
43 %% TODO pools aren't multitenancy-ready yet
44 init(HostType, _Options) ->
45
:-(
RowCond = <<"WHERE luser = ? AND lserver = ? AND remote_bare_jid = ?">>,
46
:-(
mongoose_rdbms:prepare(inbox_select_unread_count, inbox,
47 [luser, lserver, remote_bare_jid],
48 <<"SELECT unread_count FROM inbox ",
49 RowCond/binary>>),
50
:-(
mongoose_rdbms:prepare(inbox_select_properties, inbox,
51 [luser, lserver, remote_bare_jid],
52 <<"SELECT archive, unread_count, muted_until FROM inbox ",
53 RowCond/binary>>),
54
:-(
mongoose_rdbms:prepare(inbox_reset_unread, inbox,
55 [luser, lserver, remote_bare_jid],
56 <<"UPDATE inbox SET unread_count = 0 ", RowCond/binary>>),
57
:-(
mongoose_rdbms:prepare(inbox_reset_unread_msg, inbox,
58 [luser, lserver, remote_bare_jid, msg_id],
59 <<"UPDATE inbox SET unread_count = 0 ", RowCond/binary,
60 " AND msg_id = ?">>),
61
:-(
mongoose_rdbms:prepare(inbox_delete_row, inbox,
62 [luser, lserver, remote_bare_jid],
63 <<"DELETE FROM inbox ", RowCond/binary>>),
64
:-(
mongoose_rdbms:prepare(inbox_delete, inbox,
65 [luser, lserver],
66 <<"DELETE FROM inbox WHERE luser = ? AND lserver = ?">>),
67
:-(
mongoose_rdbms:prepare(inbox_delete_domain, inbox,
68 [lserver], <<"DELETE FROM inbox WHERE lserver = ?">>),
69
:-(
UniqueKeyFields = [<<"luser">>, <<"lserver">>, <<"remote_bare_jid">>],
70
:-(
InsertFields =
71 UniqueKeyFields ++ [<<"content">>, <<"unread_count">>, <<"msg_id">>, <<"timestamp">>],
72
:-(
rdbms_queries:prepare_upsert(HostType, inbox_upsert, inbox,
73 InsertFields,
74 [<<"content">>, <<"unread_count">>,
75 <<"msg_id">>, <<"timestamp">>, <<"archive">>],
76 UniqueKeyFields),
77
:-(
rdbms_queries:prepare_upsert(HostType, inbox_upsert_incr_unread, inbox,
78 InsertFields,
79 [<<"content">>, <<"msg_id">>, <<"timestamp">>, <<"archive">>,
80 {<<"unread_count">>, <<"unread_count = inbox.unread_count + ?">>}],
81 UniqueKeyFields),
82
:-(
ok.
83
84 -spec get_inbox(HostType :: mongooseim:host_type(),
85 LUser :: jid:luser(),
86 LServer :: jid:lserver(),
87 Params :: mod_inbox:get_inbox_params()) -> get_inbox_res().
88 get_inbox(HostType, LUser, LServer, Params) ->
89
:-(
case get_inbox_rdbms(HostType, LUser, LServer, Params) of
90 {selected, []} ->
91
:-(
[];
92 {selected, Res} ->
93
:-(
[decode_row(HostType, R) || R <- Res]
94 end.
95
96 -spec get_inbox_unread(mongooseim:host_type(), mod_inbox:entry_key()) ->
97 {ok, integer()}.
98 get_inbox_unread(HostType, {LUser, LServer, RemBareJID}) ->
99
:-(
Res = execute_select_unread_count(HostType, LUser, LServer, RemBareJID),
100
:-(
{ok, Val} = check_result(Res),
101 %% We read unread_count value when the message is sent and is not yet in receiver inbox
102 %% so we have to add +1
103
:-(
{ok, Val + 1}.
104
105 -spec set_inbox(HostType, InboxEntryKey, Content, Count, MsgId, Timestamp) ->
106 mod_inbox:write_res() when
107 HostType :: mongooseim:host_type(),
108 InboxEntryKey :: mod_inbox:entry_key(),
109 Content :: binary(),
110 Count :: integer(),
111 MsgId :: binary(),
112 Timestamp :: integer().
113 set_inbox(HostType, {LUser, LServer, LToBareJid}, Content, Count, MsgId, Timestamp) ->
114
:-(
InsertParams = [LUser, LServer, LToBareJid,
115 Content, Count, MsgId, Timestamp],
116
:-(
UpdateParams = [Content, Count, MsgId, Timestamp, false],
117
:-(
UniqueKeyValues = [LUser, LServer, LToBareJid],
118
:-(
Res = rdbms_queries:execute_upsert(HostType, inbox_upsert,
119 InsertParams, UpdateParams, UniqueKeyValues),
120 %% MySQL returns 1 when an upsert is an insert
121 %% and 2, when an upsert acts as update
122
:-(
check_result_is_expected(Res, [1, 2]).
123
124 -spec remove_inbox_row(HostType :: mongooseim:host_type(),
125 InboxEntryKey :: mod_inbox:entry_key()) -> mod_inbox:write_res().
126 remove_inbox_row(HostType, {LUser, LServer, LToBareJid}) ->
127
:-(
Res = execute_delete(HostType, LUser, LServer, LToBareJid),
128
:-(
check_result(Res).
129
130 -spec remove_domain(HostType :: mongooseim:host_type(),
131 LServer :: jid:lserver()) -> ok.
132 remove_domain(HostType, LServer) ->
133
:-(
execute_delete_domain(HostType, LServer),
134
:-(
ok.
135
136 -spec set_inbox_incr_unread(
137 mongooseim:host_type(), mod_inbox:entry_key(), binary(), binary(), integer()) ->
138 mod_inbox:count_res().
139 set_inbox_incr_unread(HostType, Entry, Content, MsgId, Timestamp) ->
140
:-(
set_inbox_incr_unread(HostType, Entry, Content, MsgId, Timestamp, 1).
141
142 -spec set_inbox_incr_unread(HostType :: mongooseim:host_type(),
143 InboxEntryKey :: mod_inbox:entry_key(),
144 Content :: binary(),
145 MsgId :: binary(),
146 Timestamp :: integer(),
147 Incrs :: pos_integer()) -> mod_inbox:count_res().
148 set_inbox_incr_unread(HostType, {LUser, LServer, LToBareJid}, Content, MsgId, Timestamp, Incrs) ->
149
:-(
InsertParams = [LUser, LServer, LToBareJid, Content, Incrs, MsgId, Timestamp],
150
:-(
UpdateParams = [Content, MsgId, Timestamp, false, Incrs],
151
:-(
UniqueKeyValues = [LUser, LServer, LToBareJid],
152
:-(
Res = rdbms_queries:execute_upsert(HostType, inbox_upsert_incr_unread,
153 InsertParams, UpdateParams, UniqueKeyValues),
154
:-(
check_result(Res).
155
156 -spec reset_unread(HosType :: mongooseim:host_type(),
157 InboxEntryKey :: mod_inbox:entry_key(),
158 MsgId :: binary() | undefined) -> mod_inbox:write_res().
159 reset_unread(HostType, {LUser, LServer, LToBareJid}, MsgId) ->
160
:-(
Res = execute_reset_unread(HostType, LUser, LServer, LToBareJid, MsgId),
161
:-(
check_result(Res).
162
163 -spec clear_inbox(HostType :: mongooseim:host_type(),
164 LUser :: jid:luser(),
165 LServer :: jid:lserver()) -> mod_inbox:write_res().
166 clear_inbox(HostType, LUser, LServer) ->
167
:-(
Res = execute_delete(HostType, LUser, LServer),
168
:-(
check_result(Res).
169
170 -spec get_entry_properties(HosType :: mongooseim:host_type(),
171 InboxEntryKey :: mod_inbox:entry_key()) ->
172 entry_properties() | nil().
173 get_entry_properties(HostType, {LUser, LServer, RemBareJID}) ->
174
:-(
case execute_select_properties(HostType, LUser, LServer, RemBareJID) of
175 {selected, []} ->
176
:-(
[];
177 {selected, [Selected]} ->
178
:-(
decode_entries(Selected)
179 end.
180
181 -spec set_entry_properties(HostType :: mongooseim:host_type(),
182 InboxEntryKey :: mod_inbox:entry_key(),
183 entry_properties()) ->
184 entry_properties() | {error, binary()}.
185 set_entry_properties(HostType, {LUser, LServer, RemBareJID}, Properties) ->
186
:-(
case set_entry_properties_rdbms(HostType, LUser, LServer, RemBareJID, Properties) of
187 {error, Msg} when is_list(Msg) ->
188
:-(
{error, list_to_binary(Msg)};
189 {error, Msg} ->
190
:-(
{error, Msg};
191 {updated, 0} ->
192
:-(
{error, <<"item-not-found">>};
193 {selected, [Result]} ->
194
:-(
decode_entries(Result)
195 end.
196
197 decode_entries({BArchive, BCount, BMutedUntil}) ->
198
:-(
Archive = mongoose_rdbms:to_bool(BArchive),
199
:-(
Count = mongoose_rdbms:result_to_integer(BCount),
200
:-(
MutedUntil = mongoose_rdbms:result_to_integer(BMutedUntil),
201
:-(
#{archive => Archive,
202 unread_count => Count,
203 muted_until => MutedUntil}.
204
205 %% ----------------------------------------------------------------------
206 %% Internal functions
207 %% ----------------------------------------------------------------------
208
209 -spec get_inbox_rdbms(HostType :: mongooseim:host_type(),
210 LUser :: jid:luser(),
211 LServer :: jid:lserver(),
212 Params :: mod_inbox:get_inbox_params()) ->
213 mongoose_rdbms:query_result().
214 get_inbox_rdbms(HostType, LUser, LServer, Params) ->
215
:-(
QueryName = lookup_query_name(Params),
216
:-(
case mongoose_rdbms:prepared(QueryName) of
217 false ->
218
:-(
SQL = lookup_query(Params),
219
:-(
Columns = lookup_query_columns(Params),
220
:-(
mongoose_rdbms:prepare(QueryName, inbox, Columns, SQL);
221 true ->
222
:-(
ok
223 end,
224
:-(
Args = lookup_query_args(LServer, LUser, Params),
225
:-(
mongoose_rdbms:execute_successfully(HostType, QueryName, Args).
226
227 set_entry_properties_rdbms(HostType, LUser, LServer, RemBareJID, Properties) ->
228
:-(
QueryName = update_query_name(Properties),
229
:-(
case mongoose_rdbms:prepared(QueryName) of
230 false ->
231
:-(
SQL = update_properties_query(Properties),
232
:-(
Columns = update_query_columns(Properties),
233
:-(
mongoose_rdbms:prepare(QueryName, inbox, Columns, SQL);
234 true ->
235
:-(
ok
236 end,
237
:-(
{atomic, TransactionResult} =
238 mongoose_rdbms:sql_transaction(
239 LServer,
240
:-(
fun() -> set_entry_properties_t(HostType, QueryName, LUser, LServer, RemBareJID, Properties) end),
241
:-(
TransactionResult.
242
243 -spec set_entry_properties_t(mongooseim:host_type(), atom(), jid:luser(), jid:lserver(), jid:literal_jid(),
244 entry_properties()) ->
245 mongoose_rdbms:query_result().
246 set_entry_properties_t(HostType, QueryName, LUser, LServer, RemBareJID, Properties) ->
247
:-(
Args = update_query_args(LUser, LServer, RemBareJID, Properties),
248
:-(
case mongoose_rdbms:execute_successfully(HostType, QueryName, Args) of
249 {updated, 1} ->
250
:-(
execute_select_properties(HostType, LUser, LServer, RemBareJID);
251 Other ->
252
:-(
Other
253 end.
254
255 %% Inbox lookup
256
257 -spec lookup_query(mod_inbox:get_inbox_params()) -> iolist().
258 lookup_query(#{order := Order} = Params) ->
259
:-(
OrderSQL = order_to_sql(Order),
260
:-(
{LimitSQL, MSLimitSQL} = sql_and_where_limit(maps:get(limit, Params, undefined)),
261
:-(
Conditions = [lookup_sql_condition(Key, maps:get(Key, Params, undefined)) ||
262
:-(
Key <- [start, 'end', hidden_read, archive]],
263
:-(
["SELECT ", MSLimitSQL,
264 " remote_bare_jid, content, unread_count, msg_id, timestamp, archive, muted_until "
265 " FROM inbox WHERE luser = ? AND lserver = ?", Conditions,
266 " ORDER BY timestamp ", OrderSQL, " ", LimitSQL].
267
268 -spec lookup_query_args(jid:lserver(), jid:luser(), mod_inbox:get_inbox_params()) -> list().
269 lookup_query_args(LServer, LUser, Params) ->
270
:-(
Args = [LUser, LServer | [maps:get(Key, Params) || Key <- lookup_arg_keys(Params)]],
271
:-(
case maps:get(limit, Params, undefined) of
272
:-(
undefined -> Args;
273
:-(
Limit -> rdbms_queries:add_limit_arg(Limit, Args)
274 end.
275
276 -spec lookup_query_columns(mod_inbox:get_inbox_params()) -> [atom()].
277 lookup_query_columns(Params) ->
278
:-(
Columns = [luser, lserver | lists:map(fun param_to_column/1, lookup_arg_keys(Params))],
279
:-(
case maps:get(limit, Params, undefined) of
280
:-(
undefined -> Columns;
281
:-(
_ -> rdbms_queries:add_limit_arg(limit, Columns)
282 end.
283
284 -spec lookup_arg_keys(mod_inbox:get_inbox_params()) -> [atom()].
285 lookup_arg_keys(Params) ->
286
:-(
lists:filter(fun(Key) -> maps:is_key(Key, Params) end, [start, 'end', archive]).
287
288 -spec lookup_query_name(mod_inbox:get_inbox_params()) -> atom().
289 lookup_query_name(Params) ->
290
:-(
IDString = lists:flatmap(fun(Param) ->
291
:-(
param_id(Param, maps:get(Param, Params, undefined))
292 end, lookup_param_keys()),
293
:-(
list_to_atom("inbox_lookup" ++ IDString).
294
295 -spec lookup_param_keys() -> [atom()].
296 lookup_param_keys() ->
297
:-(
[order, limit, start, 'end', hidden_read, archive].
298
299 -spec param_to_column(atom()) -> atom().
300
:-(
param_to_column(start) -> timestamp;
301
:-(
param_to_column('end') -> timestamp;
302
:-(
param_to_column(archive) -> archive.
303
304 -spec param_id(Key :: atom(), Value :: any()) -> string().
305
:-(
param_id(_, undefined) -> "";
306
:-(
param_id(order, desc) -> "_desc";
307
:-(
param_id(order, asc) -> "_asc";
308
:-(
param_id(limit, _) -> "_lim";
309
:-(
param_id(start, _) -> "_start";
310
:-(
param_id('end', _) -> "_end";
311
:-(
param_id(hidden_read, true) -> "_hr";
312
:-(
param_id(hidden_read, false) -> "";
313
:-(
param_id(archive, _) -> "_arch".
314
315 -spec order_to_sql(Order :: asc | desc) -> binary().
316
:-(
order_to_sql(asc) -> <<"ASC">>;
317
:-(
order_to_sql(desc) -> <<"DESC">>.
318
319 -spec sql_and_where_limit(non_neg_integer() | undefined) -> {iolist(), iolist()}.
320 sql_and_where_limit(undefined) ->
321
:-(
{"", ""};
322 sql_and_where_limit(_) ->
323
:-(
rdbms_queries:get_db_specific_limits().
324
325 -spec lookup_sql_condition(Key :: atom(), Value :: any()) -> string().
326 lookup_sql_condition(start, Timestamp) when is_integer(Timestamp) ->
327
:-(
" AND timestamp >= ?";
328 lookup_sql_condition('end', Timestamp) when is_integer(Timestamp) ->
329
:-(
" AND timestamp <= ?";
330 lookup_sql_condition(hidden_read, true) ->
331
:-(
" AND unread_count > 0";
332 lookup_sql_condition(archive, Val) when is_boolean(Val) ->
333
:-(
" AND archive = ?";
334 lookup_sql_condition(_, _) ->
335
:-(
"".
336
337 %% Property update
338
339 update_properties_query(Properties) ->
340
:-(
KVs = [{Key, maps:get(Key, Properties, undefined)} || Key <- property_keys()],
341
:-(
Parts = [update_sql_part(Key, Value) || {Key, Value} <- KVs, Value =/= undefined],
342
:-(
["UPDATE inbox SET ", string:join(Parts, ", "),
343 " WHERE luser = ? AND lserver = ? AND remote_bare_jid = ?"].
344
345 update_query_args(LUser, LServer, RemBareJID, Properties) ->
346
:-(
[maps:get(Key, Properties) || Key <- update_arg_keys(Properties)] ++
347 [LUser, LServer, RemBareJID].
348
349 update_query_columns(Properties) ->
350
:-(
update_arg_keys(Properties) ++ [luser, lserver, remote_bare_jid].
351
352 update_arg_keys(Properties) ->
353
:-(
lists:filter(fun(Key) -> maps:is_key(Key, Properties) end, [archive, muted_until]).
354
355 update_query_name(Properties) ->
356
:-(
IDString = lists:flatmap(fun(Prop) ->
357
:-(
property_id(Prop, maps:get(Prop, Properties, undefined))
358 end, property_keys()),
359
:-(
list_to_atom("inbox_update_properties" ++ IDString).
360
361 property_keys() ->
362
:-(
[unread_count, archive, muted_until].
363
364 -spec property_id(Key :: atom(), Value :: any()) -> string().
365
:-(
property_id(_, undefined) -> "";
366
:-(
property_id(unread_count, 0) -> "_read";
367
:-(
property_id(unread_count, 1) -> "_unread";
368
:-(
property_id(archive, _) -> "_arch";
369
:-(
property_id(muted_until, _) -> "_muted".
370
371 -spec update_sql_part(Key :: atom(), Value :: any()) -> string().
372 update_sql_part(unread_count, 0) ->
373
:-(
"unread_count = 0";
374 update_sql_part(unread_count, 1) ->
375
:-(
"unread_count = CASE unread_count WHEN 0 THEN 1 ELSE unread_count END";
376 update_sql_part(archive, Val) when is_boolean(Val) ->
377
:-(
"archive = ?";
378 update_sql_part(muted_until, Val) when is_integer(Val) ->
379
:-(
"muted_until = ?".
380
381 %% Query execution
382
383 -spec execute_select_unread_count(mongooseim:host_type(), jid:luser(), jid:lserver(), jid:literal_jid()) ->
384 mongoose_rdbms:query_result().
385 execute_select_unread_count(HostType, LUser, LServer, RemBareJID) ->
386
:-(
mongoose_rdbms:execute_successfully(HostType, inbox_select_unread_count,
387 [LUser, LServer, RemBareJID]).
388
389 -spec execute_select_properties(mongooseim:host_type(), jid:luser(), jid:lserver(), jid:literal_jid()) ->
390 mongoose_rdbms:query_result().
391 execute_select_properties(HostType, LUser, LServer, RemBareJID) ->
392
:-(
mongoose_rdbms:execute_successfully(HostType, inbox_select_properties,
393 [LUser, LServer, RemBareJID]).
394
395 -spec execute_reset_unread(mongooseim:host_type(), jid:luser(), jid:lserver(), jid:literal_jid(), binary() | undefined) ->
396 mongoose_rdbms:query_result().
397 execute_reset_unread(HostType, LUser, LServer, RemBareJID, undefined) ->
398
:-(
mongoose_rdbms:execute_successfully(HostType, inbox_reset_unread,
399 [LUser, LServer, RemBareJID]);
400 execute_reset_unread(HostType, LUser, LServer, RemBareJID, MsgId) ->
401
:-(
mongoose_rdbms:execute_successfully(HostType, inbox_reset_unread_msg,
402 [LUser, LServer, RemBareJID, MsgId]).
403
404 -spec execute_delete(mongooseim:host_type(),
405 jid:luser(), jid:lserver(), jid:literal_jid()) ->
406 mongoose_rdbms:query_result().
407 execute_delete(HostType, LUser, LServer, RemBareJID) ->
408
:-(
mongoose_rdbms:execute_successfully(HostType, inbox_delete_row, [LUser, LServer, RemBareJID]).
409
410 -spec execute_delete(mongooseim:host_type(), jid:lserver(), jid:luser()) -> mongoose_rdbms:query_result().
411 execute_delete(HostType, LUser, LServer) ->
412
:-(
mongoose_rdbms:execute_successfully(HostType, inbox_delete, [LUser, LServer]).
413
414 -spec execute_delete_domain(HostType :: mongooseim:host_type(),
415 LServer :: jid:lserver()) ->
416 mongoose_rdbms:query_result().
417 execute_delete_domain(HostType, LServer) ->
418
:-(
mongoose_rdbms:execute_successfully(HostType, inbox_delete_domain, [LServer]).
419
420 %% Result processing
421
422 -spec decode_row(mongooseim:host_type(), db_return()) -> inbox_res().
423 decode_row(HostType, {Username, Content, Count, MsgId, Timestamp, Archive, MutedUntil}) ->
424
:-(
{ok, Parsed} = exml:parse(mongoose_rdbms:unescape_binary(HostType, Content)),
425
:-(
BCount = mongoose_rdbms:result_to_integer(Count),
426
:-(
NumericTimestamp = mongoose_rdbms:result_to_integer(Timestamp),
427
:-(
BoolArchive = mongoose_rdbms:to_bool(Archive),
428
:-(
NumericMutedUntil = mongoose_rdbms:result_to_integer(MutedUntil),
429
:-(
#{remote_jid => Username,
430 msg => Parsed,
431 unread_count => BCount,
432 msg_id => MsgId,
433 timestamp => NumericTimestamp,
434 archive => BoolArchive,
435 muted_until => NumericMutedUntil}.
436
437 -spec check_result_is_expected(_, list()) -> mod_inbox:write_res().
438 check_result_is_expected({updated, Val}, ValList) when is_list(ValList) ->
439
:-(
case lists:member(Val, ValList) of
440
:-(
true -> ok;
441
:-(
_ -> {error, {expected_does_not_match, Val, ValList}}
442 end;
443 check_result_is_expected(Result, _) ->
444
:-(
{error, {bad_result, Result}}.
445
446 -spec check_result(_) -> mod_inbox:count_res().
447 check_result({selected, []}) ->
448
:-(
{ok, 0};
449 check_result({selected, [{Val}]}) ->
450
:-(
parse_result(Val);
451 check_result({updated, _, [{Val}]}) ->
452
:-(
parse_result(Val);
453 check_result({updated, _}) ->
454
:-(
ok;
455 check_result(Result) ->
456
:-(
{error, {bad_result, Result}}.
457
458 parse_result(Value) when is_integer(Value) ->
459
:-(
{ok, Value};
460 parse_result(Value) when is_binary(Value) ->
461
:-(
{ok, binary_to_integer(Value)};
462 parse_result(null) ->
463
:-(
{ok, 0};
464 parse_result(Value) ->
465
:-(
{error, {unknown_result_value_type, Value}}.
Line Hits Source