// Copyright (c) 2012 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "net/spdy/chromium/spdy_http_stream.h" #include #include #include #include "base/bind.h" #include "base/callback_helpers.h" #include "base/location.h" #include "base/logging.h" #include "base/single_thread_task_runner.h" #include "base/threading/thread_task_runner_handle.h" #include "base/values.h" #include "net/base/host_port_pair.h" #include "net/base/upload_data_stream.h" #include "net/http/http_request_headers.h" #include "net/http/http_request_info.h" #include "net/http/http_response_info.h" #include "net/log/net_log_event_type.h" #include "net/log/net_log_with_source.h" #include "net/spdy/chromium/spdy_http_utils.h" #include "net/spdy/chromium/spdy_session.h" #include "net/spdy/core/spdy_header_block.h" #include "net/spdy/core/spdy_protocol.h" #include "net/spdy/platform/api/spdy_string.h" namespace net { const size_t SpdyHttpStream::kRequestBodyBufferSize = 1 << 14; // 16KB SpdyHttpStream::SpdyHttpStream(const base::WeakPtr& spdy_session, bool direct, NetLogSource source_dependency) : MultiplexedHttpStream( std::make_unique(spdy_session)), spdy_session_(spdy_session), is_reused_(spdy_session_->IsReused()), source_dependency_(source_dependency), stream_(nullptr), stream_closed_(false), closed_stream_status_(ERR_FAILED), closed_stream_id_(0), closed_stream_received_bytes_(0), closed_stream_sent_bytes_(0), request_info_(NULL), response_info_(NULL), response_headers_complete_(false), user_buffer_len_(0), request_body_buf_size_(0), buffered_read_callback_pending_(false), more_read_data_pending_(false), direct_(direct), was_alpn_negotiated_(false), weak_factory_(this) { DCHECK(spdy_session_.get()); } SpdyHttpStream::~SpdyHttpStream() { if (stream_) { stream_->DetachDelegate(); DCHECK(!stream_); } } int SpdyHttpStream::InitializeStream(const HttpRequestInfo* request_info, RequestPriority priority, const NetLogWithSource& stream_net_log, const CompletionCallback& callback) { DCHECK(!stream_); if (!spdy_session_) return ERR_CONNECTION_CLOSED; request_info_ = request_info; if (request_info_->method == "GET") { int error = spdy_session_->GetPushStream(request_info_->url, priority, &stream_, stream_net_log); if (error != OK) return error; // |stream_| may be NULL even if OK was returned. if (stream_) { DCHECK_EQ(stream_->type(), SPDY_PUSH_STREAM); InitializeStreamHelper(); return OK; } } int rv = stream_request_.StartRequest( SPDY_REQUEST_RESPONSE_STREAM, spdy_session_, request_info_->url, priority, stream_net_log, base::Bind(&SpdyHttpStream::OnStreamCreated, weak_factory_.GetWeakPtr(), callback)); if (rv == OK) { stream_ = stream_request_.ReleaseStream().get(); InitializeStreamHelper(); } return rv; } int SpdyHttpStream::ReadResponseHeaders(const CompletionCallback& callback) { CHECK(!callback.is_null()); if (stream_closed_) return closed_stream_status_; CHECK(stream_); // Check if we already have the response headers. If so, return synchronously. if (response_headers_complete_) { CHECK(!stream_->IsIdle()); return OK; } // Still waiting for the response, return IO_PENDING. CHECK(response_callback_.is_null()); response_callback_ = callback; return ERR_IO_PENDING; } int SpdyHttpStream::ReadResponseBody( IOBuffer* buf, int buf_len, const CompletionCallback& callback) { // Invalidate HttpRequestInfo pointer. This is to allow the stream to be // shared across multiple transactions which might require this // stream to outlive the request_'s owner. // Only allowed when Reading of response body starts. It is safe to reset it // at this point since request_->upload_data_stream is also not needed // anymore. request_info_ = nullptr; if (stream_) CHECK(!stream_->IsIdle()); CHECK(buf); CHECK(buf_len); CHECK(!callback.is_null()); // If we have data buffered, complete the IO immediately. if (!response_body_queue_.IsEmpty()) { return response_body_queue_.Dequeue(buf->data(), buf_len); } else if (stream_closed_) { return closed_stream_status_; } CHECK(response_callback_.is_null()); CHECK(!user_buffer_.get()); CHECK_EQ(0, user_buffer_len_); response_callback_ = callback; user_buffer_ = buf; user_buffer_len_ = buf_len; return ERR_IO_PENDING; } void SpdyHttpStream::Close(bool not_reusable) { // Note: the not_reusable flag has no meaning for SPDY streams. Cancel(); DCHECK(!stream_); } bool SpdyHttpStream::IsResponseBodyComplete() const { return stream_closed_; } bool SpdyHttpStream::IsConnectionReused() const { return is_reused_; } int64_t SpdyHttpStream::GetTotalReceivedBytes() const { if (stream_closed_) return closed_stream_received_bytes_; if (!stream_) return 0; return stream_->raw_received_bytes(); } int64_t SpdyHttpStream::GetTotalSentBytes() const { if (stream_closed_) return closed_stream_sent_bytes_; if (!stream_) return 0; return stream_->raw_sent_bytes(); } bool SpdyHttpStream::GetAlternativeService( AlternativeService* alternative_service) const { return false; } bool SpdyHttpStream::GetLoadTimingInfo(LoadTimingInfo* load_timing_info) const { if (stream_closed_) { if (!closed_stream_has_load_timing_info_) return false; *load_timing_info = closed_stream_load_timing_info_; return true; } // If |stream_| has yet to be created, or does not yet have an ID, fail. // The reused flag can only be correctly set once a stream has an ID. Streams // get their IDs once the request has been successfully sent, so this does not // behave that differently from other stream types. if (!stream_ || stream_->stream_id() == 0) return false; return stream_->GetLoadTimingInfo(load_timing_info); } int SpdyHttpStream::SendRequest(const HttpRequestHeaders& request_headers, HttpResponseInfo* response, const CompletionCallback& callback) { if (stream_closed_) { return closed_stream_status_; } base::Time request_time = base::Time::Now(); CHECK(stream_); stream_->SetRequestTime(request_time); // This should only get called in the case of a request occurring // during server push that has already begun but hasn't finished, // so we set the response's request time to be the actual one if (response_info_) response_info_->request_time = request_time; CHECK(!request_body_buf_.get()); if (HasUploadData()) { request_body_buf_ = new IOBufferWithSize(kRequestBodyBufferSize); // The request body buffer is empty at first. request_body_buf_size_ = 0; } CHECK(!callback.is_null()); CHECK(response); // SendRequest can be called in two cases. // // a) A client initiated request. In this case, |response_info_| should be // NULL to start with. // b) A client request which matches a response that the server has already // pushed. if (push_response_info_.get()) { *response = *(push_response_info_.get()); push_response_info_.reset(); } else { DCHECK_EQ(static_cast(NULL), response_info_); } response_info_ = response; // Put the peer's IP address and port into the response. IPEndPoint address; int result = stream_->GetPeerAddress(&address); if (result != OK) return result; response_info_->socket_address = HostPortPair::FromIPEndPoint(address); if (stream_->type() == SPDY_PUSH_STREAM) { // Pushed streams do not send any data, and should always be // idle. However, we still want to return ERR_IO_PENDING to mimic // non-push behavior. The callback will be called when the // response is received. CHECK(response_callback_.is_null()); response_callback_ = callback; return ERR_IO_PENDING; } SpdyHeaderBlock headers; CreateSpdyHeadersFromHttpRequest(*request_info_, request_headers, direct_, &headers); stream_->net_log().AddEvent( NetLogEventType::HTTP_TRANSACTION_HTTP2_SEND_REQUEST_HEADERS, base::Bind(&SpdyHeaderBlockNetLogCallback, &headers)); DispatchRequestHeadersCallback(headers); result = stream_->SendRequestHeaders( std::move(headers), HasUploadData() ? MORE_DATA_TO_SEND : NO_MORE_DATA_TO_SEND); if (result == ERR_IO_PENDING) { CHECK(request_callback_.is_null()); request_callback_ = callback; } return result; } void SpdyHttpStream::Cancel() { request_callback_.Reset(); response_callback_.Reset(); if (stream_) { stream_->Cancel(); DCHECK(!stream_); } } void SpdyHttpStream::OnHeadersSent() { if (HasUploadData()) { ReadAndSendRequestBodyData(); } else { MaybePostRequestCallback(OK); } } void SpdyHttpStream::OnHeadersReceived( const SpdyHeaderBlock& response_headers) { DCHECK(!response_headers_complete_); response_headers_complete_ = true; if (!response_info_) { DCHECK_EQ(stream_->type(), SPDY_PUSH_STREAM); push_response_info_ = std::make_unique(); response_info_ = push_response_info_.get(); } const bool headers_valid = SpdyHeadersToHttpResponse(response_headers, response_info_); DCHECK(headers_valid); response_info_->response_time = stream_->response_time(); // Don't store the SSLInfo in the response here, HttpNetworkTransaction // will take care of that part. response_info_->was_alpn_negotiated = was_alpn_negotiated_; response_info_->request_time = stream_->GetRequestTime(); response_info_->connection_info = HttpResponseInfo::CONNECTION_INFO_HTTP2; response_info_->alpn_negotiated_protocol = HttpResponseInfo::ConnectionInfoToString(response_info_->connection_info); response_info_->vary_data .Init(*request_info_, *response_info_->headers.get()); if (!response_callback_.is_null()) { DoResponseCallback(OK); } } void SpdyHttpStream::OnDataReceived(std::unique_ptr buffer) { DCHECK(response_headers_complete_); // Note that data may be received for a SpdyStream prior to the user calling // ReadResponseBody(), therefore user_buffer_ may be NULL. This may often // happen for server initiated streams. DCHECK(stream_); DCHECK(!stream_->IsClosed() || stream_->type() == SPDY_PUSH_STREAM); if (buffer) { response_body_queue_.Enqueue(std::move(buffer)); if (user_buffer_.get()) { // Handing small chunks of data to the caller creates measurable overhead. // We buffer data in short time-spans and send a single read notification. ScheduleBufferedReadCallback(); } } } void SpdyHttpStream::OnDataSent() { request_body_buf_size_ = 0; ReadAndSendRequestBodyData(); } // TODO(xunjieli): Maybe do something with the trailers. crbug.com/422958. void SpdyHttpStream::OnTrailers(const SpdyHeaderBlock& trailers) {} void SpdyHttpStream::OnClose(int status) { // Cancel any pending reads from the upload data stream. if (request_info_ && request_info_->upload_data_stream) request_info_->upload_data_stream->Reset(); if (stream_) { stream_closed_ = true; closed_stream_status_ = status; closed_stream_id_ = stream_->stream_id(); closed_stream_has_load_timing_info_ = stream_->GetLoadTimingInfo(&closed_stream_load_timing_info_); closed_stream_received_bytes_ = stream_->raw_received_bytes(); closed_stream_sent_bytes_ = stream_->raw_sent_bytes(); } stream_ = nullptr; // Callbacks might destroy |this|. base::WeakPtr self = weak_factory_.GetWeakPtr(); if (!request_callback_.is_null()) { DoRequestCallback(status); if (!self) return; } if (status == OK) { // We need to complete any pending buffered read now. DoBufferedReadCallback(); if (!self) return; } if (!response_callback_.is_null()) { DoResponseCallback(status); } } NetLogSource SpdyHttpStream::source_dependency() const { return source_dependency_; } bool SpdyHttpStream::HasUploadData() const { CHECK(request_info_); return request_info_->upload_data_stream && ((request_info_->upload_data_stream->size() > 0) || request_info_->upload_data_stream->is_chunked()); } void SpdyHttpStream::OnStreamCreated( const CompletionCallback& callback, int rv) { if (rv == OK) { stream_ = stream_request_.ReleaseStream().get(); InitializeStreamHelper(); } callback.Run(rv); } void SpdyHttpStream::ReadAndSendRequestBodyData() { CHECK(HasUploadData()); CHECK_EQ(request_body_buf_size_, 0); if (request_info_->upload_data_stream->IsEOF()) { MaybePostRequestCallback(OK); return; } // Read the data from the request body stream. const int rv = request_info_->upload_data_stream ->Read(request_body_buf_.get(), request_body_buf_->size(), base::Bind(&SpdyHttpStream::OnRequestBodyReadCompleted, weak_factory_.GetWeakPtr())); if (rv != ERR_IO_PENDING) OnRequestBodyReadCompleted(rv); } void SpdyHttpStream::InitializeStreamHelper() { stream_->SetDelegate(this); was_alpn_negotiated_ = stream_->WasAlpnNegotiated(); } void SpdyHttpStream::ResetStreamInternal() { spdy_session_->ResetStream(stream()->stream_id(), ERROR_CODE_INTERNAL_ERROR, SpdyString()); } void SpdyHttpStream::OnRequestBodyReadCompleted(int status) { if (status < 0) { DCHECK_NE(ERR_IO_PENDING, status); // Post |request_callback_| with received error. This should be posted // before ResetStreamInternal, because the latter would call // |request_callback_| via OnClose with an error code potentially different // from |status|. MaybePostRequestCallback(status); base::ThreadTaskRunnerHandle::Get()->PostTask( FROM_HERE, base::Bind(&SpdyHttpStream::ResetStreamInternal, weak_factory_.GetWeakPtr())); return; } CHECK_GE(status, 0); request_body_buf_size_ = status; const bool eof = request_info_->upload_data_stream->IsEOF(); // Only the final frame may have a length of 0. if (eof) { CHECK_GE(request_body_buf_size_, 0); } else { CHECK_GT(request_body_buf_size_, 0); } stream_->SendData(request_body_buf_.get(), request_body_buf_size_, eof ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND); } void SpdyHttpStream::ScheduleBufferedReadCallback() { // If there is already a scheduled DoBufferedReadCallback, don't issue // another one. Mark that we have received more data and return. if (buffered_read_callback_pending_) { more_read_data_pending_ = true; return; } more_read_data_pending_ = false; buffered_read_callback_pending_ = true; const base::TimeDelta kBufferTime = base::TimeDelta::FromMilliseconds(1); base::ThreadTaskRunnerHandle::Get()->PostDelayedTask( FROM_HERE, base::Bind(&SpdyHttpStream::DoBufferedReadCallback, weak_factory_.GetWeakPtr()), kBufferTime); } // Checks to see if we should wait for more buffered data before notifying // the caller. Returns true if we should wait, false otherwise. bool SpdyHttpStream::ShouldWaitForMoreBufferedData() const { // If the response is complete, there is no point in waiting. if (stream_closed_) return false; DCHECK_GT(user_buffer_len_, 0); return response_body_queue_.GetTotalSize() < static_cast(user_buffer_len_); } void SpdyHttpStream::DoBufferedReadCallback() { buffered_read_callback_pending_ = false; // If the transaction is cancelled or errored out, we don't need to complete // the read. if (!stream_ && !stream_closed_) return; int stream_status = stream_closed_ ? closed_stream_status_ : stream_->response_status(); if (stream_status != OK) return; // When more_read_data_pending_ is true, it means that more data has // arrived since we started waiting. Wait a little longer and continue // to buffer. if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) { ScheduleBufferedReadCallback(); return; } int rv = 0; if (user_buffer_.get()) { rv = ReadResponseBody(user_buffer_.get(), user_buffer_len_, response_callback_); CHECK_NE(rv, ERR_IO_PENDING); user_buffer_ = NULL; user_buffer_len_ = 0; DoResponseCallback(rv); } } void SpdyHttpStream::DoRequestCallback(int rv) { CHECK_NE(rv, ERR_IO_PENDING); CHECK(!request_callback_.is_null()); // Since Run may result in being called back, reset request_callback_ in // advance. base::ResetAndReturn(&request_callback_).Run(rv); } void SpdyHttpStream::MaybeDoRequestCallback(int rv) { CHECK_NE(ERR_IO_PENDING, rv); if (request_callback_) base::ResetAndReturn(&request_callback_).Run(rv); } void SpdyHttpStream::MaybePostRequestCallback(int rv) { CHECK_NE(ERR_IO_PENDING, rv); if (request_callback_) base::ThreadTaskRunnerHandle::Get()->PostTask( FROM_HERE, base::Bind(&SpdyHttpStream::MaybeDoRequestCallback, weak_factory_.GetWeakPtr(), rv)); } void SpdyHttpStream::DoResponseCallback(int rv) { CHECK_NE(rv, ERR_IO_PENDING); CHECK(!response_callback_.is_null()); // Since Run may result in being called back, reset response_callback_ in // advance. base::ResetAndReturn(&response_callback_).Run(rv); } bool SpdyHttpStream::GetRemoteEndpoint(IPEndPoint* endpoint) { if (!spdy_session_) return false; return spdy_session_->GetPeerAddress(endpoint) == OK; } void SpdyHttpStream::PopulateNetErrorDetails(NetErrorDetails* details) { details->connection_info = HttpResponseInfo::CONNECTION_INFO_HTTP2; return; } void SpdyHttpStream::SetPriority(RequestPriority priority) { // TODO(akalin): Plumb this through to |stream_request_| and // |stream_|. } } // namespace net