./ct_report/coverage/mam_lookup.COVER.html

1 %% RSM logic lives here
2 -module(mam_lookup).
3 -export([lookup/3]).
4
5 -include("mongoose.hrl").
6 -include("jlib.hrl").
7 -include("mongoose_rsm.hrl").
8
9 -type filter() :: mam_filter:filter().
10 -type env_vars() :: mod_mam_rdbms_arch:env_vars().
11 -type params() :: map().
12 -type message_id() :: mod_mam:message_id().
13 -type maybe_rsm() :: #rsm_in{} | undefined.
14 -type opt_count_type() :: last_page | by_offset | none.
15
16 %% Public logic
17 %% We use two fields from Env:
18 %% - lookup_fn
19 %% - decode_row_fn
20 -spec lookup(env_vars(), filter(), params()) ->
21 {ok, mod_mam:lookup_result()} | {error, item_not_found}.
22 lookup(Env = #{}, Filter, Params = #{rsm := RSM}) when is_list(Filter) ->
23 1158 OptParams = Params#{opt_count_type => opt_count_type(RSM)},
24 1158 choose_lookup_messages_strategy(Env, Filter, OptParams).
25
26 lookup_query(QueryType, #{lookup_fn := LookupF} = Env, Filters, Order, OffsetLimit) ->
27 1563 LookupF(QueryType, Env, Filters, Order, OffsetLimit).
28
29 decode_row(Row, #{decode_row_fn := DecodeF} = Env) ->
30 3651 DecodeF(Row, Env).
31
32 %% Private logic below
33
34 %% There are no optimizations for these queries yet:
35 %% - #rsm_in{direction = aft, id = ID}
36 %% - #rsm_in{direction = before, id = ID}
37 -spec opt_count_type(RSM :: maybe_rsm()) -> opt_count_type().
38 opt_count_type(#rsm_in{direction = before, id = undefined}) ->
39 185 last_page; %% last page is supported
40 opt_count_type(#rsm_in{direction = undefined}) ->
41 135 by_offset; %% offset
42 opt_count_type(undefined) ->
43 573 by_offset; %% no RSM
44 opt_count_type(_) ->
45 265 none. %% id field is defined in RSM
46
47 %% There are several strategies how to extract messages:
48 %% - we can use regular query that requires counting;
49 %% - we can reduce number of queries if we skip counting for small data sets;
50 %% - sometimes we want not to count at all
51 %% (for example, our client side counts ones and keep the information)
52 choose_lookup_messages_strategy(Env, Filter,
53 Params = #{rsm := RSM, page_size := PageSize}) ->
54 1158 case Params of
55 #{is_simple := true} ->
56 %% Simple query without calculating offset and total count
57 265 simple_lookup_messages(Env, RSM, PageSize, Filter);
58 #{opt_count_type := last_page} when PageSize > 0 ->
59 %% Extract messages before calculating offset and total count
60 %% Useful for small result sets
61 80 lookup_last_page(Env, PageSize, Filter);
62 #{opt_count_type := by_offset} when PageSize > 0 ->
63 %% Extract messages before calculating offset and total count
64 %% Useful for small result sets
65 628 lookup_by_offset(Env, RSM, PageSize, Filter);
66 _ ->
67 %% Calculate offset and total count first before extracting messages
68 185 lookup_messages_regular(Env, RSM, PageSize, Filter)
69 end.
70
71 %% Just extract messages without count and offset information
72 simple_lookup_messages(Env, RSM, PageSize, Filter) ->
73 265 {Filter2, Offset, Order} = rsm_to_filter(RSM, Filter),
74 265 Messages = extract_messages(Env, Filter2, Offset, PageSize, Order),
75 265 {ok, {undefined, undefined, Messages}}.
76
77 rsm_to_filter(RSM, Filter) ->
78 265 case RSM of
79 %% Get last rows from result set
80 #rsm_in{direction = aft, id = ID} ->
81 65 {after_id(ID, Filter), 0, asc};
82 #rsm_in{direction = before, id = undefined} ->
83 90 {Filter, 0, desc};
84 #rsm_in{direction = before, id = ID} ->
85 75 {before_id(ID, Filter), 0, desc};
86 #rsm_in{direction = undefined, index = Index} ->
87
:-(
{Filter, Index, asc};
88 undefined ->
89 35 {Filter, 0, asc}
90 end.
91
92 %% This function handles case: #rsm_in{direction = before, id = undefined}
93 %% Assumes assert_rsm_without_id(RSM)
94 lookup_last_page(Env, PageSize, Filter) ->
95 80 Messages = extract_messages(Env, Filter, 0, PageSize, desc),
96 80 Selected = length(Messages),
97 80 Offset =
98 case Selected < PageSize of
99 true ->
100
:-(
0; %% Result fits on a single page
101 false ->
102 80 FirstID = decoded_row_to_message_id(hd(Messages)),
103 80 calc_count(Env, before_id(FirstID, Filter))
104 end,
105 80 {ok, {Offset + Selected, Offset, Messages}}.
106
107 lookup_by_offset(Env, RSM, PageSize, Filter) ->
108 628 assert_rsm_without_id(RSM),
109 628 Offset = rsm_to_index(RSM),
110 628 Messages = extract_messages(Env, Filter, Offset, PageSize, asc),
111 628 Selected = length(Messages),
112 628 TotalCount =
113 case Selected < PageSize of
114 true ->
115 553 Offset + Selected; %% Result fits on a single page
116 false ->
117 75 LastID = decoded_row_to_message_id(lists:last(Messages)),
118 75 CountAfterLastID = calc_count(Env, after_id(LastID, Filter)),
119 75 Offset + Selected + CountAfterLastID
120 end,
121 628 {ok, {TotalCount, Offset, Messages}}.
122
123 538 assert_rsm_without_id(undefined) -> ok;
124 90 assert_rsm_without_id(#rsm_in{id = undefined}) -> ok.
125
126 rsm_to_index(#rsm_in{direction = undefined, index = Offset})
127 15 when is_integer(Offset) -> Offset;
128 613 rsm_to_index(_) -> 0.
129
130 lookup_messages_regular(Env, RSM, PageSize, Filter) ->
131 185 TotalCount = calc_count(Env, Filter),
132 185 Offset = calc_offset(Env, Filter, PageSize, TotalCount, RSM),
133 185 {LookupById, Filter2, Offset2, PageSize2, Order} =
134 rsm_to_regular_lookup_vars(RSM, Filter, Offset, PageSize),
135 185 Messages = extract_messages(Env, Filter2, Offset2, PageSize2, Order),
136 185 Result = {TotalCount, Offset, Messages},
137 185 case LookupById of
138 true -> %% check if we've selected a message with #rsm_in.id
139 125 mod_mam_utils:check_for_item_not_found(RSM, PageSize, Result);
140 false ->
141 60 {ok, Result}
142 end.
143
144 rsm_to_regular_lookup_vars(RSM, Filter, Offset, PageSize) ->
145 185 case RSM of
146 #rsm_in{direction = aft, id = ID} when ID =/= undefined ->
147 %% Set extra flag when selecting PageSize + 1 messages
148 75 {true, from_id(ID, Filter), 0, PageSize + 1, asc};
149 #rsm_in{direction = before, id = ID} when ID =/= undefined ->
150 50 {true, to_id(ID, Filter), 0, PageSize + 1, desc};
151 _ ->
152 60 {false, Filter, Offset, PageSize, asc}
153 end.
154
155 decode_rows(MessageRows, Env) ->
156 1098 [decode_row(Row, Env) || Row <- MessageRows].
157
158 -spec decoded_row_to_message_id(mod_mam:message_row()) -> mod_mam:message_id().
159 155 decoded_row_to_message_id(#{id := MessId}) -> MessId.
160
161 -spec extract_messages(Env :: env_vars(),
162 Filter :: filter(), Offset :: non_neg_integer(), Max :: pos_integer(),
163 Order :: asc | desc) -> [mod_mam:message_row()].
164 extract_messages(_Env, _Filter, _Offset, 0 = _Max, _Order) ->
165 60 [];
166 extract_messages(Env, Filter, Offset, Max, Order) ->
167 1098 {selected, MessageRows} = extract_rows(Env, Filter, Offset, Max, Order),
168 1098 Rows = maybe_reverse(Order, MessageRows),
169 1098 decode_rows(Rows, Env).
170
171 803 maybe_reverse(asc, List) -> List;
172 295 maybe_reverse(desc, List) -> lists:reverse(List).
173
174 extract_rows(Env, Filters, Offset, Max, Order) ->
175 1098 lookup_query(lookup, Env, Filters, Order, {Offset, Max}).
176
177 %% @doc Get the total result set size.
178 %% SELECT COUNT(*) as count FROM mam_message
179 -spec calc_count(env_vars(), filter()) -> non_neg_integer().
180 calc_count(Env, Filter) ->
181 465 Result = lookup_query(count, Env, Filter, unordered, all),
182 465 mongoose_rdbms:selected_to_integer(Result).
183
184 %% @doc Calculate a zero-based index of the row with UID in the result test.
185 %%
186 %% If the element does not exists, the ID of the next element will
187 %% be returned instead.
188 %% @end
189 %% SELECT COUNT(*) as index FROM mam_message WHERE id <= ?
190 -spec calc_index(env_vars(), filter(), message_id()) -> non_neg_integer().
191 calc_index(Env, Filter, ID) ->
192 75 calc_count(Env, to_id(ID, Filter)).
193
194 %% @doc Count of elements in RSet before the passed element.
195 %%
196 %% The element with the passed UID can be already deleted.
197 %% @end
198 %% SELECT COUNT(*) as count FROM mam_message WHERE id < ?
199 -spec calc_before(env_vars(), filter(), message_id()) -> non_neg_integer().
200 calc_before(Env, Filter, ID) ->
201 50 calc_count(Env, before_id(ID, Filter)).
202
203 -spec calc_offset(Env :: env_vars(),
204 Filter :: filter(), PageSize :: non_neg_integer(),
205 TotalCount :: non_neg_integer(), RSM :: jlib:rsm_in()) -> non_neg_integer().
206 calc_offset(Env, Filter, PageSize, TotalCount, RSM) ->
207 185 case RSM of
208 #rsm_in{direction = undefined, index = Index} when is_integer(Index) ->
209 15 Index;
210 #rsm_in{direction = before, id = undefined} ->
211 %% Requesting the Last Page in a Result Set
212 15 max(0, TotalCount - PageSize);
213 #rsm_in{direction = before, id = ID} when is_integer(ID) ->
214 50 max(0, calc_before(Env, Filter, ID) - PageSize);
215 #rsm_in{direction = aft, id = ID} when is_integer(ID) ->
216 75 calc_index(Env, Filter, ID);
217 _ ->
218 30 0
219 end.
220
221 -spec after_id(message_id(), filter()) -> filter().
222 after_id(ID, Filter) ->
223 140 [{greater, id, ID}|Filter].
224
225 -spec before_id(message_id(), filter()) -> filter().
226 before_id(ID, Filter) ->
227 205 [{less, id, ID}|Filter].
228
229 -spec from_id(message_id(), filter()) -> filter().
230 from_id(ID, Filter) ->
231 75 [{ge, id, ID}|Filter].
232
233 -spec to_id(message_id(), filter()) -> filter().
234 to_id(ID, Filter) ->
235 125 [{le, id, ID}|Filter].
Line Hits Source