./ct_report/coverage/mod_inbox_rdbms_async.COVER.html

1 -module(mod_inbox_rdbms_async).
2
3 -include("mod_inbox.hrl").
4 -include("mongoose_logger.hrl").
5
6 -behaviour(mod_inbox_backend).
7 -behaviour(mongoose_aggregator_worker).
8
9 -ifdef(gen_server_request_id).
10 -type request_id() :: gen_server:request_id().
11 -else.
12 -type request_id() :: term().
13 -endif.
14
15 -type box() :: binary().
%% A unit of work queued on the inbox async pool.
%% The count carried by `set_inbox` is an absolute unread count and may be 0
%% (aggregate/2 produces `set_inbox` tasks with a zero count), whereas the
%% count carried by `set_inbox_incr_unread` is an increment.
-type task() ::
    {set_inbox, mod_inbox:entry_key(), exml:element(), non_neg_integer(), id(), integer(), box()} |
    {set_inbox_incr_unread, mod_inbox:entry_key(), exml:element(), id(), integer(), pos_integer(), box()} |
    {remove_inbox_row, mod_inbox:entry_key()} |
    {reset_unread, mod_inbox:entry_key(), id(), integer()}.
21
22 %% API
23 -export([init/2,
24 set_inbox/6,
25 set_inbox_incr_unread/5,
26 reset_unread/4,
27 remove_inbox_row/2,
28 empty_user_bin/4,
29 empty_domain_bin/3,
30 empty_global_bin/2,
31 remove_domain/2,
32 clear_inbox/3,
33 get_inbox/4,
34 get_inbox_unread/2,
35 get_full_entry/2,
36 get_entry_properties/2,
37 set_entry_properties/3]).
38 -export([stop/1]).
39
40 %% Worker callbacks
41 -export([request/2, aggregate/3, verify/3]).
42
43 %% Initialisation
%% Initialises the asynchronous inbox backend for a host type.
%% The pool options are extracted first, so a missing `async_writer` key
%% fails before anything else is set up. Then the synchronous RDBMS backend
%% is initialised, the extra statement is prepared and the pool is started.
-spec init(mongooseim:host_type(), gen_mod:module_opts()) -> ok.
init(HostType, ModuleOpts) ->
    PoolOpts = prepare_pool_opts(ModuleOpts),
    mod_inbox_rdbms:init(HostType, ModuleOpts),
    prepare_deletes(HostType, ModuleOpts),
    start_pool(HostType, PoolOpts),
    ok.
51
%% Stops the `inbox` async pool of the given host type.
stop(HostType) ->
    PoolId = inbox,
    mongoose_async_pools:stop_pool(HostType, PoolId).
54
%% Builds the async pool options from the module options.
%% The `async_writer` sub-map is taken as the base, and the pool type plus
%% the three worker callbacks of this module are set on top of it,
%% overriding any same-named keys already present.
prepare_pool_opts(#{async_writer := AsyncOpts}) ->
    Callbacks = #{pool_type => aggregate,
                  request_callback => fun ?MODULE:request/2,
                  aggregate_callback => fun ?MODULE:aggregate/3,
                  verify_callback => fun ?MODULE:verify/3},
    maps:merge(AsyncOpts, Callbacks).
60
%% Prepares the `inbox_move_conversation_to_bin` statement, which sets
%% box='bin' on a single conversation row instead of deleting it.
%% Both arguments are ignored; the result of the prepare call is discarded.
prepare_deletes(_HostType, _Opts) ->
    Fields = [luser, lserver, remote_bare_jid],
    Query = <<"UPDATE inbox SET box='bin'",
              " WHERE luser = ? AND lserver = ? AND remote_bare_jid = ?">>,
    mongoose_rdbms:prepare(inbox_move_conversation_to_bin, inbox, Fields, Query),
    ok.
67
%% Starts the `inbox` async pool for the host type with the given options.
-spec start_pool(mongooseim:host_type(), mongoose_async_pools:pool_opts()) -> term().
start_pool(HostType, PoolOpts) ->
    PoolId = inbox,
    mongoose_async_pools:start_pool(HostType, PoolId, PoolOpts).
71
72 %% Worker callbacks
%% Worker callback: issues the database request for one task.
%% The host type is read from the pool's extra map.
-spec request(task(), mongoose_async_pools:pool_extra()) -> request_id().
request(Task, #{host_type := HostType}) ->
    request_one(HostType, Task).
76
%% Translates a single task into the matching prepared-statement request.
%% Both upsert flavours share their parameter layout and differ only in the
%% statement name and in whether the number is an absolute count or an
%% increment. A reset without a message id uses `inbox_reset_unread`; a reset
%% with one uses `inbox_reset_unread_msg`. Removal moves the row to the bin.
request_one(HostType, {set_inbox, {LUser, LServer, LToBareJid}, Packet, Count, MsgId, Timestamp, Box}) ->
    upsert_request(HostType, inbox_upsert, [LUser, LServer, LToBareJid],
                   MsgId, Box, Packet, Count, Timestamp);
request_one(HostType, {set_inbox_incr_unread, {LUser, LServer, LToBareJid}, Packet, MsgId, Timestamp, Incrs, Box}) ->
    upsert_request(HostType, inbox_upsert_incr_unread, [LUser, LServer, LToBareJid],
                   MsgId, Box, Packet, Incrs, Timestamp);
request_one(HostType, {reset_unread, {LUser, LServer, LToBareJid}, undefined, TS}) ->
    Args = [LUser, LServer, LToBareJid, TS],
    mongoose_rdbms:execute_request(HostType, inbox_reset_unread, Args);
request_one(HostType, {reset_unread, {LUser, LServer, LToBareJid}, MsgId, TS}) ->
    Args = [LUser, LServer, LToBareJid, MsgId, TS],
    mongoose_rdbms:execute_request(HostType, inbox_reset_unread_msg, Args);
request_one(HostType, {remove_inbox_row, {LUser, LServer, LToBareJid}}) ->
    Args = [LUser, LServer, LToBareJid],
    mongoose_rdbms:execute_request(HostType, inbox_move_conversation_to_bin, Args).

%% Builds the insert/update/unique parameter lists of an inbox upsert and
%% issues the request. The insert parameters are the unique key columns
%% followed by the update parameters.
upsert_request(HostType, Statement, Unique, MsgId, Box, Packet, CountOrIncr, Timestamp) ->
    Content = exml:to_binary(Packet),
    Update = [MsgId, Box, Content, CountOrIncr, Timestamp],
    Insert = Unique ++ Update,
    rdbms_queries:request_upsert(HostType, Statement, Insert, Update, Unique).
95
%% Worker callback: merges the task accumulated so far with a newly arrived
%% one for the same key. Always succeeds; the merge rules live in aggregate/2.
-spec aggregate(task(), task(), mongoose_async_pools:pool_extra()) -> {ok, task()}.
aggregate(Accumulated, Incoming, _Extra) ->
    Merged = aggregate(Accumulated, Incoming),
    {ok, Merged}.
99
%% Worker callback: inspects the database answer for a task.
%% When mod_inbox_rdbms:check_result/1 reports an error, a warning carrying
%% the entry key of the task is logged and the error is returned; any other
%% outcome is treated as success.
-spec verify(term(), task(), mongoose_async_pools:pool_extra()) -> ok | {error, term()}.
verify(Answer, InboxTask, _Extra) ->
    case mod_inbox_rdbms:check_result(Answer) of
        {error, Reason} = Error ->
            %% the entry key is always the second element of a task tuple
            {LUser, LServer, LRemote} = element(2, InboxTask),
            ?LOG_WARNING(#{what => inbox_process_message_failed, reason => Reason,
                           from_jid => jid:to_binary({LUser, LServer}), to_jid => LRemote}),
            Error;
        _ ->
            ok
    end.
110
111 %% async callbacks
%% Queues a `set_inbox` task for the entry on the `inbox` async pool.
%% The task always targets the <<"inbox">> box; the entry is passed to the
%% pool as the task key.
-spec set_inbox(mongooseim:host_type(), mod_inbox:entry_key(),
                exml:element(), Count :: integer(), id(), Timestamp :: integer()) ->
    mod_inbox:write_res().
set_inbox(HostType, Entry, Packet, Count, MsgId, Timestamp) ->
    Task = {set_inbox, Entry, Packet, Count, MsgId, Timestamp, <<"inbox">>},
    mongoose_async_pools:put_task(HostType, inbox, Entry, Task).
118
%% Queues a `set_inbox_incr_unread` task for the entry on the `inbox` async
%% pool. The increment is always 1 and the task targets the <<"inbox">> box.
-spec set_inbox_incr_unread(mongooseim:host_type(), mod_inbox:entry_key(),
                            exml:element(), MsgId :: binary(), Timestamp :: integer()) ->
    mod_inbox:count_res().
set_inbox_incr_unread(HostType, Entry, Packet, MsgId, Timestamp) ->
    Increment = 1,
    Task = {set_inbox_incr_unread, Entry, Packet, MsgId, Timestamp, Increment, <<"inbox">>},
    mongoose_async_pools:put_task(HostType, inbox, Entry, Task).
125
%% Queues a `reset_unread` task for the entry on the `inbox` async pool.
%% MsgId may be `undefined`, which the aggregation rules treat as an
%% explicit reset request.
-spec reset_unread(mongooseim:host_type(), mod_inbox:entry_key(), binary() | undefined, integer()) ->
    mod_inbox:write_res().
reset_unread(HostType, Entry, MsgId, TS) ->
    Task = {reset_unread, Entry, MsgId, TS},
    mongoose_async_pools:put_task(HostType, inbox, Entry, Task).
131
%% Queues a `remove_inbox_row` task for the entry on the `inbox` async pool.
-spec remove_inbox_row(mongooseim:host_type(), mod_inbox:entry_key()) -> mod_inbox:write_res().
remove_inbox_row(HostType, Entry) ->
    Task = {remove_inbox_row, Entry},
    mongoose_async_pools:put_task(HostType, inbox, Entry, Task).
136
137 %% synchronous callbacks
%% Synchronous read: delegates unchanged to mod_inbox_rdbms:get_inbox/4.
-spec get_inbox(mongooseim:host_type(), jid:luser(), jid:lserver(), mod_inbox:get_inbox_params()) ->
    [mod_inbox:inbox_res()].
get_inbox(HostType, LUser, LServer, QueryParams) ->
    mod_inbox_rdbms:get_inbox(HostType, LUser, LServer, QueryParams).
142
%% Synchronous read: delegates unchanged to mod_inbox_rdbms:get_inbox_unread/2.
-spec get_inbox_unread(mongooseim:host_type(), mod_inbox:entry_key()) ->
    {ok, integer()}.
get_inbox_unread(HostType, EntryKey) ->
    mod_inbox_rdbms:get_inbox_unread(HostType, EntryKey).
147
%% Synchronous call: delegates unchanged to mod_inbox_rdbms:remove_domain/2.
-spec remove_domain(mongooseim:host_type(), jid:lserver()) -> term().
remove_domain(HostType, Domain) ->
    mod_inbox_rdbms:remove_domain(HostType, Domain).
151
%% Synchronous call: delegates unchanged to mod_inbox_rdbms:clear_inbox/3.
-spec clear_inbox(mongooseim:host_type(), jid:luser(), jid:lserver()) ->
    mod_inbox:write_res().
clear_inbox(HostType, User, Server) ->
    mod_inbox_rdbms:clear_inbox(HostType, User, Server).
156
%% Synchronous read: delegates unchanged to mod_inbox_rdbms:get_full_entry/2.
-spec get_full_entry(mongooseim:host_type(), mod_inbox:entry_key()) ->
    inbox_res() | nil().
get_full_entry(HostType, EntryKey) ->
    mod_inbox_rdbms:get_full_entry(HostType, EntryKey).
161
%% Synchronous read: delegates unchanged to
%% mod_inbox_rdbms:get_entry_properties/2.
-spec get_entry_properties(mongooseim:host_type(), mod_inbox:entry_key()) ->
    entry_properties() | nil().
get_entry_properties(HostType, EntryKey) ->
    mod_inbox_rdbms:get_entry_properties(HostType, EntryKey).
166
%% Synchronous write: delegates unchanged to
%% mod_inbox_rdbms:set_entry_properties/3 (it does not go through the pool).
-spec set_entry_properties(mongooseim:host_type(), mod_inbox:entry_key(), entry_properties()) ->
    entry_properties() | {error, binary()}.
set_entry_properties(HostType, EntryKey, Props) ->
    mod_inbox_rdbms:set_entry_properties(HostType, EntryKey, Props).
171
%% Synchronous call: delegates unchanged to mod_inbox_rdbms:empty_user_bin/4.
%% Note the argument order: server first, then user.
-spec empty_user_bin(HostType :: mongooseim:host_type(),
                     LServer :: jid:lserver(),
                     LUser :: jid:luser(),
                     TS :: integer()) -> non_neg_integer().
empty_user_bin(HostType, Server, User, Timestamp) ->
    mod_inbox_rdbms:empty_user_bin(HostType, Server, User, Timestamp).
178
%% Synchronous call: delegates unchanged to mod_inbox_rdbms:empty_domain_bin/3.
-spec empty_domain_bin(HostType :: mongooseim:host_type(),
                       LServer :: jid:lserver(),
                       TS :: integer()) -> non_neg_integer().
empty_domain_bin(HostType, Server, Timestamp) ->
    mod_inbox_rdbms:empty_domain_bin(HostType, Server, Timestamp).
184
%% Synchronous call: delegates unchanged to mod_inbox_rdbms:empty_global_bin/2.
-spec empty_global_bin(HostType :: mongooseim:host_type(),
                       TS :: integer()) -> non_neg_integer().
empty_global_bin(HostType, Timestamp) ->
    mod_inbox_rdbms:empty_global_bin(HostType, Timestamp).
189
%% Merges the task accumulated so far for an entry with a newly arrived task
%% for the same entry, yielding the single task that should be executed.
%% Clauses are grouped by the kind of the accumulated (first) task; any
%% combination not listed explicitly resolves to the new task.
-spec aggregate(CurrentlyAccumulatedTask :: task(), NewTask :: task()) -> FinalTask :: task().
%%% if new task is remove_row, do the previous with an updated box
aggregate({set_inbox, Entry, Content, Count, MsgId, Timestamp, _},
          {remove_inbox_row, _}) ->
    {set_inbox, Entry, Content, Count, MsgId, Timestamp, <<"bin">>};
aggregate({set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, _},
          {remove_inbox_row, _}) ->
    {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, <<"bin">>};
aggregate(_, {remove_inbox_row, _} = NewTask) ->
    NewTask;

%%% if the last task was remove_row, this task should now only be an insert
aggregate({remove_inbox_row, _} = OldTask, {reset_unread, _, _, _}) ->
    OldTask;
aggregate({remove_inbox_row, _},
          {set_inbox, Entry, Content, Count, MsgId, Timestamp, _}) ->
    {set_inbox, Entry, Content, Count, MsgId, Timestamp, <<"bin">>};
aggregate({remove_inbox_row, _},
          {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, _}) ->
    {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, <<"bin">>};

%%% If the last task was a reset_unread,
%   we prefer explicit resets,
%   then adhoc newer resets,
%   then we accumulate inserts
%% an undefined means an explicit request to reset, it has priority
aggregate({reset_unread, _, _, _}, {reset_unread, _, undefined, _} = NewTask) ->
    NewTask;
%% an undefined means an explicit request to reset, it has priority
aggregate({reset_unread, _, undefined, _} = OldTask, {reset_unread, _, _, _}) ->
    OldTask;
%% both are adhoc, we prefer the newer
aggregate({reset_unread, _, _, _}, {reset_unread, _, _, _} = NewTask) ->
    NewTask;
aggregate({reset_unread, _, _, _}, {set_inbox, _, _, _, _, _, _} = NewTask) ->
    NewTask;
%% Here `Count` becomes an absolute value instead of an increment
aggregate({reset_unread, _, _, _},
          {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, Box}) ->
    {set_inbox, Entry, Content, Incrs, MsgId, Timestamp, Box};

%%% If the last task was a set_inbox
%% Reset is an explicit reset-to-zero, so do reset the counter
aggregate({set_inbox, Entry, Content, _, MsgId, Timestamp, Box},
          {reset_unread, _, undefined, _}) ->
    {set_inbox, Entry, Content, 0, MsgId, Timestamp, Box};
%% Reset refers to that same set_inbox
aggregate({set_inbox, Entry, Content, _, MsgId, Timestamp, Box},
          {reset_unread, _, MsgId, _}) ->
    {set_inbox, Entry, Content, 0, MsgId, Timestamp, Box};
%% Reset refers to some other set_inbox
aggregate({set_inbox, _, _, _, _, _, _} = OldTask,
          {reset_unread, _, _, _}) ->
    OldTask;
%% The accumulated set_inbox carries an absolute count, so adding the new
%% increment on top of it still gives an absolute count: keep the newest row
%% and stay a set_inbox. (A set_inbox task has 7 elements; matching on 8
%% made this clause unreachable and silently dropped the absolute count.)
aggregate({set_inbox, _, _, Count, _, _, _},
          {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, Box}) ->
    {set_inbox, Entry, Content, Count + Incrs, MsgId, Timestamp, Box};

%%% If the last task was a set_inbox_incr_unread
%   we're resetting on this message:
aggregate({set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, _, Box},
          {reset_unread, _, MsgId, _}) ->
    {set_inbox, Entry, Content, 0, MsgId, Timestamp, Box};
%% NOTE(review): an explicit reset (MsgId = undefined) also lands here and is
%% dropped, unlike in the set_inbox case above - confirm this is intended.
aggregate({set_inbox_incr_unread, _, _, _, _, _, _} = OldTask,
          {reset_unread, _, _, _}) ->
    OldTask;
%  prefer newest row, but accumulate increment
aggregate({set_inbox_incr_unread, _, _, _, _, Incrs2, _},
          {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs1, Box}) ->
    {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs1 + Incrs2, Box};

aggregate({set_inbox_incr_unread, _, _, MsgId, _, _, _},
          {set_inbox, _, _, _, MsgId, _, _} = NewTask) ->
    NewTask;

%% anything else: the newest task wins
aggregate(_OldTask, NewTask) ->
    NewTask.
Line Hits Source