./ct_report/coverage/mod_event_pusher_sns.COVER.html

1 -module(mod_event_pusher_sns).
2
3 -behaviour(gen_mod).
4 -behaviour(mod_event_pusher).
5
6 -include("mod_event_pusher_events.hrl").
7 -include("mongoose.hrl").
8 -include("jlib.hrl").
9 -include_lib("erlcloud/include/erlcloud_aws.hrl").
10 -include("mongoose_config_spec.hrl").
11
%% Contract for the module configured as `plugin_module`: the functions
%% below are invoked on that module whenever a notification is prepared.

%% Returns the user identifier placed in the notification content.
-callback user_guid(UserJID :: jid:jid()) -> user_guid().

%% Returns the message attributes for a presence-change notification.
-callback message_attributes(TopicARN :: topic_arn(),
                             UserJID :: jid:jid(),
                             IsOnline :: boolean()) -> attributes().

%% Returns the message attributes for a chat-message notification.
-callback message_attributes(TopicARN :: topic_arn(),
                             From :: jid:jid(),
                             To :: jid:jid(),
                             MessageType :: pm | muc,
                             Packet :: exml:element()) -> attributes().
18
19 %%%===================================================================
20 %%% Types and definitions
21 %%%===================================================================
22
%% Leading components shared by every topic ARN built by this module.
-define(TOPIC_BASE, ["arn", "aws", "sns"]).
%% Retry count used when `publish_retry_count` is not configured.
-define(PUBLISH_RETRY_COUNT_DEFAULT, 2).

-type user_guid() :: binary().
%% Full topic ARN in format arn:aws:sns:{REGION}:{ACCOUNT_ID}:{TOPIC}
-type topic_arn() :: string().
%% {TOPIC} part of topic_arn() type
-type topic() :: string().
%% Message attributes handed to the publish call, keyed by attribute name.
-type attributes() :: #{string() => string() | binary() | number()}.
30
31
32 %%%===================================================================
33 %%% Exports
34 %%%===================================================================
35
36 %% MIM module callbacks
37 -export([start/2, stop/1, config_spec/0]).
38
39 %% API
40 -export([try_publish/5, push_event/3]).
41
42 -ignore_xref([behaviour_info/1, try_publish/5]).
43
44 %% Types
45 -export_type([user_guid/0, topic_arn/0, topic/0, attributes/0]).
46
%% @doc Starts the module for the given host: ensures the `erlcloud' and
%% `worker_pool' applications are running, then starts the publisher pool.
%%
%% The application start results are matched against `{ok, _}' so that a
%% dependency which cannot be started crashes here, at module start-up,
%% instead of being silently ignored and failing later on every publish.
-spec start(Host :: mongooseim:host_type(), Opts :: gen_mod:module_opts()) -> ok.
start(Host, Opts) ->
    {ok, _} = application:ensure_all_started(erlcloud),
    {ok, _} = application:ensure_all_started(worker_pool),
    start_pool(Host, Opts),
    ok.
53
%% @doc Starts the `generic' worker pool tagged `pusher_sns' for the host,
%% using the options derived from the module configuration.
%% Any start result other than `{ok, _}' fails the match and crashes.
-spec start_pool(Host :: mongooseim:host_type(), Opts :: gen_mod:module_opts()) ->
    term().
start_pool(Host, Opts) ->
    PoolOpts = pool_opts(Opts),
    {ok, _} = mongoose_wpool:start(generic, Host, pusher_sns, PoolOpts).
58
%% @doc Builds the worker pool options: the configured number of workers
%% together with the `available_worker' strategy.
-spec pool_opts(gen_mod:module_opts()) -> mongoose_wpool:pool_opts().
pool_opts(Opts) ->
    [{workers, get_worker_num(Opts)},
     {strategy, available_worker}].
63
%% @doc Reads the `pool_size' module option, falling back to 100 workers
%% when it is not set.
-spec get_worker_num(gen_mod:module_opts()) -> pos_integer().
get_worker_num(Opts) ->
    DefaultPoolSize = 100,
    gen_mod:get_opt(pool_size, Opts, DefaultPoolSize).
67
%% @doc Stops the `pusher_sns' worker pool of the given host.
%% The result of the pool shutdown is deliberately discarded.
-spec stop(Host :: jid:server()) -> ok.
stop(Host) ->
    _ = mongoose_wpool:stop(generic, Host, pusher_sns),
    ok.
72
%% @doc Returns the configuration section specification of this module.
%% The AWS credentials, region, account id and SNS host are required;
%% topics, plugin module, pool size and retry settings are optional.
-spec config_spec() -> mongoose_config_spec:config_section().
config_spec() ->
    StringOpt = #option{type = string},
    NonNegativeInt = #option{type = integer,
                             validate = non_negative},
    Items = #{<<"presence_updates_topic">> => StringOpt,
              <<"pm_messages_topic">> => StringOpt,
              <<"muc_messages_topic">> => StringOpt,
              <<"plugin_module">> => #option{type = atom,
                                             validate = module},
              <<"sns_host">> => StringOpt,
              <<"region">> => StringOpt,
              <<"access_key_id">> => StringOpt,
              <<"secret_access_key">> => StringOpt,
              <<"account_id">> => StringOpt,
              <<"pool_size">> => #option{type = integer,
                                         validate = positive},
              <<"publish_retry_count">> => NonNegativeInt,
              <<"publish_retry_time_ms">> => NonNegativeInt},
    Required = [<<"access_key_id">>, <<"secret_access_key">>,
                <<"region">>, <<"account_id">>, <<"sns_host">>],
    #section{items = Items, required = Required}.
96
%% @doc Event pusher entry point.
%% A user status event triggers a presence notification and an incoming
%% chat event triggers a message notification; any other event is ignored.
%% The accumulator is returned unchanged in every case.
push_event(Acc, _Unused, #user_status_event{jid = UserJID, status = Status}) ->
    IsOnline = Status == online,
    user_presence_changed(UserJID, IsOnline),
    Acc;
push_event(Acc, _Unused, #chat_event{direction = in, from = From, to = To, packet = Packet}) ->
    handle_packet(From, To, Packet),
    Acc;
push_event(Acc, _Unused, _OtherEvent) ->
    Acc.
105
106 %%%===================================================================
107 %%% Internal functions
108 %%%===================================================================
109
%% @doc Publishes a presence notification for the user.
%% Nothing is published when `presence_updates_topic' is not configured
%% for the user's server. Always returns `ok'.
-spec user_presence_changed(UserJID :: jid:jid(), IsOnline :: boolean()) -> ok.
user_presence_changed(#jid{lserver = Host} = UserJID, IsOnline) ->
    case opt(Host, presence_updates_topic) of
        undefined ->
            %% No topic configured for presence updates: nothing to do.
            ok;
        Topic ->
            UserGUID = user_guid(Host, UserJID),
            Payload = #{user_id => UserGUID, present => IsOnline},
            TopicARN = make_topic_arn(Host, Topic),
            Attrs = message_attributes(Host, TopicARN, UserJID, IsOnline),
            async_publish(Host, TopicARN, Payload, Attrs),
            ok
    end.
124
%% @doc Handles a packet and, if needed, publishes an SNS notification.
%%
%% Returns `skip' without publishing when no topic is configured for the
%% packet's message type or when the packet has no `body' child element.
%% Otherwise the sender and recipient identifiers and the body text are
%% queued for asynchronous publishing to the configured topic.
-spec handle_packet(From :: jid:jid(), To :: jid:jid(),
                    Packet :: exml:element()) -> ok | skip.
handle_packet(From = #jid{lserver = Host}, To, Packet) ->
    %% Log the packet that was actually received (the bound `Packet').
    ?LOG_DEBUG(#{what => sns_handle_packet,
                 from => From, to => To, packet => Packet}),

    case {get_topic(Host, Packet), exml_query:subelement(Packet, <<"body">>)} of
        {undefined, _} -> %% Skip if there is no topic set in configuration for the packet type
            skip;
        {_, undefined} -> %% Skip if there is no message body in the packet
            skip;
        {Topic, BodyTag} ->
            FromGUID = user_guid(Host, From),
            ToGUID = user_guid(Host, To),
            MessageBody = exml_query:cdata(BodyTag),
            Content = #{from_user_id => FromGUID,
                        to_user_id => ToGUID,
                        message => MessageBody},

            TopicARN = make_topic_arn(Host, Topic),
            Attributes = message_attributes(Host, TopicARN, From, To,
                                            message_type(Packet), Packet),

            async_publish(Host, TopicARN, Content, Attributes)
    end.
150
%% @doc Queues a publish attempt on the `pusher_sns' worker pool.
%% The attempt is made by try_publish/5 with the configured retry count
%% (`publish_retry_count', or the module default when unset).
%% Content should be a valid JSON term.
-spec async_publish(Host :: jid:lserver(), topic_arn(), Content :: jiffy:json_value(),
                    attributes()) -> ok.
async_publish(Host, TopicARN, Content, Attributes) ->
    RetryCount = opt(Host, publish_retry_count, ?PUBLISH_RETRY_COUNT_DEFAULT),
    PublishArgs = [Host, TopicARN, Content, Attributes, RetryCount],
    mongoose_wpool:cast(generic, Host, pusher_sns, {?MODULE, try_publish, PublishArgs}).
158
%% @doc Attempts to publish a notification, retrying on failure.
%%
%% With a negative retry counter the notification is logged and `dropped'.
%% Otherwise publish/4 is called; if it raises, another attempt with the
%% counter decremented is cast to the worker pool after a backoff delay,
%% a warning is logged and `scheduled' is returned. On success the result
%% of publish/4 is returned. Content should be a valid JSON term.
-spec try_publish(Host :: jid:lserver(), topic_arn(), Content :: jiffy:json_value(),
                  attributes(), TryCount :: integer()) -> MessageId :: string() | dropped | scheduled.
try_publish(Host, TopicARN, Content, Attributes, Retry) when Retry < 0 ->
    ?LOG_WARNING(#{what => sns_notification_dropped,
                   server => Host, topic_arn => TopicARN,
                   attributes => Attributes, content => Content}),
    dropped;
try_publish(Host, TopicARN, Content, Attributes, Retry) ->
    try
        publish(Host, TopicARN, Content, Attributes)
    catch
        Class:Reason:Stacktrace ->
            Delay = calc_backoff_time(Host, Retry),
            NextAttempt = {?MODULE, try_publish,
                           [Host, TopicARN, Content, Attributes, Retry - 1]},
            timer:apply_after(Delay, mongoose_wpool, cast,
                              [generic, Host, pusher_sns, NextAttempt]),
            ?LOG_WARNING(#{what => sns_notification_retry,
                           text => <<"Retrying SNS notification after {backoff_time} ms">>,
                           server => Host, topic_arn => TopicARN,
                           attributes => Attributes, content => Content,
                           backoff_time => Delay,
                           class => Class, reason => Reason, stacktrace => Stacktrace}),
            scheduled
    end.
184
%% @doc Publishes a notification to the given topic through erlcloud.
%% The content and attribute maps are converted to lists before the call.
%% Content should be a valid JSON term.
-spec publish(Host :: jid:lserver(), topic_arn(), Content :: jiffy:json_value(),
              attributes()) -> MessageId :: string().
publish(Host, TopicARN, Content, Attributes) ->
    Message = maps:to_list(Content),
    AttributeList = maps:to_list(Attributes),
    AwsConfig = aws_handle(Host),
    erlcloud_sns:publish(topic, TopicARN, Message, undefined, AttributeList, AwsConfig).
191
%% @doc Builds the AWS configuration used for SNS calls from the
%% `access_key_id', `secret_access_key' and `sns_host' module options.
-spec aws_handle(Host :: jid:lserver()) -> aws_config().
aws_handle(Host) ->
    erlcloud_sns:new(opt(Host, access_key_id),
                     opt(Host, secret_access_key),
                     opt(Host, sns_host)).
200
%% @doc Returns the configured notification topic for the packet's message
%% type, or `undefined' when the type is unsupported or no topic is set.
-spec get_topic(Host :: jid:lserver(), Packet :: exml:element()) -> topic() | undefined.
get_topic(Host, Packet) ->
    topic_for_type(Host, message_type(Packet)).

%% Looks up the topic option that corresponds to a message type.
-spec topic_for_type(Host :: jid:lserver(), pm | muc | undefined) -> topic() | undefined.
topic_for_type(Host, pm) ->
    opt(Host, pm_messages_topic);
topic_for_type(Host, muc) ->
    opt(Host, muc_messages_topic);
topic_for_type(_Host, _OtherType) ->
    undefined.
212
213
%% @doc Builds the full topic ARN by joining the ARN base, the configured
%% `region' and `account_id', and the given topic name with ":".
-spec make_topic_arn(Host :: jid:lserver(), Topic :: topic()) -> topic_arn().
make_topic_arn(Host, Topic) ->
    Region = opt(Host, region),
    AccountId = opt(Host, account_id),
    Parts = ?TOPIC_BASE ++ [Region, AccountId, Topic],
    string:join(Parts, ":").
221
%% @doc Classifies a packet by its `type' attribute: `chat' is a private
%% message (`pm'), `groupchat' is `muc', anything else is `undefined'.
-spec message_type(Packet :: exml:element()) -> pm | muc | undefined.
message_type(Packet) ->
    type_from_attr(exml_query:attr(Packet, <<"type">>)).

%% Maps the raw `type' attribute value to a message type.
-spec type_from_attr(term()) -> pm | muc | undefined.
type_from_attr(<<"chat">>) -> pm;
type_from_attr(<<"groupchat">>) -> muc;
type_from_attr(_Other) -> undefined.
230
%% Reads a module option for the host; yields `undefined' when it is unset.
-spec opt(Host :: jid:lserver(), Option :: atom()) -> Value :: term() | undefined.
opt(Host, Option) ->
    gen_mod:get_module_opt(Host, ?MODULE, Option, undefined).
235
%% Reads a module option for the host; yields Default when it is unset.
-spec opt(Host :: jid:lserver(), Option :: atom(), Default :: term()) ->
    Value :: term().
opt(Host, Option, Default) ->
    Module = ?MODULE,
    gen_mod:get_module_opt(Host, Module, Option, Default).
241
242 %% ----------------------------------------------------------------------
243 %% Callbacks
244
%% Delegates to the `user_guid/1' callback of the configured plugin module
%% (`mod_event_pusher_sns_defaults' when `plugin_module' is not set).
-spec user_guid(Host :: jid:lserver(), UserJID :: jid:jid()) -> user_guid().
user_guid(Host, UserJID) ->
    Plugin = opt(Host, plugin_module, mod_event_pusher_sns_defaults),
    Plugin:user_guid(UserJID).
249
%% Delegates to the `message_attributes/3' callback (presence variant) of
%% the configured plugin module (`mod_event_pusher_sns_defaults' by default).
-spec message_attributes(Host :: jid:lserver(), TopicARN :: topic_arn(),
                         UserJID :: jid:jid(), IsOnline :: boolean()) ->
    attributes().
message_attributes(Host, TopicARN, UserJID, IsOnline) ->
    Plugin = opt(Host, plugin_module, mod_event_pusher_sns_defaults),
    Plugin:message_attributes(TopicARN, UserJID, IsOnline).
256
%% Delegates to the `message_attributes/5' callback (chat message variant)
%% of the configured plugin module (`mod_event_pusher_sns_defaults' by default).
-spec message_attributes(Host :: jid:lserver(), TopicARN :: topic_arn(),
                         From :: jid:jid(), To :: jid:jid(), MessageType :: pm | muc,
                         Packet :: exml:element()) -> attributes().
message_attributes(Host, TopicARN, From, To, MessageType, Packet) ->
    Plugin = opt(Host, plugin_module, mod_event_pusher_sns_defaults),
    Plugin:message_attributes(TopicARN, From, To, MessageType, Packet).
263
%% Computes the delay in milliseconds before the next publish attempt.
%%
%% The upper bound doubles with every attempt already made:
%% 2^(MaxRetry - Retry) * BaseTime, where BaseTime is the
%% `publish_retry_time_ms' option (50 when unset). A random jitter of up to
%% BaseTime is subtracted from that bound.
%%
%% `publish_retry_time_ms' is allowed to be 0 by the config spec; in that
%% case no jitter is drawn (rand:uniform/1 requires an argument >= 1) and
%% the resulting delay is 0.
-spec calc_backoff_time(Host :: jid:lserver(), integer()) -> integer().
calc_backoff_time(Host, Retry) ->
    MaxRetry = opt(Host, publish_retry_count, ?PUBLISH_RETRY_COUNT_DEFAULT),
    BaseTime = opt(Host, publish_retry_time_ms, 50),
    BackoffMaxTime = round(math:pow(2, MaxRetry - Retry)) * BaseTime,
    BackoffMaxTime - backoff_jitter(BaseTime).

%% Draws a random jitter in 1..BaseTime, or 0 when BaseTime is not positive.
-spec backoff_jitter(BaseTime :: integer()) -> non_neg_integer().
backoff_jitter(BaseTime) when BaseTime > 0 ->
    rand:uniform(BaseTime);
backoff_jitter(_BaseTime) ->
    0.
Line Hits Source