Section1.3 event-receive
摘要
geth定义了两个peer
类型, 一个在p2p
, 另一个在eth
. geth启动后, 由eth.ProtocalManager
和p2p.Server
分别监听addPeer
的消息, 当收到消息后会创建一个新的peer
, 并开始持续从peer
的IO中读取消息并处理.
对于p2p.Server
, 会直接从网络层获得消息, 它会将关于p2p
网络的消息进行处理(比如ping
), 其余消息全部会交给eth.ProtocalManager
. eth.ProtocalManager
在收到来自p2p
的消息后, 会识别消息类型(比如交易, 块请求), 并分别进行处理.
我们将会主要关心eth.ProtocalManager
对消息的处理, p2p
层不具体描述.
消息大致传送路径为: 网络层 >> p2p >> eth.
过程
在geth启动后, 会通过eth/backend.go
中定义的func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error)
生成Ethereum
实例. 在该函数中, 同时通过eth.protocolManager, err = NewProtocolManager(...)
生成了一个ProtocalManager
.
该类型定义在eth/handler.go
, 通过func NewProtocolManager()
初始化. 初始化过程中会注册一个函数:
manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
...
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
peer := manager.newPeer(int(version), p, rw)
select {
case manager.newPeerCh <- peer:
manager.wg.Add(1)
defer manager.wg.Done()
return manager.handle(peer)
case <-manager.quitSync:
return p2p.DiscQuitting
}
},
...
该函数会在P2P协议启动时, 开启单独的线程并执行(定义在p2p/peer.go
). 注意到, 该函数内部会定义: 当manager.newPeerCh
收到一个新的peer
后, 会调用manager.handle(peer)
, 即每一个新的peer
都会被调用manager.handle(peer)
.
func handle()
定义如下:
func (pm *ProtocolManager) handle(p *peer) error {
...
for {
if err := pm.handleMsg(p); err != nil {
p.Log().Debug("Ethereum message handling failed", "err", err)
return err
}
}
}
函数末尾会有一个死循环, 一直重复调用pm.handleMsg(p)
. func handleMsg()
就是程序处理消息的地方.
handleMsg is invoked whenever an inbound message is received from a remote peer. The remote connection is torn down upon returning any error.
在geth还有一个注册peer并持续调用handleMsg的地方, 在p2p.Server
. 两者的peer
类型只是名字一样, 是两个不同的结构. 其也会监听addPeer
消息, 创建peer
, 启动peer
并通过handleMsg
接收消息(也叫handleMsg
, 但不是同一个函数), 但里面只会处理关于p2p
的消息(例如ping), 其他消息作为subprotocol message
仍会交给ProtocalManager
处理.
func (pm *ProtocolManager) handleMsg(p *peer) error {
// Read the next message from the remote peer, and ensure it's fully consumed
msg, err := p.rw.ReadMsg()
...
// Handle the message depending on its contents
switch {
case msg.Code == StatusMsg:
...
// Block header query, collect the requested headers and reply
case msg.Code == GetBlockHeadersMsg:
...
case msg.Code == BlockHeadersMsg:
...
case msg.Code == GetBlockBodiesMsg:
...
case msg.Code == BlockBodiesMsg:
...
case msg.Code == TxMsg:
...
// Transactions can be processed, parse all of them and deliver to the pool
var txs []*types.Transaction
if err := msg.Decode(&txs); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
for i, tx := range txs {
// Validate and mark the remote transaction
if tx == nil {
return errResp(ErrDecode, "transaction %d is nil", i)
}
p.MarkTransaction(tx.Hash())
}
pm.txpool.AddBatch(txs)
default:
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
}
return nil
}
可以看到, 这里geth根据消息code的不同, 判断消息类型并执行相应操作. 这里的消息是从peer.rw.ReadMsg()
中读取的.消息本身则是在p2p/message.go
里定义的. rw
的类型为p2p.MsgReadWriter
, 会根据各个peer
的链接从网络获取消息. 再底层的机制不再深究.
Last updated
Was this helpful?