./ct_report/coverage/mod_inbox_rdbms.COVER.html

1 %%%-------------------------------------------------------------------
2 %%% @copyright (C) 2018, Erlang Solutions Ltd.
3 %%% @doc
4 %%%
5 %%% @end
6 %%% Created : 30. Jan 2018 16:59
7 %%%-------------------------------------------------------------------
8 -module(mod_inbox_rdbms).
9
10 -include("mod_inbox.hrl").
11
12 -behaviour(mod_inbox_backend).
13
14 %% API
15 -export([get_inbox/4,
16 init/2,
17 set_inbox/6,
18 set_inbox_incr_unread/5,
19 reset_unread/3,
20 remove_inbox_row/2,
21 remove_domain/2,
22 clear_inbox/3,
23 get_inbox_unread/2,
24 get_entry_properties/2,
25 set_entry_properties/3]).
26
%% Column values as they appear in a fetched row, before decode_row/2
%% converts them.
-type archived() :: binary().
-type muted_until() :: binary().
-type msg_content() :: binary().
%% One row of the inbox lookup query, in SELECT order:
%% remote_bare_jid, content, unread_count, timestamp, archive, muted_until.
%% The first element is the remote bare JID (not the local user name).
-type db_return() :: {jid:literal_jid(),
                      msg_content(),
                      count_bin(),
                      non_neg_integer() | binary(),
                      archived(),
                      muted_until()}.
36
37 %% ----------------------------------------------------------------------
38 %% API
39 %% ----------------------------------------------------------------------
40
41 %% TODO pools aren't multitenancy-ready yet
%% @doc Prepare all fixed-shape inbox statements and the two upserts.
%% Lookup and property-update statements whose shape depends on the
%% request are prepared lazily elsewhere in this module.
init(HostType, _Options) ->
    RowCond = <<"WHERE luser = ? AND lserver = ? AND remote_bare_jid = ?">>,
    RowKey = [luser, lserver, remote_bare_jid],
    Statements =
        [{inbox_select_unread_count, RowKey,
          <<"SELECT unread_count FROM inbox ",
            RowCond/binary>>},
         {inbox_select_properties, RowKey,
          <<"SELECT archive, unread_count, muted_until FROM inbox ",
            RowCond/binary>>},
         {inbox_reset_unread, RowKey,
          <<"UPDATE inbox SET unread_count = 0 ", RowCond/binary>>},
         {inbox_reset_unread_msg, RowKey ++ [msg_id],
          <<"UPDATE inbox SET unread_count = 0 ", RowCond/binary,
            " AND msg_id = ?">>},
         {inbox_delete_row, RowKey,
          <<"DELETE FROM inbox ", RowCond/binary>>},
         {inbox_delete, [luser, lserver],
          <<"DELETE FROM inbox WHERE luser = ? AND lserver = ?">>},
         {inbox_delete_domain, [lserver],
          <<"DELETE FROM inbox WHERE lserver = ?">>}],
    lists:foreach(fun({Name, Columns, SQL}) ->
                          mongoose_rdbms:prepare(Name, inbox, Columns, SQL)
                  end, Statements),
    KeyFields = [<<"luser">>, <<"lserver">>, <<"remote_bare_jid">>],
    InsertFields =
        KeyFields ++ [<<"content">>, <<"unread_count">>, <<"msg_id">>, <<"timestamp">>],
    %% Plain upsert: on conflict every listed column is overwritten.
    rdbms_queries:prepare_upsert(HostType, inbox_upsert, inbox,
                                 InsertFields,
                                 [<<"content">>, <<"unread_count">>,
                                  <<"msg_id">>, <<"timestamp">>, <<"archive">>],
                                 KeyFields),
    %% Incrementing upsert: on conflict unread_count is increased by the
    %% supplied parameter instead of being overwritten.
    rdbms_queries:prepare_upsert(HostType, inbox_upsert_incr_unread, inbox,
                                 InsertFields,
                                 [<<"content">>, <<"msg_id">>, <<"timestamp">>, <<"archive">>,
                                  {<<"unread_count">>, <<"unread_count = inbox.unread_count + ?">>}],
                                 KeyFields),
    ok.
81
%% @doc Run the inbox lookup for a user and decode every returned row.
-spec get_inbox(HostType :: mongooseim:host_type(),
                LUser :: jid:luser(),
                LServer :: jid:lserver(),
                Params :: mod_inbox:get_inbox_params()) -> get_inbox_res().
get_inbox(HostType, LUser, LServer, Params) ->
    case get_inbox_rdbms(HostType, LUser, LServer, Params) of
        {selected, Rows} ->
            %% An empty result set maps to the empty list.
            lists:map(fun(Row) -> decode_row(HostType, Row) end, Rows)
    end.
93
%% @doc Return the unread counter of an inbox entry, incremented by one.
-spec get_inbox_unread(mongooseim:host_type(), mod_inbox:entry_key()) ->
    {ok, integer()}.
get_inbox_unread(HostType, {LUser, LServer, RemBareJID}) ->
    Selected = execute_select_unread_count(HostType, LUser, LServer, RemBareJID),
    {ok, Stored} = check_result(Selected),
    %% The counter is read when the message is sent, before it lands in the
    %% receiver's inbox, hence the +1.
    {ok, Stored + 1}.
102
%% @doc Insert or overwrite an inbox entry with the given content and
%% unread count.
-spec set_inbox(HostType, InboxEntryKey, Content, Count, MsgId, Timestamp) ->
    mod_inbox:write_res() when
      HostType :: mongooseim:host_type(),
      InboxEntryKey :: mod_inbox:entry_key(),
      Content :: binary(),
      Count :: integer(),
      MsgId :: binary(),
      Timestamp :: integer().
set_inbox(HostType, {LUser, LServer, ToBareJid}, Content, Count, MsgId, Timestamp) ->
    RemoteJid = jid:nameprep(ToBareJid),
    KeyValues = [LUser, LServer, RemoteJid],
    OnInsert = [LUser, LServer, RemoteJid,
                Content, Count, MsgId, Timestamp],
    %% The trailing `false' is the value written to the archive column
    %% when the row already exists (see the update fields in init/2).
    OnUpdate = [Content, Count, MsgId, Timestamp, false],
    UpsertRes = rdbms_queries:execute_upsert(HostType, inbox_upsert,
                                             OnInsert, OnUpdate, KeyValues),
    %% MySQL returns 1 when an upsert is an insert
    %% and 2, when an upsert acts as update
    check_result_is_expected(UpsertRes, [1, 2]).
122
%% @doc Delete the single inbox entry identified by the entry key.
-spec remove_inbox_row(HostType :: mongooseim:host_type(),
                       InboxEntryKey :: mod_inbox:entry_key()) -> mod_inbox:write_res().
remove_inbox_row(HostType, {LUser, LServer, ToBareJid}) ->
    RemoteJid = jid:nameprep(ToBareJid),
    check_result(execute_delete(HostType, LUser, LServer, RemoteJid)).
129
%% @doc Delete every inbox row stored for the given domain.
%% The query result is discarded; the function always returns `ok'
%% unless the query itself raises.
-spec remove_domain(mongooseim:host_type(), jid:lserver()) -> ok.
remove_domain(HostType, LServer) ->
    execute_delete_domain(HostType, LServer),
    ok.
135
%% @doc Upsert an inbox entry and raise its unread counter by one.
-spec set_inbox_incr_unread(HostType :: mongooseim:host_type(),
                            InboxEntryKey :: mod_inbox:entry_key(),
                            Content :: binary(),
                            MsgId :: binary(),
                            Timestamp :: integer()) ->
    mod_inbox:count_res().
set_inbox_incr_unread(HostType, InboxEntryKey, Content, MsgId, Timestamp) ->
    set_inbox_incr_unread(HostType, InboxEntryKey, Content, MsgId, Timestamp, 1).
141
%% @doc Upsert an inbox entry; a new row starts with `Incrs' unread
%% messages, an existing row has its counter increased by `Incrs'.
-spec set_inbox_incr_unread(HostType :: mongooseim:host_type(),
                            InboxEntryKey :: mod_inbox:entry_key(),
                            Content :: binary(),
                            MsgId :: binary(),
                            Timestamp :: integer(),
                            Incrs :: pos_integer()) -> mod_inbox:count_res().
set_inbox_incr_unread(HostType, {LUser, LServer, ToBareJid}, Content, MsgId, Timestamp, Incrs) ->
    RemoteJid = jid:nameprep(ToBareJid),
    KeyValues = [LUser, LServer, RemoteJid],
    OnInsert = [LUser, LServer, RemoteJid, Content, Incrs, MsgId, Timestamp],
    %% `false' is written to the archive column; the final `Incrs' feeds the
    %% "unread_count = inbox.unread_count + ?" expression prepared in init/2.
    OnUpdate = [Content, MsgId, Timestamp, false, Incrs],
    UpsertRes = rdbms_queries:execute_upsert(HostType, inbox_upsert_incr_unread,
                                             OnInsert, OnUpdate, KeyValues),
    check_result(UpsertRes).
156
%% @doc Set the unread counter of an entry to zero. When `MsgId' is not
%% `undefined' the reset only applies if the row's msg_id matches it.
-spec reset_unread(HostType :: mongooseim:host_type(),
                   InboxEntryKey :: mod_inbox:entry_key(),
                   MsgId :: binary() | undefined) -> mod_inbox:write_res().
reset_unread(HostType, {LUser, LServer, ToBareJid}, MsgId) ->
    RemoteJid = jid:nameprep(ToBareJid),
    check_result(execute_reset_unread(HostType, LUser, LServer, RemoteJid, MsgId)).
164
%% @doc Delete all inbox entries of the given user.
-spec clear_inbox(HostType :: mongooseim:host_type(),
                  LUser :: jid:luser(),
                  LServer :: jid:lserver()) -> mod_inbox:write_res().
clear_inbox(HostType, LUser, LServer) ->
    check_result(execute_delete(HostType, LUser, LServer)).
171
%% @doc Read archive, unread_count and muted_until of one inbox entry.
%% Returns `[]' when the entry does not exist.
-spec get_entry_properties(HostType :: mongooseim:host_type(),
                           InboxEntryKey :: mod_inbox:entry_key()) ->
    entry_properties() | nil().
get_entry_properties(HostType, {LUser, LServer, RemBareJID}) ->
    case execute_select_properties(HostType, LUser, LServer, RemBareJID) of
        {selected, [Row]} ->
            decode_entries(Row);
        {selected, []} ->
            []
    end.
182
%% @doc Update the requested properties of an inbox entry and return the
%% entry's properties as re-read from the database, or an error tuple.
-spec set_entry_properties(HostType :: mongooseim:host_type(),
                           InboxEntryKey :: mod_inbox:entry_key(),
                           entry_properties()) ->
    entry_properties() | {error, binary()}.
set_entry_properties(HostType, {LUser, LServer, RemBareJID}, Properties) ->
    Outcome = set_entry_properties_rdbms(HostType, LUser, LServer, RemBareJID, Properties),
    case Outcome of
        {selected, [Row]} ->
            decode_entries(Row);
        {updated, 0} ->
            %% No row matched the entry key.
            {error, <<"item-not-found">>};
        {error, Reason} when is_list(Reason) ->
            {error, list_to_binary(Reason)};
        {error, Reason} ->
            {error, Reason}
    end.
198
%% @doc Convert an {archive, unread_count, muted_until} row into a
%% properties map.
decode_entries({RawArchive, RawCount, RawMutedUntil}) ->
    IsArchived = mongoose_rdbms:to_bool(RawArchive),
    Unread = mongoose_rdbms:result_to_integer(RawCount),
    MutedUntil = mongoose_rdbms:result_to_integer(RawMutedUntil),
    #{archive => IsArchived,
      unread_count => Unread,
      muted_until => MutedUntil}.
206
207 %% ----------------------------------------------------------------------
208 %% Internal functions
209 %% ----------------------------------------------------------------------
210
%% @doc Execute the lookup statement matching `Params', preparing it on
%% first use.
-spec get_inbox_rdbms(HostType :: mongooseim:host_type(),
                      LUser :: jid:luser(),
                      LServer :: jid:lserver(),
                      Params :: mod_inbox:get_inbox_params()) ->
    mongoose_rdbms:query_result().
get_inbox_rdbms(HostType, LUser, LServer, Params) ->
    QueryName = lookup_query_name(Params),
    ensure_lookup_prepared(QueryName, Params),
    Args = lookup_query_args(LServer, LUser, Params),
    mongoose_rdbms:execute_successfully(HostType, QueryName, Args).

%% Prepare the lookup statement for this parameter shape unless it is
%% already registered under QueryName.
ensure_lookup_prepared(QueryName, Params) ->
    case mongoose_rdbms:prepared(QueryName) of
        true ->
            ok;
        false ->
            SQL = lookup_query(Params),
            Columns = lookup_query_columns(Params),
            mongoose_rdbms:prepare(QueryName, inbox, Columns, SQL)
    end.
228
%% @doc Update the requested properties of one inbox entry inside a
%% transaction, preparing the update statement on first use.
%%
%% Returns whatever set_entry_properties_t/6 returned inside the
%% transaction. An aborted transaction is not handled here: the
%% `{atomic, _}' match fails with badmatch.
set_entry_properties_rdbms(HostType, LUser, LServer, RemBareJID, Properties) ->
    QueryName = update_query_name(Properties),
    case mongoose_rdbms:prepared(QueryName) of
        false ->
            SQL = update_properties_query(Properties),
            Columns = update_query_columns(Properties),
            mongoose_rdbms:prepare(QueryName, inbox, Columns, SQL);
        true ->
            ok
    end,
    %% The transaction is opened on HostType, like every other database call
    %% in this module (it used to be opened on LServer).
    {atomic, TransactionResult} =
        mongoose_rdbms:sql_transaction(
          HostType,
          fun() ->
                  set_entry_properties_t(HostType, QueryName, LUser, LServer,
                                         RemBareJID, Properties)
          end),
    TransactionResult.
244
%% @doc Transaction body: run the property update and, if exactly one row
%% was updated, re-read the entry's properties. Any other update result is
%% returned unchanged.
-spec set_entry_properties_t(mongooseim:host_type(), atom(), jid:luser(), jid:lserver(),
                             jid:literal_jid(), entry_properties()) ->
    mongoose_rdbms:query_result().
set_entry_properties_t(HostType, QueryName, LUser, LServer, RemBareJID, Properties) ->
    UpdateArgs = update_query_args(LUser, LServer, RemBareJID, Properties),
    UpdateRes = mongoose_rdbms:execute_successfully(HostType, QueryName, UpdateArgs),
    case UpdateRes of
        {updated, 1} ->
            execute_select_properties(HostType, LUser, LServer, RemBareJID);
        _ ->
            UpdateRes
    end.
256
257 %% Inbox lookup
258
%% @doc Build the SQL text of the inbox lookup for the given parameters.
%% Optional filters add their own " AND ..." fragment; the limit is
%% rendered through the DB-specific fragments from sql_and_where_limit/1.
-spec lookup_query(mod_inbox:get_inbox_params()) -> iolist().
lookup_query(#{order := Order} = Params) ->
    Direction = order_to_sql(Order),
    {TailLimit, HeadLimit} = sql_and_where_limit(maps:get(limit, Params, undefined)),
    Filters = lists:map(fun(Key) ->
                                lookup_sql_condition(Key, maps:get(Key, Params, undefined))
                        end,
                        [start, 'end', hidden_read, archive]),
    ["SELECT ", HeadLimit,
     " remote_bare_jid, content, unread_count, timestamp, archive, muted_until "
     " FROM inbox WHERE luser = ? AND lserver = ?", Filters,
     " ORDER BY timestamp ", Direction, " ", TailLimit].
269
%% @doc Build the argument list for the lookup statement: user, server,
%% the values of the filter keys in use, and the limit when one is set.
-spec lookup_query_args(jid:lserver(), jid:luser(), mod_inbox:get_inbox_params()) -> list().
lookup_query_args(LServer, LUser, Params) ->
    FilterValues = lists:map(fun(Key) -> maps:get(Key, Params) end,
                             lookup_arg_keys(Params)),
    BaseArgs = [LUser, LServer | FilterValues],
    case Params of
        #{limit := Limit} when Limit =/= undefined ->
            rdbms_queries:add_limit_arg(Limit, BaseArgs);
        _ ->
            BaseArgs
    end.
277
%% @doc List the column names that correspond, position by position, to the
%% arguments produced by lookup_query_args/3.
-spec lookup_query_columns(mod_inbox:get_inbox_params()) -> [atom()].
lookup_query_columns(Params) ->
    FilterColumns = [param_to_column(Key) || Key <- lookup_arg_keys(Params)],
    BaseColumns = [luser, lserver | FilterColumns],
    case Params of
        #{limit := Limit} when Limit =/= undefined ->
            rdbms_queries:add_limit_arg(limit, BaseColumns);
        _ ->
            BaseColumns
    end.
285
%% @doc Return the filter keys that contribute a `?' placeholder to the
%% lookup statement, in placeholder order.
%%
%% A key whose value is `undefined' is treated as absent, matching
%% param_id/2 and lookup_sql_condition/2 (which add nothing for
%% `undefined') and the way `limit' is handled. Otherwise such a key would
%% add an argument for which the SQL has no placeholder.
-spec lookup_arg_keys(mod_inbox:get_inbox_params()) -> [atom()].
lookup_arg_keys(Params) ->
    [Key || Key <- [start, 'end', archive],
            maps:get(Key, Params, undefined) =/= undefined].
289
%% @doc Derive the prepared-statement name for a lookup from the shape of
%% its parameters. The suffixes come from the fixed literals in param_id/2,
%% so the set of atoms that can be created here is bounded.
-spec lookup_query_name(mod_inbox:get_inbox_params()) -> atom().
lookup_query_name(Params) ->
    Suffixes = [param_id(Key, maps:get(Key, Params, undefined))
                || Key <- lookup_param_keys()],
    list_to_atom(lists:append(["inbox_lookup" | Suffixes])).
296
%% @doc Lookup parameters that influence the statement name, in the order
%% their suffixes are appended by lookup_query_name/1.
-spec lookup_param_keys() -> [atom()].
lookup_param_keys() ->
    [order, limit,
     start, 'end',
     hidden_read, archive].
300
%% @doc Map a lookup filter key to the inbox column it is compared with.
-spec param_to_column(atom()) -> atom().
param_to_column(Bound) when Bound =:= start; Bound =:= 'end' ->
    timestamp;
param_to_column(archive) ->
    archive.
305
%% @doc Name suffix contributed by one lookup parameter. An `undefined'
%% value and `hidden_read => false' contribute nothing.
-spec param_id(Key :: atom(), Value :: any()) -> string().
param_id(_Key, undefined) ->
    "";
param_id(order, asc) ->
    "_asc";
param_id(order, desc) ->
    "_desc";
param_id(limit, _Limit) ->
    "_lim";
param_id(start, _Timestamp) ->
    "_start";
param_id('end', _Timestamp) ->
    "_end";
param_id(hidden_read, false) ->
    "";
param_id(hidden_read, true) ->
    "_hr";
param_id(archive, _Flag) ->
    "_arch".
316
%% @doc SQL keyword for the requested sort direction.
-spec order_to_sql(Order :: asc | desc) -> binary().
order_to_sql(desc) ->
    <<"DESC">>;
order_to_sql(asc) ->
    <<"ASC">>.
320
%% @doc Limit fragments for the lookup query as a pair; lookup_query/1
%% places the first element at the end of the statement and the second
%% right after SELECT. Both are empty when no limit is requested.
-spec sql_and_where_limit(non_neg_integer() | undefined) -> {iolist(), iolist()}.
sql_and_where_limit(Limit) ->
    case Limit of
        undefined -> {"", ""};
        _ -> rdbms_queries:get_db_specific_limits()
    end.
326
%% @doc SQL fragment appended to the lookup WHERE clause for one filter.
%% Any key/value combination not listed below adds nothing.
-spec lookup_sql_condition(Key :: atom(), Value :: any()) -> string().
lookup_sql_condition(Key, Value) ->
    case {Key, Value} of
        {start, From} when is_integer(From) ->
            " AND timestamp >= ?";
        {'end', To} when is_integer(To) ->
            " AND timestamp <= ?";
        {hidden_read, true} ->
            " AND unread_count > 0";
        {archive, Flag} when is_boolean(Flag) ->
            " AND archive = ?";
        _ ->
            ""
    end.
338
339 %% Property update
340
%% @doc Build the UPDATE statement that sets exactly the properties present
%% (and not `undefined') in `Properties' for one inbox row.
update_properties_query(Properties) ->
    Assignments = [update_sql_part(Key, Value)
                   || Key <- property_keys(),
                      Value <- [maps:get(Key, Properties, undefined)],
                      Value =/= undefined],
    ["UPDATE inbox SET ", string:join(Assignments, ", "),
     " WHERE luser = ? AND lserver = ? AND remote_bare_jid = ?"].
346
%% @doc Arguments for the property update: the new property values first,
%% then the row key used in the WHERE clause.
update_query_args(LUser, LServer, RemBareJID, Properties) ->
    NewValues = lists:map(fun(Key) -> maps:get(Key, Properties) end,
                          update_arg_keys(Properties)),
    NewValues ++ [LUser, LServer, RemBareJID].
350
%% @doc Column names matching, position by position, the arguments built by
%% update_query_args/4.
update_query_columns(Properties) ->
    RowKey = [luser, lserver, remote_bare_jid],
    update_arg_keys(Properties) ++ RowKey.
353
%% @doc Return the property keys that contribute a `?' placeholder to the
%% update statement, in placeholder order.
%%
%% A key whose value is `undefined' is treated as absent, matching
%% update_properties_query/1 and property_id/2, which both skip
%% `undefined'. Otherwise such a key would add an argument for which the
%% SQL has no placeholder.
update_arg_keys(Properties) ->
    [Key || Key <- [archive, muted_until],
            maps:get(Key, Properties, undefined) =/= undefined].
356
%% @doc Derive the prepared-statement name for a property update from the
%% properties being set. The suffixes come from the fixed literals in
%% property_id/2, so the set of atoms that can be created here is bounded.
update_query_name(Properties) ->
    Suffixes = [property_id(Key, maps:get(Key, Properties, undefined))
                || Key <- property_keys()],
    list_to_atom(lists:append(["inbox_update_properties" | Suffixes])).
362
%% @doc Entry properties that can be updated, in the order used for both
%% the statement name and the SET clause.
property_keys() ->
    [unread_count,
     archive,
     muted_until].
365
%% @doc Name suffix contributed by one property of an update request.
%% An `undefined' value contributes nothing; unread_count only accepts
%% 0 and 1.
-spec property_id(Key :: atom(), Value :: any()) -> string().
property_id(_Key, undefined) ->
    "";
property_id(unread_count, 1) ->
    "_unread";
property_id(unread_count, 0) ->
    "_read";
property_id(muted_until, _Until) ->
    "_muted";
property_id(archive, _Flag) ->
    "_arch".
372
%% @doc SET-clause fragment for one property. unread_count is written as a
%% literal expression (no placeholder); archive and muted_until each use
%% one `?' placeholder.
-spec update_sql_part(Key :: atom(), Value :: any()) -> string().
update_sql_part(unread_count, 0) ->
    "unread_count = 0";
update_sql_part(unread_count, 1) ->
    %% Marks the entry unread only if it currently has no unread messages.
    "unread_count = CASE unread_count WHEN 0 THEN 1 ELSE unread_count END";
update_sql_part(archive, Flag) when is_boolean(Flag) ->
    "archive = ?";
update_sql_part(muted_until, Until) when is_integer(Until) ->
    "muted_until = ?".
382
383 %% Query execution
384
%% @doc Run the prepared statement that selects unread_count of one entry.
-spec execute_select_unread_count(mongooseim:host_type(), jid:luser(), jid:lserver(),
                                  jid:literal_jid()) ->
    mongoose_rdbms:query_result().
execute_select_unread_count(HostType, LUser, LServer, RemBareJID) ->
    RowKey = [LUser, LServer, RemBareJID],
    mongoose_rdbms:execute_successfully(HostType, inbox_select_unread_count, RowKey).
390
%% @doc Run the prepared statement that selects archive, unread_count and
%% muted_until of one entry.
-spec execute_select_properties(mongooseim:host_type(), jid:luser(), jid:lserver(),
                                jid:literal_jid()) ->
    mongoose_rdbms:query_result().
execute_select_properties(HostType, LUser, LServer, RemBareJID) ->
    RowKey = [LUser, LServer, RemBareJID],
    mongoose_rdbms:execute_successfully(HostType, inbox_select_properties, RowKey).
396
%% @doc Zero the unread counter of one entry. With a message id the
%% statement that additionally matches on msg_id is used.
-spec execute_reset_unread(mongooseim:host_type(), jid:luser(), jid:lserver(),
                           jid:literal_jid(), binary() | undefined) ->
    mongoose_rdbms:query_result().
execute_reset_unread(HostType, LUser, LServer, RemBareJID, MsgId) ->
    RowKey = [LUser, LServer, RemBareJID],
    {Statement, Args} =
        case MsgId of
            undefined -> {inbox_reset_unread, RowKey};
            _ -> {inbox_reset_unread_msg, RowKey ++ [MsgId]}
        end,
    mongoose_rdbms:execute_successfully(HostType, Statement, Args).
405
%% @doc Run the prepared statement that deletes one inbox entry.
-spec execute_delete(mongooseim:host_type(),
                     jid:luser(), jid:lserver(), jid:literal_jid()) ->
    mongoose_rdbms:query_result().
execute_delete(HostType, LUser, LServer, RemBareJID) ->
    RowKey = [LUser, LServer, RemBareJID],
    mongoose_rdbms:execute_successfully(HostType, inbox_delete_row, RowKey).
411
%% @doc Run the prepared statement that deletes all inbox entries of a user.
%% The spec lists the user before the server, matching the parameter order
%% and the "luser = ? AND lserver = ?" statement prepared in init/2.
-spec execute_delete(mongooseim:host_type(), jid:luser(), jid:lserver()) ->
    mongoose_rdbms:query_result().
execute_delete(HostType, LUser, LServer) ->
    mongoose_rdbms:execute_successfully(HostType, inbox_delete, [LUser, LServer]).
415
%% @doc Run the prepared statement that deletes all inbox rows of a domain.
-spec execute_delete_domain(mongooseim:host_type(), jid:lserver()) ->
    mongoose_rdbms:query_result().
execute_delete_domain(HostType, LServer) ->
    Args = [LServer],
    mongoose_rdbms:execute_successfully(HostType, inbox_delete_domain, Args).
421
422 %% Result processing
423
%% @doc Convert one row of the lookup query into an inbox result map.
%% The first column is stored under `remote_jid' unchanged; the other
%% columns are unescaped or converted by mongoose_rdbms helpers.
-spec decode_row(mongooseim:host_type(), db_return()) -> inbox_res().
decode_row(HostType, {RemoteJid, RawContent, RawCount, RawTimestamp, RawArchive, RawMutedUntil}) ->
    Msg = mongoose_rdbms:unescape_binary(HostType, RawContent),
    Unread = mongoose_rdbms:result_to_integer(RawCount),
    Timestamp = mongoose_rdbms:result_to_integer(RawTimestamp),
    IsArchived = mongoose_rdbms:to_bool(RawArchive),
    MutedUntil = mongoose_rdbms:result_to_integer(RawMutedUntil),
    #{remote_jid => RemoteJid,
      msg => Msg,
      unread_count => Unread,
      timestamp => Timestamp,
      archive => IsArchived,
      muted_until => MutedUntil}.
437
%% @doc Accept an `{updated, N}' result only if N is one of the expected
%% row counts; everything else becomes an error tuple.
-spec check_result_is_expected(_, list()) -> mod_inbox:write_res().
check_result_is_expected({updated, Count}, Expected) when is_list(Expected) ->
    case lists:member(Count, Expected) of
        true ->
            ok;
        false ->
            {error, {expected_does_not_match, Count, Expected}}
    end;
check_result_is_expected(Unexpected, _Expected) ->
    {error, {bad_result, Unexpected}}.
446
%% @doc Normalise a query result: an empty selection counts as 0, a single
%% one-column row is parsed as a number, a plain update is `ok', and
%% anything else is reported as a bad result.
-spec check_result(_) -> mod_inbox:count_res().
check_result(Result) ->
    case Result of
        {selected, []} ->
            {ok, 0};
        {selected, [{Selected}]} ->
            parse_result(Selected);
        {updated, _, [{Returned}]} ->
            parse_result(Returned);
        {updated, _} ->
            ok;
        _ ->
            {error, {bad_result, Result}}
    end.
458
%% @doc Turn a single column value into `{ok, Integer}'. `null' counts as
%% 0, binaries are parsed as decimal integers, and any other term yields
%% an error tuple.
parse_result(null) ->
    {ok, 0};
parse_result(Number) when is_integer(Number) ->
    {ok, Number};
parse_result(Text) when is_binary(Text) ->
    {ok, binary_to_integer(Text)};
parse_result(Other) ->
    {error, {unknown_result_value_type, Other}}.
Line Hits Source