1 |
|
%%============================================================================== |
2 |
|
%% Copyright 2018 Erlang Solutions Ltd. |
3 |
|
%% |
4 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
5 |
|
%% you may not use this file except in compliance with the License. |
6 |
|
%% You may obtain a copy of the License at |
7 |
|
%% |
8 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
9 |
|
%% |
10 |
|
%% Unless required by applicable law or agreed to in writing, software |
11 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
12 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 |
|
%% See the License for the specific language governing permissions and |
14 |
|
%% limitations under the License. |
15 |
|
%% |
16 |
|
%% @doc |
17 |
|
%% Backend for `mod_event_pusher' that supports RabbitMQ integration. |
18 |
|
%% @end |
19 |
|
%%============================================================================== |
20 |
|
|
21 |
|
-module(mod_event_pusher_rabbit). |
22 |
|
-author('kacper.mentel@erlang-solutions.com'). |
23 |
|
|
24 |
|
-include_lib("mongooseim/include/mongoose.hrl"). |
25 |
|
-include_lib("mongooseim/include/mod_event_pusher_events.hrl"). |
26 |
|
-include("mongoose_config_spec.hrl"). |
27 |
|
|
28 |
|
-behaviour(gen_mod). |
29 |
|
-behaviour(mongoose_module_metrics). |
30 |
|
-behaviour(mod_event_pusher). |
31 |
|
|
32 |
|
%%%=================================================================== |
33 |
|
%%% Definitions |
34 |
|
%%%=================================================================== |
35 |
|
|
36 |
|
-define(POOL_TAG, event_pusher). |
37 |
|
|
38 |
|
-define(DEFAULT_PRESENCE_EXCHANGE, <<"presence">>). |
39 |
|
-define(DEFAULT_PRESENCE_EXCHANGE_TYPE, <<"topic">>). |
40 |
|
-define(DEFAULT_CHAT_MSG_EXCHANGE, <<"chat_msg">>). |
41 |
|
-define(DEFAULT_CHAT_MSG_EXCHANGE_TYPE, <<"topic">>). |
42 |
|
-define(DEFAULT_CHAT_MSG_SENT_TOPIC, <<"chat_msg_sent">>). |
43 |
|
-define(DEFAULT_CHAT_MSG_RECV_TOPIC, <<"chat_msg_recv">>). |
44 |
|
-define(DEFAULT_GROUP_CHAT_MSG_EXCHANGE, <<"groupchat_msg">>). |
45 |
|
-define(DEFAULT_GROUP_CHAT_MSG_EXCHANGE_TYPE, <<"topic">>). |
46 |
|
-define(DEFAULT_GROUP_CHAT_MSG_SENT_TOPIC, <<"groupchat_msg_sent">>). |
47 |
|
-define(DEFAULT_GROUP_CHAT_MSG_RECV_TOPIC, <<"groupchat_msg_recv">>). |
48 |
|
|
49 |
|
%%%=================================================================== |
50 |
|
%%% Exports |
51 |
|
%%%=================================================================== |
52 |
|
|
53 |
|
%% MIM module callbacks |
54 |
|
-export([start/2, stop/1, config_spec/0]). |
55 |
|
|
56 |
|
%% API |
57 |
|
-export([push_event/3]). |
58 |
|
|
59 |
|
%%%=================================================================== |
60 |
|
%%% Callbacks |
61 |
|
%%%=================================================================== |
62 |
|
|
%% @doc `gen_mod' start callback.
%% Registers the worker metrics for `Host' and declares the configured
%% exchanges; a failed exchange declaration crashes the caller.
-spec start(Host :: jid:server(), Opts :: proplists:proplist()) -> ok.
start(Host, _Opts) ->
    %% The list of per-metric results is intentionally not inspected.
    _MetricResults = initialize_metrics(Host),
    _ = create_exchanges(Host),
    ok.
68 |
|
|
%% @doc `gen_mod' stop callback. Nothing is torn down here; the host
%% argument is ignored and `ok' is always returned.
-spec stop(Host :: jid:server()) -> ok.
stop(_) ->
    ok.
72 |
|
|
%% @doc Configuration spec of the module: three exchange sections.
%% The presence exchange accepts only the basic exchange items, while the
%% chat and group chat exchanges additionally accept topic options.
-spec config_spec() -> mongoose_config_spec:config_section().
config_spec() ->
    PresenceSection = #section{items = exch_items()},
    MsgSection = #section{items = msg_exch_items()},
    #section{
       items = #{<<"presence_exchange">> => PresenceSection,
                 <<"chat_msg_exchange">> => MsgSection,
                 <<"groupchat_msg_exchange">> => MsgSection}
      }.
81 |
|
|
%% Config items of a message exchange section: the basic exchange items
%% plus the non-empty binary options `sent_topic' and `recv_topic'.
msg_exch_items() ->
    TopicOption = #option{type = binary, validate = non_empty},
    BaseItems = exch_items(),
    BaseItems#{<<"recv_topic">> => TopicOption,
               <<"sent_topic">> => TopicOption}.
88 |
|
|
%% Config items shared by every exchange section: `name' and `type',
%% both validated as non-empty binaries.
exch_items() ->
    NonEmptyBinary = #option{type = binary, validate = non_empty},
    #{<<"type">> => NonEmptyBinary,
      <<"name">> => NonEmptyBinary}.
94 |
|
|
%% @doc `mod_event_pusher' callback.
%% User status events and chat events are forwarded to the matching
%% handler; every other event is ignored. The accumulator is always
%% returned unchanged and the second argument is not used.
push_event(Acc, _Host, #user_status_event{} = StatusEvent) ->
    #user_status_event{jid = UserJID, status = Status} = StatusEvent,
    _ = handle_user_presence_change(UserJID, Status),
    Acc;
push_event(Acc, _Host, #chat_event{} = ChatEvent) ->
    _ = handle_user_chat_event(ChatEvent),
    Acc;
push_event(Acc, _Host, _OtherEvent) ->
    Acc.
103 |
|
|
104 |
|
%%%=================================================================== |
105 |
|
%%% Internal functions |
106 |
|
%%%=================================================================== |
107 |
|
|
%% Ensures, for `Host', every metric that the RabbitMQ worker module lists
%% for this module's pool tag. Returns the list of per-metric results.
-spec initialize_metrics(Host :: jid:server()) -> [ok | {ok | error, term()}].
initialize_metrics(Host) ->
    Metrics = mongoose_rabbit_worker:list_metrics(?POOL_TAG),
    [mongoose_metrics:ensure_metric(Host, MetricName, MetricType)
     || {MetricName, MetricType} <- Metrics].
112 |
|
|
%% Declares every configured exchange through a synchronous worker call
%% and then verifies the replies; crashes when any declaration failed.
-spec create_exchanges(Host :: jid:server()) -> ok.
create_exchanges(Host) ->
    Exchanges = exchanges(Host),
    Declare = fun(ExchangeName, ExchangeType) ->
                      Method = mongoose_amqp:exchange_declare(ExchangeName,
                                                              ExchangeType),
                      call_rabbit_worker(Host, {amqp_call, Method})
              end,
    Replies = [Declare(N, T) || {N, T} <- Exchanges],
    verify_exchanges_were_created_or_crash(Replies, Exchanges).
121 |
|
|
%% Publishes a presence change of `JID' to the presence exchange.
%% The exchange name comes from the module options of the JID's server
%% (falling back to the default), the routing key is derived from the JID
%% and the payload is the JSON produced by `presence_msg/2'.
%% The publish request is handed to the worker pool asynchronously.
-spec handle_user_presence_change(JID :: jid:jid(), Status :: atom()) -> ok.
handle_user_presence_change(#jid{lserver = Host} = JID, Status) ->
    ExchangeName = exchange_opt(Host, presence_exchange, name,
                                ?DEFAULT_PRESENCE_EXCHANGE),
    Key = presence_routing_key(JID),
    Payload = presence_msg(JID, Status),
    Request = {amqp_publish,
               mongoose_amqp:basic_publish(ExchangeName, Key),
               mongoose_amqp:message(Payload)},
    cast_rabbit_worker(Host, Request).
131 |
|
|
%% Publishes a chat or group chat message event to the matching exchange.
%% Only events of type `chat' or `groupchat' are published; anything else
%% is silently accepted and `ok' is returned.
-spec handle_user_chat_event(#chat_event{}) -> ok.
handle_user_chat_event(#chat_event{type = Type} = Event)
  when Type =:= chat; Type =:= groupchat ->
    #chat_event{from = From, to = To, packet = Packet,
                direction = Direction} = Event,
    %% The host selects both the module options and the worker pool.
    Host = get_host(From, To, Direction),
    Payload = chat_msg(From, To, extract_message(Packet)),
    ExchangeName = get_chat_exchange(Type, Host),
    Key = chat_event_routing_key(Type, Direction, From, To, Host),
    Request = {amqp_publish,
               mongoose_amqp:basic_publish(ExchangeName, Key),
               mongoose_amqp:message(Payload)},
    cast_rabbit_worker(Host, Request);
handle_user_chat_event(_NotPublished) ->
    ok.
145 |
|
|
146 |
|
%%%=================================================================== |
147 |
|
%%% Helpers |
148 |
|
%%%=================================================================== |
149 |
|
|
%% Synchronous request to the `rabbit' worker pool of `Host' that is
%% tagged with this module's pool tag; returns whatever the pool call
%% returns.
-spec call_rabbit_worker(Host :: binary(), Msg :: term()) -> term().
call_rabbit_worker(Host, Msg) ->
    mongoose_wpool:call(rabbit,
                        Host,
                        ?POOL_TAG,
                        Msg).
153 |
|
|
%% Asynchronous request to the `rabbit' worker pool of `Host' that is
%% tagged with this module's pool tag.
-spec cast_rabbit_worker(Host :: binary(), Msg :: term()) -> ok.
cast_rabbit_worker(Host, Msg) ->
    mongoose_wpool:cast(rabbit,
                        Host,
                        ?POOL_TAG,
                        Msg).
157 |
|
|
%% Returns `{Name, Type}' for the presence, chat and group chat exchanges,
%% in that order. Each value is read from the module options of `Host'
%% and falls back to the corresponding default macro.
-spec exchanges(Host :: jid:server()) -> [{binary(), binary()}].
exchanges(Host) ->
    Defaults = [{presence_exchange,
                 ?DEFAULT_PRESENCE_EXCHANGE,
                 ?DEFAULT_PRESENCE_EXCHANGE_TYPE},
                {chat_msg_exchange,
                 ?DEFAULT_CHAT_MSG_EXCHANGE,
                 ?DEFAULT_CHAT_MSG_EXCHANGE_TYPE},
                {groupchat_msg_exchange,
                 ?DEFAULT_GROUP_CHAT_MSG_EXCHANGE,
                 ?DEFAULT_GROUP_CHAT_MSG_EXCHANGE_TYPE}],
    [{exchange_opt(Host, Key, name, DefaultName),
      exchange_opt(Host, Key, type, DefaultType)}
     || {Key, DefaultName, DefaultType} <- Defaults].
172 |
|
|
%% Name of the exchange used for the given message type, read from the
%% module options of `Host' with a per-type default.
-spec get_chat_exchange(chat | groupchat, Host :: binary()) -> binary().
get_chat_exchange(chat, Host) ->
    DefaultName = ?DEFAULT_CHAT_MSG_EXCHANGE,
    exchange_opt(Host, chat_msg_exchange, name, DefaultName);
get_chat_exchange(groupchat, Host) ->
    DefaultName = ?DEFAULT_GROUP_CHAT_MSG_EXCHANGE,
    exchange_opt(Host, groupchat_msg_exchange, name, DefaultName).
179 |
|
|
%% Picks the server an event belongs to: the sender's `lserver' for
%% direction `in', the recipient's `lserver' for direction `out'.
-spec get_host(jid:jid(), jid:jid(), in | out) -> binary().
get_host(#jid{} = From, _To, in) ->
    From#jid.lserver;
get_host(_From, #jid{} = To, out) ->
    To#jid.lserver.
183 |
|
|
%% Returns the character data of the `body' child of `Packet'.
%% NOTE(review): when the stanza carries no `body' child, the lookup
%% presumably yields a non-element value and the cdata call may fail --
%% confirm against the exml_query API and the events produced upstream.
-spec extract_message(Packet :: exml:element()) -> binary().
extract_message(Packet) ->
    exml_query:cdata(exml_query:subelement(Packet, <<"body">>)).
188 |
|
|
%% Routing key of a presence event: the binary form of the JID after
%% `jid:to_lower/1', with the third (resource) element dropped.
-spec presence_routing_key(JID :: jid:jid()) -> binary().
presence_routing_key(JID) ->
    {LowerUser, LowerHost, _Resource} = jid:to_lower(JID),
    jid:to_binary({LowerUser, LowerHost}).
193 |
|
|
%% Routing key of a chat / group chat event.
%% Direction `in' uses the sender's JID with the configured `sent_topic';
%% direction `out' uses the recipient's JID with the configured
%% `recv_topic'. Topics come from the module options of `Host' and fall
%% back to the per-type default macros.
-spec chat_event_routing_key(chat | groupchat, in | out, From :: jid:jid(),
                             To :: jid:jid(), Host :: binary()) -> binary().
chat_event_routing_key(chat, in, From, _To, Host) ->
    topic_routing_key(From, Host, chat_msg_exchange, sent_topic,
                      ?DEFAULT_CHAT_MSG_SENT_TOPIC);
chat_event_routing_key(chat, out, _From, To, Host) ->
    topic_routing_key(To, Host, chat_msg_exchange, recv_topic,
                      ?DEFAULT_CHAT_MSG_RECV_TOPIC);
chat_event_routing_key(groupchat, in, From, _To, Host) ->
    topic_routing_key(From, Host, groupchat_msg_exchange, sent_topic,
                      ?DEFAULT_GROUP_CHAT_MSG_SENT_TOPIC);
chat_event_routing_key(groupchat, out, _From, To, Host) ->
    topic_routing_key(To, Host, groupchat_msg_exchange, recv_topic,
                      ?DEFAULT_GROUP_CHAT_MSG_RECV_TOPIC).

%% Looks up the topic option of the given exchange and joins it with the
%% JID into a routing key.
topic_routing_key(JID, Host, ExchangeKey, TopicOption, DefaultTopic) ->
    Topic = exchange_opt(Host, ExchangeKey, TopicOption, DefaultTopic),
    user_topic_routing_key(JID, Topic).
212 |
|
|
%% Builds `<jid>.<topic>', where `<jid>' is the binary form of the JID
%% after `jid:to_lower/1' with the third (resource) element dropped.
-spec user_topic_routing_key(JID :: jid:jid(), Topic :: binary()) -> binary().
user_topic_routing_key(JID, Topic) ->
    {LowerUser, LowerHost, _Resource} = jid:to_lower(JID),
    Prefix = jid:to_binary({LowerUser, LowerHost}),
    <<Prefix/binary, $., Topic/binary>>.
218 |
|
|
%% JSON payload of a presence event: `user_id' is the binary form of the
%% lower-cased JID and `present' tells whether the status is `online'.
-spec presence_msg(JID :: jid:jid(), Status :: atom()) -> binary().
presence_msg(JID, Status) ->
    UserId = jid:to_binary(jid:to_lower(JID)),
    IsPresent = is_user_online(Status),
    jiffy:encode(#{user_id => UserId, present => IsPresent}).
223 |
|
|
%% JSON payload of a chat event: the binary forms of the lower-cased
%% recipient and sender JIDs together with the message text.
-spec chat_msg(From :: jid:jid(), To :: jid:jid(), UserMsg :: binary()) ->
    binary().
chat_msg(From, To, UserMsg) ->
    ToUserId = jid:to_binary(jid:to_lower(To)),
    FromUserId = jid:to_binary(jid:to_lower(From)),
    jiffy:encode(#{to_user_id => ToUserId,
                   message => UserMsg,
                   from_user_id => FromUserId}).
231 |
|
|
%% Maps a user status to a boolean: `online' is `true', `offline' is
%% `false'. Any other value fails with `function_clause'.
-spec is_user_online(online | offline) -> boolean().
is_user_online(Status) when Status =:= online; Status =:= offline ->
    Status =:= online.
235 |
|
|
%% Reads `Option' from the property list stored under `ExchangeKey' in
%% this module's options for `Host'. An empty list is used when the
%% exchange key is not configured, so `Default' is returned in that case
%% as well as when the option itself is missing.
-spec exchange_opt(Host :: jid:lserver(), ExchangeKey :: atom(),
                   Option :: atom(), Default :: term()) -> term().
exchange_opt(Host, ExchangeKey, Option, Default) ->
    proplists:get_value(Option, opt(Host, ExchangeKey, []), Default).
241 |
|
|
%% Reads option `Option' of this module for `Host' via `gen_mod', passing
%% `Default' as the fallback value.
-spec opt(Host :: jid:lserver(), Option :: atom(), Default :: term()) ->
    Value :: term().
opt(Host, Option, Default) ->
    gen_mod:get_module_opt(Host,
                           ?MODULE,
                           Option,
                           Default).
246 |
|
|
%% Checks the worker replies collected while declaring the exchanges.
%% Returns `ok' when the second element of every reply equals the
%% "exchange declared" confirmation; otherwise raises an error whose
%% reason is a flat, human-readable string listing the exchanges.
%%
%% `Res'       - replies returned by the worker calls, one per exchange.
%% `Exchanges' - `{Name, Type}' pairs that were declared; only used for
%%               the error message.
-spec verify_exchanges_were_created_or_crash(Res :: list(),
                                             Exchanges :: [{binary(), binary()}])
        -> ok | no_return().
verify_exchanges_were_created_or_crash(Res, Exchanges) ->
    case lists:all(fun is_exchange_declare_ok/1, Res) of
        true ->
            ok;
        false ->
            %% io_lib:format/2 returns a possibly deep character list;
            %% flatten it so the crash reason is printed as a plain string.
            Reason = lists:flatten(
                       io_lib:format("Creating exchanges failed, exchanges=~p",
                                     [Exchanges])),
            erlang:error(Reason)
    end.

%% True only for a reply tuple whose second element is the declare
%% confirmation. A reply that is not a tuple, or is too short, counts as
%% a failure instead of raising `badarg' from `element/2', which would
%% hide the real error.
is_exchange_declare_ok(Reply) when tuple_size(Reply) >= 2 ->
    element(2, Reply) == mongoose_amqp:exchange_declare_ok();
is_exchange_declare_ok(_Reply) ->
    false.