您好,登錄后才能下訂單哦!
這篇文章主要介紹“Go語言網絡爬蟲實例代碼分享”,在日常操作中,相信很多人在Go語言網絡爬蟲實例代碼分享問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Go語言網絡爬蟲實例代碼分享”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
這篇通過網絡爬蟲的示例,來了解 Go 語言的遞歸、多返回值、延遲函數調用、匿名函數等方面的函數特性。
首先是爬蟲的基礎示例,下面兩個例子展示通過 net/http 包來爬取頁面的內容。
下面的程序展示從互聯網獲取信息,獲取URL的內容,然后不加解析地輸出:
// 輸出從 URL 獲取的內容 package main import ( "fmt" "io" "net/http" "os" "strings" ) func main() { for _, url := range os.Args[1:] { url = checkUrl(url) resp, err := http.Get(url) if err != nil { fmt.Fprintf(os.Stderr, "ERROR fetch request %s: %v\n", url, err) os.Exit(1) // 進程退出時,返回狀態碼1 } _, err = io.Copy(os.Stdout, resp.Body) resp.Body.Close() if err != nil { fmt.Fprintf(os.Stderr, "ERROR fetch reading %s: %v\n", url, err) os.Exit(1) } } } func checkUrl(s string) string { if strings.HasPrefix(s, "http") { return s } return fmt.Sprint("http://", s) }
這個程序使用里使用了 net/http 包。http.Get 函數產生一個 HTTP 請求,如果沒有出錯,返回結果存在響應結構 resp 里面。其中 resp 的 Body 域包含服務器端響應的一個可讀取數據流。這里可以用 ioutil.ReadAll 讀取整個響應的結果。不過這里用的是 io.Copy(dst, src) 函數,這樣不需要把整個響應的數據流都裝到緩沖區之中。讀取完數據后,要關閉 Body 數據流來避免資源泄露。
這個程序和上一個一樣,獲取URL的內容,并且是并發獲取的。這個版本丟棄響應的內容,只報告每一個響應的大小和花費的時間:
// 并發獲取 URL 并報告它們的時間和大小 package main import ( "fmt" "io" "io/ioutil" "net/http" "os" "strings" "time" ) func main() { start := time.Now() ch := make(chan string) for _, url := range os.Args[1:] { url = checkUrl(url) go fetch(url, ch) } for range os.Args[1:] { fmt.Println(<-ch) } fmt.Printf("總耗時: %.2fs\n", time.Since(start).Seconds()) } func fetch(url string, ch chan<- string) { start := time.Now() resp, err := http.Get(url) if err != nil { ch <- fmt.Sprintf("get %s error: %v", url, err) return } nbytes, err := io.Copy(ioutil.Discard, resp.Body) resp.Body.Close() // 不要泄露資源 if err != nil { ch <- fmt.Sprintf("reading %s error: %v", url, err) return } secs := time.Since(start).Seconds() ch <- fmt.Sprintf("%.2fs %7d %s", secs, nbytes, url) } func checkUrl(s string) string { if strings.HasPrefix(s, "http") { return s } return fmt.Sprint("http://", s) }
io.Copy 函數讀取響應的內容,然后通過寫入 ioutil.Discard 輸出流進行丟棄。Copy 返回字節數以及出現的任何錯誤。只所以要寫入 ioutil.Discard 來丟棄,這樣就會有一個讀取的過程,可以獲取返回的字節數。
這章開始,介紹Go語言中函數的一些特性,用到的都是網絡爬蟲相關的示例。
函數可以遞歸調用,這意味著函數可以直接或者間接地調用自己。遞歸是一種實用的技術,可以處理許多帶有遞歸特性的數據結構。下面就會使用遞歸處理HTML文件。
下面的代碼示例使用了 golang.org/x/net/html 包。它提供了解析 HTML 的功能。下面會用到 golang.org/x/net/html API 如下的一些代碼。函數 html.Parse 讀入一段字節序列,解析它們,然后返回 HTML 文檔樹的根節點 html.Node。這里可以只關注函數簽名的部分,函數內部實現細節就先了解到上面文字說明的部分。HTML 有多種節點,比如文本、注釋等。這里只關注 a 標簽和里面的 href 的值:
// golang.org/x/net/html package html // A NodeType is the type of a Node. type NodeType uint32 const ( ErrorNode NodeType = iota TextNode DocumentNode ElementNode CommentNode DoctypeNode scopeMarkerNode ) type Node struct { Parent, FirstChild, LastChild, PrevSibling, NextSibling *Node Type NodeType DataAtom atom.Atom Data string Namespace string Attr []Attribute } type Attribute struct { Namespace, Key, Val string } func Parse(r io.Reader) (*Node, error) { p := &parser{ tokenizer: NewTokenizer(r), doc: &Node{ Type: DocumentNode, }, scripting: true, framesetOK: true, im: initialIM, } err := p.parse() if err != nil { return nil, err } return p.doc, nil }
主函數從標準輸入中讀入 HTML,使用遞歸的 visit 函數獲取 HTML 文本的超鏈接,并且把所有的超鏈接輸出。
下面的 visit 函數遍歷 HTML 樹上的所有節點,從 HTML 的 a 標簽中得到 href 屬性的內容,將獲取到的鏈接內容添加到字符串切片中,最后再返回這個切片:
// 輸出從標準輸入中讀取的 HTML 文檔中的所有連接 package main import ( "fmt" "os" "golang.org/x/net/html" ) func main() { doc, err := html.Parse(os.Stdin) if err != nil { fmt.Fprintf(os.Stderr, "findlinks1: %v\n", err) os.Exit(1) } for _, link := range visit(nil, doc) { fmt.Println(link) } } // 將節點 n 中的每個鏈接添加到結果中 func visit(links []string, n *html.Node) []string { if n.Type == html.ElementNode && n.Data == "a" { for _, a := range n.Attr { if a.Key == "href" { links = append(links, a.Val) } } } for c := n.FirstChild; c != nil; c = c.NextSibling { links = visit(links, c) } return links }
要對樹中的任意節點 n 進行遞歸,visit 遞歸地調用自己去訪問節點 n 的所有子節點,并且將訪問過的節點保存在 FirstChild 鏈表中。
分別將兩個程序編譯后,使用管道將 fetch 程序的輸出定向到 findlinks1。編譯后執行:
PS H:\Go\src\gopl\output\http> go build gopl/output/http/fetch PS H:\Go\src\gopl\output\http> go build gopl/output/http/findlinks1 PS H:\Go\src\gopl\output\http> ./fetch studygolang.com | ./findlinks1 /readings?rtype=1 /dl # http://docs.studygolang.com http://docscn.studygolang.com /pkgdoc http://tour.studygolang.com /account/register /account/login /?tab=all /?tab=hot https://e.coding.net/?utm_source=studygolang
這是另一個版本,把 fetch 和 findLinks 合并到一起了,FindLInks函數自己發送 HTTP 請求。最后還對 visit 進行了修改,現在使用遞歸調用 visit (而不是循環)遍歷 n.FirstChild 鏈表:
package main import ( "fmt" "net/http" "os" "golang.org/x/net/html" ) func main() { for _, url := range os.Args[1:] { links, err := findLinks(url) if err != nil { fmt.Fprintf(os.Stderr, "findlink2: %v\n", err) continue } for _, link := range links { fmt.Println(link) } } } // 發起一個HTTP的GET請求,解析返回的HTML頁面,并返回所有的鏈接 func findLinks(url string) ([]string, error) { resp, err := http.Get(url) if err != nil { return nil, err } if resp.StatusCode != http.StatusOK { resp.Body.Close() return nil, fmt.Errorf("getting %s: %s\n", url, resp.Status) } doc, err := html.Parse(resp.Body) resp.Body.Close() if err != nil { return nil, fmt.Errorf("parsing %s as HTML: %v\n", url, err) } return visit(nil, doc), nil } // 將節點 n 中的每個鏈接添加到結果中 func visit(links []string, n *html.Node) []string { if n == nil { return links } if n.Type == html.ElementNode && n.Data == "a" { for _, a := range n.Attr { if a.Key == "href" { links = append(links, a.Val) } } } // 可怕的遞歸,非常不好理解。 return visit(visit(links, n.FirstChild), n.NextSibling) }
findLinks 函數有4個返回語句:
第一個返回語句中,錯誤直接返回
后兩個返回語句則使用 fmt.Errorf 格式化處理過的附加上下文信息
如果函數調用成功,最后一個返回語句返回字符串切片,且 error 為空
關閉 resp.Body
這里必須保證 resp.Body 正確關閉使得網絡資源正常釋放。即使在發生錯誤的情況下也必須釋放資源。
Go 語言的垃圾回收機制將回收未使用的內存,但不能指望它會釋放未使用的操作系統資源,比如打開的文件以及網絡連接。必須顯示地關閉它們。
下面的程序使用遞歸遍歷所有 HTML 文本中的節點數,并輸出樹的結構。當遞歸遇到每個元素時,它都會講元素標簽壓入棧,然后輸出棧:
package main import ( "fmt" "os" "golang.org/x/net/html" ) func main() { doc, err := html.Parse(os.Stdin) if err != nil { fmt.Fprintf(os.Stderr, "outline: %v\n", err) os.Exit(1) } outline(nil, doc) } func outline(stack []string, n *html.Node) { if n.Type == html.ElementNode { stack = append(stack, n.Data) // 把標簽壓入棧 fmt.Println(stack) } for c := n.FirstChild; c != nil; c = c.NextSibling { outline(stack, c) } }
注意一個細節,盡管 outline 會將元素壓棧但并不會出棧。當 outline 遞歸調用自己時,被調用的函數會接收到棧的副本。盡管被調用者可能會對棧(切片類型)進行元素的添加、修改甚至創建新數組的操作,但它并不會修改調用者原來傳遞的元素,所以當被調用函數返回時,調用者的棧依舊保持原樣。
現在可以找一些網頁輸出:
PS H:\Go\src\gopl\output\http> ./fetch baidu.com | ./outline [html] [html head] [html head meta] [html body] PS H:\Go\src\gopl\output\http>
許多編程語言使用固定長度的函數調用棧,大小在 64KB 到 2MB 之間。遞歸的深度會受限于固定長度的棧大小,所以當進行深度調用時必須謹防棧溢出。固定長度的棧甚至會造成一定的安全隱患。相比固定長度的棧,Go 語言的實現使用了可變長度的棧,棧的大小會隨著使用而增長,可達到 1GB 左右的上限。這使得我們可以安全地使用遞歸而不用擔心溢出的問題。
這里再換一個實現方式,使用函數變量。可以將每個節點的操作邏輯從遍歷樹形結構的邏輯中分開。這次不重用 fetch 程序了,全部寫在一起了:
package main import ( "fmt" "net/http" "os" "golang.org/x/net/html" ) func main() { for _, url := range os.Args[1:] { outline(url) } } func outline(url string) error { resp, err := http.Get(url) if err != nil { return err } defer resp.Body.Close() doc, err := html.Parse(resp.Body) if err != nil { return err } forEachNode(doc, startElement, endElement) return nil } // 調用 pre(x) 和 post(x) 遍歷以n為根的樹中的每個節點x // 兩個函數都是可選的 // pre 在子節點被訪問前調用(前序調用) // post 在訪問后調用(后續調用) func forEachNode(n *html.Node, pre, post func(n *html.Node)) { if pre != nil { pre(n) } for c := n.FirstChild; c != nil; c = c.NextSibling { forEachNode(c, pre, post) } if post != nil { post(n) } } var depth int func startElement(n *html.Node) { if n.Type == html.ElementNode { fmt.Printf("%*s<%s>\n", depth*2, "", n.Data) depth++ } } func endElement(n *html.Node) { if n.Type == html.ElementNode { depth-- fmt.Printf("%*s</%s>\n", depth*2, "", n.Data) } }
這里的 forEachNode 函數接受兩個函數作為參數,一個在本節點訪問子節點前調用,另一個在所有子節點都訪問后調用。這樣的代碼組織給調用者提供了很多的靈活性。
這里還巧妙的利用了 fmt 的縮進輸出。%*s 中的 * 號輸出帶有可變數量空格的字符串。輸出的寬度和字符串由后面的兩個參數確定,這里只需要輸出空格,字符串用的是空字符串。
這次輸出的是要縮進效果的結構:
PS H:\Go\src\gopl\ch6\outline2> go run main.go http://baidu.com <html> <head> <meta> </meta> </head> <body> </body> </html> PS H:\Go\src\gopl\ch6\outline2>
下面示例的兩個功能,建議通過延遲函數調用 defer 來實現。
直接使用 http.Get 請求返回的數據,如果請求的 URL 是 HTML 那么一定會正常的工作,但是許多頁面包含圖片、文字和其他文件格式。如果讓 HTML 解析器去解析這類文件可能會發生意料外的狀況。這就需要首先判斷Get請求返回的是一個HTML頁面,通過返回的響應頭的Content-Type來判斷。一般是:Content-Type: text/html; charset=utf-8。然后才是解析HTML的標簽獲取title標簽的內容:
package main import ( "fmt" "net/http" "os" "strings" "golang.org/x/net/html" ) func forEachNode(n *html.Node, pre, post func(n *html.Node)) { if pre != nil { pre(n) } for c := n.FirstChild; c != nil; c = c.NextSibling { forEachNode(c, pre, post) } if post != nil { post(n) } } func title(url string) error { resp, err := http.Get(url) if err != nil { return err } defer resp.Body.Close() // 檢查返回的頁面是HTML通過判斷Content-Type,比如:Content-Type: text/html; charset=utf-8 ct := resp.Header.Get("Content-Type") if ct != "text/html" && !strings.HasPrefix(ct, "text/html;") { return fmt.Errorf("%s has type %s, not text/html", url, ct) } doc, err := html.Parse(resp.Body) if err != nil { return fmt.Errorf("parseing %s as HTML: %v", url, err) } visitNode := func(n *html.Node) { if n.Type == html.ElementNode && n.Data == "title" && n.FirstChild != nil { fmt.Println(n.FirstChild.Data) } } forEachNode(doc, visitNode, nil) return nil } func main() { for _, url := range os.Args[1:] { err := title(url) if err != nil { fmt.Fprintf(os.Stderr, "get url: %v\n", err) } } }
使用Get請求一個頁面,然后保存到本地的文件中。使用 path.Base 函數獲得 URL 路徑最后一個組成部分作為文件名:
package main import ( "fmt" "io" "net/http" "os" "path" ) func fetch(url string) (filename string, n int64, err error) { resp, err := http.Get(url) if err != nil { return "", 0, err } defer resp.Body.Close() local := path.Base(resp.Request.URL.Path) if local == "/" { local = "index.html" } f, err := os.Create(local) if err != nil { return "", 0, err } n, err = io.Copy(f, resp.Body) // 關閉文件,并保留錯誤消息 if closeErr := f.Close(); err == nil { // 如果 io.Copy 返回的 err 為空,才會報告 closeErr 的錯誤 err = closeErr } return local, n, err } func main() { for _, url := range os.Args[1:] { local, n, err := fetch(url) if err != nil { fmt.Fprintf(os.Stderr, "fetch %s: %v\n", url, err) continue } fmt.Fprintf(os.Stderr, "%s => %s (%d bytes).\n", url, local, n) } }
示例中的 fetch 函數中,會 os.Create 打開一個文件。但是如果使用延遲調用 f.Close 去關閉一個本地文件就會有些問題,因為 os.Create 打開了一個文件對其進行寫入、創建。在許多文件系統中尤其是NFS,寫錯誤往往不是立即返回而是推遲到文件關閉的時候。如果無法檢查關閉操作的結果,就會導致一系列的數據丟失。然后,如果 io.Copy 和 f.Close 同時失敗,我們更傾向于報告 io.Copy 的錯誤,因為它發生在前,更有可能記錄失敗的原因。示例中的最后一個錯誤處理就是這樣的處理邏輯。
優化defer的位置
在打開文件之后,在處理完打開的錯誤之后就應該立即寫上 defer 語句,保證 defer 語句和文件打開的操作成對出現。但是這里還插入了一條 io.Copy 的語句,在這個例子里只是一條語句,也有可能是一段代碼,這樣就會讓 defer 變得不那么清晰了。這里利用 defer 可以改變函數返回值的特性,將 defer 移動到 io.Copy 執行之前,又能夠保證返回的錯誤值 err 可以記錄下 io.Copy 執行后可能的錯誤。
在原有代碼的基礎上,在 defer 中只需要把 if 的代碼塊封裝到匿名函數中就可以了:
f, err := os.Create(local) if err != nil { return "", 0, err } defer func() { if closeErr := f.Close(); err == nil { err = closeErr } }() n, err = io.Copy(f, resp.Body) return local, n, err
這里的做法就是在 defer 中改變返回給調用者的結果。
網絡爬蟲的遍歷。
在之前遍歷節點樹的基礎上,這次來獲取頁面中所有的鏈接。將之前的 visit 函數替換為匿名函數(閉包),現在可以直接在匿名函數里把找到的鏈接添加到 links 切片中,這樣的改變之后,邏輯上更加清晰也更好理解了。因為 Extract 函數只需要前序調用,這里就把 post 部分的參數值傳nil。這里做成一個包,后面要繼續使用:
// 提供解析連接的函數 package links import ( "fmt" "net/http" "golang.org/x/net/html" ) // 向給定的URL發起HTTP GET 請求 // 解析HTML并返回HTML文檔中存在的鏈接 func Extract(url string) ([]string, error) { resp, err := http.Get(url) if err != nil { return nil, err } if resp.StatusCode != http.StatusOK { resp.Body.Close() return nil, fmt.Errorf("get %s: %s", url, resp.Status) } doc, err := html.Parse(resp.Body) resp.Body.Close() if err != nil { return nil, fmt.Errorf("parse %s: %s", url, err) } var links []string visitNode := func(n *html.Node) { if n.Type == html.ElementNode && n.Data == "a" { for _, a := range n.Attr { if a.Key != "href" { continue } link, err := resp.Request.URL.Parse(a.Val) if err != nil { continue // 忽略不合法的URL } links = append(links, link.String()) } } } forEachNode(doc, visitNode, nil) // 只要前序遍歷,后續不執行,傳nil return links, nil } func forEachNode(n *html.Node, pre, post func(n *html.Node)) { if pre != nil { pre(n) } for c := n.FirstChild; c != nil; c = c.NextSibling { forEachNode(c, pre, post) } if post != nil { post(n) } } /* 使用時寫的函數 func main() { url := "https://baidu.com" urls, err := Extract(url) if err != nil { // 錯誤處理隨便寫寫,不引入新的包 fmt.Printf("extract: %v\n", err) return } for n, u := range urls { fmt.Printf("%2d: %s\n", n, u) } } */
解析URL成為絕對路徑
這里不直接把href原封不動地添加到切片中,而將它解析成基于當前文檔的相對路徑 resp.Request.URL。結果的鏈接是絕對路徑的形式,這樣就可以直接用 http.Get 繼續調用。
網頁爬蟲的核心是解決圖的遍歷,使用遞歸的方法可以實現深度優先遍歷。對于網絡爬蟲,需要廣度優先遍歷。另外還可以進行并發遍歷,這里不講這個。
下面的示例函數展示了廣度優先遍歷的精髓。調用者提供一個初始列表 worklist,它包含要訪問的項和一個函數變量 f 用來處理每一個項。每一個項有字符串來識別。函數 f 將返回一個新的列表,其中包含需要新添加到 worklist 中的項。breadthFirst 函數將在所有節點項都被訪問后返回。它需要維護一個字符串集合來保證每個節點只訪問一次。
在爬蟲里,每一項節點都是 URL。這里需要提供一個 crawl 函數傳給 breadthFirst 函數最為f的值,用來輸出URL,然后解析鏈接并返回:
package main import ( "fmt" "log" "os" "gopl/ch6/links" ) // 對每一個worklist中的元素調用f // 并將返回的內容添加到worklist中,對每一個元素,最多調用一次f func breadthFirst(f func(item string) []string, worklist []string) { seen := make(map[string]bool) for len(worklist) > 0 { items := worklist worklist = nil for _, item := range items { if !seen[item] { seen[item] = true worklist = append(worklist, f(item)...) } } } } func crawl(url string) []string { fmt.Println(url) list, err := links.Extract(url) if err != nil { log.Print(err) } return list } func main() { // 開始廣度遍歷 // 從命令行參數開始 breadthFirst(crawl, os.Args[1:]) }
接下來就是找一個網頁來測試,下面是一些輸出的鏈接:
PS H:\Go\src\gopl\ch6\findlinks3> go run main.go http://lab.scrapyd.cn/ http://lab.scrapyd.cn/ http://lab.scrapyd.cn/archives/57.html http://lab.scrapyd.cn/tag/%E8%89%BA%E6%9C%AF/ http://lab.scrapyd.cn/tag/%E5%90%8D%E7%94%BB/ http://lab.scrapyd.cn/archives/55.html http://lab.scrapyd.cn/archives/29.html http://lab.scrapyd.cn/tag/%E6%9C%A8%E5%BF%83/ http://lab.scrapyd.cn/archives/28.html http://lab.scrapyd.cn/tag/%E6%B3%B0%E6%88%88%E5%B0%94/ http://lab.scrapyd.cn/tag/%E7%94%9F%E6%B4%BB/ http://lab.scrapyd.cn/archives/27.html ......
整個過程將在所有可達的網頁被訪問到或者內存耗盡時結束。
接下來是并發編程,使上面的搜索連接的程序可以并發運行。這樣對 crawl 的獨立調用可以充分利用 Web 上的 I/O 并行機制。
crawl 函數依然還是之前的那個函數不需要修改。而下面的 main 函數類似于原來的 breadthFirst 函數。這里也像之前一樣,用一個任務類別記錄需要處理的條目隊列,每一個條目是一個待爬取的 URL 列表,這次使用通道代替切片來表示隊列。每一次對 crawl 的調用發生在它自己的 goroutine 中,然后將發現的鏈接發送回任務列表:
package main import ( "fmt" "log" "os" "gopl/ch6/links" ) func crawl(url string) []string { fmt.Println(url) list, err := links.Extract(url) if err != nil { log.Print(err) } return list } func main() { worklist := make(chan []string) // 從命令行參數開始 go func() { worklist <- os.Args[1:] }() // 并發爬取 Web seen := make(map[string]bool) for list := range worklist { for _, link := range list { if !seen[link] { seen[link] = true go func(link string) { worklist <- crawl(link) }(link) } } } }
注意,這里爬取的 goroutine 將 link 作為顯式參數來使用,以避免捕獲迭代變量的問題。還要注意,發送給任務列表的命令行參數必須在它自己的 goroutine 中運行來避免死鎖。另一個可選的方案是使用緩沖通道。
現在這個爬蟲高度并發,比原來輸出的效果更高了,但是它有兩個問題。先看第一個問題,它在執行一段時間后會出現大量錯誤日志,過一會后會恢復,再之后又出現錯誤日志,如此往復。主要是因為程序同時創建了太多的網絡連接,超過了程序能打開文件數的限制。
程序的并行度太高了,無限制的并行通常不是一個好主要,因為系統中總有限制因素,例如,對于計算型應用 CPU 的核數,對于磁盤 I/O 操作磁頭和磁盤的個數,下載流所使用的網絡帶寬,或者 Web 服務本身的容量。解決方法是根據資源可用情況限制并發的個數,以匹配合適的并行度。該例子中有一個簡單的辦法是確保對于 link.Extract 的同時調用不超過 n 個,這里的 n 一般小于文件描述符的上限值。
這里可以使用一個容量為 n 的緩沖通道來建立一個并發原語,稱為計數信號量。概念上,對于緩沖通道中的 n 個空閑槽,每一個代表一個令牌,持有者可以執行。通過發送一個值到通道中來領取令牌,從通道中接收一個值來釋放令牌。這里的做法和直觀的理解是反的,盡管使用已填充槽更直觀,但使用空閑槽在創建的通道緩沖區之后可以省掉填充的過程,并且這里的令牌不攜帶任何信息,通道內的元素類型不重要。所以通道內的元素就使用 struct{},它所占用的空間大小是0。
重寫 crawl 函數,使用令牌的獲取和釋放操作限制對 links.Extract 函數的調用,這里保證最多同時20個調用可以進行。保持信號量操作離它約束的 I/O 操作越近越好,這是一個好的實踐:
// 令牌 tokens 是一個計數信號量 // 確保并發請求限制在 20 個以內 var tokens = make(chan struct{}, 20) func crawl(url string) []string { fmt.Println(url) tokens <- struct{}{} // 獲取令牌 list, err := links.Extract(url) <- tokens // 釋放令牌 if err != nil { log.Print(err) } return list }
現在來處理第二個問題,這個程序永遠不會結束。雖然可能爬不完所有的鏈接,也就注意不到這個問題。為了讓程序終止,當任務列表為空并且爬取 goroutine 都結束以后,需要從主循環退出:
func main() { worklist := make(chan []string) var n int // 等待發送到任務列表的數量 // 從命令行參數開始 n++ go func() { worklist <- os.Args[1:] }() // 并發爬取 Web seen := make(map[string]bool) for ; n > 0; n-- { list := <- worklist for _, link := range list { if !seen[link] { seen[link] = true n++ go func(link string) { worklist <- crawl(link) }(link) } } } }
在這個版本中,計數器 n 跟蹤發送到任務列表中的任務個數。在每次將一組條目發送到任務列表前,就遞增變量 n。在主循環中沒處理一個 worklist 后就遞減1,見減到0表示再沒有任務了,于是可以正常退出。
之前的版本,使用 range 遍歷通道,只要通道關閉,也是可以退出循環的。但是這里沒有一個地方可以確認再沒有任務需要添加了從而加上一句關閉通道的close語句。所以需要一個計數器 n 來記錄還有多少個任務等待 worklist 處理。
現在,并發爬蟲的速度大約比之前快了20倍,應該不會出現錯誤,并且能夠正確退出。
這里還有一個替代方案,解決過度并發的問題。這個版本使用最初的 crawl 函數,它沒有技術信號量,但是通過20個長期存活的爬蟲 goroutine 來調用它,這樣也保證了最多20個HTTP請求并發執行:
func main() { worklist := make(chan []string) // 可能有重復的URL列表 unseenLinks := make(chan string) // 去重后的eURL列表 // 向任務列表中添加命令行參數 go func() { worklist <- os.Args[1:] }() // 創建20個爬蟲 goroutine 來獲取每個不可見鏈接 for i := 0; i < 20; i++ { go func() { for link := range unseenLinks { foundLinks := crawl(link) go func() { worklist <- foundLinks }() } }() } // 主 goroutine 對 URL 列表進行去重 // 并把沒有爬取過的條目發送給爬蟲程序 seen := make(map[string]bool) for list := range worklist { for _, link := range list { if !seen[link] { seen[link] = true unseenLinks <- link } } } }
爬取 goroutine 使用同一個通道 unseenLinks 接收要爬取的URL,主 goroutine 負責對從任務列表接收到的條目進行去重,然后發送每一個沒有爬取過的條目到 unseenLinks 通道,之后被爬取 goroutine 接收。
crawl 發現的每組鏈接,通過精心設計的 goroutine 發送到任務列表來避免死鎖。
這里例子目前也沒有解決程序退出的問題,并且不能簡單的參考之前的做法使用計數器 n 來進行計數。上個版本中,計數器 n 都是在主 goroutine 進行操作的,這里也是可以繼續用這個方法來計數判斷程序是否退出,但是在不同的 goroutine 中操作計數器時,就需要考慮并發安全的問題。要使用并發安全的計數器 sycn.WaitGroup,具體實現略。
回到使用令牌并能正確退出的方案。雖然有結束后退出的邏輯,但是一般情況下,一個網站總用無限個鏈接,永遠爬取不完。現在再增加一個功能,深度限制:如果用戶設置 -depth=3,那么僅最多通過三個鏈接可達的 URL 能被找到。另外還增加了一個功能,統計總共爬取的頁面的數量。現在每次打印 URL 的時候,都會加上深度和序號。
先說簡單的,頁面計數的功能。就是要一個計數器,但是需要并發在不同的 goroutine 里操作,所以要考慮并發安全。通過通道就能實現,在主 goroutine 中單獨再用一個 goroutine 負責計數器的自增:
var count = make(chan int) // 統計一共爬取了多個頁面 func main() { // 負責 count 值自增的 goroutine go func() { var i int for { i++ count <- i } }() flag.Parse() // 省略主函數中的其他內容 } func crawl(url string, depth int) urllist { fmt.Println(depth, <-count, url) tokens <- struct{}{} // 獲取令牌 list, err := links.Extract(url) <-tokens // 釋放令牌 if err != nil { log.Print(err) } return urllist{list, depth + 1} }
然后是深度限制的核心功能。首先要為 worklist 添加深度的信息,把原本的字符串切片加上深度信息組成一個結構體作為 worklist 的元素:
type urllist struct { urls []string depth int }
現在爬取頁面后先把返回的信息暫存在 nextList 中,而不是直接添加到 worklist。檢查 nextList 中的深度,如果符合深度限制,就向 worklist 添加,并且要增加 n 計數器。如果超出深度限制,就什么也不做。原本主函數的 for 循環里的每一個 goroutine 都會增加 n 計數器,所以計數器的自增是在主函數里完成的。現在需要在每一個 goroutine 中判斷是否要對計數器進行自增,所以這里要把計數器換成并發安全的 sync.WaitGroup 然后可以在每個 goroutine 里來安全的操作計數器。這里要防止計數器過早的被減到0,不過邏輯還算簡單,就是在向 worlist 添加元素之前進行加1操作。
然后 n 計數器的減1的操作要上更加復雜。需要在 worklist 里的一組 URL 全部操作完之后,才能把 n 計數器減1,這就需要再引入一個計數器 n2。只有等計數器 n2 歸0后,才能將計數器 n 減1。這里還要防止程序卡死。向 worklist 添加元素額操作會一直阻塞,直到主函數 for 循環的下一次迭代時從 worklist 接收數據位置。所以要仔細考慮每個操作的正確順序,具體還是看代碼吧:
package main import ( "flag" "fmt" "log" "sync" "gopl/ch6/links" ) var count = make(chan int) // 統計一共爬取了多個頁面 // 令牌 tokens 是一個計數信號量 // 確保并發請求限制在 20 個以內 var tokens = make(chan struct{}, 20) func crawl(url string, depth int) urllist { fmt.Println(depth, <-count, url) tokens <- struct{}{} // 獲取令牌 list, err := links.Extract(url) <-tokens // 釋放令牌 if err != nil { log.Print(err) } return urllist{list, depth + 1} } var depth int func init() { flag.IntVar(&depth, "depth", -1, "深度限制") // 小于0就是不限制遞歸深度,0就是只爬取當前頁面 } type urllist struct { urls []string depth int } func main() { // 負責 count 值自增的 goroutine go func() { var i int for { i++ count <- i } }() flag.Parse() worklist := make(chan urllist) // 等待發送到任務列表的數量 // 因為需要在 goroutine 里修改,需要換成并發安全的計數器 var n sync.WaitGroup starturls := flag.Args() if len(flag.Args()) == 0 { starturls = []string{"http://lab.scrapyd.cn/"} } // 從命令行參數開始 n.Add(1) go func() { worklist <- urllist{starturls, 0} }() // 等待全部worklist處理完,就關閉worklist go func() { n.Wait() close(worklist) }() // 并發爬取 Web seen := make(map[string]bool) for list := range worklist { // 處理完一個worklist后才能讓 n 計數器減1 // 而處理 worklist 又是很多個 goroutine,所以需要再用一個計數器 var n2 sync.WaitGroup for _, link := range list.urls { if !seen[link] { seen[link] = true n2.Add(1) go func(url string, listDepth int) { nextList := crawl(url, listDepth) // 如果 depth>0 說明有深度限制 // 如果當前的深度已經達到(或超過)深度限制,則爬取完這個連接后,不需要再繼續爬取,直接返回 if depth >= 0 && listDepth >= depth { // 超出遞歸深度的頁面,在爬取完之后,也輸出 URL // for _, nextUrl := range nextList.urls { // fmt.Println(nextList.depth, "stop", nextUrl) // } n2.Done() // 所有退出的情況都要減計數器n2的值,但是一定要在向通道發送之前 return } n.Add(1) // 添加任務前,計數加1 n2.Done() // 先確保計數器n加1了,再減計數器n2的值 worklist <- nextList // 新的任務加入管道必須在最后,之后再一次for循環迭代的時候,才會接收這個值 }(link, list.depth) } } n2.Wait() n.Done() // 把計數器的操作也放到 goroutine 中,這樣可以繼續下一次 for 循環的迭代 // go func() { // n2.Wait() // n.Done() // }() } }
主函數 for 循環最后對計數器 n 和 n2 的操作,也是可以放到一個 goroutine 里的。現在會在 for 循環每次迭代的時候,等待直到一個 worklist 全部處理完畢后,才會處理下一個 worklist。所以這部分的邏輯還是串行的,不個這樣方面確認程序的正確性。之后可以結單修改一下,也放到一個 goroutine 中處理,讓 for 循環可以繼續迭代:
go func() { n2.Wait() n.Done() }()
最后測試程序的還有一個困擾的問題。不過仔細檢查之后,其實并不是問題。就是程序在所有的 URL 輸出之后,還會等待比較長的一段時間才會退出。一個真正的爬蟲,不是要輸出 URL 而是要爬取頁面。程序是在每次準備爬取頁面之前,先將頁面的 URL 打印輸出,然后去爬取并解析頁面的內容。全部 URL 輸出完,程序退出之前,這段沒有任何輸出的時間里,就是在對剩余的頁面進行爬取。原本爬完之后,檢查到深度超過限制就不會做任何操作。這里可以在檢查后,把返回的所有連接的 URL 和深度也進行輸出。這段代碼已經寫在例子中但是被注釋掉了,放開后,就能看到更多的輸出內容,確認退出前的這段時間里,程序依然在正確的執行。
繼續添加功能,這次在任務開始后,可以通過鍵盤輸入,來終止任務。這類操作還是比較常見的,下面應該是一種比較通用的做法。這里還包括一些額外的技巧的講解。
首先了解一下取消操作為什么需要一個廣播的機制,以及利用通道關閉的特性,實現廣播。
一個 goroutine 無法直接終止另一個,因為這樣會讓所有的共享變量狀態處于不確定狀態。正確的做法是使用通道來傳遞一個信號,當 goroutine 接收到信號時,就終止自己。這里要討論的是如何同時取消多個 goroutine。
一個可選的做法是,給通道發送你要取消的 goroutine 同樣多的信號。但是如果一些 goroutine 已經自己終止了,這樣計數就多了,就會在發送過程中卡住。如果某些 goroutine 還會自我繁殖,那么信號的數量又會太少。通常,任何時刻都很難知道有多少個 goroutine 正在工作。對于取消操作,這里需要一個可靠的機制在一個通道上廣播一個事件,這樣所以的 goroutine 就都能收到信號,而不用關心具體有多少個 goroutine。
當一個通道關閉且已經取完所有發送的值后,接下來的接收操作都會立刻返回,得到零值。就可以利用這個特性來創建一個廣播機制。第一步,創建一個取消通道,在它上面不發送任何的值,但是它的關閉表明程序需要停止它正在做的事前。
還要定義一個工具函數 cancelled,在它被調用的時候檢測或輪詢取消狀態:
var done = make(chan struct{}) func cancelled() bool { select { case <-done: return true default: return false } }
如果需要在原本是通道操作的地方增加取消操作判斷的邏輯,那么就對原本要操作的通道和取消廣播的通道寫一個 select 多路復用。
如果要判斷的位置原本沒有通道,那么就是一個非阻塞的只有取消廣播通道的 select 多路復用,就是這里的工具函數。簡單來講,直接調用工具函數進行判斷即可。
之后的代碼里就是這么做的。
接下來,創建一個讀取標準輸入的 goroutine,它通常連接到終端,當用戶按回車后,這個 goroutine 通過關閉 done 通道來廣播取消事件:
// 當檢測到輸入時,廣播取消 go func() { os.Stdin.Read(make([]byte, 1)) // 讀一個字節 close(done) }()
把這個新的 goroutine 加在主函數的開頭就好了。
現在要讓所有的 goroutine 來響應這個取消操作。在主 goroutine 中的 select 中,嘗試從 done 接收。如果接收到了,就需要進行取消操作,但是在結束之前,它必須耗盡 worklist 通道,丟棄它所有的值,直到通道關閉。這么做是為了保證 for 循環里之前迭代時調用的匿名函數都可以執行完,不會卡在向 worklist 通道發送消息上:
var list urllist var worklistok bool select { case <-done: // 耗盡 worklist,讓已經創建的 goroutine 結束 for range worklist { n.Done() } // 執行到這里的前提是迭代完 worklist,就是需要 worklist 關閉 // 關閉 worklist 則需要 n 計數器歸0。而 worklist 每一次減1,需要一個 n2 計數器歸零 // 所以,下面的 return 應該不會在其他 goroutine 運行完畢之前執行 return case list, worklistok = <-worklist: if !worklistok { break loop } }
之后的 for 循環會沒每個 URL 開啟一個 goroutine。在每一次迭代中,在開始的時候輪詢取消狀態。如果是取消的狀態,就什么都不做并且終止迭代:
for _, link := range list.urls { if cancelled() { break } // 省略之后的代碼 }
現在基本就避免了在取消后創建新的 goroutine。但是其他已經創建的 goroutine 則會等待他們執行完畢。要想更快的響應,就需要對程序邏輯進行侵入式的修改。要確保在取消事件之后沒有更多昂貴的操作發生。這就需要更新更多的代碼,但是通常可以通過在少量重要的地方檢查取消狀態來達到目的。在 crawl 中獲取信號量令牌的操作也需要快速結束:
func crawl(url string, depth int) urllist { select { case <-done: return urllist{nil, depth + 1} case tokens <- struct{}{}: // 獲取令牌 fmt.Println(depth, <-count, url) } list, err := links.Extract(url, done) <-tokens // 釋放令牌 if err != nil && !strings.Contains(err.Error(), "net/http: request canceled") { log.Print(err) } return urllist{list, depth + 1} }
在 crwal 函數中,調用了 links.Extract 函數。這是一個非常耗時的網絡爬蟲操作,并且不會馬上返回。正常需要等到頁面爬取完畢,或者連接超時才返回。而我們的程序也會一直等待所有的爬蟲返回后才會退出。所以這里在調用的時候,把取消廣播的通道傳遞傳遞給函數了,下面就是修改 links.Extract 來響應這個取消操作,立刻終止爬蟲并返回。
HTTP 請求可以通過關閉 http.Request 結構體中可選的 Cancel 通道進行取消。http.Get 便利函數沒有提供定制 Request 的機會。這里要使用 http.NewRequest 創建請求,設置它的 Cancel 字段,然后調用 http.DefaultClient.Do(req) 來執行請求。對 links 包中的 Extract 函數按上面說的進行修改,具體如下:
// 向給定的URL發起HTTP GET 請求 // 解析HTML并返回HTML文檔中存在的鏈接 func Extract(url string, done <-chan struct{}) ([]string, error) { req, err := http.NewRequest("GET", url, nil) if err != nil { return nil, err } req.Cancel = done resp, err := http.DefaultClient.Do(req) if err != nil { return nil, err } if resp.StatusCode != http.StatusOK { resp.Body.Close() return nil, fmt.Errorf("get %s: %s", url, resp.Status) } // 僅修改開頭的部分,后面的代碼省略 }
期望的情況是,當然是當取消事件到來時 main 函數可以返回,然后程序隨之退出。如果發現在取消事件到來的時候 main 函數沒有返回,可以執行一個 panic 調用。從崩潰的轉存儲信息中通常含有足夠的信息來幫助我們分析,發現哪些 goroutine 還沒有合適的取消。也可能是已經取消了,但是需要的時間比較長。總之,使用 panic 可以幫助查找原因。
最后還有一個小例子,場景比較常見,并且有一個 goroutine 泄露的問題需要注意。
下面的例子展示一個使用緩沖通道的應用。它并發地向三個鏡像地址發請求,鏡像指相同但分布在不同地理區域的服務器。它將它們的響應通過一個緩沖通道進行發送,然后只接收第一個返回的響應,因為它是最早到達的。所以 mirroredQuery 函數甚至在兩個比較慢的服務器還沒有響應之前返回了一個結果。(偶然情況下,會出現像這個例子中的幾個 goroutine 同時在一個通道上并發發送,或者同時從一個通道接收的情況。):
func mirroredQuery() string { responses := make(chan string, 3) // 有幾個鏡像,就要多大的容量,不能少 go func () { responses <- request("asia.gopl.io") }() go func () { responses <- request("europe.gopl.io") }() go func () { responses <- request("americas.gopl.io") }() return <- responses // 返回最快一個獲取到的請求結果 } func request(hostname string) (response string) { return "省略獲取返回的代碼" }
在上面的示例中,如果使用的是無緩沖通道,兩個比較慢的 goroutine 將被卡住,因為在它們發送響應結果到通道的時候沒有 goroutine 來接收。這個情況叫做 goroutine 泄漏。它屬于一個 bug。不像回收變量,泄漏的 goroutine 不會自動回收,所以要確保 goroutine 在不再需要的時候可以自動結束。
上面只是一個大致的框架,不過核心思想都在里面了。現在來完成這里的 request 請求。并且 request 里會發起 http 請求,雖然可以讓每一個請求都執行完畢。但是只要第一個請求完成后,其他請求就可以終止了,現在也已經掌握了主動關閉 http 請求的辦法了。
這里把示例的功能寫的更加完整一些,用上之前的頁面解析和獲取 title 的部分代碼。通過命令行參數提供的多個 url 爬取頁面,解析頁面的 title,返回第一個完成的 title。完整的代碼如下:
package main import ( "errors" "fmt" "io" "net/http" "os" "strings" "sync" "golang.org/x/net/html" ) // 遞歸解析文檔樹獲取 title func forEachNode(n *html.Node, titleP *string, pre, post func(n *html.Node)) { if *titleP != "" { return } if pre != nil { pre(n) } for c := n.FirstChild; c != nil; c = c.NextSibling { forEachNode(c, titleP, pre, post) } if post != nil { post(n) } } // 使用上面的 forEachNode 函數,遞歸文檔樹。返回找到的第一個 title 或者全部遍歷返回空字符串 func soleTitle(doc *html.Node) string { var title string // 被下面的閉包引用了 visitNode := func(n *html.Node) { if n.Type == html.ElementNode && n.Data == "title" && n.FirstChild != nil { title = n.FirstChild.Data } } forEachNode(doc, &title, visitNode, nil) return title } // 解析返回 title 的入口函數 // 把響應體解析為文檔樹,然后交給 soleTitle 處理,獲取 title func title(url string, body io.Reader) (string, error) { doc, err := html.Parse(body) if err != nil { return "", fmt.Errorf("parseing %s as HTML: %v", url, err) } title := soleTitle(doc) if title == "" { return "", errors.New("no title element") } return title, nil } // 上面是解析返回結果的邏輯,不是這里的重點 // 請求鏡像資源 func mirroredQuery(urls ...string) string { type respData struct { // 返回的數據類型 resp string err error } count := len(urls) // 總共發起的請求數 responses := make(chan respData, count) // 有幾個鏡像,就要多大的容量,不能少 done := make(chan struct{}) // 取消廣播的通道 var wg sync.WaitGroup // 計數器,等所有請求返回后再結束。幫助判斷其他連接是否可以取消 wg.Add(count) for _, url := range urls { go func(url string) { defer wg.Done() resp, err := request(url, done) responses <- respData{resp, err} }(url) } // 等待結果返回并處理 var response string for i := 0; i < count; i++ { data := <-responses if data.err == nil { // 只接收第一個無錯誤的返回 response = data.resp close(done) break } fmt.Fprintf(os.Stderr, "mirror get: %v\n", data.err) } wg.Wait() return response } // 負責發起請求并返回結果和可能的錯誤 func request(url string, done <-chan struct{}) (response string, err error) { req, err := http.NewRequest("GET", url, nil) if err != nil { return "", err } req.Cancel = done resp, err := http.DefaultClient.Do(req) if err != nil { return "", err } defer resp.Body.Close() // 檢查返回的頁面是HTML通過判斷Content-Type,比如:Content-Type: text/html; charset=utf-8 ct := resp.Header.Get("Content-Type") if ct != "text/html" && !strings.HasPrefix(ct, "text/html;") { return "", fmt.Errorf("%s has type %s, not text/html", url, ct) } // 如果上面檢查響應頭沒問題,把響應體交給 title 函數解析獲取結果 return title(url, resp.Body) } func main() { response := mirroredQuery(os.Args[1:]...) fmt.Println(response) }
這里的 request 除了返回響應消息還會返回一個錯誤。在 mirroredQuery 函數內需要處理這個錯誤,從而可以獲取到第一個正確返回的響應消息。也有可能所有的請求都沒有正確的返回,這里的做法也確保了所有的請求都返回錯誤后程序可以正常執行結束。
解析頁面獲取 title 的部分,基本參照了上面的獲取頁面的title這小節的實現。
到此,關于“Go語言網絡爬蟲實例代碼分享”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。