// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "net/proxy/multi_threaded_proxy_resolver.h"

// NOTE(review): the header names of the next two #include directives are
// missing in this copy of the file (presumably system headers whose <...>
// part was lost) -- TODO restore from upstream.
#include
#include

#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/containers/circular_deque.h"
#include "base/location.h"
#include "base/single_thread_task_runner.h"
#include "base/stl_util.h"
#include "base/strings/string_util.h"
#include "base/strings/stringprintf.h"
#include "base/threading/thread.h"
#include "base/threading/thread_checker.h"
#include "base/threading/thread_restrictions.h"
#include "base/threading/thread_task_runner_handle.h"
#include "net/base/net_errors.h"
#include "net/log/net_log.h"
#include "net/log/net_log_event_type.h"
#include "net/log/net_log_with_source.h"
#include "net/proxy/proxy_info.h"
#include "net/proxy/proxy_resolver.h"

// NOTE(review): throughout this file the template argument lists of
// scoped_refptr, std::unique_ptr, base::RefCountedThreadSafe,
// base::circular_deque and std::vector appear to be missing in this copy.
// The declarations are reproduced as found -- TODO confirm against upstream.

namespace net {
namespace {

// Forward declaration; the full definition follows the
// MultiThreadedProxyResolver class declaration below.
class Job;

// An "executor" is a job-runner for PAC requests. It encapsulates a worker
// thread and a synchronous ProxyResolver (which will be operated on said
// thread.)
//
// An executor runs at most one job at a time (StartJob() DCHECKs that no job
// is outstanding). Destroy() must be called before the last reference is
// dropped; the destructor DCHECKs this.
class Executor : public base::RefCountedThreadSafe {
 public:
  // Interface through which an executor reports that it can accept work.
  class Coordinator {
   public:
    // Invoked from Executor::OnJobCompleted() after the executor has cleared
    // its outstanding job. |executor| is the executor that became idle.
    virtual void OnExecutorReady(Executor* executor) = 0;

   protected:
    // Protected so that code outside the hierarchy cannot delete a
    // Coordinator through this interface.
    virtual ~Coordinator() = default;
  };

  // |coordinator| must remain valid throughout our lifetime. It is used to
  // signal when the executor is ready to receive work by calling
  // |coordinator->OnExecutorReady()|.
  // |thread_number| is an identifier used when naming the worker thread.
  // The worker thread is started by the constructor.
  Executor(Coordinator* coordinator, int thread_number);

  // Submit a job to this executor. There must be no outstanding job.
  void StartJob(Job* job);

  // Callback for when a job has completed running on the executor's thread.
  // Clears the outstanding job and notifies the coordinator.
  void OnJobCompleted(Job* job);

  // Cleanup the executor. Cancels all outstanding work, and frees the thread
  // and resolver.
  void Destroy();

  // Returns the outstanding job, or NULL.
  Job* outstanding_job() const { return outstanding_job_.get(); }

  // Returns the synchronous resolver, or null if none has been set (or it has
  // been released by Destroy()).
  ProxyResolver* resolver() { return resolver_.get(); }

  // Returns the identifier that was passed to the constructor.
  int thread_number() const { return thread_number_; }

  // Takes ownership of |resolver|. Called by
  // CreateResolverJob::RequestComplete() when the job was not cancelled.
  void set_resolver(std::unique_ptr resolver) {
    resolver_ = std::move(resolver);
  }

  // Replaces the coordinator. Both the new and the current coordinator must
  // be non-null; since Destroy() nulls |coordinator_|, the second DCHECK
  // fires if this is called after Destroy().
  void set_coordinator(Coordinator* coordinator) {
    DCHECK(coordinator);
    DCHECK(coordinator_);
    coordinator_ = coordinator;
  }

 private:
  // The destructor is private; base::RefCountedThreadSafe is befriended so
  // that it can invoke it.
  friend class base::RefCountedThreadSafe;
  ~Executor();

  // Not owned. Set to NULL by Destroy().
  Coordinator* coordinator_;

  // Identifier used in the worker thread's name.
  const int thread_number_;

  // The currently active job for this executor (either a CreateProxyResolver or
  // GetProxyForURL task).
  scoped_refptr outstanding_job_;

  // The synchronous resolver implementation.
  std::unique_ptr resolver_;

  // The thread where |resolver_| is run on.
  // Note that declaration ordering is important here. |thread_| needs to be
  // destroyed *before* |resolver_|, in case |resolver_| is currently
  // executing on |thread_|.
  std::unique_ptr thread_;
};

// Asynchronous ProxyResolver that dispatches each request to one of several
// Executors and queues requests while all executors are busy. It also acts as
// the Coordinator for every executor it holds.
class MultiThreadedProxyResolver : public ProxyResolver,
                                   public Executor::Coordinator {
 public:
  // Creates an asynchronous ProxyResolver that runs requests on up to
  // |max_num_threads|.
  //
  // For each thread that is created, an accompanying synchronous ProxyResolver
  // will be provisioned using |resolver_factory|. All methods on these
  // ProxyResolvers will be called on the one thread.
  //
  // |executor| is an already-created executor; the constructor installs this
  // object as its coordinator and stores it as the first entry of
  // |executors_|. |script_data| must be non-null (DCHECKed).
  MultiThreadedProxyResolver(
      std::unique_ptr resolver_factory,
      size_t max_num_threads,
      const scoped_refptr& script_data,
      scoped_refptr executor);

  // Drops all queued jobs and calls Destroy() on every executor.
  ~MultiThreadedProxyResolver() override;

  // ProxyResolver implementation:
  // Always returns ERR_IO_PENDING; the outcome is reported through
  // |callback|. If |request| is non-null it receives a handle whose
  // destruction cancels the job.
  int GetProxyForURL(const GURL& url,
                     ProxyInfo* results,
                     const CompletionCallback& callback,
                     std::unique_ptr* request,
                     const NetLogWithSource& net_log) override;

 private:
  class GetProxyForURLJob;
  class RequestImpl;
  // FIFO queue of pending jobs waiting to be started.
  // TODO(eroman): Make this priority queue.
  // NOTE(review): the stray '>' below looks like the tail of a lost template
  // argument list -- TODO confirm against upstream.
  using PendingJobsQueue = base::circular_deque>;
  // NOTE(review): same apparent loss of the template argument list here.
  using ExecutorList = std::vector>;

  // Returns an idle worker thread which is ready to receive GetProxyForURL()
  // requests. If all threads are occupied, returns NULL.
  Executor* FindIdleExecutor();

  // Creates a new worker thread, and appends it to |executors_|.
  void AddNewExecutor();

  // Starts the next job from |pending_jobs_| if possible.
  void OnExecutorReady(Executor* executor) override;

  // Factory handed (as a raw pointer) to each CreateResolverJob started by
  // AddNewExecutor().
  const std::unique_ptr resolver_factory_;

  // Upper bound on the size of |executors_|.
  const size_t max_num_threads_;

  // Jobs that could not be started immediately because no executor was idle.
  PendingJobsQueue pending_jobs_;

  // All executors held by this resolver.
  ExecutorList executors_;

  // Script data passed to each CreateResolverJob.
  scoped_refptr script_data_;

  THREAD_CHECKER(thread_checker_);
};

// Job ---------------------------------------------

// Ref-counted unit of work. An Executor posts Run() to its worker thread;
// implementations post their completion back to the task runner passed to
// Run(), and from there call OnJobCompleted().
class Job : public base::RefCountedThreadSafe {
 public:
  // Identifies the subclass of Job (only being used for debugging purposes).
  enum Type {
    TYPE_GET_PROXY_FOR_URL,
    TYPE_CREATE_RESOLVER,
  };

  // |callback| may be a null callback for internally scheduled jobs (see
  // has_user_callback()). The job starts with no executor and not cancelled.
  Job(Type type, const CompletionCallback& callback)
      : type_(type),
        callback_(callback),
        executor_(NULL),
        was_cancelled_(false) {
  }

  // Sets (or, with NULL, clears) the executor associated with this job.
  void set_executor(Executor* executor) {
    executor_ = executor;
  }

  // The "executor" is the job runner that is scheduling this job. If
  // this job has not been submitted to an executor yet, this will be
  // NULL (and we know it hasn't started yet).
  Executor* executor() {
    return executor_;
  }

  // Mark the job as having been cancelled.
  void Cancel() {
    was_cancelled_ = true;
  }

  // Returns true if Cancel() has been called.
  bool was_cancelled() const { return was_cancelled_; }

  // Returns the type given at construction.
  Type type() const { return type_; }

  // Returns true if this job still has a user callback. Some jobs
  // do not have a user callback, because they were helper jobs
  // scheduled internally (for example TYPE_CREATE_RESOLVER).
  //
  // Otherwise jobs that correspond with user-initiated work will
  // have a non-null callback up until the callback is run.
  bool has_user_callback() const { return !callback_.is_null(); }

  // This method is called when the job is inserted into a wait queue
  // because no executors were ready to accept it.
  virtual void WaitingForThread() {}

  // This method is called just before the job is posted to the work thread.
  virtual void FinishedWaitingForThread() {}

  // This method is called on the worker thread to do the job's work. On
  // completion, implementors are expected to call OnJobCompleted() on
  // |origin_runner|.
  virtual void Run(
      scoped_refptr origin_runner) = 0;

 protected:
  // Reports completion to the executor, if this job still has one.
  void OnJobCompleted() {
    // |executor_| will be NULL if the executor has already been deleted.
    if (executor_)
      executor_->OnJobCompleted(this);
  }

  // Runs the user callback with |result|. The callback is copied to a local
  // and the member is reset *before* the copy is run.
  void RunUserCallback(int result) {
    DCHECK(has_user_callback());
    CompletionCallback callback = callback_;
    // Reset the callback so has_user_callback() will now return false.
    callback_.Reset();
    callback.Run(result);
  }

  // The destructor is not public; base::RefCountedThreadSafe is befriended so
  // that it can invoke it.
  friend class base::RefCountedThreadSafe;

  virtual ~Job() {}

 private:
  const Type type_;

  // User callback; null for internal jobs and after RunUserCallback().
  CompletionCallback callback_;

  // Not owned. NULL until StartJob() assigns an executor, and again after
  // Executor::Destroy() orphans the job.
  Executor* executor_;

  bool was_cancelled_;
};

// Request handle handed back to callers of
// MultiThreadedProxyResolver::GetProxyForURL(). Destroying the handle cancels
// the associated job.
class MultiThreadedProxyResolver::RequestImpl : public ProxyResolver::Request {
 public:
  explicit RequestImpl(scoped_refptr job) : job_(std::move(job)) {}

  // Cancels the job; a cancelled job skips its user callback (see
  // GetProxyForURLJob::QueryComplete()).
  ~RequestImpl() override { job_->Cancel(); }

  // Always reports LOAD_STATE_RESOLVING_PROXY_FOR_URL.
  LoadState GetLoadState() override {
    return LOAD_STATE_RESOLVING_PROXY_FOR_URL;
  }

 private:
  scoped_refptr job_;
};

// CreateResolverJob -----------------------------------------------------------

// Runs on the worker thread to call ProxyResolverFactory::CreateProxyResolver.
class CreateResolverJob : public Job {
 public:
  // |factory| is stored as a raw pointer (not owned by this job). The job has
  // no user callback.
  CreateResolverJob(const scoped_refptr& script_data,
                    ProxyResolverFactory* factory)
      : Job(TYPE_CREATE_RESOLVER, CompletionCallback()),
        script_data_(script_data),
        factory_(factory) {}

  // Runs on the worker thread.
  // Asks |factory_| to create a resolver into |resolver_|, DCHECKs that the
  // factory did not return ERR_IO_PENDING, then posts RequestComplete() to
  // |origin_runner|.
  void Run(scoped_refptr origin_runner) override {
    std::unique_ptr request;
    int rv = factory_->CreateProxyResolver(script_data_, &resolver_,
                                           CompletionCallback(), &request);

    DCHECK_NE(rv, ERR_IO_PENDING);
    origin_runner->PostTask(
        FROM_HERE, base::Bind(&CreateResolverJob::RequestComplete, this, rv));
  }

 protected:
  ~CreateResolverJob() override {}

 private:
  // Runs the completion callback on the origin thread.
  // Note that |result_code| is not inspected here: |resolver_| is handed to
  // the executor as-is, and MultiThreadedProxyResolverFactory::Job detects
  // failure by checking whether Executor::resolver() is null.
  void RequestComplete(int result_code) {
    // The task may have been cancelled after it was started.
    if (!was_cancelled()) {
      DCHECK(executor());
      executor()->set_resolver(std::move(resolver_));
    }
    OnJobCompleted();
  }

  const scoped_refptr script_data_;

  // Not owned.
  ProxyResolverFactory* factory_;

  // Filled in by Run(); moved into the executor by RequestComplete().
  std::unique_ptr resolver_;
};

// MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------

// Job that resolves the proxy for a single URL using the executor's
// synchronous resolver and reports the result through the user callback.
class MultiThreadedProxyResolver::GetProxyForURLJob : public Job {
 public:
  // |url|         -- the URL of the query.
  // |results|     -- the structure to fill with proxy resolve results.
  // |callback|    -- user callback; must be non-null (DCHECKed).
  // |net_log|     -- copied into the job and used for logging.
  GetProxyForURLJob(const GURL& url,
                    ProxyInfo* results,
                    const CompletionCallback& callback,
                    const NetLogWithSource& net_log)
      : Job(TYPE_GET_PROXY_FOR_URL, callback),
        results_(results),
        net_log_(net_log),
        url_(url),
        was_waiting_for_thread_(false) {
    DCHECK(!callback.is_null());
  }

  NetLogWithSource* net_log() { return &net_log_; }

  // Records that the job was queued and opens the
  // WAITING_FOR_PROXY_RESOLVER_THREAD log event.
  void WaitingForThread() override {
    was_waiting_for_thread_ = true;
    net_log_.BeginEvent(NetLogEventType::WAITING_FOR_PROXY_RESOLVER_THREAD);
  }

  // Closes the waiting event (only if one was opened) and logs which thread
  // number the job was submitted to. An executor must already be set.
  void FinishedWaitingForThread() override {
    DCHECK(executor());

    if (was_waiting_for_thread_) {
      net_log_.EndEvent(NetLogEventType::WAITING_FOR_PROXY_RESOLVER_THREAD);
    }

    net_log_.AddEvent(
        NetLogEventType::SUBMITTED_TO_RESOLVER_THREAD,
        NetLog::IntCallback("thread_number", executor()->thread_number()));
  }

  // Runs on the worker thread.
  // Resolves into |results_buf_| (not into the caller's |results_|), DCHECKs
  // that the resolver did not return ERR_IO_PENDING, then posts
  // QueryComplete() to |origin_runner|.
  void Run(scoped_refptr origin_runner) override {
    ProxyResolver* resolver = executor()->resolver();
    DCHECK(resolver);
    int rv = resolver->GetProxyForURL(
        url_, &results_buf_, CompletionCallback(), NULL, net_log_);
    DCHECK_NE(rv, ERR_IO_PENDING);

    origin_runner->PostTask(
        FROM_HERE, base::Bind(&GetProxyForURLJob::QueryComplete, this, rv));
  }

 protected:
  ~GetProxyForURLJob() override {}

 private:
  // Runs the completion callback on the origin thread.
  // A cancelled job neither touches |results_| nor runs the user callback,
  // but the executor is still notified via OnJobCompleted().
  void QueryComplete(int result_code) {
    // The Job may have been cancelled after it was started.
    if (!was_cancelled()) {
      if (result_code >= OK) {  // Note: unit-tests use values > 0.
        results_->Use(results_buf_);
      }
      RunUserCallback(result_code);
    }
    OnJobCompleted();
  }

  // Must only be used on the "origin" thread.
  ProxyInfo* results_;

  // Can be used on either "origin" or worker thread.
  NetLogWithSource net_log_;
  const GURL url_;

  // Usable from within DoQuery on the worker thread.
  ProxyInfo results_buf_;

  // True once WaitingForThread() has been called.
  bool was_waiting_for_thread_;
};

// Executor ----------------------------------------

// Creates the worker thread, named "PAC thread #<thread_number>", and starts
// it. Failure to start the thread is fatal (CHECK).
Executor::Executor(Executor::Coordinator* coordinator, int thread_number)
    : coordinator_(coordinator), thread_number_(thread_number) {
  DCHECK(coordinator);
  // Start up the thread.
  thread_.reset(new base::Thread(base::StringPrintf("PAC thread #%d",
                                                    thread_number)));
  CHECK(thread_->Start());
}

// Makes |job| the outstanding job, associates it with this executor, and posts
// Job::Run to the worker thread, passing base::ThreadTaskRunnerHandle::Get()
// as the runner on which the job reports completion.
void Executor::StartJob(Job* job) {
  DCHECK(!outstanding_job_.get());
  outstanding_job_ = job;

  // Run the job. Once it has completed (regardless of whether it was
  // cancelled), it will invoke OnJobCompleted() on this thread.
  job->set_executor(this);
  job->FinishedWaitingForThread();
  thread_->task_runner()->PostTask(
      FROM_HERE,
      base::Bind(&Job::Run, job, base::ThreadTaskRunnerHandle::Get()));
}

// Clears the outstanding job (which must be |job|) and tells the coordinator
// that this executor can accept new work.
void Executor::OnJobCompleted(Job* job) {
  DCHECK_EQ(job, outstanding_job_.get());
  outstanding_job_ = NULL;
  coordinator_->OnExecutorReady(this);
}

// Tears the executor down in a fixed order: the worker thread first, then the
// outstanding job, then the resolver. Must be called before destruction, and
// only while |coordinator_| is still set (DCHECKed), i.e. not twice.
void Executor::Destroy() {
  DCHECK(coordinator_);

  {
    // See http://crbug.com/69710.
    base::ThreadRestrictions::ScopedAllowIO allow_io;

    // Join the worker thread.
    thread_.reset();
  }

  // Cancel any outstanding job.
  if (outstanding_job_.get()) {
    outstanding_job_->Cancel();
    // Orphan the job (since this executor may be deleted soon).
    outstanding_job_->set_executor(NULL);
  }

  // It is now safe to free the ProxyResolver, since all the tasks that
  // were using it on the resolver thread have completed.
  resolver_.reset();

  // Null some stuff as a precaution.
  coordinator_ = NULL;
  outstanding_job_ = NULL;
}

Executor::~Executor() {
  // The important cleanup happens as part of Destroy(), which should always be
  // called first.
  DCHECK(!coordinator_) << "Destroy() was not called";
  DCHECK(!thread_.get());
  DCHECK(!resolver_.get());
  DCHECK(!outstanding_job_.get());
}

// MultiThreadedProxyResolver --------------------------------------------------

// Adopts |executor| as the first executor and becomes its coordinator.
// |script_data| must be non-null.
MultiThreadedProxyResolver::MultiThreadedProxyResolver(
    std::unique_ptr resolver_factory,
    size_t max_num_threads,
    const scoped_refptr& script_data,
    scoped_refptr executor)
    : resolver_factory_(std::move(resolver_factory)),
      max_num_threads_(max_num_threads),
      script_data_(script_data) {
  DCHECK(script_data_);
  executor->set_coordinator(this);
  executors_.push_back(executor);
}

MultiThreadedProxyResolver::~MultiThreadedProxyResolver() {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  // We will cancel all outstanding requests.
  pending_jobs_.clear();

  // Destroy() joins each worker thread and cancels/orphans its outstanding
  // job, so no job calls back into this object afterwards.
  for (auto& executor : executors_) {
    executor->Destroy();
  }
}

// Creates a job for |url| and either starts it on an idle executor or queues
// it (provisioning an additional executor if below |max_num_threads_|).
// Returns ERR_IO_PENDING on every path.
int MultiThreadedProxyResolver::GetProxyForURL(
    const GURL& url,
    ProxyInfo* results,
    const CompletionCallback& callback,
    std::unique_ptr* request,
    const NetLogWithSource& net_log) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  DCHECK(!callback.is_null());

  scoped_refptr job(
      new GetProxyForURLJob(url, results, callback, net_log));

  // Completion will be notified through |callback|, unless the caller cancels
  // the request using |request|.
  if (request)
    request->reset(new RequestImpl(job));

  // If there is an executor that is ready to run this request, submit it!
  Executor* executor = FindIdleExecutor();
  if (executor) {
    DCHECK_EQ(0u, pending_jobs_.size());
    executor->StartJob(job.get());
    return ERR_IO_PENDING;
  }

  // Otherwise queue this request. (We will schedule it to a thread once one
  // becomes available).
  job->WaitingForThread();
  pending_jobs_.push_back(job);

  // If we haven't already reached the thread limit, provision a new thread to
  // drain the requests more quickly.
  if (executors_.size() < max_num_threads_)
    AddNewExecutor();

  return ERR_IO_PENDING;
}

// Linear scan of |executors_|; returns the first one without an outstanding
// job, or NULL if every executor is busy.
Executor* MultiThreadedProxyResolver::FindIdleExecutor() {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  for (ExecutorList::iterator it = executors_.begin();
       it != executors_.end(); ++it) {
    Executor* executor = it->get();
    if (!executor->outstanding_job())
      return executor;
  }
  return NULL;
}

// Creates an executor and immediately starts a CreateResolverJob on it. The
// new executor therefore has an outstanding job (FindIdleExecutor() skips it)
// until that job completes and OnExecutorReady() is called for it.
void MultiThreadedProxyResolver::AddNewExecutor() {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  DCHECK_LT(executors_.size(), max_num_threads_);
  // The "thread number" is used to give the thread a unique name.
  int thread_number = executors_.size();
  Executor* executor = new Executor(this, thread_number);
  executor->StartJob(
      new CreateResolverJob(script_data_, resolver_factory_.get()));
  executors_.push_back(base::WrapRefCounted(executor));
}

// Pops queued jobs in FIFO order, discarding cancelled ones, and starts the
// first non-cancelled job on |executor|. Does nothing if the queue is empty.
void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  while (!pending_jobs_.empty()) {
    scoped_refptr job = pending_jobs_.front();
    pending_jobs_.pop_front();
    if (!job->was_cancelled()) {
      executor->StartJob(job.get());
      return;
    }
  }
}

}  // namespace

// Represents one CreateProxyResolver() request made through the factory. It
// creates executor number 0, starts a CreateResolverJob on it, and acts as
// that executor's coordinator until the job finishes.
class MultiThreadedProxyResolverFactory::Job
    : public ProxyResolverFactory::Request,
      public Executor::Coordinator {
 public:
  // |factory| is the owning factory (not owned here). |resolver| is the
  // out-parameter that receives the new resolver on success. Resolver
  // creation is started from within the constructor.
  Job(MultiThreadedProxyResolverFactory* factory,
      const scoped_refptr& script_data,
      std::unique_ptr* resolver,
      std::unique_ptr resolver_factory,
      size_t max_num_threads,
      const CompletionCallback& callback)
      : factory_(factory),
        resolver_out_(resolver),
        resolver_factory_(std::move(resolver_factory)),
        max_num_threads_(max_num_threads),
        script_data_(script_data),
        executor_(new Executor(this, 0)),
        callback_(callback) {
    executor_->StartJob(
        new CreateResolverJob(script_data_, resolver_factory_.get()));
  }

  // |factory_| is still set only if the request has neither completed
  // (OnExecutorReady()) nor been orphaned (FactoryDestroyed()); in that case
  // the executor is torn down and the job unregisters itself.
  ~Job() override {
    if (factory_) {
      executor_->Destroy();
      factory_->RemoveJob(this);
    }
  }

  // Called from the factory's destructor: tears down the executor and drops
  // the references to both the executor and the factory.
  void FactoryDestroyed() {
    executor_->Destroy();
    executor_ = nullptr;
    factory_ = nullptr;
  }

 private:
  // Completion of the CreateResolverJob. If the executor obtained a resolver,
  // a MultiThreadedProxyResolver adopting |executor_| is stored in
  // |*resolver_out_|; otherwise the executor is destroyed and the error is
  // ERR_PAC_SCRIPT_FAILED. The job unregisters from the factory and clears
  // |factory_| before running the callback.
  void OnExecutorReady(Executor* executor) override {
    int error = OK;
    if (executor->resolver()) {
      resolver_out_->reset(new MultiThreadedProxyResolver(
          std::move(resolver_factory_), max_num_threads_,
          std::move(script_data_), executor_));
    } else {
      error = ERR_PAC_SCRIPT_FAILED;
      executor_->Destroy();
    }
    factory_->RemoveJob(this);
    factory_ = nullptr;
    callback_.Run(error);
  }

  // Not owned; nullptr once the request completed or the factory went away.
  MultiThreadedProxyResolverFactory* factory_;

  // Caller-provided location that receives the resolver on success.
  std::unique_ptr* const resolver_out_;

  // Moved into the MultiThreadedProxyResolver on success.
  std::unique_ptr resolver_factory_;
  const size_t max_num_threads_;
  scoped_refptr script_data_;

  // Executor number 0; shared with the created resolver on success.
  scoped_refptr executor_;
  const CompletionCallback callback_;
};
MultiThreadedProxyResolverFactory::MultiThreadedProxyResolverFactory( size_t max_num_threads, bool factory_expects_bytes) : ProxyResolverFactory(factory_expects_bytes), max_num_threads_(max_num_threads) { DCHECK_GE(max_num_threads, 1u); } MultiThreadedProxyResolverFactory::~MultiThreadedProxyResolverFactory() { for (auto* job : jobs_) { job->FactoryDestroyed(); } } int MultiThreadedProxyResolverFactory::CreateProxyResolver( const scoped_refptr& pac_script, std::unique_ptr* resolver, const CompletionCallback& callback, std::unique_ptr* request) { std::unique_ptr job(new Job(this, pac_script, resolver, CreateProxyResolverFactory(), max_num_threads_, callback)); jobs_.insert(job.get()); *request = std::move(job); return ERR_IO_PENDING; } void MultiThreadedProxyResolverFactory::RemoveJob( MultiThreadedProxyResolverFactory::Job* job) { size_t erased = jobs_.erase(job); DCHECK_EQ(1u, erased); } } // namespace net