// Package ws ----------------------------- // @file : chatRoom.go // @author : JJXu // @contact : wavingbear@163.com // @time : 2022/10/21 18:17:17 // ------------------------------------------- package ws import ( "encoding/json" "fmt" "fonchain-fiee/pkg/utils" "github.com/gorilla/websocket" "go.uber.org/zap" "log" "runtime" "strconv" "sync" "time" ) const ( // Time allowed to write a notice to the peer. writeWait = 10 * time.Second // Time allowed to read the next pong notice from the peer. pongWait = 60 * time.Second // Send pings to peer with this period. Must be less than pongWait. pingPeriod = (pongWait * 9) / 10 // Maximum notice size allowed from peer. maxMessageSize = 1024 ) func NewChatRoom() *ChatRoom { var room = ChatRoom{ clientsRwLocker: &sync.RWMutex{}, clients: make(map[int64]map[string]*Client), register: make(clientChan), UnRegister: make(clientChan), broadcast: make(broadcastChan), } go room.Run() return &room } type broadcastMessage struct { UserIds []int64 message []byte } type ( // []byte类型管道 用于客户端接收消息数据 messageChan chan []byte // wsConnChan chan *websocket.Conn // Client类型数据管道 clientChan chan *Client broadcastChan chan *broadcastMessage ) type ChatRoom struct { clientsRwLocker *sync.RWMutex //clients 客户端信息存储 //// 支持多客户端连接 map[userId]map[clientId]*Client clients map[int64]map[string]*Client //会话 map[sessionId][]*Client Session map[string][]*Client //register register 注册管道 register clientChan //unRegister 注销管道 接收需要注销的客户端 UnRegister clientChan broadcast broadcastChan } func (o *ChatRoom) Run() { //消息分发 for { select { // 注册事件 case newClient := <-o.register: ////删除临时map中的客户户端 //delete(o.tempClient, client.clientId) o.clientsRwLocker.Lock() //添加到客户端集合中 if o.clients[newClient.UserId] == nil { o.clients[newClient.UserId] = make(map[string]*Client) } o.clients[newClient.UserId][newClient.ClientId] = newClient //添加到会话集合中 if o.Session == nil { o.Session = make(map[string][]*Client) } //if _, ok := o.Session[newClient.SessionId]; ok { // for i, client := range o.Session[newClient.SessionId] { // if client.ClientId == newClient.ClientId { // //将之前的客户端注销 // o.UnRegister <- client // } // o.Session[newClient.SessionId][i] = newClient // } //} if newClient.Waiter { //客服人员没有默认会话窗口,而是自动加入所有用户的会话 for sessionId, _ := range o.Session { sessionId := sessionId if sessionId != newClient.SessionId { for _, client := range o.clients[newClient.UserId] { o.Session[sessionId] = append(o.Session[sessionId], client) } } } } else { //画家添加会话的逻辑 _, ok := o.Session[newClient.SessionId] if !ok { o.Session[newClient.SessionId] = make([]*Client, 0) //把客服拉入会话 for userId, clientInfo := range o.clients { if userId == newClient.UserId { continue } for i, client := range clientInfo { if client != nil && client.Waiter { //把客服人员客户端加入会话中 o.Session[newClient.SessionId] = append(o.Session[newClient.SessionId], clientInfo[i]) } } } } //再把自己的客户端加入会话 o.Session[newClient.SessionId] = append(o.Session[newClient.SessionId], newClient) } o.clientsRwLocker.Unlock() //注销事件 case client := <-o.UnRegister: //panic 恢复 defer func() { if r := recover(); r != "" { const size = 64 << 10 buf := make([]byte, size) buf = buf[:runtime.Stack(buf, false)] err, ok := r.(error) if !ok { err = fmt.Errorf("%v", r) } log.Fatal("close webosocket connection occured panic , recovered!", zap.Any("client", client), zap.Error(err), zap.String("stack", string(buf))) } }() fmt.Println("ws客户端注销事件触发") //从客户端集合中删除 if _, ok := o.clients[client.UserId]; ok { if client != nil && client.Conn != nil { //_ = client.Conn.WriteMessage(websocket.CloseMessage, []byte{}) _ = client.Conn.Close() } o.clients[client.UserId][client.ClientId] = nil delete(o.clients[client.UserId], client.ClientId) fmt.Printf("ws客户端%s 被注销\n", client.ClientId) } // 消息群发事件 case messageInfo := <-o.broadcast: o.Broadcast(messageInfo.message, messageInfo.UserIds...) } } } func (o *ChatRoom) Register(c *Client) (sessionId string) { if c.SessionId == "" && !c.Waiter { //这里的c经常拿不到sessionId,所以使用userId固定死 //c.SessionId = fmt.Sprintf("%d-%d", c.UserId, time.Now().Unix()) c.SessionId = fmt.Sprintf("%d", c.UserId) } o.register <- c return c.SessionId } // SendSessionMessage // sendUserId: 发送消息的用户id,消息提醒时,此用户将会被排除 // sessionId: 会话id // msgType: 消息类型 // message: 消息内容 func (o *ChatRoom) SendSessionMessage(sendUserId int64, sessionId string, msgType WsType, message any) (userIdInSession []int64, err error) { o.clientsRwLocker.Lock() defer o.clientsRwLocker.Unlock() var msg = WsSessionInfo{ Type: msgType, Content: message, } msgBytes, _ := json.Marshal(msg) if o.Session[sessionId] == nil { err = fmt.Errorf("该会话不存在或已失效") return } fmt.Println("ChatRoom.SendSessionMessage - 1") usableClients := []*Client{} fmt.Printf("sessionId:[%s],客户端数量%d\n", sessionId, len(o.Session[sessionId])) for i, client := range o.Session[sessionId] { if client != nil { _, exist := o.clients[client.UserId][client.ClientId] if exist { usableClients = append(usableClients, o.Session[sessionId][i]) } } fmt.Printf("client:%+v\n", client) if client != nil && client.UserId != sendUserId { client.Send <- msgBytes userIdInSession = append(userIdInSession, client.UserId) } //client.Send <- msgBytes } o.Session[sessionId] = usableClients fmt.Printf("sessionId:[%s],客户端数量%d\n", sessionId, len(o.Session[sessionId])) fmt.Println("userIdInSession", userIdInSession) return } func (o *ChatRoom) GetUserIdInSession(sessionId string, withoutUserId ...int64) (userIds []int64) { fmt.Printf("sessionId:%s withoutUserId:%d\n", sessionId, withoutUserId) //o.clientsRwLocker.RLock() //defer o.clientsRwLocker.RUnlock() fmt.Println("GetUserIdInSession 1") if o.Session[sessionId] != nil { fmt.Printf("GetUserIdInSession 2,o.Session[sessionId]:%+v", o.Session[sessionId]) for _, client := range o.Session[sessionId] { fmt.Println("session one of userId", client.UserId) var jump bool if withoutUserId != nil { for _, userId := range withoutUserId { if client.UserId == userId { jump = true continue } } } if !jump { fmt.Println("ADD USER", client.UserId) userId := client.UserId userIds = append(userIds, userId) } } } //针对app没有连接上websocket(聊天室没有检查到用户的客户端,此时websocket无法发送通知),但是需要推送app通知给用户的情况进行优化 fmt.Println("GetUserIdInSession 3,userIds:", userIds) if len(userIds) == 0 { sessionUserId, _ := strconv.Atoi(sessionId) add := true if sessionUserId != 0 { for _, v := range withoutUserId { if v == int64(sessionUserId) { add = false break } } } if add { userIds = append(userIds, int64(sessionUserId)) } fmt.Println("GetUserIdInSession 4,userIds:", userIds) } userIds = utils.Unique(userIds) fmt.Println("GetUserIdInSession 5,userIds:", userIds) return } // func (o ChatRoom) RegisterClient(c *Client) { // o.register <- c // } // // func (o ChatRoom) DeleteClient(c *Client) { // o.unRegister <- c // } func (o ChatRoom) Broadcast(message []byte, userIds ...int64) { // 如果userIds为空则群发,否则找到这个用户的ws对象 if userIds == nil { for _, userClients := range o.clients { for _, cli := range userClients { if cli == nil { o.UnRegister <- cli continue } go func() { err := cli.Conn.WriteMessage(websocket.TextMessage, message) if err != nil { o.UnRegister <- cli } }() } } } else { for _, userId := range userIds { userClients, ok := o.clients[userId] if ok == false { return } for _, cli := range userClients { if cli == nil { o.UnRegister <- cli continue } go func() { err := cli.Conn.WriteMessage(websocket.TextMessage, message) if err != nil { o.UnRegister <- cli } }() } } } }