在處理數據採集過程中,相信大家都會遇到動態網站的採集,如果是幾個或者幾十個,都可以通過抓包,分析數據流直接獲取數據,但是當有幾千或者幾萬個的時候,抓包就顯得太過於浪費金錢和時間了。這也是Pyppeteer、selenium、PhantomJS等存在的原因。今天主要介紹一下Pyppeteer。
Pyppeteer其實是Puppeteer的Python版本,下面簡單介紹下Pyppeteer的兩大特點,chromium瀏覽器和asyncio框架:
1).chromium
Chromium是一款獨立的瀏覽器,是Google為發展自家的瀏覽器Google Chrome而開啟的計劃,相當於Chrome的實驗版,Chromium的穩定性不如Chrome但是功能更加豐富,而且更新速度很快,通常每隔數小時就有新的開發版本發佈。
Pyppeteer的web自動化是基於
來實現的,由於chromium中某些特性的關係,Pyppeteer的安裝配置非常簡單,關於這一點稍後我們會詳細介紹。
2).asyncio
asyncio是Python的一個異步協程庫,自3.4版本引入的標準庫,直接內置了對異步IO的支持,號稱是Python最有野心的庫,官網上有非常詳細的介紹。
由於Pyppeteer是基於asyncio實現的,所以它本身就支持異步操作,執行效率得到大幅提升。
3).安裝
使用pip install pyppeteer命令就能完成pyppeteer庫的安裝,至於chromium瀏覽器,只需要一條pyppeteer-install命令就會自動下載對應的最新版本chromium瀏覽器到pyppeteer的默認位置。
如果不運行pyppeteer-install命令,在第一次使用pyppeteer的時候也會自動下載並安裝chromium瀏覽器,效果是一樣的。總的來說,pyppeteer比起selenium省去了driver配置的環節。
當然,出於某種原因,也可能會出現chromium自動安裝無法順利完成的情況,這時可以考慮手動安裝:
首先,從下列網址中找到自己系統的對應版本,下載chromium壓縮包;
1·'linux':'https://storage.googleapis.com/chromium-browser-snapshots/Linux_x64/575458/chrome-linux.zip'
2·'mac':'https://storage.googleapis.com/chromium-browser-snapshots/Mac/575458/chrome-mac.zip'
3·'win32':'https://storage.googleapis.com/chromium-browser-snapshots/Win/575458/chrome-win32.zip'
4·'win64':'https://storage.googleapis.com/chromium-browser-snapshots/Win_x64/575458/chrome-win32.zip'
4).使用
下面是我基於工作中常用到的方法,整理的一個公共類,大家可以借鑑一下:
'''
Created on Jul 27, 2019
@author: admin
'''
import asyncio, tkinter, traceback
import base64, time, random
from pyppeteer import launch
from com.fy.utils.http.UserAgentUtils import UserAgentUtils
from com.fy.utils.hash.HashUtils import Hash_Utils
from com.fy.utils.file.FileUtils import File_Utils
class PyppeteerBrowser:
def __init__(self):
self.hash = Hash_Utils()
self.url = None
self.ua = UserAgentUtils()
#"""使用tkinter獲取屏幕大小""")
def screen_size(self):
tk = tkinter.Tk()
width = tk.winfo_screenwidth()
height = tk.winfo_screenheight()
tk.quit()
return width, height
#構造一個瀏覽器對象; ; 如果需要每次初始化新的瀏覽器對象,則userDataDir路徑必須不同,否則,始終是在第一次初始化的瀏覽器對象上進行操作,且容易出異常;
async def getbrowser(self, headless=False, userDataDir=None):
'''
參數:
•ignoreHTTPSErrors(bool):是否忽略 HTTPS 錯誤。默認為 False
•headless(bool):是否在無頭模式下運行瀏覽器。默認為 True除非appMode或devtools選項True
•executablePath (str):運行 Chromium 或 Chrome 可執行文件的路徑,而不是默認捆綁的 Chromium。如果指定之後就不需要使用默認的 Chromium 了,可以指定為已有的 Chrome 或 Chromium。
•slowMo (int | float):通過傳入指定的時間,可以減緩 Pyppeteer 的一些模擬操作。 (按指定的毫秒數減慢 pyppeteer 操作。)
•args (List [str]):傳遞給瀏覽器進程的附加參數(標誌)。
•dumpio(bool):是否管道瀏覽器進程 stdout 和 stderr 進入process.stdout和process.stderr。默認為False。為 True時,可以解決chromium瀏覽器多開頁面卡死問題。
•userDataDir (str):用戶數據目錄的路徑。即用戶數據文件夾,即可以保留一些個性化配置和操作記錄。(比如登錄信息等;可以在以後打開時自動登錄;)
•env(dict):指定瀏覽器可見的環境變量。默認與 python 進程相同。
•devtools(bool):是否為每個選項卡自動打開 DevTools 面板。如果是此選項True,headless則將設置該選項 False。
•logLevel(int | str):用於打印日誌的日誌級別。默認值與根記錄器相同。
•autoClose(bool):腳本完成時自動關閉瀏覽器進程。默認為True。
•loop(asyncio.AbstractEventLoop):事件循環(實驗)。
•args:常用的有['--no-sandbox','--disable-gpu', '--disable-setuid-sandbox','--window-size=1440x900']
•dumpio: 不知道為什麼,如果不加 dumpio=True 有時會出現瀏覽器卡頓
•autoClose:默認就好,不過如果你需要保持瀏覽器狀態,可以不關閉,下次直接連接這個已存在的瀏覽器
ignoreDefaultArgs (bool): 不使用 Pyppeteer 的默認參數,如果使用了這個參數,那麼最好通過 args 參數來設定一些參數,否則可能會出現一些意想不到的問題。這個參數相對比較危險,慎用。
handleSIGINT (bool): 是否響應 SIGINT 信號,也就是可以使用 Ctrl + C 來終止瀏覽器程序,默認是 True。
handleSIGTERM (bool): 是否響應 SIGTERM 信號,一般是 kill 命令,默認是 True。
handleSIGHUP (bool): 是否響應 SIGHUP 信號,即掛起信號,比如終端退出操作,默認是 True。
launch_kwargs = {
# 控制是否為無頭模式
"headless": False,
# chrome啟動命令行參數
"args": [
# 瀏覽器代理 配合某些中間人代理使用
"--proxy-server=http://127.0.0.1:8008",
# 最大化窗口
"--start-maximized",
# 取消沙盒模式 沙盒模式下權限太小
"--no-sandbox",
# 不顯示信息欄 比如 chrome正在受到自動測試軟件的控制 ...
"--disable-infobars",
# log等級設置 在某些不是那麼完整的系統裡 如果使用默認的日誌等級 可能會出現一大堆的warning信息
"--log-level=3",
# 設置UA
"--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36",
],
# 用戶數據保存目錄 這個最好也自己指定一個目錄
# 如果不指定的話,chrome會自動新建一個臨時目錄使用,在瀏覽器退出的時候會自動刪除臨時目錄
# 在刪除的時候可能會刪除失敗(不知道為什麼會出現權限問題,我用的windows) 導致瀏覽器退出失敗
# 然後chrome進程就會一直沒有退出 CPU就會狂飆到99%
"userDataDir": "",
}
'''
print("構造瀏覽器對象開始...")
args = [ "--start-maximized", '--no-sandbox', "--disable-infobars" , "--log-level=3"]
parameters = {}
if userDataDir == None:
parameters = {'headless': headless, #是否打開瀏覽器;False:打開瀏覽器;True:進程中運行;
'args': args,
'dumpio': True #'dumpio': True:解決chromium瀏覽器多開頁面卡死問題。
}
else:
parameters = {'headless': headless, #是否打開瀏覽器;False:打開瀏覽器;True:進程中運行;
'args': args,
"userDataDir": userDataDir,
'dumpio': True #'dumpio': True:解決chromium瀏覽器多開頁面卡死問題。
}
#注意:同一個用戶目錄(userDataDir)不能被兩個chrome進程使用,如果你要多開,記得分別指定用戶目錄。否則會報編碼錯誤。
self.browser = await launch(parameters)
self.page = await self.browser.newPage()#在此瀏覽器上創建新頁面並返回其對象。
width, height = self.screen_size()
# 設置網頁可視區域大小
await self.page.setViewport({
"width": width,
"height": height
})
# 是否啟用JS,enabled設為False,則無渲染效果
await self.page.setJavaScriptEnabled(enabled=True)
#設置請求頭userAgent
await self.page.setUserAgent(self.ua.getheaders())
await self.preventCheckWebdriver(self.page)
print("構造瀏覽器對象完畢....", self.page)
#獲取當前操作的界面
async def getPage(self):
return self.page
#獲取當前page對象的鏈接;
async def getCurUrl(self, page):
if page == None:
page = self.page
return page.url
#打開一個新的界面;)
async def getnewpage(self):
return await self.browser.newPage()
#獲取當前操作的界面重新加載
async def reload(self):
await self.page.reload()
#當前操作界面返回
async def goBack(self):
await self.page.goBack()
#獲取當前操作的界面的URL
async def getPageUrl(self):
await self.page.url()
#打開連接;
async def open(self, url, timeout=60):
try:
if url == None:
print("當前傳入的【url】不能為空,參數錯誤!!")
self.url = url
print("打開網頁:" + (url))
self.res = await self.page.goto(url, options={'timeout':int(timeout * 1000)})#打開連接;
await asyncio.sleep(1)#強行等待3秒
status = self.res.status
curUrl = self.page.url
await self.preventCheckWebdriver(self.page)
return status, curUrl
except:return 404, None
#
#防止 webdriver 檢測,如淘寶登錄。其實淘寶主要通過 window.navigator.webdriver 來對 webdriver 進行檢測,所以我們只需要使用 JavaScript 將它設置為 false 即可
async def preventCheckWebdriver(self, page):
if page == None:
page = self.page
# 替換淘寶在檢測瀏覽時採集的一些參數。
# 就是在瀏覽器運行的時候,始終讓window.navigator.webdriver=false
# navigator是windiw對象的一個屬性,同時修改plugins,languages,navigator
await page.evaluate('''() =>{ Object.defineProperties(navigator,{ webdriver:{ get: () => undefined } }) }''') # 以下為插入中間js,將淘寶會為了檢測瀏覽器而調用的js修改其結果。
await page.evaluate('''() =>{ window.navigator.chrome = { runtime: {}, }; }''')
await page.evaluate('''() =>{ Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] }); }''')
await page.evaluate('''() =>{ Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5,6], }); }''')
#關閉當前打開的瀏覽器;
async def closeBrowser(self, browser):
if browser == None:
browser = self.browser
try:
await browser.close()
except:pass
#關閉當前打開的瀏覽器中的一個界面;
async def closePage(self, page):
if page == None:
page = self.page
await page.close()
#關閉當前打開的瀏覽器中的某一個界面;
async def closeNumPage(self, number:"號碼從0開始"):
pages = await self.browser.pages()
await pages[number].close()
return True
#關閉除了最後一個所有的界面;
async def retainLastPage(self):
pages = await self.browser.pages()
num = 0
for page in pages:
if num != (len(pages) - 1):
await page.close()
else:
self.page = page
num += 1
#獲取當前打開頁面的響應狀態
async def gerReponseStatus(self):
try:return self.res.status # 響應狀態
except:return 200
#截個圖
async def screenshot(self, page):
hashCode = self.hash.getMd5Hash(self.url)
if page == None:
page = self.page
await page.screenshot({'path': './screenshots/' + str(hashCode) + '.png'})
# 得到響應頭;
async def getHeader(self):
return self.res.headers # 響應頭;
# 滾動到頁面底部
async def scrollToButtom(self, page):
if page == None:
page = self.page
await page.evaluate('window.scrollBy(0, document.body.scrollHeight)')
print("滑動到當前界面底部【完畢】")
# 獲取當前頁面的cookie
async def getCookies(self, page):
if page == None:
page = self.page
return await page.cookies()
# 獲取登錄後cookie尚未測試是否正常;
async def getCookieStr(page):
if page == None:
page = self.page
cookies_list = await page.cookies()
cookies = ''
for cookie in cookies_list:
str_cookie = '{0}={1};'
str_cookie = str_cookie.format(cookie.get('name'), cookie.get('value'))
cookies += str_cookie
print(cookies)
# 將cookie 放入 cookie 池 以便多次請求 封賬號 利用cookie 對搜索內容進行爬取
return cookies
# 獲取當前頁面的cookie
async def setCookies(self, page, cookies):
if page == None:
page = self.page
return await page.setCookie(*cookies)
# 獲取所有 html源碼
async def getHtml(self, page):
if page == None:
page = self.page
return (await page.content())
#當前頁標題
async def getCurPageTitle(self, page):
if page == None:
page = self.page
return (await page.title())
#獲取對象屬性值;
async def getElementFieldValue(self, page, element, field):
if element == None:
print("當前傳入的【element】不能為空,參數錯誤!!")
return None
if field == None:
print("當前傳入的【field】不能為空,參數錯誤!!")
return None
if page == None:
page = self.page
if str(type(element)) == "<class>":/<class>
print("當前傳入的【element】不是單個對象,為list集合,參數錯誤!!")
return None
fieldValue = await (await element.getProperty(field)).jsonValue()
return fieldValue
#獲取當前界面的寬、高、像素大小比率三個值
async def getPageWidthHight(self, page):
if page == None:
page = self.page
return await page.evaluate('''() => {
return {
width: document.documentElement.clientWidth,
height: document.documentElement.clientHeight,
deviceScaleFactor: window.devicePixelRatio,
}
}''')
#獲取當前瀏覽器的所有界面集合;
async def getCurBrowserAllPages(self):
return await self.browser.pages()
#獲取當前界面中某個元素的內容;
async def getElementsByXpaths(self, page, xpath:'如://div[@class="title-box"]/a'):
if xpath == None:
print("當前傳入的【xpath】不能為空,參數錯誤!!")
return None
if page == None:
page = self.page
elemList = None
try:elemList = await page.xpath(xpath)
except:
print("獲取xpath路徑為【" + str(xpath) + "】的標籤對象異常...")
return elemList#返回類型為:list集合;
#獲取當前界面的所有內容(不帶html標籤的內容);(效果較差;)--
async def getPageText(self, page):
if page == None:
page = self.page
'''Pyppeteer的evaluate()方法只使用JavaScript字符串,該字符串可以是函數也可以是表達式,
Pyppeteer會進行自動判斷。但有時會判斷錯誤,如果字符串被判斷成了函數,並且報錯,
可以添加選項force_expr=True,強制Pyppeteer作為表達式處理。'''
return await page.evaluate('document.body.textContent', force_expr=True)
#獲取元素內容; --
async def getElementText(self, page, element):
if element == None:
print("當前傳入的【element】不能為空,參數錯誤!!")
return None
if page == None:
page = self.page
if str(type(element)) == "<class>":/<class>
print("當前傳入的【element】不是單個對象,為list集合,參數錯誤!!")
return None
return await page.evaluate('(element) => element.textContent', element)
#通過selector獲取元素內容; --
async def getElementBySelector(self, page , selector):
if selector == None:
print("當前傳入的【selector】不能為空,參數錯誤!!")
return None
if page == None:
page = self.page
return await page.querySelector(selector)
#向輸入框輸入數據 --
async def inputKw(self, page, selector:"如:'input#kw.s_ipt':獲取input標籤中id='kw',class='s_ipt'的對象。不可用xpath路徑", kw:'待輸入的關鍵詞'):
if kw == None:
print("當前傳入的【kw】不能為空,參數錯誤!!")
return None
if selector == None:
print("當前傳入的【selector】不能為空,參數錯誤!!")
return None
if page == None:
page = self.page
mylist = [1.55, 0.798, 1.187]
if len(kw) <= 5:
mylist = [0.695, 0.798, 1.087, 0.343, 0.4067]
else:
mylist = [1.095, 0.798, 1.127, 1.0543, 1.1267, 0.8067]
for i in str(kw):#逐個字符輸入,減少被識別為機器的改了;
await page.type(selector, i)
time.sleep(random.choice(mylist))
return None
#鼠標單擊某一個元素; --
async def clickElement(self, page, selector:"如:'input#kw.s_ipt':獲取input標籤中id='kw',class='s_ipt'的對象。。不可用xpath路徑"):
if selector == None:
print("當前傳入的【selector】不能為空,參數錯誤!!")
if page == None:
page = self.page
await page.click(selector)#如果selector獲取的對象是list集合,則執行第一個元素的點擊;
#清空某個input的值
async def removeInputValue(self, page, idValue):
if idValue == None:
print("當前傳入的【idValue】不能為空,參數錯誤!!")
if page == None:
page = self.page
await page.evaluate("document.querySelector('#" + str(idValue) + "').value=''")
print("清空【" + str(idValue) + "】的內容")
#
async def clickByEle(self, ele):
if ele == None:
return
return ele.click()
#獲取當前瀏覽器打開的【最後一個】界面對象
async def getLastPage(self):
pages = await self.browser.pages()
return pages[-1]
#獲取當前瀏覽器打開的【最後一個】界面對象
async def getPageTotal(self):
pages = await self.browser.pages()
return len(pages)
#獲取當前瀏覽器打開的【最一個】界面對象
async def getFirstPage(self):
pages = await self.browser.pages()
return pages[0]
#獲取當前界面中所有的frame對象
async def getAllFrames(self, page):
if page == None:
page = self.page
return page.frames
async def getScreenshotByEle(self, page, ele, screenshotFilePath:"目前測試只有.png圖片可正常生成,jpg異常;"):
picture = ''
try:
fu = File_Utils(None)
fu = File_Utils(fu.getParentDir(screenshotFilePath))
if not fu.exists(fu.getParentDir(screenshotFilePath)):fu.makeDirs()#如果圖片的保存目錄不存在,則創建;
# 進行截圖
time.sleep(3)
print("驗證碼路徑:", screenshotFilePath)
try:
for _ in range(6):
clip = await ele.boundingBox()
picture = base64.b64encode(await page.screenshot({
'path': screenshotFilePath, # 圖片路徑, 不指定就不保存
'clip': clip, # 指定圖片位置,大小
# 'encoding': 'base64',# 返回的圖片格式, 默認二進制
}))
if picture != '':
break
except Exception as e:
print('截圖獲取失敗')
print(traceback.print_exc())
except Exception as e:
print('截圖獲取失敗')
print(traceback.print_exc())
return picture
ppy = PyppeteerBrowser()
from com.fy.utils.date.DateUtils import Date_Utils
du = Date_Utils()
userDataDir = "d://pyppeteer" + str(du.getCurrentTimeLong())
asyncio.get_event_loop() .run_until_complete(ppy.getbrowser(False, userDataDir))
asyncio.get_event_loop() .run_until_complete(ppy.open("http://caifuhao.eastmoney.com/discover/finance", 60))
time.sleep(2)
print(asyncio.get_event_loop() .run_until_complete(ppy.getCookies(None)))
asyncio.get_event_loop() .run_until_complete(ppy.closeBrowser(None))
閱讀更多 採集小鋼炮 的文章