1 |
|
%%%---------------------------------------------------------------------- |
2 |
|
%%% File : mod_last.erl |
3 |
|
%%% Author : Pawel Pikula <pawel.pikula@erlang-solutions.com> |
4 |
|
%%% Purpose : mod_last riak backend (XEP-0012) |
5 |
|
%%% |
6 |
|
%%% |
7 |
|
%%% ejabberd, Copyright (C) 2002-2014 ProcessOne |
8 |
|
%%% MongooseIM, Copyright (C) 2014 Erlang Solutions Ltd. |
9 |
|
%%% |
10 |
|
%%%---------------------------------------------------------------------- |
11 |
|
|
12 |
|
%%% @doc Riak backend for last activity XEP |
13 |
|
%%% |
14 |
|
%%% The backend uses the existing riak connection pool, which is "globally" defined in |
15 |
|
%%% the mongooseim.toml file. Therefore, we don't need to start anything in the init |
16 |
|
%%% function. |
17 |
|
%%% |
18 |
|
%%% The module follows the approach taken by the other riak backends - it creates |
19 |
|
%%% the following bucket `{<<"last">>, <<"example.com">>}' for each xmpp domain. |
20 |
|
%%% The `<<"last">>' bucket type has the following props: `{"props":{"last_write_wins":true}}' |
21 |
|
%%% |
22 |
|
%%% Basically, there are 3 operations the backend need to provide set/get and count |
23 |
|
%%% the number of users which has been active since given date. However, the count |
24 |
|
%%% query can be only called using `mongooseimctl'. |
25 |
|
%%% |
26 |
|
%%% Username is used as a key and the value is just a user status, which is just |
27 |
|
%%% a binary. Moreover, we have a secondary integer index on the timestamp, which |
28 |
|
%%% is used in the count active users query. |
29 |
|
%%% |
30 |
|
%%% ``` |
31 |
|
%%% Data Layout: |
32 |
|
%%% KV: {Username:binary, Status:binary} |
33 |
|
%%% 2i: [Timestamp:integer] |
34 |
|
%%% ''' |
35 |
|
%%% |
36 |
|
%%% Set/Get are rather simple operations - they map to riak's put/get functions. |
37 |
|
%%% The count query uses mapred task to calculate the number of users satisfying |
38 |
|
%%% the criteria. As the input timestamp index range is used. The approach is |
39 |
|
%%% described here: http://docs.basho.com/riak/latest/dev/using/2i/#Querying |
40 |
|
%%% in the *Count Bucket Objects via $bucket Index* section. The alternative approach |
41 |
|
%%% could be the "riakc_pb_socket:get_index_range" function |
42 |
|
%%% |
43 |
|
%%% The pros and cons of the both approaches have been discussed here: |
44 |
|
%%% https://github.com/esl/MongooseIM/pull/567 |
45 |
|
%%% @end |
46 |
|
-module(mod_last_riak). |
47 |
|
|
48 |
|
-behaviour(mod_last_backend). |
49 |
|
|
50 |
|
-include("mongoose.hrl"). |
51 |
|
|
52 |
|
-define(TIMESTAMP_IDX, {integer_index, "timestamp"}). |
53 |
|
|
54 |
|
%% API |
55 |
|
-export([init/2, |
56 |
|
get_last/3, |
57 |
|
count_active_users/3, |
58 |
|
set_last_info/5, |
59 |
|
remove_user/3, |
60 |
|
remove_domain/2]). |
61 |
|
|
62 |
|
-type host_type() :: mongooseim:host_type(). |
63 |
|
|
64 |
|
-spec init(host_type(), gen_mod:module_opts()) -> ok. |
65 |
|
init(_VHost, _Opts) -> |
66 |
|
%% we are using common riak pool |
67 |
:-( |
ok. |
68 |
|
|
69 |
|
-spec get_last(host_type(), jid:luser(), jid:lserver()) -> |
70 |
|
{ok, mod_last:timestamp(), mod_last:status()} | {error, term()} | not_found. |
71 |
|
get_last(HostType, LUser, LServer) -> |
72 |
:-( |
case mongoose_riak:get(bucket_type(HostType, LServer), LUser) of |
73 |
|
{ok, Obj} -> |
74 |
:-( |
Status = riakc_obj:get_value(Obj), |
75 |
:-( |
MD = riakc_obj:get_update_metadata(Obj), |
76 |
:-( |
[Timestamp] = riakc_obj:get_secondary_index(MD, ?TIMESTAMP_IDX), |
77 |
:-( |
{ok, Timestamp, Status}; |
78 |
|
{error, notfound} -> |
79 |
:-( |
not_found; |
80 |
|
{error, Reason} -> |
81 |
:-( |
{error, Reason} |
82 |
|
end. |
83 |
|
|
84 |
|
-spec count_active_users(host_type(), jid:lserver(), mod_last:timestamp()) -> non_neg_integer(). |
85 |
|
count_active_users(HostType, LServer, TimeStamp) -> |
86 |
:-( |
Idx = {index, bucket_type(HostType, LServer), ?TIMESTAMP_IDX, TimeStamp+1, infinity()}, |
87 |
:-( |
RedMF = {modfun, riak_kv_mapreduce, reduce_count_inputs}, |
88 |
:-( |
Red = [{reduce, RedMF, [{reduce_phase_batch_size, 1000}], true}], |
89 |
:-( |
{ok, [{0, [Count]}]} = mongoose_riak:mapred(Idx, Red), |
90 |
:-( |
Count. |
91 |
|
|
92 |
|
-spec set_last_info(host_type(), jid:luser(), jid:lserver(), |
93 |
|
mod_last:timestamp(), mod_last:status()) -> |
94 |
|
ok | {error, term()}. |
95 |
|
set_last_info(HostType, LUser, LServer, Timestamp, Status) -> |
96 |
:-( |
Obj = riakc_obj:new(bucket_type(HostType, LServer), LUser, Status), |
97 |
:-( |
MD = riakc_obj:get_update_metadata(Obj), |
98 |
:-( |
MDWithIndex = riakc_obj:set_secondary_index(MD, [{?TIMESTAMP_IDX, [Timestamp]}]), |
99 |
:-( |
FinalObj = riakc_obj:update_metadata(Obj, MDWithIndex), |
100 |
|
|
101 |
:-( |
mongoose_riak:put(FinalObj). |
102 |
|
|
103 |
|
-spec remove_user(host_type(), jid:luser(), jid:lserver()) -> ok | {error, term()}. |
104 |
|
remove_user(HostType, LUser, LServer) -> |
105 |
:-( |
mongoose_riak:delete(bucket_type(HostType, LServer), LUser). |
106 |
|
|
107 |
|
% Implementation only for RDBMS backends |
108 |
|
-spec remove_domain(host_type(), jid:lserver()) -> ok. |
109 |
|
remove_domain(_HostType, _Domain) -> |
110 |
:-( |
ok. |
111 |
|
|
112 |
|
-spec bucket_type(host_type(), jid:lserver()) -> riakc_obj:bucket(). |
113 |
|
bucket_type(HostType, LServer) -> |
114 |
:-( |
{gen_mod:get_module_opt(HostType, mod_last, bucket_type, <<"last">>), LServer}. |
115 |
|
|
116 |
|
-spec infinity() -> non_neg_integer(). |
117 |
|
infinity() -> |
118 |
|
% almost infinity ... Wed, 16 Nov 5138 09:46:39 GMT |
119 |
|
% I think Mongoose won't live so long ;) |
120 |
:-( |
99999999999. |