mirror of
https://github.com/fumiama/simple-kanban.git
synced 2026-06-28 15:10:27 +08:00
feat: use mmap instaed of fp operation
This commit is contained in:
10
client.c
10
client.c
@@ -88,22 +88,22 @@ int main(int argc,char *argv[]) { //usage: ./client host port
|
|||||||
hdtr.trailers = NULL;
|
hdtr.trailers = NULL;
|
||||||
hdtr.trl_cnt = 0;
|
hdtr.trl_cnt = 0;
|
||||||
if(!sendfile(fileno(fp), sockfd, 0, &len, &hdtr, 0)) puts("Send file success.");
|
if(!sendfile(fileno(fp), sockfd, 0, &len, &hdtr, 0)) puts("Send file success.");
|
||||||
else puts("Send file error.");
|
else perror("sendfile");
|
||||||
#else
|
#else
|
||||||
send(sockfd, &file_size, sizeof(uint32_t), 0);
|
send(sockfd, &file_size, sizeof(uint32_t), 0);
|
||||||
if(!sendfile(sockfd, fileno(fp), &len, file_size)) puts("Send file success.");
|
if(!sendfile(sockfd, fileno(fp), &len, file_size)) puts("Send file success.");
|
||||||
else puts("Send file error.");
|
else perror("sendfile");
|
||||||
#endif
|
#endif
|
||||||
fclose(fp);
|
fclose(fp);
|
||||||
printf("Send count:%d\n", (int)len);
|
printf("Send count: %d\n", (int)len);
|
||||||
} else puts("Open file error!");
|
} else perror("fopen");
|
||||||
} else {
|
} else {
|
||||||
send(sockfd, buf, strlen(buf), 0);
|
send(sockfd, buf, strlen(buf), 0);
|
||||||
if(!strcmp(buf, "quit")) exit(EXIT_SUCCESS);
|
if(!strcmp(buf, "quit")) exit(EXIT_SUCCESS);
|
||||||
}
|
}
|
||||||
sleep(1);
|
sleep(1);
|
||||||
}
|
}
|
||||||
} else perror("Create msg thread failed");
|
} else perror("pthread_create");
|
||||||
close(sockfd);
|
close(sockfd);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|||||||
190
file.h
190
file.h
@@ -3,97 +3,151 @@
|
|||||||
|
|
||||||
#include <stdint.h>
|
#include <stdint.h>
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
|
#include <sys/mman.h>
|
||||||
#include <sys/stat.h>
|
#include <sys/stat.h>
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
static char* file_filepath[2];
|
// FILE_CACHE_MAX_SIZE 1G
|
||||||
|
#define FILE_CACHE_MAX_SIZE (1024*1024*1024)
|
||||||
|
|
||||||
static volatile uint16_t has_file_opened[2];
|
struct file_cache_t {
|
||||||
static volatile uint16_t is_file_opening[2];
|
pthread_rwlock_t mu;
|
||||||
static volatile uint32_t file_owner_index[2] = {(uint32_t)-1, (uint32_t)-1};
|
char const *path;
|
||||||
|
char *data;
|
||||||
|
size_t size;
|
||||||
|
};
|
||||||
|
typedef struct file_cache_t file_cache_t;
|
||||||
|
|
||||||
static FILE* file_fp[2];
|
int file_cache_init(file_cache_t* fc, char* path) {
|
||||||
static pthread_rwlock_t mu[2];
|
static int page_size;
|
||||||
|
int fd;
|
||||||
static inline off_t get_file_size(int isdata) {
|
struct stat sb;
|
||||||
struct stat statbuf;
|
char* mapped;
|
||||||
if(stat(file_filepath[!!isdata], &statbuf)==0) {
|
if(page_size <= 0) page_size = (int)sysconf(_SC_PAGE_SIZE);
|
||||||
return statbuf.st_size;
|
if(pthread_rwlock_init(&fc->mu, NULL)) {
|
||||||
|
perror("pthread_rwlock_init");
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
return -1;
|
fd = open(path, O_RDWR|O_CREAT);
|
||||||
}
|
if(fd < 0) {
|
||||||
|
perror("open");
|
||||||
static int init_file(char* file_path[2]) {
|
return -2;
|
||||||
int i = 0;
|
|
||||||
for(; i < 2; i++) {
|
|
||||||
FILE* fp = fopen(file_path[i], "rb+");
|
|
||||||
if(!fp) {
|
|
||||||
perror("Open file error");
|
|
||||||
return 2;
|
|
||||||
}
|
|
||||||
int err = pthread_rwlock_init(&mu[i], NULL);
|
|
||||||
if(err) {
|
|
||||||
perror("Init lock error");
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
file_filepath[i] = file_path[i];
|
|
||||||
fclose(fp);
|
|
||||||
}
|
}
|
||||||
|
if(fstat(fd, &sb) < 0) {
|
||||||
|
perror("fstat");
|
||||||
|
return -3;
|
||||||
|
}
|
||||||
|
if(sb.st_size < page_size) {
|
||||||
|
if(ftruncate(fd, page_size) < 0) {
|
||||||
|
perror("ftruncate");
|
||||||
|
return -4;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
mapped = mmap(NULL, (size_t)sb.st_size, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
|
||||||
|
close(fd);
|
||||||
|
if(mapped == MAP_FAILED) {
|
||||||
|
perror("mmap");
|
||||||
|
return -5;
|
||||||
|
}
|
||||||
|
fc->path = path;
|
||||||
|
fc->data = mapped+sizeof(uint64_t);
|
||||||
|
fc->size = (size_t)sb.st_size;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline FILE* open_file(uint32_t index, int isdata, int isro) {
|
uint64_t file_cache_get_data_size(file_cache_t* fc) {
|
||||||
isdata = !!isdata;
|
#ifdef WORDS_BIGENDIAN
|
||||||
is_file_opening[isdata] = 1;
|
return __builtin_bswap64(*(uint64_t*)(fc->data - sizeof(uint64_t)));
|
||||||
if(pthread_rwlock_wrlock(&mu[isdata])) {
|
#else
|
||||||
perror("Open file: Writelock busy");
|
return *(uint64_t*)(fc->data - sizeof(uint64_t));
|
||||||
is_file_opening[isdata] = 0;
|
#endif
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
is_file_opening[isdata] = 0;
|
|
||||||
file_fp[isdata] = fopen(file_filepath[isdata], isro?"rb":"rb+");
|
|
||||||
if(!file_fp[isdata]) {
|
|
||||||
perror("Open file: fopen");
|
|
||||||
pthread_rwlock_unlock(&mu[isdata]);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
has_file_opened[isdata] = 1;
|
|
||||||
file_owner_index[isdata] = index;
|
|
||||||
puts("Open file");
|
|
||||||
return file_fp[isdata];
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline int require_shared_lock(int isdata) {
|
void file_cache_set_data_size(file_cache_t* fc, uint64_t size) {
|
||||||
if(pthread_rwlock_rdlock(&mu[!!isdata])) {
|
#ifdef WORDS_BIGENDIAN
|
||||||
perror("Open file: Readlock busy");
|
*(uint64_t*)(fc->data - sizeof(uint64_t)) = __builtin_bswap64(size);
|
||||||
|
#else
|
||||||
|
*(uint64_t*)(fc->data - sizeof(uint64_t)) = size;
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
|
int file_cache_read_lock(file_cache_t* fc) {
|
||||||
|
if(pthread_rwlock_rdlock(&fc->mu)) {
|
||||||
|
perror("pthread_rwlock_rdlock");
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
puts("Shared lock required");
|
puts("file_cache_read_lock: obtained");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline void release_shared_lock(int isdata) {
|
int file_cache_write_lock(file_cache_t* fc) {
|
||||||
pthread_rwlock_unlock(&mu[!!isdata]);
|
if(pthread_rwlock_wrlock(&fc->mu)) {
|
||||||
puts("Release shared lock");
|
perror("pthread_rwlock_wrlock");
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
puts("file_cache_write_lock: obtained");
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline void close_file(uint32_t index, int isdata) {
|
int file_cache_unlock(file_cache_t* fc) {
|
||||||
isdata = !!isdata;
|
if(pthread_rwlock_unlock(&fc->mu)) {
|
||||||
if(index != file_owner_index[isdata]) return;
|
perror("file_cache_unlock");
|
||||||
if(has_file_opened[isdata]) {
|
return 1;
|
||||||
fclose(file_fp[isdata]);
|
}
|
||||||
file_fp[isdata] = NULL;
|
puts("file_cache_unlock: success");
|
||||||
has_file_opened[isdata] = 0;
|
return 0;
|
||||||
file_owner_index[isdata] = (uint32_t)-1;
|
|
||||||
pthread_rwlock_unlock(&mu[isdata]);
|
|
||||||
puts("Close file");
|
|
||||||
} else puts("file already closed");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void close_file_wrap(uint32_t index_isdata[2]) {
|
// file_cache_realloc must be used after obtaining write lock
|
||||||
close_file(index_isdata[0], (int)index_isdata[1]);
|
int file_cache_realloc(file_cache_t* fc, uint64_t newsize) {
|
||||||
|
if(newsize > FILE_CACHE_MAX_SIZE) {
|
||||||
|
printf("file_cache_realloc: too big size %llu\n", newsize);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if(newsize <= fc->size - sizeof(uint64_t)) {
|
||||||
|
file_cache_set_data_size(fc, newsize);
|
||||||
|
printf("file_cache_realloc: new data size %llu bytes (fast)\n", newsize);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
if(munmap(fc->data - sizeof(uint64_t), fc->size) < 0) {
|
||||||
|
perror("munmap");
|
||||||
|
return -2;
|
||||||
|
}
|
||||||
|
int fd = open(fc->path, O_RDWR|O_CREAT);
|
||||||
|
if(fd < 0) {
|
||||||
|
perror("open");
|
||||||
|
return -3;
|
||||||
|
}
|
||||||
|
fc->size = (size_t)newsize + sizeof(uint64_t);
|
||||||
|
if(ftruncate(fd, fc->size) < 0) {
|
||||||
|
perror("ftruncate");
|
||||||
|
return -4;
|
||||||
|
}
|
||||||
|
fc->data = mmap(NULL, fc->size, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
|
||||||
|
close(fd);
|
||||||
|
if(fc->data == MAP_FAILED) {
|
||||||
|
perror("mmap");
|
||||||
|
return -5;
|
||||||
|
}
|
||||||
|
fc->data += sizeof(uint64_t);
|
||||||
|
file_cache_set_data_size(fc, newsize);
|
||||||
|
printf("file_cache_realloc: new data size %llu bytes\n", newsize);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int file_cache_close(file_cache_t* fc) {
|
||||||
|
if(munmap(fc->data - sizeof(uint64_t), fc->size) < 0) {
|
||||||
|
perror("munmap");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if(pthread_rwlock_destroy(&fc->mu)) {
|
||||||
|
perror("pthread_rwlock_destroy");
|
||||||
|
return -2;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|||||||
196
server.c
196
server.c
@@ -13,6 +13,7 @@
|
|||||||
#include <sys/socket.h>
|
#include <sys/socket.h>
|
||||||
#include <sys/stat.h>
|
#include <sys/stat.h>
|
||||||
#include <sys/types.h>
|
#include <sys/types.h>
|
||||||
|
#include <sys/uio.h>
|
||||||
#include <sys/wait.h>
|
#include <sys/wait.h>
|
||||||
#include <time.h>
|
#include <time.h>
|
||||||
#include <ctype.h>
|
#include <ctype.h>
|
||||||
@@ -31,28 +32,43 @@
|
|||||||
static char *data_path; // cat 命令读取的文件位置
|
static char *data_path; // cat 命令读取的文件位置
|
||||||
static char *kanban_path; // get 命令读取的文件位置
|
static char *kanban_path; // get 命令读取的文件位置
|
||||||
|
|
||||||
|
static file_cache_t data_file_cache;
|
||||||
|
static file_cache_t kanban_file_cache;
|
||||||
|
|
||||||
static uint8_t _cfg[sizeof(simple_pb_t)+sizeof(config_t)];
|
static uint8_t _cfg[sizeof(simple_pb_t)+sizeof(config_t)];
|
||||||
#define cfg ((const const_config_t*)(_cfg+sizeof(simple_pb_t))) // 存储 pwd 和 sps
|
#define cfg ((const const_config_t*)(_cfg+sizeof(simple_pb_t))) // 存储 pwd 和 sps
|
||||||
|
|
||||||
#define TCPOOL_THREAD_TIMER_T_SZ 65536
|
#define TCPOOL_THREAD_TIMER_T_SZ 65536
|
||||||
|
|
||||||
#define TCPOOL_MAXWAITSEC 16
|
#define TCPOOL_MAXWAITSEC 16
|
||||||
|
|
||||||
#define SERVER_THREAD_BUFSZ ( \
|
#define SERVER_THREAD_BUFSZ ( \
|
||||||
TCPOOL_THREAD_TIMER_T_SZ \
|
TCPOOL_THREAD_TIMER_T_SZ \
|
||||||
-TCPOOL_THREAD_TIMER_T_HEAD_SZ \
|
-TCPOOL_THREAD_TIMER_T_HEAD_SZ \
|
||||||
-sizeof(ssize_t)-2*sizeof(uint8_t) \
|
-sizeof(ssize_t)-2*sizeof(uint8_t) \
|
||||||
)
|
)
|
||||||
|
|
||||||
#define TCPOOL_THREAD_CONTEXT \
|
#define TCPOOL_THREAD_CONTEXT \
|
||||||
ssize_t numbytes; /* 本次接收的数据长度 */ \
|
ssize_t numbytes; /* 本次接收的数据长度 */ \
|
||||||
int8_t status; /* 本会话所处的状态 */ \
|
int8_t status; /* 本会话所处的状态 */ \
|
||||||
uint8_t isdata; /* 是否为 datfile */ \
|
uint8_t isdata; /* 是否为 datfile */ \
|
||||||
|
uint8_t isopen; /* 是否获得了文件锁 */ \
|
||||||
char data[SERVER_THREAD_BUFSZ]
|
char data[SERVER_THREAD_BUFSZ]
|
||||||
#define TCPOOL_TOUCH_TIMER_CONDITION (*(volatile uint32_t*)(is_file_opening))
|
|
||||||
#define TCPOOL_INIT_ACTION init_file((char*[]){kanban_path, data_path})
|
#define TCPOOL_TOUCH_TIMER_CONDITION 0
|
||||||
|
|
||||||
|
#define TCPOOL_INIT_ACTION \
|
||||||
|
file_cache_init(&data_file_cache, data_path); \
|
||||||
|
file_cache_init(&kanban_file_cache, kanban_path);
|
||||||
|
|
||||||
#define TCPOOL_PREHANDLE_ACCEPT_ACTION(timer) \
|
#define TCPOOL_PREHANDLE_ACCEPT_ACTION(timer) \
|
||||||
timer->status = -1; \
|
timer->status = -1; \
|
||||||
timer->isdata = 0;
|
timer->isdata = 0;
|
||||||
|
|
||||||
#define TCPOOL_CLEANUP_THREAD_ACTION(timer) \
|
#define TCPOOL_CLEANUP_THREAD_ACTION(timer) \
|
||||||
close_file(timer->index, timer->isdata); \
|
if(timer->isopen) file_cache_unlock(timer->isdata?&data_file_cache:&kanban_file_cache); \
|
||||||
|
timer->isopen = 0; \
|
||||||
|
timer->isdata = 0; \
|
||||||
timer->status = -1;
|
timer->status = -1;
|
||||||
|
|
||||||
#include "tcpool.h"
|
#include "tcpool.h"
|
||||||
@@ -137,39 +153,32 @@ static int check_buffer(tcpool_thread_timer_t *timer) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
static int send_all(tcpool_thread_timer_t *timer) {
|
static int send_all(tcpool_thread_timer_t *timer) {
|
||||||
int re = 1;
|
int re;
|
||||||
FILE *fp = open_file(timer->index, timer->isdata, 1);
|
file_cache_t* fc = timer->isdata?&data_file_cache:&kanban_file_cache;
|
||||||
if(fp == NULL) return 1;
|
if(file_cache_read_lock(fc)) {
|
||||||
uint32_t close_file_wrap_data[2] = {timer->index, (uint32_t)timer->isdata};
|
return 0;
|
||||||
pthread_cleanup_push((void (*)(void*))&close_file_wrap, (void*)close_file_wrap_data);
|
}
|
||||||
off_t len = 0, file_size = get_file_size(timer->isdata);
|
pthread_cleanup_push((void (*)(void*))&file_cache_unlock, (void*)fc);
|
||||||
printf("Get file size: %d bytes, ", (int)file_size);
|
uint64_t file_size = file_cache_get_data_size(fc);
|
||||||
#if __APPLE__
|
printf("Get file size: %llu bytes, ", file_size);
|
||||||
#ifdef WORDS_BIGENDIAN
|
#ifdef WORDS_BIGENDIAN
|
||||||
file_size = __DARWIN_OSSwapInt32(file_size);
|
uint32_t little_fs = __builtin_bswap32(file_size);
|
||||||
#endif
|
|
||||||
struct sf_hdtr hdtr;
|
|
||||||
struct iovec headers;
|
|
||||||
headers.iov_base = &file_size;
|
|
||||||
headers.iov_len = sizeof(uint32_t);
|
|
||||||
hdtr.headers = &headers;
|
|
||||||
hdtr.hdr_cnt = 1;
|
|
||||||
hdtr.trailers = NULL;
|
|
||||||
hdtr.trl_cnt = 0;
|
|
||||||
re = !sendfile(fileno(fp), timer->accept_fd, 0, &len, &hdtr, 0);
|
|
||||||
if(!re) perror("sendfile");
|
|
||||||
#else
|
|
||||||
#ifdef WORDS_BIGENDIAN
|
|
||||||
uint32_t little_fs = __builtin_bswap32(file_size);
|
|
||||||
send(timer->accept_fd, &little_fs, sizeof(uint32_t), 0);
|
|
||||||
#else
|
|
||||||
send(timer->accept_fd, &file_size, sizeof(uint32_t), 0);
|
|
||||||
#endif
|
|
||||||
re = sendfile(timer->accept_fd, fileno(fp), &len, file_size) > 0;
|
|
||||||
if(!re) perror("sendfile");
|
|
||||||
#endif
|
#endif
|
||||||
printf("Send %d bytes\n", (int)len);
|
struct iovec iov[2] = {
|
||||||
|
#ifdef WORDS_BIGENDIAN
|
||||||
|
{&little_fs, sizeof(uint32_t)},
|
||||||
|
#else
|
||||||
|
{&file_size, sizeof(uint32_t)},
|
||||||
|
#endif
|
||||||
|
{(void*)fc->data, file_size}
|
||||||
|
};
|
||||||
|
re = writev(timer->accept_fd, (const struct iovec *)&iov, 2);
|
||||||
pthread_cleanup_pop(1);
|
pthread_cleanup_pop(1);
|
||||||
|
if(re <= 0) {
|
||||||
|
perror("writev");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
printf("Send %d bytes\n", re);
|
||||||
return re;
|
return re;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -223,26 +232,25 @@ static int s0_init(tcpool_thread_timer_t *timer) {
|
|||||||
|
|
||||||
// s1_get scan getxxx
|
// s1_get scan getxxx
|
||||||
static int s1_get(tcpool_thread_timer_t *timer) {
|
static int s1_get(tcpool_thread_timer_t *timer) {
|
||||||
FILE *fp = open_file(timer->index, 0, 1);
|
file_cache_t* fc = timer->isdata?&data_file_cache:&kanban_file_cache;
|
||||||
timer->status = 0;
|
|
||||||
if(!fp) goto GET_END;
|
|
||||||
|
|
||||||
uint32_t close_file_wrap_data[2] = {timer->index, (uint32_t)timer->isdata};
|
uint32_t close_file_wrap_data[2] = {timer->index, (uint32_t)timer->isdata};
|
||||||
int r; uint32_t ver, cli_ver;
|
int r; uint32_t ver, cli_ver;
|
||||||
|
|
||||||
r = send_data(timer->accept_fd, "get", 3);
|
r = send_data(timer->accept_fd, "get", 3);
|
||||||
if (!r) goto GET_END;
|
if (!r) goto GET_END;
|
||||||
|
if(file_cache_read_lock(fc)) {
|
||||||
pthread_cleanup_push((void (*)(void*))&close_file_wrap, (void*)close_file_wrap_data);
|
goto GET_END;
|
||||||
timer->isdata = 0;
|
}
|
||||||
r = fscanf(fp, "%u", &ver);
|
timer->status = 0;
|
||||||
|
pthread_cleanup_push((void (*)(void*))&file_cache_unlock, (void*)fc);
|
||||||
|
r = sscanf(fc->data, "%u", &ver);
|
||||||
pthread_cleanup_pop(1);
|
pthread_cleanup_pop(1);
|
||||||
|
|
||||||
if(r <= 0) goto GET_END;
|
if(r <= 0) goto GET_END;
|
||||||
if(sscanf(timer->data, "%u", &cli_ver) <= 0) goto GET_END;
|
if(sscanf(timer->data, "%u", &cli_ver) <= 0) goto GET_END;
|
||||||
if(cli_ver >= ver) goto GET_END;
|
if(cli_ver >= ver) goto GET_END;
|
||||||
|
|
||||||
//need to send a new kanban
|
// need to send a new kanban
|
||||||
r = send_all(timer);
|
r = send_all(timer);
|
||||||
goto GET_SKIP;
|
goto GET_SKIP;
|
||||||
|
|
||||||
@@ -282,13 +290,25 @@ static int s3_set_data(tcpool_thread_timer_t *timer) {
|
|||||||
char ret[4];
|
char ret[4];
|
||||||
*(uint32_t*)ret = *(uint32_t*)"succ";
|
*(uint32_t*)ret = *(uint32_t*)"succ";
|
||||||
timer->status = 0;
|
timer->status = 0;
|
||||||
FILE* fp = open_file(timer->index, timer->isdata, 0);
|
int recv_bufsz;
|
||||||
uint32_t close_file_wrap_data[2] = {timer->index, (uint32_t)timer->isdata};
|
socklen_t optlen = sizeof(recv_bufsz);
|
||||||
pthread_cleanup_push((void (*)(void*))&close_file_wrap, (void*)close_file_wrap_data);
|
if(getsockopt(timer->accept_fd, SOL_SOCKET, SO_RCVBUF, &recv_bufsz, &optlen)) {
|
||||||
|
perror("getsockopt");
|
||||||
|
*(uint32_t*)ret = *(uint32_t*)"erop";
|
||||||
|
goto S3_RETURN;
|
||||||
|
}
|
||||||
|
printf("Set recv buffer size: %d\n", recv_bufsz);
|
||||||
|
file_cache_t* fc = timer->isdata?&data_file_cache:&kanban_file_cache;
|
||||||
|
if(file_cache_write_lock(fc)) {
|
||||||
|
*(uint32_t*)ret = *(uint32_t*)"erwl";
|
||||||
|
goto S3_RETURN;
|
||||||
|
}
|
||||||
|
pthread_cleanup_push((void (*)(void*))&file_cache_unlock, (void*)fc);
|
||||||
if(timer->numbytes < 4) {
|
if(timer->numbytes < 4) {
|
||||||
ssize_t n = recv(timer->accept_fd, timer->data+timer->numbytes, 4-timer->numbytes, MSG_WAITALL);
|
ssize_t n = recv(timer->accept_fd, timer->data+timer->numbytes, 4-timer->numbytes, MSG_WAITALL);
|
||||||
if(n < 4-timer->numbytes) {
|
if(n < 4-timer->numbytes) {
|
||||||
*(uint32_t*)ret = *(uint32_t*)"erro";
|
*(uint32_t*)ret = *(uint32_t*)"ercN";
|
||||||
|
perror("recv");
|
||||||
goto S3_RETURN;
|
goto S3_RETURN;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -297,50 +317,53 @@ static int s3_set_data(tcpool_thread_timer_t *timer) {
|
|||||||
#else
|
#else
|
||||||
uint32_t file_size = *(uint32_t*)(timer->data);
|
uint32_t file_size = *(uint32_t*)(timer->data);
|
||||||
#endif
|
#endif
|
||||||
printf("Set data size: %u\n", file_size);
|
printf("Client set data size: %u\n", file_size);
|
||||||
if((timer->numbytes = recv(timer->accept_fd, timer->data, SERVER_THREAD_BUFSZ, 0)) < 0 && errno != EAGAIN) {
|
timer->numbytes -= 4;
|
||||||
*(uint32_t*)ret = *(uint32_t*)"erro";
|
if(file_cache_realloc(fc, (uint64_t)file_size)) {
|
||||||
|
*(uint32_t*)ret = *(uint32_t*)"eral";
|
||||||
goto S3_RETURN;
|
goto S3_RETURN;
|
||||||
}
|
}
|
||||||
printf("Get data size: %d\n", (int)timer->numbytes);
|
if(timer->numbytes >= file_size) {
|
||||||
if(file_size <= SERVER_THREAD_BUFSZ) {
|
memcpy(fc->data, timer->data+4, file_size);
|
||||||
while(timer->numbytes != file_size) {
|
puts("All data received and copied");
|
||||||
ssize_t n = recv(timer->accept_fd, timer->data + timer->numbytes, SERVER_THREAD_BUFSZ - timer->numbytes, MSG_WAITALL);
|
goto S3_RETURN;
|
||||||
if(n <= 0) {
|
}
|
||||||
*(uint32_t*)ret = *(uint32_t*)"erro";
|
ssize_t recvlen = 0, p = 0;
|
||||||
|
if(timer->numbytes > 0) {
|
||||||
|
p = timer->numbytes;
|
||||||
|
memcpy(fc->data, timer->data+4, p);
|
||||||
|
file_size -= p;
|
||||||
|
printf("Copy received data: %zd bytes, remain: %u bytes\n", p, file_size);
|
||||||
|
}
|
||||||
|
if((uint64_t)file_size <= (uint64_t)recv_bufsz) {
|
||||||
|
if((recvlen = recv(timer->accept_fd, fc->data+p, (size_t)file_size, MSG_WAITALL)) != (ssize_t)file_size) {
|
||||||
|
*(uint32_t*)ret = *(uint32_t*)"ercA";
|
||||||
|
perror("recv");
|
||||||
|
goto S3_RETURN;
|
||||||
|
}
|
||||||
|
printf("Recv from client: %zd bytes\n", recvlen);
|
||||||
|
} else {
|
||||||
|
puts("Start loop recv");
|
||||||
|
while((recvlen = recv(
|
||||||
|
timer->accept_fd, fc->data+p,
|
||||||
|
(size_t)(((uint64_t)file_size>(uint64_t)recv_bufsz)?recv_bufsz:file_size), MSG_WAITALL)
|
||||||
|
) > 0) {
|
||||||
|
if(recvlen <= 0 || (uint32_t)recvlen > file_size) {
|
||||||
|
*(uint32_t*)ret = *(uint32_t*)"ercM";
|
||||||
|
perror("recv");
|
||||||
goto S3_RETURN;
|
goto S3_RETURN;
|
||||||
}
|
}
|
||||||
timer->numbytes += n;
|
file_size -= (uint32_t)recvlen;
|
||||||
|
p += recvlen;
|
||||||
|
printf("Loop recv from client: %zd bytes, remain: %u bytes\n", recvlen, file_size);
|
||||||
|
if(file_size == 0) break;
|
||||||
}
|
}
|
||||||
if(fwrite(timer->data, file_size, 1, fp) != 1) {
|
if(recvlen <= 0) {
|
||||||
perror("fwrite");
|
*(uint32_t*)ret = *(uint32_t*)"ercF";
|
||||||
*(uint32_t*)ret = *(uint32_t*)"erro";
|
perror("recv");
|
||||||
}
|
|
||||||
goto S3_RETURN;
|
|
||||||
}
|
|
||||||
if(timer->numbytes > 0 && fwrite(timer->data, timer->numbytes, 1, fp) != 1) {
|
|
||||||
perror("fwrite");
|
|
||||||
*(uint32_t*)ret = *(uint32_t*)"erro";
|
|
||||||
goto S3_RETURN;
|
|
||||||
}
|
|
||||||
int32_t remain = file_size - timer->numbytes;
|
|
||||||
while(remain > 0) {
|
|
||||||
// printf("remain:%d\n", (int)remain);
|
|
||||||
ssize_t n = recv(timer->accept_fd, timer->data, (remain>SERVER_THREAD_BUFSZ)?SERVER_THREAD_BUFSZ:remain, MSG_WAITALL);
|
|
||||||
if(n < 0) {
|
|
||||||
*(uint32_t*)ret = *(uint32_t*)"erro";
|
|
||||||
goto S3_RETURN;
|
goto S3_RETURN;
|
||||||
}
|
}
|
||||||
else if(!n) {
|
puts("Finish loop recv");
|
||||||
usleep(10000); // 10 ms
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if(fwrite(timer->data, n, 1, fp) != 1) {
|
|
||||||
perror("fwrite");
|
|
||||||
*(uint32_t*)ret = *(uint32_t*)"erro";
|
|
||||||
goto S3_RETURN;
|
|
||||||
}
|
|
||||||
remain -= n;
|
|
||||||
}
|
}
|
||||||
S3_RETURN:
|
S3_RETURN:
|
||||||
pthread_cleanup_pop(1);
|
pthread_cleanup_pop(1);
|
||||||
@@ -402,3 +425,8 @@ int main(int argc, char *argv[]) {
|
|||||||
pthread_cleanup_pop(1);
|
pthread_cleanup_pop(1);
|
||||||
return 99;
|
return 99;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void __attribute__((destructor)) defer_close_cache_files() {
|
||||||
|
file_cache_close(&data_file_cache);
|
||||||
|
file_cache_close(&kanban_file_cache);
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user