From 5bc6279ee4d2aade86e30dfb30731fe7ef95817d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=BA=90=E6=96=87=E9=9B=A8?= <41315874+fumiama@users.noreply.github.com> Date: Wed, 8 Nov 2023 18:00:51 +0900 Subject: [PATCH] =?UTF-8?q?=E4=BD=BF=E7=94=A8tcpool=E5=85=A8=E6=96=B0?= =?UTF-8?q?=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- file.h | 98 ++++++++ server.c | 692 +++++++++++++++---------------------------------------- tcpool.h | 430 ++++++++++++++++++++++++++++++++++ 3 files changed, 708 insertions(+), 512 deletions(-) create mode 100644 file.h create mode 100644 tcpool.h diff --git a/file.h b/file.h new file mode 100644 index 0000000..232e14e --- /dev/null +++ b/file.h @@ -0,0 +1,98 @@ +#ifndef _FILE_H_ +#define _FILE_H_ + +#include +#include +#include +#include +#include +#include + +static char* file_filepath[2]; + +static volatile uint16_t has_file_opened[2]; +static volatile uint16_t is_file_opening[2]; +static volatile uint32_t file_owner_index[2] = {(uint32_t)-1, (uint32_t)-1}; + +static FILE* file_fp[2]; +static pthread_rwlock_t mu[2]; + +static inline off_t get_file_size(int isdata) { + struct stat statbuf; + if(stat(file_filepath[!!isdata], &statbuf)==0) { + return statbuf.st_size; + } + return -1; +} + +static int init_file(char* file_path[2]) { + for (int i = 0; i < 2; i++) { + FILE* fp = fopen(file_path[i], "rb+"); + if(!fp) { + perror("Open file error"); + return 2; + } + int err = pthread_rwlock_init(&mu[i], NULL); + if(err) { + perror("Init lock error"); + return 1; + } + file_filepath[i] = file_path[i]; + fclose(fp); + } + return 0; +} + +static inline FILE* open_file(uint32_t index, int isdata, int isro) { + isdata = !!isdata; + is_file_opening[isdata] = 1; + if(pthread_rwlock_wrlock(&mu[isdata])) { + perror("Open file: Writelock busy"); + is_file_opening[isdata] = 0; + return NULL; + } + is_file_opening[isdata] = 0; + file_fp[isdata] = fopen(file_filepath[isdata], isro?"rb":"rb+"); + if(!file_fp[isdata]) { + perror("Open file: fopen"); + pthread_rwlock_unlock(&mu[isdata]); + return NULL; + } + has_file_opened[isdata] = 1; + file_owner_index[isdata] = index; + puts("Open file"); + return file_fp[isdata]; +} + +static inline int require_shared_lock(int isdata) { + if(pthread_rwlock_rdlock(&mu[!!isdata])) { + perror("Open file: Readlock busy"); + return 1; + } + puts("Shared lock required"); + return 0; +} + +static inline void release_shared_lock(int isdata) { + pthread_rwlock_unlock(&mu[!!isdata]); + puts("Release shared lock"); +} + +static inline void close_file(uint32_t index, int isdata) { + isdata = !!isdata; + if(index != file_owner_index[isdata]) return; + if(has_file_opened[isdata]) { + fclose(file_fp[isdata]); + file_fp[isdata] = NULL; + has_file_opened[isdata] = 0; + file_owner_index[isdata] = (uint32_t)-1; + pthread_rwlock_unlock(&mu[isdata]); + puts("Close file"); + } else puts("file already closed"); +} + +static void close_file_wrap(uint32_t index_isdata[2]) { + close_file(index_isdata[0], (int)index_isdata[1]); +} + +#endif diff --git a/server.c b/server.c index b7de14c..3abe0cf 100644 --- a/server.c +++ b/server.c @@ -1,6 +1,5 @@ #include #include -#include #include #include #include @@ -18,298 +17,59 @@ #include #include #include -#include "config.h" #if !__APPLE__ #include #include - #ifndef FD_COPY - #define FD_COPY(f, t) bcopy(f, t, sizeof(*(f))) - #endif #else #include #endif - -static uint8_t _cfg[sizeof(simple_pb_t)+sizeof(config_t)]; -#define cfg ((const const_config_t*)(_cfg+sizeof(simple_pb_t))) // 存储 pwd 和 sps - -static int fd; // server socket fd - -#ifdef LISTEN_ON_IPV6 - static socklen_t struct_len = sizeof(struct sockaddr_in6); - static struct sockaddr_in6 server_addr; - static struct sockaddr_in6 client_addr; -#else - static socklen_t struct_len = sizeof(struct sockaddr_in); - static struct sockaddr_in server_addr; - static struct sockaddr_in client_addr; -#endif +#include "config.h" +#include "file.h" static char *data_path; // cat 命令读取的文件位置 static char *kanban_path; // get 命令读取的文件位置 -/* lock operations for open_file */ -#define LOCK_ALLF_UN 0x00 /* unlock all file */ -#define LOCK_DATA_SH 0x01 /* shared data file lock */ -#define LOCK_DATA_EX 0x02 /* exclusive data file lock */ -#define LOCK_KANB_SH 0x04 /* shared kanban file lock */ -#define LOCK_KANB_EX 0x08 /* exclusive kanban file lock */ +static uint8_t _cfg[sizeof(simple_pb_t)+sizeof(config_t)]; +#define cfg ((const const_config_t*)(_cfg+sizeof(simple_pb_t))) // 存储 pwd 和 sps -static int file_mode = LOCK_ALLF_UN; -static int kanb_file_ro_cnt, data_file_ro_cnt; +#define TCPOOL_THREAD_TIMER_T_SZ 65536 +#define TCPOOL_MAXWAITSEC 16 +#define SERVER_THREAD_BUFSZ ( \ + TCPOOL_THREAD_TIMER_T_SZ \ + -TCPOOL_THREAD_TIMER_T_HEAD_SZ \ + -sizeof(ssize_t)-2*sizeof(uint8_t) \ +) +#define TCPOOL_THREAD_CONTEXT \ + ssize_t numbytes; /* 本次接收的数据长度 */ \ + int8_t status; /* 本会话所处的状态 */ \ + uint8_t isdata; /* 是否为 datfile */ \ + char data[SERVER_THREAD_BUFSZ] +#define TCPOOL_TOUCH_TIMER_CONDITION (*(volatile uint32_t*)(is_file_opening)) +#define TCPOOL_INIT_ACTION init_file((char*[]){kanban_path, data_path}) +#define TCPOOL_PREHANDLE_ACCEPT_ACTION(timer) \ + timer->status = -1; \ + timer->isdata = 0; +#define TCPOOL_CLEANUP_THREAD_ACTION(timer) \ + close_file(timer->index, timer->isdata); \ + timer->status = -1; -// THREADCNT 并行的监听队列/select队列长度 -#define THREADCNT 16 -// MAXWAITSEC 最大等待时间 -#define MAXWAITSEC 16 -#define TIMERDATSZ (BUFSIZ-sizeof(int)-sizeof(time_t)-sizeof(int)-sizeof(ssize_t)-sizeof(int8_t)-sizeof(uint8_t)*3-sizeof(FILE*)) - -static struct timeval timeout; - -// 会话的上下文 -// 包含了本次 accept 的全部信息 -struct threadtimer_t { - int index; // 自身位置 - int accept_fd; // 本次 accept 的 fd - time_t touch; // 最后访问时间,与当前时间差超过 MAXWAITSEC 将强行中断连接 - ssize_t numbytes; // 本次接收的数据长度 - FILE *fp; // 本会话打开的文件 - int8_t status; // 本会话所处的状态 - uint8_t is_open; // 标识 fp 是否正在使用 - uint8_t lock_type; // 打开文件类型 - uint8_t again_cnt; // EAGAIN 次数 - char data[TIMERDATSZ]; -}; -typedef struct threadtimer_t threadtimer_t; - -static threadtimer_t timers[THREADCNT]; - -static fd_set rdfds, wrfds, erfds, tmpfds; - -#define show_usage(program) printf("Usage: %s (-d) port kanban.txt data.bin config.sp\n\t-d: As daemon\n", program) +#include "tcpool.h" -/* - * accept_client 接受新连接, - * 调用 select 处理 - * 处理入口点为 handle_accept - * 当 client 超过 MAXWAITSEC 未响应时 - * 调用 clean_timer 回收 - * 未被释放的资源以防止内存泄漏等 -*/ -static void accept_client(); -static int bind_server(int* port); /* * check_buffer 检查接收到的数据,结合 * 当前会话所处状态决定接下来的处理流程 */ -static int check_buffer(threadtimer_t *timer); -static void clean_timer(threadtimer_t* timer); -static void close_file(threadtimer_t *timer); -static int close_file_and_send(threadtimer_t *timer, char *data, size_t numbytes); -// handle_accept 初步解析指令,处理部分粘连 -static int handle_accept(threadtimer_t* p); -static void handle_end(int signo); -static void handle_int(int signo); -static void handle_quit(int signo); -static void handle_segv(int signo); -static int listen_socket(); -static FILE *open_file(char* file_path, int lock_type, char* mode); -static int send_all(char* file_path, threadtimer_t *timer); +static int check_buffer(tcpool_thread_timer_t *timer); +static int send_all(tcpool_thread_timer_t *timer); static int send_data(int accept_fd, char *data, size_t length); -static off_t size_of_file(const char* fname); -static int sm1_pwd(threadtimer_t *timer); -static int s0_init(threadtimer_t *timer); -static int s1_get(threadtimer_t *timer); -static int s2_set(threadtimer_t *timer); -static int s3_set_data(threadtimer_t *timer); - - -static void accept_client() { - int i; char c; - signal(SIGINT, handle_int); - signal(SIGQUIT, handle_quit); - signal(SIGKILL, handle_end); - signal(SIGSEGV, handle_segv); - signal(SIGPIPE, SIG_IGN); - signal(SIGTERM, handle_end); - FD_SET(fd, &tmpfds); - while(1) { - FD_COPY(&tmpfds, &rdfds); - FD_COPY(&tmpfds, &erfds); - puts("Selecting..."); - timeout.tv_sec = MAXWAITSEC/4; - int r = select(THREADCNT+8, &rdfds, &wrfds, &erfds, &timeout); - if(r < 0) { - perror("select"); - return; - } - if(r == 0) { // 超时 - for(i = 0; i < THREADCNT; i++) { - if(timers[i].touch && timers[i].accept_fd) { - time_t waitsec = time(NULL) - timers[i].touch; - if(waitsec > MAXWAITSEC) { - printf("Close@%d, wait sec: %u, max: %u\n", i, (unsigned int)waitsec, MAXWAITSEC); - clean_timer(&timers[i]); - } - } - } - continue; - } - puts("\nSelected"); - // 正常 - if(FD_ISSET(fd, &rdfds)) { // 有新连接 - int p = 0; - while(p < THREADCNT && timers[p].touch) p++; - if(p >= THREADCNT) { - puts("Max thread cnt exceeded"); - int nfd = accept(fd, (struct sockaddr *)&client_addr, &struct_len); - if(nfd > 0) close(nfd); - goto HANDLE_CLIENTS; - } - threadtimer_t* timer = &timers[p]; - timer->accept_fd = accept(fd, (struct sockaddr *)&client_addr, &struct_len); - if(timer->accept_fd <= 0) { - perror("accept"); - goto HANDLE_CLIENTS; - } - #ifdef LISTEN_ON_IPV6 - uint16_t port = ntohs(client_addr.sin6_port); - struct in6_addr in = client_addr.sin6_addr; - char str[INET6_ADDRSTRLEN]; // 46 - inet_ntop(AF_INET6, &in, str, sizeof(str)); - #else - uint16_t port = ntohs(client_addr.sin_port); - struct in_addr in = client_addr.sin_addr; - char str[INET_ADDRSTRLEN]; // 16 - inet_ntop(AF_INET, &in, str, sizeof(str)); - #endif - time_t t = time(NULL); - printf("> %sAccept client(%d) %s:%u at slot No.%d, ", ctime(&t), timer->accept_fd, str, port, p); - timer->index = p; - timer->touch = time(NULL); - timer->is_open = 0; - timer->fp = NULL; - timer->status = -1; - timer->again_cnt = 0; - if(send_data(timer->accept_fd, "Welcome to simple kanban server.", 33) <= 0) { - puts("Send banner to new client failed"); - clean_timer(timer); - goto HANDLE_CLIENTS; - } - FD_SET(timer->accept_fd, &tmpfds); - puts("Add new client into select list"); - } else if(FD_ISSET(fd, &erfds)) { // 主套接字错误 - int nfd = accept(fd, (struct sockaddr *)&client_addr, &struct_len); - perror("main fd in erfds"); - if(nfd > 0) close(nfd); - return; - } - HANDLE_CLIENTS: - for(i = 0; i < THREADCNT; i++) { - if(timers[i].touch && timers[i].accept_fd) { - if(FD_ISSET(timers[i].accept_fd, &rdfds)) { - if(recv(timers[i].accept_fd, &c, 1, MSG_PEEK) <= 0 || !handle_accept(&timers[i])) clean_timer(&timers[i]); - else FD_SET(timers[i].accept_fd, &tmpfds); - } else if(FD_ISSET(timers[i].accept_fd, &erfds)) { - printf("Close@%d due to error\n", i); - clean_timer(&timers[i]); - } - } - } - } -} - -static int bind_server(int* port) { - #ifdef LISTEN_ON_IPV6 - server_addr.sin6_family = AF_INET6; - server_addr.sin6_port = htons((uint16_t)(*port)); - bzero(&(server_addr.sin6_addr), sizeof(server_addr.sin6_addr)); - int fd = socket(PF_INET6, SOCK_STREAM, 0); - #else - server_addr.sin_family = AF_INET; - server_addr.sin_port = htons((uint16_t)(*port)); - server_addr.sin_addr.s_addr = INADDR_ANY; - bzero(&(server_addr.sin_zero), 8); - int fd = socket(AF_INET, SOCK_STREAM, 0); - #endif - int on = 1; - if(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))) { - perror("Set socket option failure"); - return 0; - } - if(!~bind(fd, (struct sockaddr *)&server_addr, struct_len)) { - perror("Bind server failure"); - return 0; - } - #ifdef LISTEN_ON_IPV6 - *port = ntohs(server_addr.sin6_port); - struct in6_addr in = server_addr.sin6_addr; - char str[INET6_ADDRSTRLEN]; // 46 - inet_ntop(AF_INET6, &in, str, sizeof(str)); - #else - *port = ntohs(server_addr.sin_port); - struct in_addr in = server_addr.sin_addr; - char str[INET_ADDRSTRLEN]; // 16 - inet_ntop(AF_INET, &in, str, sizeof(str)); - #endif - printf("Bind server successfully on %s:%u\n", str, *port); - return fd; -} - -static int check_buffer(threadtimer_t *timer) { - printf("Status: %d\n", (int)timer->status); - switch(timer->status) { - case -1: return sm1_pwd(timer); break; - case 0: return s0_init(timer); break; - case 1: return s1_get(timer); break; - case 2: return s2_set(timer); break; - case 3: return s3_set_data(timer); break; - default: return -1; break; - } -} - -static void clean_timer(threadtimer_t* timer) { - printf("Start cleaning: "); - if(timer->is_open) { - close_file(timer); - printf("Close file, "); - } - FD_CLR(timer->accept_fd, &tmpfds); - if(timer->accept_fd) { - close(timer->accept_fd); - timer->accept_fd = 0; - printf("Close accept, "); - } - timer->touch = 0; - timer->status = -1; - timer->lock_type = 0; - puts("Finish cleaning"); -} - -static void close_file(threadtimer_t *timer) { - if(timer->is_open && timer->fp != NULL) { - int lock_type = timer->lock_type; - puts("Close file"); - fclose(timer->fp); - timer->is_open = 0; - timer->fp = NULL; - file_mode &= ~lock_type; - if((lock_type&LOCK_KANB_SH) > 0 && --kanb_file_ro_cnt > 0) { - file_mode |= LOCK_KANB_SH; - } - if((lock_type&LOCK_DATA_SH) > 0 && --data_file_ro_cnt > 0) { - file_mode |= LOCK_DATA_SH; - } - timer->lock_type = 0; - } -} - -static int close_file_and_send(threadtimer_t *timer, char *data, size_t numbytes) { - close_file(timer); - return send_data(timer->accept_fd, data, numbytes); -} +static int sm1_pwd(tcpool_thread_timer_t *timer); +static int s0_init(tcpool_thread_timer_t *timer); +static int s1_get(tcpool_thread_timer_t *timer); +static int s2_set(tcpool_thread_timer_t *timer); +static int s3_set_data(tcpool_thread_timer_t *timer); #define take_word(p, w, buff) if((p)->numbytes >= strlen(w) && !strncmp(buff, w, strlen(w))) {\ printf("<--- Taking: %s in %zd --->\n", w, (p)->numbytes);\ @@ -318,7 +78,7 @@ static int close_file_and_send(threadtimer_t *timer, char *data, size_t numbytes buff[l] = 0;\ ssize_t n = (p)->numbytes - l;\ (p)->numbytes = l;\ - if(!(r = check_buffer((p)))) {\ + if(!check_buffer((p))) {\ printf("<--- break in %zd --->\n", (p)->numbytes); \ break; \ } \ @@ -330,160 +90,86 @@ static int close_file_and_send(threadtimer_t *timer, char *data, size_t numbytes (p)->numbytes = n;\ printf("<--- pass in %zd --->\n", (p)->numbytes); \ } -#define touch_timer(x) ((x)->touch = time(NULL)) -#define my_fd(x) ((x)->accept_fd) -#define my_dat(x) ((x)->data) -static int handle_accept(threadtimer_t* p) { - int r = 1; +static void accept_action(tcpool_thread_timer_t *p) { + if(send_data(p->accept_fd, "Welcome to simple kanban server.", 33) <= 0) { + puts("Send banner to new client failed"); + return; + } printf("Recv data from client@%d, ", p->index); - if((p)->status == 3) return s3_set_data(p); - while(((p)->numbytes = recv(my_fd(p), my_dat(p), TIMERDATSZ, MSG_DONTWAIT)) > 0) { - touch_timer(p); - my_dat(p)[(p)->numbytes] = 0; - printf("Get %d bytes: %s, Check buffer...\n", (int)(p)->numbytes, my_dat(p)); - //处理允许的粘连 - take_word(p, cfg->pwd, my_dat(p)); - take_word(p, "get", my_dat(p)); - take_word(p, "cat", my_dat(p)); - take_word(p, "quit", my_dat(p)); - take_word(p, cfg->sps, my_dat(p)); - take_word(p, "ver", my_dat(p)); - take_word(p, "dat", my_dat(p)); + int is_first = 1; + while(((p)->numbytes = recv(p->accept_fd, p->data, SERVER_THREAD_BUFSZ, 0)) > 0) { + tcpool_touch_timer(p); + p->data[p->numbytes] = 0; + printf("Get %d bytes: %s, Check buffer...\n", (int)(p)->numbytes, p->data); + if(is_first) { + is_first = 0; + //处理允许的粘连 + take_word(p, cfg->pwd, p->data); + take_word(p, "get", p->data); + take_word(p, "cat", p->data); + take_word(p, "quit", p->data); + take_word(p, cfg->sps, p->data); + take_word(p, "ver", p->data); + take_word(p, "dat", p->data); + } if((p)->numbytes <= 0) { puts("Taking words finished"); + continue; + } + if(!check_buffer((p))) { + printf("<--- break normal in %zd --->\n", p->numbytes); break; } - puts("Last check_buffer"); - r = check_buffer((p)); - break; } - if((p)->numbytes <= 0) { - if(errno == EAGAIN || errno == EINVAL) { - if(!++(p)->again_cnt) { - r = 0; - puts("Max EAGAIN/EINVAL cnt exceeded"); - } - } else if(errno) { - perror("recv"); - r = 0; - } + printf("Recv finished, remain: %zd\n", (p)->numbytes); +} + +static int check_buffer(tcpool_thread_timer_t *timer) { + printf("Status: %d\n", (int)timer->status); + switch(timer->status) { + case -1: return sm1_pwd(timer); break; + case 0: return s0_init(timer); break; + case 1: return s1_get(timer); break; + case 2: return s2_set(timer); break; + case 3: return s3_set_data(timer); break; + default: return -1; break; } - printf("Recv finished, remain: %zd, continue: %s\n", (p)->numbytes, r?"true":"false"); - return r; } -static void handle_end(int signo) { - puts("Handle kill/term"); - close(fd); - fflush(stdout); - exit(0); -} - -static void handle_int(int signo) { - puts("Keyboard interrupted"); - close(fd); - fflush(stdout); - exit(0); -} - -static void handle_quit(int signo) { - puts("Handle sigquit"); - close(fd); - fflush(stdout); - exit(0); -} - -static void handle_segv(int signo) { - puts("Handle sigsegv"); - close(fd); - fflush(stdout); - exit(0); -} - -static int listen_socket() { - int flags = fcntl(fd, F_GETFL, 0); - if(!~listen(fd, THREADCNT)) return 1; - return fcntl(fd, F_SETFL, flags | O_NONBLOCK); -} - -static FILE *open_file(char* file_path, int lock_type, char* mode) { - FILE *fp = NULL; - if((lock_type&LOCK_KANB_SH)) { - if((file_mode&LOCK_KANB_EX) > 0) { - puts("open_file(KANB_SH): file is busy"); - return NULL; - } - file_mode |= LOCK_KANB_SH; - kanb_file_ro_cnt++; - } else if((lock_type&LOCK_DATA_SH)) { - if((file_mode&LOCK_DATA_EX) > 0) { - puts("open_file(DATA_SH): file is busy"); - return NULL; - } - file_mode |= LOCK_DATA_SH; - data_file_ro_cnt++; - } else if(lock_type&LOCK_KANB_EX) { - if((file_mode&(LOCK_KANB_EX|LOCK_KANB_SH)) > 0) { - puts("open_file(KANB_EX): file is busy"); - return NULL; - } - file_mode |= LOCK_KANB_EX; - } else if(lock_type&LOCK_DATA_EX) { - if((file_mode&(LOCK_DATA_EX|LOCK_DATA_SH)) > 0) { - puts("open_file(DATA_EX): file is busy"); - return NULL; - } - file_mode |= LOCK_DATA_EX; - } - fp = fopen(file_path, mode); - if(!fp) { - perror("fopen"); - file_mode &= ~lock_type; - return NULL; - } - printf("Open file %s in mode %s\n", file_path, mode); - return fp; -} - -static int send_all(char* file_path, threadtimer_t *timer) { +static int send_all(tcpool_thread_timer_t *timer) { int re = 1; - FILE *fp = open_file(file_path, timer->lock_type, "rb"); - if(fp) { - timer->fp = fp; - timer->is_open = 1; - uint32_t file_size = (uint32_t)size_of_file(file_path); - printf("Get file size: %d bytes, ", (int)file_size); - off_t len = 0; - int flags = fcntl(timer->accept_fd, F_GETFL, 0); - fcntl(timer->accept_fd, F_SETFL, flags & ~O_NONBLOCK); - #if __APPLE__ - #ifdef WORDS_BIGENDIAN - file_size = __DARWIN_OSSwapInt32(file_size); - #endif - struct sf_hdtr hdtr; - struct iovec headers; - headers.iov_base = &file_size; - headers.iov_len = sizeof(uint32_t); - hdtr.headers = &headers; - hdtr.hdr_cnt = 1; - hdtr.trailers = NULL; - hdtr.trl_cnt = 0; - re = !sendfile(fileno(fp), timer->accept_fd, 0, &len, &hdtr, 0); - if(!re) perror("sendfile"); - #else - #ifdef WORDS_BIGENDIAN - uint32_t little_fs = __builtin_bswap32(file_size); - send(timer->accept_fd, &little_fs, sizeof(uint32_t), 0); - #else - send(timer->accept_fd, &file_size, sizeof(uint32_t), 0); - #endif - re = sendfile(timer->accept_fd, fileno(fp), &len, file_size) > 0; - if(!re) perror("sendfile"); + FILE *fp = open_file(timer->index, timer->isdata, 1); + if(fp == NULL) return 1; + uint32_t close_file_wrap_data[2] = {timer->index, (uint32_t)timer->isdata}; + pthread_cleanup_push((void (*)(void*))&close_file_wrap, (void*)close_file_wrap_data); + off_t len = 0, file_size = get_file_size(timer->isdata); + printf("Get file size: %d bytes, ", (int)file_size); + #if __APPLE__ + #ifdef WORDS_BIGENDIAN + file_size = __DARWIN_OSSwapInt32(file_size); #endif - printf("Send %d bytes\n", (int)len); - close_file(timer); - fcntl(timer->accept_fd, F_SETFL, flags); - } + struct sf_hdtr hdtr; + struct iovec headers; + headers.iov_base = &file_size; + headers.iov_len = sizeof(uint32_t); + hdtr.headers = &headers; + hdtr.hdr_cnt = 1; + hdtr.trailers = NULL; + hdtr.trl_cnt = 0; + re = !sendfile(fileno(fp), timer->accept_fd, 0, &len, &hdtr, 0); + if(!re) perror("sendfile"); + #else + #ifdef WORDS_BIGENDIAN + uint32_t little_fs = __builtin_bswap32(file_size); + send(timer->accept_fd, &little_fs, sizeof(uint32_t), 0); + #else + send(timer->accept_fd, &file_size, sizeof(uint32_t), 0); + #endif + re = sendfile(timer->accept_fd, fileno(fp), &len, file_size) > 0; + if(!re) perror("sendfile"); + #endif + printf("Send %d bytes\n", (int)len); + pthread_cleanup_pop(1); return re; } @@ -493,7 +179,7 @@ static int send_data(int accept_fd, char *data, size_t length) { puts("Send data error: zero length"); return 0; } - if(!~send(accept_fd, data, length, MSG_DONTWAIT)) { + if(!~send(accept_fd, data, length, 0)) { puts("Send data error"); return 0; } @@ -510,13 +196,7 @@ static int send_data(int accept_fd, char *data, size_t length) { return 1; } -static off_t size_of_file(const char* fname) { - struct stat statbuf; - if(stat(fname, &statbuf)==0) return statbuf.st_size; - else return -1; -} - -static int sm1_pwd(threadtimer_t *timer) { +static int sm1_pwd(tcpool_thread_timer_t *timer) { if(!strncmp(timer->data, cfg->pwd, strlen(cfg->pwd))) { timer->status = 0; puts("Password check passed"); @@ -524,49 +204,48 @@ static int sm1_pwd(threadtimer_t *timer) { return !timer->status; } -static int s0_init(threadtimer_t *timer) { - if(!strncmp(timer->data, "get", 3)) timer->status = 1; - else if(!strncmp(timer->data, cfg->sps, strlen(cfg->sps))) timer->status = 2; - else if(!strncmp(timer->data, "cat", 3)) { - timer->lock_type = LOCK_DATA_SH; - return send_all(data_path, timer); +static int s0_init(tcpool_thread_timer_t *timer) { + if(!strncmp(timer->data, "get", 3)) { + timer->status = 1; + return 1; } - else if(!strncmp(timer->data, "quit", 4)) return 0; - return send_data(timer->accept_fd, timer->data, timer->numbytes); + if(!strncmp(timer->data, cfg->sps, strlen(cfg->sps))) { + timer->status = 2; + return 2; + } + if(!strncmp(timer->data, "cat", 3)) { + timer->isdata = 1; + return send_all(timer); + } + //if(!strncmp(timer->data, "quit", 4)) return 0; + return 0; } -static int s1_get(threadtimer_t *timer) { //get kanban - FILE *fp = open_file(kanban_path, LOCK_KANB_SH, "rb"); +// s1_get scan getxxx +static int s1_get(tcpool_thread_timer_t *timer) { + FILE *fp = open_file(timer->index, 0, 1); timer->status = 0; - if(fp) { - timer->fp = fp; - timer->is_open = 1; - timer->lock_type = LOCK_KANB_SH; - uint32_t ver, cli_ver; - if(fscanf(fp, "%u", &ver) > 0) { - if(sscanf(timer->data, "%u", &cli_ver) > 0) { - if(cli_ver < ver) { //need to send a new kanban - timer->is_open = 0; - close_file(timer); - int r = send_all(kanban_path, timer); - if(strstr(timer->data, "quit")) { - puts("Found last cmd is quit"); - timer->numbytes = 0; - return 0; - } - int i = 0; - for(; i < timer->numbytes; i++) { - if(!isdigit(timer->data[i])) { - timer->numbytes -= i; - break; - } - } - return r; - } - } - } - } - int r = close_file_and_send(timer, "null", 4); + if(!fp) goto GET_END; + + uint32_t close_file_wrap_data[2] = {timer->index, (uint32_t)timer->isdata}; + int r; uint32_t ver, cli_ver; + + pthread_cleanup_push((void (*)(void*))&close_file_wrap, (void*)close_file_wrap_data); + timer->isdata = 0; + r = fscanf(fp, "%u", &ver); + pthread_cleanup_pop(1); + + if(r <= 0) goto GET_END; + if(sscanf(timer->data, "%u", &cli_ver) <= 0) goto GET_END; + if(cli_ver >= ver) goto GET_END; + + //need to send a new kanban + r = send_all(timer); + goto GET_SKIP; + + GET_END: + r = send_data(timer->accept_fd, "null", 4); + GET_SKIP: if(strstr(timer->data, "quit")) { puts("Found last cmd is quit"); timer->numbytes = 0; @@ -582,37 +261,33 @@ static int s1_get(threadtimer_t *timer) { //get kanban return r; } -static int s2_set(threadtimer_t *timer) { +static int s2_set(tcpool_thread_timer_t *timer) { FILE *fp = NULL; - int lktp; if(!strncmp(timer->data, "ver", 3)) { - fp = open_file(kanban_path, LOCK_KANB_EX, "wb"); - lktp = LOCK_KANB_EX; + timer->isdata = 0; } else if(!strncmp(timer->data, "dat", 3)) { - fp = open_file(data_path, LOCK_DATA_EX, "wb"); - lktp = LOCK_DATA_EX; - } - if(fp) { - timer->status = 3; - timer->fp = fp; - timer->is_open = 1; - timer->lock_type = lktp; - return send_data(timer->accept_fd, "data", 4); + timer->isdata = 1; } else { timer->status = 0; return send_data(timer->accept_fd, "erro", 4); } + timer->status = 3; + return send_data(timer->accept_fd, "data", 4); } -static int s3_set_data(threadtimer_t *timer) { - char ret[4] = "succ"; - int flags = fcntl(timer->accept_fd, F_GETFL, 0); - fcntl(timer->accept_fd, F_SETFL, flags & ~O_NONBLOCK); +static int s3_set_data(tcpool_thread_timer_t *timer) { + char ret[4]; + *(uint32_t*)ret = *(uint32_t*)"succ"; timer->status = 0; - ssize_t n = recv(timer->accept_fd, timer->data, 4, MSG_WAITALL); - if(n < 4) { - *(uint32_t*)ret = *(uint32_t*)"erro"; - goto S3_RETURN; + FILE* fp = open_file(timer->index, timer->isdata, 0); + uint32_t close_file_wrap_data[2] = {timer->index, (uint32_t)timer->isdata}; + pthread_cleanup_push((void (*)(void*))&close_file_wrap, (void*)close_file_wrap_data); + if(timer->numbytes < 4) { + ssize_t n = recv(timer->accept_fd, timer->data+timer->numbytes, 4-timer->numbytes, MSG_WAITALL); + if(n < 4-timer->numbytes) { + *(uint32_t*)ret = *(uint32_t*)"erro"; + goto S3_RETURN; + } } #ifdef WORDS_BIGENDIAN uint32_t file_size = __builtin_bswap32(*(uint32_t*)(timer->data)); @@ -620,27 +295,27 @@ static int s3_set_data(threadtimer_t *timer) { uint32_t file_size = *(uint32_t*)(timer->data); #endif printf("Set data size: %u\n", file_size); - if((timer->numbytes = recv(timer->accept_fd, timer->data, TIMERDATSZ, 0)) < 0 && errno != EAGAIN) { + if((timer->numbytes = recv(timer->accept_fd, timer->data, SERVER_THREAD_BUFSZ, 0)) < 0 && errno != EAGAIN) { *(uint32_t*)ret = *(uint32_t*)"erro"; goto S3_RETURN; } printf("Get data size: %d\n", (int)timer->numbytes); - if(file_size <= TIMERDATSZ) { + if(file_size <= SERVER_THREAD_BUFSZ) { while(timer->numbytes != file_size) { - ssize_t n = recv(timer->accept_fd, timer->data + timer->numbytes, TIMERDATSZ - timer->numbytes, MSG_WAITALL); + ssize_t n = recv(timer->accept_fd, timer->data + timer->numbytes, SERVER_THREAD_BUFSZ - timer->numbytes, MSG_WAITALL); if(n <= 0) { *(uint32_t*)ret = *(uint32_t*)"erro"; goto S3_RETURN; } timer->numbytes += n; } - if(fwrite(timer->data, file_size, 1, timer->fp) != 1) { + if(fwrite(timer->data, file_size, 1, fp) != 1) { perror("fwrite"); *(uint32_t*)ret = *(uint32_t*)"erro"; } goto S3_RETURN; } - if(timer->numbytes > 0 && fwrite(timer->data, timer->numbytes, 1, timer->fp) != 1) { + if(timer->numbytes > 0 && fwrite(timer->data, timer->numbytes, 1, fp) != 1) { perror("fwrite"); *(uint32_t*)ret = *(uint32_t*)"erro"; goto S3_RETURN; @@ -648,7 +323,7 @@ static int s3_set_data(threadtimer_t *timer) { int32_t remain = file_size - timer->numbytes; while(remain > 0) { // printf("remain:%d\n", (int)remain); - ssize_t n = recv(timer->accept_fd, timer->data, (remain>TIMERDATSZ)?TIMERDATSZ:remain, MSG_WAITALL); + ssize_t n = recv(timer->accept_fd, timer->data, (remain>SERVER_THREAD_BUFSZ)?SERVER_THREAD_BUFSZ:remain, MSG_WAITALL); if(n < 0) { *(uint32_t*)ret = *(uint32_t*)"erro"; goto S3_RETURN; @@ -657,7 +332,7 @@ static int s3_set_data(threadtimer_t *timer) { usleep(10000); // 10 ms continue; } - if(fwrite(timer->data, n, 1, timer->fp) != 1) { + if(fwrite(timer->data, n, 1, fp) != 1) { perror("fwrite"); *(uint32_t*)ret = *(uint32_t*)"erro"; goto S3_RETURN; @@ -665,22 +340,25 @@ static int s3_set_data(threadtimer_t *timer) { remain -= n; } S3_RETURN: - fcntl(timer->accept_fd, F_SETFL, flags); - return close_file_and_send(timer, ret, 4); + pthread_cleanup_pop(1); + return send_data(timer->accept_fd, ret, 4); } +#define show_usage(program) printf("Usage: %s (-d) port kanban.txt data.bin config.sp\n\t-d: As daemon\n", program) + int main(int argc, char *argv[]) { if(argc != 5 && argc != 6) { show_usage(argv[0]); return 0; } - int port = 0; + uint16_t port = 0; int tmp; int as_daemon = !strncmp(argv[1], "-d", 3); - sscanf(argv[as_daemon?2:1], "%d", &port); - if(port < 0 || port >= 65536) { - printf("Error port: %d\n", port); + sscanf(argv[as_daemon?2:1], "%d", &tmp); + if(tmp < 0 || tmp >= 65536) { + printf("Error port: %d\n", tmp); return 1; } + port = (uint16_t)tmp; if(as_daemon && daemon(1, 1) < 0) { perror("Start daemon error"); return 2; @@ -714,20 +392,10 @@ int main(int argc, char *argv[]) { } read_pb_into(fp, (simple_pb_t*)(&_cfg)); fclose(fp); - if(!(fd = bind_server(&port))) { - return 6; - } - if(listen_socket()) { - perror("Listen failed"); - return 7; - } - /* - printf("password: "); - puts(cfg->pwd); - printf("set password: "); - puts(cfg->sps); - */ - accept_client(); - close(fd); + if(!(tmp = bind_server(&port))) return 6; + if(!listen_socket(tmp)) return 7; + pthread_cleanup_push((void (*)(void*))&close, (void*)((long long)tmp)); + accept_client(tmp); + pthread_cleanup_pop(1); return 99; } diff --git a/tcpool.h b/tcpool.h new file mode 100644 index 0000000..72b12ae --- /dev/null +++ b/tcpool.h @@ -0,0 +1,430 @@ +#ifndef _TCPOOL_H_ +#define _TCPOOL_H_ + +/* See feature_test_macros(7) */ +#define _GNU_SOURCE 1 +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#ifndef TCPOOL_THREAD_TIMER_T_SZ + #define TCPOOL_THREAD_TIMER_T_SZ 1024 +#endif + +#define TCPOOL_THREAD_TIMER_T_HEAD_SZ ( \ + sizeof(uint32_t) \ + +sizeof(int) \ + +sizeof(time_t) \ + +sizeof(pthread_rwlock_t) \ + +2*sizeof(pthread_t) \ + +2*sizeof(pthread_cond_t) \ + +2*sizeof(pthread_mutex_t) \ + +sizeof(pthread_rwlock_t) \ + +2*sizeof(uint8_t) \ + ) + +#ifndef TCPOOL_THREADCNT + #define TCPOOL_THREADCNT 32 +#endif + +#ifndef TCPOOL_MAXWAITSEC + #define TCPOOL_MAXWAITSEC 8 +#endif + +#ifndef TCPOOL_THREAD_CONTEXT + #define TCPOOL_THREAD_CONTEXT uint8_t __padding[ \ + TCPOOL_THREAD_TIMER_T_SZ \ + -TCPOOL_THREAD_TIMER_T_HEAD_SZ \ + ] +#endif + +#ifndef TCPOOL_TOUCH_TIMER_CONDITION + #define TCPOOL_TOUCH_TIMER_CONDITION (0) +#endif + +#ifndef TCPOOL_INIT_ACTION + #define TCPOOL_INIT_ACTION ; +#endif + +#ifndef TCPOOL_PREHANDLE_ACCEPT_ACTION + #define TCPOOL_PREHANDLE_ACCEPT_ACTION(timer) ; +#endif + +#ifndef TCPOOL_CLEANUP_THREAD_ACTION + #define TCPOOL_CLEANUP_THREAD_ACTION(timer) ; +#endif + +struct tcpool_thread_timer_t { + uint32_t index; + int accept_fd; + time_t touch; // lock by mt + pthread_rwlock_t mt; // lock touch + pthread_t thread; + pthread_t timerthread; + pthread_cond_t c; // lock by mc + pthread_mutex_t mc; // lock c + pthread_cond_t tc; // lock by tmc + pthread_mutex_t tmc; // lock tc&hastimerslept + pthread_rwlock_t mb; // lock isbusy + TCPOOL_THREAD_CONTEXT; + uint8_t isbusy; // lock by mb + uint8_t hastimerslept; // lock by tmc +}; +typedef struct tcpool_thread_timer_t tcpool_thread_timer_t; + +static tcpool_thread_timer_t tcpool_timers[TCPOOL_THREADCNT]; + +#define tcpool_timer_pointer_of(x) ((tcpool_thread_timer_t*)(x)) + +#define tcpool_touch_timer(x) { \ + pthread_rwlock_wrlock(&tcpool_timer_pointer_of(x)->mt); \ + tcpool_timer_pointer_of(x)->touch = time(NULL); \ + printf("Touch timer@%d\n", tcpool_timer_pointer_of(x)->index);\ + pthread_rwlock_unlock(&tcpool_timer_pointer_of(x)->mt); \ +} + +#ifdef LISTEN_ON_IPV6 + static socklen_t tcpool_struct_len = sizeof(struct sockaddr_in6); + static struct sockaddr_in6 tcpool_server_addr; +#else + static socklen_t tcpool_struct_len = sizeof(struct sockaddr_in); + static struct sockaddr_in tcpool_server_addr; +#endif + +static pthread_attr_t __tcpool_thread_attr; +static pthread_key_t __tcpool_pthread_key_index; +static sigjmp_buf __tcpool_jmp2convend[TCPOOL_THREADCNT]; + +static void accept_action(tcpool_thread_timer_t *timer); +static void accept_client(int fd); +static void accept_timer(void *p); +static int bind_server(uint16_t* port); +static void cleanup_thread(tcpool_thread_timer_t* timer); +static void handle_accept(void *accept_fd_p); +static void handle_int(int signo); +static void handle_kill(int signo); +static void handle_pipe(int signo); +static void handle_quit(int signo); +static void handle_segv(int signo); +static int listen_socket(int fd); + +static int bind_server(uint16_t* port) { + #ifdef LISTEN_ON_IPV6 + tcpool_server_addr.sin6_family = AF_INET6; + tcpool_server_addr.sin6_port = htons(*port); + bzero(&(tcpool_server_addr.sin6_addr), sizeof(tcpool_server_addr.sin6_addr)); + int fd = socket(PF_INET6, SOCK_STREAM, 0); + #else + tcpool_server_addr.sin_family = AF_INET; + tcpool_server_addr.sin_port = htons(*port); + tcpool_server_addr.sin_addr.s_addr = INADDR_ANY; + bzero(&(tcpool_server_addr.sin_zero), 8); + int fd = socket(AF_INET, SOCK_STREAM, 0); + #endif + int on = 1; + if(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))) { + perror("Set socket option failure"); + return 0; + } + if(!~bind(fd, (struct sockaddr *)&tcpool_server_addr, tcpool_struct_len)) { + perror("Bind server failure"); + return 0; + } + #ifdef LISTEN_ON_IPV6 + *port = ntohs(tcpool_server_addr.sin6_port); + struct in6_addr in = tcpool_server_addr.sin6_addr; + char str[INET6_ADDRSTRLEN]; // 46 + inet_ntop(AF_INET6, &in, str, sizeof(str)); + #else + *port = ntohs(tcpool_server_addr.sin_port); + struct in_addr in = tcpool_server_addr.sin_addr; + char str[INET_ADDRSTRLEN]; // 16 + inet_ntop(AF_INET, &in, str, sizeof(str)); + #endif + printf("Bind server successfully on %s:%u\n", str, *port); + return fd; +} + +static int listen_socket(int fd) { + if(!~listen(fd, TCPOOL_THREADCNT)) { + perror("Listen failed"); + return 0; + } + puts("Listening..."); + return fd; +} + +static void handle_quit(int signo) { + uint32_t index = (uint32_t)((uintptr_t)pthread_getspecific(__tcpool_pthread_key_index)); + printf("Handle sigquit@%d\n", index-1); + fflush(stdout); + if(index) { + sigaction(SIGQUIT, &(const struct sigaction){handle_quit}, NULL); + siglongjmp(__tcpool_jmp2convend[index-1], signo); + } + else pthread_exit(NULL); +} + +static void handle_segv(int signo) { + uint32_t index = (uint32_t)((uintptr_t)pthread_getspecific(__tcpool_pthread_key_index)); + printf("Handle sigsegv@%d\n", index-1); + fflush(stdout); + if(index) { + sigaction(SIGSEGV, &(const struct sigaction){handle_segv}, NULL); + siglongjmp(__tcpool_jmp2convend[index-1], signo); + } + else pthread_exit(NULL); +} + +static void handle_kill(int signo) { + puts("Handle sigkill/sigterm"); + fflush(stdout); + exit(signo); +} + +static void handle_int(int signo) { + puts("Keyboard interrupted"); + fflush(stdout); + exit(signo); +} + +static void handle_pipe(int signo) { + uint32_t index = (uint32_t)((uintptr_t)pthread_getspecific(__tcpool_pthread_key_index)); + printf("Pipe error@%d, break loop...\n", index-1); + fflush(stdout); + if(index) { + sigaction(SIGPIPE, &(const struct sigaction){handle_pipe}, NULL); + siglongjmp(__tcpool_jmp2convend[index-1], signo); + } + else pthread_exit(NULL); +} + +static void accept_timer(void *p) { + tcpool_thread_timer_t *timer = tcpool_timer_pointer_of(p); + uint32_t index = timer->index; + pthread_t thread = timer->thread; + uint8_t isbusy; + + sleep(TCPOOL_MAXWAITSEC / 4); + while(thread && !pthread_kill(thread, 0)) { + pthread_rwlock_rdlock(&timer->mb); + isbusy = timer->isbusy; + pthread_rwlock_unlock(&timer->mb); + if(!isbusy) { + TIMER_SLEEP: + pthread_mutex_lock(&timer->tmc); + timer->hastimerslept = 1; + printf("Timer@%d sleep\n", timer->index); + pthread_cond_wait(&timer->tc, &timer->tmc); + timer->hastimerslept = 0; + pthread_mutex_unlock(&timer->tmc); + printf("Timer@%d wake up\n", timer->index); + sleep(TCPOOL_MAXWAITSEC / 4); + thread = timer->thread; + } + if(TCPOOL_TOUCH_TIMER_CONDITION) tcpool_touch_timer(p); + pthread_rwlock_rdlock(&timer->mt); + time_t waitsec = time(NULL) - timer->touch; + pthread_rwlock_unlock(&timer->mt); + printf("Wait@%d sec: %u, max: %u\n", timer->index, (unsigned int)waitsec, TCPOOL_MAXWAITSEC); + if(waitsec > TCPOOL_MAXWAITSEC) { + if(thread) { + pthread_kill(thread, SIGQUIT); + printf("Kill thread@%d\n", timer->index); + } + break; + } + sleep(TCPOOL_MAXWAITSEC / 4); + thread = timer->thread; + } + goto TIMER_SLEEP; +} + +static void cleanup_thread(tcpool_thread_timer_t* timer) { + printf("Start cleaning@%d, ", timer->index); + + if(timer->accept_fd) { + close(timer->accept_fd); + timer->accept_fd = 0; + printf("Close accept, "); + } + + TCPOOL_CLEANUP_THREAD_ACTION(timer); + + timer->thread = 0; + printf("Clear thread, "); + + pthread_cond_destroy(&timer->c); + printf("Destroy accept cond, "); + + pthread_mutex_destroy(&timer->mc); + printf("Destroy accept mutex, "); + + pthread_rwlock_wrlock(&timer->mb); + timer->isbusy = 0; + printf("Clear busy, "); + pthread_rwlock_unlock(&timer->mb); + + puts("Finish cleaning"); +} + +static void handle_accept(void *p) { + #ifdef DEBUG + printf("accept ptr: %p\n", p); + #endif + pthread_cleanup_push((void (*)(void*))&cleanup_thread, p); + puts("Handling accept..."); + pthread_setspecific(__tcpool_pthread_key_index, (void*)((uintptr_t)tcpool_timer_pointer_of(p)->index+1)); + if(sigsetjmp(__tcpool_jmp2convend[tcpool_timer_pointer_of(p)->index], 1)) { + printf("Long Jump@%d\n", tcpool_timer_pointer_of(p)->index); + goto CONV_END; + } + while(1) { + accept_action(tcpool_timer_pointer_of(p)); + CONV_END: puts("Conversation end"); + + if(tcpool_timer_pointer_of(p)->accept_fd) { + close(tcpool_timer_pointer_of(p)->accept_fd); + tcpool_timer_pointer_of(p)->accept_fd = 0; + puts("Close accept"); + } + + TCPOOL_CLEANUP_THREAD_ACTION(tcpool_timer_pointer_of(p)); + + pthread_mutex_lock(&tcpool_timer_pointer_of(p)->mc); + + pthread_rwlock_wrlock(&tcpool_timer_pointer_of(p)->mb); + tcpool_timer_pointer_of(p)->isbusy = 0; + pthread_rwlock_unlock(&tcpool_timer_pointer_of(p)->mb); + + puts("Set thread status to idle"); + pthread_cond_wait(&tcpool_timer_pointer_of(p)->c, &tcpool_timer_pointer_of(p)->mc); + + pthread_mutex_unlock(&tcpool_timer_pointer_of(p)->mc); + puts("Thread wakeup"); + } + pthread_cleanup_pop(1); +} + +static void accept_client(int fd) { + sigaction(SIGINT , &(const struct sigaction){handle_int}, NULL); + sigaction(SIGQUIT, &(const struct sigaction){handle_quit}, NULL); + sigaction(SIGKILL, &(const struct sigaction){handle_kill}, NULL); + sigaction(SIGSEGV, &(const struct sigaction){handle_segv}, NULL); + sigaction(SIGPIPE, &(const struct sigaction){handle_pipe}, NULL); + sigaction(SIGTERM, &(const struct sigaction){handle_kill}, NULL); + pthread_attr_init(&__tcpool_thread_attr); + pthread_attr_setdetachstate(&__tcpool_thread_attr, PTHREAD_CREATE_DETACHED); + TCPOOL_INIT_ACTION; + for(int i = 0; i < TCPOOL_THREADCNT; i++) { + pthread_rwlock_init(&tcpool_timers[i].mt, NULL); + pthread_rwlock_init(&tcpool_timers[i].mb, NULL); + } + pthread_key_create(&__tcpool_pthread_key_index, NULL); + while(1) { + int p = 0; + while(p < TCPOOL_THREADCNT) { + pthread_rwlock_rdlock(&tcpool_timers[p].mb); + if(!tcpool_timers[p].isbusy) break; + pthread_rwlock_unlock(&tcpool_timers[p].mb); + p++; + } + if(p >= TCPOOL_THREADCNT) { + puts("Max thread cnt exceeded"); + sleep(1); + continue; + } + printf("Ready for accept on slot No.%d\n", p); + tcpool_thread_timer_t* timer = &tcpool_timers[p]; + pthread_rwlock_unlock(&timer->mb); + #ifdef LISTEN_ON_IPV6 + struct sockaddr_in6 client_addr; + #else + struct sockaddr_in client_addr; + #endif + int accept_fd; + if((accept_fd=accept(fd, (struct sockaddr *)&client_addr, &tcpool_struct_len))<=0) { + perror("Accept client error"); + continue; + } + pthread_rwlock_wrlock(&timer->mb); + timer->isbusy = 1; + pthread_rwlock_unlock(&timer->mb); + #ifdef LISTEN_ON_IPV6 + uint16_t port = ntohs(client_addr.sin6_port); + struct in6_addr in = client_addr.sin6_addr; + char str[INET6_ADDRSTRLEN]; // 46 + inet_ntop(AF_INET6, &in, str, sizeof(str)); + #else + uint16_t port = ntohs(client_addr.sin_port); + struct in_addr in = client_addr.sin_addr; + char str[INET_ADDRSTRLEN]; // 16 + inet_ntop(AF_INET, &in, str, sizeof(str)); + #endif + time_t t = time(NULL); + printf("\n> %sAccept client %s:%u at slot No.%d, ", ctime(&t), str, port, p); + timer->accept_fd = accept_fd; + timer->index = p; + pthread_rwlock_wrlock(&timer->mt); + timer->touch = time(NULL); + pthread_rwlock_unlock(&timer->mt); + TCPOOL_PREHANDLE_ACCEPT_ACTION(timer); + // start or wakeup accept thread + pthread_t thread = timer->thread; + if(thread && !pthread_kill(thread, 0)) { + pthread_mutex_lock(&timer->mc); + pthread_cond_signal(&timer->c); // wakeup thread + pthread_mutex_unlock(&timer->mc); + puts("Pick thread from pool"); + } else { + pthread_cond_init(&timer->c, NULL); + pthread_mutex_init(&timer->mc, NULL); + if (pthread_create(&timer->thread, &__tcpool_thread_attr, (void (*)(void*))&handle_accept, timer)) { + perror("Error creating thread"); + cleanup_thread(timer); + putchar('\n'); + continue; + } + puts("Thread created"); + } + // start or wakeup timer thread + thread = timer->timerthread; + if(!thread || pthread_kill(thread, 0)) { + printf("Creating timer thread..."); + pthread_cond_init(&timer->tc, NULL); + pthread_mutex_init(&timer->tmc, NULL); + timer->hastimerslept = 0; + if (pthread_create(&timer->timerthread, &__tcpool_thread_attr, (void (*)(void*))&accept_timer, timer)) { + perror("Error creating timer thread"); + cleanup_thread(timer); + putchar('\n'); + continue; + } + puts("succeeded"); + } else { + pthread_mutex_lock(&timer->tmc); + uint8_t hastimerslept = timer->hastimerslept; + pthread_mutex_unlock(&timer->tmc); + if(hastimerslept) { + printf("Waking up timer thread..."); + pthread_mutex_lock(&timer->tmc); + pthread_cond_signal(&timer->tc); // wakeup thread + pthread_mutex_unlock(&timer->tmc); + puts("succeeded"); + } else puts("Timer already running"); + } + } +} + +#endif /* _TCPOOL_H_ */