./ct_report/coverage/mod_inbox_rdbms_async.COVER.html

1 -module(mod_inbox_rdbms_async).
2
3 -include("mod_inbox.hrl").
4 -include("mongoose_logger.hrl").
5
6 -behaviour(mod_inbox_backend).
7 -behaviour(mongoose_aggregator_worker).
8
9 -type box() :: binary().
%% A unit of work queued on the `inbox' async pool. The second element of
%% every variant is the inbox entry key.
%% The `set_inbox' count is an absolute unread count and may be 0:
%% aggregate/2 produces `set_inbox' tasks with a zero count when a reset is
%% folded in. The `set_inbox_incr_unread' count is an increment.
-type task() ::
    {set_inbox, mod_inbox:entry_key(), exml:element(), non_neg_integer(), id(), integer(), box()} |
    {set_inbox_incr_unread, mod_inbox:entry_key(), exml:element(), id(), integer(), pos_integer(), box()} |
    {remove_inbox_row, mod_inbox:entry_key()} |
    {reset_unread, mod_inbox:entry_key(), id(), integer()}.
15
16 %% API
17 -export([init/2,
18 set_inbox/6,
19 set_inbox_incr_unread/5,
20 reset_unread/4,
21 remove_inbox_row/2,
22 empty_user_bin/4,
23 empty_global_bin/2,
24 remove_domain/2,
25 clear_inbox/3,
26 get_inbox/4,
27 get_inbox_unread/2,
28 get_full_entry/2,
29 get_entry_properties/2,
30 set_entry_properties/3]).
31 -export([stop/1]).
32
33 %% Worker callbacks
34 -export([request/2, aggregate/3, verify/3]).
35
36 %% Initialisation
%% @doc Initialise the asynchronous inbox backend for `HostType'.
%% The pool options are derived from `Opts' first, then the synchronous
%% backend `mod_inbox_rdbms' is initialised, the extra statement used by this
%% module is prepared, and finally the `inbox' async pool is started.
%% The result of starting the pool is ignored; `ok' is always returned.
-spec init(mongooseim:host_type(), gen_mod:module_opts()) -> ok.
init(HostType, Opts) ->
    PoolOpts = prepare_pool_opts(Opts),
    mod_inbox_rdbms:init(HostType, Opts),
    prepare_deletes(HostType, Opts),
    start_pool(HostType, PoolOpts),
    ok.
44
%% @doc Stop the `inbox' async pool of `HostType'.
%% Returns whatever `mongoose_async_pools:stop_pool/2' returns.
stop(HostType) ->
    mongoose_async_pools:stop_pool(HostType, inbox).
47
%% Build the async pool options: take the configured `async_writer' map and
%% add the pool type plus the three worker callbacks exported by this module.
%% The fixed keys take precedence over any equally named configured key.
prepare_pool_opts(#{async_writer := WriterOpts}) ->
    Fixed = #{pool_type => aggregate,
              request_callback => fun ?MODULE:request/2,
              aggregate_callback => fun ?MODULE:aggregate/3,
              verify_callback => fun ?MODULE:verify/3},
    maps:merge(WriterOpts, Fixed).
53
%% Prepare the `inbox_move_conversation_to_bin' statement, which sets the
%% `box' column of one conversation row to 'bin'. Both arguments are unused.
prepare_deletes(_HostType, _Opts) ->
    Fields = [luser, lserver, remote_bare_jid],
    Query = <<"UPDATE inbox SET box='bin'",
              " WHERE luser = ? AND lserver = ? AND remote_bare_jid = ?">>,
    mongoose_rdbms:prepare(inbox_move_conversation_to_bin, inbox, Fields, Query),
    ok.
60
%% Start the async pool named `inbox' for `HostType' with the given options.
-spec start_pool(mongooseim:host_type(), mongoose_async_pools:pool_opts()) -> term().
start_pool(HostType, PoolOpts) ->
    mongoose_async_pools:start_pool(HostType, inbox, PoolOpts).
64
65 %% Worker callbacks
%% @doc Worker callback: issue the database request for one task.
%% The host type is read from the pool's extra map.
-spec request(task(), mongoose_async_pools:pool_extra()) -> reference().
request(Task, #{host_type := HostType}) ->
    request_one(HostType, Task).
69
%% Translate one task into the matching RDBMS request.
%% - `set_inbox' and `set_inbox_incr_unread' become upserts keyed on
%%   (luser, lserver, remote_bare_jid);
%% - `reset_unread' picks the statement depending on whether a message id
%%   was given (`undefined' means none);
%% - `remove_inbox_row' runs the statement that moves the row to the bin.
request_one(HostType, {set_inbox, {LU, LS, RemoteJid}, Packet, Count, MsgId, Timestamp, Box}) ->
    Update = [MsgId, Box, exml:to_binary(Packet), Count, Timestamp],
    request_inbox_upsert(HostType, inbox_upsert, [LU, LS, RemoteJid], Update);
request_one(HostType, {set_inbox_incr_unread, {LU, LS, RemoteJid}, Packet, MsgId, Timestamp, Incrs, Box}) ->
    Update = [MsgId, Box, exml:to_binary(Packet), Incrs, Timestamp],
    request_inbox_upsert(HostType, inbox_upsert_incr_unread, [LU, LS, RemoteJid], Update);
request_one(HostType, {reset_unread, {LU, LS, RemoteJid}, undefined, TS}) ->
    mongoose_rdbms:execute_request(HostType, inbox_reset_unread,
                                   [LU, LS, RemoteJid, TS]);
request_one(HostType, {reset_unread, {LU, LS, RemoteJid}, MsgId, TS}) ->
    mongoose_rdbms:execute_request(HostType, inbox_reset_unread_msg,
                                   [LU, LS, RemoteJid, MsgId, TS]);
request_one(HostType, {remove_inbox_row, {LU, LS, RemoteJid}}) ->
    mongoose_rdbms:execute_request(HostType, inbox_move_conversation_to_bin,
                                   [LU, LS, RemoteJid]).

%% Issue an upsert whose insert parameters are the unique key columns
%% followed by the update parameters.
request_inbox_upsert(HostType, Statement, Unique, Update) ->
    rdbms_queries:request_upsert(HostType, Statement, Unique ++ Update, Update, Unique).
88
%% @doc Worker callback: merge the accumulated task with a newly queued one.
%% The pool's extra map is not used; the merge itself is done by aggregate/2.
-spec aggregate(task(), task(), mongoose_async_pools:pool_extra()) -> {ok, task()}.
aggregate(Accumulated, Incoming, _Extra) ->
    Merged = aggregate(Accumulated, Incoming),
    {ok, Merged}.
92
%% @doc Worker callback: inspect the database answer for a finished task.
%% When `mod_inbox_rdbms:check_result/1' reports `{error, Reason}', a warning
%% is logged with the owner and remote JID taken from the task's entry key
%% (the second element of every task tuple). Any other result is accepted.
-spec verify(term(), task(), mongoose_async_pools:pool_extra()) -> ok.
verify(Answer, InboxTask, _Extra) ->
    case mod_inbox_rdbms:check_result(Answer) of
        {error, Reason} ->
            {LUser, LServer, LRemote} = element(2, InboxTask),
            ?LOG_WARNING(#{what => inbox_process_message_failed, reason => Reason,
                           from_jid => jid:to_binary({LUser, LServer}), to_jid => LRemote});
        _ ->
            ok
    end.
102
103 %% async callbacks
%% @doc Queue a `set_inbox' task on the `inbox' async pool.
%% `Entry' is passed to the pool both as the task key and inside the task;
%% the box is always `<<"inbox">>'.
-spec set_inbox(mongooseim:host_type(), mod_inbox:entry_key(),
                exml:element(), Count :: integer(), id(), Timestamp :: integer()) ->
    mod_inbox:write_res().
set_inbox(HostType, Entry, Packet, Count, MsgId, Timestamp) ->
    mongoose_async_pools:put_task(
      HostType, inbox, Entry,
      {set_inbox, Entry, Packet, Count, MsgId, Timestamp, <<"inbox">>}).
110
%% @doc Queue a `set_inbox_incr_unread' task on the `inbox' async pool.
%% The task carries an increment of 1 and the box `<<"inbox">>'.
-spec set_inbox_incr_unread(mongooseim:host_type(), mod_inbox:entry_key(),
                            exml:element(), MsgId :: binary(), Timestamp :: integer()) ->
    mod_inbox:count_res().
set_inbox_incr_unread(HostType, Entry, Packet, MsgId, Timestamp) ->
    mongoose_async_pools:put_task(
      HostType, inbox, Entry,
      {set_inbox_incr_unread, Entry, Packet, MsgId, Timestamp, 1, <<"inbox">>}).
117
%% @doc Queue a `reset_unread' task on the `inbox' async pool.
%% `MsgId' may be `undefined'; it is forwarded unchanged inside the task.
-spec reset_unread(mongooseim:host_type(), mod_inbox:entry_key(), binary() | undefined, integer()) ->
    mod_inbox:write_res().
reset_unread(HostType, Entry, MsgId, TS) ->
    mongoose_async_pools:put_task(HostType, inbox, Entry,
                                  {reset_unread, Entry, MsgId, TS}).
123
%% @doc Queue a `remove_inbox_row' task on the `inbox' async pool.
-spec remove_inbox_row(mongooseim:host_type(), mod_inbox:entry_key()) -> mod_inbox:write_res().
remove_inbox_row(HostType, Entry) ->
    mongoose_async_pools:put_task(HostType, inbox, Entry,
                                  {remove_inbox_row, Entry}).
128
129 %% synchronous callbacks
%% @doc Read a user's inbox. Delegates directly to `mod_inbox_rdbms:get_inbox/4'
%% without going through the async pool.
-spec get_inbox(mongooseim:host_type(), jid:luser(), jid:lserver(), mod_inbox:get_inbox_params()) ->
    [mod_inbox:inbox_res()].
get_inbox(HostType, LU, LS, QueryParams) ->
    mod_inbox_rdbms:get_inbox(HostType, LU, LS, QueryParams).
134
%% @doc Read the unread count of one entry. Delegates directly to
%% `mod_inbox_rdbms:get_inbox_unread/2'.
-spec get_inbox_unread(mongooseim:host_type(), mod_inbox:entry_key()) ->
    {ok, integer()}.
get_inbox_unread(HostType, EntryKey) ->
    mod_inbox_rdbms:get_inbox_unread(HostType, EntryKey).
139
%% @doc Remove the inbox data of a domain. Delegates directly to
%% `mod_inbox_rdbms:remove_domain/2'.
-spec remove_domain(mongooseim:host_type(), jid:lserver()) -> ok.
remove_domain(HostType, Domain) ->
    mod_inbox_rdbms:remove_domain(HostType, Domain).
143
%% @doc Clear a user's inbox. Delegates directly to
%% `mod_inbox_rdbms:clear_inbox/3'.
-spec clear_inbox(mongooseim:host_type(), jid:luser(), jid:lserver()) ->
    mod_inbox:write_res().
clear_inbox(HostType, LU, LS) ->
    mod_inbox_rdbms:clear_inbox(HostType, LU, LS).
148
%% @doc Fetch the full inbox entry for a key. Delegates directly to
%% `mod_inbox_rdbms:get_full_entry/2'.
-spec get_full_entry(mongooseim:host_type(), mod_inbox:entry_key()) ->
    inbox_res() | nil().
get_full_entry(HostType, EntryKey) ->
    mod_inbox_rdbms:get_full_entry(HostType, EntryKey).
153
%% @doc Fetch the properties of an inbox entry. Delegates directly to
%% `mod_inbox_rdbms:get_entry_properties/2'.
-spec get_entry_properties(mongooseim:host_type(), mod_inbox:entry_key()) ->
    entry_properties() | nil().
get_entry_properties(HostType, EntryKey) ->
    mod_inbox_rdbms:get_entry_properties(HostType, EntryKey).
158
%% @doc Update the properties of an inbox entry. Delegates directly to
%% `mod_inbox_rdbms:set_entry_properties/3'; this write does not go through
%% the async pool.
-spec set_entry_properties(mongooseim:host_type(), mod_inbox:entry_key(), entry_properties()) ->
    entry_properties() | {error, binary()}.
set_entry_properties(HostType, EntryKey, Props) ->
    mod_inbox_rdbms:set_entry_properties(HostType, EntryKey, Props).
163
%% @doc Empty one user's bin. Delegates directly to
%% `mod_inbox_rdbms:empty_user_bin/4', keeping the (server, user) argument order.
-spec empty_user_bin(HostType :: mongooseim:host_type(),
                     LServer :: jid:lserver(),
                     LUser :: jid:luser(),
                     TS :: integer()) -> non_neg_integer().
empty_user_bin(HostType, LS, LU, Timestamp) ->
    mod_inbox_rdbms:empty_user_bin(HostType, LS, LU, Timestamp).
170
%% @doc Empty the bin for the whole host type. Delegates directly to
%% `mod_inbox_rdbms:empty_global_bin/2'.
-spec empty_global_bin(HostType :: mongooseim:host_type(),
                       TS :: integer()) -> non_neg_integer().
empty_global_bin(HostType, Timestamp) ->
    mod_inbox_rdbms:empty_global_bin(HostType, Timestamp).
175
%% Merge the task accumulated so far for an inbox entry with a newly queued
%% task for the same entry, producing the single task that will be executed.
%% Clauses are grouped by the kind of the accumulated (first) task; any
%% combination without a dedicated clause resolves to the new task.
-spec aggregate(CurrentlyAccumulatedTask :: task(), NewTask :: task()) -> FinalTask :: task().
%%% if new task is remove_row, do the previous with an updated box
aggregate({set_inbox, Entry, Content, Count, MsgId, Timestamp, _},
          {remove_inbox_row, _}) ->
    {set_inbox, Entry, Content, Count, MsgId, Timestamp, <<"bin">>};
aggregate({set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, _},
          {remove_inbox_row, _}) ->
    {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, <<"bin">>};
aggregate(_, {remove_inbox_row, _} = NewTask) ->
    NewTask;

%%% if the last task was remove_row, a reset is dropped and a write is kept
%%% but redirected to the bin
aggregate({remove_inbox_row, _} = OldTask, {reset_unread, _, _, _}) ->
    OldTask;
aggregate({remove_inbox_row, _},
          {set_inbox, Entry, Content, Count, MsgId, Timestamp, _}) ->
    {set_inbox, Entry, Content, Count, MsgId, Timestamp, <<"bin">>};
aggregate({remove_inbox_row, _},
          {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, _}) ->
    {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, <<"bin">>};

%%% If the last task was a reset_unread,
%   we prefer explicit resets,
%   then adhoc newer resets,
%   then we accumulate inserts
%% an undefined means an explicit request to reset, it has priority
aggregate({reset_unread, _, _, _}, {reset_unread, _, undefined, _} = NewTask) ->
    NewTask;
%% an undefined means an explicit request to reset, it has priority
aggregate({reset_unread, _, undefined, _} = OldTask, {reset_unread, _, _, _}) ->
    OldTask;
%% both are adhoc, we prefer the newer
aggregate({reset_unread, _, _, _}, {reset_unread, _, _, _} = NewTask) ->
    NewTask;
aggregate({reset_unread, _, _, _}, {set_inbox, _, _, _, _, _, _} = NewTask) ->
    NewTask;
%% Here `Count` becomes an absolute value instead of an increment
aggregate({reset_unread, _, _, _},
          {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, Box}) ->
    {set_inbox, Entry, Content, Incrs, MsgId, Timestamp, Box};

%%% If the last task was a set_inbox
%% Reset is an explicit reset-to-zero, so do reset the counter
aggregate({set_inbox, Entry, Content, _, MsgId, Timestamp, Box},
          {reset_unread, _, undefined, _}) ->
    {set_inbox, Entry, Content, 0, MsgId, Timestamp, Box};
%% Reset refers to that same set_inbox
aggregate({set_inbox, Entry, Content, _, MsgId, Timestamp, Box},
          {reset_unread, _, MsgId, _}) ->
    {set_inbox, Entry, Content, 0, MsgId, Timestamp, Box};
%% Reset refers to some other set_inbox
aggregate({set_inbox, _, _, _, _, _, _} = OldTask,
          {reset_unread, _, _, _}) ->
    OldTask;
%% An absolute count followed by an increment is still an absolute count:
%% keep the newest row and write `Count + Incrs` as the final value.
%% (This clause used to match an 8-element tuple and therefore never fired,
%% which silently discarded the accumulated absolute count.)
aggregate({set_inbox, _, _, Count, _, _, _},
          {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, Box}) ->
    {set_inbox, Entry, Content, Count + Incrs, MsgId, Timestamp, Box};

%%% If the last task was a set_inbox_incr_unread
% we're resetting on this message:
aggregate({set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, _, Box},
          {reset_unread, _, MsgId, _}) ->
    {set_inbox, Entry, Content, 0, MsgId, Timestamp, Box};
% any other reset (explicit or for another message) leaves the task as is
aggregate({set_inbox_incr_unread, _, _, _, _, _, _} = OldTask,
          {reset_unread, _, _, _}) ->
    OldTask;
% prefer newest row, but accumulate increment
aggregate({set_inbox_incr_unread, _, _, _, _, Incrs2, _},
          {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs1, Box}) ->
    {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs1 + Incrs2, Box};

% a set_inbox for the same message id replaces the pending increment
aggregate({set_inbox_incr_unread, _, _, MsgId, _, _, _},
          {set_inbox, _, _, _, MsgId, _, _} = NewTask) ->
    NewTask;

% every remaining combination: the newest task wins
aggregate(_OldTask, NewTask) ->
    NewTask.
Line Hits Source