./ct_report/coverage/mod_event_pusher_sns.COVER.html

1 -module(mod_event_pusher_sns).
2
3 -behaviour(gen_mod).
4 -behaviour(mod_event_pusher).
5
6 -include("mod_event_pusher_events.hrl").
7 -include("mongoose.hrl").
8 -include("jlib.hrl").
9 -include_lib("erlcloud/include/erlcloud_aws.hrl").
10 -include("mongoose_config_spec.hrl").
11
12 -callback user_guid(UserJID :: jid:jid()) -> user_guid().
13 -callback message_attributes(TopicARN :: topic_arn(), UserJID :: jid:jid(),
14 IsOnline :: boolean()) -> attributes().
15 -callback message_attributes(TopicARN :: topic_arn(), From :: jid:jid(),
16 To :: jid:jid(), MessageType :: pm | muc,
17 Packet :: exml:element()) -> attributes().
18
19 %%%===================================================================
20 %%% Types and definitions
21 %%%===================================================================
22
23 -define(TOPIC_BASE, ["arn", "aws", "sns"]).
24
25 -type user_guid() :: binary().
26 -type topic_arn() :: string(). %% Full topic ARN in format arn:aws:sns:{REGION}:{ACCOUNT_ID}:{TOPIC}
27 -type topic() :: string(). %% {TOPIC} part of topic_arn() type
28 -type attributes() :: #{string() => string() | binary() | number()}.
29
30
31 %%%===================================================================
32 %%% Exports
33 %%%===================================================================
34
35 %% MIM module callbacks
36 -export([start/2, stop/1, config_spec/0]).
37
38 %% API
39 -export([try_publish/5, push_event/3]).
40
41 -ignore_xref([behaviour_info/1, try_publish/5]).
42
43 %% Types
44 -export_type([user_guid/0, topic_arn/0, topic/0, attributes/0]).
45
46 -spec start(Host :: mongooseim:host_type(), Opts :: gen_mod:module_opts()) -> ok.
47 start(Host, Opts) ->
48 2 application:ensure_all_started(erlcloud),
49 2 application:ensure_all_started(worker_pool),
50 2 start_pool(Host, Opts),
51 2 ok.
52
53 -spec start_pool(Host :: mongooseim:host_type(), Opts :: gen_mod:module_opts()) ->
54 term().
55 start_pool(Host, Opts) ->
56 2 {ok, _} = mongoose_wpool:start(generic, Host, pusher_sns, pool_opts(Opts)).
57
58 -spec pool_opts(gen_mod:module_opts()) -> mongoose_wpool:pool_opts().
59 pool_opts(Opts) ->
60 2 WorkerNum = get_worker_num(Opts),
61 2 [{workers, WorkerNum}, {strategy, available_worker}].
62
63 -spec get_worker_num(gen_mod:module_opts()) -> pos_integer().
64 get_worker_num(Opts) ->
65 2 gen_mod:get_opt(pool_size, Opts, 100).
66
67 -spec stop(Host :: jid:server()) -> ok.
68 stop(Host) ->
69 2 mongoose_wpool:stop(generic, Host, pusher_sns),
70 2 ok.
71
72 -spec config_spec() -> mongoose_config_spec:config_section().
73 config_spec() ->
74 160 #section{
75 items = #{<<"presence_updates_topic">> => #option{type = string},
76 <<"pm_messages_topic">> => #option{type = string},
77 <<"muc_messages_topic">> => #option{type = string},
78 <<"plugin_module">> => #option{type = atom,
79 validate = module},
80 <<"sns_host">> => #option{type = string},
81 <<"region">> => #option{type = string},
82 <<"access_key_id">> => #option{type = string},
83 <<"secret_access_key">> => #option{type = string},
84 <<"account_id">> => #option{type = string},
85 <<"pool_size">> => #option{type = integer,
86 validate = positive},
87 <<"publish_retry_count">> => #option{type = integer,
88 validate = non_negative},
89 <<"publish_retry_time_ms">> => #option{type = integer,
90 validate = non_negative}
91 },
92 required = [<<"access_key_id">>, <<"secret_access_key">>,
93 <<"region">>, <<"account_id">>, <<"sns_host">>]
94 }.
95
96 push_event(Acc, _, #user_status_event{jid = UserJID, status = Status}) ->
97 18 user_presence_changed(UserJID, Status == online),
98 18 Acc;
99 push_event(Acc, _, #chat_event{direction = in, from = From, to = To, packet = Packet}) ->
100 4 handle_packet(From, To, Packet),
101 4 Acc;
102 push_event(Acc, _, _) ->
103 20 Acc.
104
105 %%%===================================================================
106 %%% Internal functions
107 %%%===================================================================
108
109 -spec user_presence_changed(UserJID :: jid:jid(), IsOnline :: boolean()) -> ok.
110 user_presence_changed(#jid{lserver = Host} = UserJID, IsOnline) ->
111 18 Topic = opt(Host, presence_updates_topic),
112 18 case Topic of
113 undefined ->
114
:-(
skip;
115 Topic ->
116 18 UserGUID = user_guid(Host, UserJID),
117 18 Content = #{user_id => UserGUID, present => IsOnline},
118 18 TopicARN = make_topic_arn(Host, Topic),
119 18 Attributes = message_attributes(Host, TopicARN, UserJID, IsOnline),
120 18 async_publish(Host, TopicARN, Content, Attributes)
121 end,
122 18 ok.
123
124 %% @doc Handles packet and if needed publishes SNS notification
125 -spec handle_packet(From :: jid:jid(), To :: jid:jid(),
126 Packet :: exml:element()) -> ok | skip.
127 handle_packet(From = #jid{lserver = Host}, To, Packet) ->
128 4 ?LOG_DEBUG(#{what => sns_handle_packet,
129 4 from => From, to => To, packet => exml_Packet}),
130
131 4 case {get_topic(Host, Packet), exml_query:subelement(Packet, <<"body">>)} of
132 {undefined, _} -> %% Skip if there is no topic set in configuration for the packet type
133
:-(
skip;
134 {_, undefined} -> %% Skip if there is no message body in the packet
135
:-(
skip;
136 {Topic, BodyTag} ->
137 4 FromGUID = user_guid(Host, From),
138 4 ToGUID = user_guid(Host, To),
139 4 MessageBody = exml_query:cdata(BodyTag),
140 4 Content = #{from_user_id => FromGUID,
141 to_user_id => ToGUID,
142 message => MessageBody},
143
144 4 TopicARN = make_topic_arn(Host, Topic),
145 4 Attributes = message_attributes(Host, TopicARN, From, To, message_type(Packet), Packet),
146
147 4 async_publish(Host, TopicARN, Content, Attributes)
148 end.
149
150 %% @doc Start publish process notification to AWS SNS service. Content should be valid JSON term
151 -spec async_publish(Host :: jid:lserver(), topic_arn(), Content :: jiffy:json_value(),
152 attributes()) -> ok.
153 async_publish(Host, TopicARN, Content, Attributes) ->
154 22 Retry = opt(Host, publish_retry_count, 2),
155 22 mongoose_wpool:cast(generic, Host, pusher_sns,
156 {?MODULE, try_publish, [Host, TopicARN, Content, Attributes, Retry]}).
157
158 %% @doc Publish notification to AWS SNS service. Content should be a valid JSON term
159 -spec try_publish(Host :: jid:lserver(), topic_arn(), Content :: jiffy:json_value(),
160 attributes(), TryCount :: integer()) -> MessageId :: string() | dropped | scheduled.
161 try_publish(Host, TopicARN, Content, Attributes, Retry) when Retry < 0 ->
162
:-(
?LOG_WARNING(#{what => sns_notification_dropped,
163 server => Host, topic_arn => TopicARN,
164
:-(
attributes => Attributes, content => Content}),
165
:-(
dropped;
166 try_publish(Host, TopicARN, Content, Attributes, Retry) ->
167 22 try publish(Host, TopicARN, Content, Attributes)
168 catch
169 Type:Error:StackTrace ->
170
:-(
BackoffTime = calc_backoff_time(Host, Retry),
171
:-(
timer:apply_after(BackoffTime, mongoose_wpool, cast,
172 [generic, Host, pusher_sns,
173 {?MODULE, try_publish,
174 [Host, TopicARN, Content, Attributes, Retry - 1]}]),
175
:-(
?LOG_WARNING(#{what => sns_notification_retry,
176 text => <<"Retrying SNS notification after {backoff_time} ms">>,
177 server => Host, topic_arn => TopicARN,
178 attributes => Attributes, content => Content,
179 backoff_time => BackoffTime,
180
:-(
class => Type, reason => Error, stacktrace => StackTrace}),
181
:-(
scheduled
182 end.
183
184 %% @doc Publish notification to AWS SNS service. Content should be a valid JSON term
185 -spec publish(Host :: jid:lserver(), topic_arn(), Content :: jiffy:json_value(),
186 attributes()) -> MessageId :: string().
187 publish(Host, TopicARN, Content, Attributes) ->
188 22 erlcloud_sns:publish(topic, TopicARN, maps:to_list(Content),
189 undefined, maps:to_list(Attributes), aws_handle(Host)).
190
191 %% @doc Returns AWS SNS handle base on configured AWS credentials
192 -spec aws_handle(Host :: jid:lserver()) -> aws_config().
193 aws_handle(Host) ->
194 22 AccessKeyId = opt(Host, access_key_id),
195 22 SecretKey = opt(Host, secret_access_key),
196 22 SNSHost = opt(Host, sns_host),
197
198 22 erlcloud_sns:new(AccessKeyId, SecretKey, SNSHost).
199
200 %% @doc Returns notification topic based on packet type and module configuration
201 -spec get_topic(Host :: jid:lserver(), Packet :: exml:element()) -> topic() | undefined.
202 get_topic(Host, Packet) ->
203 4 case message_type(Packet) of
204 pm ->
205 3 opt(Host, pm_messages_topic);
206 muc ->
207 1 opt(Host, muc_messages_topic);
208 _ ->
209
:-(
undefined
210 end.
211
212
213 %% @doc Constructs SNS TopicArn from given topic suffix
214 -spec make_topic_arn(Host :: jid:lserver(), Topic :: topic()) -> topic_arn().
215 make_topic_arn(Host, Topic) ->
216 22 AWSRegion = opt(Host, region),
217 22 AWSAccountId = opt(Host, account_id),
218
219 22 string:join(?TOPIC_BASE ++ [AWSRegion, AWSAccountId, Topic], ":").
220
221 %% @doc Returns message type
222 -spec message_type(Packet :: exml:element()) -> pm | muc | undefined.
223 message_type(Packet) ->
224 8 case exml_query:attr(Packet, <<"type">>) of
225 6 <<"chat">> -> pm;
226 2 <<"groupchat">> -> muc;
227
:-(
_ -> undefined
228 end.
229
230 %% Getter for module options
231 -spec opt(Host :: jid:lserver(), Option :: atom()) -> Value :: term() | undefined.
232 opt(Host, Option) ->
233 132 opt(Host, Option, undefined).
234
235 %% Getter for module options with default value
236 -spec opt(Host :: jid:lserver(), Option :: atom(), Default :: term()) ->
237 Value :: term().
238 opt(Host, Option, Default) ->
239 202 gen_mod:get_module_opt(Host, ?MODULE, Option, Default).
240
241 %% ----------------------------------------------------------------------
242 %% Callbacks
243
244 -spec user_guid(Host :: jid:lserver(), UserJID :: jid:jid()) -> user_guid().
245 user_guid(Host, UserJID) ->
246 26 PluginModule = opt(Host, plugin_module, mod_event_pusher_sns_defaults),
247 26 PluginModule:user_guid(UserJID).
248
249 -spec message_attributes(Host :: jid:lserver(), TopicARN :: topic_arn(),
250 UserJID :: jid:jid(), IsOnline :: boolean()) ->
251 attributes().
252 message_attributes(Host, TopicARN, UserJID, IsOnline) ->
253 18 PluginModule = opt(Host, plugin_module, mod_event_pusher_sns_defaults),
254 18 PluginModule:message_attributes(TopicARN, UserJID, IsOnline).
255
256 -spec message_attributes(Host :: jid:lserver(), TopicARN :: topic_arn(),
257 From :: jid:jid(), To :: jid:jid(), MessageType :: pm | muc,
258 Packet :: exml:element()) -> attributes().
259 message_attributes(Host, TopicARN, From, To, MessageType, Packet) ->
260 4 PluginModule = opt(Host, plugin_module, mod_event_pusher_sns_defaults),
261 4 PluginModule:message_attributes(TopicARN, From, To, MessageType, Packet).
262
263 -spec calc_backoff_time(Host :: jid:lserver(), integer()) -> integer().
264 calc_backoff_time(Host, Retry) ->
265
:-(
MaxRetry = opt(Host, publish_retry_count),
266
:-(
BaseTime = opt(Host, publish_retry_time_ms, 50),
267
:-(
BackoffMaxTime = round(math:pow(2, MaxRetry - Retry)) * BaseTime,
268
:-(
crypto:rand_uniform(BackoffMaxTime - BaseTime, BackoffMaxTime).
Line Hits Source