1 |
|
-module(mod_event_pusher_sns). |
2 |
|
|
3 |
|
-behaviour(gen_mod). |
4 |
|
-behaviour(mod_event_pusher). |
5 |
|
|
6 |
|
-include("mod_event_pusher_events.hrl"). |
7 |
|
-include("mongoose.hrl"). |
8 |
|
-include("jlib.hrl"). |
9 |
|
-include_lib("erlcloud/include/erlcloud_aws.hrl"). |
10 |
|
-include("mongoose_config_spec.hrl"). |
11 |
|
|
12 |
|
-callback user_guid(UserJID :: jid:jid()) -> user_guid(). |
13 |
|
-callback message_attributes(TopicARN :: topic_arn(), UserJID :: jid:jid(), |
14 |
|
IsOnline :: boolean()) -> attributes(). |
15 |
|
-callback message_attributes(TopicARN :: topic_arn(), From :: jid:jid(), |
16 |
|
To :: jid:jid(), MessageType :: pm | muc, |
17 |
|
Packet :: exml:element()) -> attributes(). |
18 |
|
|
19 |
|
%%%=================================================================== |
20 |
|
%%% Types and definitions |
21 |
|
%%%=================================================================== |
22 |
|
|
23 |
|
-define(TOPIC_BASE, ["arn", "aws", "sns"]). |
24 |
|
-define(PUBLISH_RETRY_COUNT_DEFAULT, 2). |
25 |
|
|
26 |
|
-type user_guid() :: binary(). |
27 |
|
-type topic_arn() :: string(). %% Full topic ARN in format arn:aws:sns:{REGION}:{ACCOUNT_ID}:{TOPIC} |
28 |
|
-type topic() :: string(). %% {TOPIC} part of topic_arn() type |
29 |
|
-type attributes() :: #{string() => string() | binary() | number()}. |
30 |
|
|
31 |
|
|
32 |
|
%%%=================================================================== |
33 |
|
%%% Exports |
34 |
|
%%%=================================================================== |
35 |
|
|
36 |
|
%% MIM module callbacks |
37 |
|
-export([start/2, stop/1, config_spec/0]). |
38 |
|
|
39 |
|
%% API |
40 |
|
-export([try_publish/5, push_event/3]). |
41 |
|
|
42 |
|
-ignore_xref([behaviour_info/1, try_publish/5]). |
43 |
|
|
44 |
|
%% Types |
45 |
|
-export_type([user_guid/0, topic_arn/0, topic/0, attributes/0]). |
46 |
|
|
47 |
|
-spec start(Host :: mongooseim:host_type(), Opts :: gen_mod:module_opts()) -> ok. |
48 |
|
start(Host, Opts) -> |
49 |
:-( |
application:ensure_all_started(erlcloud), |
50 |
:-( |
application:ensure_all_started(worker_pool), |
51 |
:-( |
start_pool(Host, Opts), |
52 |
:-( |
ok. |
53 |
|
|
54 |
|
-spec start_pool(Host :: mongooseim:host_type(), Opts :: gen_mod:module_opts()) -> |
55 |
|
term(). |
56 |
|
start_pool(Host, Opts) -> |
57 |
:-( |
{ok, _} = mongoose_wpool:start(generic, Host, pusher_sns, pool_opts(Opts)). |
58 |
|
|
59 |
|
-spec pool_opts(gen_mod:module_opts()) -> mongoose_wpool:pool_opts(). |
60 |
|
pool_opts(Opts) -> |
61 |
:-( |
WorkerNum = get_worker_num(Opts), |
62 |
:-( |
[{workers, WorkerNum}, {strategy, available_worker}]. |
63 |
|
|
64 |
|
-spec get_worker_num(gen_mod:module_opts()) -> pos_integer(). |
65 |
|
get_worker_num(Opts) -> |
66 |
:-( |
gen_mod:get_opt(pool_size, Opts, 100). |
67 |
|
|
68 |
|
-spec stop(Host :: jid:server()) -> ok. |
69 |
|
stop(Host) -> |
70 |
:-( |
mongoose_wpool:stop(generic, Host, pusher_sns), |
71 |
:-( |
ok. |
72 |
|
|
73 |
|
-spec config_spec() -> mongoose_config_spec:config_section(). |
74 |
|
config_spec() -> |
75 |
146 |
#section{ |
76 |
|
items = #{<<"presence_updates_topic">> => #option{type = string}, |
77 |
|
<<"pm_messages_topic">> => #option{type = string}, |
78 |
|
<<"muc_messages_topic">> => #option{type = string}, |
79 |
|
<<"plugin_module">> => #option{type = atom, |
80 |
|
validate = module}, |
81 |
|
<<"sns_host">> => #option{type = string}, |
82 |
|
<<"region">> => #option{type = string}, |
83 |
|
<<"access_key_id">> => #option{type = string}, |
84 |
|
<<"secret_access_key">> => #option{type = string}, |
85 |
|
<<"account_id">> => #option{type = string}, |
86 |
|
<<"pool_size">> => #option{type = integer, |
87 |
|
validate = positive}, |
88 |
|
<<"publish_retry_count">> => #option{type = integer, |
89 |
|
validate = non_negative}, |
90 |
|
<<"publish_retry_time_ms">> => #option{type = integer, |
91 |
|
validate = non_negative} |
92 |
|
}, |
93 |
|
required = [<<"access_key_id">>, <<"secret_access_key">>, |
94 |
|
<<"region">>, <<"account_id">>, <<"sns_host">>] |
95 |
|
}. |
96 |
|
|
97 |
|
push_event(Acc, _, #user_status_event{jid = UserJID, status = Status}) -> |
98 |
:-( |
user_presence_changed(UserJID, Status == online), |
99 |
:-( |
Acc; |
100 |
|
push_event(Acc, _, #chat_event{direction = in, from = From, to = To, packet = Packet}) -> |
101 |
:-( |
handle_packet(From, To, Packet), |
102 |
:-( |
Acc; |
103 |
|
push_event(Acc, _, _) -> |
104 |
:-( |
Acc. |
105 |
|
|
106 |
|
%%%=================================================================== |
107 |
|
%%% Internal functions |
108 |
|
%%%=================================================================== |
109 |
|
|
110 |
|
-spec user_presence_changed(UserJID :: jid:jid(), IsOnline :: boolean()) -> ok. |
111 |
|
user_presence_changed(#jid{lserver = Host} = UserJID, IsOnline) -> |
112 |
:-( |
Topic = opt(Host, presence_updates_topic), |
113 |
:-( |
case Topic of |
114 |
|
undefined -> |
115 |
:-( |
skip; |
116 |
|
Topic -> |
117 |
:-( |
UserGUID = user_guid(Host, UserJID), |
118 |
:-( |
Content = #{user_id => UserGUID, present => IsOnline}, |
119 |
:-( |
TopicARN = make_topic_arn(Host, Topic), |
120 |
:-( |
Attributes = message_attributes(Host, TopicARN, UserJID, IsOnline), |
121 |
:-( |
async_publish(Host, TopicARN, Content, Attributes) |
122 |
|
end, |
123 |
:-( |
ok. |
124 |
|
|
125 |
|
%% @doc Handles packet and if needed publishes SNS notification |
126 |
|
-spec handle_packet(From :: jid:jid(), To :: jid:jid(), |
127 |
|
Packet :: exml:element()) -> ok | skip. |
128 |
|
handle_packet(From = #jid{lserver = Host}, To, Packet) -> |
129 |
:-( |
?LOG_DEBUG(#{what => sns_handle_packet, |
130 |
:-( |
from => From, to => To, packet => exml_Packet}), |
131 |
|
|
132 |
:-( |
case {get_topic(Host, Packet), exml_query:subelement(Packet, <<"body">>)} of |
133 |
|
{undefined, _} -> %% Skip if there is no topic set in configuration for the packet type |
134 |
:-( |
skip; |
135 |
|
{_, undefined} -> %% Skip if there is no message body in the packet |
136 |
:-( |
skip; |
137 |
|
{Topic, BodyTag} -> |
138 |
:-( |
FromGUID = user_guid(Host, From), |
139 |
:-( |
ToGUID = user_guid(Host, To), |
140 |
:-( |
MessageBody = exml_query:cdata(BodyTag), |
141 |
:-( |
Content = #{from_user_id => FromGUID, |
142 |
|
to_user_id => ToGUID, |
143 |
|
message => MessageBody}, |
144 |
|
|
145 |
:-( |
TopicARN = make_topic_arn(Host, Topic), |
146 |
:-( |
Attributes = message_attributes(Host, TopicARN, From, To, message_type(Packet), Packet), |
147 |
|
|
148 |
:-( |
async_publish(Host, TopicARN, Content, Attributes) |
149 |
|
end. |
150 |
|
|
151 |
|
%% @doc Start publish process notification to AWS SNS service. Content should be valid JSON term |
152 |
|
-spec async_publish(Host :: jid:lserver(), topic_arn(), Content :: jiffy:json_value(), |
153 |
|
attributes()) -> ok. |
154 |
|
async_publish(Host, TopicARN, Content, Attributes) -> |
155 |
:-( |
Retry = opt(Host, publish_retry_count, ?PUBLISH_RETRY_COUNT_DEFAULT), |
156 |
:-( |
mongoose_wpool:cast(generic, Host, pusher_sns, |
157 |
|
{?MODULE, try_publish, [Host, TopicARN, Content, Attributes, Retry]}). |
158 |
|
|
159 |
|
%% @doc Publish notification to AWS SNS service. Content should be a valid JSON term |
160 |
|
-spec try_publish(Host :: jid:lserver(), topic_arn(), Content :: jiffy:json_value(), |
161 |
|
attributes(), TryCount :: integer()) -> MessageId :: string() | dropped | scheduled. |
162 |
|
try_publish(Host, TopicARN, Content, Attributes, Retry) when Retry < 0 -> |
163 |
:-( |
?LOG_WARNING(#{what => sns_notification_dropped, |
164 |
|
server => Host, topic_arn => TopicARN, |
165 |
:-( |
attributes => Attributes, content => Content}), |
166 |
:-( |
dropped; |
167 |
|
try_publish(Host, TopicARN, Content, Attributes, Retry) -> |
168 |
:-( |
try publish(Host, TopicARN, Content, Attributes) |
169 |
|
catch |
170 |
|
Type:Error:StackTrace -> |
171 |
:-( |
BackoffTime = calc_backoff_time(Host, Retry), |
172 |
:-( |
timer:apply_after(BackoffTime, mongoose_wpool, cast, |
173 |
|
[generic, Host, pusher_sns, |
174 |
|
{?MODULE, try_publish, |
175 |
|
[Host, TopicARN, Content, Attributes, Retry - 1]}]), |
176 |
:-( |
?LOG_WARNING(#{what => sns_notification_retry, |
177 |
|
text => <<"Retrying SNS notification after {backoff_time} ms">>, |
178 |
|
server => Host, topic_arn => TopicARN, |
179 |
|
attributes => Attributes, content => Content, |
180 |
|
backoff_time => BackoffTime, |
181 |
:-( |
class => Type, reason => Error, stacktrace => StackTrace}), |
182 |
:-( |
scheduled |
183 |
|
end. |
184 |
|
|
185 |
|
%% @doc Publish notification to AWS SNS service. Content should be a valid JSON term |
186 |
|
-spec publish(Host :: jid:lserver(), topic_arn(), Content :: jiffy:json_value(), |
187 |
|
attributes()) -> MessageId :: string(). |
188 |
|
publish(Host, TopicARN, Content, Attributes) -> |
189 |
:-( |
erlcloud_sns:publish(topic, TopicARN, maps:to_list(Content), |
190 |
|
undefined, maps:to_list(Attributes), aws_handle(Host)). |
191 |
|
|
192 |
|
%% @doc Returns AWS SNS handle base on configured AWS credentials |
193 |
|
-spec aws_handle(Host :: jid:lserver()) -> aws_config(). |
194 |
|
aws_handle(Host) -> |
195 |
:-( |
AccessKeyId = opt(Host, access_key_id), |
196 |
:-( |
SecretKey = opt(Host, secret_access_key), |
197 |
:-( |
SNSHost = opt(Host, sns_host), |
198 |
|
|
199 |
:-( |
erlcloud_sns:new(AccessKeyId, SecretKey, SNSHost). |
200 |
|
|
201 |
|
%% @doc Returns notification topic based on packet type and module configuration |
202 |
|
-spec get_topic(Host :: jid:lserver(), Packet :: exml:element()) -> topic() | undefined. |
203 |
|
get_topic(Host, Packet) -> |
204 |
:-( |
case message_type(Packet) of |
205 |
|
pm -> |
206 |
:-( |
opt(Host, pm_messages_topic); |
207 |
|
muc -> |
208 |
:-( |
opt(Host, muc_messages_topic); |
209 |
|
_ -> |
210 |
:-( |
undefined |
211 |
|
end. |
212 |
|
|
213 |
|
|
214 |
|
%% @doc Constructs SNS TopicArn from given topic suffix |
215 |
|
-spec make_topic_arn(Host :: jid:lserver(), Topic :: topic()) -> topic_arn(). |
216 |
|
make_topic_arn(Host, Topic) -> |
217 |
:-( |
AWSRegion = opt(Host, region), |
218 |
:-( |
AWSAccountId = opt(Host, account_id), |
219 |
|
|
220 |
:-( |
string:join(?TOPIC_BASE ++ [AWSRegion, AWSAccountId, Topic], ":"). |
221 |
|
|
222 |
|
%% @doc Returns message type |
223 |
|
-spec message_type(Packet :: exml:element()) -> pm | muc | undefined. |
224 |
|
message_type(Packet) -> |
225 |
:-( |
case exml_query:attr(Packet, <<"type">>) of |
226 |
:-( |
<<"chat">> -> pm; |
227 |
:-( |
<<"groupchat">> -> muc; |
228 |
:-( |
_ -> undefined |
229 |
|
end. |
230 |
|
|
231 |
|
%% Getter for module options |
232 |
|
-spec opt(Host :: jid:lserver(), Option :: atom()) -> Value :: term() | undefined. |
233 |
|
opt(Host, Option) -> |
234 |
:-( |
opt(Host, Option, undefined). |
235 |
|
|
236 |
|
%% Getter for module options with default value |
237 |
|
-spec opt(Host :: jid:lserver(), Option :: atom(), Default :: term()) -> |
238 |
|
Value :: term(). |
239 |
|
opt(Host, Option, Default) -> |
240 |
:-( |
gen_mod:get_module_opt(Host, ?MODULE, Option, Default). |
241 |
|
|
242 |
|
%% ---------------------------------------------------------------------- |
243 |
|
%% Callbacks |
244 |
|
|
245 |
|
-spec user_guid(Host :: jid:lserver(), UserJID :: jid:jid()) -> user_guid(). |
246 |
|
user_guid(Host, UserJID) -> |
247 |
:-( |
PluginModule = opt(Host, plugin_module, mod_event_pusher_sns_defaults), |
248 |
:-( |
PluginModule:user_guid(UserJID). |
249 |
|
|
250 |
|
-spec message_attributes(Host :: jid:lserver(), TopicARN :: topic_arn(), |
251 |
|
UserJID :: jid:jid(), IsOnline :: boolean()) -> |
252 |
|
attributes(). |
253 |
|
message_attributes(Host, TopicARN, UserJID, IsOnline) -> |
254 |
:-( |
PluginModule = opt(Host, plugin_module, mod_event_pusher_sns_defaults), |
255 |
:-( |
PluginModule:message_attributes(TopicARN, UserJID, IsOnline). |
256 |
|
|
257 |
|
-spec message_attributes(Host :: jid:lserver(), TopicARN :: topic_arn(), |
258 |
|
From :: jid:jid(), To :: jid:jid(), MessageType :: pm | muc, |
259 |
|
Packet :: exml:element()) -> attributes(). |
260 |
|
message_attributes(Host, TopicARN, From, To, MessageType, Packet) -> |
261 |
:-( |
PluginModule = opt(Host, plugin_module, mod_event_pusher_sns_defaults), |
262 |
:-( |
PluginModule:message_attributes(TopicARN, From, To, MessageType, Packet). |
263 |
|
|
264 |
|
-spec calc_backoff_time(Host :: jid:lserver(), integer()) -> integer(). |
265 |
|
calc_backoff_time(Host, Retry) -> |
266 |
:-( |
MaxRetry = opt(Host, publish_retry_count, ?PUBLISH_RETRY_COUNT_DEFAULT), |
267 |
:-( |
BaseTime = opt(Host, publish_retry_time_ms, 50), |
268 |
:-( |
BackoffMaxTime = round(math:pow(2, MaxRetry - Retry)) * BaseTime, |
269 |
:-( |
Random = rand:uniform(BaseTime), |
270 |
:-( |
BackoffMaxTime - Random. |