./ct_report/coverage/mam_lookup.COVER.html

1 %% RSM logic lives here
2 -module(mam_lookup).
3 -export([lookup/3]).
4
5 -include("mongoose.hrl").
6 -include("jlib.hrl").
7 -include("mongoose_rsm.hrl").
8
9 -type filter() :: mam_filter:filter().
10 -type env_vars() :: mod_mam_rdbms_arch:env_vars().
11 -type params() :: map().
12 -type message_id() :: mod_mam:message_id().
13 -type maybe_rsm() :: #rsm_in{} | undefined.
14 -type opt_count_type() :: last_page | by_offset | none.
15
%% Public logic
%%
%% Entry point for archive lookups. Only two fields of Env are used by
%% this module:
%% - lookup_fn: runs the `lookup' and `count' queries
%% - decode_row_fn: decodes a single row returned by a `lookup' query
%%
%% Params must contain the `rsm' key. The counting optimization that fits
%% the RSM is stored under `opt_count_type' before a strategy is chosen.
-spec lookup(env_vars(), filter(), params()) ->
    {ok, mod_mam:lookup_result()} | {error, item_not_found}.
lookup(#{} = Env, Filter, #{rsm := RSM} = Params) when is_list(Filter) ->
    CountType = opt_count_type(RSM),
    choose_lookup_messages_strategy(Env, Filter, Params#{opt_count_type => CountType}).
25
%% Run a query through the backend callback stored in Env under `lookup_fn'.
%% All arguments are handed to the callback unchanged.
lookup_query(QueryType, Env = #{lookup_fn := RunQuery}, Filters, Order, OffsetLimit) ->
    RunQuery(QueryType, Env, Filters, Order, OffsetLimit).
28
%% Decode one row with the callback stored in Env under `decode_row_fn'.
decode_row(Row, Env = #{decode_row_fn := Decode}) ->
    Decode(Row, Env).
31
32 %% Private logic below
33
%% Pick the counting optimization that can be used for the given RSM.
%%
%% Requests that carry an id are not optimized yet:
%% - #rsm_in{direction = aft, id = ID}
%% - #rsm_in{direction = before, id = ID}
-spec opt_count_type(RSM :: maybe_rsm()) -> opt_count_type().
opt_count_type(undefined) ->
    by_offset; %% no RSM at all
opt_count_type(#rsm_in{direction = undefined}) ->
    by_offset; %% paging by offset
opt_count_type(#rsm_in{direction = before, id = undefined}) ->
    last_page; %% the last page is supported
opt_count_type(_) ->
    none. %% any other RSM, e.g. the id field is defined
46
%% Messages can be extracted in several ways:
%% - a regular query, which counts first;
%% - a query that skips counting when the result set is small;
%% - a query that never counts
%%   (for example, when the client keeps track of the counts itself).
%%
%% Params must contain the `rsm' and `page_size' keys.
choose_lookup_messages_strategy(Env, Filter,
                                #{rsm := RSM, page_size := PageSize} = Params) ->
    case lookup_strategy(Params, PageSize) of
        simple ->
            simple_lookup_messages(Env, RSM, PageSize, Filter);
        last_page ->
            lookup_last_page(Env, PageSize, Filter);
        by_offset ->
            lookup_by_offset(Env, RSM, PageSize, Filter);
        regular ->
            lookup_messages_regular(Env, RSM, PageSize, Filter)
    end.

%% Name the strategy selected by Params; clauses are tried top to bottom.
%% simple: no offset and no total count are calculated
lookup_strategy(#{is_simple := true}, _PageSize) ->
    simple;
%% last_page / by_offset: messages are extracted before the offset and the
%% total count are calculated, which is useful for small result sets
lookup_strategy(#{opt_count_type := last_page}, PageSize) when PageSize > 0 ->
    last_page;
lookup_strategy(#{opt_count_type := by_offset}, PageSize) when PageSize > 0 ->
    by_offset;
%% regular: the offset and the total count are calculated first
lookup_strategy(_Params, _PageSize) ->
    regular.
70
%% Extract the messages only; the total count and the offset are both
%% reported as `undefined'.
simple_lookup_messages(Env, RSM, PageSize, Filter) ->
    {RsmFilter, Offset, Order} = rsm_to_filter(RSM, Filter),
    {ok, {undefined, undefined,
          extract_messages(Env, RsmFilter, Offset, PageSize, Order)}}.
76
%% Translate an RSM request into `{Filter, Offset, Order}' for a simple lookup.
%%
%% - `aft' / `before' with an id narrow the filter to the rows after / before
%%   that id and start at offset 0;
%% - `before' without an id selects the last rows by reading in `desc' order;
%% - a request without direction pages by index;
%% - no RSM reads from the start in `asc' order.
rsm_to_filter(RSM, Filter) ->
    case RSM of
        #rsm_in{direction = aft, id = ID} ->
            {after_id(ID, Filter), 0, asc};
        %% Get last rows from result set
        #rsm_in{direction = before, id = undefined} ->
            {Filter, 0, desc};
        #rsm_in{direction = before, id = ID} ->
            {before_id(ID, Filter), 0, desc};
        #rsm_in{direction = undefined} ->
            %% The index is optional. rsm_to_index/1 falls back to 0 when it
            %% is not an integer, so the offset is never `undefined'.
            {Filter, rsm_to_index(RSM), asc};
        undefined ->
            {Filter, 0, asc}
    end.
91
%% Handles the case #rsm_in{direction = before, id = undefined}.
%% Assumes assert_rsm_without_id(RSM).
%%
%% The page is read first, in `desc' order. Rows before the page are counted
%% only when the page came back full; otherwise the offset is 0.
lookup_last_page(Env, PageSize, Filter) ->
    Messages = extract_messages(Env, Filter, 0, PageSize, desc),
    Selected = length(Messages),
    Offset =
        if
            Selected < PageSize ->
                %% Result fits on a single page
                0;
            true ->
                FirstID = decoded_row_to_message_id(hd(Messages)),
                calc_count(Env, before_id(FirstID, Filter))
        end,
    {ok, {Offset + Selected, Offset, Messages}}.
106
%% Lookup for requests without an id: the page is read at the offset taken
%% from the RSM index, and the rows after the page are counted only when the
%% page came back full.
lookup_by_offset(Env, RSM, PageSize, Filter) ->
    assert_rsm_without_id(RSM),
    Offset = rsm_to_index(RSM),
    Messages = extract_messages(Env, Filter, Offset, PageSize, asc),
    Selected = length(Messages),
    CountAfterPage =
        if
            Selected < PageSize ->
                %% Result fits on a single page
                0;
            true ->
                LastID = decoded_row_to_message_id(lists:last(Messages)),
                calc_count(Env, after_id(LastID, Filter))
        end,
    {ok, {Offset + Selected + CountAfterPage, Offset, Messages}}.
122
%% Crash with function_clause unless the RSM is absent or has no id.
assert_rsm_without_id(#rsm_in{id = undefined}) -> ok;
assert_rsm_without_id(undefined) -> ok.
125
%% Offset requested by the RSM: the integer index of a request without
%% direction, and 0 in every other case.
rsm_to_index(RSM) ->
    case RSM of
        #rsm_in{direction = undefined, index = Index} when is_integer(Index) ->
            Index;
        _ ->
            0
    end.
129
%% Regular lookup: the total count and the offset are calculated before the
%% messages are extracted.
lookup_messages_regular(Env, RSM, PageSize, Filter) ->
    TotalCount = calc_count(Env, Filter),
    Offset = calc_offset(Env, Filter, PageSize, TotalCount, RSM),
    {ById, QueryFilter, QueryOffset, QueryLimit, Order} =
        rsm_to_regular_lookup_vars(RSM, Filter, Offset, PageSize),
    Messages = extract_messages(Env, QueryFilter, QueryOffset, QueryLimit, Order),
    Result = {TotalCount, Offset, Messages},
    case ById of
        false ->
            {ok, Result};
        true ->
            %% check if we've selected a message with #rsm_in.id
            mod_mam_utils:check_for_item_not_found(RSM, PageSize, Result)
    end.
143
%% Build `{LookupById, Filter, Offset, PageSize, Order}' for a regular lookup.
%%
%% When the RSM carries an id, the row with that id is included in the filter
%% and one extra message (PageSize + 1) is selected; LookupById is `true'.
rsm_to_regular_lookup_vars(#rsm_in{direction = aft, id = ID}, Filter, _Offset, PageSize)
  when ID =/= undefined ->
    {true, from_id(ID, Filter), 0, PageSize + 1, asc};
rsm_to_regular_lookup_vars(#rsm_in{direction = before, id = ID}, Filter, _Offset, PageSize)
  when ID =/= undefined ->
    {true, to_id(ID, Filter), 0, PageSize + 1, desc};
rsm_to_regular_lookup_vars(_RSM, Filter, Offset, PageSize) ->
    {false, Filter, Offset, PageSize, asc}.
154
%% Decode every row of a lookup result, keeping the order of the rows.
decode_rows(MessageRows, Env) ->
    lists:map(fun(Row) -> decode_row(Row, Env) end, MessageRows).
157
%% Return the value stored under the `id' key of a decoded message row.
-spec decoded_row_to_message_id(mod_mam:message_row()) -> mod_mam:message_id().
decoded_row_to_message_id(#{id := MessageId}) ->
    MessageId.
160
%% Select at most Max rows starting at Offset and decode them.
%%
%% Max may be 0: nothing is queried then and an empty list is returned.
%% Rows selected in `desc' order are reversed before decoding.
-spec extract_messages(Env :: env_vars(),
                       Filter :: filter(), Offset :: non_neg_integer(),
                       Max :: non_neg_integer(),
                       Order :: asc | desc) -> [mod_mam:message_row()].
extract_messages(_Env, _Filter, _Offset, 0 = _Max, _Order) ->
    [];
extract_messages(Env, Filter, Offset, Max, Order) ->
    {selected, MessageRows} = extract_rows(Env, Filter, Offset, Max, Order),
    Rows = maybe_reverse(Order, MessageRows),
    decode_rows(Rows, Env).
170
%% Reverse the rows of a `desc' query; rows of an `asc' query are returned
%% unchanged.
maybe_reverse(desc, Rows) -> lists:reverse(Rows);
maybe_reverse(asc, Rows) -> Rows.
173
%% Run a `lookup' query limited to Max rows starting at Offset.
extract_rows(Env, Filters, Offset, Max, Order) ->
    OffsetLimit = {Offset, Max},
    lookup_query(lookup, Env, Filters, Order, OffsetLimit).
176
%% @doc Get the size of the result set selected by Filter.
%% SELECT COUNT(*) as count FROM mam_message
-spec calc_count(env_vars(), filter()) -> non_neg_integer().
calc_count(Env, Filter) ->
    mongoose_rdbms:selected_to_integer(
      lookup_query(count, Env, Filter, unordered, all)).
183
%% @doc Count the rows of the result set whose id is less than or equal
%% to ID.
%%
%% The element with the passed ID does not have to exist.
%% @end
%% SELECT COUNT(*) as index FROM mam_message WHERE id <= ?
-spec calc_index(env_vars(), filter(), message_id()) -> non_neg_integer().
calc_index(Env, Filter, ID) ->
    UpToId = to_id(ID, Filter),
    calc_count(Env, UpToId).
193
%% @doc Count the rows of the result set that come before the passed element.
%%
%% The element with the passed ID may already be deleted.
%% @end
%% SELECT COUNT(*) as count FROM mam_message WHERE id < ?
-spec calc_before(env_vars(), filter(), message_id()) -> non_neg_integer().
calc_before(Env, Filter, ID) ->
    BeforeId = before_id(ID, Filter),
    calc_count(Env, BeforeId).
202
%% Calculate the offset of the requested page inside the result set.
%%
%% RSM may be `undefined' (no RSM in the request); that case and any RSM not
%% matched by an earlier clause give offset 0.
-spec calc_offset(Env :: env_vars(),
                  Filter :: filter(), PageSize :: non_neg_integer(),
                  TotalCount :: non_neg_integer(), RSM :: maybe_rsm()) -> non_neg_integer().
calc_offset(Env, Filter, PageSize, TotalCount, RSM) ->
    case RSM of
        #rsm_in{direction = undefined, index = Index} when is_integer(Index) ->
            %% Paging by index: the index is the offset
            Index;
        #rsm_in{direction = before, id = undefined} ->
            %% Requesting the Last Page in a Result Set
            max(0, TotalCount - PageSize);
        #rsm_in{direction = before, id = ID} when is_integer(ID) ->
            %% The page ends right before ID; clamp at the start of the set
            max(0, calc_before(Env, Filter, ID) - PageSize);
        #rsm_in{direction = aft, id = ID} when is_integer(ID) ->
            %% The page starts after all rows with id =< ID
            calc_index(Env, Filter, ID);
        _ ->
            0
    end.
220
%% Prepend a condition that keeps only rows with id greater than ID.
-spec after_id(message_id(), filter()) -> filter().
after_id(ID, Filter) ->
    Condition = {greater, id, ID},
    [Condition | Filter].
224
%% Prepend a condition that keeps only rows with id less than ID.
-spec before_id(message_id(), filter()) -> filter().
before_id(ID, Filter) ->
    Condition = {less, id, ID},
    [Condition | Filter].
228
%% Prepend a condition that keeps only rows with id greater than or equal
%% to ID.
-spec from_id(message_id(), filter()) -> filter().
from_id(ID, Filter) ->
    Condition = {ge, id, ID},
    [Condition | Filter].
232
%% Prepend a condition that keeps only rows with id less than or equal to ID.
-spec to_id(message_id(), filter()) -> filter().
to_id(ID, Filter) ->
    Condition = {le, id, ID},
    [Condition | Filter].
Line Hits Source