Section2.8 prePrepare、prepare-and-commit

待完善。。。。。。

事件来源

在serve()函数中提到的NewPeerWithEngine() >> engFactory(peer)(即GetEngine(peer),位于engine.go中)中,一个for循环不断通过engine.consensusFan.GetOutChannel()从messageFan中获取msg,然后调用engine.consenter.RecvMsg(msg.Msg, msg.Sender)将其交给共识模块处理;该方法实际上由externalEventReceiver实现。

// RecvMsg is called by the stack when a new message is received.
// It does no processing itself: the message and the peer it came from are
// wrapped in a batchMessageEvent and handed to the event manager's queue,
// where obcBatch.ProcessEvent picks them up. It always returns nil.
func (eer *externalEventReceiver) RecvMsg(ocMsg *pb.Message, senderHandle *pb.PeerID) error {
    incoming := batchMessageEvent{
        msg:    ocMsg,
        sender: senderHandle,
    }
    eer.manager.Queue() <- incoming
    return nil
}

注意到,该函数只是将batchMessageEvent发送到eer.manager.Queue()中,本身不做任何处理。

事件处理

  • obcBatch初始化时通过goroutine调用managerImpl.eventLoop(),最终会调用obcBatch.ProcessEvent()

  • obcBatch.ProcessEvent()函数中,通过switch et := event.(type) 分支处理events

    1、batchMessageEvent

    case batchMessageEvent:
          ocMsg := et
          return op.processMessage(ocMsg.msg, ocMsg.sender)

    注意到,该分支会调用obcBatch.processMessage(batchMessageEvent.msg, batchMessageEvent.sender)

    // processMessage turns a message received from the network into the next
    // event for obcBatch to handle (parts of the body are elided in this excerpt):
    //   - CHAIN_TRANSACTION: the payload is converted into a Request and passed
    //     to submitToLeader, whose returned event is propagated.
    //   - any type other than CONSENSUS: logged as an error, no event (nil).
    //   - CONSENSUS: examined as a BatchMessage carrying either a Request or a
    //     serialized PBFT Message; see the two branches below.
    func (op *obcBatch) processMessage(ocMsg *pb.Message, senderHandle *pb.PeerID) events.Event {
      if ocMsg.Type == pb.Message_CHAIN_TRANSACTION {
          req := op.txToReq(ocMsg.Payload)
          return op.submitToLeader(req)
      }
    
      if ocMsg.Type != pb.Message_CONSENSUS {
          logger.Errorf("Unexpected message type: %s", ocMsg.Type)
          return nil
      }
    
      // NOTE(review): the elided lines presumably unmarshal ocMsg.Payload into
      // batchMsg — not shown in this excerpt; confirm against the full source.
      batchMsg := &BatchMessage{}
      ...
    
      if req := batchMsg.GetRequest(); req != nil {
          ...
          // Only the primary of the current view, while that view is active,
          // orders the request itself via leaderProcReq.
          if (op.pbft.primary(op.pbft.view) == op.pbft.id) && op.pbft.activeView {
              return op.leaderProcReq(req)
          }
          // Other replicas only make sure a request timer is running and emit
          // no event.
          op.startTimerIfOutstandingRequests()
          return nil
      } else if pbftMsg := batchMsg.GetPbftMessage(); pbftMsg != nil {
          // Map the transport-level sender handle to a PBFT replica ID; a
          // failure here is treated as fatal (panic).
          senderID, err := getValidatorID(senderHandle) // who sent this?
          if err != nil {
              panic("Cannot map sender's PeerID to a valid replica ID")
          }
          // Decode the embedded PBFT message; undecodable payloads are logged
          // and dropped.
          msg := &Message{}
          err = proto.Unmarshal(pbftMsg, msg)
          if err != nil {
              logger.Errorf("Error unpacking payload from message: %s", err)
              return nil
          }
          // Handed on as an event for pbftCore.ProcessEvent.
          return pbftMessageEvent{
              msg:    msg,
              sender: senderID,
          }
      }
    
      // Neither a Request nor a PBFT message: log it and emit no event.
      logger.Errorf("Unknown request: %+v", batchMsg)
    
      return nil
    }

    注意到:

  • 1.1 消息是Message_CHAIN_TRANSACTION,将消息的payload封装成Request,交由submitToLeader(req)处理,经过广播等操作,

    submitToLeader(req) >> leaderProcReq(req) >> sendBatch() 最终返回events.Event类型的RequestBatch,这个事件在pbft.ProcessEvent(event)中处理。

  • 1.2 消息是Message_CONSENSUS,进行分支:

    • 1.2.1 req:若本节点是当前view的primary且view处于active状态,则调用leaderProcReq(req),后续处理同Message_CHAIN_TRANSACTION;否则仅调用startTimerIfOutstandingRequests()启动计时器,不产生事件。

    • 1.2.2 pbftMsg,经过处理,最终返回pbftMessageEvent,这个事件在pbft.ProcessEvent(event)中处理。

2、default,会执行pbft.ProcessEvent(event)

在该函数中通过switch et := e.(type)进行分支操作:

2.1 pbftMessageEvent

它会执行next, err := instance.recvMsg(msg.msg, msg.sender),该函数如下:

// recvMsg extracts whichever payload a PBFT Message carries and returns it as
// the next event for pbftCore.ProcessEvent.
//
// Payloads that carry a ReplicaId (pre-prepare, prepare, commit, checkpoint,
// view-change, new-view, fetch-request-batch) are accepted only when that
// ReplicaId equals senderID, the replica the message actually arrived from;
// otherwise an error is returned. A RequestBatch is returned unchanged, and a
// ReturnRequestBatch is wrapped in returnRequestBatchEvent without a sender
// check. A message carrying none of these payloads yields an error.
func (instance *pbftCore) recvMsg(msg *Message, senderID uint64) (interface{}, error) {
    if batch := msg.GetRequestBatch(); batch != nil {
        return batch, nil
    }

    if pp := msg.GetPrePrepare(); pp != nil {
        if pp.ReplicaId != senderID {
            return nil, fmt.Errorf("Sender ID included in pre-prepare message (%v) doesn't match ID corresponding to the receiving stream (%v)", pp.ReplicaId, senderID)
        }
        return pp, nil
    }

    if p := msg.GetPrepare(); p != nil {
        if p.ReplicaId != senderID {
            return nil, fmt.Errorf("Sender ID included in prepare message (%v) doesn't match ID corresponding to the receiving stream (%v)", p.ReplicaId, senderID)
        }
        return p, nil
    }

    if c := msg.GetCommit(); c != nil {
        if c.ReplicaId != senderID {
            return nil, fmt.Errorf("Sender ID included in commit message (%v) doesn't match ID corresponding to the receiving stream (%v)", c.ReplicaId, senderID)
        }
        return c, nil
    }

    if cp := msg.GetCheckpoint(); cp != nil {
        if cp.ReplicaId != senderID {
            return nil, fmt.Errorf("Sender ID included in checkpoint message (%v) doesn't match ID corresponding to the receiving stream (%v)", cp.ReplicaId, senderID)
        }
        return cp, nil
    }

    if change := msg.GetViewChange(); change != nil {
        if change.ReplicaId != senderID {
            return nil, fmt.Errorf("Sender ID included in view-change message (%v) doesn't match ID corresponding to the receiving stream (%v)", change.ReplicaId, senderID)
        }
        return change, nil
    }

    if view := msg.GetNewView(); view != nil {
        if view.ReplicaId != senderID {
            return nil, fmt.Errorf("Sender ID included in new-view message (%v) doesn't match ID corresponding to the receiving stream (%v)", view.ReplicaId, senderID)
        }
        return view, nil
    }

    if fetch := msg.GetFetchRequestBatch(); fetch != nil {
        if fetch.ReplicaId != senderID {
            return nil, fmt.Errorf("Sender ID included in fetch-request-batch message (%v) doesn't match ID corresponding to the receiving stream (%v)", fetch.ReplicaId, senderID)
        }
        return fetch, nil
    }

    if returned := msg.GetReturnRequestBatch(); returned != nil {
        // No sender check: the returned payload is the original request
        // message, so sender ID and replica ID may legitimately differ.
        return returnRequestBatchEvent(returned), nil
    }

    return nil, fmt.Errorf("Invalid message: %v", msg)
}

从代码可以看出,该函数根据消息中携带的内容返回各种类型的事件(除RequestBatch和ReturnRequestBatch外,都会校验消息中的ReplicaId与实际发送方senderID是否一致),供pbft.ProcessEvent(event)处理。

2.2、RequestBatch

它会执行recvRequestBatch(reqBatch),进一步调用sendPrePrepare(reqBatch, digest)

// sendPrePrepare is run on the primary (per its log message) to order a
// request batch. It builds a PrePrepare for (current view, n), records it and
// the digest in the certificate for that slot, persists state, broadcasts the
// PrePrepare, and then asks maybeSendCommit whether a Commit can be sent.
// Part of the body is elided in this excerpt.
func (instance *pbftCore) sendPrePrepare(reqBatch *RequestBatch, digest string) {
    ...
    // NOTE(review): n is computed in the elided code above — presumably the
    // next sequence number to assign; confirm against the full source.
    logger.Debugf("Primary %d broadcasting pre-prepare for view=%d/seqNo=%d and digest %s", instance.id, instance.view, n, digest)
    instance.seqNo = n
    preprep := &PrePrepare{
        View:           instance.view,
        SequenceNumber: n,
        BatchDigest:    digest,
        RequestBatch:   reqBatch,
        ReplicaId:      instance.id,
    }
    // Record the primary's own pre-prepare and digest in the certificate for
    // (view, n); cert holds a pointer, so later readers see the same object.
    cert := instance.getCert(instance.view, n)
    cert.prePrepare = preprep
    cert.digest = digest
    // NOTE(review): presumably persists the certificate set — confirm.
    instance.persistQSet()
    // The error returned by innerBroadcast is not checked here.
    instance.innerBroadcast(&Message{Payload: &Message_PrePrepare{PrePrepare: preprep}})
    // Return value ignored; its commit condition normally does not hold yet.
    instance.maybeSendCommit(digest, instance.view, n)
}

注意到:

  • 1) 初始化PrePrepare,赋值给cert的成员变量prePrepare(指针赋值),同时作为innerBroadcast()的参数,广播出去。

  • 2)innerBroadcast()会随机模拟byzantine fault节点,正常节点会进一步调用instance.consumer.broadcast(msgRaw)

    (该函数由obcBatch实现),最终调用broadcaster.send()实现广播。

  • 3)调用instance.maybeSendCommit(digest, instance.view, n),因为if条件不成立,没有进行实质性操作。

2.3、PrePrepare

它会执行recvPrePrepare(prePrepare)

// recvPrePrepare handles a PrePrepare received from the network. Parts of the
// body are elided in this excerpt, including the condition guarding the
// branch below and where cert is obtained. When the branch is taken, this
// replica builds its own Prepare for the same view, sequence number and
// digest, marks cert.sentPrepare, persists state, feeds the Prepare to its own
// recvPrepare, and returns the result of broadcasting it. Otherwise it
// returns nil.
func (instance *pbftCore) recvPrePrepare(preprep *PrePrepare) error {
    ...
    // NOTE(review): elided condition — the log message indicates this branch
    // is taken on backups (non-primary replicas); presumably it also requires
    // that no Prepare has been sent yet. Confirm against the full source.
    if ... {
        logger.Debugf("Backup %d broadcasting prepare for view=%d/seqNo=%d", instance.id, preprep.View, preprep.SequenceNumber)
        prep := &Prepare{
            View:           preprep.View,
            SequenceNumber: preprep.SequenceNumber,
            BatchDigest:    preprep.BatchDigest,
            ReplicaId:      instance.id,
        }
        cert.sentPrepare = true
        instance.persistQSet()
        // Deliver the replica's own Prepare locally before broadcasting it;
        // the return value of recvPrepare is not checked.
        instance.recvPrepare(prep)
        return instance.innerBroadcast(&Message{Payload: &Message_Prepare{Prepare: prep}})
    }

    return nil
}

从函数可看出:

  • 1)初始化Prepare,并将其作为recvPrepare()和innerBroadcast()的参数。

  • 2)recvPrepare()对prepare进行验证,并调用instance.maybeSendCommit(prep.BatchDigest, prep.View, prep.SequenceNumber),详见2.4 Prepare。

  • 3)innerBroadcast()同2.2 RequestBatch中的2)。

2.4、Prepare

它会执行instance.recvPrepare(prepare),调用instance.maybeSendCommit(prep.BatchDigest, prep.View, prep.SequenceNumber)

// maybeSendCommit sends this replica's Commit for view v / sequence number n
// once the batch identified by digest is prepared. The certificate's
// sentCommit flag ensures the Commit is produced at most once per (v, n).
// The Commit is first delivered locally through recvCommit and then broadcast;
// the broadcast's error is returned. If nothing is sent, it returns nil.
func (instance *pbftCore) maybeSendCommit(digest string, v uint64, n uint64) error {
    cert := instance.getCert(v, n)
    if !instance.prepared(digest, v, n) || cert.sentCommit {
        return nil
    }

    logger.Debugf("Replica %d broadcasting commit for view=%d/seqNo=%d",
        instance.id, v, n)
    ownCommit := &Commit{
        View:           v,
        SequenceNumber: n,
        BatchDigest:    digest,
        ReplicaId:      instance.id,
    }
    cert.sentCommit = true
    instance.recvCommit(ownCommit)
    return instance.innerBroadcast(&Message{Payload: &Message_Commit{Commit: ownCommit}})
}

注意到,该函数在2.2 RequestBatch(sendPrePrepare)中虽然也被调用,但因为if条件不成立,无法进入if分支;在接收到prePrepare和prepare之后条件成立,才会进入if分支。

  • 1)初始化commit;由cert.sentCommit标记保证同一个(view, seqNo)只会初始化并发送一次commit。

  • 2)调用instance.recvCommit(commit),详见2.5 Commit。

  • 3)innerBroadcast()同2.2 RequestBatch中的2)。

2.5、Commit

它会执行instance.recvCommit(commit),进一步调用instance.committed(commit.BatchDigest, commit.View, commit.SequenceNumber)

// committed reports whether the batch identified by digest has committed at
// view v and sequence number n on this replica. The batch must already be
// prepared, a certificate for (v, n) must exist in certStore, and that
// certificate must hold at least intersectionQuorum() Commit messages.
// Commit messages are matched on view and sequence number only; their digest
// is not compared here.
func (instance *pbftCore) committed(digest string, v uint64, n uint64) bool {
    if !instance.prepared(digest, v, n) {
        return false
    }

    cert := instance.certStore[msgID{v, n}]
    if cert == nil {
        return false
    }

    votes := 0
    for _, c := range cert.commit {
        if c.View != v || c.SequenceNumber != n {
            continue
        }
        votes++
    }

    logger.Debugf("Replica %d commit count for view=%d/seqNo=%d: %d",
        instance.id, v, n, votes)

    return votes >= instance.intersectionQuorum()
}

// intersectionQuorum returns (N+f+2)/2 under integer division, i.e.
// ceil((N+f+1)/2). Any two sets of that many replicas out of N overlap in at
// least 2*q - N >= f+1 members.
func (instance *pbftCore) intersectionQuorum() int {
    total := instance.N + instance.f + 2
    return total / 2
}

从代码可以看出,最终是否committed是通过cert中保存的commit消息数量来判定的(需达到intersectionQuorum())。cert的结构如下:

ype msgCert struct {
    digest      string
    prePrepare  *PrePrepare
    sentPrepare bool
    prepare     []*Prepare
    sentCommit  bool
    commit      []*Commit
}

committed的后续操作尚未研究

Last updated

Was this helpful?