// Package asChat ----------------------------- // @file : cache.go // @author : JJXu // @contact : wavingbear@163.com // @time : 2024/9/11 下午5:18 // ------------------------------------------- package asChat import ( "context" "errors" "fmt" "fonchain-fiee/api/accountFiee" "fonchain-fiee/pkg/cache" "github.com/go-redis/redis" "github.com/goccy/go-json" "go.uber.org/zap" "log" "strings" "sync" "time" ) const CacheChatRecordKey = "chatRecord" const CacheSessionKey = "chatSession" const CacheNewMsgStatKey = "newMsgStat" var chatCacheLocker sync.RWMutex type ChatCache struct { newMessageStatExpireAfter time.Duration //消息统计的数据过期时间 } // ------------------------------存储用户的会话ID-------------------------------- func (cr ChatCache) GetUserSessionCacheKey(userId int64) string { return fmt.Sprintf("%s:%d", CacheSessionKey, userId) } func (cr ChatCache) SaveUserSession(userId int64, sessionId string) { chatCacheLocker.Lock() defer chatCacheLocker.Unlock() ////var c = context.Background() err := cache.RedisClient.Set(cr.GetUserSessionCacheKey(userId), sessionId, 0).Err() if err != nil { log.Fatal("保存用户会话失败", zap.Error(err)) } } func (cr ChatCache) GetUserSession(userId int64) (sessionId string) { fmt.Println("GetUserSession-1") chatCacheLocker.RLock() defer chatCacheLocker.RUnlock() //var c = context.Background() sessionId, err := cache.RedisClient.Get(cr.GetUserSessionCacheKey(userId)).Result() fmt.Println("GetUserSession-2") if err != nil { if err.Error() == "redis: nil" { err = nil } else { log.Fatal("获取用户会话失败", zap.Error(err)) } } fmt.Println("GetUserSession-3, sessionId:", sessionId) return } // ------------------------------存储会话的聊天记录-------------------------------- func (cr ChatCache) GetChatRecordCacheKey(sessionId string) string { return fmt.Sprintf("%s:%s", CacheChatRecordKey, sessionId) } func (cr ChatCache) AddChatRecord(sessionId string, data ...*accountFiee.ChatRecordData) (err error) { ////var c = context.Background() messages := cr.GetChatRecord(sessionId) fmt.Printf("AddChatRecord add data:%+v\n", data) messages = append(messages, data...) cacheBytes, _ := json.Marshal(messages) fmt.Println("Marshal result", string(cacheBytes)) err = cache.RedisClient.Set(cr.GetChatRecordCacheKey(sessionId), cacheBytes, 2*time.Hour).Err() return } func (cr ChatCache) CoverChatRecord(sessionId string, data []*accountFiee.ChatRecordData) (err error) { chatCacheLocker.Lock() defer chatCacheLocker.Unlock() //var c = context.Background() cacheBytes, _ := json.Marshal(data) err = cache.RedisClient.Set(cr.GetChatRecordCacheKey(sessionId), cacheBytes, 2*time.Hour).Err() return } func (cr ChatCache) GetChatRecord(sessionId string) (data []*accountFiee.ChatRecordData) { chatCacheLocker.RLock() defer chatCacheLocker.RUnlock() data = make([]*accountFiee.ChatRecordData, 0) //var c = context.Background() messages, err := cache.RedisClient.Get(cr.GetChatRecordCacheKey(sessionId)).Bytes() if err != nil { if err.Error() == "redis: nil" { err = nil } //log.Fatal("获取聊天记录失败", zap.Error(err)) return } fmt.Printf("cache data: %+v", string(messages)) if len(messages) > 0 { _ = json.Unmarshal(messages, &data) } return } // ------------------------------存储新消息统计-------------------------------- func (cr ChatCache) GetNewMsgStatCacheKey(ownerId int64) string { return fmt.Sprintf("%s:%d", CacheNewMsgStatKey, ownerId) } // 消息数量自增 func (cr ChatCache) IncreaseNewMessageTotal(ownerId int64, sessionId string) (err error) { chatCacheLocker.Lock() defer chatCacheLocker.Unlock() ctx := context.Background() data := cr.GetNewMessageStat(ctx, ownerId) if len(data) > 0 { foundIndex := -1 for i, v := range data { if v.SessionId == sessionId { foundIndex = i break } } if foundIndex > -1 { data[foundIndex].Total += 1 } //将foundIndex之后的所有元素右移动一位 if foundIndex > 0 { elementToMove := data[foundIndex] copy(data[1:], data[0:foundIndex]) data[0] = elementToMove } else if foundIndex == -1 { data = append([]UserMsgStatic{{SessionId: sessionId, Total: 1}}, data...) } } else { data = []UserMsgStatic{{SessionId: sessionId, Total: 1}} } return cr.coverOwnerNewMessageStat(ctx, ownerId, data) } // 重置新消息数量 func (cr ChatCache) ResetNewMessageTotal(ownerId int64, sessionId string, total ...int64) error { chatCacheLocker.Lock() defer chatCacheLocker.Unlock() var tl int64 if len(total) > 0 { tl = total[0] } ctx := context.Background() data := cr.GetNewMessageStat(ctx, ownerId) found := false for i, v := range data { if v.SessionId == sessionId { found = true data[i].Total = tl break } } if !found { data = append(data, UserMsgStatic{ SessionId: sessionId, Total: tl, }) } return cr.coverOwnerNewMessageStat(ctx, ownerId, data) } func (cr ChatCache) RecountNewMessageTotal(ownerId int64) { //var c = context.Background() var keys []string var err error keys, err = cache.RedisClient.Keys(CacheChatRecordKey + "*").Result() if err != nil { log.Fatal("获取聊天记录所有缓存KEY失败", zap.Error(err)) return } var countMap = make(map[string]int) for _, key := range keys { var messages []byte var data []*accountFiee.ChatRecordData messages, err = cache.RedisClient.Get(key).Bytes() if err != nil { if err.Error() == "redis: nil" { err = nil } log.Fatal("获取聊天记录失败", zap.Error(err)) data = make([]*accountFiee.ChatRecordData, 0) continue } if len(messages) > 0 { _ = json.Unmarshal(messages, &data) } var sessionId = strings.Split(key, ":")[1] countMap[sessionId] = 0 for _, v := range data { if v.WaiterRead == 2 { //统计未读消息数量 countMap[sessionId]++ } } } for sessionId, count := range countMap { err = cr.ResetNewMessageTotal(ownerId, sessionId, int64(count)) if err != nil { log.Fatal("重置新消息数量统计", zap.String("function", "RecountNewMessageTotal"), zap.Int64("ownerId", ownerId), zap.String("sessionId", sessionId), zap.Int("count", count), zap.Error(err), ) } } return } // erp获取最新的消息统计 func (cr ChatCache) GetNewMessageStat(ctx context.Context, ownerId int64) (result []UserMsgStatic) { //chatCacheLocker.RLock() //defer chatCacheLocker.RUnlock() result = make([]UserMsgStatic, 0) vals, err := cache.RedisClient.Get(cr.GetNewMsgStatCacheKey(ownerId)).Bytes() if err != nil && errors.Is(err, redis.Nil) { log.Fatal("从缓存获取新消息统计失败", zap.Error(err), zap.Int64("ownerId", ownerId)) return } if vals != nil { _ = json.Unmarshal(vals, &result) } return } // 覆盖指定erp用户的新消息统计 func (cr ChatCache) coverOwnerNewMessageStat(ctx context.Context, ownerId int64, data []UserMsgStatic) (err error) { value, _ := json.Marshal(data) //err = cache.RedisClient.Set(ctx, cr.GetNewMsgStatCacheKey(ownerId), value, cr.newMessageStatExpireAfter).Err() err = cache.RedisClient.Set(cr.GetNewMsgStatCacheKey(ownerId), value, 0).Err() return }