diff --git a/client.c b/client.c index 7538d6c..c19b232 100644 --- a/client.c +++ b/client.c @@ -9,6 +9,7 @@ #include #include #include +#include #if !__APPLE__ #include @@ -90,7 +91,7 @@ int main(int argc,char *argv[]) { //usage: ./client host port else puts("Send file error."); #endif fclose(fp); - printf("Send count:%u\n", len); + printf("Send count:%d\n", (int)len); } else puts("Open file error!"); } else { send(sockfd, buf, strlen(buf), 0); diff --git a/server.c b/server.c index 45bf060..4353556 100644 --- a/server.c +++ b/server.c @@ -23,9 +23,9 @@ #include #endif -CONFIG* cfg; +static CONFIG* cfg; // 存储 pwd 和 sps -int fd; +static int fd; // server socket fd #ifdef LISTEN_ON_IPV6 static socklen_t struct_len = sizeof(struct sockaddr_in6); @@ -37,51 +37,137 @@ int fd; static struct sockaddr_in client_addr; #endif -char *data_path; -char *kanban_path; +static char *data_path; // cat 命令读取的文件位置 +static char *kanban_path; // get 命令读取的文件位置 #define THREADCNT 16 -pthread_t accept_threads[THREADCNT]; - +static pthread_t accept_threads[THREADCNT]; static pthread_attr_t attr; #define MAXWAITSEC 10 +#define TIMERDATSZ BUFSIZ +// accept_timer 使用的结构体 +// 包含了本次 accept 的全部信息 +// 以方便退出后清理空间 struct THREADTIMER { - pthread_t *thread; - time_t touch; - int accept_fd; - ssize_t numbytes; - char *data; - char status; - char is_open; - FILE *fp; + int index; // 指向 accept_threads 某个槽位的下标 + time_t touch; // 最后访问时间,与当前时间差超过 MAXWAITSEC 将由 timer 强行回收线程 + int accept_fd; // 本次 accept 的 fd,会自行关闭或出错时由 timer 负责回收 + ssize_t numbytes; // 本次接收的数据长度 + char status; // 本会话所处的状态 + char is_open; // 标识 fp 是否正在使用 + FILE *fp; // 本会话打开的文件,会自行关闭或出错时由 timer 负责回收 + char data[TIMERDATSZ]; }; typedef struct THREADTIMER THREADTIMER; #define showUsage(program) printf("Usage: %s [-d] listen_port try_times kanban_file data_file config_file\n\t-d: As daemon\n", program) -void accept_client(); -void accept_timer(void *p); -int bind_server(uint16_t port, uint32_t try_times); -int check_buffer(THREADTIMER *timer); -void close_file(FILE *fp); -int close_file_and_send(THREADTIMER *timer, char *data, size_t numbytes); -void handle_accept(void *accept_fd_p); -void handle_pipe(int signo); -void handle_quit(int signo); -void kill_thread(THREADTIMER* timer); -int listen_socket(uint32_t try_times); -FILE *open_file(char* file_path, int lock_type, char* mode); -int send_all(char* file_path, THREADTIMER *timer); -int send_data(int accept_fd, char *data, size_t length); -int sm1_pwd(THREADTIMER *timer); -off_t size_of_file(const char* fname); -int s0_init(THREADTIMER *timer); -int s1_get(THREADTIMER *timer); -int s2_set(THREADTIMER *timer); -int s3_set_data(THREADTIMER *timer); +static void accept_client(); +static void accept_timer(void *p); +static int bind_server(uint16_t port, int try_times); +static int check_buffer(THREADTIMER *timer); +static void clean_timer(THREADTIMER* timer); +static void close_file(FILE *fp); +static int close_file_and_send(THREADTIMER *timer, char *data, size_t numbytes); +static void handle_accept(void *accept_fd_p); +static void handle_pipe(int signo); +static void handle_quit(int signo); +static int listen_socket(int try_times); +static FILE *open_file(char* file_path, int lock_type, char* mode); +static int send_all(char* file_path, THREADTIMER *timer); +static int send_data(int accept_fd, char *data, size_t length); +static off_t size_of_file(const char* fname); +static int sm1_pwd(THREADTIMER *timer); +static int s0_init(THREADTIMER *timer); +static int s1_get(THREADTIMER *timer); +static int s2_set(THREADTIMER *timer); +static int s3_set_data(THREADTIMER *timer); -int bind_server(uint16_t port, uint32_t try_times) { +static pid_t pid; +/*************************************** + * accept_client 接受新连接,创建线程处理 + * 创建的线程入口点为 handle_accept + * 与其伴生的结构体为 timer,负责管理 + * 该线程使用的资源,当线程(正常/异常)退出 + * 或 client 超过 MAXWAITSEC 未响应时 + * 将由与其伴生的 accept_timer 线程读取 + * timer 的信息,调用 clean_timer 回收 + * 未被释放的资源以防止内存泄漏 +***************************************/ +static void accept_client() { + pid = fork(); + while (pid > 0) { // 主进程监控子进程状态,如果子进程异常终止则重启之 + wait(NULL); + puts("Server subprocess exited. Restart..."); + pid = fork(); + } + signal(SIGQUIT, handle_quit); + signal(SIGPIPE, handle_pipe); + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + if(pid < 0) puts("Error when forking a subprocess."); + else while(1) { + puts("Ready for accept, waitting..."); + int p = 0; + while(p < THREADCNT && accept_threads[p] && !pthread_kill(accept_threads[p], 0)) p++; + if(p >= THREADCNT) { + puts("Max thread cnt exceeded"); + sleep(1); + continue; + } + printf("Next thread is No.%d\n", p); + THREADTIMER *timer = malloc(sizeof(THREADTIMER)); + if(!timer) { + puts("Allocate timer error"); + continue; + } + timer->accept_fd = accept(fd, (struct sockaddr *)&client_addr, &struct_len); + if(timer->accept_fd <= 0) { + free(timer); + puts("Accept client error."); + continue; + } + #ifdef LISTEN_ON_IPV6 + uint16_t port = ntohs(client_addr.sin6_port); + struct in6_addr in = client_addr.sin6_addr; + char str[INET6_ADDRSTRLEN]; // 46 + inet_ntop(AF_INET6, &in, str, sizeof(str)); + #else + uint16_t port = ntohs(client_addr.sin_port); + struct in_addr in = client_addr.sin_addr; + char str[INET_ADDRSTRLEN]; // 16 + inet_ntop(AF_INET, &in, str, sizeof(str)); + #endif + printf("Accept client %s:%u\n", str, port); + timer->index = p; + timer->touch = time(NULL); + timer->is_open = 0; + timer->fp = NULL; + if (pthread_create(&accept_threads[p], &attr, (void *)&handle_accept, timer)) perror("pthread_create"); + else puts("Creating thread succeeded"); + } +} + +#define timer_ptr(x) ((THREADTIMER*)(x)) +#define my_thread(timer) accept_threads[timer->index] +/*************************************** + * accept_timer 是与 handle_accept 伴生的 + * 线程,负责监控其会话状态,并在超时时杀死它 +***************************************/ +static void accept_timer(void *p) { + while(my_thread(timer_ptr(p)) && !pthread_kill(my_thread(timer_ptr(p)), 0)) { + sleep(MAXWAITSEC / 4); + time_t waitsec = time(NULL) - timer_ptr(p)->touch; + printf("Wait sec: %d, max: %d\n", (int)waitsec, MAXWAITSEC); + if(waitsec > MAXWAITSEC) break; + } + clean_timer(timer_ptr(p)); + free(p); // 唯一 free 点 + puts("Timer has been freed"); +} + +static int bind_server(uint16_t port, int try_times) { int fail_count = 0; int result = -1; #ifdef LISTEN_ON_IPV6 @@ -106,7 +192,118 @@ int bind_server(uint16_t port, uint32_t try_times) { } } -int listen_socket(uint32_t try_times) { +/*************************************** + * check_buffer 检查接收到的数据,结合 + * 当前会话所处状态决定接下来的处理流程 +***************************************/ +static int check_buffer(THREADTIMER *timer) { + printf("Status: %d\n", (int)timer->status); + switch(timer->status) { + case -1: return sm1_pwd(timer); break; + case 0: return s0_init(timer); break; + case 1: return s1_get(timer); break; + case 2: return s2_set(timer); break; + case 3: return s3_set_data(timer); break; + default: return -1; break; + } +} + +// clean_timer 清理 timer +static void clean_timer(THREADTIMER* timer) { + puts("Start cleaning."); + if(my_thread(timer)) { + pthread_kill(my_thread(timer), SIGQUIT); + my_thread(timer) = 0; + puts("Kill thread."); + } + if(timer->is_open) { + close_file(timer->fp); + timer->is_open = 0; + puts("Close file."); + } + if(timer->accept_fd) { + close(timer->accept_fd); + timer->accept_fd = 0; + puts("Close accept."); + } + puts("Finish cleaning."); +} + +static void close_file(FILE *fp) { + puts("Close file"); + if(fp) { + flock(fileno(fp), LOCK_UN); + fclose(fp); + } +} + +static int close_file_and_send(THREADTIMER *timer, char *data, size_t numbytes) { + close_file(timer->fp); + timer->is_open = 0; + return send_data(timer->accept_fd, data, numbytes); +} + +#define chkbuf(p) if(!check_buffer(timer_ptr(p))) break +#define take_word(p, w, buff) if(timer_ptr(p)->numbytes > strlen(w) && strstr(buff, w) == buff) {\ + int l = strlen(w);\ + char store = buff[l];\ + buff[l] = 0;\ + ssize_t n = timer_ptr(p)->numbytes - l;\ + timer_ptr(p)->numbytes = l;\ + chkbuf(p);\ + buff[0] = store;\ + memmove(buff + 1, buff + l + 1, n - 1);\ + buff[n] = 0;\ + timer_ptr(p)->numbytes = n;\ + printf("Split cmd: %s\n", w);\ + } +#define touch_timer(x) (timer_ptr(x)->touch = time(NULL)) +#define my_fd(x) (timer_ptr(x)->accept_fd) +#define my_dat(x) (timer_ptr(x)->data) +// handle_accept 初步解析指令,处理部分粘连 +static void handle_accept(void *p) { + if(my_fd(p) > 0) { + puts("Connected to the client."); + signal(SIGQUIT, handle_quit); + signal(SIGPIPE, handle_pipe); + pthread_t thread; + if (pthread_create(&thread, &attr, (void *)&accept_timer, p)) puts("Error creating timer thread"); + else puts("Creating timer thread succeeded"); + send_data(my_fd(p), "Welcome to simple kanban server.", 33); + timer_ptr(p)->status = -1; + while(my_thread(timer_ptr(p)) && (timer_ptr(p)->numbytes = recv(my_fd(p), my_dat(p), TIMERDATSZ, 0)) > 0) { + touch_timer(p); + my_dat(p)[timer_ptr(p)->numbytes] = 0; + printf("Get %d bytes: %s\n", (int)timer_ptr(p)->numbytes, my_dat(p)); + puts("Check buffer"); + //处理部分粘连 + take_word(p, cfg->pwd, my_dat(p)); + take_word(p, "get", my_dat(p)); + take_word(p, "set", my_dat(p)); + take_word(p, "cat", my_dat(p)); + take_word(p, "quit", my_dat(p)); + take_word(p, cfg->sps, my_dat(p)); + take_word(p, "ver", my_dat(p)); + take_word(p, "dat", my_dat(p)); + if(timer_ptr(p)->numbytes > 0) chkbuf(p); + } + printf("Break: recv %d bytes\n", (int)timer_ptr(p)->numbytes); + my_thread(timer_ptr(p)) = 0; + clean_timer(timer_ptr(p)); + } else puts("Error accepting client"); +} + +static void handle_quit(int signo) { + perror("signal quit"); + pthread_exit(NULL); +} + +static void handle_pipe(int signo) { + perror("signal pipe"); + pthread_exit(NULL); +} + +static int listen_socket(int try_times) { int fail_count = 0; int result = -1; while(!~(result = listen(fd, 10)) && fail_count++ < try_times) sleep(1); @@ -119,25 +316,27 @@ int listen_socket(uint32_t try_times) { } } -int send_data(int accept_fd, char *data, size_t length) { - if(!~send(accept_fd, data, length, 0)) { - puts("Send data error"); - return 0; - } else { - printf("Send data: "); - puts(data); - return 1; - } +static FILE *open_file(char* file_path, int lock_type, char* mode) { + FILE *fp = NULL; + fp = fopen(file_path, mode); + if(fp) { + if(!~flock(fileno(fp), lock_type | LOCK_NB)) { + perror("flock"); + fp = NULL; + } + printf("Open file in mode %d\n", lock_type); + } else perror("fopen"); + return fp; } -int send_all(char* file_path, THREADTIMER *timer) { +static int send_all(char* file_path, THREADTIMER *timer) { int re = 1; FILE *fp = open_file(file_path, LOCK_SH, "rb"); if(fp) { timer->fp = fp; timer->is_open = 1; uint32_t file_size = (uint32_t)size_of_file(file_path); - printf("Get file size: %u bytes.\n", file_size); + printf("Get file size: %d bytes.\n", (int)file_size); off_t len = 0; #if __APPLE__ #ifdef WORDS_BIGENDIAN @@ -152,7 +351,7 @@ int send_all(char* file_path, THREADTIMER *timer) { hdtr.trailers = NULL; hdtr.trl_cnt = 0; re = !sendfile(fileno(fp), timer->accept_fd, 0, &len, &hdtr, 0); - if(!re) perror("Sendfile"); + if(!re) perror("sendfile"); #else #ifdef WORDS_BIGENDIAN uint32_t little_fs = __builtin_bswap32(file_size); @@ -161,21 +360,43 @@ int send_all(char* file_path, THREADTIMER *timer) { send(timer->accept_fd, &file_size, sizeof(uint32_t), 0); #endif re = !sendfile(timer->accept_fd, fileno(fp), &len, file_size) >= 0; - if(!re) perror("Sendfile"); + if(!re) perror("sendfile"); #endif - printf("Send %u bytes.\n", len); + printf("Send %d bytes.\n", (int)len); close_file(fp); timer->is_open = 0; } return re; } -int sm1_pwd(THREADTIMER *timer) { +static int send_data(int accept_fd, char *data, size_t length) { + if(!~send(accept_fd, data, length, 0)) { + puts("Send data error"); + return 0; + } + printf("Send data: "); + if(length > 128) { + data[124] = '.'; + data[125] = '.'; + data[126] = '.'; + data[127] = 0; + } + puts(data); + return 1; +} + +static off_t size_of_file(const char* fname) { + struct stat statbuf; + if(stat(fname, &statbuf)==0) return statbuf.st_size; + else return -1; +} + +static int sm1_pwd(THREADTIMER *timer) { if(!strcmp(cfg->pwd, timer->data)) timer->status = 0; return !timer->status; } -int s0_init(THREADTIMER *timer) { +static int s0_init(THREADTIMER *timer) { if(!strcmp("get", timer->data)) timer->status = 1; else if(!strcmp(cfg->sps, timer->data)) timer->status = 2; else if(!strcmp("cat", timer->data)) return send_all(data_path, timer); @@ -183,7 +404,7 @@ int s0_init(THREADTIMER *timer) { return send_data(timer->accept_fd, timer->data, timer->numbytes); } -int s1_get(THREADTIMER *timer) { //get kanban +static int s1_get(THREADTIMER *timer) { //get kanban FILE *fp = open_file(kanban_path, LOCK_SH, "rb"); timer->status = 0; if(fp) { @@ -208,7 +429,7 @@ int s1_get(THREADTIMER *timer) { //get kanban return close_file_and_send(timer, "null", 4); } -int s2_set(THREADTIMER *timer) { +static int s2_set(THREADTIMER *timer) { FILE *fp = NULL; if(!strcmp(timer->data, "ver")) { fp = open_file(kanban_path, LOCK_EX, "wb"); @@ -226,7 +447,7 @@ int s2_set(THREADTIMER *timer) { } } -int s3_set_data(THREADTIMER *timer) { +static int s3_set_data(THREADTIMER *timer) { timer->status = 0; #ifdef WORDS_BIGENDIAN uint32_t file_size = __builtin_bswap32(*(uint32_t*)(timer->data)); @@ -236,244 +457,40 @@ int s3_set_data(THREADTIMER *timer) { printf("Set data size: %u\n", file_size); int is_first_data = 0; if(timer->numbytes == sizeof(uint32_t)) { - if((timer->numbytes = recv(timer->accept_fd, timer->data, BUFSIZ, 0)) <= 0) + if((timer->numbytes = recv(timer->accept_fd, timer->data, TIMERDATSZ, 0)) <= 0) return close_file_and_send(timer, "erro", 4); - else { - is_first_data = 1; - printf("Get data size: %tu\n", timer->numbytes); - } + is_first_data = 1; + printf("Get data size: %d\n", (int)timer->numbytes); } size_t offset = (is_first_data?0:sizeof(uint32_t)); - if(file_size <= BUFSIZ - offset) { + if(file_size <= TIMERDATSZ - offset) { while(timer->numbytes != file_size - offset) { - timer->numbytes += recv(timer->accept_fd, timer->data + timer->numbytes + offset, BUFSIZ - timer->numbytes - offset, 0); + ssize_t n = recv(timer->accept_fd, timer->data + timer->numbytes + offset, TIMERDATSZ - timer->numbytes - offset, MSG_WAITALL); + if(n <= 0) return close_file_and_send(timer, "erro", 4); + timer->numbytes += n; } if(fwrite(timer->data + offset, file_size, 1, timer->fp) != 1) { - puts("Set data error."); + perror("fwrite"); return close_file_and_send(timer, "erro", 4); - } else return close_file_and_send(timer, "succ", 4); - } else { - if(fwrite(timer->data + offset, timer->numbytes - offset, 1, timer->fp) != 1) { - puts("Set data error."); - return close_file_and_send(timer, "erro", 4); - } - int32_t remain = file_size - timer->numbytes; - while(remain > 0) { - printf("remain:%d\n", remain); - timer->numbytes = recv(timer->accept_fd, timer->data, BUFSIZ, 0); - if(fwrite(timer->data, timer->numbytes, 1, timer->fp) != 1) { - puts("Set data error."); - return close_file_and_send(timer, "erro", 4); - } - remain -= timer->numbytes; } return close_file_and_send(timer, "succ", 4); } - return close_file_and_send(timer, "erro", 4); -} - -off_t size_of_file(const char* fname) { - struct stat statbuf; - if(stat(fname, &statbuf)==0) return statbuf.st_size; - else return -1; -} - -int check_buffer(THREADTIMER *timer) { - printf("Status: %d\n", timer->status); - switch(timer->status) { - case -1: return sm1_pwd(timer); break; - case 0: return s0_init(timer); break; - case 1: return s1_get(timer); break; - case 2: return s2_set(timer); break; - case 3: return s3_set_data(timer); break; - default: return -1; break; + if(fwrite(timer->data + offset, timer->numbytes - offset, 1, timer->fp) != 1) { + perror("fwrite"); + return close_file_and_send(timer, "erro", 4); } -} - -void handle_quit(int signo) { - printf("Handle quit with sig %d\n", signo); - pthread_exit(NULL); -} - -#define timer_pointer_of(x) ((THREADTIMER*)(x)) -#define touch_timer(x) timer_pointer_of(x)->touch = time(NULL) - -void accept_timer(void *p) { - THREADTIMER *timer = timer_pointer_of(p); - while(*(timer->thread) && !pthread_kill(*(timer->thread), 0)) { - sleep(MAXWAITSEC / 4); - puts("Check accept status"); - if(time(NULL) - timer->touch > MAXWAITSEC) break; - } - puts("Call kill thread"); - kill_thread(timer); - puts("Free timer"); - free(timer); - puts("Finish calling kill thread"); -} - -void kill_thread(THREADTIMER* timer) { - puts("Start killing."); - if(*(timer->thread)) { - pthread_kill(*(timer->thread), SIGQUIT); - *(timer->thread) = 0; - puts("Kill thread."); - } - if(timer->accept_fd) { - close(timer->accept_fd); - timer->accept_fd = 0; - puts("Close accept."); - } - if(timer->data) { - free(timer->data); - timer->data = NULL; - puts("Free data."); - } - if(timer->is_open) { - close_file(timer->fp); - timer->is_open = 0; - puts("Close file."); - } - puts("Finish killing."); -} - -void handle_pipe(int signo) { - printf("Pipe error: %d\n", signo); - pthread_exit(NULL); -} - -#define chkbuf(p) if(!check_buffer(timer_pointer_of(p))) break - -#define take_word(p, w) if(timer_pointer_of(p)->numbytes > strlen(w) && strstr(buff, w) == buff) {\ - int l = strlen(w);\ - char store = buff[l];\ - buff[l] = 0;\ - ssize_t n = timer_pointer_of(p)->numbytes - l;\ - timer_pointer_of(p)->numbytes = l;\ - chkbuf(p);\ - buff[0] = store;\ - memmove(buff + 1, buff + l + 1, n - 1);\ - buff[n] = 0;\ - timer_pointer_of(p)->numbytes = n;\ - printf("Split cmd: %s\n", w);\ - } - -void handle_accept(void *p) { - int accept_fd = timer_pointer_of(p)->accept_fd; - if(accept_fd > 0) { - puts("Connected to the client."); - signal(SIGQUIT, handle_quit); - signal(SIGPIPE, handle_pipe); - pthread_t thread; - if (pthread_create(&thread, &attr, (void *)&accept_timer, p)) puts("Error creating timer thread"); - else puts("Creating timer thread succeeded"); - send_data(accept_fd, "Welcome to simple kanban server.", 33); - timer_pointer_of(p)->status = -1; - char *buff = calloc(BUFSIZ, sizeof(char)); - if(buff) { - timer_pointer_of(p)->data = buff; - while(*(timer_pointer_of(p)->thread) && (timer_pointer_of(p)->numbytes = recv(accept_fd, buff, BUFSIZ, 0)) > 0) { - touch_timer(p); - buff[timer_pointer_of(p)->numbytes] = 0; - printf("Get %u bytes: %s\n", timer_pointer_of(p)->numbytes, buff); - puts("Check buffer"); - //处理部分粘连 - take_word(p, cfg->pwd); - take_word(p, "get"); - take_word(p, "set"); - take_word(p, "cat"); - take_word(p, "quit"); - take_word(p, cfg->sps); - take_word(p, "ver"); - take_word(p, "dat"); - if(timer_pointer_of(p)->numbytes > 0) chkbuf(p); - } - printf("Break: recv %u bytes\n", timer_pointer_of(p)->numbytes); - } else puts("Error allocating buffer"); - *(timer_pointer_of(p)->thread) = 0; - kill_thread(timer_pointer_of(p)); - } else puts("Error accepting client"); -} - -static pid_t pid; -void accept_client() { - pid = fork(); - while (pid > 0) { //主进程监控子进程状态,如果子进程异常终止则重启之 - wait(NULL); - puts("Server subprocess exited. Restart..."); - pid = fork(); - } - signal(SIGQUIT, handle_quit); - signal(SIGPIPE, handle_pipe); - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, 1); - if(pid < 0) puts("Error when forking a subprocess."); - else while(1) { - puts("Ready for accept, waitting..."); - int p = 0; - while(p < THREADCNT && accept_threads[p] && !pthread_kill(accept_threads[p], 0)) p++; - if(p < THREADCNT) { - printf("Run on thread No.%d\n", p); - THREADTIMER *timer = malloc(sizeof(THREADTIMER)); - if(timer) { - timer->accept_fd = accept(fd, (struct sockaddr *)&client_addr, &struct_len); - if(timer->accept_fd <= 0) { - free(timer); - puts("Accept client error."); - } else { - #ifdef LISTEN_ON_IPV6 - uint16_t port = ntohs(client_addr.sin6_port); - struct in6_addr in = client_addr.sin6_addr; - char str[INET6_ADDRSTRLEN]; // 46 - inet_ntop(AF_INET6, &in, str, sizeof(str)); - #else - uint16_t port = ntohs(client_addr.sin_port); - struct in_addr in = client_addr.sin_addr; - char str[INET_ADDRSTRLEN]; // 16 - inet_ntop(AF_INET, &in, str, sizeof(str)); - #endif - printf("Accept client %s:%u\n", str, port); - timer->thread = &accept_threads[p]; - timer->touch = time(NULL); - timer->data = NULL; - timer->is_open = 0; - timer->fp = NULL; - if (pthread_create(timer->thread, &attr, (void *)&handle_accept, timer)) puts("Error creating thread"); - else puts("Creating thread succeeded"); - } - } else puts("Allocate timer error"); - } else { - puts("Max thread cnt exceeded"); - sleep(1); + int32_t remain = file_size - timer->numbytes; + while(remain > 0) { + // printf("remain:%d\n", (int)remain); + ssize_t n = recv(timer->accept_fd, timer->data, (remain>TIMERDATSZ)?TIMERDATSZ:remain, MSG_WAITALL); + if(n <= 0) return close_file_and_send(timer, "erro", 4); + if(fwrite(timer->data, n, 1, timer->fp) != 1) { + perror("fwrite"); + return close_file_and_send(timer, "erro", 4); } + remain -= n; } -} - -FILE *open_file(char* file_path, int lock_type, char* mode) { - FILE *fp = NULL; - fp = fopen(file_path, mode); - if(fp) { - if(!~flock(fileno(fp), lock_type | LOCK_NB)) { - printf("Error: "); - fp = NULL; - } - printf("Open file in mode %d\n", lock_type); - } else puts("Open file error"); - return fp; -} - -int close_file_and_send(THREADTIMER *timer, char *data, size_t numbytes) { - close_file(timer->fp); - timer->is_open = 0; - return send_data(timer->accept_fd, data, numbytes); -} - -void close_file(FILE *fp) { - puts("Close file"); - if(fp) { - flock(fileno(fp), LOCK_UN); - fclose(fp); - } + return close_file_and_send(timer, "succ", 4); } int main(int argc, char *argv[]) {