1
0
mirror of https://github.com/fumiama/simple-kanban.git synced 2026-06-06 08:50:28 +08:00
Files
simple-kanban/server.c
2022-10-26 00:39:05 +08:00

635 lines
21 KiB
C
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <arpa/inet.h>
#include <errno.h>
#include <netdb.h>
#include <netinet/in.h>
#include <signal.h>
#include <simple_protobuf.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/file.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <time.h>
#include <unistd.h>
#include "config.h"
#if !__APPLE__
#include <sys/sendfile.h>
#include <endian.h>
#else
#include <machine/endian.h>
#endif
static config_t* cfg; // 存储 pwd 和 sps
static int fd; // server socket fd
#ifdef LISTEN_ON_IPV6
static socklen_t struct_len = sizeof(struct sockaddr_in6);
static struct sockaddr_in6 server_addr;
static struct sockaddr_in6 client_addr;
#else
static socklen_t struct_len = sizeof(struct sockaddr_in);
static struct sockaddr_in server_addr;
static struct sockaddr_in client_addr;
#endif
static char *data_path; // cat 命令读取的文件位置
static char *kanban_path; // get 命令读取的文件位置
static int file_mode = LOCK_UN;
static int file_ro_cnt;
// THREADCNT 在单线程中指监听队列/select队列长度
#define THREADCNT 16
#define MAXWAITSEC 16
#define TIMERDATSZ BUFSIZ
static struct timeval timeout = {MAXWAITSEC/4, 0};
// accept_timer 使用的结构体
// 包含了本次 accept 的全部信息
// 以方便退出后清理空间
struct threadtimer_t {
int index; // 自身位置
time_t touch; // 最后访问时间,与当前时间差超过 MAXWAITSEC 将强行中断连接
int accept_fd; // 本次 accept 的 fd
int lock_type; // 打开文件类型
ssize_t numbytes; // 本次接收的数据长度
char status; // 本会话所处的状态
char is_open; // 标识 fp 是否正在使用
FILE *fp; // 本会话打开的文件
char data[TIMERDATSZ];
};
typedef struct threadtimer_t threadtimer_t;
static threadtimer_t timers[THREADCNT];
static fd_set rdfds, wrfds, erfds, tmpfds;
#define show_usage(program) printf("Usage: %s (-d) port kanban.txt data.bin config.sp\n\t-d: As daemon\n", program)
static void accept_client();
static int bind_server(int* port);
static int check_buffer(threadtimer_t *timer);
static void clean_timer(threadtimer_t* timer);
static void close_file(threadtimer_t *timer);
static int close_file_and_send(threadtimer_t *timer, char *data, size_t numbytes);
static int handle_accept(threadtimer_t* p);
static void handle_int(int signo);
static void handle_quit(int signo);
static void handle_segv(int signo);
static int listen_socket();
static FILE *open_file(char* file_path, int lock_type, char* mode);
static int send_all(char* file_path, threadtimer_t *timer);
static int send_data(int accept_fd, char *data, size_t length);
static off_t size_of_file(const char* fname);
static int sm1_pwd(threadtimer_t *timer);
static int s0_init(threadtimer_t *timer);
static int s1_get(threadtimer_t *timer);
static int s2_set(threadtimer_t *timer);
static int s3_set_data(threadtimer_t *timer);
/***************************************
* accept_client 接受新连接,创建线程处理
* 创建的线程入口点为 handle_accept
* 与其伴生的结构体为 timer负责管理
* 该线程使用的资源,当线程(正常/异常)退出
* 或 client 超过 MAXWAITSEC 未响应时
* 将由与其伴生的 accept_timer 线程读取
* timer 的信息,调用 clean_timer 回收
* 未被释放的资源以防止内存泄漏
***************************************/
static void accept_client() {
signal(SIGINT, handle_int);
signal(SIGQUIT, handle_quit);
signal(SIGKILL, handle_segv);
signal(SIGSEGV, handle_segv);
signal(SIGPIPE, SIG_IGN);
signal(SIGTERM, handle_segv);
FD_SET(fd, &rdfds);
FD_SET(fd, &erfds);
FD_SET(fd, &tmpfds);
puts("Ready for select, waitting...");
while(1) {
int r = select(THREADCNT+8, &rdfds, &wrfds, &erfds, &timeout);
if(r < 0) {
perror("select");
return;
}
if(r == 0) { // 超时
for(int i = 0; i < THREADCNT; i++) {
if(timers[i].touch && timers[i].accept_fd) {
time_t waitsec = time(NULL) - timers[i].touch;
if(waitsec > MAXWAITSEC) {
printf("Close@%d, wait sec: %u, max: %u\n", i, (unsigned int)waitsec, MAXWAITSEC);
clean_timer(&timers[i]);
}
}
}
goto HANDLE_FINISH;
}
puts("\nSelected");
// 正常
if(FD_ISSET(fd, &rdfds)) { // 有新连接
int p = 0;
while(p < THREADCNT && timers[p].touch) p++;
if(p >= THREADCNT) {
puts("Max thread cnt exceeded");
int nfd = accept(fd, (struct sockaddr *)&client_addr, &struct_len);
if(nfd > 0) close(nfd);
goto HANDLE_CLIENTS;
}
threadtimer_t* timer = &timers[p];
timer->accept_fd = accept(fd, (struct sockaddr *)&client_addr, &struct_len);
if(timer->accept_fd <= 0) {
perror("accept");
goto HANDLE_CLIENTS;
}
#ifdef LISTEN_ON_IPV6
uint16_t port = ntohs(client_addr.sin6_port);
struct in6_addr in = client_addr.sin6_addr;
char str[INET6_ADDRSTRLEN]; // 46
inet_ntop(AF_INET6, &in, str, sizeof(str));
#else
uint16_t port = ntohs(client_addr.sin_port);
struct in_addr in = client_addr.sin_addr;
char str[INET_ADDRSTRLEN]; // 16
inet_ntop(AF_INET, &in, str, sizeof(str));
#endif
time_t t = time(NULL);
printf("> %sAccept client(%d) %s:%u at slot No.%d, ", ctime(&t), timer->accept_fd, str, port, p);
timer->index = p;
timer->touch = time(NULL);
timer->is_open = 0;
timer->fp = NULL;
timer->status = -1;
FD_SET(timer->accept_fd, &tmpfds);
FD_SET(timer->accept_fd, &rdfds);
puts("Add new client into select list");
} else if(FD_ISSET(fd, &erfds)) { // 主套接字错误
int nfd = accept(fd, (struct sockaddr *)&client_addr, &struct_len);
perror("main fd in erfds");
if(nfd > 0) close(nfd);
return;
}
HANDLE_CLIENTS:
for(int i = 0; i < THREADCNT; i++) {
if(timers[i].touch && timers[i].accept_fd) {
if(FD_ISSET(timers[i].accept_fd, &rdfds)) {
if(!handle_accept(&timers[i])) clean_timer(&timers[i]);
else FD_SET(timers[i].accept_fd, &tmpfds);
} else if(FD_ISSET(timers[i].accept_fd, &erfds)) {
printf("Close@%d due to error\n", i);
clean_timer(&timers[i]);
}
}
}
HANDLE_FINISH:
FD_COPY(&tmpfds, &rdfds);
FD_COPY(&tmpfds, &erfds);
FD_ZERO(&tmpfds);
FD_SET(fd, &tmpfds);
}
}
static int bind_server(int* port) {
#ifdef LISTEN_ON_IPV6
server_addr.sin6_family = AF_INET6;
server_addr.sin6_port = htons((uint16_t)(*port));
bzero(&(server_addr.sin6_addr), sizeof(server_addr.sin6_addr));
int fd = socket(PF_INET6, SOCK_STREAM, 0);
#else
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons((uint16_t)(*port));
server_addr.sin_addr.s_addr = INADDR_ANY;
bzero(&(server_addr.sin_zero), 8);
int fd = socket(AF_INET, SOCK_STREAM, 0);
#endif
int on = 1;
if(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))) {
perror("Set socket option failure");
return 0;
}
if(!~bind(fd, (struct sockaddr *)&server_addr, struct_len)) {
perror("Bind server failure");
return 0;
}
#ifdef LISTEN_ON_IPV6
*port = ntohs(server_addr.sin6_port);
struct in6_addr in = server_addr.sin6_addr;
char str[INET6_ADDRSTRLEN]; // 46
inet_ntop(AF_INET6, &in, str, sizeof(str));
#else
*port = ntohs(server_addr.sin_port);
struct in_addr in = server_addr.sin_addr;
char str[INET_ADDRSTRLEN]; // 16
inet_ntop(AF_INET, &in, str, sizeof(str));
#endif
printf("Bind server successfully on %s:%u\n", str, *port);
return fd;
}
/***************************************
* check_buffer 检查接收到的数据,结合
* 当前会话所处状态决定接下来的处理流程
***************************************/
static int check_buffer(threadtimer_t *timer) {
printf("Status: %d\n", (int)timer->status);
switch(timer->status) {
case -1: return sm1_pwd(timer); break;
case 0: return s0_init(timer); break;
case 1: return s1_get(timer); break;
case 2: return s2_set(timer); break;
case 3: return s3_set_data(timer); break;
default: return -1; break;
}
}
// clean_timer 清理 timer
static void clean_timer(threadtimer_t* timer) {
printf("Start cleaning: ");
if(timer->is_open) {
close_file(timer);
printf("Close file, ");
}
if(timer->accept_fd) {
close(timer->accept_fd);
timer->accept_fd = 0;
printf("Close accept, ");
}
FD_CLR(timer->accept_fd, &rdfds);
FD_CLR(timer->accept_fd, &erfds);
FD_CLR(timer->accept_fd, &tmpfds);
timer->touch = 0;
timer->status = -1;
puts("Finish cleaning.");
}
static void close_file(threadtimer_t *timer) {
if(timer->is_open && timer->fp != NULL) {
int lock_type = timer->lock_type;
puts("Close file");
fclose(timer->fp);
timer->is_open = 0;
timer->fp = NULL;
file_mode &= ~lock_type;
if((lock_type&LOCK_SH) > 0 && --file_ro_cnt > 0) {
file_mode |= LOCK_SH;
}
timer->lock_type = 0;
}
}
static int close_file_and_send(threadtimer_t *timer, char *data, size_t numbytes) {
close_file(timer);
return send_data(timer->accept_fd, data, numbytes);
}
#define chkbuf(p) if(!(r = check_buffer((p)))) break
#define take_word(p, w, buff) if((p)->numbytes > strlen(w) && strstr(buff, w) == buff) {\
int l = strlen(w);\
char store = buff[l];\
buff[l] = 0;\
ssize_t n = (p)->numbytes - l;\
(p)->numbytes = l;\
chkbuf(p);\
buff[0] = store;\
memmove(buff + 1, buff + l + 1, n - 1);\
buff[n] = 0;\
(p)->numbytes = n;\
printf("Split cmd: %s\n", w);\
}
#define touch_timer(x) ((x)->touch = time(NULL))
#define my_fd(x) ((x)->accept_fd)
#define my_dat(x) ((x)->data)
// handle_accept 初步解析指令,处理部分粘连
static int handle_accept(threadtimer_t* p) {
int r = 1;
puts("Recv data from the client.");
send_data(my_fd(p), "Welcome to simple kanban server.", 33);
while(((p)->numbytes = recv(my_fd(p), my_dat(p), TIMERDATSZ, MSG_DONTWAIT)) > 0) {
touch_timer(p);
my_dat(p)[(p)->numbytes] = 0;
printf("Get %d bytes: %s, Check buffer...\n", (int)(p)->numbytes, my_dat(p));
//处理部分粘连
take_word(p, cfg->pwd, my_dat(p));
take_word(p, "get", my_dat(p));
take_word(p, "set", my_dat(p));
take_word(p, "cat", my_dat(p));
take_word(p, "quit", my_dat(p));
take_word(p, cfg->sps, my_dat(p));
take_word(p, "ver", my_dat(p));
take_word(p, "dat", my_dat(p));
if((p)->numbytes > 0) chkbuf(p);
}
puts("Recv finished");
return r;
}
static void handle_int(int signo) {
puts("Keyboard interrupted");
close(fd);
fflush(stdout);
exit(0);
}
static void handle_quit(int signo) {
puts("Handle sigquit");
close(fd);
fflush(stdout);
exit(0);
}
static void handle_segv(int signo) {
puts("Handle kill/segv/term");
close(fd);
fflush(stdout);
exit(0);
}
static int listen_socket() {
int flags = fcntl(fd, F_GETFL, 0);
if(!~listen(fd, THREADCNT)) return 1;
return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
static FILE *open_file(char* file_path, int lock_type, char* mode) {
FILE *fp = NULL;
if((lock_type&LOCK_SH)) {
if((file_mode&LOCK_EX) > 0) {
puts("open_file(SH): file is busy");
return NULL;
}
file_mode |= LOCK_SH;
file_ro_cnt++;
} else if(lock_type&LOCK_EX) {
if((file_mode&(LOCK_EX|LOCK_SH)) > 0) {
puts("open_file(EX): file is busy");
return NULL;
}
file_mode |= LOCK_EX;
}
fp = fopen(file_path, mode);
if(!fp) {
perror("fopen");
file_mode &= ~lock_type;
return NULL;
}
printf("Open file in mode %s\n", mode);
return fp;
}
static int send_all(char* file_path, threadtimer_t *timer) {
int re = 1;
FILE *fp = open_file(file_path, LOCK_SH, "rb");
if(fp) {
timer->fp = fp;
timer->is_open = 1;
timer->lock_type = LOCK_SH;
uint32_t file_size = (uint32_t)size_of_file(file_path);
printf("Get file size: %d bytes.\n", (int)file_size);
off_t len = 0;
#if __APPLE__
#ifdef WORDS_BIGENDIAN
file_size = __DARWIN_OSSwapInt32(file_size);
#endif
struct sf_hdtr hdtr;
struct iovec headers;
headers.iov_base = &file_size;
headers.iov_len = sizeof(uint32_t);
hdtr.headers = &headers;
hdtr.hdr_cnt = 1;
hdtr.trailers = NULL;
hdtr.trl_cnt = 0;
re = !sendfile(fileno(fp), timer->accept_fd, 0, &len, &hdtr, 0);
if(!re) {
while(errno == EAGAIN) re = !sendfile(fileno(fp), timer->accept_fd, 0, &len, &hdtr, 0);
perror("sendfile");
}
#else
#ifdef WORDS_BIGENDIAN
uint32_t little_fs = __builtin_bswap32(file_size);
send(timer->accept_fd, &little_fs, sizeof(uint32_t), 0);
#else
send(timer->accept_fd, &file_size, sizeof(uint32_t), 0);
#endif
re = sendfile(timer->accept_fd, fileno(fp), &len, file_size) > 0;
if(!re) {
while(errno == EAGAIN) re = sendfile(timer->accept_fd, fileno(fp), &len, file_size) > 0;
perror("sendfile");
}
#endif
printf("Send %d bytes.\n", (int)len);
close_file(timer);
}
return re;
}
static int send_data(int accept_fd, char *data, size_t length) {
if(!~send(accept_fd, data, length, MSG_DONTWAIT)) {
puts("Send data error");
return 0;
}
printf("Send data: ");
if(length > 128) {
data[124] = '.';
data[125] = '.';
data[126] = '.';
data[127] = 0;
}
puts(data);
return 1;
}
static off_t size_of_file(const char* fname) {
struct stat statbuf;
if(stat(fname, &statbuf)==0) return statbuf.st_size;
else return -1;
}
static int sm1_pwd(threadtimer_t *timer) {
if(!strcmp(cfg->pwd, timer->data)) timer->status = 0;
return !timer->status;
}
static int s0_init(threadtimer_t *timer) {
if(!strcmp("get", timer->data)) timer->status = 1;
else if(!strcmp(cfg->sps, timer->data)) timer->status = 2;
else if(!strcmp("cat", timer->data)) return send_all(data_path, timer);
else if(!strcmp("quit", timer->data)) return 0;
return send_data(timer->accept_fd, timer->data, timer->numbytes);
}
static int s1_get(threadtimer_t *timer) { //get kanban
FILE *fp = open_file(kanban_path, LOCK_SH, "rb");
timer->status = 0;
if(fp) {
timer->fp = fp;
timer->is_open = 1;
timer->lock_type = LOCK_SH;
uint32_t ver, cli_ver;
if(fscanf(fp, "%u", &ver) > 0) {
if(sscanf(timer->data, "%u", &cli_ver) > 0) {
if(cli_ver < ver) { //need to send a new kanban
timer->is_open = 0;
close_file(timer);
int r = send_all(kanban_path, timer);
if(strstr(timer->data, "quit") == timer->data + timer->numbytes - 4) {
puts("Found last cmd is quit.");
return 0;
}
return r;
}
}
}
}
int r = close_file_and_send(timer, "null", 4);
if(strstr(timer->data, "quit") == timer->data + timer->numbytes - 4) {
puts("Found last cmd is quit.");
return 0;
}
return r;
}
static int s2_set(threadtimer_t *timer) {
FILE *fp = NULL;
if(!strcmp(timer->data, "ver")) {
fp = open_file(kanban_path, LOCK_EX, "wb");
} else if(!strcmp(timer->data, "dat")) {
fp = open_file(data_path, LOCK_EX, "wb");
}
if(fp) {
timer->status = 3;
timer->fp = fp;
timer->is_open = 1;
timer->lock_type = LOCK_EX;
return send_data(timer->accept_fd, "data", 4);
} else {
timer->status = 0;
return send_data(timer->accept_fd, "erro", 4);
}
}
static int s3_set_data(threadtimer_t *timer) {
char ret[4] = "succ";
timer->status = 0;
#ifdef WORDS_BIGENDIAN
uint32_t file_size = __builtin_bswap32(*(uint32_t*)(timer->data));
#else
uint32_t file_size = *(uint32_t*)(timer->data);
#endif
printf("Set data size: %u\n", file_size);
int is_first_data = 0;
if(timer->numbytes == sizeof(uint32_t)) {
if((timer->numbytes = recv(timer->accept_fd, timer->data, TIMERDATSZ, 0)) <= 0) {
*(uint32_t*)ret = *(uint32_t*)"erro";
goto S3_RETURN;
}
is_first_data = 1;
printf("Get data size: %d\n", (int)timer->numbytes);
}
size_t offset = (is_first_data?0:sizeof(uint32_t));
if(file_size <= TIMERDATSZ - offset) {
while(timer->numbytes != file_size - offset) {
ssize_t n = recv(timer->accept_fd, timer->data + timer->numbytes + offset, TIMERDATSZ - timer->numbytes - offset, MSG_WAITALL);
if(n <= 0) {
*(uint32_t*)ret = *(uint32_t*)"erro";
goto S3_RETURN;
}
timer->numbytes += n;
}
if(fwrite(timer->data + offset, file_size, 1, timer->fp) != 1) {
perror("fwrite");
*(uint32_t*)ret = *(uint32_t*)"erro";
}
goto S3_RETURN;
}
if(fwrite(timer->data + offset, timer->numbytes - offset, 1, timer->fp) != 1) {
perror("fwrite");
*(uint32_t*)ret = *(uint32_t*)"erro";
goto S3_RETURN;
}
int32_t remain = file_size - timer->numbytes;
while(remain > 0) {
// printf("remain:%d\n", (int)remain);
ssize_t n = recv(timer->accept_fd, timer->data, (remain>TIMERDATSZ)?TIMERDATSZ:remain, MSG_WAITALL);
if(n <= 0) {
*(uint32_t*)ret = *(uint32_t*)"erro";
goto S3_RETURN;
}
if(fwrite(timer->data, n, 1, timer->fp) != 1) {
perror("fwrite");
*(uint32_t*)ret = *(uint32_t*)"erro";
goto S3_RETURN;
}
remain -= n;
}
S3_RETURN:
return close_file_and_send(timer, ret, 4);
}
int main(int argc, char *argv[]) {
if(argc != 5 && argc != 6) {
show_usage(argv[0]);
return 0;
}
int port = 0;
int as_daemon = !strcmp("-d", argv[1]);
sscanf(argv[as_daemon?2:1], "%d", &port);
if(port < 0 || port >= 65536) {
printf("Error port: %d\n", port);
return 1;
}
if(as_daemon && daemon(1, 1) < 0) {
perror("Start daemon error");
return 2;
}
FILE *fp = NULL;
fp = fopen(argv[as_daemon?3:2], "rb+");
if(!fp) fp = fopen(argv[as_daemon?3:2], "wb+");
if(!fp) {
printf("Error opening kanban file: ");
perror(argv[as_daemon?3:2]);
return 3;
}
kanban_path = argv[as_daemon?3:2];
fclose(fp);
fp = NULL;
fp = fopen(argv[as_daemon?4:3], "rb+");
if(!fp) fp = fopen(argv[as_daemon?4:3], "wb+");
if(!fp) {
printf("Error opening data file: ");
perror(argv[as_daemon?4:3]);
return 4;
}
data_path = argv[as_daemon?4:3];
fclose(fp);
fp = NULL;
fp = fopen(argv[as_daemon?5:4], "rb");
if(!fp) {
printf("Error opening config file: ");
perror(argv[as_daemon?5:4]);
return 5;
}
SIMPLE_PB* spb = get_pb(fp);
cfg = (config_t*)spb->target;
fclose(fp);
if(!(fd = bind_server(&port))) {
return 6;
}
if(listen_socket()) {
perror("Listen failed");
return 7;
}
accept_client();
close(fd);
return 99;
}