./ct_report/coverage/mod_event_pusher_sns.COVER.html

1 -module(mod_event_pusher_sns).
2
3 -behaviour(gen_mod).
4 -behaviour(mod_event_pusher).
5
6 -include("mod_event_pusher_events.hrl").
7 -include("mongoose.hrl").
8 -include_lib("erlcloud/include/erlcloud_aws.hrl").
9 -include("mongoose_config_spec.hrl").
10
11 -callback user_guid(UserJID :: jid:jid()) -> user_guid().
12 -callback message_attributes(TopicARN :: topic_arn(), UserJID :: jid:jid(),
13 IsOnline :: boolean()) -> attributes().
14 -callback message_attributes(TopicARN :: topic_arn(), From :: jid:jid(),
15 To :: jid:jid(), MessageType :: pm | muc,
16 Packet :: exml:element()) -> attributes().
17
18 %%%===================================================================
19 %%% Types and definitions
20 %%%===================================================================
21
22 -define(TOPIC_BASE, ["arn", "aws", "sns"]).
23
24 -type user_guid() :: binary().
25 -type topic_arn() :: string(). %% Full topic ARN in format arn:aws:sns:{REGION}:{ACCOUNT_ID}:{TOPIC}
26 -type topic() :: string(). %% {TOPIC} part of topic_arn() type
27 -type attributes() :: #{string() => string() | binary() | number()}.
28
29
30 %%%===================================================================
31 %%% Exports
32 %%%===================================================================
33
34 %% MIM module callbacks
35 -export([start/2, stop/1, config_spec/0]).
36
37 %% API
38 -export([try_publish/5, push_event/2]).
39
40 -ignore_xref([behaviour_info/1, try_publish/5]).
41
42 %% Types
43 -export_type([user_guid/0, topic_arn/0, topic/0, attributes/0]).
44
45 -spec start(mongooseim:host_type(), gen_mod:module_opts()) -> ok.
46 start(HostType, Opts) ->
47 2 application:ensure_all_started(erlcloud),
48 2 application:ensure_all_started(worker_pool),
49 2 start_pool(HostType, Opts),
50 2 ok.
51
52 -spec start_pool(mongooseim:host_type(), gen_mod:module_opts()) ->
53 term().
54 start_pool(HostType, Opts) ->
55 2 {ok, _} = mongoose_wpool:start(generic, HostType, pusher_sns, pool_opts(Opts)).
56
57 -spec pool_opts(gen_mod:module_opts()) -> mongoose_wpool:pool_opts().
58 pool_opts(Opts) ->
59 2 WorkerNum = get_worker_num(Opts),
60 2 [{workers, WorkerNum}, {strategy, available_worker}].
61
62 -spec get_worker_num(gen_mod:module_opts()) -> pos_integer().
63 get_worker_num(Opts) ->
64 2 gen_mod:get_opt(pool_size, Opts).
65
66 -spec stop(HostType :: jid:server()) -> ok.
67 stop(HostType) ->
68 2 mongoose_wpool:stop(generic, HostType, pusher_sns),
69 2 ok.
70
71 -spec config_spec() -> mongoose_config_spec:config_section().
72 config_spec() ->
73 106 #section{
74 items = #{<<"presence_updates_topic">> => #option{type = string},
75 <<"pm_messages_topic">> => #option{type = string},
76 <<"muc_messages_topic">> => #option{type = string},
77 <<"plugin_module">> => #option{type = atom,
78 validate = module},
79 <<"sns_host">> => #option{type = string},
80 <<"region">> => #option{type = string},
81 <<"access_key_id">> => #option{type = string},
82 <<"secret_access_key">> => #option{type = string},
83 <<"account_id">> => #option{type = string},
84 <<"pool_size">> => #option{type = integer,
85 validate = positive},
86 <<"publish_retry_count">> => #option{type = integer,
87 validate = non_negative},
88 <<"publish_retry_time_ms">> => #option{type = integer,
89 validate = non_negative}
90 },
91 required = [<<"sns_host">>, <<"region">>, <<"access_key_id">>, <<"secret_access_key">>,
92 <<"account_id">>],
93 defaults = #{<<"plugin_module">> => mod_event_pusher_sns_defaults,
94 <<"pool_size">> => 100,
95 <<"publish_retry_count">> => 2,
96 <<"publish_retry_time_ms">> => 50
97 }
98 }.
99
100 push_event(Acc, #user_status_event{jid = UserJID, status = Status}) ->
101 18 user_presence_changed(mongoose_acc:host_type(Acc), UserJID, Status == online),
102 18 Acc;
103 push_event(Acc, #chat_event{direction = in, from = From, to = To, packet = Packet}) ->
104 4 handle_packet(mongoose_acc:host_type(Acc), From, To, Packet),
105 4 Acc;
106 push_event(Acc, _) ->
107 23 Acc.
108
109 %%%===================================================================
110 %%% Internal functions
111 %%%===================================================================
112
113 -spec user_presence_changed(mongooseim:host_type(), UserJID :: jid:jid(),
114 IsOnline :: boolean()) -> ok.
115 user_presence_changed(HostType, UserJID, IsOnline) ->
116 18 case gen_mod:lookup_module_opt(HostType, ?MODULE, presence_updates_topic) of
117 {error, not_found} ->
118
:-(
skip;
119 {ok, Topic} ->
120 18 UserGUID = user_guid(HostType, UserJID),
121 18 Content = #{user_id => UserGUID, present => IsOnline},
122 18 TopicARN = make_topic_arn(HostType, Topic),
123 18 Attributes = message_attributes(HostType, TopicARN, UserJID, IsOnline),
124 18 async_publish(HostType, TopicARN, Content, Attributes)
125 end,
126 18 ok.
127
128 %% @doc Handles packet and if needed publishes SNS notification
129 -spec handle_packet(mongooseim:host_type(), From :: jid:jid(), To :: jid:jid(),
130 Packet :: exml:element()) -> ok | skip.
131 handle_packet(HostType, From, To, Packet) ->
132 4 ?LOG_DEBUG(#{what => sns_handle_packet,
133 4 from => From, to => To, packet => exml_Packet}),
134
135 4 case {get_topic(HostType, Packet), exml_query:subelement(Packet, <<"body">>)} of
136 {undefined, _} -> %% Skip if there is no topic set in configuration for the packet type
137
:-(
skip;
138 {_, undefined} -> %% Skip if there is no message body in the packet
139
:-(
skip;
140 {Topic, BodyTag} ->
141 4 FromGUID = user_guid(HostType, From),
142 4 ToGUID = user_guid(HostType, To),
143 4 MessageBody = exml_query:cdata(BodyTag),
144 4 Content = #{from_user_id => FromGUID,
145 to_user_id => ToGUID,
146 message => MessageBody},
147
148 4 TopicARN = make_topic_arn(HostType, Topic),
149 4 Attributes = message_attributes(HostType, TopicARN, From, To, message_type(Packet), Packet),
150
151 4 async_publish(HostType, TopicARN, Content, Attributes)
152 end.
153
154 %% @doc Start publish process notification to AWS SNS service. Content should be valid JSON term
155 -spec async_publish(mongooseim:host_type(), topic_arn(), Content :: jiffy:json_value(),
156 attributes()) -> ok.
157 async_publish(HostType, TopicARN, Content, Attributes) ->
158 22 Retry = gen_mod:get_module_opt(HostType, ?MODULE, publish_retry_count),
159 22 mongoose_wpool:cast(generic, HostType, pusher_sns,
160 {?MODULE, try_publish, [HostType, TopicARN, Content, Attributes, Retry]}).
161
162 %% @doc Publish notification to AWS SNS service. Content should be a valid JSON term
163 -spec try_publish(mongooseim:host_type(), topic_arn(), Content :: jiffy:json_value(),
164 attributes(), TryCount :: integer()) -> MessageId :: string() | dropped | scheduled.
165 try_publish(HostType, TopicARN, Content, Attributes, Retry) when Retry < 0 ->
166
:-(
?LOG_WARNING(#{what => sns_notification_dropped,
167 server => HostType, topic_arn => TopicARN,
168
:-(
attributes => Attributes, content => Content}),
169
:-(
dropped;
170 try_publish(HostType, TopicARN, Content, Attributes, Retry) ->
171 22 try publish(HostType, TopicARN, Content, Attributes)
172 catch
173 Type:Error:StackTrace ->
174
:-(
BackoffTime = calc_backoff_time(HostType, Retry),
175
:-(
timer:apply_after(BackoffTime, mongoose_wpool, cast,
176 [generic, HostType, pusher_sns,
177 {?MODULE, try_publish,
178 [HostType, TopicARN, Content, Attributes, Retry - 1]}]),
179
:-(
?LOG_WARNING(#{what => sns_notification_retry,
180 text => <<"Retrying SNS notification after {backoff_time} ms">>,
181 host_type => HostType, topic_arn => TopicARN,
182 attributes => Attributes, content => Content,
183 backoff_time => BackoffTime,
184
:-(
class => Type, reason => Error, stacktrace => StackTrace}),
185
:-(
scheduled
186 end.
187
188 %% @doc Publish notification to AWS SNS service. Content should be a valid JSON term
189 -spec publish(mongooseim:host_type(), topic_arn(), Content :: jiffy:json_value(),
190 attributes()) -> MessageId :: string().
191 publish(HostType, TopicARN, Content, Attributes) ->
192 22 erlcloud_sns:publish(topic, TopicARN, maps:to_list(Content),
193 undefined, maps:to_list(Attributes), aws_handle(HostType)).
194
195 %% @doc Returns AWS SNS handle base on configured AWS credentials
196 -spec aws_handle(mongooseim:host_type()) -> aws_config().
197 aws_handle(HostType) ->
198 22 Opts = gen_mod:get_loaded_module_opts(HostType, ?MODULE),
199 22 #{access_key_id := AccessKeyId, secret_access_key := SecretKey, sns_host := SNSHost} = Opts,
200 22 erlcloud_sns:new(AccessKeyId, SecretKey, SNSHost).
201
202 %% @doc Returns notification topic based on packet type and module configuration
203 -spec get_topic(mongooseim:host_type(), Packet :: exml:element()) -> topic() | undefined.
204 get_topic(HostType, Packet) ->
205 4 case message_type(Packet) of
206 pm ->
207 3 gen_mod:get_module_opt(HostType, ?MODULE, pm_messages_topic, undefined);
208 muc ->
209 1 gen_mod:get_module_opt(HostType, ?MODULE, muc_messages_topic, undefined);
210 _ ->
211
:-(
undefined
212 end.
213
214 %% @doc Constructs SNS TopicArn from given topic suffix
215 -spec make_topic_arn(mongooseim:host_type(), topic()) -> topic_arn().
216 make_topic_arn(HostType, Topic) ->
217 22 Opts = gen_mod:get_loaded_module_opts(HostType, ?MODULE),
218 22 #{region := AWSRegion, account_id := AWSAccountId} = Opts,
219 22 string:join(?TOPIC_BASE ++ [AWSRegion, AWSAccountId, Topic], ":").
220
221 %% @doc Returns message type
222 -spec message_type(Packet :: exml:element()) -> pm | muc | undefined.
223 message_type(Packet) ->
224 8 case exml_query:attr(Packet, <<"type">>) of
225 6 <<"chat">> -> pm;
226 2 <<"groupchat">> -> muc;
227
:-(
_ -> undefined
228 end.
229
230 %% ----------------------------------------------------------------------
231 %% Callbacks
232
233 -spec user_guid(mongooseim:host_type(), UserJID :: jid:jid()) -> user_guid().
234 user_guid(HostType, UserJID) ->
235 26 PluginModule = gen_mod:get_module_opt(HostType, ?MODULE, plugin_module),
236 26 PluginModule:user_guid(UserJID).
237
238 -spec message_attributes(mongooseim:host_type(), TopicARN :: topic_arn(),
239 UserJID :: jid:jid(), IsOnline :: boolean()) ->
240 attributes().
241 message_attributes(HostType, TopicARN, UserJID, IsOnline) ->
242 18 PluginModule = gen_mod:get_module_opt(HostType, ?MODULE, plugin_module),
243 18 PluginModule:message_attributes(TopicARN, UserJID, IsOnline).
244
245 -spec message_attributes(mongooseim:host_type(), TopicARN :: topic_arn(),
246 From :: jid:jid(), To :: jid:jid(), MessageType :: pm | muc,
247 Packet :: exml:element()) -> attributes().
248 message_attributes(HostType, TopicARN, From, To, MessageType, Packet) ->
249 4 PluginModule = gen_mod:get_module_opt(HostType, ?MODULE, plugin_module),
250 4 PluginModule:message_attributes(TopicARN, From, To, MessageType, Packet).
251
252 -spec calc_backoff_time(mongooseim:host_type(), integer()) -> integer().
253 calc_backoff_time(HostType, Retry) ->
254
:-(
Opts = gen_mod:get_loaded_module_opts(HostType, ?MODULE),
255
:-(
#{publish_retry_count := MaxRetry, publish_retry_time_ms := BaseTime} = Opts,
256
:-(
BackoffMaxTime = round(math:pow(2, MaxRetry - Retry)) * BaseTime,
257
:-(
Random = rand:uniform(BaseTime),
258
:-(
BackoffMaxTime - Random.
Line Hits Source