1 |
|
%%============================================================================== |
2 |
|
%% Copyright 2018 Erlang Solutions Ltd. |
3 |
|
%% |
4 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
5 |
|
%% you may not use this file except in compliance with the License. |
6 |
|
%% You may obtain a copy of the License at |
7 |
|
%% |
8 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
9 |
|
%% |
10 |
|
%% Unless required by applicable law or agreed to in writing, software |
11 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
12 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 |
|
%% See the License for the specific language governing permissions and |
14 |
|
%% limitations under the License. |
15 |
|
%% |
16 |
|
%% @doc |
17 |
|
%% Backend for `mod_event_pusher' that supports RabbitMQ integration. |
18 |
|
%% @end |
19 |
|
%%============================================================================== |
20 |
|
|
21 |
|
-module(mod_event_pusher_rabbit). |
22 |
|
-author('kacper.mentel@erlang-solutions.com'). |
23 |
|
|
24 |
|
-include_lib("mongooseim/include/mongoose.hrl"). |
25 |
|
-include_lib("mongooseim/include/mod_event_pusher_events.hrl"). |
26 |
|
-include("mongoose_config_spec.hrl"). |
27 |
|
|
28 |
|
-behaviour(gen_mod). |
29 |
|
-behaviour(mongoose_module_metrics). |
30 |
|
-behaviour(mod_event_pusher). |
31 |
|
|
32 |
|
%%%=================================================================== |
33 |
|
%%% Definitions |
34 |
|
%%%=================================================================== |
35 |
|
|
36 |
|
-define(POOL_TAG, event_pusher). |
37 |
|
|
38 |
|
%% Options describing one exchange as used by this module.
%% `name' and `type' are mandatory keys; `sent_topic' and `recv_topic' are
%% optional (the config spec in this module only defines them for the message
%% exchanges).
-type exchange_opts() :: #{name := binary(),
                           type := binary(),
                           sent_topic => binary(),
                           recv_topic => binary()}.
40 |
|
|
41 |
|
%%%=================================================================== |
42 |
|
%%% Exports |
43 |
|
%%%=================================================================== |
44 |
|
|
45 |
|
%% MIM module callbacks |
46 |
|
-export([start/2, stop/1, config_spec/0]). |
47 |
|
|
48 |
|
%% API |
49 |
|
-export([push_event/2]). |
50 |
|
|
51 |
|
%%%=================================================================== |
52 |
|
%%% Callbacks |
53 |
|
%%%=================================================================== |
54 |
|
|
55 |
|
%% @doc Module start callback: ensures the RabbitMQ worker metrics exist for
%% `HostType' and declares the configured exchanges. The module options are not
%% used here. Crashes if the exchanges cannot be declared
%% (see `verify_exchanges_were_created_or_crash/2').
-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> ok.
start(HostType, _Opts) ->
    _Metrics = initialize_metrics(HostType),
    %% create_exchanges/1 returns `ok' on success, which is also our result.
    create_exchanges(HostType).
60 |
|
|
61 |
|
%% @doc Module stop callback: does nothing and always returns `ok'.
-spec stop(mongooseim:host_type()) -> ok.
stop(_) ->
    ok.
64 |
|
|
65 |
|
%% @doc Configuration spec of this module: one sub-section per exchange
%% (`presence_exchange', `chat_msg_exchange', `groupchat_msg_exchange').
%% The two message exchanges additionally accept topic options.
-spec config_spec() -> mongoose_config_spec:config_section().
config_spec() ->
    Items = #{<<"presence_exchange">> => exchange_spec(<<"presence">>),
              <<"chat_msg_exchange">> => msg_exchange_spec(<<"chat_msg">>),
              <<"groupchat_msg_exchange">> => msg_exchange_spec(<<"groupchat_msg">>)},
    #section{items = Items}.
70 |
|
|
71 |
|
%% Spec of a message exchange section: the generic exchange spec for `Name'
%% extended with the `sent_topic' and `recv_topic' options, which default to
%% `<Name>_sent' and `<Name>_recv' respectively.
msg_exchange_spec(Name) ->
    BaseSpec = exchange_spec(Name),
    #section{items = BaseItems, defaults = BaseDefaults} = BaseSpec,
    TopicOption = #option{type = binary, validate = non_empty},
    TopicItems = #{<<"sent_topic">> => TopicOption,
                   <<"recv_topic">> => TopicOption},
    TopicDefaults = #{<<"sent_topic">> => <<Name/binary, "_sent">>,
                      <<"recv_topic">> => <<Name/binary, "_recv">>},
    %% On a key clash maps:merge/2 keeps the value from its second argument,
    %% so the topic entries win over the base ones.
    BaseSpec#section{items = maps:merge(BaseItems, TopicItems),
                     defaults = maps:merge(BaseDefaults, TopicDefaults),
                     include = always}.
81 |
|
|
82 |
|
%% Spec of a generic exchange section with two non-empty binary options:
%% `name' (defaults to `Name') and `type' (defaults to `topic').
exchange_spec(Name) ->
    NonEmptyBinary = #option{type = binary, validate = non_empty},
    Items = #{<<"name">> => NonEmptyBinary,
              <<"type">> => NonEmptyBinary},
    Defaults = #{<<"name">> => Name,
                 <<"type">> => <<"topic">>},
    #section{items = Items, defaults = Defaults, include = always}.
90 |
|
|
91 |
|
%% @doc Pushes an event to RabbitMQ. User status events and chat events are
%% handled for the host type stored in `Acc'; any other event is ignored.
%% The accumulator is always returned unchanged.
push_event(Acc, #user_status_event{jid = UserJID, status = Status}) ->
    HostType = mongoose_acc:host_type(Acc),
    handle_user_presence_change(HostType, UserJID, Status),
    Acc;
push_event(Acc, #chat_event{} = ChatEvent) ->
    HostType = mongoose_acc:host_type(Acc),
    handle_user_chat_event(HostType, ChatEvent),
    Acc;
push_event(Acc, _OtherEvent) ->
    Acc.
99 |
|
|
100 |
|
%%%=================================================================== |
101 |
|
%%% Internal functions |
102 |
|
%%%=================================================================== |
103 |
|
|
104 |
|
%% Ensures that every metric listed by the RabbitMQ worker for this module's
%% pool tag exists for `HostType'. Returns the list of per-metric results.
-spec initialize_metrics(mongooseim:host_type()) -> [ok | {ok | error, term()}].
initialize_metrics(HostType) ->
    Metrics = mongoose_rabbit_worker:list_metrics(?POOL_TAG),
    [mongoose_metrics:ensure_metric(HostType, MetricName, MetricType)
     || {MetricName, MetricType} <- Metrics].
108 |
|
|
109 |
|
%% Declares every configured exchange through the RabbitMQ worker pool (a
%% synchronous call per exchange) and then verifies the collected replies,
%% crashing if any declaration did not succeed.
-spec create_exchanges(mongooseim:host_type()) -> ok.
create_exchanges(HostType) ->
    Exchanges = exchanges(HostType),
    Replies = [begin
                   Declare = mongoose_amqp:exchange_declare(Name, Type),
                   call_rabbit_worker(HostType, {amqp_call, Declare})
               end || #{name := Name, type := Type} <- Exchanges],
    verify_exchanges_were_created_or_crash(Replies, Exchanges).
116 |
|
|
117 |
|
%% Publishes a presence message for `JID' to the configured presence exchange.
%% The routing key is built by presence_routing_key/1 and the payload by
%% presence_msg/2; the publish request is cast to the worker pool.
-spec handle_user_presence_change(mongooseim:host_type(), JID :: jid:jid(), Status :: atom()) -> ok.
handle_user_presence_change(HostType, JID, Status) ->
    #{name := Exchange} = exchange_opts(HostType, presence_exchange),
    Publish = mongoose_amqp:basic_publish(Exchange, presence_routing_key(JID)),
    Payload = mongoose_amqp:message(presence_msg(JID, Status)),
    cast_rabbit_worker(HostType, {amqp_publish, Publish, Payload}).
125 |
|
|
126 |
|
%% Publishes a chat message event to the exchange configured for its type.
%% Only events of type `chat' or `groupchat' are published; every other event
%% is ignored and `ok' is returned.
-spec handle_user_chat_event(mongooseim:host_type(), #chat_event{}) -> ok.
handle_user_chat_event(HostType, #chat_event{type = Type} = Event)
  when Type =:= chat; Type =:= groupchat ->
    #chat_event{from = From, to = To, packet = Packet, direction = Direction} = Event,
    Payload = chat_msg(From, To, extract_message(Packet)),
    ExchangeOpts = exchange_opts(HostType, msg_type_to_key(Type)),
    #{name := Exchange} = ExchangeOpts,
    %% The routing key depends on the direction and the topics of the exchange.
    RoutingKey = chat_event_routing_key(ExchangeOpts, Direction, From, To),
    Publish = mongoose_amqp:basic_publish(Exchange, RoutingKey),
    cast_rabbit_worker(HostType, {amqp_publish, Publish, mongoose_amqp:message(Payload)});
handle_user_chat_event(_HostType, _OtherEvent) ->
    ok.
138 |
|
|
139 |
|
%%%=================================================================== |
140 |
|
%%% Helpers |
141 |
|
%%%=================================================================== |
142 |
|
|
143 |
|
%% Sends `Msg' to this module's `rabbit' worker pool for `HostType' via
%% mongoose_wpool:call/4 and returns whatever that call returns.
-spec call_rabbit_worker(mongooseim:host_type(), Msg :: term()) -> term().
call_rabbit_worker(HostType, Msg) ->
    PoolTag = ?POOL_TAG,
    mongoose_wpool:call(rabbit, HostType, PoolTag, Msg).
146 |
|
|
147 |
|
%% Sends `Msg' to this module's `rabbit' worker pool for `HostType' via
%% mongoose_wpool:cast/4.
-spec cast_rabbit_worker(mongooseim:host_type(), Msg :: term()) -> ok.
cast_rabbit_worker(HostType, Msg) ->
    PoolTag = ?POOL_TAG,
    mongoose_wpool:cast(rabbit, HostType, PoolTag, Msg).
150 |
|
|
151 |
|
%% Returns the configured options of every exchange used by this module, in
%% the order given by exchange_keys/0.
-spec exchanges(mongooseim:host_type()) -> [exchange_opts()].
exchanges(HostType) ->
    lists:map(fun(ExKey) -> exchange_opts(HostType, ExKey) end,
              exchange_keys()).
154 |
|
|
155 |
|
%% Module option keys of all exchanges this module works with.
exchange_keys() ->
    [presence_exchange,
     chat_msg_exchange,
     groupchat_msg_exchange].
157 |
|
|
158 |
|
%% Returns the character data of the `body' subelement of `Packet'.
-spec extract_message(Packet :: exml:element()) -> binary().
extract_message(Packet) ->
    exml_query:cdata(exml_query:subelement(Packet, <<"body">>)).
162 |
|
|
163 |
|
%% Routing key for presence messages: the lowercased user and server parts of
%% `JID' converted to a binary; the resource part is dropped.
-spec presence_routing_key(JID :: jid:jid()) -> jid:literal_jid().
presence_routing_key(JID) ->
    {LUser, LServer, _LResource} = jid:to_lower(JID),
    BareJID = {LUser, LServer},
    jid:to_binary(BareJID).
167 |
|
|
168 |
|
%% Routing key of a chat event. For direction `in' it is built from the `From'
%% JID and the exchange's `sent_topic'; for direction `out' it is built from
%% the `To' JID and the exchange's `recv_topic'.
-spec chat_event_routing_key(exchange_opts(), in | out, From :: jid:jid(),
                             To :: jid:jid()) -> binary().
chat_event_routing_key(#{sent_topic := SentTopic}, in, Sender, _Recipient) ->
    user_topic_routing_key(Sender, SentTopic);
chat_event_routing_key(#{recv_topic := RecvTopic}, out, _Sender, Recipient) ->
    user_topic_routing_key(Recipient, RecvTopic).
174 |
|
|
175 |
8 |
%% Maps a chat event type to the module option key of the exchange used for
%% it. Only `chat' and `groupchat' are supported.
msg_type_to_key(chat) ->
    chat_msg_exchange;
msg_type_to_key(groupchat) ->
    groupchat_msg_exchange.
177 |
|
|
178 |
|
%% Builds a routing key of the form `<bare JID>.<Topic>', where the bare JID
%% is made of the lowercased user and server parts of `JID'.
-spec user_topic_routing_key(JID :: jid:jid(), Topic :: binary()) -> binary().
user_topic_routing_key(JID, Topic) ->
    {LUser, LServer, _LResource} = jid:to_lower(JID),
    BareJID = jid:to_binary({LUser, LServer}),
    <<BareJID/binary, $., Topic/binary>>.
183 |
|
|
184 |
|
%% Builds the JSON payload of a presence message (encoded with jiffy): the
%% lowercased JID under `user_id' and the online flag under `present'.
-spec presence_msg(JID :: jid:jid(), Status :: atom()) -> binary().
presence_msg(JID, Status) ->
    UserId = jid:to_binary(jid:to_lower(JID)),
    jiffy:encode(#{user_id => UserId,
                   present => is_user_online(Status)}).
188 |
|
|
189 |
|
%% Builds the JSON payload of a chat message (encoded with jiffy): the
%% lowercased recipient and sender JIDs under `to_user_id' / `from_user_id'
%% and the message text under `message'.
-spec chat_msg(From :: jid:jid(), To :: jid:jid(), UserMsg :: binary()) ->
    binary().
chat_msg(From, To, UserMsg) ->
    ToUserId = jid:to_binary(jid:to_lower(To)),
    FromUserId = jid:to_binary(jid:to_lower(From)),
    jiffy:encode(#{to_user_id => ToUserId,
                   message => UserMsg,
                   from_user_id => FromUserId}).
196 |
|
|
197 |
|
%% Translates a user status into a boolean: `online' is `true', `offline' is
%% `false'. Any other status is not accepted.
-spec is_user_online(online | offline) -> boolean().
is_user_online(online) ->
    true;
is_user_online(offline) ->
    false.
200 |
|
|
201 |
|
%% Reads the options stored under `ExchangeKey' in this module's configuration
%% for `HostType'.
-spec exchange_opts(mongooseim:host_type(), gen_mod:opt_key()) -> exchange_opts().
exchange_opts(HostType, ExchangeKey) ->
    Module = ?MODULE,
    gen_mod:get_module_opt(HostType, Module, ExchangeKey).
204 |
|
|
205 |
|
%% Verifies the replies collected while declaring the exchanges.
%%
%% `Res' is the list of replies from the worker pool; a reply counts as a
%% success when its second element equals `mongoose_amqp:exchange_declare_ok()'.
%% Returns `ok' when every reply is a success. Otherwise raises an `error'
%% exception whose reason is a flat string that names the configured
%% `Exchanges'.
-spec verify_exchanges_were_created_or_crash(Res :: list(), [exchange_opts()])
    -> ok | no_return().
verify_exchanges_were_created_or_crash(Res, Exchanges) ->
    %% The expected reply does not depend on the element, so build it once.
    DeclareOk = mongoose_amqp:exchange_declare_ok(),
    %% A reply that is not a tuple with at least two elements is treated as a
    %% failed declaration, so it ends up in the descriptive error below rather
    %% than in a bare `badarg' from element/2.
    IsDeclared = fun(Reply) when tuple_size(Reply) >= 2 ->
                         element(2, Reply) == DeclareOk;
                    (_Malformed) ->
                         false
                 end,
    case lists:all(IsDeclared, Res) of
        true ->
            ok;
        false ->
            Reason = io_lib:format("Creating exchanges failed, exchanges=~p",
                                   [Exchanges]),
            %% io_lib:format/2 returns nested chardata; flatten it so the
            %% reason is printed as readable text in crash reports.
            erlang:error(lists:flatten(Reason))
    end.