/* * * Copyright (c) 2020-2021 Project CHIP Authors * Copyright (c) 2013-2017 Nest Labs, Inc. * All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ /** * @file * This file implements the CHIP Transport object that maintains TCP connections * to peers. Handles both establishing new connections and accepting peer connection * requests. */ #include #include #include #include #include #include #include namespace chip { namespace Transport { namespace { using namespace chip::Encoding; // Packets start with a 32-bit size field. constexpr size_t kPacketSizeBytes = 4; static_assert(System::PacketBuffer::kLargeBufMaxSizeWithoutReserve <= UINT32_MAX, "Cast below could truncate the value"); static_assert(System::PacketBuffer::kLargeBufMaxSizeWithoutReserve >= kPacketSizeBytes, "Large buffer allocation should be large enough to hold the length field"); constexpr uint32_t kMaxTCPMessageSize = static_cast(System::PacketBuffer::kLargeBufMaxSizeWithoutReserve - kPacketSizeBytes); constexpr int kListenBacklogSize = 2; } // namespace TCPBase::~TCPBase() { if (mListenSocket != nullptr) { // endpoint is only non null if it is initialized and listening mListenSocket->Free(); mListenSocket = nullptr; } CloseActiveConnections(); } void TCPBase::CloseActiveConnections() { for (size_t i = 0; i < mActiveConnectionsSize; i++) { if (mActiveConnections[i].InUse()) { CloseConnectionInternal(&mActiveConnections[i], CHIP_NO_ERROR, SuppressCallback::Yes); } } } CHIP_ERROR TCPBase::Init(TcpListenParameters & params) { CHIP_ERROR err = CHIP_NO_ERROR; VerifyOrExit(mState == TCPState::kNotReady, err = CHIP_ERROR_INCORRECT_STATE); #if INET_CONFIG_ENABLE_TCP_ENDPOINT err = params.GetEndPointManager()->NewEndPoint(&mListenSocket); #else err = CHIP_ERROR_UNSUPPORTED_CHIP_FEATURE; #endif SuccessOrExit(err); err = mListenSocket->Bind(params.GetAddressType(), Inet::IPAddress::Any, params.GetListenPort(), params.GetInterfaceId().IsPresent()); SuccessOrExit(err); mListenSocket->mAppState = reinterpret_cast(this); mListenSocket->OnConnectionReceived = HandleIncomingConnection; mListenSocket->OnAcceptError = HandleAcceptError; mEndpointType = params.GetAddressType(); err = mListenSocket->Listen(kListenBacklogSize); SuccessOrExit(err); mState = TCPState::kInitialized; exit: if (err != CHIP_NO_ERROR) { ChipLogError(Inet, "Failed to initialize TCP transport: %" CHIP_ERROR_FORMAT, err.Format()); if (mListenSocket) { mListenSocket->Free(); mListenSocket = nullptr; } } return err; } void TCPBase::Close() { if (mListenSocket) { mListenSocket->Free(); mListenSocket = nullptr; } mState = TCPState::kNotReady; } ActiveTCPConnectionState * TCPBase::AllocateConnection() { for (size_t i = 0; i < mActiveConnectionsSize; i++) { if (!mActiveConnections[i].InUse()) { return &mActiveConnections[i]; } } return nullptr; } // Find an ActiveTCPConnectionState corresponding to a peer address ActiveTCPConnectionState * TCPBase::FindActiveConnection(const PeerAddress & address) { if (address.GetTransportType() != Type::kTcp) { return nullptr; } for (size_t i = 0; i < mActiveConnectionsSize; i++) { if (!mActiveConnections[i].IsConnected()) { continue; } Inet::IPAddress addr; uint16_t port; mActiveConnections[i].mEndPoint->GetPeerInfo(&addr, &port); if ((addr == address.GetIPAddress()) && (port == address.GetPort())) { return &mActiveConnections[i]; } } return nullptr; } // Find the ActiveTCPConnectionState for a given TCPEndPoint ActiveTCPConnectionState * TCPBase::FindActiveConnection(const Inet::TCPEndPoint * endPoint) { for (size_t i = 0; i < mActiveConnectionsSize; i++) { if (mActiveConnections[i].mEndPoint == endPoint && mActiveConnections[i].IsConnected()) { return &mActiveConnections[i]; } } return nullptr; } ActiveTCPConnectionState * TCPBase::FindInUseConnection(const Inet::TCPEndPoint * endPoint) { if (endPoint == nullptr) { return nullptr; } for (size_t i = 0; i < mActiveConnectionsSize; i++) { if (mActiveConnections[i].mEndPoint == endPoint) { return &mActiveConnections[i]; } } return nullptr; } CHIP_ERROR TCPBase::SendMessage(const Transport::PeerAddress & address, System::PacketBufferHandle && msgBuf) { // Sent buffer data format is: // - packet size as a uint32_t // - actual data VerifyOrReturnError(address.GetTransportType() == Type::kTcp, CHIP_ERROR_INVALID_ARGUMENT); VerifyOrReturnError(mState == TCPState::kInitialized, CHIP_ERROR_INCORRECT_STATE); VerifyOrReturnError(kPacketSizeBytes + msgBuf->DataLength() <= System::PacketBuffer::kLargeBufMaxSizeWithoutReserve, CHIP_ERROR_INVALID_ARGUMENT); static_assert(kPacketSizeBytes <= UINT16_MAX); VerifyOrReturnError(msgBuf->EnsureReservedSize(static_cast(kPacketSizeBytes)), CHIP_ERROR_NO_MEMORY); msgBuf->SetStart(msgBuf->Start() - kPacketSizeBytes); uint8_t * output = msgBuf->Start(); LittleEndian::Write32(output, static_cast(msgBuf->DataLength() - kPacketSizeBytes)); // Reuse existing connection if one exists, otherwise a new one // will be established ActiveTCPConnectionState * connection = FindActiveConnection(address); if (connection != nullptr) { return connection->mEndPoint->Send(std::move(msgBuf)); } return SendAfterConnect(address, std::move(msgBuf)); } CHIP_ERROR TCPBase::StartConnect(const PeerAddress & addr, Transport::AppTCPConnectionCallbackCtxt * appState, Transport::ActiveTCPConnectionState ** outPeerConnState) { #if INET_CONFIG_ENABLE_TCP_ENDPOINT ActiveTCPConnectionState * activeConnection = nullptr; Inet::TCPEndPoint * endPoint = nullptr; *outPeerConnState = nullptr; ReturnErrorOnFailure(mListenSocket->GetEndPointManager().NewEndPoint(&endPoint)); auto EndPointDeletor = [](Inet::TCPEndPoint * e) { e->Free(); }; std::unique_ptr endPointHolder(endPoint, EndPointDeletor); endPoint->mAppState = reinterpret_cast(this); endPoint->OnConnectComplete = HandleTCPEndPointConnectComplete; endPoint->SetConnectTimeout(mConnectTimeout); activeConnection = AllocateConnection(); VerifyOrReturnError(activeConnection != nullptr, CHIP_ERROR_NO_MEMORY); activeConnection->Init(endPoint, addr); activeConnection->mAppState = appState; activeConnection->mConnectionState = TCPState::kConnecting; // Set the return value of the peer connection state to the allocated // connection. *outPeerConnState = activeConnection; ReturnErrorOnFailure(endPoint->Connect(addr.GetIPAddress(), addr.GetPort(), addr.GetInterface())); mUsedEndPointCount++; endPointHolder.release(); return CHIP_NO_ERROR; #else return CHIP_ERROR_UNSUPPORTED_CHIP_FEATURE; #endif } CHIP_ERROR TCPBase::SendAfterConnect(const PeerAddress & addr, System::PacketBufferHandle && msg) { #if INET_CONFIG_ENABLE_TCP_ENDPOINT // This will initiate a connection to the specified peer bool alreadyConnecting = false; // Iterate through the ENTIRE array. If a pending packet for // the address already exists, this means a connection is pending and // does NOT need to be re-established. mPendingPackets.ForEachActiveObject([&](PendingPacket * pending) { if (pending->mPeerAddress == addr) { // same destination exists. alreadyConnecting = true; pending->mPacketBuffer->AddToEnd(std::move(msg)); return Loop::Break; } return Loop::Continue; }); // If already connecting, buffer was just enqueued for more sending if (alreadyConnecting) { return CHIP_NO_ERROR; } // Ensures sufficient active connections size exist VerifyOrReturnError(mUsedEndPointCount < mActiveConnectionsSize, CHIP_ERROR_NO_MEMORY); Transport::ActiveTCPConnectionState * peerConnState = nullptr; ReturnErrorOnFailure(StartConnect(addr, nullptr, &peerConnState)); // enqueue the packet once the connection succeeds VerifyOrReturnError(mPendingPackets.CreateObject(addr, std::move(msg)) != nullptr, CHIP_ERROR_NO_MEMORY); mUsedEndPointCount++; return CHIP_NO_ERROR; #else return CHIP_ERROR_UNSUPPORTED_CHIP_FEATURE; #endif } CHIP_ERROR TCPBase::ProcessReceivedBuffer(Inet::TCPEndPoint * endPoint, const PeerAddress & peerAddress, System::PacketBufferHandle && buffer) { ActiveTCPConnectionState * state = FindActiveConnection(endPoint); VerifyOrReturnError(state != nullptr, CHIP_ERROR_INTERNAL); state->mReceived.AddToEnd(std::move(buffer)); while (!state->mReceived.IsNull()) { uint8_t messageSizeBuf[kPacketSizeBytes]; CHIP_ERROR err = state->mReceived->Read(messageSizeBuf); if (err == CHIP_ERROR_BUFFER_TOO_SMALL) { // We don't have enough data to read the message size. Wait until there's more. return CHIP_NO_ERROR; } if (err != CHIP_NO_ERROR) { return err; } uint32_t messageSize = LittleEndian::Get32(messageSizeBuf); if (messageSize >= kMaxTCPMessageSize) { // Message is too big for this node to process. Disconnect from peer. ChipLogError(Inet, "Received TCP message of length %" PRIu32 " exceeds limit.", messageSize); CloseConnectionInternal(state, CHIP_ERROR_MESSAGE_TOO_LONG, SuppressCallback::No); return CHIP_ERROR_MESSAGE_TOO_LONG; } // The subtraction will not underflow because we successfully read kPacketSizeBytes. if (messageSize > (state->mReceived->TotalLength() - kPacketSizeBytes)) { // We have not yet received the complete message. return CHIP_NO_ERROR; } state->mReceived.Consume(kPacketSizeBytes); ReturnErrorOnFailure(ProcessSingleMessage(peerAddress, state, messageSize)); } return CHIP_NO_ERROR; } CHIP_ERROR TCPBase::ProcessSingleMessage(const PeerAddress & peerAddress, ActiveTCPConnectionState * state, size_t messageSize) { // We enter with `state->mReceived` containing at least one full message, perhaps in a chain. // `state->mReceived->Start()` currently points to the message data. // On exit, `state->mReceived` will have had `messageSize` bytes consumed, no matter what. System::PacketBufferHandle message; MessageTransportContext msgContext; msgContext.conn = state; if (state->mReceived->DataLength() == messageSize) { // In this case, the head packet buffer contains exactly the message. // This is common because typical messages fit in a network packet, and are delivered as such. // Peel off the head to pass upstream, which effectively consumes it from `state->mReceived`. message = state->mReceived.PopHead(); } else { // The message is either longer or shorter than the head buffer. // In either case, copy the message to a fresh linear buffer to pass upstream. We always copy, rather than provide // a shared reference to the current buffer, in case upper layers manipulate the buffer in ways that would affect // our use, e.g. chaining it elsewhere or reusing space beyond the current message. message = System::PacketBufferHandle::New(messageSize, 0); if (message.IsNull()) { return CHIP_ERROR_NO_MEMORY; } CHIP_ERROR err = state->mReceived->Read(message->Start(), messageSize); state->mReceived.Consume(messageSize); ReturnErrorOnFailure(err); message->SetDataLength(messageSize); } HandleMessageReceived(peerAddress, std::move(message), &msgContext); return CHIP_NO_ERROR; } void TCPBase::CloseConnectionInternal(ActiveTCPConnectionState * connection, CHIP_ERROR err, SuppressCallback suppressCallback) { TCPState prevState; if (connection == nullptr) { return; } if (connection->mConnectionState != TCPState::kClosed && connection->mEndPoint) { char addrStr[Transport::PeerAddress::kMaxToStringSize]; connection->mPeerAddr.ToString(addrStr); ChipLogProgress(Inet, "Closing connection with peer %s.", addrStr); if (err == CHIP_NO_ERROR) { connection->mEndPoint->Close(); } else { connection->mEndPoint->Abort(); } prevState = connection->mConnectionState; connection->mConnectionState = TCPState::kClosed; if (suppressCallback == SuppressCallback::No) { if (prevState == TCPState::kConnecting) { // Call upper layer connection attempt complete handler HandleConnectionAttemptComplete(connection, err); } else { // Call upper layer connection closed handler HandleConnectionClosed(connection, err); } } connection->Free(); mUsedEndPointCount--; } } CHIP_ERROR TCPBase::HandleTCPEndPointDataReceived(Inet::TCPEndPoint * endPoint, System::PacketBufferHandle && buffer) { Inet::IPAddress ipAddress; uint16_t port; Inet::InterfaceId interfaceId; endPoint->GetPeerInfo(&ipAddress, &port); endPoint->GetInterfaceId(&interfaceId); PeerAddress peerAddress = PeerAddress::TCP(ipAddress, port, interfaceId); TCPBase * tcp = reinterpret_cast(endPoint->mAppState); CHIP_ERROR err = tcp->ProcessReceivedBuffer(endPoint, peerAddress, std::move(buffer)); if (err != CHIP_NO_ERROR) { // Connection could need to be closed at this point ChipLogError(Inet, "Failed to accept received TCP message: %" CHIP_ERROR_FORMAT, err.Format()); return CHIP_ERROR_UNEXPECTED_EVENT; } return CHIP_NO_ERROR; } void TCPBase::HandleTCPEndPointConnectComplete(Inet::TCPEndPoint * endPoint, CHIP_ERROR conErr) { CHIP_ERROR err = CHIP_NO_ERROR; bool foundPendingPacket = false; TCPBase * tcp = reinterpret_cast(endPoint->mAppState); Inet::IPAddress ipAddress; uint16_t port; Inet::InterfaceId interfaceId; ActiveTCPConnectionState * activeConnection = nullptr; endPoint->GetPeerInfo(&ipAddress, &port); endPoint->GetInterfaceId(&interfaceId); char addrStr[Transport::PeerAddress::kMaxToStringSize]; PeerAddress addr = PeerAddress::TCP(ipAddress, port, interfaceId); addr.ToString(addrStr); if (conErr == CHIP_NO_ERROR) { // Set the Data received handler when connection completes endPoint->OnDataReceived = HandleTCPEndPointDataReceived; endPoint->OnDataSent = nullptr; endPoint->OnConnectionClosed = HandleTCPEndPointConnectionClosed; activeConnection = tcp->FindInUseConnection(endPoint); VerifyOrDie(activeConnection != nullptr); // Set to Connected state activeConnection->mConnectionState = TCPState::kConnected; // Disable TCP Nagle buffering by setting TCP_NODELAY socket option to true. // This is to expedite transmission of payload data and not rely on the // network stack's configuration of collating enough data in the TCP // window to begin transmission. err = endPoint->EnableNoDelay(); if (err != CHIP_NO_ERROR) { tcp->CloseConnectionInternal(activeConnection, err, SuppressCallback::No); return; } // Send any pending packets that are queued for this connection tcp->mPendingPackets.ForEachActiveObject([&](PendingPacket * pending) { if (pending->mPeerAddress == addr) { foundPendingPacket = true; System::PacketBufferHandle buffer = std::move(pending->mPacketBuffer); tcp->mPendingPackets.ReleaseObject(pending); if ((conErr == CHIP_NO_ERROR) && (err == CHIP_NO_ERROR)) { err = endPoint->Send(std::move(buffer)); } } return Loop::Continue; }); // Set the TCPKeepalive configurations on the established connection endPoint->EnableKeepAlive(activeConnection->mTCPKeepAliveIntervalSecs, activeConnection->mTCPMaxNumKeepAliveProbes); ChipLogProgress(Inet, "Connection established successfully with %s.", addrStr); // Let higher layer/delegate know that connection is successfully // established tcp->HandleConnectionAttemptComplete(activeConnection, CHIP_NO_ERROR); } else { ChipLogError(Inet, "Connection establishment with %s encountered an error: %" CHIP_ERROR_FORMAT, addrStr, err.Format()); endPoint->Free(); tcp->mUsedEndPointCount--; } } void TCPBase::HandleTCPEndPointConnectionClosed(Inet::TCPEndPoint * endPoint, CHIP_ERROR err) { TCPBase * tcp = reinterpret_cast(endPoint->mAppState); ActiveTCPConnectionState * activeConnection = tcp->FindInUseConnection(endPoint); if (activeConnection == nullptr) { endPoint->Free(); return; } if (err == CHIP_NO_ERROR && activeConnection->IsConnected()) { err = CHIP_ERROR_CONNECTION_CLOSED_UNEXPECTEDLY; } tcp->CloseConnectionInternal(activeConnection, err, SuppressCallback::No); } // Handler for incoming connection requests from peer nodes void TCPBase::HandleIncomingConnection(Inet::TCPEndPoint * listenEndPoint, Inet::TCPEndPoint * endPoint, const Inet::IPAddress & peerAddress, uint16_t peerPort) { TCPBase * tcp = reinterpret_cast(listenEndPoint->mAppState); ActiveTCPConnectionState * activeConnection = nullptr; Inet::InterfaceId interfaceId; Inet::IPAddress ipAddress; uint16_t port; endPoint->GetPeerInfo(&ipAddress, &port); endPoint->GetInterfaceId(&interfaceId); PeerAddress addr = PeerAddress::TCP(ipAddress, port, interfaceId); if (tcp->mUsedEndPointCount < tcp->mActiveConnectionsSize) { activeConnection = tcp->AllocateConnection(); endPoint->mAppState = listenEndPoint->mAppState; endPoint->OnDataReceived = HandleTCPEndPointDataReceived; endPoint->OnDataSent = nullptr; endPoint->OnConnectionClosed = HandleTCPEndPointConnectionClosed; // By default, disable TCP Nagle buffering by setting TCP_NODELAY socket option to true endPoint->EnableNoDelay(); // Update state for the active connection activeConnection->Init(endPoint, addr); tcp->mUsedEndPointCount++; activeConnection->mConnectionState = TCPState::kConnected; // Set the TCPKeepalive configurations on the received connection endPoint->EnableKeepAlive(activeConnection->mTCPKeepAliveIntervalSecs, activeConnection->mTCPMaxNumKeepAliveProbes); char addrStr[Transport::PeerAddress::kMaxToStringSize]; peerAddress.ToString(addrStr); ChipLogProgress(Inet, "Incoming connection established with peer at %s.", addrStr); // Call the upper layer handler for incoming connection received. tcp->HandleConnectionReceived(activeConnection); } else { ChipLogError(Inet, "Insufficient connection space to accept new connections."); endPoint->Free(); listenEndPoint->OnAcceptError(endPoint, CHIP_ERROR_TOO_MANY_CONNECTIONS); } } void TCPBase::HandleAcceptError(Inet::TCPEndPoint * endPoint, CHIP_ERROR err) { endPoint->Free(); ChipLogError(Inet, "Accept error: %" CHIP_ERROR_FORMAT, err.Format()); } CHIP_ERROR TCPBase::TCPConnect(const PeerAddress & address, Transport::AppTCPConnectionCallbackCtxt * appState, Transport::ActiveTCPConnectionState ** outPeerConnState) { VerifyOrReturnError(mState == TCPState::kInitialized, CHIP_ERROR_INCORRECT_STATE); // Verify that PeerAddress AddressType is TCP VerifyOrReturnError(address.GetTransportType() == Transport::Type::kTcp, CHIP_ERROR_INVALID_ARGUMENT); VerifyOrReturnError(mUsedEndPointCount < mActiveConnectionsSize, CHIP_ERROR_NO_MEMORY); char addrStr[Transport::PeerAddress::kMaxToStringSize]; address.ToString(addrStr); ChipLogProgress(Inet, "Connecting to peer %s.", addrStr); ReturnErrorOnFailure(StartConnect(address, appState, outPeerConnState)); return CHIP_NO_ERROR; } void TCPBase::TCPDisconnect(const PeerAddress & address) { // Closes an existing connection for (size_t i = 0; i < mActiveConnectionsSize; i++) { if (mActiveConnections[i].IsConnected()) { const Inet::IPAddress & ipAddress = mActiveConnections[i].mPeerAddr.GetIPAddress(); uint16_t port = mActiveConnections[i].mPeerAddr.GetPort(); // Ignoring the InterfaceID in the check as it may not have been provided in // the PeerAddress during connection establishment. The IPAddress and Port // are the necessary and sufficient set of parameters for searching // through the connections. if (ipAddress == address.GetIPAddress() && port == address.GetPort() && address.GetTransportType() == Type::kTcp) { // NOTE: this leaves the socket in TIME_WAIT. // Calling Abort() would clean it since SO_LINGER would be set to 0, // however this seems not to be useful. CloseConnectionInternal(&mActiveConnections[i], CHIP_NO_ERROR, SuppressCallback::Yes); } } } } void TCPBase::TCPDisconnect(Transport::ActiveTCPConnectionState * conn, bool shouldAbort) { if (conn == nullptr) { ChipLogError(Inet, "Failed to Disconnect. Passed in Connection is null."); return; } // This call should be able to disconnect the connection either when it is // already established, or when it is being set up. if ((conn->IsConnected() && shouldAbort) || conn->IsConnecting()) { CloseConnectionInternal(conn, CHIP_ERROR_CONNECTION_ABORTED, SuppressCallback::Yes); } if (conn->IsConnected() && !shouldAbort) { CloseConnectionInternal(conn, CHIP_NO_ERROR, SuppressCallback::Yes); } } bool TCPBase::HasActiveConnections() const { for (size_t i = 0; i < mActiveConnectionsSize; i++) { if (mActiveConnections[i].IsConnected()) { return true; } } return false; } } // namespace Transport } // namespace chip