1
0
mirror of https://github.com/fumiama/simple-dict.git synced 2026-06-05 02:00:25 +08:00
This commit is contained in:
源文雨
2022-10-27 13:47:15 +08:00
parent deecbda91c
commit b7184d4dbf
2 changed files with 127 additions and 57 deletions

179
server.c
View File

@@ -3,6 +3,7 @@
#include <arpa/inet.h>
#include <netinet/in.h>
#include <pthread.h>
#include <setjmp.h>
#include <signal.h>
#include <simple_protobuf.h>
#include <simplecrypto.h>
@@ -20,6 +21,47 @@
#include "crypto.h"
#include "config.h"
struct thread_timer_t {
uint32_t index;
int accept_fd;
time_t touch; // lock by mt
ssize_t numbytes;
char *dat;
pthread_rwlock_t mt; // lock thread&timerthread
pthread_t thread; // lock by mt
pthread_t timerthread; // lock by mt
pthread_cond_t c; // lock by mc
pthread_mutex_t mc; // lock c
pthread_cond_t tc; // lock by tmc
pthread_mutex_t tmc; // lock tc&hastimerslept
pthread_rwlock_t mb; // lock isbusy
uint8_t isbusy; // lock by mb
uint8_t hastimerslept; // lock by tmc
uint8_t buf[
THREAD_TIMER_T_SZ
-sizeof(uint32_t)
-sizeof(int)
-sizeof(time_t)
-sizeof(ssize_t)
-sizeof(char*)
-sizeof(pthread_rwlock_t)
-2*sizeof(pthread_t)
-2*sizeof(pthread_cond_t)
-2*sizeof(pthread_mutex_t)
-sizeof(pthread_rwlock_t)
-2*sizeof(uint8_t)
];
};
typedef struct thread_timer_t thread_timer_t;
static thread_timer_t timers[THREADCNT];
#define timer_pointer_of(x) ((thread_timer_t*)(x))
#define touch_timer(x) { \
pthread_rwlock_wrlock(&timer_pointer_of(x)->mt); \
timer_pointer_of(x)->touch = time(NULL); \
printf("Touch timer@%d\n", timer_pointer_of(x)->index);\
pthread_rwlock_unlock(&timer_pointer_of(x)->mt); \
}
#ifdef LISTEN_ON_IPV6
static socklen_t struct_len = sizeof(struct sockaddr_in6);
static struct sockaddr_in6 server_addr;
@@ -28,28 +70,6 @@
static struct sockaddr_in server_addr;
#endif
struct thread_timer_t {
uint32_t index;
int accept_fd;
time_t touch;
ssize_t numbytes;
char *dat;
pthread_t thread;
pthread_t timerthread;
pthread_cond_t c;
pthread_mutex_t mc;
pthread_cond_t tc;
pthread_mutex_t tmc;
pthread_rwlock_t mb;
uint8_t isbusy; // lock by mb
uint8_t hastimerslept; // lock by tmc
uint8_t buf[BUFSIZ-sizeof(uint32_t)-sizeof(int)-sizeof(time_t)-sizeof(ssize_t)-sizeof(char*)-2*sizeof(pthread_t)-2*sizeof(pthread_cond_t)-2*sizeof(pthread_mutex_t)-sizeof(pthread_rwlock_t)-2*sizeof(uint8_t)];
};
typedef struct thread_timer_t thread_timer_t;
static thread_timer_t timers[THREADCNT];
#define timer_pointer_of(x) ((thread_timer_t*)(x))
#define touch_timer(x) (timer_pointer_of(x)->touch = time(NULL))
static dict_t setdicts[THREADCNT];
static uint32_t* items_len;
static config_t cfg;
@@ -58,6 +78,9 @@ static pthread_attr_t attr;
#define DICTPOOLSZ (((uint32_t)-1)>>((sizeof(uint32_t)*8-DICTPOOLBIT)))
static dict_t* dict_pool[DICTPOOLSZ+1];
static pthread_key_t pthread_key_index;
static jmp_buf jmp2convend[THREADCNT];
static void accept_client(int fd);
static void accept_timer(void *p);
static int bind_server(uint16_t* port);
@@ -559,15 +582,17 @@ static int s5_md5(thread_timer_t *timer) {
}
static void handle_quit(int signo) {
puts("Handle sigquit");
fflush(stdout);
pthread_exit(NULL);
uint32_t index = (uint32_t)(pthread_getspecific(pthread_key_index));
printf("Handle sigquit@%u\n", index);
signal(SIGQUIT, handle_quit);
longjmp(jmp2convend[index], signo);
}
static void handle_segv(int signo) {
puts("Handle sigsegv");
fflush(stdout);
pthread_exit(NULL);
uint32_t index = (uint32_t)(pthread_getspecific(pthread_key_index));
printf("Handle sigsegv@%u\n", index);
signal(SIGSEGV, handle_segv);
longjmp(jmp2convend[index], signo);
}
static void handle_kill(int signo) {
@@ -575,17 +600,33 @@ static void handle_kill(int signo) {
process_defer();
}
static void handle_int(int signo) {
puts("Keyboard interrupted");
process_defer();
}
static void handle_pipe(int signo) {
uint32_t index = (uint32_t)(pthread_getspecific(pthread_key_index));
printf("Pipe error@%u, break loop...\n", index);
signal(SIGPIPE, handle_pipe);
longjmp(jmp2convend[index], signo);
}
static void accept_timer(void *p) {
thread_timer_t *timer = timer_pointer_of(p);
uint32_t index = timer->index;
sigset_t mask;
pthread_t thread;
sigemptyset(&mask);
sigaddset(&mask, SIGPIPE); // 防止处理嵌套
pthread_sigmask(SIG_BLOCK, &mask, NULL);
sleep(MAXWAITSEC / 4);
while(timer->thread && !pthread_kill(timer->thread, 0)) {
pthread_rwlock_rdlock(&timer->mt);
thread = timer->thread;
pthread_rwlock_unlock(&timer->mt);
while(thread && !pthread_kill(thread, 0)) {
pthread_rwlock_rdlock(&timer->mb);
uint8_t isbusy = timer->isbusy;
pthread_rwlock_unlock(&timer->mb);
@@ -593,25 +634,32 @@ static void accept_timer(void *p) {
TIMER_SLEEP:
pthread_mutex_lock(&timer->tmc);
timer->hastimerslept = 1;
puts("Timer sleep");
printf("Timer@%d sleep\n", timer->index);
pthread_cond_wait(&timer->tc, &timer->tmc);
timer->hastimerslept = 0;
pthread_mutex_unlock(&timer->tmc);
puts("Timer wake up");
printf("Timer@%d wake up\n", timer->index);
sleep(MAXWAITSEC / 4);
pthread_rwlock_rdlock(&timer->mt);
thread = timer->thread;
pthread_rwlock_unlock(&timer->mt);
}
if(is_dict_opening) touch_timer(p);
pthread_rwlock_rdlock(&timer->mt);
time_t waitsec = time(NULL) - timer->touch;
printf("Wait sec: %u, max: %u\n", (unsigned int)waitsec, MAXWAITSEC);
pthread_rwlock_unlock(&timer->mt);
printf("Wait@%d sec: %u, max: %u\n", timer->index, (unsigned int)waitsec, MAXWAITSEC);
if(waitsec > MAXWAITSEC) {
pthread_t thread = timer->thread;
if(thread) {
pthread_kill(thread, SIGQUIT);
puts("Kill thread");
printf("Kill thread@%d\n", timer->index);
}
break;
}
sleep(MAXWAITSEC / 4);
pthread_rwlock_rdlock(&timer->mt);
thread = timer->thread;
pthread_rwlock_unlock(&timer->mt);
}
goto TIMER_SLEEP;
}
@@ -621,52 +669,55 @@ static void cleanup_thread(thread_timer_t* timer) {
sigemptyset(&mask);
sigaddset(&mask, SIGPIPE); // 防止处理嵌套
pthread_sigmask(SIG_BLOCK, &mask, NULL);
printf("Start cleaning, ");
printf("Start cleaning@%d, ", timer->index);
if(timer->accept_fd) {
close(timer->accept_fd);
timer->accept_fd = 0;
printf("Close accept, ");
}
close_dict(timer->index);
pthread_rwlock_wrlock(&timer->mt);
timer->thread = 0;
printf("Clear thread, ");
pthread_rwlock_unlock(&timer->mt);
pthread_cond_destroy(&timer->c);
printf("Destroy accept cond, ");
pthread_mutex_destroy(&timer->mc);
printf("Destroy accept mutex, ");
setdicts[timer->index].data[0] = 0;
pthread_rwlock_wrlock(&timer->mb);
timer->isbusy = 0;
printf("Clear busy, ");
pthread_rwlock_unlock(&timer->mb);
puts("Finish cleaning");
}
static void process_defer() {
for(int i = 0; i < THREADCNT; i++) {
if(timers[i].thread) pthread_kill(timers[i].thread, SIGQUIT);
if(timers[i].timerthread) pthread_kill(timers[i].timerthread, SIGQUIT);
if(timers[i].thread) pthread_kill(timers[i].thread, SIGUSR1); // pthread_exit
if(timers[i].timerthread) pthread_kill(timers[i].timerthread, SIGUSR1); // pthread_exit
}
fflush(stdout);
pthread_exit(NULL);
}
static void handle_int(int signo) {
puts("Keyboard interrupted");
process_defer();
}
static void handle_pipe(int signo) {
puts("Pipe error, exit thread...");
fflush(stdout);
pthread_exit(NULL);
}
static void handle_accept(void *p) {
#ifdef DEBUG
printf("accept ptr: %p\n", p);
#endif
pthread_cleanup_push((void*)&cleanup_thread, p);
puts("Handling accept...");
pthread_setspecific(pthread_key_index, (void*)((uintptr_t)timer_pointer_of(p)->index));
if(setjmp(jmp2convend[timer_pointer_of(p)->index])) goto CONV_END;
while(1) {
int accept_fd = timer_pointer_of(p)->accept_fd;
uint32_t index = timer_pointer_of(p)->index;
@@ -795,12 +846,17 @@ static void accept_client(int fd) {
signal(SIGSEGV, handle_segv);
signal(SIGPIPE, handle_pipe);
signal(SIGTERM, handle_kill);
signal(SIGUSR1, (void (*)(int))pthread_exit);
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
init_crypto();
init_dict_pool(open_dict(0, 1));
close_dict(0);
for(int i = 0; i < THREADCNT; i++) pthread_rwlock_init(&timers[i].mb, NULL);
for(int i = 0; i < THREADCNT; i++) {
pthread_rwlock_init(&timers[i].mt, NULL);
pthread_rwlock_init(&timers[i].mb, NULL);
}
pthread_key_create(&pthread_key_index, NULL);
while(1) {
int p = 0;
while(p < THREADCNT) {
@@ -849,10 +905,15 @@ static void accept_client(int fd) {
printf("\n> %sAccept client %s:%u at slot No.%d, ", ctime(&t), str, port, p);
timer->accept_fd = accept_fd;
timer->index = p;
pthread_rwlock_wrlock(&timers[p].mt);
timer->touch = time(NULL);
pthread_rwlock_unlock(&timers[p].mt);
reset_seq(p);
// start or wakeup accept thread
if(timer->thread) {
pthread_rwlock_rdlock(&timers[p].mt);
pthread_t thread = timer->thread;
pthread_rwlock_unlock(&timers[p].mt);
if(thread) {
pthread_mutex_lock(&timer->mc);
pthread_cond_signal(&timer->c); // wakeup thread
pthread_mutex_unlock(&timer->mc);
@@ -869,8 +930,9 @@ static void accept_client(int fd) {
puts("Thread created");
}
// start or wakeup timer thread
pthread_t thread = timer->timerthread;
uint8_t hastimerslept = timer->hastimerslept;
pthread_rwlock_rdlock(&timers[p].mt);
thread = timer->timerthread;
pthread_rwlock_unlock(&timers[p].mt);
if(!thread || pthread_kill(thread, 0)) {
printf("Creating timer thread...");
pthread_cond_init(&timer->tc, NULL);
@@ -883,12 +945,17 @@ static void accept_client(int fd) {
continue;
}
puts("succeeded");
} else if(hastimerslept) {
printf("Waking up timer thread...");
} else {
pthread_mutex_lock(&timer->tmc);
pthread_cond_signal(&timer->tc); // wakeup thread
uint8_t hastimerslept = timer->hastimerslept;
pthread_mutex_unlock(&timer->tmc);
puts("succeeded");
if(hastimerslept) {
printf("Waking up timer thread...");
pthread_mutex_lock(&timer->tmc);
pthread_cond_signal(&timer->tc); // wakeup thread
pthread_mutex_unlock(&timer->tmc);
puts("succeeded");
} else puts("Timer already running");
}
}
}

View File

@@ -10,9 +10,12 @@
#define MAXWAITSEC 8
#endif
#ifndef DICTPOOLBIT
// DICTPOOLBIT must be lower than 4*8 = 32
// DICTPOOLBIT must be lower than 4*8 = 32
#define DICTPOOLBIT 16
#endif
#ifndef THREAD_TIMER_T_SZ
#define THREAD_TIMER_T_SZ 1024
#endif
enum server_cmd_t {CMDGET, CMDCAT, CMDMD5, CMDACK, CMDEND, CMDSET, CMDDEL, CMDDAT};
enum server_ack_t {ACKNONE=0b0000011, ACKSUCC=0b0010011, ACKDATA=0b0100011, ACKNULL=0b0110011, ACKNEQU=0b1000011, ACKERRO=0b1010011};