From 6a42e70f55ab6b0bf6a3157c779d09d6b0279b6c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=BA=90=E6=96=87=E9=9B=A8?= <41315874+fumiama@users.noreply.github.com> Date: Sun, 15 Oct 2023 15:23:18 +0900 Subject: [PATCH] =?UTF-8?q?finish=20bot=20=E6=A1=86=E6=9E=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bot.go | 99 +++++++++++++++++++++++++++++++++++------------------- helper.go | 28 ++++++++++----- payload.go | 7 +++- 3 files changed, 91 insertions(+), 43 deletions(-) diff --git a/bot.go b/bot.go index 6798c7c..daf97fe 100644 --- a/bot.go +++ b/bot.go @@ -8,12 +8,12 @@ import ( "reflect" "strconv" "sync" + "sync/atomic" "time" "unsafe" "github.com/RomiChan/syncx" "github.com/RomiChan/websocket" - "github.com/pkg/errors" log "github.com/sirupsen/logrus" ) @@ -56,7 +56,7 @@ func (b *Bot) Init(gateway string, shard [2]byte) *Bot { continue } tp := t.Field(i).Name[2:] // skip On - log.Infoln("[bot] 注册处理函数", tp) + log.Infoln(getLogHeader(), "注册处理函数", tp) handler := f.Interface() b.handlers[tp] = *(*GeneralHandleType)(unsafe.Add(unsafe.Pointer(&handler), unsafe.Sizeof(uintptr(0)))) } @@ -80,7 +80,7 @@ func (bot *Bot) reveive() (payload WebsocketPayload, err error) { // https://bot.q.qq.com/wiki/develop/api/gateway/reference.html#_1-%E8%BF%9E%E6%8E%A5%E5%88%B0-gateway func (bot *Bot) Connect() *Bot { network, address := resolveURI(bot.gateway) - log.Infoln("[bot] 开始尝试连接到网关:", address, ", AppID:", bot.AppID) + log.Infoln(getLogHeader(), "开始尝试连接到网关:", address, ", AppID:", bot.AppID) dialer := websocket.Dialer{ NetDial: func(_, addr string) (net.Conn, error) { if network == "unix" { @@ -99,7 +99,7 @@ func (bot *Bot) Connect() *Bot { for { conn, resp, err := dialer.Dial(address, http.Header{}) if err != nil { - log.Warnf("[bot] 连接到网关 %v 时出现错误: %v", bot.gateway, err) + log.Warnf(getLogHeader(), "连接到网关 %v 时出现错误: %v", bot.gateway, err) time.Sleep(2 * time.Second) // 等待两秒后重新连接 continue } @@ -107,14 +107,14 @@ func (bot *Bot) Connect() *Bot { _ = resp.Body.Close() payload, err := bot.reveive() if err != nil { - log.Warnln("[bot] 获取心跳间隔时出现错误:", err) + log.Warnln(getLogHeader(), "获取心跳间隔时出现错误:", err) _ = conn.Close() time.Sleep(2 * time.Second) // 等待两秒后重新连接 continue } - bot.heartbeat, err = payload.GetHeartbeatInterval() + hb, err := payload.GetHeartbeatInterval() if err != nil { - log.Warnln("[bot] 解析心跳间隔时出现错误:", err) + log.Warnln(getLogHeader(), "解析心跳间隔时出现错误:", err) _ = conn.Close() time.Sleep(2 * time.Second) // 等待两秒后重新连接 continue @@ -127,36 +127,37 @@ func (bot *Bot) Connect() *Bot { Properties: bot.Properties, }) if err != nil { - log.Warnln("[bot] 包装 Identify 时出现错误:", err) + log.Warnln(getLogHeader(), "包装 Identify 时出现错误:", err) _ = conn.Close() time.Sleep(2 * time.Second) // 等待两秒后重新连接 continue } err = bot.SendPayload(&payload) if err != nil { - log.Warnln("[bot] 发送 Identify 时出现错误:", err) + log.Warnln(getLogHeader(), "发送 Identify 时出现错误:", err) _ = conn.Close() time.Sleep(2 * time.Second) // 等待两秒后重新连接 continue } payload, err = bot.reveive() if err != nil { - log.Warnln("[bot] 获取 EventReady 时出现错误:", err) + log.Warnln(getLogHeader(), "获取 EventReady 时出现错误:", err) _ = conn.Close() time.Sleep(2 * time.Second) // 等待两秒后重新连接 continue } bot.ready, err = payload.GetEventReady() if err != nil { - log.Warnln("[bot] 解析 EventReady 时出现错误:", err) + log.Warnln(getLogHeader(), "解析 EventReady 时出现错误:", err) _ = conn.Close() time.Sleep(2 * time.Second) // 等待两秒后重新连接 continue } + atomic.StoreUint32(&bot.heartbeat, hb) break } - clients.Store(bot.Token, bot) - log.Infoln("[bot] 连接到网关成功, 用户名:", bot.ready.User.Username) + clients.Store(bot.Token+"_"+strconv.Itoa(int(bot.shard[0])), bot) + log.Infoln(getLogHeader(), "连接到网关成功, 用户名:", bot.ready.User.Username) bot.hbonce.Do(func() { go bot.doheartbeat() }) @@ -169,10 +170,13 @@ func (bot *Bot) doheartbeat() { Op OpCode `json:"op"` D *uint32 `json:"d"` }{Op: OpCodeHeartbeat} - t := time.NewTicker(time.Duration(bot.heartbeat) * time.Millisecond) - defer t.Stop() - time.Sleep(time.Minute) - for range t.C { + for { + if atomic.LoadUint32(&bot.heartbeat) == 0 { + time.Sleep(time.Second) + log.Warnln(getLogHeader(), "等待服务器建立连接...") + continue + } + time.Sleep(time.Duration(bot.heartbeat) * time.Millisecond) if bot.seq == 0 { payload.D = nil } else { @@ -182,7 +186,7 @@ func (bot *Bot) doheartbeat() { err := bot.conn.WriteJSON(&payload) bot.mu.Unlock() if err != nil { - log.Warnln("[bot] 发送心跳时出现错误:", err) + log.Warnln(getLogHeader(), "发送心跳时出现错误:", err) } } } @@ -216,39 +220,66 @@ func (bot *Bot) Resume() error { payload := WebsocketPayload{Op: OpCodeResume} payload.WrapData(&struct { T string `json:"token"` - S string `json:"session_id_i_stored"` + S string `json:"session_id"` Q uint32 `json:"seq"` }{bot.Authorization(), bot.ready.SessionID, bot.seq}) - payload, err = bot.reveive() - if err != nil { - return err - } - if payload.Op == OpCodeDispatch && payload.T == "RESUMED" { - return nil - } - return errors.New(getThisFuncName() + " unexpected OpCode " + strconv.Itoa(int(payload.Op)) + ", T: " + payload.T + ", D: " + BytesToString(payload.D)) + return bot.SendPayload(&payload) } // Listen 监听事件 func (bot *Bot) Listen() { - log.Infoln("[bot] 开始监听", bot.ready.User.Username, "的事件") + log.Infoln(getLogHeader(), "开始监听", bot.ready.User.Username, "的事件") payload := WebsocketPayload{} + lastheartbeat := time.Now() for { + payload.Reset() err := bot.conn.ReadJSON(&payload) if err != nil { // reconnect - clients.Delete(bot.Token) - log.Warn("[bot]", bot.ready.User.Username, "的网关连接断开, 尝试恢复...") + atomic.StoreUint32(&bot.heartbeat, 0) + k := bot.Token + "_" + strconv.Itoa(int(bot.shard[0])) + clients.Delete(k) + log.Warnln(getLogHeader(), bot.ready.User.Username, "的网关连接断开, 尝试恢复:", err) for { - time.Sleep(time.Millisecond * time.Duration(3)) + time.Sleep(time.Second) err = bot.Resume() if err == nil { break } - log.Warn("[bot]", bot.ready.User.Username, "的网关连接恢复失败:", err) + log.Warnln(getLogHeader(), bot.ready.User.Username, "的网关连接恢复失败:", err) } - clients.Store(bot.Token, bot) + clients.Store(k, bot) continue } - log.Debugln("[bot] 接收到第", payload.S, "个事件:", payload.Op, ", 类型:", payload.T) + log.Debugln(getLogHeader(), "接收到第", payload.S, "个事件:", payload.Op, ", 类型:", payload.T, ", 数据:", BytesToString(payload.D)) + bot.seq = payload.S + switch payload.Op { + case OpCodeDispatch: // Receive + switch payload.T { + case "RESUMED": + log.Infoln(getLogHeader(), bot.ready.User.Username, "的网关连接恢复完成") + } + case OpCodeHeartbeat: // Send/Receive + log.Debugln(getLogHeader(), "收到服务端推送心跳, 间隔:", time.Since(lastheartbeat)) + lastheartbeat = time.Now() + case OpCodeReconnect: // Receive + log.Warnln(getLogHeader(), "收到服务端通知重连") + atomic.StoreUint32(&bot.heartbeat, 0) + bot.Connect() + case OpCodeInvalidSession: // Receive + log.Warnln(getLogHeader(), bot.ready.User.Username, "的网关连接恢复失败: InvalidSession, 尝试重连...") + atomic.StoreUint32(&bot.heartbeat, 0) + bot.Connect() + case OpCodeHello: // Receive + intv, err := payload.GetHeartbeatInterval() + if err != nil { + log.Warnln(getLogHeader(), "解析心跳间隔时出现错误:", err) + continue + } + atomic.StoreUint32(&bot.heartbeat, intv) + case OpCodeHeartbeatACK: // Receive/Reply + log.Debugln(getLogHeader(), "收到心跳返回, 间隔:", time.Since(lastheartbeat)) + lastheartbeat = time.Now() + case OpCodeHTTPCallbackACK: // Reply + } } } diff --git a/helper.go b/helper.go index 7958e86..8127ded 100644 --- a/helper.go +++ b/helper.go @@ -8,27 +8,39 @@ import ( "unsafe" ) -func getFuncNameWithSkip(n int) string { - pc, _, _, ok := runtime.Caller(n) +func getFuncAndFileNameWithSkip(n int) (string, string) { + pc, fn, _, ok := runtime.Caller(n) if !ok { - return "" + return "", "" + } + i := strings.LastIndex(fn, "/") + 1 + if i > 0 { + fn = strings.TrimSuffix(fn[i:], ".go") } fullname := runtime.FuncForPC(pc).Name() - i := strings.LastIndex(fullname, ".") + 1 + i = strings.LastIndex(fullname, ".") + 1 if i <= 0 || i >= len(fullname) { - return fullname + return fullname, fn } - return fullname[i:] + return fullname[i:], fn } // getThisFuncName 获取正在执行的函数名 func getThisFuncName() string { - return getFuncNameWithSkip(2) + x, _ := getFuncAndFileNameWithSkip(2) + return x } // getCallerFuncName 获取调用者函数名 func getCallerFuncName() string { - return getFuncNameWithSkip(3) + x, _ := getFuncAndFileNameWithSkip(3) + return x +} + +// getLogHeader [文件名.函数名] +func getLogHeader() string { + funcname, filename := getFuncAndFileNameWithSkip(2) + return "[" + filename + "." + funcname + "]" } // MessageEscape 消息转义 diff --git a/payload.go b/payload.go index f8cba02..0d01d53 100644 --- a/payload.go +++ b/payload.go @@ -12,10 +12,15 @@ import ( type WebsocketPayload struct { Op OpCode `json:"op"` D json.RawMessage `json:"d,omitempty"` - S int `json:"s,omitempty"` + S uint32 `json:"s,omitempty"` T string `json:"t,omitempty"` } +// Reset 恢复到 0 值 +func (wp *WebsocketPayload) Reset() { + *wp = WebsocketPayload{} +} + // GetHeartbeatInterval OpCodeHello 获得心跳周期 单位毫秒 func (wp *WebsocketPayload) GetHeartbeatInterval() (uint32, error) { if wp.Op != OpCodeHello {