golang源码阅读:livego直播系统

2023-04-1915:14:25后端程序开发Comments1,163 views字数 11616阅读模式

分析源码之前,先搭建一个直播系统:文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html

直播服务器文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html

https://github.com/gwuhaolin/livego文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html

播放站点文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html

https://github.com/Bilibili/flv.js/文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html

推流文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html

https://github.com/obsproject/obs-studio文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html

首先启动直播服务器文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html

./livego --flv_dir=./data --level=debug

1,在启动livego服务后默认会监听以下端口:文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html

8090端口:用于控制台,通过HTTP请求可查看与控制直播房间的推拉流文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html

1935端口:用于RTMP推拉流,目前貌似只能通过RTMP方式推流文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html

7001端口:用于FLV拉流文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html

7002端口:用于HLS拉流文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html

2,创建直播房间:文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html

请求:http://你的服务器地址:8090/control/get?room=房间名字(房间名字你自己自定义)文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html

成功响应:{“status”:200,“data”:一段与房间名对应的MD5秘钥}文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html

http://127.0.0.1:8090/control/get?room=xiazemin{status: 200,data: "rfBd56ti2SMtYvSgD5xAV0YU99zampta7Z7S575KLkIZ9PYk"}

3,启动推流服务器文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html

配置文件livego.yaml里可以看到默认的app名字,串流密码输入room名字,这里设置成xiazemin文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html

server:- appname: live

golang源码阅读:livego直播系统文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html

4,启动网页服务器,拉取视频内容文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html

<!DOCTYPE html><html lang="en"><head>    <meta charset="UTF-8">    <meta name="viewport" content="width=device-width, initial-scale=1.0">    <title>课程直播</title></head><body>    <script src="flv.min.js"></script>    <video id="videoElement" autoplay controls loop preload muted></video>    <script>        if (flvjs.isSupported()) {            var videoElement = document.getElementById('videoElement');            var flvPlayer = flvjs.createPlayer({                type: 'flv',                isLive: true,                url: 'http://127.0.0.1:7001/live/rfBd56ti2SMtYvSgD5xAV0YU99zampta7Z7S575KLkIZ9PYk.flv'            });            flvPlayer.attachMediaElement(videoElement);            flvPlayer.load();            flvPlayer.play();            document.body.addEventListener('mousedown', function(){    var vdo = $("video")[0]; //jquery    vdo.muted = false;}, false);          }</script></body></html>

下载我们依赖的flv播放库文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html

 https://cdn.bootcdn.net/ajax/libs/flv.js/1.6.1/flv.min.js

启动一个静态服务器,返回上述网页文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html

package main
import (  "fmt"  "net/http")
//http://newbt.net/ms/vdisk/show_bbs.php?id=56A46991BF52F1A048DB5F0350D74D01&page=0func handler(w http.ResponseWriter, r *http.Request) {  w.Header().Add("Access-Control-Allow-Origin", "*")  w.Header().Add("Access-Control-Allow-Methods", "GET, DELETE, HEAD, OPTIONS,POST,PUT,PATCH")  w.Header().Add("Access-Control-Allow-Headers", "x-requested-with,content-type")  w.Header().Add("Access-Control-Allow-Credentials", "true")  http.FileServer(http.Dir("./")).ServeHTTP(w, r)  return}
//https://stackoverflow.com/questions/69435888/how-to-ensure-cors-response-header-values-are-valid-when-querying-mongodb-in-n
func main() {  http.HandleFunc("/", handler)  if err := http.ListenAndServe(":8089", nil); err != nil {    fmt.Println(err)  }}

至此直播服务器搭建完毕。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html

livego还提供了其他推拉流管理的接口和统计信息相关的接口文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html

http://127.0.0.1:8090/control/pull?&oper=start&app=live&name=xiazemin&url=rtmp://127.0.0.1:1935/live/xiazemin
{status: 400,data: "<h1>push url start rtmp://127.0.0.1:1935/live/xiazemin ok</h1></br>"}
http://127.0.0.1:8090/stat/livestat文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html
{status: 200,data: {publishers: [{key: "live/mPjVIUp97tQjNHJcaOAGeDZRvcMdGIASmHsVKxASAgqjn9FS",url: "rtmp://localhost:1935/live/mPjVIUp97tQjNHJcaOAGeDZRvcMdGIASmHsVKxASAgqjn9FS",stream_id: 1,video_total_bytes: 1940374,video_speed: 1465,audio_total_bytes: 206931,audio_speed: 161}],players: null}}
http://127.0.0.1:8090/control/push?&oper=start&app=live&name=xiazemin&url=rtmp://127.0.0.1:1935/live/xiazemin文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html
{status: 200,data: "<h1>push url start rtmp://127.0.0.1:1935/live/xiazemin ok</h1></br>"}
http://127.0.0.1:8090/control/pull?&oper=stop&app=live&name=xiazemin&url=rtmp://127.0.0.1:1935/live/xiazemin{status: 400,data: "<h1>push url stop rtmp://127.0.0.1:1935/live/xiazemin ok</h1></br>"}

        下面我们开始分析源码,整体包括三个部分,推流,拉流,推拉流管理。我从livego的入口开始main.go的main函数:创建了RTMP stream,然后粉笔起了拉流服务hls和httpflv,然后起了控制台服务器,最后起了推流服务器:文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html

      rtmpServer = rtmp.NewRtmpServer(stream, nil)
      rtmpServer = rtmp.NewRtmpServer(stream, hlsServer)      rtmpServer.Serve(rtmpListen)
func startHls() *hls.Server hlsServer := hls.NewServer() go func() { hlsServer.Serve(hlsListen)
func startHTTPFlv(stream *rtmp.RtmpStream) 
      hdlServer := httpflv.NewServer(stream)      go func() {        hdlServer.Serve(flvListen)
func startAPI(stream *rtmp.RtmpStream) opServer := api.NewServer(stream, rtmpAddr) go func() { opServer.Serve(opListen)文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html
  stream := rtmp.NewRtmpStream()    hlsServer = startHls()    startHTTPFlv(stream)    startAPI(stream)    startRtmp(stream, hlsServer)

推流是基于rtmp协议的,底层是基于tcp协议的,在循环中监听连接,每个连接起一个协程:文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html

protocol/rtmp/rtmp.go文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html

func (s *Server) Serve(listener net.Listener) (err error)       netconn, err = listener.Accept()      conn := core.NewConn(netconn, 4*1024)      go s.handleConn(conn)  
func (s *Server) handleConn(conn *core.Conn) error       connServer := core.NewConnServer(conn)      appname, name, _ := connServer.GetInfo()      key, err := configure.RoomKeys.GetKey(name)      channel, err := configure.RoomKeys.GetChannel(name)       pushlist, ret := configure.GetStaticPushUrlList(appname);       s.handler.HandleWriter(flvWriter.GetWriter(reader.Info()))

本质上做的工作就是把数据从输入channel copy到输出channel,数据存储在sync.Map里面:文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html

protocol/rtmp/stream.go文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html

type RtmpStream struct {  streams *sync.Map //key}
func (rs *RtmpStream) HandleReader(r av.ReadCloser) 
        i, ok := rs.streams.Load(info.Key)        ns := NewStream()        rs.streams.Store(info.Key, ns)        stream.AddReader(r)    
func (rs *RtmpStream) HandleWriter(w av.WriteCloser)
        item, ok := rs.streams.Load(info.Key)        rs.streams.Store(info.Key, s)        s.AddWriter(w)    
func (rs *RtmpStream) CheckAlive()
        rs.streams.Delete(key)

HLS(Http Live Streaming)传输内容包括两部分:一是M3U8描述文件,二是TS媒体文件。TS媒体文件中的视频必须是H264编码,音频必须是AAC或MP3编码。protocol/hls/hls.go文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html

type Server struct {  listener net.Listener  conns    *sync.Map}  
func (server *Server) Serve(listener net.Listener) error
      mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {    server.handle(w, r)  })
func (server *Server) handle(w http.ResponseWriter, r *http.Request) 
      path.Base(r.URL.Path) == "crossdomain.xml"       switch path.Ext(r.URL.Path)        case ".m3u8":          key, _ := server.parseM3u8(r.URL.Path)          conn := server.getConn(key)          tsCache := conn.GetCacheInc()          body, err := tsCache.GenM3U8PlayList()          w.Header().Set("Access-Control-Allow-Origin", "*")          w.Write(body)        case ".ts":          key, _ := server.parseTs(r.URL.Path)          conn := server.getConn(key)          tsCache := conn.GetCacheInc()          item, err := tsCache.GetItem(r.URL.Path)

我们前面搭建的直播系统是基于http flv 协议的:protocol/httpflv/server.go文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html

type Server struct {  handler av.Handler}
func (server *Server) Serve(l net.Listener) error
        mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {    server.handleConn(w, r)  })        mux.HandleFunc("/streams", func(w http.ResponseWriter, r *http.Request) {    server.getStream(w, r)  })
func (server *Server) handleConn(w http.ResponseWriter, r *http.Request) 
      path := strings.TrimSuffix(strings.TrimLeft(u, "/"), ".flv")      paths := strings.SplitN(path, "/", 2)      msgs := server.getStreams(w, r)        for _, item := range msgs.Publishers {      if item.Key == path {        include = true      writer := NewFLVWriter(paths[0], paths[1], url, w)      server.handler.HandleWriter(writer)
func (server *Server) getStream(w http.ResponseWriter, r *http.Request) 
      msgs := server.getStreams(w, r)      w.Write(resp)
func (server *Server) getStreams(w http.ResponseWriter, r *http.Request) *streams
      rtmpStream := server.handler.(*rtmp.RtmpStream)      msgs := new(streams)      rtmpStream.GetStreams().Range(func(key, val interface{}) bool {        msg := stream{key.(string), s.GetReader().Info().UID}        msgs.Publishers = append(msgs.Publishers, msg)      rtmpStream.GetStreams().Range(func(key, val interface{}) bool {        ws := val.(*rtmp.Stream).GetWs()        ws.Range(func(k, v interface{}) bool {          msg := stream{key.(string), pw.GetWriter().Info().UID}          msgs.Players = append(msgs.Players, msg)

protocol/hls/cache.go文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html

func (tcCacheItem *TSCacheItem) GenM3U8PlayList() ([]byte, error)      w.Write(m3u8body.Bytes())
func (tcCacheItem *TSCacheItem) GetItem(key string) (TSItem, error)
      item, ok := tcCacheItem.lm[key]
type TSCacheItem struct {
  id   string  num  int  lock sync.RWMutex  ll   *list.List  lm   map[string]TSItem}

protocol/httpflv/writer.go文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html

type FLVWriter struct {  Uid string  av.RWBaser  app, title, url string  buf             []byte  closed          bool  closedChan      chan struct{}  ctx             http.ResponseWriter  packetQueue     chan *av.Packet}

av/av.go文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html

type Handler interface {  HandleReader(ReadCloser)  HandleWriter(WriteCloser)}

直播服务的管理控制接口实现在protocol/api/api.go:文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html

type Server struct {  handler  av.Handler  session  map[string]*rtmprelay.RtmpRelay  rtmpAddr string}

它也实现了一个静态文件服务器,如果有静态文件,可以放在static里面:文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html

func (s *Server) Serve(l net.Listener) error 
   mux.Handle("/statics/", http.StripPrefix("/statics/", http.FileServer(http.Dir("statics"))))   mux.HandleFunc("/control/push"func(w http.ResponseWriter, r *http.Request) {    s.handlePush(w, r)  })    mux.HandleFunc("/control/pull", func(w http.ResponseWriter, r *http.Request) {    s.handlePull(w, r)  })     mux.HandleFunc("/control/get", func(w http.ResponseWriter, r *http.Request) {    s.handleGet(w, r)  })   mux.HandleFunc("/control/reset", func(w http.ResponseWriter, r *http.Request) {    s.handleReset(w, r)  })  mux.HandleFunc("/control/delete"func(w http.ResponseWriter, r *http.Request) {    s.handleDelete(w, r)  })  mux.HandleFunc("/stat/livestat"func(w http.ResponseWriter, r *http.Request) {    s.GetLiveStatics(w, r)  })
func (s *Server) handlePush(w http.ResponseWriter, req *http.Request) 
          //http://127.0.0.1:8090/control/push?&oper=start&app=live&name=123456&url=rtmp://192.168.16.136/live/123456           oper == "stop"            pushRtmprelay, found := s.session[keyString]            pushRtmprelay.Stop()            delete(s.session, keyString)          else            pushRtmprelay := rtmprelay.NewRtmpRelay(&localurl, &remoteurl)            err = pushRtmprelay.Start()            s.session[keyString] = pushRtmprelay      
func (s *Server) handlePull(w http.ResponseWriter, req *http.Request) 
          //http://127.0.0.1:8090/control/pull?&oper=start&app=live&name=123456&url=rtmp://192.168.16.136/live/123456          oper == "stop"            pullRtmprelay, found := s.session[keyString]            pullRtmprelay.Stop()          else            pullRtmprelay := rtmprelay.NewRtmpRelay(&localurl, &remoteurl)            err = pullRtmprelay.Start()            s.session[keyString] = pullRtmprelay    
func (s *Server) handleGet(w http.ResponseWriter, r *http.Request)           //http://127.0.0.1:8090/control/get?room=ROOM_NAME          msg, err := configure.RoomKeys.GetKey(room)    
func (s *Server) handleReset(w http.ResponseWriter, r *http.Request)           //http://127.0.0.1:8090/control/reset?room=ROOM_NAME          msg, err := configure.RoomKeys.SetKey(room)      
func (s *Server) handleDelete(w http.ResponseWriter, r *http.Request)          //http://127.0.0.1:8090/control/delete?room=ROOM_NAME          configure.RoomKeys.DeleteChannel(room)     
func (server *Server) GetLiveStatics(w http.ResponseWriter, req *http.Request)          //http://127.0.0.1:8090/stat/livestat          rtmpStream := server.handler.(*rtmp.RtmpStream)          rtmpStream.GetStreams().Range(func(key, val interface{}) bool {            v := s.GetReader().(*rtmp.VirReader)              msg := stream{key.(string), v.Info().URL, v.ReadBWInfo.StreamId, v.ReadBWInfo.VideoDatainBytes, v.ReadBWInfo.VideoSpeedInBytesperMS,              v.ReadBWInfo.AudioDatainBytes, v.ReadBWInfo.AudioSpeedInBytesperMS}          rtmpStream.GetStreams().Range(func(key, val interface{}) bool {            v := pw.GetWriter().(*rtmp.VirWriter)            msg := stream{key.(string), v.Info().URL, v.WriteBWInfo.StreamId, v.WriteBWInfo.VideoDatainBytes, v.WriteBWInfo.VideoSpeedInBytesperMS,                v.WriteBWInfo.AudioDatainBytes, v.WriteBWInfo.AudioSpeedInBytesperMS}          roomInfo, exists := (rtmpStream.GetStreams()).Load(room)      http.Serve(l, JWTMiddleware(mux))              ErrorHandler: func(w http.ResponseWriter, r *http.Request, err string) {        res := &Response{          w:      w,          Status: 403,          Data:   err,        }        res.SendJson()      },

protocol/rtmp/rtmprelay/rtmprelay.go文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html

type RtmpRelay struct {  PlayUrl              string  PublishUrl           string  cs_chan              chan core.ChunkStream  sndctrl_chan         chan string  connectPlayClient    *core.ConnClient  connectPublishClient *core.ConnClient  startflag            bool}  
func (self *RtmpRelay) Start() error 
      self.connectPlayClient = core.NewConnClient()      self.connectPublishClient = core.NewConnClient()      err := self.connectPlayClient.Start(self.PlayUrl, av.PLAY)      err = self.connectPublishClient.Start(self.PublishUrl, av.PUBLISH)      go self.rcvPlayChunkStream()      go self.sendPublishChunkStream()
func (self *RtmpRelay) rcvPlayChunkStream() {      err := self.connectPlayClient.Read(&rc)            r := bytes.NewReader(rc.Data)      vs, err := self.connectPlayClient.DecodeBatch(r, amf.AMF0)      self.cs_chan <- rc
func (self *RtmpRelay) sendPublishChunkStream() 
      self.connectPublishClient.Write(rc)      self.connectPublishClient.Close(nil)

configure/channel.go文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html

var RoomKeys = &RoomKeysType{  localCache: cache.New(cache.NoExpiration, 0),}
type RoomKeysType struct {
  redisCli   *redis.Client  localCache *cache.Cache}
func (r *RoomKeysType) SetKey(channel string) (key string, err error) 
      key = uid.RandStringRunes(48)      if _, err = r.redisCli.Get(key).Result(); err == redis.Nil {      err = r.redisCli.Set(channel, key, 0).Err()      err = r.redisCli.Set(key, channel, 0).Err()
func (r *RoomKeysType) DeleteChannel(channel string) bool 
      r.redisCli.Del(channel).Err()       key, ok := r.localCache.Get(channel)      r.localCache.Delete(channel)      r.localCache.Delete(key.(string))
文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/36204.html
  • 本站内容整理自互联网,仅提供信息存储空间服务,以方便学习之用。如对文章、图片、字体等版权有疑问,请在下方留言,管理员看到后,将第一时间进行处理。
  • 转载请务必保留本文链接:https://www.cainiaoxueyuan.com/bc/36204.html

Comment

匿名网友 填写信息

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定