ExecutionConsumer allows callbacks from asycnhronous execution and statetransfer.
typeExecutionConsumerinterface {Executed(tag interface{}) // Called whenever Execute completesCommitted(tag interface{}, target *pb.BlockchainInfo) // Called whenever Commit completesRolledBack(tag interface{}) // Called whenever a Rollback completesStateUpdated(tag interface{}, target *pb.BlockchainInfo) // Called when state transfer completes, if target is nil, this indicates a failure and a new target should be supplied}
typecoordinatorImplstruct { manager events.Manager// Maintains event thread and sends events to the coordinator rawExecutor PartialStack// Does the real interaction with the ledger consumer consensus.ExecutionConsumer// The consumer of this coordinator which receives the callbacks stc statetransfer.Coordinator// State transfer instance batchInProgress bool// Are we mid execution batch skipInProgress bool// Are we mid state transfer}
// TODO: do we want to put these requests into a queue? This will block until the consenter gets around to handling the message, but it also provides some natural feedback to the REST API to determine how long it takes to queue messages.
// Executed is called whenever Execute completes. The notification is passed
// straight through to the consenter; when no consenter is set it is dropped.
func (h *Helper) Executed(tag interface{}) {
	if h.consenter == nil {
		return
	}
	h.consenter.Executed(tag)
}
type Consenter interface {
RecvMsg(msg *pb.Message, senderHandle *pb.PeerID) error // Called serially with incoming messages from gRPC
ExecutionConsumer
}
// Helper carries a consenter together with the peer-side handles listed below.
// Its Executed method forwards to consenter when one is set.
type Helper struct {
consenter consensus.Consenter // Target of forwarded callbacks such as Executed; may be nil
coordinator peer.MessageHandlerCoordinator
secOn bool // NOTE(review): presumably whether security is enabled - confirm against callers
valid bool // Whether we believe the state is up to date
secHelper crypto.Peer
curBatch []*pb.Transaction // TODO, remove after issue 579
curBatchErrs []*pb.TransactionResult // TODO, remove after issue 579
persist.Helper // Embedded: persist.Helper's exported methods are promoted onto Helper
executor consensus.Executor
}
// pluginInstance holds the Consenter singleton; it is nil until GetPlugin
// creates it.
var pluginInstance consensus.Consenter // singleton service

// GetPlugin returns the handle to the Consenter singleton. While
// pluginInstance is nil it is built from c via New; once it is non-nil the
// cached value is returned and c is not used.
//
// NOTE(review): the nil check and assignment are done without any locking.
func GetPlugin(c consensus.Stack) consensus.Consenter {
	if pluginInstance != nil {
		return pluginInstance
	}
	pluginInstance = New(c)
	return pluginInstance
}
// RecvMsg is called by the stack when a new message is received. The message
// and its sender are wrapped in a batchMessageEvent and sent on the manager's
// queue; the returned error is always nil.
func (eer *externalEventReceiver) RecvMsg(ocMsg *pb.Message, senderHandle *pb.PeerID) error {
	queue := eer.manager.Queue()
	evt := batchMessageEvent{msg: ocMsg, sender: senderHandle}
	queue <- evt
	return nil
}
// Executed is called whenever Execute completes. It sends an executedEvent
// carrying tag on the manager's queue.
func (eer *externalEventReceiver) Executed(tag interface{}) {
	queue := eer.manager.Queue()
	evt := executedEvent{tag}
	queue <- evt
}
// Committed is called whenever Commit completes. It sends a committedEvent
// carrying tag and target on the manager's queue.
func (eer *externalEventReceiver) Committed(tag interface{}, target *pb.BlockchainInfo) {
	queue := eer.manager.Queue()
	evt := committedEvent{tag, target}
	queue <- evt
}
// RolledBack is called whenever a Rollback completes. It sends an empty
// rolledBackEvent on the manager's queue; tag is not forwarded.
func (eer *externalEventReceiver) RolledBack(tag interface{}) {
	queue := eer.manager.Queue()
	evt := rolledBackEvent{}
	queue <- evt
}
// StateUpdated is a signal from the stack that it has fast-forwarded its
// state. It sends a stateUpdatedEvent holding the checkpoint taken from tag
// and the given target on the manager's queue.
func (eer *externalEventReceiver) StateUpdated(tag interface{}, target *pb.BlockchainInfo) {
	queue := eer.manager.Queue()
	// Unchecked assertion: panics if tag does not hold a *checkpointMessage.
	chkpt := tag.(*checkpointMessage)
	queue <- stateUpdatedEvent{chkpt: chkpt, target: target}
}