爬蟲系列之Pyppeteer:動態數據的剋星


在處理數據採集過程中,相信大家都會遇到動態網站的採集,如果是幾個或者幾十個,都可以通過抓包,分析數據流直接獲取數據,但是當有幾千或者幾萬個的時候,抓包就顯得太過於浪費金錢和時間了。這也是Pyppeteer、selenium、PhantomJS等存在的原因。今天主要介紹一下Pyppeteer。

Pyppeteer其實是Puppeteer的Python版本,下面簡單介紹下Pyppeteer的兩大特點,chromium瀏覽器和asyncio框架:

1).chromium

Chromium是一款獨立的瀏覽器,是Google為發展自家的瀏覽器Google Chrome而開啟的計劃,相當於Chrome的實驗版,Chromium的穩定性不如Chrome但是功能更加豐富,而且更新速度很快,通常每隔數小時就有新的開發版本發佈。

Pyppeteer的web自動化是基於

爬蟲系列之Pyppeteer:動態數據的剋星

來實現的,由於chromium中某些特性的關係,Pyppeteer的安裝配置非常簡單,關於這一點稍後我們會詳細介紹。

2).asyncio

asyncio是Python的一個異步協程庫,自3.4版本引入的標準庫,直接內置了對異步IO的支持,號稱是Python最有野心的庫,官網上有非常詳細的介紹。

由於Pyppeteer是基於asyncio實現的,所以它本身就支持異步操作,執行效率得到大幅提升。


3).安裝

使用pip install pyppeteer命令就能完成pyppeteer庫的安裝,至於chromium瀏覽器,只需要一條pyppeteer-install命令就會自動下載對應的最新版本chromium瀏覽器到pyppeteer的默認位置。

如果不運行pyppeteer-install命令,在第一次使用pyppeteer的時候也會自動下載並安裝chromium瀏覽器,效果是一樣的。總的來說,pyppeteer比起selenium省去了driver配置的環節。

當然,出於某種原因,也可能會出現chromium自動安裝無法順利完成的情況,這時可以考慮手動安裝:

首先,從下列網址中找到自己系統的對應版本,下載chromium壓縮包;

1·'linux':'https://storage.googleapis.com/chromium-browser-snapshots/Linux_x64/575458/chrome-linux.zip'

2·'mac':'https://storage.googleapis.com/chromium-browser-snapshots/Mac/575458/chrome-mac.zip'

3·'win32':'https://storage.googleapis.com/chromium-browser-snapshots/Win/575458/chrome-win32.zip'

4·'win64':'https://storage.googleapis.com/chromium-browser-snapshots/Win_x64/575458/chrome-win32.zip'

4).使用

下面是我基於工作中常用到的方法,整理的一個公共類,大家可以借鑑一下:

'''

Created on Jul 27, 2019

@author: admin

'''

import asyncio, tkinter, traceback

import base64, time, random

from pyppeteer import launch

from com.fy.utils.http.UserAgentUtils import UserAgentUtils

from com.fy.utils.hash.HashUtils import Hash_Utils

from com.fy.utils.file.FileUtils import File_Utils

class PyppeteerBrowser:

def __init__(self):

self.hash = Hash_Utils()

self.url = None

self.ua = UserAgentUtils()

#"""使用tkinter獲取屏幕大小""")

def screen_size(self):

tk = tkinter.Tk()

width = tk.winfo_screenwidth()

height = tk.winfo_screenheight()

tk.quit()

return width, height

#構造一個瀏覽器對象; ; 如果需要每次初始化新的瀏覽器對象,則userDataDir路徑必須不同,否則,始終是在第一次初始化的瀏覽器對象上進行操作,且容易出異常;

async def getbrowser(self, headless=False, userDataDir=None):

'''

參數:

•ignoreHTTPSErrors(bool):是否忽略 HTTPS 錯誤。默認為 False

•headless(bool):是否在無頭模式下運行瀏覽器。默認為 True除非appMode或devtools選項True

•executablePath (str):運行 Chromium 或 Chrome 可執行文件的路徑,而不是默認捆綁的 Chromium。如果指定之後就不需要使用默認的 Chromium 了,可以指定為已有的 Chrome 或 Chromium。

•slowMo (int | float):通過傳入指定的時間,可以減緩 Pyppeteer 的一些模擬操作。 (按指定的毫秒數減慢 pyppeteer 操作。)

•args (List [str]):傳遞給瀏覽器進程的附加參數(標誌)。

•dumpio(bool):是否管道瀏覽器進程 stdout 和 stderr 進入process.stdout和process.stderr。默認為False。為 True時,可以解決chromium瀏覽器多開頁面卡死問題。

•userDataDir (str):用戶數據目錄的路徑。即用戶數據文件夾,即可以保留一些個性化配置和操作記錄。(比如登錄信息等;可以在以後打開時自動登錄;)

•env(dict):指定瀏覽器可見的環境變量。默認與 python 進程相同。

•devtools(bool):是否為每個選項卡自動打開 DevTools 面板。如果是此選項True,headless則將設置該選項 False。

•logLevel(int | str):用於打印日誌的日誌級別。默認值與根記錄器相同。

•autoClose(bool):腳本完成時自動關閉瀏覽器進程。默認為True。

•loop(asyncio.AbstractEventLoop):事件循環(實驗)。

•args:常用的有['--no-sandbox','--disable-gpu', '--disable-setuid-sandbox','--window-size=1440x900']

•dumpio: 不知道為什麼,如果不加 dumpio=True 有時會出現瀏覽器卡頓

•autoClose:默認就好,不過如果你需要保持瀏覽器狀態,可以不關閉,下次直接連接這個已存在的瀏覽器

ignoreDefaultArgs (bool): 不使用 Pyppeteer 的默認參數,如果使用了這個參數,那麼最好通過 args 參數來設定一些參數,否則可能會出現一些意想不到的問題。這個參數相對比較危險,慎用。

handleSIGINT (bool): 是否響應 SIGINT 信號,也就是可以使用 Ctrl + C 來終止瀏覽器程序,默認是 True。

handleSIGTERM (bool): 是否響應 SIGTERM 信號,一般是 kill 命令,默認是 True。

handleSIGHUP (bool): 是否響應 SIGHUP 信號,即掛起信號,比如終端退出操作,默認是 True。

launch_kwargs = {

# 控制是否為無頭模式

"headless": False,

# chrome啟動命令行參數

"args": [

# 瀏覽器代理 配合某些中間人代理使用

"--proxy-server=http://127.0.0.1:8008",

# 最大化窗口

"--start-maximized",

# 取消沙盒模式 沙盒模式下權限太小

"--no-sandbox",

# 不顯示信息欄 比如 chrome正在受到自動測試軟件的控制 ...

"--disable-infobars",

# log等級設置 在某些不是那麼完整的系統裡 如果使用默認的日誌等級 可能會出現一大堆的warning信息

"--log-level=3",

# 設置UA

"--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36",

],

# 用戶數據保存目錄 這個最好也自己指定一個目錄

# 如果不指定的話,chrome會自動新建一個臨時目錄使用,在瀏覽器退出的時候會自動刪除臨時目錄

# 在刪除的時候可能會刪除失敗(不知道為什麼會出現權限問題,我用的windows) 導致瀏覽器退出失敗

# 然後chrome進程就會一直沒有退出 CPU就會狂飆到99%

"userDataDir": "",

}

'''

print("構造瀏覽器對象開始...")

args = [ "--start-maximized", '--no-sandbox', "--disable-infobars" , "--log-level=3"]

parameters = {}

if userDataDir == None:

parameters = {'headless': headless, #是否打開瀏覽器;False:打開瀏覽器;True:進程中運行;

'args': args,

'dumpio': True #'dumpio': True:解決chromium瀏覽器多開頁面卡死問題。

}

else:

parameters = {'headless': headless, #是否打開瀏覽器;False:打開瀏覽器;True:進程中運行;

'args': args,

"userDataDir": userDataDir,

'dumpio': True #'dumpio': True:解決chromium瀏覽器多開頁面卡死問題。

}

#注意:同一個用戶目錄(userDataDir)不能被兩個chrome進程使用,如果你要多開,記得分別指定用戶目錄。否則會報編碼錯誤。

self.browser = await launch(parameters)

self.page = await self.browser.newPage()#在此瀏覽器上創建新頁面並返回其對象。

width, height = self.screen_size()

# 設置網頁可視區域大小

await self.page.setViewport({

"width": width,

"height": height

})

# 是否啟用JS,enabled設為False,則無渲染效果

await self.page.setJavaScriptEnabled(enabled=True)

#設置請求頭userAgent

await self.page.setUserAgent(self.ua.getheaders())

await self.preventCheckWebdriver(self.page)

print("構造瀏覽器對象完畢....", self.page)


#獲取當前操作的界面

async def getPage(self):

return self.page

#獲取當前page對象的鏈接;

async def getCurUrl(self, page):

if page == None:

page = self.page

return page.url

#打開一個新的界面;)

async def getnewpage(self):

return await self.browser.newPage()

#獲取當前操作的界面重新加載

async def reload(self):

await self.page.reload()

#當前操作界面返回

async def goBack(self):

await self.page.goBack()

#獲取當前操作的界面的URL

async def getPageUrl(self):

await self.page.url()

#打開連接;

async def open(self, url, timeout=60):

try:

if url == None:

print("當前傳入的【url】不能為空,參數錯誤!!")

self.url = url

print("打開網頁:" + (url))

self.res = await self.page.goto(url, options={'timeout':int(timeout * 1000)})#打開連接;

await asyncio.sleep(1)#強行等待3秒

status = self.res.status

curUrl = self.page.url

await self.preventCheckWebdriver(self.page)

return status, curUrl

except:return 404, None

#

#防止 webdriver 檢測,如淘寶登錄。其實淘寶主要通過 window.navigator.webdriver 來對 webdriver 進行檢測,所以我們只需要使用 JavaScript 將它設置為 false 即可

async def preventCheckWebdriver(self, page):

if page == None:

page = self.page

# 替換淘寶在檢測瀏覽時採集的一些參數。

# 就是在瀏覽器運行的時候,始終讓window.navigator.webdriver=false

# navigator是windiw對象的一個屬性,同時修改plugins,languages,navigator

await page.evaluate('''() =>{ Object.defineProperties(navigator,{ webdriver:{ get: () => undefined } }) }''') # 以下為插入中間js,將淘寶會為了檢測瀏覽器而調用的js修改其結果。

await page.evaluate('''() =>{ window.navigator.chrome = { runtime: {}, }; }''')

await page.evaluate('''() =>{ Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] }); }''')

await page.evaluate('''() =>{ Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5,6], }); }''')

#關閉當前打開的瀏覽器;

async def closeBrowser(self, browser):

if browser == None:

browser = self.browser

try:

await browser.close()

except:pass

#關閉當前打開的瀏覽器中的一個界面;

async def closePage(self, page):

if page == None:

page = self.page

await page.close()

#關閉當前打開的瀏覽器中的某一個界面;

async def closeNumPage(self, number:"號碼從0開始"):

pages = await self.browser.pages()

await pages[number].close()

return True


#關閉除了最後一個所有的界面;

async def retainLastPage(self):

pages = await self.browser.pages()

num = 0

for page in pages:

if num != (len(pages) - 1):

await page.close()

else:

self.page = page

num += 1

#獲取當前打開頁面的響應狀態

async def gerReponseStatus(self):

try:return self.res.status # 響應狀態

except:return 200

#截個圖

async def screenshot(self, page):

hashCode = self.hash.getMd5Hash(self.url)

if page == None:

page = self.page

await page.screenshot({'path': './screenshots/' + str(hashCode) + '.png'})

# 得到響應頭;

async def getHeader(self):

return self.res.headers # 響應頭;

# 滾動到頁面底部

async def scrollToButtom(self, page):

if page == None:

page = self.page

await page.evaluate('window.scrollBy(0, document.body.scrollHeight)')

print("滑動到當前界面底部【完畢】")

# 獲取當前頁面的cookie

async def getCookies(self, page):

if page == None:

page = self.page

return await page.cookies()

# 獲取登錄後cookie尚未測試是否正常;

async def getCookieStr(page):

if page == None:

page = self.page

cookies_list = await page.cookies()

cookies = ''

for cookie in cookies_list:

str_cookie = '{0}={1};'

str_cookie = str_cookie.format(cookie.get('name'), cookie.get('value'))

cookies += str_cookie

print(cookies)

# 將cookie 放入 cookie 池 以便多次請求 封賬號 利用cookie 對搜索內容進行爬取

return cookies

# 獲取當前頁面的cookie

async def setCookies(self, page, cookies):

if page == None:

page = self.page

return await page.setCookie(*cookies)

# 獲取所有 html源碼

async def getHtml(self, page):

if page == None:

page = self.page

return (await page.content())

#當前頁標題

async def getCurPageTitle(self, page):

if page == None:

page = self.page

return (await page.title())

#獲取對象屬性值;

async def getElementFieldValue(self, page, element, field):

if element == None:

print("當前傳入的【element】不能為空,參數錯誤!!")

return None

if field == None:

print("當前傳入的【field】不能為空,參數錯誤!!")

return None

if page == None:

page = self.page

if str(type(element)) == "<class>":/<class>

print("當前傳入的【element】不是單個對象,為list集合,參數錯誤!!")

return None

fieldValue = await (await element.getProperty(field)).jsonValue()

return fieldValue

#獲取當前界面的寬、高、像素大小比率三個值

async def getPageWidthHight(self, page):

if page == None:

page = self.page

return await page.evaluate('''() => {

return {

width: document.documentElement.clientWidth,

height: document.documentElement.clientHeight,

deviceScaleFactor: window.devicePixelRatio,

}

}''')

#獲取當前瀏覽器的所有界面集合;

async def getCurBrowserAllPages(self):

return await self.browser.pages()

#獲取當前界面中某個元素的內容;

async def getElementsByXpaths(self, page, xpath:'如://div[@class="title-box"]/a'):

if xpath == None:

print("當前傳入的【xpath】不能為空,參數錯誤!!")

return None

if page == None:

page = self.page

elemList = None

try:elemList = await page.xpath(xpath)

except:

print("獲取xpath路徑為【" + str(xpath) + "】的標籤對象異常...")

return elemList#返回類型為:list集合;

#獲取當前界面的所有內容(不帶html標籤的內容);(效果較差;)--

async def getPageText(self, page):

if page == None:

page = self.page

'''Pyppeteer的evaluate()方法只使用JavaScript字符串,該字符串可以是函數也可以是表達式,

Pyppeteer會進行自動判斷。但有時會判斷錯誤,如果字符串被判斷成了函數,並且報錯,

可以添加選項force_expr=True,強制Pyppeteer作為表達式處理。'''

return await page.evaluate('document.body.textContent', force_expr=True)

#獲取元素內容; --

async def getElementText(self, page, element):

if element == None:

print("當前傳入的【element】不能為空,參數錯誤!!")

return None

if page == None:

page = self.page

if str(type(element)) == "<class>":/<class>

print("當前傳入的【element】不是單個對象,為list集合,參數錯誤!!")

return None

return await page.evaluate('(element) => element.textContent', element)

#通過selector獲取元素內容; --

async def getElementBySelector(self, page , selector):

if selector == None:

print("當前傳入的【selector】不能為空,參數錯誤!!")

return None

if page == None:

page = self.page

return await page.querySelector(selector)

#向輸入框輸入數據 --

async def inputKw(self, page, selector:"如:'input#kw.s_ipt':獲取input標籤中id='kw',class='s_ipt'的對象。不可用xpath路徑", kw:'待輸入的關鍵詞'):

if kw == None:

print("當前傳入的【kw】不能為空,參數錯誤!!")

return None

if selector == None:

print("當前傳入的【selector】不能為空,參數錯誤!!")

return None

if page == None:

page = self.page

mylist = [1.55, 0.798, 1.187]

if len(kw) <= 5:

mylist = [0.695, 0.798, 1.087, 0.343, 0.4067]

else:

mylist = [1.095, 0.798, 1.127, 1.0543, 1.1267, 0.8067]

for i in str(kw):#逐個字符輸入,減少被識別為機器的改了;

await page.type(selector, i)

time.sleep(random.choice(mylist))

return None

#鼠標單擊某一個元素; --

async def clickElement(self, page, selector:"如:'input#kw.s_ipt':獲取input標籤中id='kw',class='s_ipt'的對象。。不可用xpath路徑"):

if selector == None:

print("當前傳入的【selector】不能為空,參數錯誤!!")

if page == None:

page = self.page

await page.click(selector)#如果selector獲取的對象是list集合,則執行第一個元素的點擊;

#清空某個input的值

async def removeInputValue(self, page, idValue):

if idValue == None:

print("當前傳入的【idValue】不能為空,參數錯誤!!")

if page == None:

page = self.page

await page.evaluate("document.querySelector('#" + str(idValue) + "').value=''")

print("清空【" + str(idValue) + "】的內容")

#

async def clickByEle(self, ele):

if ele == None:

return

return ele.click()

#獲取當前瀏覽器打開的【最後一個】界面對象

async def getLastPage(self):

pages = await self.browser.pages()

return pages[-1]

#獲取當前瀏覽器打開的【最後一個】界面對象

async def getPageTotal(self):

pages = await self.browser.pages()

return len(pages)

#獲取當前瀏覽器打開的【最一個】界面對象

async def getFirstPage(self):

pages = await self.browser.pages()

return pages[0]

#獲取當前界面中所有的frame對象

async def getAllFrames(self, page):

if page == None:

page = self.page

return page.frames

async def getScreenshotByEle(self, page, ele, screenshotFilePath:"目前測試只有.png圖片可正常生成,jpg異常;"):

picture = ''

try:

fu = File_Utils(None)

fu = File_Utils(fu.getParentDir(screenshotFilePath))

if not fu.exists(fu.getParentDir(screenshotFilePath)):fu.makeDirs()#如果圖片的保存目錄不存在,則創建;

# 進行截圖

time.sleep(3)

print("驗證碼路徑:", screenshotFilePath)

try:

for _ in range(6):

clip = await ele.boundingBox()

picture = base64.b64encode(await page.screenshot({

'path': screenshotFilePath, # 圖片路徑, 不指定就不保存

'clip': clip, # 指定圖片位置,大小

# 'encoding': 'base64',# 返回的圖片格式, 默認二進制

}))

if picture != '':

break

except Exception as e:

print('截圖獲取失敗')

print(traceback.print_exc())

except Exception as e:

print('截圖獲取失敗')

print(traceback.print_exc())

return picture

ppy = PyppeteerBrowser()

from com.fy.utils.date.DateUtils import Date_Utils

du = Date_Utils()

userDataDir = "d://pyppeteer" + str(du.getCurrentTimeLong())

asyncio.get_event_loop() .run_until_complete(ppy.getbrowser(False, userDataDir))

asyncio.get_event_loop() .run_until_complete(ppy.open("http://caifuhao.eastmoney.com/discover/finance", 60))

time.sleep(2)

print(asyncio.get_event_loop() .run_until_complete(ppy.getCookies(None)))

asyncio.get_event_loop() .run_until_complete(ppy.closeBrowser(None))



分享到:


相關文章: