/* * * Copyright (c) 2020-2021 Project CHIP Authors * Copyright (c) 2013-2018 Nest Labs, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ /** * This file implements Inet::TCPEndPoint using sockets. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include // SOCK_CLOEXEC not defined on all platforms, e.g. iOS/macOS: #ifndef SOCK_CLOEXEC #define SOCK_CLOEXEC 0 #endif #if defined(SOL_TCP) // socket option level for Linux and BSD systems. #define TCP_SOCKOPT_LEVEL SOL_TCP #else // socket option level for macOS & iOS systems. #define TCP_SOCKOPT_LEVEL IPPROTO_TCP #endif #if defined(TCP_KEEPIDLE) // socket option for Linux and BSD systems. #define TCP_IDLE_INTERVAL_OPT_NAME TCP_KEEPIDLE #else // socket option for macOS & iOS systems. #define TCP_IDLE_INTERVAL_OPT_NAME TCP_KEEPALIVE #endif namespace chip { namespace Inet { CHIP_ERROR TCPEndPointImplSockets::BindImpl(IPAddressType addrType, const IPAddress & addr, uint16_t port, bool reuseAddr) { CHIP_ERROR res = GetSocket(addrType); if (res == CHIP_NO_ERROR && reuseAddr) { int n = 1; setsockopt(mSocket, SOL_SOCKET, SO_REUSEADDR, &n, sizeof(n)); #ifdef SO_REUSEPORT // Enable SO_REUSEPORT. This permits coexistence between an // untargetted CHIP client and other services that listen on // a CHIP port on a specific address (such as a CHIP client // with TARGETED_LISTEN or TCP proxying services). Note that // one of the costs of this implementation is the // non-deterministic connection dispatch when multple clients // listen on the address with the same degreee of selectivity, // e.g. two untargetted-listen CHIP clients, or two // targeted-listen CHIP clients with the same node id. if (setsockopt(mSocket, SOL_SOCKET, SO_REUSEPORT, &n, sizeof(n)) != 0) { ChipLogError(Inet, "SO_REUSEPORT: %d", errno); } #endif // defined(SO_REUSEPORT) } if (res == CHIP_NO_ERROR) { SockAddr sa; memset(&sa, 0, sizeof(sa)); socklen_t sockaddrsize = 0; if (addrType == IPAddressType::kIPv6) { sa.in6.sin6_family = AF_INET6; sa.in6.sin6_port = htons(port); sa.in6.sin6_flowinfo = 0; sa.in6.sin6_addr = addr.ToIPv6(); sa.in6.sin6_scope_id = 0; sockaddrsize = sizeof(sa.in6); } #if INET_CONFIG_ENABLE_IPV4 else if (addrType == IPAddressType::kIPv4) { sa.in.sin_family = AF_INET; sa.in.sin_port = htons(port); sa.in.sin_addr = addr.ToIPv4(); sockaddrsize = sizeof(sa.in); } #endif // INET_CONFIG_ENABLE_IPV4 else { res = INET_ERROR_WRONG_ADDRESS_TYPE; } if (res == CHIP_NO_ERROR) { // NOLINTNEXTLINE(clang-analyzer-unix.StdCLibraryFunctions): GetSocket calls ensure mSocket is valid if (bind(mSocket, &sa.any, sockaddrsize) != 0) { res = CHIP_ERROR_POSIX(errno); } } } return res; } CHIP_ERROR TCPEndPointImplSockets::ListenImpl(uint16_t backlog) { if (listen(mSocket, backlog) != 0) { return CHIP_ERROR_POSIX(errno); } // Enable non-blocking mode for the socket. int flags = fcntl(mSocket, F_GETFL, 0); fcntl(mSocket, F_SETFL, flags | O_NONBLOCK); // Wait for ability to read on this endpoint. CHIP_ERROR res = static_cast(GetSystemLayer()) .SetCallback(mWatch, HandlePendingIO, reinterpret_cast(this)); if (res == CHIP_NO_ERROR) { res = static_cast(GetSystemLayer()).RequestCallbackOnPendingRead(mWatch); } return res; } CHIP_ERROR TCPEndPointImplSockets::ConnectImpl(const IPAddress & addr, uint16_t port, InterfaceId intfId) { IPAddressType addrType = addr.Type(); ReturnErrorOnFailure(GetSocket(addrType)); if (!intfId.IsPresent()) { // The behavior when connecting to an IPv6 link-local address without specifying an outbound // interface is ambiguous. So prevent it in all cases. if (addr.IsIPv6LinkLocal()) { return INET_ERROR_WRONG_ADDRESS_TYPE; } } else { // Try binding to the interface // If destination is link-local then there is no need to bind to // interface or address on the interface. if (!addr.IsIPv6LinkLocal()) { #ifdef SO_BINDTODEVICE struct ::ifreq ifr; memset(&ifr, 0, sizeof(ifr)); ReturnErrorOnFailure(intfId.GetInterfaceName(ifr.ifr_name, sizeof(ifr.ifr_name))); // Attempt to bind to the interface using SO_BINDTODEVICE which requires privileged access. // If the permission is denied(EACCES) because CHIP is running in a context // that does not have privileged access, choose a source address on the // interface to bind the connetion to. int r = setsockopt(mSocket, SOL_SOCKET, SO_BINDTODEVICE, &ifr, sizeof(ifr)); if (r < 0 && errno != EACCES) { return CHIP_ERROR_POSIX(errno); } if (r < 0) #endif // SO_BINDTODEVICE { // Attempting to initiate a connection via a specific interface is not allowed. // The only way to do this is to bind the local to an address on the desired // interface. ReturnErrorOnFailure(BindSrcAddrFromIntf(addrType, intfId)); } } } // Disable generation of SIGPIPE. #ifdef SO_NOSIGPIPE int n = 1; setsockopt(mSocket, SOL_SOCKET, SO_NOSIGPIPE, &n, sizeof(n)); #endif // defined(SO_NOSIGPIPE) // Enable non-blocking mode for the socket. int flags = fcntl(mSocket, F_GETFL, 0); fcntl(mSocket, F_SETFL, flags | O_NONBLOCK); socklen_t sockaddrsize = 0; SockAddr sa; memset(&sa, 0, sizeof(sa)); if (addrType == IPAddressType::kIPv6) { sa.in6.sin6_family = AF_INET6; sa.in6.sin6_port = htons(port); sa.in6.sin6_flowinfo = 0; sa.in6.sin6_addr = addr.ToIPv6(); sa.in6.sin6_scope_id = intfId.GetPlatformInterface(); sockaddrsize = sizeof(sockaddr_in6); } #if INET_CONFIG_ENABLE_IPV4 else if (addrType == IPAddressType::kIPv4) { sa.in.sin_family = AF_INET; sa.in.sin_port = htons(port); sa.in.sin_addr = addr.ToIPv4(); sockaddrsize = sizeof(sockaddr_in); } #endif // INET_CONFIG_ENABLE_IPV4 else { return INET_ERROR_WRONG_ADDRESS_TYPE; } // NOLINTNEXTLINE(clang-analyzer-unix.StdCLibraryFunctions): GetSocket calls ensure mSocket is valid int conRes = connect(mSocket, &sa.any, sockaddrsize); if (conRes == -1 && errno != EINPROGRESS) { CHIP_ERROR res = CHIP_ERROR_POSIX(errno); DoClose(res, true); return res; } ReturnErrorOnFailure(static_cast(GetSystemLayer()) .SetCallback(mWatch, HandlePendingIO, reinterpret_cast(this))); // Once Connecting or Connected, bump the reference count. The corresponding Release() will happen in DoClose(). Retain(); if (conRes == 0) { mState = State::kConnected; // Wait for ability to read on this endpoint. ReturnErrorOnFailure(static_cast(GetSystemLayer()).RequestCallbackOnPendingRead(mWatch)); if (OnConnectComplete != nullptr) { OnConnectComplete(this, CHIP_NO_ERROR); } } else { mState = State::kConnecting; // Wait for ability to write on this endpoint. ReturnErrorOnFailure(static_cast(GetSystemLayer()).RequestCallbackOnPendingWrite(mWatch)); } return CHIP_NO_ERROR; } CHIP_ERROR TCPEndPointImplSockets::GetPeerInfo(IPAddress * retAddr, uint16_t * retPort) const { return GetSocketInfo(getpeername, retAddr, retPort); } CHIP_ERROR TCPEndPointImplSockets::GetLocalInfo(IPAddress * retAddr, uint16_t * retPort) const { return GetSocketInfo(getsockname, retAddr, retPort); } CHIP_ERROR TCPEndPointImplSockets::GetSocketInfo(int getname(int, sockaddr *, socklen_t *), IPAddress * retAddr, uint16_t * retPort) const { VerifyOrReturnError(IsConnected(), CHIP_ERROR_INCORRECT_STATE); SockAddr sa; memset(&sa, 0, sizeof(sa)); socklen_t saLen = sizeof(sa); if (getname(mSocket, &sa.any, &saLen) != 0) { return CHIP_ERROR_POSIX(errno); } if (sa.any.sa_family == AF_INET6) { *retAddr = IPAddress(sa.in6.sin6_addr); *retPort = ntohs(sa.in6.sin6_port); return CHIP_NO_ERROR; } #if INET_CONFIG_ENABLE_IPV4 if (sa.any.sa_family == AF_INET) { *retAddr = IPAddress(sa.in.sin_addr); *retPort = ntohs(sa.in.sin_port); return CHIP_NO_ERROR; } #endif // INET_CONFIG_ENABLE_IPV4 return CHIP_ERROR_INCORRECT_STATE; } CHIP_ERROR TCPEndPointImplSockets::GetInterfaceId(InterfaceId * retInterface) { VerifyOrReturnError(IsConnected(), CHIP_ERROR_INCORRECT_STATE); SockAddr sa; memset(&sa, 0, sizeof(sa)); socklen_t saLen = sizeof(sa); if (getpeername(mSocket, &sa.any, &saLen) != 0) { return CHIP_ERROR_POSIX(errno); } if (sa.any.sa_family == AF_INET6) { if (IPAddress(sa.in6.sin6_addr).IsIPv6LinkLocal()) { *retInterface = InterfaceId(sa.in6.sin6_scope_id); } else { // TODO: Is there still a meaningful interface id in this case? *retInterface = InterfaceId::Null(); } return CHIP_NO_ERROR; } #if INET_CONFIG_ENABLE_IPV4 if (sa.any.sa_family == AF_INET) { // No interface id available for IPv4 sockets. *retInterface = InterfaceId::Null(); return CHIP_NO_ERROR; } #endif // INET_CONFIG_ENABLE_IPV4 *retInterface = InterfaceId::Null(); return INET_ERROR_WRONG_ADDRESS_TYPE; } CHIP_ERROR TCPEndPointImplSockets::SendQueuedImpl(bool queueWasEmpty) { if (queueWasEmpty) { // Wait for ability to write on this endpoint. return static_cast(GetSystemLayer()).RequestCallbackOnPendingWrite(mWatch); } return CHIP_NO_ERROR; } CHIP_ERROR TCPEndPointImplSockets::EnableNoDelay() { VerifyOrReturnError(IsConnected(), CHIP_ERROR_INCORRECT_STATE); #ifdef TCP_NODELAY // Disable TCP Nagle buffering by setting TCP_NODELAY socket option to true int val = 1; if (setsockopt(mSocket, TCP_SOCKOPT_LEVEL, TCP_NODELAY, &val, sizeof(val)) != 0) { return CHIP_ERROR_POSIX(errno); } #endif // defined(TCP_NODELAY) return CHIP_NO_ERROR; } CHIP_ERROR TCPEndPointImplSockets::EnableKeepAlive(uint16_t interval, uint16_t timeoutCount) { VerifyOrReturnError(IsConnected(), CHIP_ERROR_INCORRECT_STATE); // Set the idle interval int val = interval; if (setsockopt(mSocket, TCP_SOCKOPT_LEVEL, TCP_IDLE_INTERVAL_OPT_NAME, &val, sizeof(val)) != 0) { return CHIP_ERROR_POSIX(errno); } // Set the probe retransmission interval. val = interval; if (setsockopt(mSocket, TCP_SOCKOPT_LEVEL, TCP_KEEPINTVL, &val, sizeof(val)) != 0) { return CHIP_ERROR_POSIX(errno); } // Set the probe timeout count val = timeoutCount; if (setsockopt(mSocket, TCP_SOCKOPT_LEVEL, TCP_KEEPCNT, &val, sizeof(val)) != 0) { return CHIP_ERROR_POSIX(errno); } // Enable keepalives for the connection. val = 1; // enable if (setsockopt(mSocket, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) != 0) { return CHIP_ERROR_POSIX(errno); } return CHIP_NO_ERROR; } CHIP_ERROR TCPEndPointImplSockets::DisableKeepAlive() { VerifyOrReturnError(IsConnected(), CHIP_ERROR_INCORRECT_STATE); // Disable keepalives on the connection. int val = 0; // disable if (setsockopt(mSocket, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) != 0) { return CHIP_ERROR_POSIX(errno); } return CHIP_NO_ERROR; } CHIP_ERROR TCPEndPointImplSockets::AckReceive(size_t len) { VerifyOrReturnError(IsConnected(), CHIP_ERROR_INCORRECT_STATE); // nothing to do for sockets case return CHIP_NO_ERROR; } CHIP_ERROR TCPEndPointImplSockets::SetUserTimeoutImpl(uint32_t userTimeoutMillis) { #if defined(TCP_USER_TIMEOUT) // Set the user timeout uint32_t val = userTimeoutMillis; if (setsockopt(mSocket, TCP_SOCKOPT_LEVEL, TCP_USER_TIMEOUT, &val, sizeof(val)) != 0) { return CHIP_ERROR_POSIX(errno); } return CHIP_NO_ERROR; #else // TCP_USER_TIMEOUT return CHIP_ERROR_NOT_IMPLEMENTED; #endif // defined(TCP_USER_TIMEOUT) } CHIP_ERROR TCPEndPointImplSockets::DriveSendingImpl() { CHIP_ERROR err = CHIP_NO_ERROR; #ifdef MSG_NOSIGNAL const int sendFlags = MSG_NOSIGNAL; #else const int sendFlags = 0; #endif // Pretend send() fails in the while loop below INET_FAULT_INJECT(FaultInjection::kFault_Send, { err = CHIP_ERROR_POSIX(EIO); DoClose(err, false); return err; }); while (!mSendQueue.IsNull()) { size_t bufLen = mSendQueue->DataLength(); ssize_t lenSentRaw = send(mSocket, mSendQueue->Start(), bufLen, sendFlags); if (lenSentRaw == -1) { if (errno != EAGAIN && errno != EWOULDBLOCK) { err = (errno == EPIPE) ? INET_ERROR_PEER_DISCONNECTED : CHIP_ERROR_POSIX(errno); } break; } if (lenSentRaw < 0 || bufLen < static_cast(lenSentRaw)) { err = CHIP_ERROR_INCORRECT_STATE; break; } size_t lenSent = static_cast(lenSentRaw); // Mark the connection as being active. MarkActive(); if (lenSent < bufLen) { mSendQueue->ConsumeHead(lenSent); } else { mSendQueue.FreeHead(); if (mSendQueue.IsNull()) { // Do not wait for ability to write on this endpoint. err = static_cast(GetSystemLayer()).ClearCallbackOnPendingWrite(mWatch); if (err != CHIP_NO_ERROR) { break; } } } if (OnDataSent != nullptr) { OnDataSent(this, lenSent); } #if INET_CONFIG_OVERRIDE_SYSTEM_TCP_USER_TIMEOUT mBytesWrittenSinceLastProbe += lenSent; bool isProgressing = false; err = CheckConnectionProgress(isProgressing); if (err != CHIP_NO_ERROR) { break; } if (!mUserTimeoutTimerRunning) { // Timer was not running before this write. So, start // the timer. StartTCPUserTimeoutTimer(); } else if (isProgressing) { // Progress is being made. So, shift the timer // forward if it was started. RestartTCPUserTimeoutTimer(); } #endif // INET_CONFIG_OVERRIDE_SYSTEM_TCP_USER_TIMEOUT if (lenSent < bufLen) { break; } } if (err == CHIP_NO_ERROR) { // If we're in the SendShutdown state and the send queue is now empty, shutdown writing on the socket. if (mState == State::kSendShutdown && mSendQueue.IsNull()) { if (shutdown(mSocket, SHUT_WR) != 0) { err = CHIP_ERROR_POSIX(errno); } } } return err; } void TCPEndPointImplSockets::HandleConnectCompleteImpl() { // Wait for ability to read or write on this endpoint. CHIP_ERROR err = static_cast(GetSystemLayer()).RequestCallbackOnPendingRead(mWatch); if (err == CHIP_NO_ERROR) { err = static_cast(GetSystemLayer()).RequestCallbackOnPendingWrite(mWatch); } if (err != CHIP_NO_ERROR) { DoClose(err, false); return; } } void TCPEndPointImplSockets::DoCloseImpl(CHIP_ERROR err, State oldState) { struct linger lingerStruct; // If the socket hasn't been closed already... if (mSocket != kInvalidSocketFd) { // If entering the Closed state // OR if entering the Closing state, and there's no unsent data in the send queue // THEN close the socket. if (mState == State::kClosed || (mState == State::kClosing && mSendQueue.IsNull())) { // If aborting the connection, ensure we send a TCP RST. if (IsConnected(oldState) && err != CHIP_NO_ERROR) { lingerStruct.l_onoff = 1; lingerStruct.l_linger = 0; if (setsockopt(mSocket, SOL_SOCKET, SO_LINGER, &lingerStruct, sizeof(lingerStruct)) != 0) { ChipLogError(Inet, "SO_LINGER: %d", errno); } } static_cast(GetSystemLayer()).StopWatchingSocket(&mWatch); close(mSocket); mSocket = kInvalidSocketFd; } } } #if INET_CONFIG_OVERRIDE_SYSTEM_TCP_USER_TIMEOUT void TCPEndPointImplSockets::TCPUserTimeoutHandler() { // Set the timer running flag to false mUserTimeoutTimerRunning = false; bool isProgressing = false; CHIP_ERROR err = CheckConnectionProgress(isProgressing); if (err == CHIP_NO_ERROR && mLastTCPKernelSendQueueLen != 0) { // There is data in the TCP Send Queue if (isProgressing) { // Data is flowing, so restart the UserTimeout timer // to shift it forward while also resetting the max // poll count. StartTCPUserTimeoutTimer(); } else { // Close the connection as the TCP UserTimeout has expired err = INET_ERROR_TCP_USER_TIMEOUT; } } if (err != CHIP_NO_ERROR) { // Close the connection as the TCP UserTimeout has expired DoClose(err, false); } } #endif // INET_CONFIG_OVERRIDE_SYSTEM_TCP_USER_TIMEOUT CHIP_ERROR TCPEndPointImplSockets::BindSrcAddrFromIntf(IPAddressType addrType, InterfaceId intfId) { // If we are trying to make a TCP connection over a 'specified target interface', // then we bind the TCPEndPoint to an IP address on that target interface // and use that address as the source address for that connection. This is // done in the event that directly binding the connection to the target // interface is not allowed due to insufficient privileges. VerifyOrReturnError(mState != State::kBound, CHIP_ERROR_UNSUPPORTED_CHIP_FEATURE); bool ipAddrFound = false; for (InterfaceAddressIterator addrIter; addrIter.HasCurrent(); addrIter.Next()) { IPAddress curAddr; if ((addrIter.GetInterfaceId() == intfId) && (addrIter.GetAddress(curAddr) == CHIP_NO_ERROR)) { // Search for an IPv4 address on the TargetInterface #if INET_CONFIG_ENABLE_IPV4 if (addrType == IPAddressType::kIPv4) { if (curAddr.IsIPv4()) { // Bind to the IPv4 address of the TargetInterface ipAddrFound = true; ReturnErrorOnFailure(Bind(IPAddressType::kIPv4, curAddr, 0, true)); break; } } #endif // INET_CONFIG_ENABLE_IPV4 if (addrType == IPAddressType::kIPv6) { // Select an IPv6 address on the interface that is not // a link local or a multicast address. // TODO: Define a proper IPv6GlobalUnicast address checker. if (!curAddr.IsIPv4() && !curAddr.IsIPv6LinkLocal() && !curAddr.IsMulticast()) { // Bind to the IPv6 address of the TargetInterface ipAddrFound = true; ReturnErrorOnFailure(Bind(IPAddressType::kIPv6, curAddr, 0, true)); break; } } } } VerifyOrReturnError(ipAddrFound, CHIP_ERROR_UNSUPPORTED_CHIP_FEATURE); return CHIP_NO_ERROR; } CHIP_ERROR TCPEndPointImplSockets::GetSocket(IPAddressType addrType) { if (mSocket == kInvalidSocketFd) { int family; if (addrType == IPAddressType::kIPv6) { family = PF_INET6; #if INET_CONFIG_ENABLE_IPV4 } else if (addrType == IPAddressType::kIPv4) { family = PF_INET; #endif // INET_CONFIG_ENABLE_IPV4 } else { return INET_ERROR_WRONG_ADDRESS_TYPE; } mSocket = ::socket(family, SOCK_STREAM | SOCK_CLOEXEC, 0); if (mSocket == -1) { return CHIP_ERROR_POSIX(errno); } ReturnErrorOnFailure(static_cast(GetSystemLayer()).StartWatchingSocket(mSocket, &mWatch)); mAddrType = addrType; // If creating an IPv6 socket, tell the kernel that it will be IPv6 only. This makes it // posible to bind two sockets to the same port, one for IPv4 and one for IPv6. #ifdef IPV6_V6ONLY if (family == PF_INET6) { int one = 1; setsockopt(mSocket, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one)); } #endif // defined(IPV6_V6ONLY) // On systems that support it, disable the delivery of SIGPIPE signals when writing to a closed // socket. #ifdef SO_NOSIGPIPE { int one = 1; int res = setsockopt(mSocket, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one)); if (res != 0) { ChipLogError(Inet, "SO_NOSIGPIPE: %d", errno); } } #endif // defined(SO_NOSIGPIPE) } else if (mAddrType != addrType) { return CHIP_ERROR_INCORRECT_STATE; } return CHIP_NO_ERROR; } // static void TCPEndPointImplSockets::HandlePendingIO(System::SocketEvents events, intptr_t data) { reinterpret_cast(data)->HandlePendingIO(events); } void TCPEndPointImplSockets::HandlePendingIO(System::SocketEvents events) { // Prevent the end point from being freed while in the middle of a callback. Retain(); // If in the Listening state, and the app is ready to receive a connection, and there is a connection // ready to be received on the socket, process the incoming connection. if (mState == State::kListening) { if (OnConnectionReceived != nullptr && events.Has(System::SocketEventFlags::kRead)) { HandleIncomingConnection(); } } // If in the processes of initiating a connection... else if (mState == State::kConnecting) { // The socket being writable indicates the connection has completed (successfully or otherwise). if (events.Has(System::SocketEventFlags::kWrite)) { #ifndef __MBED__ // Get the connection result from the socket. int osConRes; socklen_t optLen = sizeof(osConRes); if (getsockopt(mSocket, SOL_SOCKET, SO_ERROR, &osConRes, &optLen) != 0) { osConRes = errno; } #else // __MBED__ // On Mbed OS, connect blocks and never returns EINPROGRESS // The socket option SO_ERROR is not available. int osConRes = 0; #endif // !__MBED__ CHIP_ERROR conRes = CHIP_ERROR_POSIX(osConRes); // Process the connection result. HandleConnectComplete(conRes); } } else { // If in a state where sending is allowed, and there is data to be sent, and the socket is ready for // writing, drive outbound data into the connection. if (IsConnected() && !mSendQueue.IsNull() && events.Has(System::SocketEventFlags::kWrite)) { DriveSending(); } // If in a state were receiving is allowed, and the app is ready to receive data, and data is ready // on the socket, receive inbound data from the connection. if ((mState == State::kConnected || mState == State::kSendShutdown) && mReceiveEnabled && OnDataReceived != nullptr && events.Has(System::SocketEventFlags::kRead)) { ReceiveData(); } } Release(); } void TCPEndPointImplSockets::ReceiveData() { System::PacketBufferHandle rcvBuf; bool isNewBuf = true; if (mRcvQueue.IsNull()) { rcvBuf = System::PacketBufferHandle::New(kMaxReceiveMessageSize, 0); } else { rcvBuf = mRcvQueue->Last(); if (rcvBuf->AvailableDataLength() == 0) { rcvBuf = System::PacketBufferHandle::New(kMaxReceiveMessageSize, 0); } else { isNewBuf = false; rcvBuf->CompactHead(); } } if (rcvBuf.IsNull()) { DoClose(CHIP_ERROR_NO_MEMORY, false); return; } // Attempt to receive data from the socket. ssize_t rcvLen = recv(mSocket, rcvBuf->Start() + rcvBuf->DataLength(), rcvBuf->AvailableDataLength(), 0); #if INET_CONFIG_OVERRIDE_SYSTEM_TCP_USER_TIMEOUT CHIP_ERROR err; bool isProgressing = false; err = CheckConnectionProgress(isProgressing); if (err != CHIP_NO_ERROR) { DoClose(err, false); return; } if (mLastTCPKernelSendQueueLen == 0) { // If the output queue has been flushed then stop the timer. StopTCPUserTimeoutTimer(); } else if (isProgressing && mUserTimeoutTimerRunning) { // Progress is being made. So, shift the timer // forward if it was started. RestartTCPUserTimeoutTimer(); } #endif // INET_CONFIG_OVERRIDE_SYSTEM_TCP_USER_TIMEOUT // If an error occurred, abort the connection. if (rcvLen < 0) { int systemErrno = errno; if (systemErrno == EAGAIN) { // Note: in this case, we opt to not retry the recv call, // and instead we expect that the read flags will get // reset correctly upon a subsequent return from the // select call. ChipLogError(Inet, "recv: EAGAIN, will retry"); return; } DoClose(CHIP_ERROR_POSIX(systemErrno), false); } else { // Mark the connection as being active. MarkActive(); // If the peer closed their end of the connection... if (rcvLen == 0) { // If in the Connected state and the app has provided an OnPeerClose callback, // enter the ReceiveShutdown state. Providing an OnPeerClose callback allows // the app to decide whether to keep the send side of the connection open after // the peer has closed. If no OnPeerClose is provided, we assume that the app // wants to close both directions and automatically enter the Closing state. if (mState == State::kConnected && OnPeerClose != nullptr) { mState = State::kReceiveShutdown; } else { mState = State::kClosing; } // Do not wait for ability to read on this endpoint. (void) static_cast(GetSystemLayer()).ClearCallbackOnPendingRead(mWatch); // Call the app's OnPeerClose. if (OnPeerClose != nullptr) { OnPeerClose(this); } } // Otherwise, add the new data onto the receive queue. else { VerifyOrDie(rcvLen > 0); size_t newDataLength = rcvBuf->DataLength() + static_cast(rcvLen); if (isNewBuf) { rcvBuf->SetDataLength(newDataLength); rcvBuf.RightSize(); if (mRcvQueue.IsNull()) { mRcvQueue = std::move(rcvBuf); } else { mRcvQueue->AddToEnd(std::move(rcvBuf)); } } else { rcvBuf->SetDataLength(newDataLength, mRcvQueue); } } } // Drive any received data into the app. DriveReceiving(); } void TCPEndPointImplSockets::HandleIncomingConnection() { CHIP_ERROR err = CHIP_NO_ERROR; TCPEndPointImplSockets * conEP = nullptr; IPAddress peerAddr; uint16_t peerPort; SockAddr sa; memset(&sa, 0, sizeof(sa)); socklen_t saLen = sizeof(sa); // Accept the new connection. int conSocket = accept(mSocket, &sa.any, &saLen); if (conSocket == -1) { if (errno == EAGAIN || errno == EWOULDBLOCK) { return; } err = CHIP_ERROR_POSIX(errno); } // If there's no callback available, fail with an error. if (err == CHIP_NO_ERROR && OnConnectionReceived == nullptr) { err = CHIP_ERROR_NO_CONNECTION_HANDLER; } // Extract the peer's address information. if (err == CHIP_NO_ERROR) { if (sa.any.sa_family == AF_INET6) { peerAddr = IPAddress(sa.in6.sin6_addr); peerPort = ntohs(sa.in6.sin6_port); } #if INET_CONFIG_ENABLE_IPV4 else if (sa.any.sa_family == AF_INET) { peerAddr = IPAddress(sa.in.sin_addr); peerPort = ntohs(sa.in.sin_port); } #endif // INET_CONFIG_ENABLE_IPV4 else { err = CHIP_ERROR_INCORRECT_STATE; } } // Attempt to allocate an end point object. if (err == CHIP_NO_ERROR) { TCPEndPoint * connectEndPoint = nullptr; err = GetEndPointManager().NewEndPoint(&connectEndPoint); conEP = static_cast(connectEndPoint); } // If all went well... if (err == CHIP_NO_ERROR) { // Put the new end point into the Connected state. conEP->mSocket = conSocket; err = static_cast(GetSystemLayer()).StartWatchingSocket(conSocket, &conEP->mWatch); if (err == CHIP_NO_ERROR) { conEP->mState = State::kConnected; #if INET_CONFIG_ENABLE_IPV4 conEP->mAddrType = (sa.any.sa_family == AF_INET6) ? IPAddressType::kIPv6 : IPAddressType::kIPv4; #else // !INET_CONFIG_ENABLE_IPV4 conEP->mAddrType = IPAddressType::kIPv6; #endif // !INET_CONFIG_ENABLE_IPV4 conEP->Retain(); // Wait for ability to read on this endpoint. auto & conEPLayer = static_cast(conEP->GetSystemLayer()); err = conEPLayer.SetCallback(conEP->mWatch, HandlePendingIO, reinterpret_cast(conEP)); if (err == CHIP_NO_ERROR) { err = conEPLayer.RequestCallbackOnPendingRead(conEP->mWatch); } if (err == CHIP_NO_ERROR) { // Call the app's callback function. OnConnectionReceived(this, conEP, peerAddr, peerPort); return; } } } // Otherwise immediately close the connection, clean up and call the app's error callback. if (conSocket != -1) { close(conSocket); } if (conEP != nullptr) { if (conEP->mState == State::kConnected) { conEP->Release(); } conEP->Release(); } if (OnAcceptError != nullptr) { OnAcceptError(this, err); } } #if INET_CONFIG_OVERRIDE_SYSTEM_TCP_USER_TIMEOUT /** * This function probes the TCP output queue and checks if data is successfully * being transferred to the other end. */ CHIP_ERROR TCPEndPointImplSockets::CheckConnectionProgress(bool & isProgressing) { int currPendingBytesRaw = 0; uint32_t currPendingBytes; // Will be initialized once we know it's safe. // Fetch the bytes pending successful transmission in the TCP out queue. #ifdef __APPLE__ socklen_t len = sizeof(currPendingBytesRaw); if (getsockopt(mSocket, SOL_SOCKET, SO_NWRITE, &currPendingBytesRaw, &len) < 0) #else if (ioctl(mSocket, TIOCOUTQ, &currPendingBytesRaw) < 0) #endif { return CHIP_ERROR_POSIX(errno); } if (!CanCastTo(currPendingBytesRaw)) { return CHIP_ERROR_INCORRECT_STATE; } currPendingBytes = static_cast(currPendingBytesRaw); if ((currPendingBytes != 0) && (mBytesWrittenSinceLastProbe + mLastTCPKernelSendQueueLen == currPendingBytes)) { // No progress has been made isProgressing = false; } else { // Data is flowing successfully isProgressing = true; } // Reset the value of the bytes written since the last probe into the tcp // outqueue was made and update the last tcp outqueue sample. mBytesWrittenSinceLastProbe = 0; mLastTCPKernelSendQueueLen = currPendingBytes; return CHIP_NO_ERROR; } #endif // INET_CONFIG_OVERRIDE_SYSTEM_TCP_USER_TIMEOUT } // namespace Inet } // namespace chip