Re: Linux Kernel Splice Race Condition with page invalidation

!MAILaRCHIVE_VOTE_RePLACE
Previous message: [thread] [date] [author]
Next message: [thread] [date] [author]
To: Eugene Teo <eugeneteo@...>
Cc: <netdev@...>, linux-kernel <linux-kernel@...>, Alexandre LISSY <alexandre.lissy@...>
Date: Friday, August 29, 2008 - 5:58 am

I forgot to include the example programs in the forwarded message; thanks,
Eugene, for the reminder.

So here they are:

epoll+splice.c
============================================================
#define _GNU_SOURCE
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <signal.h>
#include <string.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#include <arpa/inet.h>
#include <netinet/in.h>

#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>

/* Maximum number of events returned by a single epoll_wait() call. */
#define MAX_EVENTS 	32
/*
 * Buffer sizes tried earlier, with the CPU usage the author noted for each
 * at 4096 clients (comments translated from French):
 */
/* #define BUF_SIZE	1400	// ==> ~ 25-30% CPU at 4096 clients */
/* #define BUF_SIZE	32768	// ==> ~ 50% CPU at 4096 clients */
/* #define BUF_SIZE	8192	// ==> ~ 35% CPU at 4096 clients */
/*
 * Size of the buffer accept_sock() allocates per connection.
 * NOTE(review): poll_loop() moves at most 1400 bytes per splice() call through
 * the pipe and never reads this buffer; it only checks that it is non-NULL.
 */
#define BUF_SIZE	131072
/* Referenced only by commented-out code. */
#define MAX_CONNEXIONS	16384
/* Backend server every accepted client is relayed to. */
#define SERVER_IP	"127.0.0.1"
#define SERVER_PORT	8003

/*
 * Connection state stored in struct proxy's `Statut` field.  The French names
 * translate as: RECU_REQUETE_CLIENT = "client request received",
 * ATT_REPONSE_SERVEUR = "waiting for the server's response".
 * NOTE(review): accept_sock() sets Statut to 0, which is none of these values,
 * and no visible code reads Statut -- presumably reserved for a state machine.
 */
typedef enum {
	INITIAL			= 1,
	RECU_REQUETE_CLIENT	= 2,
	ATT_REPONSE_SERVEUR	= 3
} proxy_status ;

/*
 * One direction ("half") of a proxied connection.  accept_sock() allocates
 * two of these per client (type 0 and type 1).  Both hold the same pair of fds
 * and the same buffer and pipe, and they point at each other via `peer`.
 * poll_init_tcp() also allocates one (zero-filled) for the listening socket.
 */
struct proxy
{
	unsigned char type;	/* 0 : client side, 1 : server side */
	int client_fd;	/* fd connected to client */
	int server_fd;  /* fd connected to server */
	ssize_t datalen;	/* initialised by accept_sock(); not read by poll_loop() */
	int curpos;	/* initialised to 0; not read by poll_loop() */
	proxy_status Statut;	/* connection state, see proxy_status (unused so far) */
	char * buf;	/* BUF_SIZE bytes shared by both halves, freed with the
			   type-0 half; poll_loop() only tests it for NULL */
	struct epoll_event * ev;	/* this half's epoll registration, owned by this half */
	struct proxy * peer;	/* the other half of the same connection */
	int * tube;	/* pipe(2) fds shared by both halves ([0] read end,
			   [1] write end) used to relay data with splice();
			   freed with the type-0 half */
};

/* Global server state; the single instance is `gpoll`. */
struct poll {
	void * socks_lock;	/* not used by the visible code */
/*	void * socks; */
	int socket_fd;	/* listening TCP socket */
	int epoll_fd;	/* epoll instance every socket is registered with */
	struct proxy * pr;	/* pseudo-proxy registered for the listening socket */
};

/* Disabled (commented-out) typedef, kept for reference. */
/* typedef struct proxy epoll_data_t; */

/*
 * The global server state, shared by poll_init_tcp(), accept_sock(),
 * close_socket(), poll_loop() and the signal handler.
 */
struct poll gpoll;

/* Disabled fixed-size connection table; connections are malloc()ed in accept_sock(). */
/* struct proxy Connexions[MAX_CONNEXIONS];
unsigned int curConnexionsPos = 0; */

/*
 * Put `fd` into non-blocking mode, keeping its other file status flags.
 *
 * Failures are reported on stderr and leave the descriptor unchanged.
 * F_GETFL returns -1 on error, and OR-ing O_NONBLOCK into that would set
 * every flag bit, so its result must be checked before F_SETFL.
 */
void setnonblocking(int fd)
{
	int flags = fcntl(fd, F_GETFL);

	if (flags == -1)
	{
		perror("fcntl(F_GETFL)");
		return;
	}
	if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
		perror("fcntl(F_SETFL)");
}

/*
 * Set up the listening side of the proxy:
 *   - create the epoll instance and a TCP socket;
 *   - bind it to INADDR_ANY, port 8080, and listen();
 *   - register it with epoll (level-triggered EPOLLIN), tagged with a
 *     zero-filled pseudo-proxy whose client_fd and server_fd are both the
 *     listening fd (stored in gpoll.pr);
 *   - switch the listening socket to non-blocking mode, so a connection that
 *     vanishes between the epoll notification and accept() yields EAGAIN
 *     instead of blocking the event loop.
 *
 * Any failure that leaves the proxy unable to serve terminates the process
 * with EXIT_FAILURE.  A failing SO_REUSEADDR is only reported.
 */
void 	poll_init_tcp()
{
	struct sockaddr_in	saddr;
	struct epoll_event	*event;
	struct proxy		*Proxy;
	int			i = 1;

	/* Init epoll (the size argument is only a hint). */
	gpoll.epoll_fd = epoll_create(32);
	if (gpoll.epoll_fd == -1)
	{
		fprintf(stderr, "epoll_create: %s\n", strerror(errno));
		exit(EXIT_FAILURE);
	}

	gpoll.socket_fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
	if (gpoll.socket_fd == -1)
	{
		fprintf(stderr, "back-ch: socket SOCK_STREAM: %s\n", strerror(errno));
		exit(EXIT_FAILURE);
	}

	event = malloc(sizeof(*event));
	Proxy = malloc(sizeof(*Proxy));
	if (event == NULL || Proxy == NULL)
	{
		perror("malloc()");
		exit(EXIT_FAILURE);
	}
	memset(event, 0, sizeof(*event));
	memset(Proxy, 0, sizeof(*Proxy));
	event->events    = EPOLLIN | EPOLLOUT;
	Proxy->client_fd = gpoll.socket_fd;
	Proxy->server_fd = gpoll.socket_fd;
	Proxy->curpos    = 0;
	Proxy->ev        = event;
	event->data.ptr  = Proxy;
	gpoll.pr         = Proxy;

	fprintf(stderr, "Stored fd : %d, %d in %p\n", Proxy->client_fd, Proxy->server_fd, (void *) Proxy);

	/* Zero the whole address so sin_zero is not left uninitialised. */
	memset(&saddr, 0, sizeof(saddr));
	saddr.sin_family      = AF_INET;
	saddr.sin_addr.s_addr = htonl(INADDR_ANY);
	saddr.sin_port        = htons(8080);

	/* Non-fatal: only affects quick restarts on the same port. */
	if (-1 == setsockopt(gpoll.socket_fd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof (i)))
		fprintf(stderr, "back-ch: setsockopt SO_REUSEADDR: %s\n", strerror(errno));

	if (-1 == bind(gpoll.socket_fd, (struct sockaddr *)&saddr, sizeof (saddr)))
	{
		fprintf(stderr, "back-ch: bind: %s\n", strerror(errno));
		exit(EXIT_FAILURE);
	}

	if (-1 == listen(gpoll.socket_fd, 10))
	{
		fprintf(stderr, "ctlchannel: listen: %s\n", strerror(errno));
		exit(EXIT_FAILURE);
	}

	if (epoll_ctl(gpoll.epoll_fd, EPOLL_CTL_ADD, gpoll.socket_fd, event) < 0)
	{
		fprintf(stderr, "cannot control epoll: %s\n", strerror(errno));
		exit(EXIT_FAILURE);
	}

	/* The listening socket (not the epoll fd) is what must not block. */
	setnonblocking(gpoll.socket_fd);
}


/*
 * Accept one pending client and pair it with a new connection to the backend
 * SERVER_IP:SERVER_PORT.
 *
 * Two struct proxy halves are allocated (type 0 = client side, type 1 =
 * server side) and linked through `peer`.  Both hold the same two fds, one
 * BUF_SIZE buffer and one pipe (the splice() relay).  Each half gets its own
 * epoll_event.  Both sockets are made non-blocking and registered with epoll
 * as EPOLLIN | EPOLLOUT | EPOLLET.
 *
 * If accept(), socket() or connect() fails, every descriptor opened so far is
 * closed and the function returns.  Allocation or pipe() failure is fatal.
 * NOTE: connect() is blocking, so the event loop stalls until it completes.
 */
void	accept_sock(void)
{
	int sd, dest;
	int *tube;
	struct sockaddr saddr;
	struct sockaddr_in dest_addr;
	struct proxy *Client, *Serveur;
	char *buffer;
	struct epoll_event *evClient, *evServeur;
	socklen_t saddrlen;

	/* Accept connection */
	saddrlen = sizeof(saddr);
	sd = accept(gpoll.socket_fd, &saddr, &saddrlen);
	if (sd == -1)
	{
		/* EAGAIN: the pending connection went away before we got to it. */
		if (errno != EAGAIN && errno != EWOULDBLOCK)
			perror("accept()");
		return;
	}

	/* Zero the whole address so sin_zero is not left uninitialised. */
	memset(&dest_addr, 0, sizeof(dest_addr));
	dest_addr.sin_family = AF_INET;
	dest_addr.sin_port   = htons(SERVER_PORT);
	inet_aton(SERVER_IP, &dest_addr.sin_addr);

	dest = socket(PF_INET, SOCK_STREAM, 0);
	if (dest == -1)
	{
		perror("socket()");
		close(sd);
		return;
	}

	if (connect(dest, (struct sockaddr *) &dest_addr, sizeof(dest_addr)) == -1)
	{
		perror("connect()");
		if (shutdown(sd, SHUT_RDWR) == -1)
			perror("shutdown()");
		/* shutdown() does not release descriptors: close both. */
		close(sd);
		close(dest);
		return;
	}

	tube      = malloc(2 * sizeof(*tube));
	Client    = malloc(sizeof(*Client));
	Serveur   = malloc(sizeof(*Serveur));
	evClient  = malloc(sizeof(*evClient));
	evServeur = malloc(sizeof(*evServeur));
	buffer    = malloc(BUF_SIZE);
	if (buffer == NULL || tube == NULL || Client == NULL || Serveur == NULL || evClient == NULL || evServeur == NULL)
	{
		perror("malloc()");
		exit(EXIT_FAILURE);
	}

	if (pipe(tube) < 0)
	{
		perror("pipe()");
		exit(EXIT_FAILURE);
	}

	Client->client_fd  = sd;
	Client->server_fd  = dest;
	Client->curpos     = 0;
	Client->datalen    = 16;
	Client->type       = 0;
	Client->buf        = buffer;
	Client->Statut     = 0;
	Client->ev         = evClient;
	Client->peer       = Serveur;
	Client->tube       = tube;

	Serveur->client_fd = sd;
	Serveur->server_fd = dest;
	Serveur->curpos    = 0;
	Serveur->datalen   = 16;
	Serveur->type      = 1;
	Serveur->buf       = buffer;
	Serveur->Statut    = 0;
	Serveur->ev        = evServeur;
	Serveur->peer      = Client;
	Serveur->tube      = tube;

	memset(evClient, 0, sizeof(*evClient));
	memset(evServeur, 0, sizeof(*evServeur));
	evClient->events    = EPOLLIN | EPOLLOUT | EPOLLET;
	evClient->data.ptr  = Client;
	evServeur->events   = EPOLLIN | EPOLLOUT | EPOLLET;
	evServeur->data.ptr = Serveur;

	/* Non-blocking so reads/splices report EAGAIN instead of stalling the loop. */
	setnonblocking(dest);
	setnonblocking(sd);

	if (epoll_ctl(gpoll.epoll_fd, EPOLL_CTL_ADD, sd, evClient))
		perror("problem with client socket");

	if (epoll_ctl(gpoll.epoll_fd, EPOLL_CTL_ADD, dest, evServeur))
		perror("problem with server socket");

#ifdef VERBOSE
	fprintf(stderr, "accept() on fd %d\n", sd);
	fprintf(stderr, "connect() on fd %d\n", dest);
#endif
}

/*
 * Free one half of a proxied connection: its epoll_event and struct proxy.
 * The type-0 (client) half also owns the buffer and the pipe shared by both
 * halves, so those are released (pipe fds closed) together with it.
 */
static void release_half(struct proxy *p)
{
	if (p->type == 0)
	{
#ifdef VERBOSE
		fprintf(stderr, "Freeing buffer @ %p\n", (void *) p->buf);
#endif
		free(p->buf);

#ifdef VERBOSE
		fprintf(stderr, "Freeing pipe @ %p\n", (void *) p->tube);
#endif
		close(p->tube[0]);
		close(p->tube[1]);
		free(p->tube);
	}
#ifdef VERBOSE
	fprintf(stderr, "Freeing struct epoll_event @ %p\n", (void *) p->ev);
#endif
	free(p->ev);
#ifdef VERBOSE
	fprintf(stderr, "Freeing struct proxy @ %p\n", (void *) p);
#endif
	free(p);
}

/*
 * Tear down the connection `p` belongs to:
 *   - remove both of its sockets from epoll;
 *   - free `p` and, when `peer` is 1, its other half as well;
 *   - shut down and close both sockets.
 *
 * Each step runs exactly once per call.  Deregistering or shutting down the
 * same fds twice only produced spurious errors.  shutdown() alone never
 * released the descriptors, so they are also close()d here.
 */
void close_socket(struct proxy * p, unsigned char peer)
{
	int cfd = p->client_fd;
	int sfd = p->server_fd;

	if (epoll_ctl(gpoll.epoll_fd, EPOLL_CTL_DEL, cfd, NULL))
		perror("epoll_ctl()");
	if (epoll_ctl(gpoll.epoll_fd, EPOLL_CTL_DEL, sfd, NULL))
		perror("epoll_ctl()");

	/* Free the peer first: `p->peer` must be read before `p` is freed. */
	if (peer == 1)
	{
#ifdef VERBOSE
		fprintf(stderr, "Freeing peer's struct proxy @ %p\n", (void *) p->peer);
#endif
		release_half(p->peer);
	}
	release_half(p);

#ifdef VERBOSE
	fprintf(stderr, "Shutting down fds (%d, %d)\n", sfd, cfd);
#endif
	if (shutdown(sfd, SHUT_RDWR) == -1)
		perror("shutdown()");
	if (shutdown(cfd, SHUT_RDWR) == -1)
		perror("shutdown()");

	close(sfd);
	close(cfd);
}

/*
 * Splice `pending` bytes that are already queued in p's pipe out to `out_fd`.
 *
 * The pipe is shared by both directions of the connection, so it must be
 * completely emptied before anything else is spliced into it.  On EAGAIN
 * (destination socket buffer full) this therefore busy-waits; a peer that
 * stops reading stalls the whole event loop.
 *
 * Returns 0 once all bytes were sent, -1 on a hard error (already reported).
 */
static int flush_tube(struct proxy *p, int out_fd, ssize_t pending)
{
	ssize_t written;

	while (pending > 0)
	{
		written = splice(p->tube[0], NULL, out_fd, NULL, (size_t) pending,
				 SPLICE_F_NONBLOCK | SPLICE_F_MORE | SPLICE_F_MOVE);
		if (written < 0)
		{
			/* splice() returns -1 and sets errno; it never returns -EAGAIN. */
			if (errno == EAGAIN || errno == EINTR)
			{
#ifdef DEBUG
				fprintf(stderr, "EAGAIN: IN=%d, OUT=%d\n", p->tube[0], out_fd);
#endif
				continue;
			}
			perror("splice()");
			return -1;
		}
		if (written == 0)
		{
			/* Should not happen while we hold the write end; avoid spinning forever. */
			fprintf(stderr, "splice(): pipe unexpectedly empty\n");
			return -1;
		}
		pending -= written;
#ifdef DEBUG
		fprintf(stderr, "Splice()'d %ld bytes from %d to %d. Still %ld bytes to send.\n",
			(long) written, p->tube[0], out_fd, (long) pending);
#endif
	}
	return 0;
}

/*
 * Relay everything currently readable on `fd` to `repfd` through p's pipe,
 * at most 1400 bytes per splice() round trip.
 *
 * The sockets are registered edge-triggered (EPOLLET), so `fd` must be
 * drained until EAGAIN.  Data left behind would not trigger a new
 * notification.  A peer that sends continuously can monopolise the loop.
 *
 * Returns 0 when `fd` is drained, -1 when the connection must be closed
 * (EOF or a hard error on either socket).
 */
static int relay_splice(struct proxy *p, int fd, int repfd)
{
	ssize_t read_incoming;

	for (;;)
	{
		read_incoming = splice(fd, NULL, p->tube[1], NULL, 1400,
				       SPLICE_F_NONBLOCK | SPLICE_F_MORE | SPLICE_F_MOVE);
		if (read_incoming < 0)
		{
			if (errno == EAGAIN)
			{
#ifdef DEBUG
				fprintf(stderr, "EAGAIN: IN=%d, OUT=%d\n", fd, p->tube[1]);
#endif
				return 0;
			}
			if (errno == EINTR)
				continue;
			perror("splice()");
#ifdef DEBUG
			fprintf(stderr, "Was: %ld = splice(%d, NULL, %d, NULL, %d, %u);\n",
				(long) read_incoming, fd, p->tube[1], 1400,
				(unsigned) (SPLICE_F_NONBLOCK | SPLICE_F_MORE | SPLICE_F_MOVE));
#endif
			return -1;
		}
		if (read_incoming == 0)
		{
			fprintf(stderr, "Something's wrong. Closing this proxy.\n");
			return -1;
		}
#ifdef DEBUG
		fprintf(stderr, "Splice()'d %ld bytes from %d to %d\n", (long) read_incoming, fd, p->tube[1]);
#endif
		if (flush_tube(p, repfd, read_incoming) < 0)
			return -1;
	}
}

/*
 * Close both halves of p's connection.  First clear every later entry
 * (index >= `from`) of the current epoll_wait() batch that points at either
 * half, so the loop never dereferences a freed struct proxy.
 */
static void drop_proxy(struct proxy *p, struct epoll_event *events, int from, int nfds)
{
	struct proxy *peer = p->peer;
	int m;

	for (m = from; m < nfds; m++)
		if (events[m].data.ptr == p || events[m].data.ptr == peer)
			events[m].data.ptr = NULL;
	close_socket(p, 1);
}

/*
 * Main event loop.  For each epoll event:
 *   - listening socket readable      -> accept_sock();
 *   - EPOLLHUP / EPOLLRDHUP / EPOLLERR -> close both halves of the connection;
 *   - connection socket readable     -> splice its data to the other socket.
 * Exits the process with EXIT_FAILURE if epoll_wait() fails (other than EINTR).
 */
void	poll_loop()
{
	struct epoll_event	events[MAX_EVENTS];
	struct proxy		*p;
	uint32_t		revents;
	int			nfds, n, fd, repfd;

	memset(events, 0, sizeof(events));

	for (;;)
	{
		nfds = epoll_wait(gpoll.epoll_fd, events, MAX_EVENTS, -1);
		if (nfds < 0)
		{
			if (errno == EINTR)
				continue;
			perror("epoll_wait()");
			exit(EXIT_FAILURE);
		}

		for (n = 0; n < nfds; ++n)
		{
			p = events[n].data.ptr;
			revents = events[n].events;

			/* Cleared by drop_proxy(): this proxy was freed earlier in the batch. */
			if (p == NULL)
				continue;

#ifdef DEBUG
			fprintf(stderr, "(EPOLLIN=%d, EPOLLOUT=%d, EPOLLRDHUP=%d, EPOLLPRI=%d, EPOLLERR=%d, EPOLLHUP=%d)\n",
				(int) (revents & EPOLLIN),
				(int) (revents & EPOLLOUT),
				(int) (revents & EPOLLRDHUP),
				(int) (revents & EPOLLPRI),
				(int) (revents & EPOLLERR),
				(int) (revents & EPOLLHUP));
			fprintf(stderr, "Retrieving user data from %p\n", (void *) p);
#endif

			/* The listening socket's pseudo-proxy has both fds set to it. */
			if (p->server_fd == gpoll.socket_fd && p->client_fd == gpoll.socket_fd)
			{
				if (revents & EPOLLIN)
					accept_sock();
				continue;
			}

			/* Type: 0 => client half (reads the client), 1 => server half. */
			if (p->type == 0)
			{
				fd    = p->client_fd;
				repfd = p->server_fd;
			}
			else if (p->type == 1)
			{
				fd    = p->server_fd;
				repfd = p->client_fd;
			}
			else
				continue;

			if (revents & (EPOLLHUP | EPOLLRDHUP | EPOLLERR))
			{
				drop_proxy(p, events, n + 1, nfds);
				continue;
			}

			if ((revents & EPOLLIN) && p->buf != NULL)
			{
#ifdef DEBUG
				fprintf(stderr, "fd %d is ready for reading !\n", fd);
#endif
				if (relay_splice(p, fd, repfd) < 0)
					drop_proxy(p, events, n + 1, nfds);
			}
		}
	}
}

/*
 * Signal handler that main() installs for SIGTERM and SIGINT.  It removes the
 * listening socket from epoll, shuts it down and closes it, then terminates
 * the process with EXIT_SUCCESS.  Any other signal is only reported.
 *
 * Only async-signal-safe calls are made (write, shutdown, close, _exit, plus
 * the epoll_ctl system call).  fprintf(), free() and exit() are not safe here:
 * they can deadlock or corrupt the heap if the signal interrupts stdio or
 * malloc().  Heap memory is deliberately not freed, because the process is
 * exiting and the kernel reclaims it.
 */
void handler(int signo)
{
	int saved_errno = errno;
	ssize_t rc;

	if (signo == SIGTERM || signo == SIGINT)
	{
		static const char msg[] = "Got SIGTERM or SIGINT, cleaning up things ...\n";

		rc = write(STDERR_FILENO, msg, sizeof msg - 1);
		(void) rc;
		epoll_ctl(gpoll.epoll_fd, EPOLL_CTL_DEL, gpoll.socket_fd, NULL);
		shutdown(gpoll.socket_fd, SHUT_RDWR);
		close(gpoll.socket_fd);
		_exit(EXIT_SUCCESS);
	}
	else
	{
		/* Hand-formatted "UNKNOWN SIGNAL !!! : <signo>\n" (no stdio in a handler). */
		static const char prefix[] = "UNKNOWN SIGNAL !!! : ";
		char digits[16];
		size_t pos = sizeof digits;
		unsigned int v = (signo < 0) ? 0u - (unsigned int) signo : (unsigned int) signo;

		digits[--pos] = '\n';
		do
		{
			digits[--pos] = (char) ('0' + v % 10u);
			v /= 10u;
		} while (v != 0u);
		if (signo < 0)
			digits[--pos] = '-';

		rc = write(STDERR_FILENO, prefix, sizeof prefix - 1);
		rc = write(STDERR_FILENO, digits + pos, sizeof digits - pos);
		(void) rc;
		errno = saved_errno;
	}
}

/*
 * Program entry point.  It installs `handler` for SIGTERM and then SIGINT,
 * sets up the epoll instance and listening socket, and runs the event loop.
 * Command-line arguments are ignored.
 */
int main(int argc, char ** argv)
{
	static const int cleanup_signals[] = { SIGTERM, SIGINT };
	size_t k;

	(void) argc;
	(void) argv;

	for (k = 0; k < sizeof cleanup_signals / sizeof cleanup_signals[0]; k++)
		signal(cleanup_signals[k], handler);

	poll_init_tcp();
	poll_loop();

	return EXIT_SUCCESS;
}
============================================================


epoll.c
============================================================
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <string.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#include <arpa/inet.h>
#include <netinet/in.h>

#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>

/* Maximum number of events returned by a single epoll_wait() call. */
#define MAX_EVENTS 	2
/*
 * Buffer sizes tried earlier, with the CPU usage the author noted for each
 * at 4096 clients (comments translated from French):
 */
/* #define BUF_SIZE	1400	// ==> ~ 25-30% CPU at 4096 clients */
/* #define BUF_SIZE	32768	// ==> ~ 50% CPU at 4096 clients */
/*
 * Size of the per-connection relay buffer; poll_loop() read()s up to BUF_SIZE
 * bytes at a time into it.
 * NOTE(review): the commented-out 1400 line above gives 25-30% for this same
 * size, so the 35% figure below may belong to a different size -- confirm.
 */
#define BUF_SIZE	1400	// ==> ~ 35% CPU at 4096 clients
/* Referenced only by commented-out code. */
#define MAX_CONNEXIONS	16384
/* Backend server every accepted client is relayed to. */
#define SERVER_IP	"127.0.0.1"
#define SERVER_PORT	8003

/*
 * Connection state stored in struct proxy's `Statut` field.  The French names
 * translate as: RECU_REQUETE_CLIENT = "client request received",
 * ATT_REPONSE_SERVEUR = "waiting for the server's response".
 * NOTE(review): accept_sock() sets Statut to 0, which is none of these values,
 * and no visible code reads Statut -- presumably reserved for a state machine.
 */
typedef enum {
	INITIAL			= 1,
	RECU_REQUETE_CLIENT	= 2,
	ATT_REPONSE_SERVEUR	= 3
} proxy_status ;

/*
 * One direction ("half") of a proxied connection.  accept_sock() allocates
 * two of these per client (type 0 and type 1).  Both hold the same pair of fds
 * and the same buffer, and they point at each other via `peer`.
 * poll_init_tcp() also allocates one (zero-filled) for the listening socket.
 */
struct proxy
{
	unsigned char type;	/* 0 : client side, 1 : server side */
	int client_fd;	/* fd connected to client */
	int server_fd;  /* fd connected to server */
	ssize_t datalen;	/* bytes of buf read but not yet written out by
				   poll_loop(); accept_sock() initialises it to 16 */
	int curpos;	/* initialised to 0; not read by the visible code */
	proxy_status Statut;	/* connection state, see proxy_status (unused so far) */
	char * buf;	/* BUF_SIZE bytes shared by both halves, used by poll_loop()
			   as the read()/write() relay buffer; freed with the
			   type-0 half */
	struct epoll_event * ev;	/* this half's epoll registration, owned by this half */
	struct proxy * peer;	/* the other half of the same connection */
};

/* Global server state; the single instance is `gpoll`. */
struct poll {
	void * socks_lock;	/* not used by the visible code */
/*	void * socks; */
	int socket_fd;	/* listening TCP socket */
	int epoll_fd;	/* epoll instance every socket is registered with */
	struct proxy * pr;	/* pseudo-proxy registered for the listening socket */
};

/* Disabled (commented-out) typedef, kept for reference. */
/* typedef struct proxy epoll_data_t; */

/*
 * The global server state, shared by poll_init_tcp(), accept_sock(),
 * close_socket(), poll_loop() and the signal handler.
 */
struct poll gpoll;

/* Disabled fixed-size connection table; connections are malloc()ed in accept_sock(). */
/* struct proxy Connexions[MAX_CONNEXIONS];
unsigned int curConnexionsPos = 0; */

/*
 * Put `fd` into non-blocking mode, keeping its other file status flags.
 *
 * Failures are reported on stderr and leave the descriptor unchanged.
 * F_GETFL returns -1 on error, and OR-ing O_NONBLOCK into that would set
 * every flag bit, so its result must be checked before F_SETFL.
 */
void setnonblocking(int fd)
{
	int flags = fcntl(fd, F_GETFL);

	if (flags == -1)
	{
		perror("fcntl(F_GETFL)");
		return;
	}
	if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
		perror("fcntl(F_SETFL)");
}

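/**
 * Look up the fd of the other endpoint (disabled code, kept for reference).
 * NOTE(review): as written, the loop returns on its first iteration whether
 * or not that entry matched; if it did not match, found_fd is uninitialised.
 *
 * type :
 * 	0 => client
 * 	1 => server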
int find_peer(int fd, struct proxy ** target, unsigned char * type)
{
	int i, found_fd;
	struct proxy *p;

	fprintf(stderr, "Looking for fd %d\n", fd);
	for(i = 0; i < curConnexionsPos; i++)
	{
		p = &Connexions[i];
		fprintf(stderr, "p->client_fd=%d\np->server_fd=%d\n\n", p->client_fd, p->server_fd);

		if(p->client_fd == fd)
		{
			found_fd = p->server_fd;
			*type	 = 0;
		}
		else if(p->server_fd == fd)
		{
			found_fd = p->client_fd;
			*type    = 1;
		}

		*target = p;
		fprintf(stderr, "Found at p=%p\n", p);

		return found_fd;
	}

	errno = EBADF;
	return -1;
}
*/

/*
 * Set up the listening side of the proxy:
 *   - create the epoll instance and a TCP socket;
 *   - bind it to INADDR_ANY, port 8080, and listen();
 *   - register it with epoll (level-triggered EPOLLIN), tagged with a
 *     zero-filled pseudo-proxy whose client_fd and server_fd are both the
 *     listening fd (stored in gpoll.pr);
 *   - switch the listening socket to non-blocking mode, so a connection that
 *     vanishes between the epoll notification and accept() yields EAGAIN
 *     instead of blocking the event loop.
 *
 * Any failure that leaves the proxy unable to serve terminates the process
 * with EXIT_FAILURE.  A failing SO_REUSEADDR is only reported.
 */
void 	poll_init_tcp()
{
	struct sockaddr_in	saddr;
	struct epoll_event	*event;
	struct proxy		*Proxy;
	int			i = 1;

	/* Init epoll (the size argument is only a hint). */
	gpoll.epoll_fd = epoll_create(2);
	if (gpoll.epoll_fd == -1)
	{
		fprintf(stderr, "epoll_create: %s\n", strerror(errno));
		exit(EXIT_FAILURE);
	}

	gpoll.socket_fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
	if (gpoll.socket_fd == -1)
	{
		fprintf(stderr, "back-ch: socket SOCK_STREAM: %s\n", strerror(errno));
		exit(EXIT_FAILURE);
	}

	event = malloc(sizeof(*event));
	Proxy = malloc(sizeof(*Proxy));
	if (event == NULL || Proxy == NULL)
	{
		perror("malloc()");
		exit(EXIT_FAILURE);
	}
	memset(event, 0, sizeof(*event));
	memset(Proxy, 0, sizeof(*Proxy));
	event->events    = EPOLLIN | EPOLLOUT;
	Proxy->client_fd = gpoll.socket_fd;
	Proxy->server_fd = gpoll.socket_fd;
	Proxy->curpos    = 0;
	Proxy->ev        = event;
	event->data.ptr  = Proxy;
	gpoll.pr         = Proxy;

	fprintf(stderr, "Stored fd : %d, %d in %p\n", Proxy->client_fd, Proxy->server_fd, (void *) Proxy);

	/* Zero the whole address so sin_zero is not left uninitialised. */
	memset(&saddr, 0, sizeof(saddr));
	saddr.sin_family      = AF_INET;
	saddr.sin_addr.s_addr = htonl(INADDR_ANY);
	saddr.sin_port        = htons(8080);

	/* Non-fatal: only affects quick restarts on the same port. */
	if (-1 == setsockopt(gpoll.socket_fd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof (i)))
		fprintf(stderr, "back-ch: setsockopt SO_REUSEADDR: %s\n", strerror(errno));

	if (-1 == bind(gpoll.socket_fd, (struct sockaddr *)&saddr, sizeof (saddr)))
	{
		fprintf(stderr, "back-ch: bind: %s\n", strerror(errno));
		exit(EXIT_FAILURE);
	}

	if (-1 == listen(gpoll.socket_fd, 10))
	{
		fprintf(stderr, "ctlchannel: listen: %s\n", strerror(errno));
		exit(EXIT_FAILURE);
	}

	if (epoll_ctl(gpoll.epoll_fd, EPOLL_CTL_ADD, gpoll.socket_fd, event) < 0)
	{
		fprintf(stderr, "cannot control epoll: %s\n", strerror(errno));
		exit(EXIT_FAILURE);
	}

	/* The listening socket (not the epoll fd) is what must not block. */
	setnonblocking(gpoll.socket_fd);
}


/*
 * Accept one pending client and pair it with a new connection to the backend
 * SERVER_IP:SERVER_PORT.
 *
 * Two struct proxy halves are allocated (type 0 = client side, type 1 =
 * server side) and linked through `peer`.  Both hold the same two fds and
 * one BUF_SIZE relay buffer.  Each half gets its own epoll_event.  Both
 * sockets are made non-blocking and registered with epoll as
 * EPOLLIN | EPOLLOUT | EPOLLET.
 *
 * If accept(), socket() or connect() fails, every descriptor opened so far is
 * closed and the function returns.  Allocation failure is fatal.
 * NOTE: connect() is blocking, so the event loop stalls until it completes.
 */
void	accept_sock(void)
{
	int sd, dest;
	struct sockaddr saddr;
	struct sockaddr_in dest_addr;
	struct proxy *Client, *Serveur;
	char *buffer;
	struct epoll_event *evClient, *evServeur;
	socklen_t saddrlen;

	/* Accept connection */
	saddrlen = sizeof(saddr);
	sd = accept(gpoll.socket_fd, &saddr, &saddrlen);
	if (sd == -1)
	{
		/* EAGAIN: the pending connection went away before we got to it. */
		if (errno != EAGAIN && errno != EWOULDBLOCK)
			perror("accept()");
		return;
	}

	/* Zero the whole address so sin_zero is not left uninitialised. */
	memset(&dest_addr, 0, sizeof(dest_addr));
	dest_addr.sin_family = AF_INET;
	dest_addr.sin_port   = htons(SERVER_PORT);
	inet_aton(SERVER_IP, &dest_addr.sin_addr);

	dest = socket(PF_INET, SOCK_STREAM, 0);
	if (dest == -1)
	{
		perror("socket()");
		close(sd);
		return;
	}

	if (connect(dest, (struct sockaddr *) &dest_addr, sizeof(dest_addr)) == -1)
	{
		perror("connect()");
		if (shutdown(sd, SHUT_RDWR) == -1)
			perror("shutdown()");
		/* shutdown() does not release descriptors: close both. */
		close(sd);
		close(dest);
		return;
	}

	Client    = malloc(sizeof(*Client));
	Serveur   = malloc(sizeof(*Serveur));
	evClient  = malloc(sizeof(*evClient));
	evServeur = malloc(sizeof(*evServeur));
	buffer    = malloc(BUF_SIZE);
	if (buffer == NULL || Client == NULL || Serveur == NULL || evClient == NULL || evServeur == NULL)
	{
		perror("malloc()");
		exit(EXIT_FAILURE);
	}

	Client->client_fd  = sd;
	Client->server_fd  = dest;
	Client->curpos     = 0;
	Client->datalen    = 16;
	Client->type       = 0;
	Client->buf        = buffer;
	Client->Statut     = 0;
	Client->ev         = evClient;
	Client->peer       = Serveur;

	Serveur->client_fd = sd;
	Serveur->server_fd = dest;
	Serveur->curpos    = 0;
	Serveur->datalen   = 16;
	Serveur->type      = 1;
	Serveur->buf       = buffer;
	Serveur->Statut    = 0;
	Serveur->ev        = evServeur;
	Serveur->peer      = Client;

	memset(evClient, 0, sizeof(*evClient));
	memset(evServeur, 0, sizeof(*evServeur));
	evClient->events    = EPOLLIN | EPOLLOUT | EPOLLET;
	evClient->data.ptr  = Client;
	evServeur->events   = EPOLLIN | EPOLLOUT | EPOLLET;
	evServeur->data.ptr = Serveur;

	/* Non-blocking so read()/write() report EAGAIN instead of stalling the loop. */
	setnonblocking(dest);
	setnonblocking(sd);

	if (epoll_ctl(gpoll.epoll_fd, EPOLL_CTL_ADD, sd, evClient))
		perror("problem with client socket");

	if (epoll_ctl(gpoll.epoll_fd, EPOLL_CTL_ADD, dest, evServeur))
		perror("problem with server socket");

#ifdef VERBOSE
	fprintf(stderr, "accept() on fd %d\n", sd);
	fprintf(stderr, "connect() on fd %d\n", dest);
#endif
}

/*
 * Free one half of a proxied connection: its epoll_event and struct proxy.
 * The type-0 (client) half also owns the buffer shared by both halves, so
 * that is freed together with it.
 */
static void release_half(struct proxy *p)
{
	if (p->type == 0)
	{
#ifdef VERBOSE
		fprintf(stderr, "Freeing buffer @ %p\n", (void *) p->buf);
#endif
		free(p->buf);
	}
#ifdef VERBOSE
	fprintf(stderr, "Freeing struct epoll_event @ %p\n", (void *) p->ev);
#endif
	free(p->ev);
#ifdef VERBOSE
	fprintf(stderr, "Freeing struct proxy @ %p\n", (void *) p);
#endif
	free(p);
}

/*
 * Tear down the connection `p` belongs to:
 *   - remove both of its sockets from epoll;
 *   - free `p` and, when `peer` is 1, its other half as well;
 *   - shut down and close both sockets.
 *
 * Each step runs exactly once per call.  Deregistering or shutting down the
 * same fds twice only produced spurious errors.  shutdown() alone never
 * released the descriptors, so they are also close()d here.
 */
void close_socket(struct proxy * p, unsigned char peer)
{
	int cfd = p->client_fd;
	int sfd = p->server_fd;

	if (epoll_ctl(gpoll.epoll_fd, EPOLL_CTL_DEL, cfd, NULL))
		perror("epoll_ctl()");
	if (epoll_ctl(gpoll.epoll_fd, EPOLL_CTL_DEL, sfd, NULL))
		perror("epoll_ctl()");

	/* Free the peer first: `p->peer` must be read before `p` is freed. */
	if (peer == 1)
	{
#ifdef VERBOSE
		fprintf(stderr, "Freeing peer's struct proxy @ %p\n", (void *) p->peer);
#endif
		release_half(p->peer);
	}
	release_half(p);

#ifdef VERBOSE
	fprintf(stderr, "Shutting down fds (%d, %d)\n", sfd, cfd);
#endif
	if (shutdown(sfd, SHUT_RDWR) == -1)
		perror("shutdown()");
	if (shutdown(cfd, SHUT_RDWR) == -1)
		perror("shutdown()");

	close(sfd);
	close(cfd);
}

/*
 * Write all `len` bytes of `buf` to the non-blocking socket `fd`.
 *
 * The relay buffer is shared by both directions and is reused by the next
 * read(), so a partial write cannot be left pending.  On EAGAIN this
 * busy-waits until the peer's socket accepts more data; a peer that stops
 * reading stalls the whole event loop.
 *
 * Returns 0 on success, -1 on a hard write error (reported with perror).
 */
static int write_all(int fd, const char *buf, size_t len)
{
	ssize_t written;

	while (len > 0)
	{
		written = write(fd, buf, len);
		if (written < 0)
		{
			if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
				continue;
			perror("write()");
			return -1;
		}
		buf += written;
		len -= (size_t) written;
	}
	return 0;
}

/*
 * Copy everything currently readable on `fd` to `repfd` through p->buf,
 * at most BUF_SIZE bytes per read().
 *
 * The sockets are registered edge-triggered (EPOLLET), so `fd` must be
 * drained until EAGAIN.  Data left behind would not trigger a new
 * notification.
 *
 * Returns 0 when `fd` is drained, -1 when the connection must be closed
 * (EOF, or a hard read/write error).  A read() error is no longer passed on
 * to write() as a byte count.
 */
static int relay_data(struct proxy *p, int fd, int repfd)
{
	ssize_t read_incoming;

	for (;;)
	{
		read_incoming = read(fd, p->buf, BUF_SIZE);
		if (read_incoming < 0)
		{
			if (errno == EAGAIN || errno == EWOULDBLOCK)
				return 0;
			if (errno == EINTR)
				continue;
			perror("read()");
			return -1;
		}
		p->datalen = read_incoming;

#ifdef DEBUG
		fprintf(stderr, "Read %ld bytes from %d.\n", (long) read_incoming, fd);
#endif

		if (read_incoming == 0)
		{
			/* Peer closed: tear the connection down instead of leaking it. */
			fprintf(stderr, "Something's wrong on fd %d : I got no data.\n", fd);
			return -1;
		}

		if (write_all(repfd, p->buf, (size_t) read_incoming) < 0)
			return -1;
		p->datalen = 0;
	}
}

/*
 * Close both halves of p's connection.  First clear every later entry
 * (index >= `from`) of the current epoll_wait() batch that points at either
 * half, so the loop never dereferences a freed struct proxy.
 */
static void drop_proxy(struct proxy *p, struct epoll_event *events, int from, int nfds)
{
	struct proxy *peer = p->peer;
	int m;

	for (m = from; m < nfds; m++)
		if (events[m].data.ptr == p || events[m].data.ptr == peer)
			events[m].data.ptr = NULL;
	close_socket(p, 1);
}

/*
 * Main event loop.  For each epoll event:
 *   - listening socket readable  -> accept_sock();
 *   - EPOLLHUP / EPOLLERR        -> close both halves of the connection;
 *   - connection socket readable -> copy its data to the other socket.
 * Exits the process with EXIT_FAILURE if epoll_wait() fails (other than EINTR).
 */
void	poll_loop()
{
	struct epoll_event	events[MAX_EVENTS];
	struct proxy		*p;
	uint32_t		revents;
	int			nfds, n, fd, repfd;

	memset(events, 0, sizeof(events));

	for (;;)
	{
		nfds = epoll_wait(gpoll.epoll_fd, events, MAX_EVENTS, -1);
		if (nfds < 0)
		{
			if (errno == EINTR)
				continue;
			perror("epoll_wait()");
			exit(EXIT_FAILURE);
		}

		for (n = 0; n < nfds; ++n)
		{
			p = events[n].data.ptr;
			revents = events[n].events;

			/* Cleared by drop_proxy(): this proxy was freed earlier in the batch. */
			if (p == NULL)
				continue;

#ifdef DEBUG
			fprintf(stderr, "(EPOLLIN=%d, EPOLLOUT=%d, EPOLLRDHUP=%d, EPOLLPRI=%d, EPOLLERR=%d, EPOLLHUP=%d)\n",
				(int) (revents & EPOLLIN),
				(int) (revents & EPOLLOUT),
				(int) (revents & EPOLLRDHUP),
				(int) (revents & EPOLLPRI),
				(int) (revents & EPOLLERR),
				(int) (revents & EPOLLHUP));
			fprintf(stderr, "Retrieving user data from %p\n", (void *) p);
#endif

			/* The listening socket's pseudo-proxy has both fds set to it. */
			if (p->server_fd == gpoll.socket_fd && p->client_fd == gpoll.socket_fd)
			{
				if (revents & EPOLLIN)
					accept_sock();
				continue;
			}

			/* Type: 0 => client half (reads the client), 1 => server half. */
			if (p->type == 0)
			{
				fd    = p->client_fd;
				repfd = p->server_fd;
			}
			else if (p->type == 1)
			{
				fd    = p->server_fd;
				repfd = p->client_fd;
			}
			else
				continue;

			if (revents & (EPOLLHUP | EPOLLERR))
			{
				drop_proxy(p, events, n + 1, nfds);
				continue;
			}

			if ((revents & EPOLLIN) && p->buf != NULL)
			{
#ifdef DEBUG
				fprintf(stderr, "fd %d is ready for reading into %p.\n", fd, (void *) p->buf);
#endif
				if (relay_data(p, fd, repfd) < 0)
					drop_proxy(p, events, n + 1, nfds);
			}
		}
	}
}

/*
 * Signal handler that main() installs for SIGTERM and SIGINT.  It removes the
 * listening socket from epoll, shuts it down and closes it, then terminates
 * the process with EXIT_SUCCESS.  Any other signal is only reported.
 *
 * Only async-signal-safe calls are made (write, shutdown, close, _exit, plus
 * the epoll_ctl system call).  fprintf(), free() and exit() are not safe here:
 * they can deadlock or corrupt the heap if the signal interrupts stdio or
 * malloc().  Heap memory is deliberately not freed, because the process is
 * exiting and the kernel reclaims it.
 */
void handler(int signo)
{
	int saved_errno = errno;
	ssize_t rc;

	if (signo == SIGTERM || signo == SIGINT)
	{
		static const char msg[] = "Got SIGTERM or SIGINT, cleaning up things ...\n";

		rc = write(STDERR_FILENO, msg, sizeof msg - 1);
		(void) rc;
		epoll_ctl(gpoll.epoll_fd, EPOLL_CTL_DEL, gpoll.socket_fd, NULL);
		shutdown(gpoll.socket_fd, SHUT_RDWR);
		close(gpoll.socket_fd);
		_exit(EXIT_SUCCESS);
	}
	else
	{
		/* Hand-formatted "UNKNOWN SIGNAL !!! : <signo>\n" (no stdio in a handler). */
		static const char prefix[] = "UNKNOWN SIGNAL !!! : ";
		char digits[16];
		size_t pos = sizeof digits;
		unsigned int v = (signo < 0) ? 0u - (unsigned int) signo : (unsigned int) signo;

		digits[--pos] = '\n';
		do
		{
			digits[--pos] = (char) ('0' + v % 10u);
			v /= 10u;
		} while (v != 0u);
		if (signo < 0)
			digits[--pos] = '-';

		rc = write(STDERR_FILENO, prefix, sizeof prefix - 1);
		rc = write(STDERR_FILENO, digits + pos, sizeof digits - pos);
		(void) rc;
		errno = saved_errno;
	}
}

/*
 * Program entry point.  It installs `handler` for SIGTERM and then SIGINT,
 * sets up the epoll instance and listening socket, and runs the event loop.
 * Command-line arguments are ignored.
 */
int main(int argc, char ** argv)
{
	static const int cleanup_signals[] = { SIGTERM, SIGINT };
	size_t k;

	(void) argc;
	(void) argv;

	for (k = 0; k < sizeof cleanup_signals / sizeof cleanup_signals[0]; k++)
		signal(cleanup_signals[k], handler);

	poll_init_tcp();
	poll_loop();

	return EXIT_SUCCESS;
}
============================================================



On Thu, 2008-08-28 at 18:15 +0200, Miklos Szeredi wrote:

--
Previous message: [thread] [date] [author]
Next message: [thread] [date] [author]

Messages in current thread:
Re: Linux Kernel Splice Race Condition with page invalidation, Miklos Szeredi, (Fri Aug 29, 5:58 am)