Use HTTP/2 CONNECT fast open

Also put the first client Pull to as early as possible.

Use member variables to hold the buffers instead of passing temporaries
around.
This commit is contained in:
klzgrad 2018-01-29 13:07:16 -05:00
parent ba287cc0fc
commit 629977fd1e
2 changed files with 138 additions and 66 deletions

View File

@ -42,6 +42,12 @@ NaiveClientConnection::NaiveClientConnection(
client_socket_( client_socket_(
std::make_unique<Socks5ServerSocket>(std::move(accepted_socket))), std::make_unique<Socks5ServerSocket>(std::move(accepted_socket))),
server_socket_handle_(std::make_unique<ClientSocketHandle>()), server_socket_handle_(std::make_unique<ClientSocketHandle>()),
sockets_{client_socket_.get(), nullptr},
errors_{OK, OK},
write_pending_{false, false},
early_pull_pending_(false),
can_push_to_server_(false),
early_pull_result_(ERR_IO_PENDING),
full_duplex_(false), full_duplex_(false),
time_func_(&base::TimeTicks::Now), time_func_(&base::TimeTicks::Now),
weak_ptr_factory_(this) { weak_ptr_factory_(this) {
@ -138,6 +144,15 @@ int NaiveClientConnection::DoConnectClientComplete(int result) {
if (result < 0) if (result < 0)
return result; return result;
early_pull_pending_ = true;
Pull(kClient, kServer);
if (early_pull_result_ != ERR_IO_PENDING) {
// Pull has completed synchronously.
if (early_pull_result_ <= 0) {
return early_pull_result_ ? early_pull_result_ : ERR_CONNECTION_CLOSED;
}
}
next_state_ = STATE_CONNECT_SERVER; next_state_ = STATE_CONNECT_SERVER;
return OK; return OK;
} }
@ -172,25 +187,27 @@ int NaiveClientConnection::DoConnectServerComplete(int result) {
if (result < 0) if (result < 0)
return result; return result;
DCHECK(server_socket_handle_->socket());
sockets_[kServer] = server_socket_handle_->socket();
full_duplex_ = true; full_duplex_ = true;
next_state_ = STATE_NONE; next_state_ = STATE_NONE;
return OK; return OK;
} }
int NaiveClientConnection::Run(const CompletionCallback& callback) { int NaiveClientConnection::Run(const CompletionCallback& callback) {
DCHECK(client_socket_); DCHECK(sockets_[kClient]);
DCHECK(server_socket_handle_->socket()); DCHECK(sockets_[kServer]);
DCHECK_EQ(next_state_, STATE_NONE); DCHECK_EQ(next_state_, STATE_NONE);
DCHECK(!connect_callback_); DCHECK(!connect_callback_);
if (errors_[kClient] != OK)
return errors_[kClient];
if (errors_[kServer] != OK)
return errors_[kServer];
run_callback_ = callback; run_callback_ = callback;
sockets_[kClient] = client_socket_.get();
sockets_[kServer] = server_socket_handle_->socket();
errors_[kClient] = OK;
errors_[kServer] = OK;
bytes_passed_without_yielding_[kClient] = 0; bytes_passed_without_yielding_[kClient] = 0;
bytes_passed_without_yielding_[kServer] = 0; bytes_passed_without_yielding_[kServer] = 0;
@ -199,7 +216,11 @@ int NaiveClientConnection::Run(const CompletionCallback& callback) {
base::TimeDelta::FromMilliseconds(kYieldAfterDurationMilliseconds); base::TimeDelta::FromMilliseconds(kYieldAfterDurationMilliseconds);
yield_after_time_[kServer] = yield_after_time_[kClient]; yield_after_time_[kServer] = yield_after_time_[kClient];
Pull(kClient, kServer); can_push_to_server_ = true;
if (!early_pull_pending_) {
DCHECK_GT(early_pull_result_, 0);
Push(kClient, kServer, early_pull_result_);
}
Pull(kServer, kClient); Pull(kServer, kClient);
return ERR_IO_PENDING; return ERR_IO_PENDING;
@ -209,75 +230,124 @@ void NaiveClientConnection::Pull(Direction from, Direction to) {
if (errors_[kClient] < 0 || errors_[kServer] < 0) if (errors_[kClient] < 0 || errors_[kServer] < 0)
return; return;
auto buffer = base::MakeRefCounted<IOBuffer>(kBufferSize); read_buffers_[from] = new IOBuffer(kBufferSize);
DCHECK(sockets_[from]);
int rv = sockets_[from]->Read( int rv = sockets_[from]->Read(
buffer.get(), kBufferSize, read_buffers_[from].get(), kBufferSize,
base::Bind(&NaiveClientConnection::OnReadComplete, base::Bind(&NaiveClientConnection::OnPullComplete,
weak_ptr_factory_.GetWeakPtr(), from, to, buffer)); weak_ptr_factory_.GetWeakPtr(), from, to));
if (from == kClient && early_pull_pending_)
early_pull_result_ = rv;
if (rv != ERR_IO_PENDING) if (rv != ERR_IO_PENDING)
OnReadComplete(from, to, buffer, rv); OnPullComplete(from, to, rv);
} }
void NaiveClientConnection::Push(Direction from, void NaiveClientConnection::Push(Direction from, Direction to, int size) {
Direction to, write_buffers_[to] = new DrainableIOBuffer(read_buffers_[from].get(), size);
scoped_refptr<IOBuffer> buffer, write_pending_[to] = true;
int size) { DCHECK(sockets_[to]);
if (errors_[kClient] < 0 || errors_[kServer] < 0) int rv =
return; sockets_[to]->Write(write_buffers_[to].get(), size,
base::Bind(&NaiveClientConnection::OnPushComplete,
auto drainable = base::MakeRefCounted<DrainableIOBuffer>(buffer.get(), size); weak_ptr_factory_.GetWeakPtr(), from, to));
int rv = sockets_[to]->Write(
drainable.get(), size,
base::Bind(&NaiveClientConnection::OnWriteComplete,
weak_ptr_factory_.GetWeakPtr(), from, to, drainable));
if (rv != ERR_IO_PENDING) if (rv != ERR_IO_PENDING)
OnWriteComplete(from, to, drainable, rv); OnPushComplete(from, to, rv);
} }
void NaiveClientConnection::OnIOError(Direction from, int error) { void NaiveClientConnection::Disconnect(Direction side) {
// Avoids running run_callback_ again. if (sockets_[side]) {
if (errors_[kClient] < 0 || errors_[kServer] < 0) sockets_[side]->Disconnect();
return; sockets_[side] = nullptr;
write_pending_[side] = false;
}
}
if (errors_[from] == OK) { bool NaiveClientConnection::IsConnected(Direction side) {
DCHECK(run_callback_); return sockets_[side];
}
void NaiveClientConnection::OnBothDisconnected() {
if (run_callback_) {
int error = OK;
if (errors_[kClient] != ERR_CONNECTION_CLOSED && errors_[kClient] < 0)
error = errors_[kClient];
if (errors_[kServer] != ERR_CONNECTION_CLOSED && errors_[kClient] < 0)
error = errors_[kServer];
base::ResetAndReturn(&run_callback_).Run(error); base::ResetAndReturn(&run_callback_).Run(error);
} }
}
void NaiveClientConnection::OnPullError(Direction from,
Direction to,
int error) {
DCHECK_LT(error, 0);
errors_[from] = error; errors_[from] = error;
Disconnect(from);
if (!write_pending_[to])
Disconnect(to);
if (!IsConnected(from) && !IsConnected(to))
OnBothDisconnected();
} }
void NaiveClientConnection::OnReadComplete(Direction from, void NaiveClientConnection::OnPushError(Direction from,
Direction to,
int error) {
DCHECK_LE(error, 0);
DCHECK(!write_pending_[to]);
if (error < 0) {
errors_[to] = error;
Disconnect(kServer);
Disconnect(kClient);
} else if (!IsConnected(from)) {
Disconnect(to);
}
if (!IsConnected(from) && !IsConnected(to))
OnBothDisconnected();
}
void NaiveClientConnection::OnPullComplete(Direction from,
Direction to, Direction to,
scoped_refptr<IOBuffer> buffer,
int result) { int result) {
if (from == kClient && early_pull_pending_) {
early_pull_pending_ = false;
early_pull_result_ = result;
}
if (result <= 0) { if (result <= 0) {
OnIOError(from, result ? result : ERR_CONNECTION_CLOSED); OnPullError(from, to, result ? result : ERR_CONNECTION_CLOSED);
return; return;
} }
Push(from, to, buffer, result); if (from == kClient && !can_push_to_server_)
return;
Push(from, to, result);
} }
void NaiveClientConnection::OnWriteComplete( void NaiveClientConnection::OnPushComplete(Direction from,
Direction from,
Direction to, Direction to,
scoped_refptr<DrainableIOBuffer> drainable,
int result) { int result) {
if (result < 0) { if (result >= 0) {
OnIOError(to, result);
return;
}
bytes_passed_without_yielding_[from] += result; bytes_passed_without_yielding_[from] += result;
write_buffers_[to]->DidConsume(result);
drainable->DidConsume(result); int size = write_buffers_[to]->BytesRemaining();
int size = drainable->BytesRemaining();
if (size > 0) { if (size > 0) {
Push(from, to, drainable.get(), size); Push(from, to, size);
return; return;
} }
}
write_pending_[to] = false;
// Checks for termination even if result is OK.
OnPushError(from, to, result >= 0 ? OK : result);
if (bytes_passed_without_yielding_[from] > kYieldAfterBytesRead || if (bytes_passed_without_yielding_[from] > kYieldAfterBytesRead ||
time_func_() > yield_after_time_[from]) { time_func_() > yield_after_time_[from]) {

View File

@ -65,19 +65,14 @@ class NaiveClientConnection {
int DoConnectServer(); int DoConnectServer();
int DoConnectServerComplete(int result); int DoConnectServerComplete(int result);
void Pull(Direction from, Direction to); void Pull(Direction from, Direction to);
void Push(Direction from, void Push(Direction from, Direction to, int size);
Direction to, void Disconnect(Direction side);
scoped_refptr<IOBuffer> buffer, bool IsConnected(Direction side);
int size); void OnBothDisconnected();
void OnIOError(Direction from, int error); void OnPullError(Direction from, Direction to, int error);
void OnReadComplete(Direction from, void OnPushError(Direction from, Direction to, int error);
Direction to, void OnPullComplete(Direction from, Direction to, int result);
scoped_refptr<IOBuffer> buffer, void OnPushComplete(Direction from, Direction to, int result);
int result);
void OnWriteComplete(Direction from,
Direction to,
scoped_refptr<DrainableIOBuffer> drainable,
int result);
int id_; int id_;
@ -96,10 +91,17 @@ class NaiveClientConnection {
std::unique_ptr<ClientSocketHandle> server_socket_handle_; std::unique_ptr<ClientSocketHandle> server_socket_handle_;
StreamSocket* sockets_[kNumDirections]; StreamSocket* sockets_[kNumDirections];
scoped_refptr<IOBuffer> read_buffers_[kNumDirections];
scoped_refptr<DrainableIOBuffer> write_buffers_[kNumDirections];
int errors_[kNumDirections]; int errors_[kNumDirections];
bool write_pending_[kNumDirections];
int bytes_passed_without_yielding_[kNumDirections]; int bytes_passed_without_yielding_[kNumDirections];
base::TimeTicks yield_after_time_[kNumDirections]; base::TimeTicks yield_after_time_[kNumDirections];
bool early_pull_pending_;
bool can_push_to_server_;
int early_pull_result_;
bool full_duplex_; bool full_duplex_;
TimeFunc time_func_; TimeFunc time_func_;