diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/cowboy.erl | 1 | ||||
-rw-r--r-- | src/cowboy_client.erl | 270 | ||||
-rw-r--r-- | src/cowboy_dispatcher.erl | 2 | ||||
-rw-r--r-- | src/cowboy_http.erl | 8 | ||||
-rw-r--r-- | src/cowboy_http_protocol.erl | 50 | ||||
-rw-r--r-- | src/cowboy_http_req.erl | 152 | ||||
-rw-r--r-- | src/cowboy_http_websocket.erl | 31 | ||||
-rw-r--r-- | src/cowboy_ssl_transport.erl | 7 | ||||
-rw-r--r-- | src/cowboy_tcp_transport.erl | 6 |
9 files changed, 416 insertions, 111 deletions
diff --git a/src/cowboy.erl b/src/cowboy.erl index 7963df2..1097197 100644 --- a/src/cowboy.erl +++ b/src/cowboy.erl @@ -51,7 +51,6 @@ start_listener(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) Transport, TransOpts, Protocol, ProtoOpts)). %% @doc Stop a listener identified by <em>Ref</em>. -%% @todo Currently request processes aren't terminated with the listener. -spec stop_listener(any()) -> ok | {error, not_found}. stop_listener(Ref) -> case supervisor:terminate_child(cowboy_sup, {cowboy_listener_sup, Ref}) of diff --git a/src/cowboy_client.erl b/src/cowboy_client.erl new file mode 100644 index 0000000..e46619f --- /dev/null +++ b/src/cowboy_client.erl @@ -0,0 +1,270 @@ +%% Copyright (c) 2012, Loïc Hoguin <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +%% @private +-module(cowboy_client). + +-export([init/1]). +-export([state/1]). +-export([transport/1]). + +-export([connect/4]). +-export([raw_request/2]). +-export([request/3]). +-export([request/4]). +-export([request/5]). +-export([response/1]). +-export([response_body/1]). +-export([skip_body/1]). +-export([stream_status/1]). +-export([stream_headers/1]). +-export([stream_header/1]). +-export([stream_body/1]). + +-record(client, { + state = wait :: wait | request | response | response_body, + opts = [] :: [any()], + socket = undefined :: undefined | inet:socket(), + transport = undefined :: module(), + timeout = 5000 :: timeout(), %% @todo Configurable. + buffer = <<>> :: binary(), + connection = keepalive :: keepalive | close, + version = {1, 1} :: cowboy_http:version(), + response_body = undefined :: undefined | non_neg_integer() +}). + +init(Opts) -> + {ok, #client{opts=Opts}}. + +state(#client{state=State}) -> + State. + +transport(#client{socket=undefined}) -> + {error, notconnected}; +transport(#client{transport=Transport, socket=Socket}) -> + {ok, Transport, Socket}. + +connect(Transport, Host, Port, Client) + when is_binary(Host) -> + connect(Transport, binary_to_list(Host), Port, Client); +connect(Transport, Host, Port, Client=#client{state=State, opts=Opts}) + when is_atom(Transport), is_list(Host), + is_integer(Port), is_record(Client, client), + State =:= wait -> + {ok, Socket} = Transport:connect(Host, Port, Opts), + {ok, Client#client{state=request, socket=Socket, transport=Transport}}. + +raw_request(Data, Client=#client{state=response_body}) -> + {done, Client2} = skip_body(Client), + raw_request(Data, Client2); +raw_request(Data, Client=#client{ + state=State, socket=Socket, transport=Transport}) + when State =:= request -> + ok = Transport:send(Socket, Data), + {ok, Client}. + +request(Method, URL, Client) -> + request(Method, URL, [], <<>>, Client). + +request(Method, URL, Headers, Client) -> + request(Method, URL, Headers, <<>>, Client). + +request(Method, URL, Headers, Body, Client=#client{state=response_body}) -> + {done, Client2} = skip_body(Client), + request(Method, URL, Headers, Body, Client2); +request(Method, URL, Headers, Body, Client=#client{ + state=State, version=Version}) + when State =:= wait; State =:= request -> + {Transport, FullHost, Host, Port, Path} = parse_url(URL), + {ok, Client2} = case State of + wait -> connect(Transport, Host, Port, Client); + request -> {ok, Client} + end, + VersionBin = cowboy_http:version_to_binary(Version), + %% @todo do keepalive too, allow override... + Headers2 = [ + {<<"host">>, FullHost}, + {<<"user-agent">>, <<"Cow">>} + |Headers], + Headers3 = case iolist_size(Body) of + 0 -> Headers2; + Length -> [{<<"content-length">>, integer_to_list(Length)}|Headers2] + end, + HeadersData = [[Name, <<": ">>, Value, <<"\r\n">>] + || {Name, Value} <- Headers3], + Data = [Method, <<" ">>, Path, <<" ">>, VersionBin, <<"\r\n">>, + HeadersData, <<"\r\n">>, Body], + raw_request(Data, Client2). + +parse_url(<< "https://", Rest/binary >>) -> + parse_url(Rest, cowboy_ssl_transport); +parse_url(<< "http://", Rest/binary >>) -> + parse_url(Rest, cowboy_tcp_transport); +parse_url(URL) -> + parse_url(URL, cowboy_tcp_transport). + +parse_url(URL, Transport) -> + case binary:split(URL, <<"/">>) of + [Peer] -> + {Host, Port} = parse_peer(Peer, Transport), + {Transport, Peer, Host, Port, <<"/">>}; + [Peer, Path] -> + {Host, Port} = parse_peer(Peer, Transport), + {Transport, Peer, Host, Port, [<<"/">>, Path]} + end. + +parse_peer(Peer, Transport) -> + case binary:split(Peer, <<":">>) of + [Host] when Transport =:= cowboy_tcp_transport -> + {binary_to_list(Host), 80}; + [Host] when Transport =:= cowboy_ssl_transport -> + {binary_to_list(Host), 443}; + [Host, Port] -> + {binary_to_list(Host), list_to_integer(binary_to_list(Port))} + end. + +response(Client=#client{state=response_body}) -> + {done, Client2} = skip_body(Client), + response(Client2); +response(Client=#client{state=request}) -> + case stream_status(Client) of + {ok, Status, _, Client2} -> + case stream_headers(Client2) of + {ok, Headers, Client3} -> + {ok, Status, Headers, Client3}; + {error, Reason} -> + {error, Reason} + end; + {error, Reason} -> + {error, Reason} + end. + +response_body(Client=#client{state=response_body}) -> + response_body_loop(Client, <<>>). + +response_body_loop(Client, Acc) -> + case stream_body(Client) of + {ok, Data, Client2} -> + response_body_loop(Client2, << Acc/binary, Data/binary >>); + {done, Client2} -> + {ok, Acc, Client2}; + {error, Reason} -> + {error, Reason} + end. + +skip_body(Client=#client{state=response_body}) -> + case stream_body(Client) of + {ok, _, Client2} -> skip_body(Client2); + Done -> Done + end. + +stream_status(Client=#client{state=State, buffer=Buffer}) + when State =:= request -> + case binary:split(Buffer, <<"\r\n">>) of + [Line, Rest] -> + parse_status(Client#client{state=response, buffer=Rest}, Line); + _ -> + case recv(Client) of + {ok, Data} -> + Buffer2 = << Buffer/binary, Data/binary >>, + stream_status(Client#client{buffer=Buffer2}); + {error, Reason} -> + {error, Reason} + end + end. + +parse_status(Client, << "HTTP/", High, ".", Low, " ", + S3, S2, S1, " ", StatusStr/binary >>) + when High >= $0, High =< $9, Low >= $0, Low =< $9, + S3 >= $0, S3 =< $9, S2 >= $0, S2 =< $9, S1 >= $0, S1 =< $9 -> + Version = {High - $0, Low - $0}, + Status = (S3 - $0) * 100 + (S2 - $0) * 10 + S1 - $0, + {ok, Status, StatusStr, Client#client{version=Version}}. + +stream_headers(Client=#client{state=State}) + when State =:= response -> + stream_headers(Client, []). + +stream_headers(Client, Acc) -> + case stream_header(Client) of + {ok, Name, Value, Client2} -> + stream_headers(Client2, [{Name, Value}|Acc]); + {done, Client2} -> + {ok, Acc, Client2}; + {error, Reason} -> + {error, Reason} + end. + +stream_header(Client=#client{state=State, buffer=Buffer, + response_body=RespBody}) when State =:= response -> + case binary:split(Buffer, <<"\r\n">>) of + [<<>>, Rest] -> + %% If we have a body, set response_body. + Client2 = case RespBody of + undefined -> Client#client{state=request}; + 0 -> Client#client{state=request}; + _ -> Client#client{state=response_body} + end, + {done, Client2#client{buffer=Rest}}; + [Line, Rest] -> + %% @todo Do a better parsing later on. + [Name, Value] = binary:split(Line, <<": ">>), + Name2 = cowboy_bstr:to_lower(Name), + Client2 = case Name2 of + <<"content-length">> -> + Length = list_to_integer(binary_to_list(Value)), + if Length >= 0 -> ok end, + Client#client{response_body=Length}; + _ -> + Client + end, + {ok, Name2, Value, Client2#client{buffer=Rest}}; + _ -> + case recv(Client) of + {ok, Data} -> + Buffer2 = << Buffer/binary, Data/binary >>, + stream_header(Client#client{buffer=Buffer2}); + {error, Reason} -> + {error, Reason} + end + end. + +stream_body(Client=#client{state=response_body, response_body=RespBody}) + when RespBody =:= undefined; RespBody =:= 0 -> + {done, Client#client{state=request, response_body=undefined}}; +stream_body(Client=#client{state=response_body, buffer=Buffer, + response_body=Length}) when is_integer(Length) -> + case byte_size(Buffer) of + 0 -> + case recv(Client) of + {ok, Body} when byte_size(Body) =< Length -> + Length2 = Length - byte_size(Body), + {ok, Body, Client#client{response_body=Length2}}; + {ok, Data} -> + << Body:Length/binary, Rest/binary >> = Data, + {ok, Body, Client#client{buffer=Rest, + response_body=undefined}}; + {error, Reason} -> + {error, Reason} + end; + N when N =< Length -> + Length2 = Length - N, + {ok, Buffer, Client#client{buffer= <<>>, response_body=Length2}}; + _ -> + << Body:Length/binary, Rest/binary >> = Buffer, + {ok, Body, Client#client{buffer=Rest, response_body=undefined}} + end. + +recv(#client{socket=Socket, transport=Transport, timeout=Timeout}) -> + Transport:recv(Socket, 0, Timeout). diff --git a/src/cowboy_dispatcher.erl b/src/cowboy_dispatcher.erl index db40e63..6de8b49 100644 --- a/src/cowboy_dispatcher.erl +++ b/src/cowboy_dispatcher.erl @@ -215,6 +215,8 @@ split_path_test_() -> {<<"?">>, [], <<"">>, <<"">>}, {<<"???">>, [], <<"">>, <<"??">>}, {<<"/">>, [], <<"/">>, <<"">>}, + {<<"/extend//cowboy">>, [<<"extend">>, <<>>, <<"cowboy">>], + <<"/extend//cowboy">>, <<>>}, {<<"/users">>, [<<"users">>], <<"/users">>, <<"">>}, {<<"/users?">>, [<<"users">>], <<"/users">>, <<"">>}, {<<"/users?a">>, [<<"users">>], <<"/users">>, <<"a">>}, diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl index 7bea8e0..f8d3314 100644 --- a/src/cowboy_http.erl +++ b/src/cowboy_http.erl @@ -26,7 +26,8 @@ -export([te_chunked/2, te_identity/2, ce_identity/1]). %% Interpretation. --export([connection_to_atom/1, urldecode/1, urldecode/2, urlencode/1, +-export([connection_to_atom/1, version_to_binary/1, + urldecode/1, urldecode/2, urlencode/1, urlencode/2, x_www_form_urlencoded/2]). -type method() :: 'OPTIONS' | 'GET' | 'HEAD' @@ -781,6 +782,11 @@ connection_to_atom([<<"close">>|_Tail]) -> connection_to_atom([_Any|Tail]) -> connection_to_atom(Tail). +%% @doc Convert an HTTP version tuple to its binary form. +-spec version_to_binary(version()) -> binary(). +version_to_binary({1, 1}) -> <<"HTTP/1.1">>; +version_to_binary({1, 0}) -> <<"HTTP/1.0">>. + %% @doc Decode a URL encoded binary. %% @equiv urldecode(Bin, crash) -spec urldecode(binary()) -> binary(). diff --git a/src/cowboy_http_protocol.erl b/src/cowboy_http_protocol.erl index 04abfbc..9e1ad88 100644 --- a/src/cowboy_http_protocol.erl +++ b/src/cowboy_http_protocol.erl @@ -48,6 +48,8 @@ dispatch :: cowboy_dispatcher:dispatch_rules(), handler :: {module(), any()}, onrequest :: undefined | fun((#http_req{}) -> #http_req{}), + onresponse = undefined :: undefined | fun((cowboy_http:status(), + cowboy_http:headers(), #http_req{}) -> #http_req{}), urldecode :: {fun((binary(), T) -> binary()), T}, req_empty_lines = 0 :: integer(), max_empty_lines :: integer(), @@ -79,6 +81,7 @@ init(ListenerPid, Socket, Transport, Opts) -> MaxKeepalive = proplists:get_value(max_keepalive, Opts, infinity), MaxLineLength = proplists:get_value(max_line_length, Opts, 4096), OnRequest = proplists:get_value(onrequest, Opts), + OnResponse = proplists:get_value(onresponse, Opts), Timeout = proplists:get_value(timeout, Opts, 5000), URLDecDefault = {fun cowboy_http:urldecode/2, crash}, URLDec = proplists:get_value(urldecode, Opts, URLDecDefault), @@ -86,7 +89,8 @@ init(ListenerPid, Socket, Transport, Opts) -> wait_request(#state{listener=ListenerPid, socket=Socket, transport=Transport, dispatch=Dispatch, max_empty_lines=MaxEmptyLines, max_keepalive=MaxKeepalive, max_line_length=MaxLineLength, - timeout=Timeout, onrequest=OnRequest, urldecode=URLDec}). + timeout=Timeout, onrequest=OnRequest, onresponse=OnResponse, + urldecode=URLDec}). %% @private -spec parse_request(#state{}) -> ok. @@ -121,19 +125,28 @@ request({http_request, Method, {absoluteURI, _Scheme, _Host, _Port, Path}, request({http_request, Method, {abs_path, Path}, Version}, State); request({http_request, Method, {abs_path, AbsPath}, Version}, State=#state{socket=Socket, transport=Transport, - urldecode={URLDecFun, URLDecArg}=URLDec}) -> + req_keepalive=Keepalive, max_keepalive=MaxKeepalive, + onresponse=OnResponse, urldecode={URLDecFun, URLDecArg}=URLDec}) -> URLDecode = fun(Bin) -> URLDecFun(Bin, URLDecArg) end, {Path, RawPath, Qs} = cowboy_dispatcher:split_path(AbsPath, URLDecode), - ConnAtom = version_to_connection(Version), + ConnAtom = if Keepalive < MaxKeepalive -> version_to_connection(Version); + true -> close + end, parse_header(#http_req{socket=Socket, transport=Transport, connection=ConnAtom, pid=self(), method=Method, version=Version, - path=Path, raw_path=RawPath, raw_qs=Qs, urldecode=URLDec}, State); + path=Path, raw_path=RawPath, raw_qs=Qs, onresponse=OnResponse, + urldecode=URLDec}, State); request({http_request, Method, '*', Version}, - State=#state{socket=Socket, transport=Transport, urldecode=URLDec}) -> - ConnAtom = version_to_connection(Version), + State=#state{socket=Socket, transport=Transport, + req_keepalive=Keepalive, max_keepalive=MaxKeepalive, + onresponse=OnResponse, urldecode=URLDec}) -> + ConnAtom = if Keepalive < MaxKeepalive -> version_to_connection(Version); + true -> close + end, parse_header(#http_req{socket=Socket, transport=Transport, connection=ConnAtom, pid=self(), method=Method, version=Version, - path='*', raw_path= <<"*">>, raw_qs= <<>>, urldecode=URLDec}, State); + path='*', raw_path= <<"*">>, raw_qs= <<>>, onresponse=OnResponse, + urldecode=URLDec}, State); request({http_request, _Method, _URI, _Version}, State) -> error_terminate(501, State); request({http_error, <<"\r\n">>}, @@ -186,7 +199,9 @@ header({http_header, _I, 'Host', _R, RawHost}, Req=#http_req{ header({http_header, _I, 'Host', _R, _V}, Req, State) -> parse_header(Req, State); header({http_header, _I, 'Connection', _R, Connection}, - Req=#http_req{headers=Headers}, State) -> + Req=#http_req{headers=Headers}, State=#state{ + req_keepalive=Keepalive, max_keepalive=MaxKeepalive}) + when Keepalive < MaxKeepalive -> Req2 = Req#http_req{headers=[{'Connection', Connection}|Headers]}, {ConnTokens, Req3} = cowboy_http_req:parse_header('Connection', Req2), @@ -316,16 +331,15 @@ handler_loop_timeout(State=#state{loop_timeout=Timeout, loop_timeout_ref=PrevRef}) -> _ = case PrevRef of undefined -> ignore; PrevRef -> erlang:cancel_timer(PrevRef) end, - TRef = make_ref(), - erlang:send_after(Timeout, self(), {?MODULE, timeout, TRef}), + TRef = erlang:start_timer(Timeout, self(), ?MODULE), State#state{loop_timeout_ref=TRef}. -spec handler_loop(any(), #http_req{}, #state{}) -> ok. handler_loop(HandlerState, Req, State=#state{loop_timeout_ref=TRef}) -> receive - {?MODULE, timeout, TRef} -> + {timeout, TRef, ?MODULE} -> terminate_request(HandlerState, Req, State); - {?MODULE, timeout, OlderTRef} when is_reference(OlderTRef) -> + {timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) -> handler_loop(HandlerState, Req, State); Message -> handler_call(HandlerState, Req, State, Message) @@ -376,15 +390,14 @@ terminate_request(HandlerState, Req, State) -> next_request(Req, State, HandlerRes). -spec next_request(#http_req{}, #state{}, any()) -> ok. -next_request(Req=#http_req{connection=Conn}, - State=#state{req_keepalive=Keepalive, max_keepalive=MaxKeepalive}, - HandlerRes) -> +next_request(Req=#http_req{connection=Conn}, State=#state{ + req_keepalive=Keepalive}, HandlerRes) -> RespRes = ensure_response(Req), {BodyRes, Buffer} = ensure_body_processed(Req), %% Flush the resp_sent message before moving on. receive {cowboy_http_req, resp_sent} -> ok after 0 -> ok end, case {HandlerRes, BodyRes, RespRes, Conn} of - {ok, ok, ok, keepalive} when Keepalive < MaxKeepalive -> + {ok, ok, ok, keepalive} -> ?MODULE:parse_request(State#state{ buffer=Buffer, req_empty_lines=0, req_keepalive=Keepalive + 1}); @@ -425,12 +438,13 @@ ensure_response(#http_req{socket=Socket, transport=Transport, %% Only send an error reply if there is no resp_sent message. -spec error_terminate(cowboy_http:status(), #state{}) -> ok. -error_terminate(Code, State=#state{socket=Socket, transport=Transport}) -> +error_terminate(Code, State=#state{socket=Socket, transport=Transport, + onresponse=OnResponse}) -> receive {cowboy_http_req, resp_sent} -> ok after 0 -> _ = cowboy_http_req:reply(Code, #http_req{ - socket=Socket, transport=Transport, + socket=Socket, transport=Transport, onresponse=OnResponse, connection=close, pid=self(), resp_state=waiting}), ok end, diff --git a/src/cowboy_http_req.erl b/src/cowboy_http_req.erl index a6e8834..8f1f789 100644 --- a/src/cowboy_http_req.erl +++ b/src/cowboy_http_req.erl @@ -456,10 +456,11 @@ stream_body(Req=#http_req{body_state=done}) -> -spec stream_body_recv(#http_req{}) -> {ok, binary(), #http_req{}} | {error, atom()}. -stream_body_recv(Req=#http_req{transport=Transport, socket=Socket}) -> +stream_body_recv(Req=#http_req{ + transport=Transport, socket=Socket, buffer=Buffer}) -> %% @todo Allow configuring the timeout. case Transport:recv(Socket, 0, 5000) of - {ok, Data} -> transfer_decode(Data, Req); + {ok, Data} -> transfer_decode(<< Buffer/binary, Data/binary >>, Req); {error, Reason} -> {error, Reason} end. @@ -477,7 +478,7 @@ transfer_decode(Data, Req=#http_req{ {stream, TransferDecode, TransferState2, ContentDecode}}); %% @todo {header(s) for chunked more -> - stream_body_recv(Req); + stream_body_recv(Req#http_req{buffer=Data}); {done, Length, Rest} -> Req2 = transfer_decode_done(Length, Rest, Req), {done, Req2}; @@ -562,34 +563,26 @@ body_qs(Req=#http_req{urldecode={URLDecFun, URLDecArg}}) -> %% %% Use this function for multipart streaming. For each part in the request, %% this function returns <em>{headers, Headers}</em> followed by a sequence of -%% <em>{data, Data}</em> tuples and finally <em>end_of_part</em>. When there +%% <em>{body, Data}</em> tuples and finally <em>end_of_part</em>. When there %% is no part to parse anymore, <em>eof</em> is returned. %% %% If the request Content-Type is not a multipart one, <em>{error, badarg}</em> %% is returned. -spec multipart_data(#http_req{}) -> {{headers, cowboy_http:headers()} - | {data, binary()} | end_of_part | eof, + | {body, binary()} | end_of_part | eof, #http_req{}}. multipart_data(Req=#http_req{body_state=waiting}) -> {{<<"multipart">>, _SubType, Params}, Req2} = parse_header('Content-Type', Req), {_, Boundary} = lists:keyfind(<<"boundary">>, 1, Params), - {Length, Req3=#http_req{buffer=Buffer}} = - parse_header('Content-Length', Req2), - multipart_data(Req3, Length, cowboy_multipart:parser(Boundary), Buffer); + {Length, Req3} = parse_header('Content-Length', Req2), + multipart_data(Req3, Length, {more, cowboy_multipart:parser(Boundary)}); multipart_data(Req=#http_req{body_state={multipart, Length, Cont}}) -> multipart_data(Req, Length, Cont()); multipart_data(Req=#http_req{body_state=done}) -> {eof, Req}. -multipart_data(Req, Length, Parser, Buffer) when byte_size(Buffer) >= Length -> - << Data:Length/binary, Rest/binary >> = Buffer, - multipart_data(Req#http_req{buffer=Rest}, 0, Parser(Data)); -multipart_data(Req, Length, Parser, Buffer) -> - NewLength = Length - byte_size(Buffer), - multipart_data(Req#http_req{buffer= <<>>}, NewLength, Parser(Buffer)). - multipart_data(Req, Length, {headers, Headers, Cont}) -> {{headers, Headers}, Req#http_req{body_state={multipart, Length, Cont}}}; multipart_data(Req, Length, {body, Data, Cont}) -> @@ -600,15 +593,15 @@ multipart_data(Req, 0, eof) -> {eof, Req#http_req{body_state=done}}; multipart_data(Req=#http_req{socket=Socket, transport=Transport}, Length, eof) -> + %% We just want to skip so no need to stream data here. {ok, _Data} = Transport:recv(Socket, Length, 5000), {eof, Req#http_req{body_state=done}}; -multipart_data(Req=#http_req{socket=Socket, transport=Transport}, - Length, {more, Parser}) when Length > 0 -> - case Transport:recv(Socket, 0, 5000) of - {ok, << Data:Length/binary, Buffer/binary >>} -> - multipart_data(Req#http_req{buffer=Buffer}, 0, Parser(Data)); - {ok, Data} -> - multipart_data(Req, Length - byte_size(Data), Parser(Data)) +multipart_data(Req, Length, {more, Parser}) when Length > 0 -> + case stream_body(Req) of + {ok, << Data:Length/binary, Buffer/binary >>, Req2} -> + multipart_data(Req2#http_req{buffer=Buffer}, 0, Parser(Data)); + {ok, Data, Req2} -> + multipart_data(Req2, Length - byte_size(Data), Parser(Data)) end. %% @doc Skip a part returned by the multipart parser. @@ -667,7 +660,6 @@ set_resp_body(Body, Req) -> set_resp_body_fun(StreamLen, StreamFun, Req) -> {ok, Req#http_req{resp_body={StreamLen, StreamFun}}}. - %% @doc Return whether the given header has been set for the response. -spec has_resp_header(cowboy_http:header(), #http_req{}) -> boolean(). has_resp_header(Name, #http_req{resp_headers=RespHeaders}) -> @@ -695,24 +687,29 @@ reply(Status, Headers, Req=#http_req{resp_body=Body}) -> %% @doc Send a reply to the client. -spec reply(cowboy_http:status(), cowboy_http:headers(), iodata(), #http_req{}) -> {ok, #http_req{}}. -reply(Status, Headers, Body, Req=#http_req{socket=Socket, - transport=Transport, connection=Connection, pid=ReqPid, +reply(Status, Headers, Body, Req=#http_req{socket=Socket, transport=Transport, + version=Version, connection=Connection, method=Method, resp_state=waiting, resp_headers=RespHeaders}) -> RespConn = response_connection(Headers, Connection), ContentLen = case Body of {CL, _} -> CL; _ -> iolist_size(Body) end, - Head = response_head(Status, Headers, RespHeaders, [ - {<<"Connection">>, atom_to_connection(Connection)}, + HTTP11Headers = case Version of + {1, 1} -> [{<<"Connection">>, atom_to_connection(Connection)}]; + _ -> [] + end, + {ReplyType, Req2} = response(Status, Headers, RespHeaders, [ {<<"Content-Length">>, integer_to_list(ContentLen)}, {<<"Date">>, cowboy_clock:rfc1123()}, {<<"Server">>, <<"Cowboy">>} - ]), - case {Method, Body} of - {'HEAD', _} -> Transport:send(Socket, Head); - {_, {_, StreamFun}} -> Transport:send(Socket, Head), StreamFun(); - {_, _} -> Transport:send(Socket, [Head, Body]) + |HTTP11Headers], Req), + if Method =:= 'HEAD' -> ok; + ReplyType =:= hook -> ok; %% Hook replied for us, stop there. + true -> + case Body of + {_, StreamFun} -> StreamFun(); + _ -> Transport:send(Socket, Body) + end end, - ReqPid ! {?MODULE, resp_sent}, - {ok, Req#http_req{connection=RespConn, resp_state=done, + {ok, Req2#http_req{connection=RespConn, resp_state=done, resp_headers=[], resp_body= <<>>}}. %% @equiv chunked_reply(Status, [], Req) @@ -724,25 +721,21 @@ chunked_reply(Status, Req) -> %% @see cowboy_http_req:chunk/2 -spec chunked_reply(cowboy_http:status(), cowboy_http:headers(), #http_req{}) -> {ok, #http_req{}}. -chunked_reply(Status, Headers, Req=#http_req{socket=Socket, - transport=Transport, version=Version, connection=Connection, - pid=ReqPid, resp_state=waiting, resp_headers=RespHeaders}) -> +chunked_reply(Status, Headers, Req=#http_req{ + version=Version, connection=Connection, + resp_state=waiting, resp_headers=RespHeaders}) -> RespConn = response_connection(Headers, Connection), - DefaultHeaders = [ - {<<"Date">>, cowboy_clock:rfc1123()}, - {<<"Server">>, <<"Cowboy">>} - ], - DefaultHeaders2 = case Version of + HTTP11Headers = case Version of {1, 1} -> [ - {<<"Connection">>, atom_to_connection(Connection)}, - {<<"Transfer-Encoding">>, <<"chunked">>} - ] ++ DefaultHeaders; - _ -> DefaultHeaders + {<<"Connection">>, atom_to_connection(Connection)}, + {<<"Transfer-Encoding">>, <<"chunked">>}]; + _ -> [] end, - Head = response_head(Status, Headers, RespHeaders, DefaultHeaders2), - Transport:send(Socket, Head), - ReqPid ! {?MODULE, resp_sent}, - {ok, Req#http_req{connection=RespConn, resp_state=chunks, + {_, Req2} = response(Status, Headers, RespHeaders, [ + {<<"Date">>, cowboy_clock:rfc1123()}, + {<<"Server">>, <<"Cowboy">>} + |HTTP11Headers], Req), + {ok, Req2#http_req{connection=RespConn, resp_state=chunks, resp_headers=[], resp_body= <<>>}}. %% @doc Send a chunk of data. @@ -761,14 +754,12 @@ chunk(Data, #http_req{socket=Socket, transport=Transport, resp_state=chunks}) -> %% @private -spec upgrade_reply(cowboy_http:status(), cowboy_http:headers(), #http_req{}) -> {ok, #http_req{}}. -upgrade_reply(Status, Headers, Req=#http_req{socket=Socket, transport=Transport, - pid=ReqPid, resp_state=waiting, resp_headers=RespHeaders}) -> - Head = response_head(Status, Headers, RespHeaders, [ +upgrade_reply(Status, Headers, Req=#http_req{ + resp_state=waiting, resp_headers=RespHeaders}) -> + {_, Req2} = response(Status, Headers, RespHeaders, [ {<<"Connection">>, <<"Upgrade">>} - ]), - Transport:send(Socket, Head), - ReqPid ! {?MODULE, resp_sent}, - {ok, Req#http_req{resp_state=done, resp_headers=[], resp_body= <<>>}}. + ], Req), + {ok, Req2#http_req{resp_state=done, resp_headers=[], resp_body= <<>>}}. %% Misc API. @@ -798,6 +789,35 @@ transport(#http_req{transport=Transport, socket=Socket}) -> %% Internal. +-spec response(cowboy_http:status(), cowboy_http:headers(), + cowboy_http:headers(), cowboy_http:headers(), #http_req{}) + -> {normal | hook, #http_req{}}. +response(Status, Headers, RespHeaders, DefaultHeaders, Req=#http_req{ + socket=Socket, transport=Transport, version=Version, + pid=ReqPid, onresponse=OnResponse}) -> + FullHeaders = response_merge_headers(Headers, RespHeaders, DefaultHeaders), + Req2 = case OnResponse of + undefined -> Req; + OnResponse -> OnResponse(Status, FullHeaders, + %% Don't call 'onresponse' from the hook itself. + Req#http_req{resp_headers=[], resp_body= <<>>, + onresponse=undefined}) + end, + ReplyType = case Req2#http_req.resp_state of + waiting -> + HTTPVer = cowboy_http:version_to_binary(Version), + StatusLine = << HTTPVer/binary, " ", + (status(Status))/binary, "\r\n" >>, + HeaderLines = [[Key, <<": ">>, Value, <<"\r\n">>] + || {Key, Value} <- FullHeaders], + Transport:send(Socket, [StatusLine, HeaderLines, <<"\r\n">>]), + ReqPid ! {?MODULE, resp_sent}, + normal; + _ -> + hook + end, + {ReplyType, Req2}. + -spec response_connection(cowboy_http:headers(), keepalive | close) -> keepalive | close. response_connection([], Connection) -> @@ -819,17 +839,13 @@ response_connection_parse(ReplyConn) -> Tokens = cowboy_http:nonempty_list(ReplyConn, fun cowboy_http:token/2), cowboy_http:connection_to_atom(Tokens). --spec response_head(cowboy_http:status(), cowboy_http:headers(), - cowboy_http:headers(), cowboy_http:headers()) -> iolist(). -response_head(Status, Headers, RespHeaders, DefaultHeaders) -> - StatusLine = <<"HTTP/1.1 ", (status(Status))/binary, "\r\n">>, +-spec response_merge_headers(cowboy_http:headers(), cowboy_http:headers(), + cowboy_http:headers()) -> cowboy_http:headers(). +response_merge_headers(Headers, RespHeaders, DefaultHeaders) -> Headers2 = [{header_to_binary(Key), Value} || {Key, Value} <- Headers], - Headers3 = merge_headers( + merge_headers( merge_headers(Headers2, RespHeaders), - DefaultHeaders), - Headers4 = [[Key, <<": ">>, Value, <<"\r\n">>] - || {Key, Value} <- Headers3], - [StatusLine, Headers4, <<"\r\n">>]. + DefaultHeaders). -spec merge_headers(cowboy_http:headers(), cowboy_http:headers()) -> cowboy_http:headers(). @@ -894,6 +910,9 @@ status(423) -> <<"423 Locked">>; status(424) -> <<"424 Failed Dependency">>; status(425) -> <<"425 Unordered Collection">>; status(426) -> <<"426 Upgrade Required">>; +status(428) -> <<"428 Precondition Required">>; +status(429) -> <<"429 Too Many Requests">>; +status(431) -> <<"431 Request Header Fields Too Large">>; status(500) -> <<"500 Internal Server Error">>; status(501) -> <<"501 Not Implemented">>; status(502) -> <<"502 Bad Gateway">>; @@ -903,6 +922,7 @@ status(505) -> <<"505 HTTP Version Not Supported">>; status(506) -> <<"506 Variant Also Negotiates">>; status(507) -> <<"507 Insufficient Storage">>; status(510) -> <<"510 Not Extended">>; +status(511) -> <<"511 Network Authentication Required">>; status(B) when is_binary(B) -> B. -spec header_to_binary(cowboy_http:header()) -> binary(). diff --git a/src/cowboy_http_websocket.erl b/src/cowboy_http_websocket.erl index ab96e93..f550041 100644 --- a/src/cowboy_http_websocket.erl +++ b/src/cowboy_http_websocket.erl @@ -14,27 +14,9 @@ %% @doc WebSocket protocol implementation. %% -%% Supports the protocol version 0 (hixie-76), version 7 (hybi-7) -%% and version 8 (hybi-8, hybi-9 and hybi-10). -%% -%% Version 0 is supported by the following browsers: -%% <ul> -%% <li>Firefox 4-5 (disabled by default)</li> -%% <li>Chrome 6-13</li> -%% <li>Safari 5.0.1+</li> -%% <li>Opera 11.00+ (disabled by default)</li> -%% </ul> -%% -%% Version 7 is supported by the following browser: -%% <ul> -%% <li>Firefox 6</li> -%% </ul> -%% -%% Version 8+ is supported by the following browsers: -%% <ul> -%% <li>Firefox 7+</li> -%% <li>Chrome 14+</li> -%% </ul> +%% When using websockets, make sure that the crypto application is +%% included in your release. If you are not using releases then there +%% is no need for concern as crypto is already included. -module(cowboy_http_websocket). -export([upgrade/4]). %% API. @@ -232,8 +214,7 @@ handler_loop_timeout(State=#state{timeout=infinity}) -> handler_loop_timeout(State=#state{timeout=Timeout, timeout_ref=PrevRef}) -> _ = case PrevRef of undefined -> ignore; PrevRef -> erlang:cancel_timer(PrevRef) end, - TRef = make_ref(), - erlang:send_after(Timeout, self(), {?MODULE, timeout, TRef}), + TRef = erlang:start_timer(Timeout, self(), ?MODULE), State#state{timeout_ref=TRef}. %% @private @@ -248,9 +229,9 @@ handler_loop(State=#state{messages={OK, Closed, Error}, timeout_ref=TRef}, handler_terminate(State, Req, HandlerState, {error, closed}); {Error, Socket, Reason} -> handler_terminate(State, Req, HandlerState, {error, Reason}); - {?MODULE, timeout, TRef} -> + {timeout, TRef, ?MODULE} -> websocket_close(State, Req, HandlerState, {normal, timeout}); - {?MODULE, timeout, OlderTRef} when is_reference(OlderTRef) -> + {timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) -> handler_loop(State, Req, HandlerState, SoFar); Message -> handler_call(State, Req, HandlerState, diff --git a/src/cowboy_ssl_transport.erl b/src/cowboy_ssl_transport.erl index 3b130f0..a661622 100644 --- a/src/cowboy_ssl_transport.erl +++ b/src/cowboy_ssl_transport.erl @@ -25,6 +25,7 @@ -module(cowboy_ssl_transport). -export([name/0, messages/0, listen/1, accept/2, recv/3, send/2, setopts/2, controlling_process/2, peername/1, close/1, sockname/1]). +-export([connect/3]). %% @doc Name of this transport API, <em>ssl</em>. -spec name() -> ssl. @@ -37,6 +38,12 @@ name() -> ssl. -spec messages() -> {ssl, ssl_closed, ssl_error}. messages() -> {ssl, ssl_closed, ssl_error}. +%% @private +%% @todo Probably filter Opts? +connect(Host, Port, Opts) when is_list(Host), is_integer(Port) -> + ssl:connect(Host, Port, + Opts ++ [binary, {active, false}, {packet, raw}]). + %% @doc Setup a socket to listen on the given port on the local host. %% %% The available options are: diff --git a/src/cowboy_tcp_transport.erl b/src/cowboy_tcp_transport.erl index f197dd1..079494d 100644 --- a/src/cowboy_tcp_transport.erl +++ b/src/cowboy_tcp_transport.erl @@ -21,6 +21,7 @@ -export([name/0, messages/0, listen/1, accept/2, recv/3, send/2, setopts/2, controlling_process/2, peername/1, close/1, sockname/1]). +-export([connect/3]). %% @doc Name of this transport API, <em>tcp</em>. -spec name() -> tcp. @@ -33,6 +34,11 @@ name() -> tcp. -spec messages() -> {tcp, tcp_closed, tcp_error}. messages() -> {tcp, tcp_closed, tcp_error}. +%% @private +connect(Host, Port, Opts) when is_list(Host), is_integer(Port) -> + gen_tcp:connect(Host, Port, + Opts ++ [binary, {active, false}, {packet, raw}]). + %% @doc Setup a socket to listen on the given port on the local host. %% %% The available options are: |