1 |
|
%%============================================================================== |
2 |
|
%% Copyright 2018 Erlang Solutions Ltd. |
3 |
|
%% |
4 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
5 |
|
%% you may not use this file except in compliance with the License. |
6 |
|
%% You may obtain a copy of the License at |
7 |
|
%% |
8 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
9 |
|
%% |
10 |
|
%% Unless required by applicable law or agreed to in writing, software |
11 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
12 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 |
|
%% See the License for the specific language governing permissions and |
14 |
|
%% limitations under the License. |
15 |
|
%% |
16 |
|
%% @doc |
17 |
|
%% Backend for `mod_event_pusher' that supports RabbitMQ integration. |
18 |
|
%% @end |
19 |
|
%%============================================================================== |
20 |
|
|
21 |
|
-module(mod_event_pusher_rabbit). |
22 |
|
-author('kacper.mentel@erlang-solutions.com'). |
23 |
|
|
24 |
|
-include_lib("mongooseim/include/mongoose.hrl"). |
25 |
|
-include_lib("mongooseim/include/mod_event_pusher_events.hrl"). |
26 |
|
-include("mongoose_config_spec.hrl"). |
27 |
|
|
28 |
|
-behaviour(gen_mod). |
29 |
|
-behaviour(mongoose_module_metrics). |
30 |
|
-behaviour(mod_event_pusher). |
31 |
|
|
32 |
|
%%%=================================================================== |
33 |
|
%%% Definitions |
34 |
|
%%%=================================================================== |
35 |
|
|
36 |
|
-define(POOL_TAG, event_pusher). |
37 |
|
|
38 |
|
-type exchange_opts() :: #{name := binary(), type := binary(), |
39 |
|
sent_topic => binary(), recv_topic => binary()}. |
40 |
|
|
41 |
|
%%%=================================================================== |
42 |
|
%%% Exports |
43 |
|
%%%=================================================================== |
44 |
|
|
45 |
|
%% MIM module callbacks |
46 |
|
-export([start/2, stop/1, config_spec/0]). |
47 |
|
|
48 |
|
%% API |
49 |
|
-export([push_event/2]). |
50 |
|
|
51 |
|
%%%=================================================================== |
52 |
|
%%% Callbacks |
53 |
|
%%%=================================================================== |
54 |
|
|
55 |
|
-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> ok. |
56 |
|
start(HostType, _Opts) -> |
57 |
:-( |
initialize_metrics(HostType), |
58 |
:-( |
create_exchanges(HostType), |
59 |
:-( |
ok. |
60 |
|
|
61 |
|
-spec stop(mongooseim:host_type()) -> ok. |
62 |
|
stop(_HostType) -> |
63 |
:-( |
ok. |
64 |
|
|
65 |
|
-spec config_spec() -> mongoose_config_spec:config_section(). |
66 |
|
config_spec() -> |
67 |
152 |
#section{items = #{<<"presence_exchange">> => exchange_spec(<<"presence">>), |
68 |
|
<<"chat_msg_exchange">> => msg_exchange_spec(<<"chat_msg">>), |
69 |
|
<<"groupchat_msg_exchange">> => msg_exchange_spec(<<"groupchat_msg">>)}, |
70 |
|
format_items = map}. |
71 |
|
|
72 |
|
msg_exchange_spec(Name) -> |
73 |
304 |
Spec = #section{items = Items, defaults = Defaults} = exchange_spec(Name), |
74 |
304 |
Spec#section{items = Items#{<<"sent_topic">> => #option{type = binary, |
75 |
|
validate = non_empty}, |
76 |
|
<<"recv_topic">> => #option{type = binary, |
77 |
|
validate = non_empty}}, |
78 |
|
defaults = Defaults#{<<"sent_topic">> => <<Name/binary, "_sent">>, |
79 |
|
<<"recv_topic">> => <<Name/binary, "_recv">>}, |
80 |
|
format_items = map, |
81 |
|
include = always |
82 |
|
}. |
83 |
|
|
84 |
|
exchange_spec(Name) -> |
85 |
456 |
#section{items = #{<<"name">> => #option{type = binary, |
86 |
|
validate = non_empty}, |
87 |
|
<<"type">> => #option{type = binary, |
88 |
|
validate = non_empty}}, |
89 |
|
defaults = #{<<"name">> => Name, |
90 |
|
<<"type">> => <<"topic">>}, |
91 |
|
format_items = map, |
92 |
|
include = always}. |
93 |
|
|
94 |
|
push_event(Acc, #user_status_event{jid = UserJID, status = Status}) -> |
95 |
:-( |
handle_user_presence_change(mongoose_acc:host_type(Acc), UserJID, Status), |
96 |
:-( |
Acc; |
97 |
|
push_event(Acc, ChatEvent = #chat_event{}) -> |
98 |
:-( |
handle_user_chat_event(mongoose_acc:host_type(Acc), ChatEvent), |
99 |
:-( |
Acc; |
100 |
|
push_event(Acc, _) -> |
101 |
:-( |
Acc. |
102 |
|
|
103 |
|
%%%=================================================================== |
104 |
|
%%% Internal functions |
105 |
|
%%%=================================================================== |
106 |
|
|
107 |
|
-spec initialize_metrics(mongooseim:host_type()) -> [ok | {ok | error, term()}]. |
108 |
|
initialize_metrics(HostType) -> |
109 |
:-( |
[mongoose_metrics:ensure_metric(HostType, Name, Type) |
110 |
:-( |
|| {Name, Type} <- mongoose_rabbit_worker:list_metrics(?POOL_TAG)]. |
111 |
|
|
112 |
|
-spec create_exchanges(mongooseim:host_type()) -> ok. |
113 |
|
create_exchanges(HostType) -> |
114 |
:-( |
Exchanges = exchanges(HostType), |
115 |
:-( |
Res = [call_rabbit_worker(HostType, {amqp_call, |
116 |
|
mongoose_amqp:exchange_declare(ExName, Type)}) |
117 |
:-( |
|| #{name := ExName, type := Type} <- Exchanges], |
118 |
:-( |
verify_exchanges_were_created_or_crash(Res, Exchanges). |
119 |
|
|
120 |
|
-spec handle_user_presence_change(mongooseim:host_type(), JID :: jid:jid(), Status :: atom()) -> ok. |
121 |
|
handle_user_presence_change(HostType, JID, Status) -> |
122 |
:-( |
#{name := ExchangeName} = exchange_opts(HostType, presence_exchange), |
123 |
:-( |
RoutingKey = presence_routing_key(JID), |
124 |
:-( |
Message = presence_msg(JID, Status), |
125 |
:-( |
PublishMethod = mongoose_amqp:basic_publish(ExchangeName, RoutingKey), |
126 |
:-( |
AMQPMessage = mongoose_amqp:message(Message), |
127 |
:-( |
cast_rabbit_worker(HostType, {amqp_publish, PublishMethod, AMQPMessage}). |
128 |
|
|
129 |
|
-spec handle_user_chat_event(mongooseim:host_type(), #chat_event{}) -> ok. |
130 |
|
handle_user_chat_event(HostType, #chat_event{from = From, to = To, packet = Packet, |
131 |
|
type = Type, direction = Direction}) when |
132 |
|
Type == chat orelse Type == groupchat -> |
133 |
:-( |
UserMessage = extract_message(Packet), |
134 |
:-( |
Message = chat_msg(From, To, UserMessage), |
135 |
:-( |
#{name := ExchangeName} = ExchangeOpts = exchange_opts(HostType, msg_type_to_key(Type)), |
136 |
:-( |
RoutingKey = chat_event_routing_key(ExchangeOpts, Direction, From, To), |
137 |
:-( |
PublishMethod = mongoose_amqp:basic_publish(ExchangeName, RoutingKey), |
138 |
:-( |
AMQPMessage = mongoose_amqp:message(Message), |
139 |
:-( |
cast_rabbit_worker(HostType, {amqp_publish, PublishMethod, AMQPMessage}); |
140 |
:-( |
handle_user_chat_event(_HostType, _) -> ok. |
141 |
|
|
142 |
|
%%%=================================================================== |
143 |
|
%%% Helpers |
144 |
|
%%%=================================================================== |
145 |
|
|
146 |
|
-spec call_rabbit_worker(mongooseim:host_type(), Msg :: term()) -> term(). |
147 |
|
call_rabbit_worker(HostType, Msg) -> |
148 |
:-( |
mongoose_wpool:call(rabbit, HostType, ?POOL_TAG, Msg). |
149 |
|
|
150 |
|
-spec cast_rabbit_worker(mongooseim:host_type(), Msg :: term()) -> ok. |
151 |
|
cast_rabbit_worker(HostType, Msg) -> |
152 |
:-( |
mongoose_wpool:cast(rabbit, HostType, ?POOL_TAG, Msg). |
153 |
|
|
154 |
|
-spec exchanges(mongooseim:host_type()) -> [exchange_opts()]. |
155 |
|
exchanges(HostType) -> |
156 |
:-( |
[exchange_opts(HostType, ExKey) || ExKey <- exchange_keys()]. |
157 |
|
|
158 |
|
exchange_keys() -> |
159 |
:-( |
[presence_exchange, chat_msg_exchange, groupchat_msg_exchange]. |
160 |
|
|
161 |
|
-spec extract_message(Packet :: exml:element()) -> binary(). |
162 |
|
extract_message(Packet) -> |
163 |
:-( |
Body = exml_query:subelement(Packet, <<"body">>), |
164 |
:-( |
exml_query:cdata(Body). |
165 |
|
|
166 |
|
-spec presence_routing_key(JID :: jid:jid()) -> jid:literal_jid(). |
167 |
|
presence_routing_key(JID) -> |
168 |
:-( |
{LUser, LServer, _} = jid:to_lower(JID), |
169 |
:-( |
jid:to_binary({LUser, LServer}). |
170 |
|
|
171 |
|
-spec chat_event_routing_key(exchange_opts(), in | out, From :: jid:jid(), |
172 |
|
To :: jid:jid()) -> binary(). |
173 |
|
chat_event_routing_key(#{sent_topic := Topic}, in, From, _To) -> |
174 |
:-( |
user_topic_routing_key(From, Topic); |
175 |
|
chat_event_routing_key(#{recv_topic := Topic}, out, _From, To) -> |
176 |
:-( |
user_topic_routing_key(To, Topic). |
177 |
|
|
178 |
:-( |
msg_type_to_key(chat) -> chat_msg_exchange; |
179 |
:-( |
msg_type_to_key(groupchat) -> groupchat_msg_exchange. |
180 |
|
|
181 |
|
-spec user_topic_routing_key(JID :: jid:jid(), Topic :: binary()) -> binary(). |
182 |
|
user_topic_routing_key(JID, Topic) -> |
183 |
:-( |
{LUser, LServer, _Res} = jid:to_lower(JID), |
184 |
:-( |
BinJID = jid:to_binary({LUser, LServer}), |
185 |
:-( |
<<BinJID/binary, ".", Topic/binary>>. |
186 |
|
|
187 |
|
-spec presence_msg(JID :: jid:jid(), Status :: atom()) -> binary(). |
188 |
|
presence_msg(JID, Status) -> |
189 |
:-( |
Msg = #{user_id => jid:to_binary(jid:to_lower(JID)), present => is_user_online(Status)}, |
190 |
:-( |
jiffy:encode(Msg). |
191 |
|
|
192 |
|
-spec chat_msg(From :: jid:jid(), To :: jid:jid(), UserMsg :: binary()) -> |
193 |
|
binary(). |
194 |
|
chat_msg(From, To, UserMsg) -> |
195 |
:-( |
Msg = #{to_user_id => jid:to_binary(jid:to_lower(To)), |
196 |
|
message => UserMsg, |
197 |
|
from_user_id => jid:to_binary(jid:to_lower(From))}, |
198 |
:-( |
jiffy:encode(Msg). |
199 |
|
|
200 |
|
-spec is_user_online(online | offline) -> boolean(). |
201 |
:-( |
is_user_online(online) -> true; |
202 |
:-( |
is_user_online(offline) -> false. |
203 |
|
|
204 |
|
-spec exchange_opts(mongooseim:host_type(), gen_mod:opt_key()) -> exchange_opts(). |
205 |
|
exchange_opts(HostType, ExchangeKey) -> |
206 |
:-( |
gen_mod:get_module_opt(HostType, ?MODULE, ExchangeKey). |
207 |
|
|
208 |
|
-spec verify_exchanges_were_created_or_crash(Res :: list(), [exchange_opts()]) |
209 |
|
-> ok | no_return(). |
210 |
|
verify_exchanges_were_created_or_crash(Res, Exchanges) -> |
211 |
:-( |
case lists:all(fun(E) -> |
212 |
:-( |
element(2, E) == mongoose_amqp:exchange_declare_ok() |
213 |
|
end, Res) of |
214 |
|
true -> |
215 |
:-( |
ok; |
216 |
|
false -> |
217 |
:-( |
erlang:error(io_lib:format("Creating exchanges failed, exchanges=~p", |
218 |
|
[Exchanges])) |
219 |
|
end. |