Files
2026-04-09 00:14:57 +08:00

217 lines
5.7 KiB
Go

package stream
import (
"fmt"
"io"
"net/http"
"strings"
"sync"
"github.com/gin-gonic/gin"
"github.com/nareix/joy4/av/avutil"
"github.com/nareix/joy4/av/pubsub"
"github.com/nareix/joy4/format"
"github.com/nareix/joy4/format/flv"
"github.com/nareix/joy4/format/rtmp"
"hightube/internal/chat"
"hightube/internal/db"
"hightube/internal/model"
"hightube/internal/monitor"
)
// init runs once at package load and registers the joy4 container and codec
// handlers through format.RegisterAll.
func init() {
	format.RegisterAll()
}
// RTMPServer owns the RTMP listener together with the registry of streams
// that are currently being published.
type RTMPServer struct {
	// server is the underlying joy4 RTMP server whose publish and play
	// callbacks are installed by NewRTMPServer.
	server *rtmp.Server

	// mutex guards channels.
	mutex sync.RWMutex
	// channels maps a viewer-facing path (/live/{room_id}) to the queue that
	// carries the packets of that room's live stream.
	channels map[string]*pubsub.Queue
}
// writeFlusher bundles an io.Writer with an http.Flusher so that an HTTP
// response can be passed to code expecting a writer whose Flush method
// returns an error.
type writeFlusher struct {
	io.Writer
	httpFlusher http.Flusher
}

// Flush forwards to the wrapped http.Flusher. That interface reports no
// error, so the result is always nil.
func (wf writeFlusher) Flush() error {
	wf.httpFlusher.Flush()
	return nil
}
// NewRTMPServer creates a media server with its publish and play handlers
// installed. The returned server is not listening yet; call Start to begin
// accepting RTMP connections.
func NewRTMPServer() *RTMPServer {
	s := &RTMPServer{
		channels: make(map[string]*pubsub.Queue),
		server:   &rtmp.Server{},
	}
	// Triggered when a broadcaster (e.g., OBS) starts publishing.
	s.server.HandlePublish = s.handlePublish
	// Triggered when a viewer (e.g., VLC) requests playback.
	s.server.HandlePlay = s.handlePlay
	return s
}

// streamKeyFromPath extracts the stream key from a publish path of the form
// /live/{stream_key}. It reports false when the path does not have that shape
// or when the key segment is empty.
func streamKeyFromPath(path string) (string, bool) {
	parts := strings.Split(path, "/")
	if len(parts) < 3 || parts[1] != "live" || parts[2] == "" {
		return "", false
	}
	return parts[2], true
}

// handlePublish authenticates a broadcaster by stream key, registers a packet
// queue under /live/{room_id} so viewers can find it, and then relays packets
// from the connection into that queue until the connection ends. Returning
// without relaying rejects the connection.
func (s *RTMPServer) handlePublish(conn *rtmp.Conn) {
	streamPath := conn.URL.Path // Expected format: /live/{stream_key}
	monitor.Infof("OBS publish attempt: %s", streamPath)

	streamKey, ok := streamKeyFromPath(streamPath)
	if !ok {
		monitor.Warnf("Invalid publish path format: %s", streamPath)
		return
	}

	// Authenticate the stream key against the rooms table.
	var room model.Room
	if err := db.DB.Where("stream_key = ?", streamKey).First(&room).Error; err != nil {
		monitor.Warnf("Invalid stream key: %s", streamKey)
		return
	}
	monitor.Infof("Stream authenticated for room_id=%d", room.ID)

	// 1. Get audio/video stream metadata.
	streams, err := conn.Streams()
	if err != nil {
		monitor.Errorf("Failed to parse stream headers: %v", err)
		return
	}

	// 2. Publish the queue under the room ID so viewers can use /live/{room_id}.
	roomLivePath := fmt.Sprintf("/live/%d", room.ID)
	q := pubsub.NewQueue()
	if err := q.WriteHeader(streams); err != nil {
		monitor.Errorf("Failed to initialize stream queue for room_id=%d: %v", room.ID, err)
		q.Close()
		return
	}
	s.mutex.Lock()
	s.channels[roomLivePath] = q
	s.mutex.Unlock()

	// 3. Cleanup on end.
	defer func() {
		// Only tear down the registration if it is still ours. A newer
		// publisher for the same room may have replaced the entry, and its
		// stream must not be removed or marked inactive by our exit.
		s.mutex.Lock()
		owned := s.channels[roomLivePath] == q
		if owned {
			delete(s.channels, roomLivePath)
		}
		s.mutex.Unlock()
		q.Close()

		if !owned {
			monitor.Infof("Publishing ended for room_id=%d (superseded by a newer publisher)", room.ID)
			return
		}
		// Explicitly set is_active to false using a map.
		if err := db.DB.Model(&room).Updates(map[string]interface{}{"is_active": false}).Error; err != nil {
			monitor.Warnf("Failed to mark room_id=%d inactive: %v", room.ID, err)
		}
		// Clear chat history for this room.
		chat.MainHub.ClearRoomHistory(fmt.Sprintf("%d", room.ID))
		monitor.Infof("Publishing ended for room_id=%d", room.ID)
	}()

	// Mark room as active in DB (using a map to ensure true/false is correctly updated).
	if err := db.DB.Model(&room).Updates(map[string]interface{}{"is_active": true}).Error; err != nil {
		monitor.Warnf("Failed to mark room_id=%d active: %v", room.ID, err)
	}

	// 4. Continuously copy data packets to our broadcast queue.
	if err := avutil.CopyPackets(q, conn); err != nil && err != io.EOF {
		monitor.Warnf("Publishing error for room_id=%d: %v", room.ID, err)
	}
}

// handlePlay serves an RTMP viewer: it looks up the queue registered for the
// requested path, sends the stream headers, and relays packets from the
// latest position until the stream ends or the viewer goes away.
func (s *RTMPServer) handlePlay(conn *rtmp.Conn) {
	streamPath := conn.URL.Path // Expected format: /live/{room_id}
	monitor.Infof("RTMP play requested: %s", streamPath)

	// 1. Look for the requested room's data queue.
	s.mutex.RLock()
	q, ok := s.channels[streamPath]
	s.mutex.RUnlock()
	if !ok {
		monitor.Warnf("Stream not found or inactive: %s", streamPath)
		return
	}

	// 2. Get the cursor from the latest position and notify the client of the
	// stream format. If the headers cannot be read or sent there is nothing
	// useful to relay, so stop here instead of sending an empty header.
	cursor := q.Latest()
	streams, err := cursor.Streams()
	if err != nil {
		monitor.Warnf("Stream headers unavailable for %s: %v", streamPath, err)
		return
	}
	if err := conn.WriteHeader(streams); err != nil {
		monitor.Warnf("Failed to send stream headers on %s: %v", streamPath, err)
		return
	}

	// 3. Cleanup on end.
	defer monitor.Infof("Playback ended: %s", streamPath)

	// 4. Continuously copy data packets to the viewer.
	err = avutil.CopyPackets(conn, cursor)
	if err == nil || err == io.EOF {
		return
	}
	// Errors caused by the client closing the connection itself are not
	// reported as serious errors.
	errStr := err.Error()
	if strings.Contains(errStr, "broken pipe") || strings.Contains(errStr, "connection reset by peer") {
		monitor.Infof("Viewer disconnected: %s", streamPath)
		return
	}
	monitor.Errorf("Playback error on %s: %v", streamPath, err)
}
// Start binds the RTMP server to addr and serves connections. It returns
// whatever ListenAndServe on the underlying server returns.
func (s *RTMPServer) Start(addr string) error {
	monitor.Infof("RTMP server listening on %s", addr)
	s.server.Addr = addr
	return s.server.ListenAndServe()
}
// HandleHTTPFLV streams the live room named by the "room_id" route parameter
// to the client as FLV over a long-lived HTTP response.
//
// It answers 404 when no queue is registered for the room and 500 when the
// response writer cannot flush. Otherwise it sends the response headers right
// away and copies the stream from its latest position until the copy returns.
func (s *RTMPServer) HandleHTTPFLV(c *gin.Context) {
	streamPath := fmt.Sprintf("/live/%s", c.Param("room_id"))

	s.mutex.RLock()
	queue, found := s.channels[streamPath]
	s.mutex.RUnlock()
	if !found {
		c.JSON(http.StatusNotFound, gin.H{"error": "Stream not found or inactive"})
		return
	}

	flusher, canFlush := c.Writer.(http.Flusher)
	if !canFlush {
		c.JSON(http.StatusInternalServerError, gin.H{"error": "Streaming is not supported by the current server"})
		return
	}

	// Response headers, applied in order before the status line is flushed.
	responseHeaders := [...][2]string{
		{"Content-Type", "video/x-flv"},
		{"Transfer-Encoding", "chunked"},
		{"Cache-Control", "no-cache"},
		{"Connection", "keep-alive"},
		{"Access-Control-Allow-Origin", "*"},
	}
	for _, kv := range responseHeaders {
		c.Header(kv[0], kv[1])
	}
	c.Status(http.StatusOK)
	flusher.Flush()

	out := writeFlusher{Writer: c.Writer, httpFlusher: flusher}
	muxer := flv.NewMuxerWriteFlusher(out)
	cursor := queue.Latest()

	err := avutil.CopyFile(muxer, cursor)
	switch {
	case err == nil, err == io.EOF:
		// Normal end of stream: nothing to report.
	case strings.Contains(err.Error(), "broken pipe"),
		strings.Contains(err.Error(), "connection reset by peer"):
		// The viewer closed the connection; log at info level only.
		monitor.Infof("HTTP-FLV viewer disconnected: %s", streamPath)
	default:
		monitor.Errorf("HTTP-FLV playback error on %s: %v", streamPath, err)
	}
}
// ActiveStreamCount reports how many streams are currently registered in the
// channel map. The count is read under the read lock.
func (s *RTMPServer) ActiveStreamCount() int {
	s.mutex.RLock()
	count := len(s.channels)
	s.mutex.RUnlock()
	return count
}
// ActiveStreamPaths returns the keys of the channel map, i.e. the paths of
// all currently registered streams. The slice is a fresh copy built under the
// read lock; its order follows map iteration and is therefore unspecified.
func (s *RTMPServer) ActiveStreamPaths() []string {
	s.mutex.RLock()
	defer s.mutex.RUnlock()

	result := make([]string, len(s.channels))
	next := 0
	for key := range s.channels {
		result[next] = key
		next++
	}
	return result
}