Section2.2.1 stream to consenter

摘要

在初始化过程中, peer获得了engine, 该engine会启动一个线程一直从MessageFanoutchannel里读取消息送给consenter处理.

out的消息是MessageFan启动线程从多个in(ins)里读取的.peer在初始化中会从engine获得HandlerFactory, 然后使用HandlerFactory生成ConsensusHandler, 生成ConsensusHandler的过程会创建一个channel, 加入到ins. peer会通过func ConsensusHandler.HandleMessage()stream的消息送给该channel(即in).

即stream >> MessageFan.ins >> MessageFan.out >> consenter

过程

fabric用github.com/spf13/cobra来管理命令. 经过一系列初始化之后, 程序会调用在peer/node/start.go中定义的func serve(args []string) error, 其中通过命令

peerServer, err = peer.NewPeerWithEngine(secHelperFunc, helper.GetEngine)

来获得Peer实例, 这里的第二个参数helper.GetEngine便是在consensus/helper/engine.go定义的func GetEngine(coord peer.MessageHandlerCoordinator) (peer.Engine, error). 在func peer.NewPeerWithEngine()中, 有

peer.engine, err = engFactory(peer)

即, peer在初始化时, 将自己作为参数传给func GetEngine(), 获得一个engine实例. func GetEngine()即为fabric和共识机制交互的入口. 定义如下:

func GetEngine(coord peer.MessageHandlerCoordinator) (peer.Engine, error) {
    var err error
    engineOnce.Do(func() {
        engine = new(EngineImpl)
        engine.helper = NewHelper(coord)
        engine.consenter = controller.NewConsenter(engine.helper)
        engine.helper.setConsenter(engine.consenter)
        engine.peerEndpoint, err = coord.GetPeerEndpoint()
        engine.consensusFan = util.NewMessageFan()

        go func() {
            logger.Debug("Starting up message thread for consenter")

            // The channel never closes, so this should never break
            for msg := range engine.consensusFan.GetOutChannel() {
                engine.consenter.RecvMsg(msg.Msg, msg.Sender)
            }
        }()
    })
    return engine, err
}

注意到, 前面部分是对engine成员变量的初始化, 后面通过go指令启动了一个线程, 该线程会一直保持运行. 在go func() {...}内部, func GetOutChannel()返回了一个MessageFan的成员变量out, 类型为channel, for循环的作用便是一直从该channel读取消息, 然后通过func engine.consenter.RecvMsg()送给consenter.

MessageFan另外有个成员变量ins []<-chan *Message, 通过调用func (fan *MessageFan) AddFaninChannel(channel <-chan *Message)可以向ins增加输入的channel, 每一次调用该函数, MessageFan会启动一个线程自动开始持续地将来自新channel的消息写入out, 进而再被engine送给consenter.

func AddFaninChannel()仅在consensus/helper/handler.go中定义的func NewConsensusHandler()中被调用.

NewConsensusHandler constructs a new MessageHandler for the plugin. Is instance of peer.HandlerFactory

该函数(初始化handler)的过程中, hanlder创建了一个channel, 并保存在自己的成员变量中. 该函数也仅仅有一次引用:

// GetHandlerFactory returns new NewConsensusHandler
func (eng *EngineImpl) GetHandlerFactory() peer.HandlerFactory {
    return NewConsensusHandler
}

即, engine把它进一步封装成成员函数供peer使用. 该函数同样是在func NewPeerWithEngine()中被使用的:

peer.handlerFactory = peer.engine.GetHandlerFactory()

peer接下来可以使用func handleChat()来从stream获取消息, 然后交给handler.

// Chat implementation of the the Chat bidi streaming RPC function
func (p *Impl) handleChat(ctx context.Context, stream ChatStream, initiatedStream bool) error {
    deadline, ok := ctx.Deadline()
    peerLogger.Debugf("Current context deadline = %s, ok = %v", deadline, ok)
    handler, err := p.handlerFactory(p, stream, initiatedStream)
    if err != nil {
        return fmt.Errorf("Error creating handler during handleChat initiation: %s", err)
    }
    defer handler.Stop()
    for {
        in, err := stream.Recv()
        if err == io.EOF {
            peerLogger.Debug("Received EOF, ending Chat")
            return nil
        }
        if err != nil {
            e := fmt.Errorf("Error during Chat, stopping handler: %s", err)
            peerLogger.Error(e.Error())
            return e
        }
        err = handler.HandleMessage(in)
        if err != nil {
            peerLogger.Errorf("Error handling message: %s", err)
            //return err
        }
    }
}

注意到handler.HandleMessage(in), 该handler实际上就是通过调用开头说到的consensus/helper/handler.go中定义的func NewConsensusHandler()返回的ConsensusHandler, 类型为

type ConsensusHandler struct {
    peer.MessageHandler
    consenterChan chan *util.Message
    coordinator   peer.MessageHandlerCoordinator
}

func HandleMessage()的实现为:

func (handler *ConsensusHandler) HandleMessage(msg *pb.Message) error {
    if msg.Type == pb.Message_CONSENSUS {
        senderPE, _ := handler.To()
        select {
        case handler.consenterChan <- &util.Message{
            Msg:    msg,
            Sender: senderPE.ID,
        }:
            return nil
        default:
            err := fmt.Errorf("Message channel for %v full, rejecting", senderPE.ID)
            logger.Errorf("Failed to queue consensus message because: %v", err)
            return err
        }
    }

    if logger.IsEnabledFor(logging.DEBUG) {
        logger.Debugf("Did not handle message of type %s, passing on to next MessageHandler", msg.Type)
    }
    return handler.MessageHandler.HandleMessage(msg)
}

这里便会通过handler.consenterChan将消息发送给consenter. 注意到, return handler.MessageHandler.HandleMessage(msg)msg重新传回了peer, 交给FMS(?)处理, 进一步的机制未知.

Last updated

Was this helpful?