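Hands-on guide: build your own proxy IP pool so that your crawler can keep collecting data with far less risk of having its IP banned.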

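The proxy pool is made up of four main modules: a storage module, a getter module, a tester module, and an API module. A scheduler module, described last, runs them together.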


Storage module

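Proxies are stored in a Redis sorted set. Every member of a sorted set is unique, and each member carries a score field, which the pool uses to record how reliable a proxy currently is.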
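To make the sorted-set behaviour concrete without a running Redis server, here is a minimal in-memory sketch. `ToySortedSet` and its methods are made up for illustration; the names only loosely mirror the Redis commands ZADD, ZSCORE and ZRANGEBYSCORE, and this is not how Redis itself is implemented.

```python
class ToySortedSet:
    """In-memory stand-in for a sorted set: unique members, each with a score."""

    def __init__(self):
        self._scores = {}  # member -> score; dict keys guarantee uniqueness

    def zadd(self, member, score):
        """Insert the member or overwrite its score; return 1 if new, else 0."""
        is_new = member not in self._scores
        self._scores[member] = score
        return int(is_new)

    def zscore(self, member):
        """Return the member's score, or None if it is not in the set."""
        return self._scores.get(member)

    def zrangebyscore(self, low, high):
        """Return members with low <= score <= high, ordered by ascending score."""
        hits = [(s, m) for m, s in self._scores.items() if low <= s <= high]
        return [m for s, m in sorted(hits)]


# 192.0.2.0/24 is a documentation-only address range, used here as sample data.
pool = ToySortedSet()
pool.zadd('192.0.2.1:8080', 10)
pool.zadd('192.0.2.2:3128', 100)
pool.zadd('192.0.2.1:8080', 10)      # same member again: no duplicate is created

print(len(pool._scores))             # 2
print(pool.zrangebyscore(100, 100))  # ['192.0.2.2:3128']
```

The last call is the same kind of query the pool uses to pick proxies that currently hold the maximum score.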

The implementation is as follows (ippool_save.py):

<code>import redis
from random import choice

MAX_SCORE = 100      # score of a proxy that has passed a check
MIN_SCORE = 0        # a proxy at this score is removed on its next failure
INITIAL_SCORE = 10   # score of a newly crawled proxy
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_PASSWORD = None
REDIS_KEY = 'proxies'


class PoolEmptyError(Exception):
    def __str__(self):
        return 'PoolEmptyError'


class RedisClient(object):
    def __init__(self, host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD):
        '''
        Initialize the Redis connection.
        :param host: host address
        :param port: port number
        :param password: password
        '''
        self.db = redis.StrictRedis(host=host, port=port, password=password,
                                    decode_responses=True)

    def add(self, proxy, score=INITIAL_SCORE):
        '''
        Add a proxy with the initial score; an existing proxy is left unchanged.
        :param proxy: proxy
        :param score: score
        :return: result of the add, or None if the proxy already exists
        '''
        if self.db.zscore(REDIS_KEY, proxy) is None:
            return self.db.zadd(REDIS_KEY, {proxy: score})

    def random(self):
        '''
        Get a random usable proxy. Try the proxies with the maximum score first;
        if there are none, choose among the top-ranked proxies.
        :return: a random proxy
        '''
        result = self.db.zrangebyscore(REDIS_KEY, MAX_SCORE, MAX_SCORE)
        if len(result):
            return choice(result)
        else:
            result = self.db.zrevrange(REDIS_KEY, 0, 100)
            if len(result):
                return choice(result)
            else:
                raise PoolEmptyError

    def decrease(self, proxy):
        '''
        Lower the proxy's score by one; once the score has reached the minimum,
        remove the proxy instead.
        :param proxy: proxy
        :return: the updated score, or the result of the removal
        '''
        score = self.db.zscore(REDIS_KEY, proxy)
        if score and score > MIN_SCORE:
            print('Proxy', proxy, 'current score', score, 'decrease by 1')
            return self.db.zincrby(REDIS_KEY, -1, proxy)
        else:
            print('Proxy', proxy, 'current score', score, 'removed')
            return self.db.zrem(REDIS_KEY, proxy)

    def exists(self, proxy):
        '''
        Check whether a proxy is in the pool.
        :param proxy: proxy
        :return: True if it exists
        '''
        return self.db.zscore(REDIS_KEY, proxy) is not None

    def max(self, proxy):
        '''
        Set the proxy's score to MAX_SCORE.
        :param proxy: proxy
        :return: result of the update
        '''
        print('Proxy', proxy, 'is usable, score set to', MAX_SCORE)
        return self.db.zadd(REDIS_KEY, {proxy: MAX_SCORE})

    def count(self):
        '''
        Get the number of proxies.
        :return: number of proxies
        '''
        return self.db.zcard(REDIS_KEY)

    def all(self):
        '''
        Get all proxies.
        :return: list of all proxies
        '''
        return self.db.zrangebyscore(REDIS_KEY, MIN_SCORE, MAX_SCORE)
</code>
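The scoring rules in `RedisClient` can be traced without Redis. The sketch below replays them on a plain dict; `ScoreBook` and `failures_until_removed` are hypothetical names written for this illustration, and only the three constants and the add / max / decrease rules are taken from the listing above.

```python
MAX_SCORE, MIN_SCORE, INITIAL_SCORE = 100, 0, 10


class ScoreBook:
    """Dict-based stand-in for the Redis sorted set (illustration only)."""

    def __init__(self):
        self.scores = {}

    def add(self, proxy, score=INITIAL_SCORE):
        # A proxy that is already in the pool keeps its current score.
        if proxy not in self.scores:
            self.scores[proxy] = score

    def max(self, proxy):
        # A successful check promotes the proxy straight to the top score.
        self.scores[proxy] = MAX_SCORE

    def decrease(self, proxy):
        # A failed check costs one point; at MIN_SCORE the proxy is dropped.
        score = self.scores.get(proxy)
        if score is not None and score > MIN_SCORE:
            self.scores[proxy] = score - 1
        else:
            self.scores.pop(proxy, None)


def failures_until_removed(book, proxy):
    """Count consecutive failed checks until the proxy leaves the pool."""
    count = 0
    while proxy in book.scores:
        book.decrease(proxy)
        count += 1
    return count


book = ScoreBook()
book.add('192.0.2.10:8080')                       # freshly crawled proxy
fresh = failures_until_removed(book, '192.0.2.10:8080')

book.add('192.0.2.11:8080')
book.max('192.0.2.11:8080')                       # passed one check
proven = failures_until_removed(book, '192.0.2.11:8080')

print(fresh, proven)  # 11 101
```

Under these rules a freshly crawled proxy is dropped after 11 consecutive failures, while one that has passed a single check survives 100 failures and is dropped on the 101st, so a proxy that worked recently is given much more patience.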

Getter module

The getter logic is relatively simple. First we define ippool_crawler.py, which scrapes proxies from various websites:

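<code>import json
import requests
from lxml import etree
from ippool_save import RedisClient, PoolEmptyError


class ProxyMetaclass(type):
    def __new__(cls, name, bases, attrs):
        # Record the names of all crawl_* methods defined on the class
        count = 0
        attrs['__CrawlFunc__'] = []
        for k, v in attrs.items():
            if k.startswith('crawl_'):
                attrs['__CrawlFunc__'].append(k)
                count += 1
        attrs['__CrawlFuncCount__'] = count
        return type.__new__(cls, name, bases, attrs)


class Crawler(object, metaclass=ProxyMetaclass):
    def __init__(self):
        # Crawl through a pooled proxy when one is available. On the first
        # run the pool is still empty, so fall back to a direct connection.
        try:
            self.proxy = RedisClient().random()
            self.proxies = {
                'http': 'http://' + self.proxy,
                'https': 'https://' + self.proxy
            }
        except PoolEmptyError:
            self.proxy = None
            self.proxies = None

    def get_proxies(self, callback):
        proxies = []
        # Look up the crawl method by name and collect what it yields
        for proxy in getattr(self, callback)():
            print('Got proxy', proxy)
            proxies.append(proxy)
        return proxies

    # The crawl_* methods (one per source website, each yielding 'ip:port'
    # strings) are not shown in this listing.
</code>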
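The listing does not include any `crawl_` method, so here is a small self-contained sketch of how the metaclass registration and the call-by-name step fit together. `DemoCrawler`, `crawl_sample`, `crawl_other`, `PROXY_PATTERN` and the hard-coded sample text are all invented for illustration; a real crawl method would download and parse a page instead of reading a fixed string.

```python
import re


class ProxyMetaclass(type):
    """Record the names of all crawl_* methods when the class is created."""

    def __new__(cls, name, bases, attrs):
        attrs['__CrawlFunc__'] = [k for k in attrs if k.startswith('crawl_')]
        attrs['__CrawlFuncCount__'] = len(attrs['__CrawlFunc__'])
        return type.__new__(cls, name, bases, attrs)


# Hypothetical page text; 192.0.2.0/24 is a documentation-only address range.
SAMPLE_TEXT = 'free proxies: 192.0.2.1:8080, 192.0.2.2:3128 (updated daily)'
PROXY_PATTERN = re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}:\d{1,5}\b')


class DemoCrawler(metaclass=ProxyMetaclass):
    def crawl_sample(self):
        # A real method would fetch a page; here we parse a fixed string.
        for match in PROXY_PATTERN.finditer(SAMPLE_TEXT):
            yield match.group()

    def crawl_other(self):
        yield '192.0.2.3:80'

    def get_proxies(self, callback):
        # Look the method up by its registered name and call it.
        return list(getattr(self, callback)())


crawler = DemoCrawler()
found = []
for name in crawler.__CrawlFunc__:
    found.extend(crawler.get_proxies(name))

print(crawler.__CrawlFunc__)  # ['crawl_sample', 'crawl_other']
print(found)                  # ['192.0.2.1:8080', '192.0.2.2:3128', '192.0.2.3:80']
```

With this design, supporting another source site only requires adding one more `crawl_` method; nothing that iterates over `__CrawlFunc__` has to change.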

We also need a Getter class that dynamically calls every method whose name starts with crawl, collects the proxies they return, and stores them in the database (ippool_getter.py):

<code>from ippool_save import RedisClient
from ippool_crawler import Crawler

POOL_UPPER_THRESHOLD = 1000   # stop crawling once the pool holds this many proxies


class Getter():
    def __init__(self):
        self.redis = RedisClient()
        self.crawler = Crawler()

    def is_over_threshold(self):
        # True once the pool has reached its size limit
        return self.redis.count() >= POOL_UPPER_THRESHOLD

    def run(self):
        print('Getter started')
        if not self.is_over_threshold():
            for callback_label in range(self.crawler.__CrawlFuncCount__):
                callback = self.crawler.__CrawlFunc__[callback_label]
                proxies = self.crawler.get_proxies(callback)
                for proxy in proxies:
                    self.redis.add(proxy)
</code>

Tester module

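Now that the proxies from the various websites have been scraped, we need a tester module that checks every proxy over repeated rounds (ippool_check.py, the name under which the scheduler imports it):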

<code>import asyncio
import time

import aiohttp
from ippool_save import RedisClient

VALID_STATUS_CODES = [200]
TEST_URL = "http://www.baidu.com"
BATCH_TEST_SIZE = 100


class Tester(object):
    def __init__(self):
        self.redis = RedisClient()

    async def test_single_proxy(self, proxy):
        # ssl=False skips certificate verification
        conn = aiohttp.TCPConnector(ssl=False)
        async with aiohttp.ClientSession(connector=conn) as session:
            try:
                if isinstance(proxy, bytes):
                    proxy = proxy.decode('utf-8')
                real_proxy = 'http://' + proxy
                print('Testing', proxy)
                timeout = aiohttp.ClientTimeout(total=15)
                async with session.get(TEST_URL, proxy=real_proxy,
                                       timeout=timeout) as response:
                    if response.status in VALID_STATUS_CODES:
                        self.redis.max(proxy)
                        print('Proxy is usable', proxy)
                    else:
                        self.redis.decrease(proxy)
                        print('Invalid response status code', proxy)
            except (aiohttp.ClientError, asyncio.TimeoutError):
                self.redis.decrease(proxy)
                print('Proxy request failed', proxy)

    async def test_batch(self, proxies):
        # Test one batch of proxies concurrently
        await asyncio.gather(*(self.test_single_proxy(p) for p in proxies))

    def run(self):
        print('Tester started')
        try:
            proxies = self.redis.all()
            for i in range(0, len(proxies), BATCH_TEST_SIZE):
                test_proxies = proxies[i:i + BATCH_TEST_SIZE]
                asyncio.run(self.test_batch(test_proxies))
                time.sleep(5)
        except Exception as e:
            print('Tester error', e.args)
</code>
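The batching pattern in `Tester.run()` can be tried without network access. In this sketch `fake_check` is a made-up coroutine that stands in for the real HTTP request (it simply treats proxies on port 8080 as working), and `check_in_batches` is a hypothetical helper; only the idea of testing one batch of proxies concurrently per round comes from the article.

```python
import asyncio

BATCH_TEST_SIZE = 3  # deliberately small so the batching is visible


async def fake_check(proxy):
    """Stand-in for a request through the proxy (assumption: port 8080 works)."""
    await asyncio.sleep(0)  # yield control, as a real network call would
    return proxy, proxy.endswith(':8080')


async def check_in_batches(proxies, batch_size=BATCH_TEST_SIZE):
    """Check proxies concurrently within a batch, one batch after another."""
    results = {}
    batches = 0
    for i in range(0, len(proxies), batch_size):
        batch = proxies[i:i + batch_size]
        # gather() runs the whole batch concurrently and preserves input order.
        for proxy, ok in await asyncio.gather(*(fake_check(p) for p in batch)):
            results[proxy] = ok
        batches += 1
    return results, batches


# Seven sample proxies; odd-numbered ones use port 8080, the rest port 3128.
proxies = ['192.0.2.%d:%d' % (n, 8080 if n % 2 else 3128) for n in range(1, 8)]
results, batches = asyncio.run(check_in_batches(proxies))

print(batches)                # 3
print(sum(results.values()))  # 4
```

Limiting each round to a fixed batch size caps how many connections are open at once, which matters when the pool holds hundreds of proxies.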

API module

To make it easier to obtain a usable proxy, we add an API module.

The API module is implemented with Flask (ippool_api.py):

<code>from flask import Flask, g
from ippool_save import RedisClient

__all__ = ['app']
app = Flask(__name__)


def get_conn():
    # Create the Redis client once per application context
    if not hasattr(g, 'redis'):
        g.redis = RedisClient()
    return g.redis


# The route decorators did not survive in the original listing; the URL
# paths below are assumptions.
@app.route('/')
def index():
    return 'Welcome to Proxy Pool System'


@app.route('/random')
def get_proxy():
    conn = get_conn()
    return conn.random()


@app.route('/count')
def get_counts():
    conn = get_conn()
    return str(conn.count())


if __name__ == '__main__':
    app.run()
</code>
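A client would typically fetch one proxy from this API and hand it to its HTTP library. The sketch below uses only the standard library. The `/random` path and all helper names are assumptions rather than something the article specifies, while `127.0.0.1:5000` is the default address used by Flask's `app.run()`.

```python
from urllib.request import ProxyHandler, build_opener, urlopen

API_URL = 'http://127.0.0.1:5000/random'  # assumed route for "one random proxy"


def fetch_proxy(api_url=API_URL):
    """Ask the pool API for one proxy, returned as an 'ip:port' string."""
    with urlopen(api_url, timeout=5) as response:
        return response.read().decode('utf-8').strip()


def proxy_mapping(proxy):
    """Build the scheme -> proxy URL mapping that urllib (and requests) accept.

    A plain HTTP proxy is addressed with http:// even for https traffic.
    """
    return {'http': 'http://' + proxy, 'https': 'http://' + proxy}


def open_via_proxy(url, proxy):
    """Fetch url through the given proxy and return the response body."""
    opener = build_opener(ProxyHandler(proxy_mapping(proxy)))
    with opener.open(url, timeout=15) as response:
        return response.read()


print(proxy_mapping('192.0.2.1:8080'))

# With the pool and the API running, a request could look like this:
# html = open_via_proxy('http://www.baidu.com', fetch_proxy())
```

Because a proxy can stop working between being tested and being used, a client may want to retry with a freshly fetched proxy when a request fails.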

Scheduler module

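The scheduler module calls three of the modules defined above (the getter, the tester, and the API) and runs each of them in its own process.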

Finally, calling the Scheduler's run() method starts the whole proxy pool.

<code>import time
from multiprocessing import Process

from ippool_api import app
from ippool_getter import Getter
from ippool_check import Tester

TESTER_CYCLE = 20   # seconds to sleep between test rounds
GETTER_CYCLE = 20   # seconds to sleep between crawl rounds
TESTER_ENABLED = True
GETTER_ENABLED = True
API_ENABLED = True


class Scheduler():
    def schedule_tester(self, cycle=TESTER_CYCLE):
        tester = Tester()
        while True:
            print('Tester starting')
            tester.run()
            time.sleep(cycle)

    def schedule_getter(self, cycle=GETTER_CYCLE):
        getter = Getter()
        while True:
            print('Start crawling proxies')
            getter.run()
            time.sleep(cycle)

    def schedule_api(self):
        app.run()

    def run(self):
        print('Proxy pool starting')
        if TESTER_ENABLED:
            tester_process = Process(target=self.schedule_tester)
            tester_process.start()
        if GETTER_ENABLED:
            getter_process = Process(target=self.schedule_getter)
            getter_process.start()
        if API_ENABLED:
            api_process = Process(target=self.schedule_api)
            api_process.start()


if __name__ == '__main__':
    Scheduler().run()
</code>
