/*--------------------------------------------------------------- * Copyright (c) 1999,2000,2001,2002,2003 * The Board of Trustees of the University of Illinois * All Rights Reserved. *--------------------------------------------------------------- * Permission is hereby granted, free of charge, to any person * obtaining a copy of this software (Iperf) and associated * documentation files (the "Software"), to deal in the Software * without restriction, including without limitation the * rights to use, copy, modify, merge, publish, distribute, * sublicense, and/or sell copies of the Software, and to permit * persons to whom the Software is furnished to do * so, subject to the following conditions: * * * Redistributions of source code must retain the above * copyright notice, this list of conditions and * the following disclaimers. * * * Redistributions in binary form must reproduce the above * copyright notice, this list of conditions and the following * disclaimers in the documentation and/or other materials * provided with the distribution. * * * Neither the names of the University of Illinois, NCSA, * nor the names of its contributors may be used to endorse * or promote products derived from this Software without * specific prior written permission. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND * NONINFRINGEMENT. IN NO EVENT SHALL THE CONTRIBUTORS OR COPYRIGHT * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, * ARISING FROM, OUT OF OR IN CONNECTION WITH THE * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * ________________________________________________________________ * National Laboratory for Applied Network Research * National Center for Supercomputing Applications * University of Illinois at Urbana-Champaign * http://www.ncsa.uiuc.edu * ________________________________________________________________ * * Listener.cpp * by Mark Gates <mgates@nlanr.net> * & Ajay Tirumala <tirumala@ncsa.uiuc.edu> * rewritten by Robert McMahon * ------------------------------------------------------------------- * Listener sets up a socket listening on the server host. For each * connected socket that accept() returns, this creates a Server * socket and spawns a thread for it. * * Changes to the latest version. Listener will run as a daemon * Multicast Server is now Multi-threaded * ------------------------------------------------------------------- * headers * uses * <stdlib.h> * <stdio.h> * <string.h> * <errno.h> * * <sys/types.h> * <unistd.h> * * <netdb.h> * <netinet/in.h> * <sys/socket.h> * ------------------------------------------------------------------- */ #define HEADERS() #include "headers.h" #include "Listener.hpp" #include "SocketAddr.h" #include "PerfSocket.hpp" #include "active_hosts.h" #include "util.h" #include "version.h" #include "Locale.h" #include "SocketAddr.h" #include "payloads.h" #include "delay.h" /* ------------------------------------------------------------------- * Stores local hostname and socket info. * ------------------------------------------------------------------- */ Listener::Listener (thread_Settings *inSettings) { mClients = inSettings->mThreads; ListenSocket = INVALID_SOCKET; /* * These thread settings are stored in three places * * 1) Listener thread * 2) Reporter Thread (per the ReportSettings()) * 3) Server thread */ mSettings = inSettings; } // end Listener /* ------------------------------------------------------------------- * Delete memory (buffer). * ------------------------------------------------------------------- */ Listener::~Listener () { #if HAVE_THREAD_DEBUG thread_debug("Listener destructor close sock=%d", ListenSocket); #endif if (ListenSocket != INVALID_SOCKET) { int rc = close(ListenSocket); WARN_errno(rc == SOCKET_ERROR, "listener close"); } } // end ~Listener /* ------------------------------------------------------------------- * This is the main Listener thread loop, listens and accept new * connections and starts traffic threads * * Flow is * o) suspend on traffic done for single client case * o) hang a select() then accept() on the listener socket * o) read or, more accurately, peak the socket for initial messages * o) determine and set server's settings flags * o) instantiate new settings for listener's clients if needed * o) instantiate and bind sum and bidir report objects as needed * o) start the threads needed * * ------------------------------------------------------------------- */ void Listener::Run () { // mCount is set True if -P was passed to the server int mCount = ((mSettings->mThreads != 0) ? mSettings->mThreads : -1); // This is a listener launched by the client per -r or -d if (mSettings->clientListener) { SockAddr_remoteAddr(mSettings); } if (!isUDP(mSettings)) { // TCP needs just one listen my_listen(); // This will set ListenSocket to a new sock fd } bool mMode_Time = isServerModeTime(mSettings) && !isDaemon(mSettings); if (mMode_Time) { mEndTime.setnow(); mEndTime.add(mSettings->mAmount / 100.0); } else if (isPermitKey(mSettings) && (mSettings->mListenerTimeout > 0)) { mEndTime.setnow(); mEndTime.add(mSettings->mListenerTimeout); } Timestamp now; #define SINGLECLIENTDELAY_DURATION 50000 // units is microseconds while (!sInterupted && mCount) { #ifdef HAVE_THREAD_DEBUG thread_debug("Listener main loop port %d ", mSettings->mPort); #endif now.setnow(); if ((mMode_Time || (mSettings->mListenerTimeout > 0)) && mEndTime.before(now)) { #ifdef HAVE_THREAD_DEBUG thread_debug("Listener port %d (loop timer expired)", mSettings->mPort); #endif break; } // Serialize in the event the -1 option or --singleclient is set int tc; if ((isSingleClient(mSettings) || isMulticast(mSettings)) && \ mCount && (tc = (thread_numtrafficthreads()) > 0)) { // Start with a delay in the event some traffic // threads are pending to be scheduled and haven't // had a chance to update the traffic thread count. // An event system between listener thread and traffic threads // might better but also more complex. This delay // really should be good enough unless the os scheduler sucks delay_loop(SINGLECLIENTDELAY_DURATION); #ifdef HAVE_THREAD_DEBUG thread_debug("Listener single client loop mc/t/mcast/sc %d/%d/%d/%d",mCount, tc, isMulticast(mSettings), isSingleClient(mSettings)); #endif continue; } if (isUDP(mSettings)) { // UDP needs a new listen per every new socket my_listen(); // This will set ListenSocket to a new sock fd } // Use a select() with a timeout if -t is set or if this is a v1 -r or -d test fd_set set; if ((mMode_Time) || isCompat(mSettings) || isPermitKey(mSettings)) { // Hang a select w/timeout on the listener socket struct timeval timeout; if (!isPermitKey(mSettings)) { timeout.tv_sec = mSettings->mAmount / 100; timeout.tv_usec = (mSettings->mAmount % 100) * 10000; } else { timeout.tv_sec = static_cast<long>(mSettings->mListenerTimeout); timeout.tv_usec = (static_cast<long>(mSettings->mListenerTimeout) * 1000000) % 1000000; } if (isTxStartTime(mSettings)) { now.setnow(); long adjsecs = (mSettings->txstart_epoch.tv_sec - now.getSecs()); if (adjsecs > 0) timeout.tv_sec += adjsecs + 1; } FD_ZERO(&set); FD_SET(ListenSocket, &set); if (!(select(ListenSocket + 1, &set, NULL, NULL, &timeout) > 0)) { #ifdef HAVE_THREAD_DEBUG thread_debug("Listener select timeout"); #endif if (isCompat(mSettings)) { fprintf(stderr, "ERROR: expected reverse connect did not occur\n"); break; } else continue; } } if (!setsock_blocking(mSettings->mSock, true)) { WARN(1, "Failed setting socket to blocking mode"); } // Instantiate another settings object to be used by the server thread Settings_Copy(mSettings, &server, 1); FAIL(!server, "Failed memory allocation for server settings", mSettings); server->mThreadMode = kMode_Server; if (!isDataReport(mSettings)) setNoDataReport(server); // accept a new socket and assign it to the server thread int accept_sock = my_accept(server); if (!(accept_sock > 0)) { assert(server != mSettings); #ifdef HAVE_THREAD_DEBUG thread_debug("Listener thread accept fail %d", accept_sock); #endif Settings_Destroy(server); continue; } #ifdef HAVE_THREAD_DEBUG thread_debug("Listener thread accepted server sock=%d transferID", server->mSock, server->mTransferID); #endif // Decrement the -P counter, commonly usd to kill the listener // after one test, i.e. -s -P 1 if (mCount > 0) { mCount--; } // These are some exception cases where the accepted socket shouldn't have been // accepted but the accept() was first required to figure this out // // 1) When a client started the listener per -d or -r (but not --reverse.) // What's done here is to see if the server peer opening the // socket matches the expected peer per a compare of the ip addresses // For the case of a *client Listener* the server and host must match // Note: it's a good idea to prefer --reverse and full duplex socket vs this // -d,-r legacy approach. Still support it though in the name of legacy usage // // 2) The peer is using a V6 address but the listener/server didn't get -V (for v6) on // it's command line // if ((mSettings->clientListener && SockAddr_Hostare_Equal(&mSettings->peer, &server->peer)) || \ (!isIPV6(mSettings) && SockAddr_isIPv6(&server->peer))) { // Not allowed, reset things and restart the loop // Don't forget to delete the UDP entry (inserted in my_accept) Iperf_remove_host(server); if (DecrSumReportRefCounter(server->mSumReport) <= 0) { FreeSumReport(server->mSumReport); } if (!isUDP(server)) close(server->mSock); assert(server != mSettings); Settings_Destroy(server); continue; } // isCompat is a version 1.7 test, basically it indicates there is nothing // in the first messages so don't try to process them. Later iperf versions use // the first message to convey test request and test settings information. This flag // is also used for threads that are children so-to-speak, e.g. a -d or -r client, // which cannot have test flags otherwise there would be "test setup recursion" // Time to read the very first packet received (per UDP) or the test flags (TCP) // to get the client's requested test information. // Note 1: It's important to know that this will also populate mBuf with // enough information for the listener to perform test info exchange later in the code // Note 2: The mBuf read is a peek so the server's traffic thread started later // will also process the first message from an accounting perspective. // This is required for accurate traffic statistics if (!isCompat(server) && (isConnectOnly(server) || !apply_client_settings(server))) { if (isConnectionReport(server) && !isSumOnly(server)) { struct ReportHeader *reporthdr = InitConnectionReport(server, NULL); struct ConnectionInfo *cr = static_cast<struct ConnectionInfo *>(reporthdr->this_report); cr->connect_timestamp.tv_sec = server->accept_time.tv_sec; cr->connect_timestamp.tv_usec = server->accept_time.tv_usec; assert(reporthdr); PostReport(reporthdr); } Iperf_remove_host(server); if (DecrSumReportRefCounter(server->mSumReport) <= 0) { FreeSumReport(server->mSumReport); } close(server->mSock); assert(server != mSettings); Settings_Destroy(server); continue; } // server settings flags should now be set per the client's first message exchange // so the server setting's flags per the client can now be checked if (isUDP(server)){ if (!isCompat(mSettings) && !isTapDev(mSettings) && (isL2LengthCheck(mSettings) || isL2LengthCheck(server)) && !L2_setup(server, server->mSock)) { // Requested L2 testing but L2 setup failed Iperf_remove_host(server); if (DecrSumReportRefCounter(server->mSumReport) <= 0) { FreeSumReport(server->mSumReport); } assert(server != mSettings); Settings_Destroy(server); continue; } } // Force compat mode to use 64 bit seq numbers if (isUDP(server) && isCompat(mSettings)) { setSeqNo64b(server); } setTransferID(server, 0); // Read any more test settings and test values (not just the flags) and instantiate // any settings objects for client threads (e.g. bidir or full duplex) // This will set the listener_client_settings to NULL if // there is no need for the Listener to start a client // // Note: the packet payload pointer for this information has different // offsets per TCP or UDP. Basically, TCP starts at byte 0 but UDP // has to skip over the UDP seq no, etc. // if (!isCompat(server) && !isCompat(mSettings) && \ (isFullDuplex(server) || isServerReverse(server) || (server->mMode != kTest_Normal))) { thread_Settings *listener_client_settings = NULL; // read client header for reverse settings Settings_GenerateClientSettings(server, &listener_client_settings, server->mBuf); if (listener_client_settings) { if (server->mMode != kTest_Normal) listener_client_settings->mTransferID = 0; setTransferID(listener_client_settings, 1); if (isFullDuplex(listener_client_settings) || isReverse(listener_client_settings)) Iperf_push_host(listener_client_settings); if (isFullDuplex(server)) { assert(server->mSumReport != NULL); if (!server->mSumReport->sum_fd_set) { // Reset the sum output routine for the server sum report // now that it's know to be full duplex. This wasn't known // during accept() SetSumHandlers(server, server->mSumReport); server->mSumReport->sum_fd_set = 1; } server->mFullDuplexReport = InitSumReport(server, server->mSock, 1); listener_client_settings->mFullDuplexReport = server->mFullDuplexReport; #if HAVE_THREAD_DEBUG thread_debug("FullDuplex report client=%p/%p server=%p/%p", (void *) listener_client_settings, (void *) listener_client_settings->mFullDuplexReport, (void *) server, (void *) server->mFullDuplexReport); #endif server->runNow = listener_client_settings; } else if (server->mMode != kTest_Normal) { #if HAVE_THREAD_DEBUG thread_debug("V1 test (-d or -r) sum report client=%p/%p server=%p/%p", (void *) listener_client_settings, (void *) listener_client_settings->mFullDuplexReport, (void *) server, (void *) server->mFullDuplexReport); #endif if (listener_client_settings->mMode == kTest_DualTest) { #ifdef HAVE_THREAD server->runNow = listener_client_settings; #else server->runNext = listener_client_settings; #endif } else { server->runNext = listener_client_settings; } } } } setTransferID(server, 0); if (isConnectionReport(server) && !isSumOnly(server)) { struct ReportHeader *reporthdr = InitConnectionReport(server, NULL); struct ConnectionInfo *cr = static_cast<struct ConnectionInfo *>(reporthdr->this_report); cr->connect_timestamp.tv_sec = server->accept_time.tv_sec; cr->connect_timestamp.tv_usec = server->accept_time.tv_usec; assert(reporthdr); PostReport(reporthdr); } // Now start the server side traffic threads thread_start_all(server); } #ifdef HAVE_THREAD_DEBUG thread_debug("Listener exiting port/sig/threads %d/%d/%d", mSettings->mPort, sInterupted, mCount); #endif } // end Run /* ------------------------------------------------------------------- * Setup a socket listening on a port. * For TCP, this calls bind() and listen(). * For UDP, this just calls bind(). * If inLocalhost is not null, bind to that address rather than the * wildcard server address, specifying what incoming interface to * accept connections on. * ------------------------------------------------------------------- */ void Listener::my_listen () { int rc; int type; int domain; SockAddr_localAddr(mSettings); #if (((HAVE_TUNTAP_TUN) || (HAVE_TUNTAP_TAP)) && (AF_PACKET)) if (isTapDev(mSettings)) { ListenSocket = socket(AF_PACKET, SOCK_RAW, htons(ETH_P_ALL)); FAIL_errno(ListenSocket == SOCKET_ERROR, "tuntap socket()", mSettings); mSettings->mSock = ListenSocket; rc = SockAddr_v4_Accept_BPF(ListenSocket, mSettings->mPort); WARN_errno((rc == SOCKET_ERROR), "tap accept bpf"); SetSocketOptions(mSettings); } else #endif { // create an AF_INET socket for the accepts // for the case of L2 testing and UDP, a new AF_PACKET // will be created to supercede this one type = (isUDP(mSettings) ? SOCK_DGRAM : SOCK_STREAM); domain = (SockAddr_isIPv6(&mSettings->local) ? #if HAVE_IPV6 AF_INET6 #else AF_INET #endif : AF_INET); #ifdef WIN32 if (SockAddr_isMulticast(&mSettings->local)) { // Multicast on Win32 requires special handling ListenSocket = WSASocket(domain, type, 0, 0, 0, WSA_FLAG_MULTIPOINT_C_LEAF | WSA_FLAG_MULTIPOINT_D_LEAF); WARN_errno(ListenSocket == INVALID_SOCKET, "socket"); } else #endif { ListenSocket = socket(domain, type, 0); WARN_errno(ListenSocket == INVALID_SOCKET, "socket"); } mSettings->mSock = ListenSocket; SetSocketOptions(mSettings); // reuse the address, so we can run if a former server was killed off int boolean = 1; Socklen_t len = sizeof(boolean); rc = setsockopt(ListenSocket, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char*>(&boolean), len); // bind socket to server address #ifdef WIN32 if (SockAddr_isMulticast(&mSettings->local)) { // Multicast on Win32 requires special handling rc = WSAJoinLeaf(ListenSocket, (sockaddr*) &mSettings->local, mSettings->size_local,0,0,0,0,JL_BOTH); WARN_errno(rc == SOCKET_ERROR, "WSAJoinLeaf (aka bind)"); } else #endif { rc = bind(ListenSocket, reinterpret_cast<sockaddr*>(&mSettings->local), mSettings->size_local); FAIL_errno(rc == SOCKET_ERROR, "listener bind", mSettings); } } // update the reporter thread if (isReport(mSettings) && isSettingsReport(mSettings)) { struct ReportHeader *report_settings = InitSettingsReport(mSettings); assert(report_settings != NULL); // disable future settings reports, listener should only do it once unsetReport(mSettings); PostReport(report_settings); } // listen for connections (TCP only). // use large (INT_MAX) backlog allowing multiple simultaneous connections if (!isUDP(mSettings)) { if (isSingleClient(mSettings) || isPermitKey(mSettings)) { rc = listen(ListenSocket, 0 + mSettings->mThreads); } else { rc = listen(ListenSocket, INT_MAX); } WARN_errno(rc == SOCKET_ERROR, "listen"); } else { #ifndef WIN32 // if UDP and multicast, join the group if (SockAddr_isMulticast(&mSettings->local)) { #ifdef HAVE_MULTICAST my_multicast_join(); #else fprintf(stderr, "Multicast not supported"); #endif // HAVE_MULTICAST } #endif } } // end my_listen() /* ------------------------------------------------------------------- * Joins the multicast group or source and group (SSM S,G) * * taken from: https://www.ibm.com/support/knowledgecenter/en/SSLTBW_2.1.0/com.ibm.zos.v2r1.hale001/ipv6d0141001708.htm * * Multicast function IPv4 IPv6 Protocol-independent * ================== ==== ==== ==================== * Level of specified option on setsockopt()/getsockopt() IPPROTO_IP IPPROTO_IPV6 IPPROTO_IP or IPPROTO_IPV6 * Join a multicast group IP_ADD_MEMBERSHIP IPV6_JOIN_GROUP MCAST_JOIN_GROUP * Leave a multicast group or leave all sources of that * multicast group IP_DROP_MEMBERSHIP IPV6_LEAVE_GROUP MCAST_LEAVE_GROUP * Select outbound interface for sending multicast datagrams IP_MULTICAST_IF IPV6_MULTICAST_IF NA * Set maximum hop count IP_MULTICAST_TTL IPV6_MULTICAST_HOPS NA * Enable multicast loopback IP_MULTICAST_LOOP IPV6_MULTICAST_LOOP NA * Join a source multicast group IP_ADD_SOURCE_MEMBERSHIP NA MCAST_JOIN_SOURCE_GROUP * Leave a source multicast group IP_DROP_SOURCE_MEMBERSHIP NA MCAST_LEAVE_SOURCE_GROUP * Block data from a source to a multicast group IP_BLOCK_SOURCE NA MCAST_BLOCK_SOURCE * Unblock a previously blocked source for a multicast group IP_UNBLOCK_SOURCE NA MCAST_UNBLOCK_SOURCE * * * Reminder: The os will decide which version of IGMP or MLD to use. This may be controlled by system settings, e.g.: * * [rmcmahon@lvnvdb0987:~/Code/ssm/iperf2-code] $ sysctl -a | grep mld | grep force * net.ipv6.conf.all.force_mld_version = 0 * net.ipv6.conf.default.force_mld_version = 0 * net.ipv6.conf.lo.force_mld_version = 0 * net.ipv6.conf.eth0.force_mld_version = 0 * * [rmcmahon@lvnvdb0987:~/Code/ssm/iperf2-code] $ sysctl -a | grep igmp | grep force * net.ipv4.conf.all.force_igmp_version = 0 * net.ipv4.conf.default.force_igmp_version = 0 * net.ipv4.conf.lo.force_igmp_version = 0 * net.ipv4.conf.eth0.force_igmp_version = 0 * * ------------------------------------------------------------------- */ void Listener::my_multicast_join () { // This is the older mulitcast join code. Both SSM and binding the // an interface requires the newer socket options. Using the older // code here will maintain compatiblity with previous iperf versions if (!isSSMMulticast(mSettings) && !mSettings->mIfrname) { if (!SockAddr_isIPv6(&mSettings->local)) { struct ip_mreq mreq; memcpy(&mreq.imr_multiaddr, SockAddr_get_in_addr(&mSettings->local), \ sizeof(mreq.imr_multiaddr)); mreq.imr_interface.s_addr = htonl(INADDR_ANY); int rc = setsockopt(ListenSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, reinterpret_cast<char*>(&mreq), sizeof(mreq)); WARN_errno(rc == SOCKET_ERROR, "multicast join"); #if HAVE_DECL_IP_MULTICAST_ALL int mc_all = 0; rc = setsockopt(ListenSocket, IPPROTO_IP, IP_MULTICAST_ALL, (void*) &mc_all, sizeof(mc_all)); WARN_errno(rc == SOCKET_ERROR, "ip_multicast_all"); #endif } else { #if (HAVE_IPV6 && HAVE_IPV6_MULTICAST && (HAVE_DECL_IPV6_JOIN_GROUP || HAVE_DECL_IPV6_ADD_MEMBERSHIP)) struct ipv6_mreq mreq; memcpy(&mreq.ipv6mr_multiaddr, SockAddr_get_in6_addr(&mSettings->local), sizeof(mreq.ipv6mr_multiaddr)); mreq.ipv6mr_interface = 0; #if HAVE_DECL_IPV6_JOIN_GROUP int rc = setsockopt(ListenSocket, IPPROTO_IPV6, IPV6_JOIN_GROUP, \ reinterpret_cast<char*>(&mreq), sizeof(mreq)); #else int rc = setsockopt(ListenSocket, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, \ reinterpret_cast<char*>(&mreq), sizeof(mreq)); #endif FAIL_errno(rc == SOCKET_ERROR, "multicast v6 join", mSettings); #else fprintf(stderr, "IPv6 multicast is not supported on this platform\n"); #endif } } else { int rc; #ifdef HAVE_SSM_MULTICAST // Here it's either an SSM S,G multicast join or a *,G with an interface specifier // Use the newer socket options when these are specified socklen_t socklen = sizeof(struct sockaddr_storage); int iface=0; #ifdef HAVE_NET_IF_H /* Set the interface or any */ if (mSettings->mIfrname) { iface = if_nametoindex(mSettings->mIfrname); FAIL_errno(!iface, "mcast if_nametoindex",mSettings); } else { iface = 0; } #endif if (isIPV6(mSettings)) { #if HAVE_IPV6_MULTICAST if (mSettings->mSSMMulticastStr) { struct group_source_req group_source_req; struct sockaddr_in6 *group; struct sockaddr_in6 *source; memset(&group_source_req, 0, sizeof(struct group_source_req)); group_source_req.gsr_interface = iface; group=reinterpret_cast<struct sockaddr_in6*>(&group_source_req.gsr_group); source=reinterpret_cast<struct sockaddr_in6*>(&group_source_req.gsr_source); source->sin6_family = AF_INET6; group->sin6_family = AF_INET6; /* Set the group */ rc=getsockname(ListenSocket,reinterpret_cast<struct sockaddr *>(group), &socklen); FAIL_errno(rc == SOCKET_ERROR, "mcast join source group getsockname",mSettings); group->sin6_port = 0; /* Ignored */ /* Set the source, apply the S,G */ rc=inet_pton(AF_INET6, mSettings->mSSMMulticastStr,&source->sin6_addr); FAIL_errno(rc != 1, "mcast v6 join source group pton",mSettings); source->sin6_port = 0; /* Ignored */ #ifdef HAVE_STRUCT_SOCKADDR_IN6_SIN6_LEN source->sin6_len = group->sin6_len; #endif rc = -1; #if HAVE_DECL_MCAST_JOIN_SOURCE_GROUP rc = setsockopt(ListenSocket,IPPROTO_IPV6,MCAST_JOIN_SOURCE_GROUP, &group_source_req, sizeof(group_source_req)); #endif FAIL_errno(rc == SOCKET_ERROR, "mcast v6 join source group",mSettings); } else { struct group_req group_req; struct sockaddr_in6 *group; memset(&group_req, 0, sizeof(struct group_req)); group_req.gr_interface = iface; group=reinterpret_cast<struct sockaddr_in6*>(&group_req.gr_group); group->sin6_family = AF_INET6; /* Set the group */ rc=getsockname(ListenSocket,reinterpret_cast<struct sockaddr *>(group), &socklen); FAIL_errno(rc == SOCKET_ERROR, "mcast v6 join group getsockname",mSettings); group->sin6_port = 0; /* Ignored */ rc = -1; #if HAVE_DECL_MCAST_JOIN_GROUP rc = setsockopt(ListenSocket,IPPROTO_IPV6,MCAST_JOIN_GROUP, &group_req, sizeof(group_source_req)); #endif FAIL_errno(rc == SOCKET_ERROR, "mcast v6 join group",mSettings); } #else fprintf(stderr, "Unfortunately, IPv6 multicast is not supported on this platform\n"); #endif } else { if (mSettings->mSSMMulticastStr) { struct sockaddr_in *group; struct sockaddr_in *source; // Fill out both structures because we don't which one will succeed // and both may need to be tried #ifdef HAVE_STRUCT_IP_MREQ_SOURCE struct ip_mreq_source imr; memset (&imr, 0, sizeof (imr)); #endif #ifdef HAVE_STRUCT_GROUP_SOURCE_REQ struct group_source_req group_source_req; memset(&group_source_req, 0, sizeof(struct group_source_req)); group_source_req.gsr_interface = iface; group=reinterpret_cast<struct sockaddr_in*>(&group_source_req.gsr_group); source=reinterpret_cast<struct sockaddr_in*>(&group_source_req.gsr_source); #else struct sockaddr_in imrgroup; struct sockaddr_in imrsource; group = &imrgroup; source = &imrsource; #endif source->sin_family = AF_INET; group->sin_family = AF_INET; /* Set the group */ rc=getsockname(ListenSocket,reinterpret_cast<struct sockaddr *>(group), &socklen); FAIL_errno(rc == SOCKET_ERROR, "mcast join source group getsockname",mSettings); group->sin_port = 0; /* Ignored */ /* Set the source, apply the S,G */ rc=inet_pton(AF_INET,mSettings->mSSMMulticastStr,&source->sin_addr); FAIL_errno(rc != 1, "mcast join source pton",mSettings); #ifdef HAVE_STRUCT_SOCKADDR_IN_SIN_LEN source->sin_len = group->sin_len; #endif source->sin_port = 0; /* Ignored */ rc = -1; #if HAVE_DECL_MCAST_JOIN_SOURCE_GROUP rc = setsockopt(ListenSocket,IPPROTO_IP,MCAST_JOIN_SOURCE_GROUP, &group_source_req, sizeof(group_source_req)); #endif #if HAVE_DECL_IP_ADD_SOURCE_MEMBERSHIP #ifdef HAVE_STRUCT_IP_MREQ_SOURCE // Some operating systems will have MCAST_JOIN_SOURCE_GROUP but still fail // In those cases try the IP_ADD_SOURCE_MEMBERSHIP if (rc < 0) { #ifdef HAVE_STRUCT_IP_MREQ_SOURCE_IMR_MULTIADDR_S_ADDR imr.imr_multiaddr = ((const struct sockaddr_in *)group)->sin_addr; imr.imr_sourceaddr = ((const struct sockaddr_in *)source)->sin_addr; #else // Some Android versions declare mreq_source without an s_addr imr.imr_multiaddr = ((const struct sockaddr_in *)group)->sin_addr.s_addr; imr.imr_sourceaddr = ((const struct sockaddr_in *)source)->sin_addr.s_addr; #endif rc = setsockopt (ListenSocket, IPPROTO_IP, IP_ADD_SOURCE_MEMBERSHIP, reinterpret_cast<char*>(&imr), sizeof (imr)); } #endif #endif FAIL_errno(rc == SOCKET_ERROR, "mcast join source group",mSettings); } else { struct group_req group_req; struct sockaddr_in *group; memset(&group_req, 0, sizeof(struct group_req)); group_req.gr_interface = iface; group=reinterpret_cast<struct sockaddr_in*>(&group_req.gr_group); group->sin_family = AF_INET; /* Set the group */ rc=getsockname(ListenSocket,reinterpret_cast<struct sockaddr *>(group), &socklen); FAIL_errno(rc == SOCKET_ERROR, "mcast join group getsockname",mSettings); group->sin_port = 0; /* Ignored */ rc = -1; #if HAVE_DECL_MCAST_JOIN_GROUP rc = setsockopt(ListenSocket,IPPROTO_IP,MCAST_JOIN_GROUP, &group_req, sizeof(group_source_req)); #endif FAIL_errno(rc == SOCKET_ERROR, "mcast join group",mSettings); } } #else fprintf(stderr, "Unfortunately, SSM is not supported on this platform\n"); exit(-1); #endif } } // end my_multicast_join() bool Listener::L2_setup (thread_Settings *server, int sockfd) { #if defined(HAVE_LINUX_FILTER_H) && defined(HAVE_AF_PACKET) // // Supporting parallel L2 UDP threads is a bit tricky. Below are some notes as to why and the approach used. // // The primary issues for UDP are: // // 1) We want the listener thread to hand off the flow to a server thread and not be burdened by that flow // 2) For -P support, the listener thread neads to detect new flows which will share the same UDP port // and UDP is stateless // // The listener thread needs to detect new traffic flows and hand them to a new server thread, and then // rehang a listen/accept. For standard iperf the "flow routing" is done using connect() per the ip quintuple. // The OS will then route established connected flows to the socket descriptor handled by a server thread and won't // burden the listener thread with these packets. // // For L2 verification, we have to create a two sockets that will exist for the life of the flow. A // new packet socket (AF_PACKET) will receive L2 frames and bypasses // the OS network stack. The original AF_INET socket will still send up packets // to the network stack. // // When using packet sockets there is inherent packet duplication, the hand off to a server // thread is not so straight forward as packets will continue being sent up to the listener thread // (technical problem is that packet sockets do not support connect() which binds the IP quintuple as the // forwarding key) Since the Listener uses recvfrom(), there is no OS mechanism to detect new flows nor // to drop packets. The listener can't listen on quintuple based connected flows because the client's source // port is unknown. Therefore the Listener thread will continue to receive packets from all established // flows sharing the same dst port which will impact CPU utilization and hence performance. // // The technique used to address this is to open an AF_PACKET socket and leave the AF_INET socket open. // (This also aligns with BSD based systems) The original AF_INET socket will remain in the (connected) // state so the network stack has it's connected state. A cBPF is then used to cause the kernel to fast drop // those packets. A cBPF is set up to drop such packets. The test traffic will then only come over the // packet (raw) socket and not the AF_INET socket. If we were to try to close the original AF_INET socket // (vs leave it open w/the fast drop cBPF) then the existing traffic will be sent up by the network stack // to he Listener thread, flooding it with packets, again something we want to avoid. // // On the packet (raw) socket itself, we do two more things to better handle performance // // 1) Use a full quintuple cBPF allowing the kernel to filter packets (allow) per the quintuple // 2) Use the packet fanout option to assign a CBPF to a socket and hence to a single server thread minimizing // duplication (reduce all cBPF's filtering load) // struct sockaddr *p = reinterpret_cast<sockaddr *>(&server->peer); struct sockaddr *l = reinterpret_cast<sockaddr *>(&server->local); int rc = 0; // // Establish a packet (raw) socket to be used by the server thread giving it full L2 packets // struct sockaddr s; socklen_t len = sizeof(s); getpeername(sockfd, &s, &len); if (isIPV6(server)) { server->mSock = socket(AF_PACKET, SOCK_RAW, htons(ETH_P_IPV6)); WARN_errno(server->mSock == INVALID_SOCKET, "ip6 packet socket (AF_PACKET)"); server->l4offset = IPV6HDRLEN + sizeof(struct ether_header); } else { server->mSock = socket(AF_PACKET, SOCK_RAW, htons(ETH_P_IP)); WARN_errno(server->mSock == INVALID_SOCKET, "ip packet socket (AF_PACKET)"); unsetIPV6(server); server->l4offset = sizeof(struct iphdr) + sizeof(struct ether_header); } // Didn't get a valid socket, return now if (server->mSock < 0) { return false; } // More per thread settings based on using a packet socket server->l4payloadoffset = server->l4offset + sizeof(struct udphdr); server->recvflags = MSG_TRUNC; // The original AF_INET socket only exists to keep the connected state // in the OS for this flow. Fast drop packets there as // now packets will use the AF_PACKET (raw) socket // Also, store the original AF_INET socket descriptor so it can be // closed in the Server's destructor. (Note: closing the // socket descriptors will also free the cBPF.) // server->mSockDrop = sockfd; rc = SockAddr_Drop_All_BPF(sockfd); WARN_errno(rc == SOCKET_ERROR, "l2 all drop bpf"); // Now optimize packet flow up the raw socket // Establish the flow BPF to forward up only "connected" packets to this raw socket if (l->sa_family == AF_INET6) { #if HAVE_IPV6 struct in6_addr *v6peer = SockAddr_get_in6_addr(&server->peer); struct in6_addr *v6local = SockAddr_get_in6_addr(&server->local); if (isIPV6(server)) { rc = SockAddr_v6_Connect_BPF(server->mSock, v6local, v6peer, (reinterpret_cast<struct sockaddr_in6 *>(l))->sin6_port, (reinterpret_cast<struct sockaddr_in6 *>(p))->sin6_port); WARN_errno(rc == SOCKET_ERROR, "l2 connect ipv6 bpf"); } else { // This is an ipv4 address in a v6 family (structure), just pull the lower 32 bits for the v4 addr rc = SockAddr_v4_Connect_BPF(server->mSock, v6local->s6_addr32[3], v6peer->s6_addr32[3], (reinterpret_cast<struct sockaddr_in6 *>(l))->sin6_port, (reinterpret_cast<struct sockaddr_in6 *>(p))->sin6_port); WARN_errno(rc == SOCKET_ERROR, "l2 v4in6 connect ip bpf"); } #else fprintf(stderr, "Unfortunately, IPv6 is not supported on this platform\n"); return false; #endif /* HAVE_IPV6 */ } else { rc = SockAddr_v4_Connect_BPF(server->mSock, (reinterpret_cast<struct sockaddr_in *>(l))->sin_addr.s_addr, (reinterpret_cast<struct sockaddr_in *>(p))->sin_addr.s_addr, (reinterpret_cast<struct sockaddr_in *>(l))->sin_port, (reinterpret_cast<struct sockaddr_in *>(p))->sin_port); WARN_errno(rc == SOCKET_ERROR, "l2 connect ip bpf"); } return rc >= 0; #else fprintf(stderr, "Client requested --l2checks but not supported on this platform\n"); return false; #endif } bool Listener::tap_setup (thread_Settings *server, int sockfd) { #if defined(HAVE_IF_TUNTAP) && defined(HAVE_AF_PACKET) && defined(HAVE_DECL_SO_BINDTODEVICE) struct sockaddr *p = reinterpret_cast<sockaddr *>(&server->peer); struct sockaddr *l = reinterpret_cast<sockaddr *>(&server->local); int rc = 0; // // Establish a packet (raw) socket to be used by the server thread giving it full L2 packets // struct sockaddr s; socklen_t len = sizeof(s); getpeername(sockfd, &s, &len); if (isIPV6(server)) { server->l4offset = IPV6HDRLEN + sizeof(struct ether_header); } else { server->l4offset = sizeof(struct iphdr) + sizeof(struct ether_header); } // Didn't get a valid socket, return now if (server->mSock < 0) { return false; } // More per thread settings based on using a packet socket server->l4payloadoffset = server->l4offset + sizeof(struct udphdr); server->recvflags = MSG_TRUNC; // Now optimize packet flow up the raw socket // Establish the flow BPF to forward up only "connected" packets to this raw socket if (l->sa_family == AF_INET6) { #if HAVE_IPV6 struct in6_addr *v6peer = SockAddr_get_in6_addr(&server->peer); struct in6_addr *v6local = SockAddr_get_in6_addr(&server->local); if (isIPV6(server)) { rc = SockAddr_v6_Connect_BPF(server->mSock, v6local, v6peer, (reinterpret_cast<struct sockaddr_in6 *>(l))->sin6_port, (reinterpret_cast<struct sockaddr_in6 *>(p))->sin6_port); WARN_errno(rc == SOCKET_ERROR, "l2 connect ipv6 bpf"); } else { // This is an ipv4 address in a v6 family (structure), just pull the lower 32 bits for the v4 addr rc = SockAddr_v4_Connect_BPF(server->mSock, v6local->s6_addr32[3], v6peer->s6_addr32[3], (reinterpret_cast<struct sockaddr_in6 *>(l))->sin6_port, (reinterpret_cast<struct sockaddr_in6 *>(p))->sin6_port); WARN_errno(rc == SOCKET_ERROR, "l2 v4in6 connect ip bpf"); } #else fprintf(stderr, "Unfortunately, IPv6 is not supported on this platform\n"); return false; #endif /* HAVE_IPV6 */ } else { rc = SockAddr_v4_Connect_BPF(server->mSock, (reinterpret_cast<struct sockaddr_in *>(l))->sin_addr.s_addr, (reinterpret_cast<struct sockaddr_in *>(p))->sin_addr.s_addr, (reinterpret_cast<struct sockaddr_in *>(l))->sin_port, (reinterpret_cast<struct sockaddr_in *>(p))->sin_port); WARN_errno(rc == SOCKET_ERROR, "l2 connect ip bpf"); } return rc >= 0; #else fprintf(stderr, "Client requested --l2checks but not supported on this platform\n"); return false; #endif } /* ------------------------------------------------------------------------ * Do the equivalent of an accept() call for UDP sockets. This checks * a listening UDP socket for new or first received datagram * ------------------------------------------------------------------- ----*/ int Listener::udp_accept (thread_Settings *server) { assert(server != NULL); int nread = 0; assert(ListenSocket > 0); // Preset the server socket to INVALID, hang recvfrom on the Listener's socket // The INVALID socket is used to keep the while loop going server->mSock = INVALID_SOCKET; nread = recvfrom(ListenSocket, server->mBuf, server->mBufLen, 0, \ reinterpret_cast<struct sockaddr*>(&server->peer), &server->size_peer); Timestamp now; server->accept_time.tv_sec = now.getSecs(); server->accept_time.tv_usec = now.getUsecs(); #if HAVE_THREAD_DEBUG { char tmpaddr[200]; size_t len=200; unsigned short port = SockAddr_getPort(&server->peer); SockAddr_getHostAddress(&server->peer, tmpaddr, len); thread_debug("rcvfrom peer: %s port %d len=%d", tmpaddr, port, nread); } #endif FAIL_errno(nread == SOCKET_ERROR, "recvfrom", mSettings); if (!(nread < 0) && !sInterupted) { // Handle connection for UDP sockets int gid = Iperf_push_host_port_conditional(server); #if HAVE_THREAD_DEBUG if (gid < 0) thread_debug("rcvfrom peer: drop duplicate"); #endif if (gid > 0) { int rc; // We have a new UDP flow (based upon key of quintuple) // so let's hand off this socket // to the server and create a new listener socket server->mSock = ListenSocket; ListenSocket = INVALID_SOCKET; // This connect() will allow the OS to only // send packets with the ip quintuple up to the server // socket and, hence, to the server thread (yet to be created) // This connect() routing is only supported with AF_INET or AF_INET6 sockets, // e.g. AF_PACKET sockets can't do this. We'll handle packet sockets later // All UDP accepts here will use AF_INET. This is intentional and needed rc = connect(server->mSock, reinterpret_cast<struct sockaddr*>(&server->peer), server->size_peer); FAIL_errno(rc == SOCKET_ERROR, "connect UDP", mSettings); server->size_local = sizeof(iperf_sockaddr); getsockname(server->mSock, reinterpret_cast<sockaddr*>(&server->local), &server->size_local); SockAddr_Ifrname(server); server->firstreadbytes = nread; } } return server->mSock; } #if (((HAVE_TUNTAP_TUN) || (HAVE_TUNTAP_TAP)) && (AF_PACKET)) int Listener::tuntap_accept(thread_Settings *server) { int rc = recv(server->mSock, server->mBuf, (server->mBufLen + TAPBYTESSLOP + sizeof(struct iphdr) + sizeof(struct ether_header) + sizeof(struct udphdr)), 0); if (rc <= 0) return 0; // rc = udpchecksum((void *)ip_hdr, (void *)udp_hdr, udplen, (isIPV6(mSettings) ? 1 : 0)); struct iphdr *l3hdr = (struct iphdr *)((char *)server->mBuf + sizeof(struct ether_header)); struct udphdr *l4hdr = (struct udphdr *)((char *)server->mBuf + sizeof(struct iphdr) + sizeof(struct ether_header)); // uint16_t ipver = (uint16_t) ntohs(mBuf + sizeof(struct ether_header)); // printf ("*** version = %d\n", ipver); // Note: sockaddrs are stored in network bytes order struct sockaddr_in *local = (struct sockaddr_in *) &server->local; struct sockaddr_in *peer = (struct sockaddr_in *) &server->peer; server->size_peer = sizeof(iperf_sockaddr); server->size_local = sizeof(iperf_sockaddr); peer->sin_family = AF_INET; local->sin_family = AF_INET; peer->sin_addr.s_addr = l3hdr->saddr; local->sin_addr.s_addr = l3hdr->daddr; peer->sin_port = l4hdr->source; local->sin_port = l4hdr->dest; server->l4offset = sizeof(struct iphdr) + sizeof(struct ether_header); SockAddr_v4_Connect_TAP_BPF(server->mSock, local->sin_addr.s_addr, peer->sin_addr.s_addr, local->sin_port, peer->sin_port); server->l4payloadoffset = sizeof(struct iphdr) + sizeof(struct ether_header) + sizeof(struct udphdr); server->firstreadbytes = rc; return server->mSock; } #endif /* ------------------------------------------------------------------- * This is called by the Listener thread main loop, return a socket or error * ------------------------------------------------------------------- */ int Listener::my_accept (thread_Settings *server) { assert(server != NULL); #ifdef HAVE_THREAD_DEBUG if (isUDP(server)) { thread_debug("Listener thread listening for UDP (sock=%d)", ListenSocket); } else { thread_debug("Listener thread listening for TCP (sock=%d)", ListenSocket); } #endif SockAddr_zeroAddress(&server->peer); server->size_peer = sizeof(iperf_sockaddr); server->accept_time.tv_sec = 0; server->accept_time.tv_usec = 0; if (isUDP(server)) { #if (((HAVE_TUNTAP_TUN) || (HAVE_TUNTAP_TAP)) && (AF_PACKET)) if (isTapDev(server) || isTunDev(server)) { server->mSock = tuntap_accept(server); } else #endif { server->mSock = udp_accept(server); } // note udp_accept will update the active host table } else { // accept a TCP connection server->mSock = accept(ListenSocket, reinterpret_cast<sockaddr*>(&server->peer), &server->size_peer); if (server->mSock > 0) { Timestamp now; server->accept_time.tv_sec = now.getSecs(); server->accept_time.tv_usec = now.getUsecs(); server->size_local = sizeof(iperf_sockaddr); getsockname(server->mSock, reinterpret_cast<sockaddr*>(&server->local), &server->size_local); SockAddr_Ifrname(server); Iperf_push_host(server); } } return server->mSock; } // end my_accept // Read deep enough into the packet to get the client settings // Read the headers but don't pull them from the queue in order to // preserve server thread accounting, i.e. these exchanges will // be part of traffic accounting. Return false if it's determined // this traffic shouldn't be accepted for a test run // Description of bits and fields is in include/payloads.h bool Listener::apply_client_settings (thread_Settings *server) { assert(server != NULL); bool rc = false; // Set the receive timeout for the very first read int sorcvtimer = TESTEXCHANGETIMEOUT; // 4 sec in usecs SetSocketOptionsReceiveTimeout(server, sorcvtimer); server->peer_version_u = 0; server->peer_version_l = 0; server->mMode = kTest_Normal; if (isUDP(server)) { rc = apply_client_settings_udp(server); } else if (!isConnectOnly(server)) { rc = apply_client_settings_tcp(server); } if (isOverrideTOS(server)) { SetSocketOptionsIPTos(server, server->mRTOS); } else if (server->mTOS) { SetSocketOptionsIPTos(server, server->mTOS); } return rc; } inline bool Listener::test_permit_key(uint32_t flags, thread_Settings *server, int keyoffset) { if (!(flags & HEADER_KEYCHECK)) { server->mKeyCheck= false; return false; } struct permitKey *thiskey = reinterpret_cast<struct permitKey *>(server->mBuf + (keyoffset - sizeof(thiskey->length))); int keylen = ntohs(thiskey->length); if ((keylen < MIN_PERMITKEY_LEN) || (keylen > MAX_PERMITKEY_LEN)) { server->mKeyCheck= false; // fprintf(stderr, "REJECT: key length bounds error (%d)\n", keylen); return false; } if (keylen != static_cast<int>(strlen(server->mPermitKey))) { server->mKeyCheck= false; // fprintf(stderr, "REJECT: key length mismatch (%d!=%d)\n", keylen, (int) strlen(server->mPermitKey)); return false; } if (!isUDP(server)) { int nread = 0; nread = recvn(server->mSock, reinterpret_cast<char *>(&thiskey->value), keylen, 0); FAIL_errno((nread < (keyoffset + keylen)), "read key", server); } strncpy(server->mPermitKey, thiskey->value, MAX_PERMITKEY_LEN + 1); if (strncmp(server->mPermitKey, mSettings->mPermitKey, keylen) != 0) { server->mKeyCheck= false; // fprintf(stderr, "REJECT: key value mismatch per %s\n", thiskey->value); return false; } server->mKeyCheck= true; return true; } bool Listener::apply_client_settings_udp (thread_Settings *server) { struct client_udp_testhdr *hdr = reinterpret_cast<struct client_udp_testhdr *>(server->mBuf + server->l4payloadoffset); uint32_t flags = ntohl(hdr->base.flags); uint16_t upperflags = 0; if (flags & HEADER_SEQNO64B) { setSeqNo64b(server); } #if HAVE_THREAD_DEBUG thread_debug("UDP test flags = %X", flags); #endif if (flags & HEADER32_SMALL_TRIPTIMES) { #if HAVE_THREAD_DEBUG thread_debug("UDP small header"); #endif server->sent_time.tv_sec = ntohl(hdr->seqno_ts.tv_sec); server->sent_time.tv_usec = ntohl(hdr->seqno_ts.tv_usec); uint32_t seqno = ntohl(hdr->seqno_ts.id); if (seqno != 1) { fprintf(stderr, "WARN: first received packet (id=%d) was not first sent packet, report start time will be off\n", seqno); } setTripTime(server); setEnhanced(server); } else if ((flags & HEADER_VERSION1) || (flags & HEADER_VERSION2) || (flags & HEADER_EXTEND)) { if ((flags & HEADER_VERSION1) && !(flags & HEADER_VERSION2)) { if (flags & RUN_NOW) server->mMode = kTest_DualTest; else server->mMode = kTest_TradeOff; } if (flags & HEADER_EXTEND) { upperflags = htons(hdr->extend.upperflags); server->mTOS = ntohs(hdr->extend.tos); server->peer_version_u = ntohl(hdr->extend.version_u); server->peer_version_l = ntohl(hdr->extend.version_l); if (flags & HEADER_UDPTESTS) { // Handle stateless flags if (upperflags & HEADER_ISOCH) { setIsochronous(server); } if (upperflags & HEADER_L2ETHPIPV6) { setIPV6(server); } else { unsetIPV6(server); } if (upperflags & HEADER_L2LENCHECK) { setL2LengthCheck(server); } if (upperflags & HEADER_NOUDPFIN) { setNoUDPfin(server); } } if (upperflags & HEADER_EPOCH_START) { server->txstart_epoch.tv_sec = ntohl(hdr->start_fq.start_tv_sec); server->txstart_epoch.tv_usec = ntohl(hdr->start_fq.start_tv_usec); Timestamp now; if ((abs(now.getSecs() - server->txstart_epoch.tv_sec)) > (MAXDIFFTXSTART + 1)) { fprintf(stdout,"WARN: ignore --txstart-time because client didn't provide valid start timestamp within %d seconds of now\n", MAXDIFFTXSTART); unsetTxStartTime(server); } else { setTxStartTime(server); } } if (upperflags & HEADER_TRIPTIME) { server->sent_time.tv_sec = ntohl(hdr->start_fq.start_tv_sec); server->sent_time.tv_usec = ntohl(hdr->start_fq.start_tv_usec); Timestamp now; if (!isTxStartTime(server) && ((abs(now.getSecs() - server->sent_time.tv_sec)) > (MAXDIFFTIMESTAMPSECS + 1))) { fprintf(stdout,"WARN: ignore --trip-times because client didn't provide valid start timestamp within %d seconds of now\n", MAXDIFFTIMESTAMPSECS); } else { setTripTime(server); setEnhanced(server); } } } if (flags & HEADER_VERSION2) { upperflags = htons(hdr->extend.upperflags); if (upperflags & HEADER_FULLDUPLEX) { setFullDuplex(server); setServerReverse(server); } if (upperflags & HEADER_REVERSE) { server->mThreadMode=kMode_Client; setServerReverse(server); setNoUDPfin(server); unsetReport(server); } } } return true; } bool Listener::apply_client_settings_tcp (thread_Settings *server) { bool rc = false; int nread = recvn(server->mSock, server->mBuf, sizeof(uint32_t), 0); char *readptr = server->mBuf; if (nread == 0) { //peer closed the socket, with no writes e.g. a connect-only test WARN(1, "read tcp flags (peer close)"); goto DONE; } if (nread < (int) sizeof(uint32_t)) { WARN(1, "read tcp flags (runt)"); goto DONE; } else { rc = true; readptr += nread; struct client_tcp_testhdr *hdr = reinterpret_cast<struct client_tcp_testhdr *>(server->mBuf); uint32_t flags = ntohl(hdr->base.flags); if (flags & HEADER_BOUNCEBACK) { setBounceBack(server); } uint16_t upperflags = 0; int readlen; // figure out the length of the test header if ((readlen = Settings_ClientTestHdrLen(flags, server)) > 0) { // read the test settings passed to the server by the client nread += recvn(server->mSock, readptr, (readlen - (int) sizeof(uint32_t)), 0); FAIL_errno((nread < readlen), "read tcp test info", server); if (isPermitKey(mSettings)) { if (!test_permit_key(flags, server, readlen)) { rc = false; goto DONE; } } else if (flags & HEADER_KEYCHECK) { rc = false; server->mKeyCheck = false; goto DONE; } server->firstreadbytes = nread; struct client_tcp_testhdr *hdr = reinterpret_cast<struct client_tcp_testhdr*>(server->mBuf); if ((flags & HEADER_VERSION1) && !(flags & HEADER_VERSION2)) { if (flags & RUN_NOW) server->mMode = kTest_DualTest; else server->mMode = kTest_TradeOff; } if (flags & HEADER_EXTEND) { upperflags = htons(hdr->extend.upperflags); server->mTOS = ntohs(hdr->extend.tos); server->peer_version_u = ntohl(hdr->extend.version_u); server->peer_version_l = ntohl(hdr->extend.version_l); if (upperflags & HEADER_ISOCH) { setIsochronous(server); } if (upperflags & HEADER_EPOCH_START) { server->txstart_epoch.tv_sec = ntohl(hdr->start_fq.start_tv_sec); server->txstart_epoch.tv_usec = ntohl(hdr->start_fq.start_tv_usec); Timestamp now; if ((abs(now.getSecs() - server->txstart_epoch.tv_sec)) > (MAXDIFFTXSTART + 1)) { fprintf(stdout,"WARN: ignore --txstart-time because client didn't provide valid start timestamp within %d seconds of now\n", MAXDIFFTXSTART); unsetTxStartTime(server); } else { setTxStartTime(server); } } if (upperflags & HEADER_TRIPTIME) { Timestamp now; server->sent_time.tv_sec = ntohl(hdr->start_fq.start_tv_sec); server->sent_time.tv_usec = ntohl(hdr->start_fq.start_tv_usec); if (!isTxStartTime(server) && ((abs(now.getSecs() - server->sent_time.tv_sec)) > (MAXDIFFTIMESTAMPSECS + 1))) { fprintf(stdout,"WARN: ignore --trip-times because client didn't provide valid start timestamp within %d seconds of now\n", MAXDIFFTIMESTAMPSECS); } else { setTripTime(server); setEnhanced(server); } } if (upperflags & HEADER_PERIODICBURST) { setEnhanced(server); setFrameInterval(server); setPeriodicBurst(server); { struct client_tcp_testhdr *hdr = reinterpret_cast<struct client_tcp_testhdr *>(server->mBuf); server->mFPS = ntohl(hdr->isoch_settings.FPSl); server->mFPS += ntohl(hdr->isoch_settings.FPSu) / static_cast<double>(rMillion); } if (!server->mFPS) { server->mFPS = 1.0; } } if (flags & HEADER_VERSION2) { if (upperflags & HEADER_FULLDUPLEX) { setFullDuplex(server); setServerReverse(server); } if (upperflags & HEADER_REVERSE) { server->mThreadMode=kMode_Client; setServerReverse(server); } } #if HAVE_DECL_TCP_NOTSENT_LOWAT if ((isServerReverse(server) || isFullDuplex(server)) && (upperflags & HEADER_WRITEPREFETCH)) { server->mWritePrefetch = ntohl(hdr->extend.TCPWritePrefetch); if (server->mWritePrefetch > 0) { setWritePrefetch(server); } } #endif if (upperflags & HEADER_BOUNCEBACK) { setBounceBack(server); } } } // Handle case that requires an ack back to the client // Signaled by not UDP (only supported by TCP) // and either 2.0.13 flags or the newer 2.0.14 flag of // V2PEERDETECT if (!isUDP(server) && !isCompat(mSettings) && \ ((!(flags & HEADER_VERSION2) && (flags & HEADER_EXTEND)) || \ (flags & HEADER_V2PEERDETECT))) { client_test_ack(server); } } DONE: return rc; } int Listener::client_test_ack(thread_Settings *server) { if (isUDP(server)) return 1; client_hdr_ack ack; int sotimer = 0; int size = sizeof(struct client_hdr_ack); ack.typelen.type = htonl(CLIENTHDRACK); ack.flags = 0; ack.reserved1 = 0; ack.reserved2 = 0; ack.version_u = htonl(IPERF_VERSION_MAJORHEX); ack.version_l = htonl(IPERF_VERSION_MINORHEX); if (isTripTime(server)) { ack.ts.sent_tv_sec = htonl(server->sent_time.tv_sec); ack.ts.sent_tv_usec = htonl(server->sent_time.tv_usec); ack.ts.sentrx_tv_sec = htonl(server->accept_time.tv_sec); ack.ts.sentrx_tv_usec = htonl(server->accept_time.tv_usec); Timestamp now; ack.ts.ack_tv_sec = htonl(now.getSecs()); ack.ts.ack_tv_usec = htonl(now.getUsecs()); } else { size -= sizeof (struct client_hdr_ack_ts); } ack.typelen.length = htonl(size); int rc = 1; // This is a version 2.0.10 or greater client // write back to the client so it knows the server // version // sotimer units microseconds convert if (server->mInterval) { sotimer = static_cast<int>((server->mInterval) / 4); } else if (isModeTime(server)) { sotimer = static_cast<int>((server->mAmount * 10000) / 4); } if (sotimer > HDRXACKMAX) { sotimer = HDRXACKMAX; } else if (sotimer < HDRXACKMIN) { sotimer = HDRXACKMIN; } SetSocketOptionsSendTimeout(server, sotimer); #if HAVE_DECL_TCP_NODELAY int optflag = 1; // Disable Nagle to reduce latency of this intial message if ((rc = setsockopt(server->mSock, IPPROTO_TCP, TCP_NODELAY, reinterpret_cast<char *>(&optflag), sizeof(int))) < 0) { WARN_errno(rc < 0, "tcpnodelay"); } #endif if ((rc = send(server->mSock, reinterpret_cast<const char*>(&ack), size, 0)) < 0) { WARN_errno(rc <= 0, "send_ack"); rc = 0; } #if HAVE_DECL_TCP_NODELAY // Re-nable Nagle optflag = isNoDelay(server) ? 1 : 0; if (!isUDP(server) && (rc = setsockopt(server->mSock, IPPROTO_TCP, TCP_NODELAY, reinterpret_cast<char *>(&optflag), sizeof(int))) < 0) { WARN_errno(rc < 0, "tcpnodelay"); } #endif return rc; }