// Copyright 2018 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "net/websockets/websocket_basic_stream_adapters.h" #include #include #include #include "base/bind.h" #include "base/location.h" #include "base/single_thread_task_runner.h" #include "base/threading/thread_task_runner_handle.h" #include "net/base/io_buffer.h" #include "net/socket/client_socket_handle.h" #include "net/socket/socket.h" #include "net/spdy/spdy_buffer.h" namespace net { WebSocketClientSocketHandleAdapter::WebSocketClientSocketHandleAdapter( std::unique_ptr connection) : connection_(std::move(connection)) {} WebSocketClientSocketHandleAdapter::~WebSocketClientSocketHandleAdapter() {} int WebSocketClientSocketHandleAdapter::Read( IOBuffer* buf, int buf_len, const CompletionCallback& callback) { return connection_->socket()->Read(buf, buf_len, callback); } int WebSocketClientSocketHandleAdapter::Write( IOBuffer* buf, int buf_len, const CompletionCallback& callback, const NetworkTrafficAnnotationTag& traffic_annotation) { return connection_->socket()->Write(buf, buf_len, callback, traffic_annotation); } void WebSocketClientSocketHandleAdapter::Disconnect() { connection_->socket()->Disconnect(); } bool WebSocketClientSocketHandleAdapter::is_initialized() const { return connection_->is_initialized(); } WebSocketSpdyStreamAdapter::WebSocketSpdyStreamAdapter( base::WeakPtr stream, Delegate* delegate, NetLogWithSource net_log) : headers_sent_(false), stream_(stream), stream_error_(ERR_CONNECTION_CLOSED), delegate_(delegate), write_length_(0), net_log_(net_log), weak_factory_(this) { stream_->SetDelegate(this); } WebSocketSpdyStreamAdapter::~WebSocketSpdyStreamAdapter() { if (stream_) { // DetachDelegate() also cancels the stream. stream_->DetachDelegate(); } } void WebSocketSpdyStreamAdapter::DetachDelegate() { delegate_ = nullptr; } int WebSocketSpdyStreamAdapter::Read(IOBuffer* buf, int buf_len, const CompletionCallback& callback) { DCHECK(!read_callback_); DCHECK_LT(0, buf_len); read_buffer_ = buf; // |read_length_| is size_t and |buf_len| is a non-negative int, therefore // conversion is always valid. read_length_ = buf_len; if (!read_data_.IsEmpty()) return CopySavedReadDataIntoBuffer(); if (!stream_) return stream_error_; read_callback_ = callback; return ERR_IO_PENDING; } int WebSocketSpdyStreamAdapter::Write( IOBuffer* buf, int buf_len, const CompletionCallback& callback, const NetworkTrafficAnnotationTag& traffic_annotation) { CHECK(headers_sent_); DCHECK(!write_callback_); DCHECK(callback); DCHECK_LT(0, buf_len); if (!stream_) return stream_error_; stream_->SendData(buf, buf_len, MORE_DATA_TO_SEND); write_callback_ = callback; write_length_ = buf_len; return ERR_IO_PENDING; } void WebSocketSpdyStreamAdapter::Disconnect() { if (stream_) { stream_->DetachDelegate(); stream_ = nullptr; } } bool WebSocketSpdyStreamAdapter::is_initialized() const { return true; } // SpdyStream::Delegate methods. void WebSocketSpdyStreamAdapter::OnHeadersSent() { headers_sent_ = true; if (delegate_) delegate_->OnHeadersSent(); } void WebSocketSpdyStreamAdapter::OnHeadersReceived( const spdy::SpdyHeaderBlock& response_headers, const spdy::SpdyHeaderBlock* pushed_request_headers) { if (delegate_) delegate_->OnHeadersReceived(response_headers); } void WebSocketSpdyStreamAdapter::OnDataReceived( std::unique_ptr buffer) { read_data_.Enqueue(std::move(buffer)); if (read_callback_) base::ResetAndReturn(&read_callback_).Run(CopySavedReadDataIntoBuffer()); } void WebSocketSpdyStreamAdapter::OnDataSent() { DCHECK(write_callback_); base::ResetAndReturn(&write_callback_).Run(write_length_); } void WebSocketSpdyStreamAdapter::OnTrailers( const spdy::SpdyHeaderBlock& trailers) {} void WebSocketSpdyStreamAdapter::OnClose(int status) { DCHECK_GT(ERR_IO_PENDING, status); stream_error_ = status; stream_ = nullptr; auto self = weak_factory_.GetWeakPtr(); if (read_callback_) { DCHECK(read_data_.IsEmpty()); // Might destroy |this|. base::ResetAndReturn(&read_callback_).Run(status); if (!self) return; } if (write_callback_) { // Might destroy |this|. base::ResetAndReturn(&write_callback_).Run(status); if (!self) return; } // Delay calling delegate_->OnClose() until all buffered data are read. if (read_data_.IsEmpty() && delegate_) { // Might destroy |this|. delegate_->OnClose(status); } } NetLogSource WebSocketSpdyStreamAdapter::source_dependency() const { return net_log_.source(); } int WebSocketSpdyStreamAdapter::CopySavedReadDataIntoBuffer() { int rv = read_data_.Dequeue(read_buffer_->data(), read_length_); // Stream has been destroyed earlier but delegate_->OnClose() call was // delayed until all buffered data are read. PostTask so that Read() can // return beforehand. if (!stream_ && delegate_ && read_data_.IsEmpty()) { base::ThreadTaskRunnerHandle::Get()->PostTask( FROM_HERE, base::BindOnce(&WebSocketSpdyStreamAdapter::CallDelegateOnClose, weak_factory_.GetWeakPtr())); } return rv; } void WebSocketSpdyStreamAdapter::CallDelegateOnClose() { if (delegate_) delegate_->OnClose(stream_error_); } } // namespace net