diff --git a/client.c b/client.c index f1ef84c..cdba98d 100644 --- a/client.c +++ b/client.c @@ -88,22 +88,22 @@ int main(int argc,char *argv[]) { //usage: ./client host port hdtr.trailers = NULL; hdtr.trl_cnt = 0; if(!sendfile(fileno(fp), sockfd, 0, &len, &hdtr, 0)) puts("Send file success."); - else puts("Send file error."); + else perror("sendfile"); #else send(sockfd, &file_size, sizeof(uint32_t), 0); if(!sendfile(sockfd, fileno(fp), &len, file_size)) puts("Send file success."); - else puts("Send file error."); + else perror("sendfile"); #endif fclose(fp); - printf("Send count:%d\n", (int)len); - } else puts("Open file error!"); + printf("Send count: %d\n", (int)len); + } else perror("fopen"); } else { send(sockfd, buf, strlen(buf), 0); if(!strcmp(buf, "quit")) exit(EXIT_SUCCESS); } sleep(1); } - } else perror("Create msg thread failed"); + } else perror("pthread_create"); close(sockfd); return 0; } diff --git a/file.h b/file.h index ad4cec7..b9cf138 100644 --- a/file.h +++ b/file.h @@ -3,97 +3,151 @@ #include #include +#include #include #include #include #include +#include -static char* file_filepath[2]; +// FILE_CACHE_MAX_SIZE 1G +#define FILE_CACHE_MAX_SIZE (1024*1024*1024) -static volatile uint16_t has_file_opened[2]; -static volatile uint16_t is_file_opening[2]; -static volatile uint32_t file_owner_index[2] = {(uint32_t)-1, (uint32_t)-1}; +struct file_cache_t { + pthread_rwlock_t mu; + char const *path; + char *data; + size_t size; +}; +typedef struct file_cache_t file_cache_t; -static FILE* file_fp[2]; -static pthread_rwlock_t mu[2]; - -static inline off_t get_file_size(int isdata) { - struct stat statbuf; - if(stat(file_filepath[!!isdata], &statbuf)==0) { - return statbuf.st_size; +int file_cache_init(file_cache_t* fc, char* path) { + static int page_size; + int fd; + struct stat sb; + char* mapped; + if(page_size <= 0) page_size = (int)sysconf(_SC_PAGE_SIZE); + if(pthread_rwlock_init(&fc->mu, NULL)) { + perror("pthread_rwlock_init"); + return -1; } - return -1; -} - -static int init_file(char* file_path[2]) { - int i = 0; - for(; i < 2; i++) { - FILE* fp = fopen(file_path[i], "rb+"); - if(!fp) { - perror("Open file error"); - return 2; - } - int err = pthread_rwlock_init(&mu[i], NULL); - if(err) { - perror("Init lock error"); - return 1; - } - file_filepath[i] = file_path[i]; - fclose(fp); + fd = open(path, O_RDWR|O_CREAT); + if(fd < 0) { + perror("open"); + return -2; } + if(fstat(fd, &sb) < 0) { + perror("fstat"); + return -3; + } + if(sb.st_size < page_size) { + if(ftruncate(fd, page_size) < 0) { + perror("ftruncate"); + return -4; + } + } + mapped = mmap(NULL, (size_t)sb.st_size, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0); + close(fd); + if(mapped == MAP_FAILED) { + perror("mmap"); + return -5; + } + fc->path = path; + fc->data = mapped+sizeof(uint64_t); + fc->size = (size_t)sb.st_size; return 0; } -static inline FILE* open_file(uint32_t index, int isdata, int isro) { - isdata = !!isdata; - is_file_opening[isdata] = 1; - if(pthread_rwlock_wrlock(&mu[isdata])) { - perror("Open file: Writelock busy"); - is_file_opening[isdata] = 0; - return NULL; - } - is_file_opening[isdata] = 0; - file_fp[isdata] = fopen(file_filepath[isdata], isro?"rb":"rb+"); - if(!file_fp[isdata]) { - perror("Open file: fopen"); - pthread_rwlock_unlock(&mu[isdata]); - return NULL; - } - has_file_opened[isdata] = 1; - file_owner_index[isdata] = index; - puts("Open file"); - return file_fp[isdata]; +uint64_t file_cache_get_data_size(file_cache_t* fc) { + #ifdef WORDS_BIGENDIAN + return __builtin_bswap64(*(uint64_t*)(fc->data - sizeof(uint64_t))); + #else + return *(uint64_t*)(fc->data - sizeof(uint64_t)); + #endif } -static inline int require_shared_lock(int isdata) { - if(pthread_rwlock_rdlock(&mu[!!isdata])) { - perror("Open file: Readlock busy"); +void file_cache_set_data_size(file_cache_t* fc, uint64_t size) { + #ifdef WORDS_BIGENDIAN + *(uint64_t*)(fc->data - sizeof(uint64_t)) = __builtin_bswap64(size); + #else + *(uint64_t*)(fc->data - sizeof(uint64_t)) = size; + #endif +} + +int file_cache_read_lock(file_cache_t* fc) { + if(pthread_rwlock_rdlock(&fc->mu)) { + perror("pthread_rwlock_rdlock"); return 1; } - puts("Shared lock required"); + puts("file_cache_read_lock: obtained"); return 0; } -static inline void release_shared_lock(int isdata) { - pthread_rwlock_unlock(&mu[!!isdata]); - puts("Release shared lock"); +int file_cache_write_lock(file_cache_t* fc) { + if(pthread_rwlock_wrlock(&fc->mu)) { + perror("pthread_rwlock_wrlock"); + return 1; + } + puts("file_cache_write_lock: obtained"); + return 0; } -static inline void close_file(uint32_t index, int isdata) { - isdata = !!isdata; - if(index != file_owner_index[isdata]) return; - if(has_file_opened[isdata]) { - fclose(file_fp[isdata]); - file_fp[isdata] = NULL; - has_file_opened[isdata] = 0; - file_owner_index[isdata] = (uint32_t)-1; - pthread_rwlock_unlock(&mu[isdata]); - puts("Close file"); - } else puts("file already closed"); +int file_cache_unlock(file_cache_t* fc) { + if(pthread_rwlock_unlock(&fc->mu)) { + perror("file_cache_unlock"); + return 1; + } + puts("file_cache_unlock: success"); + return 0; } -static void close_file_wrap(uint32_t index_isdata[2]) { - close_file(index_isdata[0], (int)index_isdata[1]); +// file_cache_realloc must be used after obtaining write lock +int file_cache_realloc(file_cache_t* fc, uint64_t newsize) { + if(newsize > FILE_CACHE_MAX_SIZE) { + printf("file_cache_realloc: too big size %llu\n", newsize); + return -1; + } + if(newsize <= fc->size - sizeof(uint64_t)) { + file_cache_set_data_size(fc, newsize); + printf("file_cache_realloc: new data size %llu bytes (fast)\n", newsize); + return 0; + } + if(munmap(fc->data - sizeof(uint64_t), fc->size) < 0) { + perror("munmap"); + return -2; + } + int fd = open(fc->path, O_RDWR|O_CREAT); + if(fd < 0) { + perror("open"); + return -3; + } + fc->size = (size_t)newsize + sizeof(uint64_t); + if(ftruncate(fd, fc->size) < 0) { + perror("ftruncate"); + return -4; + } + fc->data = mmap(NULL, fc->size, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0); + close(fd); + if(fc->data == MAP_FAILED) { + perror("mmap"); + return -5; + } + fc->data += sizeof(uint64_t); + file_cache_set_data_size(fc, newsize); + printf("file_cache_realloc: new data size %llu bytes\n", newsize); + return 0; +} + +int file_cache_close(file_cache_t* fc) { + if(munmap(fc->data - sizeof(uint64_t), fc->size) < 0) { + perror("munmap"); + return -1; + } + if(pthread_rwlock_destroy(&fc->mu)) { + perror("pthread_rwlock_destroy"); + return -2; + } + return 0; } #endif diff --git a/server.c b/server.c index 62a1b75..a972e32 100644 --- a/server.c +++ b/server.c @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -31,28 +32,43 @@ static char *data_path; // cat 命令读取的文件位置 static char *kanban_path; // get 命令读取的文件位置 +static file_cache_t data_file_cache; +static file_cache_t kanban_file_cache; + static uint8_t _cfg[sizeof(simple_pb_t)+sizeof(config_t)]; #define cfg ((const const_config_t*)(_cfg+sizeof(simple_pb_t))) // 存储 pwd 和 sps #define TCPOOL_THREAD_TIMER_T_SZ 65536 + #define TCPOOL_MAXWAITSEC 16 + #define SERVER_THREAD_BUFSZ ( \ TCPOOL_THREAD_TIMER_T_SZ \ -TCPOOL_THREAD_TIMER_T_HEAD_SZ \ -sizeof(ssize_t)-2*sizeof(uint8_t) \ ) + #define TCPOOL_THREAD_CONTEXT \ ssize_t numbytes; /* 本次接收的数据长度 */ \ int8_t status; /* 本会话所处的状态 */ \ uint8_t isdata; /* 是否为 datfile */ \ + uint8_t isopen; /* 是否获得了文件锁 */ \ char data[SERVER_THREAD_BUFSZ] -#define TCPOOL_TOUCH_TIMER_CONDITION (*(volatile uint32_t*)(is_file_opening)) -#define TCPOOL_INIT_ACTION init_file((char*[]){kanban_path, data_path}) + +#define TCPOOL_TOUCH_TIMER_CONDITION 0 + +#define TCPOOL_INIT_ACTION \ + file_cache_init(&data_file_cache, data_path); \ + file_cache_init(&kanban_file_cache, kanban_path); + #define TCPOOL_PREHANDLE_ACCEPT_ACTION(timer) \ timer->status = -1; \ timer->isdata = 0; + #define TCPOOL_CLEANUP_THREAD_ACTION(timer) \ - close_file(timer->index, timer->isdata); \ + if(timer->isopen) file_cache_unlock(timer->isdata?&data_file_cache:&kanban_file_cache); \ + timer->isopen = 0; \ + timer->isdata = 0; \ timer->status = -1; #include "tcpool.h" @@ -137,39 +153,32 @@ static int check_buffer(tcpool_thread_timer_t *timer) { } static int send_all(tcpool_thread_timer_t *timer) { - int re = 1; - FILE *fp = open_file(timer->index, timer->isdata, 1); - if(fp == NULL) return 1; - uint32_t close_file_wrap_data[2] = {timer->index, (uint32_t)timer->isdata}; - pthread_cleanup_push((void (*)(void*))&close_file_wrap, (void*)close_file_wrap_data); - off_t len = 0, file_size = get_file_size(timer->isdata); - printf("Get file size: %d bytes, ", (int)file_size); - #if __APPLE__ - #ifdef WORDS_BIGENDIAN - file_size = __DARWIN_OSSwapInt32(file_size); - #endif - struct sf_hdtr hdtr; - struct iovec headers; - headers.iov_base = &file_size; - headers.iov_len = sizeof(uint32_t); - hdtr.headers = &headers; - hdtr.hdr_cnt = 1; - hdtr.trailers = NULL; - hdtr.trl_cnt = 0; - re = !sendfile(fileno(fp), timer->accept_fd, 0, &len, &hdtr, 0); - if(!re) perror("sendfile"); - #else - #ifdef WORDS_BIGENDIAN - uint32_t little_fs = __builtin_bswap32(file_size); - send(timer->accept_fd, &little_fs, sizeof(uint32_t), 0); - #else - send(timer->accept_fd, &file_size, sizeof(uint32_t), 0); - #endif - re = sendfile(timer->accept_fd, fileno(fp), &len, file_size) > 0; - if(!re) perror("sendfile"); + int re; + file_cache_t* fc = timer->isdata?&data_file_cache:&kanban_file_cache; + if(file_cache_read_lock(fc)) { + return 0; + } + pthread_cleanup_push((void (*)(void*))&file_cache_unlock, (void*)fc); + uint64_t file_size = file_cache_get_data_size(fc); + printf("Get file size: %llu bytes, ", file_size); + #ifdef WORDS_BIGENDIAN + uint32_t little_fs = __builtin_bswap32(file_size); #endif - printf("Send %d bytes\n", (int)len); + struct iovec iov[2] = { + #ifdef WORDS_BIGENDIAN + {&little_fs, sizeof(uint32_t)}, + #else + {&file_size, sizeof(uint32_t)}, + #endif + {(void*)fc->data, file_size} + }; + re = writev(timer->accept_fd, (const struct iovec *)&iov, 2); pthread_cleanup_pop(1); + if(re <= 0) { + perror("writev"); + return 0; + } + printf("Send %d bytes\n", re); return re; } @@ -223,26 +232,25 @@ static int s0_init(tcpool_thread_timer_t *timer) { // s1_get scan getxxx static int s1_get(tcpool_thread_timer_t *timer) { - FILE *fp = open_file(timer->index, 0, 1); - timer->status = 0; - if(!fp) goto GET_END; - + file_cache_t* fc = timer->isdata?&data_file_cache:&kanban_file_cache; uint32_t close_file_wrap_data[2] = {timer->index, (uint32_t)timer->isdata}; int r; uint32_t ver, cli_ver; r = send_data(timer->accept_fd, "get", 3); if (!r) goto GET_END; - - pthread_cleanup_push((void (*)(void*))&close_file_wrap, (void*)close_file_wrap_data); - timer->isdata = 0; - r = fscanf(fp, "%u", &ver); + if(file_cache_read_lock(fc)) { + goto GET_END; + } + timer->status = 0; + pthread_cleanup_push((void (*)(void*))&file_cache_unlock, (void*)fc); + r = sscanf(fc->data, "%u", &ver); pthread_cleanup_pop(1); if(r <= 0) goto GET_END; if(sscanf(timer->data, "%u", &cli_ver) <= 0) goto GET_END; if(cli_ver >= ver) goto GET_END; - //need to send a new kanban + // need to send a new kanban r = send_all(timer); goto GET_SKIP; @@ -282,13 +290,25 @@ static int s3_set_data(tcpool_thread_timer_t *timer) { char ret[4]; *(uint32_t*)ret = *(uint32_t*)"succ"; timer->status = 0; - FILE* fp = open_file(timer->index, timer->isdata, 0); - uint32_t close_file_wrap_data[2] = {timer->index, (uint32_t)timer->isdata}; - pthread_cleanup_push((void (*)(void*))&close_file_wrap, (void*)close_file_wrap_data); + int recv_bufsz; + socklen_t optlen = sizeof(recv_bufsz); + if(getsockopt(timer->accept_fd, SOL_SOCKET, SO_RCVBUF, &recv_bufsz, &optlen)) { + perror("getsockopt"); + *(uint32_t*)ret = *(uint32_t*)"erop"; + goto S3_RETURN; + } + printf("Set recv buffer size: %d\n", recv_bufsz); + file_cache_t* fc = timer->isdata?&data_file_cache:&kanban_file_cache; + if(file_cache_write_lock(fc)) { + *(uint32_t*)ret = *(uint32_t*)"erwl"; + goto S3_RETURN; + } + pthread_cleanup_push((void (*)(void*))&file_cache_unlock, (void*)fc); if(timer->numbytes < 4) { ssize_t n = recv(timer->accept_fd, timer->data+timer->numbytes, 4-timer->numbytes, MSG_WAITALL); if(n < 4-timer->numbytes) { - *(uint32_t*)ret = *(uint32_t*)"erro"; + *(uint32_t*)ret = *(uint32_t*)"ercN"; + perror("recv"); goto S3_RETURN; } } @@ -297,50 +317,53 @@ static int s3_set_data(tcpool_thread_timer_t *timer) { #else uint32_t file_size = *(uint32_t*)(timer->data); #endif - printf("Set data size: %u\n", file_size); - if((timer->numbytes = recv(timer->accept_fd, timer->data, SERVER_THREAD_BUFSZ, 0)) < 0 && errno != EAGAIN) { - *(uint32_t*)ret = *(uint32_t*)"erro"; + printf("Client set data size: %u\n", file_size); + timer->numbytes -= 4; + if(file_cache_realloc(fc, (uint64_t)file_size)) { + *(uint32_t*)ret = *(uint32_t*)"eral"; goto S3_RETURN; } - printf("Get data size: %d\n", (int)timer->numbytes); - if(file_size <= SERVER_THREAD_BUFSZ) { - while(timer->numbytes != file_size) { - ssize_t n = recv(timer->accept_fd, timer->data + timer->numbytes, SERVER_THREAD_BUFSZ - timer->numbytes, MSG_WAITALL); - if(n <= 0) { - *(uint32_t*)ret = *(uint32_t*)"erro"; + if(timer->numbytes >= file_size) { + memcpy(fc->data, timer->data+4, file_size); + puts("All data received and copied"); + goto S3_RETURN; + } + ssize_t recvlen = 0, p = 0; + if(timer->numbytes > 0) { + p = timer->numbytes; + memcpy(fc->data, timer->data+4, p); + file_size -= p; + printf("Copy received data: %zd bytes, remain: %u bytes\n", p, file_size); + } + if((uint64_t)file_size <= (uint64_t)recv_bufsz) { + if((recvlen = recv(timer->accept_fd, fc->data+p, (size_t)file_size, MSG_WAITALL)) != (ssize_t)file_size) { + *(uint32_t*)ret = *(uint32_t*)"ercA"; + perror("recv"); + goto S3_RETURN; + } + printf("Recv from client: %zd bytes\n", recvlen); + } else { + puts("Start loop recv"); + while((recvlen = recv( + timer->accept_fd, fc->data+p, + (size_t)(((uint64_t)file_size>(uint64_t)recv_bufsz)?recv_bufsz:file_size), MSG_WAITALL) + ) > 0) { + if(recvlen <= 0 || (uint32_t)recvlen > file_size) { + *(uint32_t*)ret = *(uint32_t*)"ercM"; + perror("recv"); goto S3_RETURN; } - timer->numbytes += n; + file_size -= (uint32_t)recvlen; + p += recvlen; + printf("Loop recv from client: %zd bytes, remain: %u bytes\n", recvlen, file_size); + if(file_size == 0) break; } - if(fwrite(timer->data, file_size, 1, fp) != 1) { - perror("fwrite"); - *(uint32_t*)ret = *(uint32_t*)"erro"; - } - goto S3_RETURN; - } - if(timer->numbytes > 0 && fwrite(timer->data, timer->numbytes, 1, fp) != 1) { - perror("fwrite"); - *(uint32_t*)ret = *(uint32_t*)"erro"; - goto S3_RETURN; - } - int32_t remain = file_size - timer->numbytes; - while(remain > 0) { - // printf("remain:%d\n", (int)remain); - ssize_t n = recv(timer->accept_fd, timer->data, (remain>SERVER_THREAD_BUFSZ)?SERVER_THREAD_BUFSZ:remain, MSG_WAITALL); - if(n < 0) { - *(uint32_t*)ret = *(uint32_t*)"erro"; + if(recvlen <= 0) { + *(uint32_t*)ret = *(uint32_t*)"ercF"; + perror("recv"); goto S3_RETURN; } - else if(!n) { - usleep(10000); // 10 ms - continue; - } - if(fwrite(timer->data, n, 1, fp) != 1) { - perror("fwrite"); - *(uint32_t*)ret = *(uint32_t*)"erro"; - goto S3_RETURN; - } - remain -= n; + puts("Finish loop recv"); } S3_RETURN: pthread_cleanup_pop(1); @@ -402,3 +425,8 @@ int main(int argc, char *argv[]) { pthread_cleanup_pop(1); return 99; } + +static void __attribute__((destructor)) defer_close_cache_files() { + file_cache_close(&data_file_cache); + file_cache_close(&kanban_file_cache); +}