./ct_report/coverage/mod_event_pusher_rabbit.COVER.html

1 %%==============================================================================
2 %% Copyright 2018 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%
16 %% @doc
17 %% Backend for `mod_event_pusher' that supports RabbitMQ integration.
18 %% @end
19 %%==============================================================================
20
21 -module(mod_event_pusher_rabbit).
22 -author('kacper.mentel@erlang-solutions.com').
23
24 -include_lib("mongooseim/include/mongoose.hrl").
25 -include_lib("mongooseim/include/mod_event_pusher_events.hrl").
26 -include("mongoose_config_spec.hrl").
27
28 -behaviour(gen_mod).
29 -behaviour(mongoose_module_metrics).
30 -behaviour(mod_event_pusher).
31
32 %%%===================================================================
33 %%% Definitions
34 %%%===================================================================
35
36 -define(POOL_TAG, event_pusher).
37
38 -define(DEFAULT_PRESENCE_EXCHANGE, <<"presence">>).
39 -define(DEFAULT_PRESENCE_EXCHANGE_TYPE, <<"topic">>).
40 -define(DEFAULT_CHAT_MSG_EXCHANGE, <<"chat_msg">>).
41 -define(DEFAULT_CHAT_MSG_EXCHANGE_TYPE, <<"topic">>).
42 -define(DEFAULT_CHAT_MSG_SENT_TOPIC, <<"chat_msg_sent">>).
43 -define(DEFAULT_CHAT_MSG_RECV_TOPIC, <<"chat_msg_recv">>).
44 -define(DEFAULT_GROUP_CHAT_MSG_EXCHANGE, <<"groupchat_msg">>).
45 -define(DEFAULT_GROUP_CHAT_MSG_EXCHANGE_TYPE, <<"topic">>).
46 -define(DEFAULT_GROUP_CHAT_MSG_SENT_TOPIC, <<"groupchat_msg_sent">>).
47 -define(DEFAULT_GROUP_CHAT_MSG_RECV_TOPIC, <<"groupchat_msg_recv">>).
48
49 %%%===================================================================
50 %%% Exports
51 %%%===================================================================
52
53 %% MIM module callbacks
54 -export([start/2, stop/1, config_spec/0]).
55
56 %% API
57 -export([push_event/3]).
58
59 %%%===================================================================
60 %%% Callbacks
61 %%%===================================================================
62
63 -spec start(Host :: jid:server(), Opts :: proplists:proplist()) -> ok.
64 start(Host, _Opts) ->
65
:-(
initialize_metrics(Host),
66
:-(
create_exchanges(Host),
67
:-(
ok.
68
69 -spec stop(Host :: jid:server()) -> ok.
70 stop(_Host) ->
71
:-(
ok.
72
73 -spec config_spec() -> mongoose_config_spec:config_section().
74 config_spec() ->
75 164 #section{
76 items = #{<<"presence_exchange">> => #section{items = exch_items()},
77 <<"chat_msg_exchange">> => #section{items = msg_exch_items()},
78 <<"groupchat_msg_exchange">> => #section{items = msg_exch_items()}
79 }
80 }.
81
82 msg_exch_items() ->
83 328 ExchItems = exch_items(),
84 328 ExchItems#{<<"sent_topic">> => #option{type = binary,
85 validate = non_empty},
86 <<"recv_topic">> => #option{type = binary,
87 validate = non_empty}}.
88
89 exch_items() ->
90 492 #{<<"name">> => #option{type = binary,
91 validate = non_empty},
92 <<"type">> => #option{type = binary,
93 validate = non_empty}}.
94
95 push_event(Acc, _, #user_status_event{jid = UserJID, status = Status}) ->
96
:-(
handle_user_presence_change(UserJID, Status),
97
:-(
Acc;
98 push_event(Acc, _, ChatEvent = #chat_event{}) ->
99
:-(
handle_user_chat_event(ChatEvent),
100
:-(
Acc;
101 push_event(Acc, _, _) ->
102
:-(
Acc.
103
104 %%%===================================================================
105 %%% Internal functions
106 %%%===================================================================
107
108 -spec initialize_metrics(Host :: jid:server()) -> [ok | {ok | error, term()}].
109 initialize_metrics(Host) ->
110
:-(
[mongoose_metrics:ensure_metric(Host, Name, Type)
111
:-(
|| {Name, Type} <- mongoose_rabbit_worker:list_metrics(?POOL_TAG)].
112
113 -spec create_exchanges(Host :: jid:server()) -> ok.
114 create_exchanges(Host) ->
115
:-(
Exchanges = exchanges(Host),
116
:-(
Res =
117
:-(
[call_rabbit_worker(Host, {amqp_call,
118 mongoose_amqp:exchange_declare(ExName, Type)})
119
:-(
|| {ExName, Type} <- Exchanges],
120
:-(
verify_exchanges_were_created_or_crash(Res, Exchanges).
121
122 -spec handle_user_presence_change(JID :: jid:jid(), Status :: atom()) -> ok.
123 handle_user_presence_change(JID = #jid{lserver = Host}, Status) ->
124
:-(
Exchange = exchange_opt(Host, presence_exchange, name,
125 ?DEFAULT_PRESENCE_EXCHANGE),
126
:-(
RoutingKey = presence_routing_key(JID),
127
:-(
Message = presence_msg(JID, Status),
128
:-(
PublishMethod = mongoose_amqp:basic_publish(Exchange, RoutingKey),
129
:-(
AMQPMessage = mongoose_amqp:message(Message),
130
:-(
cast_rabbit_worker(Host, {amqp_publish, PublishMethod, AMQPMessage}).
131
132 -spec handle_user_chat_event(#chat_event{}) -> ok.
133 handle_user_chat_event(#chat_event{from = From, to = To, packet = Packet,
134 type = Type, direction = Direction}) when
135 Type == chat orelse Type == groupchat ->
136
:-(
Host = get_host(From, To, Direction),
137
:-(
UserMessage = extract_message(Packet),
138
:-(
Message = chat_msg(From, To, UserMessage),
139
:-(
Exchange = get_chat_exchange(Type, Host),
140
:-(
RoutingKey = chat_event_routing_key(Type, Direction, From, To, Host),
141
:-(
PublishMethod = mongoose_amqp:basic_publish(Exchange, RoutingKey),
142
:-(
AMQPMessage = mongoose_amqp:message(Message),
143
:-(
cast_rabbit_worker(Host, {amqp_publish, PublishMethod, AMQPMessage});
144
:-(
handle_user_chat_event(_) -> ok.
145
146 %%%===================================================================
147 %%% Helpers
148 %%%===================================================================
149
150 -spec call_rabbit_worker(Host :: binary(), Msg :: term()) -> term().
151 call_rabbit_worker(Host, Msg) ->
152
:-(
mongoose_wpool:call(rabbit, Host, ?POOL_TAG, Msg).
153
154 -spec cast_rabbit_worker(Host :: binary(), Msg :: term()) -> ok.
155 cast_rabbit_worker(Host, Msg) ->
156
:-(
mongoose_wpool:cast(rabbit, Host, ?POOL_TAG, Msg).
157
158 -spec exchanges(Host :: jid:server()) -> [{binary(), binary()}].
159 exchanges(Host) ->
160
:-(
[{
161 exchange_opt(Host, ExKey, name, DefName),
162 exchange_opt(Host, ExKey, type, DefType)
163 } || {ExKey, DefName, DefType} <-
164
:-(
[
165 {presence_exchange, ?DEFAULT_PRESENCE_EXCHANGE,
166 ?DEFAULT_PRESENCE_EXCHANGE_TYPE},
167 {chat_msg_exchange, ?DEFAULT_CHAT_MSG_EXCHANGE,
168 ?DEFAULT_CHAT_MSG_EXCHANGE_TYPE},
169 {groupchat_msg_exchange, ?DEFAULT_GROUP_CHAT_MSG_EXCHANGE,
170 ?DEFAULT_GROUP_CHAT_MSG_EXCHANGE_TYPE}
171 ]].
172
173 -spec get_chat_exchange(chat | groupchat, Host :: binary()) -> binary().
174 get_chat_exchange(chat, Host) ->
175
:-(
exchange_opt(Host, chat_msg_exchange, name, ?DEFAULT_CHAT_MSG_EXCHANGE);
176 get_chat_exchange(groupchat, Host) ->
177
:-(
exchange_opt(Host, groupchat_msg_exchange, name,
178 ?DEFAULT_GROUP_CHAT_MSG_EXCHANGE).
179
180 -spec get_host(jid:jid(), jid:jid(), in | out) -> binary().
181
:-(
get_host(#jid{lserver = Host}, _To, in) -> Host;
182
:-(
get_host(_From, #jid{lserver = Host}, out) -> Host.
183
184 -spec extract_message(Packet :: exml:element()) -> binary().
185 extract_message(Packet) ->
186
:-(
Body = exml_query:subelement(Packet, <<"body">>),
187
:-(
exml_query:cdata(Body).
188
189 -spec presence_routing_key(JID :: jid:jid()) -> binary().
190 presence_routing_key(JID) ->
191
:-(
{User, Host, _} = jid:to_lower(JID),
192
:-(
jid:to_binary({User, Host}).
193
194 -spec chat_event_routing_key(chat | groupchat, in | out, From :: jid:jid(),
195 To :: jid:jid(), Host :: binary()) -> binary().
196 chat_event_routing_key(chat, in, From, _To, Host) ->
197
:-(
Topic = exchange_opt(Host, chat_msg_exchange, sent_topic,
198 ?DEFAULT_CHAT_MSG_SENT_TOPIC),
199
:-(
user_topic_routing_key(From, Topic);
200 chat_event_routing_key(chat, out, _From, To, Host) ->
201
:-(
Topic = exchange_opt(Host, chat_msg_exchange, recv_topic,
202 ?DEFAULT_CHAT_MSG_RECV_TOPIC),
203
:-(
user_topic_routing_key(To, Topic);
204 chat_event_routing_key(groupchat, in, From, _To, Host) ->
205
:-(
Topic = exchange_opt(Host, groupchat_msg_exchange, sent_topic,
206 ?DEFAULT_GROUP_CHAT_MSG_SENT_TOPIC),
207
:-(
user_topic_routing_key(From, Topic);
208 chat_event_routing_key(groupchat, out, _From, To, Host) ->
209
:-(
Topic = exchange_opt(Host, groupchat_msg_exchange, recv_topic,
210 ?DEFAULT_GROUP_CHAT_MSG_RECV_TOPIC),
211
:-(
user_topic_routing_key(To, Topic).
212
213 -spec user_topic_routing_key(JID :: jid:jid(), Topic :: binary()) -> binary().
214 user_topic_routing_key(JID, Topic) ->
215
:-(
{User, Host, _Res} = jid:to_lower(JID),
216
:-(
BinJID = jid:to_binary({User, Host}),
217
:-(
<<BinJID/binary, ".", Topic/binary>>.
218
219 -spec presence_msg(JID :: jid:jid(), Status :: atom()) -> binary().
220 presence_msg(JID, Status) ->
221
:-(
Msg = #{user_id => jid:to_binary(jid:to_lower(JID)), present => is_user_online(Status)},
222
:-(
jiffy:encode(Msg).
223
224 -spec chat_msg(From :: jid:jid(), To :: jid:jid(), UserMsg :: binary()) ->
225 binary().
226 chat_msg(From, To, UserMsg) ->
227
:-(
Msg = #{to_user_id => jid:to_binary(jid:to_lower(To)),
228 message => UserMsg,
229 from_user_id => jid:to_binary(jid:to_lower(From))},
230
:-(
jiffy:encode(Msg).
231
232 -spec is_user_online(online | offline) -> boolean().
233
:-(
is_user_online(online) -> true;
234
:-(
is_user_online(offline) -> false.
235
236 -spec exchange_opt(Host :: jid:lserver(), ExchangeKey :: atom(),
237 Option :: atom(), Default :: term()) -> term().
238 exchange_opt(Host, ExchangeKey, Option, Default) ->
239
:-(
ExchangeOptions = opt(Host, ExchangeKey, []),
240
:-(
proplists:get_value(Option, ExchangeOptions, Default).
241
242 -spec opt(Host :: jid:lserver(), Option :: atom(), Default :: term()) ->
243 Value :: term().
244 opt(Host, Option, Default) ->
245
:-(
gen_mod:get_module_opt(Host, ?MODULE, Option, Default).
246
247 -spec verify_exchanges_were_created_or_crash(Res :: list(),
248 Exchanges :: [{binary(), binary()}])
249 -> ok | no_return().
250 verify_exchanges_were_created_or_crash(Res, Exchanges) ->
251
:-(
case lists:all(fun(E) ->
252
:-(
element(2, E) == mongoose_amqp:exchange_declare_ok()
253 end, Res) of
254 true ->
255
:-(
ok;
256 false ->
257
:-(
erlang:error(io_lib:format("Creating exchanges failed, exchanges=~p",
258 [Exchanges]))
259 end.
Line Hits Source