1 |
|
%% @doc An SSE handler for GraphQL subscriptions. |
2 |
|
%% The graphql request is prepared, and then executed. |
3 |
|
%% |
4 |
|
%% 1. The first execution should return 'null' in 'data', and a new Stream in 'aux'. |
5 |
|
%% 2. Then, whenever an Event is received, the prepared GraphQL request |
6 |
|
%% is executed again, this time with the Stream and the Event in the context. |
7 |
|
%% The resolver should then return either 'null' or the processed Event to send to the client. |
8 |
|
%% 3. Upon termination, the request is executed one last time with the 'terminate' event. |
9 |
|
%% This is an opportunity to clean up all stream resources. |
10 |
|
-module(mongoose_graphql_sse_handler). |
11 |
|
|
12 |
|
-behaviour(lasse_handler). |
13 |
|
-export([init/3, |
14 |
|
handle_notify/2, |
15 |
|
handle_info/2, |
16 |
|
handle_error/3, |
17 |
|
terminate/3]). |
18 |
|
|
19 |
|
-include("mongoose.hrl"). |
20 |
|
|
21 |
|
-type req() :: cowboy_req:req(). |
22 |
|
-type state() :: #{atom() => term()}. |
23 |
|
|
24 |
|
-spec init(state(), any(), cowboy_req:req()) -> |
25 |
|
{ok, req(), state()} | |
26 |
|
{shutdown, cowboy:http_status(), cowboy:http_headers(), iodata(), req(), state()}. |
27 |
|
init(#{sse_idle_timeout := Timeout} = State, _LastEvtId, Req) -> |
28 |
16 |
process_flag(trap_exit, true), % needed for 'terminate' to be called |
29 |
16 |
cowboy_req:cast({set_options, #{ |
30 |
|
idle_timeout => Timeout |
31 |
|
}}, Req), |
32 |
16 |
case cowboy_req:method(Req) of |
33 |
|
<<"GET">> -> |
34 |
14 |
case mongoose_graphql_handler:check_auth_header(Req, State) of |
35 |
|
{ok, State2} -> |
36 |
12 |
case mongoose_graphql_handler:gather(Req) of |
37 |
|
{ok, Req2, Decoded} -> |
38 |
10 |
run_request(Decoded, Req2, State2); |
39 |
|
{error, Reason} -> |
40 |
2 |
reply_error(Reason, Req, State) |
41 |
|
end; |
42 |
|
{error, Reason} -> |
43 |
2 |
reply_error(Reason, Req, State) |
44 |
|
end; |
45 |
|
_ -> |
46 |
2 |
{ok, Req, State} % lasse returns 405: Method Not Allowed |
47 |
|
end. |
48 |
|
|
49 |
|
run_request(#{document := undefined}, Req, State) -> |
50 |
2 |
reply_error(make_error(decode, no_query_supplied), Req, State); |
51 |
|
run_request(GQLReq, Req, #{schema_endpoint := EpName, authorized := AuthStatus} = State) -> |
52 |
8 |
Ep = mongoose_graphql:get_endpoint(EpName), |
53 |
8 |
Ctx = maps:get(schema_ctx, State, #{}), |
54 |
8 |
GQLReq2 = GQLReq#{authorized => AuthStatus, ctx => Ctx#{method => sse}}, |
55 |
8 |
case mongoose_graphql:prepare(Ep, GQLReq2) of |
56 |
|
{error, Reason} -> |
57 |
4 |
reply_error(Reason, Req, State); |
58 |
|
{ok, GQLReq3 = #{ctx := Ctx2}} -> |
59 |
4 |
case mongoose_graphql:execute(Ep, GQLReq3) of |
60 |
|
{ok, #{aux := [{stream, Stream}]}} -> |
61 |
3 |
Ctx3 = Ctx2#{stream => Stream}, |
62 |
3 |
{ok, Req, State#{id => 1, ep => Ep, req => GQLReq3#{ctx := Ctx3}}}; |
63 |
|
{ok, Response} -> |
64 |
1 |
Body = mongoose_graphql_response:term_to_json(Response), |
65 |
1 |
{shutdown, 200, #{}, Body, Req, State} |
66 |
|
end |
67 |
|
end. |
68 |
|
|
69 |
|
-spec handle_notify(term(), state()) -> {nosend, state()}. |
70 |
|
handle_notify(Msg, State) -> |
71 |
:-( |
?UNEXPECTED_INFO(Msg), |
72 |
:-( |
{nosend, State}. |
73 |
|
|
74 |
|
-spec handle_info(term(), state()) -> {nosend, state()} | {send, lasse_handler:event(), state()}. |
75 |
|
handle_info(Event, State = #{ep := Ep, req := Req = #{ctx := Ctx}, id := Id}) -> |
76 |
5 |
Ctx1 = Ctx#{event => Event}, |
77 |
5 |
{ok, #{data := Data} = Response} = mongoose_graphql:execute(Ep, Req#{ctx := Ctx1}), |
78 |
5 |
case has_non_null_value(Data) of |
79 |
|
false -> |
80 |
1 |
{nosend, State}; |
81 |
|
true -> |
82 |
4 |
EventData = mongoose_graphql_response:term_to_json(Response), |
83 |
4 |
SseEvent = #{id => integer_to_binary(Id), data => EventData}, |
84 |
4 |
{send, SseEvent, State#{id := Id + 1}} |
85 |
|
end. |
86 |
|
|
87 |
|
%% Check if there is any value that is non-null. Any list is considered non-null. |
88 |
|
has_non_null_value(M) when is_map(M) -> |
89 |
14 |
lists:any(fun has_non_null_value/1, maps:values(M)); |
90 |
5 |
has_non_null_value(V) -> V =/= null. |
91 |
|
|
92 |
|
-spec handle_error(iodata(), term(), state()) -> ok. |
93 |
|
handle_error(Msg, Reason, _State) -> |
94 |
:-( |
?LOG_ERROR(#{what => mongoose_graphql_sse_handler_failed, |
95 |
:-( |
reason => Reason, text => Msg}). |
96 |
|
|
97 |
|
-spec terminate(term(), req(), state()) -> ok. |
98 |
|
terminate(_Reason, _Req, #{ep := Ep, req := Req = #{ctx := Ctx}}) -> |
99 |
3 |
Ctx1 = Ctx#{event => terminate}, |
100 |
3 |
{ok, #{aux := [{stream, closed}]}} = mongoose_graphql:execute(Ep, Req#{ctx := Ctx1}), |
101 |
3 |
ok; |
102 |
|
terminate(_Reason, _Req, #{}) -> |
103 |
11 |
ok. |
104 |
|
|
105 |
|
make_error(Phase, Term) -> |
106 |
2 |
#{error_term => Term, phase => Phase}. |
107 |
|
|
108 |
|
reply_error(Reason, Req, State) -> |
109 |
10 |
{Code, Error} = mongoose_graphql_errors:format_error(Reason), |
110 |
10 |
Body = jiffy:encode(#{errors => [Error]}), |
111 |
10 |
{shutdown, Code, #{}, Body, Req, State}. |