1. fork进程
  2. IO复用
  3. 线程

多进程

主进程监听,在循环中接受连接请求,当连接建立后,fork一个子进程,在子进程中进行处理。

  • 主进程:listen -> while(1) -> accept -> fork -> close connfd

  • 子进程:close listenfd -> handle -> close connfd

server:

[root@linuxkit-00155ddc0103 inet]# ./echoserverfork 6667
Parent: waitting…
name[localhost] port[38276]connected! Child No.1 will handle it!
Parent: waitting…
server recived 6 bytes
name[localhost] port[38280]connected! Child No.2 will handle it!
Parent: waitting…
server recived 7 bytes

client1:

[root@linuxkit-00155ddc0103 inet]# ./echoclient localhost 6667
first
first

client2:

[root@linuxkit-00155ddc0103 inet]# ./echoclient localhost 6667
second
second

code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#include "mynet.h"
int open_listenfd(char *port);
void echo(int connfd);
/*
 * SIGCHLD handler: reap every child that can be collected right now.
 * WNOHANG makes waitpid return instead of blocking, so the loop stops as
 * soon as waitpid no longer returns a positive pid.  `sig` is not used.
 */
void sigchld_handler(int sig)
{
	for(;;){
		if(waitpid(-1,0,WNOHANG) <= 0)
		  break;
	}
}

/*
 * Concurrent echo server that forks one child process per connection.
 *
 * usage: <prog> <port>
 *
 * The parent accepts connections in an endless loop and forks a child for
 * each one.  The child closes its copy of the listening descriptor, runs
 * echo() on the connection and exits.  The parent closes its copy of the
 * connected descriptor and returns to accept().  Terminated children are
 * reaped by sigchld_handler.
 */
int main(int argc, char **argv)
{
	int listenfd, connfd, cno=1;
	socklen_t clientlen;
	struct sockaddr_storage clientaddr;
	char client_hostname[MAXLINE], client_port[MAXLINE];
	pid_t pid;

	if(argc!=2){
	  fprintf(stderr,"usage: %s <port>\n",argv[0]);
	  exit(0);
	}
	signal(SIGCHLD, sigchld_handler);
	listenfd = open_listenfd(argv[1]);
	if(listenfd<0){
		/* a negative value can never be a usable descriptor */
		fprintf(stderr,"cannot listen on port %s\n",argv[1]);
		exit(1);
	}
	while(1){
		printf("Parent: waitting...\n");
		clientlen = sizeof(struct sockaddr_storage);
		connfd = accept(listenfd, (SA*)&clientaddr,&clientlen);
		if(connfd<0){
			/* nothing was accepted: never fork a child for an invalid fd */
			perror("accept");
			continue;
		}
		if(getnameinfo((SA*)&clientaddr,clientlen,client_hostname,MAXLINE,client_port,MAXLINE,0)!=0){
			/* lookup failed: the buffers hold no valid strings, use placeholders */
			snprintf(client_hostname,sizeof client_hostname,"unknown");
			snprintf(client_port,sizeof client_port,"unknown");
		}
		printf("name[%s] port[%s]connected! Child No.%d will handle it!\n",
					client_hostname,client_port,cno++);
		/* flush first so the child does not inherit and re-emit buffered output */
		fflush(stdout);
		pid = fork();
		if(pid==0) //child
		{
			close(listenfd);
			echo(connfd);
			close(connfd);
			exit(0);
		}
		if(pid<0)
		  perror("fork");	/* no child was created; the connection is dropped below */
		close(connfd);
	}
	exit(0);
}
/*
 * Echo service for one connection: read up to MAXLINE bytes from connfd
 * and write them straight back, until read reports end-of-file (0) or an
 * error (negative).  The caller remains responsible for closing connfd.
 */
void echo(int connfd)
{
	ssize_t n, off, w;
	char buf[MAXLINE];

	/* n must be signed: read returns -1 on error, which must end the loop */
	while((n=read(connfd,buf,MAXLINE))>0){
		printf("server recived %d bytes\n",(int)n);
		/* write may accept fewer bytes than requested; keep going until all are sent */
		for(off=0;off<n;off+=w){
			w = write(connfd,buf+off,(size_t)(n-off));
			if(w<=0)
			  return;	/* peer is gone or the write failed */
		}
	}
}
© 2019 GitHub, Inc.

IO复用

IO复用即使用select函数

#include <sys/select.h>
int select(int n, fd_set *fdset, NULL, NULL, NULL) //等待一组描述符准备好读
FD_ZERO(fd_set *fdset);
FD_CLR(int fd, fd_set *fdset);
FD_SET(int fd, fd_set *fdset);
FD_ISSET(int fd, fd_set *fdset);

stdio与connfd的复用

如果使用echoserver迭代版本的同时,要求能够响应服务器本身的stdin输入,则可以使用select。一个标准输入的fd一般是0,监听fd则是系统分配

  • 将stdin和listenfd加入fdset
  • 使用select等待是否有fd准备好
  • 通过FD_ISSET查看哪个准备好并进行相应的处理
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59

#include "mynet.h"
int open_listenfd(char *port);
void echo(int connfd);
void command(void);

/*
 * Iterative echo server that also reacts to the server's own stdin.
 *
 * usage: <prog> <port>
 *
 * stdin and the listening descriptor are both placed in read_set; select
 * blocks until at least one of them is readable.  A readable stdin is
 * handled by command(); a readable listenfd is accepted and served with
 * echo() until that client disconnects.  While echo() is running the
 * loop does not return to select, so stdin input waits until then.
 */
int main(int argc, char **argv)
{
	int listenfd, connfd;
	socklen_t clientlen;
	struct sockaddr_storage clientaddr;
	char client_hostname[MAXLINE], client_port[MAXLINE];
	fd_set read_set, ready_set;

	if(argc!=2){
	  fprintf(stderr,"usage: %s <port>\n",argv[0]);
	  exit(0);
	}
	listenfd = open_listenfd(argv[1]);
	if(listenfd<0 || listenfd>=FD_SETSIZE){
		/* FD_SET is only defined for descriptors in [0, FD_SETSIZE) */
		fprintf(stderr,"cannot listen on port %s\n",argv[1]);
		exit(1);
	}
	FD_ZERO(&read_set);	//clear read_set
	FD_SET(STDIN_FILENO, &read_set);	//add stdin to read_set
	FD_SET(listenfd,&read_set);	//add listenfd to read_set

	while(1){
		/* select overwrites its argument, so work on a copy every round */
		ready_set = read_set;
		if(select(listenfd+1,&ready_set,NULL,NULL,NULL)<0){
			/* ready_set is not meaningful after a failed select */
			perror("select");
			continue;
		}
		if(FD_ISSET(STDIN_FILENO,&ready_set))
		{
			printf("stdin is ready.\n");
			command();
		}
		if(FD_ISSET(listenfd,&ready_set)){
			printf("client connetted.\n");
			clientlen = sizeof(struct sockaddr_storage);
			connfd = accept(listenfd, (SA*)&clientaddr,&clientlen);
			if(connfd<0){
				/* no connection to serve; go back to select */
				perror("accept");
				continue;
			}
			/* the lookup result is not used any further in this version */
			getnameinfo((SA*)&clientaddr,clientlen,client_hostname,MAXLINE,client_port,MAXLINE,0);
			echo(connfd);
			close(connfd);
		}
	}
	exit(0);
}
/*
 * Handle one line typed on the server's stdin: read it with fgets and
 * print it back on stdout.  When fgets returns NULL the whole process
 * exits with status 0.
 */
void command(void){
	char line[MAXLINE];
	char *got = fgets(line,MAXLINE,stdin);

	if(got == NULL)
	  exit(0);	//EOF
	printf("%s",line);
}
/*
 * Echo service for one connection: read up to MAXLINE bytes from connfd
 * and write them straight back, until read reports end-of-file (0) or an
 * error (negative).  The caller remains responsible for closing connfd.
 */
void echo(int connfd)
{
	ssize_t n, off, w;
	char buf[MAXLINE];

	/* n must be signed: read returns -1 on error, which must end the loop */
	while((n=read(connfd,buf,MAXLINE))>0){
		printf("server recived %d bytes\n",(int)n);
		/* write may accept fewer bytes than requested; keep going until all are sent */
		for(off=0;off<n;off+=w){
			w = write(connfd,buf+off,(size_t)(n-off));
			if(w<=0)
			  return;	/* peer is gone or the write failed */
		}
	}
}

server behavior:

[root@linuxkit-00155ddc0103 inet]# ./select 6667
do something
stdin is ready.
do something
hello
stdin is ready.
hello
client connetted.
server recived 12 bytes
ccd //no response here because a client is still connected
stdin is ready. //client close now
ccd
f
stdin is ready.
f
stdin is ready. //ctrl+D here

client behavior:

[root@linuxkit-00155ddc0103 inet]# ./echoclient localhost 6667
i am client
i am client

基于I/O多路复用的并发事件驱动器

与之前类似,维护一个结构体pool,存储readset,readyset和clientfd

  • server监听,初始化pool
  • 进入循环,初始化readyset,select阻塞,等待fd准备好
  • 若listenfd,添加client
  • 对所有准备好的clientfd回射服务

server:

[root@linuxkit-00155ddc0103 inet]# ./echoserverselect 6667
localhost:38334 connected!
Server recived 5 (5 total) bytes on fd 4
Server recived 4 (9 total) bytes on fd 4
Server recived 7 (16 total) bytes on fd 4
localhost:38338 connected!
Server recived 4 (20 total) bytes on fd 5
Server recived 6 (26 total) bytes on fd 5
Server recived 2 (28 total) bytes on fd 5
Close connfd 4
Server recived 7 (35 total) bytes on fd 5
Close connfd 5

client1:

[root@linuxkit-00155ddc0103 inet]# ./echoclient localhost 6667
1234
1234
123
123
123456
123456
[root@linuxkit-00155ddc0103 inet]#

client2:

[root@linuxkit-00155ddc0103 inet]# ./echoclient localhost 6667
123
123
12345
12345
1
1
123456
123456
[root@linuxkit-00155ddc0103 inet]#

code

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
#include "mynet.h"
/*
 * State of the select-based server: the descriptors being watched plus
 * the table of connected clients.
 */
typedef struct{
	int maxfd;	/* highest descriptor in read_set; select is called with maxfd+1 */
	fd_set read_set;	/* every descriptor being watched (listenfd and the clients) */
	fd_set ready_set;	/* per-round copy of read_set that select overwrites */
	int nready;	/* value returned by select, decremented as events are handled */
	int maxi;	/* highest index in clientfd that has been used, -1 when none */
	int clientfd[FD_SETSIZE];	/* connected descriptors, -1 marks a free slot */
}pool;
void add_client(int connfd, pool *p);
void check_clients(pool *p);
void init_pool(int listenfd, pool *p);
int open_listenfd(char *port);

/* Running total of bytes read from all clients; incremented in check_clients. */
int byte_cnt = 0;

/*
 * Event-driven echo server built on select.
 *
 * usage: <prog> <port>
 *
 * Each round copies read_set into ready_set, blocks in select, accepts a
 * new client when listenfd is readable and then lets check_clients serve
 * every connected descriptor that has data.
 */
int main(int argc, char **argv)
{
	int listenfd, connfd;
	socklen_t clientlen;
	struct sockaddr_storage clientaddr;
	char client_hostname[MAXLINE], client_port[MAXLINE];
	static pool pool;

	if(argc!=2){
	  fprintf(stderr,"usage: %s <port>\n",argv[0]);
	  exit(0);
	}
	listenfd = open_listenfd(argv[1]);
	if(listenfd<0 || listenfd>=FD_SETSIZE){
		/* FD_SET is only defined for descriptors in [0, FD_SETSIZE) */
		fprintf(stderr,"cannot listen on port %s\n",argv[1]);
		exit(1);
	}
	init_pool(listenfd,&pool);

	while(1){
		/* select overwrites its argument, so work on a copy every round */
		pool.ready_set = pool.read_set;
		pool.nready = select(pool.maxfd+1,&pool.ready_set,NULL,NULL,NULL);
		if(pool.nready<0){
			/* ready_set is not meaningful after a failed select */
			perror("select");
			continue;
		}
		if(FD_ISSET(listenfd,&pool.ready_set))
		{
			clientlen = sizeof(struct sockaddr_storage);
			connfd = accept(listenfd, (SA*)&clientaddr,&clientlen);
			if(connfd<0){
				perror("accept");
				/* the listenfd event is used up even though no client was added */
				pool.nready--;
			}
			else{
				if(getnameinfo((SA*)&clientaddr,clientlen,client_hostname,MAXLINE,client_port,MAXLINE,0)!=0){
					/* lookup failed: the buffers hold no valid strings, use placeholders */
					snprintf(client_hostname,sizeof client_hostname,"unknown");
					snprintf(client_port,sizeof client_port,"unknown");
				}
				printf("%s:%s connected!\n",client_hostname,client_port);
				add_client(connfd, &pool);
			}
		}
		check_clients(&pool);
	}
	exit(0);
}

/*
 * Put the pool into its initial state: no clients connected and only
 * listenfd being watched.
 *
 *   listenfd  listening descriptor to watch
 *   p         pool to initialise
 */
void init_pool(int listenfd, pool *p){
	int slot = 0;

	/* at start the listening descriptor is the only one in read_set */
	FD_ZERO(&p->read_set);
	FD_SET(listenfd,&p->read_set);
	p->maxfd = listenfd;

	/* every client slot starts out free (-1) */
	p->maxi = -1;
	while(slot<FD_SETSIZE){
		p->clientfd[slot] = -1;
		slot++;
	}
}

/*
 * Register a freshly accepted connection in the pool.
 *
 *   connfd  connected descriptor returned by accept
 *   p       pool to update
 *
 * Consumes one pending event (nready--), stores connfd in the first free
 * slot of clientfd, adds it to read_set and raises maxfd / maxi when
 * needed.  If the descriptor cannot be watched with select or no slot is
 * free, an error is printed and the connection is closed.
 */
void add_client(int connfd, pool *p){
	int i;
	p->nready--;
	if(connfd<0 || connfd>=FD_SETSIZE){
		/* FD_SET is only defined for descriptors in [0, FD_SETSIZE) */
		printf("Add client error! fd %d cannot be used with select.\n",connfd);
		if(connfd>=0)
		  close(connfd);
		return;
	}
	for(i=0;i<FD_SETSIZE;i++){
		if(p->clientfd[i]<0){
			p->clientfd[i] = connfd;
			FD_SET(connfd,&p->read_set);
			/* maxfd must only ever grow, otherwise select stops seeing higher fds */
			if(connfd>p->maxfd)
			  p->maxfd = connfd;
			if(i>p->maxi)
			  p->maxi = i;
			return;
		}
	}
	/* loop finished without finding a free slot */
	printf("Add client error! Beyond max client numbers.\n");
	close(connfd);
}

/*
 * Serve every connected client that select reported as readable.
 *
 *   p  pool whose ready_set / nready were filled in by select
 *
 * For each ready descriptor one read is performed and the data is echoed
 * back; byte_cnt accumulates the number of bytes received.  On
 * end-of-file, a read error or a failed write the descriptor is closed,
 * removed from read_set and its slot is freed.
 */
void check_clients(pool *p)
{
	int i,connfd,n,off,w;
	char buf[MAXLINE];

	for(i=0;(i<=p->maxi)&&(p->nready>0);i++){
		connfd = p->clientfd[i];

		/* free slots hold -1; 0 is a valid descriptor */
		if((connfd>=0)&&(FD_ISSET(connfd,&p->ready_set))){
			p->nready--;
			off = 0;
			n = (int)read(connfd,buf,MAXLINE);
			if(n>0){
				byte_cnt+=n;
				printf("Server recived %d (%d total) bytes on fd %d\n",n,byte_cnt,connfd);
				/* write may accept fewer bytes than requested */
				while(off<n){
					w = (int)write(connfd,buf+off,(size_t)(n-off));
					if(w<=0)
					  break;
					off+=w;
				}
			}
			/* EOF (0), read error (<0) or incomplete echo: drop the client */
			if(n<=0 || off<n){
				printf("Close connfd %d\n",connfd);
				close(connfd);
				FD_CLR(connfd,&p->read_set);
				p->clientfd[i] = -1;
			}
		}
	}
}


多线程

server:

[root@linuxkit-00155ddc0103 inet]# ./echoserverthread 6667
name[localhost] port[38362]connected! Thread id:140562122241792 will serve it!
name[localhost] port[38366]connected! Thread id:140562113849088 will serve it!
server recived 6 bytes from fd 4
server recived 4 bytes from fd 4
server recived 7 bytes from fd 5
server recived 4 bytes from fd 5

client1:

[root@linuxkit-00155ddc0103 inet]# ./echoclient localhost 6667
12345
12345
123
123

client2:

[root@linuxkit-00155ddc0103 inet]# ./echoclient localhost 6667
654321
654321
321
321

注意传递connfd时是新申请空间,并在thread里释放
code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include "mynet.h"
int open_listenfd(char *port);
void *thread(void *vargp);
void echo(int connfd);

/*
 * Concurrent echo server that creates one thread per connection.
 *
 * usage: <prog> <port>
 *
 * Each accepted descriptor is copied into its own heap-allocated int and
 * handed to a new thread; thread() frees that int.  Giving every thread
 * a private copy means a later accept cannot overwrite the value before
 * the thread has read it.
 */
int main(int argc, char **argv)
{
	int listenfd, connfd, rc, *connfdp;
	socklen_t clientlen;
	pthread_t tid;
	struct sockaddr_storage clientaddr;
	char client_hostname[MAXLINE], client_port[MAXLINE];

	if(argc!=2){
	  fprintf(stderr,"usage: %s <port>\n",argv[0]);
	  exit(0);
	}
	listenfd = open_listenfd(argv[1]);
	if(listenfd<0){
		/* a negative value can never be a usable descriptor */
		fprintf(stderr,"cannot listen on port %s\n",argv[1]);
		exit(1);
	}
	while(1){
		clientlen = sizeof(struct sockaddr_storage);
		connfd = accept(listenfd, (SA*)&clientaddr,&clientlen);
		if(connfd<0){
			/* nothing was accepted: do not start a thread for an invalid fd */
			perror("accept");
			continue;
		}
		if(getnameinfo((SA*)&clientaddr,clientlen,client_hostname,MAXLINE,client_port,MAXLINE,0)!=0){
			/* lookup failed: the buffers hold no valid strings, use placeholders */
			snprintf(client_hostname,sizeof client_hostname,"unknown");
			snprintf(client_port,sizeof client_port,"unknown");
		}
		connfdp = malloc(sizeof *connfdp);
		if(connfdp==NULL){
			perror("malloc");
			close(connfd);
			continue;
		}
		*connfdp = connfd;
		rc = pthread_create(&tid,NULL,thread,connfdp);
		if(rc!=0){
			/* no thread took ownership, so release both resources here */
			fprintf(stderr,"pthread_create failed: error %d\n",rc);
			free(connfdp);
			close(connfd);
			continue;
		}
		/* cast matches the %ld conversion; assumes pthread_t is an integer type here */
		printf("name[%s] port[%s]connected! Thread id:%ld will serve it!\n",
					client_hostname,client_port,(long)tid);
	}
	exit(0);
}

/*
 * Thread routine serving one connection.
 *
 *   vargp  pointer to a heap-allocated int holding the connected
 *          descriptor; this routine frees it
 *
 * The thread detaches itself, runs echo() on the descriptor, closes it
 * and returns NULL.
 */
void *thread(void* vargp){
	int *slot = vargp;
	int connfd = *slot;	/* copy the value out before the storage is released */

	pthread_detach(pthread_self());
	free(slot);
	echo(connfd);
	close(connfd);
	return NULL;
}

/*
 * Echo service for one connection: read up to MAXLINE bytes from connfd
 * and write them straight back, until read reports end-of-file (0) or an
 * error (negative).  The caller remains responsible for closing connfd.
 */
void echo(int connfd)
{
	ssize_t n, off, w;
	char buf[MAXLINE];

	/* n must be signed: read returns -1 on error, which must end the loop */
	while((n=read(connfd,buf,MAXLINE))>0){
		printf("server recived %d bytes from fd %d\n",(int)n,connfd);
		/* write may accept fewer bytes than requested; keep going until all are sent */
		for(off=0;off<n;off+=w){
			w = write(connfd,buf+off,(size_t)(n-off));
			if(w<=0)
			  return;	/* peer is gone or the write failed */
		}
	}
}

基于线程的事件驱动程序

也称作预线程化的方法

  • 主线程将连接符connfd放入池中
  • 工作线程将connfd取出并处理
  • 向池中放入connfd和从池中取出connfd的操作都使用互斥量保护
  • echo程序中的cnt也使用互斥量保护

server

[root@linuxkit-00155ddc0103 inet]# ./echoserver_pre_thread 6667
client[localhost:38376] connected! connfd 4!
server recived 7 (7 total) bytes on fd 4
client[localhost:38380] connected! connfd 5!
server recived 9 (16 total) bytes on fd 5
server recived 4 (20 total) bytes on fd 4

client1

[root@linuxkit-00155ddc0103 inet]# ./echoclient localhost 6667
123456
123456
123
123

client2

[root@linuxkit-00155ddc0103 inet]# ./echoclient localhost 6667
87654321
87654321

sbuf.c fd池,插入,取出操作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#include "sbuf.h"

/*
 * Prepare an empty buffer with room for n descriptors.
 *
 *   sp  buffer to initialise
 *   n   number of int slots to allocate
 *
 * Both indices start at 0.  The semaphores start as: mutex = 1,
 * slots = n (all slots free), items = 0 (nothing stored yet).
 */
void sbuf_init(sbuf_t *sp, int n){
	sp->n = n;
	sp->buf = calloc(n, sizeof *sp->buf);
	sp->front = 0;
	sp->rear = 0;
	sem_init(&sp->mutex,0,1);
	sem_init(&sp->slots,0,n);
	sem_init(&sp->items,0,0);
}

/*
 * Release the slot array allocated by sbuf_init.
 * Spelled sbuf_deinit so that it matches the prototype in sbuf.h; the
 * previous spelling left the declared function without a definition.
 */
void sbuf_deinit(sbuf_t *sp){
	free(sp->buf);
}

/*
 * Append item to the buffer, blocking while no slot is free.
 *
 *   sp    shared buffer
 *   item  value to store
 *
 * Waits on `slots`, updates the array under `mutex`, then posts `items`
 * to announce the new element.
 */
void sbuf_insert(sbuf_t *sp, int item){
	sem_wait(&sp->slots);
	sem_wait(&sp->mutex);
	/* keep rear inside [0, n): an ever-growing counter would eventually
	 * overflow int and produce a negative index */
	sp->rear = (sp->rear+1)%(sp->n);
	sp->buf[sp->rear] = item;
	sem_post(&sp->mutex);
	sem_post(&sp->items);
}

/*
 * Remove and return the oldest item, blocking while the buffer is empty.
 *
 *   sp  shared buffer
 *
 * Waits on `items`, reads the array under `mutex`, then posts `slots`
 * to announce the freed slot.
 */
int sbuf_remove(sbuf_t *sp){
	int item;
	sem_wait(&sp->items);
	sem_wait(&sp->mutex);
	/* keep front inside [0, n): an ever-growing counter would eventually
	 * overflow int and produce a negative index */
	sp->front = (sp->front+1)%(sp->n);
	item = sp->buf[sp->front];
	sem_post(&sp->mutex);
	sem_post(&sp->slots);
	return item;
}

sbuf结构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
#ifndef SBUF_H_
#define SBUF_H_
#include "mynet.h"
#include <semaphore.h>

/* Bounded buffer of ints shared between inserting and removing threads. */
typedef struct {
	int *buf;	/* slot array, allocated in sbuf_init */
	int n;	/* number of slots in buf */
	int front;	/* advanced by sbuf_remove; the slot used is taken modulo n */
	int rear;	/* advanced by sbuf_insert; the slot used is taken modulo n */
	sem_t mutex;	/* initialised to 1; held while buf and the indices are accessed */
	sem_t slots;	/* initialised to n; counts free slots */
	sem_t items;	/* initialised to 0; counts stored items */
}sbuf_t;

/* Allocate n slots and initialise the semaphores (was misspelled dbuf_init). */
void sbuf_init(sbuf_t *sp, int n);
/* Free the slot array. */
void sbuf_deinit(sbuf_t *p);
/* Store item, blocking while the buffer is full. */
void sbuf_insert(sbuf_t *sp, int item);
/* Take the oldest item, blocking while the buffer is empty. */
int sbuf_remove(sbuf_t *sp);
#endif

echo_cnt.c

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include "echo_cnt.h"

/* Total number of bytes received by echo_cnt over all connections. */
static int byte_cnt;
/* Semaphore with initial value 1, held while byte_cnt is updated and printed. */
static sem_t mutex;

/* One-time initialiser; echo_cnt runs it through pthread_once. */
static void init_echo_cnt(void){
	byte_cnt = 0;
	sem_init(&mutex,0,1);
}

/*
 * Echo service that also keeps a byte counter shared by all threads.
 *
 *   connfd  connected descriptor to serve; the caller closes it
 *
 * The counter and its semaphore are initialised exactly once through
 * pthread_once.  Every chunk read from connfd is added to byte_cnt and
 * logged while `mutex` is held, then written back to the client.  The
 * loop ends on end-of-file (0) or on a read/write error.
 */
void echo_cnt(int connfd){
	int n, off, w;
	char buf[MAXLINE];
	static pthread_once_t once = PTHREAD_ONCE_INIT;

	pthread_once(&once, init_echo_cnt);
	/* stop on -1 as well: an error must not be counted or echoed as data */
	while((n=(int)read(connfd,buf,MAXLINE))>0){
		sem_wait(&mutex);
		byte_cnt += n;
		printf("server recived %d (%d total) bytes on fd %d\n",
					n,byte_cnt,connfd);
		sem_post(&mutex);
		/* write may accept fewer bytes than requested; keep going until all are sent */
		for(off=0;off<n;off+=w){
			w = (int)write(connfd,buf+off,(size_t)(n-off));
			if(w<=0)
			  return;	/* peer is gone or the write failed */
		}
	}
}

主程序

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#include "mynet.h"
#include "sbuf.h"
#include "echo_cnt.h"
/* Number of worker threads created by main. */
#define NTHREADS 4
/* Capacity passed to sbuf_init for the shared descriptor buffer. */
#define SBUFSIZE 16

int open_listenfd(char *port);
void *thread(void *vargp);

/* Shared buffer of connected descriptors: main inserts, worker threads remove. */
sbuf_t sbuf;

/*
 * Pre-threaded echo server.
 *
 * usage: <prog> <port>
 *
 * NTHREADS worker threads are created up front.  The main thread only
 * accepts connections and puts each connected descriptor into the shared
 * buffer; the workers take descriptors out and serve them.
 */
int main(int argc, char **argv)
{
	int listenfd, connfd, i, rc;
	socklen_t clientlen;
	pthread_t tid;
	struct sockaddr_storage clientaddr;
	char client_hostname[MAXLINE], client_port[MAXLINE];

	if(argc!=2){
	  fprintf(stderr,"usage: %s <port>\n",argv[0]);
	  exit(0);
	}
	listenfd = open_listenfd(argv[1]);
	if(listenfd<0){
		/* a negative value can never be a usable descriptor */
		fprintf(stderr,"cannot listen on port %s\n",argv[1]);
		exit(1);
	}
	sbuf_init(&sbuf,SBUFSIZE);
	for(i = 0;i<NTHREADS;i++){
		rc = pthread_create(&tid,NULL,thread,NULL);
		if(rc!=0)
		  fprintf(stderr,"pthread_create failed: error %d\n",rc);
	}
	while(1){
		clientlen = sizeof(struct sockaddr_storage);
		connfd = accept(listenfd, (SA*)&clientaddr,&clientlen);
		if(connfd<0){
			/* never queue an invalid descriptor for the workers */
			perror("accept");
			continue;
		}
		if(getnameinfo((SA*)&clientaddr,clientlen,client_hostname,MAXLINE,client_port,MAXLINE,0)!=0){
			/* lookup failed: the buffers hold no valid strings, use placeholders */
			snprintf(client_hostname,sizeof client_hostname,"unknown");
			snprintf(client_port,sizeof client_port,"unknown");
		}
		sbuf_insert(&sbuf,connfd);
		printf("client[%s:%s] connected! connfd %d!\n",
					client_hostname,client_port,(int)connfd);
	}
	exit(0);
}

/*
 * Worker thread: detach, then forever take one connected descriptor
 * from the shared buffer, serve it with echo_cnt and close it.
 * vargp is not used.
 */
void *thread(void* vargp){
	int connfd;

	pthread_detach(pthread_self());
	for(;;){
		connfd = sbuf_remove(&sbuf);
		echo_cnt(connfd);
		close(connfd);
	}
	return NULL;
}