// Copyright 2017 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "base/task/sequence_manager/thread_controller_impl.h" #include #include "base/bind.h" #include "base/memory/ptr_util.h" #include "base/message_loop/message_loop.h" #include "base/run_loop.h" #include "base/task/sequence_manager/lazy_now.h" #include "base/task/sequence_manager/sequenced_task_source.h" #include "base/trace_event/trace_event.h" namespace base { namespace sequence_manager { namespace internal { ThreadControllerImpl::ThreadControllerImpl( MessageLoop* message_loop, scoped_refptr task_runner, const TickClock* time_source) : message_loop_(message_loop), task_runner_(task_runner), associated_thread_(AssociatedThreadId::CreateUnbound()), message_loop_task_runner_(message_loop ? message_loop->task_runner() : nullptr), time_source_(time_source), weak_factory_(this) { immediate_do_work_closure_ = BindRepeating(&ThreadControllerImpl::DoWork, weak_factory_.GetWeakPtr(), WorkType::kImmediate); delayed_do_work_closure_ = BindRepeating(&ThreadControllerImpl::DoWork, weak_factory_.GetWeakPtr(), WorkType::kDelayed); } ThreadControllerImpl::~ThreadControllerImpl() = default; ThreadControllerImpl::AnySequence::AnySequence() = default; ThreadControllerImpl::AnySequence::~AnySequence() = default; ThreadControllerImpl::MainSequenceOnly::MainSequenceOnly() = default; ThreadControllerImpl::MainSequenceOnly::~MainSequenceOnly() = default; std::unique_ptr ThreadControllerImpl::Create( MessageLoop* message_loop, const TickClock* time_source) { return WrapUnique(new ThreadControllerImpl( message_loop, message_loop->task_runner(), time_source)); } void ThreadControllerImpl::SetSequencedTaskSource( SequencedTaskSource* sequence) { DCHECK_CALLED_ON_VALID_SEQUENCE(associated_thread_->sequence_checker); DCHECK(sequence); DCHECK(!sequence_); sequence_ = sequence; } void ThreadControllerImpl::SetTimerSlack(TimerSlack timer_slack) { if (!message_loop_) return; message_loop_->SetTimerSlack(timer_slack); } void ThreadControllerImpl::ScheduleWork() { DCHECK(sequence_); AutoLock lock(any_sequence_lock_); // Don't post a DoWork if there's an immediate DoWork in flight or if we're // inside a top level DoWork. We can rely on a continuation being posted as // needed. if (any_sequence().immediate_do_work_posted || (any_sequence().do_work_running_count > any_sequence().nesting_depth)) { return; } any_sequence().immediate_do_work_posted = true; TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("sequence_manager"), "ThreadControllerImpl::ScheduleWork::PostTask"); task_runner_->PostTask(FROM_HERE, immediate_do_work_closure_); } void ThreadControllerImpl::SetNextDelayedDoWork(LazyNow* lazy_now, TimeTicks run_time) { DCHECK_CALLED_ON_VALID_SEQUENCE(associated_thread_->sequence_checker); DCHECK(sequence_); if (main_sequence_only().next_delayed_do_work == run_time) return; // Cancel DoWork if it was scheduled and we set an "infinite" delay now. if (run_time == TimeTicks::Max()) { cancelable_delayed_do_work_closure_.Cancel(); main_sequence_only().next_delayed_do_work = TimeTicks::Max(); return; } // If DoWork is running then we don't need to do anything because it will post // a continuation as needed. Bailing out here is by far the most common case. if (main_sequence_only().do_work_running_count > main_sequence_only().nesting_depth) { return; } // If DoWork is about to run then we also don't need to do anything. { AutoLock lock(any_sequence_lock_); if (any_sequence().immediate_do_work_posted) return; } base::TimeDelta delay = std::max(TimeDelta(), run_time - lazy_now->Now()); TRACE_EVENT1(TRACE_DISABLED_BY_DEFAULT("sequence_manager"), "ThreadControllerImpl::SetNextDelayedDoWork::PostDelayedTask", "delay_ms", delay.InMillisecondsF()); main_sequence_only().next_delayed_do_work = run_time; // Reset also causes cancellation of the previous DoWork task. cancelable_delayed_do_work_closure_.Reset(delayed_do_work_closure_); task_runner_->PostDelayedTask( FROM_HERE, cancelable_delayed_do_work_closure_.callback(), delay); } bool ThreadControllerImpl::RunsTasksInCurrentSequence() { return task_runner_->RunsTasksInCurrentSequence(); } const TickClock* ThreadControllerImpl::GetClock() { return time_source_; } void ThreadControllerImpl::SetDefaultTaskRunner( scoped_refptr task_runner) { if (!message_loop_) return; message_loop_->SetTaskRunner(task_runner); } void ThreadControllerImpl::RestoreDefaultTaskRunner() { if (!message_loop_) return; message_loop_->SetTaskRunner(message_loop_task_runner_); } void ThreadControllerImpl::WillQueueTask(PendingTask* pending_task) { task_annotator_.WillQueueTask("SequenceManager::PostTask", pending_task); } void ThreadControllerImpl::DoWork(WorkType work_type) { TRACE_EVENT0("sequence_manager", "ThreadControllerImpl::DoWork"); DCHECK_CALLED_ON_VALID_SEQUENCE(associated_thread_->sequence_checker); DCHECK(sequence_); { AutoLock lock(any_sequence_lock_); if (work_type == WorkType::kImmediate) any_sequence().immediate_do_work_posted = false; any_sequence().do_work_running_count++; } main_sequence_only().do_work_running_count++; WeakPtr weak_ptr = weak_factory_.GetWeakPtr(); // TODO(scheduler-dev): Consider moving to a time based work batch instead. for (int i = 0; i < main_sequence_only().work_batch_size_; i++) { Optional task = sequence_->TakeTask(); if (!task) break; { TRACE_TASK_EXECUTION("ThreadControllerImpl::RunTask", *task); task_annotator_.RunTask("ThreadControllerImpl::RunTask", &*task); } if (!weak_ptr) return; sequence_->DidRunTask(); // NOTE: https://crbug.com/828835. // When we're running inside a nested RunLoop it may quit anytime, so any // outstanding pending tasks must run in the outer RunLoop // (see SequenceManagerTestWithMessageLoop.QuitWhileNested test). // Unfortunately, it's MessageLoop who's receving that signal and we can't // know it before we return from DoWork, hence, OnExitNestedRunLoop // will be called later. Since we must implement ThreadController and // SequenceManager in conformance with MessageLoop task runners, we need // to disable this batching optimization while nested. // Implementing RunLoop::Delegate ourselves will help to resolve this issue. if (main_sequence_only().nesting_depth > 0) break; } main_sequence_only().do_work_running_count--; { AutoLock lock(any_sequence_lock_); any_sequence().do_work_running_count--; DCHECK_GE(any_sequence().do_work_running_count, 0); LazyNow lazy_now(time_source_); TimeDelta delay_till_next_task = sequence_->DelayTillNextTask(&lazy_now); if (delay_till_next_task <= TimeDelta()) { // The next task needs to run immediately, post a continuation if needed. if (!any_sequence().immediate_do_work_posted) { any_sequence().immediate_do_work_posted = true; task_runner_->PostTask(FROM_HERE, immediate_do_work_closure_); } } else if (delay_till_next_task < TimeDelta::Max()) { // The next task needs to run after a delay, post a continuation if // needed. TimeTicks next_task_at = lazy_now.Now() + delay_till_next_task; if (next_task_at != main_sequence_only().next_delayed_do_work) { main_sequence_only().next_delayed_do_work = next_task_at; cancelable_delayed_do_work_closure_.Reset(delayed_do_work_closure_); task_runner_->PostDelayedTask( FROM_HERE, cancelable_delayed_do_work_closure_.callback(), delay_till_next_task); } } else { // There is no next task scheduled. main_sequence_only().next_delayed_do_work = TimeTicks::Max(); } } } void ThreadControllerImpl::AddNestingObserver( RunLoop::NestingObserver* observer) { DCHECK_CALLED_ON_VALID_SEQUENCE(associated_thread_->sequence_checker); nesting_observer_ = observer; RunLoop::AddNestingObserverOnCurrentThread(this); } void ThreadControllerImpl::RemoveNestingObserver( RunLoop::NestingObserver* observer) { DCHECK_CALLED_ON_VALID_SEQUENCE(associated_thread_->sequence_checker); DCHECK_EQ(observer, nesting_observer_); nesting_observer_ = nullptr; RunLoop::RemoveNestingObserverOnCurrentThread(this); } const scoped_refptr& ThreadControllerImpl::GetAssociatedThread() const { return associated_thread_; } void ThreadControllerImpl::OnBeginNestedRunLoop() { main_sequence_only().nesting_depth++; { // We just entered a nested run loop, make sure there's a DoWork posted or // the system will grind to a halt. AutoLock lock(any_sequence_lock_); any_sequence().nesting_depth++; if (!any_sequence().immediate_do_work_posted) { any_sequence().immediate_do_work_posted = true; TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("sequence_manager"), "ThreadControllerImpl::OnBeginNestedRunLoop::PostTask"); task_runner_->PostTask(FROM_HERE, immediate_do_work_closure_); } } if (nesting_observer_) nesting_observer_->OnBeginNestedRunLoop(); } void ThreadControllerImpl::OnExitNestedRunLoop() { main_sequence_only().nesting_depth--; { AutoLock lock(any_sequence_lock_); any_sequence().nesting_depth--; DCHECK_GE(any_sequence().nesting_depth, 0); } if (nesting_observer_) nesting_observer_->OnExitNestedRunLoop(); } void ThreadControllerImpl::SetWorkBatchSize(int work_batch_size) { main_sequence_only().work_batch_size_ = work_batch_size; } } // namespace internal } // namespace sequence_manager } // namespace base