./ct_report/coverage/mam_lookup.COVER.html

1 %% RSM logic lives here
2 -module(mam_lookup).
3 -export([lookup/3]).
4
5 -include("mongoose.hrl").
6 -include("jlib.hrl").
7 -include("mongoose_rsm.hrl").
8
9 -type filter() :: mam_filter:filter().
10 -type env_vars() :: mod_mam_rdbms_arch:env_vars().
11 -type params() :: map().
12 -type message_id() :: mod_mam:message_id().
13 -type maybe_rsm() :: #rsm_in{} | undefined.
14 -type opt_count_type() :: last_page | by_offset | none.
15
16 %% Public logic
17 %% We use two fields from Env:
18 %% - lookup_fn
19 %% - decode_row_fn
20 -spec lookup(env_vars(), filter(), params()) ->
21 {ok, mod_mam:lookup_result()} | {error, item_not_found}.
22 lookup(Env = #{}, Filter, Params = #{rsm := RSM}) when is_list(Filter) ->
23 1161 OptParams = Params#{opt_count_type => opt_count_type(RSM)},
24 1161 choose_lookup_messages_strategy(Env, Filter, OptParams).
25
26 lookup_query(QueryType, #{lookup_fn := LookupF} = Env, Filters, Order, OffsetLimit) ->
27 1679 LookupF(QueryType, Env, Filters, Order, OffsetLimit).
28
29 decode_row(Row, #{decode_row_fn := DecodeF} = Env) ->
30 4913 DecodeF(Row, Env).
31
32 %% Private logic below
33
34 %% There are no optimizations for these queries yet:
35 %% - #rsm_in{direction = aft, id = ID}
36 %% - #rsm_in{direction = before, id = ID}
37 -spec opt_count_type(RSM :: maybe_rsm()) -> opt_count_type().
38 opt_count_type(#rsm_in{direction = before, id = undefined}) ->
39 173 last_page; %% last page is supported
40 opt_count_type(#rsm_in{direction = undefined}) ->
41 231 by_offset; %% offset
42 opt_count_type(undefined) ->
43 575 by_offset; %% no RSM
44 opt_count_type(_) ->
45 182 none. %% id field is defined in RSM
46
47 %% There are several strategies how to extract messages:
48 %% - we can use regular query that requires counting;
49 %% - we can reduce number of queries if we skip counting for small data sets;
50 %% - sometimes we want not to count at all
51 %% (for example, our client side counts ones and keep the information)
52 choose_lookup_messages_strategy(Env, Filter,
53 Params = #{rsm := RSM, page_size := PageSize}) ->
54 1161 case Params of
55 #{is_simple := true} ->
56 %% Simple query without calculating offset and total count
57 61 simple_lookup_messages(Env, RSM, PageSize, Filter);
58 %% NOTICE: We always prefer opt_count optimization, if possible.
59 %% Clients don't event know what opt_count is.
60 #{opt_count_type := last_page} when PageSize > 0 ->
61 %% Extract messages before calculating offset and total count
62 %% Useful for small result sets
63 112 lookup_last_page(Env, PageSize, Filter);
64 #{opt_count_type := by_offset} when PageSize > 0 ->
65 %% Extract messages before calculating offset and total count
66 %% Useful for small result sets
67 743 lookup_by_offset(Env, RSM, PageSize, Filter);
68 _ ->
69 %% Calculate offset and total count first before extracting messages
70 245 lookup_messages_regular(Env, RSM, PageSize, Filter)
71 end.
72
73 %% Just extract messages without count and offset information
74 simple_lookup_messages(Env, RSM, PageSize, Filter) ->
75 61 {Filter2, Offset, Order} = rsm_to_filter(RSM, Filter),
76 61 Messages = extract_messages(Env, Filter2, Offset, PageSize, Order),
77 61 {ok, {undefined, undefined, Messages}}.
78
79 rsm_to_filter(RSM, Filter) ->
80 61 case RSM of
81 %% Get last rows from result set
82 #rsm_in{direction = aft, id = ID} ->
83
:-(
{after_id(ID, Filter), 0, asc};
84 #rsm_in{direction = before, id = undefined} ->
85 40 {Filter, 0, desc};
86 #rsm_in{direction = before, id = ID} ->
87 21 {before_id(ID, Filter), 0, desc};
88 #rsm_in{direction = undefined, index = Index} ->
89
:-(
{Filter, Index, asc};
90 undefined ->
91
:-(
{Filter, 0, asc}
92 end.
93
94 %% This function handles case: #rsm_in{direction = before, id = undefined}
95 %% Assumes assert_rsm_without_id(RSM)
96 lookup_last_page(Env, PageSize, Filter) ->
97 112 Messages = extract_messages(Env, Filter, 0, PageSize, desc),
98 112 Selected = length(Messages),
99 112 Offset =
100 case Selected < PageSize of
101 true ->
102 21 0; %% Result fits on a single page
103 false ->
104 91 FirstID = decoded_row_to_message_id(hd(Messages)),
105 91 calc_count(Env, before_id(FirstID, Filter))
106 end,
107 112 {ok, {Offset + Selected, Offset, Messages}}.
108
109 lookup_by_offset(Env, RSM, PageSize, Filter) ->
110 743 assert_rsm_without_id(RSM),
111 743 Offset = rsm_to_index(RSM),
112 743 Messages = extract_messages(Env, Filter, Offset, PageSize, asc),
113 743 Selected = length(Messages),
114 743 TotalCount =
115 case Selected < PageSize of
116 true ->
117 638 Offset + Selected; %% Result fits on a single page
118 false ->
119 105 LastID = decoded_row_to_message_id(lists:last(Messages)),
120 105 CountAfterLastID = calc_count(Env, after_id(LastID, Filter)),
121 105 Offset + Selected + CountAfterLastID
122 end,
123 743 {ok, {TotalCount, Offset, Messages}}.
124
125 575 assert_rsm_without_id(undefined) -> ok;
126 168 assert_rsm_without_id(#rsm_in{id = undefined}) -> ok.
127
128 rsm_to_index(#rsm_in{direction = undefined, index = Offset})
129 63 when is_integer(Offset) -> Offset;
130 680 rsm_to_index(_) -> 0.
131
132 lookup_messages_regular(Env, RSM, PageSize, Filter) ->
133 245 TotalCount = calc_count(Env, Filter),
134 245 Offset = calc_offset(Env, Filter, PageSize, TotalCount, RSM),
135 245 {LookupById, Filter2, Offset2, PageSize2, Order} =
136 rsm_to_regular_lookup_vars(RSM, Filter, Offset, PageSize),
137 245 Messages = extract_messages(Env, Filter2, Offset2, PageSize2, Order),
138 245 Result = {TotalCount, Offset, Messages},
139 245 case LookupById of
140 true -> %% check if we've selected a message with #rsm_in.id
141 161 mod_mam_utils:check_for_item_not_found(RSM, PageSize, Result);
142 false ->
143 84 {ok, Result}
144 end.
145
146 rsm_to_regular_lookup_vars(RSM, Filter, Offset, PageSize) ->
147 245 case RSM of
148 #rsm_in{direction = aft, id = ID} when ID =/= undefined ->
149 %% Set extra flag when selecting PageSize + 1 messages
150 91 {true, from_id(ID, Filter), 0, PageSize + 1, asc};
151 #rsm_in{direction = before, id = ID} when ID =/= undefined ->
152 70 {true, to_id(ID, Filter), 0, PageSize + 1, desc};
153 _ ->
154 84 {false, Filter, Offset, PageSize, asc}
155 end.
156
157 decode_rows(MessageRows, Env) ->
158 1077 [decode_row(Row, Env) || Row <- MessageRows].
159
160 -spec decoded_row_to_message_id(mod_mam:message_row()) -> mod_mam:message_id().
161 196 decoded_row_to_message_id(#{id := MessId}) -> MessId.
162
163 -spec extract_messages(Env :: env_vars(),
164 Filter :: filter(), Offset :: non_neg_integer(), Max :: pos_integer(),
165 Order :: asc | desc) -> [mod_mam:message_row()].
166 extract_messages(_Env, _Filter, _Offset, 0 = _Max, _Order) ->
167 84 [];
168 extract_messages(Env, Filter, Offset, Max, Order) ->
169 1077 {selected, MessageRows} = extract_rows(Env, Filter, Offset, Max, Order),
170 1077 Rows = maybe_reverse(Order, MessageRows),
171 1077 decode_rows(Rows, Env).
172
173 834 maybe_reverse(asc, List) -> List;
174 243 maybe_reverse(desc, List) -> lists:reverse(List).
175
176 extract_rows(Env, Filters, Offset, Max, Order) ->
177 1077 lookup_query(lookup, Env, Filters, Order, {Offset, Max}).
178
179 %% @doc Get the total result set size.
180 %% SELECT COUNT(*) as count FROM mam_message
181 -spec calc_count(env_vars(), filter()) -> non_neg_integer().
182 calc_count(Env, Filter) ->
183 602 Result = lookup_query(count, Env, Filter, unordered, all),
184 602 mongoose_rdbms:selected_to_integer(Result).
185
186 %% @doc Calculate a zero-based index of the row with UID in the result test.
187 %%
188 %% If the element does not exists, the ID of the next element will
189 %% be returned instead.
190 %% @end
191 %% SELECT COUNT(*) as index FROM mam_message WHERE id <= ?
192 -spec calc_index(env_vars(), filter(), message_id()) -> non_neg_integer().
193 calc_index(Env, Filter, ID) ->
194 91 calc_count(Env, to_id(ID, Filter)).
195
196 %% @doc Count of elements in RSet before the passed element.
197 %%
198 %% The element with the passed UID can be already deleted.
199 %% @end
200 %% SELECT COUNT(*) as count FROM mam_message WHERE id < ?
201 -spec calc_before(env_vars(), filter(), message_id()) -> non_neg_integer().
202 calc_before(Env, Filter, ID) ->
203 70 calc_count(Env, before_id(ID, Filter)).
204
205 -spec calc_offset(Env :: env_vars(),
206 Filter :: filter(), PageSize :: non_neg_integer(),
207 TotalCount :: non_neg_integer(), RSM :: jlib:rsm_in()) -> non_neg_integer().
208 calc_offset(Env, Filter, PageSize, TotalCount, RSM) ->
209 245 case RSM of
210 #rsm_in{direction = undefined, index = Index} when is_integer(Index) ->
211 21 Index;
212 #rsm_in{direction = before, id = undefined} ->
213 %% Requesting the Last Page in a Result Set
214 21 max(0, TotalCount - PageSize);
215 #rsm_in{direction = before, id = ID} when is_integer(ID) ->
216 70 max(0, calc_before(Env, Filter, ID) - PageSize);
217 #rsm_in{direction = aft, id = ID} when is_integer(ID) ->
218 91 calc_index(Env, Filter, ID);
219 _ ->
220 42 0
221 end.
222
223 -spec after_id(message_id(), filter()) -> filter().
224 after_id(ID, Filter) ->
225 105 [{greater, id, ID}|Filter].
226
227 -spec before_id(message_id(), filter()) -> filter().
228 before_id(ID, Filter) ->
229 182 [{less, id, ID}|Filter].
230
231 -spec from_id(message_id(), filter()) -> filter().
232 from_id(ID, Filter) ->
233 91 [{ge, id, ID}|Filter].
234
235 -spec to_id(message_id(), filter()) -> filter().
236 to_id(ID, Filter) ->
237 161 [{le, id, ID}|Filter].
Line Hits Source