The proxy pool is divided into four modules: a storage module, a getter module, a tester module, and an API module.
Storage Module
Here we use a Redis sorted set. Every member of the set is unique, and each member also carries a score field, which we use to rank proxies by quality.
The implementation is as follows (ippool_save.py):
<code>import redis
from random import choice

MAX_SCORE = 100
MIN_SCORE = 0
INITIAL_SCORE = 10
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_PASSWORD = None
REDIS_KEY = 'proxies'


class PoolEmptyError(Exception):
    def __str__(self):
        return 'the proxy pool is empty'


class RedisClient(object):
    def __init__(self, host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD):
        '''
        Initialize the Redis connection.
        :param host: Redis host
        :param port: Redis port
        :param password: Redis password
        '''
        self.db = redis.StrictRedis(host=host, port=port, password=password,
                                    decode_responses=True)

    def add(self, proxy, score=INITIAL_SCORE):
        '''
        Add a proxy with the initial score.
        :param proxy: proxy
        :param score: initial score
        :return: result of the add
        '''
        if not self.db.zscore(REDIS_KEY, proxy):
            return self.db.zadd(REDIS_KEY, {proxy: score})

    def random(self):
        '''
        Get a random valid proxy: first try the proxies with the highest
        score; if none exist, fall back to the top 100 by rank.
        :return: a proxy
        '''
        result = self.db.zrangebyscore(REDIS_KEY, MAX_SCORE, MAX_SCORE)
        if len(result):
            return choice(result)
        else:
            result = self.db.zrevrange(REDIS_KEY, 0, 100)
            if len(result):
                return choice(result)
            else:
                raise PoolEmptyError

    def decrease(self, proxy):
        '''
        Decrease a proxy's score by one; if the score is at or below
        the minimum, remove the proxy.
        :param proxy: proxy
        :return: the proxy's updated score
        '''
        score = self.db.zscore(REDIS_KEY, proxy)
        if score and score > MIN_SCORE:
            print('proxy', proxy, 'current score', score, 'decrease 1')
            return self.db.zincrby(REDIS_KEY, -1, proxy)
        else:
            print('proxy', proxy, 'current score', score, 'remove')
            return self.db.zrem(REDIS_KEY, proxy)

    def exists(self, proxy):
        '''
        Check whether a proxy exists.
        :param proxy: proxy
        :return: whether it exists
        '''
        return self.db.zscore(REDIS_KEY, proxy) is not None

    def max(self, proxy):
        '''
        Set a proxy's score to MAX_SCORE.
        :param proxy: proxy
        :return: result of the update
        '''
        print('proxy', proxy, 'is valid, set score to', MAX_SCORE)
        return self.db.zadd(REDIS_KEY, {proxy: MAX_SCORE})

    def count(self):
        '''
        Get the number of proxies.
        :return: the count
        '''
        return self.db.zcard(REDIS_KEY)

    def all(self):
        '''
        Get all proxies.
        :return: list of all proxies
        '''
        return self.db.zrangebyscore(REDIS_KEY, MIN_SCORE, MAX_SCORE) /<code>
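The scoring policy above can be seen in miniature without a Redis server. The following is a plain-dict sketch I wrote for illustration (the names mirror the constants above, but this is not the article's storage code):

```python
# Sketch of the scoring policy: a dict stands in for the Redis sorted set.
MAX_SCORE = 100
MIN_SCORE = 0
INITIAL_SCORE = 10

pool = {}  # proxy -> score


def add(proxy):
    # New proxies start at INITIAL_SCORE; existing entries are untouched.
    pool.setdefault(proxy, INITIAL_SCORE)


def decrease(proxy):
    # A failed test costs one point; at or below MIN_SCORE the proxy is dropped.
    if pool.get(proxy, MIN_SCORE) > MIN_SCORE:
        pool[proxy] -= 1
    else:
        pool.pop(proxy, None)


def promote(proxy):
    # A successful test pins the proxy at MAX_SCORE.
    pool[proxy] = MAX_SCORE


add('1.2.3.4:8080')
promote('1.2.3.4:8080')
decrease('1.2.3.4:8080')
print(pool['1.2.3.4:8080'])  # 99

pool['5.6.7.8:3128'] = MIN_SCORE
decrease('5.6.7.8:3128')
print('5.6.7.8:3128' in pool)  # False
```

The key property is that one success outweighs many failures: a working proxy jumps straight to the top, while a flaky one decays gradually until it is evicted.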
Getter Module
The getter module's logic is relatively simple. First we define ippool_crawler.py to crawl proxies from various websites:
<code>import json
import requests
from lxml import etree
from ippool_save import RedisClient


class ProxyMetaclass(type):
    def __new__(cls, name, bases, attrs):
        # Collect every method whose name contains 'crawl_' so that
        # callers can enumerate all crawler functions dynamically.
        count = 0
        attrs['__CrawlFunc__'] = []
        for k, v in attrs.items():
            if 'crawl_' in k:
                attrs['__CrawlFunc__'].append(k)
                count += 1
        attrs['__CrawlFuncCount__'] = count
        return type.__new__(cls, name, bases, attrs)


class Crawler(object, metaclass=ProxyMetaclass):
    def __init__(self):
        self.proxy = RedisClient().random()
        self.proxies = {
            'http': 'http://' + self.proxy,
            'https': 'https://' + self.proxy
        }

    def get_proxies(self, callback):
        proxies = []
        # Call the crawl_* method whose name was passed in as a string.
        for proxy in eval("self.{}()".format(callback)):
            print('got proxy', proxy)
            proxies.append(proxy)
        return proxies /<code>
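The metaclass trick above registers every `crawl_`-prefixed method at class-creation time. It can be seen in isolation with dummy methods (a sketch I wrote for illustration, not the article's crawler):

```python
# Sketch: a metaclass that records every method starting with 'crawl_',
# mirroring ProxyMetaclass above. The Demo methods are placeholders.
class CollectCrawlers(type):
    def __new__(cls, name, bases, attrs):
        attrs['__CrawlFunc__'] = [k for k in attrs if k.startswith('crawl_')]
        attrs['__CrawlFuncCount__'] = len(attrs['__CrawlFunc__'])
        return type.__new__(cls, name, bases, attrs)


class Demo(metaclass=CollectCrawlers):
    def crawl_site_a(self):
        return ['1.1.1.1:80']

    def crawl_site_b(self):
        return ['2.2.2.2:8080']

    def helper(self):  # not registered: name lacks the crawl_ prefix
        return None


print(Demo.__CrawlFuncCount__)  # 2
```

Because registration happens when the class body is executed, adding a new source is just a matter of writing one more `crawl_*` method; no list of crawlers has to be maintained by hand.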
We also need a Getter class that dynamically invokes all the crawl-prefixed methods, collects the crawled proxies, and stores them in the database (ippool_getter.py):
<code>from ippool_save import RedisClient
from ippool_crawler import Crawler

POOL_UPPER_THRESHOLD = 1000


class Getter():
    def __init__(self):
        self.redis = RedisClient()
        self.crawler = Crawler()

    def is_over_threshold(self):
        # Stop crawling once the pool holds enough proxies.
        if self.redis.count() >= POOL_UPPER_THRESHOLD:
            return True
        else:
            return False

    def run(self):
        print('getter is running')
        if not self.is_over_threshold():
            for callback_label in range(self.crawler.__CrawlFuncCount__):
                callback = self.crawler.__CrawlFunc__[callback_label]
                proxies = self.crawler.get_proxies(callback)
                for proxy in proxies:
                    self.redis.add(proxy) /<code>
Tester Module
Now that we have crawled proxies from the various sites, we need a tester module that runs repeated rounds of checks against all of them.
<code>import aiohttp
import asyncio
import time
from ippool_save import RedisClient

VALID_STATUS_CODES = [200]
TEST_URL = 'http://www.baidu.com'
BATCH_TEST_SIZE = 100


class Tester(object):
    def __init__(self):
        self.redis = RedisClient()

    async def test_single_proxy(self, proxy):
        conn = aiohttp.TCPConnector(verify_ssl=False)
        async with aiohttp.ClientSession(connector=conn) as session:
            try:
                if isinstance(proxy, bytes):
                    proxy = proxy.decode('utf-8')
                real_proxy = 'http://' + proxy
                print('testing', proxy)
                async with session.get(TEST_URL, proxy=real_proxy,
                                       timeout=15) as response:
                    if response.status in VALID_STATUS_CODES:
                        self.redis.max(proxy)
                        print('proxy is valid', proxy)
                    else:
                        self.redis.decrease(proxy)
                        print('invalid response status', proxy)
            except (aiohttp.ClientError, asyncio.TimeoutError):
                # Any connection failure or timeout costs the proxy a point.
                self.redis.decrease(proxy)
                print('proxy request failed', proxy)

    def run(self):
        print('tester is running')
        try:
            proxies = self.redis.all()
            loop = asyncio.get_event_loop()
            # Test BATCH_TEST_SIZE proxies at a time to bound concurrency.
            for i in range(0, len(proxies), BATCH_TEST_SIZE):
                test_proxies = proxies[i:i + BATCH_TEST_SIZE]
                tasks = [self.test_single_proxy(proxy)
                         for proxy in test_proxies]
                loop.run_until_complete(asyncio.wait(tasks))
                time.sleep(5)
        except Exception as e:
            print('tester error', e.args)/<code>
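The batching pattern in `run()` (check a slice of BATCH_TEST_SIZE proxies concurrently, then move to the next slice) can be sketched without any network or Redis. The `check` coroutine below is a stand-in I introduce for illustration; `asyncio.gather` is used instead of passing bare coroutines to `asyncio.wait`, which newer Python versions reject:

```python
import asyncio

BATCH_TEST_SIZE = 3  # small batch size for the demo


async def check(proxy):
    # Stand-in for test_single_proxy: pretend even-numbered "proxies" pass.
    await asyncio.sleep(0)
    return proxy, proxy % 2 == 0


async def run_batches(proxies):
    results = []
    # Check BATCH_TEST_SIZE proxies at a time, like Tester.run().
    for i in range(0, len(proxies), BATCH_TEST_SIZE):
        batch = proxies[i:i + BATCH_TEST_SIZE]
        results.extend(await asyncio.gather(*(check(p) for p in batch)))
    return results


results = asyncio.run(run_batches(list(range(7))))
print(results[:2])  # [(0, True), (1, False)]
```

Batching keeps the number of simultaneous open connections bounded, which matters when the pool holds thousands of proxies.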
API Module
To make it easier to fetch a usable proxy, we add an API module.
We use Flask to implement it (ippool_api.py):
<code>from flask import Flask, g
from ippool_save import RedisClient

__all__ = ['app']
app = Flask(__name__)


def get_conn():
    if not hasattr(g, 'redis'):
        g.redis = RedisClient()
    return g.redis


# Note: the route decorators were lost in the original listing;
# the paths below are restored so the endpoints are actually reachable.
@app.route('/')
def index():
    return 'Welcome to Proxy Pool System'


@app.route('/random')
def get_proxy():
    conn = get_conn()
    return conn.random()


@app.route('/count')
def get_counts():
    conn = get_conn()
    return str(conn.count())


if __name__ == '__main__':
    app.run()/<code>
Scheduler Module
The scheduler module ties the three modules defined above together, running each of them in its own process.
Finally, calling the Scheduler's run() method starts the whole proxy pool.
<code>import time
from multiprocessing import Process
from ippool_api import app
from ippool_getter import Getter
from ippool_check import Tester

TESTER_CYCLE = 20
GETTER_CYCLE = 20
TESTER_ENABLED = True
GETTER_ENABLED = True
API_ENABLED = True


class Scheduler():
    def schedule_tester(self, cycle=TESTER_CYCLE):
        tester = Tester()
        while True:
            print('tester starts running')
            tester.run()
            time.sleep(cycle)

    def schedule_getter(self, cycle=GETTER_CYCLE):
        getter = Getter()
        while True:
            print('start crawling proxies')
            getter.run()
            time.sleep(cycle)

    def schedule_api(self):
        app.run()

    def run(self):
        print('proxy pool is running')
        if TESTER_ENABLED:
            tester_process = Process(target=self.schedule_tester)
            tester_process.start()
        if GETTER_ENABLED:
            getter_process = Process(target=self.schedule_getter)
            getter_process.start()
        if API_ENABLED:
            api_process = Process(target=self.schedule_api)
            api_process.start()


if __name__ == '__main__':
    Scheduler().run() /<code>