FRP源码阅读笔记

FRP源码阅读笔记

当前记录的FRP版本

2a68c115 fatedier <[email protected]> on 2021/7/8 at 10:42
committed by GitHub <[email protected]>

这条Commit记录

FRPC

// Run logs in to frps, starts the control connection and then blocks until
// svr.ctx is done.
//
// Login is retried every 10 seconds until it succeeds, unless
// cfg.LoginFailExit is set, in which case the login error is returned
// immediately. When cfg.AdminPort is non-zero the admin assets are loaded and
// the admin server is started: an asset loading failure is returned to the
// caller, while a failure to start the admin server is only logged.
func (svr *Service) Run() error {
    xl := xlog.FromContextSafe(svr.ctx)

    // login to frps
    for {
        conn, session, err := svr.login()
        if err != nil {
            xl.Warn("login to server failed: %v", err)

            // if login_fail_exit is true, just exit this program
            // otherwise sleep a while and try again to connect to server
            if svr.cfg.LoginFailExit {
                return err
            }
            time.Sleep(10 * time.Second)
            continue
        }

        // login success: start the control and publish it under ctlMu
        ctl := NewControl(svr.ctx, svr.runID, conn, session, svr.cfg, svr.pxyCfgs, svr.visitorCfgs, svr.serverUDPPort, svr.authSetter)
        ctl.Run()
        svr.ctlMu.Lock()
        svr.ctl = ctl
        svr.ctlMu.Unlock()
        break
    }

    // re-login and rebuild the control whenever the current one is closed
    go svr.keepControllerWorking()

    if svr.cfg.AdminPort != 0 {
        // Init admin server assets
        if err := assets.Load(svr.cfg.AssetsDir); err != nil {
            return fmt.Errorf("Load assets error: %w", err)
        }

        address := net.JoinHostPort(svr.cfg.AdminAddr, strconv.Itoa(svr.cfg.AdminPort))
        if err := svr.RunAdminServer(address); err != nil {
            log.Warn("run admin server error: %v", err)
        } else {
            // only report the listen address when the admin server actually started
            log.Info("admin server listen on %s:%d", svr.cfg.AdminAddr, svr.cfg.AdminPort)
        }
    }
    <-svr.ctx.Done()
    return nil
}
// login dials frps, optionally wraps the connection in a multiplexed session,
// and performs the login handshake.
//
// On success it returns the connection to use for control messages (the first
// mux stream when cfg.TCPMux is set, otherwise the dialed connection) and the
// mux session (nil when TCPMux is disabled). It also stores the run ID and the
// server UDP port from the login response on svr.
//
// The results are named so that the deferred cleanup below can inspect err:
// whenever login returns a non-nil error after the dial succeeded, conn and
// session (if any) are closed before returning.
func (svr *Service) login() (conn net.Conn, session *fmux.Session, err error) {
    xl := xlog.FromContextSafe(svr.ctx)
    var tlsConfig *tls.Config
    if svr.cfg.TLSEnable {
        // fall back to the server address when no explicit TLS server name is configured
        sn := svr.cfg.TLSServerName
        if sn == "" {
            sn = svr.cfg.ServerAddr
        }

        tlsConfig, err = transport.NewClientTLSConfig(
            svr.cfg.TLSCertFile,
            svr.cfg.TLSKeyFile,
            svr.cfg.TLSTrustedCaFile,
            sn)
        if err != nil {
            xl.Warn("fail to build tls configuration when service login, err: %v", err)
            return
        }
    }

    // tlsConfig stays nil when TLS is disabled
    address := net.JoinHostPort(svr.cfg.ServerAddr, strconv.Itoa(svr.cfg.ServerPort))
    conn, err = frpNet.ConnectServerByProxyWithTLS(svr.cfg.HTTPProxy, svr.cfg.Protocol, address, tlsConfig)
    if err != nil {
        return
    }

    // From here on, any error return releases the connection and the session.
    // The closure reads the named results at return time, so it closes
    // whatever conn/session currently refer to.
    defer func() {
        if err != nil {
            conn.Close()
            if session != nil {
                session.Close()
            }
        }
    }()

    if svr.cfg.TCPMux {
        // multiplex the connection; mux log output is discarded
        fmuxCfg := fmux.DefaultConfig()
        fmuxCfg.KeepAliveInterval = 20 * time.Second
        fmuxCfg.LogOutput = ioutil.Discard
        session, err = fmux.Client(conn, fmuxCfg)
        if err != nil {
            return
        }
        stream, errRet := session.OpenStream()
        if errRet != nil {
            session.Close()
            err = errRet
            return
        }
        // the login handshake (and the returned conn) use the mux stream from now on
        conn = stream
    }

    loginMsg := &msg.Login{
        Arch:      runtime.GOARCH,
        Os:        runtime.GOOS,
        PoolCount: svr.cfg.PoolCount,
        User:      svr.cfg.User,
        Version:   version.Full(),
        Timestamp: time.Now().Unix(),
        RunID:     svr.runID,
        Metas:     svr.cfg.Metas,
    }

    // let the configured auth setter fill in the login message
    if err = svr.authSetter.SetLogin(loginMsg); err != nil {
        return
    }

    if err = msg.WriteMsg(conn, loginMsg); err != nil {
        return
    }

    // wait at most 10 seconds for the login response, then clear the deadline
    var loginRespMsg msg.LoginResp
    conn.SetReadDeadline(time.Now().Add(10 * time.Second))
    if err = msg.ReadMsgInto(conn, &loginRespMsg); err != nil {
        return
    }
    conn.SetReadDeadline(time.Time{})

    // a non-empty Error field in the response is reported as a login error
    if loginRespMsg.Error != "" {
        err = fmt.Errorf("%s", loginRespMsg.Error)
        xl.Error("%s", loginRespMsg.Error)
        return
    }

    // remember the run ID from the response and use it as the only log prefix
    svr.runID = loginRespMsg.RunID
    xl.ResetPrefixes()
    xl.AppendPrefix(svr.runID)

    svr.serverUDPPort = loginRespMsg.ServerUDPPort
    xl.Info("login to server success, get run id [%s], server udp port [%d]", loginRespMsg.RunID, loginRespMsg.ServerUDPPort)
    return
}

FRPC 向 FRPS 请求登录,并保持住这个长连接;如果登录失败,会等待 10s 后重试(若配置了 login_fail_exit,则直接返回错误并退出)。

// Run starts the control: the worker goroutine is launched first, then all
// configured proxies are loaded synchronously via the proxy manager, and
// finally the visitor manager loop is launched in its own goroutine.
func (ctl *Control) Run() {
    // background worker for this control
    go ctl.worker()

    // bring up every configured proxy
    ctl.pm.Reload(ctl.pxyCfgs)

    // visitors are managed by their own loop
    go ctl.vm.Run()
}

go ctl.worker()

// worker launches the message handler, reader and writer goroutines, then
// blocks until a receive on closedCh completes and tears the control down in
// a fixed order.
func (ctl *Control) worker() {
    go ctl.msgHandler() // msgHandler handles the events from all channels and acts on them
    go ctl.reader()     // forwards everything received from frps to readCh
    go ctl.writer()     // sends everything taken from sendCh to frps

    // block until the control is asked to close
    <-ctl.closedCh

    // close related channels and wait until other goroutines done:
    // inbound side first ...
    close(ctl.readCh)
    ctl.readerShutdown.WaitDone()
    ctl.msgHandlerShutdown.WaitDone()

    // ... then the outbound side
    close(ctl.sendCh)
    ctl.writerShutdown.WaitDone()

    // stop proxies and visitors
    ctl.pm.Close()
    ctl.vm.Close()

    // signal that shutdown is complete, then release the session if there is one
    close(ctl.closedDoneCh)
    if sess := ctl.session; sess != nil {
        sess.Close()
    }
}

ctl.pm.Reload(ctl.pxyCfgs) 启动所有的代理

// Reload reconciles the running proxies with pxyCfgs while holding pm.mu.
//
// A running proxy is stopped and removed when its name is missing from
// pxyCfgs or when its Cfg.Compare against the new configuration reports
// false. Afterwards, every name in pxyCfgs that has no running proxy gets a
// new wrapper, which is registered and started. Removed and added names are
// logged when there are any.
func (pm *Manager) Reload(pxyCfgs map[string]config.ProxyConf) {
    xl := xlog.FromContextSafe(pm.ctx)
    pm.mu.Lock()
    defer pm.mu.Unlock()

    // pass 1: drop proxies that disappeared or whose configuration changed
    var removed []string
    for name, running := range pm.proxies {
        if wanted, found := pxyCfgs[name]; found && running.Cfg.Compare(wanted) {
            // still configured and unchanged: keep it running
            continue
        }

        removed = append(removed, name)
        delete(pm.proxies, name)

        running.Stop()
    }
    if len(removed) != 0 {
        xl.Info("proxy removed: %v", removed)
    }

    // pass 2: start everything that is configured but not running
    // (this includes the changed proxies removed above)
    var added []string
    for name, wanted := range pxyCfgs {
        if _, found := pm.proxies[name]; found {
            continue
        }

        wrapper := NewWrapper(pm.ctx, wanted, pm.clientCfg, pm.HandleEvent, pm.serverUDPPort)
        pm.proxies[name] = wrapper
        added = append(added, name)

        wrapper.Start()
    }
    if len(added) != 0 {
        xl.Info("proxy added: %v", added)
    }
}

go ctl.vm.Run() 访问者用于将流量从本地端口转发到远程服务。

// Run is the visitor manager loop. On every tick of checkInterval it walks
// the configured visitors under vm.mu and starts each one that has no entry
// in vm.visitors yet. The loop ends when a receive on stopCh completes.
func (vm *VisitorManager) Run() {
    logger := xlog.FromContextSafe(vm.ctx)

    tick := time.NewTicker(vm.checkInterval)
    defer tick.Stop()

    for {
        select {
        case <-vm.stopCh:
            logger.Info("gracefully shutdown visitor manager")
            return

        case <-tick.C:
            vm.mu.Lock()
            for _, visitorCfg := range vm.cfgs {
                visitorName := visitorCfg.GetBaseInfo().ProxyName
                if _, started := vm.visitors[visitorName]; started {
                    // already has an entry, nothing to do
                    continue
                }
                logger.Info("try to start visitor [%s]", visitorName)
                vm.startVisitor(visitorCfg)
            }
            vm.mu.Unlock()
        }
    }
}

go svr.keepControllerWorking() 负责保持控制连接可用:当前 Control 关闭后,它会按退避策略重新登录 FRPS,并创建、启动一个新的 Control 来替换旧的。

// keepControllerWorking runs for the lifetime of the service and replaces the
// control whenever the current one has finished closing.
//
// Each iteration waits on the current control's ClosedDoneCh, returns if
// svr.exit is non-zero, applies the reconnect throttle, and then retries
// login until it succeeds. Two independent delays are involved:
//   - reconnectDelay/reconnectCounts throttle how often a working control may
//     be replaced within a one-minute window;
//   - delayTime is the pause between failed login attempts inside one
//     reconnect.
func (svr *Service) keepControllerWorking() {
    xl := xlog.FromContextSafe(svr.ctx)
    maxDelayTime := 20 * time.Second
    delayTime := time.Second

    // if frpc reconnect frps, we need to limit retry times in 1min
    // current retry logic is sleep 0s, 0s, 0s, 1s, 2s, 4s, 8s, ...
    // when exceed 1min, we will reset delay and counts
    // the first 3 reconnects have no delay; after that the delay doubles on every retry
    cutoffTime := time.Now().Add(time.Minute)
    reconnectDelay := time.Second
    reconnectCounts := 1

    for {
        // block until the current control has completely shut down
        <-svr.ctl.ClosedDoneCh()
        if atomic.LoadUint32(&svr.exit) != 0 {
            return
        }

        // the first three retry with no delay
        if reconnectCounts > 3 {
            time.Sleep(reconnectDelay)
            reconnectDelay *= 2
        }
        reconnectCounts++

        // once the one-minute window has passed, start a fresh window
        now := time.Now()
        if now.After(cutoffTime) {
            // reset
            cutoffTime = now.Add(time.Minute)
            reconnectDelay = time.Second
            reconnectCounts = 1
        }

        // keep trying to log in until it succeeds
        for {
            xl.Info("try to reconnect to server...")
            conn, session, err := svr.login()
            if err != nil {
                xl.Warn("reconnect to server error: %v", err)
                // sleep with the current delay first, then pick the delay for the next failure
                time.Sleep(delayTime)

                opErr := &net.OpError{}
                // quick retry for dial error
                if errors.As(err, &opErr) && opErr.Op == "dial" {
                    delayTime = 2 * time.Second
                } else {
                    // any other error: double the delay, capped at maxDelayTime
                    delayTime = delayTime * 2
                    if delayTime > maxDelayTime {
                        delayTime = maxDelayTime
                    }
                }
                continue
            }
            // reconnect success, init delayTime
            delayTime = time.Second

            // start the new control, then swap it in under ctlMu, closing the previous one
            ctl := NewControl(svr.ctx, svr.runID, conn, session, svr.cfg, svr.pxyCfgs, svr.visitorCfgs, svr.serverUDPPort, svr.authSetter)
            ctl.Run()
            svr.ctlMu.Lock()
            if svr.ctl != nil {
                svr.ctl.Close()
            }
            svr.ctl = ctl
            svr.ctlMu.Unlock()
            break
        }
    }
}

到这里整个FRPC的启动过程就追踪完了

FRPS

我们直接看Service Run的函数

// Run starts the frps service. The NAT hole controller (when present) and the
// KCP, websocket and TLS listeners are each served from their own goroutine;
// the main listener is then served on the calling goroutine.
func (svr *Service) Run() {
    if natHole := svr.rc.NatHoleController; natHole != nil {
        go natHole.Run()
    }

    // accept kcp connections, only when a KCP bind port is configured
    if svr.cfg.KCPBindPort > 0 {
        go svr.HandleListener(svr.kcpListener)
    }

    // accept websocket connections from frpc
    go svr.HandleListener(svr.websocketListener)

    // accept tls connections from frpc
    go svr.HandleListener(svr.tlsListener)

    // accept connections from clients on the main listener
    svr.HandleListener(svr.listener)
}
未完待续
高性能,高性价比服务器推荐 ( DogYun ) 😉
暂无评论

发送评论 编辑评论


|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇