aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2017-10-20 14:46:10 +0100
committerLoïc Hoguin <[email protected]>2017-10-20 14:46:10 +0100
commit4211ea41bd49be3cf152acb62ac8330f7f6a927d (patch)
treecab1a36adf108feec5bf17fc60faa7e8f47b2d35
parentc602871f86c795da8463908d12f7bf966bfeec12 (diff)
downloadcowboy-4211ea41bd49be3cf152acb62ac8330f7f6a927d.tar.gz
cowboy-4211ea41bd49be3cf152acb62ac8330f7f6a927d.tar.bz2
cowboy-4211ea41bd49be3cf152acb62ac8330f7f6a927d.zip
Add experimental metrics stream handler
It collects metrics and passes them to a configurable callback once the stream terminates. It will be documented in a future release. More tests incoming.
-rw-r--r--ebin/cowboy.app2
-rw-r--r--src/cowboy_metrics_h.erl277
-rw-r--r--test/metrics_SUITE.erl134
3 files changed, 412 insertions, 1 deletions
diff --git a/ebin/cowboy.app b/ebin/cowboy.app
index a972860..356628d 100644
--- a/ebin/cowboy.app
+++ b/ebin/cowboy.app
@@ -1,7 +1,7 @@
{application, 'cowboy', [
{description, "Small, fast, modern HTTP server."},
{vsn, "2.0.0"},
- {modules, ['cowboy','cowboy_app','cowboy_bstr','cowboy_children','cowboy_clear','cowboy_clock','cowboy_compress_h','cowboy_constraints','cowboy_handler','cowboy_http','cowboy_http2','cowboy_iolists','cowboy_loop','cowboy_middleware','cowboy_req','cowboy_rest','cowboy_router','cowboy_static','cowboy_stream','cowboy_stream_h','cowboy_sub_protocol','cowboy_sup','cowboy_tls','cowboy_websocket']},
+ {modules, ['cowboy','cowboy_app','cowboy_bstr','cowboy_children','cowboy_clear','cowboy_clock','cowboy_compress_h','cowboy_constraints','cowboy_handler','cowboy_http','cowboy_http2','cowboy_iolists','cowboy_loop','cowboy_metrics_h','cowboy_middleware','cowboy_req','cowboy_rest','cowboy_router','cowboy_static','cowboy_stream','cowboy_stream_h','cowboy_sub_protocol','cowboy_sup','cowboy_tls','cowboy_websocket']},
{registered, [cowboy_sup,cowboy_clock]},
{applications, [kernel,stdlib,crypto,cowlib,ranch]},
{mod, {cowboy_app, []}},
diff --git a/src/cowboy_metrics_h.erl b/src/cowboy_metrics_h.erl
new file mode 100644
index 0000000..91709ad
--- /dev/null
+++ b/src/cowboy_metrics_h.erl
@@ -0,0 +1,277 @@
+%% Copyright (c) 2017, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(cowboy_metrics_h).
+-behavior(cowboy_stream).
+
+-export([init/3]).
+-export([data/4]).
+-export([info/3]).
+-export([terminate/3]).
+-export([early_error/5]).
+
+-type proc_metrics() :: #{pid() => #{
+ %% Time at which the process spawned.
+ spawn := integer(),
+
+ %% Time at which the process exited.
+ exit => integer(),
+
+ %% Reason for the process exit.
+ reason => any()
+}}.
+
+-type metrics() :: #{
+ %% The identifier for this listener.
+ ref := ranch:ref(),
+
+ %% The pid for this connection.
+ pid := pid(),
+
+ %% The streamid also indicates the total number of requests on
+ %% this connection (StreamID div 2 + 1).
+ streamid := cowboy_stream:streamid(),
+
+ %% The terminate reason is always useful.
+ reason := cowboy_stream:reason(),
+
+ %% A filtered Req object or a partial Req object
+ %% depending on how far the request got to.
+ req => cowboy_req:req(),
+ partial_req => cowboy_stream:partial_req(),
+
+ %% Response status.
+ resp_status := cowboy:http_status(),
+
+ %% Filtered response headers.
+ resp_headers := cowboy:http_headers(),
+
+ %% Start/end of the processing of the request.
+ %%
+ %% This represents the time from this stream handler's init
+ %% to terminate. Note that this doesn't indicate the response
+ %% has been sent fully, it still may be queued up in a buffer.
+ req_start => integer(),
+ req_end => integer(),
+
+ %% Start/end of the receiving of the request body.
+ %% Begins when the first packet has been received.
+ req_body_start => integer(),
+ req_body_end => integer(),
+
+ %% Start/end of the sending of the response.
+ %% Begins when we send the headers and ends on the final
+ %% packet of the response body. If everything is sent at
+ %% once these values are identical.
+ resp_start => integer(),
+ resp_end => integer(),
+
+ %% For early errors all we get is the time we received it.
+ early_error_time => integer(),
+
+ %% Start/end of spawned processes. This is where most of
+ %% the user code lies, excluding stream handlers. On a
+ %% default Cowboy configuration there should be only one
+ %% process: the request process.
+ procs => proc_metrics(),
+
+ %% Length of the request and response bodies. This does
+ %% not include the framing.
+ req_body_length => non_neg_integer(),
+ resp_body_length => non_neg_integer()
+}.
+-export_type([metrics/0]).
+
+-record(state, {
+ next :: any(),
+ callback :: fun((metrics()) -> any()),
+ resp_headers_filter :: undefined | fun((cowboy:http_headers()) -> cowboy:http_headers()),
+ req :: map(),
+ resp_status :: undefined | cowboy:http_status(),
+ resp_headers :: undefined | cowboy:http_headers(),
+ ref :: ranch:ref(),
+ req_start :: integer(),
+ req_end :: undefined | integer(),
+ req_body_start :: undefined | integer(),
+ req_body_end :: undefined | integer(),
+ resp_start :: undefined | integer(),
+ resp_end :: undefined | integer(),
+ procs = #{} :: proc_metrics(),
+ req_body_length = 0 :: non_neg_integer(),
+ resp_body_length = 0 :: non_neg_integer()
+}).
+
+-spec init(cowboy_stream:streamid(), cowboy_req:req(), cowboy:opts())
+ -> {[{spawn, pid(), timeout()}], #state{}}.
+init(StreamID, Req=#{ref := Ref}, Opts=#{metrics_callback := Fun}) ->
+ ReqStart = erlang:monotonic_time(),
+ {Commands, Next} = cowboy_stream:init(StreamID, Req, Opts),
+ FilteredReq = case maps:get(metrics_req_filter, Opts, undefined) of
+ undefined -> Req;
+ ReqFilter -> ReqFilter(Req)
+ end,
+ RespHeadersFilter = maps:get(metrics_resp_headers_filter, Opts, undefined),
+ {Commands, fold(Commands, #state{
+ next=Next,
+ callback=Fun,
+ resp_headers_filter=RespHeadersFilter,
+ req=FilteredReq,
+ ref=Ref,
+ req_start=ReqStart
+ })}.
+
+-spec data(cowboy_stream:streamid(), cowboy_stream:fin(), cowboy_req:resp_body(), State)
+ -> {cowboy_stream:commands(), State} when State::#state{}.
+data(StreamID, IsFin=fin, Data, State=#state{req_body_start=undefined}) ->
+ ReqBody = erlang:monotonic_time(),
+ do_data(StreamID, IsFin, Data, State#state{
+ req_body_start=ReqBody,
+ req_body_end=ReqBody,
+ req_body_length=byte_size(Data)
+ });
+data(StreamID, IsFin=fin, Data, State=#state{req_body_length=ReqBodyLen}) ->
+ ReqBodyEnd = erlang:monotonic_time(),
+ do_data(StreamID, IsFin, Data, State#state{
+ req_body_end=ReqBodyEnd,
+ req_body_length=ReqBodyLen + byte_size(Data)
+ });
+data(StreamID, IsFin, Data, State=#state{req_body_start=undefined}) ->
+ ReqBodyStart = erlang:monotonic_time(),
+ do_data(StreamID, IsFin, Data, State#state{
+ req_body_start=ReqBodyStart,
+ req_body_length=byte_size(Data)
+ });
+data(StreamID, IsFin, Data, State) ->
+ do_data(StreamID, IsFin, Data, State).
+
+do_data(StreamID, IsFin, Data, State0=#state{next=Next0}) ->
+ {Commands, Next} = cowboy_stream:data(StreamID, IsFin, Data, Next0),
+ {Commands, fold(Commands, State0#state{next=Next})}.
+
+-spec info(cowboy_stream:streamid(), any(), State)
+ -> {cowboy_stream:commands(), State} when State::#state{}.
+info(StreamID, Info={'EXIT', Pid, Reason}, State0=#state{procs=Procs}) ->
+ ProcEnd = erlang:monotonic_time(),
+ P = maps:get(Pid, Procs),
+ State = State0#state{procs=Procs#{Pid => P#{
+ exit => ProcEnd,
+ reason => Reason
+ }}},
+ do_info(StreamID, Info, State);
+info(StreamID, Info, State) ->
+ do_info(StreamID, Info, State).
+
+do_info(StreamID, Info, State0=#state{next=Next0}) ->
+ {Commands, Next} = cowboy_stream:info(StreamID, Info, Next0),
+ {Commands, fold(Commands, State0#state{next=Next})}.
+
+fold([], State) ->
+ State;
+fold([{spawn, Pid, _}|Tail], State0=#state{procs=Procs}) ->
+ ProcStart = erlang:monotonic_time(),
+ State = State0#state{procs=Procs#{Pid => #{spawn => ProcStart}}},
+ fold(Tail, State);
+fold([{response, Status, Headers, Body}|Tail],
+ State=#state{resp_headers_filter=RespHeadersFilter}) ->
+ Resp = erlang:monotonic_time(),
+ fold(Tail, State#state{
+ resp_status=Status,
+ resp_headers=case RespHeadersFilter of
+ undefined -> Headers;
+ _ -> RespHeadersFilter(Headers)
+ end,
+ resp_start=Resp,
+ resp_end=Resp,
+ resp_body_length=resp_body_length(Body)
+ });
+fold([{headers, Status, Headers}|Tail],
+ State=#state{resp_headers_filter=RespHeadersFilter}) ->
+ RespStart = erlang:monotonic_time(),
+ fold(Tail, State#state{
+ resp_status=Status,
+ resp_headers=case RespHeadersFilter of
+ undefined -> Headers;
+ _ -> RespHeadersFilter(Headers)
+ end,
+ resp_start=RespStart
+ });
+fold([{data, nofin, Data}|Tail], State=#state{resp_body_length=RespBodyLen}) ->
+ fold(Tail, State#state{
+ resp_body_length=RespBodyLen + resp_body_length(Data)
+ });
+fold([{data, fin, Data}|Tail], State=#state{resp_body_length=RespBodyLen}) ->
+ RespEnd = erlang:monotonic_time(),
+ fold(Tail, State#state{
+ resp_end=RespEnd,
+ resp_body_length=RespBodyLen + resp_body_length(Data)
+ });
+fold([_|Tail], State) ->
+ fold(Tail, State).
+
+-spec terminate(cowboy_stream:streamid(), cowboy_stream:reason(), #state{}) -> any().
+terminate(StreamID, Reason, #state{next=Next, callback=Fun,
+ req=Req, resp_status=RespStatus, resp_headers=RespHeaders, ref=Ref,
+ req_start=ReqStart, req_body_start=ReqBodyStart,
+ req_body_end=ReqBodyEnd, resp_start=RespStart, resp_end=RespEnd,
+ procs=Procs, req_body_length=ReqBodyLen, resp_body_length=RespBodyLen}) ->
+ Res = cowboy_stream:terminate(StreamID, Reason, Next),
+ ReqEnd = erlang:monotonic_time(),
+ Metrics = #{
+ ref => Ref,
+ pid => self(),
+ streamid => StreamID,
+ reason => Reason,
+ req => Req,
+ resp_status => RespStatus,
+ resp_headers => RespHeaders,
+ req_start => ReqStart,
+ req_end => ReqEnd,
+ req_body_start => ReqBodyStart,
+ req_body_end => ReqBodyEnd,
+ resp_start => RespStart,
+ resp_end => RespEnd,
+ procs => Procs,
+ req_body_length => ReqBodyLen,
+ resp_body_length => RespBodyLen
+ },
+ Fun(Metrics),
+ Res.
+
+-spec early_error(cowboy_stream:streamid(), cowboy_stream:reason(),
+ cowboy_stream:partial_req(), Resp, cowboy:opts()) -> Resp
+ when Resp::cowboy_stream:resp_command().
+early_error(StreamID, Reason, PartialReq=#{ref := Ref}, Resp0, Opts=#{metrics_callback := Fun}) ->
+ Time = erlang:monotonic_time(),
+ Resp = {response, RespStatus, RespHeaders, RespBody}
+ = cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp0, Opts),
+ %% As far as metrics go we are limited in what we can provide
+ %% in this case.
+ Metrics = #{
+ ref => Ref,
+ pid => self(),
+ streamid => StreamID,
+ reason => Reason,
+ partial_req => PartialReq,
+ resp_status => RespStatus,
+ resp_headers => RespHeaders,
+ early_error_time => Time,
+ resp_body_length => resp_body_length(RespBody)
+ },
+ Fun(Metrics),
+ Resp.
+
+resp_body_length({sendfile, _, Len, _}) ->
+ Len;
+resp_body_length(Data) ->
+ iolist_size(Data).
diff --git a/test/metrics_SUITE.erl b/test/metrics_SUITE.erl
new file mode 100644
index 0000000..6b10fb3
--- /dev/null
+++ b/test/metrics_SUITE.erl
@@ -0,0 +1,134 @@
+%% Copyright (c) 2017, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(metrics_SUITE).
+-compile(export_all).
+
+-import(ct_helper, [config/2]).
+-import(ct_helper, [doc/1]).
+-import(cowboy_test, [gun_open/1]).
+-import(cowboy_test, [gun_down/1]).
+
+%% ct.
+
+all() ->
+ cowboy_test:common_all().
+
+groups() ->
+ cowboy_test:common_groups(ct_helper:all(?MODULE)).
+
+init_per_group(Name = http, Config) ->
+ cowboy_test:init_http(Name, init_plain_opts(Config), Config);
+init_per_group(Name = https, Config) ->
+ cowboy_test:init_http(Name, init_plain_opts(Config), Config);
+init_per_group(Name = h2, Config) ->
+ cowboy_test:init_http(Name, init_plain_opts(Config), Config);
+init_per_group(Name = h2c, Config) ->
+ Config1 = cowboy_test:init_http(Name, init_plain_opts(Config), Config),
+ lists:keyreplace(protocol, 1, Config1, {protocol, http2});
+init_per_group(Name = http_compress, Config) ->
+ cowboy_test:init_http(Name, init_compress_opts(Config), Config);
+init_per_group(Name = https_compress, Config) ->
+ cowboy_test:init_http(Name, init_compress_opts(Config), Config);
+init_per_group(Name = h2_compress, Config) ->
+ cowboy_test:init_http(Name, init_compress_opts(Config), Config);
+init_per_group(Name = h2c_compress, Config) ->
+ Config1 = cowboy_test:init_http(Name, init_compress_opts(Config), Config),
+ lists:keyreplace(protocol, 1, Config1, {protocol, http2}).
+
+end_per_group(Name, _) ->
+ cowboy:stop_listener(Name).
+
+init_plain_opts(Config) ->
+ #{
+ env => #{dispatch => cowboy_router:compile(init_routes(Config))},
+ metrics_callback => do_metrics_callback(),
+ stream_handlers => [cowboy_metrics_h, cowboy_stream_h]
+ }.
+
+init_compress_opts(Config) ->
+ #{
+ env => #{dispatch => cowboy_router:compile(init_routes(Config))},
+ metrics_callback => do_metrics_callback(),
+ stream_handlers => [cowboy_metrics_h, cowboy_compress_h, cowboy_stream_h]
+ }.
+
+init_routes(_) -> [
+ {"localhost", [
+ {"/", hello_h, []}
+ ]}
+].
+
+do_metrics_callback() ->
+ fun(Metrics=#{req := #{headers := #{<<"x-test-pid">> := PidBin}}}) ->
+ Pid = list_to_pid(binary_to_list(PidBin)),
+ Pid ! {metrics, self(), Metrics},
+ ok
+ end.
+
+%% Tests.
+
+hello_world(Config) ->
+ %% Perform a request.
+ ConnPid = gun_open(Config),
+ Ref = gun:get(ConnPid, "/", [{<<"x-test-pid">>, pid_to_list(self())}]),
+ {response, nofin, 200, RespHeaders} = gun:await(ConnPid, Ref),
+ {ok, RespBody} = gun:await_body(ConnPid, Ref),
+ gun:close(ConnPid),
+ %% Receive the metrics and print them.
+ receive
+ {metrics, From, Metrics} ->
+ %% Ensure the timestamps are in the expected order.
+ #{
+ req_start := ReqStart, req_end := ReqEnd,
+ resp_start := RespStart, resp_end := RespEnd
+ } = Metrics,
+ true = (ReqStart =< RespStart)
+ and (RespStart =< RespEnd)
+ and (RespEnd =< ReqEnd),
+ %% We didn't send a body.
+ #{
+ req_body_start := undefined,
+ req_body_end := undefined,
+ req_body_length := 0
+ } = Metrics,
+ %% We got a 200 response with a body.
+ #{
+ resp_status := 200,
+ resp_headers := ExpectedRespHeaders,
+ resp_body_length := RespBodyLen
+ } = Metrics,
+ ExpectedRespHeaders = maps:from_list(RespHeaders),
+ true = RespBodyLen > 0,
+ %% The request process executed normally.
+ #{procs := Procs} = Metrics,
+ [{_, #{
+ spawn := ProcSpawn,
+ exit := ProcExit,
+ reason := normal
+ }}] = maps:to_list(Procs),
+ true = ProcSpawn =< ProcExit,
+ %% Confirm other metadata are as expected.
+ #{
+ ref := _,
+ pid := From,
+ streamid := 1,
+ reason := normal,
+ req := #{}
+ } = Metrics,
+ %% All good!
+ ok
+ after 1000 ->
+ error(timeout)
+ end.