./ct_report/coverage/mod_inbox_rdbms.COVER.html

1 %%%-------------------------------------------------------------------
2 %%% @copyright (C) 2018, Erlang Solutions Ltd.
3 %%% @doc
4 %%%
5 %%% @end
6 %%% Created : 30. Jan 2018 16:59
7 %%%-------------------------------------------------------------------
8 -module(mod_inbox_rdbms).
9
10 -include("mod_inbox.hrl").
11
12 -behaviour(mod_inbox_backend).
13
14 %% API
15 -export([get_inbox/4,
16 init/2,
17 set_inbox/6,
18 set_inbox_incr_unread/5,
19 reset_unread/3,
20 remove_inbox_row/2,
21 remove_domain/2,
22 clear_inbox/3,
23 get_inbox_unread/2,
24 get_entry_properties/2,
25 set_entry_properties/3]).
26
27 -type archived() :: binary().
28 -type muted_until() :: binary().
29 -type msg_content() :: binary().
30 -type db_return() :: {jid:luser(),
31 msg_content(),
32 count_bin(),
33 non_neg_integer() | binary(),
34 archived(),
35 muted_until()}.
36
37 %% ----------------------------------------------------------------------
38 %% API
39 %% ----------------------------------------------------------------------
40
%% TODO pools aren't multitenancy-ready yet
%% Prepares every fixed SQL statement used by this backend: the plain
%% statements via mongoose_rdbms:prepare/4 and the two upserts via
%% rdbms_queries:prepare_upsert/6. The options argument is ignored.
%% Always returns ok.
init(HostType, _Options) ->
    RowCond = <<"WHERE luser = ? AND lserver = ? AND remote_bare_jid = ?">>,
    RowKey = [luser, lserver, remote_bare_jid],
    %% {StatementName, ParameterColumns, SQL}, prepared in this order.
    Statements =
        [{inbox_select_unread_count, RowKey,
          <<"SELECT unread_count FROM inbox ",
            RowCond/binary>>},
         {inbox_select_properties, RowKey,
          <<"SELECT archive, unread_count, muted_until FROM inbox ",
            RowCond/binary>>},
         {inbox_reset_unread, RowKey,
          <<"UPDATE inbox SET unread_count = 0 ", RowCond/binary>>},
         {inbox_reset_unread_msg, RowKey ++ [msg_id],
          <<"UPDATE inbox SET unread_count = 0 ", RowCond/binary,
            " AND msg_id = ?">>},
         {inbox_delete_row, RowKey,
          <<"DELETE FROM inbox ", RowCond/binary>>},
         {inbox_delete, [luser, lserver],
          <<"DELETE FROM inbox WHERE luser = ? AND lserver = ?">>},
         {inbox_delete_domain, [lserver],
          <<"DELETE FROM inbox WHERE lserver = ?">>}],
    lists:foreach(fun({Name, Columns, SQL}) ->
                          mongoose_rdbms:prepare(Name, inbox, Columns, SQL)
                  end, Statements),
    UniqueKeyFields = [<<"luser">>, <<"lserver">>, <<"remote_bare_jid">>],
    InsertFields =
        UniqueKeyFields ++ [<<"content">>, <<"unread_count">>, <<"msg_id">>, <<"timestamp">>],
    %% Plain upsert: the unread counter is overwritten with the given value.
    rdbms_queries:prepare_upsert(HostType, inbox_upsert, inbox,
                                 InsertFields,
                                 [<<"content">>, <<"unread_count">>,
                                  <<"msg_id">>, <<"timestamp">>, <<"archive">>],
                                 UniqueKeyFields),
    %% Incrementing upsert: on update the parameter is added to the stored counter.
    rdbms_queries:prepare_upsert(HostType, inbox_upsert_incr_unread, inbox,
                                 InsertFields,
                                 [<<"content">>, <<"msg_id">>, <<"timestamp">>, <<"archive">>,
                                  {<<"unread_count">>, <<"unread_count = inbox.unread_count + ?">>}],
                                 UniqueKeyFields),
    ok.
81
%% Fetches the inbox rows of LUser@LServer that satisfy Params and decodes
%% each row with decode_row/2. An empty result set yields [].
-spec get_inbox(HostType :: mongooseim:host_type(),
                LUser :: jid:luser(),
                LServer :: jid:lserver(),
                Params :: mod_inbox:get_inbox_params()) -> get_inbox_res().
get_inbox(HostType, LUser, LServer, Params) ->
    %% No special case for an empty result: decoding [] already gives [].
    {selected, Rows} = get_inbox_rdbms(HostType, LUser, LServer, Params),
    [decode_row(HostType, Row) || Row <- Rows].
93
%% Returns {ok, N} where N is the unread count read for the entry, plus one.
%% Crashes with badmatch unless check_result/1 yields {ok, _}.
-spec get_inbox_unread(mongooseim:host_type(), mod_inbox:entry_key()) ->
    {ok, integer()}.
get_inbox_unread(HostType, {LUser, LServer, RemBareJID}) ->
    QueryResult = execute_select_unread_count(HostType, LUser, LServer, RemBareJID),
    {ok, StoredCount} = check_result(QueryResult),
    %% We read unread_count value when the message is sent and is not yet in receiver inbox
    %% so we have to add +1
    {ok, StoredCount + 1}.
102
%% Upserts an inbox row through the inbox_upsert statement. On update the
%% content, unread count, message id and timestamp are overwritten and
%% archive is set to false. The remote JID is passed through jid:nameprep/1
%% before it is used as part of the key.
-spec set_inbox(HostType, InboxEntryKey, Content, Count, MsgId, Timestamp) ->
    mod_inbox:write_res() when
      HostType :: mongooseim:host_type(),
      InboxEntryKey :: mod_inbox:entry_key(),
      Content :: binary(),
      Count :: integer(),
      MsgId :: binary(),
      Timestamp :: integer().
set_inbox(HostType, {LUser, LServer, ToBareJid}, Content, Count, MsgId, Timestamp) ->
    UniqueKeyValues = [LUser, LServer, jid:nameprep(ToBareJid)],
    MessageValues = [Content, Count, MsgId, Timestamp],
    InsertParams = UniqueKeyValues ++ MessageValues,
    UpdateParams = MessageValues ++ [false],
    UpsertResult = rdbms_queries:execute_upsert(HostType, inbox_upsert,
                                                InsertParams, UpdateParams, UniqueKeyValues),
    %% MySQL returns 1 when an upsert is an insert
    %% and 2, when an upsert acts as update
    check_result_is_expected(UpsertResult, [1, 2]).
122
%% Deletes the single inbox row identified by the entry key. The remote JID
%% is passed through jid:nameprep/1 first; the query result is interpreted
%% by check_result/1.
-spec remove_inbox_row(HostType :: mongooseim:host_type(),
                       InboxEntryKey :: mod_inbox:entry_key()) -> mod_inbox:write_res().
remove_inbox_row(HostType, {LUser, LServer, ToBareJid}) ->
    RemoteJid = jid:nameprep(ToBareJid),
    check_result(execute_delete(HostType, LUser, LServer, RemoteJid)).
129
%% Runs the inbox_delete_domain statement for LServer. The query result is
%% discarded and ok is returned.
-spec remove_domain(HostType :: mongooseim:host_type(),
                    LServer :: jid:lserver()) -> ok.
remove_domain(HostType, LServer) ->
    %% Only the side effect matters here; the result is not inspected.
    execute_delete_domain(HostType, LServer),
    ok.
135
%% Convenience arity: same as set_inbox_incr_unread/6 with an increment of 1.
-spec set_inbox_incr_unread(
        mongooseim:host_type(), mod_inbox:entry_key(), binary(), binary(), integer()) ->
    mod_inbox:count_res().
set_inbox_incr_unread(HostType, Entry, Content, MsgId, Timestamp) ->
    DefaultIncrement = 1,
    set_inbox_incr_unread(HostType, Entry, Content, MsgId, Timestamp, DefaultIncrement).
141
%% Upserts an inbox row through the inbox_upsert_incr_unread statement.
%% On insert Incrs is stored as the unread count; on update it is passed as
%% the parameter of "unread_count = inbox.unread_count + ?" and archive is
%% set to false. The upsert result is interpreted by check_result/1.
-spec set_inbox_incr_unread(HostType :: mongooseim:host_type(),
                            InboxEntryKey :: mod_inbox:entry_key(),
                            Content :: binary(),
                            MsgId :: binary(),
                            Timestamp :: integer(),
                            Incrs :: pos_integer()) -> mod_inbox:count_res().
set_inbox_incr_unread(HostType, {LUser, LServer, ToBareJid}, Content, MsgId, Timestamp, Incrs) ->
    UniqueKeyValues = [LUser, LServer, jid:nameprep(ToBareJid)],
    InsertParams = UniqueKeyValues ++ [Content, Incrs, MsgId, Timestamp],
    %% Parameter order follows the update field list given to prepare_upsert.
    UpdateParams = [Content, MsgId, Timestamp, false, Incrs],
    UpsertResult = rdbms_queries:execute_upsert(HostType, inbox_upsert_incr_unread,
                                                InsertParams, UpdateParams, UniqueKeyValues),
    check_result(UpsertResult).
156
%% Sets unread_count to 0 for the given entry. When MsgId is a binary the
%% update is additionally restricted to rows with that msg_id; when it is
%% undefined only the entry key is used. The remote JID is passed through
%% jid:nameprep/1 first and the result is interpreted by check_result/1.
-spec reset_unread(HostType :: mongooseim:host_type(),
                   InboxEntryKey :: mod_inbox:entry_key(),
                   MsgId :: binary() | undefined) -> mod_inbox:write_res().
reset_unread(HostType, {LUser, LServer, ToBareJid}, MsgId) ->
    LToBareJid = jid:nameprep(ToBareJid),
    Res = execute_reset_unread(HostType, LUser, LServer, LToBareJid, MsgId),
    check_result(Res).
164
%% Deletes every inbox row of LUser@LServer via the inbox_delete statement;
%% the query result is interpreted by check_result/1.
-spec clear_inbox(HostType :: mongooseim:host_type(),
                  LUser :: jid:luser(),
                  LServer :: jid:lserver()) -> mod_inbox:write_res().
clear_inbox(HostType, LUser, LServer) ->
    check_result(execute_delete(HostType, LUser, LServer)).
171
%% Reads archive, unread_count and muted_until for one inbox entry.
%% Returns [] when no row matches, otherwise the map built by
%% decode_entries/1. Crashes with case_clause on any other result shape
%% (e.g. more than one row).
-spec get_entry_properties(HostType :: mongooseim:host_type(),
                           InboxEntryKey :: mod_inbox:entry_key()) ->
    entry_properties() | nil().
get_entry_properties(HostType, {LUser, LServer, RemBareJID}) ->
    case execute_select_properties(HostType, LUser, LServer, RemBareJID) of
        {selected, []} ->
            [];
        {selected, [Selected]} ->
            decode_entries(Selected)
    end.
182
%% Updates the properties of one inbox entry and returns the entry's
%% properties as re-read after the update. Error reasons given as strings
%% are converted to binaries; an update touching no row is reported as
%% {error, <<"item-not-found">>}.
-spec set_entry_properties(HostType :: mongooseim:host_type(),
                           InboxEntryKey :: mod_inbox:entry_key(),
                           entry_properties()) ->
    entry_properties() | {error, binary()}.
set_entry_properties(HostType, {LUser, LServer, RemBareJID}, Properties) ->
    Outcome = set_entry_properties_rdbms(HostType, LUser, LServer, RemBareJID, Properties),
    case Outcome of
        {error, Reason} when is_list(Reason) ->
            {error, list_to_binary(Reason)};
        {error, _} = Error ->
            Error;
        {updated, 0} ->
            {error, <<"item-not-found">>};
        {selected, [Row]} ->
            decode_entries(Row)
    end.
198
%% Converts a raw {Archive, UnreadCount, MutedUntil} database row into a map
%% with the keys archive, unread_count and muted_until, using the
%% mongoose_rdbms conversion helpers.
decode_entries({RawArchive, RawCount, RawMutedUntil}) ->
    #{archive => mongoose_rdbms:to_bool(RawArchive),
      unread_count => mongoose_rdbms:result_to_integer(RawCount),
      muted_until => mongoose_rdbms:result_to_integer(RawMutedUntil)}.
206
207 %% ----------------------------------------------------------------------
208 %% Internal functions
209 %% ----------------------------------------------------------------------
210
%% Executes the inbox lookup matching Params. The statement name is derived
%% from Params; if no statement of that name is prepared yet, it is prepared
%% on first use before being executed.
-spec get_inbox_rdbms(HostType :: mongooseim:host_type(),
                      LUser :: jid:luser(),
                      LServer :: jid:lserver(),
                      Params :: mod_inbox:get_inbox_params()) ->
    mongoose_rdbms:query_result().
get_inbox_rdbms(HostType, LUser, LServer, Params) ->
    QueryName = lookup_query_name(Params),
    %% Lazily prepare the statement; the result of prepare/4 is not inspected.
    mongoose_rdbms:prepared(QueryName) orelse
        begin
            SQL = lookup_query(Params),
            Columns = lookup_query_columns(Params),
            mongoose_rdbms:prepare(QueryName, inbox, Columns, SQL)
        end,
    mongoose_rdbms:execute_successfully(HostType, QueryName,
                                        lookup_query_args(LServer, LUser, Params)).
228
%% Updates the properties of one inbox entry inside an SQL transaction and
%% returns the value produced by set_entry_properties_t/6. The UPDATE
%% statement is named after the set of properties present and is prepared
%% on first use. Crashes with badmatch if the transaction does not return
%% {atomic, _}.
set_entry_properties_rdbms(HostType, LUser, LServer, RemBareJID, Properties) ->
    QueryName = update_query_name(Properties),
    case mongoose_rdbms:prepared(QueryName) of
        false ->
            SQL = update_properties_query(Properties),
            Columns = update_query_columns(Properties),
            mongoose_rdbms:prepare(QueryName, inbox, Columns, SQL);
        true ->
            ok
    end,
    %% The transaction is opened with HostType, the same identifier that
    %% every other query in this module (including the ones executed inside
    %% the transaction fun) passes to mongoose_rdbms.
    {atomic, TransactionResult} =
        mongoose_rdbms:sql_transaction(
          HostType,
          fun() ->
                  set_entry_properties_t(HostType, QueryName, LUser, LServer,
                                         RemBareJID, Properties)
          end),
    TransactionResult.
244
%% Transaction body: runs the prepared UPDATE and, when exactly one row was
%% updated, re-reads the entry's properties. Any other result of the UPDATE
%% is returned unchanged.
-spec set_entry_properties_t(mongooseim:host_type(), atom(), jid:luser(), jid:lserver(),
                             jid:literal_jid(), entry_properties()) ->
    mongoose_rdbms:query_result().
set_entry_properties_t(HostType, QueryName, LUser, LServer, RemBareJID, Properties) ->
    UpdateArgs = update_query_args(LUser, LServer, RemBareJID, Properties),
    UpdateResult = mongoose_rdbms:execute_successfully(HostType, QueryName, UpdateArgs),
    case UpdateResult of
        {updated, 1} ->
            execute_select_properties(HostType, LUser, LServer, RemBareJID);
        _ ->
            UpdateResult
    end.
256
257 %% Inbox lookup
258
%% Builds the SELECT statement text for an inbox lookup. Optional conditions
%% are appended for start, 'end', hidden_read and archive; the two limit
%% fragments come from sql_and_where_limit/1, one placed right after SELECT
%% and one at the end of the statement.
-spec lookup_query(mod_inbox:get_inbox_params()) -> iolist().
lookup_query(#{order := Order} = Params) ->
    OrderSQL = order_to_sql(Order),
    {LimitSQL, MSLimitSQL} = sql_and_where_limit(maps:get(limit, Params, undefined)),
    ConditionFor = fun(Key) ->
                           lookup_sql_condition(Key, maps:get(Key, Params, undefined))
                   end,
    Conditions = lists:map(ConditionFor, [start, 'end', hidden_read, archive]),
    ["SELECT ", MSLimitSQL,
     " remote_bare_jid, content, unread_count, timestamp, archive, muted_until "
     " FROM inbox WHERE luser = ? AND lserver = ?", Conditions,
     " ORDER BY timestamp ", OrderSQL, " ", LimitSQL].
269
%% Builds the argument list for a lookup statement: LUser, LServer, then the
%% values of the keys reported by lookup_arg_keys/1. When a limit other than
%% undefined is set, it is added through rdbms_queries:add_limit_arg/2.
-spec lookup_query_args(jid:lserver(), jid:luser(), mod_inbox:get_inbox_params()) -> list().
lookup_query_args(LServer, LUser, Params) ->
    OptionalValues = lists:map(fun(Key) -> maps:get(Key, Params) end,
                               lookup_arg_keys(Params)),
    BaseArgs = [LUser, LServer | OptionalValues],
    case Params of
        #{limit := Limit} when Limit =/= undefined ->
            rdbms_queries:add_limit_arg(Limit, BaseArgs);
        _ ->
            BaseArgs
    end.
277
%% Builds the parameter column list for a lookup statement, mirroring
%% lookup_query_args/3: luser, lserver, the columns of the optional keys,
%% and the atom limit added through rdbms_queries:add_limit_arg/2 when a
%% limit other than undefined is set.
-spec lookup_query_columns(mod_inbox:get_inbox_params()) -> [atom()].
lookup_query_columns(Params) ->
    OptionalColumns = [param_to_column(Key) || Key <- lookup_arg_keys(Params)],
    BaseColumns = [luser, lserver | OptionalColumns],
    case Params of
        #{limit := Limit} when Limit =/= undefined ->
            rdbms_queries:add_limit_arg(limit, BaseColumns);
        _ ->
            BaseColumns
    end.
285
%% Returns those of start, 'end' and archive (in that order) that are
%% present as keys in Params.
-spec lookup_arg_keys(mod_inbox:get_inbox_params()) -> [atom()].
lookup_arg_keys(Params) ->
    [Key || Key <- [start, 'end', archive], maps:is_key(Key, Params)].
289
%% Derives the prepared statement name for a lookup: "inbox_lookup" followed
%% by the param_id/2 suffix of every key from lookup_param_keys/0.
-spec lookup_query_name(mod_inbox:get_inbox_params()) -> atom().
lookup_query_name(Params) ->
    Suffixes = [param_id(Param, maps:get(Param, Params, undefined))
                || Param <- lookup_param_keys()],
    list_to_atom("inbox_lookup" ++ lists:append(Suffixes)).
296
%% Lookup parameters that contribute to the statement name, in the order in
%% which their suffixes are appended.
-spec lookup_param_keys() -> [atom()].
lookup_param_keys() ->
    [order, limit,
     start, 'end',
     hidden_read, archive].
300
%% Maps a lookup parameter key to the inbox column its value is compared
%% against. Both time bounds map to timestamp.
-spec param_to_column(atom()) -> atom().
param_to_column(start) ->
    timestamp;
param_to_column('end') ->
    timestamp;
param_to_column(archive) ->
    archive.
305
%% Returns the statement-name suffix contributed by one lookup parameter.
%% An undefined value and hidden_read = false contribute nothing.
-spec param_id(Key :: atom(), Value :: any()) -> string().
param_id(_, undefined) ->
    "";
param_id(order, desc) ->
    "_desc";
param_id(order, asc) ->
    "_asc";
param_id(limit, _) ->
    "_lim";
param_id(start, _) ->
    "_start";
param_id('end', _) ->
    "_end";
param_id(hidden_read, true) ->
    "_hr";
param_id(hidden_read, false) ->
    "";
param_id(archive, _) ->
    "_arch".
316
%% Translates the sort order atom into the SQL keyword used in ORDER BY.
-spec order_to_sql(Order :: asc | desc) -> binary().
order_to_sql(asc) ->
    <<"ASC">>;
order_to_sql(desc) ->
    <<"DESC">>.
320
%% Returns the pair of limit SQL fragments for a lookup: two empty strings
%% when no limit is requested, otherwise whatever
%% rdbms_queries:get_db_specific_limits/0 returns. The limit value itself is
%% not used here.
-spec sql_and_where_limit(non_neg_integer() | undefined) -> {iolist(), iolist()}.
sql_and_where_limit(undefined) ->
    NoLimit = "",
    {NoLimit, NoLimit};
sql_and_where_limit(_Limit) ->
    rdbms_queries:get_db_specific_limits().
326
%% Returns the extra WHERE fragment for one lookup parameter, or "" when the
%% parameter is absent or its value does not have the expected type.
-spec lookup_sql_condition(Key :: atom(), Value :: any()) -> string().
lookup_sql_condition(start, Timestamp) when is_integer(Timestamp) ->
    " AND timestamp >= ?";
lookup_sql_condition('end', Timestamp) when is_integer(Timestamp) ->
    " AND timestamp <= ?";
lookup_sql_condition(hidden_read, true) ->
    %% No placeholder: the condition is fixed text.
    " AND unread_count > 0";
lookup_sql_condition(archive, Flag) when is_boolean(Flag) ->
    " AND archive = ?";
lookup_sql_condition(_Key, _Value) ->
    "".
338
339 %% Property update
340
%% Builds the UPDATE statement text for the properties present in
%% Properties. Each property from property_keys/0 whose value is not
%% undefined contributes one SET fragment; fragments are joined with ", ".
update_properties_query(Properties) ->
    SetParts = [update_sql_part(Key, Value)
                || Key <- property_keys(),
                   Value <- [maps:get(Key, Properties, undefined)],
                   Value =/= undefined],
    ["UPDATE inbox SET ", string:join(SetParts, ", "),
     " WHERE luser = ? AND lserver = ? AND remote_bare_jid = ?"].
346
%% Builds the argument list for a property UPDATE: the values of the keys
%% reported by update_arg_keys/1, followed by the row key values.
update_query_args(LUser, LServer, RemBareJID, Properties) ->
    SetValues = lists:map(fun(Key) -> maps:get(Key, Properties) end,
                          update_arg_keys(Properties)),
    RowKeyValues = [LUser, LServer, RemBareJID],
    SetValues ++ RowKeyValues.
350
%% Builds the parameter column list for a property UPDATE, mirroring
%% update_query_args/4: the parameterised property keys, then the row key
%% columns.
update_query_columns(Properties) ->
    RowKeyColumns = [luser, lserver, remote_bare_jid],
    update_arg_keys(Properties) ++ RowKeyColumns.
353
%% Returns those of archive and muted_until (in that order) that are present
%% as keys in Properties. unread_count is never listed here.
update_arg_keys(Properties) ->
    [Key || Key <- [archive, muted_until], maps:is_key(Key, Properties)].
356
%% Derives the prepared statement name for a property UPDATE:
%% "inbox_update_properties" followed by the property_id/2 suffix of every
%% key from property_keys/0.
update_query_name(Properties) ->
    Suffixes = [property_id(Prop, maps:get(Prop, Properties, undefined))
                || Prop <- property_keys()],
    list_to_atom("inbox_update_properties" ++ lists:append(Suffixes)).
362
%% Entry properties that can be updated, in the order used for both the SET
%% fragments and the statement-name suffixes.
property_keys() ->
    [unread_count,
     archive,
     muted_until].
365
%% Returns the statement-name suffix contributed by one property. An
%% undefined value contributes nothing; for unread_count only the values 0
%% and 1 have a clause.
-spec property_id(Key :: atom(), Value :: any()) -> string().
property_id(_, undefined) ->
    "";
property_id(unread_count, 0) ->
    "_read";
property_id(unread_count, 1) ->
    "_unread";
property_id(archive, _) ->
    "_arch";
property_id(muted_until, _) ->
    "_muted".
372
%% Returns the SET fragment for one property. The unread_count fragments
%% are fixed text without a placeholder; archive and muted_until take one
%% parameter each.
-spec update_sql_part(Key :: atom(), Value :: any()) -> string().
update_sql_part(unread_count, 0) ->
    "unread_count = 0";
update_sql_part(unread_count, 1) ->
    %% Only raises a zero counter to 1; a non-zero counter is left unchanged.
    "unread_count = CASE unread_count WHEN 0 THEN 1 ELSE unread_count END";
update_sql_part(archive, Flag) when is_boolean(Flag) ->
    "archive = ?";
update_sql_part(muted_until, Until) when is_integer(Until) ->
    "muted_until = ?".
382
383 %% Query execution
384
%% Executes the inbox_select_unread_count statement for one entry key.
-spec execute_select_unread_count(mongooseim:host_type(), jid:luser(), jid:lserver(),
                                  jid:literal_jid()) ->
    mongoose_rdbms:query_result().
execute_select_unread_count(HostType, LUser, LServer, RemBareJID) ->
    RowKey = [LUser, LServer, RemBareJID],
    mongoose_rdbms:execute_successfully(HostType, inbox_select_unread_count, RowKey).
390
%% Executes the inbox_select_properties statement for one entry key.
-spec execute_select_properties(mongooseim:host_type(), jid:luser(), jid:lserver(),
                                jid:literal_jid()) ->
    mongoose_rdbms:query_result().
execute_select_properties(HostType, LUser, LServer, RemBareJID) ->
    RowKey = [LUser, LServer, RemBareJID],
    mongoose_rdbms:execute_successfully(HostType, inbox_select_properties, RowKey).
396
%% Executes the unread-counter reset for one entry key. With MsgId =
%% undefined the inbox_reset_unread statement is used; otherwise
%% inbox_reset_unread_msg, which takes the message id as an extra parameter.
-spec execute_reset_unread(mongooseim:host_type(), jid:luser(), jid:lserver(),
                           jid:literal_jid(), binary() | undefined) ->
    mongoose_rdbms:query_result().
execute_reset_unread(HostType, LUser, LServer, RemBareJID, undefined) ->
    RowKey = [LUser, LServer, RemBareJID],
    mongoose_rdbms:execute_successfully(HostType, inbox_reset_unread, RowKey);
execute_reset_unread(HostType, LUser, LServer, RemBareJID, MsgId) ->
    RowKeyAndMsg = [LUser, LServer, RemBareJID, MsgId],
    mongoose_rdbms:execute_successfully(HostType, inbox_reset_unread_msg, RowKeyAndMsg).
405
%% Executes the inbox_delete_row statement for one entry key.
-spec execute_delete(mongooseim:host_type(),
                     jid:luser(), jid:lserver(), jid:literal_jid()) ->
    mongoose_rdbms:query_result().
execute_delete(HostType, LUser, LServer, RemBareJID) ->
    RowKey = [LUser, LServer, RemBareJID],
    mongoose_rdbms:execute_successfully(HostType, inbox_delete_row, RowKey).
411
%% Executes the inbox_delete statement, which removes all rows of
%% LUser@LServer. Arguments are passed in the order of the statement's
%% placeholders: luser, then lserver.
-spec execute_delete(mongooseim:host_type(), jid:luser(), jid:lserver()) ->
    mongoose_rdbms:query_result().
execute_delete(HostType, LUser, LServer) ->
    mongoose_rdbms:execute_successfully(HostType, inbox_delete, [LUser, LServer]).
415
%% Executes the inbox_delete_domain statement, which removes every row whose
%% lserver equals LServer.
-spec execute_delete_domain(HostType :: mongooseim:host_type(),
                            LServer :: jid:lserver()) ->
    mongoose_rdbms:query_result().
execute_delete_domain(HostType, LServer) ->
    Args = [LServer],
    mongoose_rdbms:execute_successfully(HostType, inbox_delete_domain, Args).
421
422 %% Result processing
423
%% Converts one raw lookup row into an inbox result map. The first column is
%% returned as remote_jid unchanged; the content is unescaped and the
%% remaining columns are converted with the mongoose_rdbms helpers.
-spec decode_row(mongooseim:host_type(), db_return()) -> inbox_res().
decode_row(HostType, {Username, Content, Count, Timestamp, Archive, MutedUntil}) ->
    ToInteger = fun mongoose_rdbms:result_to_integer/1,
    Msg = mongoose_rdbms:unescape_binary(HostType, Content),
    UnreadCount = ToInteger(Count),
    NumericTimestamp = ToInteger(Timestamp),
    IsArchived = mongoose_rdbms:to_bool(Archive),
    NumericMutedUntil = ToInteger(MutedUntil),
    #{remote_jid => Username,
      msg => Msg,
      unread_count => UnreadCount,
      timestamp => NumericTimestamp,
      archive => IsArchived,
      muted_until => NumericMutedUntil}.
437
%% Checks an {updated, N} result against a list of acceptable row counts.
%% Returns ok when N is in the list, {error, {expected_does_not_match, ...}}
%% when it is not, and {error, {bad_result, Result}} for any other result.
-spec check_result_is_expected(_, list()) -> mod_inbox:write_res().
check_result_is_expected({updated, Val}, ValList) when is_list(ValList) ->
    case lists:member(Val, ValList) of
        true ->
            ok;
        false ->
            {error, {expected_does_not_match, Val, ValList}}
    end;
check_result_is_expected(Unexpected, _ValList) ->
    {error, {bad_result, Unexpected}}.
446
%% Interprets a query result:
%%   - an empty selection counts as {ok, 0};
%%   - a single one-column row (from a select, or returned alongside an
%%     update) is converted by parse_result/1;
%%   - a plain {updated, _} is ok;
%%   - anything else becomes {error, {bad_result, Result}}.
-spec check_result(_) -> mod_inbox:count_res().
check_result({selected, []}) ->
    {ok, 0};
check_result({selected, [{Value}]}) ->
    parse_result(Value);
check_result({updated, _RowCount, [{Value}]}) ->
    parse_result(Value);
check_result({updated, _RowCount}) ->
    ok;
check_result(Unexpected) ->
    {error, {bad_result, Unexpected}}.
458
%% Normalises a single database value to {ok, Integer}. Integers pass
%% through, binaries are parsed with binary_to_integer/1, null counts as 0,
%% and any other term yields {error, {unknown_result_value_type, Value}}.
parse_result(Int) when is_integer(Int) ->
    {ok, Int};
parse_result(Bin) when is_binary(Bin) ->
    {ok, binary_to_integer(Bin)};
parse_result(null) ->
    {ok, 0};
parse_result(Other) ->
    {error, {unknown_result_value_type, Other}}.
Line Hits Source