// Copyright (c) 2012 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #ifndef NET_SOCKET_UDP_SOCKET_POSIX_H_ #define NET_SOCKET_UDP_SOCKET_POSIX_H_ #include #include #include #include #include "base/macros.h" #include "base/memory/ref_counted.h" #include "base/message_loop/message_pump_for_io.h" #include "base/threading/thread_checker.h" #include "base/timer/timer.h" #include "build/build_config.h" #include "net/base/address_family.h" #include "net/base/completion_once_callback.h" #include "net/base/datagram_buffer.h" #include "net/base/io_buffer.h" #include "net/base/ip_endpoint.h" #include "net/base/net_export.h" #include "net/base/network_change_notifier.h" #include "net/log/net_log_with_source.h" #include "net/socket/datagram_socket.h" #include "net/socket/diff_serv_code_point.h" #include "net/socket/socket_descriptor.h" #include "net/socket/socket_tag.h" #include "net/traffic_annotation/network_traffic_annotation.h" #if defined(__ANDROID__) && defined(__aarch64__) #define HAVE_SENDMMSG 1 #elif defined(OS_LINUX) #define HAVE_SENDMMSG 1 #else #define HAVE_SENDMMSG 0 #endif namespace net { class IPAddress; class NetLog; struct NetLogSource; class SocketTag; // Sendresult is inspired by sendmmsg, but unlike sendmmsg it is not // convenient to require that a positive |write_count| and a negative // error code are mutually exclusive. struct NET_EXPORT SendResult { explicit SendResult(); ~SendResult(); SendResult(int rv, int write_count, DatagramBuffers buffers); SendResult(SendResult& other) = delete; SendResult& operator=(SendResult& other) = delete; SendResult(SendResult&& other); SendResult& operator=(SendResult&& other) = default; int rv; // number of successful writes. int write_count; DatagramBuffers buffers; }; // Don't delay writes more than this. const base::TimeDelta kWriteAsyncMsThreshold = base::TimeDelta::FromMilliseconds(1); // Prefer local if number of writes is not more than this. const int kWriteAsyncMinBuffersThreshold = 2; // Don't allow more than this many outstanding async writes. const int kWriteAsyncMaxBuffersThreshold = 16; // PostTask immediately when unwritten buffers reaches this. const int kWriteAsyncPostBuffersThreshold = kWriteAsyncMaxBuffersThreshold / 2; // Don't unblock writer unless pending async writes are less than this. const int kWriteAsyncCallbackBuffersThreshold = kWriteAsyncMaxBuffersThreshold; // To allow mock |Send|/|Sendmsg| in testing. This has to be // reference counted thread safe because |SendBuffers| and // |SendmmsgBuffers| may be invoked in another thread via PostTask*. class NET_EXPORT UDPSocketPosixSender : public base::RefCountedThreadSafe { public: explicit UDPSocketPosixSender(); SendResult SendBuffers(int fd, DatagramBuffers buffers); void SetSendmmsgEnabled(bool enabled) { #if HAVE_SENDMMSG sendmmsg_enabled_ = enabled; #endif } protected: friend class base::RefCountedThreadSafe; virtual ~UDPSocketPosixSender(); virtual ssize_t Send(int sockfd, const void* buf, size_t len, int flags) const; #if HAVE_SENDMMSG virtual int Sendmmsg(int sockfd, struct mmsghdr* msgvec, unsigned int vlen, unsigned int flags) const; #endif SendResult InternalSendBuffers(int fd, DatagramBuffers buffers) const; #if HAVE_SENDMMSG SendResult InternalSendmmsgBuffers(int fd, DatagramBuffers buffers) const; #endif private: UDPSocketPosixSender(const UDPSocketPosixSender&) = delete; UDPSocketPosixSender& operator=(const UDPSocketPosixSender&) = delete; bool sendmmsg_enabled_; }; class NET_EXPORT UDPSocketPosix { public: // Performance helper for NetworkActivityMonitor, it batches // throughput samples, subject to a byte limit threshold (64 KB) or // timer (100 ms), whichever comes first. The batching is subject // to a minimum number of samples (2) required by NQE to update its // throughput estimate. class ActivityMonitor { public: ActivityMonitor() : bytes_(0), increments_(0) {} virtual ~ActivityMonitor() {} // Provided by sent/received subclass. // Update throughput, but batch to limit overhead of NetworkActivityMonitor. void Increment(uint32_t bytes); // For flushing cached values. void OnClose(); private: virtual void NetworkActivityMonitorIncrement(uint32_t bytes) = 0; void Update(); void OnTimerFired(); uint32_t bytes_; uint32_t increments_; base::RepeatingTimer timer_; DISALLOW_COPY_AND_ASSIGN(ActivityMonitor); }; class SentActivityMonitor : public ActivityMonitor { public: ~SentActivityMonitor() override {} private: void NetworkActivityMonitorIncrement(uint32_t bytes) override; }; class ReceivedActivityMonitor : public ActivityMonitor { public: ~ReceivedActivityMonitor() override {} private: void NetworkActivityMonitorIncrement(uint32_t bytes) override; }; UDPSocketPosix(DatagramSocket::BindType bind_type, net::NetLog* net_log, const net::NetLogSource& source); virtual ~UDPSocketPosix(); // Opens the socket. // Returns a net error code. int Open(AddressFamily address_family); // Binds this socket to |network|. All data traffic on the socket will be sent // and received via |network|. Must be called before Connect(). This call will // fail if |network| has disconnected. Communication using this socket will // fail if |network| disconnects. // Returns a net error code. int BindToNetwork(NetworkChangeNotifier::NetworkHandle network); // Connects the socket to connect with a certain |address|. // Should be called after Open(). // Returns a net error code. int Connect(const IPEndPoint& address); // Binds the address/port for this socket to |address|. This is generally // only used on a server. Should be called after Open(). // Returns a net error code. int Bind(const IPEndPoint& address); // Closes the socket. // TODO(rvargas, hidehiko): Disallow re-Open() after Close(). void Close(); // Copies the remote udp address into |address| and returns a net error code. int GetPeerAddress(IPEndPoint* address) const; // Copies the local udp address into |address| and returns a net error code. // (similar to getsockname) int GetLocalAddress(IPEndPoint* address) const; // IO: // Multiple outstanding read requests are not supported. // Full duplex mode (reading and writing at the same time) is supported // Reads from the socket. // Only usable from the client-side of a UDP socket, after the socket // has been connected. int Read(IOBuffer* buf, int buf_len, CompletionOnceCallback callback); // Writes to the socket. // Only usable from the client-side of a UDP socket, after the socket // has been connected. int Write(IOBuffer* buf, int buf_len, CompletionOnceCallback callback, const NetworkTrafficAnnotationTag& traffic_annotation); // Refer to datagram_client_socket.h int WriteAsync(DatagramBuffers buffers, CompletionOnceCallback callback, const NetworkTrafficAnnotationTag& traffic_annotation); int WriteAsync(const char* buffer, size_t buf_len, CompletionOnceCallback callback, const NetworkTrafficAnnotationTag& traffic_annotation); DatagramBuffers GetUnwrittenBuffers(); // Reads from a socket and receive sender address information. // |buf| is the buffer to read data into. // |buf_len| is the maximum amount of data to read. // |address| is a buffer provided by the caller for receiving the sender // address information about the received data. This buffer must be kept // alive by the caller until the callback is placed. // |callback| is the callback on completion of the RecvFrom. // Returns a net error code, or ERR_IO_PENDING if the IO is in progress. // If ERR_IO_PENDING is returned, the caller must keep |buf| and |address| // alive until the callback is called. int RecvFrom(IOBuffer* buf, int buf_len, IPEndPoint* address, CompletionOnceCallback callback); // Sends to a socket with a particular destination. // |buf| is the buffer to send. // |buf_len| is the number of bytes to send. // |address| is the recipient address. // |callback| is the user callback function to call on complete. // Returns a net error code, or ERR_IO_PENDING if the IO is in progress. // If ERR_IO_PENDING is returned, the caller must keep |buf| and |address| // alive until the callback is called. int SendTo(IOBuffer* buf, int buf_len, const IPEndPoint& address, CompletionOnceCallback callback); // Sets the receive buffer size (in bytes) for the socket. // Returns a net error code. int SetReceiveBufferSize(int32_t size); // Sets the send buffer size (in bytes) for the socket. // Returns a net error code. int SetSendBufferSize(int32_t size); // Requests that packets sent by this socket not be fragment, either locally // by the host, or by routers (via the DF bit in the IPv4 packet header). // May not be supported by all platforms. Returns a return a network error // code if there was a problem, but the socket will still be usable. Can not // return ERR_IO_PENDING. int SetDoNotFragment(); // If |confirm| is true, then the MSG_CONFIRM flag will be passed to // subsequent writes if it's supported by the platform. void SetMsgConfirm(bool confirm); // Returns true if the socket is already connected or bound. bool is_connected() const { return is_connected_; } const NetLogWithSource& NetLog() const { return net_log_; } // Call this to enable SO_REUSEADDR on the underlying socket. // Should be called between Open() and Bind(). // Returns a net error code. int AllowAddressReuse(); // Call this to allow or disallow sending and receiving packets to and from // broadcast addresses. // Returns a net error code. int SetBroadcast(bool broadcast); // Joins the multicast group. // |group_address| is the group address to join, could be either // an IPv4 or IPv6 address. // Returns a net error code. int JoinGroup(const IPAddress& group_address) const; // Leaves the multicast group. // |group_address| is the group address to leave, could be either // an IPv4 or IPv6 address. If the socket hasn't joined the group, // it will be ignored. // It's optional to leave the multicast group before destroying // the socket. It will be done by the OS. // Returns a net error code. int LeaveGroup(const IPAddress& group_address) const; // Sets interface to use for multicast. If |interface_index| set to 0, // default interface is used. // Should be called before Bind(). // Returns a net error code. int SetMulticastInterface(uint32_t interface_index); // Sets the time-to-live option for UDP packets sent to the multicast // group address. The default value of this option is 1. // Cannot be negative or more than 255. // Should be called before Bind(). // Returns a net error code. int SetMulticastTimeToLive(int time_to_live); // Sets the loopback flag for UDP socket. If this flag is true, the host // will receive packets sent to the joined group from itself. // The default value of this option is true. // Should be called before Bind(). // Returns a net error code. // // Note: the behavior of |SetMulticastLoopbackMode| is slightly // different between Windows and Unix-like systems. The inconsistency only // happens when there are more than one applications on the same host // joined to the same multicast group while having different settings on // multicast loopback mode. On Windows, the applications with loopback off // will not RECEIVE the loopback packets; while on Unix-like systems, the // applications with loopback off will not SEND the loopback packets to // other applications on the same host. See MSDN: http://goo.gl/6vqbj int SetMulticastLoopbackMode(bool loopback); // Sets the differentiated services flags on outgoing packets. May not // do anything on some platforms. // Returns a net error code. int SetDiffServCodePoint(DiffServCodePoint dscp); // Resets the thread to be used for thread-safety checks. void DetachFromThread(); // Apply |tag| to this socket. void ApplySocketTag(const SocketTag& tag); void SetWriteAsyncEnabled(bool enabled) { write_async_enabled_ = enabled; } bool WriteAsyncEnabled() { return write_async_enabled_; } void SetMaxPacketSize(size_t max_packet_size); void SetWriteMultiCoreEnabled(bool enabled) { write_multi_core_enabled_ = enabled; } void SetSendmmsgEnabled(bool enabled) { DCHECK(sender_ != nullptr); sender_->SetSendmmsgEnabled(enabled); } void SetWriteBatchingActive(bool active) { write_batching_active_ = active; } void SetWriteAsyncMaxBuffers(int value) { LOG(INFO) << "SetWriteAsyncMaxBuffers: " << value; write_async_max_buffers_ = value; } // Enables experimental optimization. This method should be called // before the socket is used to read data for the first time. void enable_experimental_recv_optimization() { DCHECK_EQ(kInvalidSocket, socket_); experimental_recv_optimization_enabled_ = true; }; protected: // WriteAsync batching etc. are to improve throughput of large high // bandwidth uploads. // Watcher for WriteAsync paths. class WriteAsyncWatcher : public base::MessagePumpForIO::FdWatcher { public: explicit WriteAsyncWatcher(UDPSocketPosix* socket) : socket_(socket), watching_(false) {} // MessagePumpForIO::FdWatcher methods void OnFileCanReadWithoutBlocking(int /* fd */) override {} void OnFileCanWriteWithoutBlocking(int /* fd */) override; void set_watching(bool watching) { watching_ = watching; } bool watching() { return watching_; } private: UDPSocketPosix* const socket_; bool watching_; DISALLOW_COPY_AND_ASSIGN(WriteAsyncWatcher); }; void IncreaseWriteAsyncOutstanding(int increment) { write_async_outstanding_ += increment; } virtual bool InternalWatchFileDescriptor(); virtual void InternalStopWatchingFileDescriptor(); void SetWriteCallback(CompletionOnceCallback callback) { write_callback_ = std::move(callback); } void DidSendBuffers(SendResult buffers); void FlushPending(); std::unique_ptr write_async_watcher_; scoped_refptr sender_; std::unique_ptr datagram_buffer_pool_; // |WriteAsync| pending writes, does not include buffers that have // been |PostTask*|'d. DatagramBuffers pending_writes_; private: enum SocketOptions { SOCKET_OPTION_MULTICAST_LOOP = 1 << 0 }; class ReadWatcher : public base::MessagePumpForIO::FdWatcher { public: explicit ReadWatcher(UDPSocketPosix* socket) : socket_(socket) {} // MessagePumpForIO::FdWatcher methods void OnFileCanReadWithoutBlocking(int /* fd */) override; void OnFileCanWriteWithoutBlocking(int /* fd */) override {} private: UDPSocketPosix* const socket_; DISALLOW_COPY_AND_ASSIGN(ReadWatcher); }; class WriteWatcher : public base::MessagePumpForIO::FdWatcher { public: explicit WriteWatcher(UDPSocketPosix* socket) : socket_(socket) {} // MessagePumpForIO::FdWatcher methods void OnFileCanReadWithoutBlocking(int /* fd */) override {} void OnFileCanWriteWithoutBlocking(int /* fd */) override; private: UDPSocketPosix* const socket_; DISALLOW_COPY_AND_ASSIGN(WriteWatcher); }; int InternalWriteAsync(CompletionOnceCallback callback, const NetworkTrafficAnnotationTag& traffic_annotation); bool WatchFileDescriptor(); void StopWatchingFileDescriptor(); void DoReadCallback(int rv); void DoWriteCallback(int rv); void DidCompleteRead(); void DidCompleteWrite(); // Handles stats and logging. |result| is the number of bytes transferred, on // success, or the net error code on failure. On success, LogRead takes in a // sockaddr and its length, which are mandatory, while LogWrite takes in an // optional IPEndPoint. void LogRead(int result, const char* bytes, socklen_t addr_len, const sockaddr* addr); void LogWrite(int result, const char* bytes, const IPEndPoint* address); // Same as SendTo(), except that address is passed by pointer // instead of by reference. It is called from Write() with |address| // set to NULL. int SendToOrWrite(IOBuffer* buf, int buf_len, const IPEndPoint* address, CompletionOnceCallback callback); int InternalConnect(const IPEndPoint& address); // Reads data from a UDP socket. Depending whether the socket is connected or // not, the method delegates the call to InternalRecvFromConnectedSocket() // or InternalRecvFromNonConnectedSocket() respectively. // For proper detection of truncated reads, the |buf_len| should always be // one byte longer than the expected maximum packet length. int InternalRecvFrom(IOBuffer* buf, int buf_len, IPEndPoint* address); // A more efficient implementation of the InternalRecvFrom() method for // reading data from connected sockets. Internally the method uses the read() // system call. int InternalRecvFromConnectedSocket(IOBuffer* buf, int buf_len, IPEndPoint* address); // An implementation of the InternalRecvFrom() method for reading data // from non-connected sockets. Internally the method uses the recvmsg() // system call. int InternalRecvFromNonConnectedSocket(IOBuffer* buf, int buf_len, IPEndPoint* address); int InternalSendTo(IOBuffer* buf, int buf_len, const IPEndPoint* address); // Applies |socket_options_| to |socket_|. Should be called before // Bind(). int SetMulticastOptions(); int DoBind(const IPEndPoint& address); // Binds to a random port on |address|. int RandomBind(const IPAddress& address); // Helpers for |WriteAsync| base::SequencedTaskRunner* GetTaskRunner(); void OnWriteAsyncTimerFired(); void LocalSendBuffers(); void PostSendBuffers(); int ResetLastAsyncResult(); int ResetWrittenBytes(); int socket_; int addr_family_; bool is_connected_; // Bitwise-or'd combination of SocketOptions. Specifies the set of // options that should be applied to |socket_| before Bind(). int socket_options_; // Flags passed to sendto(). int sendto_flags_; // Multicast interface. uint32_t multicast_interface_; // Multicast socket options cached for SetMulticastOption. // Cannot be used after Bind(). int multicast_time_to_live_; // How to do source port binding, used only when UDPSocket is part of // UDPClientSocket, since UDPServerSocket provides Bind. DatagramSocket::BindType bind_type_; // These are mutable since they're just cached copies to make // GetPeerAddress/GetLocalAddress smarter. mutable std::unique_ptr local_address_; mutable std::unique_ptr remote_address_; // The socket's posix wrappers base::MessagePumpForIO::FdWatchController read_socket_watcher_; base::MessagePumpForIO::FdWatchController write_socket_watcher_; // The corresponding watchers for reads and writes. ReadWatcher read_watcher_; WriteWatcher write_watcher_; // Various bits to support |WriteAsync()|. bool write_async_enabled_ = false; bool write_batching_active_ = false; bool write_multi_core_enabled_ = false; int write_async_max_buffers_ = 16; int written_bytes_ = 0; int last_async_result_; base::RepeatingTimer write_async_timer_; bool write_async_timer_running_; // Total writes in flight, including those |PostTask*|'d. int write_async_outstanding_; scoped_refptr task_runner_; // The buffer used by InternalRead() to retry Read requests scoped_refptr read_buf_; int read_buf_len_; IPEndPoint* recv_from_address_; // The buffer used by InternalWrite() to retry Write requests scoped_refptr write_buf_; int write_buf_len_; std::unique_ptr send_to_address_; // External callback; called when read is complete. CompletionOnceCallback read_callback_; // External callback; called when write is complete. CompletionOnceCallback write_callback_; NetLogWithSource net_log_; // Network that this socket is bound to via BindToNetwork(). NetworkChangeNotifier::NetworkHandle bound_network_; // These are used to lower the overhead updating activity monitor. SentActivityMonitor sent_activity_monitor_; ReceivedActivityMonitor received_activity_monitor_; // Current socket tag if |socket_| is valid, otherwise the tag to apply when // |socket_| is opened. SocketTag tag_; // If set to true, the socket will use an optimized experimental code path. // By default, the value is set to false. To use the optimization, the // client of the socket has to opt-in by calling the // enable_experimental_recv_optimization() method. bool experimental_recv_optimization_enabled_; THREAD_CHECKER(thread_checker_); // Used for alternate writes that are posted for concurrent execution. base::WeakPtrFactory weak_factory_; DISALLOW_COPY_AND_ASSIGN(UDPSocketPosix); }; } // namespace net #endif // NET_SOCKET_UDP_SOCKET_POSIX_H_