Re: tcpbench@libevent

Previous message: [thread] [date] [author]
Next message: [thread] [date] [author]
From: Christiano F. Haesbaert
Date: Thursday, October 21, 2010 - 4:51 pm

Weird, I've just run "cvs up -A -C" and the patch applies cleanly; maybe
Gmail is mangling my paste?

Well, here it is again; I hope it helps.

? tcpbench.cat1
Index: Makefile
===================================================================
RCS file: /cvs/src/usr.bin/tcpbench/Makefile,v
retrieving revision 1.3
diff -d -u -p -w -u -r1.3 Makefile
--- Makefile	26 Jun 2008 07:05:56 -0000	1.3
+++ Makefile	21 Oct 2010 23:49:21 -0000
@@ -15,7 +15,7 @@ CDIAGFLAGS+=    -Wshadow

 PROG=tcpbench

-LDADD=-lkvm
+LDADD=-lkvm -levent

 #BINGRP= kmem
 #BINMODE=2555
Index: tcpbench.c
===================================================================
RCS file: /cvs/src/usr.bin/tcpbench/tcpbench.c,v
retrieving revision 1.19
diff -d -u -p -w -u -r1.19 tcpbench.c
--- tcpbench.c	19 Oct 2010 10:03:23 -0000	1.19
+++ tcpbench.c	21 Oct 2010 23:49:21 -0000
@@ -1,5 +1,6 @@
 /*
  * Copyright (c) 2008 Damien Miller <djm@mindrot.org>
+ * Copyright (c) 2010 Christiano F. Haesbaert <haesbaert@haesbaert.org>
  *
  * Permission to use, copy, modify, and distribute this software for any
  * purpose with or without fee is hereby granted, provided that the above
@@ -19,6 +20,7 @@
 #include <sys/socket.h>
 #include <sys/socketvar.h>
 #include <sys/resource.h>
+#include <sys/queue.h>

 #include <net/route.h>

@@ -39,6 +41,7 @@
 #include <stdio.h>
 #include <string.h>
 #include <errno.h>
+#include <event.h>
 #include <netdb.h>
 #include <signal.h>
 #include <err.h>
@@ -50,29 +53,58 @@

 #define DEFAULT_PORT "12345"
 #define DEFAULT_STATS_INTERVAL 1000 /* ms */
-#define DEFAULT_BUF 256 * 1024
+#define DEFAULT_BUF (256 * 1024)
 #define MAX_FD 1024

-sig_atomic_t done = 0;
-sig_atomic_t proc_slice = 0;
-
 static u_int  rtableid;
-static char **kflag;
-static size_t Bflag;
 static int    Sflag;
 static int    rflag;
 static int    sflag;
 static int    vflag;
+static int	  qflag;
+static kvm_t	 *kvmh;
+static char	**kvars;
+static u_long	  ktcbtab;
+static char	 *dummybuf;
+static size_t	  dummybuf_len;
+

 /* stats for a single connection */
 struct statctx {
+	TAILQ_ENTRY(statctx) entry;
 	struct timeval t_start, t_last;
 	unsigned long long bytes;
 	u_long tcbaddr;
-	char **kvars;
-	kvm_t *kh;
+	int fd;
+	char *buf;
+	size_t buflen;
+	struct event ev;
 };

+static void	signal_handler(int, short, void *);
+static void	saddr_ntop(const struct sockaddr *, socklen_t, char *, size_t);
+static void	drop_gid(void);
+static void	set_slice_timer(int);
+static void 	print_header(void);
+static void	kget(u_long, void *, size_t);
+static u_long	kfind_tcb(int);
+static void	kupdate_stats(u_long, struct inpcb *, struct tcpcb *,
+    struct socket *);
+static void	list_kvars(void);
+static void	check_kvar(const char *);
+static char **	check_prepare_kvars(char *);
+static void	stats_prepare(struct statctx *);
+static void	stats_update(struct statctx *, ssize_t);
+static void	stats_cleanslice(void);
+static void	stats_display(unsigned long long, long double, float,
+    struct statctx *, struct inpcb *, struct tcpcb *, struct socket *);
+static void	process_slice(int, short, void *);
+static void	server_handle_sc(int, short, void *);
+static void	server_accept(int, short, void *);
+static nfds_t	server_init(struct addrinfo *);
+static void	client_handle_sc(int, short, void *);
+static void	client_init(struct addrinfo *, int);
+
 /*
  * We account the mainstats here, that is the stats
  * for all connections, all variables starting with slice
@@ -85,6 +117,7 @@ static struct {
 	struct timeval t_start;	        /* when we started counting */
 	long double peak_mbps;		/* peak mbps so far */
 	int nconns; 		        /* connected clients */
+	struct event timer;		/* process timer */
 } mainstats;

 /* When adding variables, also add to stats_display() */
@@ -124,18 +157,7 @@ static const char *allowed_kvars[] = {
 	NULL
 };

-static void
-exitsighand(int signo)
-{
-	done = signo;
-}
-
-static void
-alarmhandler(int signo)
-{
-	proc_slice = 1;
-	signal(signo, alarmhandler);
-}
+TAILQ_HEAD(, statctx) sc_queue;

 static void __dead
 usage(void)
@@ -143,13 +165,32 @@ usage(void)
 	fprintf(stderr,
 	    "usage: tcpbench -l\n"
 	    "       tcpbench [-v] [-B buf] [-k kvars] [-n connections] [-p port]\n"
-	    "                [-r rate] [-S space] [-V rtable] hostname\n"
+	    "                [-q] [-r rate] [-S space] [-V rtable] hostname\n"
 	    "       tcpbench -s [-v] [-B buf] [-k kvars] [-p port]\n"
-	    "                [-r rate] [-S space] [-V rtable]\n");
+	    "                [-q] [-r rate] [-S space] [-V rtable]\n");
 	exit(1);
 }

 static void
+signal_handler(int sig, short event, void *bula)
+{
+	/*
+	 * signal handler rules don't apply, libevent decouples for us
+	 */
+	switch (sig) {
+	case SIGINT:
+	case SIGTERM:
+	case SIGHUP:
+		warnx("Terminated by signal %d", sig);
+		exit(0);
+		break;		/* NOTREACHED */
+	default:
+		errx(1, "unexpected signal %d", sig);
+		break;		/* NOTREACHED */
+	}
+}
+
+static void
 saddr_ntop(const struct sockaddr *addr, socklen_t alen, char *buf, size_t len)
 {
 	char hbuf[NI_MAXHOST], pbuf[NI_MAXSERV];
@@ -166,22 +207,34 @@ saddr_ntop(const struct sockaddr *addr,
 }

 static void
-set_timer(int toggle)
+drop_gid(void)
 {
-	struct itimerval itv;
+	gid_t gid;
+
+	gid = getgid();
+	if (setresgid(gid, gid, gid) == -1)
+		err(1, "setresgid");
+}
+
+static void
+set_slice_timer(int on)
+{
+	struct timeval tv;

 	if (rflag <= 0)
 		return;

-	if (toggle) {
-		itv.it_interval.tv_sec = rflag / 1000;
-		itv.it_interval.tv_usec = (rflag % 1000) * 1000;
-		itv.it_value = itv.it_interval;
-	}
-	else
-		bzero(&itv, sizeof(itv));
+	if (on) {
+		if (evtimer_pending(&mainstats.timer, NULL))
+			return;
+		timerclear(&tv);
+		tv.tv_usec = rflag * 1000;
 		
-	setitimer(ITIMER_REAL, &itv, NULL);
+		evtimer_add(&mainstats.timer, &tv);
+	} else {
+		if (evtimer_pending(&mainstats.timer, NULL))
+			evtimer_del(&mainstats.timer);
+	}
 }

 static void
@@ -192,21 +245,21 @@ print_header(void)
 	printf("%12s %14s %12s %8s ", "elapsed_ms", "bytes", "mbps",
 	    "bwidth");
 	
-	for (kv = kflag;  kflag != NULL && *kv != NULL; kv++)
-		printf("%s%s", kv != kflag ? "," : "", *kv);
+	for (kv = kvars;  kvars != NULL && *kv != NULL; kv++)
+		printf("%s%s", kv != kvars ? "," : "", *kv);
 	
 	printf("\n");
 }

 static void
-kget(kvm_t *kh, u_long addr, void *buf, size_t size)
+kget(u_long addr, void *buf, size_t size)
 {
-	if (kvm_read(kh, addr, buf, size) != (ssize_t)size)
-		errx(1, "kvm_read: %s", kvm_geterr(kh));
+	if (kvm_read(kvmh, addr, buf, size) != (ssize_t)size)
+		errx(1, "kvm_read: %s", kvm_geterr(kvmh));
 }

 static u_long
-kfind_tcb(kvm_t *kh, u_long ktcbtab, int sock)
+kfind_tcb(int sock)
 {
 	struct inpcbtable tcbtab;
 	struct inpcb *head, *next, *prev;
@@ -240,7 +293,7 @@ kfind_tcb(kvm_t *kh, u_long ktcbtab, int
 	if (vflag >= 2)
 		fprintf(stderr, "Using PCB table at %lu\n", ktcbtab);
 retry:
-	kget(kh, ktcbtab, &tcbtab, sizeof(tcbtab));
+	kget(ktcbtab, &tcbtab, sizeof(tcbtab));
 	prev = head = (struct inpcb *)&CIRCLEQ_FIRST(
 	    &((struct inpcbtable *)ktcbtab)->inpt_queue);
 	next = CIRCLEQ_FIRST(&tcbtab.inpt_queue);
@@ -250,7 +303,7 @@ retry:
 	while (next != head) {
 		if (vflag >= 2)
 			fprintf(stderr, "Checking PCB %p\n", next);
-		kget(kh, (u_long)next, &inpcb, sizeof(inpcb));
+		kget((u_long)next, &inpcb, sizeof(inpcb));
 		if (CIRCLEQ_PREV(&inpcb, inp_queue) != prev) {
 			if (nretry--) {
 				warnx("pcb prev pointer insane");
@@ -313,7 +366,7 @@ retry:
 			    in6->sin6_port != inpcb.inp_fport)
 				continue;
 		}
-		kget(kh, (u_long)inpcb.inp_ppcb, &tcpcb, sizeof(tcpcb));
+		kget((u_long)inpcb.inp_ppcb, &tcpcb, sizeof(tcpcb));
 		if (tcpcb.t_state != TCPS_ESTABLISHED) {
 			if (vflag >= 2)
 				fprintf(stderr, "Not established\n");
@@ -321,19 +374,19 @@ retry:
 		}
 		if (vflag >= 2)
 			fprintf(stderr, "Found PCB at %p\n", prev);
-		return (u_long)prev;
+		return ((u_long)prev);
 	}

 	errx(1, "No matching PCB found");
 }

 static void
-kupdate_stats(kvm_t *kh, u_long tcbaddr,
-    struct inpcb *inpcb, struct tcpcb *tcpcb, struct socket *sockb)
+kupdate_stats(u_long tcbaddr, struct inpcb *inpcb,
+    struct tcpcb *tcpcb, struct socket *sockb)
 {
-	kget(kh, tcbaddr, inpcb, sizeof(*inpcb));
-	kget(kh, (u_long)inpcb->inp_ppcb, tcpcb, sizeof(*tcpcb));
-	kget(kh, (u_long)inpcb->inp_socket, sockb, sizeof(*sockb));
+	kget(tcbaddr, inpcb, sizeof(*inpcb));
+	kget((u_long)inpcb->inp_ppcb, tcpcb, sizeof(*tcpcb));
+	kget((u_long)inpcb->inp_socket, sockb, sizeof(*sockb));
 }

 static void
@@ -371,22 +424,26 @@ check_prepare_kvars(char *list)
 			errx(1, "strdup");
 		ret[n] = NULL;
 	}
-	return ret;
+	return (ret);
 }

 static void
-stats_prepare(struct statctx *sc, int fd, kvm_t *kh, u_long ktcbtab)
+stats_prepare(struct statctx *sc)
 {
 	if (rflag <= 0)
 		return;
-	sc->kh = kh;
-	sc->kvars = kflag;
-	if (kflag)
-		sc->tcbaddr = kfind_tcb(kh, ktcbtab, fd);
-	if (gettimeofday(&sc->t_start, NULL) == -1)
-		err(1, "gettimeofday");
+	sc->buf = dummybuf;
+	sc->buflen = dummybuf_len;
+	if (kvars)
+		sc->tcbaddr = kfind_tcb(sc->fd);
 	sc->t_last = sc->t_start;
 	sc->bytes = 0;
+	event_set(&sc->ev, sc->fd, EV_READ | EV_PERSIST,
+	    server_handle_sc, sc);
+	event_add(&sc->ev, NULL);
+	/* TODO: use clock_gettime() */
+	if (gettimeofday(&sc->t_start, NULL) == -1)
+		err(1, "gettimeofday");
 }

 static void
@@ -412,14 +469,14 @@ stats_display(unsigned long long total_e
 	printf("%12llu %14llu %12.3Lf %7.2f%% ", total_elapsed, sc->bytes,
 	    mbps, bwperc);
 	
-	if (sc->kvars != NULL) {
-		kupdate_stats(sc->kh, sc->tcbaddr, inpcb, tcpcb,
+	if (kvars != NULL) {
+		kupdate_stats(sc->tcbaddr, inpcb, tcpcb,
 		    sockb);

-		for (j = 0; sc->kvars[j] != NULL; j++) {
+		for (j = 0; kvars[j] != NULL; j++) {
 #define S(a) #a
 #define P(b, v, f)							\
-			if (strcmp(sc->kvars[j], S(b.v)) == 0) {	\
+			if (strcmp(kvars[j], S(b.v)) == 0) {	\
 				printf("%s"f, j > 0 ? "," : "", b->v);	\
 				continue;				\
 			}
@@ -463,30 +520,24 @@ stats_display(unsigned long long total_e
 }

 static void
-mainstats_display(long double slice_mbps, long double avg_mbps)
-{
-	printf("Conn: %3d Mbps: %12.3Lf Peak Mbps: %12.3Lf Avg Mbps: %12.3Lf\n",
-	    mainstats.nconns, slice_mbps, mainstats.peak_mbps, avg_mbps);
-}
-
-static void
-process_slice(struct statctx *sc, size_t nsc)
+process_slice(int fd, short event, void *bula)
 {
 	unsigned long long total_elapsed, since_last;
 	long double mbps, slice_mbps = 0;
 	float bwperc;
-	nfds_t i;
+	struct statctx *sc;
 	struct timeval t_cur, t_diff;
 	struct inpcb inpcb;
 	struct tcpcb tcpcb;
 	struct socket sockb;
 	
-	for (i = 0; i < nsc; i++, sc++) {
+	TAILQ_FOREACH(sc, &sc_queue, entry) {
 		if (gettimeofday(&t_cur, NULL) == -1)
 			err(1, "gettimeofday");
-		if (sc->kvars != NULL) /* process kernel stats */
-			kupdate_stats(sc->kh, sc->tcbaddr, &inpcb, &tcpcb,
+		if (kvars != NULL) /* process kernel stats */
+			kupdate_stats(sc->tcbaddr, &inpcb, &tcpcb,
 			    &sockb);
+		
 		timersub(&t_cur, &sc->t_start, &t_diff);
 		total_elapsed = t_diff.tv_sec * 1000 + t_diff.tv_usec / 1000;
 		timersub(&t_cur, &sc->t_last, &t_diff);
@@ -495,64 +546,105 @@ process_slice(struct statctx *sc, size_t
 		mbps = (sc->bytes * 8) / (since_last * 1000.0);
 		slice_mbps += mbps;
 		
+		if (!qflag)
 		stats_display(total_elapsed, mbps, bwperc, sc,
 		    &inpcb, &tcpcb, &sockb);
 		
 		sc->t_last = t_cur;
 		sc->bytes = 0;
-
 	}

 	/* process stats for this slice */
 	if (slice_mbps > mainstats.peak_mbps)
 		mainstats.peak_mbps = slice_mbps;
-	mainstats_display(slice_mbps, slice_mbps / mainstats.nconns);
+	printf("Conn: %3d Mbps: %12.3Lf Peak Mbps: %12.3Lf Avg Mbps: %12.3Lf\n",
+	    mainstats.nconns, slice_mbps, mainstats.peak_mbps,
+	    slice_mbps / mainstats.nconns);
+	stats_cleanslice();
+	set_slice_timer(mainstats.nconns > 0);
 }

-static int
-handle_connection(struct statctx *sc, int fd, char *buf, size_t buflen)
+static void
+server_handle_sc(int fd, short event, void *v_sc)
 {
+	struct statctx *sc = v_sc;
 	ssize_t n;

 again:
-	n = read(fd, buf, buflen);
+	n = read(sc->fd, sc->buf, sc->buflen);
 	if (n == -1) {
 		if (errno == EINTR)
 			goto again;
 		else if (errno == EWOULDBLOCK)
-			return 0;
-		warn("fd %d read error", fd);
-		
-		return -1;
-	}
-	else if (n == 0) {
+			return;
+		warn("fd %d read error", sc->fd);
+		return;
+	} else if (n == 0) {
 		if (vflag)
-			fprintf(stderr, "%8d closed by remote end\n", fd);
-		close(fd);
-		return -1;
+			fprintf(stderr, "%8d closed by remote end\n", sc->fd);
+		close(sc->fd);
+		TAILQ_REMOVE(&sc_queue, sc, entry);
+		free(sc);
+		mainstats.nconns--;
+		set_slice_timer(mainstats.nconns > 0);
+		return;
 	}
 	if (vflag >= 3)
 		fprintf(stderr, "read: %zd bytes\n", n);
-	
 	stats_update(sc, n);
-	return 0;
+}
+
+static void
+server_accept(int fd, short event, void *bula)
+{
+	int sock, r;
+	struct statctx *sc;
+	struct sockaddr_storage ss;
+	socklen_t sslen;
+	char tmp[128];
+	
+	sslen = sizeof(ss);
+again:	
+	if ((sock = accept(fd, (struct sockaddr *)&ss,
+	    &sslen)) == -1) {
+		if (errno == EINTR)
+			goto again;
+		warn("accept");
+		return;
+	}
+	saddr_ntop((struct sockaddr *)&ss, sslen,
+	    tmp, sizeof(tmp));
+	if ((r = fcntl(sock, F_GETFL, 0)) == -1)
+		err(1, "fcntl(F_GETFL)");
+	r |= O_NONBLOCK;
+	if (fcntl(sock, F_SETFL, r) == -1)
+		err(1, "fcntl(F_SETFL, O_NONBLOCK)");
+	/* Alloc client structure and register reading callback */
+	if ((sc = calloc(1, sizeof(*sc))) == NULL)
+		err(1, "calloc");
+	sc->fd = sock;
+	stats_prepare(sc);
+	event_set(&sc->ev, sc->fd, EV_READ | EV_PERSIST,
+	    server_handle_sc, sc);
+	event_add(&sc->ev, NULL);
+	TAILQ_INSERT_TAIL(&sc_queue, sc, entry);
+	mainstats.nconns++;
+	set_slice_timer(mainstats.nconns > 0);
+	if (vflag)
+		warnx("Accepted connection from %s, fd = %d\n", tmp, sc->fd);
 }

 static nfds_t
-serverbind(struct pollfd *pfd, nfds_t max_nfds, struct addrinfo *aitop)
+server_init(struct addrinfo *aitop)
 {
 	char tmp[128];
 	int sock, on = 1;
 	struct addrinfo *ai;
+	struct event *ev;
 	nfds_t lnfds;

 	lnfds = 0;
 	for (ai = aitop; ai != NULL; ai = ai->ai_next) {
-		if (lnfds == max_nfds) {
-			fprintf(stderr,
-			    "maximum number of listening fds reached\n");
-			break;
-		}
 		saddr_ntop(ai->ai_addr, ai->ai_addrlen, tmp, sizeof(tmp));
 		if (vflag)
 			fprintf(stderr, "Try to listen on %s\n", tmp);
@@ -594,165 +686,47 @@ serverbind(struct pollfd *pfd, nfds_t ma
 			close(sock);
 			continue;
 		}
+		if ((ev = calloc(1, sizeof(*ev))) == NULL)
+			err(1, "calloc");
+		event_set(ev, sock, EV_READ | EV_PERSIST, server_accept, NULL);
+		event_add(ev, NULL);
 		if (vflag >= 3)
 			fprintf(stderr, "listening on fd %d\n", sock);
 		lnfds++;
-		pfd[lnfds - 1].fd = sock;
-		pfd[lnfds - 1].events = POLLIN;
-
 	}
 	freeaddrinfo(aitop);
 	if (lnfds == 0)
 		errx(1, "No working listen addresses found");

-	return lnfds;
+	return (lnfds);
 }	

 static void
-set_listening(struct pollfd *pfd, nfds_t lfds, int toggle) {
-	int i;
-
-	for (i = 0; i < (int)lfds; i++) {
-		if (toggle)
-			pfd[i].events = POLLIN;
-		else
-			pfd[i].events = 0;
-	}
-			
-}
-static void __dead
-serverloop(kvm_t *kvmh, u_long ktcbtab, struct addrinfo *aitop)
+client_handle_sc(int fd, short event, void *v_sc)
 {
-	socklen_t sslen;
-	struct pollfd *pfd;
-	char tmp[128], *buf;
-	struct statctx *psc;
-	struct sockaddr_storage ss;
-	nfds_t i, nfds, lfds;
-	size_t nalloc;
-	int r, sock, client_id;
-
-	sslen = sizeof(ss);
-	nalloc = 128;
-	if ((pfd = calloc(sizeof(*pfd), nalloc)) == NULL)
-		err(1, "calloc");
-	if ((psc = calloc(sizeof(*psc), nalloc)) == NULL)
-		err(1, "calloc");
-	if ((buf = malloc(Bflag)) == NULL)
-		err(1, "malloc");
-	lfds = nfds = serverbind(pfd, nalloc - 1, aitop);
-	if (vflag >= 3)
-		fprintf(stderr, "listening on %d fds\n", lfds);
-	if (setpgid(0, 0) == -1)
-		err(1, "setpgid");
-	
-	print_header();
-	
-	client_id = 0;
-	while (!done) {
-		if (proc_slice) {
-			process_slice(psc + lfds, nfds - lfds);
-			stats_cleanslice();
-			proc_slice = 0;
-		}
-		if (vflag >= 3)
-			fprintf(stderr, "mainstats.nconns = %u\n",
-			    mainstats.nconns);
-		if ((r = poll(pfd, nfds, INFTIM)) == -1) {
-			if (errno == EINTR)
-				continue;
-			warn("poll");
-			break;
-		}
-
-		if (vflag >= 3)
-			fprintf(stderr, "poll: %d\n", r);
-		for (i = 0 ; r > 0 && i < nfds; i++) {
-			if ((pfd[i].revents & POLLIN) == 0)
-				continue;
-			if (pfd[i].fd == -1)
-				errx(1, "pfd insane");
-			r--;
-			if (vflag >= 3)
-				fprintf(stderr, "fd %d active i = %d\n",
-				    pfd[i].fd, i);
-			/* new connection */
-			if (i < lfds) {
-				if ((sock = accept(pfd[i].fd,
-				    (struct sockaddr *)&ss,
-				    &sslen)) == -1) {
-					if (errno == EINTR)
-						continue;
-					else if (errno == EMFILE ||
-					    errno == ENFILE)
-						set_listening(pfd, lfds, 0);
-					warn("accept");
-					continue;
-				}
-				if ((r = fcntl(sock, F_GETFL, 0)) == -1)
-					err(1, "fcntl(F_GETFL)");
-				r |= O_NONBLOCK;
-				if (fcntl(sock, F_SETFL, r) == -1)
-					err(1, "fcntl(F_SETFL, O_NONBLOCK)");
-				saddr_ntop((struct sockaddr *)&ss, sslen,
-				    tmp, sizeof(tmp));
-				if (vflag)
-					fprintf(stderr,
-					    "Accepted connection %d from "
-					    "%s, fd = %d\n", client_id++, tmp,
-					     sock);
-				/* alloc more space if we're full */
-				if (nfds == nalloc) {
-					nalloc *= 2;
-					if ((pfd = realloc(pfd,
-					    sizeof(*pfd) * nalloc)) == NULL)
-						err(1, "realloc");
-					if ((psc = realloc(psc,
-					    sizeof(*psc) * nalloc)) == NULL)
-						err(1, "realloc");
+	struct statctx *sc = v_sc;
+	ssize_t n;
+again:
+	if ((n = write(sc->fd, sc->buf, sc->buflen)) == -1) {
+		if (errno == EINTR || errno == EAGAIN)
+			goto again;
+		err(1, "write");
 				}
-				pfd[nfds].fd = sock;
-				pfd[nfds].events = POLLIN;
-				stats_prepare(&psc[nfds], sock, kvmh, ktcbtab);
-				nfds++;
-				if (!mainstats.nconns++)
-					set_timer(1);
-				continue;
+	if (n == 0) {
+		warnx("Remote end closed connection");
+		exit(1);
 			}
-			/* event in fd */
 			if (vflag >= 3)
-				fprintf(stderr,
-				    "fd %d active", pfd[i].fd);
-			while (handle_connection(&psc[i], pfd[i].fd,
-			    buf, Bflag) == -1) {
-				pfd[i] = pfd[nfds - 1];
-				pfd[nfds - 1].fd = -1;
-				psc[i] = psc[nfds - 1];
-				mainstats.nconns--;
-				nfds--;
-				/* stop display if no clients */
-				if (!mainstats.nconns) {
-					proc_slice = 1;
-					set_timer(0);
-				}
-				/* if we were full */
-				set_listening(pfd, lfds, 1);
-
-				/* is there an event pending on the last fd? */
-				if (pfd[i].fd == -1 ||
-				    (pfd[i].revents & POLLIN) == 0)
-					break;
-			}
-		}
-	}
-	exit(1);
+		warnx("write: %zd bytes\n", n);
+	stats_update(sc, n);
 }

-void
-clientconnect(struct addrinfo *aitop, struct pollfd *pfd, int nconn)
+static void
+client_init(struct addrinfo *aitop, int nconn)
 {
-	char tmp[128];
+	struct statctx *sc;
 	struct addrinfo *ai;
+	char tmp[128];
 	int i, r, sock;

 	for (i = 0; i < nconn; i++) {
@@ -793,15 +767,22 @@ clientconnect(struct addrinfo *aitop, st
 		}
 		if (sock == -1)
 			errx(1, "No host found");
-
 		if ((r = fcntl(sock, F_GETFL, 0)) == -1)
 			err(1, "fcntl(F_GETFL)");
 		r |= O_NONBLOCK;
 		if (fcntl(sock, F_SETFL, r) == -1)
 			err(1, "fcntl(F_SETFL, O_NONBLOCK)");
-
-		pfd[i].fd = sock;
-		pfd[i].events = POLLOUT;
+		/* Alloc and prepare stats */
+		if ((sc = calloc(1, sizeof(*sc))) == NULL)
+			err(1, "calloc");
+		sc->fd = sock;
+		stats_prepare(sc);
+		event_set(&sc->ev, sc->fd, EV_WRITE | EV_PERSIST,
+		    client_handle_sc, sc);
+		event_add(&sc->ev, NULL);
+		TAILQ_INSERT_TAIL(&sc_queue, sc, entry);
+		mainstats.nconns++;
+		set_slice_timer(mainstats.nconns > 0);
 	}
 	freeaddrinfo(aitop);

@@ -809,104 +790,28 @@ clientconnect(struct addrinfo *aitop, st
 		fprintf(stderr, "%u connections established\n", nconn);
 }

-static void __dead
-clientloop(kvm_t *kvmh, u_long ktcbtab, struct addrinfo *aitop, int nconn)
-{
-	struct statctx *psc;
-	struct pollfd *pfd;
-	char *buf;
-	int i;
-	ssize_t n;
-
-	if ((pfd = calloc(nconn, sizeof(*pfd))) == NULL)
-		err(1, "clientloop pfd calloc");
-	if ((psc = calloc(nconn, sizeof(*psc))) == NULL)
-		err(1, "clientloop psc calloc");
-	
-	clientconnect(aitop, pfd, nconn);
-
-	for (i = 0; i < nconn; i++) {
-		stats_prepare(psc + i, pfd[i].fd, kvmh, ktcbtab);
-		mainstats.nconns++;
-	}
-
-	if ((buf = malloc(Bflag)) == NULL)
-		err(1, "malloc");
-	arc4random_buf(buf, Bflag);
-
-	print_header();
-	set_timer(1);
-
-	while (!done) {
-		if (proc_slice) {
-			process_slice(psc, nconn);
-			stats_cleanslice();
-			proc_slice = 0;
-		}
-		if (poll(pfd, nconn, INFTIM) == -1) {
-			if (errno == EINTR)
-				continue;
-			err(1, "poll");
-		}
-		for (i = 0; i < nconn; i++) {
-			if (pfd[i].revents & POLLOUT) {
-				if ((n = write(pfd[i].fd, buf, Bflag)) == -1) {
-					if (errno == EINTR || errno == EAGAIN)
-						continue;
-					err(1, "write");
-				}
-				if (n == 0) {
-					warnx("Remote end closed connection");
-					done = -1;
-					break;
-				}
-				if (vflag >= 3)
-					fprintf(stderr, "write: %zd bytes\n",
-					    n);
-				stats_update(psc + i, n);
-			}
-		}
-	}
-	
-	if (done > 0)
-		warnx("Terminated by signal %d", done);
-
-	free(buf);
-	exit(0);
-}
-
-static void
-drop_gid(void)
-{
-	gid_t gid;
-
-	gid = getgid();
-	if (setresgid(gid, gid, gid) == -1)
-		err(1, "setresgid");
-}
-
 int
 main(int argc, char **argv)
 {
 	extern int optind;
 	extern char *optarg;
-
 	char kerr[_POSIX2_LINE_MAX], *tmp;
 	struct addrinfo *aitop, hints;
 	const char *errstr;
-	kvm_t *kvmh = NULL;
 	struct rlimit rl;
-	int ch, herr;
+	int ch, herr, nconn;
 	struct nlist nl[] = { { "_tcbtable" }, { "" } };
 	const char *host = NULL, *port = DEFAULT_PORT;
-	int nconn = 1;
+	struct event ev_sigint, ev_sigterm, ev_sighup;

-	Bflag = DEFAULT_BUF;
+	dummybuf_len = DEFAULT_BUF;
 	Sflag = sflag = vflag = rtableid = 0;
-	kflag = NULL;
+	kvmh  = NULL;
+	kvars = NULL;
 	rflag = DEFAULT_STATS_INTERVAL;
+	nconn = 1;

-	while ((ch = getopt(argc, argv, "B:hlk:n:p:r:sS:vV:")) != -1) {
+	while ((ch = getopt(argc, argv, "B:hlk:n:p:qr:sS:vV:")) != -1) {
 		switch (ch) {
 		case 'l':
 			list_kvars();
@@ -914,7 +819,7 @@ main(int argc, char **argv)
 		case 'k':
 			if ((tmp = strdup(optarg)) == NULL)
 				errx(1, "strdup");
-			kflag = check_prepare_kvars(tmp);
+			kvars = check_prepare_kvars(tmp);
 			free(tmp);
 			break;
 		case 'r':
@@ -938,7 +843,7 @@ main(int argc, char **argv)
 				    errstr, optarg);
 			break;
 		case 'B':
-			Bflag = strtonum(optarg, 0, 1024*1024*1024,
+			dummybuf_len = strtonum(optarg, 0, 1024*1024*1024,
 			    &errstr);
 			if (errstr != NULL)
 				errx(1, "read/write buffer size is %s: %s",
@@ -960,6 +865,9 @@ main(int argc, char **argv)
 				errx(1, "number of connections is %s: %s",
 				    errstr, optarg);
 			break;
+		case 'q':
+			qflag = 1;
+			break;
 		case 'h':
 		default:
 			usage();
@@ -984,23 +892,17 @@ main(int argc, char **argv)
 		else
 			errx(1, "getaddrinfo: %s", gai_strerror(herr));
 	}
-
-	if (kflag) {
+	if (kvars) {
 		if ((kvmh = kvm_openfiles(NULL, NULL, NULL,
 		    O_RDONLY, kerr)) == NULL)
 			errx(1, "kvm_open: %s", kerr);
 		drop_gid();
 		if (kvm_nlist(kvmh, nl) < 0 || nl[0].n_type == 0)
 			errx(1, "kvm: no namelist");
+		ktcbtab = nl[0].n_value;
 	} else
 		drop_gid();

-	signal(SIGINT, exitsighand);
-	signal(SIGTERM, exitsighand);
-	signal(SIGHUP, exitsighand);
-	signal(SIGPIPE, SIG_IGN);
-	signal(SIGALRM, alarmhandler);
-
 	if (getrlimit(RLIMIT_NOFILE, &rl) == -1)
 		err(1, "getrlimit");
 	if (rl.rlim_cur < MAX_FD)
@@ -1010,10 +912,36 @@ main(int argc, char **argv)
 	if (getrlimit(RLIMIT_NOFILE, &rl) == -1)
 		err(1, "getrlimit");
 	
-	if (sflag)
-		serverloop(kvmh, nl[0].n_value, aitop);
-	else
-		clientloop(kvmh, nl[0].n_value, aitop, nconn);
+	/* Init world */
+	TAILQ_INIT(&sc_queue);
+	if ((dummybuf = malloc(dummybuf_len)) == NULL)
+		err(1, "malloc");
+	arc4random_buf(dummybuf, dummybuf_len);

-	return 0;
+	/* Setup libevent and signals */
+	event_init();
+	signal_set(&ev_sigterm, SIGTERM, signal_handler, NULL);
+	signal_set(&ev_sighup, SIGHUP, signal_handler, NULL);
+	signal_set(&ev_sigint, SIGINT, signal_handler, NULL);
+	signal_add(&ev_sigint, NULL);
+	signal_add(&ev_sigterm, NULL);
+	signal_add(&ev_sighup, NULL);
+	signal(SIGPIPE, SIG_IGN);
+	
+	print_header();
+	
+	/* Slice stats timer */
+	evtimer_set(&mainstats.timer, process_slice, NULL);
+	
+	if (sflag) {
+		(void)server_init(aitop);
+		if (setpgid(0, 0) == -1)
+			err(1, "setpgid");
+	} else
+		client_init(aitop, nconn);
+	
+	/* libevent main loop*/
+	event_dispatch();
+
+	return (0);
 }
Previous message: [thread] [date] [author]
Next message: [thread] [date] [author]

Messages in current thread:
Re: tcpbench@libevent, Christiano F. Haesbaert, (Thu Oct 21, 7:30 am)
Re: tcpbench@libevent, Christiano F. Haesbaert, (Thu Oct 21, 4:51 pm)