- Added chat history persistence for active rooms with auto-cleanup on stream end. - Overhauled Settings page with user profile, theme color picker, and password change. - Added backend API for user password updates. - Integrated flutter_launcher_icons and updated app icon to 'H' logo. - Fixed 'Duplicate keys' bug in danmaku by using UniqueKey and filtering historical messages. - Updated version to 1.0.0-beta3.5 and author info.
198 lines
4.6 KiB
Go
198 lines
4.6 KiB
Go
package chat
|
|
|
|
import (
|
|
"encoding/json"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/gorilla/websocket"
|
|
)
|
|
|
|
const (
|
|
writeWait = 10 * time.Second
|
|
pongWait = 60 * time.Second
|
|
pingPeriod = (pongWait * 9) / 10
|
|
maxMessageSize = 512
|
|
)
|
|
|
|
type Message struct {
|
|
Type string `json:"type"` // "chat", "system", "danmaku"
|
|
Username string `json:"username"`
|
|
Content string `json:"content"`
|
|
RoomID string `json:"room_id"`
|
|
IsHistory bool `json:"is_history"`
|
|
}
|
|
|
|
type Client struct {
|
|
Hub *Hub
|
|
Conn *websocket.Conn
|
|
Send chan []byte
|
|
RoomID string
|
|
Username string
|
|
}
|
|
|
|
type Hub struct {
|
|
rooms map[string]map[*Client]bool
|
|
roomsHistory map[string][]Message
|
|
broadcast chan Message
|
|
register chan *Client
|
|
unregister chan *Client
|
|
mutex sync.RWMutex
|
|
}
|
|
|
|
func NewHub() *Hub {
|
|
return &Hub{
|
|
broadcast: make(chan Message),
|
|
register: make(chan *Client),
|
|
unregister: make(chan *Client),
|
|
rooms: make(map[string]map[*Client]bool),
|
|
roomsHistory: make(map[string][]Message),
|
|
}
|
|
}
|
|
|
|
func (h *Hub) Run() {
|
|
for {
|
|
select {
|
|
case client := <-h.register:
|
|
h.mutex.Lock()
|
|
if h.rooms[client.RoomID] == nil {
|
|
h.rooms[client.RoomID] = make(map[*Client]bool)
|
|
}
|
|
h.rooms[client.RoomID][client] = true
|
|
|
|
// Send existing history to the newly joined client
|
|
if history, ok := h.roomsHistory[client.RoomID]; ok {
|
|
for _, msg := range history {
|
|
msg.IsHistory = true
|
|
msgBytes, _ := json.Marshal(msg)
|
|
// Use select to avoid blocking if client's send channel is full
|
|
select {
|
|
case client.Send <- msgBytes:
|
|
default:
|
|
// If send fails, we could potentially log or ignore
|
|
}
|
|
}
|
|
}
|
|
h.mutex.Unlock()
|
|
|
|
case client := <-h.unregister:
|
|
h.mutex.Lock()
|
|
if rooms, ok := h.rooms[client.RoomID]; ok {
|
|
if _, ok := rooms[client]; ok {
|
|
delete(rooms, client)
|
|
close(client.Send)
|
|
// We no longer delete the room from h.rooms here if we want history to persist
|
|
// even if everyone leaves (as long as it's active in DB).
|
|
// But we should clean up if the room is empty and we want to save memory.
|
|
// However, the history is what matters.
|
|
if len(rooms) == 0 {
|
|
delete(h.rooms, client.RoomID)
|
|
}
|
|
}
|
|
}
|
|
h.mutex.Unlock()
|
|
|
|
case message := <-h.broadcast:
|
|
h.mutex.Lock()
|
|
// Only store "chat" and "danmaku" messages in history
|
|
if message.Type == "chat" || message.Type == "danmaku" {
|
|
h.roomsHistory[message.RoomID] = append(h.roomsHistory[message.RoomID], message)
|
|
// Limit history size to avoid memory leak (e.g., last 100 messages)
|
|
if len(h.roomsHistory[message.RoomID]) > 100 {
|
|
h.roomsHistory[message.RoomID] = h.roomsHistory[message.RoomID][1:]
|
|
}
|
|
}
|
|
|
|
clients := h.rooms[message.RoomID]
|
|
if clients != nil {
|
|
msgBytes, _ := json.Marshal(message)
|
|
for client := range clients {
|
|
select {
|
|
case client.Send <- msgBytes:
|
|
default:
|
|
close(client.Send)
|
|
delete(clients, client)
|
|
}
|
|
}
|
|
}
|
|
h.mutex.Unlock()
|
|
}
|
|
}
|
|
}
|
|
|
|
// ClearRoomHistory removes history for a room, should be called when stream ends
|
|
func (h *Hub) ClearRoomHistory(roomID string) {
|
|
h.mutex.Lock()
|
|
defer h.mutex.Unlock()
|
|
delete(h.roomsHistory, roomID)
|
|
}
|
|
|
|
func (h *Hub) RegisterClient(c *Client) {
|
|
h.register <- c
|
|
}
|
|
|
|
// BroadcastToRoom sends a message to the broadcast channel
|
|
func (h *Hub) BroadcastToRoom(msg Message) {
|
|
h.broadcast <- msg
|
|
}
|
|
|
|
func (c *Client) ReadPump() {
|
|
defer func() {
|
|
c.Hub.unregister <- c
|
|
c.Conn.Close()
|
|
}()
|
|
c.Conn.SetReadLimit(maxMessageSize)
|
|
c.Conn.SetReadDeadline(time.Now().Add(pongWait))
|
|
c.Conn.SetPongHandler(func(string) error { c.Conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
|
|
for {
|
|
_, message, err := c.Conn.ReadMessage()
|
|
if err != nil {
|
|
break
|
|
}
|
|
var msg Message
|
|
if err := json.Unmarshal(message, &msg); err == nil {
|
|
msg.RoomID = c.RoomID
|
|
msg.Username = c.Username
|
|
c.Hub.broadcast <- msg
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Client) WritePump() {
|
|
ticker := time.NewTicker(pingPeriod)
|
|
defer func() {
|
|
ticker.Stop()
|
|
c.Conn.Close()
|
|
}()
|
|
for {
|
|
select {
|
|
case message, ok := <-c.Send:
|
|
c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
|
|
if !ok {
|
|
c.Conn.WriteMessage(websocket.CloseMessage, []byte{})
|
|
return
|
|
}
|
|
w, err := c.Conn.NextWriter(websocket.TextMessage)
|
|
if err != nil {
|
|
return
|
|
}
|
|
w.Write(message)
|
|
if err := w.Close(); err != nil {
|
|
return
|
|
}
|
|
case <-ticker.C:
|
|
c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
|
|
if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
var MainHub *Hub
|
|
|
|
func InitChat() {
|
|
MainHub = NewHub()
|
|
go MainHub.Run()
|
|
}
|