Phase 4.0: WebSocket chat and danmaku system implemented

This commit is contained in:
2026-03-18 15:30:03 +08:00
parent d05ec7ccdf
commit 9c7261cbda
11 changed files with 507 additions and 44 deletions

View File

@@ -0,0 +1,51 @@
package api
import (
"fmt"
"net/http"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"hightube/internal/chat"
)
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true // Allow all connections
},
}
// WSHandler handles websocket requests from clients
func WSHandler(c *gin.Context) {
roomID := c.Param("room_id")
username := c.DefaultQuery("username", "Anonymous")
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
fmt.Printf("[WS ERROR] Failed to upgrade: %v\n", err)
return
}
client := &chat.Client{
Hub: chat.MainHub,
Conn: conn,
Send: make(chan []byte, 256),
RoomID: roomID,
Username: username,
}
client.Hub.RegisterClient(client)
// Start reading and writing loops in goroutines
go client.WritePump()
go client.ReadPump()
// Optionally broadcast a system message: User Joined
chat.MainHub.BroadcastToRoom(chat.Message{
Type: "system",
Username: "System",
Content: fmt.Sprintf("%s joined the room", username),
RoomID: roomID,
})
}

View File

@@ -18,6 +18,9 @@ func SetupRouter() *gin.Engine {
r.POST("/api/register", Register)
r.POST("/api/login", Login)
r.GET("/api/rooms/active", GetActiveRooms)
// WebSocket endpoint for live chat
r.GET("/api/ws/room/:room_id", WSHandler)
// Protected routes (require JWT)
authGroup := r.Group("/api")

View File

@@ -0,0 +1,160 @@
package chat
import (
"encoding/json"
"sync"
"time"
"github.com/gorilla/websocket"
)
const (
writeWait = 10 * time.Second
pongWait = 60 * time.Second
pingPeriod = (pongWait * 9) / 10
maxMessageSize = 512
)
type Message struct {
Type string `json:"type"` // "chat", "system", "danmaku"
Username string `json:"username"`
Content string `json:"content"`
RoomID string `json:"room_id"`
}
type Client struct {
Hub *Hub
Conn *websocket.Conn
Send chan []byte
RoomID string
Username string
}
type Hub struct {
rooms map[string]map[*Client]bool
broadcast chan Message
register chan *Client
unregister chan *Client
mutex sync.RWMutex
}
func NewHub() *Hub {
return &Hub{
broadcast: make(chan Message),
register: make(chan *Client),
unregister: make(chan *Client),
rooms: make(map[string]map[*Client]bool),
}
}
func (h *Hub) Run() {
for {
select {
case client := <-h.register:
h.mutex.Lock()
if h.rooms[client.RoomID] == nil {
h.rooms[client.RoomID] = make(map[*Client]bool)
}
h.rooms[client.RoomID][client] = true
h.mutex.Unlock()
case client := <-h.unregister:
h.mutex.Lock()
if rooms, ok := h.rooms[client.RoomID]; ok {
if _, ok := rooms[client]; ok {
delete(rooms, client)
close(client.Send)
if len(rooms) == 0 {
delete(h.rooms, client.RoomID)
}
}
}
h.mutex.Unlock()
case message := <-h.broadcast:
h.mutex.RLock()
clients := h.rooms[message.RoomID]
if clients != nil {
msgBytes, _ := json.Marshal(message)
for client := range clients {
select {
case client.Send <- msgBytes:
default:
close(client.Send)
delete(clients, client)
}
}
}
h.mutex.RUnlock()
}
}
}
func (h *Hub) RegisterClient(c *Client) {
h.register <- c
}
// BroadcastToRoom sends a message to the broadcast channel
func (h *Hub) BroadcastToRoom(msg Message) {
h.broadcast <- msg
}
func (c *Client) ReadPump() {
defer func() {
c.Hub.unregister <- c
c.Conn.Close()
}()
c.Conn.SetReadLimit(maxMessageSize)
c.Conn.SetReadDeadline(time.Now().Add(pongWait))
c.Conn.SetPongHandler(func(string) error { c.Conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
for {
_, message, err := c.Conn.ReadMessage()
if err != nil {
break
}
var msg Message
if err := json.Unmarshal(message, &msg); err == nil {
msg.RoomID = c.RoomID
msg.Username = c.Username
c.Hub.broadcast <- msg
}
}
}
func (c *Client) WritePump() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
c.Conn.Close()
}()
for {
select {
case message, ok := <-c.Send:
c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
c.Conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
w, err := c.Conn.NextWriter(websocket.TextMessage)
if err != nil {
return
}
w.Write(message)
if err := w.Close(); err != nil {
return
}
case <-ticker.C:
c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}
var MainHub *Hub
func InitChat() {
MainHub = NewHub()
go MainHub.Run()
}