diff --git a/server.c b/server.c index a7bcd66..c196dda 100644 --- a/server.c +++ b/server.c @@ -16,139 +16,65 @@ #include #include #include + #include "server.h" #include "dict.h" #include "crypto.h" #include "config.h" -struct thread_timer_t { - uint32_t index; - int accept_fd; - time_t touch; // lock by mt - ssize_t numbytes; - char *dat; - pthread_rwlock_t mt; // lock touch - pthread_t thread; - pthread_t timerthread; - pthread_cond_t c; // lock by mc - pthread_mutex_t mc; // lock c - pthread_cond_t tc; // lock by tmc - pthread_mutex_t tmc; // lock tc&hastimerslept - pthread_rwlock_t mb; // lock isbusy - uint8_t isbusy; // lock by mb - uint8_t hastimerslept; // lock by tmc - uint8_t buf[ - THREAD_TIMER_T_SZ - -sizeof(uint32_t) - -sizeof(int) - -sizeof(time_t) - -sizeof(ssize_t) - -sizeof(char*) - -sizeof(pthread_rwlock_t) - -2*sizeof(pthread_t) - -2*sizeof(pthread_cond_t) - -2*sizeof(pthread_mutex_t) - -sizeof(pthread_rwlock_t) - -2*sizeof(uint8_t) - ]; -}; -typedef struct thread_timer_t thread_timer_t; -static thread_timer_t timers[THREADCNT]; -#define timer_pointer_of(x) ((thread_timer_t*)(x)) -#define touch_timer(x) { \ - pthread_rwlock_wrlock(&timer_pointer_of(x)->mt); \ - timer_pointer_of(x)->touch = time(NULL); \ - printf("Touch timer@%d\n", timer_pointer_of(x)->index);\ - pthread_rwlock_unlock(&timer_pointer_of(x)->mt); \ -} +static server_ack_t del(FILE *fp, const char* key, int len, char ret[4]); +static void init_dict_pool(FILE *fp); +static int insert_item(FILE *fp, const dict_t* dict, int keysize, int datasize); +static inline uint32_t last_nonnull(const char* p, uint32_t max_size); -#ifdef LISTEN_ON_IPV6 - static socklen_t struct_len = sizeof(struct sockaddr_in6); - static struct sockaddr_in6 server_addr; -#else - static socklen_t struct_len = sizeof(struct sockaddr_in); - static struct sockaddr_in server_addr; -#endif +#define TCPOOL_THREAD_TIMER_T_SZ THREAD_TIMER_T_SZ +#define TCPOOL_THREADCNT THREADCNT +#define TCPOOL_MAXWAITSEC MAXWAITSEC +#define TCPOOL_THREAD_CONTEXT \ + ssize_t numbytes; \ + char *dat; \ + uint8_t buf[ \ + TCPOOL_THREAD_TIMER_T_SZ \ + -sizeof(uint32_t) \ + -sizeof(int) \ + -sizeof(time_t) \ + -sizeof(ssize_t) \ + -sizeof(char*) \ + -sizeof(pthread_rwlock_t) \ + -2*sizeof(pthread_t) \ + -2*sizeof(pthread_cond_t) \ + -2*sizeof(pthread_mutex_t) \ + -sizeof(pthread_rwlock_t) \ + -2*sizeof(uint8_t) \ + ] static dict_t setdicts[THREADCNT]; static uint32_t* items_len; static config_t cfg; -static pthread_attr_t attr; #define DICTPOOLSZ (((uint32_t)-1)>>((sizeof(uint32_t)*8-DICTPOOLBIT))) static dict_t* dict_pool[DICTPOOLSZ+1]; -static pthread_key_t pthread_key_index; -static sigjmp_buf jmp2convend[THREADCNT]; +#define TCPOOL_TOUCH_TIMER_CONDITION (is_dict_opening) +#define TCPOOL_INIT_ACTION \ + init_crypto(); \ + init_dict_pool(open_dict(0, 1)); \ + close_dict(0); +#define TCPOOL_PREHANDLE_ACCEPT_ACTION(timer) reset_seq(timer->index) +#define TCPOOL_CLEANUP_THREAD_ACTION(timer) \ + close_dict(timer->index); \ + setdicts[timer->index].data[0] = 0; -static void accept_client(int fd); -static void accept_timer(void *p); -static int bind_server(uint16_t* port); -static void cleanup_thread(thread_timer_t* timer); -static server_ack_t del(FILE *fp, const char* key, int len, char ret[4]); -static void handle_accept(void *accept_fd_p); -static void handle_int(int signo); -static void handle_kill(int signo); -static void handle_pipe(int signo); -static void handle_quit(int signo); -static void handle_segv(int signo); -static void init_dict_pool(FILE *fp); -static int insert_item(FILE *fp, const dict_t* dict, int keysize, int datasize); -static inline uint32_t last_nonnull(const char* p, uint32_t max_size); -static int listen_socket(int fd); -static int send_all(thread_timer_t *timer); +#include "tcpool.h" + +static int send_all(tcpool_thread_timer_t *timer); static int send_data(int accept_fd, int index, server_ack_t cmd, const char *data, size_t length); -static int s1_get(thread_timer_t *timer); -static int s2_set(thread_timer_t *timer); -static int s3_set_data(thread_timer_t *timer); -static int s4_del(thread_timer_t *timer); -static int s5_md5(thread_timer_t *timer); +static int s1_get(tcpool_thread_timer_t *timer); +static int s2_set(tcpool_thread_timer_t *timer); +static int s3_set_data(tcpool_thread_timer_t *timer); +static int s4_del(tcpool_thread_timer_t *timer); +static int s5_md5(tcpool_thread_timer_t *timer); -static int bind_server(uint16_t* port) { - #ifdef LISTEN_ON_IPV6 - server_addr.sin6_family = AF_INET6; - server_addr.sin6_port = htons(*port); - bzero(&(server_addr.sin6_addr), sizeof(server_addr.sin6_addr)); - int fd = socket(PF_INET6, SOCK_STREAM, 0); - #else - server_addr.sin_family = AF_INET; - server_addr.sin_port = htons(*port); - server_addr.sin_addr.s_addr = INADDR_ANY; - bzero(&(server_addr.sin_zero), 8); - int fd = socket(AF_INET, SOCK_STREAM, 0); - #endif - int on = 1; - if(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))) { - perror("Set socket option failure"); - return 0; - } - if(!~bind(fd, (struct sockaddr *)&server_addr, struct_len)) { - perror("Bind server failure"); - return 0; - } - #ifdef LISTEN_ON_IPV6 - *port = ntohs(server_addr.sin6_port); - struct in6_addr in = server_addr.sin6_addr; - char str[INET6_ADDRSTRLEN]; // 46 - inet_ntop(AF_INET6, &in, str, sizeof(str)); - #else - *port = ntohs(server_addr.sin_port); - struct in_addr in = server_addr.sin_addr; - char str[INET_ADDRSTRLEN]; // 16 - inet_ntop(AF_INET, &in, str, sizeof(str)); - #endif - printf("Bind server successfully on %s:%u\n", str, *port); - return fd; -} - -static int listen_socket(int fd) { - if(!~listen(fd, THREADCNT)) { - perror("Listen failed"); - return 0; - } - puts("Listening..."); - return fd; -} static inline uint32_t last_nonnull(const char* p, uint32_t max_size) { if(max_size > 1) while(!p[max_size - 1]) max_size--; @@ -176,11 +102,11 @@ static int send_data(int accept_fd, int index, server_ack_t cmd, const char *dat return 0; } -static int send_all(thread_timer_t *timer) { +static int send_all(tcpool_thread_timer_t *timer) { int re = 1; FILE *fp = open_dict(timer->index, 1); if(fp == NULL) return 1; - pthread_cleanup_push((void*)&close_dict, (void*)(uintptr_t)timer->index); + pthread_cleanup_push((void (*)(void*))&close_dict, (void*)(uintptr_t)timer->index); off_t len = 0, file_size = get_dict_size(); while(1) { @@ -194,14 +120,14 @@ static int send_all(thread_timer_t *timer) { perror("malloc"); break; } - pthread_cleanup_push((void*)&free, (void*)buf); + pthread_cleanup_push((void (*)(void*))&free, (void*)buf); if(fread(buf, file_size, 1, fp) == 1) { #ifdef DEBUG printf("Get dict file size: %u\n", (unsigned int)file_size); #endif char* encbuf = raw_encrypt(buf, &file_size, timer->index, cfg.pwd); sprintf(timer->dat, "%u$", (unsigned int)file_size); - pthread_cleanup_push((void*)&free, (void*)encbuf); + pthread_cleanup_push((void (*)(void*))&free, (void*)encbuf); struct iovec iov[2] = {{timer->dat, strlen(timer->dat)}, {encbuf, file_size}}; re = writev(timer->accept_fd, (const struct iovec *)&iov, 2); printf("Send %d bytes.\n", re); @@ -256,14 +182,14 @@ static void init_dict_pool(FILE *fp) { } } -static int s1_get(thread_timer_t *timer) { +static int s1_get(tcpool_thread_timer_t *timer) { uint8_t digest[16]; uint8_t buf[8+DICTSZ]; if(require_shared_lock()) // busy return send_data(timer->accept_fd, timer->index, ACKERRO, "erro", 4); int ret = -1; - pthread_cleanup_push((void*)&release_shared_lock, NULL); + pthread_cleanup_push((void (*)(void*))&release_shared_lock, NULL); while(1) { md5((uint8_t*)timer->dat, strlen(timer->dat)+1, digest); uint8_t* dp = digest; @@ -305,7 +231,7 @@ static int s1_get(thread_timer_t *timer) { if(fp == NULL) return send_data(timer->accept_fd, timer->index, ACKERRO, "erro", 4); while(1) { int ch; - pthread_cleanup_push((void*)&close_dict, (void*)(uintptr_t)timer->index); + pthread_cleanup_push((void (*)(void*))&close_dict, (void*)(uintptr_t)timer->index); while(has_next(fp, ch)) { if(!ch) continue; // skip null bytes simple_pb_t* spb = read_pb_into(fp, (simple_pb_t*)buf); @@ -324,7 +250,7 @@ static int s1_get(thread_timer_t *timer) { return ret; } -static int s2_set(thread_timer_t *timer) { +static int s2_set(tcpool_thread_timer_t *timer) { memset(&setdicts[timer->index], 0, sizeof(dict_t)); strncpy(setdicts[timer->index].key, timer->dat, DICTKEYSZ-1); md5((uint8_t*)timer->dat, strlen(timer->dat)+1, (uint8_t*)setdicts[timer->index].data); @@ -368,7 +294,7 @@ static int insert_item(FILE *fp, const dict_t* dict, int keysize, int datasize) char* data = malloc(cap); int iserr = 1; if(data) { - pthread_cleanup_push((void*)&free, data); + pthread_cleanup_push((void (*)(void*))&free, data); while(1) { if(fseek(fp, next, SEEK_SET)) { iserr = 1; @@ -401,7 +327,7 @@ ERR_INSERT_ITEM: return 1; } -static int s3_set_data(thread_timer_t *timer) { +static int s3_set_data(tcpool_thread_timer_t *timer) { if(is_empty_md5((uint64_t*)setdicts[timer->index].data)) { puts("Set data error: key md5 is empty"); return send_data(timer->accept_fd, timer->index, ACKERRO, "erro", 4); @@ -419,10 +345,10 @@ static int s3_set_data(thread_timer_t *timer) { } int r; - pthread_cleanup_push((void*)&close_dict, (void*)(uintptr_t)timer->index); + pthread_cleanup_push((void (*)(void*))&close_dict, (void*)(uintptr_t)timer->index); uint8_t* dp = (uint8_t*)setdicts[timer->index].data; - touch_timer(timer); + tcpool_touch_timer(timer); int p = ((*((uint32_t*)dp))>>(8*sizeof(uint32_t)-DICTPOOLBIT))&DICTPOOLSZ; dict_t* setdict; @@ -504,7 +430,7 @@ static server_ack_t del(FILE *fp, const char* key, int len, char ret[4]) { char* data = malloc(cap); server_ack_t ack = 0; if(data) { - pthread_cleanup_push((void*)&free, data); + pthread_cleanup_push((void (*)(void*))&free, data); while(1) { if(fseek(fp, next, SEEK_SET)) { *(uint32_t*)ret = *(uint32_t*)"erro"; @@ -537,14 +463,14 @@ static server_ack_t del(FILE *fp, const char* key, int len, char ret[4]) { return ACKNULL; } -static int s4_del(thread_timer_t *timer) { +static int s4_del(tcpool_thread_timer_t *timer) { uint8_t digest[16]; char ret[4]; int r; FILE *fp = open_dict(timer->index, 0); if(fp == NULL) return send_data(timer->accept_fd, timer->index, ACKERRO, "erro", 4); - pthread_cleanup_push((void*)&close_dict, (void*)(uintptr_t)timer->index); + pthread_cleanup_push((void (*)(void*))&close_dict, (void*)(uintptr_t)timer->index); while(1) { md5((uint8_t*)timer->dat, strlen(timer->dat)+1, digest); uint8_t* dp = digest; @@ -573,11 +499,11 @@ static int s4_del(thread_timer_t *timer) { return r; } -static int s5_md5(thread_timer_t *timer) { +static int s5_md5(tcpool_thread_timer_t *timer) { FILE* fp = open_dict(timer->index, 1); if(fp == NULL) return send_data(timer->accept_fd, timer->index, ACKERRO, "erro", 4); int r; - pthread_cleanup_push((void*)&close_dict, (void*)(uintptr_t)timer->index); + pthread_cleanup_push((void (*)(void*))&close_dict, (void*)(uintptr_t)timer->index); fill_md5(fp); r = is_dict_md5_equal((uint8_t*)timer->dat); pthread_cleanup_pop(1); @@ -585,369 +511,100 @@ static int s5_md5(thread_timer_t *timer) { else return send_data(timer->accept_fd, timer->index, ACKNEQU, "nequ", 4); } -static void handle_quit(int signo) { - uint32_t index = (uint32_t)((uintptr_t)pthread_getspecific(pthread_key_index)); - printf("Handle sigquit@%d\n", index-1); - fflush(stdout); - if(index) { - sigaction(SIGQUIT, &(const struct sigaction){handle_quit}, NULL); - siglongjmp(jmp2convend[index-1], signo); - } - else pthread_exit(NULL); -} - -static void handle_segv(int signo) { - uint32_t index = (uint32_t)((uintptr_t)pthread_getspecific(pthread_key_index)); - printf("Handle sigsegv@%d\n", index-1); - fflush(stdout); - if(index) { - sigaction(SIGSEGV, &(const struct sigaction){handle_segv}, NULL); - siglongjmp(jmp2convend[index-1], signo); - } - else pthread_exit(NULL); -} - -static void handle_kill(int signo) { - puts("Handle sigkill/sigterm"); - fflush(stdout); - exit(signo); -} - -static void handle_int(int signo) { - puts("Keyboard interrupted"); - fflush(stdout); - exit(signo); -} - -static void handle_pipe(int signo) { - uint32_t index = (uint32_t)((uintptr_t)pthread_getspecific(pthread_key_index)); - printf("Pipe error@%d, break loop...\n", index-1); - fflush(stdout); - if(index) { - sigaction(SIGPIPE, &(const struct sigaction){handle_pipe}, NULL); - siglongjmp(jmp2convend[index-1], signo); - } - else pthread_exit(NULL); -} - -static void accept_timer(void *p) { - thread_timer_t *timer = timer_pointer_of(p); +static void accept_action(tcpool_thread_timer_t *timer) { + int accept_fd = timer->accept_fd; uint32_t index = timer->index; - pthread_t thread = timer->thread; - - sleep(MAXWAITSEC / 4); - while(thread && !pthread_kill(thread, 0)) { - pthread_rwlock_rdlock(&timer->mb); - uint8_t isbusy = timer->isbusy; - pthread_rwlock_unlock(&timer->mb); - if(!isbusy) { - TIMER_SLEEP: - pthread_mutex_lock(&timer->tmc); - timer->hastimerslept = 1; - printf("Timer@%d sleep\n", timer->index); - pthread_cond_wait(&timer->tc, &timer->tmc); - timer->hastimerslept = 0; - pthread_mutex_unlock(&timer->tmc); - printf("Timer@%d wake up\n", timer->index); - sleep(MAXWAITSEC / 4); - thread = timer->thread; - } - if(is_dict_opening) touch_timer(p); - pthread_rwlock_rdlock(&timer->mt); - time_t waitsec = time(NULL) - timer->touch; - pthread_rwlock_unlock(&timer->mt); - printf("Wait@%d sec: %u, max: %u\n", timer->index, (unsigned int)waitsec, MAXWAITSEC); - if(waitsec > MAXWAITSEC) { - if(thread) { - pthread_kill(thread, SIGQUIT); - printf("Kill thread@%d\n", timer->index); + uint8_t *buff = timer->buf; + cmdpacket_t cp = (cmdpacket_t)buff; + ssize_t numbytes = 0, offset = 0; + while( + offset >= CMDPACKET_HEAD_LEN + || (numbytes = recv(accept_fd, buff+offset, CMDPACKET_HEAD_LEN-offset, MSG_WAITALL)) > 0 + ) { + tcpool_touch_timer(timer); + offset += numbytes; + #ifdef DEBUG + printf("[handle] Get %zd bytes, total: %zd.\n", numbytes, offset); + #endif + if(offset < CMDPACKET_HEAD_LEN) break; + if(offset < CMDPACKET_HEAD_LEN+(ssize_t)(cp->datalen)) { + ssize_t toread = CMDPACKET_HEAD_LEN+(ssize_t)(cp->datalen)-offset; + numbytes = recv(accept_fd, buff+offset, toread, MSG_WAITALL); + if(numbytes != toread) break; + else { + offset += numbytes; + #ifdef DEBUG + printf("[handle] Get %zd bytes, total: %zd.\n", numbytes, offset); + #endif } - break; } - sleep(MAXWAITSEC / 4); - thread = timer->thread; - } - goto TIMER_SLEEP; -} - -static void cleanup_thread(thread_timer_t* timer) { - printf("Start cleaning@%d, ", timer->index); - - if(timer->accept_fd) { - close(timer->accept_fd); - timer->accept_fd = 0; - printf("Close accept, "); - } - - close_dict(timer->index); - setdicts[timer->index].data[0] = 0; - - timer->thread = 0; - printf("Clear thread, "); - - pthread_cond_destroy(&timer->c); - printf("Destroy accept cond, "); - - pthread_mutex_destroy(&timer->mc); - printf("Destroy accept mutex, "); - - pthread_rwlock_wrlock(&timer->mb); - timer->isbusy = 0; - printf("Clear busy, "); - pthread_rwlock_unlock(&timer->mb); - - puts("Finish cleaning"); -} - -static void handle_accept(void *p) { - #ifdef DEBUG - printf("accept ptr: %p\n", p); - #endif - pthread_cleanup_push((void*)&cleanup_thread, p); - puts("Handling accept..."); - pthread_setspecific(pthread_key_index, (void*)((uintptr_t)timer_pointer_of(p)->index+1)); - if(sigsetjmp(jmp2convend[timer_pointer_of(p)->index], 1)) { - printf("Long Jump@%d\n", timer_pointer_of(p)->index); - goto CONV_END; - } - while(1) { - int accept_fd = timer_pointer_of(p)->accept_fd; - uint32_t index = timer_pointer_of(p)->index; - uint8_t *buff = timer_pointer_of(p)->buf; - cmdpacket_t cp = (cmdpacket_t)buff; - ssize_t numbytes = 0, offset = 0; - while( - offset >= CMDPACKET_HEAD_LEN - || (numbytes = recv(accept_fd, buff+offset, CMDPACKET_HEAD_LEN-offset, MSG_WAITALL)) > 0 - ) { - touch_timer(p); - offset += numbytes; - #ifdef DEBUG - printf("[handle] Get %zd bytes, total: %zd.\n", numbytes, offset); - #endif - if(offset < CMDPACKET_HEAD_LEN) break; - if(offset < CMDPACKET_HEAD_LEN+(ssize_t)(cp->datalen)) { - ssize_t toread = CMDPACKET_HEAD_LEN+(ssize_t)(cp->datalen)-offset; - numbytes = recv(accept_fd, buff+offset, toread, MSG_WAITALL); - if(numbytes != toread) break; - else { - offset += numbytes; - #ifdef DEBUG - printf("[handle] Get %zd bytes, total: %zd.\n", numbytes, offset); - #endif - } - } - numbytes = CMDPACKET_HEAD_LEN+(ssize_t)(cp->datalen); // 暂存 packet len - if(offset < numbytes) break; - #ifdef DEBUG - printf("[handle] Decrypt %d bytes data...\n", (int)cp->datalen); - #endif - if(cp->cmd <= CMDEND) { - if(!cmdpacket_decrypt(cp, index, cfg.pwd)) { - cp->data[cp->datalen] = 0; - timer_pointer_of(p)->dat = (char*)cp->data; - timer_pointer_of(p)->numbytes = (ssize_t)(cp->datalen); - printf("[normal] Get %zd bytes packet with cmd: %d, data: %s\n", offset, cp->cmd, cp->data); - switch(cp->cmd) { - case CMDGET: - if(!has_dict_opened && !s1_get(timer_pointer_of(p))) goto CONV_END; - break; - case CMDCAT: - if(!has_dict_opened && !send_all(timer_pointer_of(p))) goto CONV_END; - break; - case CMDMD5: - if(!has_dict_opened && !s5_md5(timer_pointer_of(p))) goto CONV_END; - break; - case CMDACK: - case CMDEND: - default: goto CONV_END; break; - } - } else { - puts("Decrypt normal data failed"); + numbytes = CMDPACKET_HEAD_LEN+(ssize_t)(cp->datalen); // 暂存 packet len + if(offset < numbytes) break; + #ifdef DEBUG + printf("[handle] Decrypt %d bytes data...\n", (int)cp->datalen); + #endif + if(cp->cmd <= CMDEND) { + if(!cmdpacket_decrypt(cp, index, cfg.pwd)) { + cp->data[cp->datalen] = 0; + timer->dat = (char*)cp->data; + timer->numbytes = (ssize_t)(cp->datalen); + printf("[normal] Get %zd bytes packet with cmd: %d, data: %s\n", offset, cp->cmd, cp->data); + switch(cp->cmd) { + case CMDGET: + if(!has_dict_opened && !s1_get(timer)) return; break; - } - } else if(cp->cmd <= CMDDAT) { - if(!cmdpacket_decrypt(cp, index, cfg.sps)) { - cp->data[cp->datalen] = 0; - timer_pointer_of(p)->dat = (char*)cp->data; - timer_pointer_of(p)->numbytes = (ssize_t)(cp->datalen); - printf("[super] Get %zd bytes packet with data: %s\n", offset, cp->data); - switch(cp->cmd) { - case CMDSET: - if(!has_dict_opened && !s2_set(timer_pointer_of(p))) goto CONV_END; - break; - case CMDDEL: - if(!has_dict_opened && !s4_del(timer_pointer_of(p))) goto CONV_END; - break; - case CMDDAT: - if(!has_dict_opened && !s3_set_data(timer_pointer_of(p))) goto CONV_END; - break; - default: goto CONV_END; break; - } - } else { - puts("Decrypt super data failed"); + case CMDCAT: + if(!has_dict_opened && !send_all(timer)) return; break; + case CMDMD5: + if(!has_dict_opened && !s5_md5(timer)) return; + break; + case CMDACK: + case CMDEND: + default: return; break; } } else { - puts("Invalid command"); + puts("Decrypt normal data failed"); break; } - if(offset > numbytes) { - offset -= numbytes; - memmove(buff, buff+numbytes, offset); - numbytes = 0; - } else offset = 0; - #ifdef DEBUG - printf("Offset after analyzing packet: %zd\n", offset); - #endif + } else if(cp->cmd <= CMDDAT) { + if(!cmdpacket_decrypt(cp, index, cfg.sps)) { + cp->data[cp->datalen] = 0; + timer->dat = (char*)cp->data; + timer->numbytes = (ssize_t)(cp->datalen); + printf("[super] Get %zd bytes packet with data: %s\n", offset, cp->data); + switch(cp->cmd) { + case CMDSET: + if(!has_dict_opened && !s2_set(timer)) return; + break; + case CMDDEL: + if(!has_dict_opened && !s4_del(timer)) return; + break; + case CMDDAT: + if(!has_dict_opened && !s3_set_data(timer)) return; + break; + default: return; break; + } + } else { + puts("Decrypt super data failed"); + break; + } + } else { + puts("Invalid command"); + break; } - CONV_END: puts("Conversation end"); - - if(timer_pointer_of(p)->accept_fd) { - close(timer_pointer_of(p)->accept_fd); - timer_pointer_of(p)->accept_fd = 0; - puts("Close accept"); - } - - close_dict(timer_pointer_of(p)->index); - setdicts[timer_pointer_of(p)->index].data[0] = 0; - - pthread_mutex_lock(&timer_pointer_of(p)->mc); - - pthread_rwlock_wrlock(&timer_pointer_of(p)->mb); - timer_pointer_of(p)->isbusy = 0; - pthread_rwlock_unlock(&timer_pointer_of(p)->mb); - - puts("Set thread status to idle"); - pthread_cond_wait(&timer_pointer_of(p)->c, &timer_pointer_of(p)->mc); - - pthread_mutex_unlock(&timer_pointer_of(p)->mc); - puts("Thread wakeup"); + if(offset > numbytes) { + offset -= numbytes; + memmove(buff, buff+numbytes, offset); + numbytes = 0; + } else offset = 0; + #ifdef DEBUG + printf("Offset after analyzing packet: %zd\n", offset); + #endif } - pthread_cleanup_pop(1); + return; } -static void accept_client(int fd) { - /*pid_t pid = fork(); - while (pid > 0) { // 主进程监控子进程状态,如果子进程异常终止则重启之 - wait(NULL); - puts("Server subprocess exited. Restart..."); - pid = fork(); - } - while(pid < 0) { - perror("Error when forking a subprocess"); - sleep(1); - }*/ - sigaction(SIGINT , &(const struct sigaction){handle_int}, NULL); - sigaction(SIGQUIT, &(const struct sigaction){handle_quit}, NULL); - sigaction(SIGKILL, &(const struct sigaction){handle_kill}, NULL); - sigaction(SIGSEGV, &(const struct sigaction){handle_segv}, NULL); - sigaction(SIGPIPE, &(const struct sigaction){handle_pipe}, NULL); - sigaction(SIGTERM, &(const struct sigaction){handle_kill}, NULL); - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - init_crypto(); - init_dict_pool(open_dict(0, 1)); - close_dict(0); - for(int i = 0; i < THREADCNT; i++) { - pthread_rwlock_init(&timers[i].mt, NULL); - pthread_rwlock_init(&timers[i].mb, NULL); - } - pthread_key_create(&pthread_key_index, NULL); - while(1) { - int p = 0; - while(p < THREADCNT) { - pthread_rwlock_rdlock(&timers[p].mb); - if(!timers[p].isbusy) break; - pthread_rwlock_unlock(&timers[p].mb); - p++; - } - if(p >= THREADCNT) { - puts("Max thread cnt exceeded"); - sleep(1); - continue; - } - printf("Ready for accept on slot No.%d\n", p); - thread_timer_t* timer = &timers[p]; - pthread_rwlock_unlock(&timer->mb); - #ifdef LISTEN_ON_IPV6 - struct sockaddr_in6 client_addr; - #else - struct sockaddr_in client_addr; - #endif - int accept_fd; - if((accept_fd=accept(fd, (struct sockaddr *)&client_addr, &struct_len))<=0) { - perror("Accept client error"); - continue; - } - pthread_rwlock_wrlock(&timer->mb); - timer->isbusy = 1; - pthread_rwlock_unlock(&timer->mb); - #ifdef LISTEN_ON_IPV6 - uint16_t port = ntohs(client_addr.sin6_port); - struct in6_addr in = client_addr.sin6_addr; - char str[INET6_ADDRSTRLEN]; // 46 - inet_ntop(AF_INET6, &in, str, sizeof(str)); - #else - uint16_t port = ntohs(client_addr.sin_port); - struct in_addr in = client_addr.sin_addr; - char str[INET_ADDRSTRLEN]; // 16 - inet_ntop(AF_INET, &in, str, sizeof(str)); - #endif - time_t t = time(NULL); - printf("\n> %sAccept client %s:%u at slot No.%d, ", ctime(&t), str, port, p); - timer->accept_fd = accept_fd; - timer->index = p; - pthread_rwlock_wrlock(&timers[p].mt); - timer->touch = time(NULL); - pthread_rwlock_unlock(&timers[p].mt); - reset_seq(p); - // start or wakeup accept thread - pthread_t thread = timer->thread; - if(thread && !pthread_kill(thread, 0)) { - pthread_mutex_lock(&timer->mc); - pthread_cond_signal(&timer->c); // wakeup thread - pthread_mutex_unlock(&timer->mc); - puts("Pick thread from pool"); - } else { - pthread_cond_init(&timer->c, NULL); - pthread_mutex_init(&timer->mc, NULL); - if (pthread_create(&timer->thread, &attr, (void *)&handle_accept, timer)) { - perror("Error creating thread"); - cleanup_thread(timer); - putchar('\n'); - continue; - } - puts("Thread created"); - } - // start or wakeup timer thread - thread = timer->timerthread; - if(!thread || pthread_kill(thread, 0)) { - printf("Creating timer thread..."); - pthread_cond_init(&timer->tc, NULL); - pthread_mutex_init(&timer->tmc, NULL); - timer->hastimerslept = 0; - if (pthread_create(&timer->timerthread, &attr, (void *)&accept_timer, timer)) { - perror("Error creating timer thread"); - cleanup_thread(timer); - putchar('\n'); - continue; - } - puts("succeeded"); - } else { - pthread_mutex_lock(&timer->tmc); - uint8_t hastimerslept = timer->hastimerslept; - pthread_mutex_unlock(&timer->tmc); - if(hastimerslept) { - printf("Waking up timer thread..."); - pthread_mutex_lock(&timer->tmc); - pthread_cond_signal(&timer->tc); // wakeup thread - pthread_mutex_unlock(&timer->tmc); - puts("succeeded"); - } else puts("Timer already running"); - } - } -} #define argequ(i, arg) (*(uint16_t*)argv[i] == *(uint16_t*)(arg)) #define showUsage(program) \ @@ -1025,7 +682,7 @@ int main(int argc, char *argv[]) { int fd; if(!(fd=bind_server((uint16_t*)&port))) return 10; if(!listen_socket(fd)) return 11; - pthread_cleanup_push((void*)&close, (void*)((long long)fd)); + pthread_cleanup_push((void (*)(void*))&close, (void*)((long long)fd)); accept_client(fd); pthread_cleanup_pop(1); return 0; diff --git a/tcpool.h b/tcpool.h new file mode 100644 index 0000000..35bff61 --- /dev/null +++ b/tcpool.h @@ -0,0 +1,428 @@ +#ifndef _TCPOOL_H_ +#define _TCPOOL_H_ + +/* See feature_test_macros(7) */ +#define _GNU_SOURCE 1 +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#ifndef TCPOOL_THREAD_TIMER_T_SZ + #define TCPOOL_THREAD_TIMER_T_SZ 1024 +#endif + +#ifndef TCPOOL_THREADCNT + #define TCPOOL_THREADCNT 32 +#endif + +#ifndef TCPOOL_MAXWAITSEC + #define TCPOOL_MAXWAITSEC 8 +#endif + +#ifndef TCPOOL_THREAD_CONTEXT + #define TCPOOL_THREAD_CONTEXT uint8_t __padding[ \ + TCPOOL_THREAD_TIMER_T_SZ \ + -sizeof(uint32_t) \ + -sizeof(int) \ + -sizeof(time_t) \ + -sizeof(pthread_rwlock_t) \ + -2*sizeof(pthread_t) \ + -2*sizeof(pthread_cond_t) \ + -2*sizeof(pthread_mutex_t) \ + -sizeof(pthread_rwlock_t) \ + -2*sizeof(uint8_t) \ + ] +#endif + +#ifndef TCPOOL_TOUCH_TIMER_CONDITION + #define TCPOOL_TOUCH_TIMER_CONDITION (0) +#endif + +#ifndef TCPOOL_INIT_ACTION + #define TCPOOL_INIT_ACTION ; +#endif + +#ifndef TCPOOL_PREHANDLE_ACCEPT_ACTION + #define TCPOOL_PREHANDLE_ACCEPT_ACTION(timer) ; +#endif + +#ifndef TCPOOL_CLEANUP_THREAD_ACTION + #define TCPOOL_CLEANUP_THREAD_ACTION(timer) ; +#endif + +struct tcpool_thread_timer_t { + uint32_t index; + int accept_fd; + time_t touch; // lock by mt + pthread_rwlock_t mt; // lock touch + pthread_t thread; + pthread_t timerthread; + pthread_cond_t c; // lock by mc + pthread_mutex_t mc; // lock c + pthread_cond_t tc; // lock by tmc + pthread_mutex_t tmc; // lock tc&hastimerslept + pthread_rwlock_t mb; // lock isbusy + uint8_t isbusy; // lock by mb + uint8_t hastimerslept; // lock by tmc + TCPOOL_THREAD_CONTEXT; +}; +typedef struct tcpool_thread_timer_t tcpool_thread_timer_t; + +static tcpool_thread_timer_t tcpool_timers[TCPOOL_THREADCNT]; + +#define tcpool_timer_pointer_of(x) ((tcpool_thread_timer_t*)(x)) + +#define tcpool_touch_timer(x) { \ + pthread_rwlock_wrlock(&tcpool_timer_pointer_of(x)->mt); \ + tcpool_timer_pointer_of(x)->touch = time(NULL); \ + printf("Touch timer@%d\n", tcpool_timer_pointer_of(x)->index);\ + pthread_rwlock_unlock(&tcpool_timer_pointer_of(x)->mt); \ +} + +#ifdef LISTEN_ON_IPV6 + static socklen_t tcpool_struct_len = sizeof(struct sockaddr_in6); + static struct sockaddr_in6 tcpool_server_addr; +#else + static socklen_t tcpool_struct_len = sizeof(struct sockaddr_in); + static struct sockaddr_in tcpool_server_addr; +#endif + +static pthread_attr_t __tcpool_thread_attr; +static pthread_key_t __tcpool_pthread_key_index; +static sigjmp_buf __tcpool_jmp2convend[TCPOOL_THREADCNT]; + +static void accept_action(tcpool_thread_timer_t *timer); +static void accept_client(int fd); +static void accept_timer(void *p); +static int bind_server(uint16_t* port); +static void cleanup_thread(tcpool_thread_timer_t* timer); +static void handle_accept(void *accept_fd_p); +static void handle_int(int signo); +static void handle_kill(int signo); +static void handle_pipe(int signo); +static void handle_quit(int signo); +static void handle_segv(int signo); +static int listen_socket(int fd); + +static int bind_server(uint16_t* port) { + #ifdef LISTEN_ON_IPV6 + tcpool_server_addr.sin6_family = AF_INET6; + tcpool_server_addr.sin6_port = htons(*port); + bzero(&(tcpool_server_addr.sin6_addr), sizeof(tcpool_server_addr.sin6_addr)); + int fd = socket(PF_INET6, SOCK_STREAM, 0); + #else + tcpool_server_addr.sin_family = AF_INET; + tcpool_server_addr.sin_port = htons(*port); + tcpool_server_addr.sin_addr.s_addr = INADDR_ANY; + bzero(&(tcpool_server_addr.sin_zero), 8); + int fd = socket(AF_INET, SOCK_STREAM, 0); + #endif + int on = 1; + if(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))) { + perror("Set socket option failure"); + return 0; + } + if(!~bind(fd, (struct sockaddr *)&tcpool_server_addr, tcpool_struct_len)) { + perror("Bind server failure"); + return 0; + } + #ifdef LISTEN_ON_IPV6 + *port = ntohs(tcpool_server_addr.sin6_port); + struct in6_addr in = tcpool_server_addr.sin6_addr; + char str[INET6_ADDRSTRLEN]; // 46 + inet_ntop(AF_INET6, &in, str, sizeof(str)); + #else + *port = ntohs(tcpool_server_addr.sin_port); + struct in_addr in = tcpool_server_addr.sin_addr; + char str[INET_ADDRSTRLEN]; // 16 + inet_ntop(AF_INET, &in, str, sizeof(str)); + #endif + printf("Bind server successfully on %s:%u\n", str, *port); + return fd; +} + +static int listen_socket(int fd) { + if(!~listen(fd, TCPOOL_THREADCNT)) { + perror("Listen failed"); + return 0; + } + puts("Listening..."); + return fd; +} + +static void handle_quit(int signo) { + uint32_t index = (uint32_t)((uintptr_t)pthread_getspecific(__tcpool_pthread_key_index)); + printf("Handle sigquit@%d\n", index-1); + fflush(stdout); + if(index) { + sigaction(SIGQUIT, &(const struct sigaction){handle_quit}, NULL); + siglongjmp(__tcpool_jmp2convend[index-1], signo); + } + else pthread_exit(NULL); +} + +static void handle_segv(int signo) { + uint32_t index = (uint32_t)((uintptr_t)pthread_getspecific(__tcpool_pthread_key_index)); + printf("Handle sigsegv@%d\n", index-1); + fflush(stdout); + if(index) { + sigaction(SIGSEGV, &(const struct sigaction){handle_segv}, NULL); + siglongjmp(__tcpool_jmp2convend[index-1], signo); + } + else pthread_exit(NULL); +} + +static void handle_kill(int signo) { + puts("Handle sigkill/sigterm"); + fflush(stdout); + exit(signo); +} + +static void handle_int(int signo) { + puts("Keyboard interrupted"); + fflush(stdout); + exit(signo); +} + +static void handle_pipe(int signo) { + uint32_t index = (uint32_t)((uintptr_t)pthread_getspecific(__tcpool_pthread_key_index)); + printf("Pipe error@%d, break loop...\n", index-1); + fflush(stdout); + if(index) { + sigaction(SIGPIPE, &(const struct sigaction){handle_pipe}, NULL); + siglongjmp(__tcpool_jmp2convend[index-1], signo); + } + else pthread_exit(NULL); +} + +static void accept_timer(void *p) { + tcpool_thread_timer_t *timer = tcpool_timer_pointer_of(p); + uint32_t index = timer->index; + pthread_t thread = timer->thread; + uint8_t isbusy; + + sleep(TCPOOL_MAXWAITSEC / 4); + while(thread && !pthread_kill(thread, 0)) { + pthread_rwlock_rdlock(&timer->mb); + isbusy = timer->isbusy; + pthread_rwlock_unlock(&timer->mb); + if(!isbusy) { + TIMER_SLEEP: + pthread_mutex_lock(&timer->tmc); + timer->hastimerslept = 1; + printf("Timer@%d sleep\n", timer->index); + pthread_cond_wait(&timer->tc, &timer->tmc); + timer->hastimerslept = 0; + pthread_mutex_unlock(&timer->tmc); + printf("Timer@%d wake up\n", timer->index); + sleep(TCPOOL_MAXWAITSEC / 4); + thread = timer->thread; + } + if(TCPOOL_TOUCH_TIMER_CONDITION) tcpool_touch_timer(p); + pthread_rwlock_rdlock(&timer->mt); + time_t waitsec = time(NULL) - timer->touch; + pthread_rwlock_unlock(&timer->mt); + printf("Wait@%d sec: %u, max: %u\n", timer->index, (unsigned int)waitsec, TCPOOL_MAXWAITSEC); + if(waitsec > TCPOOL_MAXWAITSEC) { + if(thread) { + pthread_kill(thread, SIGQUIT); + printf("Kill thread@%d\n", timer->index); + } + break; + } + sleep(TCPOOL_MAXWAITSEC / 4); + thread = timer->thread; + } + goto TIMER_SLEEP; +} + +static void cleanup_thread(tcpool_thread_timer_t* timer) { + printf("Start cleaning@%d, ", timer->index); + + if(timer->accept_fd) { + close(timer->accept_fd); + timer->accept_fd = 0; + printf("Close accept, "); + } + + TCPOOL_CLEANUP_THREAD_ACTION(timer); + + timer->thread = 0; + printf("Clear thread, "); + + pthread_cond_destroy(&timer->c); + printf("Destroy accept cond, "); + + pthread_mutex_destroy(&timer->mc); + printf("Destroy accept mutex, "); + + pthread_rwlock_wrlock(&timer->mb); + timer->isbusy = 0; + printf("Clear busy, "); + pthread_rwlock_unlock(&timer->mb); + + puts("Finish cleaning"); +} + +static void handle_accept(void *p) { + #ifdef DEBUG + printf("accept ptr: %p\n", p); + #endif + pthread_cleanup_push((void (*)(void*))&cleanup_thread, p); + puts("Handling accept..."); + pthread_setspecific(__tcpool_pthread_key_index, (void*)((uintptr_t)tcpool_timer_pointer_of(p)->index+1)); + if(sigsetjmp(__tcpool_jmp2convend[tcpool_timer_pointer_of(p)->index], 1)) { + printf("Long Jump@%d\n", tcpool_timer_pointer_of(p)->index); + goto CONV_END; + } + while(1) { + accept_action(tcpool_timer_pointer_of(p)); + CONV_END: puts("Conversation end"); + + if(tcpool_timer_pointer_of(p)->accept_fd) { + close(tcpool_timer_pointer_of(p)->accept_fd); + tcpool_timer_pointer_of(p)->accept_fd = 0; + puts("Close accept"); + } + + TCPOOL_CLEANUP_THREAD_ACTION(tcpool_timer_pointer_of(p)); + + pthread_mutex_lock(&tcpool_timer_pointer_of(p)->mc); + + pthread_rwlock_wrlock(&tcpool_timer_pointer_of(p)->mb); + tcpool_timer_pointer_of(p)->isbusy = 0; + pthread_rwlock_unlock(&tcpool_timer_pointer_of(p)->mb); + + puts("Set thread status to idle"); + pthread_cond_wait(&tcpool_timer_pointer_of(p)->c, &tcpool_timer_pointer_of(p)->mc); + + pthread_mutex_unlock(&tcpool_timer_pointer_of(p)->mc); + puts("Thread wakeup"); + } + pthread_cleanup_pop(1); +} + +static void accept_client(int fd) { + sigaction(SIGINT , &(const struct sigaction){handle_int}, NULL); + sigaction(SIGQUIT, &(const struct sigaction){handle_quit}, NULL); + sigaction(SIGKILL, &(const struct sigaction){handle_kill}, NULL); + sigaction(SIGSEGV, &(const struct sigaction){handle_segv}, NULL); + sigaction(SIGPIPE, &(const struct sigaction){handle_pipe}, NULL); + sigaction(SIGTERM, &(const struct sigaction){handle_kill}, NULL); + pthread_attr_init(&__tcpool_thread_attr); + pthread_attr_setdetachstate(&__tcpool_thread_attr, PTHREAD_CREATE_DETACHED); + TCPOOL_INIT_ACTION; + for(int i = 0; i < TCPOOL_THREADCNT; i++) { + pthread_rwlock_init(&tcpool_timers[i].mt, NULL); + pthread_rwlock_init(&tcpool_timers[i].mb, NULL); + } + pthread_key_create(&__tcpool_pthread_key_index, NULL); + while(1) { + int p = 0; + while(p < TCPOOL_THREADCNT) { + pthread_rwlock_rdlock(&tcpool_timers[p].mb); + if(!tcpool_timers[p].isbusy) break; + pthread_rwlock_unlock(&tcpool_timers[p].mb); + p++; + } + if(p >= TCPOOL_THREADCNT) { + puts("Max thread cnt exceeded"); + sleep(1); + continue; + } + printf("Ready for accept on slot No.%d\n", p); + tcpool_thread_timer_t* timer = &tcpool_timers[p]; + pthread_rwlock_unlock(&timer->mb); + #ifdef LISTEN_ON_IPV6 + struct sockaddr_in6 client_addr; + #else + struct sockaddr_in client_addr; + #endif + int accept_fd; + if((accept_fd=accept(fd, (struct sockaddr *)&client_addr, &tcpool_struct_len))<=0) { + perror("Accept client error"); + continue; + } + pthread_rwlock_wrlock(&timer->mb); + timer->isbusy = 1; + pthread_rwlock_unlock(&timer->mb); + #ifdef LISTEN_ON_IPV6 + uint16_t port = ntohs(client_addr.sin6_port); + struct in6_addr in = client_addr.sin6_addr; + char str[INET6_ADDRSTRLEN]; // 46 + inet_ntop(AF_INET6, &in, str, sizeof(str)); + #else + uint16_t port = ntohs(client_addr.sin_port); + struct in_addr in = client_addr.sin_addr; + char str[INET_ADDRSTRLEN]; // 16 + inet_ntop(AF_INET, &in, str, sizeof(str)); + #endif + time_t t = time(NULL); + printf("\n> %sAccept client %s:%u at slot No.%d, ", ctime(&t), str, port, p); + timer->accept_fd = accept_fd; + timer->index = p; + pthread_rwlock_wrlock(&timer->mt); + timer->touch = time(NULL); + pthread_rwlock_unlock(&timer->mt); + TCPOOL_PREHANDLE_ACCEPT_ACTION(timer); + // start or wakeup accept thread + pthread_t thread = timer->thread; + if(thread && !pthread_kill(thread, 0)) { + pthread_mutex_lock(&timer->mc); + pthread_cond_signal(&timer->c); // wakeup thread + pthread_mutex_unlock(&timer->mc); + puts("Pick thread from pool"); + } else { + pthread_cond_init(&timer->c, NULL); + pthread_mutex_init(&timer->mc, NULL); + if (pthread_create(&timer->thread, &__tcpool_thread_attr, (void (*)(void*))&handle_accept, timer)) { + perror("Error creating thread"); + cleanup_thread(timer); + putchar('\n'); + continue; + } + puts("Thread created"); + } + // start or wakeup timer thread + thread = timer->timerthread; + if(!thread || pthread_kill(thread, 0)) { + printf("Creating timer thread..."); + pthread_cond_init(&timer->tc, NULL); + pthread_mutex_init(&timer->tmc, NULL); + timer->hastimerslept = 0; + if (pthread_create(&timer->timerthread, &__tcpool_thread_attr, (void (*)(void*))&accept_timer, timer)) { + perror("Error creating timer thread"); + cleanup_thread(timer); + putchar('\n'); + continue; + } + puts("succeeded"); + } else { + pthread_mutex_lock(&timer->tmc); + uint8_t hastimerslept = timer->hastimerslept; + pthread_mutex_unlock(&timer->tmc); + if(hastimerslept) { + printf("Waking up timer thread..."); + pthread_mutex_lock(&timer->tmc); + pthread_cond_signal(&timer->tc); // wakeup thread + pthread_mutex_unlock(&timer->tmc); + puts("succeeded"); + } else puts("Timer already running"); + } + } +} + +#endif /* _TCPOOL_H_ */