// Copyright 2017 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "base/synchronization/waitable_event.h" #include #include #include #include "base/debug/activity_tracker.h" #include "base/files/scoped_file.h" #include "base/mac/dispatch_source_mach.h" #include "base/mac/mac_util.h" #include "base/mac/mach_logging.h" #include "base/mac/scoped_dispatch_object.h" #include "base/posix/eintr_wrapper.h" #include "base/threading/scoped_blocking_call.h" #include "base/threading/thread_restrictions.h" #include "build/build_config.h" namespace base { WaitableEvent::WaitableEvent(ResetPolicy reset_policy, InitialState initial_state) : policy_(reset_policy) { mach_port_options_t options{}; options.flags = MPO_INSERT_SEND_RIGHT; options.mpl.mpl_qlimit = 1; mach_port_t name; kern_return_t kr = mach_port_construct(mach_task_self(), &options, 0, &name); MACH_CHECK(kr == KERN_SUCCESS, kr) << "mach_port_construct"; receive_right_ = new ReceiveRight(name, UseSlowWatchList(policy_)); send_right_.reset(name); if (initial_state == InitialState::SIGNALED) Signal(); } WaitableEvent::~WaitableEvent() = default; void WaitableEvent::Reset() { PeekPort(receive_right_->Name(), true); } void WaitableEvent::Signal() { // If using the slow watch-list, copy the watchers to a local. After // mach_msg(), the event object may be deleted by an awoken thread. const bool use_slow_path = UseSlowWatchList(policy_); ReceiveRight* receive_right = nullptr; // Manually reference counted. std::unique_ptr> watch_list; if (use_slow_path) { // To avoid a race condition of a WaitableEventWatcher getting added // while another thread is in this method, hold the watch-list lock for // the duration of mach_msg(). This requires ref-counting the // |receive_right_| object that contains it, in case the event is deleted // by a waiting thread after mach_msg(). receive_right = receive_right_.get(); receive_right->AddRef(); ReceiveRight::WatchList* slow_watch_list = receive_right->SlowWatchList(); slow_watch_list->lock.Acquire(); if (!slow_watch_list->list.empty()) { watch_list.reset(new std::list()); std::swap(*watch_list, slow_watch_list->list); } } mach_msg_empty_send_t msg{}; msg.header.msgh_bits = MACH_MSGH_BITS_REMOTE(MACH_MSG_TYPE_COPY_SEND); msg.header.msgh_size = sizeof(&msg); msg.header.msgh_remote_port = send_right_.get(); // If the event is already signaled, this will time out because the queue // has a length of one. kern_return_t kr = mach_msg(&msg.header, MACH_SEND_MSG | MACH_SEND_TIMEOUT, sizeof(msg), 0, MACH_PORT_NULL, 0, MACH_PORT_NULL); MACH_CHECK(kr == KERN_SUCCESS || kr == MACH_SEND_TIMED_OUT, kr) << "mach_msg"; if (use_slow_path) { // If a WaitableEventWatcher were to start watching when the event is // signaled, it runs the callback immediately without adding it to the // list. Therefore the watch list can only be non-empty if the event is // newly signaled. if (watch_list.get()) { MACH_CHECK(kr == KERN_SUCCESS, kr); for (auto& watcher : *watch_list) { std::move(watcher).Run(); } } receive_right->SlowWatchList()->lock.Release(); receive_right->Release(); } } bool WaitableEvent::IsSignaled() { return PeekPort(receive_right_->Name(), policy_ == ResetPolicy::AUTOMATIC); } void WaitableEvent::Wait() { bool result = TimedWaitUntil(TimeTicks::Max()); DCHECK(result) << "TimedWait() should never fail with infinite timeout"; } bool WaitableEvent::TimedWait(const TimeDelta& wait_delta) { return TimedWaitUntil(TimeTicks::Now() + wait_delta); } bool WaitableEvent::TimedWaitUntil(const TimeTicks& end_time) { internal::ScopedBlockingCallWithBaseSyncPrimitives scoped_blocking_call( BlockingType::MAY_BLOCK); // Record the event that this thread is blocking upon (for hang diagnosis). debug::ScopedEventWaitActivity event_activity(this); mach_msg_empty_rcv_t msg{}; msg.header.msgh_local_port = receive_right_->Name(); mach_msg_option_t options = MACH_RCV_MSG; if (!end_time.is_max()) options |= MACH_RCV_TIMEOUT | MACH_RCV_INTERRUPT; mach_msg_size_t rcv_size = sizeof(msg); if (policy_ == ResetPolicy::MANUAL) { // To avoid dequeing the message, receive with a size of 0 and set // MACH_RCV_LARGE to keep the message in the queue. options |= MACH_RCV_LARGE; rcv_size = 0; } kern_return_t kr; mach_msg_timeout_t timeout = MACH_MSG_TIMEOUT_NONE; do { if (!end_time.is_max()) { timeout = std::max( 0, (end_time - TimeTicks::Now()).InMillisecondsRoundedUp()); } kr = mach_msg(&msg.header, options, 0, rcv_size, receive_right_->Name(), timeout, MACH_PORT_NULL); // If the thread is interrupted during mach_msg(), the system call // will be restarted. However, the libsyscall wrapper does not adjust // the timeout by the amount of time already waited. // Using MACH_RCV_INTERRUPT will instead return from mach_msg(), // so that the call can be retried with an adjusted timeout. } while (kr == MACH_RCV_INTERRUPTED); if (kr == KERN_SUCCESS) { return true; } else if (rcv_size == 0 && kr == MACH_RCV_TOO_LARGE) { return true; } else { MACH_CHECK(kr == MACH_RCV_TIMED_OUT, kr) << "mach_msg"; return false; } } // static bool WaitableEvent::UseSlowWatchList(ResetPolicy policy) { #if defined(OS_IOS) const bool use_slow_path = false; #else static bool use_slow_path = !mac::IsAtLeastOS10_12(); #endif return policy == ResetPolicy::MANUAL && use_slow_path; } // static size_t WaitableEvent::WaitMany(WaitableEvent** raw_waitables, size_t count) { DCHECK(count) << "Cannot wait on no events"; internal::ScopedBlockingCallWithBaseSyncPrimitives scoped_blocking_call( BlockingType::MAY_BLOCK); // Record an event (the first) that this thread is blocking upon. debug::ScopedEventWaitActivity event_activity(raw_waitables[0]); // On macOS 10.11+, using Mach port sets may cause system instability, per // https://crbug.com/756102. On macOS 10.12+, a kqueue can be used // instead to work around that. On macOS 10.9 and 10.10, kqueue only works // for port sets, so port sets are just used directly. On macOS 10.11, // libdispatch sources are used. Therefore, there are three different // primitives that can be used to implement WaitMany. Which one to use is // selected at run-time by OS version checks. enum WaitManyPrimitive { KQUEUE, DISPATCH, PORT_SET, }; #if defined(OS_IOS) const WaitManyPrimitive kPrimitive = PORT_SET; #else const WaitManyPrimitive kPrimitive = mac::IsAtLeastOS10_12() ? KQUEUE : (mac::IsOS10_11() ? DISPATCH : PORT_SET); #endif if (kPrimitive == KQUEUE) { std::vector events(count); for (size_t i = 0; i < count; ++i) { EV_SET64(&events[i], raw_waitables[i]->receive_right_->Name(), EVFILT_MACHPORT, EV_ADD, 0, 0, i, 0, 0); } std::vector out_events(count); ScopedFD wait_many(kqueue()); PCHECK(wait_many.is_valid()) << "kqueue"; int rv = HANDLE_EINTR(kevent64(wait_many.get(), events.data(), count, out_events.data(), count, 0, nullptr)); PCHECK(rv > 0) << "kevent64"; size_t triggered = -1; for (size_t i = 0; i < static_cast(rv); ++i) { // WaitMany should return the lowest index in |raw_waitables| that was // triggered. size_t index = static_cast(out_events[i].udata); triggered = std::min(triggered, index); } if (raw_waitables[triggered]->policy_ == ResetPolicy::AUTOMATIC) { // The message needs to be dequeued to reset the event. PeekPort(raw_waitables[triggered]->receive_right_->Name(), true); } return triggered; } else if (kPrimitive == DISPATCH) { // Each item in |raw_waitables| will be watched using a dispatch souce // scheduled on the serial |queue|. The first one to be invoked will // signal the |semaphore| that this method will wait on. ScopedDispatchObject queue(dispatch_queue_create( "org.chromium.base.WaitableEvent.WaitMany", DISPATCH_QUEUE_SERIAL)); ScopedDispatchObject semaphore( dispatch_semaphore_create(0)); // Block capture references. |signaled| will identify the index in // |raw_waitables| whose source was invoked. dispatch_semaphore_t semaphore_ref = semaphore.get(); const size_t kUnsignaled = -1; __block size_t signaled = kUnsignaled; // Create a MACH_RECV dispatch source for each event. These must be // destroyed before the |queue| and |semaphore|. std::vector> sources; for (size_t i = 0; i < count; ++i) { const bool auto_reset = raw_waitables[i]->policy_ == WaitableEvent::ResetPolicy::AUTOMATIC; // The block will copy a reference to |right|. scoped_refptr right = raw_waitables[i]->receive_right_; auto source = std::make_unique(queue, right->Name(), ^{ // After the semaphore is signaled, another event be signaled and // the source may have its block put on the |queue|. WaitMany // should only report (and auto-reset) one event, so the first // event to signal is reported. if (signaled == kUnsignaled) { signaled = i; if (auto_reset) { PeekPort(right->Name(), true); } dispatch_semaphore_signal(semaphore_ref); } }); source->Resume(); sources.push_back(std::move(source)); } dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER); DCHECK_NE(signaled, kUnsignaled); return signaled; } else { DCHECK_EQ(kPrimitive, PORT_SET); kern_return_t kr; mac::ScopedMachPortSet port_set; { mach_port_t name; kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET, &name); MACH_CHECK(kr == KERN_SUCCESS, kr) << "mach_port_allocate"; port_set.reset(name); } for (size_t i = 0; i < count; ++i) { kr = mach_port_insert_member(mach_task_self(), raw_waitables[i]->receive_right_->Name(), port_set.get()); MACH_CHECK(kr == KERN_SUCCESS, kr) << "index " << i; } mach_msg_empty_rcv_t msg{}; // Wait on the port set. Only specify space enough for the header, to // identify which port in the set is signaled. Otherwise, receiving from the // port set may dequeue a message for a manual-reset event object, which // would cause it to be reset. kr = mach_msg(&msg.header, MACH_RCV_MSG | MACH_RCV_LARGE | MACH_RCV_LARGE_IDENTITY, 0, sizeof(msg.header), port_set.get(), 0, MACH_PORT_NULL); MACH_CHECK(kr == MACH_RCV_TOO_LARGE, kr) << "mach_msg"; for (size_t i = 0; i < count; ++i) { WaitableEvent* event = raw_waitables[i]; if (msg.header.msgh_local_port == event->receive_right_->Name()) { if (event->policy_ == ResetPolicy::AUTOMATIC) { // The message needs to be dequeued to reset the event. PeekPort(msg.header.msgh_local_port, true); } return i; } } NOTREACHED(); return 0; } } // static bool WaitableEvent::PeekPort(mach_port_t port, bool dequeue) { if (dequeue) { mach_msg_empty_rcv_t msg{}; msg.header.msgh_local_port = port; kern_return_t kr = mach_msg(&msg.header, MACH_RCV_MSG | MACH_RCV_TIMEOUT, 0, sizeof(msg), port, 0, MACH_PORT_NULL); if (kr == KERN_SUCCESS) { return true; } else { MACH_CHECK(kr == MACH_RCV_TIMED_OUT, kr) << "mach_msg"; return false; } } else { mach_port_seqno_t seqno = 0; mach_msg_size_t size; mach_msg_id_t id; mach_msg_trailer_t trailer; mach_msg_type_number_t trailer_size = sizeof(trailer); kern_return_t kr = mach_port_peek( mach_task_self(), port, MACH_RCV_TRAILER_TYPE(MACH_RCV_TRAILER_NULL), &seqno, &size, &id, reinterpret_cast(&trailer), &trailer_size); if (kr == KERN_SUCCESS) { return true; } else { MACH_CHECK(kr == KERN_FAILURE, kr) << "mach_port_peek"; return false; } } } WaitableEvent::ReceiveRight::ReceiveRight(mach_port_t name, bool create_slow_watch_list) : right_(name), slow_watch_list_(create_slow_watch_list ? new WatchList() : nullptr) {} WaitableEvent::ReceiveRight::~ReceiveRight() = default; WaitableEvent::ReceiveRight::WatchList::WatchList() = default; WaitableEvent::ReceiveRight::WatchList::~WatchList() = default; } // namespace base