./ct_report/coverage/mod_event_pusher_rabbit.COVER.html

1 %%==============================================================================
2 %% Copyright 2018 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%
16 %% @doc
17 %% Backend for `mod_event_pusher' that supports RabbitMQ integration.
18 %% @end
19 %%==============================================================================
20
21 -module(mod_event_pusher_rabbit).
22 -author('kacper.mentel@erlang-solutions.com').
23
24 -include_lib("mongooseim/include/mongoose.hrl").
25 -include_lib("mongooseim/include/mod_event_pusher_events.hrl").
26 -include("mongoose_config_spec.hrl").
27
28 -behaviour(gen_mod).
29 -behaviour(mongoose_module_metrics).
30 -behaviour(mod_event_pusher).
31
32 %%%===================================================================
33 %%% Definitions
34 %%%===================================================================
35
36 -define(POOL_TAG, event_pusher).
37
38 -type exchange_opts() :: #{name := binary(), type := binary(),
39 sent_topic => binary(), recv_topic => binary()}.
40
41 %%%===================================================================
42 %%% Exports
43 %%%===================================================================
44
45 %% MIM module callbacks
46 -export([start/2, stop/1, config_spec/0]).
47
48 %% API
49 -export([push_event/2]).
50
51 %%%===================================================================
52 %%% Callbacks
53 %%%===================================================================
54
55 -spec start(mongooseim:host_type(), gen_mod:module_opts()) -> ok.
56 start(HostType, _Opts) ->
57
:-(
initialize_metrics(HostType),
58
:-(
create_exchanges(HostType),
59
:-(
ok.
60
61 -spec stop(mongooseim:host_type()) -> ok.
62 stop(_HostType) ->
63
:-(
ok.
64
65 -spec config_spec() -> mongoose_config_spec:config_section().
66 config_spec() ->
67 152 #section{items = #{<<"presence_exchange">> => exchange_spec(<<"presence">>),
68 <<"chat_msg_exchange">> => msg_exchange_spec(<<"chat_msg">>),
69 <<"groupchat_msg_exchange">> => msg_exchange_spec(<<"groupchat_msg">>)},
70 format_items = map}.
71
72 msg_exchange_spec(Name) ->
73 304 Spec = #section{items = Items, defaults = Defaults} = exchange_spec(Name),
74 304 Spec#section{items = Items#{<<"sent_topic">> => #option{type = binary,
75 validate = non_empty},
76 <<"recv_topic">> => #option{type = binary,
77 validate = non_empty}},
78 defaults = Defaults#{<<"sent_topic">> => <<Name/binary, "_sent">>,
79 <<"recv_topic">> => <<Name/binary, "_recv">>},
80 format_items = map,
81 include = always
82 }.
83
84 exchange_spec(Name) ->
85 456 #section{items = #{<<"name">> => #option{type = binary,
86 validate = non_empty},
87 <<"type">> => #option{type = binary,
88 validate = non_empty}},
89 defaults = #{<<"name">> => Name,
90 <<"type">> => <<"topic">>},
91 format_items = map,
92 include = always}.
93
94 push_event(Acc, #user_status_event{jid = UserJID, status = Status}) ->
95
:-(
handle_user_presence_change(mongoose_acc:host_type(Acc), UserJID, Status),
96
:-(
Acc;
97 push_event(Acc, ChatEvent = #chat_event{}) ->
98
:-(
handle_user_chat_event(mongoose_acc:host_type(Acc), ChatEvent),
99
:-(
Acc;
100 push_event(Acc, _) ->
101
:-(
Acc.
102
103 %%%===================================================================
104 %%% Internal functions
105 %%%===================================================================
106
107 -spec initialize_metrics(mongooseim:host_type()) -> [ok | {ok | error, term()}].
108 initialize_metrics(HostType) ->
109
:-(
[mongoose_metrics:ensure_metric(HostType, Name, Type)
110
:-(
|| {Name, Type} <- mongoose_rabbit_worker:list_metrics(?POOL_TAG)].
111
112 -spec create_exchanges(mongooseim:host_type()) -> ok.
113 create_exchanges(HostType) ->
114
:-(
Exchanges = exchanges(HostType),
115
:-(
Res = [call_rabbit_worker(HostType, {amqp_call,
116 mongoose_amqp:exchange_declare(ExName, Type)})
117
:-(
|| #{name := ExName, type := Type} <- Exchanges],
118
:-(
verify_exchanges_were_created_or_crash(Res, Exchanges).
119
120 -spec handle_user_presence_change(mongooseim:host_type(), JID :: jid:jid(), Status :: atom()) -> ok.
121 handle_user_presence_change(HostType, JID, Status) ->
122
:-(
#{name := ExchangeName} = exchange_opts(HostType, presence_exchange),
123
:-(
RoutingKey = presence_routing_key(JID),
124
:-(
Message = presence_msg(JID, Status),
125
:-(
PublishMethod = mongoose_amqp:basic_publish(ExchangeName, RoutingKey),
126
:-(
AMQPMessage = mongoose_amqp:message(Message),
127
:-(
cast_rabbit_worker(HostType, {amqp_publish, PublishMethod, AMQPMessage}).
128
129 -spec handle_user_chat_event(mongooseim:host_type(), #chat_event{}) -> ok.
130 handle_user_chat_event(HostType, #chat_event{from = From, to = To, packet = Packet,
131 type = Type, direction = Direction}) when
132 Type == chat orelse Type == groupchat ->
133
:-(
UserMessage = extract_message(Packet),
134
:-(
Message = chat_msg(From, To, UserMessage),
135
:-(
#{name := ExchangeName} = ExchangeOpts = exchange_opts(HostType, msg_type_to_key(Type)),
136
:-(
RoutingKey = chat_event_routing_key(ExchangeOpts, Direction, From, To),
137
:-(
PublishMethod = mongoose_amqp:basic_publish(ExchangeName, RoutingKey),
138
:-(
AMQPMessage = mongoose_amqp:message(Message),
139
:-(
cast_rabbit_worker(HostType, {amqp_publish, PublishMethod, AMQPMessage});
140
:-(
handle_user_chat_event(_HostType, _) -> ok.
141
142 %%%===================================================================
143 %%% Helpers
144 %%%===================================================================
145
146 -spec call_rabbit_worker(mongooseim:host_type(), Msg :: term()) -> term().
147 call_rabbit_worker(HostType, Msg) ->
148
:-(
mongoose_wpool:call(rabbit, HostType, ?POOL_TAG, Msg).
149
150 -spec cast_rabbit_worker(mongooseim:host_type(), Msg :: term()) -> ok.
151 cast_rabbit_worker(HostType, Msg) ->
152
:-(
mongoose_wpool:cast(rabbit, HostType, ?POOL_TAG, Msg).
153
154 -spec exchanges(mongooseim:host_type()) -> [exchange_opts()].
155 exchanges(HostType) ->
156
:-(
[exchange_opts(HostType, ExKey) || ExKey <- exchange_keys()].
157
158 exchange_keys() ->
159
:-(
[presence_exchange, chat_msg_exchange, groupchat_msg_exchange].
160
161 -spec extract_message(Packet :: exml:element()) -> binary().
162 extract_message(Packet) ->
163
:-(
Body = exml_query:subelement(Packet, <<"body">>),
164
:-(
exml_query:cdata(Body).
165
166 -spec presence_routing_key(JID :: jid:jid()) -> jid:literal_jid().
167 presence_routing_key(JID) ->
168
:-(
{LUser, LServer, _} = jid:to_lower(JID),
169
:-(
jid:to_binary({LUser, LServer}).
170
171 -spec chat_event_routing_key(exchange_opts(), in | out, From :: jid:jid(),
172 To :: jid:jid()) -> binary().
173 chat_event_routing_key(#{sent_topic := Topic}, in, From, _To) ->
174
:-(
user_topic_routing_key(From, Topic);
175 chat_event_routing_key(#{recv_topic := Topic}, out, _From, To) ->
176
:-(
user_topic_routing_key(To, Topic).
177
178
:-(
msg_type_to_key(chat) -> chat_msg_exchange;
179
:-(
msg_type_to_key(groupchat) -> groupchat_msg_exchange.
180
181 -spec user_topic_routing_key(JID :: jid:jid(), Topic :: binary()) -> binary().
182 user_topic_routing_key(JID, Topic) ->
183
:-(
{LUser, LServer, _Res} = jid:to_lower(JID),
184
:-(
BinJID = jid:to_binary({LUser, LServer}),
185
:-(
<<BinJID/binary, ".", Topic/binary>>.
186
187 -spec presence_msg(JID :: jid:jid(), Status :: atom()) -> binary().
188 presence_msg(JID, Status) ->
189
:-(
Msg = #{user_id => jid:to_binary(jid:to_lower(JID)), present => is_user_online(Status)},
190
:-(
jiffy:encode(Msg).
191
192 -spec chat_msg(From :: jid:jid(), To :: jid:jid(), UserMsg :: binary()) ->
193 binary().
194 chat_msg(From, To, UserMsg) ->
195
:-(
Msg = #{to_user_id => jid:to_binary(jid:to_lower(To)),
196 message => UserMsg,
197 from_user_id => jid:to_binary(jid:to_lower(From))},
198
:-(
jiffy:encode(Msg).
199
200 -spec is_user_online(online | offline) -> boolean().
201
:-(
is_user_online(online) -> true;
202
:-(
is_user_online(offline) -> false.
203
204 -spec exchange_opts(mongooseim:host_type(), gen_mod:opt_key()) -> exchange_opts().
205 exchange_opts(HostType, ExchangeKey) ->
206
:-(
gen_mod:get_module_opt(HostType, ?MODULE, ExchangeKey).
207
208 -spec verify_exchanges_were_created_or_crash(Res :: list(), [exchange_opts()])
209 -> ok | no_return().
210 verify_exchanges_were_created_or_crash(Res, Exchanges) ->
211
:-(
case lists:all(fun(E) ->
212
:-(
element(2, E) == mongoose_amqp:exchange_declare_ok()
213 end, Res) of
214 true ->
215
:-(
ok;
216 false ->
217
:-(
erlang:error(io_lib:format("Creating exchanges failed, exchanges=~p",
218 [Exchanges]))
219 end.
Line Hits Source