From 8389832951c66d6635635ba8abace3cc0af52d2f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=BA=90=E6=96=87=E9=9B=A8?= <41315874+fumiama@users.noreply.github.com> Date: Wed, 4 May 2022 20:09:57 +0800 Subject: [PATCH] drop accept_threads --- dict.h | 6 ++-- server.c | 94 +++++++++++++++++++++++++++----------------------------- 2 files changed, 49 insertions(+), 51 deletions(-) diff --git a/dict.h b/dict.h index 0422dc6..b0f5dc2 100644 --- a/dict.h +++ b/dict.h @@ -13,12 +13,12 @@ #define DICTKEYSZ 127 #define DICTDATSZ 127 -struct DICT { +struct dict_t { char key[DICTKEYSZ]; char data[DICTDATSZ]; }; -typedef struct DICT DICT; -#define DICTSZ sizeof(DICT) +typedef struct dict_t dict_t; +#define DICTSZ sizeof(dict_t) #define DICT_LOCK_UN 0x00 #define DICT_LOCK_SH 0x01 diff --git a/server.c b/server.c index 8696f7d..5aefce6 100644 --- a/server.c +++ b/server.c @@ -30,36 +30,36 @@ static struct sockaddr_in server_addr; #endif -struct THREADTIMER { +struct thread_timer_t { uint32_t index; uint32_t lock_type; - int accept_fd; char *dat, *ptr; time_t touch; ssize_t numbytes; + pthread_t thread; + int accept_fd; }; -typedef struct THREADTIMER THREADTIMER; -static THREADTIMER timers[THREADCNT]; -#define timer_pointer_of(x) ((THREADTIMER*)(x)) +typedef struct thread_timer_t thread_timer_t; +static thread_timer_t timers[THREADCNT]; +#define timer_pointer_of(x) ((thread_timer_t*)(x)) #define touch_timer(x) (timer_pointer_of(x)->touch = time(NULL)) static int fd; // server fd -static pthread_t accept_threads[THREADCNT]; -static DICT d; -static DICT* setdict; +static dict_t d; +static dict_t* setdict; static uint32_t* items_len; static CONFIG cfg; static pthread_attr_t attr; static pthread_rwlock_t mu; #define DICTPOOLSZ (((uint32_t)-1)>>((sizeof(uint32_t)*8-DICTPOOLBIT))) -static DICT* dict_pool[DICTPOOLSZ+1]; +static dict_t* dict_pool[DICTPOOLSZ+1]; static void accept_client(); static void accept_timer(void *p); static uint16_t bind_server(uint16_t port); -static void cleanup_thread(THREADTIMER* timer); -static int close_and_send(THREADTIMER* timer, enum SERVERACK cmd, char *data, size_t numbytes); +static void cleanup_thread(thread_timer_t* timer); +static int close_and_send(thread_timer_t* timer, enum SERVERACK cmd, char *data, size_t numbytes); static enum SERVERACK del(FILE *fp, char* key, int len, char ret[4]); static void handle_accept(void *accept_fd_p); static void handle_int(int signo); @@ -69,13 +69,13 @@ static void init_dict_pool(FILE *fp); static void kill_timer(pthread_t thread); static uint32_t last_nonnull(char* p, uint32_t max_size); static int listen_socket(); -static int send_all(THREADTIMER *timer); +static int send_all(thread_timer_t *timer); static int send_data(int accept_fd, int index, enum SERVERACK cmd, char *data, size_t length); -static int s1_get(THREADTIMER *timer); -static int s2_set(THREADTIMER *timer); -static int s3_set_data(THREADTIMER *timer); -static int s4_del(THREADTIMER *timer); -static int s5_md5(THREADTIMER *timer); +static int s1_get(thread_timer_t *timer); +static int s2_set(thread_timer_t *timer); +static int s3_set_data(thread_timer_t *timer); +static int s4_del(thread_timer_t *timer); +static int s5_md5(thread_timer_t *timer); static uint16_t bind_server(uint16_t port) { #ifdef LISTEN_ON_IPV6 @@ -142,7 +142,7 @@ static int send_data(int accept_fd, int index, enum SERVERACK cmd, char *data, s } } -static int send_all(THREADTIMER *timer) { +static int send_all(thread_timer_t *timer) { int re = 1; FILE *fp = open_dict(DICT_LOCK_SH, timer->index, &mu); if(!fp) return 1; @@ -186,12 +186,12 @@ static void init_dict_pool(FILE *fp) { if(!ch) continue; // skip null bytes SIMPLE_PB* spb = get_pb(fp); if(!spb) continue; // skip error bytes - DICT* d = (DICT*)spb->target; + dict_t* d = (dict_t*)spb->target; md5((uint8_t *)d->key, strlen(d->key)+1, digest); uint8_t* dp = digest; int p = ((*((uint32_t*)digest))>>(8*sizeof(uint32_t)-DICTPOOLBIT))&DICTPOOLSZ; int c = 16-4; - DICT* slot; + dict_t* slot; while((slot=dict_pool[p]) && c-->0) { #ifdef DEBUG @@ -207,8 +207,8 @@ static void init_dict_pool(FILE *fp) { #endif if(!slot) { - DICT* dnew = (DICT*)malloc(sizeof(DICT)); - memcpy(dnew, d, sizeof(DICT)); + dict_t* dnew = (dict_t*)malloc(sizeof(dict_t)); + memcpy(dnew, d, sizeof(dict_t)); dict_pool[p] = dnew; // 解决哈希冲突 } @@ -216,7 +216,7 @@ static void init_dict_pool(FILE *fp) { } } -static int s1_get(THREADTIMER *timer) { +static int s1_get(thread_timer_t *timer) { uint8_t digest[16]; FILE *fp = open_dict(DICT_LOCK_SH, timer->index, &mu); //timer->status = 0; @@ -238,7 +238,7 @@ static int s1_get(THREADTIMER *timer) { while(has_next(fp, ch)) { if(!ch) continue; // skip null bytes SIMPLE_PB* spb = get_pb(fp); - DICT* d = (DICT*)spb->target; + dict_t* d = (dict_t*)spb->target; if(!strcmp(timer->dat, d->key)) { int r; pthread_cleanup_push((void*)free, (void*)spb); @@ -253,7 +253,7 @@ static int s1_get(THREADTIMER *timer) { return close_and_send(timer, ACKNULL, "null", 4); } -static int s2_set(THREADTIMER *timer) { +static int s2_set(thread_timer_t *timer) { uint8_t digest[16]; timer->lock_type = DICT_LOCKING_EX; FILE *fp = open_dict(DICT_LOCK_EX, timer->index, &mu); @@ -264,12 +264,12 @@ static int s2_set(THREADTIMER *timer) { uint8_t* dp = digest; int p = ((*((uint32_t*)digest))>>(8*sizeof(uint32_t)-DICTPOOLBIT))&DICTPOOLSZ; - if(!dict_pool[p]) setdict = dict_pool[p] = (DICT*)malloc(sizeof(DICT)); + if(!dict_pool[p]) setdict = dict_pool[p] = (dict_t*)malloc(sizeof(dict_t)); else { int c = 16-4; int notok; while(dict_pool[p] && (notok=strcmp(timer->dat, dict_pool[p]->key)) && c-->0) p = ((*((uint32_t*)(++dp)))>>(8*sizeof(uint32_t)-DICTPOOLBIT))&DICTPOOLSZ; // 哈希碰撞 - if(!dict_pool[p]) setdict = dict_pool[p] = (DICT*)malloc(sizeof(DICT)); // 无值 + if(!dict_pool[p]) setdict = dict_pool[p] = (dict_t*)malloc(sizeof(dict_t)); // 无值 else if(notok) setdict = &d; // 全部冲突 else { // 已有值 char ret[4]; @@ -283,7 +283,7 @@ static int s2_set(THREADTIMER *timer) { #endif //timer->status = 3; - memset(setdict, 0, sizeof(DICT)); + memset(setdict, 0, sizeof(dict_t)); strncpy(setdict->key, timer->dat, DICTKEYSZ-1); fseek(fp, 0, SEEK_END); return send_data(timer->accept_fd, timer->index, ACKDATA, "data", 4); @@ -294,7 +294,7 @@ static int s2_set(THREADTIMER *timer) { } } -static int s3_set_data(THREADTIMER *timer) { +static int s3_set_data(thread_timer_t *timer) { //timer->status = 0; uint32_t datasize = (timer->numbytes > (DICTDATSZ-1))?(DICTDATSZ-1):timer->numbytes; #ifdef DEBUG @@ -302,7 +302,7 @@ static int s3_set_data(THREADTIMER *timer) { #endif memcpy(setdict->data, timer->dat, datasize); - if(!set_pb(get_dict_fp_wr(), items_len, sizeof(DICT), setdict)) { + if(!set_pb(get_dict_fp_wr(), items_len, sizeof(dict_t), setdict)) { fprintf(stderr, "Error set data: dict[%s]=%s\n", setdict->key, timer->dat); return close_and_send(timer, ACKERRO, "erro", 4); } @@ -315,7 +315,7 @@ static enum SERVERACK del(FILE *fp, char* key, int len, char ret[4]) { while(has_next(fp, ch)) { if(!ch) continue; // skip null bytes SIMPLE_PB* spb = get_pb(fp); - DICT* d = (DICT*)spb->target; + dict_t* d = (dict_t*)spb->target; if(memcmp(key, d->key, len)) { free(spb); continue; @@ -362,7 +362,7 @@ static enum SERVERACK del(FILE *fp, char* key, int len, char ret[4]) { return ACKNULL; } -static int s4_del(THREADTIMER *timer) { +static int s4_del(thread_timer_t *timer) { uint8_t digest[16]; char ret[4]; timer->lock_type = DICT_LOCK_EX; @@ -383,7 +383,7 @@ static int s4_del(THREADTIMER *timer) { return close_and_send(timer, ACKNULL, "null", 4); } -static int s5_md5(THREADTIMER *timer) { +static int s5_md5(thread_timer_t *timer) { //timer->status = 0; fill_md5(&mu); if(is_dict_md5_equal((uint8_t*)timer->dat)) return send_data(timer->accept_fd, timer->index, ACKNULL, "null", 4); @@ -396,9 +396,10 @@ static void handle_quit(int signo) { } static void accept_timer(void *p) { - THREADTIMER *timer = timer_pointer_of(p); + thread_timer_t *timer = timer_pointer_of(p); uint32_t index = timer->index; - while(accept_threads[index] && !pthread_kill(accept_threads[index], 0)) { + pthread_t thread = timer->thread; + while(!pthread_kill(thread, 0)) { sleep(MAXWAITSEC / 4); time_t waitsec = time(NULL) - timer->touch; printf("Wait sec: %u, max: %u\n", (unsigned int)waitsec, MAXWAITSEC); @@ -406,7 +407,7 @@ static void accept_timer(void *p) { if(waitsec > MAXWAITSEC*THREADCNT) break; } else if(waitsec > MAXWAITSEC) break; } - pthread_t thread = accept_threads[index]; + if(thread) { pthread_kill(thread, SIGQUIT); puts("Kill thread"); @@ -418,9 +419,9 @@ static void kill_timer(pthread_t thread) { puts("Kill timer"); } -static void cleanup_thread(THREADTIMER* timer) { +static void cleanup_thread(thread_timer_t* timer) { puts("Start cleaning"); - accept_threads[timer->index] = 0; + timer->thread = 0; if(timer->accept_fd) { close(timer->accept_fd); timer->accept_fd = 0; @@ -464,11 +465,8 @@ static void handle_accept(void *p) { CMDPACKET* cp = (CMDPACKET*)buff; ssize_t numbytes = 0, offset = 0; while( - accept_threads[index] - && ( - offset >= CMDPACKET_HEAD_LEN - || (numbytes = recv(accept_fd, buff+offset, CMDPACKET_HEAD_LEN-offset, MSG_WAITALL)) > 0 - ) + offset >= CMDPACKET_HEAD_LEN + || (numbytes = recv(accept_fd, buff+offset, CMDPACKET_HEAD_LEN-offset, MSG_WAITALL)) > 0 ) { touch_timer(p); offset += numbytes; @@ -585,7 +583,7 @@ static void accept_client() { while(1) { puts("Ready for accept, waitting..."); int p = 0; - while(p < THREADCNT && accept_threads[p] && !pthread_kill(accept_threads[p], 0)) p++; + while(p < THREADCNT && timers[p].thread) p++; if(p >= THREADCNT) { puts("Max thread cnt exceeded"); sleep(1); @@ -612,13 +610,13 @@ static void accept_client() { inet_ntop(AF_INET, &in, str, sizeof(str)); #endif printf("Accept client %s:%u\n", str, port); - THREADTIMER* timer = &timers[p]; + thread_timer_t* timer = &timers[p]; timer->accept_fd = accept_fd; timer->index = p; timer->touch = time(NULL); timer->ptr = NULL; reset_seq(p); - if (pthread_create(accept_threads + p, &attr, (void *)&handle_accept, timer)) { + if (pthread_create(&timer->thread, &attr, (void *)&handle_accept, timer)) { perror("Error creating thread: "); cleanup_thread(timer); continue; @@ -627,7 +625,7 @@ static void accept_client() { } } -static int close_and_send(THREADTIMER* timer, enum SERVERACK cmd, char *data, size_t numbytes) { +static int close_and_send(thread_timer_t* timer, enum SERVERACK cmd, char *data, size_t numbytes) { close_dict(timer->lock_type, timer->index, &mu); timer->lock_type = DICT_LOCK_UN; return send_data(timer->accept_fd, timer->index, cmd, data, numbytes); @@ -697,7 +695,7 @@ int main(int argc, char *argv[]) { fclose(fp); free(spb); } - items_len = align_struct(sizeof(DICT), 2, d.key, d.data); + items_len = align_struct(sizeof(dict_t), 2, d.key, d.data); if(!items_len) { fputs("Align struct error", stderr); return 8;