// Package ws -----------------------------
// @file      : chatRoom.go
// @author    : JJXu
// @contact   : wavingbear@163.com
// @time      : 2022/10/21 18:17:17
// -------------------------------------------
package ws
|
|||
|
|
|||
|
import (
|
|||
|
"encoding/json"
|
|||
|
"fmt"
|
|||
|
"fonchain-fiee/pkg/utils"
|
|||
|
"github.com/gorilla/websocket"
|
|||
|
"go.uber.org/zap"
|
|||
|
"log"
|
|||
|
"runtime"
|
|||
|
"strconv"
|
|||
|
"sync"
|
|||
|
"time"
|
|||
|
)
|
|||
|
|
|||
|
// Timing and size limits for websocket peers.
const (
	// writeWait is the time allowed to write a message to the peer.
	writeWait = 10 * time.Second

	// pongWait is the time allowed to read the next pong message from the peer.
	pongWait = 60 * time.Second

	// pingPeriod is the interval at which pings are sent to the peer. It is
	// derived from pongWait (nine tenths of it) so that it is always shorter.
	pingPeriod = pongWait * 9 / 10

	// maxMessageSize is the maximum message size allowed from the peer.
	maxMessageSize = 1024
)
|
|||
|
|
|||
|
func NewChatRoom() *ChatRoom {
|
|||
|
var room = ChatRoom{
|
|||
|
clientsRwLocker: &sync.RWMutex{},
|
|||
|
clients: make(map[int64]map[string]*Client),
|
|||
|
register: make(clientChan),
|
|||
|
UnRegister: make(clientChan),
|
|||
|
broadcast: make(broadcastChan),
|
|||
|
}
|
|||
|
go room.Run()
|
|||
|
return &room
|
|||
|
}
|
|||
|
|
|||
|
// broadcastMessage is one request travelling through the room's broadcast
// channel: a payload plus the user ids it is addressed to.
type broadcastMessage struct {
	// UserIds lists the target users; Run forwards it to Broadcast as the
	// variadic userIds argument.
	UserIds []int64
	// message is the raw payload handed to Broadcast.
	message []byte
}
|
|||
|
type (
|
|||
|
// []byte类型管道 用于客户端接收消息数据
|
|||
|
messageChan chan []byte
|
|||
|
|
|||
|
//
|
|||
|
wsConnChan chan *websocket.Conn
|
|||
|
|
|||
|
// Client类型数据管道
|
|||
|
clientChan chan *Client
|
|||
|
|
|||
|
broadcastChan chan *broadcastMessage
|
|||
|
)
|
|||
|
|
|||
|
type ChatRoom struct {
|
|||
|
clientsRwLocker *sync.RWMutex
|
|||
|
//clients 客户端信息存储
|
|||
|
//// 支持多客户端连接 map[userId]map[clientId]*Client
|
|||
|
clients map[int64]map[string]*Client
|
|||
|
|
|||
|
//会话 map[sessionId][]*Client
|
|||
|
Session map[string][]*Client
|
|||
|
|
|||
|
//register register 注册管道
|
|||
|
register clientChan
|
|||
|
|
|||
|
//unRegister 注销管道 接收需要注销的客户端
|
|||
|
UnRegister clientChan
|
|||
|
|
|||
|
broadcast broadcastChan
|
|||
|
}
|
|||
|
|
|||
|
func (o *ChatRoom) Run() {
|
|||
|
//消息分发
|
|||
|
for {
|
|||
|
select {
|
|||
|
// 注册事件
|
|||
|
case newClient := <-o.register:
|
|||
|
////删除临时map中的客户户端
|
|||
|
//delete(o.tempClient, client.clientId)
|
|||
|
o.clientsRwLocker.Lock()
|
|||
|
//添加到客户端集合中
|
|||
|
if o.clients[newClient.UserId] == nil {
|
|||
|
o.clients[newClient.UserId] = make(map[string]*Client)
|
|||
|
}
|
|||
|
o.clients[newClient.UserId][newClient.ClientId] = newClient
|
|||
|
//添加到会话集合中
|
|||
|
if o.Session == nil {
|
|||
|
o.Session = make(map[string][]*Client)
|
|||
|
}
|
|||
|
//if _, ok := o.Session[newClient.SessionId]; ok {
|
|||
|
// for i, client := range o.Session[newClient.SessionId] {
|
|||
|
// if client.ClientId == newClient.ClientId {
|
|||
|
// //将之前的客户端注销
|
|||
|
// o.UnRegister <- client
|
|||
|
// }
|
|||
|
// o.Session[newClient.SessionId][i] = newClient
|
|||
|
// }
|
|||
|
//}
|
|||
|
if newClient.Waiter {
|
|||
|
//客服人员没有默认会话窗口,而是自动加入所有用户的会话
|
|||
|
for sessionId, _ := range o.Session {
|
|||
|
sessionId := sessionId
|
|||
|
if sessionId != newClient.SessionId {
|
|||
|
for _, client := range o.clients[newClient.UserId] {
|
|||
|
o.Session[sessionId] = append(o.Session[sessionId], client)
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
} else {
|
|||
|
//画家添加会话的逻辑
|
|||
|
_, ok := o.Session[newClient.SessionId]
|
|||
|
if !ok {
|
|||
|
o.Session[newClient.SessionId] = make([]*Client, 0)
|
|||
|
//把客服拉入会话
|
|||
|
for userId, clientInfo := range o.clients {
|
|||
|
if userId == newClient.UserId {
|
|||
|
continue
|
|||
|
}
|
|||
|
for i, client := range clientInfo {
|
|||
|
if client != nil && client.Waiter {
|
|||
|
//把客服人员客户端加入会话中
|
|||
|
o.Session[newClient.SessionId] = append(o.Session[newClient.SessionId], clientInfo[i])
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
//再把自己的客户端加入会话
|
|||
|
o.Session[newClient.SessionId] = append(o.Session[newClient.SessionId], newClient)
|
|||
|
}
|
|||
|
o.clientsRwLocker.Unlock()
|
|||
|
//注销事件
|
|||
|
case client := <-o.UnRegister:
|
|||
|
//panic 恢复
|
|||
|
defer func() {
|
|||
|
if r := recover(); r != "" {
|
|||
|
const size = 64 << 10
|
|||
|
buf := make([]byte, size)
|
|||
|
buf = buf[:runtime.Stack(buf, false)]
|
|||
|
err, ok := r.(error)
|
|||
|
if !ok {
|
|||
|
err = fmt.Errorf("%v", r)
|
|||
|
}
|
|||
|
log.Fatal("close webosocket connection occured panic , recovered!", zap.Any("client", client), zap.Error(err), zap.String("stack", string(buf)))
|
|||
|
}
|
|||
|
}()
|
|||
|
fmt.Println("ws客户端注销事件触发")
|
|||
|
//从客户端集合中删除
|
|||
|
if _, ok := o.clients[client.UserId]; ok {
|
|||
|
if client != nil && client.Conn != nil {
|
|||
|
//_ = client.Conn.WriteMessage(websocket.CloseMessage, []byte{})
|
|||
|
_ = client.Conn.Close()
|
|||
|
}
|
|||
|
o.clients[client.UserId][client.ClientId] = nil
|
|||
|
delete(o.clients[client.UserId], client.ClientId)
|
|||
|
fmt.Printf("ws客户端%s 被注销\n", client.ClientId)
|
|||
|
}
|
|||
|
// 消息群发事件
|
|||
|
case messageInfo := <-o.broadcast:
|
|||
|
o.Broadcast(messageInfo.message, messageInfo.UserIds...)
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
func (o *ChatRoom) Register(c *Client) (sessionId string) {
|
|||
|
if c.SessionId == "" && !c.Waiter {
|
|||
|
//这里的c经常拿不到sessionId,所以使用userId固定死
|
|||
|
//c.SessionId = fmt.Sprintf("%d-%d", c.UserId, time.Now().Unix())
|
|||
|
c.SessionId = fmt.Sprintf("%d", c.UserId)
|
|||
|
}
|
|||
|
o.register <- c
|
|||
|
return c.SessionId
|
|||
|
}
|
|||
|
|
|||
|
// SendSessionMessage
|
|||
|
// sendUserId: 发送消息的用户id,消息提醒时,此用户将会被排除
|
|||
|
// sessionId: 会话id
|
|||
|
// msgType: 消息类型
|
|||
|
// message: 消息内容
|
|||
|
func (o *ChatRoom) SendSessionMessage(sendUserId int64, sessionId string, msgType WsType, message any) (userIdInSession []int64, err error) {
|
|||
|
o.clientsRwLocker.Lock()
|
|||
|
defer o.clientsRwLocker.Unlock()
|
|||
|
var msg = WsSessionInfo{
|
|||
|
Type: msgType,
|
|||
|
Content: message,
|
|||
|
}
|
|||
|
msgBytes, _ := json.Marshal(msg)
|
|||
|
if o.Session[sessionId] == nil {
|
|||
|
err = fmt.Errorf("该会话不存在或已失效")
|
|||
|
return
|
|||
|
}
|
|||
|
fmt.Println("ChatRoom.SendSessionMessage - 1")
|
|||
|
usableClients := []*Client{}
|
|||
|
fmt.Printf("sessionId:[%s],客户端数量%d\n", sessionId, len(o.Session[sessionId]))
|
|||
|
for i, client := range o.Session[sessionId] {
|
|||
|
if client != nil {
|
|||
|
_, exist := o.clients[client.UserId][client.ClientId]
|
|||
|
if exist {
|
|||
|
usableClients = append(usableClients, o.Session[sessionId][i])
|
|||
|
}
|
|||
|
}
|
|||
|
fmt.Printf("client:%+v\n", client)
|
|||
|
if client != nil && client.UserId != sendUserId {
|
|||
|
client.Send <- msgBytes
|
|||
|
userIdInSession = append(userIdInSession, client.UserId)
|
|||
|
}
|
|||
|
//client.Send <- msgBytes
|
|||
|
}
|
|||
|
o.Session[sessionId] = usableClients
|
|||
|
fmt.Printf("sessionId:[%s],客户端数量%d\n", sessionId, len(o.Session[sessionId]))
|
|||
|
fmt.Println("userIdInSession", userIdInSession)
|
|||
|
return
|
|||
|
}
|
|||
|
func (o *ChatRoom) GetUserIdInSession(sessionId string, withoutUserId ...int64) (userIds []int64) {
|
|||
|
fmt.Printf("sessionId:%s withoutUserId:%d\n", sessionId, withoutUserId)
|
|||
|
//o.clientsRwLocker.RLock()
|
|||
|
//defer o.clientsRwLocker.RUnlock()
|
|||
|
fmt.Println("GetUserIdInSession 1")
|
|||
|
if o.Session[sessionId] != nil {
|
|||
|
fmt.Printf("GetUserIdInSession 2,o.Session[sessionId]:%+v", o.Session[sessionId])
|
|||
|
for _, client := range o.Session[sessionId] {
|
|||
|
fmt.Println("session one of userId", client.UserId)
|
|||
|
var jump bool
|
|||
|
if withoutUserId != nil {
|
|||
|
for _, userId := range withoutUserId {
|
|||
|
if client.UserId == userId {
|
|||
|
jump = true
|
|||
|
continue
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
if !jump {
|
|||
|
fmt.Println("ADD USER", client.UserId)
|
|||
|
userId := client.UserId
|
|||
|
userIds = append(userIds, userId)
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
//针对app没有连接上websocket(聊天室没有检查到用户的客户端,此时websocket无法发送通知),但是需要推送app通知给用户的情况进行优化
|
|||
|
fmt.Println("GetUserIdInSession 3,userIds:", userIds)
|
|||
|
if len(userIds) == 0 {
|
|||
|
sessionUserId, _ := strconv.Atoi(sessionId)
|
|||
|
add := true
|
|||
|
if sessionUserId != 0 {
|
|||
|
for _, v := range withoutUserId {
|
|||
|
if v == int64(sessionUserId) {
|
|||
|
add = false
|
|||
|
break
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
if add {
|
|||
|
userIds = append(userIds, int64(sessionUserId))
|
|||
|
}
|
|||
|
fmt.Println("GetUserIdInSession 4,userIds:", userIds)
|
|||
|
}
|
|||
|
userIds = utils.Unique(userIds)
|
|||
|
fmt.Println("GetUserIdInSession 5,userIds:", userIds)
|
|||
|
return
|
|||
|
}
|
|||
|
|
|||
|
// func (o ChatRoom) RegisterClient(c *Client) {
|
|||
|
// o.register <- c
|
|||
|
// }
|
|||
|
//
|
|||
|
// func (o ChatRoom) DeleteClient(c *Client) {
|
|||
|
// o.unRegister <- c
|
|||
|
// }
|
|||
|
func (o ChatRoom) Broadcast(message []byte, userIds ...int64) {
|
|||
|
// 如果userIds为空则群发,否则找到这个用户的ws对象
|
|||
|
if userIds == nil {
|
|||
|
for _, userClients := range o.clients {
|
|||
|
for _, cli := range userClients {
|
|||
|
if cli == nil {
|
|||
|
o.UnRegister <- cli
|
|||
|
continue
|
|||
|
}
|
|||
|
go func() {
|
|||
|
err := cli.Conn.WriteMessage(websocket.TextMessage, message)
|
|||
|
if err != nil {
|
|||
|
o.UnRegister <- cli
|
|||
|
}
|
|||
|
}()
|
|||
|
}
|
|||
|
}
|
|||
|
} else {
|
|||
|
for _, userId := range userIds {
|
|||
|
userClients, ok := o.clients[userId]
|
|||
|
if ok == false {
|
|||
|
return
|
|||
|
}
|
|||
|
for _, cli := range userClients {
|
|||
|
if cli == nil {
|
|||
|
o.UnRegister <- cli
|
|||
|
continue
|
|||
|
}
|
|||
|
go func() {
|
|||
|
err := cli.Conn.WriteMessage(websocket.TextMessage, message)
|
|||
|
if err != nil {
|
|||
|
o.UnRegister <- cli
|
|||
|
}
|
|||
|
}()
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
}
|