Add support for HTTP/2 CONNECT fast open

This commit is contained in:
klzgrad 2018-01-29 13:07:16 -05:00
parent a211e5abce
commit 4dc864ce1f
2 changed files with 138 additions and 66 deletions

View File

@ -42,6 +42,12 @@ NaiveClientConnection::NaiveClientConnection(
client_socket_( client_socket_(
std::make_unique<Socks5ServerSocket>(std::move(accepted_socket))), std::make_unique<Socks5ServerSocket>(std::move(accepted_socket))),
server_socket_handle_(std::make_unique<ClientSocketHandle>()), server_socket_handle_(std::make_unique<ClientSocketHandle>()),
sockets_{client_socket_.get(), nullptr},
errors_{OK, OK},
write_pending_{false, false},
early_pull_pending_(false),
can_push_to_server_(false),
early_pull_result_(ERR_IO_PENDING),
full_duplex_(false), full_duplex_(false),
time_func_(&base::TimeTicks::Now), time_func_(&base::TimeTicks::Now),
weak_ptr_factory_(this) { weak_ptr_factory_(this) {
@ -138,6 +144,15 @@ int NaiveClientConnection::DoConnectClientComplete(int result) {
if (result < 0) if (result < 0)
return result; return result;
early_pull_pending_ = true;
Pull(kClient, kServer);
if (early_pull_result_ != ERR_IO_PENDING) {
// Pull has completed synchronously.
if (early_pull_result_ <= 0) {
return early_pull_result_ ? early_pull_result_ : ERR_CONNECTION_CLOSED;
}
}
next_state_ = STATE_CONNECT_SERVER; next_state_ = STATE_CONNECT_SERVER;
return OK; return OK;
} }
@ -172,25 +187,27 @@ int NaiveClientConnection::DoConnectServerComplete(int result) {
if (result < 0) if (result < 0)
return result; return result;
DCHECK(server_socket_handle_->socket());
sockets_[kServer] = server_socket_handle_->socket();
full_duplex_ = true; full_duplex_ = true;
next_state_ = STATE_NONE; next_state_ = STATE_NONE;
return OK; return OK;
} }
int NaiveClientConnection::Run(const CompletionCallback& callback) { int NaiveClientConnection::Run(const CompletionCallback& callback) {
DCHECK(client_socket_); DCHECK(sockets_[kClient]);
DCHECK(server_socket_handle_->socket()); DCHECK(sockets_[kServer]);
DCHECK_EQ(next_state_, STATE_NONE); DCHECK_EQ(next_state_, STATE_NONE);
DCHECK(!connect_callback_); DCHECK(!connect_callback_);
if (errors_[kClient] != OK)
return errors_[kClient];
if (errors_[kServer] != OK)
return errors_[kServer];
run_callback_ = callback; run_callback_ = callback;
sockets_[kClient] = client_socket_.get();
sockets_[kServer] = server_socket_handle_->socket();
errors_[kClient] = OK;
errors_[kServer] = OK;
bytes_passed_without_yielding_[kClient] = 0; bytes_passed_without_yielding_[kClient] = 0;
bytes_passed_without_yielding_[kServer] = 0; bytes_passed_without_yielding_[kServer] = 0;
@ -199,7 +216,11 @@ int NaiveClientConnection::Run(const CompletionCallback& callback) {
base::TimeDelta::FromMilliseconds(kYieldAfterDurationMilliseconds); base::TimeDelta::FromMilliseconds(kYieldAfterDurationMilliseconds);
yield_after_time_[kServer] = yield_after_time_[kClient]; yield_after_time_[kServer] = yield_after_time_[kClient];
Pull(kClient, kServer); can_push_to_server_ = true;
if (!early_pull_pending_) {
DCHECK_GT(early_pull_result_, 0);
Push(kClient, kServer, early_pull_result_);
}
Pull(kServer, kClient); Pull(kServer, kClient);
return ERR_IO_PENDING; return ERR_IO_PENDING;
@ -209,75 +230,124 @@ void NaiveClientConnection::Pull(Direction from, Direction to) {
if (errors_[kClient] < 0 || errors_[kServer] < 0) if (errors_[kClient] < 0 || errors_[kServer] < 0)
return; return;
auto buffer = base::MakeRefCounted<IOBuffer>(kBufferSize); read_buffers_[from] = new IOBuffer(kBufferSize);
DCHECK(sockets_[from]);
int rv = sockets_[from]->Read( int rv = sockets_[from]->Read(
buffer.get(), kBufferSize, read_buffers_[from].get(), kBufferSize,
base::Bind(&NaiveClientConnection::OnReadComplete, base::Bind(&NaiveClientConnection::OnPullComplete,
weak_ptr_factory_.GetWeakPtr(), from, to, buffer)); weak_ptr_factory_.GetWeakPtr(), from, to));
if (from == kClient && early_pull_pending_)
early_pull_result_ = rv;
if (rv != ERR_IO_PENDING) if (rv != ERR_IO_PENDING)
OnReadComplete(from, to, buffer, rv); OnPullComplete(from, to, rv);
} }
void NaiveClientConnection::Push(Direction from, void NaiveClientConnection::Push(Direction from, Direction to, int size) {
Direction to, write_buffers_[to] = new DrainableIOBuffer(read_buffers_[from].get(), size);
scoped_refptr<IOBuffer> buffer, write_pending_[to] = true;
int size) { DCHECK(sockets_[to]);
if (errors_[kClient] < 0 || errors_[kServer] < 0) int rv =
return; sockets_[to]->Write(write_buffers_[to].get(), size,
base::Bind(&NaiveClientConnection::OnPushComplete,
auto drainable = base::MakeRefCounted<DrainableIOBuffer>(buffer.get(), size); weak_ptr_factory_.GetWeakPtr(), from, to));
int rv = sockets_[to]->Write(
drainable.get(), size,
base::Bind(&NaiveClientConnection::OnWriteComplete,
weak_ptr_factory_.GetWeakPtr(), from, to, drainable));
if (rv != ERR_IO_PENDING) if (rv != ERR_IO_PENDING)
OnWriteComplete(from, to, drainable, rv); OnPushComplete(from, to, rv);
} }
void NaiveClientConnection::OnIOError(Direction from, int error) { void NaiveClientConnection::Disconnect(Direction side) {
// Avoids running run_callback_ again. if (sockets_[side]) {
if (errors_[kClient] < 0 || errors_[kServer] < 0) sockets_[side]->Disconnect();
return; sockets_[side] = nullptr;
write_pending_[side] = false;
}
}
if (errors_[from] == OK) { bool NaiveClientConnection::IsConnected(Direction side) {
DCHECK(run_callback_); return sockets_[side];
}
void NaiveClientConnection::OnBothDisconnected() {
if (run_callback_) {
int error = OK;
if (errors_[kClient] != ERR_CONNECTION_CLOSED && errors_[kClient] < 0)
error = errors_[kClient];
if (errors_[kServer] != ERR_CONNECTION_CLOSED && errors_[kClient] < 0)
error = errors_[kServer];
base::ResetAndReturn(&run_callback_).Run(error); base::ResetAndReturn(&run_callback_).Run(error);
} }
}
void NaiveClientConnection::OnPullError(Direction from,
Direction to,
int error) {
DCHECK_LT(error, 0);
errors_[from] = error; errors_[from] = error;
Disconnect(from);
if (!write_pending_[to])
Disconnect(to);
if (!IsConnected(from) && !IsConnected(to))
OnBothDisconnected();
} }
void NaiveClientConnection::OnReadComplete(Direction from, void NaiveClientConnection::OnPushError(Direction from,
Direction to,
int error) {
DCHECK_LE(error, 0);
DCHECK(!write_pending_[to]);
if (error < 0) {
errors_[to] = error;
Disconnect(kServer);
Disconnect(kClient);
} else if (!IsConnected(from)) {
Disconnect(to);
}
if (!IsConnected(from) && !IsConnected(to))
OnBothDisconnected();
}
void NaiveClientConnection::OnPullComplete(Direction from,
Direction to, Direction to,
scoped_refptr<IOBuffer> buffer,
int result) { int result) {
if (from == kClient && early_pull_pending_) {
early_pull_pending_ = false;
early_pull_result_ = result;
}
if (result <= 0) { if (result <= 0) {
OnIOError(from, result ? result : ERR_CONNECTION_CLOSED); OnPullError(from, to, result ? result : ERR_CONNECTION_CLOSED);
return; return;
} }
Push(from, to, buffer, result); if (from == kClient && !can_push_to_server_)
return;
Push(from, to, result);
} }
void NaiveClientConnection::OnWriteComplete( void NaiveClientConnection::OnPushComplete(Direction from,
Direction from, Direction to,
Direction to, int result) {
scoped_refptr<DrainableIOBuffer> drainable, if (result >= 0) {
int result) { bytes_passed_without_yielding_[from] += result;
if (result < 0) { write_buffers_[to]->DidConsume(result);
OnIOError(to, result); int size = write_buffers_[to]->BytesRemaining();
return; if (size > 0) {
Push(from, to, size);
return;
}
} }
bytes_passed_without_yielding_[from] += result; write_pending_[to] = false;
// Checks for termination even if result is OK.
drainable->DidConsume(result); OnPushError(from, to, result >= 0 ? OK : result);
int size = drainable->BytesRemaining();
if (size > 0) {
Push(from, to, drainable.get(), size);
return;
}
if (bytes_passed_without_yielding_[from] > kYieldAfterBytesRead || if (bytes_passed_without_yielding_[from] > kYieldAfterBytesRead ||
time_func_() > yield_after_time_[from]) { time_func_() > yield_after_time_[from]) {

View File

@ -65,19 +65,14 @@ class NaiveClientConnection {
int DoConnectServer(); int DoConnectServer();
int DoConnectServerComplete(int result); int DoConnectServerComplete(int result);
void Pull(Direction from, Direction to); void Pull(Direction from, Direction to);
void Push(Direction from, void Push(Direction from, Direction to, int size);
Direction to, void Disconnect(Direction side);
scoped_refptr<IOBuffer> buffer, bool IsConnected(Direction side);
int size); void OnBothDisconnected();
void OnIOError(Direction from, int error); void OnPullError(Direction from, Direction to, int error);
void OnReadComplete(Direction from, void OnPushError(Direction from, Direction to, int error);
Direction to, void OnPullComplete(Direction from, Direction to, int result);
scoped_refptr<IOBuffer> buffer, void OnPushComplete(Direction from, Direction to, int result);
int result);
void OnWriteComplete(Direction from,
Direction to,
scoped_refptr<DrainableIOBuffer> drainable,
int result);
int id_; int id_;
@ -96,10 +91,17 @@ class NaiveClientConnection {
std::unique_ptr<ClientSocketHandle> server_socket_handle_; std::unique_ptr<ClientSocketHandle> server_socket_handle_;
StreamSocket* sockets_[kNumDirections]; StreamSocket* sockets_[kNumDirections];
scoped_refptr<IOBuffer> read_buffers_[kNumDirections];
scoped_refptr<DrainableIOBuffer> write_buffers_[kNumDirections];
int errors_[kNumDirections]; int errors_[kNumDirections];
bool write_pending_[kNumDirections];
int bytes_passed_without_yielding_[kNumDirections]; int bytes_passed_without_yielding_[kNumDirections];
base::TimeTicks yield_after_time_[kNumDirections]; base::TimeTicks yield_after_time_[kNumDirections];
bool early_pull_pending_;
bool can_push_to_server_;
int early_pull_result_;
bool full_duplex_; bool full_duplex_;
TimeFunc time_func_; TimeFunc time_func_;