Section1.4 event-send&mine
摘要
geth中的事件类型需要向event
模块进行注册. 每一个外部节点若与geth相链接就会生成一个peer
实例, peer
里保存了在网络上与外部节点的通道. peer
可以向event
模块订阅相应的事件类型, 内部产生的消息会发送给event
, event
保存了订阅该事件类型的peer
信息, 并想这些peer
进行广播. peer
收到消息后会通过网络将消息实际发送给外部节点.
即: 事件产生 >> event >> peer >> 网络 >> 外部节点
特别地, 对于mine过程, 成功挖出块的传递是: ethash.mine() >> ethash.Seal() >> miner.CpuAgent.mine() >> miner.worker >> miner.worker.mux(即:event.TypeMux) >> event.TypeMux.Post() >> peer
过程
geth中, 由ethash产生的事件很单一, 只有挖块成功等几个事件类型. geth中生成块的接口定义为Seal
. 在ethash中, Seal
会调用内部实现的func mine()
, 由其进行实际的挖矿工作. 定义如下:
// mine is the actual proof-of-work miner that searches for a nonce starting from
// seed that results in correct final block difficulty.
func (ethash *Ethash) mine(block *types.Block, id int, seed uint64, abort chan struct{}, found chan *types.Block) {
// Extract some data from the header
...
// Start generating random nonces until we abort or find a good one
var (
attempts = int64(0)
nonce = seed
)
...
for {
select {
case <-abort:
...
return
default:
// We don't have to update hash rate on every nonce, so update after after 2^X nonces
attempts++
...
// Compute the PoW value of this nonce
digest, result := hashimotoFull(dataset, hash, nonce)
if new(big.Int).SetBytes(result).Cmp(target) <= 0 {
// Correct nonce found, create a new header with it
...
// Seal and return a block (if still needed)
select {
case found <- block.WithSeal(header):
logger.Trace("Ethash nonce found and reported", "attempts", nonce-seed, "nonce", nonce)
case <-abort:
logger.Trace("Ethash nonce found but discarded", "attempts", nonce-seed, "nonce", nonce)
}
return
}
nonce++
}
}
}
看到, func mine()
参数中包括一个found
channel, 当成功获得一个合法的块时, 会将其header通过found
传出. found
是由func Seal()
定义的, func Seal()
内部接收了这个消息并将header返回. func Seal()
对外会被miner/agent
调用:
func (self *CpuAgent) mine(work *Work, stop <-chan struct{}) {
if result, err := self.engine.Seal(self.chain, work.Block, stop); result != nil {
log.Info("Successfully sealed new block", "number", result.Number(), "hash", result.Hash())
self.returnCh <- &Result{work, result}
} else {
if err != nil {
log.Warn("Block sealing failed", "err", err)
}
self.returnCh <- nil
}
}
CpuAgent
将收到的结果传送给发送给returnCh
, 该channel是在worker
给赋给CpuAgent
的, 本身是worker
的成员变量recv chan *Result
. worker
会在func wait()
接受来自recv
的结果, 在func wait()
中, worker
初步解析结果, 并将结果广播:
// broadcast before waiting for validation
go func(block *types.Block, logs []*types.Log, receipts []*types.Receipt) {
self.mux.Post(core.NewMinedBlockEvent{Block: block})
self.mux.Post(core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
if stat == core.CanonStatTy {
self.mux.Post(core.ChainHeadEvent{Block: block})
self.mux.Post(logs)
}
if err := core.WriteBlockReceipts(self.chainDb, block.Hash(), block.NumberU64(), receipts); err != nil {
log.Warn("Failed writing block receipts", "err", err)
}
}(block, work.state.Logs(), work.receipts)
调用的是func worker.mux.Post()
. mux
是worker
的成员变量, 类型为*event.TypeMux
. 该类型定义在event/event.go
.
type TypeMux struct {
mutex sync.RWMutex
subm map[reflect.Type][]*TypeMuxSubscription
stopped bool
}
...
// Post sends an event to all receivers registered for the given type.
// It returns ErrMuxClosed if the mux has been stopped.
func (mux *TypeMux) Post(ev interface{}) error {
event := &TypeMuxEvent{
Time: time.Now(),
Data: ev,
}
rtyp := reflect.TypeOf(ev)
mux.mutex.RLock()
if mux.stopped {
mux.mutex.RUnlock()
return ErrMuxClosed
}
subs := mux.subm[rtyp]
mux.mutex.RUnlock()
for _, sub := range subs {
sub.deliver(event)
}
return nil
}
在geth的事件处理中, geth会针对各种消息类型生成一个订阅列表, peer
可以主动的订阅各类消息, 当产生消息后, event
模块会将此消息发送给所有订阅该消息类型的节点. 一次订阅动作会生成一个TypeMuxSubscription
结构, 该结构里保存了与订阅者交互的通道(channel).
// TypeMuxSubscription is a subscription established through TypeMux.
type TypeMuxSubscription struct {
mux *TypeMux
created time.Time
closeMu sync.Mutex
closing chan struct{}
closed bool
// these two are the same channel. they are stored separately so
// postC can be set to nil without affecting the return value of
// Chan.
postMu sync.RWMutex
readC <-chan *TypeMuxEvent
postC chan<- *TypeMuxEvent
}
接下来peer
就会接收到相应的消息, 并通过网络传送给节点.
Last updated
Was this helpful?