Phase 2 completed: Stream Server Auth, Database and Business API
This commit is contained in:
136
internal/stream/server.go
Normal file
136
internal/stream/server.go
Normal file
@@ -0,0 +1,136 @@
|
||||
package stream
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/nareix/joy4/av/avutil"
|
||||
"github.com/nareix/joy4/av/pubsub"
|
||||
"github.com/nareix/joy4/format"
|
||||
"github.com/nareix/joy4/format/rtmp"
|
||||
|
||||
"hightube/internal/db"
|
||||
"hightube/internal/model"
|
||||
)
|
||||
|
||||
func init() {
|
||||
// Register all supported audio/video formats
|
||||
format.RegisterAll()
|
||||
}
|
||||
|
||||
// RTMPServer manages all active live streams
|
||||
type RTMPServer struct {
|
||||
server *rtmp.Server
|
||||
channels map[string]*pubsub.Queue
|
||||
mutex sync.RWMutex
|
||||
}
|
||||
|
||||
// NewRTMPServer creates and initializes a new media server
|
||||
func NewRTMPServer() *RTMPServer {
|
||||
s := &RTMPServer{
|
||||
channels: make(map[string]*pubsub.Queue),
|
||||
server: &rtmp.Server{},
|
||||
}
|
||||
|
||||
// Triggered when a broadcaster (e.g., OBS) starts publishing
|
||||
s.server.HandlePublish = func(conn *rtmp.Conn) {
|
||||
streamPath := conn.URL.Path // Expected format: /live/{stream_key}
|
||||
fmt.Printf("[INFO] OBS is attempting to publish to: %s\n", streamPath)
|
||||
|
||||
// Extract stream key from path
|
||||
parts := strings.Split(streamPath, "/")
|
||||
if len(parts) < 3 || parts[1] != "live" {
|
||||
fmt.Printf("[WARN] Invalid publish path format: %s\n", streamPath)
|
||||
return
|
||||
}
|
||||
streamKey := parts[2]
|
||||
|
||||
// Authenticate stream key
|
||||
var room model.Room
|
||||
if err := db.DB.Where("stream_key = ?", streamKey).First(&room).Error; err != nil {
|
||||
fmt.Printf("[WARN] Authentication failed, invalid stream key: %s\n", streamKey)
|
||||
return // Reject connection
|
||||
}
|
||||
|
||||
fmt.Printf("[INFO] Stream authenticated for Room ID: %d\n", room.ID)
|
||||
|
||||
// 1. Get audio/video stream metadata
|
||||
streams, err := conn.Streams()
|
||||
if err != nil {
|
||||
fmt.Printf("[ERROR] Failed to parse stream headers: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
// 2. Map the active stream by Room ID so viewers can use /live/{room_id}
|
||||
roomLivePath := fmt.Sprintf("/live/%d", room.ID)
|
||||
|
||||
s.mutex.Lock()
|
||||
q := pubsub.NewQueue()
|
||||
q.WriteHeader(streams)
|
||||
s.channels[roomLivePath] = q
|
||||
s.mutex.Unlock()
|
||||
|
||||
// Mark room as active in DB
|
||||
db.DB.Model(&room).Update("is_active", true)
|
||||
|
||||
// 3. Cleanup on end
|
||||
defer func() {
|
||||
s.mutex.Lock()
|
||||
delete(s.channels, roomLivePath)
|
||||
s.mutex.Unlock()
|
||||
q.Close()
|
||||
db.DB.Model(&room).Update("is_active", false) // Mark room as inactive
|
||||
fmt.Printf("[INFO] Publishing ended for Room ID: %d\n", room.ID)
|
||||
}()
|
||||
|
||||
// 4. Continuously copy data packets to our broadcast queue
|
||||
avutil.CopyPackets(q, conn)
|
||||
}
|
||||
|
||||
// Triggered when a viewer (e.g., VLC) requests playback
|
||||
s.server.HandlePlay = func(conn *rtmp.Conn) {
|
||||
streamPath := conn.URL.Path // Expected format: /live/{room_id}
|
||||
fmt.Printf("[INFO] VLC is pulling stream from: %s\n", streamPath)
|
||||
|
||||
// 1. Look for the requested room's data queue
|
||||
s.mutex.RLock()
|
||||
q, ok := s.channels[streamPath]
|
||||
s.mutex.RUnlock()
|
||||
|
||||
if !ok {
|
||||
fmt.Printf("[WARN] Stream not found or inactive: %s\n", streamPath)
|
||||
return
|
||||
}
|
||||
|
||||
// 2. Get the cursor from the latest position and notify client of stream format
|
||||
cursor := q.Latest()
|
||||
streams, _ := cursor.Streams()
|
||||
conn.WriteHeader(streams)
|
||||
|
||||
// 3. Cleanup on end
|
||||
defer fmt.Printf("[INFO] Playback ended: %s\n", streamPath)
|
||||
|
||||
// 4. Continuously copy data packets to the viewer
|
||||
err := avutil.CopyPackets(conn, cursor)
|
||||
if err != nil && err != io.EOF {
|
||||
// 如果是客户端主动断开连接引起的错误,不将其作为严重错误打印
|
||||
errStr := err.Error()
|
||||
if strings.Contains(errStr, "broken pipe") || strings.Contains(errStr, "connection reset by peer") {
|
||||
fmt.Printf("[INFO] Viewer disconnected normally: %s\n", streamPath)
|
||||
} else {
|
||||
fmt.Printf("[ERROR] Error occurred during playback: %v\n", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
// Start launches the RTMP server
|
||||
func (s *RTMPServer) Start(addr string) error {
|
||||
s.server.Addr = addr
|
||||
fmt.Printf("[INFO] RTMP Server is listening on %s...\n", addr)
|
||||
return s.server.ListenAndServe()
|
||||
}
|
||||
Reference in New Issue
Block a user