1 |
|
-module(mod_inbox_rdbms_async). |
2 |
|
|
3 |
|
-include("mod_inbox.hrl"). |
4 |
|
-include("mongoose_logger.hrl"). |
5 |
|
|
6 |
|
-behaviour(mod_inbox_backend). |
7 |
|
-behaviour(mongoose_aggregator_worker). |
8 |
|
|
9 |
|
-type task() :: |
10 |
|
{set_inbox, mod_inbox:entry_key(), content(), pos_integer(), id(), integer()} | |
11 |
|
{set_inbox_incr_unread, mod_inbox:entry_key(), content(), id(), integer(), Incrs :: pos_integer()} | |
12 |
|
{remove_inbox_row, mod_inbox:entry_key()} | |
13 |
|
{reset_unread, mod_inbox:entry_key(), id()}. |
14 |
|
|
15 |
|
%% API |
16 |
|
-export([init/2, |
17 |
|
set_inbox/6, |
18 |
|
set_inbox_incr_unread/5, |
19 |
|
reset_unread/3, |
20 |
|
remove_inbox_row/2, |
21 |
|
remove_domain/2, |
22 |
|
clear_inbox/3, |
23 |
|
get_inbox/4, |
24 |
|
get_inbox_unread/2, |
25 |
|
get_entry_properties/2, |
26 |
|
set_entry_properties/3]). |
27 |
|
-export([stop/1]). |
28 |
|
|
29 |
|
%% Worker callbacks |
30 |
|
-export([request/2, aggregate/3, verify/3]). |
31 |
|
|
32 |
|
%% Initialisation |
33 |
|
-spec init(mongooseim:host_type(), gen_mod:module_opts()) -> ok. |
34 |
|
init(HostType, Opts) -> |
35 |
:-( |
AsyncOpts = prepare_pool_opts(Opts), |
36 |
:-( |
mod_inbox_rdbms:init(HostType, Opts), |
37 |
:-( |
start_pool(HostType, AsyncOpts), |
38 |
:-( |
ok. |
39 |
|
|
40 |
|
stop(HostType) -> |
41 |
:-( |
mongoose_async_pools:stop_pool(HostType, inbox). |
42 |
|
|
43 |
|
prepare_pool_opts(#{async_writer := AsyncOpts}) -> |
44 |
:-( |
AsyncOpts#{pool_type => aggregate, |
45 |
|
request_callback => fun ?MODULE:request/2, |
46 |
|
aggregate_callback => fun ?MODULE:aggregate/3, |
47 |
|
verify_callback => fun ?MODULE:verify/3}. |
48 |
|
|
49 |
|
-spec start_pool(mongooseim:host_type(), mongoose_async_pools:pool_opts()) -> term(). |
50 |
|
start_pool(HostType, Opts) -> |
51 |
:-( |
mongoose_async_pools:start_pool(HostType, inbox, Opts). |
52 |
|
|
53 |
|
%% Worker callbacks |
54 |
|
-spec request(task(), mongoose_async_pools:pool_extra()) -> reference(). |
55 |
|
request(Task, _Extra = #{host_type := HostType}) -> |
56 |
:-( |
request_one(HostType, Task). |
57 |
|
|
58 |
|
request_one(HostType, {set_inbox, {LUser, LServer, LToBareJid}, Content, Count, MsgId, Timestamp}) -> |
59 |
:-( |
InsertParams = [LUser, LServer, LToBareJid, Content, Count, MsgId, Timestamp], |
60 |
:-( |
UpdateParams = [Content, Count, MsgId, Timestamp, false], |
61 |
:-( |
UniqueKeyValues = [LUser, LServer, LToBareJid], |
62 |
:-( |
rdbms_queries:request_upsert(HostType, inbox_upsert, InsertParams, UpdateParams, UniqueKeyValues); |
63 |
|
|
64 |
|
request_one(HostType, {set_inbox_incr_unread, {LUser, LServer, LToBareJid}, Content, MsgId, Timestamp, Incrs}) -> |
65 |
:-( |
InsertParams = [LUser, LServer, LToBareJid, Content, Incrs, MsgId, Timestamp], |
66 |
:-( |
UpdateParams = [Content, MsgId, Timestamp, false, Incrs], |
67 |
:-( |
UniqueKeyValues = [LUser, LServer, LToBareJid], |
68 |
:-( |
rdbms_queries:request_upsert(HostType, inbox_upsert_incr_unread, InsertParams, UpdateParams, UniqueKeyValues); |
69 |
|
|
70 |
|
request_one(HostType, {remove_inbox_row, {LUser, LServer, LToBareJid}}) -> |
71 |
:-( |
mongoose_rdbms:execute_request(HostType, inbox_delete_row, [LUser, LServer, LToBareJid]); |
72 |
|
|
73 |
|
request_one(HostType, {reset_unread, {LUser, LServer, LToBareJid}, undefined}) -> |
74 |
:-( |
mongoose_rdbms:execute_request(HostType, inbox_reset_unread, [LUser, LServer, LToBareJid]); |
75 |
|
request_one(HostType, {reset_unread, {LUser, LServer, LToBareJid}, MsgId}) -> |
76 |
:-( |
mongoose_rdbms:execute_request(HostType, inbox_reset_unread_msg, [LUser, LServer, LToBareJid, MsgId]). |
77 |
|
|
78 |
|
-spec aggregate(task(), task(), mongoose_async_pools:pool_extra()) -> {ok, task()}. |
79 |
|
aggregate(Current, NewTask, _Extra) -> |
80 |
:-( |
{ok, aggregate(Current, NewTask)}. |
81 |
|
|
82 |
|
-spec verify(term(), task(), mongoose_async_pools:pool_extra()) -> ok. |
83 |
|
verify(Answer, InboxTask, _Extra) -> |
84 |
:-( |
case mod_inbox_rdbms:check_result(Answer) of |
85 |
|
{error, Reason} -> |
86 |
:-( |
{LU, LS, LRem} = element(2, InboxTask), |
87 |
:-( |
?LOG_WARNING(#{what => inbox_process_message_failed, reason => Reason, |
88 |
:-( |
from_jid => jid:to_binary({LU, LS}), to_jid => LRem}); |
89 |
:-( |
_ -> ok |
90 |
|
end. |
91 |
|
|
92 |
|
%% async callbacks |
93 |
|
-spec set_inbox(mongooseim:host_type(), mod_inbox:entry_key(), |
94 |
|
content(), Count :: integer(), id(), Timestamp :: integer()) -> |
95 |
|
mod_inbox:write_res(). |
96 |
|
set_inbox(HostType, Entry, Content, Count, MsgId, Timestamp) -> |
97 |
:-( |
Params = {set_inbox, Entry, Content, Count, MsgId, Timestamp}, |
98 |
:-( |
mongoose_async_pools:put_task(HostType, inbox, Entry, Params). |
99 |
|
|
100 |
|
-spec set_inbox_incr_unread(mongooseim:host_type(), mod_inbox:entry_key(), |
101 |
|
Content :: binary(), MsgId :: binary(), Timestamp :: integer()) -> |
102 |
|
mod_inbox:count_res(). |
103 |
|
set_inbox_incr_unread(HostType, Entry, Content, MsgId, Timestamp) -> |
104 |
:-( |
Params = {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, 1}, |
105 |
:-( |
mongoose_async_pools:put_task(HostType, inbox, Entry, Params). |
106 |
|
|
107 |
|
-spec reset_unread(mongooseim:host_type(), mod_inbox:entry_key(), binary() | undefined) -> |
108 |
|
mod_inbox:write_res(). |
109 |
|
reset_unread(HostType, Entry, MsgId) -> |
110 |
:-( |
Params = {reset_unread, Entry, MsgId}, |
111 |
:-( |
mongoose_async_pools:put_task(HostType, inbox, Entry, Params). |
112 |
|
|
113 |
|
-spec remove_inbox_row(mongooseim:host_type(), mod_inbox:entry_key()) -> mod_inbox:write_res(). |
114 |
|
remove_inbox_row(HostType, Entry) -> |
115 |
:-( |
Params = {remove_inbox_row, Entry}, |
116 |
:-( |
mongoose_async_pools:put_task(HostType, inbox, Entry, Params). |
117 |
|
|
118 |
|
%% synchronous callbacks |
119 |
|
-spec get_inbox(mongooseim:host_type(), jid:luser(), jid:lserver(), mod_inbox:get_inbox_params()) -> |
120 |
|
get_inbox_res(). |
121 |
|
get_inbox(HostType, LUser, LServer, Params) -> |
122 |
:-( |
mod_inbox_rdbms:get_inbox(HostType, LUser, LServer, Params). |
123 |
|
|
124 |
|
-spec get_inbox_unread(mongooseim:host_type(), mod_inbox:entry_key()) -> |
125 |
|
{ok, integer()}. |
126 |
|
get_inbox_unread(HostType, Entry) -> |
127 |
:-( |
mod_inbox_rdbms:get_inbox_unread(HostType, Entry). |
128 |
|
|
129 |
|
-spec remove_domain(mongooseim:host_type(), jid:lserver()) -> ok. |
130 |
|
remove_domain(HostType, LServer) -> |
131 |
:-( |
mod_inbox_rdbms:remove_domain(HostType, LServer). |
132 |
|
|
133 |
|
-spec clear_inbox(mongooseim:host_type(), jid:luser(), jid:lserver()) -> |
134 |
|
mod_inbox:write_res(). |
135 |
|
clear_inbox(HostType, LUser, LServer) -> |
136 |
:-( |
mod_inbox_rdbms:clear_inbox(HostType, LUser, LServer). |
137 |
|
|
138 |
|
-spec get_entry_properties(mongooseim:host_type(), mod_inbox:entry_key()) -> |
139 |
|
entry_properties() | nil(). |
140 |
|
get_entry_properties(HostType, Entry) -> |
141 |
:-( |
mod_inbox_rdbms:get_entry_properties(HostType, Entry). |
142 |
|
|
143 |
|
-spec set_entry_properties(mongooseim:host_type(), mod_inbox:entry_key(), entry_properties()) -> |
144 |
|
entry_properties() | {error, binary()}. |
145 |
|
set_entry_properties(HostType, Entry, Properties) -> |
146 |
:-( |
mod_inbox_rdbms:set_entry_properties(HostType, Entry, Properties). |
147 |
|
|
148 |
|
-spec aggregate(CurrentlyAccumulatedTask :: task(), NewTask :: task()) -> FinalTask :: task(). |
149 |
|
%%% First task being processed, just take that one |
150 |
|
aggregate(undefined, Task) -> |
151 |
:-( |
Task; |
152 |
|
|
153 |
|
%%% if new task is remove_inbox, ignore all previous requests and just remove |
154 |
|
aggregate(_, {remove_inbox_row, Entry}) -> |
155 |
:-( |
{remove_inbox_row, Entry}; |
156 |
|
|
157 |
|
%%% if the last task was remove_row, this task should now only be an insert |
158 |
|
aggregate({remove_inbox_row, _} = OldTask, {reset_unread, _, _}) -> |
159 |
:-( |
OldTask; |
160 |
|
aggregate({remove_inbox_row, _}, {set_inbox, _, _, _, _, _} = NewTask) -> |
161 |
:-( |
NewTask; |
162 |
|
aggregate({remove_inbox_row, _}, {set_inbox_incr_unread, _, _, _, _, _} = NewTask) -> |
163 |
:-( |
NewTask; |
164 |
|
|
165 |
|
%%% If the last task was a reset_unread, |
166 |
|
% we prefer explicit resets, |
167 |
|
% then adhoc newer resets, |
168 |
|
% then we accumulate inserts |
169 |
|
%% an undefined means an explicit request to reset, it has priority |
170 |
|
aggregate({reset_unread, _, _}, {reset_unread, _, undefined} = NewTask) -> |
171 |
:-( |
NewTask; |
172 |
|
%% an undefined means an explicit request to reset, it has priority |
173 |
|
aggregate({reset_unread, _, undefined} = OldTask, {reset_unread, _, _}) -> |
174 |
:-( |
OldTask; |
175 |
|
%% both are adhoc, we prefer the newer |
176 |
|
aggregate({reset_unread, _, _}, {reset_unread, _, _} = NewTask) -> |
177 |
:-( |
NewTask; |
178 |
|
aggregate({reset_unread, _, _}, {set_inbox, _, _, _, _, _} = NewTask) -> |
179 |
:-( |
NewTask; |
180 |
|
%% Here `Count` becomes an absolute value instead of an increment |
181 |
|
aggregate({reset_unread, _, _}, {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs}) -> |
182 |
:-( |
{set_inbox, Entry, Content, Incrs, MsgId, Timestamp}; |
183 |
|
|
184 |
|
%%% If the last task was a set_inbox |
185 |
|
%% Reset is an explicit reset-to-zero, so do reset the counter |
186 |
|
aggregate({set_inbox, Entry, Content, _, MsgId, Timestamp}, {reset_unread, _, undefined}) -> |
187 |
:-( |
{set_inbox, Entry, Content, 0, MsgId, Timestamp}; |
188 |
|
%% Reset refers to that same set_inbox |
189 |
|
aggregate({set_inbox, Entry, Content, _, MsgId, Timestamp}, {reset_unread, _, MsgId}) -> |
190 |
:-( |
{set_inbox, Entry, Content, 0, MsgId, Timestamp}; |
191 |
|
%% Reset refers to some other set_inbox |
192 |
|
aggregate({set_inbox, _, _, _, _, _} = OldTask, {reset_unread, _, _}) -> |
193 |
:-( |
OldTask; |
194 |
|
aggregate({set_inbox, _, _, Count, _, _, _}, |
195 |
|
{set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs}) -> |
196 |
:-( |
{set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Count + Incrs}; |
197 |
|
|
198 |
|
%%% If the last task was a set_inbox_incr_unread |
199 |
|
% we're resetting on this message: |
200 |
|
aggregate({set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, _}, {reset_unread, _, MsgId}) -> |
201 |
:-( |
{set_inbox, Entry, Content, 0, MsgId, Timestamp}; |
202 |
|
aggregate({set_inbox_incr_unread, _, _, _, _, _} = OldTask, {reset_unread, _, _}) -> |
203 |
:-( |
OldTask; |
204 |
|
% prefer newest row, but accumulate increment |
205 |
|
aggregate({set_inbox_incr_unread, _, _, _, _, Incrs2}, |
206 |
|
{set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs1}) -> |
207 |
:-( |
{set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs1 + Incrs2}; |
208 |
|
|
209 |
|
aggregate({set_inbox_incr_unread, _, _, MsgId, _, _}, {set_inbox, _, _, _, MsgId, _} = NewTask) -> |
210 |
:-( |
NewTask; |
211 |
|
|
212 |
|
aggregate(_OldTask, NewTask) -> |
213 |
:-( |
NewTask. |