autMan从2.4.7版本开始支持插件监听实时im的消息,安全起见,仅限于本地访问,支持插件使用。
采取Server-Sent Events (SSE) 通信,即:服务器通过HTTP连接向前端推送事件。客户端订阅一个事件源,而服务器将数据发送到这个事件源。
处理从Server-Sent Events(SSE)推送来的数据主要涉及以下步骤:
建立连接: 使用HTTP发起POST请求到autMan,地址/otto/msghook。body携带json数据,包括监听的渠道,监听的群组,监听的用户。例如:{"imtype":"qq","chatid":"11543756","userid":"282617666"}
读取数据流: 通过一个持续的过程读取流数据。
解析事件: SSE数据是以一定的格式发送的,通常每个事件包含一个或多个行,可能包括事件类型(event)、数据(data)、id(id)或注释(comment)。
处理消息: 根据读取到的行来区分消息类型和数据,并进行相应的业务逻辑处理。
下面是一个Go语言的代码示例,展示了如何连接SSE服务,以及如何读取和处理推送的数据:
package main
import (
"bufio"
"fmt"
"io"
"net/http"
"strings"
)
func main() {
// SSE服务的URL
sseUrl := "http://localhost:8080/otto/msghook"
// 向SSE服务端发起POST请求
req, err := http.NewRequest("POST", sseUrl, nil)
if err != nil {
fmt.Printf("Error creating request: %v\n", err)
return
}
// 设置Header
req.Header.Set("Accept", "text/event-stream")
req.Header.Set("Content-Type", "application/json")
// 设置body
req.Body = io.NopCloser(strings.NewReader(fmt.Sprintf(`{"imtype":"%s","chatid":"%s","userid":"%s"}`,imtype, chatid, userid)))
// 发起请求
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
fmt.Printf("Error making request: %v\n", err)
return
}
// 延迟关闭
defer resp.Body.Close()
// 使用bufio.NewReader来按行读取服务端发送的数据
reader := bufio.NewReader(resp.Body)
// 启动一个协程专门用于循环读取服务端发送的数据
go func() {
for {
// 按行读取数据
data, err := reader.ReadBytes('\n')
if err != nil {
fmt.Printf("Read error: %v\n", err)
break
} else {
fmt.Printf("Read: %s\n", data)
}
// 这里简单地打印出来,实际应用中可能需要根据行的内容进行解析
msg := strings.TrimSuffix(string(data), "\n")
fmt.Printf("Received: %s", msg)
if strings.Contains(msg, "event:message") {
continue
} else {
msg = strings.TrimLeft(msg, "data:")
}
// 处理消息
msg = strings.ReplaceAll(msg, "\\n", "\n")
processSSELine(msg)
}
}()
// 堵塞,不让函数退出
<-exitChannel
}
在上面的示例中,我们创建了一个到SSE终端的POST请求,然后通过bufio.NewReader
逐行读取。每次读取到一行数据后,都会调用processSSELine
函数来处理它。
在processSSELine
函数中,我们对每行数据进行清洗和分析,如果行以 "data:"
开头,我们就认为它包含了事件数据。strings.HasPrefix
检查行是否以特定的前缀开头,strings.TrimSpace
移除行首尾的空格。
在实际的应用中,你可能还需要处理其他类型的行,例如id
用于设置事件的唯一标识,以及event
用于定义自定义的事件类型。你也需要准备应对网络中断和连接丢失,例如通过自动重新连接来增强应用的健壮性。