1 |
|
-module(mod_event_pusher_sns). |
2 |
|
|
3 |
|
-behaviour(gen_mod). |
4 |
|
-behaviour(mod_event_pusher). |
5 |
|
|
6 |
|
-include("mod_event_pusher_events.hrl"). |
7 |
|
-include("mongoose.hrl"). |
8 |
|
-include("jlib.hrl"). |
9 |
|
-include_lib("erlcloud/include/erlcloud_aws.hrl"). |
10 |
|
-include("mongoose_config_spec.hrl"). |
11 |
|
|
12 |
|
-callback user_guid(UserJID :: jid:jid()) -> user_guid(). |
13 |
|
-callback message_attributes(TopicARN :: topic_arn(), UserJID :: jid:jid(), |
14 |
|
IsOnline :: boolean()) -> attributes(). |
15 |
|
-callback message_attributes(TopicARN :: topic_arn(), From :: jid:jid(), |
16 |
|
To :: jid:jid(), MessageType :: pm | muc, |
17 |
|
Packet :: exml:element()) -> attributes(). |
18 |
|
|
19 |
|
%%%=================================================================== |
20 |
|
%%% Types and definitions |
21 |
|
%%%=================================================================== |
22 |
|
|
23 |
|
-define(TOPIC_BASE, ["arn", "aws", "sns"]). |
24 |
|
|
25 |
|
-type user_guid() :: binary(). |
26 |
|
-type topic_arn() :: string(). %% Full topic ARN in format arn:aws:sns:{REGION}:{ACCOUNT_ID}:{TOPIC} |
27 |
|
-type topic() :: string(). %% {TOPIC} part of topic_arn() type |
28 |
|
-type attributes() :: #{string() => string() | binary() | number()}. |
29 |
|
|
30 |
|
|
31 |
|
%%%=================================================================== |
32 |
|
%%% Exports |
33 |
|
%%%=================================================================== |
34 |
|
|
35 |
|
%% MIM module callbacks |
36 |
|
-export([start/2, stop/1, config_spec/0]). |
37 |
|
|
38 |
|
%% API |
39 |
|
-export([try_publish/5, push_event/3]). |
40 |
|
|
41 |
|
-ignore_xref([behaviour_info/1, try_publish/5]). |
42 |
|
|
43 |
|
%% Types |
44 |
|
-export_type([user_guid/0, topic_arn/0, topic/0, attributes/0]). |
45 |
|
|
46 |
|
%% @doc gen_mod callback. Requests start of the `erlcloud' and `worker_pool'
%% applications and then launches the SNS publisher worker pool for `Host'.
-spec start(Host :: mongooseim:host_type(), Opts :: gen_mod:module_opts()) -> ok.
start(Host, Opts) ->
    %% The results of ensure_all_started/1 are not inspected.
    lists:foreach(fun application:ensure_all_started/1, [erlcloud, worker_pool]),
    start_pool(Host, Opts),
    ok.
52 |
|
|
53 |
|
%% @doc Starts the `generic' mongoose_wpool pool tagged `pusher_sns' for `Host'.
%% The `{ok, _}' match turns a failed pool start into a badmatch crash.
-spec start_pool(Host :: mongooseim:host_type(), Opts :: gen_mod:module_opts()) ->
    term().
start_pool(Host, Opts) ->
    PoolOpts = pool_opts(Opts),
    {ok, _} = mongoose_wpool:start(generic, Host, pusher_sns, PoolOpts).
57 |
|
|
58 |
|
%% @doc Builds the worker pool options: the configured number of workers and
%% the `available_worker' strategy.
-spec pool_opts(gen_mod:module_opts()) -> mongoose_wpool:pool_opts().
pool_opts(Opts) ->
    [{workers, get_worker_num(Opts)},
     {strategy, available_worker}].
62 |
|
|
63 |
|
%% @doc Reads `pool_size' from the module options, falling back to 100 when
%% the option is absent.
-spec get_worker_num(gen_mod:module_opts()) -> pos_integer().
get_worker_num(Opts) ->
    DefaultPoolSize = 100,
    gen_mod:get_opt(pool_size, Opts, DefaultPoolSize).
66 |
|
|
67 |
|
%% @doc gen_mod callback. Stops the `pusher_sns' worker pool of `Host'.
%% Always returns `ok'; the result of the pool stop call is not inspected.
-spec stop(Host :: jid:server()) -> ok.
stop(Host) ->
    PoolTag = pusher_sns,
    mongoose_wpool:stop(generic, Host, PoolTag),
    ok.
71 |
|
|
72 |
|
%% @doc Returns the configuration section specification of this module.
%% Topic names, AWS credentials and endpoint settings are strings,
%% `plugin_module' is an atom validated as a module, `pool_size' is a positive
%% integer and the retry settings are non-negative integers.
%% The AWS credentials, region, account id and SNS host are required.
-spec config_spec() -> mongoose_config_spec:config_section().
config_spec() ->
    %% Option shapes shared by several keys
    String = #option{type = string},
    NonNegInteger = #option{type = integer,
                            validate = non_negative},
    Items = #{<<"presence_updates_topic">> => String,
              <<"pm_messages_topic">> => String,
              <<"muc_messages_topic">> => String,
              <<"plugin_module">> => #option{type = atom,
                                             validate = module},
              <<"sns_host">> => String,
              <<"region">> => String,
              <<"access_key_id">> => String,
              <<"secret_access_key">> => String,
              <<"account_id">> => String,
              <<"pool_size">> => #option{type = integer,
                                         validate = positive},
              <<"publish_retry_count">> => NonNegInteger,
              <<"publish_retry_time_ms">> => NonNegInteger},
    Required = [<<"access_key_id">>, <<"secret_access_key">>,
                <<"region">>, <<"account_id">>, <<"sns_host">>],
    #section{items = Items, required = Required}.
95 |
|
|
96 |
|
%% @doc mod_event_pusher callback. User status events and incoming chat events
%% are forwarded to the SNS publishing code; any other event is ignored.
%% The accumulator is always returned unchanged.
push_event(Acc, _, #user_status_event{jid = UserJID, status = Status}) ->
    IsOnline = Status =:= online,
    user_presence_changed(UserJID, IsOnline),
    Acc;
push_event(Acc, _, #chat_event{direction = in, from = Sender, to = Recipient,
                               packet = Stanza}) ->
    handle_packet(Sender, Recipient, Stanza),
    Acc;
push_event(Acc, _, _) ->
    Acc.
104 |
|
|
105 |
|
%%%=================================================================== |
106 |
|
%%% Internal functions |
107 |
|
%%%=================================================================== |
108 |
|
|
109 |
|
%% @doc Publishes a presence notification for `UserJID' when a
%% `presence_updates_topic' is configured for the user's server; otherwise
%% does nothing. Always returns `ok'.
-spec user_presence_changed(UserJID :: jid:jid(), IsOnline :: boolean()) -> ok.
user_presence_changed(#jid{lserver = Host} = UserJID, IsOnline) ->
    case opt(Host, presence_updates_topic) of
        undefined ->
            %% No topic configured: nothing to publish
            skip;
        Topic ->
            UserGUID = user_guid(Host, UserJID),
            TopicARN = make_topic_arn(Host, Topic),
            Attributes = message_attributes(Host, TopicARN, UserJID, IsOnline),
            async_publish(Host, TopicARN,
                          #{user_id => UserGUID, present => IsOnline},
                          Attributes)
    end,
    ok.
123 |
|
|
124 |
|
%% @doc Handles a chat packet and, if needed, publishes an SNS notification.
%% Returns `skip' when no topic is configured for the packet type or when the
%% packet has no `body' subelement; otherwise returns the result of
%% async_publish/4. The published content carries the sender and recipient
%% GUIDs and the body text.
-spec handle_packet(From :: jid:jid(), To :: jid:jid(),
                    Packet :: exml:element()) -> ok | skip.
handle_packet(From = #jid{lserver = Host}, To, Packet) ->
    %% Log the packet itself: the former value `exml_Packet' was a bare atom,
    %% so the packet never reached the log.
    ?LOG_DEBUG(#{what => sns_handle_packet,
                 from => From, to => To, packet => Packet}),

    case {get_topic(Host, Packet), exml_query:subelement(Packet, <<"body">>)} of
        {undefined, _} -> %% Skip if there is no topic set in configuration for the packet type
            skip;
        {_, undefined} -> %% Skip if there is no message body in the packet
            skip;
        {Topic, BodyTag} ->
            FromGUID = user_guid(Host, From),
            ToGUID = user_guid(Host, To),
            MessageBody = exml_query:cdata(BodyTag),
            Content = #{from_user_id => FromGUID,
                        to_user_id => ToGUID,
                        message => MessageBody},

            TopicARN = make_topic_arn(Host, Topic),
            Attributes = message_attributes(Host, TopicARN, From, To,
                                            message_type(Packet), Packet),

            async_publish(Host, TopicARN, Content, Attributes)
    end.
149 |
|
|
150 |
|
%% @doc Hands a notification over to the `pusher_sns' worker pool, which runs
%% try_publish/5 with the configured retry count (`publish_retry_count',
%% default 2). Content should be a valid JSON term.
-spec async_publish(Host :: jid:lserver(), topic_arn(), Content :: jiffy:json_value(),
                    attributes()) -> ok.
async_publish(Host, TopicARN, Content, Attributes) ->
    RetryCount = opt(Host, publish_retry_count, 2),
    PublishArgs = [Host, TopicARN, Content, Attributes, RetryCount],
    mongoose_wpool:cast(generic, Host, pusher_sns,
                        {?MODULE, try_publish, PublishArgs}).
157 |
|
|
158 |
|
%% @doc Attempts to publish a notification to the AWS SNS service. Content
%% should be a valid JSON term.
%% Returns the message id on success. If publishing raises, the next attempt
%% (with `Retry - 1') is scheduled on the worker pool after a backoff delay and
%% `scheduled' is returned. Once `Retry' is negative the notification is logged
%% and `dropped'.
-spec try_publish(Host :: jid:lserver(), topic_arn(), Content :: jiffy:json_value(),
                  attributes(), TryCount :: integer()) -> MessageId :: string() | dropped | scheduled.
try_publish(Host, TopicARN, Content, Attributes, Retry) when Retry < 0 ->
    ?LOG_WARNING(#{what => sns_notification_dropped,
                   server => Host, topic_arn => TopicARN,
                   attributes => Attributes, content => Content}),
    dropped;
try_publish(Host, TopicARN, Content, Attributes, Retry) ->
    try publish(Host, TopicARN, Content, Attributes) of
        MessageId ->
            MessageId
    catch
        Class:Reason:Trace ->
            Delay = calc_backoff_time(Host, Retry),
            NextAttempt = {?MODULE, try_publish,
                           [Host, TopicARN, Content, Attributes, Retry - 1]},
            timer:apply_after(Delay, mongoose_wpool, cast,
                              [generic, Host, pusher_sns, NextAttempt]),
            ?LOG_WARNING(#{what => sns_notification_retry,
                           text => <<"Retrying SNS notification after {backoff_time} ms">>,
                           server => Host, topic_arn => TopicARN,
                           attributes => Attributes, content => Content,
                           backoff_time => Delay,
                           class => Class, reason => Reason, stacktrace => Trace}),
            scheduled
    end.
183 |
|
|
184 |
|
%% @doc Sends one notification to the given SNS topic through erlcloud.
%% `Content' and `Attributes' are maps and are passed on as key-value lists;
%% no subject is set. Content should be a valid JSON term.
-spec publish(Host :: jid:lserver(), topic_arn(), Content :: jiffy:json_value(),
              attributes()) -> MessageId :: string().
publish(Host, TopicARN, Content, Attributes) ->
    Message = maps:to_list(Content),
    AttributeList = maps:to_list(Attributes),
    AwsConfig = aws_handle(Host),
    erlcloud_sns:publish(topic, TopicARN, Message, undefined, AttributeList, AwsConfig).
190 |
|
|
191 |
|
%% @doc Returns an AWS SNS handle based on the configured access key id,
%% secret access key and SNS host of `Host'.
-spec aws_handle(Host :: jid:lserver()) -> aws_config().
aws_handle(Host) ->
    erlcloud_sns:new(opt(Host, access_key_id),
                     opt(Host, secret_access_key),
                     opt(Host, sns_host)).
199 |
|
|
200 |
|
%% @doc Returns the notification topic configured for the packet's message
%% type, or `undefined' when the type is neither `pm' nor `muc' or the
%% corresponding topic option is not set.
-spec get_topic(Host :: jid:lserver(), Packet :: exml:element()) -> topic() | undefined.
get_topic(Host, Packet) ->
    topic_for_message_type(Host, message_type(Packet)).

%% Maps a message type to the value of its topic option.
-spec topic_for_message_type(jid:lserver(), pm | muc | undefined) -> topic() | undefined.
topic_for_message_type(Host, pm) ->
    opt(Host, pm_messages_topic);
topic_for_message_type(Host, muc) ->
    opt(Host, muc_messages_topic);
topic_for_message_type(_Host, _Other) ->
    undefined.
211 |
|
|
212 |
|
|
213 |
|
%% @doc Builds the full SNS topic ARN for a topic suffix by joining the
%% `arn:aws:sns' base, the configured region, the configured account id and
%% `Topic' with colons.
-spec make_topic_arn(Host :: jid:lserver(), Topic :: topic()) -> topic_arn().
make_topic_arn(Host, Topic) ->
    Region = opt(Host, region),
    AccountId = opt(Host, account_id),
    ArnParts = ?TOPIC_BASE ++ [Region, AccountId, Topic],
    string:join(ArnParts, ":").
220 |
|
|
221 |
|
%% @doc Classifies a packet by its `type' attribute: `chat' is a private
%% message (`pm'), `groupchat' is a multi-user chat message (`muc'), anything
%% else (including a missing attribute) is `undefined'.
-spec message_type(Packet :: exml:element()) -> pm | muc | undefined.
message_type(Packet) ->
    type_attr_to_message_type(exml_query:attr(Packet, <<"type">>)).

%% Translates the raw `type' attribute value into a message type.
-spec type_attr_to_message_type(term()) -> pm | muc | undefined.
type_attr_to_message_type(<<"chat">>) -> pm;
type_attr_to_message_type(<<"groupchat">>) -> muc;
type_attr_to_message_type(_) -> undefined.
229 |
|
|
230 |
|
%% Reads one option of this module for the given host; yields `undefined'
%% when the option is not set.
-spec opt(Host :: jid:lserver(), Option :: atom()) -> Value :: term() | undefined.
opt(Host, Option) ->
    gen_mod:get_module_opt(Host, ?MODULE, Option, undefined).
234 |
|
|
235 |
|
%% Reads one option of this module for the given host; yields `Default'
%% when the option is not set.
-spec opt(Host :: jid:lserver(), Option :: atom(), Default :: term()) ->
    Value :: term().
opt(Host, Option, Default) ->
    Module = ?MODULE,
    gen_mod:get_module_opt(Host, Module, Option, Default).
240 |
|
|
241 |
|
%% ---------------------------------------------------------------------- |
242 |
|
%% Callbacks |
243 |
|
|
244 |
|
%% @doc Delegates to the `user_guid/1' callback of the configured plugin
%% module (`mod_event_pusher_sns_defaults' unless `plugin_module' is set).
-spec user_guid(Host :: jid:lserver(), UserJID :: jid:jid()) -> user_guid().
user_guid(Host, UserJID) ->
    Plugin = opt(Host, plugin_module, mod_event_pusher_sns_defaults),
    Plugin:user_guid(UserJID).
248 |
|
|
249 |
|
%% @doc Delegates to the `message_attributes/3' callback (presence variant) of
%% the configured plugin module (`mod_event_pusher_sns_defaults' unless
%% `plugin_module' is set).
-spec message_attributes(Host :: jid:lserver(), TopicARN :: topic_arn(),
                         UserJID :: jid:jid(), IsOnline :: boolean()) ->
    attributes().
message_attributes(Host, TopicARN, UserJID, IsOnline) ->
    Plugin = opt(Host, plugin_module, mod_event_pusher_sns_defaults),
    Plugin:message_attributes(TopicARN, UserJID, IsOnline).
255 |
|
|
256 |
|
%% @doc Delegates to the `message_attributes/5' callback (chat message variant)
%% of the configured plugin module (`mod_event_pusher_sns_defaults' unless
%% `plugin_module' is set).
-spec message_attributes(Host :: jid:lserver(), TopicARN :: topic_arn(),
                         From :: jid:jid(), To :: jid:jid(), MessageType :: pm | muc,
                         Packet :: exml:element()) -> attributes().
message_attributes(Host, TopicARN, From, To, MessageType, Packet) ->
    Plugin = opt(Host, plugin_module, mod_event_pusher_sns_defaults),
    Plugin:message_attributes(TopicARN, From, To, MessageType, Packet).
262 |
|
|
263 |
|
%% @doc Computes the delay in milliseconds before the next publish attempt.
%% The upper bound doubles with every retry already consumed:
%% `2^(MaxRetry - Retry) * BaseTime', where `BaseTime' is
%% `publish_retry_time_ms' (default 50). A random jitter of 1..BaseTime ms is
%% subtracted, so the result lies in
%% `[BackoffMaxTime - BaseTime, BackoffMaxTime - 1]'.
%% With a base time of 0 the delay is 0.
-spec calc_backoff_time(Host :: jid:lserver(), integer()) -> integer().
calc_backoff_time(Host, Retry) ->
    %% Use the same default as async_publish/4; without it an unset option
    %% yields `undefined' and the subtraction below fails with badarith.
    MaxRetry = opt(Host, publish_retry_count, 2),
    BaseTime = opt(Host, publish_retry_time_ms, 50),
    BackoffMaxTime = round(math:pow(2, MaxRetry - Retry)) * BaseTime,
    BackoffMaxTime - backoff_jitter(BaseTime).

%% Random jitter in 1..BaseTime, or 0 when there is no room for jitter.
%% rand:uniform/1 replaces the deprecated crypto:rand_uniform/2 and the guard
%% covers a base time of 0, which the config validation allows.
-spec backoff_jitter(integer()) -> non_neg_integer().
backoff_jitter(BaseTime) when BaseTime > 0 ->
    rand:uniform(BaseTime);
backoff_jitter(_BaseTime) ->
    0.