1 |
%%%---------------------------------------------------------------------- |
2 |
%%% File : mod_last.erl |
3 |
%%% Author : MichaĆ Piotrowski <michal.piotrowski@erlang-solutions.com> |
4 |
%%% Purpose : mod_last rdbms backend (XEP-0012) |
5 |
%%% |
6 |
%%% |
7 |
%%% ejabberd, Copyright (C) 2002-2014 ProcessOne |
8 |
%%% MongooseIM, Copyright (C) 2014 Erlang Solutions Ltd. |
9 |
%%% |
10 |
%%%---------------------------------------------------------------------- |
11 |
12 |
13 |
-module(mod_last_rdbms). |
14 |
15 |
-behaviour(mod_last_backend). |
16 |
17 |
-include("mongoose.hrl"). |
18 |
-include("session.hrl"). |
19 |
-include("mongoose_logger.hrl"). |
20 |
21 |
%% API |
22 |
-export([init/2, |
23 |
get_last/3, |
24 |
count_active_users/3, |
25 |
set_last_info/5, |
26 |
session_cleanup/5, |
27 |
remove_user/3, |
28 |
remove_domain/2, |
29 |
sessions_cleanup/2]). |
30 |
31 |
-type host_type() :: mongooseim:host_type(). |
32 |
33 |
-spec init(host_type(), gen_mod:module_opts()) -> ok. |
34 |
init(HostType, _Opts) -> |
35 |
12 |
prepare_queries(HostType), |
36 |
12 |
ok. |
37 |
38 |
%% Prepared query functions |
39 |
prepare_queries(HostType) -> |
40 |
12 |
mongoose_rdbms:prepare(last_select, last, [server, username], |
41 |
<<"SELECT seconds, state FROM last WHERE server = ? AND username = ?">>), |
42 |
12 |
mongoose_rdbms:prepare(last_count_active, last, [server, seconds], |
43 |
<<"SELECT COUNT(*) FROM last WHERE server = ? AND seconds > ?">>), |
44 |
12 |
mongoose_rdbms:prepare(last_delete, last, [server, username], |
45 |
<<"DELETE FROM last WHERE server = ? AND username = ?">>), |
46 |
12 |
mongoose_rdbms:prepare(last_remove_domain, last, [server], |
47 |
<<"DELETE FROM last WHERE server = ?">>), |
48 |
12 |
Ins = [<<"server">>, <<"username">>, <<"seconds">>, <<"state">>], |
49 |
12 |
Upd = [<<"seconds">>, <<"state">>], |
50 |
12 |
Key = [<<"server">>, <<"username">>], |
51 |
12 |
rdbms_queries:prepare_upsert(HostType, last_upsert, last, Ins, Upd, Key), |
52 |
12 |
rdbms_queries:prepare_upsert_many(HostType, 10, last_upsert_many10, last, Ins, Upd, Key), |
53 |
12 |
rdbms_queries:prepare_upsert_many(HostType, 100, last_upsert_many100, last, Ins, Upd, Key), |
54 |
12 |
ok. |
55 |
56 |
-spec execute_get_last(host_type(), jid:lserver(), jid:luser()) -> mongoose_rdbms:query_result(). |
57 |
execute_get_last(HostType, LServer, LUser) -> |
58 |
40 |
mongoose_rdbms:execute_successfully(HostType, last_select, [LServer, LUser]). |
59 |
60 |
-spec execute_count_active_users(host_type(), jid:lserver(), mod_last:timestamp()) -> |
61 |
mongoose_rdbms:query_result(). |
62 |
execute_count_active_users(HostType, LServer, Seconds) -> |
63 |
9 |
mongoose_rdbms:execute_successfully(HostType, last_count_active, [LServer, Seconds]). |
64 |
65 |
-spec execute_remove_user(host_type(), jid:lserver(), jid:luser()) -> mongoose_rdbms:query_result(). |
66 |
execute_remove_user(HostType, LServer, LUser) -> |
67 |
78 |
mongoose_rdbms:execute_successfully(HostType, last_delete, [LServer, LUser]). |
68 |
69 |
-spec execute_upsert_last(host_type(), jid:lserver(), jid:luser(), |
70 |
mod_last:timestamp(), mod_last:status()) -> |
71 |
mongoose_rdbms:query_result(). |
72 |
execute_upsert_last(HostType, LServer, LUser, Seconds, State) -> |
73 |
90 |
InsertParams = [LServer, LUser, Seconds, State], |
74 |
90 |
UpdateParams = [Seconds, State], |
75 |
90 |
UniqueKeyValues = [LServer, LUser], |
76 |
90 |
rdbms_queries:execute_upsert(HostType, last_upsert, |
77 |
InsertParams, UpdateParams, UniqueKeyValues). |
78 |
79 |
%% API functions |
80 |
-spec get_last(host_type(), jid:luser(), jid:lserver()) -> |
81 |
{ok, mod_last:timestamp(), mod_last:status()} | not_found. |
82 |
get_last(HostType, LUser, LServer) -> |
83 |
40 |
Result = execute_get_last(HostType, LServer, LUser), |
84 |
40 |
decode_last_result(Result). |
85 |
86 |
-spec count_active_users(host_type(), jid:lserver(), mod_last:timestamp()) -> non_neg_integer(). |
87 |
count_active_users(HostType, LServer, Seconds) -> |
88 |
9 |
Result = execute_count_active_users(HostType, LServer, Seconds), |
89 |
9 |
mongoose_rdbms:selected_to_integer(Result). |
90 |
91 |
-spec session_cleanup(host_type(), jid:luser(), jid:lserver(), mod_last:timestamp(), mod_last:status()) -> |
92 |
ok | {error, term()}. |
93 |
session_cleanup(_HostType, _LUser, _LServer, _Seconds, _State) -> |
94 |
%% Cleaning is done in sessions_cleanup |
95 |
345 |
ok. |
96 |
97 |
-spec set_last_info(host_type(), jid:luser(), jid:lserver(), mod_last:timestamp(), mod_last:status()) -> |
98 |
ok | {error, term()}. |
99 |
set_last_info(HostType, LUser, LServer, Seconds, State) -> |
100 |
90 |
wrap_rdbms_result(execute_upsert_last(HostType, LServer, LUser, Seconds, State)). |
101 |
102 |
-spec remove_user(host_type(), jid:luser(), jid:lserver()) -> ok | {error, term()}. |
103 |
remove_user(HostType, LUser, LServer) -> |
104 |
78 |
wrap_rdbms_result(execute_remove_user(HostType, LServer, LUser)). |
105 |
106 |
-spec remove_domain(host_type(), jid:lserver()) -> ok | {error, term()}. |
107 |
remove_domain(HostType, Domain) -> |
108 |
1 |
mongoose_rdbms:execute(HostType, last_remove_domain, [Domain]). |
109 |
110 |
%% Helper functions |
111 |
decode_last_result({selected, []}) -> |
112 |
9 |
not_found; |
113 |
decode_last_result({selected, [{DbSeconds, State}]}) -> |
114 |
31 |
Seconds = mongoose_rdbms:result_to_integer(DbSeconds), |
115 |
31 |
{ok, Seconds, State}. |
116 |
117 |
-spec wrap_rdbms_result({error, term()} | any()) -> ok | {error, term()}. |
118 |
:-( |
wrap_rdbms_result({error, _} = Error) -> Error; |
119 |
168 |
wrap_rdbms_result(_) -> ok. |
120 |
121 |
-spec sessions_cleanup(mongooseim:host_type(), [ejabberd_sm:session()]) -> ok. |
122 |
sessions_cleanup(HostType, Sessions) -> |
123 |
1 |
Seconds = erlang:system_time(second), |
124 |
%% server, username, seconds, state |
125 |
1 |
Records = [[S, U, Seconds, <<>>] || #session{usr = {U, S, _}} <- Sessions], |
126 |
1 |
UpdateParams = [Seconds, <<>>], |
127 |
1 |
AllTasks = prepare_cleanup_tasks(Records), |
128 |
1 |
RunTaskF = fun({Count, QueryName, InsertParams}) -> |
129 |
12 |
run_upsert(HostType, Count, QueryName, InsertParams, UpdateParams) |
130 |
end, |
131 |
1 |
run_tasks_in_parallel(RunTaskF, AllTasks). |
132 |
133 |
prepare_cleanup_tasks(Records) -> |
134 |
%% PgSQL would complain if there are duplicates (i.e. when there are two sessions |
135 |
%% with the same name but different resources) |
136 |
1 |
Records2 = lists:usort(Records), |
137 |
1 |
{Singles, Many100} = bucketize(100, Records2), |
138 |
1 |
{Singles2, Many10} = bucketize(10, Singles), |
139 |
1 |
[{100, last_upsert_many100, lists:append(Batch)} || Batch <- Many100] ++ |
140 |
4 |
[{10, last_upsert_many10, lists:append(Batch)} || Batch <- Many10] ++ |
141 |
5 |
[{1, last_upsert, Rec} || Rec <- Singles2]. |
142 |
143 |
run_tasks_in_parallel(RunTaskF, AllTasks) -> |
144 |
%% MSSQL fails with "Transaction (Process ID 52) was deadlocked on lock resources with |
145 |
%% another process and has been chosen as the deadlock victim. Rerun the transaction" |
146 |
1 |
Workers = case mongoose_rdbms:db_type() of mssql -> 1; _ -> 8 end, |
147 |
1 |
TasksForWorkers = spread(Workers, AllTasks), |
148 |
1 |
RunTasksF = fun(Tasks) -> lists:map(RunTaskF, Tasks) end, |
149 |
1 |
Results = mongoose_lib:pmap(RunTasksF, TasksForWorkers, timer:minutes(1)), |
150 |
1 |
[check_result(Res) || Res <- Results], |
151 |
1 |
ok. |
152 |
153 |
run_upsert(HostType, 1, QueryName, InsertParams = [S, U|_], UpdateParams) -> |
154 |
5 |
{updated, _} = rdbms_queries:execute_upsert(HostType, QueryName, |
155 |
InsertParams, UpdateParams, [S, U]); |
156 |
run_upsert(HostType, _Count, QueryName, InsertParams, UpdateParams) -> |
157 |
%% MySQL replace returns wrong numbers |
158 |
7 |
{updated, _} = rdbms_queries:execute_upsert_many(HostType, QueryName, |
159 |
InsertParams, UpdateParams). |
160 |
161 |
check_result({ok, Results}) -> |
162 |
8 |
lists:foreach(fun({updated, _}) -> ok end, Results); |
163 |
check_result({error, Reason}) -> |
164 |
:-( |
?LOG_ERROR(#{what => session_cleanup_failed, reason => Reason}). |
165 |
166 |
%% Create chunks of size N |
167 |
bucketize(N, Records) -> |
168 |
2 |
bucketize(N, Records, []). |
169 |
170 |
bucketize(N, Records, Acc) -> |
171 |
9 |
try |
172 |
9 |
lists:split(N, Records) |
173 |
of |
174 |
{Batch, Records2} -> |
175 |
7 |
bucketize(N, Records2, [Batch | Acc]) |
176 |
catch error:badarg -> |
177 |
2 |
{Records, lists:reverse(Acc)} |
178 |
end. |
179 |
180 |
%% Create N chunks |
181 |
%% Spread elements into buckets one element at a time before moving to the next bucket |
182 |
spread(N, Tasks) -> |
183 |
1 |
Buckets = lists:duplicate(N, []), |
184 |
1 |
spread(lists:reverse(Tasks), Buckets, []). |
185 |
186 |
spread([Task | Tasks], [Bucket | Buckets], Acc) -> |
187 |
12 |
spread(Tasks, Buckets, [[Task | Bucket] | Acc]); |
188 |
spread([], Buckets, Acc) -> |
189 |
1 |
Acc ++ lists:reverse(Buckets); |
190 |
spread(Tasks, [], Acc) -> |
191 |
1 |
spread(Tasks, lists:reverse(Acc), []). |
192 |
193 |
-ifdef(TEST). |
194 |
-include_lib("eunit/include/eunit.hrl"). |
195 |
bucketize_test_() -> |
196 |
[?_assertEqual({[10], [[1, 2, 3], [4, 5, 6], [7, 8, 9]]}, bucketize(3, lists:seq(1, 10)))]. |
197 |
198 |
spread_test_() -> |
199 |
[?_assertEqual([[1, 4, 7, 10], [2, 5, 8], [3, 6, 9]], spread(3, lists:seq(1, 10))), |
200 |
?_assertEqual([[1, 6], [2, 7], [3, 8], [4, 9], [5, 10]], spread(5, lists:seq(1, 10))), |
201 |
?_assertEqual([[1, 3, 5, 7, 9], [2, 4, 6, 8, 10]], spread(2, lists:seq(1, 10)))]. |
202 |
-endif. |