Section2.8 prePrepare、prepare-and-commit
待完善。。。。。。
事件来源
在serve()函数中提到的NewPeerWithEngine()
>> engFactory(peer)<=>GetEngine(peer)
中,该函数位于engine.go
中,for
循环不断通过engine.consensusFan.GetOutChannel()
从messageFan
中获取msg
,然后通过engine.consenter.RecvMsg(msg.Msg, msg.Sender)
发送出去,该函数实际上由externalEventReceiver
实现。
// RecvMsg is called by the stack when a new message is received
func (eer *externalEventReceiver) RecvMsg(ocMsg *pb.Message, senderHandle *pb.PeerID) error {
eer.manager.Queue() <- batchMessageEvent{
msg: ocMsg,
sender: senderHandle,
}
return nil
}
注意到,该函数是将batchMessageEvent发送到err.manager.Queue()中。
事件处理
在
obcBatch
初始化时通过goroutine
调用managerImpl.eventLoop()
,最终会调用obcBatch.ProcessEvent()
。在
obcBatch.ProcessEvent()
函数中,通过switch et := event.(type)
分支处理events
:1、batchMessageEvent
case batchMessageEvent: ocMsg := et return op.processMessage(ocMsg.msg, ocMsg.sender)
注意到,该分支会调用
obcBatch.processMessage(batchMessageEvent.msg, batchMessageEvent.sender)
func (op *obcBatch) processMessage(ocMsg *pb.Message, senderHandle *pb.PeerID) events.Event { if ocMsg.Type == pb.Message_CHAIN_TRANSACTION { req := op.txToReq(ocMsg.Payload) return op.submitToLeader(req) } if ocMsg.Type != pb.Message_CONSENSUS { logger.Errorf("Unexpected message type: %s", ocMsg.Type) return nil } batchMsg := &BatchMessage{} ... if req := batchMsg.GetRequest(); req != nil { ... if (op.pbft.primary(op.pbft.view) == op.pbft.id) && op.pbft.activeView { return op.leaderProcReq(req) } op.startTimerIfOutstandingRequests() return nil } else if pbftMsg := batchMsg.GetPbftMessage(); pbftMsg != nil { senderID, err := getValidatorID(senderHandle) // who sent this? if err != nil { panic("Cannot map sender's PeerID to a valid replica ID") } msg := &Message{} err = proto.Unmarshal(pbftMsg, msg) if err != nil { logger.Errorf("Error unpacking payload from message: %s", err) return nil } return pbftMessageEvent{ msg: msg, sender: senderID, } } logger.Errorf("Unknown request: %+v", batchMsg) return nil }
注意到:
1.1 消息是
Message_CHAIN_TRANSACTION
,将消息的payload
封装成Request
,交由submitToLeader(req)
处理,经过广播等操作,submitToLeader(req)
>>leaderProcReq(req)
>>sendBatch()
最终返回events.Event
类型的RequestBatch
,这个事件在pbft.ProcessEvent(event)
中处理。1.2 消息是
Message_CONSENSUS
,进行分支:1.2.1
req
,最终会调用leaderProcReq(req)
,后期同Message_CHAIN_TRANSACTION
1.2.2
pbftMsg
,经过处理,最终返回pbftMessageEvent
,这个事件在pbft.ProcessEvent(event)
中处理。
2、default,会执行pbft.ProcessEvent(event)
pbft.ProcessEvent(event)
在该函数中通过switch et := e.(type)
进行分支操作:
2.1 pbftMessageEvent
它会执行next, err := instance.recvMsg(msg.msg, msg.sender)
,该函数如下:
func (instance *pbftCore) recvMsg(msg *Message, senderID uint64) (interface{}, error) {
if reqBatch := msg.GetRequestBatch(); reqBatch != nil {
return reqBatch, nil
} else if preprep := msg.GetPrePrepare(); preprep != nil {
if senderID != preprep.ReplicaId {
return nil, fmt.Errorf("Sender ID included in pre-prepare message (%v) doesn't match ID corresponding to the receiving stream (%v)", preprep.ReplicaId, senderID)
}
return preprep, nil
} else if prep := msg.GetPrepare(); prep != nil {
if senderID != prep.ReplicaId {
return nil, fmt.Errorf("Sender ID included in prepare message (%v) doesn't match ID corresponding to the receiving stream (%v)", prep.ReplicaId, senderID)
}
return prep, nil
} else if commit := msg.GetCommit(); commit != nil {
if senderID != commit.ReplicaId {
return nil, fmt.Errorf("Sender ID included in commit message (%v) doesn't match ID corresponding to the receiving stream (%v)", commit.ReplicaId, senderID)
}
return commit, nil
} else if chkpt := msg.GetCheckpoint(); chkpt != nil {
if senderID != chkpt.ReplicaId {
return nil, fmt.Errorf("Sender ID included in checkpoint message (%v) doesn't match ID corresponding to the receiving stream (%v)", chkpt.ReplicaId, senderID)
}
return chkpt, nil
} else if vc := msg.GetViewChange(); vc != nil {
if senderID != vc.ReplicaId {
return nil, fmt.Errorf("Sender ID included in view-change message (%v) doesn't match ID corresponding to the receiving stream (%v)", vc.ReplicaId, senderID)
}
return vc, nil
} else if nv := msg.GetNewView(); nv != nil {
if senderID != nv.ReplicaId {
return nil, fmt.Errorf("Sender ID included in new-view message (%v) doesn't match ID corresponding to the receiving stream (%v)", nv.ReplicaId, senderID)
}
return nv, nil
} else if fr := msg.GetFetchRequestBatch(); fr != nil {
if senderID != fr.ReplicaId {
return nil, fmt.Errorf("Sender ID included in fetch-request-batch message (%v) doesn't match ID corresponding to the receiving stream (%v)", fr.ReplicaId, senderID)
}
return fr, nil
} else if reqBatch := msg.GetReturnRequestBatch(); reqBatch != nil {
// it's ok for sender ID and replica ID to differ; we're sending the original request message
return returnRequestBatchEvent(reqBatch), nil
}
return nil, fmt.Errorf("Invalid message: %v", msg)
}
从代码可以看出,该函数返回各种类型的时间,供pbft.ProcessEvent(event)
处理。
2.2、RequestBatch
它会执行recvRequestBatch(reqBatch)
,进一步调用sendPrePrepare(reqBatch, digest)
func (instance *pbftCore) sendPrePrepare(reqBatch *RequestBatch, digest string) {
...
logger.Debugf("Primary %d broadcasting pre-prepare for view=%d/seqNo=%d and digest %s", instance.id, instance.view, n, digest)
instance.seqNo = n
preprep := &PrePrepare{
View: instance.view,
SequenceNumber: n,
BatchDigest: digest,
RequestBatch: reqBatch,
ReplicaId: instance.id,
}
cert := instance.getCert(instance.view, n)
cert.prePrepare = preprep
cert.digest = digest
instance.persistQSet()
instance.innerBroadcast(&Message{Payload: &Message_PrePrepare{PrePrepare: preprep}})
instance.maybeSendCommit(digest, instance.view, n)
}
注意到:
1) 初始化
PrePrepare
,赋值给cert
的成员变量prePrepare
(指针赋值),同时作为innerBroadcast()
的参数,广播出去。2)
innerBroadcast()
会随机模拟byzantine fault
节点,正常节点会进一步调用instance.consumer.broadcast(msgRaw)
(该函数由
obcBatch
实现),最终调用broadcaster.send()
实现广播。3)调用
instance.maybeSendCommit(digest, instance.view, n)
,因为if条件不成立,没有进行实质性操作。
2.3、PrePrepare
它会执行recvPrePrepare(prePrepare)
func (instance *pbftCore) recvPrePrepare(preprep *PrePrepare) error {
...
if ... {
logger.Debugf("Backup %d broadcasting prepare for view=%d/seqNo=%d", instance.id, preprep.View, preprep.SequenceNumber)
prep := &Prepare{
View: preprep.View,
SequenceNumber: preprep.SequenceNumber,
BatchDigest: preprep.BatchDigest,
ReplicaId: instance.id,
}
cert.sentPrepare = true
instance.persistQSet()
instance.recvPrepare(prep)
return instance.innerBroadcast(&Message{Payload: &Message_Prepare{Prepare: prep}})
}
return nil
}
从函数可看出:
1)初始化
Prepare
,并作为recvPrepare()
和innerBroadcast()
的参数。2)
recvPrepare()
对prepare
进行验证,调用instance.maybeSendCommit(prep.BatchDigest, prep.View, prep.SequenceNumber)
,在Prepare
中介绍。3)
innerBroadcast()
同2.1(2)
2.4、Prepare
它会执行instance.recvPrepare(prepare)
,调用instance.maybeSendCommit(prep.BatchDigest, prep.View, prep.SequenceNumber)
func (instance *pbftCore) maybeSendCommit(digest string, v uint64, n uint64) error {
cert := instance.getCert(v, n)
if instance.prepared(digest, v, n) && !cert.sentCommit {
logger.Debugf("Replica %d broadcasting commit for view=%d/seqNo=%d",
instance.id, v, n)
commit := &Commit{
View: v,
SequenceNumber: n,
BatchDigest: digest,
ReplicaId: instance.id,
}
cert.sentCommit = true
instance.recvCommit(commit)
return instance.innerBroadcast(&Message{&Message_Commit{commit}})
}
return nil
}
注意到,在RequestBatch
虽然有被调用,但是因为条件不成立,所以无法进入if
分支。在接收到prePrepare
和prepare
后,条件成立,进入if
分支。
1)初始化
commit
,且仅同一个prepare
只初始化一次。2)调用
instance.recvCommit(commit)
,在Commit
中讲解。3)
innerBroadcast()
同2.1(2)
2.5、Commit
它会执行instance.recvCommit(commit)
,进一步调用instance.committed(commit.BatchDigest, commit.View, commit.SequenceNumber)
func (instance *pbftCore) committed(digest string, v uint64, n uint64) bool {
if !instance.prepared(digest, v, n) {
return false
}
quorum := 0
cert := instance.certStore[msgID{v, n}]
if cert == nil {
return false
}
for _, p := range cert.commit {
if p.View == v && p.SequenceNumber == n {
quorum++
}
}
logger.Debugf("Replica %d commit count for view=%d/seqNo=%d: %d",
instance.id, v, n, quorum)
return quorum >= instance.intersectionQuorum()
}
func (instance *pbftCore) intersectionQuorum() int {
return (instance.N + instance.f + 2) / 2
}
从代码可以看出,通过cert
中保存的commit
判定最终是否committed
。cert
的结构如下:
ype msgCert struct {
digest string
prePrepare *PrePrepare
sentPrepare bool
prepare []*Prepare
sentCommit bool
commit []*Commit
}
committed的后续操作尚未研究
Last updated
Was this helpful?