./ct_report/coverage/mod_inbox_rdbms_async.COVER.html

1 -module(mod_inbox_rdbms_async).
2
3 -include("mod_inbox.hrl").
4 -include("mongoose_logger.hrl").
5
6 -behaviour(mod_inbox_backend).
7 -behaviour(mongoose_aggregator_worker).
8
%% Shapes of the tasks handled by request_one/2 and merged by aggregate/2.
%% The unread count carried by `set_inbox' may be 0: aggregate/2 produces
%% `{set_inbox, _, _, 0, _, _}' when a reset is folded into a pending write,
%% hence `non_neg_integer()' rather than `pos_integer()'.
-type task() ::
    {set_inbox, mod_inbox:entry_key(), content(), non_neg_integer(), id(), integer()} |
    {set_inbox_incr_unread, mod_inbox:entry_key(), content(), id(), integer(), Incrs :: pos_integer()} |
    {remove_inbox_row, mod_inbox:entry_key()} |
    {reset_unread, mod_inbox:entry_key(), id()}.
14
15 %% API
16 -export([init/2,
17 set_inbox/6,
18 set_inbox_incr_unread/5,
19 reset_unread/3,
20 remove_inbox_row/2,
21 remove_domain/2,
22 clear_inbox/3,
23 get_inbox/4,
24 get_inbox_unread/2,
25 get_entry_properties/2,
26 set_entry_properties/3]).
27 -export([stop/1]).
28
29 %% Worker callbacks
30 -export([request/2, aggregate/3, verify/3]).
31
32 %% Initialisation
%% Derives the async pool options from the module options, initialises the
%% synchronous RDBMS backend, and then starts the inbox worker pool.
-spec init(mongooseim:host_type(), gen_mod:module_opts()) -> ok.
init(HostType, ModuleOpts) ->
    %% Computed first so that a missing `async_writer' key fails before
    %% anything else is initialised.
    PoolOpts = prepare_pool_opts(ModuleOpts),
    mod_inbox_rdbms:init(HostType, ModuleOpts),
    start_pool(HostType, PoolOpts),
    ok.
39
%% Stops the `inbox' async pool of the given host type.
stop(HostType) ->
    PoolId = inbox,
    mongoose_async_pools:stop_pool(HostType, PoolId).
42
%% Builds the pool options: takes the `async_writer' section of the module
%% options and forces the aggregate pool type plus this module's worker
%% callbacks on top of it (these keys override any same-named user keys).
prepare_pool_opts(#{async_writer := AsyncOpts}) ->
    Forced = #{pool_type => aggregate,
               request_callback => fun ?MODULE:request/2,
               aggregate_callback => fun ?MODULE:aggregate/3,
               verify_callback => fun ?MODULE:verify/3},
    maps:merge(AsyncOpts, Forced).
48
%% Starts the `inbox' async pool for the host type with the given options.
-spec start_pool(mongooseim:host_type(), mongoose_async_pools:pool_opts()) -> term().
start_pool(HostType, PoolOpts) ->
    PoolId = inbox,
    mongoose_async_pools:start_pool(HostType, PoolId, PoolOpts).
52
53 %% Worker callbacks
%% Worker callback: issues the database request for one task, using the
%% host type carried in the pool's extra map.
-spec request(task(), mongoose_async_pools:pool_extra()) -> reference().
request(Task, #{host_type := HostType}) ->
    request_one(HostType, Task).
57
%% Translates a single task into the matching RDBMS request.
%% `set_inbox' and `set_inbox_incr_unread' go through the upsert helper;
%% row removal and unread resets use plain prepared-statement requests.
request_one(HostType, {set_inbox, {LUser, LServer, LToBareJid}, Content, Count, MsgId, Timestamp}) ->
    Key = [LUser, LServer, LToBareJid],
    Insert = Key ++ [Content, Count, MsgId, Timestamp],
    Update = [Content, Count, MsgId, Timestamp, false],
    rdbms_queries:request_upsert(HostType, inbox_upsert, Insert, Update, Key);

request_one(HostType, {set_inbox_incr_unread, {LUser, LServer, LToBareJid}, Content, MsgId, Timestamp, Incrs}) ->
    Key = [LUser, LServer, LToBareJid],
    %% On insert the increment is the initial unread count.
    Insert = Key ++ [Content, Incrs, MsgId, Timestamp],
    Update = [Content, MsgId, Timestamp, false, Incrs],
    rdbms_queries:request_upsert(HostType, inbox_upsert_incr_unread, Insert, Update, Key);

request_one(HostType, {remove_inbox_row, {LUser, LServer, LToBareJid}}) ->
    Key = [LUser, LServer, LToBareJid],
    mongoose_rdbms:execute_request(HostType, inbox_delete_row, Key);

%% No message id: reset regardless of which message is stored.
request_one(HostType, {reset_unread, {LUser, LServer, LToBareJid}, undefined}) ->
    Key = [LUser, LServer, LToBareJid],
    mongoose_rdbms:execute_request(HostType, inbox_reset_unread, Key);
%% With a message id: the id is passed to the query as an extra parameter.
request_one(HostType, {reset_unread, {LUser, LServer, LToBareJid}, MsgId}) ->
    Args = [LUser, LServer, LToBareJid, MsgId],
    mongoose_rdbms:execute_request(HostType, inbox_reset_unread_msg, Args).
77
%% Worker callback: merges the newly queued task into the one accumulated
%% so far and wraps the merged task in `{ok, _}'.
-spec aggregate(task(), task(), mongoose_async_pools:pool_extra()) -> {ok, task()}.
aggregate(Accumulated, Incoming, _Extra) ->
    Merged = aggregate(Accumulated, Incoming),
    {ok, Merged}.
81
%% Worker callback: inspects the database answer for a task and logs a
%% warning, identifying the inbox owner and the remote JID, when the
%% backend reports an error. Any other result is accepted silently.
-spec verify(term(), task(), mongoose_async_pools:pool_extra()) -> ok.
verify(Answer, InboxTask, _Extra) ->
    Checked = mod_inbox_rdbms:check_result(Answer),
    case Checked of
        {error, Reason} ->
            %% The entry key is always the second element of a task tuple.
            {LUser, LServer, LRemote} = element(2, InboxTask),
            ?LOG_WARNING(#{what => inbox_process_message_failed, reason => Reason,
                           from_jid => jid:to_binary({LUser, LServer}), to_jid => LRemote});
        _ ->
            ok
    end.
91
92 %% async callbacks
%% Queues a `set_inbox' task on the inbox pool instead of writing directly.
%% Entry is passed both as the put_task key argument and inside the task.
-spec set_inbox(mongooseim:host_type(), mod_inbox:entry_key(),
                content(), Count :: integer(), id(), Timestamp :: integer()) ->
    mod_inbox:write_res().
set_inbox(HostType, Entry, Content, Count, MsgId, Timestamp) ->
    mongoose_async_pools:put_task(HostType, inbox, Entry,
                                  {set_inbox, Entry, Content, Count, MsgId, Timestamp}).
99
%% Queues a `set_inbox_incr_unread' task with an initial increment of 1;
%% aggregate/2 may later sum increments of tasks queued for the same entry.
-spec set_inbox_incr_unread(mongooseim:host_type(), mod_inbox:entry_key(),
                            Content :: binary(), MsgId :: binary(), Timestamp :: integer()) ->
    mod_inbox:count_res().
set_inbox_incr_unread(HostType, Entry, Content, MsgId, Timestamp) ->
    Increment = 1,
    Task = {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Increment},
    mongoose_async_pools:put_task(HostType, inbox, Entry, Task).
106
%% Queues a `reset_unread' task. MsgId may be `undefined', in which case
%% request_one/2 uses the reset query that takes no message id.
-spec reset_unread(mongooseim:host_type(), mod_inbox:entry_key(), binary() | undefined) ->
    mod_inbox:write_res().
reset_unread(HostType, Entry, MsgId) ->
    mongoose_async_pools:put_task(HostType, inbox, Entry, {reset_unread, Entry, MsgId}).
112
%% Queues a `remove_inbox_row' task for the given entry.
-spec remove_inbox_row(mongooseim:host_type(), mod_inbox:entry_key()) -> mod_inbox:write_res().
remove_inbox_row(HostType, Entry) ->
    mongoose_async_pools:put_task(HostType, inbox, Entry, {remove_inbox_row, Entry}).
117
118 %% synchronous callbacks
%% Synchronous read: delegated unchanged to mod_inbox_rdbms, bypassing the
%% async pool.
-spec get_inbox(mongooseim:host_type(), jid:luser(), jid:lserver(), mod_inbox:get_inbox_params()) ->
    get_inbox_res().
get_inbox(HostType, LUser, LServer, QueryParams) ->
    mod_inbox_rdbms:get_inbox(HostType, LUser, LServer, QueryParams).
123
%% Synchronous read of the unread counter: delegated unchanged to
%% mod_inbox_rdbms.
-spec get_inbox_unread(mongooseim:host_type(), mod_inbox:entry_key()) ->
    {ok, integer()}.
get_inbox_unread(HostType, EntryKey) ->
    mod_inbox_rdbms:get_inbox_unread(HostType, EntryKey).
128
%% Synchronous domain removal: delegated unchanged to mod_inbox_rdbms.
-spec remove_domain(mongooseim:host_type(), jid:lserver()) -> ok.
remove_domain(HostType, Domain) ->
    mod_inbox_rdbms:remove_domain(HostType, Domain).
132
%% Synchronous clearing of a user's inbox: delegated unchanged to
%% mod_inbox_rdbms.
-spec clear_inbox(mongooseim:host_type(), jid:luser(), jid:lserver()) ->
    mod_inbox:write_res().
clear_inbox(HostType, User, Server) ->
    mod_inbox_rdbms:clear_inbox(HostType, User, Server).
137
%% Synchronous read of an entry's properties: delegated unchanged to
%% mod_inbox_rdbms.
-spec get_entry_properties(mongooseim:host_type(), mod_inbox:entry_key()) ->
    entry_properties() | nil().
get_entry_properties(HostType, EntryKey) ->
    mod_inbox_rdbms:get_entry_properties(HostType, EntryKey).
142
%% Synchronous update of an entry's properties: delegated unchanged to
%% mod_inbox_rdbms.
-spec set_entry_properties(mongooseim:host_type(), mod_inbox:entry_key(), entry_properties()) ->
    entry_properties() | {error, binary()}.
set_entry_properties(HostType, EntryKey, Props) ->
    mod_inbox_rdbms:set_entry_properties(HostType, EntryKey, Props).
147
%% Merges the task accumulated so far with a newly queued task for the same
%% entry, so that a single database request reproduces the effect of running
%% both in order. The accumulator is `undefined' when no task has been
%% accumulated yet. Clause order matters: more specific patterns come first
%% and the final clause keeps the newest task for every remaining combination.
-spec aggregate(CurrentlyAccumulatedTask :: undefined | task(), NewTask :: task()) ->
    FinalTask :: task().
%%% First task being processed, just take that one
aggregate(undefined, Task) ->
    Task;

%%% if new task is remove_inbox, ignore all previous requests and just remove
aggregate(_, {remove_inbox_row, Entry}) ->
    {remove_inbox_row, Entry};

%%% if the last task was remove_row, this task should now only be an insert
aggregate({remove_inbox_row, _} = OldTask, {reset_unread, _, _}) ->
    OldTask;
aggregate({remove_inbox_row, _}, {set_inbox, _, _, _, _, _} = NewTask) ->
    NewTask;
aggregate({remove_inbox_row, _}, {set_inbox_incr_unread, _, _, _, _, _} = NewTask) ->
    NewTask;

%%% If the last task was a reset_unread,
%   we prefer explicit resets,
%   then adhoc newer resets,
%   then we accumulate inserts
%% an undefined means an explicit request to reset, it has priority
aggregate({reset_unread, _, _}, {reset_unread, _, undefined} = NewTask) ->
    NewTask;
%% an undefined means an explicit request to reset, it has priority
aggregate({reset_unread, _, undefined} = OldTask, {reset_unread, _, _}) ->
    OldTask;
%% both are adhoc, we prefer the newer
aggregate({reset_unread, _, _}, {reset_unread, _, _} = NewTask) ->
    NewTask;
aggregate({reset_unread, _, _}, {set_inbox, _, _, _, _, _} = NewTask) ->
    NewTask;
%% Here `Count` becomes an absolute value instead of an increment
aggregate({reset_unread, _, _}, {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs}) ->
    {set_inbox, Entry, Content, Incrs, MsgId, Timestamp};

%%% If the last task was a set_inbox
%% Reset is an explicit reset-to-zero, so do reset the counter
aggregate({set_inbox, Entry, Content, _, MsgId, Timestamp}, {reset_unread, _, undefined}) ->
    {set_inbox, Entry, Content, 0, MsgId, Timestamp};
%% Reset refers to that same set_inbox
aggregate({set_inbox, Entry, Content, _, MsgId, Timestamp}, {reset_unread, _, MsgId}) ->
    {set_inbox, Entry, Content, 0, MsgId, Timestamp};
%% Reset refers to some other set_inbox
aggregate({set_inbox, _, _, _, _, _} = OldTask, {reset_unread, _, _}) ->
    OldTask;
%% set_inbox stores an absolute count and the increment is applied on top of
%% it, so the merged task is again an absolute set_inbox with the summed
%% count and the newest row data. (The pattern must be a 6-tuple: a 7-tuple
%% never matches a set_inbox task and the accumulated count would be lost.)
aggregate({set_inbox, _, _, Count, _, _},
          {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs}) ->
    {set_inbox, Entry, Content, Count + Incrs, MsgId, Timestamp};

%%% If the last task was a set_inbox_incr_unread
% we're resetting on this message:
aggregate({set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, _}, {reset_unread, _, MsgId}) ->
    {set_inbox, Entry, Content, 0, MsgId, Timestamp};
aggregate({set_inbox_incr_unread, _, _, _, _, _} = OldTask, {reset_unread, _, _}) ->
    OldTask;
% prefer newest row, but accumulate increment
aggregate({set_inbox_incr_unread, _, _, _, _, Incrs2},
          {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs1}) ->
    {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs1 + Incrs2};

aggregate({set_inbox_incr_unread, _, _, MsgId, _, _}, {set_inbox, _, _, _, MsgId, _} = NewTask) ->
    NewTask;

%% Any other combination: the newest task wins
aggregate(_OldTask, NewTask) ->
    NewTask.
Line Hits Source