前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Colly源码解析——结合例子分析底层实现

Colly源码解析——结合例子分析底层实现

作者头像
方亮
发布于 2019-01-16 09:09:43
发布于 2019-01-16 09:09:43
1.1K00
代码可运行
举报
文章被收录于专栏:方亮方亮
运行总次数:0
代码可运行

通过《Colly源码解析——框架》分析,我们可以知道Colly执行的主要流程。本文将结合http://go-colly.org上的例子分析一些高级设置的底层实现。(转载请指明出于breaksoftware的csdn博客)

递归深度

        以下例子截取于Basic

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
	c := colly.NewCollector(
		// Visit only domains: hackerspaces.org, wiki.hackerspaces.org
		colly.AllowedDomains("hackerspaces.org", "wiki.hackerspaces.org"),
	)

	// On every a element which has href attribute call callback
	c.OnHTML("a[href]", func(e *colly.HTMLElement) {
		link := e.Attr("href")
		// Print link
		fmt.Printf("Link found: %q -> %s\n", e.Text, link)
		// Visit link found on page
		// Only those links are visited which are in AllowedDomains
		c.Visit(e.Request.AbsoluteURL(link))
	})

        c是Collector指针,它的Visit方法给scrape传递的“深度”值是1。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func (c *Collector) Visit(URL string) error {
	return c.scrape(URL, "GET", 1, nil, nil, nil, true)
}

        由于NewCollector构造的Collector.MaxDepth为0,而在scrape方法内部调用的requestCheck中,如果此值为0,则不会去做深度检测

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// requestCheck method
	if c.MaxDepth > 0 && c.MaxDepth < depth {
		return ErrMaxDepth
	}

        如果希望通过MaxDepth控制深度,则可以参见Max depth例子

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
	c := colly.NewCollector(
		// MaxDepth is 1, so only the links on the scraped page
		// is visited, and no further links are followed
		colly.MaxDepth(1),
	)

	// On every a element which has href attribute call callback
	c.OnHTML("a[href]", func(e *colly.HTMLElement) {
		link := e.Attr("href")
		// Print link
		fmt.Println(link)
		// Visit link found on page
		e.Request.Visit(link)
	})

        第4行将深度设置为1,这样理论上只能访问第一层的URL。

        如果OnHTML中的代码和Basic例子一样,即使用Collector的Visit访问URL,则由于其depth一直传1,而导致requestCheck的深度检测一直不满足条件,从而会访问超过1层的URL。

        所以第13行,调用的是HTMLElement的Visit方法

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func (r *Request) Visit(URL string) error {
	return r.collector.scrape(r.AbsoluteURL(URL), "GET", r.Depth+1, nil, r.Ctx, nil, true)
}

        相较于Collector的Visit,HTMLElement的Visit方法将Depth增加了1,并且传递了请求的上下文(ctx)。由于depth有变化,所以之后的深度检测会返回错误,从而只会访问1层URL。

规则

        Collector的Limit方法用于设置各种规则。这些规则最终在Collector的httpBackend成员中执行。

        一个Collector只有一个httpBackend结构体指针,而一个httpBackend结构体可以有一组规则

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
type httpBackend struct {
	LimitRules []*LimitRule
	Client     *http.Client
	lock       *sync.RWMutex
}

        规则针对Domain来区分,我们可以通过设定不同的匹配规则,让每组URL执行相应的操作。这些操作包括:

  • 访问并行数
  • 访问间隔延迟

        参见Parallel例子。只截取其中关键一段

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
	// Limit the maximum parallelism to 2
	// This is necessary if the goroutines are dynamically
	// created to control the limit of simultaneous requests.
	//
	// Parallelism can be controlled also by spawning fixed
	// number of go routines.
	c.Limit(&colly.LimitRule{DomainGlob: "*", Parallelism: 2})

        Collector的Limit最终会调用到httpBackend的Limit,它将规则加入到规则组后初始化该规则。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// Init initializes the private members of LimitRule
func (r *LimitRule) Init() error {
	waitChanSize := 1
	if r.Parallelism > 1 {
		waitChanSize = r.Parallelism
	}
	r.waitChan = make(chan bool, waitChanSize)
	hasPattern := false
	if r.DomainRegexp != "" {
		c, err := regexp.Compile(r.DomainRegexp)
		if err != nil {
			return err
		}
		r.compiledRegexp = c
		hasPattern = true
	}
	if r.DomainGlob != "" {
		c, err := glob.Compile(r.DomainGlob)
		if err != nil {
			return err
		}
		r.compiledGlob = c
		hasPattern = true
	}
	if !hasPattern {
		return ErrNoPattern
	}
	return nil
}

        第7行创建了一个可以承载waitChanSize个元素的channel。可以看到,如果我们在规则中没有设置并行数,也会创建只有1个元素的channel。这个channel会被用于调节并行执行的任务数量。所以这也就意味着,一旦调用了Limit方法而没设置Parallelism值,该Collector中针对符合规则的请求就会变成串行的。

        第10和18行分别针对不同规则初始化一个编译器。因为这个操作比较重,所以在初始化时执行,之后只是简单使用这些编译器即可。

        当发起请求时,流程最终会走到httpBackend的Do方法

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func (h *httpBackend) Do(request *http.Request, bodySize int) (*Response, error) {
	r := h.GetMatchingRule(request.URL.Host)
	if r != nil {
		r.waitChan <- true
		defer func(r *LimitRule) {
			randomDelay := time.Duration(0)
			if r.RandomDelay != 0 {
				randomDelay = time.Duration(rand.Int63n(int64(r.RandomDelay)))
			}
			time.Sleep(r.Delay + randomDelay)
			<-r.waitChan
		}(r)
	}

        第2行通过域名查找对应的规则,如果找到,则在第4行尝试往channel中加入元素。这个操作相当于上锁。如果channel此时是满的,则该流程会被挂起。否则就执行之后的流程。在Do函数结束,命中规则的会执行上面的匿名函数,它在休眠规则配置的时间后,尝试从channel中获取数据。这个操作相当于释放锁。

        Colly就是通过channel的特性实现了并行控制。

并行

        在“规则”一节,我们讲到可以通过Parallelism控制并行goroutine的数量。httpBackend的Do方法最终将被Collector的fetch方法调用,而该方法可以被异步执行,即是一个goroutine。这就意味着承载Do逻辑的goroutine执行完毕后就会退出。而一种类似线程的技术在Colly也被支持,它更像一个生产者消费者模型。消费者线程执行完一个任务后不会退出,而在生产者生产出的物料池中取出未处理的任务加以处理。

        以下代码截取于Queue

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
	q, _ := queue.New(
		2, // Number of consumer threads
		&queue.InMemoryQueueStorage{MaxSize: 10000}, // Use default queue storage
	)

	……

	for i := 0; i < 5; i++ {
		// Add URLs to the queue
		q.AddURL(fmt.Sprintf("%s?n=%d", url, i))
	}
	// Consume URLs
	q.Run(c)

        这次没有调用Collector的Visit等函数,而是调用了Queue的Run。

        第2行创建了一个具有2个消费者(goroutine)的Queue。第10行预先给这个Queue加入5个需要访问的URL。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// AddURL adds a new URL to the queue
func (q *Queue) AddURL(URL string) error {
	u, err := url.Parse(URL)
	if err != nil {
		return err
	}
	r := &colly.Request{
		URL:    u,
		Method: "GET",
	}
	d, err := r.Marshal()
	if err != nil {
		return err
	}
	return q.storage.AddRequest(d)
}

        AddUrl的第11行将请求序列化,在第15行将该序列化数据保存到“仓库”中。

        在Run方法中,Colly将启动2个goroutine。注意它是使用for循环组织的,这意味着如果for内无break,它会一直循环执行下去——不退出。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func (q *Queue) Run(c *colly.Collector) error {
	wg := &sync.WaitGroup{}
	for i := 0; i < q.Threads; i++ {
		wg.Add(1)
		go func(c *colly.Collector, wg *sync.WaitGroup) {
			defer wg.Done()
			for {

        如果队列中没有需要处理的request,则会尝试退出

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
				if q.IsEmpty() {
					if q.activeThreadCount == 0 {
						break
					}
					ch := make(chan bool)
					q.lock.Lock()
					q.threadChans = append(q.threadChans, ch)
					q.lock.Unlock()
					action := <-ch
					if action == stop && q.IsEmpty() {
						break
					}
				}

        activeThreadCount表示当前运行中的消费者goroutine数量。如果已经没有消费者了,则直接跳出for循环,整个goroutine结束。

        如果还有消费者,则创建一个channel,并将其加入到q.threadChans的channel切片中。然后在第9行等待该channel被写入值。如果写入的是true并且此时没有需要处理的request,则退出goroutine。可以看到这段逻辑检测了两次是否有request,这个我们之后再讨论。

        如果还有request要处理,则递增消费者数量(在finish中会递减以抵消)。然后从“仓库”中取出一个任务,在通过Request的Do方法发起请求,最后调用finish方法善后。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
				q.lock.Lock()
				atomic.AddInt32(&q.activeThreadCount, 1)
				q.lock.Unlock()
				rb, err := q.storage.GetRequest()
				if err != nil || rb == nil {
					q.finish()
					continue
				}
				r, err := c.UnmarshalRequest(rb)
				if err != nil || r == nil {
					q.finish()
					continue
				}
				r.Do()
				q.finish()
			}
		}(c, wg)
	}
	wg.Wait()
	return nil
}

        finish方法干了三件事:

  1. 递减消费者数量,以抵消Run方法中的递增。
  2. 将Queue的各个等待中的,其他goroutine创建的channel传入true值,即告知他们可以退出了。
  3. 给Queue创建一个空的channel切片
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func (q *Queue) finish() {
	q.lock.Lock()
	q.activeThreadCount--
	for _, c := range q.threadChans {
		c <- stop
	}
	q.threadChans = make([]chan bool, 0, q.Threads)
	q.lock.Unlock()
}

        我们再看下怎么在请求的过程中给Queue增加任务

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// AddRequest adds a new Request to the queue
func (q *Queue) AddRequest(r *colly.Request) error {
	d, err := r.Marshal()
	if err != nil {
		return err
	}
	if err := q.storage.AddRequest(d); err != nil {
		return err
	}
	q.lock.Lock()
	for _, c := range q.threadChans {
		c <- !stop
	}
	q.threadChans = make([]chan bool, 0, q.Threads)
	q.lock.Unlock()
	return nil
}

        第3~9行,会将请求序列化后保存到“仓库”中。

        第10~15行,会将其他goroutine创建的channel传入false,告知它们不要退出。然后再创建一个空的channel切片。

        finish和AddRequest都使用锁锁住了所有的逻辑,而且它们都会把其他goroutine创建的channel传入值,然后将Queue的channel切片清空。这样就保证这些channel只可能收到一种状态。由于它自己创建的channel是在finish调用完之后才有机会创建出来,所以不会造成死锁。

        再回来看goroutine退出的逻辑

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
				if q.IsEmpty() {
					if q.activeThreadCount == 0 {
						break
					}
					ch := make(chan bool)
					q.lock.Lock()
					q.threadChans = append(q.threadChans, ch)
					q.lock.Unlock()
					action := <-ch
					if action == stop && q.IsEmpty() {
						break
					}
				}

        如果finish方法中递减的activeThreadCount为0,这说明这是最后一个goroutine了,而且当前也没request,所以退出。当然此时存在一种可能:在1行执行结束后,其他非消费者goroutine调用AddRequest新增了若干request。而执行第2行时,goroutine将退出,从而导致存在request没有处理的可能。

        如果还存在其他goroutine,则本goroutine将在第5行创建一个channel,并将这个channel加入到Queue的channel切片中。供其他goroutine调用finish往channel中传入true,或者AddRequest传入false,调控是否需要退出本过程。在第9行等待channel传出数据前,可能存在如下几种情况:

  1. 执行了finish
  2. 执行了AddRequest
  3. 执行了finish后执行了AddRequest
  4. 执行了AddRequest后执行了finish

        如果是第1和4种,action将是false。第2和3种,action是true。但是这个情况下不能单纯的通过action决定是否退出。因为第9和10行执行需要时间,这段时间其他goroutine可能还会执行AddRequest新增任务,或者GetRequest删除任务。所以还要在第10行检测下IsEmpty。

        这段是我阅读Colly中思考的最多的代码,因为有goroutine和channel,导致整个逻辑比较复杂。也感慨下,虽然goroutine很方便,但是真的能把它写对也是不容易的。

分布式

        在Queue例子中,我们看到“仓库”这个概念。回顾下Queue的例子,“仓库”是InMemoryQueueStorage。顾名思义,它是一个内存型的仓库,所以不存在分布式基础。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
	// create a request queue with 2 consumer threads
	q, _ := queue.New(
		2, // Number of consumer threads
		&queue.InMemoryQueueStorage{MaxSize: 10000}, // Use default queue storage
	)

        一个分布式的例子是Redis backend,截取一段

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
	// create the redis storage
	storage := &redisstorage.Storage{
		Address:  "127.0.0.1:6379",
		Password: "",
		DB:       0,
		Prefix:   "httpbin_test",
	}

	// add storage to the collector
	err := c.SetStorage(storage)
	if err != nil {
		panic(err)
	}

	// delete previous data from storage
	if err := storage.Clear(); err != nil {
		log.Fatal(err)
	}

	// close redis client
	defer storage.Client.Close()

	// create a new request queue with redis storage backend
	q, _ := queue.New(2, storage)

        这儿创建了一个redis型的仓库。不仅Collector的Storage是它,Queue的Storage也是它。这样一个集群上的服务都往这个仓库里存入和取出数据,从而实现分布式架构。

        redisstorage库引自github.com/gocolly/redisstorage。我们查看其源码,其实现了Collector的storage需要的接口

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
type Storage interface {
	// Init initializes the storage
	Init() error
	// Visited receives and stores a request ID that is visited by the Collector
	Visited(requestID uint64) error
	// IsVisited returns true if the request was visited before IsVisited
	// is called
	IsVisited(requestID uint64) (bool, error)
	// Cookies retrieves stored cookies for a given host
	Cookies(u *url.URL) string
	// SetCookies stores cookies for a given host
	SetCookies(u *url.URL, cookies string)
}

        以及Queue的storage需要的

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// Storage is the interface of the queue's storage backend
type Storage interface {
	// Init initializes the storage
	Init() error
	// AddRequest adds a serialized request to the queue
	AddRequest([]byte) error
	// GetRequest pops the next request from the queue
	// or returns error if the queue is empty
	GetRequest() ([]byte, error)
	// QueueSize returns with the size of the queue
	QueueSize() (int, error)
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018年11月28日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
java 读写文件的两种方式
  在实际开发过程中,对于文件的读写操作也是经常碰到的,如何用java完成对文件的准确无误的读写呢?
全栈程序员站长
2022/09/06
2510
java 读写文件的两种方式
【Java 基础篇】Java字节流详解
在Java中,字节流是一种用于读取和写入字节数据的流。它提供了一种逐字节操作的方式,适用于处理二进制数据,如图像、音频、视频等。本文将详细介绍Java字节流的原理、使用场景和常用类,并提供一些示例代码。
繁依Fanyi
2023/10/12
3500
Java IO 知识整理
由于java I/O库需要很多性能的各种组合,如果这些性能都是用继承来实现,那么每一种组合都需要一个类,这样就会造成大量行重复的类出现。如果采用装饰模式,那么类的数目就会大大减少,性能的重复也可以减至最少。因此装饰模式是java I/O库基本模式。装饰模式的引进,造成灵活性和复杂性的提高。因此在使用java IO库时,必须理解java IO库是由一些基本的原始流处理器和围绕它们的装饰流处理器所组成的。
Abalone
2022/07/14
6020
Java IO 知识整理
Word转PDF 并转成base64(亲测可用)
ExcelPdToWord.documents4jWordToPdf(sourcePath, targetPath);
默 语
2024/11/20
3710
【Java 基础篇】Java字节缓冲流详解
在Java中,字节缓冲流是一种用于提高字节流读写效率的流。它们通过在内存中创建缓冲区,减少了与底层设备的直接交互次数,从而提高了读写的速度。本文将详细介绍Java字节缓冲流的原理、使用场景和常用类,并提供一些示例代码。
繁依Fanyi
2023/10/12
4670
Java 字节流
此抽象类是表示输出字节流的所有类的超类。输出流接受输出字节并将这些字节发送到某个接收器
Tim在路上
2020/08/04
9040
Java 图片URL转Base64编码
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
全栈程序员站长
2022/10/05
3.9K0
Java 图片URL转Base64编码
md5和base64加密解密
import java.io.IOException; import java.security.MessageDigest; import sun.misc.BASE64Encoder; import sun.misc.BASE64Decoder; public class MD5Util { /** * MD5加密 */ public static String md5Encryption(String str) { MessageDig
HUC思梦
2020/09/03
2.7K0
Base64码常见操作(url链接文件转base64编码、本地文件转base64编码等)
因为是图片,可以复制输出的base64编码到这个网址验证:https://tool.jisuapi.com/base642pic.html 其他base64操作如下:
共饮一杯无
2022/11/28
2.5K0
Java Base64转换,Java Base64工具类
但这个性能一般,而且转换出来的base64字符串会有换行符,可能还需要替换换行符,避免在某些场景因为分行导致出错 2、使用Jdk8的Base64工具类(优先考虑使用)
用户9131103
2023/07/17
8250
Java多文件压缩
ha_lydms
2023/08/10
4310
【Java+EasyExcel】使用 SpringBoot 实现 Excel 文件的导入(含示例代码)
Excel 导入 浏览文件夹,选择需要上传的 Excel 文件,这里使用 POSTMAN 工具; 将本地文件上传至服务器指定位置; 服务器解析Excel文件; 将Excel中解析的数据存入数据库中。
程序员洲洲
2024/06/11
1.4K0
【Java+EasyExcel】使用 SpringBoot 实现 Excel 文件的导入(含示例代码)
JAVA入门学习八
[TOC] 异常概述和分类 概述:异常就是Java程序在运行过程中出现的错误。 异常的分类: 通过API查看Throwable所知在Java.Lang里面使用是不用导包的它是JAVA语言中所有错误或者异常的超类(父根类); Error : 服务器宕机,数据库崩溃等 Exception : 可以接收程序编译和运行时候发生的异常,并且异常子类后缀都是Exception; 异常的继承体系: Throwable (超类) Error Exception 编译时候异常(静态): Java程序必须显示处理,否
全栈工程师修炼指南
2022/09/28
7250
JAVA入门学习八
第五阶段-IO基础:【第三章 异常】
而使用IO流我们可以实现一些强大的功能,例如针对文件的移动复制等操作,又或者程序与外部文件之间的数据存储或者读取,又或者实现一个实时的聊天程序(网络编程),其中数据的传输也用到了我们的IO流,这些内容我们都会在后面设计,下面我就开始IO流的正式学习
BWH_Steven
2019/08/09
5090
Java学习笔记——IO流
File中的方法,仅涉及到如何创建、删除、重命名等等。只要涉及文件内容的,File是无能为力的,必须由io流来完成。
梦飞
2022/06/23
3230
PGP加解密
对接客户需求时对方使用PGP对文件进行加解密,但PGP是商用的非对称加解密方式,可以改用Apache基金会推出的开源的GPG,两者的加解密可以无缝对接。
雨临Lewis
2023/07/11
1.7K0
Java原生图片Base64转码与Base64解码
程序员朱永胜
2023/08/21
4440
实战:第十二章:txt文件转xml文件
开发不就这么点事吗,有个啥好bb的 controller\   @RequestMapping("/DataDistributionController") @RestController public class DataDistributionController { @Autowired DataDistributionService dataDistributionService; @PostMapping("/dataDistribution") public
Java廖志伟
2022/09/28
8650
java实现将图片读取成base64字符串,将base64字符串存储为图片。
将图片转化为字符串以后,由于字符串更方便在网络上通过ajax传输、在网络web前台和后台间进行传输。
全栈程序员站长
2022/11/18
2.1K0
java 将文件流转化成字符串传输
https://www.aliyun.com/jiaocheng/851433.html
用户7886150
2021/04/29
1.6K0
相关推荐
java 读写文件的两种方式
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档