Section2.6 fabric v0.6 实际入口serve()函数

fabric v0.6 startup中分析到node.Cmd()通过nodeStartCmd调用func serve(args []string) error函数,本节主讲该函数。

func serve(args []string) error

该函数位于peer/node/start.go中,主要分析几个比较重要的函数,如下:

1、peer.CacheConfiguration()

位于core/peer/config.go中,主要获取并设置和缓存一些经常用到的变量,包含两个比较重要的函数类型变量:getLocalAddressgetPeerEndpoint,并且在函数体后面直接被执行。 getLocalAddress()用于获取本地工作的peer的套接字(IPaddress:port) getPeerEndpoint()用于获取Peer的实例PeerEndpoint,其成员变量为peerID、peerAddress、peerType

2、peer.GetPeerEndpoint()

位于core/peer/config.go中,通过if判断configurationCached真值而决定是否调用cacheConfiguration(),虽然cacheConfiguration()在程序中多处被调用,但是其先决条件configurationCached的真值决定,在CacheConfiguration()函数被调用后,configurationCached真值将被设为true,所以cacheConfiguration()函数将不会在运行,进一步调用CacheConfiguration()的操作也不会执行。实际上,cacheConfiguration()CacheConfiguration()都只运行一次。注意区分大小写!!!

GetPeerEndpoint()最终会获得前期执行peer.CacheConfiguration()时设置的全局变量peerEndpoint

3、createEventHubServer()

4、core.SecurityEnabled()

位于core/config.go中,调用执行原理同GetPeerEndpoint(),只不过两者调用的同名函数不是同一个。

5、db.Start()

位于core/db/db.go中,init the openchainDB instance and open the db.

Start() >> openchainDB.open()

6、grpcServer := grpc.NewServer(opts...)

位于vendor/google.golang.org/grpc/server.go,新建一个全新的gRPC server.

7、secHelper, err := getSecHelper()

this should be called exactly once and the result cached. NOTE- this crypto func might rightly belong in a crypto package and universally accessed.

该函数仅执行一次,根据SecurityEnabled()区分validatornon-validator.

  • validator

    • 调用RegisterValidator(enrollID, nil, enrollID, enrollSecret),进一步调用validator.register(name, pwd, enrollID, enrollPWD, nil),向PKI注册validator,这个函数比较复杂,后期详细介绍。??????????????

    • 调用InitValidator(enrollID, nil),进一步调用validator.init(name, pwd, nil),这个函数比较复杂,后期详细介绍。注意函数中的函数类型参数??????????????

  • non-validator

    • 调用RegisterPeer(enrollID, nil, enrollID, enrollSecret),进一步调用peer.register(NodePeer, name, pwd, enrollID, enrollPWD, nil),向PKI注册peer,最终调用的函数和上面RegisterValidator最终调用的为同一个。后期介绍????

    • 调用InitPeer(enrollID, nil),进一步调用peer.init(NodePeer, name, pwd, nil),最终调用的函数和上面InitValidator最终调用的为同一个。后期介绍????

8、registerChaincodeSupport(chaincode.DefaultChain, grpcServer, secHelper)

secHelperPeer接口类型的变量,作为security helper实体,作用????

  • chaincode.NewChaincodeSupport(chainname, peer.GetPeerEndpoint, userRunsCC, ccStartupTimeout, secHelper)初始化一个ChaincodeSupport实例。作用???

  • RegisterChaincodeSupportServer(grpcServer, ccSrv)注册服务。

9、Create the peerServer

    if peer.ValidatorEnabled() {
        logger.Debug("Running as validating peer - making genesis block if needed")
        makeGenesisError := genesis.MakeGenesis()
        if makeGenesisError != nil {
            return makeGenesisError
        }
        logger.Debugf("Running as validating peer - installing consensus %s",
            viper.GetString("peer.validator.consensus"))

        peerServer, err = peer.NewPeerWithEngine(secHelperFunc, helper.GetEngine)
    } else {
        logger.Debug("Running as non-validating peer")
        peerServer, err = peer.NewPeerWithHandler(secHelperFunc, peer.NewPeerHandler)
    }

该过程设计两个比较重要的函数:NewPeerWithEngine()NewPeerWithHandler(),分析如下:

9.1、NewPeerWithEngine(secHelperFunc, helper.GetEngine)

NewPeerWithEngine returns a Peer which uses the supplied handler factory function for creating new handlers on new Chat service invocations. 位于core/peer/peer.go中,该函数比较复杂,如下:

```go func NewPeerWithEngine(secHelperFunc func() crypto.Peer, engFactory EngineFactory) (peer *Impl, err error) { peer = new(Impl) peerNodes := peer.initDiscovery()

...

// Initialize the ledger before the engine, as consensus may want to begin interrogating the ledger immediately
ledgerPtr, err := ledger.GetLedger()
...

peer.engine, err = engFactory(peer)
...
peer.handlerFactory = peer.engine.GetHandlerFactory()
...
peer.chatWithSomePeers(peerNodes)
return peer, nil

}

* 9.1.1、ledger.GetLedger()
> Initialize the ledger before the engine, as consensus may want to begin interrogating the ledger immediately

`GetLedger()` >> `GetNewLedger()` >> `newBlockchain()` 作用???

* 9.1.2、engFactory(peer) ==> GetEngine(peer), peer初始化时, 将自身作为参数传给`func GetEngine()`, 获得一个`engine`实例.

```go
func GetEngine(coord peer.MessageHandlerCoordinator) (peer.Engine, error) {
    var err error
    engineOnce.Do(func() {
        engine = new(EngineImpl)
        engine.helper = NewHelper(coord)
        engine.consenter = controller.NewConsenter(engine.helper)
        engine.helper.setConsenter(engine.consenter)
        engine.peerEndpoint, err = coord.GetPeerEndpoint()
        engine.consensusFan = util.NewMessageFan()

        go func() {
            logger.Debug("Starting up message thread for consenter")

            // The channel never closes, so this should never break
            for msg := range engine.consensusFan.GetOutChannel() {
                engine.consenter.RecvMsg(msg.Msg, msg.Sender)
            }
        }()
    })
    return engine, err
}

该函数先对engine成员变量的初始化, 后面通过启动了一个goroutine, 在go func() {...}内部,for循环获取并处理msg,获取是通过engine.consensusFan.GetOutChannel()返回了MessageFan的一个类型为channel的成员变量out, 处理是通过func engine.consenter.RecvMsg()送给consenter.

此外,MessageFan还有个成员变量ins []<-chan *Message, 通过调用func (fan *MessageFan) AddFaninChannel(channel <-chan *Message)可以向ins添加channel, 每次调用该函数, MessageFan都会启动一个goroutine持续地将新channel的消息写入out, 进而再被engine送给consenter.

func AddFaninChannel()仅在consensus/helper/handler.go中定义的func NewConsensusHandler()中被调用:

NewConsensusHandler constructs a new MessageHandler for the plugin. Is instance of peer.HandlerFactory

func NewConsensusHandler(coord peer.MessageHandlerCoordinator, stream peer.ChatStream, initiatedStream bool) (peer.MessageHandler, error) {

    peerHandler, err := peer.NewPeerHandler(coord, stream, initiatedStream)
    if err != nil {
        return nil, fmt.Errorf("Error creating PeerHandler: %s", err)
    }

    handler := &ConsensusHandler{
        MessageHandler: peerHandler,
        coordinator:    coord,
    }

    consensusQueueSize := viper.GetInt("peer.validator.consensus.buffersize")

    if consensusQueueSize <= 0 {
        logger.Errorf("peer.validator.consensus.buffersize is set to %d, but this must be a positive integer, defaulting to %d", consensusQueueSize, DefaultConsensusQueueSize)
        consensusQueueSize = DefaultConsensusQueueSize
    }

    handler.consenterChan = make(chan *util.Message, consensusQueueSize)
    getEngineImpl().consensusFan.AddFaninChannel(handler.consenterChan)

    return handler, nil
}

该函数用于初始化handler, hanlder创建了一个channel, 并保存在自己的成员变量consenterChan中. 该函数也仅仅有一次引用:

// GetHandlerFactory returns new NewConsensusHandler
func (eng *EngineImpl) GetHandlerFactory() peer.HandlerFactory {
    return NewConsensusHandler
}

即, engine把它进一步封装成成员函数供peer使用. 该函数同样是在func NewPeerWithEngine()中被调用的: peer.handlerFactory = peer.engine.GetHandlerFactory()

  • 9.1.3、peer.engine.GetHandlerFactory()

    上文提到的是逆序引用,转换为顺序为:

    peer.engine.GetHandlerFactory() ==> EngineImpl.GetHandlerFactory() >> NewConsensusHandler ==> NewConsensusHandler() >> MessageFan.AddFaninChannel()

  • 9.1.4、peer.chatWithSomePeers(peerNodes)

    Impl.chatWithSomePeers() >> go Impl.chatWithPeer() >> Impl.handleChat(ctx, stream, true)stream获取消息, 然后交给handler.

// Chat implementation of the the Chat bidi streaming RPC function
func (p *Impl) handleChat(ctx context.Context, stream ChatStream, initiatedStream bool) error {
    deadline, ok := ctx.Deadline()
    peerLogger.Debugf("Current context deadline = %s, ok = %v", deadline, ok)
    handler, err := p.handlerFactory(p, stream, initiatedStream)
    if err != nil {
        return fmt.Errorf("Error creating handler during handleChat initiation: %s", err)
    }
    defer handler.Stop()
    for {
        in, err := stream.Recv()
        if err == io.EOF {
            peerLogger.Debug("Received EOF, ending Chat")
            return nil
        }
        if err != nil {
            e := fmt.Errorf("Error during Chat, stopping handler: %s", err)
            peerLogger.Error(e.Error())
            return e
        }
        err = handler.HandleMessage(in)
        if err != nil {
            peerLogger.Errorf("Error handling message: %s", err)
            //return err
        }
    }
}

该函数中的handler.HandleMessage(in), 其中的handler通过p.handlerFactory(p, stream, initiatedStream)==>NewConsensusHandler()获得,实际上就是前面提到的consensus/helper/handler.go中定义的func NewConsensusHandler()返回的ConsensusHandler, 类型为:

type ConsensusHandler struct {
    peer.MessageHandler
    consenterChan chan *util.Message
    coordinator   peer.MessageHandlerCoordinator
}

handler.HandleMessage()的定义如下:

func (handler *ConsensusHandler) HandleMessage(msg *pb.Message) error {
    if msg.Type == pb.Message_CONSENSUS {
        senderPE, _ := handler.To()
        select {
        case handler.consenterChan <- &util.Message{
            Msg:    msg,
            Sender: senderPE.ID,
        }:
            return nil
        default:
            err := fmt.Errorf("Message channel for %v full, rejecting", senderPE.ID)
            logger.Errorf("Failed to queue consensus message because: %v", err)
            return err
        }
    }

    if logger.IsEnabledFor(logging.DEBUG) {
        logger.Debugf("Did not handle message of type %s, passing on to next MessageHandler", msg.Type)
    }
    return handler.MessageHandler.HandleMessage(msg)
}

该函数通过handler.consenterChan将消息发送给consenter. 注意到, return handler.MessageHandler.HandleMessage(msg)msg重新传回给handler.MessageHandler, handler.MessageHandlerNewConsensusHandler()函数中被peerHandler赋值,peerHandlerNewPeerHandler()初始化,实际上真正调用的函数为Handler.HandleMessage(),该函数位于core/peer/handler.go中,最终调用handler.FSM.Event(msg.Type.String(), msg),即msg最终交给FSM处理.

FSM is the state machine that holds the current state. 第三方扩展包,位于vendor/github.com/looplab/fsm/fsm.go中。

9.2、NewPeerWithHandler(secHelperFunc, peer.NewPeerHandler)

NewPeerWithHandler returns a Peer which uses the supplied handler factory function for creating new handlers on new Chat service invocations.

位于core/peer/peer.go中,该函数部分功能和NewPeerWithEngine()相同,如下:

func NewPeerWithHandler(secHelperFunc func() crypto.Peer, handlerFact HandlerFactory) (*Impl, error) {
    ...
    peer.handlerFactory = handlerFact
    ...
    ledgerPtr, err := ledger.GetLedger()
    ...
    peer.chatWithSomePeers(peerNodes)
    return peer, nil
}
  • 9.2.1、peer.handlerFactory = handlerFact handlerFact() ==> peer.NewPeerHandler(),该函数在上文中有涉及,该函数主要返回一个全新的Peer Handler,可能会调用handler.initiatedStream()初始化Stream.

  • 9.2.2、ledger.GetLedger() 同上文提到的。

  • 9.2.3、peer.chatWithSomePeers(peerNodes) 同上文提到的,最终都是调用Handler.HandleMessage()

10、注册服务器的函数

        // Register the Peer server
    pb.RegisterPeerServer(grpcServer, peerServer)

    // Register the Admin server
    pb.RegisterAdminServer(grpcServer, core.NewAdminServer())

    // Register Devops server
    serverDevops := core.NewDevopsServer(peerServer)
    pb.RegisterDevopsServer(grpcServer, serverDevops)

    // Register the ServerOpenchain server
    serverOpenchain, err := rest.NewOpenchainServerWithPeerInfo(peerServer)
    if err != nil {
        err = fmt.Errorf("Error creating OpenchainServer: %s", err)
        return err
    }

    pb.RegisterOpenchainServer(grpcServer, serverOpenchain)

11、goroutine启动服务器

  • grpcServer.Serve(lis) // Start the grpc server.

  • ehubGrpcServer.Serve(ehubLis) // Start the event hub server

12、return <-serve

一直阻塞,直到grpc启动遇到错误。如果grpc启动未遇到错误,将永久阻塞。

Last updated

Was this helpful?