./ct_report/coverage/service_domain_db.COVER.html

1 -module(service_domain_db).
2
3 -behaviour(mongoose_service).
4
5 -include("mongoose_config_spec.hrl").
6 -include("mongoose_logger.hrl").
7
8 %% Use a separate pg scope, which is started by ejabberd_sup
9 %% This prevents a bug when a default pg server is not running
10 -define(SCOPE, mim_scope).
11 -define(GROUP, service_domain_db_group).
12
13 -export([start/1, stop/0, restart/0, config_spec/0]).
14 -export([start_link/0]).
15 -export([enabled/0]).
16 -export([force_check_for_updates/0]).
17 -export([sync_local/0, ping/1]).
18
19 -ignore_xref([start_link/0, sync_local/0, ping/1,
20 init/1, handle_call/3, handle_cast/2, handle_info/2,
21 code_change/3, terminate/2]).
22
23 %% gen_server callbacks
24 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
25 terminate/2, code_change/3]).
26
27 %% ---------------------------------------------------------------------------
28 %% Client code
29
30 -spec start(mongoose_service:options()) -> ok.
31 start(Opts) ->
32 90 mongoose_domain_sql:start(Opts),
33 90 ChildSpec =
34 {?MODULE,
35 {?MODULE, start_link, []},
36 permanent, infinity, worker, [?MODULE]},
37 90 supervisor:start_child(ejabberd_sup, ChildSpec),
38 90 mongoose_domain_db_cleaner:start(Opts),
39 90 ok.
40
41 -spec stop() -> ok.
42 stop() ->
43 89 mongoose_domain_db_cleaner:stop(),
44 88 supervisor:terminate_child(ejabberd_sup, ?MODULE),
45 88 supervisor:delete_child(ejabberd_sup, ?MODULE),
46 88 supervisor:terminate_child(ejabberd_sup, domain_pg),
47 88 supervisor:delete_child(ejabberd_sup, domain_pg),
48 88 ok.
49
50 restart() ->
51 %% if service goes out of sync with DB this interface
52 %% can be used to restart the service.
53 %% it's enough to just shut down gen_server, supervisor
54 %% will restart it.
55
:-(
gen_server:cast(?MODULE, reset_and_shutdown).
56
57 -spec config_spec() -> mongoose_config_spec:config_section().
58 config_spec() ->
59 88 #section{items = #{<<"event_cleaning_interval">> => #option{type = integer,
60 validate = positive},
61 <<"event_max_age">> => #option{type = integer,
62 validate = positive},
63 <<"db_pool">> => db_pool()
64 },
65 defaults = #{<<"event_cleaning_interval">> => 1800, % 30 minutes
66 <<"event_max_age">> => 7200 % 2 hours
67 }}.
68
69 -spec db_pool() -> mongoose_config_spec:config_section().
70 db_pool() ->
71 88 #section{items = #{<<"name">> => #option{type = atom,
72 validate = pool_name},
73 <<"scope">> => #option{type = binary_or_global,
74 validate = host_type_or_global}
75 },
76 include = always,
77 defaults = #{<<"name">> => default,
78 <<"scope">> => global}}.
79
80 start_link() ->
81 110 gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
82
83 enabled() ->
84 6 mongoose_service:is_loaded(?MODULE).
85
86 all_members() ->
87 20 pg:get_members(?SCOPE, ?GROUP).
88
89 force_check_for_updates() ->
90 %% Send a broadcast message.
91 18 case all_members() of
92 [_|_] = Pids ->
93 18 [Pid ! check_for_updates || Pid <- Pids],
94 18 ok;
95 _ ->
96
:-(
ok
97 end.
98
99 %% Ensure that all pending check_for_updates messages are received by this node,
100 %% even if they are sent by other nodes in the cluster.
101 %% We have to do an RPC to ensure there is nothing pending in the distributed communication buffers.
102 %% Used in tests.
103 sync_local() ->
104 2 LocalPid = whereis(?MODULE),
105 2 true = is_pid(LocalPid),
106 2 Nodes = [node(Pid) || Pid <- all_members()],
107 %% Ping from all nodes in the cluster
108 2 [pong = rpc:call(Node, ?MODULE, ping, [LocalPid]) || Node <- Nodes],
109 2 pong.
110
111 -spec ping(pid()) -> pong.
112 ping(Pid) ->
113 2 gen_server:call(Pid, ping).
114
115 %% ---------------------------------------------------------------------------
116 %% Server callbacks
117
118 init([]) ->
119 110 pg:join(?SCOPE, ?GROUP, self()),
120 110 gen_server:cast(self(), initial_loading),
121 %% initial state will be set on initial_loading processing
122 110 {ok, #{}}.
123
124 handle_call(ping, _From, State) ->
125 2 {reply, pong, State};
126 handle_call(Request, From, State) ->
127
:-(
?UNEXPECTED_CALL(Request, From),
128
:-(
{reply, ok, State}.
129
130 handle_cast(initial_loading, State) ->
131 110 mongoose_domain_loader:initial_load(),
132 88 NewState = State#{check_for_updates_interval => 30000},
133 88 {noreply, handle_check_for_updates(NewState, true)};
134 handle_cast(reset_and_shutdown, State) ->
135 %% to ensure that domains table is re-read from
136 %% scratch, we must reset the last event id.
137
:-(
mongoose_loader_state:reset(),
138
:-(
{stop, shutdown, State};
139 handle_cast(Msg, State) ->
140
:-(
?UNEXPECTED_CAST(Msg),
141
:-(
{noreply, State}.
142
143 handle_info(check_for_updates, State) ->
144 72 {noreply, handle_check_for_updates(State, false)};
145 handle_info(Info, State) ->
146
:-(
?UNEXPECTED_INFO(Info),
147
:-(
{noreply, State}.
148
149 terminate(_Reason, _State) ->
150 22 ok.
151
152 code_change(_OldVsn, State, _Extra) ->
153
:-(
{ok, State}.
154
155 %% ---------------------------------------------------------------------------
156 %% Server helpers
157
158 handle_check_for_updates(State = #{check_for_updates_interval := Interval},
159 IsInitial) ->
160 160 maybe_cancel_timer(IsInitial, State),
161 160 receive_all_check_for_updates(),
162 160 mongoose_domain_loader:check_for_updates(),
163 160 TRef = erlang:send_after(Interval, self(), check_for_updates),
164 160 State#{check_for_updates_tref => TRef}.
165
166 maybe_cancel_timer(IsInitial, State) ->
167 160 TRef = maps:get(check_for_updates_tref, State, undefined),
168 160 case {IsInitial, TRef} of
169 88 {true, undefined} -> ok; %% TRef is not set the first time
170 72 {false, _} -> erlang:cancel_timer(TRef)
171 end.
172
173 receive_all_check_for_updates() ->
174 160 receive check_for_updates -> receive_all_check_for_updates() after 0 -> ok end.
Line Hits Source