Section2.6 fabric v0.6 实际入口serve()函数
在fabric v0.6 startup中分析到node.Cmd()通过nodeStartCmd调用func serve(args []string) error函数,本节主讲该函数。
func serve(args []string) error
该函数位于peer/node/start.go
中,主要分析几个比较重要的函数,如下:
1、peer.CacheConfiguration()
位于core/peer/config.go
中,主要获取并设置和缓存一些经常用到的变量,包含两个比较重要的函数类型变量:getLocalAddress
和 getPeerEndpoint
,并且在函数体后面直接被执行。 getLocalAddress()
用于获取本地工作的peer
的套接字(IPaddress:port) getPeerEndpoint()
用于获取Peer
的实例PeerEndpoint
,其成员变量为peerID、peerAddress、peerType
。
2、peer.GetPeerEndpoint()
位于core/peer/config.go
中,通过if
判断configurationCached
真值而决定是否调用cacheConfiguration()
,虽然cacheConfiguration()
在程序中多处被调用,但是其先决条件configurationCached
的真值决定,在CacheConfiguration()
函数被调用后,configurationCached
真值将被设为true
,所以cacheConfiguration()
函数将不会在运行,进一步调用CacheConfiguration()
的操作也不会执行。实际上,cacheConfiguration()
和CacheConfiguration()
都只运行一次。注意区分大小写!!!
GetPeerEndpoint()
最终会获得前期执行peer.CacheConfiguration()
时设置的全局变量peerEndpoint
。
3、createEventHubServer()
4、core.SecurityEnabled()
位于core/config.go
中,调用执行原理同GetPeerEndpoint()
,只不过两者调用的同名函数不是同一个。
5、db.Start()
位于core/db/db.go
中,init the openchainDB instance and open the db.
Start()
>> openchainDB.open()
6、grpcServer := grpc.NewServer(opts...)
位于vendor/google.golang.org/grpc/server.go
,新建一个全新的gRPC server
.
7、secHelper, err := getSecHelper()
this should be called exactly once and the result cached. NOTE- this crypto func might rightly belong in a crypto package and universally accessed.
该函数仅执行一次,根据SecurityEnabled()
区分validator
和non-validator
.
validator
调用
RegisterValidator(enrollID, nil, enrollID, enrollSecret)
,进一步调用validator.register(name, pwd, enrollID, enrollPWD, nil)
,向PKI
注册validator
,这个函数比较复杂,后期详细介绍。??????????????调用
InitValidator(enrollID, nil)
,进一步调用validator.init(name, pwd, nil)
,这个函数比较复杂,后期详细介绍。注意函数中的函数类型参数??????????????
non-validator
调用
RegisterPeer(enrollID, nil, enrollID, enrollSecret)
,进一步调用peer.register(NodePeer, name, pwd, enrollID, enrollPWD, nil)
,向PKI
注册peer
,最终调用的函数和上面RegisterValidator
最终调用的为同一个。后期介绍????调用
InitPeer(enrollID, nil)
,进一步调用peer.init(NodePeer, name, pwd, nil)
,最终调用的函数和上面InitValidator
最终调用的为同一个。后期介绍????
8、registerChaincodeSupport(chaincode.DefaultChain, grpcServer, secHelper)
secHelper
为Peer
接口类型的变量,作为security helper
实体,作用????
chaincode.NewChaincodeSupport(chainname, peer.GetPeerEndpoint, userRunsCC, ccStartupTimeout, secHelper)
初始化一个ChaincodeSupport
实例。作用???RegisterChaincodeSupportServer(grpcServer, ccSrv)
注册服务。
9、Create the peerServer
该过程设计两个比较重要的函数:NewPeerWithEngine()
和NewPeerWithHandler()
,分析如下:
9.1、NewPeerWithEngine(secHelperFunc, helper.GetEngine)
NewPeerWithEngine returns a Peer which uses the supplied handler factory function for creating new handlers on new Chat service invocations. 位于
core/peer/peer.go
中,该函数比较复杂,如下:```go func NewPeerWithEngine(secHelperFunc func() crypto.Peer, engFactory EngineFactory) (peer *Impl, err error) { peer = new(Impl) peerNodes := peer.initDiscovery()
}
该函数先对engine
成员变量的初始化, 后面通过启动了一个goroutine, 在go func() {...}
内部,for
循环获取并处理msg
,获取是通过engine.consensusFan.GetOutChannel()
返回了MessageFan
的一个类型为channel
的成员变量out
, 处理是通过func engine.consenter.RecvMsg()
送给consenter
.
此外,MessageFan
还有个成员变量ins []<-chan *Message
, 通过调用func (fan *MessageFan) AddFaninChannel(channel <-chan *Message)
可以向ins
添加channel
, 每次调用该函数, MessageFan
都会启动一个goroutine持续地将新channel
的消息写入out
, 进而再被engine
送给consenter
.
func AddFaninChannel()
仅在consensus/helper/handler.go
中定义的func NewConsensusHandler()
中被调用:
NewConsensusHandler constructs a new MessageHandler for the plugin. Is instance of peer.HandlerFactory
该函数用于初始化handler, hanlder
创建了一个channel, 并保存在自己的成员变量consenterChan
中. 该函数也仅仅有一次引用:
即, engine
把它进一步封装成成员函数供peer
使用. 该函数同样是在func NewPeerWithEngine()
中被调用的: peer.handlerFactory = peer.engine.GetHandlerFactory()
9.1.3、peer.engine.GetHandlerFactory()
上文提到的是逆序引用,转换为顺序为:
peer.engine.GetHandlerFactory()
==>EngineImpl.GetHandlerFactory()
>>NewConsensusHandler
==>NewConsensusHandler()
>>MessageFan.AddFaninChannel()
9.1.4、peer.chatWithSomePeers(peerNodes)
Impl.chatWithSomePeers()
>>go Impl.chatWithPeer()
>>Impl.handleChat(ctx, stream, true)
从stream
获取消息, 然后交给handler
.
该函数中的handler.HandleMessage(in)
, 其中的handler
通过p.handlerFactory(p, stream, initiatedStream)
==>NewConsensusHandler()
获得,实际上就是前面提到的consensus/helper/handler.go
中定义的func NewConsensusHandler()
返回的ConsensusHandler
, 类型为:
handler.HandleMessage()
的定义如下:
该函数通过handler.consenterChan
将消息发送给consenter
. 注意到, return handler.MessageHandler.HandleMessage(msg)
将msg
重新传回给handler.MessageHandler
, handler.MessageHandler
在NewConsensusHandler()
函数中被peerHandler
赋值,peerHandler
由NewPeerHandler()
初始化,实际上真正调用的函数为Handler.HandleMessage()
,该函数位于core/peer/handler.go
中,最终调用handler.FSM.Event(msg.Type.String(), msg)
,即msg
最终交给FSM
处理.
FSM is the state machine that holds the current state. 第三方扩展包,位于vendor/github.com/looplab/fsm/fsm.go中。
9.2、NewPeerWithHandler(secHelperFunc, peer.NewPeerHandler)
NewPeerWithHandler returns a Peer which uses the supplied handler factory function for creating new handlers on new Chat service invocations.
位于core/peer/peer.go
中,该函数部分功能和NewPeerWithEngine()
相同,如下:
9.2.1、peer.handlerFactory = handlerFact
handlerFact()
==>peer.NewPeerHandler()
,该函数在上文中有涉及,该函数主要返回一个全新的Peer Handler,可能会调用handler.initiatedStream()初始化Stream.9.2.2、ledger.GetLedger() 同上文提到的。
9.2.3、peer.chatWithSomePeers(peerNodes) 同上文提到的,最终都是调用
Handler.HandleMessage()
10、注册服务器的函数
11、goroutine启动服务器
grpcServer.Serve(lis) // Start the grpc server.
ehubGrpcServer.Serve(ehubLis) // Start the event hub server
12、return <-serve
一直阻塞,直到grpc启动遇到错误。如果grpc启动未遇到错误,将永久阻塞。
Last updated
Was this helpful?