typeexternalEventReceiverstruct{managerevents.Manager}...// RecvMsg is called by the stack when a new message is receivedfunc(eer *externalEventReceiver)RecvMsg(ocMsg*pb.Message,senderHandle*pb.PeerID)error{eer.manager.Queue()<-batchMessageEvent{msg:ocMsg,sender:senderHandle,}returnnil}...// Executed is called whenever Execute completes, no-op for noops as it uses the legacy synchronous apifunc(eer *externalEventReceiver)Executed(taginterface{}){eer.manager.Queue()<-executedEvent{tag}}// Committed is called whenever Commit completes, no-op for noops as it uses the legacy synchronous apifunc(eer *externalEventReceiver)Committed(taginterface{},target*pb.BlockchainInfo){eer.manager.Queue()<-committedEvent{tag,target}}...
// Manager is the event manager interface: events are submitted through its
// queue and routed to the Receiver configured with SetReceiver.
type Manager interface {
	// Inject is a temporary interface to allow the event manager thread to
	// skip the queue.
	Inject(Event)

	// Queue returns a write-only reference to the queue, used to submit events.
	Queue() chan<- Event

	// SetReceiver sets the target to route events to.
	SetReceiver(Receiver)

	// Start starts the Manager thread.
	// TODO: these thread management things should probably go away.
	Start()

	// Halt stops the Manager thread.
	Halt()
}
// obcBatch is shown only in part here: the "..." lines mark fields that are
// elided from this excerpt.
type obcBatch struct {
...
// Embedded, so the externalEventReceiver methods are promoted onto obcBatch.
externalEventReceiver
...
// manager is created in newObcBatch, which also stores the same value in the
// embedded externalEventReceiver's manager field.
manager events.Manager // TODO, remove eventually, the event manager
...
}
...
// newObcBatch builds an obcBatch. Only the event-manager wiring is visible in
// this excerpt; the "..." lines mark elided code (presumably including the
// construction of op and the final return — confirm against the full source).
func newObcBatch(id uint64, config *viper.Viper, stack consensus.Stack) *obcBatch {
...
op.manager = events.NewManagerImpl() // TODO, this is hacky, eventually rip it out
// op itself is registered as the receiver the manager routes events to.
op.manager.SetReceiver(op)
// The timer factory is built on the same manager and handed to the PBFT core.
etf := events.NewTimerFactoryImpl(op.manager)
op.pbft = newPbftCore(id, config, op, etf)
// The manager is started only after the receiver and the PBFT core are set up.
op.manager.Start()
...
// NOTE(review): the embedded receiver's manager is assigned after Start();
// presumably nothing can call RecvMsg/Executed/Committed on op before this
// line runs — confirm, since those methods dereference this field.
op.externalEventReceiver.manager = op.manager
...
}
// SendEvent hands event to receiver.ProcessEvent and then keeps feeding every
// non-nil result back into ProcessEvent as a new event, stopping at the first
// nil result. The initial event is always processed once, even if it is nil.
func SendEvent(receiver Receiver, event Event) {
	for pending := receiver.ProcessEvent(event); pending != nil; {
		pending = receiver.ProcessEvent(pending)
	}
}
switch et := event.(type) {
case batchMessageEvent:
ocMsg := et
return op.processMessage(ocMsg.msg, ocMsg.sender)
...
case stateUpdatedEvent:
// When the state is updated, clear any outstanding requests, they may have been processed while we were gone
op.reqStore = newRequestStore()
return op.pbft.ProcessEvent(event)
default:
return op.pbft.ProcessEvent(event)