Section1.3 event-receive

摘要

geth定义了两个peer类型, 一个在p2p, 另一个在eth. geth启动后, 由eth.ProtocalManagerp2p.Server分别监听addPeer的消息, 当收到消息后会创建一个新的peer, 并开始持续从peer的IO中读取消息并处理.

对于p2p.Server, 会直接从网络层获得消息, 它会将关于p2p网络的消息进行处理(比如ping), 其余消息全部会交给eth.ProtocalManager. eth.ProtocalManager在收到来自p2p的消息后, 会识别消息类型(比如交易, 块请求), 并分别进行处理.

我们将会主要关心eth.ProtocalManager对消息的处理, p2p层不具体描述.

消息大致传送路径为: 网络层 >> p2p >> eth.

过程

在geth启动后, 会通过eth/backend.go中定义的func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error)生成Ethereum实例. 在该函数中, 同时通过eth.protocolManager, err = NewProtocolManager(...)生成了一个ProtocalManager.

该类型定义在eth/handler.go, 通过func NewProtocolManager()初始化. 初始化过程中会注册一个函数:

manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
    ...
    Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
        peer := manager.newPeer(int(version), p, rw)
        select {
        case manager.newPeerCh <- peer:
            manager.wg.Add(1)
            defer manager.wg.Done()
            return manager.handle(peer)
        case <-manager.quitSync:
            return p2p.DiscQuitting
        }
    },
...

该函数会在P2P协议启动时, 开启单独的线程并执行(定义在p2p/peer.go). 注意到, 该函数内部会定义: 当manager.newPeerCh收到一个新的peer后, 会调用manager.handle(peer), 即每一个新的peer都会被调用manager.handle(peer).

func handle()定义如下:

func (pm *ProtocolManager) handle(p *peer) error {
    ...
    for {
        if err := pm.handleMsg(p); err != nil {
            p.Log().Debug("Ethereum message handling failed", "err", err)
            return err
        }
    }
}

函数末尾会有一个死循环, 一直重复调用pm.handleMsg(p). func handleMsg()就是程序处理消息的地方.

handleMsg is invoked whenever an inbound message is received from a remote peer. The remote connection is torn down upon returning any error.

在geth还有一个注册peer并持续调用handleMsg的地方, 在p2p.Server. 两者的peer类型只是名字一样, 是两个不同的结构. 其也会监听addPeer消息, 创建peer, 启动peer并通过handleMsg接收消息(也叫handleMsg, 但不是同一个函数), 但里面只会处理关于p2p的消息(例如ping), 其他消息作为subprotocol message仍会交给ProtocalManager处理.

func (pm *ProtocolManager) handleMsg(p *peer) error {
    // Read the next message from the remote peer, and ensure it's fully consumed
    msg, err := p.rw.ReadMsg()
    ...
    // Handle the message depending on its contents
    switch {
    case msg.Code == StatusMsg:
    ...
    // Block header query, collect the requested headers and reply
    case msg.Code == GetBlockHeadersMsg:
    ...
    case msg.Code == BlockHeadersMsg:
    ...
    case msg.Code == GetBlockBodiesMsg:
    ...
    case msg.Code == BlockBodiesMsg:
    ...
    case msg.Code == TxMsg:
        ...
        // Transactions can be processed, parse all of them and deliver to the pool
        var txs []*types.Transaction
        if err := msg.Decode(&txs); err != nil {
            return errResp(ErrDecode, "msg %v: %v", msg, err)
        }
        for i, tx := range txs {
            // Validate and mark the remote transaction
            if tx == nil {
                return errResp(ErrDecode, "transaction %d is nil", i)
            }
            p.MarkTransaction(tx.Hash())
        }
        pm.txpool.AddBatch(txs)

    default:
        return errResp(ErrInvalidMsgCode, "%v", msg.Code)
    }
    return nil
}

可以看到, 这里geth根据消息code的不同, 判断消息类型并执行相应操作. 这里的消息是从peer.rw.ReadMsg()中读取的.消息本身则是在p2p/message.go里定义的. rw的类型为p2p.MsgReadWriter, 会根据各个peer的链接从网络获取消息. 再底层的机制不再深究.

Last updated

Was this helpful?