./ct_report/coverage/mod_event_pusher_sns.COVER.html

1 -module(mod_event_pusher_sns).
2
3 -behaviour(gen_mod).
4 -behaviour(mod_event_pusher).
5
6 -include("mod_event_pusher_events.hrl").
7 -include("mongoose.hrl").
8 -include_lib("erlcloud/include/erlcloud_aws.hrl").
9 -include("mongoose_config_spec.hrl").
10
11 -callback user_guid(UserJID :: jid:jid()) -> user_guid().
12 -callback message_attributes(TopicARN :: topic_arn(), UserJID :: jid:jid(),
13 IsOnline :: boolean()) -> attributes().
14 -callback message_attributes(TopicARN :: topic_arn(), From :: jid:jid(),
15 To :: jid:jid(), MessageType :: pm | muc,
16 Packet :: exml:element()) -> attributes().
17
18 %%%===================================================================
19 %%% Types and definitions
20 %%%===================================================================
21
22 -define(TOPIC_BASE, ["arn", "aws", "sns"]).
23
24 -type user_guid() :: binary().
25 -type topic_arn() :: string(). %% Full topic ARN in format arn:aws:sns:{REGION}:{ACCOUNT_ID}:{TOPIC}
26 -type topic() :: string(). %% {TOPIC} part of topic_arn() type
27 -type attributes() :: #{string() => string() | binary() | number()}.
28
29
30 %%%===================================================================
31 %%% Exports
32 %%%===================================================================
33
34 %% MIM module callbacks
35 -export([start/2, stop/1, config_spec/0]).
36
37 %% API
38 -export([try_publish/5, push_event/2]).
39
40 -ignore_xref([behaviour_info/1, try_publish/5]).
41
42 %% Types
43 -export_type([user_guid/0, topic_arn/0, topic/0, attributes/0]).
44
45 -spec start(mongooseim:host_type(), gen_mod:module_opts()) -> ok.
46 start(HostType, Opts) ->
47
:-(
application:ensure_all_started(erlcloud),
48
:-(
application:ensure_all_started(worker_pool),
49
:-(
start_pool(HostType, Opts),
50
:-(
ok.
51
52 -spec start_pool(mongooseim:host_type(), gen_mod:module_opts()) ->
53 term().
54 start_pool(HostType, Opts) ->
55
:-(
{ok, _} = mongoose_wpool:start(generic, HostType, pusher_sns, pool_opts(Opts)).
56
57 -spec pool_opts(gen_mod:module_opts()) -> mongoose_wpool:pool_opts().
58 pool_opts(Opts) ->
59
:-(
WorkerNum = get_worker_num(Opts),
60
:-(
[{workers, WorkerNum}, {strategy, available_worker}].
61
62 -spec get_worker_num(gen_mod:module_opts()) -> pos_integer().
63 get_worker_num(Opts) ->
64
:-(
gen_mod:get_opt(pool_size, Opts).
65
66 -spec stop(HostType :: jid:server()) -> ok.
67 stop(HostType) ->
68
:-(
mongoose_wpool:stop(generic, HostType, pusher_sns),
69
:-(
ok.
70
71 -spec config_spec() -> mongoose_config_spec:config_section().
72 config_spec() ->
73 152 #section{
74 items = #{<<"presence_updates_topic">> => #option{type = string},
75 <<"pm_messages_topic">> => #option{type = string},
76 <<"muc_messages_topic">> => #option{type = string},
77 <<"plugin_module">> => #option{type = atom,
78 validate = module},
79 <<"sns_host">> => #option{type = string},
80 <<"region">> => #option{type = string},
81 <<"access_key_id">> => #option{type = string},
82 <<"secret_access_key">> => #option{type = string},
83 <<"account_id">> => #option{type = string},
84 <<"pool_size">> => #option{type = integer,
85 validate = positive},
86 <<"publish_retry_count">> => #option{type = integer,
87 validate = non_negative},
88 <<"publish_retry_time_ms">> => #option{type = integer,
89 validate = non_negative}
90 },
91 required = [<<"sns_host">>, <<"region">>, <<"access_key_id">>, <<"secret_access_key">>,
92 <<"account_id">>],
93 defaults = #{<<"plugin_module">> => mod_event_pusher_sns_defaults,
94 <<"pool_size">> => 100,
95 <<"publish_retry_count">> => 2,
96 <<"publish_retry_time_ms">> => 50
97 },
98 format_items = map
99 }.
100
101 push_event(Acc, #user_status_event{jid = UserJID, status = Status}) ->
102
:-(
user_presence_changed(mongoose_acc:host_type(Acc), UserJID, Status == online),
103
:-(
Acc;
104 push_event(Acc, #chat_event{direction = in, from = From, to = To, packet = Packet}) ->
105
:-(
handle_packet(mongoose_acc:host_type(Acc), From, To, Packet),
106
:-(
Acc;
107 push_event(Acc, _) ->
108
:-(
Acc.
109
110 %%%===================================================================
111 %%% Internal functions
112 %%%===================================================================
113
114 -spec user_presence_changed(mongooseim:host_type(), UserJID :: jid:jid(),
115 IsOnline :: boolean()) -> ok.
116 user_presence_changed(HostType, UserJID, IsOnline) ->
117
:-(
case gen_mod:lookup_module_opt(HostType, ?MODULE, presence_updates_topic) of
118 {error, not_found} ->
119
:-(
skip;
120 {ok, Topic} ->
121
:-(
UserGUID = user_guid(HostType, UserJID),
122
:-(
Content = #{user_id => UserGUID, present => IsOnline},
123
:-(
TopicARN = make_topic_arn(HostType, Topic),
124
:-(
Attributes = message_attributes(HostType, TopicARN, UserJID, IsOnline),
125
:-(
async_publish(HostType, TopicARN, Content, Attributes)
126 end,
127
:-(
ok.
128
129 %% @doc Handles packet and if needed publishes SNS notification
130 -spec handle_packet(mongooseim:host_type(), From :: jid:jid(), To :: jid:jid(),
131 Packet :: exml:element()) -> ok | skip.
132 handle_packet(HostType, From, To, Packet) ->
133
:-(
?LOG_DEBUG(#{what => sns_handle_packet,
134
:-(
from => From, to => To, packet => exml_Packet}),
135
136
:-(
case {get_topic(HostType, Packet), exml_query:subelement(Packet, <<"body">>)} of
137 {undefined, _} -> %% Skip if there is no topic set in configuration for the packet type
138
:-(
skip;
139 {_, undefined} -> %% Skip if there is no message body in the packet
140
:-(
skip;
141 {Topic, BodyTag} ->
142
:-(
FromGUID = user_guid(HostType, From),
143
:-(
ToGUID = user_guid(HostType, To),
144
:-(
MessageBody = exml_query:cdata(BodyTag),
145
:-(
Content = #{from_user_id => FromGUID,
146 to_user_id => ToGUID,
147 message => MessageBody},
148
149
:-(
TopicARN = make_topic_arn(HostType, Topic),
150
:-(
Attributes = message_attributes(HostType, TopicARN, From, To, message_type(Packet), Packet),
151
152
:-(
async_publish(HostType, TopicARN, Content, Attributes)
153 end.
154
155 %% @doc Start publish process notification to AWS SNS service. Content should be valid JSON term
156 -spec async_publish(mongooseim:host_type(), topic_arn(), Content :: jiffy:json_value(),
157 attributes()) -> ok.
158 async_publish(HostType, TopicARN, Content, Attributes) ->
159
:-(
Retry = gen_mod:get_module_opt(HostType, ?MODULE, publish_retry_count),
160
:-(
mongoose_wpool:cast(generic, HostType, pusher_sns,
161 {?MODULE, try_publish, [HostType, TopicARN, Content, Attributes, Retry]}).
162
163 %% @doc Publish notification to AWS SNS service. Content should be a valid JSON term
164 -spec try_publish(mongooseim:host_type(), topic_arn(), Content :: jiffy:json_value(),
165 attributes(), TryCount :: integer()) -> MessageId :: string() | dropped | scheduled.
166 try_publish(HostType, TopicARN, Content, Attributes, Retry) when Retry < 0 ->
167
:-(
?LOG_WARNING(#{what => sns_notification_dropped,
168 server => HostType, topic_arn => TopicARN,
169
:-(
attributes => Attributes, content => Content}),
170
:-(
dropped;
171 try_publish(HostType, TopicARN, Content, Attributes, Retry) ->
172
:-(
try publish(HostType, TopicARN, Content, Attributes)
173 catch
174 Type:Error:StackTrace ->
175
:-(
BackoffTime = calc_backoff_time(HostType, Retry),
176
:-(
timer:apply_after(BackoffTime, mongoose_wpool, cast,
177 [generic, HostType, pusher_sns,
178 {?MODULE, try_publish,
179 [HostType, TopicARN, Content, Attributes, Retry - 1]}]),
180
:-(
?LOG_WARNING(#{what => sns_notification_retry,
181 text => <<"Retrying SNS notification after {backoff_time} ms">>,
182 host_type => HostType, topic_arn => TopicARN,
183 attributes => Attributes, content => Content,
184 backoff_time => BackoffTime,
185
:-(
class => Type, reason => Error, stacktrace => StackTrace}),
186
:-(
scheduled
187 end.
188
189 %% @doc Publish notification to AWS SNS service. Content should be a valid JSON term
190 -spec publish(mongooseim:host_type(), topic_arn(), Content :: jiffy:json_value(),
191 attributes()) -> MessageId :: string().
192 publish(HostType, TopicARN, Content, Attributes) ->
193
:-(
erlcloud_sns:publish(topic, TopicARN, maps:to_list(Content),
194 undefined, maps:to_list(Attributes), aws_handle(HostType)).
195
196 %% @doc Returns AWS SNS handle base on configured AWS credentials
197 -spec aws_handle(mongooseim:host_type()) -> aws_config().
198 aws_handle(HostType) ->
199
:-(
Opts = gen_mod:get_loaded_module_opts(HostType, ?MODULE),
200
:-(
#{access_key_id := AccessKeyId, secret_access_key := SecretKey, sns_host := SNSHost} = Opts,
201
:-(
erlcloud_sns:new(AccessKeyId, SecretKey, SNSHost).
202
203 %% @doc Returns notification topic based on packet type and module configuration
204 -spec get_topic(mongooseim:host_type(), Packet :: exml:element()) -> topic() | undefined.
205 get_topic(HostType, Packet) ->
206
:-(
case message_type(Packet) of
207 pm ->
208
:-(
gen_mod:get_module_opt(HostType, ?MODULE, pm_messages_topic, undefined);
209 muc ->
210
:-(
gen_mod:get_module_opt(HostType, ?MODULE, muc_messages_topic, undefined);
211 _ ->
212
:-(
undefined
213 end.
214
215 %% @doc Constructs SNS TopicArn from given topic suffix
216 -spec make_topic_arn(mongooseim:host_type(), topic()) -> topic_arn().
217 make_topic_arn(HostType, Topic) ->
218
:-(
Opts = gen_mod:get_loaded_module_opts(HostType, ?MODULE),
219
:-(
#{region := AWSRegion, account_id := AWSAccountId} = Opts,
220
:-(
string:join(?TOPIC_BASE ++ [AWSRegion, AWSAccountId, Topic], ":").
221
222 %% @doc Returns message type
223 -spec message_type(Packet :: exml:element()) -> pm | muc | undefined.
224 message_type(Packet) ->
225
:-(
case exml_query:attr(Packet, <<"type">>) of
226
:-(
<<"chat">> -> pm;
227
:-(
<<"groupchat">> -> muc;
228
:-(
_ -> undefined
229 end.
230
231 %% ----------------------------------------------------------------------
232 %% Callbacks
233
234 -spec user_guid(mongooseim:host_type(), UserJID :: jid:jid()) -> user_guid().
235 user_guid(HostType, UserJID) ->
236
:-(
PluginModule = gen_mod:get_module_opt(HostType, ?MODULE, plugin_module),
237
:-(
PluginModule:user_guid(UserJID).
238
239 -spec message_attributes(mongooseim:host_type(), TopicARN :: topic_arn(),
240 UserJID :: jid:jid(), IsOnline :: boolean()) ->
241 attributes().
242 message_attributes(HostType, TopicARN, UserJID, IsOnline) ->
243
:-(
PluginModule = gen_mod:get_module_opt(HostType, ?MODULE, plugin_module),
244
:-(
PluginModule:message_attributes(TopicARN, UserJID, IsOnline).
245
246 -spec message_attributes(mongooseim:host_type(), TopicARN :: topic_arn(),
247 From :: jid:jid(), To :: jid:jid(), MessageType :: pm | muc,
248 Packet :: exml:element()) -> attributes().
249 message_attributes(HostType, TopicARN, From, To, MessageType, Packet) ->
250
:-(
PluginModule = gen_mod:get_module_opt(HostType, ?MODULE, plugin_module),
251
:-(
PluginModule:message_attributes(TopicARN, From, To, MessageType, Packet).
252
253 -spec calc_backoff_time(mongooseim:host_type(), integer()) -> integer().
254 calc_backoff_time(HostType, Retry) ->
255
:-(
Opts = gen_mod:get_loaded_module_opts(HostType, ?MODULE),
256
:-(
#{publish_retry_count := MaxRetry, publish_retry_time_ms := BaseTime} = Opts,
257
:-(
BackoffMaxTime = round(math:pow(2, MaxRetry - Retry)) * BaseTime,
258
:-(
Random = rand:uniform(BaseTime),
259
:-(
BackoffMaxTime - Random.
Line Hits Source