1
0
mirror of https://github.com/fumiama/simple-dict.git synced 2026-06-30 00:40:25 +08:00

drop accept_threads

This commit is contained in:
源文雨
2022-05-04 20:09:57 +08:00
parent 83abd73899
commit 8389832951
2 changed files with 49 additions and 51 deletions

6
dict.h
View File

@@ -13,12 +13,12 @@
#define DICTKEYSZ 127 #define DICTKEYSZ 127
#define DICTDATSZ 127 #define DICTDATSZ 127
struct DICT { struct dict_t {
char key[DICTKEYSZ]; char key[DICTKEYSZ];
char data[DICTDATSZ]; char data[DICTDATSZ];
}; };
typedef struct DICT DICT; typedef struct dict_t dict_t;
#define DICTSZ sizeof(DICT) #define DICTSZ sizeof(dict_t)
#define DICT_LOCK_UN 0x00 #define DICT_LOCK_UN 0x00
#define DICT_LOCK_SH 0x01 #define DICT_LOCK_SH 0x01

View File

@@ -30,36 +30,36 @@
static struct sockaddr_in server_addr; static struct sockaddr_in server_addr;
#endif #endif
struct THREADTIMER { struct thread_timer_t {
uint32_t index; uint32_t index;
uint32_t lock_type; uint32_t lock_type;
int accept_fd;
char *dat, *ptr; char *dat, *ptr;
time_t touch; time_t touch;
ssize_t numbytes; ssize_t numbytes;
pthread_t thread;
int accept_fd;
}; };
typedef struct THREADTIMER THREADTIMER; typedef struct thread_timer_t thread_timer_t;
static THREADTIMER timers[THREADCNT]; static thread_timer_t timers[THREADCNT];
#define timer_pointer_of(x) ((THREADTIMER*)(x)) #define timer_pointer_of(x) ((thread_timer_t*)(x))
#define touch_timer(x) (timer_pointer_of(x)->touch = time(NULL)) #define touch_timer(x) (timer_pointer_of(x)->touch = time(NULL))
static int fd; // server fd static int fd; // server fd
static pthread_t accept_threads[THREADCNT]; static dict_t d;
static DICT d; static dict_t* setdict;
static DICT* setdict;
static uint32_t* items_len; static uint32_t* items_len;
static CONFIG cfg; static CONFIG cfg;
static pthread_attr_t attr; static pthread_attr_t attr;
static pthread_rwlock_t mu; static pthread_rwlock_t mu;
#define DICTPOOLSZ (((uint32_t)-1)>>((sizeof(uint32_t)*8-DICTPOOLBIT))) #define DICTPOOLSZ (((uint32_t)-1)>>((sizeof(uint32_t)*8-DICTPOOLBIT)))
static DICT* dict_pool[DICTPOOLSZ+1]; static dict_t* dict_pool[DICTPOOLSZ+1];
static void accept_client(); static void accept_client();
static void accept_timer(void *p); static void accept_timer(void *p);
static uint16_t bind_server(uint16_t port); static uint16_t bind_server(uint16_t port);
static void cleanup_thread(THREADTIMER* timer); static void cleanup_thread(thread_timer_t* timer);
static int close_and_send(THREADTIMER* timer, enum SERVERACK cmd, char *data, size_t numbytes); static int close_and_send(thread_timer_t* timer, enum SERVERACK cmd, char *data, size_t numbytes);
static enum SERVERACK del(FILE *fp, char* key, int len, char ret[4]); static enum SERVERACK del(FILE *fp, char* key, int len, char ret[4]);
static void handle_accept(void *accept_fd_p); static void handle_accept(void *accept_fd_p);
static void handle_int(int signo); static void handle_int(int signo);
@@ -69,13 +69,13 @@ static void init_dict_pool(FILE *fp);
static void kill_timer(pthread_t thread); static void kill_timer(pthread_t thread);
static uint32_t last_nonnull(char* p, uint32_t max_size); static uint32_t last_nonnull(char* p, uint32_t max_size);
static int listen_socket(); static int listen_socket();
static int send_all(THREADTIMER *timer); static int send_all(thread_timer_t *timer);
static int send_data(int accept_fd, int index, enum SERVERACK cmd, char *data, size_t length); static int send_data(int accept_fd, int index, enum SERVERACK cmd, char *data, size_t length);
static int s1_get(THREADTIMER *timer); static int s1_get(thread_timer_t *timer);
static int s2_set(THREADTIMER *timer); static int s2_set(thread_timer_t *timer);
static int s3_set_data(THREADTIMER *timer); static int s3_set_data(thread_timer_t *timer);
static int s4_del(THREADTIMER *timer); static int s4_del(thread_timer_t *timer);
static int s5_md5(THREADTIMER *timer); static int s5_md5(thread_timer_t *timer);
static uint16_t bind_server(uint16_t port) { static uint16_t bind_server(uint16_t port) {
#ifdef LISTEN_ON_IPV6 #ifdef LISTEN_ON_IPV6
@@ -142,7 +142,7 @@ static int send_data(int accept_fd, int index, enum SERVERACK cmd, char *data, s
} }
} }
static int send_all(THREADTIMER *timer) { static int send_all(thread_timer_t *timer) {
int re = 1; int re = 1;
FILE *fp = open_dict(DICT_LOCK_SH, timer->index, &mu); FILE *fp = open_dict(DICT_LOCK_SH, timer->index, &mu);
if(!fp) return 1; if(!fp) return 1;
@@ -186,12 +186,12 @@ static void init_dict_pool(FILE *fp) {
if(!ch) continue; // skip null bytes if(!ch) continue; // skip null bytes
SIMPLE_PB* spb = get_pb(fp); SIMPLE_PB* spb = get_pb(fp);
if(!spb) continue; // skip error bytes if(!spb) continue; // skip error bytes
DICT* d = (DICT*)spb->target; dict_t* d = (dict_t*)spb->target;
md5((uint8_t *)d->key, strlen(d->key)+1, digest); md5((uint8_t *)d->key, strlen(d->key)+1, digest);
uint8_t* dp = digest; uint8_t* dp = digest;
int p = ((*((uint32_t*)digest))>>(8*sizeof(uint32_t)-DICTPOOLBIT))&DICTPOOLSZ; int p = ((*((uint32_t*)digest))>>(8*sizeof(uint32_t)-DICTPOOLBIT))&DICTPOOLSZ;
int c = 16-4; int c = 16-4;
DICT* slot; dict_t* slot;
while((slot=dict_pool[p]) && c-->0) { while((slot=dict_pool[p]) && c-->0) {
#ifdef DEBUG #ifdef DEBUG
@@ -207,8 +207,8 @@ static void init_dict_pool(FILE *fp) {
#endif #endif
if(!slot) { if(!slot) {
DICT* dnew = (DICT*)malloc(sizeof(DICT)); dict_t* dnew = (dict_t*)malloc(sizeof(dict_t));
memcpy(dnew, d, sizeof(DICT)); memcpy(dnew, d, sizeof(dict_t));
dict_pool[p] = dnew; // 解决哈希冲突 dict_pool[p] = dnew; // 解决哈希冲突
} }
@@ -216,7 +216,7 @@ static void init_dict_pool(FILE *fp) {
} }
} }
static int s1_get(THREADTIMER *timer) { static int s1_get(thread_timer_t *timer) {
uint8_t digest[16]; uint8_t digest[16];
FILE *fp = open_dict(DICT_LOCK_SH, timer->index, &mu); FILE *fp = open_dict(DICT_LOCK_SH, timer->index, &mu);
//timer->status = 0; //timer->status = 0;
@@ -238,7 +238,7 @@ static int s1_get(THREADTIMER *timer) {
while(has_next(fp, ch)) { while(has_next(fp, ch)) {
if(!ch) continue; // skip null bytes if(!ch) continue; // skip null bytes
SIMPLE_PB* spb = get_pb(fp); SIMPLE_PB* spb = get_pb(fp);
DICT* d = (DICT*)spb->target; dict_t* d = (dict_t*)spb->target;
if(!strcmp(timer->dat, d->key)) { if(!strcmp(timer->dat, d->key)) {
int r; int r;
pthread_cleanup_push((void*)free, (void*)spb); pthread_cleanup_push((void*)free, (void*)spb);
@@ -253,7 +253,7 @@ static int s1_get(THREADTIMER *timer) {
return close_and_send(timer, ACKNULL, "null", 4); return close_and_send(timer, ACKNULL, "null", 4);
} }
static int s2_set(THREADTIMER *timer) { static int s2_set(thread_timer_t *timer) {
uint8_t digest[16]; uint8_t digest[16];
timer->lock_type = DICT_LOCKING_EX; timer->lock_type = DICT_LOCKING_EX;
FILE *fp = open_dict(DICT_LOCK_EX, timer->index, &mu); FILE *fp = open_dict(DICT_LOCK_EX, timer->index, &mu);
@@ -264,12 +264,12 @@ static int s2_set(THREADTIMER *timer) {
uint8_t* dp = digest; uint8_t* dp = digest;
int p = ((*((uint32_t*)digest))>>(8*sizeof(uint32_t)-DICTPOOLBIT))&DICTPOOLSZ; int p = ((*((uint32_t*)digest))>>(8*sizeof(uint32_t)-DICTPOOLBIT))&DICTPOOLSZ;
if(!dict_pool[p]) setdict = dict_pool[p] = (DICT*)malloc(sizeof(DICT)); if(!dict_pool[p]) setdict = dict_pool[p] = (dict_t*)malloc(sizeof(dict_t));
else { else {
int c = 16-4; int c = 16-4;
int notok; int notok;
while(dict_pool[p] && (notok=strcmp(timer->dat, dict_pool[p]->key)) && c-->0) p = ((*((uint32_t*)(++dp)))>>(8*sizeof(uint32_t)-DICTPOOLBIT))&DICTPOOLSZ; // 哈希碰撞 while(dict_pool[p] && (notok=strcmp(timer->dat, dict_pool[p]->key)) && c-->0) p = ((*((uint32_t*)(++dp)))>>(8*sizeof(uint32_t)-DICTPOOLBIT))&DICTPOOLSZ; // 哈希碰撞
if(!dict_pool[p]) setdict = dict_pool[p] = (DICT*)malloc(sizeof(DICT)); // 无值 if(!dict_pool[p]) setdict = dict_pool[p] = (dict_t*)malloc(sizeof(dict_t)); // 无值
else if(notok) setdict = &d; // 全部冲突 else if(notok) setdict = &d; // 全部冲突
else { // 已有值 else { // 已有值
char ret[4]; char ret[4];
@@ -283,7 +283,7 @@ static int s2_set(THREADTIMER *timer) {
#endif #endif
//timer->status = 3; //timer->status = 3;
memset(setdict, 0, sizeof(DICT)); memset(setdict, 0, sizeof(dict_t));
strncpy(setdict->key, timer->dat, DICTKEYSZ-1); strncpy(setdict->key, timer->dat, DICTKEYSZ-1);
fseek(fp, 0, SEEK_END); fseek(fp, 0, SEEK_END);
return send_data(timer->accept_fd, timer->index, ACKDATA, "data", 4); return send_data(timer->accept_fd, timer->index, ACKDATA, "data", 4);
@@ -294,7 +294,7 @@ static int s2_set(THREADTIMER *timer) {
} }
} }
static int s3_set_data(THREADTIMER *timer) { static int s3_set_data(thread_timer_t *timer) {
//timer->status = 0; //timer->status = 0;
uint32_t datasize = (timer->numbytes > (DICTDATSZ-1))?(DICTDATSZ-1):timer->numbytes; uint32_t datasize = (timer->numbytes > (DICTDATSZ-1))?(DICTDATSZ-1):timer->numbytes;
#ifdef DEBUG #ifdef DEBUG
@@ -302,7 +302,7 @@ static int s3_set_data(THREADTIMER *timer) {
#endif #endif
memcpy(setdict->data, timer->dat, datasize); memcpy(setdict->data, timer->dat, datasize);
if(!set_pb(get_dict_fp_wr(), items_len, sizeof(DICT), setdict)) { if(!set_pb(get_dict_fp_wr(), items_len, sizeof(dict_t), setdict)) {
fprintf(stderr, "Error set data: dict[%s]=%s\n", setdict->key, timer->dat); fprintf(stderr, "Error set data: dict[%s]=%s\n", setdict->key, timer->dat);
return close_and_send(timer, ACKERRO, "erro", 4); return close_and_send(timer, ACKERRO, "erro", 4);
} }
@@ -315,7 +315,7 @@ static enum SERVERACK del(FILE *fp, char* key, int len, char ret[4]) {
while(has_next(fp, ch)) { while(has_next(fp, ch)) {
if(!ch) continue; // skip null bytes if(!ch) continue; // skip null bytes
SIMPLE_PB* spb = get_pb(fp); SIMPLE_PB* spb = get_pb(fp);
DICT* d = (DICT*)spb->target; dict_t* d = (dict_t*)spb->target;
if(memcmp(key, d->key, len)) { if(memcmp(key, d->key, len)) {
free(spb); free(spb);
continue; continue;
@@ -362,7 +362,7 @@ static enum SERVERACK del(FILE *fp, char* key, int len, char ret[4]) {
return ACKNULL; return ACKNULL;
} }
static int s4_del(THREADTIMER *timer) { static int s4_del(thread_timer_t *timer) {
uint8_t digest[16]; uint8_t digest[16];
char ret[4]; char ret[4];
timer->lock_type = DICT_LOCK_EX; timer->lock_type = DICT_LOCK_EX;
@@ -383,7 +383,7 @@ static int s4_del(THREADTIMER *timer) {
return close_and_send(timer, ACKNULL, "null", 4); return close_and_send(timer, ACKNULL, "null", 4);
} }
static int s5_md5(THREADTIMER *timer) { static int s5_md5(thread_timer_t *timer) {
//timer->status = 0; //timer->status = 0;
fill_md5(&mu); fill_md5(&mu);
if(is_dict_md5_equal((uint8_t*)timer->dat)) return send_data(timer->accept_fd, timer->index, ACKNULL, "null", 4); if(is_dict_md5_equal((uint8_t*)timer->dat)) return send_data(timer->accept_fd, timer->index, ACKNULL, "null", 4);
@@ -396,9 +396,10 @@ static void handle_quit(int signo) {
} }
static void accept_timer(void *p) { static void accept_timer(void *p) {
THREADTIMER *timer = timer_pointer_of(p); thread_timer_t *timer = timer_pointer_of(p);
uint32_t index = timer->index; uint32_t index = timer->index;
while(accept_threads[index] && !pthread_kill(accept_threads[index], 0)) { pthread_t thread = timer->thread;
while(!pthread_kill(thread, 0)) {
sleep(MAXWAITSEC / 4); sleep(MAXWAITSEC / 4);
time_t waitsec = time(NULL) - timer->touch; time_t waitsec = time(NULL) - timer->touch;
printf("Wait sec: %u, max: %u\n", (unsigned int)waitsec, MAXWAITSEC); printf("Wait sec: %u, max: %u\n", (unsigned int)waitsec, MAXWAITSEC);
@@ -406,7 +407,7 @@ static void accept_timer(void *p) {
if(waitsec > MAXWAITSEC*THREADCNT) break; if(waitsec > MAXWAITSEC*THREADCNT) break;
} else if(waitsec > MAXWAITSEC) break; } else if(waitsec > MAXWAITSEC) break;
} }
pthread_t thread = accept_threads[index];
if(thread) { if(thread) {
pthread_kill(thread, SIGQUIT); pthread_kill(thread, SIGQUIT);
puts("Kill thread"); puts("Kill thread");
@@ -418,9 +419,9 @@ static void kill_timer(pthread_t thread) {
puts("Kill timer"); puts("Kill timer");
} }
static void cleanup_thread(THREADTIMER* timer) { static void cleanup_thread(thread_timer_t* timer) {
puts("Start cleaning"); puts("Start cleaning");
accept_threads[timer->index] = 0; timer->thread = 0;
if(timer->accept_fd) { if(timer->accept_fd) {
close(timer->accept_fd); close(timer->accept_fd);
timer->accept_fd = 0; timer->accept_fd = 0;
@@ -464,11 +465,8 @@ static void handle_accept(void *p) {
CMDPACKET* cp = (CMDPACKET*)buff; CMDPACKET* cp = (CMDPACKET*)buff;
ssize_t numbytes = 0, offset = 0; ssize_t numbytes = 0, offset = 0;
while( while(
accept_threads[index]
&& (
offset >= CMDPACKET_HEAD_LEN offset >= CMDPACKET_HEAD_LEN
|| (numbytes = recv(accept_fd, buff+offset, CMDPACKET_HEAD_LEN-offset, MSG_WAITALL)) > 0 || (numbytes = recv(accept_fd, buff+offset, CMDPACKET_HEAD_LEN-offset, MSG_WAITALL)) > 0
)
) { ) {
touch_timer(p); touch_timer(p);
offset += numbytes; offset += numbytes;
@@ -585,7 +583,7 @@ static void accept_client() {
while(1) { while(1) {
puts("Ready for accept, waitting..."); puts("Ready for accept, waitting...");
int p = 0; int p = 0;
while(p < THREADCNT && accept_threads[p] && !pthread_kill(accept_threads[p], 0)) p++; while(p < THREADCNT && timers[p].thread) p++;
if(p >= THREADCNT) { if(p >= THREADCNT) {
puts("Max thread cnt exceeded"); puts("Max thread cnt exceeded");
sleep(1); sleep(1);
@@ -612,13 +610,13 @@ static void accept_client() {
inet_ntop(AF_INET, &in, str, sizeof(str)); inet_ntop(AF_INET, &in, str, sizeof(str));
#endif #endif
printf("Accept client %s:%u\n", str, port); printf("Accept client %s:%u\n", str, port);
THREADTIMER* timer = &timers[p]; thread_timer_t* timer = &timers[p];
timer->accept_fd = accept_fd; timer->accept_fd = accept_fd;
timer->index = p; timer->index = p;
timer->touch = time(NULL); timer->touch = time(NULL);
timer->ptr = NULL; timer->ptr = NULL;
reset_seq(p); reset_seq(p);
if (pthread_create(accept_threads + p, &attr, (void *)&handle_accept, timer)) { if (pthread_create(&timer->thread, &attr, (void *)&handle_accept, timer)) {
perror("Error creating thread: "); perror("Error creating thread: ");
cleanup_thread(timer); cleanup_thread(timer);
continue; continue;
@@ -627,7 +625,7 @@ static void accept_client() {
} }
} }
static int close_and_send(THREADTIMER* timer, enum SERVERACK cmd, char *data, size_t numbytes) { static int close_and_send(thread_timer_t* timer, enum SERVERACK cmd, char *data, size_t numbytes) {
close_dict(timer->lock_type, timer->index, &mu); close_dict(timer->lock_type, timer->index, &mu);
timer->lock_type = DICT_LOCK_UN; timer->lock_type = DICT_LOCK_UN;
return send_data(timer->accept_fd, timer->index, cmd, data, numbytes); return send_data(timer->accept_fd, timer->index, cmd, data, numbytes);
@@ -697,7 +695,7 @@ int main(int argc, char *argv[]) {
fclose(fp); fclose(fp);
free(spb); free(spb);
} }
items_len = align_struct(sizeof(DICT), 2, d.key, d.data); items_len = align_struct(sizeof(dict_t), 2, d.key, d.data);
if(!items_len) { if(!items_len) {
fputs("Align struct error", stderr); fputs("Align struct error", stderr);
return 8; return 8;