./ct_report/coverage/mod_offline_riak.COVER.html

1 %%%----------------------------------------------------------------------
2 %%% File : mod_offline_riak.erl
3 %%% Author : Michal Piotrowski <michal.piotrowski@erlang-solutions.com>
4 %%% Purpose : mod_offline backend in Riak
5 %%%
6 %%%
7 %%% MongooseIM, Copyright (C) 2015 Erlang Solutions Ltd.
8 %%%
9 %%%----------------------------------------------------------------------
10
11 %%% @doc Riak backend for last activity XEP
12 %%%
13 %%% The backend uses the existing riak connection pool, which is "globally" defined in
14 %%% the mongooseim.toml file. Therefore, we don't need to start anything in the init
15 %%% function.
16 %%%
17 %%% The module follows the approach taken by the other riak backends - it creates
18 %%% the following bucket `{<<"offline">>, <<"example.com">>}' for each xmpp domain.
19 %%%
20 %%% ```
21 %%% Data Layout:
22 %%% KV: {Username_and_timestamp:binary, Status:binary}
23 %%% 2i: [Username:binary, Timestamp:integer]
24 %%% '''
25 %%% @end
26 -module(mod_offline_riak).
27
28 -behaviour(mod_offline_backend).
29
30 -export([init/2]).
31 -export([pop_messages/2]).
32 -export([fetch_messages/2]).
33 -export([write_messages/4]).
34 -export([count_offline_messages/4]).
35 -export([remove_expired_messages/2]).
36 -export([remove_old_messages/3]).
37 -export([remove_user/3]).
38
39 -include("mongoose.hrl").
40 -include("jlib.hrl").
41 -include("mod_offline.hrl").
42 -include_lib("riakc/include/riakc.hrl").
43
44 -define(TIMESTAMP_IDX, {integer_index, "timestamp"}).
45 -define(USER_IDX, {binary_index, "user"}).
46 -define(EXPIRE_IDX, {integer_index, "expire"}).
47
48 -define(INFINITY, 99999999999). %% Wed, 16 Nov 5138 09:46:39 GMT
49
50 -spec init(mongooseim:host_type(), gen_mod:module_opts()) -> ok.
51 init(_Host, _Opts) ->
52
:-(
ok.
53
54 -spec pop_messages(mongooseim:host_type(), jid:jid()) -> {ok, [mod_offline:msg()]}.
55 pop_messages(HostType, To = #jid{luser = LUser, lserver = LServer}) ->
56
:-(
Keys = read_user_idx(HostType, LUser, LServer),
57
:-(
Msgs = [pop_msg(HostType, Key, LUser, LServer, To) || Key <- Keys],
58
:-(
{ok, lists:flatten(Msgs)}.
59
60 -spec fetch_messages(mongooseim:host_type(), jid:jid()) -> {ok, [mod_offline:msg()]}.
61 fetch_messages(HostType, To) ->
62
:-(
{LUser, LServer} = jid:to_lus(To),
63
:-(
Keys = read_user_idx(HostType, LUser, LServer),
64
:-(
{ok, [fetch_msg(HostType, Key, LUser, LServer, To) || Key <- Keys]}.
65
66 -spec write_messages(mongooseim:host_type(), jid:luser(), jid:lserver(), [mod_offline:msg()]) -> ok.
67 write_messages(HostType, LUser, LServer, Msgs) ->
68
:-(
[write_msg(HostType, LUser, LServer, Msg) || Msg <- Msgs],
69
:-(
ok.
70
71 -spec count_offline_messages(mongooseim:host_type(), jid:luser(), jid:lserver(),
72 mod_offline:msg_count()) ->
73 mod_offline:msg_count().
74 count_offline_messages(HostType, LUser, LServer, MaxArchivedMsgs) ->
75
:-(
{ok, IdxResult} = mongoose_riak:get_index(bucket_type(HostType, LServer), ?USER_IDX,
76 LUser, [{max_results, MaxArchivedMsgs}]),
77
:-(
length(IdxResult?INDEX_RESULTS.keys).
78
79 -spec remove_expired_messages(mongooseim:host_type(), jid:lserver()) -> {ok, mod_offline:msg_count()}.
80 remove_expired_messages(HostType, LServer) ->
81
:-(
TimestampInt = os:system_time(microsecond),
82
:-(
{ok, Result} = mongoose_riak:get_index_range(bucket_type(HostType, LServer), ?EXPIRE_IDX,
83 0, TimestampInt, []),
84
:-(
Keys = Result?INDEX_RESULTS.keys,
85
:-(
[mongoose_riak:delete(bucket_type(HostType, LServer), Key) || Key <- Keys],
86
:-(
{ok, length(Keys)}.
87
88 -spec remove_old_messages(mongooseim:host_type(), jid:lserver(), mod_offline:timestamp()) ->
89 {ok, mod_offline:msg_count()}.
90 remove_old_messages(HostType, LServer, TimestampInt) ->
91
:-(
{ok, Result} = mongoose_riak:get_index_range(bucket_type(HostType, LServer), ?TIMESTAMP_IDX,
92 0, TimestampInt, []),
93
:-(
Keys = Result?INDEX_RESULTS.keys,
94
:-(
[mongoose_riak:delete(bucket_type(HostType, LServer), Key) || Key <- Keys],
95
:-(
{ok, length(Keys)}.
96
97 -spec remove_user(mongooseim:host_type(), jid:luser(), jid:lserver()) -> ok.
98 remove_user(HostType, LUser, LServer) ->
99
:-(
Keys = read_user_idx(HostType, LUser, LServer),
100
:-(
[mongoose_riak:delete(bucket_type(HostType, LServer), Key) || Key <- Keys],
101
:-(
ok.
102
103 read_user_idx(HostType, LUser, LServer) ->
104
:-(
{ok, IdxResult} = mongoose_riak:get_index(bucket_type(HostType, LServer), ?USER_IDX,
105 LUser, []),
106
:-(
IdxResult?INDEX_RESULTS.keys.
107
108 write_msg(HostType, LUser, LServer, #offline_msg{from = FromJID, packet = Packet,
109 timestamp = TimestampIn, expire = Expire,
110 permanent_fields = PermanentFields}) ->
111
:-(
Timestamp = TimestampIn,
112
:-(
Obj = riakc_obj:new(bucket_type(HostType, LServer), key(LUser, Timestamp), exml:to_binary(Packet)),
113
:-(
MD = riakc_obj:get_update_metadata(Obj),
114
:-(
SecondaryIndexes = [{?TIMESTAMP_IDX, [Timestamp]},
115 {?USER_IDX, [LUser]},
116 {?EXPIRE_IDX, [maybe_encode_timestamp(Expire)]}],
117
:-(
MDWithIndexes = riakc_obj:set_secondary_index(MD, SecondaryIndexes),
118
:-(
From = jid:to_binary(FromJID),
119
:-(
UserMetaData = [{<<"from">>, From},
120 {<<"permanent_fields">>, term_to_binary(PermanentFields)}],
121
:-(
MDWithUserData = set_obj_user_metadata(MDWithIndexes, UserMetaData),
122
:-(
FinalObj = riakc_obj:update_metadata(Obj, MDWithUserData),
123
:-(
mongoose_riak:put(FinalObj).
124
125 set_obj_user_metadata(MD, []) ->
126
:-(
MD;
127 set_obj_user_metadata(MD, [UserMD | Rest]) ->
128
:-(
UpdatedMD = riakc_obj:set_user_metadata_entry(MD, UserMD),
129
:-(
set_obj_user_metadata(UpdatedMD, Rest).
130
131 pop_msg(HostType, Key, LUser, LServer, To) ->
132
:-(
try
133
:-(
{ok, Obj} = mongoose_riak:get(bucket_type(HostType, LServer), Key),
134
135
:-(
PacketRaw = riakc_obj:get_value(Obj),
136
:-(
{ok, Packet} = exml:parse(PacketRaw),
137
:-(
MD = riakc_obj:get_update_metadata(Obj),
138
:-(
[Timestamp] = riakc_obj:get_secondary_index(MD, ?TIMESTAMP_IDX),
139
:-(
[Expire] = riakc_obj:get_secondary_index(MD, ?EXPIRE_IDX),
140
:-(
From = riakc_obj:get_user_metadata_entry(MD, <<"from">>),
141
:-(
PermanentFields = extract_permanent_fields(MD),
142
143
:-(
mongoose_riak:delete(bucket_type(HostType, LServer), Key),
144
145
:-(
#offline_msg{us = {LUser, LServer},
146 timestamp = Timestamp,
147 expire = maybe_decode_timestamp(Expire),
148 from = jid:from_binary(From),
149 to = To,
150 packet = Packet,
151 permanent_fields = PermanentFields}
152
153 catch
154 Class:Reason:StackTrace ->
155
:-(
?LOG_WARNING(#{what => offline_riak_reading_key_failed,
156 text => <<"mod_offline_riak failed to read key">>,
157 server => LServer, user => LUser, riak_key => Key,
158
:-(
class => Class, reason => Reason, stacktrace => StackTrace}),
159
:-(
[]
160 end.
161
162 extract_permanent_fields(MD) ->
163
:-(
case riakc_obj:get_user_metadata_entry(MD, <<"permanent_fields">>) of
164
:-(
notfound -> [];
165
:-(
Fields -> binary_to_term(Fields)
166 end.
167
168 -spec bucket_type(mongooseim:host_type(), jid:lserver()) -> {binary(), jid:lserver()}.
169 bucket_type(HostType, LServer) ->
170
:-(
{gen_mod:get_module_opt(LServer, mod_offline, bucket_type, <<"offline">>), HostType}.
171
172 -spec key(binary(), integer()) -> binary().
173 key(LUser, TimestampInt) ->
174
:-(
Timestamp = integer_to_binary(TimestampInt),
175
:-(
Random = integer_to_binary(rand:uniform(1024)),
176
:-(
<<LUser/binary, "@", Timestamp/binary, "@", Random/binary>>.
177
178 maybe_encode_timestamp(never) ->
179
:-(
?INFINITY;
180 maybe_encode_timestamp(TS) ->
181
:-(
TS.
182
183 maybe_decode_timestamp(?INFINITY) ->
184
:-(
never;
185 maybe_decode_timestamp(TS) ->
186
:-(
TS.
187
188 fetch_msg(HostType, Key, LUser, LServer, To) ->
189
:-(
try
190
:-(
{ok, Obj} = mongoose_riak:get(bucket_type(HostType, LServer), Key),
191
192
:-(
PacketRaw = riakc_obj:get_value(Obj),
193
:-(
{ok, Packet} = exml:parse(PacketRaw),
194
:-(
MD = riakc_obj:get_update_metadata(Obj),
195
:-(
[Timestamp] = riakc_obj:get_secondary_index(MD, ?TIMESTAMP_IDX),
196
:-(
From = riakc_obj:get_user_metadata_entry(MD, <<"from">>),
197
:-(
[Expire] = riakc_obj:get_secondary_index(MD, ?EXPIRE_IDX),
198
199
:-(
#offline_msg{us = {LUser, LServer},
200 timestamp = Timestamp,
201 expire = maybe_decode_timestamp(Expire),
202 from = jid:from_binary(From),
203 to = To,
204 packet = Packet}
205
206 catch
207 Class:Reason:StackTrace ->
208
:-(
?LOG_WARNING(#{what => offline_riak_reading_key_failed,
209 text => <<"mod_offline_riak failed to read key">>,
210 server => LServer, user => LUser, riak_key => Key,
211
:-(
class => Class, reason => Reason, stacktrace => StackTrace}),
212
:-(
[]
213 end.
214
Line Hits Source