type externalEventReceiver struct {
manager events.Manager
}
...
// RecvMsg is called by the stack when a new message is received
func (eer *externalEventReceiver) RecvMsg(ocMsg *pb.Message, senderHandle *pb.PeerID) error {
eer.manager.Queue() <- batchMessageEvent{
msg: ocMsg,
sender: senderHandle,
}
return nil
}
...
// Executed is called whenever Execute completes, no-op for noops as it uses the legacy synchronous api
func (eer *externalEventReceiver) Executed(tag interface{}) {
eer.manager.Queue() <- executedEvent{tag}
}
// Committed is called whenever Commit completes, no-op for noops as it uses the legacy synchronous api
func (eer *externalEventReceiver) Committed(tag interface{}, target *pb.BlockchainInfo) {
eer.manager.Queue() <- committedEvent{tag, target}
}
...
type Manager interface {
Inject(Event) // A temporary interface to allow the event manager thread to skip the queue
Queue() chan<- Event // Get a write-only reference to the queue, to submit events
SetReceiver(Receiver) // Set the target to route events to
Start() // Starts the Manager thread TODO, these thread management things should probably go away
Halt() // Stops the Manager thread
}
func SendEvent(receiver Receiver, event Event) {
next := event
for {
// If an event returns something non-nil, then process it as a new event
next = receiver.ProcessEvent(next)
if next == nil {
break
}
}
}
switch et := event.(type) {
case batchMessageEvent:
ocMsg := et
return op.processMessage(ocMsg.msg, ocMsg.sender)
...
case stateUpdatedEvent:
// When the state is updated, clear any outstanding requests, they may have been processed while we were gone
op.reqStore = newRequestStore()
return op.pbft.ProcessEvent(event)
default:
return op.pbft.ProcessEvent(event)