1 |
|
-module(mod_event_pusher_sns). |
2 |
|
|
3 |
|
-behaviour(gen_mod). |
4 |
|
-behaviour(mod_event_pusher). |
5 |
|
|
6 |
|
-include("mod_event_pusher_events.hrl"). |
7 |
|
-include("mongoose.hrl"). |
8 |
|
-include_lib("erlcloud/include/erlcloud_aws.hrl"). |
9 |
|
-include("mongoose_config_spec.hrl"). |
10 |
|
|
11 |
|
-callback user_guid(UserJID :: jid:jid()) -> user_guid(). |
12 |
|
-callback message_attributes(TopicARN :: topic_arn(), UserJID :: jid:jid(), |
13 |
|
IsOnline :: boolean()) -> attributes(). |
14 |
|
-callback message_attributes(TopicARN :: topic_arn(), From :: jid:jid(), |
15 |
|
To :: jid:jid(), MessageType :: pm | muc, |
16 |
|
Packet :: exml:element()) -> attributes(). |
17 |
|
|
18 |
|
%%%=================================================================== |
19 |
|
%%% Types and definitions |
20 |
|
%%%=================================================================== |
21 |
|
|
22 |
|
-define(TOPIC_BASE, ["arn", "aws", "sns"]). |
23 |
|
|
24 |
|
-type user_guid() :: binary(). |
25 |
|
-type topic_arn() :: string(). %% Full topic ARN in format arn:aws:sns:{REGION}:{ACCOUNT_ID}:{TOPIC} |
26 |
|
-type topic() :: string(). %% {TOPIC} part of topic_arn() type |
27 |
|
-type attributes() :: #{string() => string() | binary() | number()}. |
28 |
|
|
29 |
|
|
30 |
|
%%%=================================================================== |
31 |
|
%%% Exports |
32 |
|
%%%=================================================================== |
33 |
|
|
34 |
|
%% MIM module callbacks |
35 |
|
-export([start/2, stop/1, config_spec/0]). |
36 |
|
|
37 |
|
%% API |
38 |
|
-export([try_publish/5, push_event/2]). |
39 |
|
|
40 |
|
-ignore_xref([behaviour_info/1, try_publish/5]). |
41 |
|
|
42 |
|
%% Types |
43 |
|
-export_type([user_guid/0, topic_arn/0, topic/0, attributes/0]). |
44 |
|
|
45 |
|
-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> ok. |
46 |
|
start(HostType, Opts) -> |
47 |
2 |
application:ensure_all_started(erlcloud), |
48 |
2 |
application:ensure_all_started(worker_pool), |
49 |
2 |
start_pool(HostType, Opts), |
50 |
2 |
ok. |
51 |
|
|
52 |
|
-spec start_pool(mongooseim:host_type(), gen_mod:module_opts()) -> |
53 |
|
term(). |
54 |
|
start_pool(HostType, Opts) -> |
55 |
2 |
{ok, _} = mongoose_wpool:start(generic, HostType, pusher_sns, pool_opts(Opts)). |
56 |
|
|
57 |
|
-spec pool_opts(gen_mod:module_opts()) -> mongoose_wpool:pool_opts(). |
58 |
|
pool_opts(Opts) -> |
59 |
2 |
WorkerNum = get_worker_num(Opts), |
60 |
2 |
[{workers, WorkerNum}, {strategy, available_worker}]. |
61 |
|
|
62 |
|
-spec get_worker_num(gen_mod:module_opts()) -> pos_integer(). |
63 |
|
get_worker_num(Opts) -> |
64 |
2 |
gen_mod:get_opt(pool_size, Opts). |
65 |
|
|
66 |
|
-spec stop(HostType :: jid:server()) -> ok. |
67 |
|
stop(HostType) -> |
68 |
2 |
mongoose_wpool:stop(generic, HostType, pusher_sns), |
69 |
2 |
ok. |
70 |
|
|
71 |
|
-spec config_spec() -> mongoose_config_spec:config_section(). |
72 |
|
config_spec() -> |
73 |
106 |
#section{ |
74 |
|
items = #{<<"presence_updates_topic">> => #option{type = string}, |
75 |
|
<<"pm_messages_topic">> => #option{type = string}, |
76 |
|
<<"muc_messages_topic">> => #option{type = string}, |
77 |
|
<<"plugin_module">> => #option{type = atom, |
78 |
|
validate = module}, |
79 |
|
<<"sns_host">> => #option{type = string}, |
80 |
|
<<"region">> => #option{type = string}, |
81 |
|
<<"access_key_id">> => #option{type = string}, |
82 |
|
<<"secret_access_key">> => #option{type = string}, |
83 |
|
<<"account_id">> => #option{type = string}, |
84 |
|
<<"pool_size">> => #option{type = integer, |
85 |
|
validate = positive}, |
86 |
|
<<"publish_retry_count">> => #option{type = integer, |
87 |
|
validate = non_negative}, |
88 |
|
<<"publish_retry_time_ms">> => #option{type = integer, |
89 |
|
validate = non_negative} |
90 |
|
}, |
91 |
|
required = [<<"sns_host">>, <<"region">>, <<"access_key_id">>, <<"secret_access_key">>, |
92 |
|
<<"account_id">>], |
93 |
|
defaults = #{<<"plugin_module">> => mod_event_pusher_sns_defaults, |
94 |
|
<<"pool_size">> => 100, |
95 |
|
<<"publish_retry_count">> => 2, |
96 |
|
<<"publish_retry_time_ms">> => 50 |
97 |
|
} |
98 |
|
}. |
99 |
|
|
100 |
|
push_event(Acc, #user_status_event{jid = UserJID, status = Status}) -> |
101 |
18 |
user_presence_changed(mongoose_acc:host_type(Acc), UserJID, Status == online), |
102 |
18 |
Acc; |
103 |
|
push_event(Acc, #chat_event{direction = in, from = From, to = To, packet = Packet}) -> |
104 |
4 |
handle_packet(mongoose_acc:host_type(Acc), From, To, Packet), |
105 |
4 |
Acc; |
106 |
|
push_event(Acc, _) -> |
107 |
23 |
Acc. |
108 |
|
|
109 |
|
%%%=================================================================== |
110 |
|
%%% Internal functions |
111 |
|
%%%=================================================================== |
112 |
|
|
113 |
|
-spec user_presence_changed(mongooseim:host_type(), UserJID :: jid:jid(), |
114 |
|
IsOnline :: boolean()) -> ok. |
115 |
|
user_presence_changed(HostType, UserJID, IsOnline) -> |
116 |
18 |
case gen_mod:lookup_module_opt(HostType, ?MODULE, presence_updates_topic) of |
117 |
|
{error, not_found} -> |
118 |
:-( |
skip; |
119 |
|
{ok, Topic} -> |
120 |
18 |
UserGUID = user_guid(HostType, UserJID), |
121 |
18 |
Content = #{user_id => UserGUID, present => IsOnline}, |
122 |
18 |
TopicARN = make_topic_arn(HostType, Topic), |
123 |
18 |
Attributes = message_attributes(HostType, TopicARN, UserJID, IsOnline), |
124 |
18 |
async_publish(HostType, TopicARN, Content, Attributes) |
125 |
|
end, |
126 |
18 |
ok. |
127 |
|
|
128 |
|
%% @doc Handles packet and if needed publishes SNS notification |
129 |
|
-spec handle_packet(mongooseim:host_type(), From :: jid:jid(), To :: jid:jid(), |
130 |
|
Packet :: exml:element()) -> ok | skip. |
131 |
|
handle_packet(HostType, From, To, Packet) -> |
132 |
4 |
?LOG_DEBUG(#{what => sns_handle_packet, |
133 |
4 |
from => From, to => To, packet => exml_Packet}), |
134 |
|
|
135 |
4 |
case {get_topic(HostType, Packet), exml_query:subelement(Packet, <<"body">>)} of |
136 |
|
{undefined, _} -> %% Skip if there is no topic set in configuration for the packet type |
137 |
:-( |
skip; |
138 |
|
{_, undefined} -> %% Skip if there is no message body in the packet |
139 |
:-( |
skip; |
140 |
|
{Topic, BodyTag} -> |
141 |
4 |
FromGUID = user_guid(HostType, From), |
142 |
4 |
ToGUID = user_guid(HostType, To), |
143 |
4 |
MessageBody = exml_query:cdata(BodyTag), |
144 |
4 |
Content = #{from_user_id => FromGUID, |
145 |
|
to_user_id => ToGUID, |
146 |
|
message => MessageBody}, |
147 |
|
|
148 |
4 |
TopicARN = make_topic_arn(HostType, Topic), |
149 |
4 |
Attributes = message_attributes(HostType, TopicARN, From, To, message_type(Packet), Packet), |
150 |
|
|
151 |
4 |
async_publish(HostType, TopicARN, Content, Attributes) |
152 |
|
end. |
153 |
|
|
154 |
|
%% @doc Start publish process notification to AWS SNS service. Content should be valid JSON term |
155 |
|
-spec async_publish(mongooseim:host_type(), topic_arn(), Content :: jiffy:json_value(), |
156 |
|
attributes()) -> ok. |
157 |
|
async_publish(HostType, TopicARN, Content, Attributes) -> |
158 |
22 |
Retry = gen_mod:get_module_opt(HostType, ?MODULE, publish_retry_count), |
159 |
22 |
mongoose_wpool:cast(generic, HostType, pusher_sns, |
160 |
|
{?MODULE, try_publish, [HostType, TopicARN, Content, Attributes, Retry]}). |
161 |
|
|
162 |
|
%% @doc Publish notification to AWS SNS service. Content should be a valid JSON term |
163 |
|
-spec try_publish(mongooseim:host_type(), topic_arn(), Content :: jiffy:json_value(), |
164 |
|
attributes(), TryCount :: integer()) -> MessageId :: string() | dropped | scheduled. |
165 |
|
try_publish(HostType, TopicARN, Content, Attributes, Retry) when Retry < 0 -> |
166 |
:-( |
?LOG_WARNING(#{what => sns_notification_dropped, |
167 |
|
server => HostType, topic_arn => TopicARN, |
168 |
:-( |
attributes => Attributes, content => Content}), |
169 |
:-( |
dropped; |
170 |
|
try_publish(HostType, TopicARN, Content, Attributes, Retry) -> |
171 |
22 |
try publish(HostType, TopicARN, Content, Attributes) |
172 |
|
catch |
173 |
|
Type:Error:StackTrace -> |
174 |
:-( |
BackoffTime = calc_backoff_time(HostType, Retry), |
175 |
:-( |
timer:apply_after(BackoffTime, mongoose_wpool, cast, |
176 |
|
[generic, HostType, pusher_sns, |
177 |
|
{?MODULE, try_publish, |
178 |
|
[HostType, TopicARN, Content, Attributes, Retry - 1]}]), |
179 |
:-( |
?LOG_WARNING(#{what => sns_notification_retry, |
180 |
|
text => <<"Retrying SNS notification after {backoff_time} ms">>, |
181 |
|
host_type => HostType, topic_arn => TopicARN, |
182 |
|
attributes => Attributes, content => Content, |
183 |
|
backoff_time => BackoffTime, |
184 |
:-( |
class => Type, reason => Error, stacktrace => StackTrace}), |
185 |
:-( |
scheduled |
186 |
|
end. |
187 |
|
|
188 |
|
%% @doc Publish notification to AWS SNS service. Content should be a valid JSON term |
189 |
|
-spec publish(mongooseim:host_type(), topic_arn(), Content :: jiffy:json_value(), |
190 |
|
attributes()) -> MessageId :: string(). |
191 |
|
publish(HostType, TopicARN, Content, Attributes) -> |
192 |
22 |
erlcloud_sns:publish(topic, TopicARN, maps:to_list(Content), |
193 |
|
undefined, maps:to_list(Attributes), aws_handle(HostType)). |
194 |
|
|
195 |
|
%% @doc Returns AWS SNS handle base on configured AWS credentials |
196 |
|
-spec aws_handle(mongooseim:host_type()) -> aws_config(). |
197 |
|
aws_handle(HostType) -> |
198 |
22 |
Opts = gen_mod:get_loaded_module_opts(HostType, ?MODULE), |
199 |
22 |
#{access_key_id := AccessKeyId, secret_access_key := SecretKey, sns_host := SNSHost} = Opts, |
200 |
22 |
erlcloud_sns:new(AccessKeyId, SecretKey, SNSHost). |
201 |
|
|
202 |
|
%% @doc Returns notification topic based on packet type and module configuration |
203 |
|
-spec get_topic(mongooseim:host_type(), Packet :: exml:element()) -> topic() | undefined. |
204 |
|
get_topic(HostType, Packet) -> |
205 |
4 |
case message_type(Packet) of |
206 |
|
pm -> |
207 |
3 |
gen_mod:get_module_opt(HostType, ?MODULE, pm_messages_topic, undefined); |
208 |
|
muc -> |
209 |
1 |
gen_mod:get_module_opt(HostType, ?MODULE, muc_messages_topic, undefined); |
210 |
|
_ -> |
211 |
:-( |
undefined |
212 |
|
end. |
213 |
|
|
214 |
|
%% @doc Constructs SNS TopicArn from given topic suffix |
215 |
|
-spec make_topic_arn(mongooseim:host_type(), topic()) -> topic_arn(). |
216 |
|
make_topic_arn(HostType, Topic) -> |
217 |
22 |
Opts = gen_mod:get_loaded_module_opts(HostType, ?MODULE), |
218 |
22 |
#{region := AWSRegion, account_id := AWSAccountId} = Opts, |
219 |
22 |
string:join(?TOPIC_BASE ++ [AWSRegion, AWSAccountId, Topic], ":"). |
220 |
|
|
221 |
|
%% @doc Returns message type |
222 |
|
-spec message_type(Packet :: exml:element()) -> pm | muc | undefined. |
223 |
|
message_type(Packet) -> |
224 |
8 |
case exml_query:attr(Packet, <<"type">>) of |
225 |
6 |
<<"chat">> -> pm; |
226 |
2 |
<<"groupchat">> -> muc; |
227 |
:-( |
_ -> undefined |
228 |
|
end. |
229 |
|
|
230 |
|
%% ---------------------------------------------------------------------- |
231 |
|
%% Callbacks |
232 |
|
|
233 |
|
-spec user_guid(mongooseim:host_type(), UserJID :: jid:jid()) -> user_guid(). |
234 |
|
user_guid(HostType, UserJID) -> |
235 |
26 |
PluginModule = gen_mod:get_module_opt(HostType, ?MODULE, plugin_module), |
236 |
26 |
PluginModule:user_guid(UserJID). |
237 |
|
|
238 |
|
-spec message_attributes(mongooseim:host_type(), TopicARN :: topic_arn(), |
239 |
|
UserJID :: jid:jid(), IsOnline :: boolean()) -> |
240 |
|
attributes(). |
241 |
|
message_attributes(HostType, TopicARN, UserJID, IsOnline) -> |
242 |
18 |
PluginModule = gen_mod:get_module_opt(HostType, ?MODULE, plugin_module), |
243 |
18 |
PluginModule:message_attributes(TopicARN, UserJID, IsOnline). |
244 |
|
|
245 |
|
-spec message_attributes(mongooseim:host_type(), TopicARN :: topic_arn(), |
246 |
|
From :: jid:jid(), To :: jid:jid(), MessageType :: pm | muc, |
247 |
|
Packet :: exml:element()) -> attributes(). |
248 |
|
message_attributes(HostType, TopicARN, From, To, MessageType, Packet) -> |
249 |
4 |
PluginModule = gen_mod:get_module_opt(HostType, ?MODULE, plugin_module), |
250 |
4 |
PluginModule:message_attributes(TopicARN, From, To, MessageType, Packet). |
251 |
|
|
252 |
|
-spec calc_backoff_time(mongooseim:host_type(), integer()) -> integer(). |
253 |
|
calc_backoff_time(HostType, Retry) -> |
254 |
:-( |
Opts = gen_mod:get_loaded_module_opts(HostType, ?MODULE), |
255 |
:-( |
#{publish_retry_count := MaxRetry, publish_retry_time_ms := BaseTime} = Opts, |
256 |
:-( |
BackoffMaxTime = round(math:pow(2, MaxRetry - Retry)) * BaseTime, |
257 |
:-( |
Random = rand:uniform(BaseTime), |
258 |
:-( |
BackoffMaxTime - Random. |