./ct_report/coverage/mod_event_pusher_rabbit.COVER.html

1 %%==============================================================================
2 %% Copyright 2018 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%
16 %% @doc
17 %% Backend for `mod_event_pusher' that supports RabbitMQ integration.
18 %% @end
19 %%==============================================================================
20
21 -module(mod_event_pusher_rabbit).
22 -author('kacper.mentel@erlang-solutions.com').
23
24 -include_lib("mongooseim/include/mongoose.hrl").
25 -include_lib("mongooseim/include/mod_event_pusher_events.hrl").
26 -include("mongoose_config_spec.hrl").
27
28 -behaviour(gen_mod).
29 -behaviour(mongoose_module_metrics).
30 -behaviour(mod_event_pusher).
31
32 %%%===================================================================
33 %%% Definitions
34 %%%===================================================================
35
36 -define(POOL_TAG, event_pusher).
37
38 -type exchange_opts() :: #{name := binary(), type := binary(),
39 sent_topic => binary(), recv_topic => binary()}.
40
41 %%%===================================================================
42 %%% Exports
43 %%%===================================================================
44
45 %% MIM module callbacks
46 -export([start/2, stop/1, config_spec/0]).
47
48 %% API
49 -export([push_event/2]).
50
51 %%%===================================================================
52 %%% Callbacks
53 %%%===================================================================
54
55 -spec start(mongooseim:host_type(), gen_mod:module_opts()) -> ok.
56 start(HostType, _Opts) ->
57
:-(
initialize_metrics(HostType),
58
:-(
create_exchanges(HostType),
59
:-(
ok.
60
61 -spec stop(mongooseim:host_type()) -> ok.
62 stop(_HostType) ->
63
:-(
ok.
64
65 -spec config_spec() -> mongoose_config_spec:config_section().
66 config_spec() ->
67 196 #section{items = #{<<"presence_exchange">> => exchange_spec(<<"presence">>),
68 <<"chat_msg_exchange">> => msg_exchange_spec(<<"chat_msg">>),
69 <<"groupchat_msg_exchange">> => msg_exchange_spec(<<"groupchat_msg">>)}}.
70
71 msg_exchange_spec(Name) ->
72 392 Spec = #section{items = Items, defaults = Defaults} = exchange_spec(Name),
73 392 Spec#section{items = Items#{<<"sent_topic">> => #option{type = binary,
74 validate = non_empty},
75 <<"recv_topic">> => #option{type = binary,
76 validate = non_empty}},
77 defaults = Defaults#{<<"sent_topic">> => <<Name/binary, "_sent">>,
78 <<"recv_topic">> => <<Name/binary, "_recv">>},
79 include = always
80 }.
81
82 exchange_spec(Name) ->
83 588 #section{items = #{<<"name">> => #option{type = binary,
84 validate = non_empty},
85 <<"type">> => #option{type = binary,
86 validate = non_empty}},
87 defaults = #{<<"name">> => Name,
88 <<"type">> => <<"topic">>},
89 include = always}.
90
91 push_event(Acc, #user_status_event{jid = UserJID, status = Status}) ->
92
:-(
handle_user_presence_change(mongoose_acc:host_type(Acc), UserJID, Status),
93
:-(
Acc;
94 push_event(Acc, ChatEvent = #chat_event{}) ->
95
:-(
handle_user_chat_event(mongoose_acc:host_type(Acc), ChatEvent),
96
:-(
Acc;
97 push_event(Acc, _) ->
98
:-(
Acc.
99
100 %%%===================================================================
101 %%% Internal functions
102 %%%===================================================================
103
104 -spec initialize_metrics(mongooseim:host_type()) -> [ok | {ok | error, term()}].
105 initialize_metrics(HostType) ->
106
:-(
[mongoose_metrics:ensure_metric(HostType, Name, Type)
107
:-(
|| {Name, Type} <- mongoose_rabbit_worker:list_metrics(?POOL_TAG)].
108
109 -spec create_exchanges(mongooseim:host_type()) -> ok.
110 create_exchanges(HostType) ->
111
:-(
Exchanges = exchanges(HostType),
112
:-(
Res = [call_rabbit_worker(HostType, {amqp_call,
113 mongoose_amqp:exchange_declare(ExName, Type)})
114
:-(
|| #{name := ExName, type := Type} <- Exchanges],
115
:-(
verify_exchanges_were_created_or_crash(Res, Exchanges).
116
117 -spec handle_user_presence_change(mongooseim:host_type(), JID :: jid:jid(), Status :: atom()) -> ok.
118 handle_user_presence_change(HostType, JID, Status) ->
119
:-(
#{name := ExchangeName} = exchange_opts(HostType, presence_exchange),
120
:-(
RoutingKey = presence_routing_key(JID),
121
:-(
Message = presence_msg(JID, Status),
122
:-(
PublishMethod = mongoose_amqp:basic_publish(ExchangeName, RoutingKey),
123
:-(
AMQPMessage = mongoose_amqp:message(Message),
124
:-(
cast_rabbit_worker(HostType, {amqp_publish, PublishMethod, AMQPMessage}).
125
126 -spec handle_user_chat_event(mongooseim:host_type(), #chat_event{}) -> ok.
127 handle_user_chat_event(HostType, #chat_event{from = From, to = To, packet = Packet,
128 type = Type, direction = Direction}) when
129 Type == chat orelse Type == groupchat ->
130
:-(
UserMessage = extract_message(Packet),
131
:-(
Message = chat_msg(From, To, UserMessage),
132
:-(
#{name := ExchangeName} = ExchangeOpts = exchange_opts(HostType, msg_type_to_key(Type)),
133
:-(
RoutingKey = chat_event_routing_key(ExchangeOpts, Direction, From, To),
134
:-(
PublishMethod = mongoose_amqp:basic_publish(ExchangeName, RoutingKey),
135
:-(
AMQPMessage = mongoose_amqp:message(Message),
136
:-(
cast_rabbit_worker(HostType, {amqp_publish, PublishMethod, AMQPMessage});
137
:-(
handle_user_chat_event(_HostType, _) -> ok.
138
139 %%%===================================================================
140 %%% Helpers
141 %%%===================================================================
142
143 -spec call_rabbit_worker(mongooseim:host_type(), Msg :: term()) -> term().
144 call_rabbit_worker(HostType, Msg) ->
145
:-(
mongoose_wpool:call(rabbit, HostType, ?POOL_TAG, Msg).
146
147 -spec cast_rabbit_worker(mongooseim:host_type(), Msg :: term()) -> ok.
148 cast_rabbit_worker(HostType, Msg) ->
149
:-(
mongoose_wpool:cast(rabbit, HostType, ?POOL_TAG, Msg).
150
151 -spec exchanges(mongooseim:host_type()) -> [exchange_opts()].
152 exchanges(HostType) ->
153
:-(
[exchange_opts(HostType, ExKey) || ExKey <- exchange_keys()].
154
155 exchange_keys() ->
156
:-(
[presence_exchange, chat_msg_exchange, groupchat_msg_exchange].
157
158 -spec extract_message(Packet :: exml:element()) -> binary().
159 extract_message(Packet) ->
160
:-(
Body = exml_query:subelement(Packet, <<"body">>),
161
:-(
exml_query:cdata(Body).
162
163 -spec presence_routing_key(JID :: jid:jid()) -> jid:literal_jid().
164 presence_routing_key(JID) ->
165
:-(
{LUser, LServer, _} = jid:to_lower(JID),
166
:-(
jid:to_binary({LUser, LServer}).
167
168 -spec chat_event_routing_key(exchange_opts(), in | out, From :: jid:jid(),
169 To :: jid:jid()) -> binary().
170 chat_event_routing_key(#{sent_topic := Topic}, in, From, _To) ->
171
:-(
user_topic_routing_key(From, Topic);
172 chat_event_routing_key(#{recv_topic := Topic}, out, _From, To) ->
173
:-(
user_topic_routing_key(To, Topic).
174
175
:-(
msg_type_to_key(chat) -> chat_msg_exchange;
176
:-(
msg_type_to_key(groupchat) -> groupchat_msg_exchange.
177
178 -spec user_topic_routing_key(JID :: jid:jid(), Topic :: binary()) -> binary().
179 user_topic_routing_key(JID, Topic) ->
180
:-(
{LUser, LServer, _Res} = jid:to_lower(JID),
181
:-(
BinJID = jid:to_binary({LUser, LServer}),
182
:-(
<<BinJID/binary, ".", Topic/binary>>.
183
184 -spec presence_msg(JID :: jid:jid(), Status :: atom()) -> binary().
185 presence_msg(JID, Status) ->
186
:-(
Msg = #{user_id => jid:to_binary(jid:to_lower(JID)), present => is_user_online(Status)},
187
:-(
jiffy:encode(Msg).
188
189 -spec chat_msg(From :: jid:jid(), To :: jid:jid(), UserMsg :: binary()) ->
190 binary().
191 chat_msg(From, To, UserMsg) ->
192
:-(
Msg = #{to_user_id => jid:to_binary(jid:to_lower(To)),
193 message => UserMsg,
194 from_user_id => jid:to_binary(jid:to_lower(From))},
195
:-(
jiffy:encode(Msg).
196
197 -spec is_user_online(online | offline) -> boolean().
198
:-(
is_user_online(online) -> true;
199
:-(
is_user_online(offline) -> false.
200
201 -spec exchange_opts(mongooseim:host_type(), gen_mod:opt_key()) -> exchange_opts().
202 exchange_opts(HostType, ExchangeKey) ->
203
:-(
gen_mod:get_module_opt(HostType, ?MODULE, ExchangeKey).
204
205 -spec verify_exchanges_were_created_or_crash(Res :: list(), [exchange_opts()])
206 -> ok | no_return().
207 verify_exchanges_were_created_or_crash(Res, Exchanges) ->
208
:-(
case lists:all(fun(E) ->
209
:-(
element(2, E) == mongoose_amqp:exchange_declare_ok()
210 end, Res) of
211 true ->
212
:-(
ok;
213 false ->
214
:-(
erlang:error(io_lib:format("Creating exchanges failed, exchanges=~p",
215 [Exchanges]))
216 end.
Line Hits Source