1 |
|
%%%---------------------------------------------------------------------- |
2 |
|
%%% File : mod_offline_riak.erl |
3 |
|
%%% Author : Michal Piotrowski <michal.piotrowski@erlang-solutions.com> |
4 |
|
%%% Purpose : mod_offline backend in Riak |
5 |
|
%%% |
6 |
|
%%% |
7 |
|
%%% MongooseIM, Copyright (C) 2015 Erlang Solutions Ltd. |
8 |
|
%%% |
9 |
|
%%%---------------------------------------------------------------------- |
10 |
|
|
11 |
|
%%% @doc Riak backend for mod_offline (offline message storage)
12 |
|
%%% |
13 |
|
%%% The backend uses the existing riak connection pool, which is "globally" defined in |
14 |
|
%%% the mongooseim.toml file. Therefore, we don't need to start anything in the init |
15 |
|
%%% function. |
16 |
|
%%% |
17 |
|
%%% The module follows the approach taken by the other riak backends - it creates |
18 |
|
%%% the following bucket `{<<"offline">>, <<"example.com">>}' for each xmpp domain. |
19 |
|
%%% |
20 |
|
%%% ``` |
21 |
|
%%% Data Layout: |
22 |
|
%%% KV: {Username_and_timestamp:binary, Status:binary} |
23 |
|
%%% 2i: [Username:binary, Timestamp:integer] |
24 |
|
%%% ''' |
25 |
|
%%% @end |
26 |
|
-module(mod_offline_riak). |
27 |
|
|
28 |
|
-behaviour(mod_offline_backend). |
29 |
|
|
30 |
|
-export([init/2]). |
31 |
|
-export([pop_messages/2]). |
32 |
|
-export([fetch_messages/2]). |
33 |
|
-export([write_messages/4]). |
34 |
|
-export([count_offline_messages/4]). |
35 |
|
-export([remove_expired_messages/2]). |
36 |
|
-export([remove_old_messages/3]). |
37 |
|
-export([remove_user/3]). |
38 |
|
|
39 |
|
-include("mongoose.hrl"). |
40 |
|
-include("jlib.hrl"). |
41 |
|
-include("mod_offline.hrl"). |
42 |
|
-include_lib("riakc/include/riakc.hrl"). |
43 |
|
|
44 |
|
-define(TIMESTAMP_IDX, {integer_index, "timestamp"}). |
45 |
|
-define(USER_IDX, {binary_index, "user"}). |
46 |
|
-define(EXPIRE_IDX, {integer_index, "expire"}). |
47 |
|
|
48 |
|
-define(INFINITY, 99999999999). %% Wed, 16 Nov 5138 09:46:39 GMT |
49 |
|
|
50 |
|
%% @doc No-op initialisation. The riak connection pool is configured
%% globally in mongooseim.toml, so there is nothing to start here.
-spec init(mongooseim:host_type(), gen_mod:module_opts()) -> ok.
init(_HostType, _Opts) ->
    ok.
53 |
|
|
54 |
|
%% @doc Fetch and delete every offline message stored for the given user.
%% Messages that fail to read are logged and dropped (pop_msg/5 yields []
%% for them, which lists:flatten/1 removes).
-spec pop_messages(mongooseim:host_type(), jid:jid()) -> {ok, [mod_offline:msg()]}.
pop_messages(HostType, #jid{luser = LUser, lserver = LServer} = To) ->
    PopOne = fun(Key) -> pop_msg(HostType, Key, LUser, LServer, To) end,
    Popped = lists:map(PopOne, read_user_idx(HostType, LUser, LServer)),
    {ok, lists:flatten(Popped)}.
59 |
|
|
60 |
|
%% @doc Read every offline message stored for the user without deleting it.
%% NOTE(review): unlike pop_messages/2, the result is not flattened, so a
%% failed read leaves an [] element in the list — confirm callers tolerate it.
-spec fetch_messages(mongooseim:host_type(), jid:jid()) -> {ok, [mod_offline:msg()]}.
fetch_messages(HostType, To) ->
    {LUser, LServer} = jid:to_lus(To),
    ReadOne = fun(Key) -> fetch_msg(HostType, Key, LUser, LServer, To) end,
    {ok, lists:map(ReadOne, read_user_idx(HostType, LUser, LServer))}.
65 |
|
|
66 |
|
%% @doc Store a batch of offline messages for one user; each message is
%% written as a separate riak object (see write_msg/4).
-spec write_messages(mongooseim:host_type(), jid:luser(), jid:lserver(), [mod_offline:msg()]) -> ok.
write_messages(HostType, LUser, LServer, Msgs) ->
    lists:foreach(fun(Msg) -> write_msg(HostType, LUser, LServer, Msg) end, Msgs),
    ok.
70 |
|
|
71 |
|
%% @doc Count stored messages for the user, capped at MaxArchivedMsgs via
%% the riak `max_results' option on the "user" secondary-index query.
-spec count_offline_messages(mongooseim:host_type(), jid:luser(), jid:lserver(),
                             mod_offline:msg_count()) ->
          mod_offline:msg_count().
count_offline_messages(HostType, LUser, LServer, MaxArchivedMsgs) ->
    Opts = [{max_results, MaxArchivedMsgs}],
    {ok, IdxResult} =
        mongoose_riak:get_index(bucket_type(HostType, LServer), ?USER_IDX, LUser, Opts),
    ?INDEX_RESULTS{keys = Keys} = IdxResult,
    length(Keys).
78 |
|
|
79 |
|
%% @doc Delete every message whose "expire" index value is not later than
%% the current time; returns how many keys were deleted.
-spec remove_expired_messages(mongooseim:host_type(), jid:lserver()) -> {ok, mod_offline:msg_count()}.
remove_expired_messages(HostType, LServer) ->
    Now = os:system_time(microsecond),
    Bucket = bucket_type(HostType, LServer),
    {ok, Result} = mongoose_riak:get_index_range(Bucket, ?EXPIRE_IDX, 0, Now, []),
    ?INDEX_RESULTS{keys = Keys} = Result,
    lists:foreach(fun(Key) -> mongoose_riak:delete(Bucket, Key) end, Keys),
    {ok, length(Keys)}.
87 |
|
|
88 |
|
%% @doc Delete every message whose "timestamp" index value is not later
%% than the given cutoff; returns how many keys were deleted.
-spec remove_old_messages(mongooseim:host_type(), jid:lserver(), mod_offline:timestamp()) ->
          {ok, mod_offline:msg_count()}.
remove_old_messages(HostType, LServer, TimestampInt) ->
    Bucket = bucket_type(HostType, LServer),
    {ok, Result} = mongoose_riak:get_index_range(Bucket, ?TIMESTAMP_IDX, 0, TimestampInt, []),
    ?INDEX_RESULTS{keys = Keys} = Result,
    lists:foreach(fun(Key) -> mongoose_riak:delete(Bucket, Key) end, Keys),
    {ok, length(Keys)}.
96 |
|
|
97 |
|
%% @doc Delete all offline messages belonging to the user (keys found via
%% the "user" secondary index).
-spec remove_user(mongooseim:host_type(), jid:luser(), jid:lserver()) -> ok.
remove_user(HostType, LUser, LServer) ->
    Bucket = bucket_type(HostType, LServer),
    lists:foreach(fun(Key) -> mongoose_riak:delete(Bucket, Key) end,
                  read_user_idx(HostType, LUser, LServer)),
    ok.
102 |
|
|
103 |
|
%% @doc Return the riak keys of every message stored for the user, looked
%% up through the "user" secondary index.
read_user_idx(HostType, LUser, LServer) ->
    {ok, IdxResult} =
        mongoose_riak:get_index(bucket_type(HostType, LServer), ?USER_IDX, LUser, []),
    ?INDEX_RESULTS{keys = Keys} = IdxResult,
    Keys.
107 |
|
|
108 |
|
%% @doc Store one offline message as a riak object.
%% The serialized packet is the object value; the sender JID and the
%% permanent fields go into user metadata, while timestamp/user/expire
%% become secondary indexes so the query/purge functions above can find
%% the object. `never' expiry is encoded as the ?INFINITY sentinel.
write_msg(HostType, LUser, LServer, #offline_msg{from = FromJID, packet = Packet,
                                                 timestamp = Timestamp, expire = Expire,
                                                 permanent_fields = PermanentFields}) ->
    Obj = riakc_obj:new(bucket_type(HostType, LServer), key(LUser, Timestamp),
                        exml:to_binary(Packet)),
    Indexes = [{?TIMESTAMP_IDX, [Timestamp]},
               {?USER_IDX, [LUser]},
               {?EXPIRE_IDX, [maybe_encode_timestamp(Expire)]}],
    MD0 = riakc_obj:get_update_metadata(Obj),
    MD1 = riakc_obj:set_secondary_index(MD0, Indexes),
    UserMetaData = [{<<"from">>, jid:to_binary(FromJID)},
                    {<<"permanent_fields">>, term_to_binary(PermanentFields)}],
    MD2 = set_obj_user_metadata(MD1, UserMetaData),
    mongoose_riak:put(riakc_obj:update_metadata(Obj, MD2)).
124 |
|
|
125 |
|
%% @doc Apply each {Key, Value} user-metadata entry to the metadata term,
%% in list order, and return the updated metadata.
set_obj_user_metadata(MD, UserMetaData) ->
    lists:foldl(fun(Entry, Acc) -> riakc_obj:set_user_metadata_entry(Acc, Entry) end,
                MD, UserMetaData).
130 |
|
|
131 |
|
%% @doc Read one offline message by key, delete it from riak, and return
%% it as an #offline_msg{} record. Any failure (missing key, unparsable
%% packet, missing index) is logged and [] is returned, which the caller
%% flattens away.
pop_msg(HostType, Key, LUser, LServer, To) ->
    try
        Bucket = bucket_type(HostType, LServer),
        {ok, Obj} = mongoose_riak:get(Bucket, Key),
        {ok, Packet} = exml:parse(riakc_obj:get_value(Obj)),
        MD = riakc_obj:get_update_metadata(Obj),
        [Timestamp] = riakc_obj:get_secondary_index(MD, ?TIMESTAMP_IDX),
        [Expire] = riakc_obj:get_secondary_index(MD, ?EXPIRE_IDX),
        Sender = riakc_obj:get_user_metadata_entry(MD, <<"from">>),
        PermanentFields = extract_permanent_fields(MD),
        %% Only delete after every read succeeded, so a broken object
        %% is left in place for inspection.
        mongoose_riak:delete(Bucket, Key),
        #offline_msg{us = {LUser, LServer},
                     timestamp = Timestamp,
                     expire = maybe_decode_timestamp(Expire),
                     from = jid:from_binary(Sender),
                     to = To,
                     packet = Packet,
                     permanent_fields = PermanentFields}
    catch
        Class:Reason:StackTrace ->
            ?LOG_WARNING(#{what => offline_riak_reading_key_failed,
                           text => <<"mod_offline_riak failed to read key">>,
                           server => LServer, user => LUser, riak_key => Key,
                           class => Class, reason => Reason, stacktrace => StackTrace}),
            []
    end.
161 |
|
|
162 |
|
%% @doc Decode the permanent fields stored in the object's user metadata.
extract_permanent_fields(MD) ->
    decode_permanent_fields(riakc_obj:get_user_metadata_entry(MD, <<"permanent_fields">>)).

%% A missing entry (notfound) yields no permanent fields.
decode_permanent_fields(notfound) -> [];
decode_permanent_fields(Encoded) -> binary_to_term(Encoded).
167 |
|
|
168 |
|
%% @doc Bucket for a given xmpp domain, e.g. `{<<"offline">>, <<"example.com">>}'.
%% The bucket type is read from the module options for the host type, and
%% the domain is the bucket name — matching the layout described in the
%% module doc and the return type of the spec below.
-spec bucket_type(mongooseim:host_type(), jid:lserver()) -> {binary(), jid:lserver()}.
bucket_type(HostType, LServer) ->
    %% Fix: options are keyed by host type, not by domain, and the second
    %% tuple element must be the lserver per the -spec; the previous code
    %% had the two arguments swapped.
    {gen_mod:get_module_opt(HostType, mod_offline, [riak, bucket_type]), LServer}.
171 |
|
|
172 |
|
%% @doc Build a riak key of the form `<LUser>@<Timestamp>@<Random>'.
%% The random suffix (1..1024) reduces the chance of two messages for the
%% same user and timestamp colliding on the same key.
-spec key(binary(), integer()) -> binary().
key(LUser, TimestampInt) ->
    Nonce = integer_to_binary(rand:uniform(1024)),
    iolist_to_binary([LUser, $@, integer_to_binary(TimestampInt), $@, Nonce]).
177 |
|
|
178 |
|
%% `never' must still fit in the integer "expire" secondary index, so it
%% is stored as the ?INFINITY sentinel; real timestamps pass through.
maybe_encode_timestamp(never) -> ?INFINITY;
maybe_encode_timestamp(TS) -> TS.
182 |
|
|
183 |
|
%% Inverse of maybe_encode_timestamp/1: the ?INFINITY sentinel maps back
%% to `never'; real timestamps pass through.
maybe_decode_timestamp(?INFINITY) -> never;
maybe_decode_timestamp(TS) -> TS.
187 |
|
|
188 |
|
%% @doc Read one offline message by key without deleting it; returns an
%% #offline_msg{} record, or [] (after logging) when the read fails.
%% NOTE(review): unlike pop_msg/5 this does not restore permanent_fields —
%% confirm the asymmetry is intentional.
fetch_msg(HostType, Key, LUser, LServer, To) ->
    try
        {ok, Obj} = mongoose_riak:get(bucket_type(HostType, LServer), Key),
        {ok, Packet} = exml:parse(riakc_obj:get_value(Obj)),
        MD = riakc_obj:get_update_metadata(Obj),
        [Timestamp] = riakc_obj:get_secondary_index(MD, ?TIMESTAMP_IDX),
        [Expire] = riakc_obj:get_secondary_index(MD, ?EXPIRE_IDX),
        Sender = riakc_obj:get_user_metadata_entry(MD, <<"from">>),
        #offline_msg{us = {LUser, LServer},
                     timestamp = Timestamp,
                     expire = maybe_decode_timestamp(Expire),
                     from = jid:from_binary(Sender),
                     to = To,
                     packet = Packet}
    catch
        Class:Reason:StackTrace ->
            ?LOG_WARNING(#{what => offline_riak_reading_key_failed,
                           text => <<"mod_offline_riak failed to read key">>,
                           server => LServer, user => LUser, riak_key => Key,
                           class => Class, reason => Reason, stacktrace => StackTrace}),
            []
    end.
214 |
|
|