// Copyright (c) 2012 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "net/quic/core/quic_stream.h" #include "net/quic/core/quic_flow_controller.h" #include "net/quic/core/quic_session.h" #include "net/quic/platform/api/quic_bug_tracker.h" #include "net/quic/platform/api/quic_flag_utils.h" #include "net/quic/platform/api/quic_flags.h" #include "net/quic/platform/api/quic_logging.h" using std::string; namespace net { #define ENDPOINT \ (perspective_ == Perspective::IS_SERVER ? "Server: " : "Client: ") namespace { struct iovec MakeIovec(QuicStringPiece data) { struct iovec iov = {const_cast(data.data()), static_cast(data.size())}; return iov; } size_t GetInitialStreamFlowControlWindowToSend(QuicSession* session) { return session->config()->GetInitialStreamFlowControlWindowToSend(); } size_t GetReceivedFlowControlWindow(QuicSession* session) { if (session->config()->HasReceivedInitialStreamFlowControlWindowBytes()) { return session->config()->ReceivedInitialStreamFlowControlWindowBytes(); } return kMinimumFlowControlSendWindow; } } // namespace QuicStream::PendingData::PendingData( string data_in, QuicReferenceCountedPointer ack_listener) : data(std::move(data_in)), offset(0), ack_listener(std::move(ack_listener)) {} QuicStream::PendingData::~PendingData() {} QuicStream::QuicStream(QuicStreamId id, QuicSession* session) : sequencer_(this, session->connection()->clock()), id_(id), session_(session), stream_bytes_read_(0), stream_bytes_written_(0), stream_bytes_outstanding_(0), stream_error_(QUIC_STREAM_NO_ERROR), connection_error_(QUIC_NO_ERROR), read_side_closed_(false), write_side_closed_(false), fin_buffered_(false), fin_sent_(false), fin_outstanding_(false), fin_received_(false), rst_sent_(false), rst_received_(false), perspective_(session_->perspective()), flow_controller_(session_->connection(), id_, perspective_, GetReceivedFlowControlWindow(session), GetInitialStreamFlowControlWindowToSend(session), session_->flow_controller()->auto_tune_receive_window(), session_->flow_controller()), connection_flow_controller_(session_->flow_controller()), stream_contributes_to_connection_flow_control_(true), busy_counter_(0), add_random_padding_after_fin_(false), ack_listener_(nullptr), send_buffer_( session->connection()->helper()->GetStreamSendBufferAllocator()), buffered_data_threshold_( GetQuicFlag(FLAGS_quic_buffered_data_threshold)) { SetFromConfig(); } QuicStream::~QuicStream() { if (session_ != nullptr && IsWaitingForAcks()) { QUIC_DVLOG(1) << ENDPOINT << "Stream " << id_ << " gets destroyed while waiting for acks. stream_bytes_outstanding = " << stream_bytes_outstanding_ << ", fin_outstanding: " << fin_outstanding_; } } void QuicStream::SetFromConfig() {} void QuicStream::OnStreamFrame(const QuicStreamFrame& frame) { DCHECK_EQ(frame.stream_id, id_); DCHECK(!(read_side_closed_ && write_side_closed_)); if (frame.fin) { fin_received_ = true; if (fin_sent_) { session_->StreamDraining(id_); } } if (read_side_closed_) { QUIC_DLOG(INFO) << ENDPOINT << "Stream " << frame.stream_id << " is closed for reading. Ignoring newly received stream data."; // The subclass does not want to read data: blackhole the data. return; } // This count includes duplicate data received. size_t frame_payload_size = frame.data_length; stream_bytes_read_ += frame_payload_size; // Flow control is interested in tracking highest received offset. // Only interested in received frames that carry data. if (frame_payload_size > 0 && MaybeIncreaseHighestReceivedOffset(frame.offset + frame_payload_size)) { // As the highest received offset has changed, check to see if this is a // violation of flow control. if (flow_controller_.FlowControlViolation() || connection_flow_controller_->FlowControlViolation()) { CloseConnectionWithDetails( QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA, "Flow control violation after increasing offset"); return; } } sequencer_.OnStreamFrame(frame); } int QuicStream::num_frames_received() const { return sequencer_.num_frames_received(); } int QuicStream::num_duplicate_frames_received() const { return sequencer_.num_duplicate_frames_received(); } void QuicStream::OnStreamReset(const QuicRstStreamFrame& frame) { rst_received_ = true; MaybeIncreaseHighestReceivedOffset(frame.byte_offset); stream_error_ = frame.error_code; CloseWriteSide(); CloseReadSide(); } void QuicStream::OnConnectionClosed(QuicErrorCode error, ConnectionCloseSource /*source*/) { if (read_side_closed_ && write_side_closed_) { return; } if (error != QUIC_NO_ERROR) { stream_error_ = QUIC_STREAM_CONNECTION_ERROR; connection_error_ = error; } CloseWriteSide(); CloseReadSide(); } void QuicStream::OnFinRead() { DCHECK(sequencer_.IsClosed()); // OnFinRead can be called due to a FIN flag in a headers block, so there may // have been no OnStreamFrame call with a FIN in the frame. fin_received_ = true; // If fin_sent_ is true, then CloseWriteSide has already been called, and the // stream will be destroyed by CloseReadSide, so don't need to call // StreamDraining. CloseReadSide(); } void QuicStream::Reset(QuicRstStreamErrorCode error) { stream_error_ = error; // Sending a RstStream results in calling CloseStream. session()->SendRstStream(id(), error, stream_bytes_written_); rst_sent_ = true; } void QuicStream::CloseConnectionWithDetails(QuicErrorCode error, const string& details) { session()->connection()->CloseConnection( error, details, ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET); } void QuicStream::WriteOrBufferData( QuicStringPiece data, bool fin, QuicReferenceCountedPointer ack_listener) { if (data.empty() && !fin) { QUIC_BUG << "data.empty() && !fin"; return; } if (fin_buffered_) { QUIC_BUG << "Fin already buffered"; return; } if (write_side_closed_) { QUIC_DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed"; return; } QuicConsumedData consumed_data(0, false); fin_buffered_ = fin; bool had_buffered_data = HasBufferedData(); // Do not respect buffered data upper limit as WriteOrBufferData guarantees // all data to be consumed. if (data.length() > 0) { struct iovec iov(MakeIovec(data)); QuicIOVector quic_iov(&iov, 1, data.length()); QuicStreamOffset offset = send_buffer_.stream_offset(); send_buffer_.SaveStreamData(quic_iov, 0, data.length()); OnDataBuffered(offset, data.length(), ack_listener); } if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) { // Write data if there is no buffered data before. WriteBufferedData(); } } void QuicStream::OnCanWrite() { if (write_side_closed_) { QUIC_DLOG(ERROR) << ENDPOINT << "Stream " << id() << "attempting to write when the write side is closed"; return; } if (HasBufferedData() || (fin_buffered_ && !fin_sent_)) { WriteBufferedData(); } if (!fin_buffered_ && !fin_sent_ && CanWriteNewData()) { // Notify upper layer to write new data when buffered data size is below // low water mark. OnCanWriteNewData(); } } void QuicStream::MaybeSendBlocked() { flow_controller_.MaybeSendBlocked(); if (!stream_contributes_to_connection_flow_control_) { return; } connection_flow_controller_->MaybeSendBlocked(); // If the stream is blocked by connection-level flow control but not by // stream-level flow control, add the stream to the write blocked list so that // the stream will be given a chance to write when a connection-level // WINDOW_UPDATE arrives. if (connection_flow_controller_->IsBlocked() && !flow_controller_.IsBlocked()) { session_->MarkConnectionLevelWriteBlocked(id()); } } QuicConsumedData QuicStream::WritevData( const struct iovec* iov, int iov_count, bool fin, QuicReferenceCountedPointer ack_listener) { if (write_side_closed_) { QUIC_DLOG(ERROR) << ENDPOINT << "Stream " << id() << "attempting to write when the write side is closed"; return QuicConsumedData(0, false); } // How much data was provided. size_t write_length = 0; if (iov != nullptr) { for (int i = 0; i < iov_count; ++i) { write_length += iov[i].iov_len; } } QuicConsumedData consumed_data(0, false); if (fin_buffered_) { QUIC_BUG << "Fin already buffered"; return consumed_data; } bool had_buffered_data = HasBufferedData(); if (CanWriteNewData()) { // Save all data if buffered data size is below low water mark. QuicIOVector quic_iovec(iov, iov_count, write_length); consumed_data.bytes_consumed = write_length; if (consumed_data.bytes_consumed > 0) { QuicStreamOffset offset = send_buffer_.stream_offset(); send_buffer_.SaveStreamData(quic_iovec, 0, write_length); OnDataBuffered(offset, write_length, ack_listener); } } consumed_data.fin_consumed = consumed_data.bytes_consumed == write_length && fin; fin_buffered_ = consumed_data.fin_consumed; if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) { // Write data if there is no buffered data before. WriteBufferedData(); } return consumed_data; } QuicConsumedData QuicStream::WriteMemSlices(QuicMemSliceSpan span, bool fin) { DCHECK(session_->can_use_slices()); QuicConsumedData consumed_data(0, false); if (span.empty() && !fin) { QUIC_BUG << "span.empty() && !fin"; return consumed_data; } if (fin_buffered_) { QUIC_BUG << "Fin already buffered"; return consumed_data; } if (write_side_closed_) { QUIC_DLOG(ERROR) << ENDPOINT << "Stream " << id() << "attempting to write when the write side is closed"; return consumed_data; } bool had_buffered_data = HasBufferedData(); if (CanWriteNewData() || span.empty()) { consumed_data.fin_consumed = fin; if (!span.empty()) { // Buffer all data if buffered data size is below limit. QuicStreamOffset offset = send_buffer_.stream_offset(); consumed_data.bytes_consumed = span.SaveMemSlicesInSendBuffer(&send_buffer_); OnDataBuffered(offset, consumed_data.bytes_consumed, nullptr); } } fin_buffered_ = consumed_data.fin_consumed; if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) { // Write data if there is no buffered data before. WriteBufferedData(); } return consumed_data; } QuicConsumedData QuicStream::WritevDataInner( QuicIOVector iov, QuicStreamOffset offset, bool fin, QuicReferenceCountedPointer ack_listener) { StreamSendingState state = fin ? FIN : NO_FIN; if (fin && add_random_padding_after_fin_) { state = FIN_AND_PADDING; } return session()->WritevData(this, id(), iov, offset, state, std::move(ack_listener)); } void QuicStream::CloseReadSide() { if (read_side_closed_) { return; } QUIC_DLOG(INFO) << ENDPOINT << "Done reading from stream " << id(); read_side_closed_ = true; sequencer_.ReleaseBuffer(); if (write_side_closed_) { QUIC_DLOG(INFO) << ENDPOINT << "Closing stream " << id(); session_->CloseStream(id()); } } void QuicStream::CloseWriteSide() { if (write_side_closed_) { return; } QUIC_DLOG(INFO) << ENDPOINT << "Done writing to stream " << id(); write_side_closed_ = true; if (read_side_closed_) { QUIC_DLOG(INFO) << ENDPOINT << "Closing stream " << id(); session_->CloseStream(id()); } } bool QuicStream::HasBufferedData() const { DCHECK_GE(send_buffer_.stream_offset(), stream_bytes_written_); return send_buffer_.stream_offset() > stream_bytes_written_; } QuicTransportVersion QuicStream::transport_version() const { return session_->connection()->transport_version(); } void QuicStream::StopReading() { QUIC_DLOG(INFO) << ENDPOINT << "Stop reading from stream " << id(); sequencer_.StopReading(); } const QuicSocketAddress& QuicStream::PeerAddressOfLatestPacket() const { return session_->connection()->last_packet_source_address(); } void QuicStream::OnClose() { CloseReadSide(); CloseWriteSide(); if (!fin_sent_ && !rst_sent_) { // For flow control accounting, tell the peer how many bytes have been // written on this stream before termination. Done here if needed, using a // RST_STREAM frame. QUIC_DLOG(INFO) << ENDPOINT << "Sending RST_STREAM in OnClose: " << id(); session_->SendRstStream(id(), QUIC_RST_ACKNOWLEDGEMENT, stream_bytes_written_); rst_sent_ = true; } // The stream is being closed and will not process any further incoming bytes. // As there may be more bytes in flight, to ensure that both endpoints have // the same connection level flow control state, mark all unreceived or // buffered bytes as consumed. QuicByteCount bytes_to_consume = flow_controller_.highest_received_byte_offset() - flow_controller_.bytes_consumed(); AddBytesConsumed(bytes_to_consume); } void QuicStream::OnWindowUpdateFrame(const QuicWindowUpdateFrame& frame) { if (flow_controller_.UpdateSendWindowOffset(frame.byte_offset)) { // Writing can be done again! // TODO(rjshade): This does not respect priorities (e.g. multiple // outstanding POSTs are unblocked on arrival of // SHLO with initial window). // As long as the connection is not flow control blocked, write on! OnCanWrite(); } } bool QuicStream::MaybeIncreaseHighestReceivedOffset( QuicStreamOffset new_offset) { uint64_t increment = new_offset - flow_controller_.highest_received_byte_offset(); if (!flow_controller_.UpdateHighestReceivedOffset(new_offset)) { return false; } // If |new_offset| increased the stream flow controller's highest received // offset, increase the connection flow controller's value by the incremental // difference. if (stream_contributes_to_connection_flow_control_) { connection_flow_controller_->UpdateHighestReceivedOffset( connection_flow_controller_->highest_received_byte_offset() + increment); } return true; } void QuicStream::AddBytesSent(QuicByteCount bytes) { flow_controller_.AddBytesSent(bytes); if (stream_contributes_to_connection_flow_control_) { connection_flow_controller_->AddBytesSent(bytes); } } void QuicStream::AddBytesConsumed(QuicByteCount bytes) { // Only adjust stream level flow controller if still reading. if (!read_side_closed_) { flow_controller_.AddBytesConsumed(bytes); } if (stream_contributes_to_connection_flow_control_) { connection_flow_controller_->AddBytesConsumed(bytes); } } void QuicStream::UpdateSendWindowOffset(QuicStreamOffset new_window) { if (flow_controller_.UpdateSendWindowOffset(new_window)) { OnCanWrite(); } } void QuicStream::AddRandomPaddingAfterFin() { add_random_padding_after_fin_ = true; } void QuicStream::OnStreamFrameAcked(const QuicStreamFrame& frame, QuicTime::Delta ack_delay_time) { OnStreamFrameDiscarded(frame); if (ack_listener_ != nullptr) { ack_listener_->OnPacketAcked(frame.data_length, ack_delay_time); } } void QuicStream::OnStreamFrameRetransmitted(const QuicStreamFrame& frame) { if (ack_listener_ != nullptr) { ack_listener_->OnPacketRetransmitted(frame.data_length); } } void QuicStream::OnStreamFrameDiscarded(const QuicStreamFrame& frame) { DCHECK_EQ(id_, frame.stream_id); if (stream_bytes_outstanding_ < frame.data_length || (!fin_outstanding_ && frame.fin)) { CloseConnectionWithDetails(QUIC_INTERNAL_ERROR, "Trying to discard unsent data."); return; } stream_bytes_outstanding_ -= frame.data_length; if (frame.fin) { fin_outstanding_ = false; } if (frame.data_length > 0) { send_buffer_.RemoveStreamFrame(frame.offset, frame.data_length); } if (!IsWaitingForAcks()) { session_->OnStreamDoneWaitingForAcks(id_); } } bool QuicStream::IsWaitingForAcks() const { return stream_bytes_outstanding_ || fin_outstanding_; } bool QuicStream::WriteStreamData(QuicStreamOffset offset, QuicByteCount data_length, QuicDataWriter* writer) { DCHECK_LT(0u, data_length); return send_buffer_.WriteStreamData(offset, data_length, writer); } void QuicStream::WriteBufferedData() { DCHECK(!write_side_closed_ && (HasBufferedData() || fin_buffered_)); if (session_->ShouldYield(id())) { session_->MarkConnectionLevelWriteBlocked(id()); return; } // Size of buffered data. size_t write_length = BufferedDataBytes(); // A FIN with zero data payload should not be flow control blocked. bool fin_with_zero_data = (fin_buffered_ && write_length == 0); bool fin = fin_buffered_; // How much data flow control permits to be written. QuicByteCount send_window = flow_controller_.SendWindowSize(); if (stream_contributes_to_connection_flow_control_) { send_window = std::min(send_window, connection_flow_controller_->SendWindowSize()); } if (send_window == 0 && !fin_with_zero_data) { // Quick return if nothing can be sent. MaybeSendBlocked(); return; } if (write_length > send_window) { // Don't send the FIN unless all the data will be sent. fin = false; // Writing more data would be a violation of flow control. write_length = static_cast(send_window); QUIC_DVLOG(1) << "stream " << id() << " shortens write length to " << write_length << " due to flow control"; } QuicConsumedData consumed_data = WritevDataInner( QuicIOVector(/*iov=*/nullptr, /*iov_count=*/0, write_length), stream_bytes_written_, fin, nullptr); stream_bytes_written_ += consumed_data.bytes_consumed; stream_bytes_outstanding_ += consumed_data.bytes_consumed; AddBytesSent(consumed_data.bytes_consumed); QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " sends " << stream_bytes_written_ << " bytes " << " and has buffered data " << BufferedDataBytes() << " bytes." << " fin is sent: " << consumed_data.fin_consumed << " fin is buffered: " << fin_buffered_; // The write may have generated a write error causing this stream to be // closed. If so, simply return without marking the stream write blocked. if (write_side_closed_) { return; } if (consumed_data.bytes_consumed == write_length) { if (!fin_with_zero_data) { MaybeSendBlocked(); } if (fin && consumed_data.fin_consumed) { fin_sent_ = true; fin_outstanding_ = true; if (fin_received_) { session_->StreamDraining(id_); } CloseWriteSide(); } else if (fin && !consumed_data.fin_consumed) { session_->MarkConnectionLevelWriteBlocked(id()); } } else { session_->MarkConnectionLevelWriteBlocked(id()); } if (consumed_data.bytes_consumed > 0 || consumed_data.fin_consumed) { busy_counter_ = 0; } } uint64_t QuicStream::BufferedDataBytes() const { DCHECK_GE(send_buffer_.stream_offset(), stream_bytes_written_); return send_buffer_.stream_offset() - stream_bytes_written_; } bool QuicStream::CanWriteNewData() const { return BufferedDataBytes() < buffered_data_threshold_; } } // namespace net