您好,登錄后才能下訂單哦!
今天就跟大家聊聊有關以太坊 p2p Server 原理及實現是怎樣的,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。
區塊鏈技術的去中心依賴于底層組網技術,以太坊的底層實現了p2pServer,大約可以分為這樣三層。
底層路由表。封裝了kad路由,節點的數據結構以及計算記錄,節點搜索,驗證等功能。
中層peer抽象,message開放發送接口,server對外提供peer檢測,初始化,事件訂閱,peer狀態查詢,啟動,停止等功能
以太坊最上層peer,peerset再封裝,通過協議的Run函數,在中層啟動peer時,獲取peer,最終通過一個循環截取穩定peer,包裝在peerset中使用。
這里簡化問題僅討論Node Discovery Protocol。 這一層維護了一個buckets桶,總共有17個桶,每個桶有16個節點和10個替換節點。 Node放入時先要計算hash和localNode的距離。再按距離選擇一個桶放進去,取的時候逐個計算target和每個桶中對象的舉例,詳細參考closest函數,后面會貼出來。
距離公式滿足:f(x,y)=256-8*n-map(x[n+1]^y[n+1]) 注:n為相同節點數量 map為一個負相關的映射關系。
簡單來說就是相似越多,值越小。細節參考Node.go的logdist函數。 這里需要了解算法Kademlia,
. ├── database.go //封裝node數據庫相關操作 ├── node.go //節點數據結構 ├── ntp.go //同步時間 ├── table.go //路由表 ├── udp.go //網絡相關操作
其中最重要的就是table對象,table公共方法有:
newTable 實例創建
Self local節點獲取
ReadRandomNodes 隨機讀取幾個節點
Close 關閉
Resolve 在周邊查找某個節點
Lookup 查找某個節點的鄰近節點
逐個來分析這些方法:
1:生成對象實例(獲取數據庫客戶端,LocalNode etc)
// If no node database was given, use an in-memory one db, err := newNodeDB(nodeDBPath, Version, ourID) if err != nil { return nil, err } tab := &Table{ net: t, db: db, self: NewNode(ourID, ourAddr.IP, uint16(ourAddr.Port), uint16(ourAddr.Port)), bonding: make(map[NodeID]*bondproc), bondslots: make(chan struct{}, maxBondingPingPongs), refreshReq: make(chan chan struct{}), initDone: make(chan struct{}), closeReq: make(chan struct{}), closed: make(chan struct{}), rand: mrand.New(mrand.NewSource(0)), ips: netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit}, }
2:載入引導節點,初始化k桶。
if err := tab.setFallbackNodes(bootnodes); err != nil { return nil, err } for i := 0; i < cap(tab.bondslots); i++ { tab.bondslots <- struct{}{} } for i := range tab.buckets { tab.buckets[i] = &bucket{ ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit}, } }
3:將節點放入到桶里,生成一條協程用于刷新,驗證節點。
tab.seedRand() tab.loadSeedNodes(false) //載入種子節點 // Start the background expiration goroutine after loading seeds so that the search for // seed nodes also considers older nodes that would otherwise be removed by the // expiration. tab.db.ensureExpirer() go tab.loop()
載入種子節點
func (tab *Table) loadSeedNodes(bond bool) { seeds := tab.db.querySeeds(seedCount, seedMaxAge) //數據庫中的種子節點和引導節點合并 seeds = append(seeds, tab.nursery...) if bond { seeds = tab.bondall(seeds) //節點驗證 } for i := range seeds { seed := seeds[i] age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.bondTime(seed.ID)) }} log.Debug("Found seed node in database", "id", seed.ID, "addr", seed.addr(), "age", age) tab.add(seed) //節點入桶 } }
節點入桶,同時也要檢查ip等限制。
func (tab *Table) add(new *Node) { tab.mutex.Lock() defer tab.mutex.Unlock() b := tab.bucket(new.sha) //獲取當前節點對應的桶 if !tab.bumpOrAdd(b, new) { // Node is not in table. Add it to the replacement list. tab.addReplacement(b, new) } }
桶的選擇
func (tab *Table) bucket(sha common.Hash) *bucket { d := logdist(tab.self.sha, sha) //計算hash舉例 if d <= bucketMinDistance { //這里按算法來看,只要hash前三位相等就會到第一個buckets return tab.buckets[0] } return tab.buckets[d-bucketMinDistance-1] }
根據Node的Id查找Node,先在當前的桶里面查找,查找一遍之后沒找到就在周邊的節點里面搜索一遍再找。
// Resolve searches for a specific node with the given ID. // It returns nil if the node could not be found. func (tab *Table) Resolve(targetID NodeID) *Node { // If the node is present in the local table, no // network interaction is required. hash := crypto.Keccak256Hash(targetID[:]) tab.mutex.Lock() //查找最近節點 cl := tab.closest(hash, 1) tab.mutex.Unlock() if len(cl.entries) > 0 && cl.entries[0].ID == targetID { return cl.entries[0] } // Otherwise, do a network lookup. //不存在 搜索鄰居節點 result := tab.Lookup(targetID) for _, n := range result { if n.ID == targetID { return n } } return nil }
這里需要理解的函數是 closest,遍歷所有桶的所有節點,查找最近的一個
// closest returns the n nodes in the table that are closest to the // given id. The caller must hold tab.mutex. func (tab *Table) closest(target common.Hash, nresults int) *nodesByDistance { // This is a very wasteful way to find the closest nodes but // obviously correct. I believe that tree-based buckets would make // this easier to implement efficiently. close := &nodesByDistance{target: target} for _, b := range tab.buckets { for _, n := range b.entries { close.push(n, nresults) } } return close } func (h *nodesByDistance) push(n *Node, maxElems int) { ix := sort.Search(len(h.entries), func(i int) bool { return distcmp(h.target, h.entries[i].sha, n.sha) > 0 }) if len(h.entries) < maxElems { h.entries = append(h.entries, n) } if ix == len(h.entries) { // farther away than all nodes we already have. // if there was room for it, the node is now the last element. } else { // slide existing entries down to make room // this will overwrite the entry we just appended. //近的靠前邊 copy(h.entries[ix+1:], h.entries[ix:]) h.entries[ix] = n } }
整體思路是先拷貝出來,再逐個桶的抽最上面的一個,剩下空桶移除,剩下的桶合并后,下一輪再抽桶的第一個節點,直到填滿給定數據或者桶全部空掉。最后返回填到數組里面的數量。
// ReadRandomNodes fills the given slice with random nodes from the // table. It will not write the same node more than once. The nodes in // the slice are copies and can be modified by the caller. func (tab *Table) ReadRandomNodes(buf []*Node) (n int) { if !tab.isInitDone() { return 0 } tab.mutex.Lock() defer tab.mutex.Unlock() // Find all non-empty buckets and get a fresh slice of their entries. var buckets [][]*Node //拷貝節點 for _, b := range tab.buckets { if len(b.entries) > 0 { buckets = append(buckets, b.entries[:]) } } if len(buckets) == 0 { return 0 } // Shuffle the buckets. for i := len(buckets) - 1; i > 0; i-- { j := tab.rand.Intn(len(buckets)) buckets[i], buckets[j] = buckets[j], buckets[i] } // Move head of each bucket into buf, removing buckets that become empty. var i, j int for ; i < len(buf); i, j = i+1, (j+1)%len(buckets) { b := buckets[j] buf[i] = &(*b[0]) //取第一個節點 buckets[j] = b[1:] //移除第一個 if len(b) == 1 { //空桶移除 buckets = append(buckets[:j], buckets[j+1:]...) } if len(buckets) == 0 { break } } return i + 1 }
lookup會要求已知節點查找鄰居節點,查找的鄰居節點又遞歸的找它周邊的節點
for { // ask the alpha closest nodes that we haven't asked yet for i := 0; i < len(result.entries) && pendingQueries < alpha; i++ { n := result.entries[i] if !asked[n.ID] { asked[n.ID] = true pendingQueries++ go func() { // Find potential neighbors to bond with r, err := tab.net.findnode(n.ID, n.addr(), targetID) if err != nil { // Bump the failure counter to detect and evacuate non-bonded entries fails := tab.db.findFails(n.ID) + 1 tab.db.updateFindFails(n.ID, fails) log.Trace("Bumping findnode failure counter", "id", n.ID, "failcount", fails) if fails >= maxFindnodeFailures { log.Trace("Too many findnode failures, dropping", "id", n.ID, "failcount", fails) tab.delete(n) } } reply <- tab.bondall(r) }() } } if pendingQueries == 0 { // we have asked all closest nodes, stop the search break } // wait for the next reply for _, n := range <-reply { //此處會阻塞請求 if n != nil && !seen[n.ID] { seen[n.ID] = true result.push(n, bucketSize) } } pendingQueries-- }
桶初始化完成后會進入一個循環邏輯,其中通過三個timer控制調整周期。
驗證timer 間隔 10s左右
刷新timer 間隔 30 min
持久化timer 間隔 30s
revalidate = time.NewTimer(tab.nextRevalidateTime()) refresh = time.NewTicker(refreshInterval) copyNodes = time.NewTicker(copyNodesInterval)
刷新邏輯:重新加載種子節點,查找周邊節點,隨機三個節點,并查找這三個節點的周圍節點。
func (tab *Table) doRefresh(done chan struct{}) { defer close(done) tab.loadSeedNodes(true) tab.lookup(tab.self.ID, false) for i := 0; i < 3; i++ { var target NodeID crand.Read(target[:]) tab.lookup(target, false) } }
驗證邏輯:驗證每個桶的最末尾節點,如果該節點通過驗證則放到隊首(驗證過程是本地節點向它發送ping請求,如果回應pong則通過)
last, bi := tab.nodeToRevalidate() //取最后一個節點 if last == nil { // No non-empty bucket found. return } // Ping the selected node and wait for a pong. err := tab.ping(last.ID, last.addr()) //通信驗證 tab.mutex.Lock() defer tab.mutex.Unlock() b := tab.buckets[bi] if err == nil { // The node responded, move it to the front. log.Debug("Revalidated node", "b", bi, "id", last.ID) b.bump(last) //提到隊首 return }
相關文件
. ├── dial.go //封裝一個任務生成處理結構以及三種任務結構中(此處命名不太精確) ├── message.go //定義一些數據的讀寫接口,以及對外的Send/SendItem函數 ├── peer.go //封裝了Peer 包括消息讀取 ├── rlpx.go //內部的握手協議 ├── server.go //初始化,維護Peer網絡,還有一些對外的接口
這一層會不斷的從路由中提取節點,提取出來的節點要經過身份驗證,協議檢查之后加入到peer里面,緊接著如果沒有人使用這個peer,這個peer就會被刪除,再重新選擇一些節點出來繼續這個流程,peer再其中是隨生隨銷,這樣做是為了平均的使用所有的節點,而不是僅僅依賴于特定的幾個節點。因而這里從Server開始入手分析整個流程
Peers() //peer對象 PeerCount() //peer數量 AddPeer(node *discover.Node) //添加節點 RemovePeer(node *discover.Node) //刪除節點 SubscribeEvents(ch chan *PeerEvent) //訂閱內部的事件(節點的增加,刪除) //以上四個屬于對外的接口,不影響內部邏輯 Start() //server開始工作 SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) //啟動一個連接,經過兩次驗證之后,如果通過則加入到peer之中。
Start做了三件事,生成路由表于建立底層網絡。生成DialState用于驅動維護本地peer的更新與死亡,監聽本地接口用于信息應答。這里主要分析peer的維護過程。函數是run函數。
func (srv *Server) Start() (err error) { //**************初始化代碼省略 if !srv.NoDiscovery && srv.DiscoveryV5 { unhandled = make(chan discover.ReadPacket, 100) sconn = &sharedUDPConn{conn, unhandled} } // node table if !srv.NoDiscovery { //路由表生成 cfg := discover.Config{ PrivateKey: srv.PrivateKey, AnnounceAddr: realaddr, NodeDBPath: srv.NodeDatabase, NetRestrict: srv.NetRestrict, Bootnodes: srv.BootstrapNodes, Unhandled: unhandled, } ntab, err := discover.ListenUDP(conn, cfg) if err != nil { return err } srv.ntab = ntab } if srv.DiscoveryV5 { //路由表生成 var ( ntab *discv5.Network err error ) if sconn != nil { ntab, err = discv5.ListenUDP(srv.PrivateKey, sconn, realaddr, "", srv.NetRestrict) //srv.NodeDatabase) } else { ntab, err = discv5.ListenUDP(srv.PrivateKey, conn, realaddr, "", srv.NetRestrict) //srv.NodeDatabase) } if err != nil { return err } if err := ntab.SetFallbackNodes(srv.BootstrapNodesV5); err != nil { return err } srv.DiscV5 = ntab } dynPeers := srv.maxDialedConns() //newDialState 對象生成,這個對象包含Peer的實際維護代碼 dialer := newDialState(srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict) // handshake 協議加載 srv.ourHandshake = &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, ID: discover.PubkeyID(&srv.PrivateKey.PublicKey)} for _, p := range srv.Protocols { srv.ourHandshake.Caps = append(srv.ourHandshake.Caps, p.cap()) } // listen/dial //監聽本地端口 if srv.ListenAddr != "" { if err := srv.startListening(); err != nil { return err } } if srv.NoDial && srv.ListenAddr == "" { srv.log.Warn("P2P server will be useless, neither dialing nor listening") } srv.loopWG.Add(1) //重要的一句,開個協程,在其中做peer的維護 go srv.run(dialer) srv.running = true return nil }
該函數中定義了兩個隊列
runningTasks []task //正在執行的任務 queuedTasks []task //尚未執行的任務
定義了三個匿名函數
//從正在執行任務中刪除任務 delTask := func(t task) { for i := range runningTasks { if runningTasks[i] == t { runningTasks = append(runningTasks[:i], runningTasks[i+1:]...) break } } } //開始一批任務 startTasks := func(ts []task) (rest []task) { i := 0 for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ { t := ts[i] srv.log.Trace("New dial task", "task", t) go func() { t.Do(srv); taskdone <- t }() runningTasks = append(runningTasks, t) } return ts[i:] } //啟動開始一批任務再調用dialstate的newTasks函數生成一批任務,加載到任務隊列里面 scheduleTasks := func() { // Start from queue first. queuedTasks = append(queuedTasks[:0], startTasks(queuedTasks)...) // Query dialer for new tasks and start as many as possible now. if len(runningTasks) < maxActiveDialTasks { nt := dialstate.newTasks(len(runningTasks)+len(queuedTasks), peers, time.Now()) queuedTasks = append(queuedTasks, startTasks(nt)...) } }
定義了一個循環,分不同的chanel執行對應的邏輯
for { //調度開始找生成任務 scheduleTasks() select { case <-srv.quit://退出 break running case n := <-srv.addstatic: //增加一個節點 該節點最終會生成一個dialTask //并在newTasks的時候加入到讀列 srv.log.Debug("Adding static node", "node", n) dialstate.addStatic(n) case n := <-srv.removestatic: //直接刪除該節點 節點不再參與維護,很快就會死掉了 dialstate.removeStatic(n) if p, ok := peers[n.ID]; ok { p.Disconnect(DiscRequested) } case op := <-srv.peerOp: // Peers 和 PeerCount 兩個外部接口,只是讀取peer信息 op(peers) srv.peerOpDone <- struct{}{} case t := <-taskdone: //task完成后會根據不同的任務類型進行相應的處理 srv.log.Trace("Dial task done", "task", t) dialstate.taskDone(t, time.Now()) delTask(t) case c := <-srv.posthandshake: //身份驗證通過 if trusted[c.id] { // Ensure that the trusted flag is set before checking against MaxPeers. c.flags |= trustedConn } select { case c.cont <- srv.encHandshakeChecks(peers, inboundCount, c): case <-srv.quit: break running } case c := <-srv.addpeer: //身份協議驗證通過 加入隊列 err := srv.protoHandshakeChecks(peers, inboundCount, c) if err == nil { // The handshakes are done and it passed all checks. p := newPeer(c, srv.Protocols) // If message events are enabled, pass the peerFeed // to the peer if srv.EnableMsgEvents { p.events = &srv.peerFeed } name := truncateName(c.name) srv.log.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1) go srv.runPeer(p) //觸發事件 此處是最上層截取peer的位置,如果此物沒有外部影響,那么這個peer很快就被銷毀了 peerAdd++ fmt.Printf("--count %d--- add %d-- del %d--\n",len(peers),peerAdd,peerDel) peers[c.id] = p if p.Inbound() { inboundCount++ } } // The dialer logic relies on the assumption that // dial tasks complete after the peer has been added or // discarded. Unblock the task last. select { case c.cont <- err: case <-srv.quit: break running } case pd := <-srv.delpeer: //移除peer d := common.PrettyDuration(mclock.Now() - pd.created) pd.log.Debug("Removing p2p peer", "duration", d, "peers", len(peers)-1, "req", pd.requested, "err", pd.err) delete(peers, pd.ID()) peerDel++ fmt.Printf("--count %d--- add %d-- del %d--\n",len(peers),peerAdd,peerDel) if pd.Inbound() { inboundCount-- } } }
記住上面的代碼,再來逐個的看:
scheduleTasks調度生成任務,生成的任務中有一種dialTask的任務,該任務結構如下
type dialTask struct { flags connFlag dest *discover.Node lastResolved time.Time resolveDelay time.Duration } func (t *dialTask) Do(srv *Server) { if t.dest.Incomplete() { if !t.resolve(srv) { return } } err := t.dial(srv, t.dest) //此處會調用到setupConn函數 if err != nil { log.Trace("Dial error", "task", t, "err", err) // Try resolving the ID of static nodes if dialing failed. if _, ok := err.(*dialError); ok && t.flags&staticDialedConn != 0 { if t.resolve(srv) { t.dial(srv, t.dest) } } } }
dial最終回調用到setupConn函數,函數只保留重點的幾句,篇幅有點長了
func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *discover.Node) error { //身份驗證碼 獲取設備,標識等信息 if c.id, err = c.doEncHandshake(srv.PrivateKey, dialDest); err != //此處會往chanel中添加連接對象,最終觸發循環中的posthandshake分支 err = srv.checkpoint(c, srv.posthandshake) //協議驗證 phs, err := c.doProtoHandshake(srv.ourHandshake) c.caps, c.name = phs.Caps, phs.Name //此處會往chanel中添加連接對象 最終觸發循環中的addpeer分支 err = srv.checkpoint(c, srv.addpeer) }
posthandshake 分支僅僅做了驗證,addpeer做的事情就比較多了,重要的就是執行runPeer函數
func (srv *Server) runPeer(p *Peer) { // 廣播 peer add srv.peerFeed.Send(&PeerEvent{ Type: PeerEventTypeAdd, Peer: p.ID(), }) // run the protocol remoteRequested, err := p.run() // // 廣播 peer drop srv.peerFeed.Send(&PeerEvent{ Type: PeerEventTypeDrop, Peer: p.ID(), Error: err.Error(), }) //移除peer srv.delpeer <- peerDrop{p, err, remoteRequested} } func (p *Peer) run() (remoteRequested bool, err error) { //************* writeStart <- struct{}{} p.startProtocols(writeStart, writeErr) //************* //這一句阻塞性確保了peer的存活 p.wg.Wait() } func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error) { p.wg.Add(len(p.running)) for _, proto := range p.running { proto := proto proto.closed = p.closed proto.wstart = writeStart proto.werr = writeErr var rw MsgReadWriter = proto if p.events != nil { rw = newMsgEventer(rw, p.events, p.ID(), proto.Name) } p.log.Trace(fmt.Sprintf("Starting protocol %s/%d", proto.Name, proto.Version)) go func() { //其他的都是為這一句做準備的,在以太坊中p2p就是靠這一句對上層暴露peer對象 err := proto.Run(p, rw) if err == nil { p.log.Trace(fmt.Sprintf("Protocol %s/%d returned", proto.Name, proto.Version)) err = errProtocolReturned } else if err != io.EOF { p.log.Trace(fmt.Sprintf("Protocol %s/%d failed", proto.Name, proto.Version), "err", err) } p.protoErr <- err p.wg.Done() }() } }
這樣就可以可理出一條思路 scheduleTasks執行生成dialTask任務 dialTask任務執行過程中逐個填充posthandshake,addPeer這兩個chanel。 addPeer執行時對上層暴露了Peer對象,完成后填充了delpeer,最后刪除了Peer。
具體看代碼中的注釋
func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now time.Time) []task { if s.start.IsZero() { s.start = now } var newtasks []task //這里聲明了一個添加任務的函數 addDial := func(flag connFlag, n *discover.Node) bool { if err := s.checkDial(n, peers); err != nil { log.Trace("Skipping dial candidate", "id", n.ID, "addr", &net.TCPAddr{IP: n.IP, Port: int(n.TCP)}, "err", err) return false } s.dialing[n.ID] = flag //排除掉已經再測試的 newtasks = append(newtasks, &dialTask{flags: flag, dest: n}) return true } // Compute number of dynamic dials necessary at this point. needDynDials := s.maxDynDials //當前系統中最大連接數目 for _, p := range peers { //扣除已建立鏈接的peer if p.rw.is(dynDialedConn) { needDynDials-- } } for _, flag := range s.dialing { //扣除已建立鏈接的peer if flag&dynDialedConn != 0 { needDynDials-- } } //外部命令添加的節點 這種節點不占用needDynDials數目, //是為了保證手動加的節點能夠起效 for id, t := range s.static { err := s.checkDial(t.dest, peers) switch err { case errNotWhitelisted, errSelf: log.Warn("Removing static dial candidate", "id", t.dest.ID, "addr", &net.TCPAddr{IP: t.dest.IP, Port: int(t.dest.TCP)}, "err", err) delete(s.static, t.dest.ID) case nil: s.dialing[id] = t.flags newtasks = append(newtasks, t) } } // If we don't have any peers whatsoever, try to dial a random bootnode. This // scenario is useful for the testnet (and private networks) where the discovery // table might be full of mostly bad peers, making it hard to find good ones. if len(peers) == 0 && len(s.bootnodes) > 0 && needDynDials > 0 && //檢查引導節點 因為引導節點比搜索到的節點更大概率靠譜 因而比較靠前 now.Sub(s.start) > fallbackInterval { bootnode := s.bootnodes[0] s.bootnodes = append(s.bootnodes[:0], s.bootnodes[1:]...) s.bootnodes = append(s.bootnodes, bootnode) if addDial(dynDialedConn, bootnode) { needDynDials-- } } //隨機的從路由中抽取最大節點的二分之一 randomCandidates := needDynDials / 2 if randomCandidates > 0 { n := s.ntab.ReadRandomNodes(s.randomNodes) for i := 0; i < randomCandidates && i < n; i++ { if addDial(dynDialedConn, s.randomNodes[i]) { needDynDials-- } } } // 從lookupbuf中抽取 i := 0 for ; i < len(s.lookupBuf) && needDynDials > 0; i++ { if addDial(dynDialedConn, s.lookupBuf[i]) { needDynDials-- } } s.lookupBuf = s.lookupBuf[:copy(s.lookupBuf, s.lookupBuf[i:])] // 如果還是不夠,路由再去搜索節點 if len(s.lookupBuf) < needDynDials && !s.lookupRunning { s.lookupRunning = true newtasks = append(newtasks, &discoverTask{}) } // wait if nRunning == 0 && len(newtasks) == 0 && s.hist.Len() > 0 { t := &waitExpireTask{s.hist.min().exp.Sub(now)} newtasks = append(newtasks, t) } return newtasks }
另一個是message中的Send,SendItem函數 實現了MsgWriter的對象都可以調用這個函數寫入,覺得這里沒什么必要,完全可以封裝到peer里面去,不過它上層做廣播的時候確實是調用的這兩個函數。
func Send(w MsgWriter, msgcode uint64, data interface{}) error { size, r, err := rlp.EncodeToReader(data) if err != nil { return err } return w.WriteMsg(Msg{Code: msgcode, Size: uint32(size), Payload: r}) } func SendItems(w MsgWriter, msgcode uint64, elems ...interface{}) error { return Send(w, msgcode, elems) }
文件:go-ethereum/eth/peer.go
定義了兩個struct,Peer和PeerSet。Peer封裝了底層的p2p.Peer,集成了一些和業務相關的方法,比如SendTransactions,SendNewBlock等。PeerSet是Peer的集合
type peer struct { id string *p2p.Peer rw p2p.MsgReadWriter version int // Protocol version negotiated forkDrop *time.Timer // Timed connection dropper if forks aren't validated in time head common.Hash td *big.Int lock sync.RWMutex knownTxs *set.Set // Set of transaction hashes known to be known by this peer knownBlocks *set.Set // Set of block hashes known to be known by this peer } type peerSet struct { peers map[string]*peer lock sync.RWMutex closed bool }
文件:go-ethereum/eth/handler.go manager.handle在檢查了peer后會把這個peer注冊到peerset中,表示此peer可用,發生錯誤后peerset注銷該peer,返回錯誤,最后再Server中銷毀。
manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions)) for i, version := range ProtocolVersions { // Skip protocol version if incompatible with the mode of operation if mode == downloader.FastSync && version < eth73 { continue } // Compatible; initialise the sub-protocol version := version // Closure for the run manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{ Name: ProtocolName, Version: version, Length: ProtocolLengths[i], Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { peer := manager.newPeer(int(version), p, rw) select { case manager.newPeerCh <- peer: manager.wg.Add(1) defer manager.wg.Done() //此處如果順利會進入for循環 如果失敗返回錯誤我會銷毀掉這個peer return manager.handle(peer) case <-manager.quitSync: return p2p.DiscQuitting } }, NodeInfo: func() interface{} { return manager.NodeInfo() }, PeerInfo: func(id discover.NodeID) interface{} { if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil { return p.Info() } return nil }, }) }
看完上述內容,你們對以太坊 p2p Server 原理及實現是怎樣的有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。