From b7184d4dbf20cbd8b639e98c14ebcedcefb1c453 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=BA=90=E6=96=87=E9=9B=A8?= <41315874+fumiama@users.noreply.github.com> Date: Thu, 27 Oct 2022 13:47:15 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server.c | 179 ++++++++++++++++++++++++++++++++++++++----------------- server.h | 5 +- 2 files changed, 127 insertions(+), 57 deletions(-) diff --git a/server.c b/server.c index 78cbcfb..3ac9b0c 100644 --- a/server.c +++ b/server.c @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -20,6 +21,47 @@ #include "crypto.h" #include "config.h" +struct thread_timer_t { + uint32_t index; + int accept_fd; + time_t touch; // lock by mt + ssize_t numbytes; + char *dat; + pthread_rwlock_t mt; // lock thread&timerthread + pthread_t thread; // lock by mt + pthread_t timerthread; // lock by mt + pthread_cond_t c; // lock by mc + pthread_mutex_t mc; // lock c + pthread_cond_t tc; // lock by tmc + pthread_mutex_t tmc; // lock tc&hastimerslept + pthread_rwlock_t mb; // lock isbusy + uint8_t isbusy; // lock by mb + uint8_t hastimerslept; // lock by tmc + uint8_t buf[ + THREAD_TIMER_T_SZ + -sizeof(uint32_t) + -sizeof(int) + -sizeof(time_t) + -sizeof(ssize_t) + -sizeof(char*) + -sizeof(pthread_rwlock_t) + -2*sizeof(pthread_t) + -2*sizeof(pthread_cond_t) + -2*sizeof(pthread_mutex_t) + -sizeof(pthread_rwlock_t) + -2*sizeof(uint8_t) + ]; +}; +typedef struct thread_timer_t thread_timer_t; +static thread_timer_t timers[THREADCNT]; +#define timer_pointer_of(x) ((thread_timer_t*)(x)) +#define touch_timer(x) { \ + pthread_rwlock_wrlock(&timer_pointer_of(x)->mt); \ + timer_pointer_of(x)->touch = time(NULL); \ + printf("Touch timer@%d\n", timer_pointer_of(x)->index);\ + pthread_rwlock_unlock(&timer_pointer_of(x)->mt); \ +} + #ifdef LISTEN_ON_IPV6 static socklen_t struct_len = sizeof(struct sockaddr_in6); static struct sockaddr_in6 server_addr; @@ -28,28 +70,6 @@ static struct sockaddr_in server_addr; #endif -struct thread_timer_t { - uint32_t index; - int accept_fd; - time_t touch; - ssize_t numbytes; - char *dat; - pthread_t thread; - pthread_t timerthread; - pthread_cond_t c; - pthread_mutex_t mc; - pthread_cond_t tc; - pthread_mutex_t tmc; - pthread_rwlock_t mb; - uint8_t isbusy; // lock by mb - uint8_t hastimerslept; // lock by tmc - uint8_t buf[BUFSIZ-sizeof(uint32_t)-sizeof(int)-sizeof(time_t)-sizeof(ssize_t)-sizeof(char*)-2*sizeof(pthread_t)-2*sizeof(pthread_cond_t)-2*sizeof(pthread_mutex_t)-sizeof(pthread_rwlock_t)-2*sizeof(uint8_t)]; -}; -typedef struct thread_timer_t thread_timer_t; -static thread_timer_t timers[THREADCNT]; -#define timer_pointer_of(x) ((thread_timer_t*)(x)) -#define touch_timer(x) (timer_pointer_of(x)->touch = time(NULL)) - static dict_t setdicts[THREADCNT]; static uint32_t* items_len; static config_t cfg; @@ -58,6 +78,9 @@ static pthread_attr_t attr; #define DICTPOOLSZ (((uint32_t)-1)>>((sizeof(uint32_t)*8-DICTPOOLBIT))) static dict_t* dict_pool[DICTPOOLSZ+1]; +static pthread_key_t pthread_key_index; +static jmp_buf jmp2convend[THREADCNT]; + static void accept_client(int fd); static void accept_timer(void *p); static int bind_server(uint16_t* port); @@ -559,15 +582,17 @@ static int s5_md5(thread_timer_t *timer) { } static void handle_quit(int signo) { - puts("Handle sigquit"); - fflush(stdout); - pthread_exit(NULL); + uint32_t index = (uint32_t)(pthread_getspecific(pthread_key_index)); + printf("Handle sigquit@%u\n", index); + signal(SIGQUIT, handle_quit); + longjmp(jmp2convend[index], signo); } static void handle_segv(int signo) { - puts("Handle sigsegv"); - fflush(stdout); - pthread_exit(NULL); + uint32_t index = (uint32_t)(pthread_getspecific(pthread_key_index)); + printf("Handle sigsegv@%u\n", index); + signal(SIGSEGV, handle_segv); + longjmp(jmp2convend[index], signo); } static void handle_kill(int signo) { @@ -575,17 +600,33 @@ static void handle_kill(int signo) { process_defer(); } +static void handle_int(int signo) { + puts("Keyboard interrupted"); + process_defer(); +} + +static void handle_pipe(int signo) { + uint32_t index = (uint32_t)(pthread_getspecific(pthread_key_index)); + printf("Pipe error@%u, break loop...\n", index); + signal(SIGPIPE, handle_pipe); + longjmp(jmp2convend[index], signo); +} + static void accept_timer(void *p) { thread_timer_t *timer = timer_pointer_of(p); uint32_t index = timer->index; sigset_t mask; + pthread_t thread; sigemptyset(&mask); sigaddset(&mask, SIGPIPE); // 防止处理嵌套 pthread_sigmask(SIG_BLOCK, &mask, NULL); sleep(MAXWAITSEC / 4); - while(timer->thread && !pthread_kill(timer->thread, 0)) { + pthread_rwlock_rdlock(&timer->mt); + thread = timer->thread; + pthread_rwlock_unlock(&timer->mt); + while(thread && !pthread_kill(thread, 0)) { pthread_rwlock_rdlock(&timer->mb); uint8_t isbusy = timer->isbusy; pthread_rwlock_unlock(&timer->mb); @@ -593,25 +634,32 @@ static void accept_timer(void *p) { TIMER_SLEEP: pthread_mutex_lock(&timer->tmc); timer->hastimerslept = 1; - puts("Timer sleep"); + printf("Timer@%d sleep\n", timer->index); pthread_cond_wait(&timer->tc, &timer->tmc); timer->hastimerslept = 0; pthread_mutex_unlock(&timer->tmc); - puts("Timer wake up"); + printf("Timer@%d wake up\n", timer->index); sleep(MAXWAITSEC / 4); + pthread_rwlock_rdlock(&timer->mt); + thread = timer->thread; + pthread_rwlock_unlock(&timer->mt); } if(is_dict_opening) touch_timer(p); + pthread_rwlock_rdlock(&timer->mt); time_t waitsec = time(NULL) - timer->touch; - printf("Wait sec: %u, max: %u\n", (unsigned int)waitsec, MAXWAITSEC); + pthread_rwlock_unlock(&timer->mt); + printf("Wait@%d sec: %u, max: %u\n", timer->index, (unsigned int)waitsec, MAXWAITSEC); if(waitsec > MAXWAITSEC) { - pthread_t thread = timer->thread; if(thread) { pthread_kill(thread, SIGQUIT); - puts("Kill thread"); + printf("Kill thread@%d\n", timer->index); } break; } sleep(MAXWAITSEC / 4); + pthread_rwlock_rdlock(&timer->mt); + thread = timer->thread; + pthread_rwlock_unlock(&timer->mt); } goto TIMER_SLEEP; } @@ -621,52 +669,55 @@ static void cleanup_thread(thread_timer_t* timer) { sigemptyset(&mask); sigaddset(&mask, SIGPIPE); // 防止处理嵌套 pthread_sigmask(SIG_BLOCK, &mask, NULL); - printf("Start cleaning, "); + + printf("Start cleaning@%d, ", timer->index); + if(timer->accept_fd) { close(timer->accept_fd); timer->accept_fd = 0; printf("Close accept, "); } + close_dict(timer->index); + + pthread_rwlock_wrlock(&timer->mt); timer->thread = 0; + printf("Clear thread, "); + pthread_rwlock_unlock(&timer->mt); + pthread_cond_destroy(&timer->c); printf("Destroy accept cond, "); + pthread_mutex_destroy(&timer->mc); printf("Destroy accept mutex, "); + setdicts[timer->index].data[0] = 0; + pthread_rwlock_wrlock(&timer->mb); timer->isbusy = 0; printf("Clear busy, "); pthread_rwlock_unlock(&timer->mb); + puts("Finish cleaning"); } static void process_defer() { for(int i = 0; i < THREADCNT; i++) { - if(timers[i].thread) pthread_kill(timers[i].thread, SIGQUIT); - if(timers[i].timerthread) pthread_kill(timers[i].timerthread, SIGQUIT); + if(timers[i].thread) pthread_kill(timers[i].thread, SIGUSR1); // pthread_exit + if(timers[i].timerthread) pthread_kill(timers[i].timerthread, SIGUSR1); // pthread_exit } fflush(stdout); pthread_exit(NULL); } -static void handle_int(int signo) { - puts("Keyboard interrupted"); - process_defer(); -} - -static void handle_pipe(int signo) { - puts("Pipe error, exit thread..."); - fflush(stdout); - pthread_exit(NULL); -} - static void handle_accept(void *p) { #ifdef DEBUG printf("accept ptr: %p\n", p); #endif pthread_cleanup_push((void*)&cleanup_thread, p); puts("Handling accept..."); + pthread_setspecific(pthread_key_index, (void*)((uintptr_t)timer_pointer_of(p)->index)); + if(setjmp(jmp2convend[timer_pointer_of(p)->index])) goto CONV_END; while(1) { int accept_fd = timer_pointer_of(p)->accept_fd; uint32_t index = timer_pointer_of(p)->index; @@ -795,12 +846,17 @@ static void accept_client(int fd) { signal(SIGSEGV, handle_segv); signal(SIGPIPE, handle_pipe); signal(SIGTERM, handle_kill); + signal(SIGUSR1, (void (*)(int))pthread_exit); pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); init_crypto(); init_dict_pool(open_dict(0, 1)); close_dict(0); - for(int i = 0; i < THREADCNT; i++) pthread_rwlock_init(&timers[i].mb, NULL); + for(int i = 0; i < THREADCNT; i++) { + pthread_rwlock_init(&timers[i].mt, NULL); + pthread_rwlock_init(&timers[i].mb, NULL); + } + pthread_key_create(&pthread_key_index, NULL); while(1) { int p = 0; while(p < THREADCNT) { @@ -849,10 +905,15 @@ static void accept_client(int fd) { printf("\n> %sAccept client %s:%u at slot No.%d, ", ctime(&t), str, port, p); timer->accept_fd = accept_fd; timer->index = p; + pthread_rwlock_wrlock(&timers[p].mt); timer->touch = time(NULL); + pthread_rwlock_unlock(&timers[p].mt); reset_seq(p); // start or wakeup accept thread - if(timer->thread) { + pthread_rwlock_rdlock(&timers[p].mt); + pthread_t thread = timer->thread; + pthread_rwlock_unlock(&timers[p].mt); + if(thread) { pthread_mutex_lock(&timer->mc); pthread_cond_signal(&timer->c); // wakeup thread pthread_mutex_unlock(&timer->mc); @@ -869,8 +930,9 @@ static void accept_client(int fd) { puts("Thread created"); } // start or wakeup timer thread - pthread_t thread = timer->timerthread; - uint8_t hastimerslept = timer->hastimerslept; + pthread_rwlock_rdlock(&timers[p].mt); + thread = timer->timerthread; + pthread_rwlock_unlock(&timers[p].mt); if(!thread || pthread_kill(thread, 0)) { printf("Creating timer thread..."); pthread_cond_init(&timer->tc, NULL); @@ -883,12 +945,17 @@ static void accept_client(int fd) { continue; } puts("succeeded"); - } else if(hastimerslept) { - printf("Waking up timer thread..."); + } else { pthread_mutex_lock(&timer->tmc); - pthread_cond_signal(&timer->tc); // wakeup thread + uint8_t hastimerslept = timer->hastimerslept; pthread_mutex_unlock(&timer->tmc); - puts("succeeded"); + if(hastimerslept) { + printf("Waking up timer thread..."); + pthread_mutex_lock(&timer->tmc); + pthread_cond_signal(&timer->tc); // wakeup thread + pthread_mutex_unlock(&timer->tmc); + puts("succeeded"); + } else puts("Timer already running"); } } } diff --git a/server.h b/server.h index 7217c83..6769f64 100644 --- a/server.h +++ b/server.h @@ -10,9 +10,12 @@ #define MAXWAITSEC 8 #endif #ifndef DICTPOOLBIT -// DICTPOOLBIT must be lower than 4*8 = 32 + // DICTPOOLBIT must be lower than 4*8 = 32 #define DICTPOOLBIT 16 #endif +#ifndef THREAD_TIMER_T_SZ + #define THREAD_TIMER_T_SZ 1024 +#endif enum server_cmd_t {CMDGET, CMDCAT, CMDMD5, CMDACK, CMDEND, CMDSET, CMDDEL, CMDDAT}; enum server_ack_t {ACKNONE=0b0000011, ACKSUCC=0b0010011, ACKDATA=0b0100011, ACKNULL=0b0110011, ACKNEQU=0b1000011, ACKERRO=0b1010011};