1 |
|
-module(mod_inbox_rdbms_async). |
2 |
|
|
3 |
|
-include("mod_inbox.hrl"). |
4 |
|
-include("mongoose_logger.hrl"). |
5 |
|
|
6 |
|
-behaviour(mod_inbox_backend). |
7 |
|
-behaviour(mongoose_aggregator_worker). |
8 |
|
|
%% `gen_server:request_id()' is only referenced when the build defines the
%% `gen_server_request_id' macro; otherwise an opaque term is used.
-ifdef(gen_server_request_id).
-type request_id() :: gen_server:request_id().
-else.
-type request_id() :: term().
-endif.

%% Name of the box a conversation is stored in; this module writes
%% <<"inbox">> and <<"bin">>.
-type box() :: binary().

%% Tasks handed to the `inbox' async pool.
%% The unread count carried by `set_inbox' is an absolute value and may be 0:
%% aggregate/2 folds a reset into a `set_inbox' task with count 0, so the type
%% must be `non_neg_integer()' rather than `pos_integer()'.
%% The last integer of `set_inbox_incr_unread' is the accumulated increment.
-type task() ::
    {set_inbox, mod_inbox:entry_key(), exml:element(), non_neg_integer(), id(), integer(), box()} |
    {set_inbox_incr_unread, mod_inbox:entry_key(), exml:element(), id(), integer(), pos_integer(), box()} |
    {remove_inbox_row, mod_inbox:entry_key()} |
    {reset_unread, mod_inbox:entry_key(), id(), integer()}.
21 |
|
|
%% Lifecycle
-export([init/2, stop/1]).

%% Writes that are queued as tasks on the async pool
-export([set_inbox/6,
         set_inbox_incr_unread/5,
         reset_unread/4,
         remove_inbox_row/2]).

%% Calls delegated directly to mod_inbox_rdbms
-export([get_inbox/4,
         get_inbox_unread/2,
         get_full_entry/2,
         get_entry_properties/2,
         set_entry_properties/3,
         clear_inbox/3,
         remove_domain/2,
         empty_user_bin/4,
         empty_domain_bin/3,
         empty_global_bin/2]).

%% Worker callbacks
-export([request/2, aggregate/3, verify/3]).
42 |
|
|
%% Initialisation
%% Builds the pool options first (this crashes early when the `async_writer'
%% key is missing from Opts), then initialises mod_inbox_rdbms, prepares this
%% module's own statement and finally starts the `inbox' async pool.
-spec init(mongooseim:host_type(), gen_mod:module_opts()) -> ok.
init(HostType, Opts) ->
    PoolOpts = prepare_pool_opts(Opts),
    mod_inbox_rdbms:init(HostType, Opts),
    prepare_deletes(HostType, Opts),
    start_pool(HostType, PoolOpts),
    ok.
51 |
|
|
%% Stops the `inbox' async pool of the given host type and returns whatever
%% mongoose_async_pools:stop_pool/2 returns.
stop(HostType) ->
    PoolId = inbox,
    mongoose_async_pools:stop_pool(HostType, PoolId).
54 |
|
|
%% Takes the `async_writer' section of the module options and adds the pool
%% type and this module's three worker callbacks; keys already present in the
%% section are overwritten by the values set here.
prepare_pool_opts(#{async_writer := AsyncOpts}) ->
    Callbacks = #{pool_type => aggregate,
                  request_callback => fun ?MODULE:request/2,
                  aggregate_callback => fun ?MODULE:aggregate/3,
                  verify_callback => fun ?MODULE:verify/3},
    maps:merge(AsyncOpts, Callbacks).
60 |
|
|
%% Registers the prepared statement used by the `remove_inbox_row' task:
%% the row is not deleted, its box is set to 'bin'.
%% Both arguments are ignored; the result of the prepare call is discarded.
prepare_deletes(_HostType, _Opts) ->
    Fields = [luser, lserver, remote_bare_jid],
    Query = <<"UPDATE inbox SET box='bin'",
              " WHERE luser = ? AND lserver = ? AND remote_bare_jid = ?">>,
    mongoose_rdbms:prepare(inbox_move_conversation_to_bin, inbox, Fields, Query),
    ok.
67 |
|
|
%% Starts the async pool registered under the `inbox' id for this host type.
-spec start_pool(mongooseim:host_type(), mongoose_async_pools:pool_opts()) -> term().
start_pool(HostType, PoolOpts) ->
    PoolId = inbox,
    mongoose_async_pools:start_pool(HostType, PoolId, PoolOpts).
71 |
|
|
%% Worker callbacks
%% Issues the database request for a single task, using the host type found
%% in the pool's extra map.
-spec request(task(), mongoose_async_pools:pool_extra()) -> request_id().
request(Task, #{host_type := HostType}) ->
    request_one(HostType, Task).
76 |
|
|
%% Translates one task into the matching request.
%% For both upserts the insert parameters are the unique key columns followed
%% by the update parameters, in that order.
request_one(HostType, {set_inbox, {LUser, LServer, LToBareJid},
                       Packet, Count, MsgId, Timestamp, Box}) ->
    Content = exml:to_binary(Packet),
    Unique = [LUser, LServer, LToBareJid],
    Update = [MsgId, Box, Content, Count, Timestamp],
    Insert = Unique ++ Update,
    rdbms_queries:request_upsert(HostType, inbox_upsert, Insert, Update, Unique);
request_one(HostType, {set_inbox_incr_unread, {LUser, LServer, LToBareJid},
                       Packet, MsgId, Timestamp, Incrs, Box}) ->
    Content = exml:to_binary(Packet),
    Unique = [LUser, LServer, LToBareJid],
    Update = [MsgId, Box, Content, Incrs, Timestamp],
    Insert = Unique ++ Update,
    rdbms_queries:request_upsert(HostType, inbox_upsert_incr_unread, Insert, Update, Unique);
%% A reset without a message id uses a dedicated statement;
%% this clause must stay before the generic reset clause below.
request_one(HostType, {reset_unread, {LUser, LServer, LToBareJid}, undefined, TS}) ->
    Args = [LUser, LServer, LToBareJid, TS],
    mongoose_rdbms:execute_request(HostType, inbox_reset_unread, Args);
request_one(HostType, {reset_unread, {LUser, LServer, LToBareJid}, MsgId, TS}) ->
    Args = [LUser, LServer, LToBareJid, MsgId, TS],
    mongoose_rdbms:execute_request(HostType, inbox_reset_unread_msg, Args);
request_one(HostType, {remove_inbox_row, {LUser, LServer, LToBareJid}}) ->
    Args = [LUser, LServer, LToBareJid],
    mongoose_rdbms:execute_request(HostType, inbox_move_conversation_to_bin, Args).
95 |
|
|
%% Pool callback: merges the accumulated task with a newly queued one.
%% The merge itself is done by aggregate/2 and always succeeds.
-spec aggregate(task(), task(), mongoose_async_pools:pool_extra()) -> {ok, task()}.
aggregate(Current, NewTask, _Extra) ->
    Merged = aggregate(Current, NewTask),
    {ok, Merged}.
99 |
|
|
%% Pool callback: inspects the database answer for a task.
%% When mod_inbox_rdbms:check_result/1 reports an error, a warning naming the
%% entry key (second element of the task) is logged and the error is returned;
%% any other result counts as success.
-spec verify(term(), task(), mongoose_async_pools:pool_extra()) -> ok | {error, term()}.
verify(Answer, InboxTask, _Extra) ->
    Checked = mod_inbox_rdbms:check_result(Answer),
    case Checked of
        {error, Reason} ->
            {LUser, LServer, RemoteBareJid} = element(2, InboxTask),
            ?LOG_WARNING(#{what => inbox_process_message_failed, reason => Reason,
                           from_jid => jid:to_binary({LUser, LServer}),
                           to_jid => RemoteBareJid}),
            {error, Reason};
        _ ->
            ok
    end.
110 |
|
|
%% async callbacks
%% Queues a `set_inbox' task for the <<"inbox">> box on the `inbox' pool,
%% using the entry key as the task key.
-spec set_inbox(mongooseim:host_type(), mod_inbox:entry_key(),
                exml:element(), Count :: integer(), id(), Timestamp :: integer()) ->
    mod_inbox:write_res().
set_inbox(HostType, Entry, Packet, Count, MsgId, Timestamp) ->
    Task = {set_inbox, Entry, Packet, Count, MsgId, Timestamp, <<"inbox">>},
    mongoose_async_pools:put_task(HostType, inbox, Entry, Task).
118 |
|
|
%% Queues a `set_inbox_incr_unread' task with an increment of 1 for the
%% <<"inbox">> box, using the entry key as the task key.
-spec set_inbox_incr_unread(mongooseim:host_type(), mod_inbox:entry_key(),
                            exml:element(), MsgId :: binary(), Timestamp :: integer()) ->
    mod_inbox:count_res().
set_inbox_incr_unread(HostType, Entry, Packet, MsgId, Timestamp) ->
    Increment = 1,
    Task = {set_inbox_incr_unread, Entry, Packet, MsgId, Timestamp, Increment, <<"inbox">>},
    mongoose_async_pools:put_task(HostType, inbox, Entry, Task).
125 |
|
|
%% Queues a `reset_unread' task; MsgId may be `undefined', which
%% request_one/2 and aggregate/2 treat as a separate case.
-spec reset_unread(mongooseim:host_type(), mod_inbox:entry_key(), binary() | undefined, integer()) ->
    mod_inbox:write_res().
reset_unread(HostType, Entry, MsgId, TS) ->
    Task = {reset_unread, Entry, MsgId, TS},
    mongoose_async_pools:put_task(HostType, inbox, Entry, Task).
131 |
|
|
%% Queues a `remove_inbox_row' task for the given entry key.
-spec remove_inbox_row(mongooseim:host_type(), mod_inbox:entry_key()) -> mod_inbox:write_res().
remove_inbox_row(HostType, Entry) ->
    Task = {remove_inbox_row, Entry},
    mongoose_async_pools:put_task(HostType, inbox, Entry, Task).
136 |
|
|
%% synchronous callbacks
%% Reads the inbox by delegating directly to mod_inbox_rdbms; the async pool
%% is not involved.
-spec get_inbox(mongooseim:host_type(), jid:luser(), jid:lserver(), mod_inbox:get_inbox_params()) ->
    [mod_inbox:inbox_res()].
get_inbox(HostType, LUser, LServer, QueryParams) ->
    mod_inbox_rdbms:get_inbox(HostType, LUser, LServer, QueryParams).
142 |
|
|
%% Delegates directly to mod_inbox_rdbms:get_inbox_unread/2.
-spec get_inbox_unread(mongooseim:host_type(), mod_inbox:entry_key()) ->
    {ok, integer()}.
get_inbox_unread(HostType, EntryKey) ->
    mod_inbox_rdbms:get_inbox_unread(HostType, EntryKey).
147 |
|
|
%% Delegates directly to mod_inbox_rdbms:remove_domain/2.
-spec remove_domain(mongooseim:host_type(), jid:lserver()) -> term().
remove_domain(HostType, Domain) ->
    mod_inbox_rdbms:remove_domain(HostType, Domain).
151 |
|
|
%% Delegates directly to mod_inbox_rdbms:clear_inbox/3.
-spec clear_inbox(mongooseim:host_type(), jid:luser(), jid:lserver()) ->
    mod_inbox:write_res().
clear_inbox(HostType, User, Server) ->
    mod_inbox_rdbms:clear_inbox(HostType, User, Server).
156 |
|
|
%% Delegates directly to mod_inbox_rdbms:get_full_entry/2.
-spec get_full_entry(mongooseim:host_type(), mod_inbox:entry_key()) ->
    inbox_res() | nil().
get_full_entry(HostType, EntryKey) ->
    mod_inbox_rdbms:get_full_entry(HostType, EntryKey).
161 |
|
|
%% Delegates directly to mod_inbox_rdbms:get_entry_properties/2.
-spec get_entry_properties(mongooseim:host_type(), mod_inbox:entry_key()) ->
    entry_properties() | nil().
get_entry_properties(HostType, EntryKey) ->
    mod_inbox_rdbms:get_entry_properties(HostType, EntryKey).
166 |
|
|
%% Delegates directly to mod_inbox_rdbms:set_entry_properties/3; the change
%% is not queued on the async pool.
-spec set_entry_properties(mongooseim:host_type(), mod_inbox:entry_key(), entry_properties()) ->
    entry_properties() | {error, binary()}.
set_entry_properties(HostType, EntryKey, Props) ->
    mod_inbox_rdbms:set_entry_properties(HostType, EntryKey, Props).
171 |
|
|
%% Delegates directly to mod_inbox_rdbms:empty_user_bin/4.
%% Note the argument order: server first, then user.
-spec empty_user_bin(HostType :: mongooseim:host_type(),
                     LServer :: jid:lserver(),
                     LUser :: jid:luser(),
                     TS :: integer()) -> non_neg_integer().
empty_user_bin(HostType, Server, User, Timestamp) ->
    mod_inbox_rdbms:empty_user_bin(HostType, Server, User, Timestamp).
178 |
|
|
%% Delegates directly to mod_inbox_rdbms:empty_domain_bin/3.
-spec empty_domain_bin(HostType :: mongooseim:host_type(),
                       LServer :: jid:lserver(),
                       TS :: integer()) -> non_neg_integer().
empty_domain_bin(HostType, Server, Timestamp) ->
    mod_inbox_rdbms:empty_domain_bin(HostType, Server, Timestamp).
184 |
|
|
%% Delegates directly to mod_inbox_rdbms:empty_global_bin/2.
-spec empty_global_bin(HostType :: mongooseim:host_type(),
                       TS :: integer()) -> non_neg_integer().
empty_global_bin(HostType, Timestamp) ->
    mod_inbox_rdbms:empty_global_bin(HostType, Timestamp).
189 |
|
|
%% Merges the task accumulated so far for an entry with a newly queued task
%% into a single task. Clause order matters: the more specific patterns come
%% first and the final clause keeps the newest task for every combination that
%% is not listed explicitly.
-spec aggregate(CurrentlyAccumulatedTask :: task(), NewTask :: task()) -> FinalTask :: task().
%%% if new task is remove_row, do the previous with an updated box
aggregate({set_inbox, Entry, Content, Count, MsgId, Timestamp, _},
          {remove_inbox_row, _}) ->
    {set_inbox, Entry, Content, Count, MsgId, Timestamp, <<"bin">>};
aggregate({set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, _},
          {remove_inbox_row, _}) ->
    {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, <<"bin">>};
aggregate(_, {remove_inbox_row, _} = NewTask) ->
    NewTask;

%%% if the last task was remove_row, this task should now only be an insert
aggregate({remove_inbox_row, _} = OldTask, {reset_unread, _, _, _}) ->
    OldTask;
aggregate({remove_inbox_row, _},
          {set_inbox, Entry, Content, Count, MsgId, Timestamp, _}) ->
    {set_inbox, Entry, Content, Count, MsgId, Timestamp, <<"bin">>};
aggregate({remove_inbox_row, _},
          {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, _}) ->
    {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, <<"bin">>};

%%% If the last task was a reset_unread,
%   we prefer explicit resets,
%   then adhoc newer resets,
%   then we accumulate inserts
%% an undefined means an explicit request to reset, it has priority
aggregate({reset_unread, _, _, _}, {reset_unread, _, undefined, _} = NewTask) ->
    NewTask;
%% an undefined means an explicit request to reset, it has priority
aggregate({reset_unread, _, undefined, _} = OldTask, {reset_unread, _, _, _}) ->
    OldTask;
%% both are adhoc, we prefer the newer
aggregate({reset_unread, _, _, _}, {reset_unread, _, _, _} = NewTask) ->
    NewTask;
aggregate({reset_unread, _, _, _}, {set_inbox, _, _, _, _, _, _} = NewTask) ->
    NewTask;
%% Here `Count` becomes an absolute value instead of an increment
aggregate({reset_unread, _, _, _},
          {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, Box}) ->
    {set_inbox, Entry, Content, Incrs, MsgId, Timestamp, Box};

%%% If the last task was a set_inbox
%% Reset is an explicit reset-to-zero, so do reset the counter
aggregate({set_inbox, Entry, Content, _, MsgId, Timestamp, Box},
          {reset_unread, _, undefined, _}) ->
    {set_inbox, Entry, Content, 0, MsgId, Timestamp, Box};
%% Reset refers to that same set_inbox
aggregate({set_inbox, Entry, Content, _, MsgId, Timestamp, Box},
          {reset_unread, _, MsgId, _}) ->
    {set_inbox, Entry, Content, 0, MsgId, Timestamp, Box};
%% Reset refers to some other set_inbox
aggregate({set_inbox, _, _, _, _, _, _} = OldTask,
          {reset_unread, _, _, _}) ->
    OldTask;
%% An absolute count followed by an increment is still an absolute count.
%% This clause used to match an 8-element tuple and so could never fire: the
%% pair fell through to the catch-all and the pending absolute count was lost.
%% The merged task stays a `set_inbox' (absolute) with the increment added,
%% the same way the reset_unread + increment clause above produces one.
aggregate({set_inbox, _, _, Count, _, _, _},
          {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, Box}) ->
    {set_inbox, Entry, Content, Count + Incrs, MsgId, Timestamp, Box};

%%% If the last task was a set_inbox_incr_unread
%   we're resetting on this message:
aggregate({set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, _, Box},
          {reset_unread, _, MsgId, _}) ->
    {set_inbox, Entry, Content, 0, MsgId, Timestamp, Box};
%% NOTE(review): an explicit reset (MsgId = undefined) also ends up here and is
%% dropped, unlike the set_inbox case above - confirm this is intended.
aggregate({set_inbox_incr_unread, _, _, _, _, _, _} = OldTask,
          {reset_unread, _, _, _}) ->
    OldTask;
%   prefer newest row, but accumulate increment
aggregate({set_inbox_incr_unread, _, _, _, _, Incrs2, _},
          {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs1, Box}) ->
    {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs1 + Incrs2, Box};

aggregate({set_inbox_incr_unread, _, _, MsgId, _, _, _},
          {set_inbox, _, _, _, MsgId, _, _} = NewTask) ->
    NewTask;

%% Every other combination: the newest task wins
aggregate(_OldTask, NewTask) ->
    NewTask.