1
0
mirror of https://github.com/fumiama/simple-kanban.git synced 2026-06-05 08:20:29 +08:00
Files
simple-kanban/server.c
2022-03-06 15:11:24 +08:00

536 lines
19 KiB
C
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <arpa/inet.h>
#include <netdb.h>
#include <netinet/in.h>
#include <pthread.h>
#include <signal.h>
#include <simple_protobuf.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/file.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <time.h>
#include <unistd.h>
#include "config.h"
#if !__APPLE__
#include <sys/sendfile.h>
#include <endian.h>
#else
#include <machine/endian.h>
#endif
static CONFIG* cfg; // 存储 pwd 和 sps
static int fd; // server socket fd
#ifdef LISTEN_ON_IPV6
static socklen_t struct_len = sizeof(struct sockaddr_in6);
static struct sockaddr_in6 server_addr;
static struct sockaddr_in6 client_addr;
#else
static socklen_t struct_len = sizeof(struct sockaddr_in);
static struct sockaddr_in server_addr;
static struct sockaddr_in client_addr;
#endif
static char *data_path; // cat 命令读取的文件位置
static char *kanban_path; // get 命令读取的文件位置
#define THREADCNT 16
static pthread_t accept_threads[THREADCNT];
static pthread_attr_t attr;
#define MAXWAITSEC 10
#define TIMERDATSZ BUFSIZ
// accept_timer 使用的结构体
// 包含了本次 accept 的全部信息
// 以方便退出后清理空间
struct THREADTIMER {
int index; // 指向 accept_threads 某个槽位的下标
time_t touch; // 最后访问时间,与当前时间差超过 MAXWAITSEC 将由 timer 强行回收线程
int accept_fd; // 本次 accept 的 fd会自行关闭或出错时由 timer 负责回收
ssize_t numbytes; // 本次接收的数据长度
char status; // 本会话所处的状态
char is_open; // 标识 fp 是否正在使用
FILE *fp; // 本会话打开的文件,会自行关闭或出错时由 timer 负责回收
char data[TIMERDATSZ];
};
typedef struct THREADTIMER THREADTIMER;
#define showUsage(program) printf("Usage: %s [-d] listen_port try_times kanban_file data_file config_file\n\t-d: As daemon\n", program)
static void accept_client();
static void accept_timer(void *p);
static int bind_server(uint16_t port, int try_times);
static int check_buffer(THREADTIMER *timer);
static void clean_timer(THREADTIMER* timer);
static void close_file(FILE *fp);
static int close_file_and_send(THREADTIMER *timer, char *data, size_t numbytes);
static void handle_accept(void *accept_fd_p);
static void handle_pipe(int signo);
static void handle_quit(int signo);
static int listen_socket(int try_times);
static FILE *open_file(char* file_path, int lock_type, char* mode);
static int send_all(char* file_path, THREADTIMER *timer);
static int send_data(int accept_fd, char *data, size_t length);
static off_t size_of_file(const char* fname);
static int sm1_pwd(THREADTIMER *timer);
static int s0_init(THREADTIMER *timer);
static int s1_get(THREADTIMER *timer);
static int s2_set(THREADTIMER *timer);
static int s3_set_data(THREADTIMER *timer);
static pid_t pid;
/***************************************
* accept_client 接受新连接,创建线程处理
* 创建的线程入口点为 handle_accept
* 与其伴生的结构体为 timer负责管理
* 该线程使用的资源,当线程(正常/异常)退出
* 或 client 超过 MAXWAITSEC 未响应时
* 将由与其伴生的 accept_timer 线程读取
* timer 的信息,调用 clean_timer 回收
* 未被释放的资源以防止内存泄漏
***************************************/
static void accept_client() {
pid = fork();
while (pid > 0) { // 主进程监控子进程状态,如果子进程异常终止则重启之
wait(NULL);
puts("Server subprocess exited. Restart...");
pid = fork();
}
signal(SIGQUIT, handle_quit);
signal(SIGPIPE, handle_pipe);
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
if(pid < 0) puts("Error when forking a subprocess.");
else while(1) {
puts("Ready for accept, waitting...");
int p = 0;
while(p < THREADCNT && accept_threads[p] && !pthread_kill(accept_threads[p], 0)) p++;
if(p >= THREADCNT) {
puts("Max thread cnt exceeded");
sleep(1);
continue;
}
printf("Next thread is No.%d\n", p);
THREADTIMER *timer = malloc(sizeof(THREADTIMER));
if(!timer) {
puts("Allocate timer error");
continue;
}
timer->accept_fd = accept(fd, (struct sockaddr *)&client_addr, &struct_len);
if(timer->accept_fd <= 0) {
free(timer);
puts("Accept client error.");
continue;
}
#ifdef LISTEN_ON_IPV6
uint16_t port = ntohs(client_addr.sin6_port);
struct in6_addr in = client_addr.sin6_addr;
char str[INET6_ADDRSTRLEN]; // 46
inet_ntop(AF_INET6, &in, str, sizeof(str));
#else
uint16_t port = ntohs(client_addr.sin_port);
struct in_addr in = client_addr.sin_addr;
char str[INET_ADDRSTRLEN]; // 16
inet_ntop(AF_INET, &in, str, sizeof(str));
#endif
printf("Accept client %s:%u\n", str, port);
timer->index = p;
timer->touch = time(NULL);
timer->is_open = 0;
timer->fp = NULL;
if (pthread_create(&accept_threads[p], &attr, (void *)&handle_accept, timer)) perror("pthread_create");
else puts("Creating thread succeeded");
}
}
#define timer_ptr(x) ((THREADTIMER*)(x))
#define my_thread(timer) accept_threads[timer->index]
/***************************************
* accept_timer 是与 handle_accept 伴生的
* 线程,负责监控其会话状态,并在超时时杀死它
***************************************/
static void accept_timer(void *p) {
while(my_thread(timer_ptr(p)) && !pthread_kill(my_thread(timer_ptr(p)), 0)) {
sleep(MAXWAITSEC / 4);
time_t waitsec = time(NULL) - timer_ptr(p)->touch;
printf("Wait sec: %d, max: %d\n", (int)waitsec, MAXWAITSEC);
if(waitsec > MAXWAITSEC) break;
}
clean_timer(timer_ptr(p));
free(p); // 唯一 free 点
puts("Timer has been freed");
}
static int bind_server(uint16_t port, int try_times) {
int fail_count = 0;
int result = -1;
#ifdef LISTEN_ON_IPV6
server_addr.sin6_family = AF_INET6;
server_addr.sin6_port = htons(port);
bzero(&(server_addr.sin6_addr), sizeof(server_addr.sin6_addr));
fd = socket(PF_INET6, SOCK_STREAM, 0);
#else
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
server_addr.sin_addr.s_addr = INADDR_ANY;
bzero(&(server_addr.sin_zero), 8);
fd = socket(AF_INET, SOCK_STREAM, 0);
#endif
while(!~(result = bind(fd, (struct sockaddr *)&server_addr, struct_len)) && fail_count++ < try_times) sleep(1);
if(!~result && fail_count >= try_times) {
puts("Bind server failure!");
return 0;
} else{
puts("Bind server success!");
return 1;
}
}
/***************************************
* check_buffer 检查接收到的数据,结合
* 当前会话所处状态决定接下来的处理流程
***************************************/
static int check_buffer(THREADTIMER *timer) {
printf("Status: %d\n", (int)timer->status);
switch(timer->status) {
case -1: return sm1_pwd(timer); break;
case 0: return s0_init(timer); break;
case 1: return s1_get(timer); break;
case 2: return s2_set(timer); break;
case 3: return s3_set_data(timer); break;
default: return -1; break;
}
}
// clean_timer 清理 timer
static void clean_timer(THREADTIMER* timer) {
puts("Start cleaning.");
if(my_thread(timer)) {
pthread_kill(my_thread(timer), SIGQUIT);
my_thread(timer) = 0;
puts("Kill thread.");
}
if(timer->is_open) {
close_file(timer->fp);
timer->is_open = 0;
puts("Close file.");
}
if(timer->accept_fd) {
close(timer->accept_fd);
timer->accept_fd = 0;
puts("Close accept.");
}
puts("Finish cleaning.");
}
static void close_file(FILE *fp) {
puts("Close file");
if(fp) {
flock(fileno(fp), LOCK_UN);
fclose(fp);
}
}
static int close_file_and_send(THREADTIMER *timer, char *data, size_t numbytes) {
close_file(timer->fp);
timer->is_open = 0;
return send_data(timer->accept_fd, data, numbytes);
}
#define chkbuf(p) if(!check_buffer(timer_ptr(p))) break
#define take_word(p, w, buff) if(timer_ptr(p)->numbytes > strlen(w) && strstr(buff, w) == buff) {\
int l = strlen(w);\
char store = buff[l];\
buff[l] = 0;\
ssize_t n = timer_ptr(p)->numbytes - l;\
timer_ptr(p)->numbytes = l;\
chkbuf(p);\
buff[0] = store;\
memmove(buff + 1, buff + l + 1, n - 1);\
buff[n] = 0;\
timer_ptr(p)->numbytes = n;\
printf("Split cmd: %s\n", w);\
}
#define touch_timer(x) (timer_ptr(x)->touch = time(NULL))
#define my_fd(x) (timer_ptr(x)->accept_fd)
#define my_dat(x) (timer_ptr(x)->data)
// handle_accept 初步解析指令,处理部分粘连
static void handle_accept(void *p) {
if(my_fd(p) > 0) {
puts("Connected to the client.");
signal(SIGQUIT, handle_quit);
signal(SIGPIPE, handle_pipe);
pthread_t thread;
if (pthread_create(&thread, &attr, (void *)&accept_timer, p)) puts("Error creating timer thread");
else puts("Creating timer thread succeeded");
send_data(my_fd(p), "Welcome to simple kanban server.", 33);
timer_ptr(p)->status = -1;
while(my_thread(timer_ptr(p)) && (timer_ptr(p)->numbytes = recv(my_fd(p), my_dat(p), TIMERDATSZ, 0)) > 0) {
touch_timer(p);
my_dat(p)[timer_ptr(p)->numbytes] = 0;
printf("Get %d bytes: %s\n", (int)timer_ptr(p)->numbytes, my_dat(p));
puts("Check buffer");
//处理部分粘连
take_word(p, cfg->pwd, my_dat(p));
take_word(p, "get", my_dat(p));
take_word(p, "set", my_dat(p));
take_word(p, "cat", my_dat(p));
take_word(p, "quit", my_dat(p));
take_word(p, cfg->sps, my_dat(p));
take_word(p, "ver", my_dat(p));
take_word(p, "dat", my_dat(p));
if(timer_ptr(p)->numbytes > 0) chkbuf(p);
}
printf("Break: recv %d bytes\n", (int)timer_ptr(p)->numbytes);
my_thread(timer_ptr(p)) = 0;
clean_timer(timer_ptr(p));
} else puts("Error accepting client");
}
static void handle_quit(int signo) {
perror("signal quit");
pthread_exit(NULL);
}
static void handle_pipe(int signo) {
perror("signal pipe");
pthread_exit(NULL);
}
static int listen_socket(int try_times) {
int fail_count = 0;
int result = -1;
while(!~(result = listen(fd, 10)) && fail_count++ < try_times) sleep(1);
if(!~result && fail_count >= try_times) {
puts("Listen failed!");
return 0;
} else{
puts("Listening....");
return 1;
}
}
static FILE *open_file(char* file_path, int lock_type, char* mode) {
FILE *fp = NULL;
fp = fopen(file_path, mode);
if(fp) {
if(!~flock(fileno(fp), lock_type | LOCK_NB)) {
perror("flock");
fp = NULL;
}
printf("Open file in mode %d\n", lock_type);
} else perror("fopen");
return fp;
}
static int send_all(char* file_path, THREADTIMER *timer) {
int re = 1;
FILE *fp = open_file(file_path, LOCK_SH, "rb");
if(fp) {
timer->fp = fp;
timer->is_open = 1;
uint32_t file_size = (uint32_t)size_of_file(file_path);
printf("Get file size: %d bytes.\n", (int)file_size);
off_t len = 0;
#if __APPLE__
#ifdef WORDS_BIGENDIAN
file_size = __DARWIN_OSSwapInt32(file_size);
#endif
struct sf_hdtr hdtr;
struct iovec headers;
headers.iov_base = &file_size;
headers.iov_len = sizeof(uint32_t);
hdtr.headers = &headers;
hdtr.hdr_cnt = 1;
hdtr.trailers = NULL;
hdtr.trl_cnt = 0;
re = !sendfile(fileno(fp), timer->accept_fd, 0, &len, &hdtr, 0);
if(!re) perror("sendfile");
#else
#ifdef WORDS_BIGENDIAN
uint32_t little_fs = __builtin_bswap32(file_size);
send(timer->accept_fd, &little_fs, sizeof(uint32_t), 0);
#else
send(timer->accept_fd, &file_size, sizeof(uint32_t), 0);
#endif
re = !sendfile(timer->accept_fd, fileno(fp), &len, file_size) >= 0;
if(!re) perror("sendfile");
#endif
printf("Send %d bytes.\n", (int)len);
close_file(fp);
timer->is_open = 0;
}
return re;
}
static int send_data(int accept_fd, char *data, size_t length) {
if(!~send(accept_fd, data, length, 0)) {
puts("Send data error");
return 0;
}
printf("Send data: ");
if(length > 128) {
data[124] = '.';
data[125] = '.';
data[126] = '.';
data[127] = 0;
}
puts(data);
return 1;
}
static off_t size_of_file(const char* fname) {
struct stat statbuf;
if(stat(fname, &statbuf)==0) return statbuf.st_size;
else return -1;
}
static int sm1_pwd(THREADTIMER *timer) {
if(!strcmp(cfg->pwd, timer->data)) timer->status = 0;
return !timer->status;
}
static int s0_init(THREADTIMER *timer) {
if(!strcmp("get", timer->data)) timer->status = 1;
else if(!strcmp(cfg->sps, timer->data)) timer->status = 2;
else if(!strcmp("cat", timer->data)) return send_all(data_path, timer);
else if(!strcmp("quit", timer->data)) return 0;
return send_data(timer->accept_fd, timer->data, timer->numbytes);
}
static int s1_get(THREADTIMER *timer) { //get kanban
FILE *fp = open_file(kanban_path, LOCK_SH, "rb");
timer->status = 0;
if(fp) {
timer->fp = fp;
timer->is_open = 1;
uint32_t ver, cli_ver;
if(fscanf(fp, "%u", &ver) > 0) {
if(sscanf(timer->data, "%u", &cli_ver) > 0) {
if(cli_ver < ver) { //need to send a new kanban
close_file(fp);
timer->is_open = 0;
int r = send_all(kanban_path, timer);
if(strstr(timer->data, "quit") == timer->data + timer->numbytes - 4) {
puts("Found last cmd is quit.");
return 0;
}
else return r;
}
}
}
}
return close_file_and_send(timer, "null", 4);
}
static int s2_set(THREADTIMER *timer) {
FILE *fp = NULL;
if(!strcmp(timer->data, "ver")) {
fp = open_file(kanban_path, LOCK_EX, "wb");
} else if(!strcmp(timer->data, "dat")) {
fp = open_file(data_path, LOCK_EX, "wb");
}
if(fp) {
timer->status = 3;
timer->fp = fp;
timer->is_open = 1;
return send_data(timer->accept_fd, "data", 4);
} else {
timer->status = 0;
return send_data(timer->accept_fd, "erro", 4);
}
}
static int s3_set_data(THREADTIMER *timer) {
timer->status = 0;
#ifdef WORDS_BIGENDIAN
uint32_t file_size = __builtin_bswap32(*(uint32_t*)(timer->data));
#else
uint32_t file_size = *(uint32_t*)(timer->data);
#endif
printf("Set data size: %u\n", file_size);
int is_first_data = 0;
if(timer->numbytes == sizeof(uint32_t)) {
if((timer->numbytes = recv(timer->accept_fd, timer->data, TIMERDATSZ, 0)) <= 0)
return close_file_and_send(timer, "erro", 4);
is_first_data = 1;
printf("Get data size: %d\n", (int)timer->numbytes);
}
size_t offset = (is_first_data?0:sizeof(uint32_t));
if(file_size <= TIMERDATSZ - offset) {
while(timer->numbytes != file_size - offset) {
ssize_t n = recv(timer->accept_fd, timer->data + timer->numbytes + offset, TIMERDATSZ - timer->numbytes - offset, MSG_WAITALL);
if(n <= 0) return close_file_and_send(timer, "erro", 4);
timer->numbytes += n;
}
if(fwrite(timer->data + offset, file_size, 1, timer->fp) != 1) {
perror("fwrite");
return close_file_and_send(timer, "erro", 4);
}
return close_file_and_send(timer, "succ", 4);
}
if(fwrite(timer->data + offset, timer->numbytes - offset, 1, timer->fp) != 1) {
perror("fwrite");
return close_file_and_send(timer, "erro", 4);
}
int32_t remain = file_size - timer->numbytes;
while(remain > 0) {
// printf("remain:%d\n", (int)remain);
ssize_t n = recv(timer->accept_fd, timer->data, (remain>TIMERDATSZ)?TIMERDATSZ:remain, MSG_WAITALL);
if(n <= 0) return close_file_and_send(timer, "erro", 4);
if(fwrite(timer->data, n, 1, timer->fp) != 1) {
perror("fwrite");
return close_file_and_send(timer, "erro", 4);
}
remain -= n;
}
return close_file_and_send(timer, "succ", 4);
}
int main(int argc, char *argv[]) {
if(argc != 6 && argc != 7) showUsage(argv[0]);
else {
int port = 0;
int as_daemon = !strcmp("-d", argv[1]);
sscanf(argv[as_daemon?2:1], "%d", &port);
if(port > 0 && port < 65536) {
int times = 0;
sscanf(argv[as_daemon?3:2], "%d", &times);
if(times > 0) {
if(!as_daemon || (as_daemon && (daemon(1, 1) >= 0))) {
FILE *fp = NULL;
fp = fopen(argv[as_daemon?4:3], "rb+");
if(!fp) fp = fopen(argv[as_daemon?4:3], "wb+");
if(fp) {
kanban_path = argv[as_daemon?4:3];
fclose(fp);
fp = NULL;
fp = fopen(argv[as_daemon?5:4], "rb+");
if(!fp) fp = fopen(argv[as_daemon?5:4], "wb+");
if(fp) {
data_path = argv[as_daemon?5:4];
fclose(fp);
fp = NULL;
fp = fopen(argv[as_daemon?6:5], "rb");
if(fp) {
SIMPLE_PB* spb = get_pb(fp);
cfg = (CONFIG*)spb->target;
fclose(fp);
if(bind_server(port, times)) if(listen_socket(times)) accept_client();
} else printf("Error opening config file: %s\n", argv[as_daemon?6:5]);
} else printf("Error opening data file: %s\n", argv[as_daemon?5:4]);
} else printf("Error opening kanban file: %s\n", argv[as_daemon?4:3]);
} else puts("Start daemon error");
} else printf("Error times: %d\n", times);
} else printf("Error port: %d\n", port);
}
close(fd);
exit(EXIT_FAILURE);
}