// Copyright 2012 The Chromium Authors // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #ifndef BASE_OBSERVER_LIST_THREADSAFE_H_ #define BASE_OBSERVER_LIST_THREADSAFE_H_ #include #include #include "base/auto_reset.h" #include "base/base_export.h" #include "base/check.h" #include "base/check_op.h" #include "base/containers/contains.h" #include "base/dcheck_is_on.h" #include "base/debug/stack_trace.h" #include "base/functional/bind.h" #include "base/location.h" #include "base/memory/raw_ptr.h" #include "base/memory/ref_counted.h" #include "base/observer_list.h" #include "base/strings/strcat.h" #include "base/synchronization/lock.h" #include "base/task/sequenced_task_runner.h" #include "base/task/single_thread_task_runner.h" #include "build/build_config.h" #include "third_party/abseil-cpp/absl/base/attributes.h" /////////////////////////////////////////////////////////////////////////////// // // OVERVIEW: // // A thread-safe container for a list of observers. This is similar to the // observer_list (see observer_list.h), but it is more robust for multi- // threaded situations. // // The following use cases are supported: // * Observers can register for notifications from any sequence. They are // always notified on the sequence from which they were registered. // * Any sequence may trigger a notification via Notify(). // * Observers can remove themselves from the observer list inside of a // callback. // * If one sequence is notifying observers concurrently with an observer // removing itself from the observer list, the notifications will be // silently dropped. However if the observer is currently inside a // notification callback, the callback will finish running. // // By default, observers can be removed from any sequence. However this can be // error-prone since an observer may be running a callback when it's removed, // in which case it isn't safe to delete until the callback is finished. // Consider using the RemoveObserverPolicy::kAddingSequenceOnly template // parameter, which will CHECK that observers are only removed from the // sequence where they were added (which is also the sequence that runs // callbacks). // // The drawback of the threadsafe observer list is that notifications are not // as real-time as the non-threadsafe version of this class. Notifications // will always be done via PostTask() to another sequence, whereas with the // non-thread-safe ObserverList, notifications happen synchronously. // // Note: this class previously supported synchronous notifications for // same-sequence observers, but it was error-prone and removed in // crbug.com/1193750, think twice before re-considering this paradigm. // /////////////////////////////////////////////////////////////////////////////// namespace base { namespace internal { class BASE_EXPORT ObserverListThreadSafeBase : public RefCountedThreadSafe { public: struct NotificationDataBase { NotificationDataBase(void* observer_list_in, const Location& from_here_in) : observer_list(observer_list_in), from_here(from_here_in) {} raw_ptr observer_list; Location from_here; }; ObserverListThreadSafeBase() = default; ObserverListThreadSafeBase(const ObserverListThreadSafeBase&) = delete; ObserverListThreadSafeBase& operator=(const ObserverListThreadSafeBase&) = delete; protected: template struct Dispatcher; template struct Dispatcher { static void Run(void (ReceiverType::*m)(Params...), Params... params, ObserverType* obj) { (obj->*m)(std::forward(params)...); } }; static const NotificationDataBase*& GetCurrentNotification(); virtual ~ObserverListThreadSafeBase() = default; private: friend class RefCountedThreadSafe; }; } // namespace internal enum class RemoveObserverPolicy { // Observers can be removed from any sequence. kAnySequence, // Observers can only be removed from the sequence that added them. kAddingSequenceOnly, }; template class ObserverListThreadSafe : public internal::ObserverListThreadSafeBase { using Self = ObserverListThreadSafe; public: enum class AddObserverResult { kBecameNonEmpty, kWasAlreadyNonEmpty, }; enum class RemoveObserverResult { kWasOrBecameEmpty, kRemainsNonEmpty, }; ObserverListThreadSafe() = default; explicit ObserverListThreadSafe(ObserverListPolicy policy) : policy_(policy) {} ObserverListThreadSafe(const ObserverListThreadSafe&) = delete; ObserverListThreadSafe& operator=(const ObserverListThreadSafe&) = delete; // Adds |observer| to the list. |observer| must not already be in the list. AddObserverResult AddObserver(ObserverType* observer) { DCHECK(SequencedTaskRunner::HasCurrentDefault()) << "An observer can only be registered when " "SequencedTaskRunner::HasCurrentDefault. If this is in a unit test, " "you're likely merely missing a " "base::test::(SingleThread)TaskEnvironment in your fixture. " "Otherwise, try running this code on a named thread (main/UI/IO) or " "from a task posted to a base::SequencedTaskRunner or " "base::SingleThreadTaskRunner."; AutoLock auto_lock(lock_); bool was_empty = observers_.empty(); // Add |observer| to the list of observers. DCHECK(!Contains(observers_, observer)); const scoped_refptr task_runner = SequencedTaskRunner::GetCurrentDefault(); // Each observer gets a unique identifier. These unique identifiers are used // to avoid execution of pending posted-tasks over removed or released // observers. const size_t observer_id = ++observer_id_counter_; #if DCHECK_IS_ON() ObserverTaskRunnerInfo task_info = {task_runner, base::debug::StackTrace(), observer_id}; #else ObserverTaskRunnerInfo task_info = {task_runner, observer_id}; #endif observers_[observer] = std::move(task_info); // If this is called while a notification is being dispatched on this thread // and |policy_| is ALL, |observer| must be notified (if a notification is // being dispatched on another thread in parallel, the notification may or // may not make it to |observer| depending on the outcome of the race to // |lock_|). if (policy_ == ObserverListPolicy::ALL) { if (const NotificationDataBase* const current_notification = GetCurrentNotification(); current_notification && current_notification->observer_list == this) { const NotificationData* notification_data = static_cast(current_notification); task_runner->PostTask( current_notification->from_here, BindOnce(&Self::NotifyWrapper, this, // While `observer` may be dangling, we pass it and // check it wasn't deallocated in NotifyWrapper() which can // check `observers_` to verify presence (the owner of the // observer is responsible for removing it from that list // before deallocation). UnsafeDangling(observer), NotificationData(this, observer_id, current_notification->from_here, notification_data->method))); } } return was_empty ? AddObserverResult::kBecameNonEmpty : AddObserverResult::kWasAlreadyNonEmpty; } // Remove an observer from the list if it is in the list. // // If a notification was sent to the observer but hasn't started to run yet, // it will be aborted. If a notification has started to run, removing the // observer won't stop it. RemoveObserverResult RemoveObserver(ObserverType* observer) { AutoLock auto_lock(lock_); if constexpr (RemovePolicy == RemoveObserverPolicy::kAddingSequenceOnly) { const auto it = observers_.find(observer); CHECK(it == observers_.end() || it->second.task_runner->RunsTasksInCurrentSequence()); } observers_.erase(observer); return observers_.empty() ? RemoveObserverResult::kWasOrBecameEmpty : RemoveObserverResult::kRemainsNonEmpty; } // Verifies that the list is currently empty (i.e. there are no observers). void AssertEmpty() const { #if DCHECK_IS_ON() AutoLock auto_lock(lock_); bool observers_is_empty = observers_.empty(); DUMP_WILL_BE_CHECK(observers_is_empty) << "\n" << GetObserversCreationStackStringLocked(); #endif } // Asynchronously invokes a callback on all observers, on their registration // sequence. You cannot assume that at the completion of the Notify call that // all Observers have been Notified. The notification may still be pending // delivery. template void Notify(const Location& from_here, Method m, Params&&... params) { RepeatingCallback method = BindRepeating(&Dispatcher::Run, m, std::forward(params)...); AutoLock lock(lock_); for (const auto& observer : observers_) { observer.second.task_runner->PostTask( from_here, BindOnce(&Self::NotifyWrapper, this, // While `observer.first` may be dangling, we pass it and // check it wasn't deallocated in NotifyWrapper() which can // check `observers_` to verify presence (the owner of the // observer is responsible for removing it from that list // before deallocation). UnsafeDangling(observer.first), NotificationData(this, observer.second.observer_id, from_here, method))); } } private: friend class RefCountedThreadSafe; struct NotificationData : public NotificationDataBase { NotificationData(ObserverListThreadSafe* observer_list_in, size_t observer_id_in, const Location& from_here_in, const RepeatingCallback& method_in) : NotificationDataBase(observer_list_in, from_here_in), method(method_in), observer_id(observer_id_in) {} RepeatingCallback method; size_t observer_id; }; ~ObserverListThreadSafe() override = default; void NotifyWrapper(MayBeDangling observer, const NotificationData& notification) { { AutoLock auto_lock(lock_); // Check whether the observer still needs a notification. DCHECK_EQ(notification.observer_list, this); auto it = observers_.find(observer); if (it == observers_.end() || it->second.observer_id != notification.observer_id) { return; } DCHECK(it->second.task_runner->RunsTasksInCurrentSequence()); } // Keep track of the notification being dispatched on the current thread. // This will be used if the callback below calls AddObserver(). // // Note: GetCurrentNotification() may not return null if this runs in a // nested loop started by a notification callback. In that case, it is // important to save the previous value to restore it later. const AutoReset resetter_( &GetCurrentNotification(), ¬ification); // Invoke the callback. notification.method.Run(observer); } std::string GetObserversCreationStackStringLocked() const EXCLUSIVE_LOCKS_REQUIRED(lock_) { std::string result; #if DCHECK_IS_ON() for (const auto& observer : observers_) { StrAppend(&result, {observer.second.add_observer_stack_.ToString(), "\n"}); } #endif return result; } const ObserverListPolicy policy_ = ObserverListPolicy::ALL; mutable Lock lock_; size_t observer_id_counter_ GUARDED_BY(lock_) = 0; struct ObserverTaskRunnerInfo { scoped_refptr task_runner; #if DCHECK_IS_ON() base::debug::StackTrace add_observer_stack_; #endif size_t observer_id = 0; }; // Keys are observers. Values are the SequencedTaskRunners on which they must // be notified. std::unordered_map observers_ GUARDED_BY(lock_); }; } // namespace base #endif // BASE_OBSERVER_LIST_THREADSAFE_H_