1 |
|
-module(mod_pubsub_cache_rdbms). |
2 |
|
|
3 |
|
-behaviour(mod_pubsub_cache_backend). |
4 |
|
|
5 |
|
-include("pubsub.hrl"). |
6 |
|
-include("jlib.hrl"). |
7 |
|
-include("mongoose_logger.hrl"). |
8 |
|
|
9 |
|
-export([start/1, stop/0]). |
10 |
|
|
11 |
|
-export([ |
12 |
|
upsert_last_item/5, |
13 |
|
delete_last_item/2, |
14 |
|
get_last_item/2]). |
15 |
|
%% ------------------------ Backend start/stop ------------------------ |
16 |
|
|
17 |
|
%% Registers the prepared statements this backend relies on:
%%   * pubsub_get_last_item    - SELECT of a single row by nidx,
%%   * pubsub_delete_last_item - DELETE of a single row by nidx,
%%   * pubsub_last_item_upsert - upsert into pubsub_last_item keyed by nidx.
%% The results of the individual prepare calls are ignored; the function
%% always returns ok.
-spec start(jid:lserver()) -> ok.
start(Host) ->
    KeyFields = [<<"nidx">>],
    %% Columns rewritten when a row for the given nidx already exists.
    ColumnsToUpdate = [<<"itemid">>, <<"created_luser">>, <<"created_lserver">>,
                       <<"created_at">>, <<"payload">>],
    %% A fresh row additionally needs the key column, placed first.
    ColumnsToInsert = KeyFields ++ ColumnsToUpdate,
    mongoose_rdbms:prepare(
      pubsub_get_last_item, pubsub_last_item, [nidx],
      <<"SELECT nidx, itemid, created_luser, created_at, created_lserver, payload "
        "FROM pubsub_last_item WHERE nidx = ?">>),
    mongoose_rdbms:prepare(
      pubsub_delete_last_item, pubsub_last_item, [nidx],
      <<"DELETE FROM pubsub_last_item WHERE nidx = ?">>),
    rdbms_queries:prepare_upsert(Host, pubsub_last_item_upsert, pubsub_last_item,
                                 ColumnsToInsert, ColumnsToUpdate, KeyFields),
    ok.
33 |
|
|
34 |
|
%% Backend shutdown hook. This function performs no work and simply
%% returns ok.
-spec stop() -> ok.
stop() ->
    ok.
36 |
|
|
37 |
|
%% ------------------- Pubsub last item -------------------------------
38 |
|
|
39 |
|
%% Stores the given item as the last item of node Nidx, inserting a new row
%% or updating the existing one through the pubsub_last_item_upsert
%% statement. The publisher's luser/lserver, a creation timestamp and the
%% serialized payload come from prepare_upsert_params/2. The database
%% response is translated by convert_rdbms_response/1.
-spec upsert_last_item(ServerHost :: jid:lserver(),
                       Nidx :: mod_pubsub:nodeIdx(),
                       ItemID :: mod_pubsub:itemId(),
                       Publisher :: jid:jid(),
                       Payload :: mod_pubsub:payload()) -> ok | {error, Reason :: term()}.
upsert_last_item(ServerHost, Nidx, ItemID, Publisher, Payload) ->
    {LUser, LServer, Timestamp, SerializedPayload} =
        prepare_upsert_params(Publisher, Payload),
    %% Same order as the update field list registered in start/1.
    ValuesToUpdate = [ItemID, LUser, LServer, Timestamp, SerializedPayload],
    %% The insert parameters are the key followed by the update values.
    convert_rdbms_response(
      rdbms_queries:execute_upsert(ServerHost, pubsub_last_item_upsert,
                                   [Nidx | ValuesToUpdate], ValuesToUpdate, [Nidx])).
53 |
|
|
54 |
|
%% Removes the cached last item of node Nidx by executing the
%% pubsub_delete_last_item statement; the database response is translated
%% by convert_rdbms_response/1.
-spec delete_last_item(ServerHost :: binary(),
                       Nidx :: mod_pubsub:nodeIdx()) -> ok | {error, Reason :: term()}.
delete_last_item(ServerHost, Nidx) ->
    convert_rdbms_response(
      mongoose_rdbms:execute_successfully(ServerHost, pubsub_delete_last_item, [Nidx])).
59 |
|
|
60 |
|
%% Looks up the cached last item of node Nidx by executing the
%% pubsub_get_last_item statement. convert_rdbms_response/1 turns an empty
%% result into {error, no_items} and a single row into {ok, LastItem}.
-spec get_last_item(ServerHost :: binary(),
                    Nidx :: mod_pubsub:nodeIdx()) ->
    {ok, LastItem :: mod_pubsub:pubsubLastItem()} | {error, Reason :: term()}.
get_last_item(ServerHost, Nidx) ->
    convert_rdbms_response(
      mongoose_rdbms:execute_successfully(ServerHost, pubsub_get_last_item, [Nidx])).
66 |
|
|
67 |
|
%%==================================================================== |
68 |
|
%% Helpers |
69 |
|
%%==================================================================== |
70 |
|
|
71 |
|
%% Translates a raw RDBMS response into this backend's return values:
%%   {selected, []}    -> {error, no_items}
%%   {selected, [Row]} -> {ok, Record}, Record built by item_to_record/1
%%   {updated, _}      -> ok
%%   anything else     -> logged, then {error, pubsub_rdbms_cache_failed}
convert_rdbms_response({selected, []}) ->
    {error, no_items};
convert_rdbms_response({selected, [Row]}) ->
    {ok, item_to_record(Row)};
convert_rdbms_response({updated, _}) ->
    ok;
convert_rdbms_response(Unexpected) ->
    ?LOG_ERROR(#{what => pubsub_rdbms_cache_failed, reason => Unexpected}),
    {error, pubsub_rdbms_cache_failed}.
81 |
|
|
82 |
|
%% Builds the publisher- and payload-derived values for an upsert.
%% Returns {LUser, LServer, CreatedAt, PayloadBin} where:
%%   * LUser / LServer are read from the Publisher #jid{} record,
%%   * CreatedAt is the current OS system time in microseconds,
%%   * PayloadBin is Payload wrapped as the children of an <item> element
%%     and serialized with exml:to_binary/1.
prepare_upsert_params(Publisher, Payload) ->
    Timestamp = os:system_time(microsecond),
    SerializedItem = exml:to_binary(#xmlel{name = <<"item">>, children = Payload}),
    {Publisher#jid.luser, Publisher#jid.lserver, Timestamp, SerializedItem}.
92 |
|
|
93 |
|
%% Converts one row returned by the pubsub_get_last_item query into a
%% #pubsub_last_item{} record. The tuple order follows the SELECT column
%% order: nidx, itemid, created_luser, created_at, created_lserver, payload.
%% The stored payload is unescaped and parsed with exml; the children of the
%% parsed element become the record's payload. Any parse result other than
%% {ok, #xmlel{}} fails the match and crashes the caller.
item_to_record({NodeIdx, ItemId, CreatedLUser, CreatedAt, CreatedLServer, PayloadDB}) ->
    {ok, #xmlel{children = Children}} =
        exml:parse(mongoose_rdbms:unescape_binary(global, PayloadDB)),
    Timestamp = mongoose_rdbms:result_to_integer(CreatedAt),
    %% Creator is kept as a {LUser, LServer, Resource} triple with an empty resource.
    Creator = {CreatedLUser, CreatedLServer, <<>>},
    #pubsub_last_item{nodeid = NodeIdx,
                      itemid = ItemId,
                      creation = {Timestamp, Creator},
                      payload = Children}.