本文最后更新于 2022年02月17日 已经是 586天前了 ,文章可能具有时效性,若有错误或已失效,请在下方留言。
FRP源码阅读笔记
当前记录的FRP
版本
2a68c115 fatedier <[email protected]> on 2021/7/8 at 10:42
committed by GitHub <[email protected]>
这条Commit记录
流量图
sequenceDiagram
autonumber
participant 用户
participant FRPS
participant FRPC
participant 服务端
FRPC->>FRPS:FRPC向FRPS发起注册
用户->>FRPS: 用户客户端请求
FRPS->>FRPC:FRPS告知FRPC有用户连接
FRPC->>服务端:收到FRPS的请求,向真正的服务端发起请求
FRPC->>FRPS:收到FRPS的请求,建立新连接
FRPS->>FRPC:转发FRPC与用户之间的流量
FRPS->>用户:转发FRPC与用户之间的流量
FRPC
func (svr *Service) Run() error {
xl := xlog.FromContextSafe(svr.ctx)
// login to frps
for {
conn, session, err := svr.login()
if err != nil {
xl.Warn("login to server failed: %v", err)
// if login_fail_exit is true, just exit this program
// otherwise sleep a while and try again to connect to server
if svr.cfg.LoginFailExit {
return err
}
time.Sleep(10 * time.Second)
} else {
// login success
ctl := NewControl(svr.ctx, svr.runID, conn, session, svr.cfg, svr.pxyCfgs, svr.visitorCfgs, svr.serverUDPPort, svr.authSetter)
ctl.Run()
svr.ctlMu.Lock()
svr.ctl = ctl
svr.ctlMu.Unlock()
break
}
}
go svr.keepControllerWorking()
if svr.cfg.AdminPort != 0 {
// Init admin server assets
err := assets.Load(svr.cfg.AssetsDir)
if err != nil {
return fmt.Errorf("Load assets error: %v", err)
}
address := net.JoinHostPort(svr.cfg.AdminAddr, strconv.Itoa(svr.cfg.AdminPort))
err = svr.RunAdminServer(address)
if err != nil {
log.Warn("run admin server error: %v", err)
}
log.Info("admin server listen on %s:%d", svr.cfg.AdminAddr, svr.cfg.AdminPort)
}
<-svr.ctx.Done()
return nil
}
func (svr *Service) login() (conn net.Conn, session *fmux.Session, err error) {
xl := xlog.FromContextSafe(svr.ctx)
var tlsConfig *tls.Config
if svr.cfg.TLSEnable {
sn := svr.cfg.TLSServerName
if sn == "" {
sn = svr.cfg.ServerAddr
}
tlsConfig, err = transport.NewClientTLSConfig(
svr.cfg.TLSCertFile,
svr.cfg.TLSKeyFile,
svr.cfg.TLSTrustedCaFile,
sn)
if err != nil {
xl.Warn("fail to build tls configuration when service login, err: %v", err)
return
}
}
address := net.JoinHostPort(svr.cfg.ServerAddr, strconv.Itoa(svr.cfg.ServerPort))
conn, err = frpNet.ConnectServerByProxyWithTLS(svr.cfg.HTTPProxy, svr.cfg.Protocol, address, tlsConfig)
if err != nil {
return
}
defer func() {
if err != nil {
conn.Close()
if session != nil {
session.Close()
}
}
}()
if svr.cfg.TCPMux {
fmuxCfg := fmux.DefaultConfig()
fmuxCfg.KeepAliveInterval = 20 * time.Second
fmuxCfg.LogOutput = ioutil.Discard
session, err = fmux.Client(conn, fmuxCfg)
if err != nil {
return
}
stream, errRet := session.OpenStream()
if errRet != nil {
session.Close()
err = errRet
return
}
conn = stream
}
loginMsg := &msg.Login{
Arch: runtime.GOARCH,
Os: runtime.GOOS,
PoolCount: svr.cfg.PoolCount,
User: svr.cfg.User,
Version: version.Full(),
Timestamp: time.Now().Unix(),
RunID: svr.runID,
Metas: svr.cfg.Metas,
}
// Add auth
if err = svr.authSetter.SetLogin(loginMsg); err != nil {
return
}
if err = msg.WriteMsg(conn, loginMsg); err != nil {
return
}
var loginRespMsg msg.LoginResp
conn.SetReadDeadline(time.Now().Add(10 * time.Second))
if err = msg.ReadMsgInto(conn, &loginRespMsg); err != nil {
return
}
conn.SetReadDeadline(time.Time{})
if loginRespMsg.Error != "" {
err = fmt.Errorf("%s", loginRespMsg.Error)
xl.Error("%s", loginRespMsg.Error)
return
}
svr.runID = loginRespMsg.RunID
xl.ResetPrefixes()
xl.AppendPrefix(svr.runID)
svr.serverUDPPort = loginRespMsg.ServerUDPPort
xl.Info("login to server success, get run id [%s], server udp port [%d]", loginRespMsg.RunID, loginRespMsg.ServerUDPPort)
return
}
FRPC
向FRPS
请求登录,保持住这个长连接,如果登录异常会等待10s
后重试
func (ctl *Control) Run() {
go ctl.worker()
// start all proxies
ctl.pm.Reload(ctl.pxyCfgs)
// start all visitors
go ctl.vm.Run()
return
}
go ctl.worker()
func (ctl *Control) worker() {
go ctl.msgHandler() // msgHandler 处理所有管道的事件并做成对应的操作.
go ctl.reader() // 将从 FRPS 得到的信息都转发给 readCh
go ctl.writer() // 将 sendCh 得到的信息发送给 FRPS
select {
case <-ctl.closedCh:
// close related channels and wait until other goroutines done
close(ctl.readCh)
ctl.readerShutdown.WaitDone()
ctl.msgHandlerShutdown.WaitDone()
close(ctl.sendCh)
ctl.writerShutdown.WaitDone()
ctl.pm.Close()
ctl.vm.Close()
close(ctl.closedDoneCh)
if ctl.session != nil {
ctl.session.Close()
}
return
}
}
ctl.pm.Reload(ctl.pxyCfgs)
启动所有的代理
func (pm *Manager) Reload(pxyCfgs map[string]config.ProxyConf) {
xl := xlog.FromContextSafe(pm.ctx)
pm.mu.Lock()
defer pm.mu.Unlock()
delPxyNames := make([]string, 0)
for name, pxy := range pm.proxies {
del := false
cfg, ok := pxyCfgs[name]
if !ok {
del = true
} else {
if !pxy.Cfg.Compare(cfg) {
del = true
}
}
if del {
delPxyNames = append(delPxyNames, name)
delete(pm.proxies, name)
pxy.Stop()
}
}
if len(delPxyNames) > 0 {
xl.Info("proxy removed: %v", delPxyNames)
}
addPxyNames := make([]string, 0)
for name, cfg := range pxyCfgs {
if _, ok := pm.proxies[name]; !ok {
pxy := NewWrapper(pm.ctx, cfg, pm.clientCfg, pm.HandleEvent, pm.serverUDPPort)
pm.proxies[name] = pxy
addPxyNames = append(addPxyNames, name)
pxy.Start()
}
}
if len(addPxyNames) > 0 {
xl.Info("proxy added: %v", addPxyNames)
}
}
go ctl.vm.Run()
访问者用于将流量从本地端口转发到远程服务。
func (vm *VisitorManager) Run() {
xl := xlog.FromContextSafe(vm.ctx)
ticker := time.NewTicker(vm.checkInterval)
defer ticker.Stop()
for {
select {
case <-vm.stopCh:
xl.Info("gracefully shutdown visitor manager")
return
case <-ticker.C:
vm.mu.Lock()
for _, cfg := range vm.cfgs {
name := cfg.GetBaseInfo().ProxyName
if _, exist := vm.visitors[name]; !exist {
xl.Info("try to start visitor [%s]", name)
vm.startVisitor(cfg)
}
}
vm.mu.Unlock()
}
}
}
go svr.keepControllerWorking()
函数用途如函数名称所示
func (svr *Service) keepControllerWorking() {
xl := xlog.FromContextSafe(svr.ctx)
maxDelayTime := 20 * time.Second
delayTime := time.Second
// if frpc reconnect frps, we need to limit retry times in 1min
// current retry logic is sleep 0s, 0s, 0s, 1s, 2s, 4s, 8s, ...
// when exceed 1min, we will reset delay and counts
// 前3个重连没有时延,3次以上后面每次重试 时延将*2
cutoffTime := time.Now().Add(time.Minute)
reconnectDelay := time.Second
reconnectCounts := 1
for {
<-svr.ctl.ClosedDoneCh()
if atomic.LoadUint32(&svr.exit) != 0 {
return
}
// the first three retry with no delay
if reconnectCounts > 3 {
time.Sleep(reconnectDelay)
reconnectDelay *= 2
}
reconnectCounts++
now := time.Now()
if now.After(cutoffTime) {
// reset
cutoffTime = now.Add(time.Minute)
reconnectDelay = time.Second
reconnectCounts = 1
}
for {
xl.Info("try to reconnect to server...")
conn, session, err := svr.login()
if err != nil {
xl.Warn("reconnect to server error: %v", err)
time.Sleep(delayTime)
opErr := &net.OpError{}
// quick retry for dial error
if errors.As(err, &opErr) && opErr.Op == "dial" {
delayTime = 2 * time.Second
} else {
delayTime = delayTime * 2
if delayTime > maxDelayTime {
delayTime = maxDelayTime
}
}
continue
}
// reconnect success, init delayTime
delayTime = time.Second
ctl := NewControl(svr.ctx, svr.runID, conn, session, svr.cfg, svr.pxyCfgs, svr.visitorCfgs, svr.serverUDPPort, svr.authSetter)
ctl.Run()
svr.ctlMu.Lock()
if svr.ctl != nil {
svr.ctl.Close()
}
svr.ctl = ctl
svr.ctlMu.Unlock()
break
}
}
}
到这里整个FRPC
的启动过程就追踪完了
FRPS
我们直接看Service Run
的函数
func (svr *Service) Run() {
if svr.rc.NatHoleController != nil {
go svr.rc.NatHoleController.Run()
}
if svr.cfg.KCPBindPort > 0 {
go svr.HandleListener(svr.kcpListener) // 接受kcp连接
}
go svr.HandleListener(svr.websocketListener) // 接受来自FRPC的websocket连接
go svr.HandleListener(svr.tlsListener) // 接受来自FRPC的tls连接
svr.HandleListener(svr.listener) // 接受来自客户端的连接
}
未完待续