// Copyright 2012 The Chromium Authors // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "net/spdy/spdy_http_stream.h" #include #include #include #include #include #include "base/check_op.h" #include "base/functional/bind.h" #include "base/location.h" #include "base/metrics/histogram_macros.h" #include "base/task/single_thread_task_runner.h" #include "base/values.h" #include "net/base/ip_endpoint.h" #include "net/base/upload_data_stream.h" #include "net/http/http_request_headers.h" #include "net/http/http_request_info.h" #include "net/http/http_response_info.h" #include "net/log/net_log_event_type.h" #include "net/log/net_log_with_source.h" #include "net/spdy/spdy_http_utils.h" #include "net/spdy/spdy_session.h" #include "net/third_party/quiche/src/quiche/spdy/core/http2_header_block.h" #include "net/third_party/quiche/src/quiche/spdy/core/spdy_protocol.h" #include "url/scheme_host_port.h" namespace net { namespace { // TODO(https://crbug.com/1426477): Remove. bool ValidatePushedHeaders( const HttpRequestInfo& request_info, const spdy::Http2HeaderBlock& pushed_request_headers, const spdy::Http2HeaderBlock& pushed_response_headers, const HttpResponseInfo& pushed_response_info) { spdy::Http2HeaderBlock::const_iterator status_it = pushed_response_headers.find(spdy::kHttp2StatusHeader); DCHECK(status_it != pushed_response_headers.end()); // 206 Partial Content and 416 Requested Range Not Satisfiable are range // responses. if (status_it->second == "206" || status_it->second == "416") { std::string client_request_range; if (!request_info.extra_headers.GetHeader(HttpRequestHeaders::kRange, &client_request_range)) { // Client initiated request is not a range request. return false; } spdy::Http2HeaderBlock::const_iterator pushed_request_range_it = pushed_request_headers.find("range"); if (pushed_request_range_it == pushed_request_headers.end()) { // Pushed request is not a range request. return false; } if (client_request_range != pushed_request_range_it->second) { // Client and pushed request ranges do not match. return false; } } HttpRequestInfo pushed_request_info; ConvertHeaderBlockToHttpRequestHeaders(pushed_request_headers, &pushed_request_info.extra_headers); HttpVaryData vary_data; if (!vary_data.Init(pushed_request_info, *pushed_response_info.headers.get())) { // Pushed response did not contain non-empty Vary header. return true; } if (vary_data.MatchesRequest(request_info, *pushed_response_info.headers.get())) { return true; } return false; } } // anonymous namespace // Align our request body with |kMaxSpdyFrameChunkSize| to prevent unexpected // buffer chunking. This is 16KB - frame header size. const size_t SpdyHttpStream::kRequestBodyBufferSize = kMaxSpdyFrameChunkSize; SpdyHttpStream::SpdyHttpStream(const base::WeakPtr& spdy_session, spdy::SpdyStreamId pushed_stream_id, NetLogSource source_dependency, std::set dns_aliases) : MultiplexedHttpStream( std::make_unique(spdy_session)), spdy_session_(spdy_session), pushed_stream_id_(pushed_stream_id), is_reused_(spdy_session_->IsReused()), source_dependency_(source_dependency), dns_aliases_(std::move(dns_aliases)) { DCHECK(spdy_session_.get()); } SpdyHttpStream::~SpdyHttpStream() { if (stream_) { stream_->DetachDelegate(); DCHECK(!stream_); } } void SpdyHttpStream::RegisterRequest(const HttpRequestInfo* request_info) { DCHECK(request_info); request_info_ = request_info; } int SpdyHttpStream::InitializeStream(bool can_send_early, RequestPriority priority, const NetLogWithSource& stream_net_log, CompletionOnceCallback callback) { DCHECK(!stream_); DCHECK(request_info_); if (!spdy_session_) return ERR_CONNECTION_CLOSED; if (pushed_stream_id_ != kNoPushedStreamFound) { int error = spdy_session_->GetPushedStream( request_info_->url, pushed_stream_id_, priority, &stream_); if (error != OK) return error; // |stream_| may be NULL even if OK was returned. if (stream_) { DCHECK_EQ(stream_->type(), SPDY_PUSH_STREAM); InitializeStreamHelper(); return OK; } } int rv = stream_request_.StartRequest( SPDY_REQUEST_RESPONSE_STREAM, spdy_session_, request_info_->url, can_send_early, priority, request_info_->socket_tag, stream_net_log, base::BindOnce(&SpdyHttpStream::OnStreamCreated, weak_factory_.GetWeakPtr(), std::move(callback)), NetworkTrafficAnnotationTag{request_info_->traffic_annotation}); if (rv == OK) { stream_ = stream_request_.ReleaseStream().get(); InitializeStreamHelper(); } return rv; } int SpdyHttpStream::ReadResponseHeaders(CompletionOnceCallback callback) { CHECK(!callback.is_null()); if (stream_closed_) return closed_stream_status_; CHECK(stream_); // Check if we already have the response headers. If so, return synchronously. if (response_headers_complete_) { CHECK(!stream_->IsIdle()); return OK; } // Still waiting for the response, return IO_PENDING. CHECK(response_callback_.is_null()); response_callback_ = std::move(callback); return ERR_IO_PENDING; } int SpdyHttpStream::ReadResponseBody(IOBuffer* buf, int buf_len, CompletionOnceCallback callback) { if (stream_) CHECK(!stream_->IsIdle()); CHECK(buf); CHECK(buf_len); CHECK(!callback.is_null()); // If we have data buffered, complete the IO immediately. if (!response_body_queue_.IsEmpty()) { return response_body_queue_.Dequeue(buf->data(), buf_len); } else if (stream_closed_) { return closed_stream_status_; } CHECK(response_callback_.is_null()); CHECK(!user_buffer_.get()); CHECK_EQ(0, user_buffer_len_); response_callback_ = std::move(callback); user_buffer_ = buf; user_buffer_len_ = buf_len; return ERR_IO_PENDING; } void SpdyHttpStream::Close(bool not_reusable) { // Note: the not_reusable flag has no meaning for SPDY streams. Cancel(); DCHECK(!stream_); } bool SpdyHttpStream::IsResponseBodyComplete() const { return stream_closed_; } bool SpdyHttpStream::IsConnectionReused() const { return is_reused_; } int64_t SpdyHttpStream::GetTotalReceivedBytes() const { if (stream_closed_) return closed_stream_received_bytes_; if (!stream_) return 0; return stream_->raw_received_bytes(); } int64_t SpdyHttpStream::GetTotalSentBytes() const { if (stream_closed_) return closed_stream_sent_bytes_; if (!stream_) return 0; return stream_->raw_sent_bytes(); } bool SpdyHttpStream::GetAlternativeService( AlternativeService* alternative_service) const { return false; } bool SpdyHttpStream::GetLoadTimingInfo(LoadTimingInfo* load_timing_info) const { if (stream_closed_) { if (!closed_stream_has_load_timing_info_) return false; *load_timing_info = closed_stream_load_timing_info_; } else { // If |stream_| has yet to be created, or does not yet have an ID, fail. // The reused flag can only be correctly set once a stream has an ID. // Streams get their IDs once the request has been successfully sent, so // this does not behave that differently from other stream types. if (!stream_ || stream_->stream_id() == 0) return false; if (!stream_->GetLoadTimingInfo(load_timing_info)) return false; } // If the request waited for handshake confirmation, shift |ssl_end| to // include that time. if (!load_timing_info->connect_timing.ssl_end.is_null() && !stream_request_.confirm_handshake_end().is_null()) { load_timing_info->connect_timing.ssl_end = stream_request_.confirm_handshake_end(); load_timing_info->connect_timing.connect_end = stream_request_.confirm_handshake_end(); } return true; } int SpdyHttpStream::SendRequest(const HttpRequestHeaders& request_headers, HttpResponseInfo* response, CompletionOnceCallback callback) { if (stream_closed_) { return closed_stream_status_; } base::Time request_time = base::Time::Now(); CHECK(stream_); stream_->SetRequestTime(request_time); // This should only get called in the case of a request occurring // during server push that has already begun but hasn't finished, // so we set the response's request time to be the actual one if (response_info_) response_info_->request_time = request_time; CHECK(!request_body_buf_.get()); if (HasUploadData()) { request_body_buf_ = base::MakeRefCounted(kRequestBodyBufferSize); // The request body buffer is empty at first. request_body_buf_size_ = 0; } CHECK(!callback.is_null()); CHECK(response); // SendRequest can be called in two cases. // // a) A client initiated request. In this case, |response_info_| should be // NULL to start with. // b) A client request which matches a response that the server has already // pushed. if (push_response_info_.get()) { *response = *(push_response_info_.get()); push_response_info_.reset(); } else { DCHECK_EQ(static_cast(nullptr), response_info_); } response_info_ = response; // Put the peer's IP address and port into the response. IPEndPoint address; int result = stream_->GetPeerAddress(&address); if (result != OK) return result; response_info_->remote_endpoint = address; if (stream_->type() == SPDY_PUSH_STREAM) { // Pushed streams do not send any data, and should always be // idle. However, we still want to return ERR_IO_PENDING to mimic // non-push behavior. The callback will be called when the // response is received. CHECK(response_callback_.is_null()); response_callback_ = std::move(callback); return ERR_IO_PENDING; } spdy::Http2HeaderBlock headers; CreateSpdyHeadersFromHttpRequest(*request_info_, request_headers, &headers); DispatchRequestHeadersCallback(headers); bool will_send_data = HasUploadData() || spdy_session_->EndStreamWithDataFrame(); result = stream_->SendRequestHeaders( std::move(headers), will_send_data ? MORE_DATA_TO_SEND : NO_MORE_DATA_TO_SEND); if (result == ERR_IO_PENDING) { CHECK(request_callback_.is_null()); request_callback_ = std::move(callback); } return result; } void SpdyHttpStream::Cancel() { request_callback_.Reset(); response_callback_.Reset(); if (stream_) { stream_->Cancel(ERR_ABORTED); DCHECK(!stream_); } } void SpdyHttpStream::OnHeadersSent() { if (HasUploadData()) { ReadAndSendRequestBodyData(); } else if (spdy_session_->EndStreamWithDataFrame()) { SendEmptyBody(); } else { MaybePostRequestCallback(OK); } } void SpdyHttpStream::OnEarlyHintsReceived( const spdy::Http2HeaderBlock& headers) { DCHECK(!response_headers_complete_); DCHECK(response_info_); DCHECK_EQ(stream_->type(), SPDY_REQUEST_RESPONSE_STREAM); const int rv = SpdyHeadersToHttpResponse(headers, response_info_); CHECK_NE(rv, ERR_INCOMPLETE_HTTP2_HEADERS); if (!response_callback_.is_null()) { DoResponseCallback(OK); } } void SpdyHttpStream::OnHeadersReceived( const spdy::Http2HeaderBlock& response_headers, const spdy::Http2HeaderBlock* pushed_request_headers) { DCHECK(!response_headers_complete_); response_headers_complete_ = true; if (!response_info_) { DCHECK_EQ(stream_->type(), SPDY_PUSH_STREAM); push_response_info_ = std::make_unique(); response_info_ = push_response_info_.get(); } const int rv = SpdyHeadersToHttpResponse(response_headers, response_info_); DCHECK_NE(rv, ERR_INCOMPLETE_HTTP2_HEADERS); if (rv == ERR_RESPONSE_HEADERS_MULTIPLE_LOCATION) { // Cancel will call OnClose, which might call callbacks and might destroy // `this`. stream_->Cancel(rv); return; } if (pushed_request_headers && !ValidatePushedHeaders(*request_info_, *pushed_request_headers, response_headers, *response_info_)) { // Cancel will call OnClose, which might call callbacks and might destroy // `this`. stream_->Cancel(ERR_HTTP2_PUSHED_RESPONSE_DOES_NOT_MATCH); return; } response_info_->response_time = stream_->response_time(); // Don't store the SSLInfo in the response here, HttpNetworkTransaction // will take care of that part. response_info_->was_alpn_negotiated = was_alpn_negotiated_; response_info_->request_time = stream_->GetRequestTime(); response_info_->connection_info = HttpResponseInfo::CONNECTION_INFO_HTTP2; response_info_->alpn_negotiated_protocol = HttpResponseInfo::ConnectionInfoToString(response_info_->connection_info); // Invalidate HttpRequestInfo pointer. This is to allow |this| to be // shared across multiple consumers at the cache layer which might require // this stream to outlive the request_info_'s owner. if (!upload_stream_in_progress_) request_info_ = nullptr; if (!response_callback_.is_null()) { DoResponseCallback(OK); } } void SpdyHttpStream::OnDataReceived(std::unique_ptr buffer) { DCHECK(response_headers_complete_); // Note that data may be received for a SpdyStream prior to the user calling // ReadResponseBody(), therefore user_buffer_ may be NULL. This may often // happen for server initiated streams. DCHECK(stream_); DCHECK(!stream_->IsClosed() || stream_->type() == SPDY_PUSH_STREAM); if (buffer) { response_body_queue_.Enqueue(std::move(buffer)); MaybeScheduleBufferedReadCallback(); } } void SpdyHttpStream::OnDataSent() { if (request_info_ && HasUploadData()) { request_body_buf_size_ = 0; ReadAndSendRequestBodyData(); } else { CHECK(spdy_session_->EndStreamWithDataFrame()); MaybePostRequestCallback(OK); } } // TODO(xunjieli): Maybe do something with the trailers. crbug.com/422958. void SpdyHttpStream::OnTrailers(const spdy::Http2HeaderBlock& trailers) {} void SpdyHttpStream::OnClose(int status) { DCHECK(stream_); // Cancel any pending reads from the upload data stream. if (request_info_ && request_info_->upload_data_stream) request_info_->upload_data_stream->Reset(); stream_closed_ = true; closed_stream_status_ = status; closed_stream_id_ = stream_->stream_id(); closed_stream_has_load_timing_info_ = stream_->GetLoadTimingInfo(&closed_stream_load_timing_info_); closed_stream_received_bytes_ = stream_->raw_received_bytes(); closed_stream_sent_bytes_ = stream_->raw_sent_bytes(); stream_ = nullptr; // Callbacks might destroy |this|. base::WeakPtr self = weak_factory_.GetWeakPtr(); if (!request_callback_.is_null()) { DoRequestCallback(status); if (!self) return; } if (status == OK) { // We need to complete any pending buffered read now. DoBufferedReadCallback(); if (!self) return; } if (!response_callback_.is_null()) { DoResponseCallback(status); } } bool SpdyHttpStream::CanGreaseFrameType() const { return true; } NetLogSource SpdyHttpStream::source_dependency() const { return source_dependency_; } bool SpdyHttpStream::HasUploadData() const { CHECK(request_info_); return request_info_->upload_data_stream && ((request_info_->upload_data_stream->size() > 0) || request_info_->upload_data_stream->is_chunked()); } void SpdyHttpStream::OnStreamCreated(CompletionOnceCallback callback, int rv) { if (rv == OK) { stream_ = stream_request_.ReleaseStream().get(); InitializeStreamHelper(); } std::move(callback).Run(rv); } void SpdyHttpStream::ReadAndSendRequestBodyData() { CHECK(HasUploadData()); upload_stream_in_progress_ = true; CHECK_EQ(request_body_buf_size_, 0); if (request_info_->upload_data_stream->IsEOF()) { MaybePostRequestCallback(OK); // Invalidate HttpRequestInfo pointer. This is to allow |this| to be // shared across multiple consumers at the cache layer which might require // this stream to outlive the request_info_'s owner. upload_stream_in_progress_ = false; if (response_headers_complete_) request_info_ = nullptr; return; } // Read the data from the request body stream. const int rv = request_info_->upload_data_stream->Read( request_body_buf_.get(), request_body_buf_->size(), base::BindOnce(&SpdyHttpStream::OnRequestBodyReadCompleted, weak_factory_.GetWeakPtr())); if (rv != ERR_IO_PENDING) OnRequestBodyReadCompleted(rv); } void SpdyHttpStream::SendEmptyBody() { CHECK(!HasUploadData()); CHECK(spdy_session_->EndStreamWithDataFrame()); auto buffer = base::MakeRefCounted(/* buffer_size = */ 0); stream_->SendData(buffer.get(), /* length = */ 0, NO_MORE_DATA_TO_SEND); } void SpdyHttpStream::InitializeStreamHelper() { stream_->SetDelegate(this); was_alpn_negotiated_ = stream_->WasAlpnNegotiated(); } void SpdyHttpStream::ResetStream(int error) { spdy_session_->ResetStream(stream()->stream_id(), error, std::string()); } void SpdyHttpStream::OnRequestBodyReadCompleted(int status) { if (status < 0) { DCHECK_NE(ERR_IO_PENDING, status); base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask( FROM_HERE, base::BindOnce(&SpdyHttpStream::ResetStream, weak_factory_.GetWeakPtr(), status)); return; } CHECK_GE(status, 0); request_body_buf_size_ = status; const bool eof = request_info_->upload_data_stream->IsEOF(); // Only the final frame may have a length of 0. if (eof) { CHECK_GE(request_body_buf_size_, 0); } else { CHECK_GT(request_body_buf_size_, 0); } stream_->SendData(request_body_buf_.get(), request_body_buf_size_, eof ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND); } void SpdyHttpStream::MaybeScheduleBufferedReadCallback() { DCHECK(!stream_closed_); if (!user_buffer_.get()) return; // If enough data was received to fill the user buffer, invoke // DoBufferedReadCallback() with no delay. // // Note: DoBufferedReadCallback() is invoked asynchronously to preserve // historical behavior. It would be interesting to evaluate whether it can be // invoked synchronously to avoid the overhead of posting a task. A long time // ago, the callback was invoked synchronously // https://codereview.chromium.org/652209/diff/2018/net/spdy/spdy_stream.cc. if (response_body_queue_.GetTotalSize() >= static_cast(user_buffer_len_)) { buffered_read_timer_.Start(FROM_HERE, base::TimeDelta() /* no delay */, this, &SpdyHttpStream::DoBufferedReadCallback); return; } // Handing small chunks of data to the caller creates measurable overhead. // Wait 1ms to allow handing off multiple chunks of data received within a // short time span at once. buffered_read_timer_.Start(FROM_HERE, base::Milliseconds(1), this, &SpdyHttpStream::DoBufferedReadCallback); } void SpdyHttpStream::DoBufferedReadCallback() { buffered_read_timer_.Stop(); // If the transaction is cancelled or errored out, we don't need to complete // the read. if (stream_closed_ && closed_stream_status_ != OK) { if (response_callback_) DoResponseCallback(closed_stream_status_); return; } if (!user_buffer_.get()) return; if (!response_body_queue_.IsEmpty()) { int rv = response_body_queue_.Dequeue(user_buffer_->data(), user_buffer_len_); user_buffer_ = nullptr; user_buffer_len_ = 0; DoResponseCallback(rv); return; } if (stream_closed_ && response_callback_) DoResponseCallback(closed_stream_status_); } void SpdyHttpStream::DoRequestCallback(int rv) { CHECK_NE(rv, ERR_IO_PENDING); CHECK(!request_callback_.is_null()); // Since Run may result in being called back, reset request_callback_ in // advance. std::move(request_callback_).Run(rv); } void SpdyHttpStream::MaybeDoRequestCallback(int rv) { CHECK_NE(ERR_IO_PENDING, rv); if (request_callback_) std::move(request_callback_).Run(rv); } void SpdyHttpStream::MaybePostRequestCallback(int rv) { CHECK_NE(ERR_IO_PENDING, rv); if (request_callback_) base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask( FROM_HERE, base::BindOnce(&SpdyHttpStream::MaybeDoRequestCallback, weak_factory_.GetWeakPtr(), rv)); } void SpdyHttpStream::DoResponseCallback(int rv) { CHECK_NE(rv, ERR_IO_PENDING); CHECK(!response_callback_.is_null()); // Since Run may result in being called back, reset response_callback_ in // advance. std::move(response_callback_).Run(rv); } int SpdyHttpStream::GetRemoteEndpoint(IPEndPoint* endpoint) { if (!spdy_session_) return ERR_SOCKET_NOT_CONNECTED; return spdy_session_->GetPeerAddress(endpoint); } void SpdyHttpStream::PopulateNetErrorDetails(NetErrorDetails* details) { details->connection_info = HttpResponseInfo::CONNECTION_INFO_HTTP2; return; } void SpdyHttpStream::SetPriority(RequestPriority priority) { if (stream_) { stream_->SetPriority(priority); } } const std::set& SpdyHttpStream::GetDnsAliases() const { return dns_aliases_; } base::StringPiece SpdyHttpStream::GetAcceptChViaAlps() const { if (!request_info_) { return {}; } return session()->GetAcceptChViaAlps(url::SchemeHostPort(request_info_->url)); } } // namespace net