您的位置:首页 >新闻资讯

多线程、异步与动态代理使用代理IP实现网络爬虫

来源:本站 作者:admin 时间:2019-06-06 17:21:03

在使用代理ip采集数据的时候,经常会碰到有反采集策略规则的WAF,使得本来很简单事情变得复杂起来,使得很多人无法获取到。黑名单、限制访问频率、检测HTTP头等这些都是常见的策略,不按常理出牌的也有检测到爬虫代理ip行为,就往里注入假数据返回,以假乱真,但为了良好的用户体验,一般都不会这么做。在遇有反采集、IP地址不够的时候,通常我们想到的是使用大量代理解决这个问题,因代理具有时效、不稳定、访问受限等不确定因素,使得有时候使用起来总会碰到一些问题。

 

进入正题,使用Python3简单实现一个单机版多线程/异步+多代理的爬虫,没有分布式、不谈高效率,先跑起来再说。


代理IP实现网络爬虫


基础知识

 

1.1代理类型

 

使用代理转发数据的同时,代理服务器也会改变REMOTE_ADDR、HTTP_VIA、HTTP_X_FORWARDED_FOR这三个变量发送给目标服务器,一般做爬虫的选择优先级为高匿>混淆>匿名>透明>高透

 

高透代理(HighTransparentProxy):单纯地转发数据

 

REMOTE_ADDR=YourIP

 

HTTP_VIA=YourIP

 

HTTP_X_FORWARDED_FOR=YourIP

 

透明代理(TransparentProxy):知道你在用代理,知道你IP

 

REMOTE_ADDR=ProxyIP

 

HTTP_VIA=ProxyIP

 

HTTP_X_FORWARDED_FOR=YourIP

 

匿名代理(AnonymousProxy):知道你用代理,不知道你IP

 

REMOTE_ADDR=ProxyIP

 

HTTP_VIA=ProxyIP

 

HTTP_X_FORWARDED_FOR=YourProxy

 

高匿代理(HighAnonymityProxy):不知道你在用代理

 

REMOTE_ADDR=ProxyIP

 

HTTP_VIA=N/A

 

HTTP_X_FORWARDED_FOR=N/A

 

混淆代理(DistortingProxies):知道你在用代理,但你的IP是假的

 

REMOTE_ADDR=ProxyIP

 

HTTP_VIA=ProxyIP

 

HTTP_X_FORWARDED_FOR=RandomIP

 

1.2代理协议

 

一般有HTTP/HTTPS/Socks类型,Web爬虫一般只用到前面两者。

 

1.3动态代理

 

实现动态代理一般是建立代理池,使用的时候通常有以下几种方式

 

本地存储调用,将代理保存到数据库中,需要时载入,爬虫可以作为验证代理的一部分,但代理不佳的情况下效率并不高

 

代理集中转发,通过建立本地代理自动切换转发,调用方便,但需要做额外的代理验证程序

 

通过接口获取,一般通过购买服务获取,代理质量较佳,可以结合以上两种方式使用

 

1.4多线程

 

多线程是实现任务并发的方式之一。在Python中实现多线程的方案比较多,最常见的是队列和线程类

 

que=queue.Queue()

 

defworker():

 

whilenotque.empty():

 

do(que.get())

 

threads=[]

 

nloops=256

 

#startthreads

 

foriinrange(nloops):

 

t=threading.Thread(target=worker)

 

art()

 

threads.append(t)

 

#waitforall

 

foriinrange(nloops):

 

threads[i].join()

 

另外也可以使用map实现,map可以通过序列来实现两个函数之间的映射,并结合multiprocessing.dummy实现并发任务

 

frommultiprocessing.dummyimportPoolasThreadPool

 

urls=['/1',

 

'/2']

 

pool=ThreadPool(256)#poolsize

 

res=map(quest.urlopen,urls)

 

ose()

 

in()

 

似乎更简洁,多线程实现还有其他方式,具体哪一种更好,不能一概而论,但多线程操作数据库可能会产生大量的数据库TCP/socket连接,这个需要调整数据库的最大连接数或采用线程池之类的解决。

 

1.5异步IO

 

asyncio是在Python3.4中新增的模块,它提供可以使用协程、IO复用在单线程中实现并发模型的机制。async/await这对关键字是在Python3.5中引入的新语法,用于协成方面的支持,这无疑给写爬虫多了一种选择,asyncio包括一下主要组件:

 

事件循环(Eventloop)

 

I/O机制

 

Futures

 

Tasks

 

一个简单例子:

 

que=asyncio.Queue()

 

urls=['/1',

 

'/2']

 

asyncdefwoker():

 

whileTrue:

 

q=awaitque.get()

 

awaitdo(q)

 

finally:

 

que.task_done()

 

asyncdefmain():

 

awaitasyncio.wait([que.put(i)foriinurls])

 

tasks=[asyncio.ensure_future(self.woker())]

 

in()

 

fortaskintasks:

 

ncel()

 

loop=asyncio.get_event_loop()

 

n_until_complete(main())

 

ose()

 

使用队列是因为后面还要往里面回填数据,注:asyncio中的队列Queue不是线程安全的

 

获取与存储数据

 

2.1加代理的GET请求

 

代理类型支持http、https,其他类型没有去测试

 

pxy={'http':'8.8.8.8:80'}

 

proxy_handler=quest.ProxyHandler(pxy)

 

opener=quest.build_opener(proxy_handler)

 

dheaders=[('User-agent','Mozilla/5.0'),('Host','')]

 

html=opener.open(url).read().decode('utf-8','ignore')

 

aiohttp中的代理类型目前好像只支持http,测试https会抛处异常

 

conn=aiohttp.ProxyConnector(proxy="")

 

session=aiohttp.ClientSession(connector=conn)

 

asyncwithsession.get('')asresp:

 

print(ad().decode('utf-8','ignore'))

 

POST请求实现也类似

 

2.2解码

 

有时候网页中夹有一些特殊的字符导致无法正常解码而掉丢整条记录,可以加ignore参数忽略掉

 

>>>b'\xff'.decode('utf8')

 

Traceback(mostrecentcalllast):

 

File"",line1,inmodule>

 

UnicodeDecodeError:'utf-8'codeccan'tdecodebyte0xffinposition11:invalidstartbyte

 

>>>b'\xff'.decode('utf8','ignore')

 

''

 

2.3HTML解析

 

BeautifulSoup

 

frombs4importBeautifulSoup

 

soup=BeautifulSoup(html,'lxml')

 

使用正则匹配时应使用懒惰模式使匹配结果更准确

 

importre

 

pile(r'(.*?)').findall(html)

 

2.4保存副本

 

在采集的同时建议保存一份副本在本地,如有某个元素匹配错了,可以从本地快速获取,而无须再去采集。

 

withopen('ml','wt')asf:

 

f.write(html)

 

2.5数据存储

 

以MySQL为例,一般分3张表

 

data存放要采集的数据

 

proxy存放代理

 

temp临时表,一般用来存放无效的任务id

 

2.6连接数据库

 

多线程时使用pymysql

 

importpymysql

 

conn=nnect(host='127.0.0.1',user='test',passwd='test',db='test'

 

,unix_socket='/var/run/mysqld/ck',charset='utf8')

 

异步使用aiomysql

 

importaiomysql

 

pool=awaitaiomysql.create_pool(host='127.0.0.1',db='test',user='test',password='test'

 

,unix_socket='/var/run/mysqld/ck',charset='utf8'

 

,loop=loop,minsize=1,maxsize=20)

 

维持爬虫

 

所谓维持,就是得保证爬虫能正常地进行任务,而不会被异常中断而重启无法续抓、低代理情况下无法继续进行等情况。

 

3.1整体流程

 

循环任务队列至空,代理循环载入,一个任务配一个代理,丢弃无法使用的代理并将任务填回队列,一个简单的图

 

3.2更新代理

 

将代理记录存到MySQL数据库,每隔一定时间脚本就去数据库抽取载入脚本。更新代理一般可以通过以下两种方式

 

加锁堵塞线程载入

 

为了防止在在proxy队列为空时其他线程也进入造成多次加载,使用加锁堵塞线程加载完毕再释放

 

lock=threading.Lock()

 

iflock.acquire():

 

ifpxy_queue.empty():

 

awaitload_proxy()

 

lease()

 

将任务分割循环启动

 

有点类似于执行完sleep(100)后再次初始化执行从而将代理更新进去,直到结束exit掉进程

 

whileTrue:

 

main()

 

time.sleep(100)

 

3.3验证代理

 

使用代理一般会有以下几种情况

 

无法建立连接

 

可以连接,请求超时

 

正常返回(包括200,30x,40x,50x)

 

你看到的未必是真的,先来看个例子

 

明明有资源,代理却给你来个404,为了解决这个问题,在采集前先对代理进行首次测试,通过了再次使用

 

#pxy['test']为数据表字段值

 

ifpxy['test']:

 

data=get_html(test_url,pxy)

 

ifdata['status']==200andtest_txtindata['html']:

 

else:

 

return

 

3.4保持代理

 

代理较少时,通过时间延时使代理隔30~60秒去访问一次目标,这样就不会触发拦截。

 

另外一种情况代理的稳定性较差,代理数量较少的情况,可以通过计数的方式维持代理,比如:一个代理连续三次出现不可连接或超时再做排除,成功返回从新计算,不至于一下子把代理全部干掉了。

 

3.5快速启动

 

中途中断重启脚本,为了将已经获取的和已经排除的目标id快速去除,可以一次性查询出来用集合差获取未完成的任务id,举个栗子:

 

pid_set=set([r['pid']forrinres])ifreselseset()

 

task_set=set([rforrinrange(1,max_pid)])

 

#取集合差为剩余任务pid

 

task_set=task_set-pid_set

 

差集s-t的时间复杂度是O(len(s)),这比使用for和xins再append()要快许多。

 

3.6异常记录

 

脚本运行在可能出错的地方加try…except,在logging记录时加exc_info参数记录异常信息到日志,以便分析

 

except:

 

logger.error(sql,exc_info=True)

 

案例演示

 

脑补完毕,我以采集Freebuf.COM所有文章为例

 

4.1分析

 

这就不太符合爬虫可遍历规则,毕竟不知道文章id属于哪个分类,总不能把分页爬一次再抓一次,可再寻找寻找。投过搞的童鞋都知道有个文章预览功能,它的链接是这样的:

 

它并没有分类,经过测试,已经发表的文章依然可以使用预览功能,也可以通过文章ID遍历去提前查看哪些未发表的文章

 

4.2提取数据

 

把HTML源码GET回来后,那就是提取所要的数据了

 

使用BeautifulSoup对数据进行查找提取,从一篇文章页面可以获得

 

是否金币/现金奖励

 

文章主体内容

 

其他数据按需要而定,数据表结构按以上设计,数据解析部分

 

soup=BeautifulSoup(html,'lxml')

 

arct=nd(class_='articlecontent')

 

head=nd(class_='title')

 

title=nd('h2').get_text().strip()#文章标题

 

content=str(nd(id='contenttxt')).strip()#主题内容,HTLM格式

 

......

 

4.3排除错误

 

200、301、302、404、500这几个一般是常用的,错误也有可能是通过200返回的

 

#正常返回,排除该pid

 

ifstatusin(301,302,404):

 

sert_temp(pid,status)

 

#一般为代理不能用或400,403,502由代理原因引起的,将文章pid填回任务队列

 

ifstatus!=200:

 

awaitasyncio.wait([self.pid_queue.put(pid)])

 

returnawaitself.update_proxy('status','status+1',pxy)

 

4.4结果分析

 

采集数据大多情况下是做统计分析,从而得到些什么有价值的信息,比如哪些文章比较受欢迎、每月发布数量走势等等。有时还需要过滤掉不需要的数据,比如从抓取的数据中看有大量测试的、未审核的文章

 

判断文章是否已经审核/发表的一个简单技巧是:该文章是否有分类

 

4.5可视化展示

 

有时候数据太多的时候,进行可视化展示更直观,可以使用Excel,前端D3.js、C3.js等工具生成图表,以下是抓取到的数据FB历年每个月发布文章数量展示图(测试量:105,000,获取有效数量:16,807,有分类数量::6,750,下图为有分类文章统计,数据不一定准确,以官方的为准)

 

总结

 

本文只是记录我的一些想法和实践,以及碰到的问题采取的一些解决办法,很多地方可能写的不完善或不妥,或有更好的解决方案,只有把话题引出来了,才有机会去发现、改进。脚本是前段时间准备离职的时候采用多线程去抓某勾网数据时候写的,不常写爬虫,总会遇到或忽略某些问题。异步方式是写文章时临时码的,写得也比较简陋,只是想简单说明下多代理的一种实现方式,毕竟没有涉及Cookie、验证码、Token、动态解析等内容。

相关文章内容简介
推荐阅读