""" 辅助工具模块 提供常用的测试辅助函数和工具 """ import os import re import time import hashlib import json from datetime import datetime, timedelta from pathlib import Path from typing import Any, Dict, List, Optional, Tuple, Union, Callable from urllib.parse import urlparse, parse_qs, urljoin from functools import lru_cache from playwright.sync_api import Page, Locator, FrameLocator from playwright.sync_api import Error as PlaywrightError, TimeoutError as PlaywrightTimeoutError from config.settings import get_settings from utils.logger import get_logger, PerformanceTimer class WaitCondition: """等待条件类""" @staticmethod def for_element_visible(selector: str, timeout: Optional[int] = None) -> Callable: """等待元素可见""" def condition(page: Page) -> bool: try: element = page.locator(selector).first return element.is_visible(timeout=timeout or 1000) except (PlaywrightError, PlaywrightTimeoutError): return False return condition @staticmethod def for_element_hidden(selector: str, timeout: Optional[int] = None) -> Callable: """等待元素隐藏""" def condition(page: Page) -> bool: try: element = page.locator(selector).first return not element.is_visible(timeout=timeout or 1000) except (PlaywrightError, PlaywrightTimeoutError): return True return condition @staticmethod def for_element_enabled(selector: str, timeout: Optional[int] = None) -> Callable: """等待元素可用""" def condition(page: Page) -> bool: try: element = page.locator(selector).first return element.is_enabled(timeout=timeout or 1000) except (PlaywrightError, PlaywrightTimeoutError): return False return condition @staticmethod def for_url_change(expected_url: str, timeout: Optional[int] = None) -> Callable: """等待URL变化""" def condition(page: Page) -> bool: return expected_url in page.url return condition @staticmethod def for_load_state(state: str = "networkidle", timeout: Optional[int] = None) -> Callable: """等待页面加载状态""" def condition(page: Page) -> bool: try: page.wait_for_load_state(state, timeout=timeout) return True except PlaywrightTimeoutError: return False return condition @staticmethod def for_function_return_true(func: Callable[[], bool], timeout: int = 5000, interval: int = 100) -> Callable: """等待函数返回True""" def condition(page: Page) -> bool: start_time = time.time() * 1000 while time.time() * 1000 - start_time < timeout: try: result = page.evaluate(f"() => ({func.__code__.co_code})") if result: return True except Exception: pass time.sleep(interval / 1000) return False return condition class ElementHelper: """元素操作辅助类""" def __init__(self, page: Page): self.page = page self.logger = get_logger() def find_element( self, selector: str, timeout: Optional[int] = None, state: str = "visible" ) -> Locator: """查找元素""" timeout = timeout or get_settings().element_timeout locator = self.page.locator(selector).first try: locator.wait_for(state=state, timeout=timeout) return locator except PlaywrightTimeoutError: raise PlaywrightTimeoutError( f"未找到元素: {selector} (超时: {timeout}ms)" ) def find_elements(self, selector: str) -> List[Locator]: """查找多个元素""" return self.page.locator(selector).all() def click_element( self, selector: str, timeout: Optional[int] = None, force: bool = False, **kwargs ) -> None: """点击元素""" self.logger.log_action(f"点击元素: {selector}") element = self.find_element(selector, timeout) element.click(force=force, **kwargs) def fill_input( self, selector: str, value: str, timeout: Optional[int] = None, clear: bool = True ) -> None: """填充输入框""" self.logger.log_action(f"填充输入框: {selector} = '{value}'") element = self.find_element(selector, timeout) if clear: element.clear() element.fill(value) def type_text( self, selector: str, text: str, timeout: Optional[int] = None, delay: Optional[int] = None ) -> None: """输入文本(逐字符)""" self.logger.log_action(f"输入文本: {selector} = '{text}'") element = self.find_element(selector, timeout) element.type(text, delay=delay) def get_element_text( self, selector: str, timeout: Optional[int] = None, strip: bool = True ) -> str: """获取元素文本""" element = self.find_element(selector, timeout) text = element.text_content() return text.strip() if strip and text else text def get_element_attribute( self, selector: str, attribute: str, timeout: Optional[int] = None ) -> Optional[str]: """获取元素属性""" element = self.find_element(selector, timeout) return element.get_attribute(attribute) def is_element_visible( self, selector: str, timeout: Optional[int] = None ) -> bool: """检查元素是否可见""" try: element = self.find_element(selector, timeout) return element.is_visible() except PlaywrightTimeoutError: return False def is_element_enabled( self, selector: str, timeout: Optional[int] = None ) -> bool: """检查元素是否可用""" try: element = self.find_element(selector, timeout) return element.is_enabled() except PlaywrightTimeoutError: return False def wait_for_selector( self, selector: str, timeout: Optional[int] = None, state: str = "visible" ) -> Locator: """等待选择器出现""" timeout = timeout or get_settings().element_timeout return self.page.wait_for_selector(selector, timeout=timeout, state=state) def wait_for_url( self, pattern: Union[str, re.Pattern], timeout: Optional[int] = None ) -> None: """等待URL匹配模式""" timeout = timeout or get_settings().page_load_timeout self.page.wait_for_url(pattern, timeout=timeout) def wait_for_load_state( self, state: str = "networkidle", timeout: Optional[int] = None ) -> None: """等待加载状态""" timeout = timeout or get_settings().page_load_timeout self.page.wait_for_load_state(state, timeout=timeout) class PageHelper: """页面操作辅助类""" def __init__(self, page: Page): self.page = page self.logger = get_logger() self.element_helper = ElementHelper(page) def navigate( self, url: str, wait_until: str = "networkidle", timeout: Optional[int] = None ) -> None: """导航到指定URL""" self.logger.log_action(f"导航到: {url}") timeout = timeout or get_settings().page_load_timeout self.page.goto(url, wait_until=wait_until, timeout=timeout) def reload_page(self, wait_until: str = "networkidle") -> None: """刷新页面""" self.logger.log_action("刷新页面") self.page.reload(wait_until=wait_until) def go_back(self, wait_until: str = "networkidle") -> None: """返回上一页""" self.logger.log_action("返回上一页") self.page.go_back(wait_until=wait_until) def go_forward(self, wait_until: str = "networkidle") -> None: """前进到下一页""" self.logger.log_action("前进到下一页") self.page.go_forward(wait_until=wait_until) def scroll_to_top(self) -> None: """滚动到页面顶部""" self.page.evaluate("window.scrollTo(0, 0)") def scroll_to_bottom(self) -> None: """滚动到页面底部""" self.page.evaluate("window.scrollTo(0, document.body.scrollHeight)") def scroll_to_element(self, selector: str) -> None: """滚动到指定元素""" element = self.page.locator(selector).first element.scroll_into_view_if_needed() def get_current_url(self) -> str: """获取当前URL""" return self.page.url def get_page_title(self) -> str: """获取页面标题""" return self.page.title() def get_page_source(self) -> str: """获取页面源码""" return self.page.content() def take_screenshot( self, filename: str, path: Optional[str] = None, full_page: bool = False ) -> str: """截取页面截图""" path = path or get_settings().screenshots_dir Path(path).mkdir(parents=True, exist_ok=True) filepath = str(Path(path) / filename) self.page.screenshot(path=filepath, full_page=full_page) self.logger.log_action(f"截图已保存: {filepath}") return filepath def execute_javascript(self, script: str, *args) -> Any: """执行JavaScript代码""" return self.page.evaluate(script, *args) def wait_for_load_state(self, state: str = "networkidle", timeout: Optional[int] = None) -> None: """等待页面加载状态""" timeout = timeout or get_settings().page_load_timeout self.page.wait_for_load_state(state, timeout=timeout) class AssertionHelper: """断言辅助类""" def __init__(self, page: Page): self.page = page self.logger = get_logger() def assert_element_visible( self, selector: str, timeout: Optional[int] = None, message: Optional[str] = None ) -> None: """断言元素可见""" try: element = ElementHelper(self.page).find_element(selector, timeout) assert element.is_visible(), message or f"元素不可见: {selector}" self.logger.log_assertion(f"元素可见: {selector}", True) except (PlaywrightError, PlaywrightTimeoutError, AssertionError): self.logger.log_assertion(f"元素可见: {selector}", False) raise def assert_element_hidden( self, selector: str, timeout: Optional[int] = None, message: Optional[str] = None ) -> None: """断言元素隐藏""" self.logger.log_assertion(f"元素隐藏: {selector}", True) element = ElementHelper(self.page).find_element(selector, timeout) assert not element.is_visible(), message or f"元素应该隐藏但可见: {selector}" def assert_element_text_contains( self, selector: str, expected_text: str, timeout: Optional[int] = None, message: Optional[str] = None ) -> None: """断言元素文本包含预期文本""" self.logger.log_assertion(f"文本包含: {selector} 包含 '{expected_text}'", True) element = ElementHelper(self.page).find_element(selector, timeout) actual_text = element.text_content() assert expected_text in actual_text, message or ( f"元素文本不匹配: 预期包含 '{expected_text}',实际为 '{actual_text}'" ) def assert_element_text_equals( self, selector: str, expected_text: str, timeout: Optional[int] = None, message: Optional[str] = None ) -> None: """断言元素文本等于预期文本""" self.logger.log_assertion(f"文本相等: {selector} == '{expected_text}'", True) element = ElementHelper(self.page).find_element(selector, timeout) actual_text = element.text_content() assert actual_text == expected_text, message or ( f"元素文本不匹配: 预期 '{expected_text}',实际为 '{actual_text}'" ) def assert_url_contains(self, expected_url: str, message: Optional[str] = None) -> None: """断言URL包含预期文本""" self.logger.log_assertion(f"URL包含: {expected_url}", True) assert expected_url in self.page.url, message or ( f"URL不匹配: 预期包含 '{expected_url}',实际为 '{self.page.url}'" ) def assert_url_equals(self, expected_url: str, message: Optional[str] = None) -> None: """断言URL等于预期URL""" self.logger.log_assertion(f"URL相等: {expected_url}", True) assert self.page.url == expected_url, message or ( f"URL不匹配: 预期 '{expected_url}',实际为 '{self.page.url}'" ) def assert_page_title_contains(self, expected_title: str, message: Optional[str] = None) -> None: """断言页面标题包含预期文本""" self.logger.log_assertion(f"标题包含: {expected_title}", True) actual_title = self.page.title() assert expected_title in actual_title, message or ( f"页面标题不匹配: 预期包含 '{expected_title}',实际为 '{actual_title}'" ) def assert_element_count( self, selector: str, expected_count: int, message: Optional[str] = None ) -> None: """断言元素数量""" self.logger.log_assertion(f"元素数量: {selector} == {expected_count}", True) elements = self.page.locator(selector).all() assert len(elements) == expected_count, message or ( f"元素数量不匹配: 预期 {expected_count},实际 {len(elements)}" ) def assert_element_attribute_equals( self, selector: str, attribute: str, expected_value: str, timeout: Optional[int] = None, message: Optional[str] = None ) -> None: """断言元素属性等于预期值""" self.logger.log_assertion(f"属性相等: {selector}.{attribute} == '{expected_value}'", True) element = ElementHelper(self.page).find_element(selector, timeout) actual_value = element.get_attribute(attribute) assert actual_value == expected_value, message or ( f"元素属性不匹配: {attribute} 预期 '{expected_value}',实际 '{actual_value}'" ) class UrlHelper: """URL辅助类""" @staticmethod def parse_url(url: str) -> Dict[str, Any]: """解析URL""" parsed = urlparse(url) return { "scheme": parsed.scheme, "netloc": parsed.netloc, "path": parsed.path, "params": parse_qs(parsed.query), "query": parsed.query, "fragment": parsed.fragment } @staticmethod def get_domain(url: str) -> str: """获取URL域名""" parsed = urlparse(url) return parsed.netloc @staticmethod def get_path(url: str) -> str: """获取URL路径""" parsed = urlparse(url) return parsed.path @staticmethod def is_absolute_url(url: str) -> bool: """判断是否为绝对URL""" return bool(urlparse(url).scheme) @staticmethod def join_url(base: str, path: str) -> str: """拼接URL""" return urljoin(base, path) @staticmethod def remove_trailing_slash(url: str) -> str: """移除URL末尾的斜杠""" if url.endswith("/") and len(url) > 1: return url[:-1] return url class FileHelper: """文件操作辅助类""" @staticmethod def read_json(filepath: str) -> Dict[str, Any]: """读取JSON文件""" with open(filepath, 'r', encoding='utf-8') as f: return json.load(f) @staticmethod def write_json(filepath: str, data: Any, indent: int = 2) -> None: """写入JSON文件""" Path(filepath).parent.mkdir(parents=True, exist_ok=True) with open(filepath, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=indent) @staticmethod def read_text(filepath: str) -> str: """读取文本文件""" with open(filepath, 'r', encoding='utf-8') as f: return f.read() @staticmethod def write_text(filepath: str, content: str) -> None: """写入文本文件""" Path(filepath).parent.mkdir(parents=True, exist_ok=True) with open(filepath, 'w', encoding='utf-8') as f: f.write(content) @staticmethod def create_directory(path: str) -> None: """创建目录""" Path(path).mkdir(parents=True, exist_ok=True) @staticmethod def delete_directory(path: str) -> None: """删除目录""" import shutil if Path(path).exists(): shutil.rmtree(path) @staticmethod def cleanup_directory(path: str, pattern: str = "*") -> None: """清理目录中的文件""" import glob files = glob.glob(str(Path(path) / pattern)) for file in files: try: os.remove(file) except IsADirectoryError: FileHelper.delete_directory(file) def wait( condition: Callable, timeout: int = 10000, interval: int = 500, message: str = "等待条件超时" ) -> bool: """等待条件满足""" start_time = time.time() * 1000 while time.time() * 1000 - start_time < timeout: if condition(): return True time.sleep(interval / 1000) raise TimeoutError(message) def retry( func: Callable, max_retries: int = 3, delay: int = 1000, exceptions: Tuple[Exception, ...] = (Exception,) ) -> Any: """重试函数""" last_exception = None for attempt in range(max_retries + 1): try: return func() except exceptions as e: last_exception = e if attempt < max_retries: time.sleep(delay / 1000) raise last_exception def generate_test_id(prefix: str = "test") -> str: """生成测试ID""" timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") random_hash = hashlib.md5(f"{timestamp}_{os.getpid()}".encode()).hexdigest()[:8] return f"{prefix}_{timestamp}_{random_hash}" @lru_cache(maxsize=128) def normalize_text(text: str) -> str: """标准化文本(去除多余空白)""" return re.sub(r'\s+', ' ', text).strip()