From c409897f508eedff8ecc6f0860c9379fcc11bf23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Thu, 12 Mar 2015 19:22:19 +0100 Subject: Add initial Websocket support All autobahntestsuite tests pass including the permessage-deflate compression tests. Some of the tests pass in a non-strict fashion. They are testing for protocol errors and expect events to happen in a particular order, which is not respected by Gun. Gun fails earlier than is expected due to concurrent processing of frames. The implementation when error occurs during handshake is probably a bit rough at this point. The documentation is also incomplete and/or wrong at this time, though this is the general state of the Gun documentation and will be resolved in a separate commit. --- src/gun_http.erl | 160 ++++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 129 insertions(+), 31 deletions(-) (limited to 'src/gun_http.erl') diff --git a/src/gun_http.erl b/src/gun_http.erl index bd6565c..f2a2d23 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -18,16 +18,19 @@ -export([handle/2]). -export([close/1]). -export([keepalive/1]). --export([request/6]). -export([request/7]). +-export([request/8]). -export([data/4]). -export([cancel/2]). +-export([ws_upgrade/7]). -type opts() :: [{version, cow_http:version()}]. -export_type([opts/0]). -type io() :: head | {body, non_neg_integer()} | body_close | body_chunked. +-type websocket_info() :: {websocket, reference(), binary(), [], []}. %% key, extensions, protocols + -record(http_state, { owner :: pid(), socket :: inet:socket() | ssl:sslsocket(), @@ -35,7 +38,7 @@ version = 'HTTP/1.1' :: cow_http:version(), connection = keepalive :: keepalive | close, buffer = <<>> :: binary(), - streams = [] :: [{reference(), boolean()}], %% ref + whether stream is alive + streams = [] :: [{reference() | websocket_info(), boolean()}], %% ref + whether stream is alive in = head :: io(), in_state :: {non_neg_integer(), non_neg_integer()}, out = head :: io() @@ -128,31 +131,40 @@ handle_head(Data, State=#http_state{owner=Owner, version=ClientVersion, connection=Conn, streams=[{StreamRef, IsAlive}|_]}) -> {Version, Status, _, Rest} = cow_http:parse_status_line(Data), {Headers, Rest2} = cow_http:parse_headers(Rest), - In = io_from_headers(Version, Headers), - IsFin = case In of head -> fin; _ -> nofin end, - case IsAlive of - false -> - ok; - true -> - Owner ! {gun_response, self(), StreamRef, - IsFin, Status, Headers}, - ok - end, - Conn2 = if - Conn =:= close -> close; - Version =:= 'HTTP/1.0' -> close; - ClientVersion =:= 'HTTP/1.0' -> close; - true -> conn_from_headers(Headers) - end, - %% We always reset in_state even if not chunked. - if - IsFin =:= fin, Conn2 =:= close -> - close; - IsFin =:= fin -> - handle(Rest2, end_stream(State#http_state{in=In, - in_state={0, 0}, connection=Conn2})); - true -> - handle(Rest2, State#http_state{in=In, in_state={0, 0}, connection=Conn2}) + case {Status, StreamRef} of + {101, {websocket, _, WsKey, WsExtensions, WsProtocols, WsOpts}} -> + ws_handshake(Rest2, State, Headers, WsKey, WsExtensions, WsProtocols, WsOpts); + _ -> + In = io_from_headers(Version, Headers), + IsFin = case In of head -> fin; _ -> nofin end, + case IsAlive of + false -> + ok; + true -> + StreamRef2 = case StreamRef of + {websocket, SR, _, _, _} -> SR; + _ -> StreamRef + end, + Owner ! {gun_response, self(), StreamRef2, + IsFin, Status, Headers}, + ok + end, + Conn2 = if + Conn =:= close -> close; + Version =:= 'HTTP/1.0' -> close; + ClientVersion =:= 'HTTP/1.0' -> close; + true -> conn_from_headers(Headers) + end, + %% We always reset in_state even if not chunked. + if + IsFin =:= fin, Conn2 =:= close -> + close; + IsFin =:= fin -> + handle(Rest2, end_stream(State#http_state{in=In, + in_state={0, 0}, connection=Conn2})); + true -> + handle(Rest2, State#http_state{in=In, in_state={0, 0}, connection=Conn2}) + end end. send_data_if_alive(<<>>, _, nofin) -> @@ -187,27 +199,28 @@ keepalive(State) -> State. request(State=#http_state{socket=Socket, transport=Transport, version=Version, - out=head}, StreamRef, Method, Host, Path, Headers) -> + out=head}, StreamRef, Method, Host, Port, Path, Headers) -> Headers2 = case Version of 'HTTP/1.0' -> lists:keydelete(<<"transfer-encoding">>, 1, Headers); 'HTTP/1.1' -> Headers end, Headers3 = case lists:keymember(<<"host">>, 1, Headers) of - false -> [{<<"host">>, Host}|Headers2]; + false -> [{<<"host">>, [Host, $:, integer_to_binary(Port)]}|Headers2]; true -> Headers2 end, %% We use Headers2 because this is the smallest list. Conn = conn_from_headers(Headers2), + %% @todo This should probably also check for content-type like SPDY. Out = io_from_headers(Version, Headers2), Transport:send(Socket, cow_http:request(Method, Path, Version, Headers3)), new_stream(State#http_state{connection=Conn, out=Out}, StreamRef). request(State=#http_state{socket=Socket, transport=Transport, version=Version, - out=head}, StreamRef, Method, Host, Path, Headers, Body) -> + out=head}, StreamRef, Method, Host, Port, Path, Headers, Body) -> Headers2 = lists:keydelete(<<"content-length">>, 1, lists:keydelete(<<"transfer-encoding">>, 1, Headers)), Headers3 = case lists:keymember(<<"host">>, 1, Headers) of - false -> [{<<"host">>, Host}|Headers2]; + false -> [{<<"host">>, [Host, $:, integer_to_binary(Port)]}|Headers2]; true -> Headers2 end, %% We use Headers2 because this is the smallest list. @@ -334,3 +347,88 @@ cancel_stream(State=#http_state{streams=Streams}, StreamRef) -> end_stream(State=#http_state{streams=[_|Tail]}) -> State#http_state{in=head, streams=Tail}. + +%% Websocket upgrade. + +%% Ensure version is 1.1. +ws_upgrade(#http_state{version='HTTP/1.0'}, _, _, _, _, _, _) -> + error; %% @todo +ws_upgrade(State=#http_state{socket=Socket, transport=Transport, out=head}, + StreamRef, Host, Port, Path, Headers, WsOpts) -> + %% @todo Add option for setting protocol. + {ExtHeaders, GunExtensions} = case maps:get(compress, WsOpts, false) of + true -> {[{<<"sec-websocket-extensions">>, <<"permessage-deflate; client_max_window_bits; server_max_window_bits=15">>}], + [<<"permessage-deflate">>]}; + false -> {[], []} + end, + Key = cow_ws:key(), + Headers2 = [ + {<<"connection">>, <<"upgrade">>}, + {<<"upgrade">>, <<"websocket">>}, + {<<"sec-websocket-version">>, <<"13">>}, + {<<"sec-websocket-key">>, Key} + |ExtHeaders + ], + Headers3 = case lists:keymember(<<"host">>, 1, Headers) of + false -> [{<<"host">>, [Host, $:, integer_to_binary(Port)]}|Headers2]; + true -> Headers2 + end, + Transport:send(Socket, cow_http:request(<<"GET">>, Path, 'HTTP/1.1', Headers3)), + new_stream(State#http_state{connection=keepalive, out=head}, + {websocket, StreamRef, Key, GunExtensions, [], WsOpts}). + +ws_handshake(Buffer, State, Headers, Key, GunExtensions, GunProtocols, Opts) -> + %% @todo check upgrade, connection + case lists:keyfind(<<"sec-websocket-accept">>, 1, Headers) of + false -> + close; + {_, Accept} -> + case cow_ws:encode_key(Key) of + Accept -> ws_handshake_extensions(Buffer, State, Headers, GunExtensions, GunProtocols, Opts); + _ -> close + end + end. + +ws_handshake_extensions(Buffer, State, Headers, GunExtensions, GunProtocols, Opts) -> + case lists:keyfind(<<"sec-websocket-extensions">>, 1, Headers) of + false -> + ws_handshake_protocols(Buffer, State, Headers, #{}, GunProtocols); + {_, ExtHd} -> + case ws_validate_extensions(cow_http_hd:parse_sec_websocket_extensions(ExtHd), GunExtensions, #{}, Opts) of + close -> close; + Extensions -> ws_handshake_protocols(Buffer, State, Headers, Extensions, GunProtocols) + end + end. + +ws_validate_extensions([], _, Acc, _) -> + Acc; +ws_validate_extensions([{Name = <<"permessage-deflate">>, Params}|Tail], GunExts, Acc, Opts) -> + case lists:member(Name, GunExts) of + true -> + case cow_ws:validate_permessage_deflate(Params, Acc, Opts) of + {ok, Acc2} -> ws_validate_extensions(Tail, GunExts, Acc2, Opts); + error -> close + end; + %% Fail the connection if extension was not requested. + false -> + close + end; +%% Fail the connection on unknown extension. +ws_validate_extensions(_, _, _, _) -> + close. + +%% @todo Validate protocols. +ws_handshake_protocols(Buffer, State, _Headers, Extensions, _GunProtocols = []) -> + Protocols = [], + ws_handshake_end(Buffer, State, Extensions, Protocols). + +ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Transport}, Extensions, Protocols) -> + %% Send ourselves the remaining buffer, if any. + _ = case Buffer of + <<>> -> + ok; + _ -> + {OK, _, _} = Transport:messages(), + self() ! {OK, Socket, Buffer} + end, + gun_ws:init(Owner, Socket, Transport, Extensions, Protocols). -- cgit v1.2.3