Section2.3.7 summary

Summary:

  • 系统启动时运行serve(),调用NewPeerWithEngine启动Validator Peer,具备validate功能的peer;

  • 然后调用chatWithSomePeers,chatWithSomePeers调用chatWithPeer,其又调用handleChat,与该peer进行连接,并产生ConsensusHandler,

  • ConsensusHandler处理与peer的连接stream中的消息,即调用HandleMessage()。此函数判断消息的类型是否属于CONSENSUS类型,将之发送到consenterChan,该channel可由consenter读取并处理;非CONSENSUS类型的消息有MessageHandler处理。

代码分析如下:

1. 首先我们对Engine的产生过程进行分析,在core/peer/peer.go中:

// NewPeerWithEngine returns a Peer which uses the supplied handler factory function for creating new handlers on new Chat service invocations.
func NewPeerWithEngine(secHelperFunc func() crypto.Peer, engFactory EngineFactory) (peer *Impl, err error) {
    peer = new(Impl)
    peerNodes := peer.initDiscovery()
...

    peer.engine, err = engFactory(peer)
    peer.handlerFactory = peer.engine.GetHandlerFactory()
    peer.chatWithSomePeers(peerNodes)
...
}

其中包括engine的产生和handlerFactory函数的赋值。其中调用了GetHandlerFactory的到handlerFactory。

2. 接下来我们看Impl结构体,其即为区块链系统节点结构体,或者是验证节点VP,或者是普通的Peer(定义在core/peer/peer.go)

其中handlerFactory为函数,由GetHandlerFactory()赋值,我们看GetHandlerFactory如何得到调用函数

3. 由EngineImpl代码可看出,GetHandlerFactory实际上就是调用NewConsensusHandler (在consensus/helper/engine.go中)

4.接下来分析NewConsensusHandler(consensus/helper/handler.go):

从NewConsensusHandler可看出,其实现产生一个PeerHandler,具备P2P通信处理的功能;包涵一个MessageHandlerCoordinator,实际为节点自己,即Impl的实例peer。注意handler的consenterChan添加到consensusFan中,即其消息发送到engine的consensusFan,engine会对消息进行处理。

ConsensusHandler的结构体如下:

其继承了MessageHandler,并且包含一个channel和一个coordinator.

由上面的分析课可看出,handlerFactory即为NewConsensusHandler,接下来看它什么时候被调用:

5. NewConsensusHandler在Impl.handleChat中被调用:

上面的代码显示handlerChat调用了handlerFactory,开始处理收到的消息,这里handlerFactory返回一个ConsensusHandler类型的handler,其成员变量coord即为peer;stream开始接收消息,并让handler调用HandleMessage进行处理。 从以上代码可以看到 1) NewConsensusHandler中的coord实际上就是Impl的实例,即peer。 2) stream为的消息由handler进行处理。 3) 对于每一个peer,都会产生一个handler处理其传输过来的消息

以上过程的实际效果是,节点在handleChat过程内,通过handlerFactory产生一个ConsensusHandler,ConsensusHandler内产生一个PeerHandler。然后通过stream读取消息,消息将有HandleMessage进行处理。

6. handleChat被两个函数调用,分别是chatwithpeers和chat:

以上可看出chatWithPeer是与peer建立连接,stream即为网络连接数据流。chatWithPeer由chatWithSomePeers调用,它又是由NewPeerWithEngine调用。当新的peer被发现的时候,PeersDiscovered被调用,它也会调用chatWithSomePeers(). Impl为每一个新发现的peer调用chatWithSomePeers,其会调用chatWithPeer,随机调用handleChat,其会调用handlerFactory产生一个ConsensusHandler,由此Consensushandler的HandleMessage()处理与peer链接的stream中的消息。

7. NewPeerWithEngine为Validator,NewPeerWithHandler为非Validator,在peer/node/node.go文件中的serve()函数中:

ConsensusHandler将消息传递给ConsensusEngine

1. ConsensusHandler处理消息的函数为

实际上它将消息发送给了consenterChan,而在NewConsensusHandler函数中,该Channel是添加到了ConsensusFan.ins,因此需关注consensusFan.out是被谁读取处理。

MessageHandler即为ConsensusHandler的成员PeerHandler,其在core/peer/handler.go中定义,其功能为各种非Consensus消息提供处理过程。

2. 从GetEngine可看出,consenter调用了RecvMsg处理consensusFan.out中的消息,GetEngine实际上极为engFactory().

RecvMsg实际上由obcBatch中的externalEventReceiver执行,然后发送给其成员Manager(一个事件管理器)的events channel。 该Manager的接收者被设置成obcBatch自己,Manager启动后会开始事件循环,陆续发送channel中的事件给obcBatch,让obcBatch调用ProcessEvent处理事件(consensus/util/events.go)。

obcBatch的ProcessEvent会进而调用pbftcore的ProcessEvent处理PBFT各种事件,obcBatch只处理与Batch相关的事件。 此外,obcBatch还负责通信和签名机制,如下:

3. innerStack interface implemented by obcBatch.

pbftCore 中包含一个成员类型为innerStack的consumer,即为obcBatch对象,该consumer为pbftCore提供网络通信和签名验证功能。

VP节点命名规则

Fabric采用了viper工具进行系统配置,viper可以读取配置文件,环境参数,以及命令行参数,对系统节点等对象进行配置。 Fabric的主要配置文件为peer/core.yaml,该文件配置了VP节点的ID,consensus算法等参数。

在PBFT算法中,每个节点有一个ValidatorID,该ID来自于core.yaml配置的ID,比如core.yaml中设置的ID为vp0, 则PBFT中该节点的ID为0,以此类推。具体可见pbft.go文件中的下列函数:

其中注明了“For MVP, set the VP's peer.id to vpX, where X is a unique integer between 0 and N-1”。

PBFT Request and Message

  • PBFT中,每一个request中包涵一个tx,使用txToRequest将tx封装成Request;

  • RequestBatch包涵多个Requests, obcBatch中的BatchStore也是多个Request组成;

  • Request发送到primary node,由primary node发起PBFT共识过程。

ProtocolBuffer

  • PBFT 中使用ProtocolBuffer对发送的消息进行定义,定义在message.proto文件,然后使用proton进行编译,得到message.pb.go文件。

Last updated

Was this helpful?