// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/message_loop/message_pump_fuchsia.h"

#include <zircon/status.h>
#include <zircon/syscalls.h>

#include "base/auto_reset.h"
#include "base/logging.h"

namespace base {

// Constructs a controller that is not yet watching anything. |from_here| is
// stored in |created_from_location_|.
MessagePumpFuchsia::ZxHandleWatchController::ZxHandleWatchController(
    const Location& from_here)
    : created_from_location_(from_here) {}

// Stops any watch that is still active before the controller goes away.
// Failing to stop it is treated as a programming error.
MessagePumpFuchsia::ZxHandleWatchController::~ZxHandleWatchController() {
  const bool stopped_cleanly = StopWatchingZxHandle();
  if (!stopped_cleanly) {
    NOTREACHED();
  }
}

bool MessagePumpFuchsia::ZxHandleWatchController::StopWatchingZxHandle() {
  if (was_stopped_) {
    DCHECK(!*was_stopped_);
    *was_stopped_ = true;

    // |was_stopped_| points at a value stored on the stack, which will go out
    // of scope. MessagePumpFuchsia::Run() will reset it only if the value is
    // false. So we need to reset this pointer here as well, to make sure it's
    // not used again.
    was_stopped_ = nullptr;
  }

  if (!has_begun_)
    return true;

  has_begun_ = false;

  // If the pump is gone then there is nothing to cancel.
  if (!weak_pump_)
    return true;

  int result = zx_port_cancel(weak_pump_->port_.get(), handle_, wait_key());
  DLOG_IF(ERROR, result != ZX_OK)
      << "zx_port_cancel(handle=" << handle_
      << ") failed: " << zx_status_get_string(result);

  return result == ZX_OK;
}

// ZxHandleWatcher callback used for fd-backed watches. Translates the raw
// |signals| into fdio events and forwards the ones that were requested to
// |watcher_| as write and/or read notifications. |handle| is not used.
void MessagePumpFuchsia::FdWatchController::OnZxHandleSignalled(
    zx_handle_t handle,
    zx_signals_t signals) {
  // Zero-initialize so |events| has a defined value even if the call below
  // does not write to it.
  uint32_t events = 0;
  __fdio_wait_end(io_, signals, &events);

  // The translation from signals to events is not guaranteed to be one-to-one,
  // so |events| may report, for example, that the fd is writable when only
  // readability was requested. Mask with the events the caller actually asked
  // for so that we never invoke a callback that was not requested.
  events &= desired_events_;

  // Each |watcher_| callback we invoke may stop or delete |this|. The pump has
  // set |was_stopped_| to point to a safe location on the calling stack, so we
  // can use that to detect being stopped mid-callback and avoid doing further
  // work that would touch |this|.
  bool* was_stopped = was_stopped_;
  if (events & FDIO_EVT_WRITABLE)
    watcher_->OnFileCanWriteWithoutBlocking(fd_);
  if (!*was_stopped && (events & FDIO_EVT_READABLE))
    watcher_->OnFileCanReadWithoutBlocking(fd_);

  // Don't add additional work here without checking |*was_stopped| again; the
  // local copy must be used because |this| may already have been deleted.
}

// Constructs an fd watch controller; |from_here| is forwarded to the
// ZxHandleWatchController base.
MessagePumpFuchsia::FdWatchController::FdWatchController(
    const Location& from_here)
    : ZxHandleWatchController(from_here) {}

// Stops the fd watch (which also releases |io_|) before destruction. Failing
// to stop it is treated as a programming error.
MessagePumpFuchsia::FdWatchController::~FdWatchController() {
  const bool stopped_cleanly = StopWatchingFileDescriptor();
  if (!stopped_cleanly) {
    NOTREACHED();
  }
}

// Stops the underlying handle watch and releases the fdio object held in
// |io_|, if any. Returns the result of StopWatchingZxHandle().
bool MessagePumpFuchsia::FdWatchController::StopWatchingFileDescriptor() {
  const bool stopped = StopWatchingZxHandle();

  // No fdio object is currently held, so there is nothing to release.
  if (!io_)
    return stopped;

  __fdio_release(io_);
  io_ = nullptr;
  return stopped;
}

// Creates the port stored in |port_|, which HandleEvents() waits on and
// ScheduleWork() queues wake-up packets to. Port creation failure is fatal.
MessagePumpFuchsia::MessagePumpFuchsia() : weak_factory_(this) {
  CHECK_EQ(ZX_OK, zx_port_create(0, port_.receive()));
}

// No explicit teardown is needed; members clean up after themselves.
MessagePumpFuchsia::~MessagePumpFuchsia() = default;

// Starts watching |fd| for the events selected by |mode| (WATCH_READ,
// WATCH_WRITE or WATCH_READ_WRITE), reporting them to |delegate|. Any watch
// previously associated with |controller| is stopped first. Returns false if
// no fdio object could be obtained for |fd|, if |mode| is not recognised, or
// if the wait could not be started.
bool MessagePumpFuchsia::WatchFileDescriptor(int fd,
                                             bool persistent,
                                             int mode,
                                             FdWatchController* controller,
                                             FdWatcher* delegate) {
  DCHECK_GE(fd, 0);
  DCHECK(controller);
  DCHECK(delegate);

  // Tear down whatever this controller was watching before.
  if (!controller->StopWatchingFileDescriptor()) {
    NOTREACHED();
  }

  controller->fd_ = fd;
  controller->watcher_ = delegate;

  // The previous fdio object was released by the stop above.
  DCHECK(!controller->io_);
  controller->io_ = __fdio_fd_to_io(fd);
  if (!controller->io_) {
    DLOG(ERROR) << "Failed to get IO for FD";
    return false;
  }

  // Translate the watch mode into the fdio events to wait for.
  if (mode == WATCH_READ) {
    controller->desired_events_ = FDIO_EVT_READABLE;
  } else if (mode == WATCH_WRITE) {
    controller->desired_events_ = FDIO_EVT_WRITABLE;
  } else if (mode == WATCH_READ_WRITE) {
    controller->desired_events_ = FDIO_EVT_READABLE | FDIO_EVT_WRITABLE;
  } else {
    NOTREACHED() << "unexpected mode: " << mode;
    return false;
  }

  // The |handle| and |signals| arguments given to WatchZxHandle() here are
  // placeholders only: FdWatchController::WaitBegin() fills in the real values
  // just before the wait is actually started.
  return WatchZxHandle(ZX_HANDLE_INVALID, persistent, 1, controller,
                       controller);
}

// Begins (or re-arms) the wait for an fd-backed watch. Returns false if fdio
// did not provide a valid handle or if the underlying wait could not start.
bool MessagePumpFuchsia::FdWatchController::WaitBegin() {
  // Ask fdio for the current |handle_| and |desired_signals_| for this fd.
  // Some fdio types map read/write events onto different signals depending on
  // their current state, so this has to be redone for every wait.
  __fdio_wait_begin(io_, desired_events_, &handle_, &desired_signals_);

  if (handle_ != ZX_HANDLE_INVALID)
    return MessagePumpFuchsia::ZxHandleWatchController::WaitBegin();

  DLOG(ERROR) << "fdio_wait_begin failed";
  return false;
}

// Starts watching |handle| for |signals|, reporting to |delegate|. Any watch
// previously associated with |controller| is stopped first. Returns the result
// of |controller|'s WaitBegin().
bool MessagePumpFuchsia::WatchZxHandle(zx_handle_t handle,
                                       bool persistent,
                                       zx_signals_t signals,
                                       ZxHandleWatchController* controller,
                                       ZxHandleWatcher* delegate) {
  DCHECK_NE(0u, signals);
  DCHECK(controller);
  DCHECK(delegate);
  // A controller may only be reused for the handle it is already bound to,
  // unless either side is the invalid-handle placeholder.
  DCHECK(handle == ZX_HANDLE_INVALID ||
         controller->handle_ == ZX_HANDLE_INVALID ||
         handle == controller->handle_);

  // Cancel the previous wait before any of the controller's state is replaced.
  if (!controller->StopWatchingZxHandle()) {
    NOTREACHED();
  }

  // Bind the controller to this pump and record the new watch parameters.
  controller->weak_pump_ = weak_factory_.GetWeakPtr();
  controller->watcher_ = delegate;
  controller->handle_ = handle;
  controller->desired_signals_ = signals;
  controller->persistent_ = persistent;

  return controller->WaitBegin();
}

bool MessagePumpFuchsia::ZxHandleWatchController::WaitBegin() {
  DCHECK(!has_begun_);

  zx_status_t status =
      zx_object_wait_async(handle_, weak_pump_->port_.get(), wait_key(),
                           desired_signals_, ZX_WAIT_ASYNC_ONCE);
  if (status != ZX_OK) {
    DLOG(ERROR) << "zx_object_wait_async failed: "
                << zx_status_get_string(status)
                << " (port=" << weak_pump_->port_.get() << ")";
    return false;
  }

  has_begun_ = true;

  return true;
}

// Marks the current wait as finished and returns the subset of |signals| that
// was actually requested.
uint32_t MessagePumpFuchsia::ZxHandleWatchController::WaitEnd(
    zx_signals_t signals) {
  DCHECK(has_begun_);
  has_begun_ = false;

  // The observed |signals| may contain bits nobody asked about - for instance
  // "writable" on an fd that is only being watched for readability. Passing
  // those through would trigger callbacks the caller never requested, so only
  // the bits present in |desired_signals_| are reported.
  return signals & desired_signals_;
}

// Waits on the port until |deadline| for a single packet and dispatches it.
// Returns false if the wait timed out (or failed unexpectedly) and true if a
// packet was received.
bool MessagePumpFuchsia::HandleEvents(zx_time_t deadline) {
  zx_port_packet_t packet;
  const zx_status_t wait_status =
      zx_port_wait(port_.get(), deadline, &packet, 0);

  if (wait_status != ZX_OK) {
    // A timeout is the only failure that is expected here.
    if (wait_status != ZX_ERR_TIMED_OUT) {
      NOTREACHED() << "unexpected wait status: "
                   << zx_status_get_string(wait_status);
    }
    return false;
  }

  if (packet.type != ZX_PKT_TYPE_SIGNAL_ONE) {
    // Wakeup caused by ScheduleWork().
    DCHECK_EQ(ZX_PKT_TYPE_USER, packet.type);
    return true;
  }

  // A watched handle caused the wakeup via zx_object_wait_async(). The packet
  // key is cast back to the controller that registered the wait.
  DCHECK_EQ(ZX_OK, packet.status);
  ZxHandleWatchController* controller =
      reinterpret_cast<ZxHandleWatchController*>(
          static_cast<uintptr_t>(packet.key));

  DCHECK_NE(0u, packet.signal.trigger & packet.signal.observed);

  zx_signals_t signals = controller->WaitEnd(packet.signal.observed);

  // The callback is allowed to stop, and possibly delete, the watch. If that
  // happens |controller| must not be touched again and the wait must not be
  // re-armed. The controller is handed a pointer to a flag on this stack frame
  // and sets it when it is stopped.
  bool controller_was_stopped = false;
  controller->was_stopped_ = &controller_was_stopped;

  controller->watcher_->OnZxHandleSignalled(controller->handle_, signals);

  if (controller_was_stopped)
    return true;

  // The controller is still alive: detach it from the stack flag and re-arm
  // the wait if the watch is persistent.
  controller->was_stopped_ = nullptr;
  if (controller->persistent_)
    controller->WaitBegin();

  return true;
}

// Runs the message loop until Quit() clears |keep_running_|. Each iteration
// gives |delegate| a chance to do immediate work, delayed work and idle work,
// polls the port for events, and blocks on the port only when nothing at all
// was done.
void MessagePumpFuchsia::Run(Delegate* delegate) {
  // Restores the previous value of |keep_running_| when this call returns.
  AutoReset<bool> auto_reset_keep_running(&keep_running_, true);

  while (true) {
    bool did_work = delegate->DoWork();
    if (!keep_running_)
      return;

    if (delegate->DoDelayedWork(&delayed_work_time_))
      did_work = true;
    if (!keep_running_)
      return;

    // A zero deadline polls the port without blocking.
    if (HandleEvents(/*deadline=*/0))
      did_work = true;
    if (!keep_running_)
      return;

    if (did_work)
      continue;

    const bool did_idle_work = delegate->DoIdleWork();
    if (!keep_running_)
      return;

    if (did_idle_work)
      continue;

    // Nothing to do: sleep until an event arrives or the next delayed task is
    // due, whichever comes first.
    zx_time_t deadline = ZX_TIME_INFINITE;
    if (!delayed_work_time_.is_null())
      deadline = delayed_work_time_.ToZxTime();
    HandleEvents(deadline);
  }
}

// Asks Run() to exit: the loop checks |keep_running_| after each step and
// breaks out once it is false.
void MessagePumpFuchsia::Quit() {
  keep_running_ = false;
}

void MessagePumpFuchsia::ScheduleWork() {
  // Since this can be called on any thread, we need to ensure that our Run loop
  // wakes up.
  zx_port_packet_t packet = {};
  packet.type = ZX_PKT_TYPE_USER;
  zx_status_t status = zx_port_queue(port_.get(), &packet, 0);
  DLOG_IF(ERROR, status != ZX_OK)
      << "zx_port_queue failed: " << status << " (port=" << port_.get() << ")";
}

// Records |delayed_work_time| in |delayed_work_time_|; Run() uses it to pick
// the deadline for its next blocking wait.
void MessagePumpFuchsia::ScheduleDelayedWork(
    const TimeTicks& delayed_work_time) {
  // We know that we can't be blocked right now since this method can only be
  // called on the same thread as Run, so we only need to update our record of
  // how long to sleep when we do sleep.
  delayed_work_time_ = delayed_work_time;
}

}  // namespace base