接口自動化測試框架開發(Pytest+Allure+AIOHTTP+用例自動生成上

接口自動化測試框架開發(Pytest+Allure+AIOHTTP+用例自動生成上

近期準備做接口測試的覆蓋,為此需要開發一個測試框架,思考了以下幾個特性要求:

  • 接口測試是比較講究效率的,測試人員會希望很快能得到結果反饋,然而接口的數量一般都很多,而且會越來越多,所以提高執行效率很有必要;
  • 接口測試的用例其實也可以用來兼做簡單的壓力測試,而壓力測試需要併發;
  • 接口測試的用例有很多重複的東西,測試人員應該只需要關注接口測試的設計,這些重複勞動最好自動化來做;
  • Pytest 和 Allure 太好用了,新框架要集成它們;
  • 接口測試的用例應該儘量簡潔,最好用 yaml,這樣數據能直接映射為請求數據,寫起用例來跟做填空題一樣,便於向沒有自動化經驗的成員推廣;
  • 加上我對 Python 的協程很感興趣,也學了一段時間,一直希望學以致用,所以 HTTP 請求我決定用 AIOHTTP 來實現;
  • 但是 pytest 是不支持事件循環的,如果想把它們結合還需要一番功夫。

於是繼續思考,思考的結果是其實我可以把整個事情分為兩部分;

第一部分,讀取 yaml 測試用例,HTTP 請求測試接口,收集測試數據。第二部分,根據測試數據,動態生成 pytest 認可的測試用例,然後執行,生成測試報告。

這樣一來,兩者就能完美結合了,也完美符合我所做的設想。接著就來實現它。

第一部分(整個過程都要求是異步非阻塞的)

讀取 yaml 測試用例

一份簡單的用例模板我是這樣設計的,這樣的好處是,參數名和 aioHTTP.ClientSession().request(method,url,**kwargs) 是直接對應上的,我可以不費力氣的直接傳給請求方法,避免各種轉換,簡潔優雅,表達力又強。

<code>args:
  - post
  - /xxx/add
kwargs:
  -
    caseName: 新增 xxx
    data:
      name: ${gen_uid(10)}
validator:
  -
    json:
      successed: True/<code>

異步讀取文件可以使用 aiofiles 這個第三方庫,yaml_load 是一個協程,可以保證主進程讀取 yaml 測試用例時不被阻塞,通過await yaml_load()便能獲取測試用例的數據

<code>async def yaml_load(dir='', file=''):
    """
    異步讀取 yaml 文件,並轉義其中的特殊值
    :param file:
    :return:
    """
    if dir:
        file = os.path.join(dir, file)
    async with aiofiles.open(file, 'r', encoding='utf-8', errors='ignore') as f:
        data = await f.read()

    data = yaml.load(data)

    # 匹配函數調用形式的語法
    pattern_function = re.compile(r'^\\${([A-Za-z_]+\\w*\\(.*\\))}$')
    pattern_function2 = re.compile(r'^\\${(.*)}$')
    # 匹配取默認值的語法
    pattern_function3 = re.compile(r'^\\$\\((.*)\\)$')

    def my_iter(data):
        """
        遞歸測試用例,根據不同數據類型做相應處理,將模板語法轉化為正常值
        :param data:
        :return:
        """
        if isinstance(data, (list, tuple)):
            for index, _data in enumerate(data):
                data[index] = my_iter(_data) or _data
        elif isinstance(data, dict):
            for k, v in data.items():
                data[k] = my_iter(v) or v
        elif isinstance(data, (str, bytes)):
            m = pattern_function.match(data)
            if not m:
                m = pattern_function2.match(data)
            if m:
                return eval(m.group(1))
            if not m:
                m = pattern_function3.match(data)
            if m:
                K, k = m.group(1).split(':')
                return bxmat.default_values.get(K).get(k)

            return data

    my_iter(data)

    return BXMDict(data)/<code>

可以看到,測試用例還支持一定的模板語法,如${function}、$(a:b)等,這能在很大程度上拓展測試人員用例編寫的能力

HTTP 請求測試接口

HTTP 請求可以直接用aioHTTP.ClientSession().request(method,url,**kwargs),HTTP 也是一個協程,可以保證網絡請求時不被阻塞,通過await HTTP()便可以拿到接口測試數據

<code>async def HTTP(domain, *args, **kwargs):
    """
    HTTP 請求處理器
    :param domain: 服務地址
    :param args:
    :param kwargs:
    :return:
    """
    method, api = args
    arguments = kwargs.get('data') or kwargs.get('params') or kwargs.get('json') or {}

    # kwargs 中加入 token
    kwargs.setdefault('headers', {}).update({'token': bxmat.token})
    # 拼接服務地址和 api
    url = ''.join([domain, api])

    async with ClientSession() as session:
        async with session.request(method, url, **kwargs) as response:
            res = await response_handler(response)
            return {
                'response': res,
                'url': url,
                'arguments': arguments
            }/<code>
收集測試數據

協程的併發真的很快,這裡為了避免服務響應不過來導致熔斷,可以引入asyncio.Semaphore(num)來控制併發

<code>async def entrace(test_cases, loop, semaphore=None):
    """
    HTTP 執行入口
    :param test_cases:
    :param semaphore:
    :return:
    """
    res = BXMDict()
    # 在 CookieJar 的 update_cookies 方法中,如果 unsafe=False 並且訪問的是 IP 地址,客戶端是不會更新 cookie 信息
    # 這就導致 session 不能正確處理登錄態的問題
    # 所以這裡使用的 cookie_jar 參數使用手動生成的 CookieJar 對象,並將其 unsafe 設置為 True
    async with ClientSession(loop=loop, cookie_jar=CookieJar(unsafe=True), headers={'token': bxmat.token}) as session:
        await advertise_cms_login(session)
        if semaphore:
            async with semaphore:
                for test_case in test_cases:
                    data = await one(session, case_name=test_case)
                    res.setdefault(data.pop('case_dir'), BXMList()).append(data)
        else:
            for test_case in test_cases:
                data = await one(session, case_name=test_case)
                res.setdefault(data.pop('case_dir'), BXMList()).append(data)

        return res


async def one(session, case_dir='', case_name=''):
    """
    一份測試用例執行的全過程,包括讀取 .yml 測試用例,執行 HTTP 請求,返回請求結果
    所有操作都是異步非阻塞的
    :param session: session 會話
    :param case_dir: 用例目錄

    :param case_name: 用例名稱
    :return:
    """
    project_name = case_name.split(os.sep)[1]
    domain = bxmat.url.get(project_name)
    test_data = await yaml_load(dir=case_dir, file=case_name)
    result = BXMDict({
        'case_dir': os.path.dirname(case_name),
        'api': test_data.args[1].replace('/', '_'),
    })
    if isinstance(test_data.kwargs, list):
        for index, each_data in enumerate(test_data.kwargs):
            step_name = each_data.pop('caseName')
            r = await HTTP(session, domain, *test_data.args, **each_data)
            r.update({'case_name': step_name})
            result.setdefault('responses', BXMList()).append({
                'response': r,
                'validator': test_data.validator[index]
            })
    else:
        step_name = test_data.kwargs.pop('caseName')
        r = await HTTP(session, domain, *test_data.args, **test_data.kwargs)
        r.update({'case_name': step_name})
        result.setdefault('responses', BXMList()).append({
            'response': r,
            'validator': test_data.validator
        })

    return result/<code>

事件循環負責執行協程並返回結果,在最後的結果收集中,我用測試用例目錄來對結果進行了分類,這為接下來的自動生成 pytest 認可的測試用例打下了良好的基礎。

<code>def main(test_cases):
    """
    事件循環主函數,負責所有接口請求的執行
    :param test_cases:
    :return:
    """
    loop = asyncio.get_event_loop()
    semaphore = asyncio.Semaphore(bxmat.semaphore)
    # 需要處理的任務

    # tasks = [asyncio.ensure_future(one(case_name=test_case, semaphore=semaphore)) for test_case in test_cases]
    task = loop.create_task(entrace(test_cases, loop, semaphore))
    # 將協程註冊到事件循環,並啟動事件循環
    try:
        # loop.run_until_complete(asyncio.gather(*tasks))
        loop.run_until_complete(task)
    finally:
        loop.close()

    return task.result()/<code>



想看第二篇文章可繼續看下文。


分享到:


相關文章: