Section2.8 prePrepare、prepare-and-commit

待完善。。。。。。

事件来源

在serve()函数中提到的NewPeerWithEngine() >> engFactory(peer)<=>GetEngine(peer)中,该函数位于engine.go中,for循环不断通过engine.consensusFan.GetOutChannel()messageFan中获取msg,然后通过engine.consenter.RecvMsg(msg.Msg, msg.Sender)发送出去,该函数实际上由externalEventReceiver实现。

// RecvMsg is called by the stack when a new message is received
func (eer *externalEventReceiver) RecvMsg(ocMsg *pb.Message, senderHandle *pb.PeerID) error {
    eer.manager.Queue() <- batchMessageEvent{
        msg:    ocMsg,
        sender: senderHandle,
    }
    return nil
}

注意到,该函数是将batchMessageEvent发送到err.manager.Queue()中。

事件处理

  • obcBatch初始化时通过goroutine调用managerImpl.eventLoop(),最终会调用obcBatch.ProcessEvent()

  • obcBatch.ProcessEvent()函数中,通过switch et := event.(type) 分支处理events

    1、batchMessageEvent

    case batchMessageEvent:
          ocMsg := et
          return op.processMessage(ocMsg.msg, ocMsg.sender)

    注意到,该分支会调用obcBatch.processMessage(batchMessageEvent.msg, batchMessageEvent.sender)

    func (op *obcBatch) processMessage(ocMsg *pb.Message, senderHandle *pb.PeerID) events.Event {
      if ocMsg.Type == pb.Message_CHAIN_TRANSACTION {
          req := op.txToReq(ocMsg.Payload)
          return op.submitToLeader(req)
      }
    
      if ocMsg.Type != pb.Message_CONSENSUS {
          logger.Errorf("Unexpected message type: %s", ocMsg.Type)
          return nil
      }
    
      batchMsg := &BatchMessage{}
      ...
    
      if req := batchMsg.GetRequest(); req != nil {
          ...
          if (op.pbft.primary(op.pbft.view) == op.pbft.id) && op.pbft.activeView {
              return op.leaderProcReq(req)
          }
          op.startTimerIfOutstandingRequests()
          return nil
      } else if pbftMsg := batchMsg.GetPbftMessage(); pbftMsg != nil {
          senderID, err := getValidatorID(senderHandle) // who sent this?
          if err != nil {
              panic("Cannot map sender's PeerID to a valid replica ID")
          }
          msg := &Message{}
          err = proto.Unmarshal(pbftMsg, msg)
          if err != nil {
              logger.Errorf("Error unpacking payload from message: %s", err)
              return nil
          }
          return pbftMessageEvent{
              msg:    msg,
              sender: senderID,
          }
      }
    
      logger.Errorf("Unknown request: %+v", batchMsg)
    
      return nil
    }

    注意到:

  • 1.1 消息是Message_CHAIN_TRANSACTION,将消息的payload封装成Request,交由submitToLeader(req)处理,经过广播等操作,

    submitToLeader(req) >> leaderProcReq(req) >> sendBatch() 最终返回events.Event类型的RequestBatch,这个事件在pbft.ProcessEvent(event)中处理。

  • 1.2 消息是Message_CONSENSUS,进行分支:

    • 1.2.1 req,最终会调用leaderProcReq(req),后期同Message_CHAIN_TRANSACTION

    • 1.2.2 pbftMsg,经过处理,最终返回pbftMessageEvent,这个事件在pbft.ProcessEvent(event)中处理。

2、default,会执行pbft.ProcessEvent(event)

在该函数中通过switch et := e.(type)进行分支操作:

2.1 pbftMessageEvent

它会执行next, err := instance.recvMsg(msg.msg, msg.sender),该函数如下:

从代码可以看出,该函数返回各种类型的时间,供pbft.ProcessEvent(event)处理。

2.2、RequestBatch

它会执行recvRequestBatch(reqBatch),进一步调用sendPrePrepare(reqBatch, digest)

注意到:

  • 1) 初始化PrePrepare,赋值给cert的成员变量prePrepare(指针赋值),同时作为innerBroadcast()的参数,广播出去。

  • 2)innerBroadcast()会随机模拟byzantine fault节点,正常节点会进一步调用instance.consumer.broadcast(msgRaw)

    (该函数由obcBatch实现),最终调用broadcaster.send()实现广播。

  • 3)调用instance.maybeSendCommit(digest, instance.view, n),因为if条件不成立,没有进行实质性操作。

2.3、PrePrepare

它会执行recvPrePrepare(prePrepare)

从函数可看出:

  • 1)初始化Prepare,并作为recvPrepare()innerBroadcast()的参数。

  • 2)recvPrepare()prepare进行验证,调用instance.maybeSendCommit(prep.BatchDigest, prep.View, prep.SequenceNumber),在Prepare中介绍。

  • 3)innerBroadcast()同2.1(2)

2.4、Prepare

它会执行instance.recvPrepare(prepare),调用instance.maybeSendCommit(prep.BatchDigest, prep.View, prep.SequenceNumber)

注意到,在RequestBatch虽然有被调用,但是因为条件不成立,所以无法进入if分支。在接收到prePrepareprepare后,条件成立,进入if分支。

  • 1)初始化commit,且仅同一个prepare只初始化一次。

  • 2)调用instance.recvCommit(commit),在Commit中讲解。

  • 3)innerBroadcast()同2.1(2)

2.5、Commit

它会执行instance.recvCommit(commit),进一步调用instance.committed(commit.BatchDigest, commit.View, commit.SequenceNumber)

从代码可以看出,通过cert中保存的commit判定最终是否committedcert的结构如下:

committed的后续操作尚未研究

Last updated