checkpoint部分的恢复与修改

入口

在fabric里面,当接收到一个committedEvent的时候,将会返回一个execDoneEvent{},然后再把该event交给pbft.ProcessEvent(event)进行处理,然后再调用func (instance *pbftCore) execDoneSync(),再这里面调用了instance.Checkpoint(instance.lastExec, instance.consumer.getState())。这就是func (instance *pbftCore) Checkpoint(seqNo uint64, id []byte)的入口。

在geth-pbft的func (instance *pbftCore) recvCommit(commit *types.Commit) error里面,我在当该block处于committed状态的时候,增加语句instance.helper.manager.Queue() <- execDoneEvent{},然后在pbft.ProcessEvent(event)里面增加对这个事件的处理:

case execDoneEvent:
        instance.execDoneSync()
        if instance.skipInProgress {
            instance.retryStateTransfer(nil)
        }
        // We will delay new view processing sometimes
        return instance.processNewView()

恢复func (instance *pbftCore) execDoneSync(),在该函数里,当当前处理的block的序号是instance.K的整数倍的时候,调用instance.Checkpoint(instance.lastExec, instance.consumer.getState()),初始化checkpoint信息。

checkpoint部分的恢复与修改

1、 恢复func (instance *pbftCore) Checkpoint(seqNo uint64, id []byte),在这个函数里初始化了checkpoint信息,把它放入了字典instance.chkpts(key是seqNo,value值是string类型的,是checkpoint的digest),并把checkpoint信息进行了广播;调用了instance.recvCheckpoint(chkpt)

checkpointStore里存储的是本节点的和来自其他节点的checkpoint,而chkpts存储的只有本节点的关于checkpoint的信息。

2、恢复func (instance *pbftCore) recvCheckpoint(chkpt *types.Checkpoint) events.Event,在这个函数里对收到的checkpoint信息进行验证后把它放入了checkpointStore里,并对checkpointStore里等于chkpt.SequenceNumberchkpt.Id的checkpoint个数进行统计并做出相应处理。如果checkpoint的个数大于2f+1个,将会触发func (instance *pbftCore) processNewView() events.Event。根据func (instance *pbftCore) recvCheckpoint(chkpt *types.Checkpoint) events.Event的内部调用:

  • 恢复了func (instance *pbftCore) weakCheckpointSetOutOfRange(chkpt *types.Checkpoint) bool

  • 恢复了func (instance *pbftCore) witnessCheckpointWeakCert(chkpt *Checkpoint)

2.1、恢复了func (instance *pbftCore) weakCheckpointSetOutOfRange(chkpt *types.Checkpoint) bool,在这个函数进行判断:

  • 如果checkpoint.seqNo小于本节点的高水位把它从instance.hChkpts中删除,并return false。该字典存储那些高于本节点的高水位的seqNo,key是ReplicaId,value是seqNo。否则return true

  • 如果checkpoint.seqNo大于本届点的高水位,并且instance.hChkpts中存储的高于本节点高水位的checkpoint.seqNo的个数小于f+1个,则把他放入instance.hChkptsreturn false。如果instance.hChkpts中存储的高于本节点高水位的checkpoint.seqNo的个数大于f+1个,意味着在同一时刻该节点是收集不够2f+1条针对该seqNocheckpoint消息的,那么该checkpoint将永远达不到stable的状态,因为3f+1-(f+1)=2f<(2f+1)。在这种情况下,我们将从instance.blockStore里清空所有已处理的block,从instance.outstandingBlocks里清空所有等待处理的block,重新设置高低水位,重新设置checkpoint,并return true。在状态重置的过程中恢复了func (instance *pbftCore) invalidateStatefunc (instance *pbftCore) moveWatermarks(n uint64)等函数。

2.1.1、 在func (instance *pbftCore) weakCheckpointSetOutOfRange(chkpt *types.Checkpoint) bool调用func (instance *pbftCore) moveWatermarks(n uint64),恢复该函数。该函数重新设置了低水位,清空了certStorecheckpointStorepsetqsetchkpts里seqNo低于低水位的block的信息。

2.1.2、 在func (instance *pbftCore) moveWatermarks(n uint64)里调用func (instance *pbftCore) resubmitRequestBatches(),恢复更改为func (instance *pbftCore) resubmitBlockMsges()。具体更改内容参见该函数。

2.1.3、在func (instance *pbftCore) resubmitBlockMsges()调用了instance.recvBlockMsg(block),恢复func (instance *pbftCore) recvBlockMsg(block *types.Block) error。在该函数里调用instance.sendPrePrepare(block)把block以PrePrepareMessage的形式广播出去。

2.2、恢复了func (instance *pbftCore) witnessCheckpointWeakCert(chkpt *Checkpoint),当在已存储checkpointstoresqeNoid都和接收到的checkpoint的sqeNoid相同的checkpoint的个数等于f+1时,就说明该checkpoint在该节点上得到了一个weak certificates,就会调用这个函数。

2.2.1、在func (instance *pbftCore) witnessCheckpointWeakCert(chkpt *Checkpoint)里,初始化了一个target,他是stateUpdateTarget类型的,通过在func (instance *pbftCore) updateHighStateTarget(target *stateUpdateTarget)target传递给instance.highStateTarget = target,获得弱认证的checkpoint的证书。因此恢复func (instance *pbftCore) updateHighStateTarget(target *stateUpdateTarget)

2.2.2、在func (instance *pbftCore) witnessCheckpointWeakCert(chkpt *types.Checkpoint)里,当instance.skipInProgress为true的时候进行状态转换,恢复func (instance *pbftCore) retryStateTransfer(optional *stateUpdateTarget)

2.2.3、在func (instance *pbftCore) retryStateTransfer(optional *stateUpdateTarget)里经过一系列的验证后,调用instance.consumer.skipTo(target.seqNo, target.id, target.replicas),这里把fabric里面的func (op *obcGeneric) skipTo(seqNo uint64, id []byte, replicas []uint64)拿到了pbft-core.go里,变为func (instance *pbftCore) skipTo(seqNo uint64, id []byte, replicas []uint64),并且在pbft_message.go里定义了type BlockchainInfo struct,实现了message接口。

2.2.4、在func (instance *pbftCore) skipTo(seqNo uint64, id []byte, replicas []uint64)里又进一步调用了instance.helper.UpdateState(&checkpointMessage{seqNo, id}, info, getValidatorHandles(replicas)),把checkpointMessagestateUpdateEvent封装放入了manager.Queue()里。这里把这个函数恢复到了helper.go里,其中参数getValidatorHandles(replicas)也进行了恢复。

2.2.5、在函数这里把BlockchainInfoPeerID都恢复到了pbft_message.go里。

Last updated

Was this helpful?