Section2.3.1 event
ExecutionConsumer
ExecutionConsumer
ExecutionConsumer allows callbacks from asycnhronous execution and statetransfer.
type ExecutionConsumer interface {
Executed(tag interface{}) // Called whenever Execute completes
Committed(tag interface{}, target *pb.BlockchainInfo) // Called whenever Commit completes
RolledBack(tag interface{}) // Called whenever a Rollback completes
StateUpdated(tag interface{}, target *pb.BlockchainInfo) // Called when state transfer completes, if target is nil, this indicates a failure and a new target should be supplied
}
ExecutionConsumer
主要定义了Tx被处理之后的操作(从函数名可以看出, 都是过去式).
Usage
作为
consensus.Consenter
接口的一部分.type Consenter interface { RecvMsg(msg *pb.Message, senderHandle *pb.PeerID) error // Called serially with incoming messages from gRPC ExecutionConsumer }
即实现
Consenter
的结构也同时需要实现ExecutionConsumer
. 具体被调用情况见下文对Consenter
的介绍.作为
consensus/executor.coordinatorImpl
的成员变量consumer
.type coordinatorImpl struct { manager events.Manager // Maintains event thread and sends events to the coordinator rawExecutor PartialStack // Does the real interaction with the ledger consumer consensus.ExecutionConsumer // The consumer of this coordinator which receives the callbacks stc statetransfer.Coordinator // State transfer instance batchInProgress bool // Are we mid execution batch skipInProgress bool // Are we mid state transfer }
executor.coordinatorImpl
是用来处理事件的, 定义了func ProcessEvent()
, 在执行完一个事件之后, 该函数会调用consumer
的对应的各函数.
Implement
程序中没有单独实现的ExecutionConsumer
. 即便对于Usage #2, 该成员变量也是用consensus.helper
来初始化的. 而consensus.helper
对该接口的实现也仅仅是调用helper
的成员变量consenter
. 所以, 代码中仅有实现Consenter
的类型对ExecutionConsumer
进行了实现. 对于Consenter
的实现, 在Consenter
部分介绍.
// Executed is called whenever Execute completes
func (h *Helper) Executed(tag interface{}) {
if h.consenter != nil {
h.consenter.Executed(tag)
}
}
Summary
程序中有两处使用ExecutionConsumer
, 一处作为Consenter
接口的一部分, 另一处作为consensus/executor.coordinatorImpl
的成员变量. 两者都是由通过Consenter
实现的, 所以不必单独考虑ExecutionConsumer
的实现, 只考虑Consenter
中对应函数的实现即可. 事实上, 该接口单独定义出来只是为了把功能区分出来, 其实完全可以将该接口定义在Consenter
里, 因为事实上程序使用的也只有Consenter
.
Consenter
Consenter
Consenter is used to receive messages from the network. Every consensus plugin needs to implement this interface
type Consenter interface {
RecvMsg(msg *pb.Message, senderHandle *pb.PeerID) error // Called serially with incoming messages from gRPC
ExecutionConsumer
}
在fabirc/event中已经提到, Consenter
是consensus
中接收事件的接口. 此外还负责consense
内部的通信功能.
Usage
作为
consensus/controller
的全局变量.var consenter consensus.Consenter
controller
虽然保存了一个全局变量, 但是, 该变量为私有变量, 不能被外部访问, 但是controller
又没有对该变量进行操作, 所以该变量也是多余的. 并没有什么用. 另外, 在controller
中, 定义了func NewConsenter(stack consensus.Stack) consensus.Consenter
, 该函数会根据配置信息选择何种共识机制. 此函数会在初始化engine
时被使用. 除此之外controller
内也没有其他功能了.作为
consensus/helper.EngineImpl
的成员变量.type EngineImpl struct { consenter consensus.Consenter helper *Helper peerEndpoint *pb.PeerEndpoint consensusFan *util.MessageFan }
在fabric/event中提到了, 在
EngineImpl
的初始化函数中会开启一个新的线程, 不断地从网络获取消息. 该过程会重复调用func consenter.RecvMsg()
. 另外在func EngineImpl.ProcessTransactionMsg()
中, 在执行完一个tx消息后, 会开始等待接收一个返回消息. 特别的, 代码中也有注释, 讨论其必要性.TODO, do we want to put these requests into a queue? This will block until the consenter gets around to handling the message, but it also provides some natural feedback to the REST API to determine how long it takes to queue messages
err := eng.consenter.RecvMsg(msg, eng.peerEndpoint.ID)
有些不解的是, 该函数的调用的sender
参数是来自engine
的一个成员变量, 但观察代码暂时只发现对于该成员变量只有一处赋值. 这个过程可能会在详细分析engine
的时候弄清楚.
作为
consensus/helper.Helper
的成员变量.type Helper struct { consenter consensus.Consenter coordinator peer.MessageHandlerCoordinator secOn bool valid bool // Whether we believe the state is up to date secHelper crypto.Peer curBatch []*pb.Transaction // TODO, remove after issue 579 curBatchErrs []*pb.TransactionResult // TODO, remove after issue 579 persist.Helper executor consensus.Executor }
在
ExecutionConsumer
已经提到,Helper
通过自己成员变量consenter
实现了ExecutionConsumer
接口, 事实上也仅有这一个用处. 当Consenter
以ExecutionConsumer
的身份作为consensus/executor.coordinatorImpl
的成员变量时, 被用在func (co *coordinatorImpl) ProcessEvent(event events.Event) events.Event
. 在fabric/event
里已经提到过, 这里会根据事件类型进行不同的处理, 其中就包括有些动作处理完成之后对func Executed()
等的调用.作为
consensus/pbft
的全局变量.var pluginInstance consensus.Consenter // singleton service
前面提到了,
controller
在engine
初始化时会调用func pbft.GetPlugin()
生成一个Consenter
作为engine
的成员变量.// GetPlugin returns the handle to the Consenter singleton func GetPlugin(c consensus.Stack) consensus.Consenter { if pluginInstance == nil { pluginInstance = New(c) } return pluginInstance }
可以看到,
pbft
只会生成一个consensus.Consenter
, 以后的调用都会返回同一个Consenter
.pbft
自身并没有直接使用pluginInstance
.
Implement
Helper
的Consenter
来自于engine
, engine
的Consenter
来自controller
, controller
的来自于pbft
. pbft
使用func New()
构建, 如下所示.
func New(stack consensus.Stack) consensus.Consenter {
handle, _, _ := stack.GetNetworkHandles()
id, _ := getValidatorID(handle)
switch strings.ToLower(config.GetString("general.mode")) {
case "batch":
return newObcBatch(id, config, stack)
default:
panic(fmt.Errorf("Invalid PBFT mode: %s", config.GetString("general.mode")))
}
}
可以看到, Consenter
接口的实现为obcBatch
, 来自于func newObcBatch()
. 该函数定义在consensus/pbft/batch.go
. 但正如fabric/event
中提到的, obcBatch
并没有自己实现这些函数, 而是继承自externalEventReceiver
. 该类型定义在consensus/pbft/external.go
, 该类型通过和events.Manager
交互实现了接口中定义的五个函数.
// RecvMsg is called by the stack when a new message is received
func (eer *externalEventReceiver) RecvMsg(ocMsg *pb.Message, senderHandle *pb.PeerID) error {
eer.manager.Queue() <- batchMessageEvent{
msg: ocMsg,
sender: senderHandle,
}
return nil
}
// Executed is called whenever Execute completes, no-op for noops as it uses the legacy synchronous api
func (eer *externalEventReceiver) Executed(tag interface{}) {
eer.manager.Queue() <- executedEvent{tag}
}
// Committed is called whenever Commit completes, no-op for noops as it uses the legacy synchronous api
func (eer *externalEventReceiver) Committed(tag interface{}, target *pb.BlockchainInfo) {
eer.manager.Queue() <- committedEvent{tag, target}
}
// RolledBack is called whenever a Rollback completes, no-op for noops as it uses the legacy synchronous api
func (eer *externalEventReceiver) RolledBack(tag interface{}) {
eer.manager.Queue() <- rolledBackEvent{}
}
// StateUpdated is a signal from the stack that it has fast-forwarded its state
func (eer *externalEventReceiver) StateUpdated(tag interface{}, target *pb.BlockchainInfo) {
eer.manager.Queue() <- stateUpdatedEvent{
chkpt: tag.(*checkpointMessage),
target: target,
}
}
Summary
可以看到, 对于Consenter
的使用分为两部分, 分别为engine
使用func RevMsg()
来接收消息以及Helper
调用ExecutionConsumer
部分的函数来对接收到的消息进行处理. 其余两处的调用仅是对数据的一些储存和传递, 并没有直接使用. Consenter
接口由obcBatch
来实现, 但它自己也没有实现这些功能, 而是继承了externalEventReceiver
, externalEventReceiver
通过和event.Manager
的直接交互来对接口进行了实现.
Last updated
Was this helpful?