./ct_report/coverage/service_domain_db.COVER.html

1 -module(service_domain_db).
2
3 -behaviour(mongoose_service).
4
5 -include("mongoose_config_spec.hrl").
6 -include("mongoose_logger.hrl").
7
8 %% Use a separate pg scope, which is started by ejabberd_sup
9 %% This prevents a bug when a default pg server is not running
10 -define(SCOPE, mim_scope).
11 -define(GROUP, service_domain_db_group).
12
13 -export([start/1, stop/0, restart/0, config_spec/0]).
14 -export([start_link/0]).
15 -export([enabled/0]).
16 -export([force_check_for_updates/0]).
17 -export([sync_local/0, ping/1]).
18
19 -ignore_xref([start_link/0, sync_local/0, ping/1,
20 init/1, handle_call/3, handle_cast/2, handle_info/2,
21 code_change/3, terminate/2]).
22
23 %% gen_server callbacks
24 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
25 terminate/2, code_change/3]).
26
27 %% ---------------------------------------------------------------------------
28 %% Client code
29
30 -spec start(mongoose_service:options()) -> ok.
31 start(Opts) ->
32 259 mongoose_domain_sql:start(Opts),
33 259 ChildSpec =
34 {?MODULE,
35 {?MODULE, start_link, []},
36 permanent, infinity, worker, [?MODULE]},
37 259 supervisor:start_child(ejabberd_sup, ChildSpec),
38 259 mongoose_domain_db_cleaner:start(Opts),
39 259 ok.
40
41 -spec stop() -> ok.
42 stop() ->
43 257 mongoose_domain_db_cleaner:stop(),
44 257 supervisor:terminate_child(ejabberd_sup, ?MODULE),
45 257 supervisor:delete_child(ejabberd_sup, ?MODULE),
46 257 supervisor:terminate_child(ejabberd_sup, domain_pg),
47 257 supervisor:delete_child(ejabberd_sup, domain_pg),
48 257 ok.
49
50 restart() ->
51 %% if service goes out of sync with DB this interface
52 %% can be used to restart the service.
53 %% it's enough to just shut down gen_server, supervisor
54 %% will restart it.
55 11 gen_server:cast(?MODULE, reset_and_shutdown).
56
57 -spec config_spec() -> mongoose_config_spec:config_section().
58 config_spec() ->
59 103 #section{items = #{<<"event_cleaning_interval">> => #option{type = integer,
60 validate = positive},
61 <<"event_max_age">> => #option{type = integer,
62 validate = positive},
63 <<"db_pool">> => #option{type = atom,
64 validate = pool_name}
65 },
66 defaults = #{<<"event_cleaning_interval">> => 1800, % 30 minutes
67 <<"event_max_age">> => 7200, % 2 hours
68 <<"db_pool">> => global}}.
69
70 start_link() ->
71 270 gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
72
73 enabled() ->
74 330 mongoose_service:is_loaded(?MODULE).
75
76 all_members() ->
77 125 pg:get_members(?SCOPE, ?GROUP).
78
79 force_check_for_updates() ->
80 %% Send a broadcast message.
81 69 case all_members() of
82 [_|_] = Pids ->
83 69 [Pid ! check_for_updates || Pid <- Pids],
84 69 ok;
85 _ ->
86
:-(
ok
87 end.
88
89 %% Ensure that all pending check_for_updates messages are received by this node,
90 %% even if they are sent by other nodes in the cluster.
91 %% We have to do an RPC to ensure there is nothing pending in the distributed communication buffers.
92 %% Used in tests.
93 sync_local() ->
94 56 LocalPid = whereis(?MODULE),
95 56 true = is_pid(LocalPid),
96 56 Nodes = [node(Pid) || Pid <- all_members()],
97 %% Ping from all nodes in the cluster
98 56 [pong = rpc:call(Node, ?MODULE, ping, [LocalPid]) || Node <- Nodes],
99 56 pong.
100
101 -spec ping(pid()) -> pong.
102 ping(Pid) ->
103 123 gen_server:call(Pid, ping).
104
105 %% ---------------------------------------------------------------------------
106 %% Server callbacks
107
108 init([]) ->
109 270 pg:join(?SCOPE, ?GROUP, self()),
110 270 gen_server:cast(self(), initial_loading),
111 %% initial state will be set on initial_loading processing
112 270 {ok, #{}}.
113
114 handle_call(ping, _From, State) ->
115 120 {reply, pong, State};
116 handle_call(Request, From, State) ->
117
:-(
?UNEXPECTED_CALL(Request, From),
118
:-(
{reply, ok, State}.
119
120 handle_cast(initial_loading, State) ->
121 270 mongoose_domain_loader:initial_load(),
122 269 NewState = State#{check_for_updates_interval => 30000},
123 269 {noreply, handle_check_for_updates(NewState, true)};
124 handle_cast(reset_and_shutdown, State) ->
125 %% to ensure that domains table is re-read from
126 %% scratch, we must reset the last event id.
127 11 mongoose_loader_state:reset(),
128 11 {stop, shutdown, State};
129 handle_cast(Msg, State) ->
130
:-(
?UNEXPECTED_CAST(Msg),
131
:-(
{noreply, State}.
132
133 handle_info(check_for_updates, State) ->
134 283 {noreply, handle_check_for_updates(State, false)};
135 handle_info(Info, State) ->
136
:-(
?UNEXPECTED_INFO(Info),
137
:-(
{noreply, State}.
138
139 terminate(_Reason, _State) ->
140 12 ok.
141
142 code_change(_OldVsn, State, _Extra) ->
143
:-(
{ok, State}.
144
145 %% ---------------------------------------------------------------------------
146 %% Server helpers
147
148 handle_check_for_updates(State = #{check_for_updates_interval := Interval},
149 IsInitial) ->
150 552 maybe_cancel_timer(IsInitial, State),
151 552 receive_all_check_for_updates(),
152 552 mongoose_domain_loader:check_for_updates(),
153 547 TRef = erlang:send_after(Interval, self(), check_for_updates),
154 547 State#{check_for_updates_tref => TRef}.
155
156 maybe_cancel_timer(IsInitial, State) ->
157 552 TRef = maps:get(check_for_updates_tref, State, undefined),
158 552 case {IsInitial, TRef} of
159 269 {true, undefined} -> ok; %% TRef is not set the first time
160 283 {false, _} -> erlang:cancel_timer(TRef)
161 end.
162
163 receive_all_check_for_updates() ->
164 554 receive check_for_updates -> receive_all_check_for_updates() after 0 -> ok end.
Line Hits Source