golang源码阅读:livego直播系统
分析源码之前,先搭建一个直播系统:
直播服务器
https://github.com/gwuhaolin/livego
播放站点
https://github.com/Bilibili/flv.js/
推流
https://github.com/obsproject/obs-studio
首先启动直播服务器
./livego --flv_dir=./data --level=debug
1,在启动livego服务后默认会监听以下端口:
8090端口:用于控制台,通过HTTP请求可查看与控制直播房间的推拉流
1935端口:用于RTMP推拉流,目前貌似只能通过RTMP方式推流
7001端口:用于FLV拉流
7002端口:用于HLS拉流
2,创建直播房间:
请求:http://你的服务器地址:8090/control/get?room=房间名字(房间名字你自己自定义)
成功响应:{"status":200,"data":"一段与房间名对应的48位随机字符串密钥"}
http://127.0.0.1:8090/control/get?room=xiazemin
{status: 200,data: "rfBd56ti2SMtYvSgD5xAV0YU99zampta7Z7S575KLkIZ9PYk"}
3,启动推流服务器
配置文件livego.yaml里可以看到默认的app名字,串流密码输入room名字,这里设置成xiazemin
server:
  - appname: live

4,启动网页服务器,拉取视频内容
<html lang="en">
<head>
  <meta charset="UTF-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>课程直播</title>
</head>
<body>
  <!-- flv.min.js must be served from the same directory as this page. -->
  <script src="flv.min.js"></script>
  <!-- The player starts muted so that browsers allow autoplay. -->
  <video id="videoElement" autoplay controls loop preload muted></video>
  <script>
    if (flvjs.isSupported()) {
      var videoElement = document.getElementById('videoElement');
      // Pull the live stream over HTTP-FLV from livego's FLV port (7001).
      var flvPlayer = flvjs.createPlayer({
        type: 'flv',
        isLive: true,
        url: 'http://127.0.0.1:7001/live/rfBd56ti2SMtYvSgD5xAV0YU99zampta7Z7S575KLkIZ9PYk.flv'
      });
      flvPlayer.attachMediaElement(videoElement);
      flvPlayer.load();
      flvPlayer.play();
      // Unmute on the first user interaction. The element reference is used
      // directly because jQuery is not loaded on this page, so `$` would
      // throw a ReferenceError.
      document.body.addEventListener('mousedown', function () {
        videoElement.muted = false;
      }, false);
    }
  </script>
</body>
</html>
下载我们依赖的flv播放库
https://cdn.bootcdn.net/ajax/libs/flv.js/1.6.1/flv.min.js
启动一个静态服务器,返回上述网页
package main

import (
	"fmt"
	"net/http"
)

// handler serves files from the current working directory and attaches
// permissive CORS headers so that a page on another origin can fetch them.
//
// Reference: http://newbt.net/ms/vdisk/show_bbs.php?id=56A46991BF52F1A048DB5F0350D74D01&page=0
func handler(w http.ResponseWriter, r *http.Request) {
	h := w.Header()
	h.Set("Access-Control-Allow-Origin", "*")
	h.Set("Access-Control-Allow-Methods", "GET, DELETE, HEAD, OPTIONS,POST,PUT,PATCH")
	h.Set("Access-Control-Allow-Headers", "x-requested-with,content-type")
	// Access-Control-Allow-Credentials is deliberately not sent: browsers
	// reject credentialed requests when Access-Control-Allow-Origin is the
	// "*" wildcard, so pairing the two headers never works.
	// Reference: https://stackoverflow.com/questions/69435888/how-to-ensure-cors-response-header-values-are-valid-when-querying-mongodb-in-n
	http.FileServer(http.Dir("./")).ServeHTTP(w, r)
}

// main registers handler for every path and listens on port 8089; a listen
// error is printed to standard output.
func main() {
	http.HandleFunc("/", handler)
	if err := http.ListenAndServe(":8089", nil); err != nil {
		fmt.Println(err)
	}
}
至此直播服务器搭建完毕。
livego还提供了其他推拉流管理的接口和统计信息相关的接口
http://127.0.0.1:8090/control/pull?&oper=start&app=live&name=xiazemin&url=rtmp://127.0.0.1:1935/live/xiazemin
{status: 400,data: "<h1>push url start rtmp://127.0.0.1:1935/live/xiazemin ok</h1></br>"}
{status: 200,data: {publishers: [{key: "live/mPjVIUp97tQjNHJcaOAGeDZRvcMdGIASmHsVKxASAgqjn9FS",url: "rtmp://localhost:1935/live/mPjVIUp97tQjNHJcaOAGeDZRvcMdGIASmHsVKxASAgqjn9FS",stream_id: 1,video_total_bytes: 1940374,video_speed: 1465,audio_total_bytes: 206931,audio_speed: 161}],players: null}}
{status: 200,data: "<h1>push url start rtmp://127.0.0.1:1935/live/xiazemin ok</h1></br>"}
http://127.0.0.1:8090/control/pull?&oper=stop&app=live&name=xiazemin&url=rtmp://127.0.0.1:1935/live/xiazemin
{status: 400,data: "<h1>push url stop rtmp://127.0.0.1:1935/live/xiazemin ok</h1></br>"}
下面我们开始分析源码,整体包括三个部分:推流、拉流、推拉流管理。我们从livego的入口main.go的main函数开始:它先创建了RTMP stream,然后分别起了拉流服务hls和httpflv,接着起了控制台服务器,最后起了推流服务器:
rtmpServer = rtmp.NewRtmpServer(stream, nil)rtmpServer = rtmp.NewRtmpServer(stream, hlsServer)rtmpServer.Serve(rtmpListen)
    func startHls() *hls.Server       hlsServer := hls.NewServer()      go func() {        hlsServer.Serve(hlsListen)func startHTTPFlv(stream *rtmp.RtmpStream)hdlServer := httpflv.NewServer(stream)go func() {hdlServer.Serve(flvListen)
      opServer := api.NewServer(stream, rtmpAddr)      go func() {        opServer.Serve(opListen)
stream := rtmp.NewRtmpStream()hlsServer = startHls()startHTTPFlv(stream)startAPI(stream)hlsServer)
推流是基于rtmp协议的,底层是基于tcp协议的,在循环中监听连接,每个连接起一个协程:
protocol/rtmp/rtmp.go
func (s *Server) Serve(listener net.Listener) (err error)netconn, err = listener.Accept()conn := core.NewConn(netconn, 4*1024)go s.handleConn(conn)
func (s *Server) handleConn(conn *core.Conn) errorconnServer := core.NewConnServer(conn)appname, name, _ := connServer.GetInfo()key, err := configure.RoomKeys.GetKey(name)channel, err := configure.RoomKeys.GetChannel(name)pushlist, ret := configure.GetStaticPushUrlList(appname);s.handler.HandleWriter(flvWriter.GetWriter(reader.Info()))
本质上做的工作就是把数据从输入channel copy到输出channel,数据存储在sync.Map里面:
protocol/rtmp/stream.go
// RtmpStream holds the set of streams currently known to the server.
type RtmpStream struct {
	// streams is keyed by the stream's info.Key (see the Load/Store calls in
	// HandleReader and HandleWriter).
	streams *sync.Map
}
func (rs *RtmpStream) HandleReader(r av.ReadCloser)i, ok := rs.streams.Load(info.Key)ns := NewStream()rs.streams.Store(info.Key, ns)stream.AddReader(r)
func (rs *RtmpStream) HandleWriter(w av.WriteCloser)item, ok := rs.streams.Load(info.Key)rs.streams.Store(info.Key, s)s.AddWriter(w)
func (rs *RtmpStream) CheckAlive()rs.streams.Delete(key)
HLS(HTTP Live Streaming)传输内容包括两部分:一是M3U8描述文件,二是TS媒体文件。TS媒体文件中的视频必须是H264编码,音频必须是AAC或MP3编码。
protocol/hls/hls.go
type Server struct {listener net.Listenerconns *sync.Map
func (server *Server) Serve(listener net.Listener) errormux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {server.handle(w, r)})
func (server *Server) handle(w http.ResponseWriter, r *http.Request)path.Base(r.URL.Path) == "crossdomain.xml"switch path.Ext(r.URL.Path)case ".m3u8":key, _ := server.parseM3u8(r.URL.Path)conn := server.getConn(key)tsCache := conn.GetCacheInc()body, err := tsCache.GenM3U8PlayList()w.Header().Set("Access-Control-Allow-Origin", "*")w.Write(body)case ".ts":key, _ := server.parseTs(r.URL.Path)conn := server.getConn(key)tsCache := conn.GetCacheInc()item, err := tsCache.GetItem(r.URL.Path)
我们前面搭建的直播系统是基于HTTP-FLV协议的:
protocol/httpflv/server.go
// Server is the HTTP-FLV server. Its handler receives each FLV writer that
// handleConn creates (via handler.HandleWriter).
type Server struct {
	handler av.Handler
}
func (server *Server) Serve(l net.Listener) errormux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {server.handleConn(w, r)})mux.HandleFunc("/streams", func(w http.ResponseWriter, r *http.Request) {server.getStream(w, r)})
func (server *Server) handleConn(w http.ResponseWriter, r *http.Request)path := strings.TrimSuffix(strings.TrimLeft(u, "/"), ".flv")paths := strings.SplitN(path, "/", 2)msgs := server.getStreams(w, r)for _, item := range msgs.Publishers {if item.Key == path {include = truewriter := NewFLVWriter(paths[0], paths[1], url, w)server.handler.HandleWriter(writer)
func (server *Server) getStream(w http.ResponseWriter, r *http.Request)msgs := server.getStreams(w, r)w.Write(resp)
func (server *Server) getStreams(w http.ResponseWriter, r *http.Request) *streamsrtmpStream := server.handler.(*rtmp.RtmpStream)msgs := new(streams)rtmpStream.GetStreams().Range(func(key, val interface{}) bool {msg := stream{key.(string), s.GetReader().Info().UID}msgs.Publishers = append(msgs.Publishers, msg)rtmpStream.GetStreams().Range(func(key, val interface{}) bool {ws := val.(*rtmp.Stream).GetWs()ws.Range(func(k, v interface{}) bool {msg := stream{key.(string), pw.GetWriter().Info().UID}msgs.Players = append(msgs.Players, msg)
protocol/hls/cache.go
func (tcCacheItem *TSCacheItem) GenM3U8PlayList() ([]byte, error)w.Write(m3u8body.Bytes())
func (tcCacheItem *TSCacheItem) GetItem(key string) (TSItem, error)item, ok := tcCacheItem.lm[key]
type TSCacheItem struct {id stringnum intlock sync.RWMutexll *list.Listlm map[string]TSItem}
protocol/httpflv/writer.go
type FLVWriter struct {Uid stringav.RWBaserapp, title, url stringbuf []byteclosed boolclosedChan chan struct{}ctx http.ResponseWriterpacketQueue chan *av.Packet}
av/av.go
type Handler interface {HandleReader(ReadCloser)HandleWriter(WriteCloser)}
直播服务的管理控制接口实现在protocol/api/api.go:
type Server struct {handler av.Handlersession map[string]*rtmprelay.RtmpRelayrtmpAddr string}
它也实现了一个静态文件服务器,如果有静态文件,可以放在statics目录里面,通过/statics/路径访问:
func (s *Server) Serve(l net.Listener) errormux.Handle("/statics/", http.StripPrefix("/statics/", http.FileServer(http.Dir("statics"))))mux.HandleFunc("/control/push", func(w http.ResponseWriter, r *http.Request) {s.handlePush(w, r)})mux.HandleFunc("/control/pull", func(w http.ResponseWriter, r *http.Request) {s.handlePull(w, r)})mux.HandleFunc("/control/get", func(w http.ResponseWriter, r *http.Request) {s.handleGet(w, r)})mux.HandleFunc("/control/reset", func(w http.ResponseWriter, r *http.Request) {s.handleReset(w, r)})mux.HandleFunc("/control/delete", func(w http.ResponseWriter, r *http.Request) {s.handleDelete(w, r)})mux.HandleFunc("/stat/livestat", func(w http.ResponseWriter, r *http.Request) {s.GetLiveStatics(w, r)})
func (s *Server) handlePush(w http.ResponseWriter, req *http.Request)//http://127.0.0.1:8090/control/push?&oper=start&app=live&name=123456&url=rtmp://192.168.16.136/live/123456oper == "stop"pushRtmprelay, found := s.session[keyString]pushRtmprelay.Stop()delete(s.session, keyString)elsepushRtmprelay := rtmprelay.NewRtmpRelay(&localurl, &remoteurl)err = pushRtmprelay.Start()s.session[keyString] = pushRtmprelay
func (s *Server) handlePull(w http.ResponseWriter, req *http.Request)//http://127.0.0.1:8090/control/pull?&oper=start&app=live&name=123456&url=rtmp://192.168.16.136/live/123456oper == "stop"pullRtmprelay, found := s.session[keyString]pullRtmprelay.Stop()elsepullRtmprelay := rtmprelay.NewRtmpRelay(&localurl, &remoteurl)err = pullRtmprelay.Start()s.session[keyString] = pullRtmprelay
func (s *Server) handleGet(w http.ResponseWriter, r *http.Request)//http://127.0.0.1:8090/control/get?room=ROOM_NAMEmsg, err := configure.RoomKeys.GetKey(room)
func (s *Server) handleReset(w http.ResponseWriter, r *http.Request)//http://127.0.0.1:8090/control/reset?room=ROOM_NAMEmsg, err := configure.RoomKeys.SetKey(room)
func (s *Server) handleDelete(w http.ResponseWriter, r *http.Request)//http://127.0.0.1:8090/control/delete?room=ROOM_NAMEconfigure.RoomKeys.DeleteChannel(room)
func (server *Server) GetLiveStatics(w http.ResponseWriter, req *http.Request)//http://127.0.0.1:8090/stat/livestatrtmpStream := server.handler.(*rtmp.RtmpStream)rtmpStream.GetStreams().Range(func(key, val interface{}) bool {v := s.GetReader().(*rtmp.VirReader)msg := stream{key.(string), v.Info().URL, v.ReadBWInfo.StreamId, v.ReadBWInfo.VideoDatainBytes, v.ReadBWInfo.VideoSpeedInBytesperMS,v.ReadBWInfo.AudioDatainBytes, v.ReadBWInfo.AudioSpeedInBytesperMS}rtmpStream.GetStreams().Range(func(key, val interface{}) bool {v := pw.GetWriter().(*rtmp.VirWriter)msg := stream{key.(string), v.Info().URL, v.WriteBWInfo.StreamId, v.WriteBWInfo.VideoDatainBytes, v.WriteBWInfo.VideoSpeedInBytesperMS,v.WriteBWInfo.AudioDatainBytes, v.WriteBWInfo.AudioSpeedInBytesperMS}roomInfo, exists := (rtmpStream.GetStreams()).Load(room)http.Serve(l, JWTMiddleware(mux))ErrorHandler: func(w http.ResponseWriter, r *http.Request, err string) {res := &Response{w: w,Status: 403,Data: err,}res.SendJson()},
protocol/rtmp/rtmprelay/rtmprelay.go
type RtmpRelay struct {PlayUrl stringPublishUrl stringcs_chan chan core.ChunkStreamsndctrl_chan chan stringconnectPlayClient *core.ConnClientconnectPublishClient *core.ConnClientstartflag bool
func (self *RtmpRelay) Start() errorself.connectPlayClient = core.NewConnClient()self.connectPublishClient = core.NewConnClient()err := self.connectPlayClient.Start(self.PlayUrl, av.PLAY)err = self.connectPublishClient.Start(self.PublishUrl, av.PUBLISH)go self.rcvPlayChunkStream()go self.sendPublishChunkStream()
func (self *RtmpRelay) rcvPlayChunkStream() {err := self.connectPlayClient.Read(&rc)r := bytes.NewReader(rc.Data)vs, err := self.connectPlayClient.DecodeBatch(r, amf.AMF0)self.cs_chan <- rc
func (self *RtmpRelay) sendPublishChunkStream()self.connectPublishClient.Write(rc)self.connectPublishClient.Close(nil)
configure/channel.go
// RoomKeys is the package-level RoomKeysType instance. Only its localCache
// is initialised here, via cache.New(cache.NoExpiration, 0).
var RoomKeys = &RoomKeysType{
	localCache: cache.New(cache.NoExpiration, 0),
}
type RoomKeysType struct {redisCli *redis.ClientlocalCache *cache.Cache}
func (r *RoomKeysType) SetKey(channel string) (key string, err error)key = uid.RandStringRunes(48)if _, err = r.redisCli.Get(key).Result(); err == redis.Nil {err = r.redisCli.Set(channel, key, 0).Err()err = r.redisCli.Set(key, channel, 0).Err()
func (r *RoomKeysType) DeleteChannel(channel string) boolr.redisCli.Del(channel).Err()key, ok := r.localCache.Get(channel)r.localCache.Delete(channel)r.localCache.Delete(key.(string))




