1 |
|
%% @doc An SSE handler for GraphQL subscriptions. |
2 |
|
%% The graphql request is prepared, and then executed. |
3 |
|
%% |
4 |
|
%% 1. The first execution should return 'null' in 'data', and a new Stream in 'aux'. |
5 |
|
%% 2. Then, whenever an Event is received, the prepared GraphQL request |
6 |
|
%% is executed again, this time with the Stream and the Event in the context. |
7 |
|
%% The resolver should then return either 'null' or the processed Event to send to the client. |
8 |
|
%% 3. Upon termination, the request is executed one last time with the 'terminate' event. |
9 |
|
%% This is an opportunity to clean up all stream resources. |
10 |
|
-module(mongoose_graphql_sse_handler). |
11 |
|
|
12 |
|
-behaviour(lasse_handler). |
13 |
|
-export([init/3, |
14 |
|
handle_notify/2, |
15 |
|
handle_info/2, |
16 |
|
handle_error/3, |
17 |
|
terminate/3]). |
18 |
|
|
19 |
|
-include("mongoose.hrl"). |
20 |
|
|
21 |
|
-type req() :: cowboy_req:req(). |
22 |
|
-type state() :: #{atom() => term()}. |
23 |
|
|
24 |
|
-spec init(state(), any(), cowboy_req:req()) -> |
25 |
|
{ok, req(), state()} | |
26 |
|
{shutdown, cowboy:http_status(), cowboy:http_headers(), iodata(), req(), state()}. |
27 |
|
init(State, _LastEvtId, Req) -> |
28 |
15 |
process_flag(trap_exit, true), % needed for 'terminate' to be called |
29 |
15 |
case cowboy_req:method(Req) of |
30 |
|
<<"GET">> -> |
31 |
13 |
case mongoose_graphql_handler:check_auth_header(Req, State) of |
32 |
|
{ok, State2} -> |
33 |
11 |
case mongoose_graphql_handler:gather(Req) of |
34 |
|
{ok, Req2, Decoded} -> |
35 |
9 |
run_request(Decoded, Req2, State2); |
36 |
|
{error, Reason} -> |
37 |
2 |
reply_error(Reason, Req, State) |
38 |
|
end; |
39 |
|
{error, Reason} -> |
40 |
2 |
reply_error(Reason, Req, State) |
41 |
|
end; |
42 |
|
_ -> |
43 |
2 |
{ok, Req, State} % lasse returns 405: Method Not Allowed |
44 |
|
end. |
45 |
|
|
46 |
|
run_request(#{document := undefined}, Req, State) -> |
47 |
2 |
reply_error(make_error(decode, no_query_supplied), Req, State); |
48 |
|
run_request(GQLReq, Req, #{schema_endpoint := EpName, authorized := AuthStatus} = State) -> |
49 |
7 |
Ep = mongoose_graphql:get_endpoint(EpName), |
50 |
7 |
Ctx = maps:get(schema_ctx, State, #{}), |
51 |
7 |
GQLReq2 = GQLReq#{authorized => AuthStatus, ctx => Ctx#{method => sse}}, |
52 |
7 |
case mongoose_graphql:prepare(Ep, GQLReq2) of |
53 |
|
{error, Reason} -> |
54 |
4 |
reply_error(Reason, Req, State); |
55 |
|
{ok, GQLReq3 = #{ctx := Ctx2}} -> |
56 |
3 |
case mongoose_graphql:execute(Ep, GQLReq3) of |
57 |
|
{ok, #{aux := [{stream, Stream}]}} -> |
58 |
2 |
Ctx3 = Ctx2#{stream => Stream}, |
59 |
2 |
{ok, Req, State#{id => 1, ep => Ep, req => GQLReq3#{ctx := Ctx3}}}; |
60 |
|
{ok, Response} -> |
61 |
1 |
Body = mongoose_graphql_response:term_to_json(Response), |
62 |
1 |
{shutdown, 200, #{}, Body, Req, State} |
63 |
|
end |
64 |
|
end. |
65 |
|
|
66 |
|
-spec handle_notify(term(), state()) -> {nosend, state()}. |
67 |
|
handle_notify(Msg, State) -> |
68 |
:-( |
?UNEXPECTED_INFO(Msg), |
69 |
:-( |
{nosend, State}. |
70 |
|
|
71 |
|
-spec handle_info(term(), state()) -> {nosend, state()} | {send, lasse_handler:event(), state()}. |
72 |
|
handle_info(Event, State = #{ep := Ep, req := Req = #{ctx := Ctx}, id := Id}) -> |
73 |
3 |
Ctx1 = Ctx#{event => Event}, |
74 |
3 |
{ok, #{data := Data} = Response} = mongoose_graphql:execute(Ep, Req#{ctx := Ctx1}), |
75 |
3 |
case has_non_null_value(Data) of |
76 |
|
false -> |
77 |
1 |
{nosend, State}; |
78 |
|
true -> |
79 |
2 |
EventData = mongoose_graphql_response:term_to_json(Response), |
80 |
2 |
SseEvent = #{id => integer_to_binary(Id), data => EventData}, |
81 |
2 |
{send, SseEvent, State#{id := Id + 1}} |
82 |
|
end. |
83 |
|
|
84 |
|
%% Check if there is any value that is non-null. Any list is considered non-null. |
85 |
|
has_non_null_value(M) when is_map(M) -> |
86 |
8 |
lists:any(fun has_non_null_value/1, maps:values(M)); |
87 |
3 |
has_non_null_value(V) -> V =/= null. |
88 |
|
|
89 |
|
-spec handle_error(iodata(), term(), state()) -> ok. |
90 |
|
handle_error(Msg, Reason, _State) -> |
91 |
:-( |
?LOG_ERROR(#{what => mongoose_graphql_sse_handler_failed, |
92 |
:-( |
reason => Reason, text => Msg}). |
93 |
|
|
94 |
|
-spec terminate(term(), req(), state()) -> ok. |
95 |
|
terminate(_Reason, _Req, #{ep := Ep, req := Req = #{ctx := Ctx}}) -> |
96 |
2 |
Ctx1 = Ctx#{event => terminate}, |
97 |
2 |
{ok, #{aux := [{stream, closed}]}} = mongoose_graphql:execute(Ep, Req#{ctx := Ctx1}), |
98 |
2 |
ok; |
99 |
|
terminate(_Reason, _Req, #{}) -> |
100 |
11 |
ok. |
101 |
|
|
102 |
|
make_error(Phase, Term) -> |
103 |
2 |
#{error_term => Term, phase => Phase}. |
104 |
|
|
105 |
|
reply_error(Reason, Req, State) -> |
106 |
10 |
{Code, Error} = mongoose_graphql_errors:format_error(Reason), |
107 |
10 |
Body = jiffy:encode(#{errors => [Error]}), |
108 |
10 |
{shutdown, Code, #{}, Body, Req, State}. |