From 4211ea41bd49be3cf152acb62ac8330f7f6a927d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Fri, 20 Oct 2017 14:46:10 +0100 Subject: Add experimental metrics stream handler It collects metrics and passes them to a configurable callback once the stream terminates. It will be documented in a future release. More tests incoming. --- ebin/cowboy.app | 2 +- src/cowboy_metrics_h.erl | 277 +++++++++++++++++++++++++++++++++++++++++++++++ test/metrics_SUITE.erl | 134 +++++++++++++++++++++++ 3 files changed, 412 insertions(+), 1 deletion(-) create mode 100644 src/cowboy_metrics_h.erl create mode 100644 test/metrics_SUITE.erl diff --git a/ebin/cowboy.app b/ebin/cowboy.app index a972860..356628d 100644 --- a/ebin/cowboy.app +++ b/ebin/cowboy.app @@ -1,7 +1,7 @@ {application, 'cowboy', [ {description, "Small, fast, modern HTTP server."}, {vsn, "2.0.0"}, - {modules, ['cowboy','cowboy_app','cowboy_bstr','cowboy_children','cowboy_clear','cowboy_clock','cowboy_compress_h','cowboy_constraints','cowboy_handler','cowboy_http','cowboy_http2','cowboy_iolists','cowboy_loop','cowboy_middleware','cowboy_req','cowboy_rest','cowboy_router','cowboy_static','cowboy_stream','cowboy_stream_h','cowboy_sub_protocol','cowboy_sup','cowboy_tls','cowboy_websocket']}, + {modules, ['cowboy','cowboy_app','cowboy_bstr','cowboy_children','cowboy_clear','cowboy_clock','cowboy_compress_h','cowboy_constraints','cowboy_handler','cowboy_http','cowboy_http2','cowboy_iolists','cowboy_loop','cowboy_metrics_h','cowboy_middleware','cowboy_req','cowboy_rest','cowboy_router','cowboy_static','cowboy_stream','cowboy_stream_h','cowboy_sub_protocol','cowboy_sup','cowboy_tls','cowboy_websocket']}, {registered, [cowboy_sup,cowboy_clock]}, {applications, [kernel,stdlib,crypto,cowlib,ranch]}, {mod, {cowboy_app, []}}, diff --git a/src/cowboy_metrics_h.erl b/src/cowboy_metrics_h.erl new file mode 100644 index 0000000..91709ad --- /dev/null +++ b/src/cowboy_metrics_h.erl @@ -0,0 +1,277 @@ +%% Copyright (c) 2017, Loïc Hoguin +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(cowboy_metrics_h). +-behavior(cowboy_stream). + +-export([init/3]). +-export([data/4]). +-export([info/3]). +-export([terminate/3]). +-export([early_error/5]). + +-type proc_metrics() :: #{pid() => #{ + %% Time at which the process spawned. + spawn := integer(), + + %% Time at which the process exited. + exit => integer(), + + %% Reason for the process exit. + reason => any() +}}. + +-type metrics() :: #{ + %% The identifier for this listener. + ref := ranch:ref(), + + %% The pid for this connection. + pid := pid(), + + %% The streamid also indicates the total number of requests on + %% this connection (StreamID div 2 + 1). + streamid := cowboy_stream:streamid(), + + %% The terminate reason is always useful. + reason := cowboy_stream:reason(), + + %% A filtered Req object or a partial Req object + %% depending on how far the request got to. + req => cowboy_req:req(), + partial_req => cowboy_stream:partial_req(), + + %% Response status. + resp_status := cowboy:http_status(), + + %% Filtered response headers. + resp_headers := cowboy:http_headers(), + + %% Start/end of the processing of the request. + %% + %% This represents the time from this stream handler's init + %% to terminate. Note that this doesn't indicate the response + %% has been sent fully, it still may be queued up in a buffer. + req_start => integer(), + req_end => integer(), + + %% Start/end of the receiving of the request body. + %% Begins when the first packet has been received. + req_body_start => integer(), + req_body_end => integer(), + + %% Start/end of the sending of the response. + %% Begins when we send the headers and ends on the final + %% packet of the response body. If everything is sent at + %% once these values are identical. + resp_start => integer(), + resp_end => integer(), + + %% For early errors all we get is the time we received it. + early_error_time => integer(), + + %% Start/end of spawned processes. This is where most of + %% the user code lies, excluding stream handlers. On a + %% default Cowboy configuration there should be only one + %% process: the request process. + procs => proc_metrics(), + + %% Length of the request and response bodies. This does + %% not include the framing. + req_body_length => non_neg_integer(), + resp_body_length => non_neg_integer() +}. +-export_type([metrics/0]). + +-record(state, { + next :: any(), + callback :: fun((metrics()) -> any()), + resp_headers_filter :: undefined | fun((cowboy:http_headers()) -> cowboy:http_headers()), + req :: map(), + resp_status :: undefined | cowboy:http_status(), + resp_headers :: undefined | cowboy:http_headers(), + ref :: ranch:ref(), + req_start :: integer(), + req_end :: undefined | integer(), + req_body_start :: undefined | integer(), + req_body_end :: undefined | integer(), + resp_start :: undefined | integer(), + resp_end :: undefined | integer(), + procs = #{} :: proc_metrics(), + req_body_length = 0 :: non_neg_integer(), + resp_body_length = 0 :: non_neg_integer() +}). + +-spec init(cowboy_stream:streamid(), cowboy_req:req(), cowboy:opts()) + -> {[{spawn, pid(), timeout()}], #state{}}. +init(StreamID, Req=#{ref := Ref}, Opts=#{metrics_callback := Fun}) -> + ReqStart = erlang:monotonic_time(), + {Commands, Next} = cowboy_stream:init(StreamID, Req, Opts), + FilteredReq = case maps:get(metrics_req_filter, Opts, undefined) of + undefined -> Req; + ReqFilter -> ReqFilter(Req) + end, + RespHeadersFilter = maps:get(metrics_resp_headers_filter, Opts, undefined), + {Commands, fold(Commands, #state{ + next=Next, + callback=Fun, + resp_headers_filter=RespHeadersFilter, + req=FilteredReq, + ref=Ref, + req_start=ReqStart + })}. + +-spec data(cowboy_stream:streamid(), cowboy_stream:fin(), cowboy_req:resp_body(), State) + -> {cowboy_stream:commands(), State} when State::#state{}. +data(StreamID, IsFin=fin, Data, State=#state{req_body_start=undefined}) -> + ReqBody = erlang:monotonic_time(), + do_data(StreamID, IsFin, Data, State#state{ + req_body_start=ReqBody, + req_body_end=ReqBody, + req_body_length=byte_size(Data) + }); +data(StreamID, IsFin=fin, Data, State=#state{req_body_length=ReqBodyLen}) -> + ReqBodyEnd = erlang:monotonic_time(), + do_data(StreamID, IsFin, Data, State#state{ + req_body_end=ReqBodyEnd, + req_body_length=ReqBodyLen + byte_size(Data) + }); +data(StreamID, IsFin, Data, State=#state{req_body_start=undefined}) -> + ReqBodyStart = erlang:monotonic_time(), + do_data(StreamID, IsFin, Data, State#state{ + req_body_start=ReqBodyStart, + req_body_length=byte_size(Data) + }); +data(StreamID, IsFin, Data, State) -> + do_data(StreamID, IsFin, Data, State). + +do_data(StreamID, IsFin, Data, State0=#state{next=Next0}) -> + {Commands, Next} = cowboy_stream:data(StreamID, IsFin, Data, Next0), + {Commands, fold(Commands, State0#state{next=Next})}. + +-spec info(cowboy_stream:streamid(), any(), State) + -> {cowboy_stream:commands(), State} when State::#state{}. +info(StreamID, Info={'EXIT', Pid, Reason}, State0=#state{procs=Procs}) -> + ProcEnd = erlang:monotonic_time(), + P = maps:get(Pid, Procs), + State = State0#state{procs=Procs#{Pid => P#{ + exit => ProcEnd, + reason => Reason + }}}, + do_info(StreamID, Info, State); +info(StreamID, Info, State) -> + do_info(StreamID, Info, State). + +do_info(StreamID, Info, State0=#state{next=Next0}) -> + {Commands, Next} = cowboy_stream:info(StreamID, Info, Next0), + {Commands, fold(Commands, State0#state{next=Next})}. + +fold([], State) -> + State; +fold([{spawn, Pid, _}|Tail], State0=#state{procs=Procs}) -> + ProcStart = erlang:monotonic_time(), + State = State0#state{procs=Procs#{Pid => #{spawn => ProcStart}}}, + fold(Tail, State); +fold([{response, Status, Headers, Body}|Tail], + State=#state{resp_headers_filter=RespHeadersFilter}) -> + Resp = erlang:monotonic_time(), + fold(Tail, State#state{ + resp_status=Status, + resp_headers=case RespHeadersFilter of + undefined -> Headers; + _ -> RespHeadersFilter(Headers) + end, + resp_start=Resp, + resp_end=Resp, + resp_body_length=resp_body_length(Body) + }); +fold([{headers, Status, Headers}|Tail], + State=#state{resp_headers_filter=RespHeadersFilter}) -> + RespStart = erlang:monotonic_time(), + fold(Tail, State#state{ + resp_status=Status, + resp_headers=case RespHeadersFilter of + undefined -> Headers; + _ -> RespHeadersFilter(Headers) + end, + resp_start=RespStart + }); +fold([{data, nofin, Data}|Tail], State=#state{resp_body_length=RespBodyLen}) -> + fold(Tail, State#state{ + resp_body_length=RespBodyLen + resp_body_length(Data) + }); +fold([{data, fin, Data}|Tail], State=#state{resp_body_length=RespBodyLen}) -> + RespEnd = erlang:monotonic_time(), + fold(Tail, State#state{ + resp_end=RespEnd, + resp_body_length=RespBodyLen + resp_body_length(Data) + }); +fold([_|Tail], State) -> + fold(Tail, State). + +-spec terminate(cowboy_stream:streamid(), cowboy_stream:reason(), #state{}) -> any(). +terminate(StreamID, Reason, #state{next=Next, callback=Fun, + req=Req, resp_status=RespStatus, resp_headers=RespHeaders, ref=Ref, + req_start=ReqStart, req_body_start=ReqBodyStart, + req_body_end=ReqBodyEnd, resp_start=RespStart, resp_end=RespEnd, + procs=Procs, req_body_length=ReqBodyLen, resp_body_length=RespBodyLen}) -> + Res = cowboy_stream:terminate(StreamID, Reason, Next), + ReqEnd = erlang:monotonic_time(), + Metrics = #{ + ref => Ref, + pid => self(), + streamid => StreamID, + reason => Reason, + req => Req, + resp_status => RespStatus, + resp_headers => RespHeaders, + req_start => ReqStart, + req_end => ReqEnd, + req_body_start => ReqBodyStart, + req_body_end => ReqBodyEnd, + resp_start => RespStart, + resp_end => RespEnd, + procs => Procs, + req_body_length => ReqBodyLen, + resp_body_length => RespBodyLen + }, + Fun(Metrics), + Res. + +-spec early_error(cowboy_stream:streamid(), cowboy_stream:reason(), + cowboy_stream:partial_req(), Resp, cowboy:opts()) -> Resp + when Resp::cowboy_stream:resp_command(). +early_error(StreamID, Reason, PartialReq=#{ref := Ref}, Resp0, Opts=#{metrics_callback := Fun}) -> + Time = erlang:monotonic_time(), + Resp = {response, RespStatus, RespHeaders, RespBody} + = cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp0, Opts), + %% As far as metrics go we are limited in what we can provide + %% in this case. + Metrics = #{ + ref => Ref, + pid => self(), + streamid => StreamID, + reason => Reason, + partial_req => PartialReq, + resp_status => RespStatus, + resp_headers => RespHeaders, + early_error_time => Time, + resp_body_length => resp_body_length(RespBody) + }, + Fun(Metrics), + Resp. + +resp_body_length({sendfile, _, Len, _}) -> + Len; +resp_body_length(Data) -> + iolist_size(Data). diff --git a/test/metrics_SUITE.erl b/test/metrics_SUITE.erl new file mode 100644 index 0000000..6b10fb3 --- /dev/null +++ b/test/metrics_SUITE.erl @@ -0,0 +1,134 @@ +%% Copyright (c) 2017, Loïc Hoguin +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(metrics_SUITE). +-compile(export_all). + +-import(ct_helper, [config/2]). +-import(ct_helper, [doc/1]). +-import(cowboy_test, [gun_open/1]). +-import(cowboy_test, [gun_down/1]). + +%% ct. + +all() -> + cowboy_test:common_all(). + +groups() -> + cowboy_test:common_groups(ct_helper:all(?MODULE)). + +init_per_group(Name = http, Config) -> + cowboy_test:init_http(Name, init_plain_opts(Config), Config); +init_per_group(Name = https, Config) -> + cowboy_test:init_http(Name, init_plain_opts(Config), Config); +init_per_group(Name = h2, Config) -> + cowboy_test:init_http(Name, init_plain_opts(Config), Config); +init_per_group(Name = h2c, Config) -> + Config1 = cowboy_test:init_http(Name, init_plain_opts(Config), Config), + lists:keyreplace(protocol, 1, Config1, {protocol, http2}); +init_per_group(Name = http_compress, Config) -> + cowboy_test:init_http(Name, init_compress_opts(Config), Config); +init_per_group(Name = https_compress, Config) -> + cowboy_test:init_http(Name, init_compress_opts(Config), Config); +init_per_group(Name = h2_compress, Config) -> + cowboy_test:init_http(Name, init_compress_opts(Config), Config); +init_per_group(Name = h2c_compress, Config) -> + Config1 = cowboy_test:init_http(Name, init_compress_opts(Config), Config), + lists:keyreplace(protocol, 1, Config1, {protocol, http2}). + +end_per_group(Name, _) -> + cowboy:stop_listener(Name). + +init_plain_opts(Config) -> + #{ + env => #{dispatch => cowboy_router:compile(init_routes(Config))}, + metrics_callback => do_metrics_callback(), + stream_handlers => [cowboy_metrics_h, cowboy_stream_h] + }. + +init_compress_opts(Config) -> + #{ + env => #{dispatch => cowboy_router:compile(init_routes(Config))}, + metrics_callback => do_metrics_callback(), + stream_handlers => [cowboy_metrics_h, cowboy_compress_h, cowboy_stream_h] + }. + +init_routes(_) -> [ + {"localhost", [ + {"/", hello_h, []} + ]} +]. + +do_metrics_callback() -> + fun(Metrics=#{req := #{headers := #{<<"x-test-pid">> := PidBin}}}) -> + Pid = list_to_pid(binary_to_list(PidBin)), + Pid ! {metrics, self(), Metrics}, + ok + end. + +%% Tests. + +hello_world(Config) -> + %% Perform a request. + ConnPid = gun_open(Config), + Ref = gun:get(ConnPid, "/", [{<<"x-test-pid">>, pid_to_list(self())}]), + {response, nofin, 200, RespHeaders} = gun:await(ConnPid, Ref), + {ok, RespBody} = gun:await_body(ConnPid, Ref), + gun:close(ConnPid), + %% Receive the metrics and print them. + receive + {metrics, From, Metrics} -> + %% Ensure the timestamps are in the expected order. + #{ + req_start := ReqStart, req_end := ReqEnd, + resp_start := RespStart, resp_end := RespEnd + } = Metrics, + true = (ReqStart =< RespStart) + and (RespStart =< RespEnd) + and (RespEnd =< ReqEnd), + %% We didn't send a body. + #{ + req_body_start := undefined, + req_body_end := undefined, + req_body_length := 0 + } = Metrics, + %% We got a 200 response with a body. + #{ + resp_status := 200, + resp_headers := ExpectedRespHeaders, + resp_body_length := RespBodyLen + } = Metrics, + ExpectedRespHeaders = maps:from_list(RespHeaders), + true = RespBodyLen > 0, + %% The request process executed normally. + #{procs := Procs} = Metrics, + [{_, #{ + spawn := ProcSpawn, + exit := ProcExit, + reason := normal + }}] = maps:to_list(Procs), + true = ProcSpawn =< ProcExit, + %% Confirm other metadata are as expected. + #{ + ref := _, + pid := From, + streamid := 1, + reason := normal, + req := #{} + } = Metrics, + %% All good! + ok + after 1000 -> + error(timeout) + end. -- cgit v1.2.3