博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
应用层协议实现系列(一)——HTTPserver之仿nginx多进程和多路IO的实现
阅读量:6091 次
发布时间:2019-06-20

本文共 8180 字,大约阅读时间需要 27 分钟。

近期在尝试自己写一个Httpserver,在粗略研究了nginx的代码之后,决定仿照nginx中的部分设计自己实现一个高并发的HTTPserver,在这里分享给大家。

眼下使用的较多的Httpserver就是apache和nginx,apache的主要特点就是稳定,而nginx的主要特点是承载的并发量高。在这里从实现原理上做一个分析:

apache採用的是多进程server模型,即server每监听到一个连接时,会创建一个新的进程去处理连接,进程与进程之间是独立的,因此就算进程在处理连接的过程中崩溃了,也不会影响其它进程的执行。但因为server能创建的进程数目与内存有关,因此server的最大并发数会受到机器内存的影响,同一时候假设有人发起大量恶意长链接攻击,就会导致server超载。

nginx採用的是多路IO复用server模型,即server每监听到一个连接时,会将连接增加到连接数组中,使用epoll多路IO复用函数对每一个连接进行监听,当连接中有数据到来时,epoll会返回对应的连接,依此对各个连接进行处理就可以。epoll的最大连接数量尽管也会受到内存的影响,但因为每一个未激活的连接占用的内存非常小,所以相比于apache能够承载更大的并发。

但因为多路IO复用是在一个进程中进行的,所以假设在处理连接的过程中崩溃了,其它连接也会受到影响。为了解决问题,nginx中也引入了多进程,即nginxserver由一个master进程和多个worker进程组成。master进程作为父进程在启动的时候会创建socket套接字以及若干个worker进程,每一个worker进程会使用epoll对master创建的套接字进行监听。当有新连接到来时,若干个worker进程的epoll函数都会返回,但仅仅有一个worker进程能够accept成功。该进程accept成功之后将该连接增加到epoll的监听数组中,该连接之后的数据都将由该worker进程处理。假设当中一个worker进程在处理连接的过程中崩溃了,父进程会收到信号并重新启动该进程以保证server的稳定性。

另外,每次新连接到来都会唤醒若干个worker进程同一时候进行accept,但仅仅有一个worker能accept成功,为了避免这个问题,nginx引入了相互排斥信号量,即每一个worker进程在accept之前都须要先获取锁,假设获取不到则放弃accept。

在明白了上述原理之后,我们就能够仿照nginx实现一个httpserver了。首先是创建套接字的函数:

//创建socketint startup(int port) {    struct sockaddr_in servAddr;    memset(&servAddr, 0, sizeof(servAddr));    //协议域(ip地址和端口)    servAddr.sin_family = AF_INET;    //绑定默认网卡    servAddr.sin_addr.s_addr = htonl(INADDR_ANY);    //端口    servAddr.sin_port = htons(port);    int listenFd;    //创建套接字    if ((listenFd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {        printf("create socket error: %s(errno: %d)\n",strerror(errno),errno);        return 0;    }    unsigned value = 1;    setsockopt(listenFd, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value));    //绑定套接字    if (bind(listenFd, (struct sockaddr *)&servAddr, sizeof(servAddr))) {        printf("bind socket error: %s(errno: %d)\n",strerror(errno),errno);        return 0;    }    //開始监听,设置最大连接请求    if (listen(listenFd, 10) == -1) {        printf("listen socket error: %s(errno: %d)\n",strerror(errno),errno);        return 0;    }    return listenFd;}
该函数创建了一个套接字并将其绑定到了一个port上開始监听。因为我们接下来要创建若干个worker进程,能够通过fork函数实现:

//管理子进程的数组,数组多大就有几个子进程static int processArr[PROCESS_NUM];//创建若干个子进程,返回当前进程是否父进程bool createSubProcess() {    for (int i=0; i
0){ processArr[i] = pid; continue; } //假设出错 else { fprintf(stderr,"can't fork ,error %d\n",errno); return true; } } return true;}
在以上代码中,创建的进程数目由数组大小决定,建议将该进程数目设置为CPU的核数,以充分利用多核CPU。为了避免在父进程退出后,子进程仍然存在产生僵尸进程,我们还须要实现一个信号处理函数:

//信号处理void handleTerminate(int signal) {    for (int i=0; i
该函数实现了当父进程收到退出信号时,向每一个子进程也发送退出信号。以下来看看main函数的实现,因为本人是在mac os下进行开发,mac下不支持epoll函数,于是改为类似的select函数:

int main(int argc, const char * argv[]){    int listenFd;        initMutex();    //设置port号    listenFd = startup(8080);        //创建若干个子进程    bool isParent = createSubProcess();    //假设是父进程    if (isParent) {        while (1) {            //注冊信号处理            signal(SIGTERM, handleTerminate);            //挂起等待信号            pause();        }    }    //假设是子进程    else {        //套接字集合        fd_set rset;        //最大套接字        int maxFd = listenFd;        std::set
fdArray; //循环处理事件 while (1) { FD_ZERO(&rset); FD_SET(listenFd, &rset); //又一次设置每一个须要监听的套接字 for (std::set
::iterator iterator=fdArray.begin();iterator!=fdArray.end();iterator++) { FD_SET(*iterator, &rset); } //開始监听 if (select(maxFd+1, &rset, NULL, NULL, NULL)<0) { fprintf(stderr, "select error: %s(errno: %d)\n",strerror(errno),errno); continue; } //遍历每一个连接套接字 for (std::set
::iterator iterator=fdArray.begin();iterator!=fdArray.end();) { int currentFd = *iterator; if (FD_ISSET(currentFd, &rset)) { if (!handleRequest(currentFd)) { close(currentFd); fdArray.erase(iterator++); continue; } } ++iterator; } //检查连接监听套接字 if (FD_ISSET(listenFd, &rset)) { if (pthread_mutex_trylock(mutex)==0) { int newFd = accept(listenFd, (struct sockaddr *)NULL, NULL); if (newFd<=0) { fprintf(stderr, "accept socket error: %s(errno: %d)\n",strerror(errno),errno); continue; } //更新最大的套接字 if (newFd>maxFd) { maxFd = newFd; } fdArray.insert(newFd); pthread_mutex_unlock(mutex); } } } } close(listenFd); return 0;}
在以上代码中,还涉及了进程间相互排斥信号量的定义,代码例如以下:

//相互排斥量pthread_mutex_t *mutex;//创建共享的mutexvoid initMutex(){    //设置相互排斥量为进程间共享    mutex=(pthread_mutex_t*)mmap(NULL, sizeof(pthread_mutex_t), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANON, -1, 0);    if( MAP_FAILED==mutex) {        perror("mutex mmap failed");        exit(1);    }    //设置attr的属性    pthread_mutexattr_t attr;    pthread_mutexattr_init(&attr);    int ret = pthread_mutexattr_setpshared(&attr,PTHREAD_PROCESS_SHARED);    if(ret != 0) {        fprintf(stderr, "mutex set shared failed");        exit(1);    }    pthread_mutex_init(mutex, &attr);}
对每一个连接的处理例如以下:

//处理http请求bool handleRequest(int connFd) {    if (connFd<=0) return false;    //读取缓存    char buff[4096];    //读取http header    int len = (int)recv(connFd, buff, sizeof(buff), 0);    if (len<=0) {        return false;    }    buff[len] = '\0';    std::cout<
<
这样就实现了一个仿nginx的高并发server。完整的代码例如以下:

#include 
#include
#include
#include
#include
#include
#include
#include
#include
#include
#define GET_ARRAY_LEN(array) (sizeof(array) / sizeof(array[0]))#define PROCESS_NUM 4//创建socketint startup(int port) { struct sockaddr_in servAddr; memset(&servAddr, 0, sizeof(servAddr)); //协议域(ip地址和端口) servAddr.sin_family = AF_INET; //绑定默认网卡 servAddr.sin_addr.s_addr = htonl(INADDR_ANY); //端口 servAddr.sin_port = htons(port); int listenFd; //创建套接字 if ((listenFd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { printf("create socket error: %s(errno: %d)\n",strerror(errno),errno); return 0; } unsigned value = 1; setsockopt(listenFd, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value)); //绑定套接字 if (bind(listenFd, (struct sockaddr *)&servAddr, sizeof(servAddr))) { printf("bind socket error: %s(errno: %d)\n",strerror(errno),errno); return 0; } //開始监听,设置最大连接请求 if (listen(listenFd, 10) == -1) { printf("listen socket error: %s(errno: %d)\n",strerror(errno),errno); return 0; } return listenFd;}//管理子进程的数组,数组多大就有几个子进程static int processArr[PROCESS_NUM];//创建若干个子进程,返回当前进程是否父进程bool createSubProcess() { for (int i=0; i
0){ processArr[i] = pid; continue; } //假设出错 else { fprintf(stderr,"can't fork ,error %d\n",errno); return true; } } return true;}//信号处理void handleTerminate(int signal) { for (int i=0; i
fdArray; //循环处理事件 while (1) { FD_ZERO(&rset); FD_SET(listenFd, &rset); //又一次设置每一个须要监听的套接字 for (std::set
::iterator iterator=fdArray.begin();iterator!=fdArray.end();iterator++) { FD_SET(*iterator, &rset); } //開始监听 if (select(maxFd+1, &rset, NULL, NULL, NULL)<0) { fprintf(stderr, "select error: %s(errno: %d)\n",strerror(errno),errno); continue; } //遍历每一个连接套接字 for (std::set
::iterator iterator=fdArray.begin();iterator!=fdArray.end();) { int currentFd = *iterator; if (FD_ISSET(currentFd, &rset)) { if (!handleRequest(currentFd)) { close(currentFd); fdArray.erase(iterator++); continue; } } ++iterator; } //检查连接监听套接字 if (FD_ISSET(listenFd, &rset)) { if (pthread_mutex_trylock(mutex)==0) { int newFd = accept(listenFd, (struct sockaddr *)NULL, NULL); if (newFd<=0) { fprintf(stderr, "accept socket error: %s(errno: %d)\n",strerror(errno),errno); continue; } //更新最大的套接字 if (newFd>maxFd) { maxFd = newFd; } fdArray.insert(newFd); pthread_mutex_unlock(mutex); } } } } close(listenFd); return 0;}

下一篇文章中,将向大家说明怎样对http协议进行解析。

假设大家认为对自己有帮助的话,还希望能帮顶一下,谢谢:)
个人博客:
本文地址:
转载请注明出处,谢谢!
你可能感兴趣的文章
ESXi磁盘块大小设立
查看>>
我的友情链接
查看>>
elasticsearch 使用事项
查看>>
sqlserver中text与Varchar(max)的区别
查看>>
Docker私有仓库Registry的搭建验证
查看>>
企业信息化-之成立信息化专项小组
查看>>
jQuery 知识点总结
查看>>
php 正则对于中文汉字字符的提取
查看>>
微服务架构—优雅停机方案
查看>>
推荐决策 对比user-based 和item-based推荐算法
查看>>
必不可少需要掌握的嵌入式知识(1) -- 网络编程
查看>>
android蓝牙打印
查看>>
ViewPager+Fragment取消预加载(延迟加载)
查看>>
Confluence 6 缓存性能优化
查看>>
八评腾讯:解密腾讯的中年危机
查看>>
我的友情链接
查看>>
iOS NSFileManeger 计算文件是否超时,和计算文件夹下文件的总大小
查看>>
Oracle DG 之--DG Broker 配置
查看>>
AIX删除多余的默认网关
查看>>
爬山为什么要穿登山鞋
查看>>