./ct_report/coverage/mod_event_pusher_rabbit.COVER.html

1 %%==============================================================================
2 %% Copyright 2018 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%
16 %% @doc
17 %% Backend for `mod_event_pusher' that supports RabbitMQ integration.
18 %% @end
19 %%==============================================================================
20
21 -module(mod_event_pusher_rabbit).
22 -author('kacper.mentel@erlang-solutions.com').
23
24 -include_lib("mongooseim/include/mongoose.hrl").
25 -include_lib("mongooseim/include/mod_event_pusher_events.hrl").
26 -include("mongoose_config_spec.hrl").
27
28 -behaviour(gen_mod).
29 -behaviour(mongoose_module_metrics).
30 -behaviour(mod_event_pusher).
31
32 %%%===================================================================
33 %%% Definitions
34 %%%===================================================================
35
36 -define(POOL_TAG, event_pusher).
37
38 -type exchange_opts() :: #{name := binary(), type := binary(),
39 sent_topic => binary(), recv_topic => binary()}.
40
41 %%%===================================================================
42 %%% Exports
43 %%%===================================================================
44
45 %% MIM module callbacks
46 -export([start/2, stop/1, config_spec/0]).
47
48 %% API
49 -export([push_event/2]).
50
51 %%%===================================================================
52 %%% Callbacks
53 %%%===================================================================
54
55 -spec start(mongooseim:host_type(), gen_mod:module_opts()) -> ok.
56 start(HostType, _Opts) ->
57 4 initialize_metrics(HostType),
58 4 create_exchanges(HostType),
59 4 ok.
60
61 -spec stop(mongooseim:host_type()) -> ok.
62 stop(_HostType) ->
63 4 ok.
64
65 -spec config_spec() -> mongoose_config_spec:config_section().
66 config_spec() ->
67 200 #section{items = #{<<"presence_exchange">> => exchange_spec(<<"presence">>),
68 <<"chat_msg_exchange">> => msg_exchange_spec(<<"chat_msg">>),
69 <<"groupchat_msg_exchange">> => msg_exchange_spec(<<"groupchat_msg">>)}}.
70
71 msg_exchange_spec(Name) ->
72 400 Spec = #section{items = Items, defaults = Defaults} = exchange_spec(Name),
73 400 Spec#section{items = Items#{<<"sent_topic">> => #option{type = binary,
74 validate = non_empty},
75 <<"recv_topic">> => #option{type = binary,
76 validate = non_empty}},
77 defaults = Defaults#{<<"sent_topic">> => <<Name/binary, "_sent">>,
78 <<"recv_topic">> => <<Name/binary, "_recv">>},
79 include = always
80 }.
81
82 exchange_spec(Name) ->
83 600 #section{items = #{<<"name">> => #option{type = binary,
84 validate = non_empty},
85 <<"type">> => #option{type = binary,
86 validate = non_empty}},
87 defaults = #{<<"name">> => Name,
88 <<"type">> => <<"topic">>},
89 include = always}.
90
91 push_event(Acc, #user_status_event{jid = UserJID, status = Status}) ->
92 32 handle_user_presence_change(mongoose_acc:host_type(Acc), UserJID, Status),
93 32 Acc;
94 push_event(Acc, ChatEvent = #chat_event{}) ->
95 60 handle_user_chat_event(mongoose_acc:host_type(Acc), ChatEvent),
96 60 Acc;
97 push_event(Acc, _) ->
98
:-(
Acc.
99
100 %%%===================================================================
101 %%% Internal functions
102 %%%===================================================================
103
104 -spec initialize_metrics(mongooseim:host_type()) -> [ok | {ok | error, term()}].
105 initialize_metrics(HostType) ->
106 4 [mongoose_metrics:ensure_metric(HostType, Name, Type)
107 4 || {Name, Type} <- mongoose_rabbit_worker:list_metrics(?POOL_TAG)].
108
109 -spec create_exchanges(mongooseim:host_type()) -> ok.
110 create_exchanges(HostType) ->
111 4 Exchanges = exchanges(HostType),
112 4 Res = [call_rabbit_worker(HostType, {amqp_call,
113 mongoose_amqp:exchange_declare(ExName, Type)})
114 4 || #{name := ExName, type := Type} <- Exchanges],
115 4 verify_exchanges_were_created_or_crash(Res, Exchanges).
116
117 -spec handle_user_presence_change(mongooseim:host_type(), JID :: jid:jid(), Status :: atom()) -> ok.
118 handle_user_presence_change(HostType, JID, Status) ->
119 32 #{name := ExchangeName} = exchange_opts(HostType, presence_exchange),
120 32 RoutingKey = presence_routing_key(JID),
121 32 Message = presence_msg(JID, Status),
122 32 PublishMethod = mongoose_amqp:basic_publish(ExchangeName, RoutingKey),
123 32 AMQPMessage = mongoose_amqp:message(Message),
124 32 cast_rabbit_worker(HostType, {amqp_publish, PublishMethod, AMQPMessage}).
125
126 -spec handle_user_chat_event(mongooseim:host_type(), #chat_event{}) -> ok.
127 handle_user_chat_event(HostType, #chat_event{from = From, to = To, packet = Packet,
128 type = Type, direction = Direction}) when
129 Type == chat orelse Type == groupchat ->
130 28 UserMessage = extract_message(Packet),
131 28 Message = chat_msg(From, To, UserMessage),
132 28 #{name := ExchangeName} = ExchangeOpts = exchange_opts(HostType, msg_type_to_key(Type)),
133 28 RoutingKey = chat_event_routing_key(ExchangeOpts, Direction, From, To),
134 28 PublishMethod = mongoose_amqp:basic_publish(ExchangeName, RoutingKey),
135 28 AMQPMessage = mongoose_amqp:message(Message),
136 28 cast_rabbit_worker(HostType, {amqp_publish, PublishMethod, AMQPMessage});
137 32 handle_user_chat_event(_HostType, _) -> ok.
138
139 %%%===================================================================
140 %%% Helpers
141 %%%===================================================================
142
143 -spec call_rabbit_worker(mongooseim:host_type(), Msg :: term()) -> term().
144 call_rabbit_worker(HostType, Msg) ->
145 12 mongoose_wpool:call(rabbit, HostType, ?POOL_TAG, Msg).
146
147 -spec cast_rabbit_worker(mongooseim:host_type(), Msg :: term()) -> ok.
148 cast_rabbit_worker(HostType, Msg) ->
149 60 mongoose_wpool:cast(rabbit, HostType, ?POOL_TAG, Msg).
150
151 -spec exchanges(mongooseim:host_type()) -> [exchange_opts()].
152 exchanges(HostType) ->
153 4 [exchange_opts(HostType, ExKey) || ExKey <- exchange_keys()].
154
155 exchange_keys() ->
156 4 [presence_exchange, chat_msg_exchange, groupchat_msg_exchange].
157
158 -spec extract_message(Packet :: exml:element()) -> binary().
159 extract_message(Packet) ->
160 28 Body = exml_query:subelement(Packet, <<"body">>),
161 28 exml_query:cdata(Body).
162
163 -spec presence_routing_key(JID :: jid:jid()) -> jid:literal_jid().
164 presence_routing_key(JID) ->
165 32 {LUser, LServer, _} = jid:to_lower(JID),
166 32 jid:to_binary({LUser, LServer}).
167
168 -spec chat_event_routing_key(exchange_opts(), in | out, From :: jid:jid(),
169 To :: jid:jid()) -> binary().
170 chat_event_routing_key(#{sent_topic := Topic}, in, From, _To) ->
171 8 user_topic_routing_key(From, Topic);
172 chat_event_routing_key(#{recv_topic := Topic}, out, _From, To) ->
173 20 user_topic_routing_key(To, Topic).
174
175 8 msg_type_to_key(chat) -> chat_msg_exchange;
176 20 msg_type_to_key(groupchat) -> groupchat_msg_exchange.
177
178 -spec user_topic_routing_key(JID :: jid:jid(), Topic :: binary()) -> binary().
179 user_topic_routing_key(JID, Topic) ->
180 28 {LUser, LServer, _Res} = jid:to_lower(JID),
181 28 BinJID = jid:to_binary({LUser, LServer}),
182 28 <<BinJID/binary, ".", Topic/binary>>.
183
184 -spec presence_msg(JID :: jid:jid(), Status :: atom()) -> binary().
185 presence_msg(JID, Status) ->
186 32 Msg = #{user_id => jid:to_binary(jid:to_lower(JID)), present => is_user_online(Status)},
187 32 jiffy:encode(Msg).
188
189 -spec chat_msg(From :: jid:jid(), To :: jid:jid(), UserMsg :: binary()) ->
190 binary().
191 chat_msg(From, To, UserMsg) ->
192 28 Msg = #{to_user_id => jid:to_binary(jid:to_lower(To)),
193 message => UserMsg,
194 from_user_id => jid:to_binary(jid:to_lower(From))},
195 28 jiffy:encode(Msg).
196
197 -spec is_user_online(online | offline) -> boolean().
198 16 is_user_online(online) -> true;
199 16 is_user_online(offline) -> false.
200
201 -spec exchange_opts(mongooseim:host_type(), gen_mod:opt_key()) -> exchange_opts().
202 exchange_opts(HostType, ExchangeKey) ->
203 72 gen_mod:get_module_opt(HostType, ?MODULE, ExchangeKey).
204
205 -spec verify_exchanges_were_created_or_crash(Res :: list(), [exchange_opts()])
206 -> ok | no_return().
207 verify_exchanges_were_created_or_crash(Res, Exchanges) ->
208 4 case lists:all(fun(E) ->
209 12 element(2, E) == mongoose_amqp:exchange_declare_ok()
210 end, Res) of
211 true ->
212 4 ok;
213 false ->
214
:-(
erlang:error(io_lib:format("Creating exchanges failed, exchanges=~p",
215 [Exchanges]))
216 end.
Line Hits Source