./ct_report/coverage/mod_bosh_socket.COVER.html

1 -module(mod_bosh_socket).
2
3 -behaviour(gen_fsm_compat).
4 -behaviour(mongoose_transport).
5
6 %% API
7 -export([start/4,
8 start_link/4,
9 start_supervisor/0,
10 is_supervisor_started/0,
11 handle_request/2,
12 pause/2]).
13
14 %% Private API
15 -export([get_handlers/1,
16 get_pending/1,
17 get_client_acks/1,
18 set_client_acks/2,
19 get_cached_responses/1]).
20
21 %% ejabberd_socket compatibility
22 -export([starttls/2, starttls/3,
23 compress/1, compress/3,
24 send/2,
25 send_xml/2,
26 change_shaper/2,
27 monitor/1,
28 get_sockmod/1,
29 close/1,
30 peername/1,
31 get_peer_certificate/1]).
32
33 %% gen_fsm callbacks
34 -export([init/1,
35 accumulate/2, accumulate/3,
36 normal/2, normal/3,
37 closing/2, closing/3,
38 handle_event/3,
39 handle_sync_event/4,
40 handle_info/3,
41 terminate/3,
42 code_change/4]).
43
44 -ignore_xref([{mod_bosh_backend, delete_session, 1},
45 accumulate/2, accumulate/3, change_shaper/2, close/1, closing/2,
46 closing/3, compress/1, compress/3, get_cached_responses/1,
47 get_client_acks/1, get_handlers/1, get_peer_certificate/1,
48 get_pending/1, get_pending/1, get_sockmod/1, normal/2, normal/3,
49 pause/2, send/2, set_client_acks/2, start_link/4]).
50
51 -include("mongoose.hrl").
52 -include("jlib.hrl").
53 -include_lib("exml/include/exml_stream.hrl").
54 -include("mod_bosh.hrl").
55 -define(ACCUMULATE_PERIOD, 10).
56 -define(DEFAULT_HOLD, 1).
57 -define(CONCURRENT_REQUESTS, 2).
58 -define(DEFAULT_WAIT, 60).
59 -define(DEFAULT_CLIENT_ACKS, false).
60
61 -type cached_response() :: {rid(), TStamp :: integer(), exml:element()}.
62 -type rid() :: pos_integer().
63
64 -record(state, {from :: binary() | undefined,
65 to :: binary() | undefined,
66 c2s_pid :: pid(),
67 handlers = [] :: [{rid(), reference(), pid()}],
68 %% Elements buffered for sending to the client.
69 pending = [] :: [jlib:xmlstreamel()],
70 sid :: mod_bosh:sid(),
71 wait = ?DEFAULT_WAIT :: integer(),
72 hold = ?DEFAULT_HOLD :: integer(),
73 rid :: rid() | undefined,
74 %% Requests deferred for later processing because
75 %% of having Rid greater than expected.
76 deferred = [] :: [{rid(), {mod_bosh:event_type(), exml:element()}}],
77 client_acks = ?DEFAULT_CLIENT_ACKS :: boolean(),
78 sent = [] :: [cached_response()],
79 %% Allowed inactivity period in seconds.
80 inactivity :: pos_integer() | infinity,
81 inactivity_tref :: reference() | 'undefined',
82 %% Max pause period in seconds.
83 max_pause :: pos_integer(),
84 max_wait :: pos_integer() | infinity,
85 %% Are acknowledgements used?
86 server_acks :: boolean(),
87 last_processed :: rid() | 'undefined',
88 %% Report scheduled for sending at the earliest
89 %% possible occasion.
90 report = false :: {rid(), Time :: non_neg_integer()} | 'false'}).
91 -type state() :: #state{}.
92
93 -type statename() :: 'accumulate' | 'normal' | 'closing'.
94 -type fsm_return() :: {'next_state', statename(), state()}.
95
96 %%--------------------------------------------------------------------
97 %% API
98 %%--------------------------------------------------------------------
99
100 -spec start(mongooseim:host_type(), mod_bosh:sid(), mongoose_transport:peer(),
101 binary() | undefined) ->
102 {'error', _} | {'ok', 'undefined' | pid()} | {'ok', 'undefined' | pid(), _}.
103 start(HostType, Sid, Peer, PeerCert) ->
104 130 supervisor:start_child(?BOSH_SOCKET_SUP, [HostType, Sid, Peer, PeerCert]).
105
106
107 -spec start_link(mongooseim:host_type(), mod_bosh:sid(), mongoose_transport:peer(),
108 binary() | undefined) ->
109 'ignore' | {'error', _} | {'ok', pid()}.
110 start_link(HostType, Sid, Peer, PeerCert) ->
111 130 gen_fsm_compat:start_link(?MODULE, [HostType, Sid, Peer, PeerCert], []).
112
113 -spec start_supervisor() -> {ok, pid()} | {error, any()}.
114 start_supervisor() ->
115 82 ChildId = ?BOSH_SOCKET_SUP,
116 82 ChildSpec =
117 {ChildId,
118 {ejabberd_tmp_sup, start_link,
119 [ChildId, ?MODULE]},
120 permanent,
121 infinity,
122 supervisor,
123 [ejabberd_tmp_sup]},
124 82 case supervisor:start_child(ejabberd_sup, ChildSpec) of
125 {ok, undefined} ->
126
:-(
{error, undefined};
127 {ok, Child} ->
128 82 {ok, Child};
129 {ok, Child, _Info} ->
130
:-(
{ok, Child};
131 {error, {already_started, Child}} ->
132
:-(
{ok, Child};
133 {error, Reason} ->
134
:-(
{error, Reason}
135 end.
136
137 -spec is_supervisor_started() -> boolean().
138 is_supervisor_started() ->
139 321 is_pid(whereis(?BOSH_SOCKET_SUP)).
140
141 -spec handle_request(Pid :: pid(),
142 {EventTag :: mod_bosh:event_type(),
143 Handler :: pid(),
144 Body :: exml:element()}) -> ok.
145 handle_request(Pid, Request) ->
146 2120 gen_fsm_compat:send_all_state_event(Pid, Request).
147
148
149 %% @doc TODO: no handler for this call is present!
150 %% No check for violating maxpause is made when calling this!
151 -spec pause(Pid :: pid(), Seconds :: pos_integer()) -> ok.
152 pause(Pid, Seconds) ->
153
:-(
gen_fsm_compat:send_all_state_event(Pid, {pause, Seconds}).
154
155 %%--------------------------------------------------------------------
156 %% Private API
157 %%--------------------------------------------------------------------
158
159 get_handlers(Pid) ->
160 19 gen_fsm_compat:sync_send_all_state_event(Pid, get_handlers).
161
162
163 get_pending(Pid) ->
164
:-(
gen_fsm_compat:sync_send_all_state_event(Pid, get_pending).
165
166
167 -spec get_client_acks(pid()) -> boolean().
168 get_client_acks(Pid) ->
169
:-(
gen_fsm_compat:sync_send_all_state_event(Pid, get_client_acks).
170
171
172 -spec set_client_acks(pid(), boolean()) -> any().
173 set_client_acks(Pid, Enabled) ->
174 4 gen_fsm_compat:sync_send_all_state_event(Pid, {set_client_acks, Enabled}).
175
176
177 -spec get_cached_responses(pid()) -> [cached_response()].
178 get_cached_responses(Pid) ->
179 1 gen_fsm_compat:sync_send_all_state_event(Pid, get_cached_responses).
180
181 %%--------------------------------------------------------------------
182 %% gen_fsm callbacks
183 %%--------------------------------------------------------------------
184
185 %%--------------------------------------------------------------------
186 %% @private
187 %% init(Args) -> {ok, StateName, State} |
188 %% {ok, StateName, State, Timeout} |
189 %% ignore |
190 %% {stop, StopReason}
191 %% @end
192 %%--------------------------------------------------------------------
193 init([HostType, Sid, Peer, PeerCert]) ->
194 130 BoshSocket = #bosh_socket{sid = Sid, pid = self(), peer = Peer, peercert = PeerCert},
195 130 C2SOpts = [{xml_socket, true}],
196 130 {ok, C2SPid} = ejabberd_c2s:start({mod_bosh_socket, BoshSocket}, C2SOpts),
197 130 Opts = gen_mod:get_loaded_module_opts(HostType, mod_bosh),
198 130 State = new_state(Sid, C2SPid, Opts),
199 130 ?LOG_DEBUG(ls(#{what => bosh_socket_init}, State)),
200 130 {ok, accumulate, State}.
201
202 new_state(Sid, C2SPid, #{inactivity := Inactivity, max_wait := MaxWait,
203 server_acks := ServerAcks, max_pause := MaxPause}) ->
204 130 #state{sid = Sid,
205 c2s_pid = C2SPid,
206 inactivity = Inactivity,
207 max_pause = MaxPause,
208 max_wait = MaxWait,
209 server_acks = ServerAcks}.
210
211 %%--------------------------------------------------------------------
212 %% @private
213 %% @doc
214 %% There should be one instance of this function for each possible
215 %% state name. Whenever a gen_fsm receives an event sent using
216 %% gen_fsm_compat:send_event/2, the instance of this function with the same
217 %% name as the current state name StateName is called to handle
218 %% the event. It is also called if a timeout occurs.
219 %%
220 %% state_name(Event, State) ->
221 %% {next_state, NextStateName, NextState} |
222 %% {next_state, NextStateName, NextState, Timeout} |
223 %% {stop, Reason, NewState}
224 %% @end
225 %%--------------------------------------------------------------------
226
227 -spec accumulate(_, state()) -> fsm_return().
228 accumulate(acc_off, #state{pending = Pending} = S) ->
229 256 NS = S#state{pending = []},
230 256 {next_state, normal, send_or_store(Pending, NS)};
231 accumulate(Event, State) ->
232
:-(
?LOG_DEBUG(ls(#{what => bosh_socket_unhandled_event, state_name => accumulate,
233
:-(
event => Event}, State)),
234
:-(
{next_state, accumulate, State}.
235
236
237 -spec normal(_, state()) -> fsm_return().
238 normal(acc_off, #state{} = S) ->
239
:-(
{next_state, normal, S};
240 normal(Event, State) ->
241
:-(
?LOG_DEBUG(ls(#{what => bosh_socket_unhandled_event, state_name => normal,
242
:-(
event => Event}, State)),
243
:-(
{next_state, normal, State}.
244
245 closing(Event, State) ->
246
:-(
?LOG_DEBUG(ls(#{what => bosh_socket_unhandled_event, state_name => closing,
247
:-(
event => Event}, State)),
248
:-(
{next_state, closing, State}.
249
250 %%--------------------------------------------------------------------
251 %% @private
252 %% @doc
253 %% There should be one instance of this function for each possible
254 %% state name. Whenever a gen_fsm receives an event sent using
255 %% gen_fsm_compat:sync_send_event/[2, 3], the instance of this function with
256 %% the same name as the current state name StateName is called to
257 %% handle the event.
258 %%
259 %% state_name(Event, From, State) ->
260 %% {next_state, NextStateName, NextState} |
261 %% {next_state, NextStateName, NextState, Timeout} |
262 %% {reply, Reply, NextStateName, NextState} |
263 %% {reply, Reply, NextStateName, NextState, Timeout} |
264 %% {stop, Reason, NewState} |
265 %% {stop, Reason, Reply, NewState}
266 %% @end
267 %%--------------------------------------------------------------------
268 accumulate(Event, _From, State) ->
269
:-(
?LOG_DEBUG(ls(#{what => bosh_socket_unhandled_sync_event, state_name => accumulate,
270
:-(
event => Event}, State)),
271
:-(
{reply, ok, accumulate, State}.
272
273 normal(Event, _From, State) ->
274
:-(
?LOG_DEBUG(ls(#{what => bosh_socket_unhandled_sync_event, state_name => normal,
275
:-(
event => Event}, State)),
276
:-(
{reply, ok, normal, State}.
277
278 closing(Event, _From, State) ->
279
:-(
?LOG_DEBUG(ls(#{what => bosh_socket_unhandled_sync_event, state_name => closing,
280
:-(
event => Event}, State)),
281
:-(
{reply, ok, closing, State}.
282
283 %%--------------------------------------------------------------------
284 %% @private
285 %% @doc
286 %% Whenever a gen_fsm receives an event sent using
287 %% gen_fsm_compat:send_all_state_event/2, this function is called to handle
288 %% the event.
289 %%
290 %% handle_event(Event, StateName, State) ->
291 %% {next_state, NextStateName, NextState} |
292 %% {next_state, NextStateName, NextState, Timeout} |
293 %% {stop, Reason, NewState}
294 %% @end
295 %%--------------------------------------------------------------------
296
297 handle_event({EventTag, Handler, #xmlel{} = Body}, SName, S) ->
298 2120 NS = cancel_inactivity_timer(S),
299 2120 Rid = binary_to_integer(exml_query:attr(Body, <<"rid">>)),
300 2120 try
301 2120 NNS = handle_stream_event({EventTag, Body, Rid}, Handler, SName, NS),
302 %% TODO: it's the event which determines the next state,
303 %% this ought to be returned from handle_stream_event
304 2117 determine_next_state(EventTag, SName, NNS)
305 catch
306 throw:{invalid_rid, TState} ->
307 2 {stop, {shutdown, invalid_rid}, TState};
308 throw:{invalid_pause, TState} ->
309 1 {stop, {shutdown, policy_violation}, TState}
310 end;
311
312 handle_event(Event, StateName, State) ->
313
:-(
?LOG_DEBUG(ls(#{what => bosh_socket_unhandled_all_state,
314
:-(
state_name => StateName, event => Event}, State)),
315
:-(
{next_state, StateName, State}.
316
317
318 determine_next_state(_EventTag, closing, NNS) ->
319 10 {stop, normal, NNS};
320 determine_next_state(EventTag, SName, NNS) ->
321 2107 case EventTag of
322 _ when EventTag == streamstart; EventTag == restart ->
323 256 timer:apply_after(?ACCUMULATE_PERIOD,
324 gen_fsm_compat, send_event, [self(), acc_off]),
325 256 {next_state, accumulate, NNS};
326 _ ->
327 1851 {next_state, SName, NNS}
328 end.
329
330 %%--------------------------------------------------------------------
331 %% @private
332 %% @doc
333 %% Whenever a gen_fsm receives an event sent using
334 %% gen_fsm_compat:sync_send_all_state_event/[2, 3], this function is called
335 %% to handle the event.
336 %%
337 %% handle_sync_event(Event, From, StateName, State) ->
338 %% {next_state, NextStateName, NextState} |
339 %% {next_state, NextStateName, NextState, Timeout} |
340 %% {reply, Reply, NextStateName, NextState} |
341 %% {reply, Reply, NextStateName, NextState, Timeout} |
342 %% {stop, Reason, NewState} |
343 %% {stop, Reason, Reply, NewState}
344 %% @end
345 %%--------------------------------------------------------------------
346 handle_sync_event(get_handlers, _From, StateName,
347 #state{handlers = Handlers} = S) ->
348 19 {reply, Handlers, StateName, S};
349 handle_sync_event(get_pending, _From, StateName,
350 #state{pending = Pending} = S) ->
351
:-(
{reply, Pending, StateName, S};
352 handle_sync_event(get_client_acks, _From, StateName,
353 #state{client_acks = ClientAcks} = S) ->
354
:-(
{reply, ClientAcks, StateName, S};
355 handle_sync_event({set_client_acks, ClientAcks}, _From, StateName,
356 #state{} = S) ->
357 4 NS = S#state{client_acks = ClientAcks},
358 4 {reply, ok, StateName, NS};
359 handle_sync_event(get_cached_responses, _From, StateName,
360 #state{sent = CachedResponses} = S) ->
361 1 {reply, CachedResponses, StateName, S};
362 handle_sync_event(Event, _From, StateName, State) ->
363
:-(
?LOG_DEBUG(ls(#{what => bosh_socket_unhandled_sync_all_state,
364
:-(
state_name => StateName, event => Event}, State)),
365
:-(
Reply = ok,
366
:-(
{reply, Reply, StateName, State}.
367
368 %%--------------------------------------------------------------------
369 %% @private
370 %% @doc
371 %% This function is called by a gen_fsm when it receives any
372 %% message other than a synchronous or asynchronous event
373 %% (or a system message).
374 %%
375 %% handle_info(Info, StateName, State)->
376 %% {next_state, NextStateName, NextState} |
377 %% {next_state, NextStateName, NextState, Timeout} |
378 %% {stop, Reason, NewState}
379 %% @end
380 %%--------------------------------------------------------------------
381
382 handle_info({send, #xmlstreamend{} = StreamEnd}, normal = SName,
383 #state{pending = Pending} = S) ->
384 125 NS = send_or_store(Pending ++ [StreamEnd], S#state{pending = []}),
385 125 {next_state, SName, NS};
386 handle_info({send, Data}, accumulate = SName, #state{} = S) ->
387 512 {next_state, SName, store([Data], S)};
388 handle_info({send, Data}, normal = SName, #state{} = S) ->
389 789 NS = send_or_store(Data, S),
390 789 {next_state, SName, NS};
391 handle_info(close, _SName, #state{pending = []} = State) ->
392 115 {stop, normal, State};
393 handle_info(close, _SName, State) ->
394 10 {next_state, closing, State};
395 handle_info(inactivity_timeout, _SName, State) ->
396 2 ?LOG_INFO(ls(#{what => bosh_socket_terminating, reason => inactivity_timeout}, State)),
397 2 {stop, {shutdown, inactivity_timeout}, State};
398 handle_info({wait_timeout, {Rid, Pid}}, SName,
399 #state{handlers = Handlers} = S) ->
400 2 ?LOG_INFO(ls(#{what => bosh_socket_wait_timeout,
401 2 handler_rid => Rid, handler_pid => Pid}, S)),
402 %% In case some message was being handled when the timer fired
403 %% it may turn out that Pid is no longer available in Handlers.
404 2 case lists:keytake(Rid, 1, Handlers) of
405 false ->
406
:-(
{next_state, SName, S};
407 {value, {Rid, _, Pid}, NewHandlers} ->
408 2 NS = send_to_handler({Rid, Pid}, [],
409 S#state{handlers = NewHandlers}),
410 2 {next_state, SName, NS}
411 end;
412 handle_info(Info, SName, State) ->
413
:-(
?UNEXPECTED_INFO(Info),
414
:-(
{next_state, SName, State}.
415
416 terminate(Reason, StateName, #state{sid = Sid, handlers = Handlers} = S) ->
417 130 [Pid ! {close, Sid} || {_, _, Pid} <- lists:sort(Handlers)],
418 130 mod_bosh_backend:delete_session(Sid),
419 130 catch ejabberd_c2s:stop(S#state.c2s_pid),
420 130 ?LOG_DEBUG(ls(#{what => bosh_socket_closing_session, reason => Reason,
421 state_name => StateName, handlers => Handlers,
422 130 pending => S#state.pending}, S)).
423
424 code_change(_OldVsn, StateName, State, _Extra) ->
425
:-(
{ok, StateName, State}.
426
427 %%--------------------------------------------------------------------
428 %% callback implementations
429 %%--------------------------------------------------------------------
430
431 handle_stream_event({EventTag, Body, Rid} = Event, Handler,
432 SName, #state{rid = OldRid} = S) ->
433 2147 ExpectedRid = maybe_add(1, OldRid),
434 2147 NS = maybe_add_handler(Handler, Rid, S),
435 2147 NNS = case {EventTag,
436 maybe_is_retransmission(Rid, OldRid, S#state.sent),
437 is_expected_rid(Rid, ExpectedRid),
438 is_acceptable_rid(Rid, ExpectedRid)}
439 of
440 {_, {true, CachedResponse}, _, _} when Handler /= none ->
441 1 case CachedResponse of
442 none ->
443
:-(
NS;
444 _ ->
445 1 resend_cached(CachedResponse, NS)
446 end;
447 {streamstart, _, _, _} ->
448 130 process_acked_stream_event(Event, SName, NS);
449 {_, _, true, _} ->
450 1987 process_acked_stream_event(Event, SName, NS);
451 {_, _, false, true} ->
452 27 ?LOG_INFO(ls(#{what => bosh_socket_deferring,
453 event_rid => Rid, body => Body,
454 27 expected_rid => ExpectedRid, event_tag => EventTag}, S)),
455 27 NS#state{deferred = [Event | NS#state.deferred]};
456 {_, _, false, false} ->
457 2 ?LOG_ERROR(#{what => bosh_socket_invalid_rid,
458 event_rid => Rid, body => Body,
459 expected_rid => ExpectedRid, event_tag => EventTag,
460
:-(
difference => maybe_diff(Rid, ExpectedRid)}),
461 2 [Pid ! item_not_found
462 2 || {_, _, Pid} <- lists:sort(NS#state.handlers)],
463 2 throw({invalid_rid, NS#state{handlers = []}})
464 end,
465 2144 return_surplus_handlers(SName, NNS).
466
467 -spec maybe_is_retransmission(rid(), rid(), [cached_response()])
468 -> false | {true, none} | {true, cached_response()}.
469 maybe_is_retransmission(Rid, OldRid, Sent) ->
470 2147 case {lists:keyfind(Rid, 1, Sent), Rid =:= OldRid} of
471 {false, false} ->
472 2146 false;
473 {false, true} ->
474
:-(
?LOG_INFO(#{what => bosh_socket_request_repeated,
475 text => <<"Request repeated but no response found in cache">>,
476
:-(
event_rid => Rid, old_rid => OldRid, sent => Sent}),
477
:-(
{true, none};
478 {CachedResponse, _} ->
479 1 {true, CachedResponse}
480 end.
481
482 -spec maybe_add(rid(), rid() | undefined)
483 -> rid() | undefined.
484 130 maybe_add(_, undefined) -> undefined;
485 maybe_add(Rid1, Rid2) when is_integer(Rid1),
486 2017 is_integer(Rid2) -> Rid1 + Rid2.
487
488 -spec maybe_diff(rid(), rid() | undefined)
489 -> non_neg_integer() | undefined.
490
:-(
maybe_diff(_, undefined) -> undefined;
491 2 maybe_diff(Rid, Expected) -> abs(Rid-Expected).
492
493 -spec resend_cached(cached_response(), state()) -> state().
494 resend_cached({_Rid, _, CachedBody}, S) ->
495 1 send_to_handler(CachedBody, S).
496
497
498 -spec process_acked_stream_event({EventTag :: mod_bosh:event_type(),
499 Body :: exml:element(),
500 Rid :: 'undefined' | rid()},
501 SName :: any(),
502 S :: state() ) -> state().
503 process_acked_stream_event({EventTag, Body, Rid}, SName,
504 #state{} = S) ->
505 2117 MaybeBAck = exml_query:attr(Body, <<"ack">>),
506 2117 {Action, Ack} = determine_report_action(MaybeBAck, S#state.client_acks,
507 Rid, S#state.last_processed),
508 2117 NS = maybe_trim_cache(Ack, S),
509 2117 case Action of
510 noreport ->
511 2116 process_stream_event(EventTag, Body, SName, rid(NS, Rid));
512 report ->
513 1 NS2 = schedule_report(Ack, NS),
514 1 NS3 = process_stream_event(EventTag, Body, SName, rid(NS2, Rid)),
515 1 maybe_send_report(NS3)
516 end.
517
518 rid(#state{} = S, Rid) when is_integer(Rid), Rid > 0 ->
519 2117 S#state{rid = Rid}.
520
521
522 -spec determine_report_action(BinAck :: 'undefined' | binary(),
523 boolean(),
524 Rid :: rid(),
525 LastProcessed :: 'undefined' | pos_integer()
526 ) -> {'noreport', _} | {'report', _}.
527 determine_report_action(undefined, false, _, _) ->
528 2112 {noreport, undefined};
529 determine_report_action(undefined, true, Rid, LastProcessed) ->
530 2 ?WARNING_MSG_IF(Rid+1 /= LastProcessed, "expected 'ack' attribute on ~p~n", [Rid]),
531 2 {noreport, undefined};
532 determine_report_action(BinAck, _, _, LastProcessed) ->
533 3 Ack = binary_to_integer(BinAck),
534 3 case {LastProcessed, is_valid_ack(Ack, LastProcessed)} of
535 {undefined, _} ->
536
:-(
{noreport, Ack};
537 {_, true} ->
538 2 {noreport, Ack};
539 {_, false} ->
540 1 {report, Ack}
541 end.
542
543
544 -spec is_valid_ack(Ack :: rid(), 'undefined' | pos_integer()) -> boolean().
545 is_valid_ack(Ack, LastProcessed)
546 when Ack < LastProcessed ->
547 1 false;
548 is_valid_ack(_, _) ->
549 2 true.
550
551
552 -spec maybe_trim_cache(undefined | any(), state()) -> state().
553 maybe_trim_cache(undefined, S) ->
554 2114 S;
555 maybe_trim_cache(Ack, S) ->
556 3 UpToAck = fun({R, _, _}) when R =< Ack ->
557 5 true;
558 (_) ->
559 1 false
560 end,
561 3 NewSent = lists:dropwhile(UpToAck, S#state.sent),
562 3 S#state{sent = NewSent}.
563
564
565 -spec schedule_report(rid(), state()) -> state().
566 schedule_report(Ack, #state{sent = Sent} = S) ->
567 1 ReportRid = Ack + 1,
568 1 try
569 1 {ReportRid, TimeSent, _} = lists:keyfind(ReportRid, 1, Sent),
570 1 ElapsedTimeMillis = erlang:monotonic_time(millisecond) - TimeSent,
571 1 Report = {ReportRid, ElapsedTimeMillis},
572 1 case S#state.report of
573 false ->
574 1 S#state{report = Report};
575 OldReport when OldReport < Report ->
576
:-(
S#state{report = OldReport};
577 _ ->
578
:-(
S#state{report = Report}
579 end
580 catch
581 error:{badmatch, {resp, false}} ->
582
:-(
?LOG_ERROR(ls(#{what => bosh_socket_no_cached_response,
583
:-(
responses => Sent, rid_offender => ReportRid}, S)),
584
:-(
S
585 end.
586
587
588 -spec maybe_send_report(state()) -> state().
589 maybe_send_report(#state{report = false} = S) ->
590
:-(
S;
591 maybe_send_report(#state{} = S) ->
592 1 send_or_store([], S).
593
594
595 -spec process_stream_event(mod_bosh:event_type(), exml:element(), _SName,
596 state()) -> state().
597 process_stream_event(pause, Body, SName, State) ->
598 3 Seconds = binary_to_integer(exml_query:attr(Body, <<"pause">>)),
599 3 NewState = process_pause_event(Seconds, State),
600 2 process_deferred_events(SName, NewState);
601 process_stream_event(EventTag, Body, SName, #state{c2s_pid = C2SPid} = State) ->
602 2114 {Els, NewState} = bosh_unwrap(EventTag, Body, State),
603 2114 [forward_to_c2s(C2SPid, El) || El <- Els],
604 2114 process_deferred_events(SName, NewState).
605
606
607 -spec process_pause_event('infinity' | 'undefined' | pos_integer(),
608 state()) -> state().
609 process_pause_event(Seconds, #state{max_pause = MaxPause} = S)
610 when MaxPause == undefined;
611 Seconds > MaxPause ->
612 1 [Pid ! policy_violation || {_, _, Pid} <- S#state.handlers],
613 1 throw({invalid_pause, S#state{handlers = []}});
614 process_pause_event(Seconds, State) ->
615 2 NS = State#state{inactivity = Seconds},
616 2 F = fun(_, S) ->
617 4 send_to_handler([], S)
618 end,
619 2 lists:foldl(F, NS, lists:seq(1, length(State#state.handlers))).
620
621
622 -spec process_deferred_events(_SName, state()) -> state().
623 process_deferred_events(SName, #state{deferred = Deferred} = S) ->
624 2116 lists:foldl(fun(Event, State) ->
625 27 ?LOG_DEBUG(ls(#{what => bosh_socket_processing_deferred_event,
626 27 event => Event}, S)),
627 27 handle_stream_event(Event, none, SName, State)
628 end,
629 S#state{deferred = []},
630 lists:sort(Deferred)).
631
632
633 -spec is_expected_rid(rid(), rid() | undefined) -> boolean().
634 is_expected_rid(Rid, ExpectedRid) when Rid == ExpectedRid ->
635 1987 true;
636 is_expected_rid(_, _) ->
637 160 false.
638
639 -spec is_acceptable_rid(rid(), rid() | undefined) -> boolean().
640 is_acceptable_rid(Rid, ExpectedRid)
641 when Rid > ExpectedRid,
642 Rid < ExpectedRid + ?CONCURRENT_REQUESTS ->
643 27 true;
644 is_acceptable_rid(_, _) ->
645 2120 false.
646
647 %% @doc Send data to the client if a request handler is available, that matches next RID.
648 %% Otherwise, store for sending later.
649 -spec send_or_store(_Data, state()) -> state().
650 send_or_store(Data, State) when not is_list(Data) ->
651 789 send_or_store([Data], State);
652 send_or_store(Data, State) ->
653 2192 case send_to_handler(Data, State) of
654 no_valid_handler ->
655 91 store(Data, State);
656 NewState ->
657 2101 NewState
658 end.
659
660
661 %% @doc send_to_handler() assumes that Handlers is not empty!
662 %% Be sure that's the case if calling it.
663 -spec send_to_handler([any()] | exml:element(), state()) -> state() | no_valid_handler.
664 send_to_handler(Data, State) ->
665 2203 case pick_handler(State) of
666 {Handler, NS} ->
667 2112 send_to_handler(Handler, Data, NS);
668 false ->
669 91 no_valid_handler
670 end.
671
672
673 %% Return handler and new state if a handler is available
674 %% or `false` otherwise.
675 -spec pick_handler(state()) -> {{rid(), pid()}, state()} | false.
676 pick_handler(#state{ handlers = [] }) ->
677 88 false;
678 pick_handler(#state{ handlers = Handlers, rid = Rid } = S) ->
679 2115 case lists:sort(Handlers) of
680 [{HandlerRid, TRef, Pid} | HRest] when HandlerRid =< Rid->
681 %% The cancellation might fail if the timer already fired.
682 %% Don't worry, it's handled on receiving the timeout message.
683 2112 erlang:cancel_timer(TRef),
684 2112 {{HandlerRid, Pid}, S#state{handlers = HRest}};
685 _ ->
686 3 false
687 end.
688
689
690 -spec send_to_handler({_, atom() | pid() | port() | {atom(), atom()}},
691 Wrapped :: [any()] | exml:element(),
692 State :: state() ) -> state().
693 send_to_handler({_, Pid}, #xmlel{name = <<"body">>} = Wrapped, State) ->
694 1 send_wrapped_to_handler(Pid, Wrapped, State);
695 send_to_handler({Rid, Pid}, Data, State) ->
696 2113 {Wrapped, NS} = bosh_wrap(Data, Rid, State),
697 2113 NS2 = cache_response({Rid, erlang:monotonic_time(millisecond), Wrapped}, NS),
698 2113 send_wrapped_to_handler(Pid, Wrapped, NS2).
699
700
701 %% @doc This is the most specific variant of send_to_handler()
702 %% and the *only one* actually performing a send
703 %% to the cowboy_loop_handler serving a HTTP request.
704 -spec send_wrapped_to_handler(atom() | pid() | port() | {atom(), atom()},
705 Wrapped :: exml:element(),
706 State :: state()) -> state().
707 send_wrapped_to_handler(Pid, Wrapped, #state{handlers = []} = State) ->
708 1161 Pid ! {bosh_reply, Wrapped},
709 1161 setup_inactivity_timer(State);
710 send_wrapped_to_handler(Pid, Wrapped, State) ->
711 953 Pid ! {bosh_reply, Wrapped},
712 953 State.
713
714
715 -spec maybe_ack(rid(), state()) -> [{binary(), _}].
716 maybe_ack(HandlerRid, #state{rid = Rid} = S) ->
717 2113 case Rid > HandlerRid of
718 true ->
719 952 server_ack(S#state.server_acks, Rid);
720 false ->
721 1161 []
722 end.
723
724
725 -spec maybe_report(state()) -> {[{binary(), _}], state()}.
726 maybe_report(#state{report = false} = S) ->
727 2112 {[], S};
728 maybe_report(#state{report = Report} = S) ->
729 1 {ReportRid, ElapsedTime} = Report,
730 1 NewAttrs = [{<<"report">>, integer_to_binary(ReportRid)},
731 {<<"time">>, integer_to_binary(ElapsedTime)}],
732 1 {NewAttrs, S#state{report = false}}.
733
734
735 -spec cache_response(cached_response(), state()) -> state().
736 cache_response({Rid, _, _} = Response, #state{sent = Sent} = S) ->
737 2113 NewSent = lists:keymerge(1, [Response], Sent),
738 2113 CacheUpTo = case S#state.client_acks of
739 true ->
740 %% Acknowledgements are on - there's no limit on the number
741 %% of cached responses.
742 4 infinity;
743 false ->
744 %% Leave up to ?CONCURRENT_REQUESTS responses in cache.
745 2109 ?CONCURRENT_REQUESTS
746 end,
747 2113 S#state{sent = cache_up_to(CacheUpTo, NewSent),
748 last_processed = last_processed(Rid, S#state.last_processed)}.
749
750
751 -spec cache_up_to('infinity' | 2, Responses :: [cached_response()])
752 -> [cached_response()].
753 cache_up_to(infinity, Responses) ->
754 4 Responses;
755 cache_up_to(N, Responses) ->
756 2109 lists:nthtail(max(0, length(Responses) - N), Responses).
757
758
759 -spec last_processed(rid(), 'undefined' | pos_integer()) -> rid().
760 last_processed(Rid, undefined) ->
761 130 Rid;
762 last_processed(Rid1, Rid2) ->
763 1983 max(Rid1, Rid2).
764
765
766 -spec setup_inactivity_timer(state()) -> state().
767 setup_inactivity_timer(#state{inactivity = infinity} = S) ->
768
:-(
S;
769 setup_inactivity_timer(S) ->
770 1161 cancel_inactivity_timer(S),
771 1161 TRef = erlang:send_after(timer:seconds(S#state.inactivity), self(),
772 inactivity_timeout),
773 1161 S#state{inactivity_tref = TRef}.
774
775
776 -spec cancel_inactivity_timer(state()) -> state().
777 cancel_inactivity_timer(#state{inactivity_tref = undefined} = S) ->
778 2247 S;
779 cancel_inactivity_timer(S) ->
780 1034 erlang:cancel_timer(S#state.inactivity_tref),
781 1034 S#state{inactivity_tref = undefined}.
782
783
784 %% @doc Store data for sending later.
785 -spec store([jlib:xmlstreamel()], state()) -> state().
786 store(Data, #state{pending = Pending} = S) ->
787 603 S#state{pending = Pending ++ Data}.
788
789
790 -spec forward_to_c2s('undefined' | pid(), jlib:xmlstreamel()) -> 'ok'.
791 forward_to_c2s(C2SPid, StreamElement) ->
792 1314 gen_fsm_compat:send_event(C2SPid, StreamElement).
793
794
795 -spec maybe_add_handler(_, rid(), state()) -> state().
796 maybe_add_handler(Handler, Rid, S) when is_pid(Handler) ->
797 2120 add_handler({Rid, Handler}, S);
798 maybe_add_handler(_, _, S) ->
799 27 S.
800
801
802 -spec add_handler({rid(), pid()}, state()) -> state().
803 add_handler({Rid, Pid}, #state{handlers = Handlers} = S) ->
804 2120 TRef = erlang:send_after(timer:seconds(S#state.wait), self(),
805 {wait_timeout, {Rid, Pid}}),
806 2120 S#state{handlers = [{Rid, TRef, Pid} | Handlers]}.
807
808
809 %% @doc Keep in mind the hardcoding for hold == 1.
810 -spec return_surplus_handlers('accumulate' | 'normal' | 'closing', state()) -> state().
811 return_surplus_handlers(SName, #state{handlers = []} = State)
812 when SName == accumulate; SName == normal; SName == closing ->
813 2 State;
814 return_surplus_handlers(SName, #state{handlers = []} = State)
815 when SName == normal; SName == closing ->
816
:-(
State;
817 return_surplus_handlers(accumulate, #state{handlers = [_]} = State) ->
818 142 State;
819 return_surplus_handlers(SName, #state{handlers = [_], pending = []} = State)
820 when SName == normal; SName == closing ->
821 979 State;
822 return_surplus_handlers(accumulate, #state{handlers = _} = S) ->
823 6 case send_to_handler([], S) of
824
:-(
no_valid_handler -> S;
825 6 NS -> return_surplus_handlers(accumulate, NS)
826 end;
827 return_surplus_handlers(SName, #state{pending = Pending} = S)
828 when SName == normal; SName == closing ->
829 1021 send_or_store(Pending, S#state{pending = []}).
830
831
832 -spec bosh_unwrap(EventTag :: mod_bosh:event_type(), exml:element(), state())
833 -> {[jlib:xmlstreamel()], state()}.
834 bosh_unwrap(StreamEvent, Body, #state{} = S)
835 when StreamEvent =:= streamstart ->
836 130 Wait = min(get_attr(<<"wait">>, Body, S#state.wait), S#state.max_wait),
837 130 Hold = get_attr(<<"hold">>, Body, S#state.hold),
838 130 ClientAcks = get_client_acks(StreamEvent, Body, S#state.client_acks, S),
839 130 From = exml_query:attr(Body, <<"from">>),
840 130 To = exml_query:attr(Body, <<"to">>),
841 130 E = stream_start(From, To),
842 130 S2 = S#state{wait = Wait, hold = Hold, client_acks = ClientAcks,
843 from = From, to = To},
844 130 {[E], S2};
845
846 bosh_unwrap(StreamEvent, _Body, #state{} = S)
847 when StreamEvent =:= restart ->
848 126 {[stream_start(S#state.from, S#state.to)], S};
849
850 bosh_unwrap(streamend, Body, State) ->
851 123 {Els, NewState} = bosh_unwrap(normal, Body, State),
852 123 {Els ++ [#xmlstreamend{name = <<>>}], NewState};
853
854 bosh_unwrap(normal, Body, #state{sid = Sid} = State) ->
855 1858 Sid = exml_query:attr(Body, <<"sid">>),
856 1858 ?NS_HTTPBIND = exml_query:attr(Body, <<"xmlns">>),
857 1858 {[{xmlstreamelement, El}
858 1858 || El <- Body#xmlel.children,
859 %% Ignore whitespace keepalives.
860 935 El /= #xmlcdata{content = <<" ">>}],
861 State}.
862
863
864 -spec get_client_acks(streamstart, exml:element(), boolean(), #state{}) -> boolean().
865 get_client_acks(streamstart, Element, Default, State) ->
866 130 case exml_query:attr(Element, <<"ack">>) of
867 undefined ->
868 130 Default;
869 <<"1">> ->
870
:-(
true;
871 _ ->
872
:-(
?LOG_INFO(ls(#{what => bosh_socket_ignore_ack,
873
:-(
text => <<"Ignoring invalid client ack on stream start">>}, State)),
874
:-(
false
875 end.
876
877
878 -spec get_attr(Attr :: binary(), exml:element(), integer()) -> any().
879 get_attr(Attr, Element, Default) ->
880 260 case exml_query:attr(Element, Attr) of
881 undefined ->
882
:-(
Default;
883 Value ->
884 260 binary_to_integer(Value)
885 end.
886
887
888 -spec stream_start(binary(), binary()) -> jlib:xmlstreamstart().
889 stream_start(From, To) ->
890 256 #xmlstreamstart{name = <<"stream:stream">>,
891 attrs = [{<<"from">>, From},
892 {<<"to">>, To},
893 {<<"version">>, <<"1.0">>},
894 {<<"xml:lang">>, <<"en">>},
895 {<<"xmlns">>, ?NS_CLIENT},
896 {<<"xmlns:stream">>, ?NS_STREAM}]}.
897
898
899 -spec bosh_wrap([any()], rid(), state()) -> {exml:element(), state()}.
900 bosh_wrap(Elements, Rid, #state{} = S) ->
901 2113 EventsStanzas = lists:partition(fun is_stream_event/1, Elements),
902 2113 {{Body, Children}, NS} = case EventsStanzas of
903 {[], Stanzas} ->
904 1732 {{bosh_body(S), Stanzas}, S};
905 {[#xmlstreamstart{} = StreamStart], Stanzas} ->
906 256 {{bosh_stream_start_body(StreamStart, S), Stanzas}, S};
907 {[#xmlstreamend{}], []} ->
908 %% No stanzas except stream end - OK.
909 125 {{bosh_stream_end_body(), []}, S};
910 {[#xmlstreamend{} = StreamEnd], Stanzas} ->
911 %% Can't wrap remaining stanzas in a stream end body.
912 %% Send Stanzas and forfeit sending stream end.
913
:-(
?LOG_DEBUG(#{what => bosh_socket_cannot_send_stream_end,
914 text => <<"Can't send stream end yet. Still have pending stanzas">>,
915
:-(
stanzas => Stanzas}),
916
:-(
Pending = S#state.pending,
917
:-(
{{bosh_body(S), Stanzas},
918 S#state{pending = Pending ++ [StreamEnd]}}
919 end,
920 2113 MaybeAck = maybe_ack(Rid, NS),
921 2113 {MaybeReport, NNS} = maybe_report(NS),
922 2113 HasStreamPrefix = (exml_query:attr(Body, <<"xmlns:stream">>) /= undefined),
923 2113 MaybeStreamPrefix = maybe_stream_prefix(HasStreamPrefix, Children),
924 2113 ExtraAttrs = MaybeAck ++ MaybeReport ++ MaybeStreamPrefix,
925 2113 {Body#xmlel{attrs = Body#xmlel.attrs ++ ExtraAttrs,
926 children = maybe_add_default_ns_to_children(Children)}, NNS}.
927
928
929 -spec is_stream_event(jlib:xmlstreamel()) -> boolean().
930 is_stream_event(#xmlstreamstart{}) ->
931 256 true;
932 is_stream_event(#xmlstreamend{}) ->
933 125 true;
934 is_stream_event(_) ->
935 1045 false.
936
937
938 %% @doc Bosh body for a session creation response.
939 -spec bosh_stream_start_body(jlib:xmlstreamstart(), state()) -> exml:element().
940 bosh_stream_start_body(#xmlstreamstart{attrs = Attrs}, #state{} = S) ->
941 256 #xmlel{name = <<"body">>,
942 attrs = [{<<"wait">>, integer_to_binary(S#state.wait)},
943 {<<"requests">>,
944 integer_to_binary(?CONCURRENT_REQUESTS)},
945 {<<"hold">>, integer_to_binary(S#state.hold)},
946 {<<"from">>, proplists:get_value(<<"from">>, Attrs)},
947 %% TODO: how to support these with cowboy?
948 {<<"accept">>, <<"deflate, gzip">>},
949 {<<"sid">>, S#state.sid},
950 {<<"xmpp:restartlogic">>, <<"true">>},
951 {<<"xmpp:version">>, <<"1.0">>},
952 {<<"xmlns">>, ?NS_HTTPBIND},
953 {<<"xmlns:xmpp">>, <<"urn:xmpp:xbosh">>},
954 {<<"xmlns:stream">>, ?NS_STREAM}] ++
955 inactivity(S#state.inactivity) ++
956 maxpause(S#state.max_pause) ++
957 %% TODO: shouldn't an ack be sent on restart?
958 server_ack(S#state.server_acks, S#state.rid),
959 children = []}.
960
961
962 -spec inactivity('infinity' | 'undefined' | pos_integer()) -> [{binary(), _}].
963 inactivity(I) ->
964 256 [{<<"inactivity">>, integer_to_binary(I)} || is_integer(I)].
965
966
967 -spec maxpause('undefined' | pos_integer()) -> [{binary(), _}].
968 maxpause(MP) ->
969 256 [{<<"maxpause">>, integer_to_binary(MP)} || is_integer(MP)].
970
971
972 -spec server_ack('false' | 'true' | 'undefined', 'undefined' | rid())
973 -> [{binary(), _}].
974 server_ack(ServerAcks, Rid) ->
975 1208 [{<<"ack">>, integer_to_binary(Rid)} || ServerAcks =:= true].
976
977
978 %% @doc Bosh body for an ordinary stream element(s).
979 -spec bosh_body(state()) -> exml:element().
980 bosh_body(#state{} = S) ->
981 1732 #xmlel{name = <<"body">>,
982 attrs = [{<<"sid">>, S#state.sid},
983 {<<"xmlns">>, ?NS_HTTPBIND}],
984 children = []}.
985
986
987 -spec bosh_stream_end_body() -> exml:element().
988 bosh_stream_end_body() ->
989 125 #xmlel{name = <<"body">>,
990 attrs = [{<<"type">>, <<"terminate">>},
991 {<<"xmlns">>, ?NS_HTTPBIND}],
992 children = []}.
993
994 maybe_stream_prefix(true, _) ->
995 256 [];
996 maybe_stream_prefix(_, Stanzas) ->
997 1857 case lists:any(fun is_stream_prefix/1, Stanzas) of
998 false ->
999 1855 [];
1000 true ->
1001 2 [{<<"xmlns:stream">>, ?NS_STREAM}]
1002 end.
1003
1004 2 is_stream_prefix(#xmlel{name = <<"stream:error">>}) -> true;
1005
:-(
is_stream_prefix(#xmlel{name = <<"stream:features">>}) -> true;
1006 787 is_stream_prefix(_) -> false.
1007
1008 %%--------------------------------------------------------------------
1009 %% ejabberd_socket compatibility
1010 %%--------------------------------------------------------------------
1011
1012 %% @doc Should be negotiated on HTTP level.
1013 -spec starttls(mod_bosh:socket(), _) -> no_return().
1014 starttls(SocketData, TLSOpts) ->
1015
:-(
starttls(SocketData, TLSOpts, <<>>).
1016
1017
1018 -spec starttls(mod_bosh:socket(), _, _) -> no_return().
1019 starttls(_SocketData, _TLSOpts, _Data) ->
1020
:-(
throw({error, negotiate_tls_on_http_level}).
1021
1022
1023 %% @doc Should be negotiated on HTTP level.
1024 -spec compress(mod_bosh:socket()) -> no_return().
1025 compress(SocketData) ->
1026
:-(
compress(SocketData, <<>>, 0).
1027
1028 -spec compress(mod_bosh:socket(), _, integer()) -> no_return().
1029 compress(_SocketData, _Data, _InflateSizeLimit) ->
1030
:-(
throw({error, negotiate_compression_on_http_level}).
1031
1032
1033 -spec send_xml(mod_bosh:socket(), mongoose_transport:send_xml_input()) -> ok.
1034 send_xml(Socket, {xmlstreamelement, XML}) ->
1035 1045 send(Socket, XML);
1036 send_xml(Socket, #xmlstreamstart{} = XML) ->
1037 256 send(Socket, XML);
1038 send_xml(Socket, #xmlstreamend{} = XML) ->
1039 125 send(Socket, XML).
1040
1041
1042 -spec send(mod_bosh:socket(), _) -> 'ok'.
1043 send(#bosh_socket{pid = Pid}, Data) ->
1044 1426 Pid ! {send, Data},
1045 1426 ok.
1046
1047 -spec change_shaper(mod_bosh:socket(), shaper:shaper()) -> mod_bosh:socket().
1048 change_shaper(SocketData, _Shaper) ->
1049 %% TODO: we ignore shapers for now
1050 382 SocketData.
1051
1052
1053 -spec monitor(mod_bosh:socket()) -> reference().
1054 monitor(#bosh_socket{pid = Pid}) ->
1055 130 erlang:monitor(process, Pid).
1056
1057
1058 -spec get_sockmod(mod_bosh:socket()) -> module().
1059 get_sockmod(_SocketData) ->
1060 512 ?MODULE.
1061
1062
1063 -spec close(mod_bosh:socket()) -> 'close'.
1064 close(#bosh_socket{pid = Pid}) ->
1065 130 Pid ! close.
1066
1067 -spec peername(mod_bosh:socket()) -> mongoose_transport:peername_return().
1068 peername(#bosh_socket{peer = Peer}) ->
1069 130 {ok, Peer}.
1070
1071 -spec get_peer_certificate(mod_bosh:socket()) -> mongoose_transport:peercert_return().
1072 get_peer_certificate(#bosh_socket{peercert = undefined}) ->
1073 127 no_peer_cert;
1074 get_peer_certificate(#bosh_socket{peercert = PeerCert}) ->
1075 9 Decoded = public_key:pkix_decode_cert(PeerCert, plain),
1076 9 {ok, Decoded}.
1077
1078 %%--------------------------------------------------------------------
1079 %% Helpers
1080 %%--------------------------------------------------------------------
1081
1082 maybe_add_default_ns_to_children(Children) ->
1083 2113 lists:map(fun maybe_add_default_ns/1, Children).
1084
1085 maybe_add_default_ns(#xmlel{name = Name, attrs = Attrs} = El)
1086 when Name =:= <<"message">>; Name =:= <<"presence">>; Name =:= <<"iq">> ->
1087 661 case xml:get_attr(<<"xmlns">>, Attrs) of
1088 false ->
1089 639 El#xmlel{attrs = [{<<"xmlns">>, ?NS_CLIENT} | Attrs]};
1090 _ ->
1091 22 El
1092 end;
1093 maybe_add_default_ns(El) ->
1094 384 El.
1095
1096 ls(LogMap, State) ->
1097
:-(
S = #{sid => State#state.sid,
1098 c2s_pid => State#state.c2s_pid,
1099 from_jid => State#state.from,
1100 to_jid => State#state.to,
1101 rid => State#state.rid},
1102
:-(
maps:merge(LogMap, ignore_undefined(S)).
1103
1104 ignore_undefined(Map) ->
1105
:-(
maps:filter(fun(_, V) -> V =/= undefined end, Map).
1106
1107 %%--------------------------------------------------------------------
1108 %% Tests
1109 %%--------------------------------------------------------------------
1110
1111 -ifdef(TEST).
1112
1113 -include_lib("eunit/include/eunit.hrl").
1114
1115 cache_up_to_test_() ->
1116 [?_test(?assertEqual( [4, 5], cache_up_to(2, [1, 2, 3, 4, 5]) ))].
1117
1118 -endif.
Line Hits Source