./ct_report/coverage/mongoose_graphql_sse_handler.COVER.html

1 %% @doc An SSE handler for GraphQL subscriptions.
2 %% The graphql request is prepared, and then executed.
3 %%
4 %% 1. The first execution should return 'null' in 'data', and a new Stream in 'aux'.
5 %% 2. Then, whenever an Event is received, the prepared GraphQL request
6 %% is executed again, this time with the Stream and the Event in the context.
7 %% The resolver should then return either 'null' or the processed Event to send to the client.
8 %% 3. Upon termination, the request is executed one last time with the 'terminate' event.
9 %% This is an opportunity to clean up all stream resources.
10 -module(mongoose_graphql_sse_handler).
11
12 -behaviour(lasse_handler).
13 -export([init/3,
14 handle_notify/2,
15 handle_info/2,
16 handle_error/3,
17 terminate/3]).
18
19 -include("mongoose.hrl").
20
21 -type req() :: cowboy_req:req().
22 -type state() :: #{atom() => term()}.
23
24 -spec init(state(), any(), cowboy_req:req()) ->
25 {ok, req(), state()} |
26 {shutdown, cowboy:http_status(), cowboy:http_headers(), iodata(), req(), state()}.
27 init(#{sse_idle_timeout := Timeout} = State, _LastEvtId, Req) ->
28 16 process_flag(trap_exit, true), % needed for 'terminate' to be called
29 16 cowboy_req:cast({set_options, #{
30 idle_timeout => Timeout
31 }}, Req),
32 16 case cowboy_req:method(Req) of
33 <<"GET">> ->
34 14 case mongoose_graphql_handler:check_auth_header(Req, State) of
35 {ok, State2} ->
36 12 case mongoose_graphql_handler:gather(Req) of
37 {ok, Req2, Decoded} ->
38 10 run_request(Decoded, Req2, State2);
39 {error, Reason} ->
40 2 reply_error(Reason, Req, State)
41 end;
42 {error, Reason} ->
43 2 reply_error(Reason, Req, State)
44 end;
45 _ ->
46 2 {ok, Req, State} % lasse returns 405: Method Not Allowed
47 end.
48
49 run_request(#{document := undefined}, Req, State) ->
50 2 reply_error(make_error(decode, no_query_supplied), Req, State);
51 run_request(GQLReq, Req, #{schema_endpoint := EpName, authorized := AuthStatus} = State) ->
52 8 Ep = mongoose_graphql:get_endpoint(EpName),
53 8 Ctx = maps:get(schema_ctx, State, #{}),
54 8 GQLReq2 = GQLReq#{authorized => AuthStatus, ctx => Ctx#{method => sse}},
55 8 case mongoose_graphql:prepare(Ep, GQLReq2) of
56 {error, Reason} ->
57 4 reply_error(Reason, Req, State);
58 {ok, GQLReq3 = #{ctx := Ctx2}} ->
59 4 case mongoose_graphql:execute(Ep, GQLReq3) of
60 {ok, #{aux := [{stream, Stream}]}} ->
61 3 Ctx3 = Ctx2#{stream => Stream},
62 3 {ok, Req, State#{id => 1, ep => Ep, req => GQLReq3#{ctx := Ctx3}}};
63 {ok, Response} ->
64 1 Body = mongoose_graphql_response:term_to_json(Response),
65 1 {shutdown, 200, #{}, Body, Req, State}
66 end
67 end.
68
69 -spec handle_notify(term(), state()) -> {nosend, state()}.
70 handle_notify(Msg, State) ->
71
:-(
?UNEXPECTED_INFO(Msg),
72
:-(
{nosend, State}.
73
74 -spec handle_info(term(), state()) -> {nosend, state()} | {send, lasse_handler:event(), state()}.
75 handle_info(Event, State = #{ep := Ep, req := Req = #{ctx := Ctx}, id := Id}) ->
76 6 Ctx1 = Ctx#{event => Event},
77 6 {ok, #{data := Data} = Response} = mongoose_graphql:execute(Ep, Req#{ctx := Ctx1}),
78 6 case has_non_null_value(Data) of
79 false ->
80 2 {nosend, State};
81 true ->
82 4 EventData = mongoose_graphql_response:term_to_json(Response),
83 4 SseEvent = #{id => integer_to_binary(Id), data => EventData},
84 4 {send, SseEvent, State#{id := Id + 1}}
85 end.
86
87 %% Check if there is any value that is non-null. Any list is considered non-null.
88 has_non_null_value(M) when is_map(M) ->
89 16 lists:any(fun has_non_null_value/1, maps:values(M));
90 6 has_non_null_value(V) -> V =/= null.
91
92 -spec handle_error(iodata(), term(), state()) -> ok.
93 handle_error(Msg, Reason, _State) ->
94
:-(
?LOG_ERROR(#{what => mongoose_graphql_sse_handler_failed,
95
:-(
reason => Reason, text => Msg}).
96
97 -spec terminate(term(), req(), state()) -> ok.
98 terminate(_Reason, _Req, #{ep := Ep, req := Req = #{ctx := Ctx}}) ->
99 3 Ctx1 = Ctx#{event => terminate},
100 3 {ok, #{aux := [{stream, closed}]}} = mongoose_graphql:execute(Ep, Req#{ctx := Ctx1}),
101 3 ok;
102 terminate(_Reason, _Req, #{}) ->
103 11 ok.
104
105 make_error(Phase, Term) ->
106 2 #{error_term => Term, phase => Phase}.
107
108 reply_error(Reason, Req, State) ->
109 10 {Code, Error} = mongoose_graphql_errors:format_error(Reason),
110 10 Body = jiffy:encode(#{errors => [Error]}),
111 10 {shutdown, Code, #{}, Body, Req, State}.
Line Hits Source