Section2.2.2 consenter to pbft

摘要

fabric中用了obcBacth来实现Consenter接口, 但没有直接实现接受消息的功能, 而是继承了pbft中定义的externalEventReceiver. externalEventReceiver会把消息进行相应的封装, 然后传给自己的成员变量manager, 该变量在utils/event中定义和实现. 在obcBatch初始化时会启动manager, manager启动后便会持续调用obcBatchfunc ProcessEvent(), 该函数会对消息进行类型识别, 并调用pbft进行相应操作.

即: Consenter(externalEventReceiver)(封装) >> event.Manager >> Consenter(obcBacth)(初步解析) >> pbft

共识机制内部生成的事件也会交与Consenter处理.

过程

consenterengine的成员变量, 在启动程序之后, engine便会启动线程持续地接受来自peer的消息, 并调用func consenter.RecvMsg()进行处理.

Consenter是在consensus.go定义的接口, 通过engine.consenter = controller.NewConsenter(engine.helper)初始化, 经过一系列调用, 可以追溯到, 在pbft中, Consenter接口是type obcBatch strcut来实现的. 但是obcBatch自身并没有实现func RecvMsg(), 此函数是继承自externalEventReceiver. 此结构体定义在consensus/pbft/external.go.

type externalEventReceiver struct {
    manager events.Manager
}
...
// RecvMsg is called by the stack when a new message is received
func (eer *externalEventReceiver) RecvMsg(ocMsg *pb.Message, senderHandle *pb.PeerID) error {
    eer.manager.Queue() <- batchMessageEvent{
        msg:    ocMsg,
        sender: senderHandle,
    }
    return nil
}
...
// Executed is called whenever Execute completes, no-op for noops as it uses the legacy synchronous api
func (eer *externalEventReceiver) Executed(tag interface{}) {
    eer.manager.Queue() <- executedEvent{tag}
}

// Committed is called whenever Commit completes, no-op for noops as it uses the legacy synchronous api
func (eer *externalEventReceiver) Committed(tag interface{}, target *pb.BlockchainInfo) {
    eer.manager.Queue() <- committedEvent{tag, target}
}
...

可以看到, 再收到消息后, externalEventReceiver又把它传给了events.Manager. 在externalEventReceiver中还定义了Executed, Committed, RolledBack等操作, 但从代码可以看出, 它并没有对消息进行特别的处理, 只是根据调用方法的不同, 分别把消息封装了一下, 然后全部传给eer.manager.Queue(). 查找Executed等事件可以发现, 这些事件都是共识机制内部发起的, 用于内部通信.

events.Manager是一个接口, 定义在consensus/util/events/events.go, 不在pbft内部.

type Manager interface {
    Inject(Event)         // A temporary interface to allow the event manager thread to skip the queue
    Queue() chan<- Event  // Get a write-only reference to the queue, to submit events
    SetReceiver(Receiver) // Set the target to route events to
    Start()               // Starts the Manager thread TODO, these thread management things should probably go away
    Halt()                // Stops the Manager thread
}

其用type managerImpl struct实现该接口,

type managerImpl struct {
    threaded
    receiver Receiver
    events   chan Event
}

managerImpl的成员函数func (em *managerImpl) Queue() chan<- Event中, managerImpl返回了events, 即, 传往Manager的消息会最终传给events.

回到Consenter. 在初始化obcBatchfunc newObcBatch()中, 会将obcBatch的一个成员变量manager初始化成managerImpl, 但稍后又用manager初始化了op.externalEventReceiver.manager. 本身obcBatch是继承了externalEventReceiver, 就算没有定义自己的manager仍会从externalEventReceiver继承到manager. 代码中注释介绍:

type obcBatch struct {
    ...
    externalEventReceiver
    ...
    manager events.Manager // TODO, remove eventually, the event manager
    ...
}
...
func newObcBatch(id uint64, config *viper.Viper, stack consensus.Stack) *obcBatch {
    ...
    op.manager = events.NewManagerImpl() // TODO, this is hacky, eventually rip it out
    op.manager.SetReceiver(op)
    etf := events.NewTimerFactoryImpl(op.manager)
    op.pbft = newPbftCore(id, config, op, etf)
    op.manager.Start()
    ...
    op.externalEventReceiver.manager = op.manager
    ...
}

所以反复的赋值, 这种机制属于geth本身未修复一个bug, 对外应该只需要externalEventReceiver即可. 具体原因未知.

obcBatch把自己设为managerreceiver, 稍后启动了manager, 查看Manager的代码会发现, manager启动后会开启一个新的线程持续读取events中的信息, 并调用receiverfunc ProcessEvent():

func SendEvent(receiver Receiver, event Event) {
    next := event
    for {
        // If an event returns something non-nil, then process it as a new event
        next = receiver.ProcessEvent(next)
        if next == nil {
            break
        }
    }
}

obcBatch实现了func ProcessEvent(), 在该函数中, obcBatch会判断event的类型, 并执行对应的函数,

    switch et := event.(type) {
    case batchMessageEvent:
        ocMsg := et
        return op.processMessage(ocMsg.msg, ocMsg.sender)
    ...
    case stateUpdatedEvent:
        // When the state is updated, clear any outstanding requests, they may have been processed while we were gone
        op.reqStore = newRequestStore()
        return op.pbft.ProcessEvent(event)
    default:
        return op.pbft.ProcessEvent(event)

可以看到, 针对特定的事件类型, obcBatch对其进行了处理, 并将某些事件直接传递给pbft来处理. 至此, 消息就已经从consenter传送到了pbft.

Last updated

Was this helpful?