./ct_report/coverage/mod_last_rdbms.COVER.html

1 %%%----------------------------------------------------------------------
2 %%% File : mod_last_rdbms.erl
3 %%% Author : Michał Piotrowski <michal.piotrowski@erlang-solutions.com>
4 %%% Purpose : mod_last rdbms backend (XEP-0012)
5 %%%
6 %%%
7 %%% ejabberd, Copyright (C) 2002-2014 ProcessOne
8 %%% MongooseIM, Copyright (C) 2014 Erlang Solutions Ltd.
9 %%%
10 %%%----------------------------------------------------------------------
11
12
13 -module(mod_last_rdbms).
14
15 -behaviour(mod_last_backend).
16
17 -include("mongoose.hrl").
18 -include("session.hrl").
19 -include("mongoose_logger.hrl").
20
21 %% API
22 -export([init/2,
23 get_last/3,
24 count_active_users/3,
25 set_last_info/5,
26 session_cleanup/5,
27 remove_user/3,
28 remove_domain/2,
29 sessions_cleanup/2]).
30
31 -type host_type() :: mongooseim:host_type().
32
%% Backend initialisation: registers the prepared statements this module
%% relies on. The module options are not consulted.
-spec init(host_type(), gen_mod:module_opts()) -> ok.
init(HostType, _Opts) ->
    %% prepare_queries/1 always ends with `ok', so the match doubles as the
    %% return value.
    ok = prepare_queries(HostType).
37
38 %% Prepared query functions
%% Registers every prepared statement used against the `last' table:
%% four plain statements (select, count, delete by user, delete by server)
%% followed by the single-row upsert and the 10-row and 100-row batch upserts.
%% The results of the individual prepare calls are ignored; always returns `ok'.
prepare_queries(HostType) ->
    PlainQueries =
        [{last_select, [server, username],
          <<"SELECT seconds, state FROM last WHERE server = ? AND username = ?">>},
         {last_count_active, [server, seconds],
          <<"SELECT COUNT(*) FROM last WHERE server = ? AND seconds > ?">>},
         {last_delete, [server, username],
          <<"DELETE FROM last WHERE server = ? AND username = ?">>},
         {last_remove_domain, [server],
          <<"DELETE FROM last WHERE server = ?">>}],
    lists:foreach(fun({Name, Fields, Statement}) ->
                          mongoose_rdbms:prepare(Name, last, Fields, Statement)
                  end, PlainQueries),
    InsertFields = [<<"server">>, <<"username">>, <<"seconds">>, <<"state">>],
    UpdateFields = [<<"seconds">>, <<"state">>],
    UniqueKeyFields = [<<"server">>, <<"username">>],
    rdbms_queries:prepare_upsert(HostType, last_upsert, last,
                                 InsertFields, UpdateFields, UniqueKeyFields),
    rdbms_queries:prepare_upsert_many(HostType, 10, last_upsert_many10, last,
                                      InsertFields, UpdateFields, UniqueKeyFields),
    rdbms_queries:prepare_upsert_many(HostType, 100, last_upsert_many100, last,
                                      InsertFields, UpdateFields, UniqueKeyFields),
    ok.
55
%% Runs the prepared `last_select' statement for one user.
%% Note the parameter order expected by the statement: server first, then user.
-spec execute_get_last(host_type(), jid:lserver(), jid:luser()) -> mongoose_rdbms:query_result().
execute_get_last(HostType, LServer, LUser) ->
    Params = [LServer, LUser],
    mongoose_rdbms:execute_successfully(HostType, last_select, Params).
59
%% Runs the prepared `last_count_active' statement, which counts the rows of
%% the given server whose `seconds' value is strictly greater than Seconds.
-spec execute_count_active_users(host_type(), jid:lserver(), mod_last:timestamp()) ->
    mongoose_rdbms:query_result().
execute_count_active_users(HostType, LServer, Seconds) ->
    Params = [LServer, Seconds],
    mongoose_rdbms:execute_successfully(HostType, last_count_active, Params).
64
%% Runs the prepared `last_delete' statement, removing the row stored for
%% the given server/user pair.
-spec execute_remove_user(host_type(), jid:lserver(), jid:luser()) -> mongoose_rdbms:query_result().
execute_remove_user(HostType, LServer, LUser) ->
    Params = [LServer, LUser],
    mongoose_rdbms:execute_successfully(HostType, last_delete, Params).
68
%% Inserts or updates the `last' row of one user through the prepared
%% `last_upsert' statement.
%% The three lists are, in order: the values to insert, the values to write
%% on update, and the unique-key values (server, username).
-spec execute_upsert_last(host_type(), jid:lserver(), jid:luser(),
                          mod_last:timestamp(), mod_last:status()) ->
    mongoose_rdbms:query_result().
execute_upsert_last(HostType, LServer, LUser, Seconds, State) ->
    rdbms_queries:execute_upsert(HostType, last_upsert,
                                 [LServer, LUser, Seconds, State],
                                 [Seconds, State],
                                 [LServer, LUser]).
78
79 %% API functions
%% Looks up the stored last-activity entry of a user.
%% Returns `{ok, Seconds, State}' when a row exists and `not_found' otherwise.
-spec get_last(host_type(), jid:luser(), jid:lserver()) ->
    {ok, mod_last:timestamp(), mod_last:status()} | not_found.
get_last(HostType, LUser, LServer) ->
    decode_last_result(execute_get_last(HostType, LServer, LUser)).
85
%% Counts the users of LServer whose stored `seconds' value is greater than
%% Seconds, converting the query result to an integer.
-spec count_active_users(host_type(), jid:lserver(), mod_last:timestamp()) -> non_neg_integer().
count_active_users(HostType, LServer, Seconds) ->
    mongoose_rdbms:selected_to_integer(
      execute_count_active_users(HostType, LServer, Seconds)).
90
%% Per-session cleanup callback. Deliberately a no-op for this backend:
%% the actual work is done in bulk by sessions_cleanup/2.
-spec session_cleanup(host_type(), jid:luser(), jid:lserver(), mod_last:timestamp(), mod_last:status()) ->
    ok | {error, term()}.
session_cleanup(_HostType, _LUser, _LServer, _Timestamp, _Status) ->
    ok.
96
%% Stores (inserts or updates) the last-activity timestamp and status of a user.
%% Any `{error, _}' result from the upsert is passed through unchanged;
%% every other result is reported as `ok'.
-spec set_last_info(host_type(), jid:luser(), jid:lserver(), mod_last:timestamp(), mod_last:status()) ->
    ok | {error, term()}.
set_last_info(HostType, LUser, LServer, Seconds, State) ->
    UpsertResult = execute_upsert_last(HostType, LServer, LUser, Seconds, State),
    wrap_rdbms_result(UpsertResult).
101
%% Deletes the last-activity row of a user.
%% Any `{error, _}' result from the delete is passed through unchanged;
%% every other result is reported as `ok'.
-spec remove_user(host_type(), jid:luser(), jid:lserver()) -> ok | {error, term()}.
remove_user(HostType, LUser, LServer) ->
    DeleteResult = execute_remove_user(HostType, LServer, LUser),
    wrap_rdbms_result(DeleteResult).
105
%% Deletes every last-activity row stored for Domain, using the prepared
%% `last_remove_domain' statement.
%% The raw database result is normalised with wrap_rdbms_result/1 so the
%% function honours its spec (`ok | {error, term()}') and behaves like
%% remove_user/3: an `{error, _}' tuple is returned as is, anything else
%% becomes `ok'.
-spec remove_domain(host_type(), jid:lserver()) -> ok | {error, term()}.
remove_domain(HostType, Domain) ->
    Result = mongoose_rdbms:execute(HostType, last_remove_domain, [Domain]),
    wrap_rdbms_result(Result).
109
110 %% Helper functions
%% Translates the result of the `last_select' query into the backend API shape.
%% A single row yields `{ok, Seconds, State}' with the seconds column converted
%% to an integer; an empty result set yields `not_found'.
%% Any other shape is unexpected and fails with function_clause.
decode_last_result({selected, [{DbSeconds, State}]}) ->
    {ok, mongoose_rdbms:result_to_integer(DbSeconds), State};
decode_last_result({selected, []}) ->
    not_found.
116
%% Normalises a database result for the backend API: an `{error, Reason}'
%% tuple is handed back untouched, every other term is collapsed to `ok'.
-spec wrap_rdbms_result({error, term()} | any()) -> ok | {error, term()}.
wrap_rdbms_result(Result) ->
    case Result of
        {error, _} -> Result;
        _ -> ok
    end.
120
%% Bulk cleanup: writes the current time and an empty status for every
%% session in Sessions. The rows are grouped into upsert tasks which are
%% then executed by run_tasks_in_parallel/2.
-spec sessions_cleanup(mongooseim:host_type(), [ejabberd_sm:session()]) -> ok.
sessions_cleanup(HostType, Sessions) ->
    Now = erlang:system_time(second),
    EmptyState = <<>>,
    %% Row layout: server, username, seconds, state
    Rows = [[Server, User, Now, EmptyState]
            || #session{usr = {User, Server, _}} <- Sessions],
    Tasks = prepare_cleanup_tasks(Rows),
    UpdateParams = [Now, EmptyState],
    RunTaskF = fun({Count, QueryName, InsertParams}) ->
                       run_upsert(HostType, Count, QueryName, InsertParams, UpdateParams)
               end,
    run_tasks_in_parallel(RunTaskF, Tasks).
132
%% Turns a list of rows into `{Count, QueryName, Params}' upsert tasks.
%% Rows are first split into full batches of 100, the remainder into full
%% batches of 10, and whatever is left becomes single-row tasks. For batch
%% tasks the rows are concatenated into one flat parameter list.
prepare_cleanup_tasks(Records) ->
    %% PgSQL would complain if there are duplicates (i.e. when there are two
    %% sessions with the same name but different resources)
    UniqueRows = lists:usort(Records),
    {Below100, Batches100} = bucketize(100, UniqueRows),
    {Leftovers, Batches10} = bucketize(10, Below100),
    Tasks100 = [{100, last_upsert_many100, lists:append(Rows)} || Rows <- Batches100],
    Tasks10 = [{10, last_upsert_many10, lists:append(Rows)} || Rows <- Batches10],
    Tasks1 = [{1, last_upsert, Row} || Row <- Leftovers],
    lists:append([Tasks100, Tasks10, Tasks1]).
142
%% Distributes AllTasks over a fixed number of buckets, runs each bucket
%% through mongoose_lib:pmap/3 and inspects every bucket result with
%% check_result/1. Always returns `ok' unless a check crashes.
%% NOTE(review): the third pmap argument is presumably a timeout (one minute);
%% confirm against mongoose_lib.
run_tasks_in_parallel(RunTaskF, AllTasks) ->
    %% MSSQL fails with "Transaction (Process ID 52) was deadlocked on lock
    %% resources with another process and has been chosen as the deadlock
    %% victim. Rerun the transaction", hence a single worker there.
    WorkerCount = case mongoose_rdbms:db_type() of
                      mssql -> 1;
                      _ -> 8
                  end,
    Buckets = spread(WorkerCount, AllTasks),
    RunBucketF = fun(Bucket) -> [RunTaskF(Task) || Task <- Bucket] end,
    BucketResults = mongoose_lib:pmap(RunBucketF, Buckets, timer:minutes(1)),
    lists:foreach(fun check_result/1, BucketResults).
152
%% Executes one cleanup task and asserts that the database answered with
%% `{updated, _}' (anything else fails with badmatch).
%% A task of size 1 uses the single-row upsert, whose unique key is the
%% first two row values (server, username); all other sizes use the batch
%% upsert with the flattened rows.
run_upsert(HostType, 1, QueryName, [Server, User | _] = Row, UpdateParams) ->
    UniqueKeyValues = [Server, User],
    {updated, _} = rdbms_queries:execute_upsert(HostType, QueryName,
                                                Row, UpdateParams, UniqueKeyValues);
run_upsert(HostType, _BatchSize, QueryName, FlatRows, UpdateParams) ->
    %% MySQL replace returns wrong numbers
    {updated, _} = rdbms_queries:execute_upsert_many(HostType, QueryName,
                                                     FlatRows, UpdateParams).
160
%% Inspects the outcome of one parallel worker.
%% A failed worker is only logged. For a successful one, every task result
%% must be `{updated, _}', otherwise the check crashes with function_clause.
check_result({error, Reason}) ->
    ?LOG_ERROR(#{what => session_cleanup_failed, reason => Reason});
check_result({ok, Results}) ->
    AssertUpdated = fun({updated, _}) -> ok end,
    lists:foreach(AssertUpdated, Results).
165
%% Cuts Records into consecutive chunks of exactly N elements.
%% Returns `{Remainder, Chunks}': Chunks are the full chunks in their original
%% order, Remainder holds the trailing elements that did not fill a chunk.
bucketize(N, Records) ->
    bucketize(N, Records, []).

bucketize(N, Remaining, Chunks) ->
    case split_exactly(N, Remaining) of
        {Chunk, Rest} ->
            bucketize(N, Rest, [Chunk | Chunks]);
        too_short ->
            {Remaining, lists:reverse(Chunks)}
    end.

%% lists:split/2 raises badarg when the list holds fewer than N elements;
%% that case is reported as `too_short' instead of an exception.
split_exactly(N, List) ->
    try
        lists:split(N, List)
    catch
        error:badarg -> too_short
    end.
179
%% Distributes Tasks over N buckets in round-robin fashion: one element is
%% put into each bucket in turn before starting over with the first bucket.
%% The task list is walked in reverse and elements are prepended, so every
%% bucket ends up in the original task order.
spread(N, Tasks) ->
    EmptyBuckets = lists:duplicate(N, []),
    spread(lists:reverse(Tasks), EmptyBuckets, []).

%% Args: tasks still to place, buckets not yet visited in this round,
%% buckets already filled in this round (in reverse order).
spread([Next | Pending], [Current | Unvisited], Visited) ->
    spread(Pending, Unvisited, [[Next | Current] | Visited]);
spread([], Unvisited, Visited) ->
    %% No tasks left: join the visited part with the untouched buckets
    Visited ++ lists:reverse(Unvisited);
spread(Pending, [], Visited) ->
    %% Round finished: start the next round over all buckets again
    spread(Pending, lists:reverse(Visited), []).
192
193 -ifdef(TEST).
194 -include_lib("eunit/include/eunit.hrl").
%% EUnit generator for bucketize/2. Besides the general case it pins the
%% boundaries: an exact multiple of the chunk size, an input shorter than
%% one chunk, and the empty list.
bucketize_test_() ->
    [?_assertEqual({[10], [[1, 2, 3], [4, 5, 6], [7, 8, 9]]}, bucketize(3, lists:seq(1, 10))),
     %% Exact multiple of the chunk size: nothing is left over
     ?_assertEqual({[], [[1, 2, 3], [4, 5, 6]]}, bucketize(3, lists:seq(1, 6))),
     %% Fewer elements than one chunk: everything is left over
     ?_assertEqual({[1, 2], []}, bucketize(3, [1, 2])),
     ?_assertEqual({[], []}, bucketize(3, []))].
197
%% EUnit generator for spread/2. Besides the round-robin cases it pins the
%% boundaries: no tasks at all, and fewer tasks than buckets (the surplus
%% buckets stay empty).
spread_test_() ->
    [?_assertEqual([[1, 4, 7, 10], [2, 5, 8], [3, 6, 9]], spread(3, lists:seq(1, 10))),
     ?_assertEqual([[1, 6], [2, 7], [3, 8], [4, 9], [5, 10]], spread(5, lists:seq(1, 10))),
     ?_assertEqual([[1, 3, 5, 7, 9], [2, 4, 6, 8, 10]], spread(2, lists:seq(1, 10))),
     %% No tasks: every bucket stays empty
     ?_assertEqual([[], [], []], spread(3, [])),
     %% Fewer tasks than buckets: trailing buckets stay empty
     ?_assertEqual([[1], [2], []], spread(3, [1, 2]))].
202 -endif.
Line Hits Source