ExecutionConsumer allows callbacks from asycnhronous execution and statetransfer.
typeExecutionConsumerinterface{Executed(taginterface{})// Called whenever Execute completesCommitted(taginterface{},target*pb.BlockchainInfo)// Called whenever Commit completesRolledBack(taginterface{})// Called whenever a Rollback completesStateUpdated(taginterface{},target*pb.BlockchainInfo)// Called when state transfer completes, if target is nil, this indicates a failure and a new target should be supplied}
typecoordinatorImplstruct{managerevents.Manager// Maintains event thread and sends events to the coordinatorrawExecutorPartialStack// Does the real interaction with the ledgerconsumerconsensus.ExecutionConsumer// The consumer of this coordinator which receives the callbacksstcstatetransfer.Coordinator// State transfer instancebatchInProgressbool// Are we mid execution batchskipInProgressbool// Are we mid state transfer}
// TODO: do we want to put these requests into a queue? This will block until the consenter gets around to handling the message, but it also provides some natural feedback to the REST API to determine how long it takes to queue messages.
// Executed relays an Execute-completion callback, identified by tag, to the
// helper's consenter. When no consenter is set the call is dropped silently.
func (h *Helper) Executed(tag interface{}) {
	c := h.consenter
	if c == nil {
		return
	}
	c.Executed(tag)
}
// Consenter combines message reception (RecvMsg) with the ExecutionConsumer
// callback set, which it embeds.
type Consenter interface {
RecvMsg(msg *pb.Message, senderHandle *pb.PeerID) error // Called serially with incoming messages from gRPC
ExecutionConsumer // Embedded: implementations must also provide its methods
}
// Helper bundles the collaborators and bookkeeping used by the Helper methods.
// Of the fields below, only consenter's use is visible alongside this
// declaration: Helper.Executed forwards its callback to it when it is non-nil.
type Helper struct {
consenter consensus.Consenter // Receives forwarded callbacks such as Executed; may be nil
coordinator peer.MessageHandlerCoordinator
secOn bool // NOTE(review): presumably whether security is enabled — confirm
valid bool // Whether we believe the state is up to date
secHelper crypto.Peer
curBatch []*pb.Transaction // TODO, remove after issue 579
curBatchErrs []*pb.TransactionResult // TODO, remove after issue 579
persist.Helper // Embedded, so its exported members are promoted onto Helper
executor consensus.Executor
}
// pluginInstance holds the Consenter handed out by GetPlugin. Its zero value
// is nil; GetPlugin assigns it when it finds it nil.
var pluginInstance consensus.Consenter // singleton service
// GetPlugin returns the handle to the Consenter singleton. If no instance has
// been stored yet, one is built from c via New and remembered; otherwise the
// stored instance is returned and c is not used.
//
// NOTE(review): no locking is visible around this lazy initialisation —
// confirm that callers do not invoke it concurrently.
func GetPlugin(c consensus.Stack) consensus.Consenter {
	if pluginInstance != nil {
		return pluginInstance
	}
	pluginInstance = New(c)
	return pluginInstance
}
// RecvMsg is called by the stack when a new message is received. The message
// and its sender are wrapped in a batchMessageEvent and sent on the manager's
// event queue. It always returns nil.
func (eer *externalEventReceiver) RecvMsg(ocMsg *pb.Message, senderHandle *pb.PeerID) error {
	queue := eer.manager.Queue()
	event := batchMessageEvent{msg: ocMsg, sender: senderHandle}
	queue <- event
	return nil
}
// Executed is called whenever Execute completes. It sends an executedEvent
// carrying tag on the manager's event queue.
func (eer *externalEventReceiver) Executed(tag interface{}) {
	queue := eer.manager.Queue()
	queue <- executedEvent{tag}
}
// Committed is called whenever Commit completes. It sends a committedEvent
// built from tag and target on the manager's event queue.
func (eer *externalEventReceiver) Committed(tag interface{}, target *pb.BlockchainInfo) {
	queue := eer.manager.Queue()
	queue <- committedEvent{tag, target}
}
// RolledBack is called whenever a Rollback completes. It sends an empty
// rolledBackEvent on the manager's event queue; tag is not forwarded.
func (eer *externalEventReceiver) RolledBack(tag interface{}) {
	queue := eer.manager.Queue()
	queue <- rolledBackEvent{}
}
// StateUpdated is a signal from the stack that it has fast-forwarded its
// state. It sends a stateUpdatedEvent holding the checkpoint taken from tag
// and the given target on the manager's event queue.
//
// tag must hold a *checkpointMessage: the type assertion is unchecked and
// panics for any other dynamic type, including a nil tag.
func (eer *externalEventReceiver) StateUpdated(tag interface{}, target *pb.BlockchainInfo) {
	// Fetch the queue first so the call order matches a direct send expression.
	queue := eer.manager.Queue()
	event := stateUpdatedEvent{
		chkpt:  tag.(*checkpointMessage),
		target: target,
	}
	queue <- event
}