node.js主从分布式爬虫

更新日期: 2019-03-28阅读: 2.9k标签: 爬虫

前言

前文介绍过用Python写爬虫,但是当任务多的时候就比较慢, 这是由于Python自带的http库urllib2发起的http请求是阻塞式的,这意味着如果采用单线程模型,那么整个进程的大部分时间都阻塞在等待服务端把数据传输过来的过程中。所以我们这次尝试用node.js去做这个爬虫。


为什么选择node.js

node.js是一款基于google的V8引擎开发javascript运行环境。在高性能的V8引擎以及事件驱动的单线程异步非阻塞运行模型的支持下,node.js实现的web服务可以在没有Nginx的http服务器做反向代理的情况下实现很高的业务并发量。


分布式爬虫设计

这次也用上次的分布式设计,使用Redis服务器来作为任务队列。如图:


异步

node.js是基于异步的写法,有时一个函数需要上一个函数的返回值做参数,这样下来一不小心就会陷入回调地狱的陷阱中。
所以这次我们用async模块控制流程。


准备工作

  1. 安装node.js和Redis
  2. 安装request、async与Redis相关的库


代码

主函数(master.js)

"use strict"
const request = require('request')
const cheerio = require('cheerio')
const fs = require('fs')

const utils = require('./utils')
const log = utils.log

const config = require('./config')
const task_url_head = config.task_url_head
const main_url = config.main_url
const proxy_url = config.proxy_url

const redis_cache = require('./redis_cache')
const redis_client = redis_cache.client

const Task = function() {
    this.id = 0
    this.title = ''
    this.url = ''
    this.file_name = ''
    this.file_url = 0
    this.is_download = false
}
//总下载数
var down_cont = 0
//当前下载数
var cur_cont = 0

const taskFromBody = function(task_url, body) {
    
    const task = new Task()
    // cheerio.load 用字符串作为参数返回一个可以查询的特殊对象
    // body 就是 html 内容
    const e = cheerio.load(body)
    // 查询对象的查询语法dom api 中的 querySelector 一样
    const title = e('.controlBar').find('.epi-title').text()
    const file_url = e('.audioplayer').find('audio').attr('src')
    const ext = file_url.substring(file_url.length-4)
    const task_id = task_url.substring(task_url.length-5)
    const file_name = task_id+'.'+title+ext

    task.id = task_id
    task.title = title
    task.url = task_url
    task.file_name = file_name.replace(/\//g,"-").replace(/:/g,":")
    task.file_url = file_url
    task.is_download = false

    redis_client.set('Task:id:'+task_id,JSON.stringify(task),function (error, res) {
        if (error) {
            log('Task:id:'+task_id, error)
        } else {
            log('Task:id:'+task_id, res)
        }
        cur_cont = cur_cont + 1
        if (down_cont == cur_cont) {
            // 操作完成,关闭redis连接
            redis_client.end(true);
            log('已完成')
        }
    })
}

const taskFromUrl = function(task_url) {
    request({
        'url':task_url,
        'proxy':proxy_url,
        }, 
        function(error, response, body) {
        // 回调函数的三个参数分别是  错误, 响应, 响应数据
        // 检查请求是否成功, statusCode 200 是成功的代码
        if (error === null && response.statusCode == 200) {
            taskFromBody(task_url, body)
        } else {
            log('*** ERROR 请求失败 ', error)
        }
    })
}

const parseLink = function(div) {
    let e = cheerio.load(div)
    let href = e('a').attr('href')
    return href
}

const dataFromUrl = function(url) {
    // request 从一个 url 下载数据并调用回调函数
    request({
            'url' : url,
            'proxy' : proxy_url,
            }, 
            function(error, response, body) {
            // 回调函数的三个参数分别是  错误, 响应, 响应数据
            // 检查请求是否成功, statusCode 200 是成功的代码
            if (error === null && response.statusCode == 200) {
                // cheerio.load 用字符串作为参数返回一个可以查询的特殊对象
                // body 就是 html 内容
                const e = cheerio.load(body)
                // 查询对象的查询语法和 DOM API 中的 querySelector 一样
                const itmeDivs = e('.epiItem.video')

                for(let i = 0; i < itmeDivs.length; i++) {
                    let element = itmeDivs[i]
                    // 获取 div 的元素并且用 itmeFromDiv 解析
                    // 然后加入 link_list 数组中
                    const div = e(element).html()
                    // log(div)
                    const url_body = parseLink(div)
                    const task_url = task_url_head+url_body
                    down_cont = itmeDivs.length
                    taskFromUrl(task_url)
                    // redis_client.set('Task:id:'+task_id+':url', task_link, )
                }
                // 操作完成,关闭redis连接
                // redis_client.end(true)
                log('*** success ***')
            } else {
                log('*** ERROR 请求失败 ', error)
            }
        })
}

const __main = function() {
    // 这是主函数
    const url = main_url
    dataFromUrl(url)
}

__main()

从函数(salver.js)

"use strict"
const http = require("http")
const fs = require("fs")
const path = require("path")
const redis = require('redis')
const async = require('async')

const utils = require('./utils')
const log = utils.log

const config = require('./config')
const save_dir_path = config.save_dir_path

const redis_cache = require('./redis_cache')
const redis_client = redis_cache.client

//总下载数
var down_cont = 0
//当前下载数
var cur_cont = 0

const getHttpReqCallback = function(fileUrl, dirName, fileName, downCallback) {
    log('getHttpReqCallback fileName ', fileName)
    var callback = function (res) {
        log("request: " + fileUrl + " return status: " + res.statusCode)
        if (res.statusCode != 200) {
            startDownloadTask(fileUrl, dirName, fileName, downCallback)
            return
        }
        var contentLength = parseInt(res.headers['content-length'])
        var fileBuff = []
        res.on('data', function (chunk) {
            var buffer = new Buffer(chunk)
            fileBuff.push(buffer)
        })
        res.on('end', function () {
            log("end downloading " + fileUrl)
            if (isNaN(contentLength)) {
                log(fileUrl + " content length error")
                return
            }
            var totalBuff = Buffer.concat(fileBuff)
            log("totalBuff.length = " + totalBuff.length + " " + "contentLength = " + contentLength)
            if (totalBuff.length < contentLength) {
                log(fileUrl + " download error, try again")
                startDownloadTask(fileUrl, dirName, fileName, downCallback)
                return
            }
            fs.appendFile(dirName + "/" + fileName, totalBuff, function (err) {
                if (err){
                    throw err;  
                }else{
                    log('download success')
                    downCallback()
                } 
            })
        })
    }
    return callback
}

var startDownloadTask = function (fileUrl, dirName, fileName, downCallback) {
    log("start downloading " + fileUrl)
    var option = {
        host : '127.0.0.1',
        port : '8087',
        method:'get',//这里是发送的方法
        path : fileUrl,
        headers:{
            'Accept-Language':'zh-CN,zh;q=0.8',
            'Host':'maps.googleapis.com'
        }
    }
    var req = http.request(option, getHttpReqCallback(fileUrl, dirName, fileName, downCallback))
    req.on('error', function (e) {
        log("request " + fileUrl + " error, try again")
        startDownloadTask(fileUrl, dirName, fileName, downCallback)
    })
    req.end()
}

const beginTask = function(task_key, callback) {
    log('beginTask', task_key)
    redis_client.get(task_key,function (err,v){
        let task = JSON.parse(v)
        // log('task', task)

        let file_url = task.file_url
        let dir_path = save_dir_path
        let file_name = task.file_name
        
        if (task.is_download === false) {
            startDownloadTask(file_url, dir_path, file_name,function(){
                task.is_download = true
                redis_client.set(task_key, JSON.stringify(task), function (error, res) {   
                    log('update redis success', task_key)
                    // cur_cont = cur_cont + 1
                    // if(cur_cont == down_cont){
                    //     redis_client.end(true)                            
                    // }
                    callback(null,"successful !");
                })
            })
        }else{
            callback(null,"successful !");
        }
    }) 
}


const mainTask = function() {
    redis_client.keys('Task:id:[0-9]*',function (err,v){
        // log(v.sort())  
        let task_keys = v.sort()
        down_cont = task_keys.length
        log('down_cont', down_cont)
        //控制异步
        async.mapLimit(task_keys, 2, function(task_key,callback){
            beginTask(task_key, callback)
        },function(err,result){
            if(err){
                log(err);
            }else{
                // log(result);  //会输出多个“successful”字符串的数组
                log("all down!");
                redis_client.end(true)     
            }
        });
       
    })
}

const initDownFile = function() {
    fs.readdir(save_dir_path, function(err, files){
        if (err) {
            return console.error(err)
        }
        let file_list = []
        files.forEach( function (file){
            file_list.push(file.substring(0, 5))
        })
        // log(file_list)
        redis_client.keys('Task:id:[0-9]*',function (err,v){
            let task_keys = v
            // log(task_keys)
            let unfinish_len = task_keys.filter((item)=>file_list.includes(item.substring(item.length - 5)) == false).length
            let cur_unfinish_lent = 0
            task_keys.forEach(function (task_key){
                let task_id = task_key.substring(task_key.length - 5)
                
                if (file_list.includes(task_id) == false) {
                    // log(task_key)
                    redis_client.get(task_key,function (err,v){
                        let task = JSON.parse(v)

                        task.is_download = false
                        // log(task)
                        // log(task_key)
                        redis_client.set(task_key, JSON.stringify(task), function (error, res) {
                            cur_unfinish_lent++
                            // log('cur_unfinish_lent', cur_unfinish_lent)
                            if (cur_unfinish_lent == unfinish_len) {
                                redis_client.end(true)  
                                log('init finish')
                            }
                        })  
                    })
                }
            })
        })
     })
}

const __main = function() {
    // 这是主函数
    // initDownFile()
    mainTask()
}

__main()

完整代码的地址

https://github.com/zhourunliang/nodejs_crawler

来自:https://www.cnblogs.com/geniusrun/archive/2019/03/28/10614748.html


链接: https://www.fly63.com/article/detial/2591

web爬虫抓取技术的门道,对于网络爬虫技术的攻与防

从爬虫的攻防角度来讲,最简单的爬虫,是几乎所有服务端、客户端编程语言都支持的http请求,只要向目标页面的url发起一个http get请求,即可获得到浏览器加载这个页面时的完整html文档,这被我们称之为“同步页”。

大话爬虫的实践技巧

数据收集的一种是方式是通过上报API进行自身平台用户交互情况的捕获,还有一种手段是通过开发爬虫程序,爬取竞品平台的数据,后面就重点说下爬虫的应用场景和实践中会遇到的问题和反反爬虫的一些套路与技巧。

网络爬虫_基于各种语言的开源网络爬虫总汇

网络爬虫是一种按照一定的规则,自动地抓取万维网信息的程序或者脚本。是互联网爬虫,蜘蛛,数据采集器,网页解析器的汇总,下面介绍各语言实现网络爬虫的开源框架

爬虫最终杀手锏 --- PhantomJS 详解(附案例)

PhantomJS无界面的浏览器:认识PhantomJS、网站模拟登录豆瓣网、动态页面模拟点击(unittest -python测试模块)、执行JavaScript代码、模拟最新无界面浏览器...

什么是网络爬虫,网络爬虫有什么用?

网络爬虫在数据采集方面有好的优势,比如采集速度快,比人来操作可能要快一千倍一万倍都不止;方便将获取的数据进行相关的清洗加工以及储存工作;代码可重复使用,或者说是一劳永逸。

爬虫解决网页重定向问题

每个网站主页是网站资源的入口,当重定向发生在网站主页时,如果不能正确处理就很有可能会错失这整个网站的内容。 笔者编写的爬虫在爬取网页时遇到了三种重定向的情况。

爬虫 解决网页ip限制的问题的八种方法

之前由于公司项目需要,采集过google地图数据,还有一些大型网站数据。 有小部分网站的防范措施比较弱,可以伪装下IP,修改X-Forwarded-for(貌似这么拼。。。)即可绕过。ser agent 伪装和轮换 ,使用代理 ip 和轮换

到百度云加速,网页内容爬不到的快速解决

在爬网站时,发现网站做了百度云加速,每次访问首页时要求输入验证码,才能打开网站首页。经过分析网站,发现如果你拿到一个当期可用的Cooikes后,你就可以一直爬数据,且并不会触发百度验证输入

反爬经验与理论基础

完整的反爬体系有三大部分工作要做:感知识别、策略分析、监控封禁。数据支撑:爬虫指纹、设备指纹、风险UA、IP库等,不同端指纹的mapping等。

使用Node.js爬取任意网页资源并输出高质量PDF文件到本地

本文适合无论是否有爬虫以及 Node.js 基础的朋友观看~如果你是一名技术人员,那么可以看我接下来的文章,否则,请直接移步到我的 github 仓库,直接看文档使用即可

点击更多...

内容以共享、参考、研究为目的,不存在任何商业目的。其版权属原作者所有,如有侵权或违规,请与小编联系!情况属实本人将予以删除!