diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/gun.erl | 84 | ||||
-rw-r--r-- | src/gun_http.erl | 160 | ||||
-rw-r--r-- | src/gun_ws.erl | 125 |
3 files changed, 307 insertions, 62 deletions
diff --git a/src/gun.erl b/src/gun.erl index 71af26e..3b98732 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -58,6 +58,7 @@ %% Websocket. -export([ws_upgrade/2]). -export([ws_upgrade/3]). +-export([ws_upgrade/4]). -export([ws_send/2]). %% Debug. @@ -85,6 +86,8 @@ | {type, conn_type()}]. -export_type([opts/0]). +-type ws_opts() :: [{compress, boolean()}]. + -record(state, { parent :: pid(), owner :: pid(), @@ -98,7 +101,7 @@ socket :: inet:socket() | ssl:sslsocket(), transport :: module(), protocol :: module(), - proto_opts :: gun_http:opts(), %% @todo Make a tuple with SPDY and WS too. + proto_opts :: gun_http:opts(), %% @todo Make a tuple with SPDY too. protocol_state :: any() }). @@ -338,14 +341,19 @@ cancel(ServerPid, StreamRef) -> %% Websocket. --spec ws_upgrade(pid(), iodata()) -> ok. +-spec ws_upgrade(pid(), iodata()) -> reference(). ws_upgrade(ServerPid, Path) -> - ws_upgrade(ServerPid, Path, []). + ws_upgrade(ServerPid, Path, [], []). --spec ws_upgrade(pid(), iodata(), headers()) -> ok. +-spec ws_upgrade(pid(), iodata(), headers()) -> reference(). ws_upgrade(ServerPid, Path, Headers) -> - _ = ServerPid ! {ws_upgrade, self(), Path, Headers}, - ok. + ws_upgrade(ServerPid, Path, Headers, []). + +-spec ws_upgrade(pid(), iodata(), headers(), ws_opts()) -> reference(). +ws_upgrade(ServerPid, Path, Headers, Opts) -> + StreamRef = make_ref(), + _ = ServerPid ! {ws_upgrade, self(), StreamRef, Path, Headers, Opts}, + StreamRef. -spec ws_send(pid(), ws_frame() | [ws_frame()]) -> ok. ws_send(ServerPid, Frames) -> @@ -428,8 +436,10 @@ connect(State=#state{owner=Owner, host=Host, port=Port, type=Type, retry(State, Retries - 1) end. -retry(State=#state{keepalive_ref=KeepaliveRef}, Retries) when - is_reference(KeepaliveRef) -> +%% Exit normally if the retry functionality has been disabled. +retry(_, 0) -> + ok; +retry(State=#state{keepalive_ref=KeepaliveRef}, Retries) when is_reference(KeepaliveRef) -> _ = erlang:cancel_timer(KeepaliveRef), %% Flush if we have a keepalive message receive @@ -458,7 +468,7 @@ before_loop(State=#state{keepalive=Keepalive}) -> KeepaliveRef = erlang:send_after(Keepalive, self(), keepalive), loop(State#state{keepalive_ref=KeepaliveRef}). -loop(State=#state{parent=Parent, owner=Owner, host=Host, +loop(State=#state{parent=Parent, owner=Owner, host=Host, port=Port, retry=Retry, socket=Socket, transport=Transport, protocol=Protocol, protocol_state=ProtoState}) -> {OK, Closed, Error} = Transport:messages(), @@ -470,7 +480,9 @@ loop(State=#state{parent=Parent, owner=Owner, host=Host, Transport:close(Socket), retry(State#state{socket=undefined, transport=undefined, protocol=undefined}, Retry); - ProtoState2 -> + {upgrade, Protocol2, ProtoState2} -> + ws_loop(State#state{protocol=Protocol2, protocol_state=ProtoState2}); + ProtoState2 -> loop(State#state{protocol_state=ProtoState2}) end; {Closed, Socket} -> @@ -494,11 +506,11 @@ loop(State=#state{parent=Parent, owner=Owner, host=Host, before_loop(State#state{protocol_state=ProtoState2}); {request, Owner, StreamRef, Method, Path, Headers} -> ProtoState2 = Protocol:request(ProtoState, - StreamRef, Method, Host, Path, Headers), + StreamRef, Method, Host, Port, Path, Headers), loop(State#state{protocol_state=ProtoState2}); {request, Owner, StreamRef, Method, Path, Headers, Body} -> ProtoState2 = Protocol:request(ProtoState, - StreamRef, Method, Host, Path, Headers, Body), + StreamRef, Method, Host, Port, Path, Headers, Body), loop(State#state{protocol_state=ProtoState2}); {data, Owner, StreamRef, IsFin, Data} -> ProtoState2 = Protocol:data(ProtoState, @@ -507,11 +519,10 @@ loop(State=#state{parent=Parent, owner=Owner, host=Host, {cancel, Owner, StreamRef} -> ProtoState2 = Protocol:cancel(ProtoState, StreamRef), loop(State#state{protocol_state=ProtoState2}); - {ws_upgrade, Owner, Path, Headers} when Protocol =/= gun_spdy -> - %% @todo - ProtoState2 = Protocol:ws_upgrade(ProtoState, - Path, Headers), - ws_loop(State#state{protocol=gun_ws, protocol_state=ProtoState2}); + {ws_upgrade, Owner, StreamRef, Path, Headers, Opts} when Protocol =/= gun_spdy -> + ProtoState2 = Protocol:ws_upgrade(ProtoState, StreamRef, Host, Port, Path, Headers, Opts), + loop(State#state{protocol_state=ProtoState2}); + %% @todo can fail if http/1.0 {shutdown, Owner} -> %% @todo Protocol:shutdown? ok; @@ -525,9 +536,9 @@ loop(State=#state{parent=Parent, owner=Owner, host=Host, element(2, Any) ! {gun_error, self(), {notowner, "Operations are restricted to the owner of the connection."}}, loop(State); - {ws_upgrade, _, _, _} -> - Owner ! {gun_error, self(), {badstate, - "Websocket over SPDY isn't supported."}}, + {ws_upgrade, _, StreamRef, _, _} -> + Owner ! {gun_error, self(), StreamRef, {badstate, + "Websocket is only supported over HTTP/1.1."}}, loop(State); {ws_send, _, _} -> Owner ! {gun_error, self(), {badstate, @@ -545,23 +556,34 @@ ws_loop(State=#state{parent=Parent, owner=Owner, retry=Retry, socket=Socket, ok = Transport:setopts(Socket, [{active, once}]), receive {OK, Socket, Data} -> - ProtoState2 = Protocol:handle(ProtoState, Data), - ws_loop(State#state{protocol_state=ProtoState2}); + case Protocol:handle(Data, ProtoState) of + close -> + Transport:close(Socket), + retry(State#state{socket=undefined, transport=undefined, protocol=undefined}, Retry); + ProtoState2 -> + ws_loop(State#state{protocol_state=ProtoState2}) + end; {Closed, Socket} -> Transport:close(Socket), - retry(State#state{socket=undefined, transport=undefined, - protocol=undefined}, Retry); + retry(State#state{socket=undefined, transport=undefined, protocol=undefined}, Retry); {Error, Socket, _} -> Transport:close(Socket), - retry(State#state{socket=undefined, transport=undefined, - protocol=undefined}, Retry); - %% @todo keepalive - {ws_send, Owner, Frames} when is_list(Frames) -> - todo; %% @todo + retry(State#state{socket=undefined, transport=undefined, protocol=undefined}, Retry); + %% Ignore any previous HTTP keep-alive. + keepalive -> + ws_loop(State); +% {ws_send, Owner, Frames} when is_list(Frames) -> +% todo; %% @todo {ws_send, Owner, Frame} -> - {todo, Frame}; %% @todo + case Protocol:send(Frame, ProtoState) of + close -> + Transport:close(Socket), + retry(State#state{socket=undefined, transport=undefined, protocol=undefined}, Retry); + ProtoState2 -> + ws_loop(State#state{protocol_state=ProtoState2}) + end; {shutdown, Owner} -> - %% @todo Protocol:shutdown? + %% @todo Protocol:shutdown? %% @todo close frame ok; {system, From, Request} -> sys:handle_system_msg(Request, From, Parent, ?MODULE, [], diff --git a/src/gun_http.erl b/src/gun_http.erl index bd6565c..f2a2d23 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -18,16 +18,19 @@ -export([handle/2]). -export([close/1]). -export([keepalive/1]). --export([request/6]). -export([request/7]). +-export([request/8]). -export([data/4]). -export([cancel/2]). +-export([ws_upgrade/7]). -type opts() :: [{version, cow_http:version()}]. -export_type([opts/0]). -type io() :: head | {body, non_neg_integer()} | body_close | body_chunked. +-type websocket_info() :: {websocket, reference(), binary(), [], []}. %% key, extensions, protocols + -record(http_state, { owner :: pid(), socket :: inet:socket() | ssl:sslsocket(), @@ -35,7 +38,7 @@ version = 'HTTP/1.1' :: cow_http:version(), connection = keepalive :: keepalive | close, buffer = <<>> :: binary(), - streams = [] :: [{reference(), boolean()}], %% ref + whether stream is alive + streams = [] :: [{reference() | websocket_info(), boolean()}], %% ref + whether stream is alive in = head :: io(), in_state :: {non_neg_integer(), non_neg_integer()}, out = head :: io() @@ -128,31 +131,40 @@ handle_head(Data, State=#http_state{owner=Owner, version=ClientVersion, connection=Conn, streams=[{StreamRef, IsAlive}|_]}) -> {Version, Status, _, Rest} = cow_http:parse_status_line(Data), {Headers, Rest2} = cow_http:parse_headers(Rest), - In = io_from_headers(Version, Headers), - IsFin = case In of head -> fin; _ -> nofin end, - case IsAlive of - false -> - ok; - true -> - Owner ! {gun_response, self(), StreamRef, - IsFin, Status, Headers}, - ok - end, - Conn2 = if - Conn =:= close -> close; - Version =:= 'HTTP/1.0' -> close; - ClientVersion =:= 'HTTP/1.0' -> close; - true -> conn_from_headers(Headers) - end, - %% We always reset in_state even if not chunked. - if - IsFin =:= fin, Conn2 =:= close -> - close; - IsFin =:= fin -> - handle(Rest2, end_stream(State#http_state{in=In, - in_state={0, 0}, connection=Conn2})); - true -> - handle(Rest2, State#http_state{in=In, in_state={0, 0}, connection=Conn2}) + case {Status, StreamRef} of + {101, {websocket, _, WsKey, WsExtensions, WsProtocols, WsOpts}} -> + ws_handshake(Rest2, State, Headers, WsKey, WsExtensions, WsProtocols, WsOpts); + _ -> + In = io_from_headers(Version, Headers), + IsFin = case In of head -> fin; _ -> nofin end, + case IsAlive of + false -> + ok; + true -> + StreamRef2 = case StreamRef of + {websocket, SR, _, _, _} -> SR; + _ -> StreamRef + end, + Owner ! {gun_response, self(), StreamRef2, + IsFin, Status, Headers}, + ok + end, + Conn2 = if + Conn =:= close -> close; + Version =:= 'HTTP/1.0' -> close; + ClientVersion =:= 'HTTP/1.0' -> close; + true -> conn_from_headers(Headers) + end, + %% We always reset in_state even if not chunked. + if + IsFin =:= fin, Conn2 =:= close -> + close; + IsFin =:= fin -> + handle(Rest2, end_stream(State#http_state{in=In, + in_state={0, 0}, connection=Conn2})); + true -> + handle(Rest2, State#http_state{in=In, in_state={0, 0}, connection=Conn2}) + end end. send_data_if_alive(<<>>, _, nofin) -> @@ -187,27 +199,28 @@ keepalive(State) -> State. request(State=#http_state{socket=Socket, transport=Transport, version=Version, - out=head}, StreamRef, Method, Host, Path, Headers) -> + out=head}, StreamRef, Method, Host, Port, Path, Headers) -> Headers2 = case Version of 'HTTP/1.0' -> lists:keydelete(<<"transfer-encoding">>, 1, Headers); 'HTTP/1.1' -> Headers end, Headers3 = case lists:keymember(<<"host">>, 1, Headers) of - false -> [{<<"host">>, Host}|Headers2]; + false -> [{<<"host">>, [Host, $:, integer_to_binary(Port)]}|Headers2]; true -> Headers2 end, %% We use Headers2 because this is the smallest list. Conn = conn_from_headers(Headers2), + %% @todo This should probably also check for content-type like SPDY. Out = io_from_headers(Version, Headers2), Transport:send(Socket, cow_http:request(Method, Path, Version, Headers3)), new_stream(State#http_state{connection=Conn, out=Out}, StreamRef). request(State=#http_state{socket=Socket, transport=Transport, version=Version, - out=head}, StreamRef, Method, Host, Path, Headers, Body) -> + out=head}, StreamRef, Method, Host, Port, Path, Headers, Body) -> Headers2 = lists:keydelete(<<"content-length">>, 1, lists:keydelete(<<"transfer-encoding">>, 1, Headers)), Headers3 = case lists:keymember(<<"host">>, 1, Headers) of - false -> [{<<"host">>, Host}|Headers2]; + false -> [{<<"host">>, [Host, $:, integer_to_binary(Port)]}|Headers2]; true -> Headers2 end, %% We use Headers2 because this is the smallest list. @@ -334,3 +347,88 @@ cancel_stream(State=#http_state{streams=Streams}, StreamRef) -> end_stream(State=#http_state{streams=[_|Tail]}) -> State#http_state{in=head, streams=Tail}. + +%% Websocket upgrade. + +%% Ensure version is 1.1. +ws_upgrade(#http_state{version='HTTP/1.0'}, _, _, _, _, _, _) -> + error; %% @todo +ws_upgrade(State=#http_state{socket=Socket, transport=Transport, out=head}, + StreamRef, Host, Port, Path, Headers, WsOpts) -> + %% @todo Add option for setting protocol. + {ExtHeaders, GunExtensions} = case maps:get(compress, WsOpts, false) of + true -> {[{<<"sec-websocket-extensions">>, <<"permessage-deflate; client_max_window_bits; server_max_window_bits=15">>}], + [<<"permessage-deflate">>]}; + false -> {[], []} + end, + Key = cow_ws:key(), + Headers2 = [ + {<<"connection">>, <<"upgrade">>}, + {<<"upgrade">>, <<"websocket">>}, + {<<"sec-websocket-version">>, <<"13">>}, + {<<"sec-websocket-key">>, Key} + |ExtHeaders + ], + Headers3 = case lists:keymember(<<"host">>, 1, Headers) of + false -> [{<<"host">>, [Host, $:, integer_to_binary(Port)]}|Headers2]; + true -> Headers2 + end, + Transport:send(Socket, cow_http:request(<<"GET">>, Path, 'HTTP/1.1', Headers3)), + new_stream(State#http_state{connection=keepalive, out=head}, + {websocket, StreamRef, Key, GunExtensions, [], WsOpts}). + +ws_handshake(Buffer, State, Headers, Key, GunExtensions, GunProtocols, Opts) -> + %% @todo check upgrade, connection + case lists:keyfind(<<"sec-websocket-accept">>, 1, Headers) of + false -> + close; + {_, Accept} -> + case cow_ws:encode_key(Key) of + Accept -> ws_handshake_extensions(Buffer, State, Headers, GunExtensions, GunProtocols, Opts); + _ -> close + end + end. + +ws_handshake_extensions(Buffer, State, Headers, GunExtensions, GunProtocols, Opts) -> + case lists:keyfind(<<"sec-websocket-extensions">>, 1, Headers) of + false -> + ws_handshake_protocols(Buffer, State, Headers, #{}, GunProtocols); + {_, ExtHd} -> + case ws_validate_extensions(cow_http_hd:parse_sec_websocket_extensions(ExtHd), GunExtensions, #{}, Opts) of + close -> close; + Extensions -> ws_handshake_protocols(Buffer, State, Headers, Extensions, GunProtocols) + end + end. + +ws_validate_extensions([], _, Acc, _) -> + Acc; +ws_validate_extensions([{Name = <<"permessage-deflate">>, Params}|Tail], GunExts, Acc, Opts) -> + case lists:member(Name, GunExts) of + true -> + case cow_ws:validate_permessage_deflate(Params, Acc, Opts) of + {ok, Acc2} -> ws_validate_extensions(Tail, GunExts, Acc2, Opts); + error -> close + end; + %% Fail the connection if extension was not requested. + false -> + close + end; +%% Fail the connection on unknown extension. +ws_validate_extensions(_, _, _, _) -> + close. + +%% @todo Validate protocols. +ws_handshake_protocols(Buffer, State, _Headers, Extensions, _GunProtocols = []) -> + Protocols = [], + ws_handshake_end(Buffer, State, Extensions, Protocols). + +ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Transport}, Extensions, Protocols) -> + %% Send ourselves the remaining buffer, if any. + _ = case Buffer of + <<>> -> + ok; + _ -> + {OK, _, _} = Transport:messages(), + self() ! {OK, Socket, Buffer} + end, + gun_ws:init(Owner, Socket, Transport, Extensions, Protocols). diff --git a/src/gun_ws.erl b/src/gun_ws.erl new file mode 100644 index 0000000..5379362 --- /dev/null +++ b/src/gun_ws.erl @@ -0,0 +1,125 @@ +%% Copyright (c) 2015, Loïc Hoguin <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(gun_ws). + +-export([init/5]). +-export([handle/2]). +-export([send/2]). + +-record(payload, { + type = undefined :: cow_ws:frame_type(), + rsv = undefined :: cow_ws:rsv(), + len = undefined :: non_neg_integer(), + mask_key = undefined :: cow_ws:mask_key(), + close_code = undefined :: undefined | cow_ws:close_code(), + unmasked = <<>> :: binary(), + unmasked_len = 0 :: non_neg_integer() +}). + +-record(ws_state, { + owner :: pid(), + socket :: inet:socket() | ssl:sslsocket(), + transport :: module(), + buffer = <<>> :: binary(), + in = head :: head | #payload{} | close, + frag_state = undefined :: cow_ws:frag_state(), + frag_buffer = <<>> :: binary(), + utf8_state = 0 :: cow_ws:utf8_state(), + extensions = #{} :: cow_ws:extensions() +}). + +%% @todo Protocols +init(Owner, Socket, Transport, Extensions, _Protocols) -> + Owner ! {gun_ws_upgrade, self(), ok}, + {upgrade, ?MODULE, #ws_state{owner=Owner, socket=Socket, transport=Transport, extensions=Extensions}}. + +%% Do not handle anything if we received a close frame. +handle(_, State=#ws_state{in=close}) -> + State; +%% Shortcut for common case when Data is empty after processing a frame. +handle(<<>>, State=#ws_state{in=head}) -> + State; +handle(Data, State=#ws_state{buffer=Buffer, in=head, frag_state=FragState, extensions=Extensions}) -> + Data2 = << Buffer/binary, Data/binary >>, + case cow_ws:parse_header(Data2, Extensions, FragState) of + {Type, FragState2, Rsv, Len, MaskKey, Rest} -> + handle(Rest, State#ws_state{buffer= <<>>, + in=#payload{type=Type, rsv=Rsv, len=Len, mask_key=MaskKey}, frag_state=FragState2}); + more -> + State#ws_state{buffer=Data2}; + error -> + close({error, badframe}, State) + end; +handle(Data, State=#ws_state{in=In=#payload{type=Type, rsv=Rsv, len=Len, mask_key=MaskKey, close_code=CloseCode, + unmasked=Unmasked, unmasked_len=UnmaskedLen}, frag_state=FragState, utf8_state=Utf8State, extensions=Extensions}) -> + case cow_ws:parse_payload(Data, MaskKey, Utf8State, UnmaskedLen, Type, Len, FragState, Extensions, Rsv) of + {ok, CloseCode2, Payload, Utf8State2, Rest} -> + dispatch(Rest, State#ws_state{in=head, utf8_state=Utf8State2}, Type, << Unmasked/binary, Payload/binary >>, CloseCode2); + {ok, Payload, Utf8State2, Rest} -> + dispatch(Rest, State#ws_state{in=head, utf8_state=Utf8State2}, Type, << Unmasked/binary, Payload/binary >>, CloseCode); + {more, CloseCode2, Payload, Utf8State2} -> + State#ws_state{in=In#payload{close_code=CloseCode2, unmasked= << Unmasked/binary, Payload/binary >>, + len=Len - byte_size(Data), unmasked_len=2 + byte_size(Data)}, utf8_state=Utf8State2}; + {more, Payload, Utf8State2} -> + State#ws_state{in=In#payload{unmasked= << Unmasked/binary, Payload/binary >>, + len=Len - byte_size(Data), unmasked_len=UnmaskedLen + byte_size(Data)}, utf8_state=Utf8State2}; + Error = {error, _Reason} -> + close(Error, State) + end. + +dispatch(Rest, State=#ws_state{owner=Owner, frag_state=FragState, frag_buffer=SoFar}, + Type0, Payload0, CloseCode0) -> + case cow_ws:make_frame(Type0, Payload0, CloseCode0, FragState) of + {fragment, nofin, _, Payload} -> + handle(Rest, State#ws_state{frag_buffer= << SoFar/binary, Payload/binary >>}); + {fragment, fin, Type, Payload} -> + Owner ! {gun_ws, self(), {Type, << SoFar/binary, Payload/binary >>}}, + handle(Rest, State#ws_state{frag_state=undefined, frag_buffer= <<>>}); + ping -> + State2 = send(pong, State), + handle(Rest, State2); + {ping, Payload} -> + State2 = send({pong, Payload}, State), + handle(Rest, State2); + pong -> + handle(Rest, State); + {pong, _} -> + handle(Rest, State); + Frame -> + Owner ! {gun_ws, self(), Frame}, + case Frame of + close -> handle(Rest, State#ws_state{in=close}); + {close, _, _} -> handle(Rest, State#ws_state{in=close}); + _ -> handle(Rest, State) + end + end. + +close(Reason, State) -> + case Reason of + Normal when Normal =:= stop; Normal =:= timeout -> + send({close, 1000, <<>>}, State); + {error, badframe} -> + send({close, 1002, <<>>}, State); + {error, badencoding} -> + send({close, 1007, <<>>}, State) + end. + +send(Frame, State=#ws_state{socket=Socket, transport=Transport, extensions=Extensions}) -> + Transport:send(Socket, cow_ws:masked_frame(Frame, Extensions)), + case Frame of + close -> close; + {close, _, _} -> close; + _ -> State + end. |