package stream import ( "fmt" "io" "net/http" "strings" "sync" "github.com/gin-gonic/gin" "github.com/nareix/joy4/av/avutil" "github.com/nareix/joy4/av/pubsub" "github.com/nareix/joy4/format" "github.com/nareix/joy4/format/flv" "github.com/nareix/joy4/format/rtmp" "hightube/internal/chat" "hightube/internal/db" "hightube/internal/model" "hightube/internal/monitor" ) func init() { // Register all supported audio/video formats format.RegisterAll() } // RTMPServer manages all active live streams type RTMPServer struct { server *rtmp.Server channels map[string]*pubsub.Queue mutex sync.RWMutex } type writeFlusher struct { httpFlusher http.Flusher io.Writer } func (w writeFlusher) Flush() error { w.httpFlusher.Flush() return nil } // NewRTMPServer creates and initializes a new media server func NewRTMPServer() *RTMPServer { s := &RTMPServer{ channels: make(map[string]*pubsub.Queue), server: &rtmp.Server{}, } // Triggered when a broadcaster (e.g., OBS) starts publishing s.server.HandlePublish = func(conn *rtmp.Conn) { streamPath := conn.URL.Path // Expected format: /live/{stream_key} monitor.Infof("OBS publish attempt: %s", streamPath) // Extract stream key from path parts := strings.Split(streamPath, "/") if len(parts) < 3 || parts[1] != "live" { monitor.Warnf("Invalid publish path format: %s", streamPath) return } streamKey := parts[2] // Authenticate stream key var room model.Room if err := db.DB.Where("stream_key = ?", streamKey).First(&room).Error; err != nil { monitor.Warnf("Invalid stream key: %s", streamKey) return // Reject connection } monitor.Infof("Stream authenticated for room_id=%d", room.ID) // 1. Get audio/video stream metadata streams, err := conn.Streams() if err != nil { monitor.Errorf("Failed to parse stream headers: %v", err) return } // 2. Map the active stream by Room ID so viewers can use /live/{room_id} roomLivePath := fmt.Sprintf("/live/%d", room.ID) s.mutex.Lock() q := pubsub.NewQueue() q.WriteHeader(streams) s.channels[roomLivePath] = q s.mutex.Unlock() // Mark room as active in DB (using map to ensure true/false is correctly updated) db.DB.Model(&room).Updates(map[string]interface{}{"is_active": true}) // 3. Cleanup on end defer func() { s.mutex.Lock() delete(s.channels, roomLivePath) s.mutex.Unlock() q.Close() // Explicitly set is_active to false using map db.DB.Model(&room).Updates(map[string]interface{}{"is_active": false}) // Clear chat history for this room chat.MainHub.ClearRoomHistory(fmt.Sprintf("%d", room.ID)) monitor.Infof("Publishing ended for room_id=%d", room.ID) }() // 4. Continuously copy data packets to our broadcast queue avutil.CopyPackets(q, conn) } // Triggered when a viewer (e.g., VLC) requests playback s.server.HandlePlay = func(conn *rtmp.Conn) { streamPath := conn.URL.Path // Expected format: /live/{room_id} monitor.Infof("RTMP play requested: %s", streamPath) // 1. Look for the requested room's data queue s.mutex.RLock() q, ok := s.channels[streamPath] s.mutex.RUnlock() if !ok { monitor.Warnf("Stream not found or inactive: %s", streamPath) return } // 2. Get the cursor from the latest position and notify client of stream format cursor := q.Latest() streams, _ := cursor.Streams() conn.WriteHeader(streams) // 3. Cleanup on end defer monitor.Infof("Playback ended: %s", streamPath) // 4. Continuously copy data packets to the viewer err := avutil.CopyPackets(conn, cursor) if err != nil && err != io.EOF { // 如果是客户端主动断开连接引起的错误,不将其作为严重错误打印 errStr := err.Error() if strings.Contains(errStr, "broken pipe") || strings.Contains(errStr, "connection reset by peer") { monitor.Infof("Viewer disconnected: %s", streamPath) } else { monitor.Errorf("Playback error on %s: %v", streamPath, err) } } } return s } // Start launches the RTMP server func (s *RTMPServer) Start(addr string) error { s.server.Addr = addr monitor.Infof("RTMP server listening on %s", addr) return s.server.ListenAndServe() } // HandleHTTPFLV serves browser-compatible HTTP-FLV playback for web clients. func (s *RTMPServer) HandleHTTPFLV(c *gin.Context) { streamPath := fmt.Sprintf("/live/%s", c.Param("room_id")) s.mutex.RLock() q, ok := s.channels[streamPath] s.mutex.RUnlock() if !ok { c.JSON(http.StatusNotFound, gin.H{"error": "Stream not found or inactive"}) return } flusher, ok := c.Writer.(http.Flusher) if !ok { c.JSON(http.StatusInternalServerError, gin.H{"error": "Streaming is not supported by the current server"}) return } c.Header("Content-Type", "video/x-flv") c.Header("Transfer-Encoding", "chunked") c.Header("Cache-Control", "no-cache") c.Header("Connection", "keep-alive") c.Header("Access-Control-Allow-Origin", "*") c.Status(http.StatusOK) flusher.Flush() muxer := flv.NewMuxerWriteFlusher(writeFlusher{ httpFlusher: flusher, Writer: c.Writer, }) cursor := q.Latest() if err := avutil.CopyFile(muxer, cursor); err != nil && err != io.EOF { errStr := err.Error() if strings.Contains(errStr, "broken pipe") || strings.Contains(errStr, "connection reset by peer") { monitor.Infof("HTTP-FLV viewer disconnected: %s", streamPath) return } monitor.Errorf("HTTP-FLV playback error on %s: %v", streamPath, err) } } func (s *RTMPServer) ActiveStreamCount() int { s.mutex.RLock() defer s.mutex.RUnlock() return len(s.channels) } func (s *RTMPServer) ActiveStreamPaths() []string { s.mutex.RLock() defer s.mutex.RUnlock() paths := make([]string, 0, len(s.channels)) for path := range s.channels { paths = append(paths, path) } return paths }