1 |
|
-module(mod_inbox_rdbms_async). |
2 |
|
|
3 |
|
-include("mod_inbox.hrl"). |
4 |
|
-include("mongoose_logger.hrl"). |
5 |
|
|
6 |
|
-behaviour(mod_inbox_backend). |
7 |
|
-behaviour(mongoose_aggregator_worker). |
8 |
|
|
9 |
|
-type box() :: binary(). |
10 |
|
-type task() :: |
11 |
|
{set_inbox, mod_inbox:entry_key(), exml:element(), pos_integer(), id(), integer(), box()} | |
12 |
|
{set_inbox_incr_unread, mod_inbox:entry_key(), exml:element(), id(), integer(), pos_integer(), box()} | |
13 |
|
{remove_inbox_row, mod_inbox:entry_key()} | |
14 |
|
{reset_unread, mod_inbox:entry_key(), id(), integer()}. |
15 |
|
|
16 |
|
%% API |
17 |
|
-export([init/2, |
18 |
|
set_inbox/6, |
19 |
|
set_inbox_incr_unread/5, |
20 |
|
reset_unread/4, |
21 |
|
remove_inbox_row/2, |
22 |
|
empty_user_bin/4, |
23 |
|
empty_global_bin/2, |
24 |
|
remove_domain/2, |
25 |
|
clear_inbox/3, |
26 |
|
get_inbox/4, |
27 |
|
get_inbox_unread/2, |
28 |
|
get_full_entry/2, |
29 |
|
get_entry_properties/2, |
30 |
|
set_entry_properties/3]). |
31 |
|
-export([stop/1]). |
32 |
|
|
33 |
|
%% Worker callbacks |
34 |
|
-export([request/2, aggregate/3, verify/3]). |
35 |
|
|
36 |
|
%% Initialisation |
37 |
|
-spec init(mongooseim:host_type(), gen_mod:module_opts()) -> ok. |
38 |
|
init(HostType, Opts) -> |
39 |
20 |
AsyncOpts = prepare_pool_opts(Opts), |
40 |
20 |
mod_inbox_rdbms:init(HostType, Opts), |
41 |
20 |
prepare_deletes(HostType, Opts), |
42 |
20 |
start_pool(HostType, AsyncOpts), |
43 |
20 |
ok. |
44 |
|
|
45 |
|
stop(HostType) -> |
46 |
20 |
mongoose_async_pools:stop_pool(HostType, inbox). |
47 |
|
|
48 |
|
prepare_pool_opts(#{async_writer := AsyncOpts}) -> |
49 |
20 |
AsyncOpts#{pool_type => aggregate, |
50 |
|
request_callback => fun ?MODULE:request/2, |
51 |
|
aggregate_callback => fun ?MODULE:aggregate/3, |
52 |
|
verify_callback => fun ?MODULE:verify/3}. |
53 |
|
|
54 |
|
prepare_deletes(_HostType, _Opts) -> |
55 |
20 |
mongoose_rdbms:prepare(inbox_move_conversation_to_bin, inbox, |
56 |
|
[luser, lserver, remote_bare_jid], |
57 |
|
<<"UPDATE inbox SET box='bin'", |
58 |
|
" WHERE luser = ? AND lserver = ? AND remote_bare_jid = ?">>), |
59 |
20 |
ok. |
60 |
|
|
61 |
|
-spec start_pool(mongooseim:host_type(), mongoose_async_pools:pool_opts()) -> term(). |
62 |
|
start_pool(HostType, Opts) -> |
63 |
20 |
mongoose_async_pools:start_pool(HostType, inbox, Opts). |
64 |
|
|
65 |
|
%% Worker callbacks |
66 |
|
-spec request(task(), mongoose_async_pools:pool_extra()) -> reference(). |
67 |
|
request(Task, _Extra = #{host_type := HostType}) -> |
68 |
393 |
request_one(HostType, Task). |
69 |
|
|
70 |
|
request_one(HostType, {set_inbox, {LUser, LServer, LToBareJid}, Packet, Count, MsgId, Timestamp, Box}) -> |
71 |
126 |
Content = exml:to_binary(Packet), |
72 |
126 |
Unique = [LUser, LServer, LToBareJid], |
73 |
126 |
Update = [MsgId, Box, Content, Count, Timestamp], |
74 |
126 |
Insert = [LUser, LServer, LToBareJid, MsgId, Box, Content, Count, Timestamp], |
75 |
126 |
rdbms_queries:request_upsert(HostType, inbox_upsert, Insert, Update, Unique); |
76 |
|
request_one(HostType, {set_inbox_incr_unread, {LUser, LServer, LToBareJid}, Packet, MsgId, Timestamp, Incrs, Box}) -> |
77 |
194 |
Content = exml:to_binary(Packet), |
78 |
194 |
Unique = [LUser, LServer, LToBareJid], |
79 |
194 |
Update = [MsgId, Box, Content, Incrs, Timestamp], |
80 |
194 |
Insert = [LUser, LServer, LToBareJid, MsgId, Box, Content, Incrs, Timestamp], |
81 |
194 |
rdbms_queries:request_upsert(HostType, inbox_upsert_incr_unread, Insert, Update, Unique); |
82 |
|
request_one(HostType, {reset_unread, {LUser, LServer, LToBareJid}, undefined, TS}) -> |
83 |
3 |
mongoose_rdbms:execute_request(HostType, inbox_reset_unread, [LUser, LServer, LToBareJid, TS]); |
84 |
|
request_one(HostType, {reset_unread, {LUser, LServer, LToBareJid}, MsgId, TS}) -> |
85 |
58 |
mongoose_rdbms:execute_request(HostType, inbox_reset_unread_msg, [LUser, LServer, LToBareJid, MsgId, TS]); |
86 |
|
request_one(HostType, {remove_inbox_row, {LUser, LServer, LToBareJid}}) -> |
87 |
12 |
mongoose_rdbms:execute_request(HostType, inbox_move_conversation_to_bin, [LUser, LServer, LToBareJid]). |
88 |
|
|
89 |
|
-spec aggregate(task(), task(), mongoose_async_pools:pool_extra()) -> {ok, task()}. |
90 |
|
aggregate(Current, NewTask, _Extra) -> |
91 |
19 |
{ok, aggregate(Current, NewTask)}. |
92 |
|
|
93 |
|
-spec verify(term(), task(), mongoose_async_pools:pool_extra()) -> ok. |
94 |
|
verify(Answer, InboxTask, _Extra) -> |
95 |
393 |
case mod_inbox_rdbms:check_result(Answer) of |
96 |
|
{error, Reason} -> |
97 |
:-( |
{LU, LS, LRem} = element(2, InboxTask), |
98 |
:-( |
?LOG_WARNING(#{what => inbox_process_message_failed, reason => Reason, |
99 |
:-( |
from_jid => jid:to_binary({LU, LS}), to_jid => LRem}); |
100 |
393 |
_ -> ok |
101 |
|
end. |
102 |
|
|
103 |
|
%% async callbacks |
104 |
|
-spec set_inbox(mongooseim:host_type(), mod_inbox:entry_key(), |
105 |
|
exml:element(), Count :: integer(), id(), Timestamp :: integer()) -> |
106 |
|
mod_inbox:write_res(). |
107 |
|
set_inbox(HostType, Entry, Packet, Count, MsgId, Timestamp) -> |
108 |
119 |
Params = {set_inbox, Entry, Packet, Count, MsgId, Timestamp, <<"inbox">>}, |
109 |
119 |
mongoose_async_pools:put_task(HostType, inbox, Entry, Params). |
110 |
|
|
111 |
|
-spec set_inbox_incr_unread(mongooseim:host_type(), mod_inbox:entry_key(), |
112 |
|
exml:element(), MsgId :: binary(), Timestamp :: integer()) -> |
113 |
|
mod_inbox:count_res(). |
114 |
|
set_inbox_incr_unread(HostType, Entry, Packet, MsgId, Timestamp) -> |
115 |
209 |
Params = {set_inbox_incr_unread, Entry, Packet, MsgId, Timestamp, 1, <<"inbox">>}, |
116 |
209 |
mongoose_async_pools:put_task(HostType, inbox, Entry, Params). |
117 |
|
|
118 |
|
-spec reset_unread(mongooseim:host_type(), mod_inbox:entry_key(), binary() | undefined, integer()) -> |
119 |
|
mod_inbox:write_res(). |
120 |
|
reset_unread(HostType, Entry, MsgId, TS) -> |
121 |
72 |
Params = {reset_unread, Entry, MsgId, TS}, |
122 |
72 |
mongoose_async_pools:put_task(HostType, inbox, Entry, Params). |
123 |
|
|
124 |
|
-spec remove_inbox_row(mongooseim:host_type(), mod_inbox:entry_key()) -> mod_inbox:write_res(). |
125 |
|
remove_inbox_row(HostType, Entry) -> |
126 |
12 |
Params = {remove_inbox_row, Entry}, |
127 |
12 |
mongoose_async_pools:put_task(HostType, inbox, Entry, Params). |
128 |
|
|
129 |
|
%% synchronous callbacks |
130 |
|
-spec get_inbox(mongooseim:host_type(), jid:luser(), jid:lserver(), mod_inbox:get_inbox_params()) -> |
131 |
|
[mod_inbox:inbox_res()]. |
132 |
|
get_inbox(HostType, LUser, LServer, Params) -> |
133 |
451 |
mod_inbox_rdbms:get_inbox(HostType, LUser, LServer, Params). |
134 |
|
|
135 |
|
-spec get_inbox_unread(mongooseim:host_type(), mod_inbox:entry_key()) -> |
136 |
|
{ok, integer()}. |
137 |
|
get_inbox_unread(HostType, Entry) -> |
138 |
:-( |
mod_inbox_rdbms:get_inbox_unread(HostType, Entry). |
139 |
|
|
140 |
|
-spec remove_domain(mongooseim:host_type(), jid:lserver()) -> ok. |
141 |
|
remove_domain(HostType, LServer) -> |
142 |
:-( |
mod_inbox_rdbms:remove_domain(HostType, LServer). |
143 |
|
|
144 |
|
-spec clear_inbox(mongooseim:host_type(), jid:luser(), jid:lserver()) -> |
145 |
|
mod_inbox:write_res(). |
146 |
|
clear_inbox(HostType, LUser, LServer) -> |
147 |
293 |
mod_inbox_rdbms:clear_inbox(HostType, LUser, LServer). |
148 |
|
|
149 |
|
-spec get_full_entry(mongooseim:host_type(), mod_inbox:entry_key()) -> |
150 |
|
inbox_res() | nil(). |
151 |
|
get_full_entry(HostType, Entry) -> |
152 |
1 |
mod_inbox_rdbms:get_full_entry(HostType, Entry). |
153 |
|
|
154 |
|
-spec get_entry_properties(mongooseim:host_type(), mod_inbox:entry_key()) -> |
155 |
|
entry_properties() | nil(). |
156 |
|
get_entry_properties(HostType, Entry) -> |
157 |
1 |
mod_inbox_rdbms:get_entry_properties(HostType, Entry). |
158 |
|
|
159 |
|
-spec set_entry_properties(mongooseim:host_type(), mod_inbox:entry_key(), entry_properties()) -> |
160 |
|
entry_properties() | {error, binary()}. |
161 |
|
set_entry_properties(HostType, Entry, Properties) -> |
162 |
51 |
mod_inbox_rdbms:set_entry_properties(HostType, Entry, Properties). |
163 |
|
|
164 |
|
-spec empty_user_bin(HostType :: mongooseim:host_type(), |
165 |
|
LServer :: jid:lserver(), |
166 |
|
LUser :: jid:luser(), |
167 |
|
TS :: integer()) -> non_neg_integer(). |
168 |
|
empty_user_bin(HostType, LServer, LUser, TS) -> |
169 |
2 |
mod_inbox_rdbms:empty_user_bin(HostType, LServer, LUser, TS). |
170 |
|
|
171 |
|
-spec empty_global_bin(HostType :: mongooseim:host_type(), |
172 |
|
TS :: integer()) -> non_neg_integer(). |
173 |
|
empty_global_bin(HostType, TS) -> |
174 |
111 |
mod_inbox_rdbms:empty_global_bin(HostType, TS). |
175 |
|
|
176 |
|
-spec aggregate(CurrentlyAccumulatedTask :: task(), NewTask :: task()) -> FinalTask :: task(). |
177 |
|
%%% if new task is remove_row, do the previous with an updated box |
178 |
|
aggregate({set_inbox, Entry, Content, Count, MsgId, Timestamp, _}, |
179 |
|
{remove_inbox_row, _}) -> |
180 |
:-( |
{set_inbox, Entry, Content, Count, MsgId, Timestamp, <<"bin">>}; |
181 |
|
aggregate({set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, _}, |
182 |
|
{remove_inbox_row, _}) -> |
183 |
:-( |
{set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, <<"bin">>}; |
184 |
|
aggregate(_, {remove_inbox_row, _} = NewTask) -> |
185 |
:-( |
NewTask; |
186 |
|
|
187 |
|
%%% if the last task was remove_row, this task should now only be an insert |
188 |
|
aggregate({remove_inbox_row, _} = OldTask, {reset_unread, _, _, _}) -> |
189 |
:-( |
OldTask; |
190 |
|
aggregate({remove_inbox_row, _}, |
191 |
|
{set_inbox, Entry, Content, Count, MsgId, Timestamp, _}) -> |
192 |
:-( |
{set_inbox, Entry, Content, Count, MsgId, Timestamp, <<"bin">>}; |
193 |
|
aggregate({remove_inbox_row, _}, |
194 |
|
{set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, _}) -> |
195 |
:-( |
{set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, <<"bin">>}; |
196 |
|
|
197 |
|
%%% If the last task was a reset_unread, |
198 |
|
% we prefer explicit resets, |
199 |
|
% then adhoc newer resets, |
200 |
|
% then we accumulate inserts |
201 |
|
%% an undefined means an explicit request to reset, it has priority |
202 |
|
aggregate({reset_unread, _, _, _}, {reset_unread, _, undefined, _} = NewTask) -> |
203 |
:-( |
NewTask; |
204 |
|
%% an undefined means an explicit request to reset, it has priority |
205 |
|
aggregate({reset_unread, _, undefined, _} = OldTask, {reset_unread, _, _, _}) -> |
206 |
:-( |
OldTask; |
207 |
|
%% both are adhoc, we prefer the newer |
208 |
|
aggregate({reset_unread, _, _, _}, {reset_unread, _, _, _} = NewTask) -> |
209 |
:-( |
NewTask; |
210 |
|
aggregate({reset_unread, _, _, _}, {set_inbox, _, _, _, _, _, _} = NewTask) -> |
211 |
:-( |
NewTask; |
212 |
|
%% Here `Count` becomes an absolute value instead of an increment |
213 |
|
aggregate({reset_unread, _, _, _}, |
214 |
|
{set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, Box}) -> |
215 |
:-( |
{set_inbox, Entry, Content, Incrs, MsgId, Timestamp, Box}; |
216 |
|
|
217 |
|
%%% If the last task was a set_inbox |
218 |
|
%% Reset is an explicit reset-to-zero, so do reset the counter |
219 |
|
aggregate({set_inbox, Entry, Content, _, MsgId, Timestamp, Box}, |
220 |
|
{reset_unread, _, undefined, _}) -> |
221 |
:-( |
{set_inbox, Entry, Content, 0, MsgId, Timestamp, Box}; |
222 |
|
%% Reset refers to that same set_inbox |
223 |
|
aggregate({set_inbox, Entry, Content, _, MsgId, Timestamp, Box}, |
224 |
|
{reset_unread, _, MsgId, _}) -> |
225 |
:-( |
{set_inbox, Entry, Content, 0, MsgId, Timestamp, Box}; |
226 |
|
%% Reset refers to some other set_inbox |
227 |
|
aggregate({set_inbox, _, _, _, _, _, _} = OldTask, |
228 |
|
{reset_unread, _, _, _}) -> |
229 |
:-( |
OldTask; |
230 |
|
aggregate({set_inbox, _, _, Count, _, _, _, _}, |
231 |
|
{set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, Box}) -> |
232 |
:-( |
{set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Count + Incrs, Box}; |
233 |
|
|
234 |
|
%%% If the last task was a set_inbox_incr_unread |
235 |
|
% we're resetting on this message: |
236 |
|
aggregate({set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, _, Box}, |
237 |
|
{reset_unread, _, MsgId, _}) -> |
238 |
11 |
{set_inbox, Entry, Content, 0, MsgId, Timestamp, Box}; |
239 |
|
aggregate({set_inbox_incr_unread, _, _, _, _, _, _} = OldTask, |
240 |
|
{reset_unread, _, _, _}) -> |
241 |
:-( |
OldTask; |
242 |
|
% prefer newest row, but accumulate increment |
243 |
|
aggregate({set_inbox_incr_unread, _, _, _, _, Incrs2, _}, |
244 |
|
{set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs1, Box}) -> |
245 |
4 |
{set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs1 + Incrs2, Box}; |
246 |
|
|
247 |
|
aggregate({set_inbox_incr_unread, _, _, MsgId, _, _, _}, |
248 |
|
{set_inbox, _, _, _, MsgId, _, _} = NewTask) -> |
249 |
:-( |
NewTask; |
250 |
|
|
251 |
|
aggregate(_OldTask, NewTask) -> |
252 |
4 |
NewTask. |