#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "config.h" #if !__APPLE__ #include #include #else #include #endif static CONFIG* cfg; // 存储 pwd 和 sps static int fd; // server socket fd #ifdef LISTEN_ON_IPV6 static socklen_t struct_len = sizeof(struct sockaddr_in6); static struct sockaddr_in6 server_addr; static struct sockaddr_in6 client_addr; #else static socklen_t struct_len = sizeof(struct sockaddr_in); static struct sockaddr_in server_addr; static struct sockaddr_in client_addr; #endif static char *data_path; // cat 命令读取的文件位置 static char *kanban_path; // get 命令读取的文件位置 #define THREADCNT 16 static pthread_t accept_threads[THREADCNT]; static pthread_attr_t attr; #define MAXWAITSEC 10 #define TIMERDATSZ BUFSIZ // accept_timer 使用的结构体 // 包含了本次 accept 的全部信息 // 以方便退出后清理空间 struct THREADTIMER { int index; // 指向 accept_threads 某个槽位的下标 time_t touch; // 最后访问时间,与当前时间差超过 MAXWAITSEC 将由 timer 强行回收线程 int accept_fd; // 本次 accept 的 fd,会自行关闭或出错时由 timer 负责回收 ssize_t numbytes; // 本次接收的数据长度 char status; // 本会话所处的状态 char is_open; // 标识 fp 是否正在使用 FILE *fp; // 本会话打开的文件,会自行关闭或出错时由 timer 负责回收 char data[TIMERDATSZ]; }; typedef struct THREADTIMER THREADTIMER; #define showUsage(program) printf("Usage: %s [-d] listen_port try_times kanban_file data_file config_file\n\t-d: As daemon\n", program) static void accept_client(); static void accept_timer(void *p); static int bind_server(uint16_t port, int try_times); static int check_buffer(THREADTIMER *timer); static void clean_timer(THREADTIMER* timer); static void close_file(FILE *fp); static int close_file_and_send(THREADTIMER *timer, char *data, size_t numbytes); static void handle_accept(void *accept_fd_p); static void handle_pipe(int signo); static void handle_quit(int signo); static int listen_socket(int try_times); static FILE *open_file(char* file_path, int lock_type, char* mode); static int send_all(char* file_path, THREADTIMER *timer); static int send_data(int accept_fd, char *data, size_t length); static off_t size_of_file(const char* fname); static int sm1_pwd(THREADTIMER *timer); static int s0_init(THREADTIMER *timer); static int s1_get(THREADTIMER *timer); static int s2_set(THREADTIMER *timer); static int s3_set_data(THREADTIMER *timer); static pid_t pid; /*************************************** * accept_client 接受新连接,创建线程处理 * 创建的线程入口点为 handle_accept * 与其伴生的结构体为 timer,负责管理 * 该线程使用的资源,当线程(正常/异常)退出 * 或 client 超过 MAXWAITSEC 未响应时 * 将由与其伴生的 accept_timer 线程读取 * timer 的信息,调用 clean_timer 回收 * 未被释放的资源以防止内存泄漏 ***************************************/ static void accept_client() { pid = fork(); while (pid > 0) { // 主进程监控子进程状态,如果子进程异常终止则重启之 wait(NULL); puts("Server subprocess exited. Restart..."); pid = fork(); } signal(SIGQUIT, handle_quit); signal(SIGPIPE, handle_pipe); pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); if(pid < 0) puts("Error when forking a subprocess."); else while(1) { puts("Ready for accept, waitting..."); int p = 0; while(p < THREADCNT && accept_threads[p] && !pthread_kill(accept_threads[p], 0)) p++; if(p >= THREADCNT) { puts("Max thread cnt exceeded"); sleep(1); continue; } printf("Next thread is No.%d\n", p); THREADTIMER *timer = malloc(sizeof(THREADTIMER)); if(!timer) { puts("Allocate timer error"); continue; } timer->accept_fd = accept(fd, (struct sockaddr *)&client_addr, &struct_len); if(timer->accept_fd <= 0) { free(timer); puts("Accept client error."); continue; } #ifdef LISTEN_ON_IPV6 uint16_t port = ntohs(client_addr.sin6_port); struct in6_addr in = client_addr.sin6_addr; char str[INET6_ADDRSTRLEN]; // 46 inet_ntop(AF_INET6, &in, str, sizeof(str)); #else uint16_t port = ntohs(client_addr.sin_port); struct in_addr in = client_addr.sin_addr; char str[INET_ADDRSTRLEN]; // 16 inet_ntop(AF_INET, &in, str, sizeof(str)); #endif printf("Accept client %s:%u\n", str, port); timer->index = p; timer->touch = time(NULL); timer->is_open = 0; timer->fp = NULL; if (pthread_create(&accept_threads[p], &attr, (void *)&handle_accept, timer)) perror("pthread_create"); else puts("Creating thread succeeded"); } } #define timer_ptr(x) ((THREADTIMER*)(x)) #define my_thread(timer) accept_threads[timer->index] /*************************************** * accept_timer 是与 handle_accept 伴生的 * 线程,负责监控其会话状态,并在超时时杀死它 ***************************************/ static void accept_timer(void *p) { while(my_thread(timer_ptr(p)) && !pthread_kill(my_thread(timer_ptr(p)), 0)) { sleep(MAXWAITSEC / 4); time_t waitsec = time(NULL) - timer_ptr(p)->touch; printf("Wait sec: %d, max: %d\n", (int)waitsec, MAXWAITSEC); if(waitsec > MAXWAITSEC) break; } clean_timer(timer_ptr(p)); free(p); // 唯一 free 点 puts("Timer has been freed"); } static int bind_server(uint16_t port, int try_times) { int fail_count = 0; int result = -1; #ifdef LISTEN_ON_IPV6 server_addr.sin6_family = AF_INET6; server_addr.sin6_port = htons(port); bzero(&(server_addr.sin6_addr), sizeof(server_addr.sin6_addr)); fd = socket(PF_INET6, SOCK_STREAM, 0); #else server_addr.sin_family = AF_INET; server_addr.sin_port = htons(port); server_addr.sin_addr.s_addr = INADDR_ANY; bzero(&(server_addr.sin_zero), 8); fd = socket(AF_INET, SOCK_STREAM, 0); #endif while(!~(result = bind(fd, (struct sockaddr *)&server_addr, struct_len)) && fail_count++ < try_times) sleep(1); if(!~result && fail_count >= try_times) { puts("Bind server failure!"); return 0; } else{ puts("Bind server success!"); return 1; } } /*************************************** * check_buffer 检查接收到的数据,结合 * 当前会话所处状态决定接下来的处理流程 ***************************************/ static int check_buffer(THREADTIMER *timer) { printf("Status: %d\n", (int)timer->status); switch(timer->status) { case -1: return sm1_pwd(timer); break; case 0: return s0_init(timer); break; case 1: return s1_get(timer); break; case 2: return s2_set(timer); break; case 3: return s3_set_data(timer); break; default: return -1; break; } } // clean_timer 清理 timer static void clean_timer(THREADTIMER* timer) { puts("Start cleaning."); if(my_thread(timer)) { pthread_kill(my_thread(timer), SIGQUIT); my_thread(timer) = 0; puts("Kill thread."); } if(timer->is_open) { close_file(timer->fp); timer->is_open = 0; puts("Close file."); } if(timer->accept_fd) { close(timer->accept_fd); timer->accept_fd = 0; puts("Close accept."); } puts("Finish cleaning."); } static void close_file(FILE *fp) { puts("Close file"); if(fp) { flock(fileno(fp), LOCK_UN); fclose(fp); } } static int close_file_and_send(THREADTIMER *timer, char *data, size_t numbytes) { close_file(timer->fp); timer->is_open = 0; return send_data(timer->accept_fd, data, numbytes); } #define chkbuf(p) if(!check_buffer(timer_ptr(p))) break #define take_word(p, w, buff) if(timer_ptr(p)->numbytes > strlen(w) && strstr(buff, w) == buff) {\ int l = strlen(w);\ char store = buff[l];\ buff[l] = 0;\ ssize_t n = timer_ptr(p)->numbytes - l;\ timer_ptr(p)->numbytes = l;\ chkbuf(p);\ buff[0] = store;\ memmove(buff + 1, buff + l + 1, n - 1);\ buff[n] = 0;\ timer_ptr(p)->numbytes = n;\ printf("Split cmd: %s\n", w);\ } #define touch_timer(x) (timer_ptr(x)->touch = time(NULL)) #define my_fd(x) (timer_ptr(x)->accept_fd) #define my_dat(x) (timer_ptr(x)->data) // handle_accept 初步解析指令,处理部分粘连 static void handle_accept(void *p) { if(my_fd(p) > 0) { puts("Connected to the client."); signal(SIGQUIT, handle_quit); signal(SIGPIPE, handle_pipe); pthread_t thread; if (pthread_create(&thread, &attr, (void *)&accept_timer, p)) puts("Error creating timer thread"); else puts("Creating timer thread succeeded"); send_data(my_fd(p), "Welcome to simple kanban server.", 33); timer_ptr(p)->status = -1; while(my_thread(timer_ptr(p)) && (timer_ptr(p)->numbytes = recv(my_fd(p), my_dat(p), TIMERDATSZ, 0)) > 0) { touch_timer(p); my_dat(p)[timer_ptr(p)->numbytes] = 0; printf("Get %d bytes: %s\n", (int)timer_ptr(p)->numbytes, my_dat(p)); puts("Check buffer"); //处理部分粘连 take_word(p, cfg->pwd, my_dat(p)); take_word(p, "get", my_dat(p)); take_word(p, "set", my_dat(p)); take_word(p, "cat", my_dat(p)); take_word(p, "quit", my_dat(p)); take_word(p, cfg->sps, my_dat(p)); take_word(p, "ver", my_dat(p)); take_word(p, "dat", my_dat(p)); if(timer_ptr(p)->numbytes > 0) chkbuf(p); } printf("Break: recv %d bytes\n", (int)timer_ptr(p)->numbytes); my_thread(timer_ptr(p)) = 0; clean_timer(timer_ptr(p)); } else puts("Error accepting client"); } static void handle_quit(int signo) { perror("signal quit"); pthread_exit(NULL); } static void handle_pipe(int signo) { perror("signal pipe"); pthread_exit(NULL); } static int listen_socket(int try_times) { int fail_count = 0; int result = -1; while(!~(result = listen(fd, 10)) && fail_count++ < try_times) sleep(1); if(!~result && fail_count >= try_times) { puts("Listen failed!"); return 0; } else{ puts("Listening...."); return 1; } } static FILE *open_file(char* file_path, int lock_type, char* mode) { FILE *fp = NULL; fp = fopen(file_path, mode); if(fp) { if(!~flock(fileno(fp), lock_type | LOCK_NB)) { perror("flock"); fp = NULL; } printf("Open file in mode %d\n", lock_type); } else perror("fopen"); return fp; } static int send_all(char* file_path, THREADTIMER *timer) { int re = 1; FILE *fp = open_file(file_path, LOCK_SH, "rb"); if(fp) { timer->fp = fp; timer->is_open = 1; uint32_t file_size = (uint32_t)size_of_file(file_path); printf("Get file size: %d bytes.\n", (int)file_size); off_t len = 0; #if __APPLE__ #ifdef WORDS_BIGENDIAN file_size = __DARWIN_OSSwapInt32(file_size); #endif struct sf_hdtr hdtr; struct iovec headers; headers.iov_base = &file_size; headers.iov_len = sizeof(uint32_t); hdtr.headers = &headers; hdtr.hdr_cnt = 1; hdtr.trailers = NULL; hdtr.trl_cnt = 0; re = !sendfile(fileno(fp), timer->accept_fd, 0, &len, &hdtr, 0); if(!re) perror("sendfile"); #else #ifdef WORDS_BIGENDIAN uint32_t little_fs = __builtin_bswap32(file_size); send(timer->accept_fd, &little_fs, sizeof(uint32_t), 0); #else send(timer->accept_fd, &file_size, sizeof(uint32_t), 0); #endif re = !sendfile(timer->accept_fd, fileno(fp), &len, file_size) >= 0; if(!re) perror("sendfile"); #endif printf("Send %d bytes.\n", (int)len); close_file(fp); timer->is_open = 0; } return re; } static int send_data(int accept_fd, char *data, size_t length) { if(!~send(accept_fd, data, length, 0)) { puts("Send data error"); return 0; } printf("Send data: "); if(length > 128) { data[124] = '.'; data[125] = '.'; data[126] = '.'; data[127] = 0; } puts(data); return 1; } static off_t size_of_file(const char* fname) { struct stat statbuf; if(stat(fname, &statbuf)==0) return statbuf.st_size; else return -1; } static int sm1_pwd(THREADTIMER *timer) { if(!strcmp(cfg->pwd, timer->data)) timer->status = 0; return !timer->status; } static int s0_init(THREADTIMER *timer) { if(!strcmp("get", timer->data)) timer->status = 1; else if(!strcmp(cfg->sps, timer->data)) timer->status = 2; else if(!strcmp("cat", timer->data)) return send_all(data_path, timer); else if(!strcmp("quit", timer->data)) return 0; return send_data(timer->accept_fd, timer->data, timer->numbytes); } static int s1_get(THREADTIMER *timer) { //get kanban FILE *fp = open_file(kanban_path, LOCK_SH, "rb"); timer->status = 0; if(fp) { timer->fp = fp; timer->is_open = 1; uint32_t ver, cli_ver; if(fscanf(fp, "%u", &ver) > 0) { if(sscanf(timer->data, "%u", &cli_ver) > 0) { if(cli_ver < ver) { //need to send a new kanban close_file(fp); timer->is_open = 0; int r = send_all(kanban_path, timer); if(strstr(timer->data, "quit") == timer->data + timer->numbytes - 4) { puts("Found last cmd is quit."); return 0; } else return r; } } } } return close_file_and_send(timer, "null", 4); } static int s2_set(THREADTIMER *timer) { FILE *fp = NULL; if(!strcmp(timer->data, "ver")) { fp = open_file(kanban_path, LOCK_EX, "wb"); } else if(!strcmp(timer->data, "dat")) { fp = open_file(data_path, LOCK_EX, "wb"); } if(fp) { timer->status = 3; timer->fp = fp; timer->is_open = 1; return send_data(timer->accept_fd, "data", 4); } else { timer->status = 0; return send_data(timer->accept_fd, "erro", 4); } } static int s3_set_data(THREADTIMER *timer) { timer->status = 0; #ifdef WORDS_BIGENDIAN uint32_t file_size = __builtin_bswap32(*(uint32_t*)(timer->data)); #else uint32_t file_size = *(uint32_t*)(timer->data); #endif printf("Set data size: %u\n", file_size); int is_first_data = 0; if(timer->numbytes == sizeof(uint32_t)) { if((timer->numbytes = recv(timer->accept_fd, timer->data, TIMERDATSZ, 0)) <= 0) return close_file_and_send(timer, "erro", 4); is_first_data = 1; printf("Get data size: %d\n", (int)timer->numbytes); } size_t offset = (is_first_data?0:sizeof(uint32_t)); if(file_size <= TIMERDATSZ - offset) { while(timer->numbytes != file_size - offset) { ssize_t n = recv(timer->accept_fd, timer->data + timer->numbytes + offset, TIMERDATSZ - timer->numbytes - offset, MSG_WAITALL); if(n <= 0) return close_file_and_send(timer, "erro", 4); timer->numbytes += n; } if(fwrite(timer->data + offset, file_size, 1, timer->fp) != 1) { perror("fwrite"); return close_file_and_send(timer, "erro", 4); } return close_file_and_send(timer, "succ", 4); } if(fwrite(timer->data + offset, timer->numbytes - offset, 1, timer->fp) != 1) { perror("fwrite"); return close_file_and_send(timer, "erro", 4); } int32_t remain = file_size - timer->numbytes; while(remain > 0) { // printf("remain:%d\n", (int)remain); ssize_t n = recv(timer->accept_fd, timer->data, (remain>TIMERDATSZ)?TIMERDATSZ:remain, MSG_WAITALL); if(n <= 0) return close_file_and_send(timer, "erro", 4); if(fwrite(timer->data, n, 1, timer->fp) != 1) { perror("fwrite"); return close_file_and_send(timer, "erro", 4); } remain -= n; } return close_file_and_send(timer, "succ", 4); } int main(int argc, char *argv[]) { if(argc != 6 && argc != 7) showUsage(argv[0]); else { int port = 0; int as_daemon = !strcmp("-d", argv[1]); sscanf(argv[as_daemon?2:1], "%d", &port); if(port > 0 && port < 65536) { int times = 0; sscanf(argv[as_daemon?3:2], "%d", ×); if(times > 0) { if(!as_daemon || (as_daemon && (daemon(1, 1) >= 0))) { FILE *fp = NULL; fp = fopen(argv[as_daemon?4:3], "rb+"); if(!fp) fp = fopen(argv[as_daemon?4:3], "wb+"); if(fp) { kanban_path = argv[as_daemon?4:3]; fclose(fp); fp = NULL; fp = fopen(argv[as_daemon?5:4], "rb+"); if(!fp) fp = fopen(argv[as_daemon?5:4], "wb+"); if(fp) { data_path = argv[as_daemon?5:4]; fclose(fp); fp = NULL; fp = fopen(argv[as_daemon?6:5], "rb"); if(fp) { SIMPLE_PB* spb = get_pb(fp); cfg = (CONFIG*)spb->target; fclose(fp); if(bind_server(port, times)) if(listen_socket(times)) accept_client(); } else printf("Error opening config file: %s\n", argv[as_daemon?6:5]); } else printf("Error opening data file: %s\n", argv[as_daemon?5:4]); } else printf("Error opening kanban file: %s\n", argv[as_daemon?4:3]); } else puts("Start daemon error"); } else printf("Error times: %d\n", times); } else printf("Error port: %d\n", port); } close(fd); exit(EXIT_FAILURE); }