博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RedisSpider的调度队列实现过程及其源码
阅读量:6433 次
发布时间:2019-06-23

本文共 5030 字,大约阅读时间需要 16 分钟。

对于非分布式的scrapy爬虫而言,不能共享爬虫队列,不能实现分布式。RedisSpider是依赖Redis存储中介,来实现多台主机多爬虫之间的通信,RedisSpider是去重是内部的queue.py文件实现的,内部实现了队列、堆栈、优先级队列,在调度的统一协调下最终实现分布式协同工作。

queue.py的源码

from scrapy.utils.reqser import request_to_dict, request_from_dictfrom . import picklecompatclass Base(object):    """Per-spider base queue class"""    def __init__(self, server, spider, key, serializer=None):        """Initialize per-spider redis queue.        Parameters        ----------        server : StrictRedis            Redis client instance.        spider : Spider            Scrapy spider instance.        key: str            Redis key where to put and get messages.        serializer : object            Serializer object with ``loads`` and ``dumps`` methods.        """        if serializer is None:            # Backward compatibility.            # TODO: deprecate pickle.            serializer = picklecompat        if not hasattr(serializer, 'loads'):            raise TypeError("serializer does not implement 'loads' function: %r"                            % serializer)        if not hasattr(serializer, 'dumps'):            raise TypeError("serializer '%s' does not implement 'dumps' function: %r"                            % serializer)        self.server = server        self.spider = spider        self.key = key % {'spider': spider.name}        self.serializer = serializer    def _encode_request(self, request):        """Encode a request object"""        obj = request_to_dict(request, self.spider)        return self.serializer.dumps(obj)    def _decode_request(self, encoded_request):        """Decode an request previously encoded"""        obj = self.serializer.loads(encoded_request)        return request_from_dict(obj, self.spider)    def __len__(self):        """Return the length of the queue"""        raise NotImplementedError    def push(self, request):        """Push a request"""        raise NotImplementedError    def pop(self, timeout=0):        """Pop a request"""        raise NotImplementedError    def clear(self):        """Clear queue/stack"""        self.server.delete(self.key)class FifoQueue(Base):    """Per-spider FIFO queue"""    def __len__(self):        """Return the length of the queue"""        return self.server.llen(self.key)    def push(self, request):        """Push a request"""        self.server.lpush(self.key, self._encode_request(request))    def pop(self, timeout=0):        """Pop a request"""        if timeout > 0:            data = self.server.brpop(self.key, timeout)            if isinstance(data, tuple):                data = data[1]        else:            data = self.server.rpop(self.key)        if data:            return self._decode_request(data)class PriorityQueue(Base):    """Per-spider priority queue abstraction using redis' sorted set"""    def __len__(self):        """Return the length of the queue"""        return self.server.zcard(self.key)    def push(self, request):        """Push a request"""        data = self._encode_request(request)        score = -request.priority        # We don't use zadd method as the order of arguments change depending on        # whether the class is Redis or StrictRedis, and the option of using        # kwargs only accepts strings, not bytes.        self.server.execute_command('ZADD', self.key, score, data)    def pop(self, timeout=0):        """        Pop a request        timeout not support in this queue class        """        # use atomic range/remove using multi/exec        pipe = self.server.pipeline()        pipe.multi()        pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)        results, count = pipe.execute()        if results:            return self._decode_request(results[0])class LifoQueue(Base):    """Per-spider LIFO queue."""    def __len__(self):        """Return the length of the stack"""        return self.server.llen(self.key)    def push(self, request):        """Push a request"""        self.server.lpush(self.key, self._encode_request(request))    def pop(self, timeout=0):        """Pop a request"""        if timeout > 0:            data = self.server.blpop(self.key, timeout)            if isinstance(data, tuple):                data = data[1]        else:            data = self.server.lpop(self.key)        if data:            return self._decode_request(data)# TODO: Deprecate the use of these names.SpiderQueue = FifoQueueSpiderStack = LifoQueueSpiderPriorityQueue = PriorityQueue

其中一个基类声明有那些接口,实现了三个子类分别是SpiderQueue、SpiderStack、SpiderPriorityQueue ,并在该文件引入了序列化方法picklecompat用来处理数据存储中的兼容编码问题。

三个之类分别实现先进先出、先进后出、优先级先出的Request调度方法其中三种方式储存Rquest对象方法如下:

SpiderQueue:lpush(self.key, self._encode_request(request)),即按照key:序列化Request方式储存

SpiderStack:lpush(self.key, self._encode_request(request)),存储方式同上,不同的是获取request对象是先进后出

SpiderPriorityQueue:execute_command('ZADD', self.key, score, data),按照key、优先级、序列化Request对象方式存储,实现优先级的调度。

当在多台主机上运行相同的爬虫,self.key = key % {'spider': spider.name}爬虫名相同就决定了同一个爬虫的调度队列是相同的key,进而实现了不同主机爬虫之间的分布式。

关于分布式爬虫,此前说过三种架构思想,这里验证了RedisSpider的架构思想《》感兴趣的可用其他两种方式实现。

v2-e5b5956e15e037c402743a5be1d7d12a_hd.j

转载地址:http://tuaga.baihongyu.com/

你可能感兴趣的文章
linux后台执行程序
查看>>
剑指offer---二叉搜索树的后序遍历序列
查看>>
Bit Operation妙解算法题
查看>>
VLC Play in web
查看>>
详解PNG文件结构
查看>>
Statistics与Machine Learning有什么区别
查看>>
python 记录
查看>>
Silverlight 鼠标双击 事件
查看>>
Actionscript通过构造自定义事件和方法,谈谈可选参数的问题
查看>>
递归与尾递归总结
查看>>
笔试题错题集
查看>>
ssm+maven+pageHelper搭建maven项目实现快速分页
查看>>
UIImagePickerController, memory warning
查看>>
寻找缺失的数
查看>>
开发工程师人生之路(强烈推荐,分析的透彻!)——再次看,泪流满面!必须转型过来!...
查看>>
EASYAR初次使用体验
查看>>
我的vim配置文件
查看>>
线性表
查看>>
犯贱老爸
查看>>
Sencha-命令-CMD(工作区)(官网文档翻译31)
查看>>