/*---------------------------------------------------------------
 * Copyright (c) 1999,2000,2001,2002,2003
 * The Board of Trustees of the University of Illinois
 * All Rights Reserved.
 *---------------------------------------------------------------
 * Permission is hereby granted, free of charge, to any person
 * obtaining a copy of this software (Iperf) and associated
 * documentation files (the "Software"), to deal in the Software
 * without restriction, including without limitation the rights to
 * use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the
 * Software is furnished to do so, subject to the following
 * conditions:
 *
 *
 * Redistributions of source code must retain the above
 * copyright notice, this list of conditions and
 * the following disclaimers.
 *
 *
 * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following
 * disclaimers in the documentation and/or other materials
 * provided with the distribution.
 *
 *
 * Neither the names of the University of Illinois, NCSA,
 * nor the names of its contributors may be used to endorse
 * or promote products derived from this Software without
 * specific prior written permission.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE CONTIBUTORS OR COPYRIGHT
 * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
 * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
 * ARISING FROM, OUT OF OR IN CONNECTION WITH THE
 * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 * ________________________________________________________________
 * National Laboratory for Applied Network Research
 * National Center for Supercomputing Applications
 * University of Illinois at Urbana-Champaign
 * http://www.ncsa.uiuc.edu
 * ________________________________________________________________
 *
 * Reporter.c
 * by Kevin Gibbs <kgibbs@nlanr.net>
 *
 * Major rewrite by Robert McMahon (Sept 2020, ver 2.0.14)
 * ________________________________________________________________ */

#include <math.h>
#include "headers.h"
#include "Settings.hpp"
#include "util.h"
#include "Reporter.h"
#include "Thread.h"
#include "Locale.h"
#include "PerfSocket.hpp"
#include "SocketAddr.h"
#include "histogram.h"
#include "delay.h"
#include "packet_ring.h"
#include "payloads.h"
#include "gettcpinfo.h"

#ifdef __cplusplus
extern "C" {
#endif

#ifndef INITIAL_PACKETID
# define INITIAL_PACKETID 0
#endif

struct ReportHeader *ReportRoot = NULL;
struct ReportHeader *ReportPendingHead = NULL;
struct ReportHeader *ReportPendingTail = NULL;

// Reporter's reset of stats after a print occurs
static void reporter_reset_transfer_stats_client_tcp(struct TransferInfo *stats);
static void reporter_reset_transfer_stats_client_udp(struct TransferInfo *stats);
static void reporter_reset_transfer_stats_server_udp(struct TransferInfo *stats);
static void reporter_reset_transfer_stats_server_tcp(struct TransferInfo *stats);

static void reporter_mmm_update (struct MeanMinMaxStats *stats, double value);

#if HAVE_TCP_STATS
static inline void reporter_handle_packet_tcpistats(struct ReporterData *data, struct ReportStruct *packet);
#endif
static struct ConnectionInfo *myConnectionReport;

void PostReport (struct ReportHeader *reporthdr) {
#ifdef HAVE_THREAD_DEBUG
    char rs[REPORTTXTMAX];
    reporttype_text(reporthdr, &rs[0]);
    thread_debug("Jobq *POST* report %p (%s)", reporthdr, &rs[0]);
#endif
    if (reporthdr) {
#ifdef HAVE_THREAD
	/*
	 * Update the ReportRoot to include this report.
	 */
	Condition_Lock(ReportCond);
	reporthdr->next = NULL;
	if (!ReportPendingHead) {
	  ReportPendingHead = reporthdr;
	  ReportPendingTail = reporthdr;
	} else {
	  ReportPendingTail->next = reporthdr;
	  ReportPendingTail = reporthdr;
	}
	Condition_Unlock(ReportCond);
	// wake up the reporter thread
	Condition_Signal(&ReportCond);
#else
	/*
	 * Process the report in this thread
	 */
	reporthdr->next = NULL;
	reporter_process_report(reporthdr);
#endif
    }
}
/*
 * ReportPacket is called by a transfer agent to record
 * the arrival or departure of a "packet" (for TCP it
 * will actually represent many packets). This needs to
 * be as simple and fast as possible as it gets called for
 * every "packet".
 *
 * Returns true when the tcpinfo was sampled, false ohterwise
 */
bool ReportPacket (struct ReporterData* data, struct ReportStruct *packet) {
    assert(data != NULL);

    bool rc = false;
  #ifdef HAVE_THREAD_DEBUG
    if (packet->packetID < 0) {
	thread_debug("Reporting last packet for %p  qdepth=%d sock=%d", (void *) data, packetring_getcount(data->packetring), data->info.common->socket);
    }
  #endif
#if HAVE_TCP_STATS
    struct TransferInfo *stats = &data->info;
    if (stats->isEnableTcpInfo) {
	if (!TimeZero(stats->ts.nextTCPStampleTime) && (TimeDifference(stats->ts.nextTCPStampleTime, packet->packetTime) < 0)) {
	    gettcpinfo(data->info.common->socket, packet);
	    TimeAdd(stats->ts.nextTCPStampleTime, stats->ts.intervalTime);
	} else {
	    gettcpinfo(data->info.common->socket, packet);
	}
    }
#endif

    // Note for threaded operation all that needs
    // to be done is to enqueue the packet data
    // into the ring.
    packetring_enqueue(data->packetring, packet);
    // The traffic thread calls the reporting process
    // directly forr non-threaded operation
    // These defeats the puropse of separating
    // traffic i/o from user i/o and really
    // should be avoided.
  #ifdef HAVE_THREAD
    // bypass the reporter thread here for single UDP
    if (isSingleUDP(data->info.common))
        reporter_process_transfer_report(data);
  #else
    /*
     * Process the report in this thread
     */
    reporter_process_transfer_report(data);
  #endif

    return rc;
}

/*
 * EndJob is called by a traffic thread to inform the reporter
 * thread to print a final report and to remove the data report from its jobq.
 * It also handles the freeing reports and other closing actions
 */
int EndJob (struct ReportHeader *reporthdr, struct ReportStruct *finalpacket) {
    assert(reporthdr!=NULL);
    assert(finalpacket!=NULL);
    struct ReporterData *report = (struct ReporterData *) reporthdr->this_report;
    struct ReportStruct packet;

    memset(&packet, 0, sizeof(struct ReportStruct));
    int do_close = 1;
    /*
     * Using PacketID of -1 ends reporting
     * It pushes a "special packet" through
     * the packet ring which will be detected
     * by the reporter thread as and end of traffic
     * event
     */
#if HAVE_TCP_STATS
    // tcpi stats are sampled on a final packet
    struct TransferInfo *stats = &report->info;
    if (stats->isEnableTcpInfo) {
	gettcpinfo(report->info.common->socket, finalpacket);
    }
#endif
    // clear the reporter done predicate
    report->packetring->consumerdone = 0;
    // the negative packetID is used to inform the report thread this traffic thread is done
    packet.packetID = -1;
    packet.packetLen = finalpacket->packetLen;
    packet.packetTime = finalpacket->packetTime;
    if (isSingleUDP(report->info.common)) {
	packetring_enqueue(report->packetring, &packet);
	reporter_process_transfer_report(report);
    } else {
	ReportPacket(report, &packet);
#ifdef HAVE_THREAD_DEBUG
	thread_debug( "Traffic thread awaiting reporter to be done with %p and cond %p", (void *)report, (void *) report->packetring->awake_producer);
#endif
	Condition_Lock((*(report->packetring->awake_producer)));
	while (!report->packetring->consumerdone) {
	    // This wait time is the lag between the reporter thread
	    // and the traffic thread, a reporter thread with lots of
	    // reports (e.g. fastsampling) can lag per the i/o
	    Condition_TimedWait(report->packetring->awake_producer, 1);
	    // printf("Consumer done may be stuck\n");
	}
	Condition_Unlock((*(report->packetring->awake_producer)));
    }
    if (report->FullDuplexReport && isFullDuplex(report->FullDuplexReport->info.common)) {
	if (fullduplex_stop_barrier(&report->FullDuplexReport->fullduplex_barrier)) {
	    struct Condition *tmp = &report->FullDuplexReport->fullduplex_barrier.await;
	    Condition_Destroy(tmp);
#if HAVE_THREAD_DEBUG
	    thread_debug("Socket fullduplex close sock=%d", report->FullDuplexReport->info.common->socket);
#endif
	    FreeSumReport(report->FullDuplexReport);
	} else {
	    do_close = 0;
	}
    }
    return do_close;
}

//  This is used to determine the packet/cpu load into the reporter thread
//  If the overall reporter load is too low, add some yield
//  or delay so the traffic threads can fill the packet rings
#define MINPACKETDEPTH 10
#define MINPERQUEUEDEPTH 20
#define REPORTERDELAY_DURATION 16000 // units is microseconds
struct ConsumptionDetectorType {
    int accounted_packets;
    int accounted_packet_threads;
    int reporter_thread_suspends ;
};
struct ConsumptionDetectorType consumption_detector = \
  {.accounted_packets = 0, .accounted_packet_threads = 0, .reporter_thread_suspends = 0};

static inline void reset_consumption_detector (void) {
    consumption_detector.accounted_packet_threads = thread_numtrafficthreads();
    if ((consumption_detector.accounted_packets = thread_numtrafficthreads() * MINPERQUEUEDEPTH) <= MINPACKETDEPTH) {
	consumption_detector.accounted_packets = MINPACKETDEPTH;
    }
}
static inline void apply_consumption_detector (void) {
    if (--consumption_detector.accounted_packet_threads <= 0) {
	// All active threads have been processed for the loop,
	// reset the thread counter and check the consumption rate
	// If the rate is too low add some delay to the reporter
	consumption_detector.accounted_packet_threads = thread_numtrafficthreads();
	// Check to see if we need to suspend the reporter
	if (consumption_detector.accounted_packets > 0) {
	    /*
	     * Suspend the reporter thread for some (e.g. 4) milliseconds
	     *
	     * This allows the thread to receive client or server threads'
	     * packet events in "aggregates."  This can reduce context
	     * switching allowing for better CPU utilization,
	     * which is very noticble on CPU constrained systems.
	     */
	    delay_loop(REPORTERDELAY_DURATION);
	    consumption_detector.reporter_thread_suspends++;
	    // printf("DEBUG: forced reporter suspend, accounted=%d,  queueue depth after = %d\n", accounted_packets, getcount_packetring(reporthdr));
	} else {
	    // printf("DEBUG: no suspend, accounted=%d,  queueue depth after = %d\n", accounted_packets, getcount_packetring(reporthdr));
	}
	reset_consumption_detector();
    }
}

#ifdef HAVE_THREAD_DEBUG
static void reporter_jobq_dump(void) {
  thread_debug("reporter thread job queue request lock");
  Condition_Lock(ReportCond);
  struct ReportHeader *itr = ReportRoot;
  while (itr) {
    thread_debug("Job in queue %p",(void *) itr);
    itr = itr->next;
  }
  Condition_Unlock(ReportCond);
  thread_debug("reporter thread job queue unlock");
}
#endif


/* Concatenate pending reports and return the head */
static inline struct ReportHeader *reporter_jobq_set_root (struct thread_Settings *inSettings) {
    struct ReportHeader *root = NULL;
    Condition_Lock(ReportCond);
    // check the jobq for empty
    if (ReportRoot == NULL) {
	// The reporter is starting from an empty state
	// so set the load detect to trigger an initial delay
        if (!isSingleUDP(inSettings)) {
	    reset_consumption_detector();
	    reporter_default_heading_flags((inSettings->mReportMode == kReport_CSV));
        }
	// Only hang the timed wait if more than this thread is active
	if (!ReportPendingHead && (thread_numuserthreads() > 1)) {
	    Condition_TimedWait(&ReportCond, 1);
#ifdef HAVE_THREAD_DEBUG
	    thread_debug( "Jobq *WAIT* exit  %p/%p cond=%p threads u/t=%d/%d", \
			  (void *) ReportRoot, (void *) ReportPendingHead, \
			  (void *) &ReportCond, thread_numuserthreads(), thread_numtrafficthreads());
#endif
	}
    }
    // update the jobq per pending reports
    if (ReportPendingHead) {
	ReportPendingTail->next = ReportRoot;
	ReportRoot = ReportPendingHead;
#ifdef HAVE_THREAD_DEBUG
	thread_debug( "Jobq *ROOT* %p (last=%p)", \
		      (void *) ReportRoot, (void * ) ReportPendingTail->next);
#endif
	ReportPendingHead = NULL;
	ReportPendingTail = NULL;
    }
    root = ReportRoot;
    Condition_Unlock(ReportCond);
    return root;
}
/*
 * Welford's online algorithm
 *
 * # For a new value newValue, compute the new count, new mean, the new M2.
 * # mean accumulates the mean of the entire dataset
 * # M2 aggregates the squared distance from the mean
 * # count aggregates the number of samples seen so far
 * def update(existingAggregate, newValue):
 *   (count, mean, M2) = existingAggregate
 *   count += 1
 *   delta = newValue - mean
 *   mean += delta / count
 *   delta2 = newValue - mean
 *   M2 += delta * delta2
 *   return (count, mean, M2)
 *
 * # Retrieve the mean, variance and sample variance from an aggregate
 * def finalize(existingAggregate):
 *   (count, mean, M2) = existingAggregate
 *   if count < 2:
 *       return float("nan")
 *   else:
 *       (mean, variance, sampleVariance) = (mean, M2 / count, M2 / (count - 1))
 *       return (mean, variance, sampleVariance)
 *
 */
static void reporter_mmm_update (struct MeanMinMaxStats *stats, double value) {
    assert(stats != NULL);
    stats->cnt++;
    if (stats->cnt == 1) {
	// Very first entry
	stats->min = value;
	stats->max = value;
	stats->sum = value;
	stats->vd = value;
	stats->mean = value;
	stats->m2 = 0;
	stats->sum = value;
    } else {
	stats->sum += value;
	stats->vd = value - stats->mean;
	stats->mean += (stats->vd / stats->cnt);
	stats->m2 += stats->vd * (value - stats->mean);
//	printf("*****m2=%f, mmm = %f/%f\n", stats->m2, stats->vd, (value - stats->mean));
	// mean min max tests
	if (value < stats->min)
	    stats->min = value;
	if (value > stats->max)
	    stats->max = value;
    }
//    printf("*****val=%f, mmm = %d/%f/%f/%f/%f/%f/%f\n", value, stats->cnt, stats->sum, stats->vd, stats->mean, stats->m2, stats->min, stats->max);
}

/*
 * This function is the loop that the reporter thread processes
 */
void reporter_spawn (struct thread_Settings *thread) {
#ifdef HAVE_THREAD_DEBUG
    thread_debug( "Reporter thread started");
#endif
    myConnectionReport = InitConnectOnlyReport(thread);
    /*
     * reporter main loop needs to wait on all threads being started
     */
    Condition_Lock(threads_start.await);
    while (!threads_start.ready) {
	Condition_TimedWait(&threads_start.await, 1);
    }
    Condition_Unlock(threads_start.await);
#ifdef HAVE_THREAD_DEBUG
    thread_debug( "Reporter await done");
#endif

    //
    // Signal to other (client) threads that the
    // reporter is now running.
    //
    Condition_Lock(reporter_state.await);
    reporter_state.ready = 1;
    Condition_Unlock(reporter_state.await);
    Condition_Broadcast(&reporter_state.await);
#if HAVE_SCHED_SETSCHEDULER
    // set reporter thread to realtime if requested
    thread_setscheduler(thread);
#endif
    /*
     * Keep the reporter thread alive under the following conditions
     *
     * o) There are more reports to output, ReportRoot has a report
     * o) The number of threads is greater than one which indicates
     *    either traffic threads are still running or a Listener thread
     *    is running. If equal to 1 then only the reporter thread is alive
     */
    while ((reporter_jobq_set_root(thread) != NULL) || (thread_numuserthreads() > 1)){
#ifdef HAVE_THREAD_DEBUG
	// thread_debug( "Jobq *HEAD* %p (%d)", (void *) ReportRoot, thread_numuserthreads());
#endif
	if (ReportRoot) {
	    // https://blog.kloetzl.info/beautiful-code/
	    // Linked list removal/processing is derived from:
	    //
	    // remove_list_entry(entry) {
	    //     indirect = &head;
	    //     while ((*indirect) != entry) {
	    //	       indirect = &(*indirect)->next;
	    //     }
	    //     *indirect = entry->next
	    // }
	    struct ReportHeader **work_item = &ReportRoot;
	    while (*work_item) {
#ifdef HAVE_THREAD_DEBUG
		// thread_debug( "Jobq *NEXT* %p", (void *) *work_item);
#endif
		// Report process report returns true
		// when a report needs to be removed
		// from the jobq.  Also, work item might
		// be removed as part of processing
		// Store a cached pointer for the linked list maitenance
		struct ReportHeader *tmp = (*work_item)->next;
	        if (reporter_process_report(*work_item)) {
#ifdef HAVE_THREAD_DEBUG
		  thread_debug("Jobq *REMOVE* %p", (void *) (*work_item));
#endif
		    // memory for *work_item is gone by now
		    *work_item = tmp;
		    if (!tmp)
			break;
		}
		work_item = &(*work_item)->next;
	    }
	}
    }
    if (myConnectionReport) {
	if (myConnectionReport->connect_times.cnt > 1) {
	    reporter_connect_printf_tcp_final(myConnectionReport);
	}
	FreeConnectionReport(myConnectionReport);
    }
#ifdef HAVE_THREAD_DEBUG
    if (sInterupted)
        reporter_jobq_dump();
    thread_debug("Reporter thread finished user/traffic %d/%d", thread_numuserthreads(), thread_numtrafficthreads());
#endif
}

// The Transfer or Data report is by far the most complicated report
int reporter_process_transfer_report (struct ReporterData *this_ireport) {
    assert(this_ireport != NULL);
    struct TransferInfo *sumstats = (this_ireport->GroupSumReport ? &this_ireport->GroupSumReport->info : NULL);
    struct TransferInfo *fullduplexstats = (this_ireport->FullDuplexReport ? &this_ireport->FullDuplexReport->info : NULL);
    int need_free = 0;
    // The consumption detector applies delay to the reporter
    // thread when its consumption rate is too low.   This allows
    // the traffic threads to send aggregates vs thrash
    // the packet rings.  The dissimilarity between the thread
    // speeds is due to the performance differences between i/o
    // bound threads vs cpu bound ones, and it's expected
    // that reporter thread being CPU limited should be much
    // faster than the traffic threads, even in aggregate.
    // Note: If this detection is not going off it means
    // the system is likely CPU bound and iperf is now likely
    // becoming a CPU bound test vs a network i/o bound test
    if (!isSingleUDP(this_ireport->info.common))
	apply_consumption_detector();
    // If there are more packets to process then handle them
    struct ReportStruct *packet = NULL;
    int advance_jobq = 0;
    while (!advance_jobq && (packet = packetring_dequeue(this_ireport->packetring))) {
	// Increment the total packet count processed by this thread
	// this will be used to make decisions on if the reporter
	// thread should add some delay to eliminate cpu thread
	// thrashing,
	consumption_detector.accounted_packets--;
	// Check against a final packet event on this packet ring
#if HAVE_TCP_STATS
	if (this_ireport->info.isEnableTcpInfo && packet->tcpstats.isValid) {
	    reporter_handle_packet_tcpistats(this_ireport, packet);
	}
#endif
	if (!(packet->packetID < 0)) {
	    // Check to output any interval reports,
            // bursts need to report the packet first
	    if (this_ireport->packet_handler_pre_report) {
		(*this_ireport->packet_handler_pre_report)(this_ireport, packet);
	    }
	    if (this_ireport->transfer_interval_handler) {
		advance_jobq = (*this_ireport->transfer_interval_handler)(this_ireport, packet);
	    }
	    if (this_ireport->packet_handler_post_report) {
		(*this_ireport->packet_handler_post_report)(this_ireport, packet);
	    }
	    // Sum reports update the report header's last
	    // packet time after the handler. This means
	    // the report header's packet time will be
	    // the previous time before the interval
	    if (sumstats)
		sumstats->ts.packetTime = packet->packetTime;
	    if (fullduplexstats)
		fullduplexstats->ts.packetTime = packet->packetTime;
	} else {
	    need_free = 1;
	    advance_jobq = 1;
	    // A last packet event was detected
	    // printf("last packet event detected\n"); fflush(stdout);
	    this_ireport->reporter_thread_suspends = consumption_detector.reporter_thread_suspends;
	    if (this_ireport->packet_handler_pre_report) {
		(*this_ireport->packet_handler_pre_report)(this_ireport, packet);
	    }
	    if (this_ireport->packet_handler_post_report) {
		(*this_ireport->packet_handler_post_report)(this_ireport, packet);
	    }
	    this_ireport->info.ts.packetTime = packet->packetTime;
	    assert(this_ireport->transfer_protocol_handler != NULL);
	    (*this_ireport->transfer_protocol_handler)(this_ireport, 1);
	    // This is a final report so set the sum report header's packet time
	    // Note, the thread with the max value will set this
	    if (fullduplexstats && isEnhanced(this_ireport->info.common)) {
		// The largest packet timestamp sets the sum report final time
		if (TimeDifference(fullduplexstats->ts.packetTime, packet->packetTime) > 0) {
		    fullduplexstats->ts.packetTime = packet->packetTime;
		}
		if (DecrSumReportRefCounter(this_ireport->FullDuplexReport) == 0) {
		    if (this_ireport->FullDuplexReport->transfer_protocol_sum_handler) {
			(*this_ireport->FullDuplexReport->transfer_protocol_sum_handler)(fullduplexstats, 1);
		    }
		    // FullDuplex report gets freed by a traffic thread (per its barrier)
		}
	    }
	    if (sumstats) {
		if (TimeDifference(sumstats->ts.packetTime, packet->packetTime) > 0) {
		    sumstats->ts.packetTime = packet->packetTime;
		}
		if (DecrSumReportRefCounter(this_ireport->GroupSumReport) == 0) {
		    if (this_ireport->GroupSumReport->transfer_protocol_sum_handler && \
			((this_ireport->GroupSumReport->reference.maxcount > 1) || isSumOnly(this_ireport->info.common))) {
			(*this_ireport->GroupSumReport->transfer_protocol_sum_handler)(&this_ireport->GroupSumReport->info, 1);
		    }
		    FreeSumReport(this_ireport->GroupSumReport);
		}
	    }
	}
    }
    return need_free;
}
/*
 * Process reports
 *
 * Make notice here, the reporter thread is freeing most reports, traffic threads
 * can't use them anymore (except for the DATA REPORT);
 *
 */
inline int reporter_process_report (struct ReportHeader *reporthdr) {
    assert(reporthdr != NULL);
    int done = 1;
    switch (reporthdr->type) {
    case DATA_REPORT:
	done = reporter_process_transfer_report((struct ReporterData *)reporthdr->this_report);
	fflush(stdout);
	if (done) {
	    struct ReporterData *tmp = (struct ReporterData *)reporthdr->this_report;
	    struct PacketRing *pr = tmp->packetring;
	    pr->consumerdone = 1;
	    // Data Reports are special because the traffic thread needs to free them, just signal
	    Condition_Signal(pr->awake_producer);
	}
	break;
    case CONNECTION_REPORT:
    {
	struct ConnectionInfo *creport = (struct ConnectionInfo *)reporthdr->this_report;
	assert(creport!=NULL);
	if (!isCompat(creport->common) && (creport->common->ThreadMode == kMode_Client) && myConnectionReport) {
	    // Clients' connect times will be inputs to the overall connect stats
	    if (creport->init_cond.connecttime > 0.0) {
		reporter_mmm_update(&myConnectionReport->connect_times, creport->init_cond.connecttime);
	    } else {
		myConnectionReport->connect_times.err++;
	    }
	}
	reporter_print_connection_report(creport);
	fflush(stdout);
	FreeReport(reporthdr);
    }
	break;
    case SETTINGS_REPORT:
	reporter_print_settings_report((struct ReportSettings *)reporthdr->this_report);
	fflush(stdout);
	FreeReport(reporthdr);
	break;
    case SERVER_RELAY_REPORT:
	reporter_print_server_relay_report((struct ServerRelay *)reporthdr->this_report);
	fflush(stdout);
	FreeReport(reporthdr);
	break;
    default:
	fprintf(stderr,"Invalid report type in process report %p\n", reporthdr->this_report);
	assert(0);
	break;
    }
#ifdef HAVE_THREAD_DEBUG
    // thread_debug("Processed report %p type=%d", (void *)reporthdr, reporthdr->report.type);
#endif
    return done;
}

/*
 * Updates connection stats
 */
#define L2DROPFILTERCOUNTER 100

// Reporter private routines
void reporter_handle_packet_null (struct ReporterData *data, struct ReportStruct *packet) {
}
void reporter_transfer_protocol_null (struct ReporterData *data, int final){
}

inline void reporter_handle_packet_pps (struct ReporterData *data, struct ReportStruct *packet) {
    struct TransferInfo *stats = &data->info;
    if (!packet->emptyreport) {
        stats->total.Datagrams.current++;
        stats->total.IPG.current++;
    }
    stats->ts.IPGstart = packet->packetTime;
    stats->IPGsum += TimeDifference(packet->packetTime, packet->prevPacketTime);
#ifdef DEBUG_PPS
    printf("*** IPGsum = %f cnt=%ld ipg=%ld.%ld pkt=%ld.%ld id=%ld empty=%d transit=%f prev=%ld.%ld\n", stats->IPGsum, stats->cntIPG, stats->ts.IPGstart.tv_sec, stats->ts.IPGstart.tv_usec, packet->packetTime.tv_sec, packet->packetTime.tv_usec, packet->packetID, packet->emptyreport, TimeDifference(packet->packetTime, packet->prevPacketTime), packet->prevPacketTime.tv_sec, packet->prevPacketTime.tv_usec);
#endif
}

// Variance uses the Welford inline algorithm, mean is also inline
static inline double reporter_handle_packet_oneway_transit (struct ReporterData *data, struct ReportStruct *packet) {
    struct TransferInfo *stats = &data->info;
    // Transit or latency updates done inline below
    double transit = TimeDifference(packet->packetTime, packet->sentTime);
    double usec_transit = transit * 1e6;

    if (stats->latency_histogram) {
        histogram_insert(stats->latency_histogram, transit, NULL);
    }

    if (stats->transit.totcntTransit == 0) {
	// Very first packet
	stats->transit.minTransit = transit;
	stats->transit.maxTransit = transit;
	stats->transit.sumTransit = transit;
	stats->transit.cntTransit = 1;
	stats->transit.totminTransit = transit;
	stats->transit.totmaxTransit = transit;
	stats->transit.totsumTransit = transit;
	stats->transit.totcntTransit = 1;
	// For variance, working units is microseconds
	stats->transit.vdTransit = usec_transit;
	stats->transit.meanTransit = usec_transit;
	stats->transit.m2Transit = usec_transit * usec_transit;
	stats->transit.totvdTransit = usec_transit;
	stats->transit.totmeanTransit = usec_transit;
	stats->transit.totm2Transit = usec_transit * usec_transit;
    } else {
	double deltaTransit;
	// from RFC 1889, Real Time Protocol (RTP)
	// J = J + ( | D(i-1,i) | - J ) /
	// Compute jitter
	deltaTransit = transit - stats->transit.lastTransit;
	if (deltaTransit < 0.0) {
	    deltaTransit = -deltaTransit;
	}
	stats->jitter += (deltaTransit - stats->jitter) / (16.0);
	// Compute end/end delay stats
	stats->transit.sumTransit += transit;
	stats->transit.cntTransit++;
	stats->transit.totsumTransit += transit;
	stats->transit.totcntTransit++;
	// mean min max tests
	if (transit < stats->transit.minTransit) {
	    stats->transit.minTransit=transit;
	}
	if (transit < stats->transit.totminTransit) {
	    stats->transit.totminTransit=transit;
	}
	if (transit > stats->transit.maxTransit) {
	    stats->transit.maxTransit=transit;
	}
	if (transit > stats->transit.totmaxTransit) {
	    stats->transit.totmaxTransit=transit;
	}
	// For variance, working units is microseconds
	// variance interval
	stats->transit.vdTransit = usec_transit - stats->transit.meanTransit;
	stats->transit.meanTransit = stats->transit.meanTransit + (stats->transit.vdTransit / stats->transit.cntTransit);
	stats->transit.m2Transit = stats->transit.m2Transit + (stats->transit.vdTransit * (usec_transit - stats->transit.meanTransit));
	// variance total
	stats->transit.totvdTransit = usec_transit - stats->transit.totmeanTransit;
	stats->transit.totmeanTransit = stats->transit.totmeanTransit + (stats->transit.totvdTransit / stats->transit.totcntTransit);
	stats->transit.totm2Transit = stats->transit.totm2Transit + (stats->transit.totvdTransit * (usec_transit - stats->transit.totmeanTransit));
    }
    stats->transit.lastTransit = transit;
    return (transit);
}

static inline void reporter_handle_burst_tcp_server_transit (struct ReporterData *data, struct ReportStruct *packet) {
    struct TransferInfo *stats = &data->info;
    // very first burst
    if (!stats->isochstats.frameID) {
	stats->isochstats.frameID = packet->frameID;
    }
    if (packet->frameID && packet->transit_ready) {
        double transit = reporter_handle_packet_oneway_transit(data, packet);
	if (!TimeZero(stats->ts.prevpacketTime)) {
	    double delta = TimeDifference(packet->sentTime, stats->ts.prevpacketTime);
	    stats->IPGsum += delta;
	}
	stats->ts.prevpacketTime = packet->sentTime;
	if (stats->framelatency_histogram) {
	    histogram_insert(stats->framelatency_histogram, transit, isTripTime(stats->common) ? &packet->sentTime : NULL);
	}
	stats->isochstats.frameID++;  // RJM fix this overload
	stats->burstid_transition = true;
	// printf("***Burst id = %ld, transit = %f\n", packet->frameID, stats->transit.lastTransit);
    } else if (stats->burstid_transition && packet->frameID && (packet->frameID != stats->isochstats.frameID)) {
	stats->burstid_transition = false;
	fprintf(stderr,"%sError: expected burst id %u but got %" PRIdMAX "\n", \
		stats->common->transferIDStr, stats->isochstats.frameID + 1, packet->frameID);
	stats->isochstats.frameID = packet->frameID;
    }
}

static inline void reporter_handle_burst_tcp_client_transit (struct ReporterData *data, struct ReportStruct *packet) {
    struct TransferInfo *stats = &data->info;
    // very first burst
    if (!stats->isochstats.frameID) {
	stats->isochstats.frameID = packet->frameID;
    }
    if (stats->burstid_transition && packet->frameID && packet->transit_ready) {
	stats->burstid_transition = false;
	// printf("***Burst id = %ld, transit = %f\n", packet->frameID, stats->transit.lastTransit);
    } else if (isIsochronous(stats->common) && !stats->burstid_transition) {
	stats->burstid_transition = true;
	if (packet->frameID && (packet->frameID != (stats->isochstats.frameID + 1))) {
	    fprintf(stderr,"%sError: expected burst id %u but got %" PRIdMAX "\n", \
		    stats->common->transferIDStr, stats->isochstats.frameID + 1, packet->frameID);
	}
	stats->isochstats.frameID = packet->frameID;
    }
}

inline void reporter_handle_packet_isochronous (struct ReporterData *data, struct ReportStruct *packet) {
    struct TransferInfo *stats = &data->info;
    // printf("fid=%lu bs=%lu remain=%lu\n", packet->frameID, packet->burstsize, packet->remaining);
    if (packet->frameID && packet->transit_ready) {
	int framedelta=0;
	// very first isochronous frame
	if (!stats->isochstats.frameID) {
	    stats->isochstats.framecnt.current=packet->frameID;
	}
	// perform client and server frame based accounting
	if ((framedelta = (packet->frameID - stats->isochstats.frameID))) {
	    stats->isochstats.framecnt.current++;
	    if (framedelta > 1) {
		if (stats->common->ThreadMode == kMode_Server) {
		    int lost = framedelta - (packet->frameID - packet->prevframeID);
		    stats->isochstats.framelostcnt.current += lost;
		} else {
		    stats->isochstats.framelostcnt.current += (framedelta-1);
		    stats->isochstats.slipcnt.current++;
		}
	    }
	}
	// peform frame latency checks
	if (stats->framelatency_histogram) {
	    // first packet of a burst and not a duplicate
	    if ((packet->burstsize == packet->remaining) && (stats->matchframeID!=packet->frameID)) {
		stats->matchframeID=packet->frameID;
	    }
	    if ((packet->packetLen == packet->remaining) && (packet->frameID == stats->matchframeID)) {
		// last packet of a burst (or first-last in case of a duplicate) and frame id match
		double frametransit = TimeDifference(packet->packetTime, packet->isochStartTime) \
		    - ((packet->burstperiod * (packet->frameID - 1)) / 1000000.0);
		histogram_insert(stats->framelatency_histogram, frametransit, NULL);
		stats->matchframeID = 0;  // reset the matchid so any potential duplicate is ignored
	    }
	}
	stats->isochstats.frameID = packet->frameID;
    }
}

inline void reporter_handle_packet_server_tcp (struct ReporterData *data, struct ReportStruct *packet) {
    struct TransferInfo *stats = &data->info;
    if (packet->packetLen > 0) {
	int bin;
	stats->total.Bytes.current += packet->packetLen;
	// mean min max tests
	stats->sock_callstats.read.cntRead++;
	stats->sock_callstats.read.totcntRead++;
	bin = (int)floor((packet->packetLen -1)/stats->sock_callstats.read.binsize);
	if (bin < TCPREADBINCOUNT) {
	    stats->sock_callstats.read.bins[bin]++;
	    stats->sock_callstats.read.totbins[bin]++;
	}
	if (isPeriodicBurst(stats->common) || isTripTime(stats->common))
	    reporter_handle_burst_tcp_server_transit(data, packet);
    }
}

inline void reporter_handle_packet_server_udp (struct ReporterData *data, struct ReportStruct *packet) {
    struct TransferInfo *stats = &data->info;
    stats->ts.packetTime = packet->packetTime;
    if (packet->emptyreport && (stats->transit.cntTransit == 0)) {
	// This is the case when empty reports
	// cross the report interval boundary
	// Hence, set the per interval min to infinity
	// and the per interval max and sum to zero
	stats->transit.minTransit = FLT_MAX;
	stats->transit.maxTransit = FLT_MIN;
	stats->transit.sumTransit = 0;
	stats->transit.vdTransit = 0;
	stats->transit.meanTransit = 0;
	stats->transit.m2Transit = 0;
    } else if (packet->packetID > 0) {
	stats->total.Bytes.current += packet->packetLen;
	// These are valid packets that need standard iperf accounting
	// Do L2 accounting first (if needed)
	if (packet->l2errors && (stats->total.Datagrams.current > L2DROPFILTERCOUNTER)) {
	    stats->l2counts.cnt++;
	    stats->l2counts.tot_cnt++;
	    if (packet->l2errors & L2UNKNOWN) {
		stats->l2counts.unknown++;
		stats->l2counts.tot_unknown++;
	    }
	    if (packet->l2errors & L2LENERR) {
		stats->l2counts.lengtherr++;
		stats->l2counts.tot_lengtherr++;
	    }
	    if (packet->l2errors & L2CSUMERR) {
		stats->l2counts.udpcsumerr++;
		stats->l2counts.tot_udpcsumerr++;
	    }
	}
	// packet loss occured if the datagram numbers aren't sequential
	if (packet->packetID != stats->PacketID + 1) {
	    if (packet->packetID < stats->PacketID + 1) {
		stats->total.OutofOrder.current++;
	    } else {
		stats->total.Lost.current += packet->packetID - stats->PacketID - 1;
	    }
	}
	// never decrease datagramID (e.g. if we get an out-of-order packet)
	if (packet->packetID > stats->PacketID) {
	    stats->PacketID = packet->packetID;
	}
	reporter_handle_packet_pps(data, packet);
	reporter_handle_packet_oneway_transit(data, packet);
	reporter_handle_packet_isochronous(data, packet);
    }
}

// This is done in reporter thread context
#if HAVE_TCP_STATS
static inline void reporter_handle_packet_tcpistats (struct ReporterData *data, struct ReportStruct *packet) {
    assert(data!=NULL);
    struct TransferInfo *stats = &data->info;
    stats->sock_callstats.write.TCPretry += (packet->tcpstats.retry_tot - stats->sock_callstats.write.totTCPretry);
    stats->sock_callstats.write.totTCPretry = packet->tcpstats.retry_tot;
    stats->sock_callstats.write.cwnd = packet->tcpstats.cwnd;
    stats->sock_callstats.write.rtt = packet->tcpstats.rtt;
    stats->sock_callstats.write.rttvar = packet->tcpstats.rttvar;
}
#endif

void reporter_handle_packet_client (struct ReporterData *data, struct ReportStruct *packet) {
    struct TransferInfo *stats = &data->info;
    stats->ts.packetTime = packet->packetTime;
    if (!packet->emptyreport) {
	stats->total.Bytes.current += packet->packetLen;
        if (packet->errwrite && (packet->errwrite != WriteErrNoAccount)) {
	    stats->sock_callstats.write.WriteErr++;
	    stats->sock_callstats.write.totWriteErr++;
	}
	// These are valid packets that need standard iperf accounting
	stats->sock_callstats.write.WriteCnt += packet->writecnt;
	stats->sock_callstats.write.totWriteCnt += packet->writecnt;
	if (isIsochronous(stats->common)) {
	    reporter_handle_packet_isochronous(data, packet);
	} else if (isPeriodicBurst(stats->common)) {
	    reporter_handle_burst_tcp_client_transit(data, packet);
	}
#if HAVE_DECL_TCP_NOTSENT_LOWAT
	if (stats->latency_histogram && (packet->select_delay > 0.0)) {
	   histogram_insert(stats->latency_histogram, packet->select_delay, &packet->packetTime);
       }
       if (isTcpDrain(stats->common) && packet->transit_ready && (packet->drain_time)) {
	   reporter_mmm_update(&stats->drain_mmm.current, (double) packet->drain_time);
	   reporter_mmm_update(&stats->drain_mmm.total, (double) packet->drain_time);
	   if (stats->drain_histogram ) {
	       // convert drain time from microseconds to seconds prior to insert
	       histogram_insert(stats->drain_histogram, (1e-6 * packet->drain_time), &packet->packetTime);
	   }
       }
#endif
    }
    if (isUDP(stats->common)) {
	stats->PacketID = packet->packetID;
	reporter_handle_packet_pps(data, packet);
    }
}

/*
 * Report printing routines below
 */
static inline void reporter_set_timestamps_time (struct ReportTimeStamps *times, enum TimeStampType tstype) {
    // There is a corner case when the first packet is also the last where the start time (which comes
    // from app level syscall) is greater than the packetTime (which come for kernel level SO_TIMESTAMP)
    // For this case set the start and end time to both zero.
    if (TimeDifference(times->packetTime, times->startTime) < 0) {
	times->iEnd = 0;
	times->iStart = 0;
    } else {
	switch (tstype) {
	case INTERVAL:
	    times->iStart = times->iEnd;
	    times->iEnd = TimeDifference(times->nextTime, times->startTime);
	    TimeAdd(times->nextTime, times->intervalTime);
	    break;
	case TOTAL:
	    times->iStart = 0;
	    times->iEnd = TimeDifference(times->packetTime, times->startTime);
	    break;
	case FINALPARTIAL:
	    times->iStart = times->iEnd;
	    times->iEnd = TimeDifference(times->packetTime, times->startTime);
	    break;
	case FRAME:
	    if ((times->iStart = TimeDifference(times->prevpacketTime, times->startTime)) < 0)
		times->iStart = 0.0;
	    times->iEnd = TimeDifference(times->packetTime, times->startTime);
	    break;
	default:
	    times->iEnd = -1;
	    times->iStart = -1;
	    break;
	}
    }
}

// If reports were missed, catch up now
static inline void reporter_transfer_protocol_missed_reports (struct TransferInfo *stats, struct ReportStruct *packet) {
    while (TimeDifference(packet->packetTime, stats->ts.nextTime) > TimeDouble(stats->ts.intervalTime)) {
//	printf("**** cmp=%f/%f next %ld.%ld packet %ld.%ld id=%ld\n", TimeDifference(packet->packetTime, stats->ts.nextTime), TimeDouble(stats->ts.intervalTime), stats->ts.nextTime.tv_sec, stats->ts.nextTime.tv_usec, packet->packetTime.tv_sec, packet->packetTime.tv_usec, packet->packetID);
	reporter_set_timestamps_time(&stats->ts, INTERVAL);
	struct TransferInfo emptystats;
	memset(&emptystats, 0, sizeof(struct TransferInfo));
	emptystats.ts.iStart = stats->ts.iStart;
	emptystats.ts.iEnd = stats->ts.iEnd;
	emptystats.common = stats->common;
	if ((stats->output_handler) && !(stats->isMaskOutput))
	    (*stats->output_handler)(&emptystats);
    }
}

static inline void reporter_reset_transfer_stats_client_tcp (struct TransferInfo *stats) {
    stats->total.Bytes.prev = stats->total.Bytes.current;
    stats->sock_callstats.write.WriteCnt = 0;
    stats->sock_callstats.write.WriteErr = 0;
    stats->isochstats.framecnt.prev = stats->isochstats.framecnt.current;
    stats->isochstats.framelostcnt.prev = stats->isochstats.framelostcnt.current;
    stats->isochstats.slipcnt.prev = stats->isochstats.slipcnt.current;
#if HAVE_TCP_STATS
    stats->sock_callstats.write.TCPretry = 0;
#endif
#if HAVE_DECL_TCP_NOTSENT_LOWAT
    if (isTcpDrain(stats->common)) {
	stats->drain_mmm.current.cnt = 0;
	stats->drain_mmm.current.min = FLT_MAX;
	stats->drain_mmm.current.max = FLT_MIN;
	stats->drain_mmm.current.sum = 0;
	stats->drain_mmm.current.vd = 0;
	stats->drain_mmm.current.mean = 0;
	stats->drain_mmm.current.m2 = 0;
    }
#endif
}

static inline void reporter_reset_transfer_stats_client_udp (struct TransferInfo *stats) {
    if (stats->cntError < 0) {
	stats->cntError = 0;
    }
    stats->total.Lost.prev = stats->total.Lost.current;
    stats->total.Datagrams.prev = stats->total.Datagrams.current;
    stats->total.Bytes.prev = stats->total.Bytes.current;
    stats->total.IPG.prev = stats->total.IPG.current;
    stats->sock_callstats.write.WriteCnt = 0;
    stats->sock_callstats.write.WriteErr = 0;
    stats->isochstats.framecnt.prev = stats->isochstats.framecnt.current;
    stats->isochstats.framelostcnt.prev = stats->isochstats.framelostcnt.current;
    stats->isochstats.slipcnt.prev = stats->isochstats.slipcnt.current;
    if (stats->cntDatagrams)
	stats->IPGsum = 0;
}

static inline void reporter_reset_transfer_stats_server_tcp (struct TransferInfo *stats) {
    int ix;
    stats->total.Bytes.prev = stats->total.Bytes.current;
    stats->sock_callstats.read.cntRead = 0;
    for (ix = 0; ix < 8; ix++) {
	stats->sock_callstats.read.bins[ix] = 0;
    }
    stats->transit.minTransit = FLT_MAX;
    stats->transit.maxTransit = FLT_MIN;
    stats->transit.sumTransit = 0;
    stats->transit.cntTransit = 0;
    stats->transit.vdTransit = 0;
    stats->transit.meanTransit = 0;
    stats->transit.m2Transit = 0;
    stats->IPGsum = 0;
}

static inline void reporter_reset_transfer_stats_server_udp (struct TransferInfo *stats) {
    // Reset the enhanced stats for the next report interval
    stats->total.Bytes.prev = stats->total.Bytes.current;
    stats->total.Datagrams.prev = stats->PacketID;
    stats->total.OutofOrder.prev = stats->total.OutofOrder.current;
    stats->total.Lost.prev = stats->total.Lost.current;
    stats->total.IPG.prev = stats->total.IPG.current;
    stats->transit.minTransit = FLT_MAX;
    stats->transit.maxTransit = FLT_MIN;
    stats->transit.sumTransit = 0;
    stats->transit.cntTransit = 0;
    stats->transit.vdTransit = 0;
    stats->transit.meanTransit = 0;
    stats->transit.m2Transit = 0;
    stats->isochstats.framecnt.prev = stats->isochstats.framecnt.current;
    stats->isochstats.framelostcnt.prev = stats->isochstats.framelostcnt.current;
    stats->isochstats.slipcnt.prev = stats->isochstats.slipcnt.current;
    stats->l2counts.cnt = 0;
    stats->l2counts.unknown = 0;
    stats->l2counts.udpcsumerr = 0;
    stats->l2counts.lengtherr = 0;
    if (stats->cntDatagrams)
	stats->IPGsum = 0;
}

// These do the following
//
// o) set the TransferInfo struct and then calls the individual report output handler
// o) updates the sum and fullduplex reports
//
void reporter_transfer_protocol_server_udp (struct ReporterData *data, int final) {
    struct TransferInfo *stats = &data->info;
    struct TransferInfo *sumstats = (data->GroupSumReport != NULL) ? &data->GroupSumReport->info : NULL;
    struct TransferInfo *fullduplexstats = (data->FullDuplexReport != NULL) ? &data->FullDuplexReport->info : NULL;
    // print a interval report and possibly a partial interval report if this a final
    stats->cntBytes = stats->total.Bytes.current - stats->total.Bytes.prev;
    stats->cntOutofOrder = stats->total.OutofOrder.current - stats->total.OutofOrder.prev;
    // assume most of the  time out-of-order packets are
    // duplicate packets, so conditionally subtract them from the lost packets.
    stats->cntError = stats->total.Lost.current - stats->total.Lost.prev - stats->cntOutofOrder;
    if (stats->cntError < 0)
	stats->cntError = 0;
    stats->cntDatagrams = stats->PacketID - stats->total.Datagrams.prev;
    stats->cntIPG = stats->total.IPG.current - stats->total.IPG.prev;
    if (stats->latency_histogram) {
        stats->latency_histogram->final = final;
    }

    if (isIsochronous(stats->common)) {
	stats->isochstats.cntFrames = stats->isochstats.framecnt.current - stats->isochstats.framecnt.prev;
	stats->isochstats.cntFramesMissed = stats->isochstats.framelostcnt.current - stats->isochstats.framelostcnt.prev;
	stats->isochstats.cntSlips = stats->isochstats.slipcnt.current - stats->isochstats.slipcnt.prev;
	if (stats->framelatency_histogram) {
	    stats->framelatency_histogram->final = final;
	}

    }
    if (stats->total.Datagrams.current == 1)
	stats->jitter = 0;
    if (sumstats) {
	sumstats->total.OutofOrder.current += stats->total.OutofOrder.current - stats->total.OutofOrder.prev;
	// assume most of the  time out-of-order packets are not
	// duplicate packets, so conditionally subtract them from the lost packets.
	sumstats->total.Lost.current += stats->total.Lost.current - stats->total.Lost.prev;
	sumstats->total.Datagrams.current += stats->PacketID - stats->total.Datagrams.prev;
	sumstats->total.Bytes.current += stats->cntBytes;
	sumstats->total.IPG.current += stats->cntIPG;
	if (sumstats->IPGsum < stats->IPGsum)
	    sumstats->IPGsum = stats->IPGsum;
	sumstats->threadcnt++;
    }
    if (fullduplexstats) {
	fullduplexstats->total.Bytes.current += stats->cntBytes;
	fullduplexstats->total.IPG.current += stats->cntIPG;
	fullduplexstats->total.Datagrams.current += (stats->total.Datagrams.current - stats->total.Datagrams.prev);
	if (fullduplexstats->IPGsum < stats->IPGsum)
	    fullduplexstats->IPGsum = stats->IPGsum;
    }
    if (final) {
	if ((stats->cntBytes > 0) && !TimeZero(stats->ts.intervalTime)) {
	    stats->cntOutofOrder = stats->total.OutofOrder.current - stats->total.OutofOrder.prev;
	    // assume most of the  time out-of-order packets are not
	    // duplicate packets, so conditionally subtract them from the lost packets.
	    stats->cntError = stats->total.Lost.current - stats->total.Lost.prev;
	    stats->cntError -= stats->cntOutofOrder;
	    if (stats->cntError < 0)
		stats->cntError = 0;
	    stats->cntDatagrams = stats->PacketID - stats->total.Datagrams.prev;
	    if ((stats->output_handler) && !(stats->isMaskOutput)) {
		reporter_set_timestamps_time(&stats->ts, FINALPARTIAL);
		if ((stats->ts.iEnd - stats->ts.iStart) > stats->ts.significant_partial)
		    (*stats->output_handler)(stats);
	    }
	}
	reporter_set_timestamps_time(&stats->ts, TOTAL);
	stats->final = true;
	stats->IPGsum = TimeDifference(stats->ts.packetTime, stats->ts.startTime);
	stats->cntOutofOrder = stats->total.OutofOrder.current;
	// assume most of the  time out-of-order packets are not
	// duplicate packets, so conditionally subtract them from the lost packets.
	stats->cntError = stats->total.Lost.current;
	stats->cntError -= stats->cntOutofOrder;
	if (stats->cntError < 0)
	    stats->cntError = 0;
	stats->cntDatagrams = stats->PacketID;
	stats->cntIPG = stats->total.IPG.current;
	stats->IPGsum = TimeDifference(stats->ts.packetTime, stats->ts.startTime);
	stats->cntBytes = stats->total.Bytes.current;
	stats->l2counts.cnt = stats->l2counts.tot_cnt;
	stats->l2counts.unknown = stats->l2counts.tot_unknown;
	stats->l2counts.udpcsumerr = stats->l2counts.tot_udpcsumerr;
	stats->l2counts.lengtherr = stats->l2counts.tot_lengtherr;
	stats->transit.minTransit = stats->transit.totminTransit;
        stats->transit.maxTransit = stats->transit.totmaxTransit;
	stats->transit.cntTransit = stats->transit.totcntTransit;
	stats->transit.sumTransit = stats->transit.totsumTransit;
	stats->transit.meanTransit = stats->transit.totmeanTransit;
	stats->transit.m2Transit = stats->transit.totm2Transit;
	stats->transit.vdTransit = stats->transit.totvdTransit;
	if (isIsochronous(stats->common)) {
	    stats->isochstats.cntFrames = stats->isochstats.framecnt.current;
	    stats->isochstats.cntFramesMissed = stats->isochstats.framelostcnt.current;
	    stats->isochstats.cntSlips = stats->isochstats.slipcnt.current;
	}
	if (stats->latency_histogram) {
	    stats->latency_histogram->final = 1;
	}
	if (stats->framelatency_histogram) {
	    stats->framelatency_histogram->final = 1;
	}
    }
    if ((stats->output_handler) && !(stats->isMaskOutput))
	(*stats->output_handler)(stats);
    if (!final)
	reporter_reset_transfer_stats_server_udp(stats);
}

void reporter_transfer_protocol_sum_server_udp (struct TransferInfo *stats, int final) {
    if (final) {
	reporter_set_timestamps_time(&stats->ts, TOTAL);
	stats->cntOutofOrder = stats->total.OutofOrder.current;
	// assume most of the  time out-of-order packets are not
	// duplicate packets, so conditionally subtract them from the lost packets.
	stats->cntError = stats->total.Lost.current;
	stats->cntError -= stats->cntOutofOrder;
	if (stats->cntError < 0)
	    stats->cntError = 0;
	stats->cntDatagrams = stats->total.Datagrams.current;
	stats->cntBytes = stats->total.Bytes.current;
	stats->IPGsum = TimeDifference(stats->ts.packetTime, stats->ts.startTime);
	stats->cntIPG = stats->total.IPG.current;
    } else {
	stats->cntOutofOrder = stats->total.OutofOrder.current - stats->total.OutofOrder.prev;
	// assume most of the  time out-of-order packets are not
	// duplicate packets, so conditionally subtract them from the lost packets.
	stats->cntError = stats->total.Lost.current - stats->total.Lost.prev;
	stats->cntError -= stats->cntOutofOrder;
	if (stats->cntError < 0)
	    stats->cntError = 0;
	stats->cntDatagrams = stats->total.Datagrams.current - stats->total.Datagrams.prev;
	stats->cntBytes = stats->total.Bytes.current - stats->total.Bytes.prev;
	stats->cntIPG = stats->total.IPG.current - stats->total.IPG.prev;
    }
    if ((stats->output_handler) && !(stats->isMaskOutput))
	(*stats->output_handler)(stats);
    if (!final) {
	stats->threadcnt = 0;
	// there is no packet ID for sum server reports, set it to total cnt for calculation
	stats->PacketID = stats->total.Datagrams.current;
	reporter_reset_transfer_stats_server_udp(stats);
    }
}
void reporter_transfer_protocol_sum_client_udp (struct TransferInfo *stats, int final) {
    if (final) {
	reporter_set_timestamps_time(&stats->ts, TOTAL);
	stats->sock_callstats.write.WriteErr = stats->sock_callstats.write.totWriteErr;
	stats->sock_callstats.write.WriteCnt = stats->sock_callstats.write.totWriteCnt;
	stats->cntDatagrams = stats->total.Datagrams.current;
	stats->cntBytes = stats->total.Bytes.current;
	stats->IPGsum = TimeDifference(stats->ts.packetTime, stats->ts.startTime);
	stats->cntIPG = stats->total.IPG.current;
    } else {
	stats->cntBytes = stats->total.Bytes.current - stats->total.Bytes.prev;
	stats->cntIPG = stats->total.IPG.current - stats->total.IPG.prev;
	stats->cntDatagrams = stats->total.Datagrams.current - stats->total.Datagrams.prev;
    }
    if ((stats->output_handler) && !(stats->isMaskOutput))
	(*stats->output_handler)(stats);

    if (!final) {
	stats->threadcnt = 0;
	reporter_reset_transfer_stats_client_udp(stats);
    } else if ((stats->common->ReportMode != kReport_CSV) && !(stats->isMaskOutput)) {
	printf(report_sumcnt_datagrams, stats->threadcnt, stats->total.Datagrams.current);
	fflush(stdout);
    }
}

void reporter_transfer_protocol_client_udp (struct ReporterData *data, int final) {
    struct TransferInfo *stats = &data->info;
    struct TransferInfo *sumstats = (data->GroupSumReport != NULL) ? &data->GroupSumReport->info : NULL;
    struct TransferInfo *fullduplexstats = (data->FullDuplexReport != NULL) ? &data->FullDuplexReport->info : NULL;
    stats->cntBytes = stats->total.Bytes.current - stats->total.Bytes.prev;
    stats->cntDatagrams = stats->total.Datagrams.current - stats->total.Datagrams.prev;
    stats->cntIPG = stats->total.IPG.current - stats->total.IPG.prev;
    if (isIsochronous(stats->common)) {
	stats->isochstats.cntFrames = stats->isochstats.framecnt.current - stats->isochstats.framecnt.prev;
	stats->isochstats.cntFramesMissed = stats->isochstats.framelostcnt.current - stats->isochstats.framelostcnt.prev;
	stats->isochstats.cntSlips = stats->isochstats.slipcnt.current - stats->isochstats.slipcnt.prev;
    }
    if (sumstats) {
	sumstats->total.Bytes.current += stats->cntBytes;
	sumstats->sock_callstats.write.WriteErr += stats->sock_callstats.write.WriteErr;
	sumstats->sock_callstats.write.WriteCnt += stats->sock_callstats.write.WriteCnt;
	sumstats->sock_callstats.write.totWriteErr += stats->sock_callstats.write.WriteErr;
	sumstats->sock_callstats.write.totWriteCnt += stats->sock_callstats.write.WriteCnt;
	sumstats->total.Datagrams.current += stats->cntDatagrams;
	if (sumstats->IPGsum < stats->IPGsum)
	    sumstats->IPGsum = stats->IPGsum;
	sumstats->total.IPG.current += stats->cntIPG;
	sumstats->threadcnt++;
    }
    if (fullduplexstats) {
	fullduplexstats->total.Bytes.current += stats->cntBytes;
	fullduplexstats->total.IPG.current += stats->cntIPG;
	fullduplexstats->total.Datagrams.current += stats->cntDatagrams;
	if (fullduplexstats->IPGsum < stats->IPGsum)
	    fullduplexstats->IPGsum = stats->IPGsum;
    }
    if (final) {
	reporter_set_timestamps_time(&stats->ts, TOTAL);
	stats->cntBytes = stats->total.Bytes.current;
	stats->sock_callstats.write.WriteErr = stats->sock_callstats.write.totWriteErr;
	stats->sock_callstats.write.WriteCnt = stats->sock_callstats.write.totWriteCnt;
	stats->cntIPG = stats->total.IPG.current;
	stats->cntDatagrams = stats->PacketID;
	stats->IPGsum = TimeDifference(stats->ts.packetTime, stats->ts.startTime);
	if (isIsochronous(stats->common)) {
	    stats->isochstats.cntFrames = stats->isochstats.framecnt.current;
	    stats->isochstats.cntFramesMissed = stats->isochstats.framelostcnt.current;
	    stats->isochstats.cntSlips = stats->isochstats.slipcnt.current;
	}
    } else {
	if (stats->ts.iEnd > 0) {
	    stats->cntIPG = (stats->total.IPG.current - stats->total.IPG.prev);
	} else {
	    stats->cntIPG = 0;
	}
    }
    if ((stats->output_handler) && !(stats->isMaskOutput)) {
	(*stats->output_handler)(stats);
	if (final && (stats->common->ReportMode != kReport_CSV)) {
	    printf(report_datagrams, stats->common->transferID, stats->total.Datagrams.current);
	    fflush(stdout);
	}
    }
    reporter_reset_transfer_stats_client_udp(stats);
}

void reporter_transfer_protocol_server_tcp (struct ReporterData *data, int final) {
    struct TransferInfo *stats = &data->info;
    struct TransferInfo *sumstats = (data->GroupSumReport != NULL) ? &data->GroupSumReport->info : NULL;
    struct TransferInfo *fullduplexstats = (data->FullDuplexReport != NULL) ? &data->FullDuplexReport->info : NULL;
    stats->cntBytes = stats->total.Bytes.current - stats->total.Bytes.prev;
    int ix;
    if (stats->framelatency_histogram) {
        stats->framelatency_histogram->final = 0;
    }
    if (sumstats) {
	sumstats->threadcnt++;
	sumstats->total.Bytes.current += stats->cntBytes;
        sumstats->sock_callstats.read.cntRead += stats->sock_callstats.read.cntRead;
        sumstats->sock_callstats.read.totcntRead += stats->sock_callstats.read.cntRead;
        for (ix = 0; ix < TCPREADBINCOUNT; ix++) {
	    sumstats->sock_callstats.read.bins[ix] += stats->sock_callstats.read.bins[ix];
	    sumstats->sock_callstats.read.totbins[ix] += stats->sock_callstats.read.bins[ix];
        }
    }
    if (fullduplexstats) {
	fullduplexstats->total.Bytes.current += stats->cntBytes;
    }
    if (final) {
	if ((stats->cntBytes > 0) && stats->output_handler && !TimeZero(stats->ts.intervalTime)) {
	    // print a partial interval report if enable and this a final
	    if ((stats->output_handler) && !(stats->isMaskOutput)) {
		if (isIsochronous(stats->common)) {
		    stats->isochstats.cntFrames = stats->isochstats.framecnt.current - stats->isochstats.framecnt.prev;
		    stats->isochstats.cntFramesMissed = stats->isochstats.framelostcnt.current - stats->isochstats.framelostcnt.prev;
		    stats->isochstats.cntSlips = stats->isochstats.slipcnt.current - stats->isochstats.slipcnt.prev;
		}
		reporter_set_timestamps_time(&stats->ts, FINALPARTIAL);
		if ((stats->ts.iEnd - stats->ts.iStart) > stats->ts.significant_partial)
		    (*stats->output_handler)(stats);
		reporter_reset_transfer_stats_server_tcp(stats);
	    }
        }
        if (stats->framelatency_histogram) {
	    stats->framelatency_histogram->final = 1;
	}
	stats->final = true;
	reporter_set_timestamps_time(&stats->ts, TOTAL);
        stats->cntBytes = stats->total.Bytes.current;
	stats->IPGsum = stats->ts.iEnd;
        stats->sock_callstats.read.cntRead = stats->sock_callstats.read.totcntRead;
        for (ix = 0; ix < TCPREADBINCOUNT; ix++) {
	    stats->sock_callstats.read.bins[ix] = stats->sock_callstats.read.totbins[ix];
        }
	if (isIsochronous(stats->common)) {
	    stats->isochstats.cntFrames = stats->isochstats.framecnt.current;
	    stats->isochstats.cntFramesMissed = stats->isochstats.framelostcnt.current;
	    stats->isochstats.cntSlips = stats->isochstats.slipcnt.current;
	}
	stats->transit.sumTransit = stats->transit.totsumTransit;
	stats->transit.cntTransit = stats->transit.totcntTransit;
	stats->transit.minTransit = stats->transit.totminTransit;
	stats->transit.maxTransit = stats->transit.totmaxTransit;
	stats->transit.m2Transit = stats->transit.totm2Transit;
	if (stats->framelatency_histogram) {
	    stats->framelatency_histogram->final = 1;
	}
    } else if (isIsochronous(stats->common)) {
	stats->isochstats.cntFrames = stats->isochstats.framecnt.current - stats->isochstats.framecnt.prev;
	stats->isochstats.cntFramesMissed = stats->isochstats.framelostcnt.current - stats->isochstats.framelostcnt.prev;
	stats->isochstats.cntSlips = stats->isochstats.slipcnt.current - stats->isochstats.slipcnt.prev;
    }
    if ((stats->output_handler) && !stats->isMaskOutput) {
	(*stats->output_handler)(stats);
	if (isFrameInterval(stats->common) && stats->framelatency_histogram) {
	    histogram_print(stats->framelatency_histogram, stats->ts.iStart, stats->ts.iEnd);
	}
    }
    if (!final)
	reporter_reset_transfer_stats_server_tcp(stats);
}

void reporter_transfer_protocol_client_tcp (struct ReporterData *data, int final) {
    struct TransferInfo *stats = &data->info;
    struct TransferInfo *sumstats = (data->GroupSumReport != NULL) ? &data->GroupSumReport->info : NULL;
    struct TransferInfo *fullduplexstats = (data->FullDuplexReport != NULL) ? &data->FullDuplexReport->info : NULL;
    stats->cntBytes = stats->total.Bytes.current - stats->total.Bytes.prev;
#if HAVE_DECL_TCP_NOTSENT_LOWAT
    if (stats->latency_histogram) {
        stats->latency_histogram->final = final;
    }
    if (stats->drain_histogram) {
        stats->drain_histogram->final = final;
    }
#endif
    if (isIsochronous(stats->common)) {
	if (final) {
	    stats->isochstats.cntFrames = stats->isochstats.framecnt.current;
	    stats->isochstats.cntFramesMissed = stats->isochstats.framelostcnt.current;
	    stats->isochstats.cntSlips = stats->isochstats.slipcnt.current;
	} else {
	    stats->isochstats.cntFrames = stats->isochstats.framecnt.current - stats->isochstats.framecnt.prev;
	    stats->isochstats.cntFramesMissed = stats->isochstats.framelostcnt.current - stats->isochstats.framelostcnt.prev;
	    stats->isochstats.cntSlips = stats->isochstats.slipcnt.current - stats->isochstats.slipcnt.prev;
	}
    }
    if (sumstats) {
	sumstats->total.Bytes.current += stats->cntBytes;
	sumstats->sock_callstats.write.WriteErr += stats->sock_callstats.write.WriteErr;
	sumstats->sock_callstats.write.WriteCnt += stats->sock_callstats.write.WriteCnt;
	sumstats->sock_callstats.write.totWriteErr += stats->sock_callstats.write.WriteErr;
	sumstats->sock_callstats.write.totWriteCnt += stats->sock_callstats.write.WriteCnt;
	sumstats->threadcnt++;
#if HAVE_TCP_STATS
	sumstats->sock_callstats.write.TCPretry += stats->sock_callstats.write.TCPretry;
	sumstats->sock_callstats.write.totTCPretry += stats->sock_callstats.write.TCPretry;
#endif
    }
    if (fullduplexstats) {
	fullduplexstats->total.Bytes.current += stats->cntBytes;
    }
    if (final) {
#if HAVE_DECL_TCP_NOTSENT_LOWAT
	if (stats->latency_histogram) {
	    stats->latency_histogram->final = 1;
	}
	if (stats->drain_histogram) {
	    stats->drain_histogram->final = 1;
	}
#endif
	if ((stats->cntBytes > 0) && stats->output_handler && !TimeZero(stats->ts.intervalTime)) {
	    // print a partial interval report if enable and this a final
	    if ((stats->output_handler) && !(stats->isMaskOutput)) {
		if (isIsochronous(stats->common)) {
		    stats->isochstats.cntFrames = stats->isochstats.framecnt.current - stats->isochstats.framecnt.prev;
		    stats->isochstats.cntFramesMissed = stats->isochstats.framelostcnt.current - stats->isochstats.framelostcnt.prev;
		    stats->isochstats.cntSlips = stats->isochstats.slipcnt.current - stats->isochstats.slipcnt.prev;
		}
		reporter_set_timestamps_time(&stats->ts, FINALPARTIAL);
		if ((stats->ts.iEnd - stats->ts.iStart) > stats->ts.significant_partial)
		    (*stats->output_handler)(stats);
		reporter_reset_transfer_stats_client_tcp(stats);
	    }
        }
	if (isIsochronous(stats->common)) {
	    stats->isochstats.cntFrames = stats->isochstats.framecnt.current;
	    stats->isochstats.cntFramesMissed = stats->isochstats.framelostcnt.current;
	    stats->isochstats.cntSlips = stats->isochstats.slipcnt.current;
	}
	stats->sock_callstats.write.WriteErr = stats->sock_callstats.write.totWriteErr;
	stats->sock_callstats.write.WriteCnt = stats->sock_callstats.write.totWriteCnt;
#if HAVE_TCP_STATS
	stats->sock_callstats.write.TCPretry = stats->sock_callstats.write.totTCPretry;
#endif
	if (stats->framelatency_histogram) {
	    stats->framelatency_histogram->final = 1;
	}
	stats->cntBytes = stats->total.Bytes.current;
#if HAVE_DECL_TCP_NOTSENT_LOWAT
	stats->drain_mmm.current = stats->drain_mmm.total;
#endif
	reporter_set_timestamps_time(&stats->ts, TOTAL);
    } else if (isIsochronous(stats->common)) {
	stats->isochstats.cntFrames = stats->isochstats.framecnt.current - stats->isochstats.framecnt.prev;
	stats->isochstats.cntFramesMissed = stats->isochstats.framelostcnt.current - stats->isochstats.framelostcnt.prev;
	stats->isochstats.cntSlips = stats->isochstats.slipcnt.current - stats->isochstats.slipcnt.prev;
    }
    if ((stats->output_handler) && !(stats->isMaskOutput)) {
	(*stats->output_handler)(stats);
    }
    if (!final)
	reporter_reset_transfer_stats_client_tcp(stats);
}

/*
 * Handles summing of threads
 */
void reporter_transfer_protocol_sum_client_tcp (struct TransferInfo *stats, int final) {
    if (!final || (final && (stats->cntBytes > 0) && !TimeZero(stats->ts.intervalTime))) {
	stats->cntBytes = stats->total.Bytes.current - stats->total.Bytes.prev;
	if (final) {
	    if ((stats->output_handler) && !(stats->isMaskOutput)) {
		reporter_set_timestamps_time(&stats->ts, FINALPARTIAL);
		if ((stats->ts.iEnd - stats->ts.iStart) > stats->ts.significant_partial)
		    (*stats->output_handler)(stats);
		reporter_reset_transfer_stats_client_tcp(stats);
	    }
	} else if ((stats->output_handler) && !(stats->isMaskOutput)) {
	    (*stats->output_handler)(stats);
	    stats->threadcnt = 0;
	}
	reporter_reset_transfer_stats_client_tcp(stats);
    }
    if (final) {
	stats->sock_callstats.write.WriteErr = stats->sock_callstats.write.totWriteErr;
	stats->sock_callstats.write.WriteCnt = stats->sock_callstats.write.totWriteCnt;
#if HAVE_TCP_STATS
	stats->sock_callstats.write.TCPretry = stats->sock_callstats.write.totTCPretry;
#endif
	stats->cntBytes = stats->total.Bytes.current;
	reporter_set_timestamps_time(&stats->ts, TOTAL);
	if ((stats->output_handler) && !(stats->isMaskOutput))
	    (*stats->output_handler)(stats);
    }
}

void reporter_transfer_protocol_sum_server_tcp (struct TransferInfo *stats, int final) {
    if (!final || (final && (stats->cntBytes > 0) && !TimeZero(stats->ts.intervalTime))) {
	stats->cntBytes = stats->total.Bytes.current - stats->total.Bytes.prev;
	if (final) {
	    if ((stats->output_handler) && !(stats->isMaskOutput)) {
		reporter_set_timestamps_time(&stats->ts, FINALPARTIAL);
		if ((stats->ts.iEnd - stats->ts.iStart) > stats->ts.significant_partial)
		    (*stats->output_handler)(stats);
	    }
	} else if ((stats->output_handler) && !(stats->isMaskOutput)) {
	    (*stats->output_handler)(stats);
	    stats->threadcnt = 0;
	}
	reporter_reset_transfer_stats_server_tcp(stats);
    }
    if (final) {
	int ix;
	stats->cntBytes = stats->total.Bytes.current;
	stats->sock_callstats.read.cntRead = stats->sock_callstats.read.totcntRead;
	for (ix = 0; ix < TCPREADBINCOUNT; ix++) {
	    stats->sock_callstats.read.bins[ix] = stats->sock_callstats.read.totbins[ix];
	}
	stats->cntBytes = stats->total.Bytes.current;
	reporter_set_timestamps_time(&stats->ts, TOTAL);
	if ((stats->output_handler) && !(stats->isMaskOutput))
	    (*stats->output_handler)(stats);
    }
}
void reporter_transfer_protocol_fullduplex_tcp (struct TransferInfo *stats, int final) {
    if (!final || (final && (stats->cntBytes > 0) && !TimeZero(stats->ts.intervalTime))) {
	stats->cntBytes = stats->total.Bytes.current - stats->total.Bytes.prev;
	if (final) {
	    if ((stats->output_handler) && !(stats->isMaskOutput)) {
		reporter_set_timestamps_time(&stats->ts, FINALPARTIAL);
		if ((stats->ts.iEnd - stats->ts.iStart) > stats->ts.significant_partial)
		    (*stats->output_handler)(stats);
	    }
	}
	stats->total.Bytes.prev = stats->total.Bytes.current;
    }
    if (final) {
	stats->cntBytes = stats->total.Bytes.current;
	reporter_set_timestamps_time(&stats->ts, TOTAL);
    } else {
	reporter_set_timestamps_time(&stats->ts, INTERVAL);
    }
    if ((stats->output_handler) && !(stats->isMaskOutput))
	(*stats->output_handler)(stats);
}

void reporter_transfer_protocol_fullduplex_udp (struct TransferInfo *stats, int final) {
    if (!final || (final && (stats->cntBytes > 0) && !TimeZero(stats->ts.intervalTime))) {
	stats->cntBytes = stats->total.Bytes.current - stats->total.Bytes.prev;
	stats->cntDatagrams = stats->total.Datagrams.current - stats->total.Datagrams.prev;
	stats->cntIPG = stats->total.IPG.current - stats->total.IPG.prev;
	if (final) {
	    if ((stats->output_handler) && !(stats->isMaskOutput)) {
		reporter_set_timestamps_time(&stats->ts, FINALPARTIAL);
		if ((stats->ts.iEnd - stats->ts.iStart) > stats->ts.significant_partial)
		    (*stats->output_handler)(stats);
	    }
	}
	stats->total.Bytes.prev = stats->total.Bytes.current;
	stats->total.IPG.prev = stats->total.IPG.current;
	stats->total.Datagrams.prev = stats->total.Datagrams.current;
    }
    if (final) {
	stats->cntBytes = stats->total.Bytes.current;
	stats->cntBytes = stats->total.Bytes.current;
	stats->cntDatagrams = stats->total.Datagrams.current ;
	stats->cntIPG = stats->total.IPG.current;
	stats->IPGsum = TimeDifference(stats->ts.packetTime, stats->ts.startTime);
	reporter_set_timestamps_time(&stats->ts, TOTAL);
    } else {
	reporter_set_timestamps_time(&stats->ts, INTERVAL);
    }
    if ((stats->output_handler) && !(stats->isMaskOutput))
	(*stats->output_handler)(stats);
    if (stats->cntDatagrams)
        stats->IPGsum = 0.0;
}

// Conditional print based on time
int reporter_condprint_time_interval_report (struct ReporterData *data, struct ReportStruct *packet) {
    struct TransferInfo *stats = &data->info;
    assert(stats!=NULL);
    //   printf("***sum handler = %p\n", (void *) data->GroupSumReport->transfer_protocol_sum_handler);
    int advance_jobq = 0;
    // Print a report if packet time exceeds the next report interval time,
    // Also signal to the caller to move to the next report (or packet ring)
    // if there was output. This will allow for more precise interval sum accounting.
    if (TimeDifference(stats->ts.nextTime, packet->packetTime) < 0) {
	assert(data->transfer_protocol_handler!=NULL);
	advance_jobq = 1;
	struct TransferInfo *sumstats = (data->GroupSumReport ? &data->GroupSumReport->info : NULL);
	struct TransferInfo *fullduplexstats = (data->FullDuplexReport ? &data->FullDuplexReport->info : NULL);
	stats->ts.packetTime = packet->packetTime;
#ifdef DEBUG_PPS
	printf("*** packetID TRIGGER = %ld pt=%ld.%ld empty=%d nt=%ld.%ld\n",packet->packetID, packet->packetTime.tv_sec, packet->packetTime.tv_usec, packet->emptyreport, stats->ts.nextTime.tv_sec, stats->ts.nextTime.tv_usec);
#endif
	reporter_set_timestamps_time(&stats->ts, INTERVAL);
	(*data->transfer_protocol_handler)(data, 0);
	if (fullduplexstats && ((++data->FullDuplexReport->threads) == 2) && isEnhanced(stats->common)) {
	    data->FullDuplexReport->threads = 0;
	    assert(data->FullDuplexReport->transfer_protocol_sum_handler != NULL);
	    (*data->FullDuplexReport->transfer_protocol_sum_handler)(fullduplexstats, 0);
	}
	if (sumstats) {
	    if ((++data->GroupSumReport->threads) == data->GroupSumReport->reference.count)   {
		data->GroupSumReport->threads = 0;
		if ((data->GroupSumReport->reference.count > (fullduplexstats ? 2 : 1)) || \
		    isSumOnly(data->info.common)) {
		    sumstats->isMaskOutput = false;
		} else {
		    sumstats->isMaskOutput = true;
		}
		reporter_set_timestamps_time(&sumstats->ts, INTERVAL);
		assert(data->GroupSumReport->transfer_protocol_sum_handler != NULL);
		(*data->GroupSumReport->transfer_protocol_sum_handler)(sumstats, 0);
	    }
	}
        // In the (hopefully unlikely event) the reporter fell behind
        // output the missed reports to catch up
	if ((stats->output_handler) && !(stats->isMaskOutput))
	    reporter_transfer_protocol_missed_reports(stats, packet);
    }
    return advance_jobq;
}

// Conditional print based on bursts or frames
int reporter_condprint_frame_interval_report_server_udp (struct ReporterData *data, struct ReportStruct *packet) {
    int advance_jobq = 0;
    struct TransferInfo *stats = &data->info;
    // first packet of a burst and not a duplicate
    assert(packet->burstsize != 0);
    if ((packet->burstsize == (packet->remaining + packet->packetLen)) && (stats->matchframeID != packet->frameID)) {
	stats->matchframeID=packet->frameID;
    }
    if ((packet->packetLen == packet->remaining) && (packet->frameID == stats->matchframeID)) {
	if ((stats->ts.iStart = TimeDifference(stats->ts.nextTime, stats->ts.startTime)) < 0)
	    stats->ts.iStart = 0.0;
	stats->frameID = packet->frameID;
	stats->ts.iEnd = TimeDifference(packet->packetTime, stats->ts.startTime);
	stats->cntBytes = stats->total.Bytes.current - stats->total.Bytes.prev;
	stats->cntOutofOrder = stats->total.OutofOrder.current - stats->total.OutofOrder.prev;
	// assume most of the  time out-of-order packets are not
	// duplicate packets, so conditionally subtract them from the lost packets.
	stats->cntError = stats->total.Lost.current - stats->total.Lost.prev;
	stats->cntError -= stats->cntOutofOrder;
	if (stats->cntError < 0)
	    stats->cntError = 0;
	stats->cntDatagrams = stats->PacketID - stats->total.Datagrams.prev;
	if ((stats->output_handler) && !(stats->isMaskOutput))
	    (*stats->output_handler)(stats);
	reporter_reset_transfer_stats_server_udp(stats);
	advance_jobq = 1;
    }
    return advance_jobq;
}

int reporter_condprint_frame_interval_report_server_tcp (struct ReporterData *data, struct ReportStruct *packet) {
    fprintf(stderr, "FIX ME\n");
    return 1;
}

int reporter_condprint_burst_interval_report_server_tcp (struct ReporterData *data, struct ReportStruct *packet) {
    assert(packet->burstsize != 0);
    struct TransferInfo *stats = &data->info;

    int advance_jobq = 0;
    // first packet of a burst and not a duplicate
    if (packet->transit_ready) {
        stats->tripTime = reporter_handle_packet_oneway_transit(data, packet);
	if (stats->framelatency_histogram) {
	    histogram_insert(stats->framelatency_histogram, stats->tripTime, &packet->sentTime);
	}
	stats->tripTime *= 1e3; // convert from secs millisecs
//	printf("****sndpkt=%ld.%ld rxpkt=%ld.%ld\n", packet->sentTime.tv_sec, packet->sentTime.tv_usec, packet->packetTime.tv_sec,packet->packetTime.tv_usec);
	stats->ts.prevpacketTime = packet->prevSentTime;
	stats->ts.packetTime = packet->packetTime;
	reporter_set_timestamps_time(&stats->ts, FRAME);
	stats->cntBytes = stats->total.Bytes.current - stats->total.Bytes.prev;
	if ((stats->output_handler) && !(stats->isMaskOutput))
	    (*stats->output_handler)(stats);
	reporter_reset_transfer_stats_server_tcp(stats);
	advance_jobq = 1;
    }
    return advance_jobq;
}

#ifdef __cplusplus
} /* end extern "C" */
#endif