侧边栏壁纸
博主头像
落叶人生博主等级

走进秋风,寻找秋天的落叶

  • 累计撰写 130562 篇文章
  • 累计创建 28 个标签
  • 累计收到 9 条评论
标签搜索

目 录CONTENT

文章目录

玩转webmagic代码之Scheduler

2022-06-25 星期六 / 0 评论 / 0 点赞 / 62 阅读 / 8981 字

webmagic上线之后,因为灵活性很强,得到了一些爬虫老手的欢迎,但是对于新手来说可能稍微摸不着头脑,我的需求是这样子,什么模块化,什么灵活性,但是看了半天,我也不知道怎么解决我的问题啊?这里先谈谈

webmagic上线之后,因为灵活性很强,得到了一些爬虫老手的欢迎,但是对于新手来说可能稍微摸不着头脑,我的需求是这样子,什么模块化,什么灵活性,但是看了半天,我也不知道怎么解决我的问题啊?

这里先谈谈Scheduler,不单关乎框架,更多是一些爬虫通用的思想,希望对大家有帮助。

为什么要有Scheduler

其实Scheduler并非webmagic独创,在scrapy以及其他成熟爬虫中都有类似模块。Scheduler管理了所有待抓取的url,单个爬虫自己是无法控制要抓取什么的,抓什么都由Scheduler决定。

这样子最大的好处就是,爬虫本身没有状态,给一个url,处理一个,非常容易进行水平扩展(就是加线程、或者加机器),而且即使单台爬虫宕机,也不会有什么损失。这跟我们在应用开发中,所说的"服务无状态"的思想是很像的。而相反,如果在单个爬虫线程内部,循环甚至递归的进行抓取,那么这部分工作是无法扩展的,而且宕机之后恢复会很困难。

<!-- lang: java -->public interface Scheduler {    public void push(Request request, Task task);    public Request poll(Task task);}

webmagic里的Scheduler只有两个接口,一个放入url,一个取出url。

玩转Scheduler

层级关系及上下文信息

我们这里举一个较复杂的例子。例如,我们要从http://www.ip138.com/post/上抓取全国的邮编地址,最后我们想要得到一个树状结构的结果,这个结果包括省 市 县 村/街道 邮编。这里有两个需求:一个是优先抓最终页面,一个是要带上所有前面页面的信息。如果随便手写一个爬虫,可能我们就会用递归的形式写了,那么在webmagic里如何做呢?

从0.2.1起,webmagic的Request,也就是保存待抓取url的对象,有两个大的改动:

一个是支持优先级,这样子要深度优先还是广度优先,都可以通过给不同层次设置不同值完成。

二是可以在Request里附加额外信息request.putExtra(key,value),这个额外信息会带到下次页面抓取中去。

于是,我们可以通过给最终页面增加高优先级,达到优先抓取的目的;同时可以把之前抓取的信息保存到Request里去,在最终结果中,附加上前面页面的信息。

最终代码在这里,当然,其实这个例子里,最终页面是包含“省”、“市”信息的,这里只是讨论附加信息的可能性。

<!-- lang: java -->public class ZipCodePageProcessor implements PageProcessor {    private Site site = Site.me().setCharset("gb2312")            .setSleepTime(100).addStartUrl("http://www.ip138.com/post/");    @Override    public void process(Page page) {        if (page.getUrl().toString().equals("http://www.ip138.com/post/")) {            processCountry(page);        } else if (page.getUrl().regex("http://www//.ip138//.com/post///w+[/]?$").toString() != null) {            processProvince(page);        } else {            processDistrict(page);        }    }    private void processCountry(Page page) {        List<String> provinces = page.getHtml().xpath("//*[@id=/"newAlexa/"]/table/tbody/tr/td").all();        for (String province : provinces) {            String link = xpath("//@href").select(province);            String title = xpath("/text()").select(province);            Request request = new Request(link).setPriority(0).putExtra("province", title);            page.addTargetRequest(request);        }    }    private void processProvince(Page page) {        //这里仅靠xpath没法精准定位,所以使用正则作为筛选,不符合正则的会被过滤掉        List<String> districts = page.getHtml().xpath("//body/table/tbody/tr/td").regex(".*http://www//.ip138//.com/post///w+///w+.*").all();        for (String district : districts) {            String link = xpath("//@href").select(district);            String title = xpath("/text()").select(district);            Request request = new Request(link).setPriority(1).putExtra("province", page.getRequest().getExtra("province")).putExtra("district", title);            page.addTargetRequest(request);        }    }    private void processDistrict(Page page) {        String province = page.getRequest().getExtra("province").toString();        String district = page.getRequest().getExtra("district").toString();        List<String> counties = page.getHtml().xpath("//body/table/tbody/tr").regex(".*<td>//d+</td>.*").all();        String regex = "<td[^<>]*>([^<>]+)</td><td[^<>]*>([^<>]+)</td><td[^<>]*>([^<>]+)</td><td[^<>]*>([^<>]+)</td>";        for (String county : counties) {            String county0 = regex(regex, 1).select(county);            String county1 = regex(regex, 2).select(county);            String zipCode = regex(regex, 3).select(county);            page.putField("result", StringUtils.join(new String[]{province, district,                    county0, county1, zipCode}, "/t"));        }        List<String> links = page.getHtml().links().regex("http://www//.ip138//.com/post///w+///w+").all();        for (String link : links) {            page.addTargetRequest(new Request(link).setPriority(2).putExtra("province", province).putExtra("district", district));        }    }    @Override    public Site getSite() {        return site;    }    public static void main(String[] args) {        Spider.create(new ZipCodePageProcessor()).scheduler(new PriorityScheduler()).run();    }}

这段代码略复杂,因为我们其实进行了了3种页面的抽取,论单个页面,还是挺简单的:)

同样的,我们可以实现一个最多抓取n层的爬虫。通过在request.extra里增加一个"层数"的概念即可做到,而Scheduler只需做少量定制:

<!-- lang: java -->public class LevelLimitScheduler extends PriorityScheduler {    private int levelLimit = 3;    public LevelLimitScheduler(int levelLimit) {        this.levelLimit = levelLimit;    }    @Override    public synchronized void push(Request request, Task task) {        if (((Integer) request.getExtra("_level")) <= levelLimit) {            super.push(request, task);        }    }}

按照指定URL查询

例如我想要抓取百度某些关键词查询的结果,这个需求再简单不过了,你可以先新建一个Scheduler,将想要查询的URL全部放入Scheduler之后,再启动Spider即可:

<!-- lang: java -->PriorityScheduler scheduler = new PriorityScheduler();Spider spider = Spider.create(new ZipCodePageProcessor()).scheduler(scheduler);scheduler.push(new Request("http://www.baidu.com/s?wd=webmagic"),spider);//这里webmagic是关键词...//其他地址spider.run();

定期轮询

有一类需求是,定期检查页面是否更新,如果更新,则抓取最新数据。这里包括两个问题:

定期抓取和更新持久化数据。后者在Pipeline分享时候再说。

而定期轮询,最简单的方法就是定期去启动Spider.run()。这样子没什么问题,只是不够优雅,还有一种方法是用Scheduler做定期分发,一次性把URL放进去,然后隔一段时间间隔后,再把url取出来。我这里基于DelayQueue进行了一个实现:DelayQueueScheduler,大致思路就是这样。

分布式

webmagic里有一个基于redis的RedisScheduler,可以实现较简单的分布式功能。选用redis是因为redis比较轻量,同时有强大的数据结构支持。实际上更为通用的方法是:将队列管理和url去重拆分开来,用对应的工具去做。

url队列,实际上很适合的载体工具就是各种消息队列,例如JMS的实现ActiveMQ。当然如果你对关系数据库比较熟悉,用它们来处理也是没有问题的。

关于去重,就现成的工具来说的话,倒是没有什么比redis更合适了。当然,你也可以自己构建一个去重服务,利用bloom filter等算法减少内存开销。

玩转webmagic系列以后会不定期更新,希望对大家有帮助。

最后依然附上 webmagic的github地址:

https://github.com/code4craft/webmagic

广告 广告

评论区