f14002559e
refactor(components): 调整头部和页脚布局样式 style(hero-section): 更新徽章动画效果 docs: 添加测试框架README文档 test: 实现首页、导航和联系表单的测试用例 ci: 添加CI测试脚本和配置
585 lines
19 KiB
Python
585 lines
19 KiB
Python
"""
|
|
辅助工具模块
|
|
提供常用的测试辅助函数和工具
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import time
|
|
import hashlib
|
|
import json
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Tuple, Union, Callable
|
|
from urllib.parse import urlparse, parse_qs, urljoin
|
|
from functools import lru_cache
|
|
|
|
from playwright.sync_api import Page, Locator, FrameLocator
|
|
from playwright.sync_api import Error as PlaywrightError, TimeoutError as PlaywrightTimeoutError
|
|
|
|
from config.settings import get_settings
|
|
from utils.logger import get_logger, PerformanceTimer
|
|
|
|
|
|
class WaitCondition:
|
|
"""等待条件类"""
|
|
|
|
@staticmethod
|
|
def for_element_visible(selector: str, timeout: Optional[int] = None) -> Callable:
|
|
"""等待元素可见"""
|
|
def condition(page: Page) -> bool:
|
|
try:
|
|
element = page.locator(selector).first
|
|
return element.is_visible(timeout=timeout or 1000)
|
|
except (PlaywrightError, PlaywrightTimeoutError):
|
|
return False
|
|
return condition
|
|
|
|
@staticmethod
|
|
def for_element_hidden(selector: str, timeout: Optional[int] = None) -> Callable:
|
|
"""等待元素隐藏"""
|
|
def condition(page: Page) -> bool:
|
|
try:
|
|
element = page.locator(selector).first
|
|
return not element.is_visible(timeout=timeout or 1000)
|
|
except (PlaywrightError, PlaywrightTimeoutError):
|
|
return True
|
|
return condition
|
|
|
|
@staticmethod
|
|
def for_element_enabled(selector: str, timeout: Optional[int] = None) -> Callable:
|
|
"""等待元素可用"""
|
|
def condition(page: Page) -> bool:
|
|
try:
|
|
element = page.locator(selector).first
|
|
return element.is_enabled(timeout=timeout or 1000)
|
|
except (PlaywrightError, PlaywrightTimeoutError):
|
|
return False
|
|
return condition
|
|
|
|
@staticmethod
|
|
def for_url_change(expected_url: str, timeout: Optional[int] = None) -> Callable:
|
|
"""等待URL变化"""
|
|
def condition(page: Page) -> bool:
|
|
return expected_url in page.url
|
|
return condition
|
|
|
|
@staticmethod
|
|
def for_load_state(state: str = "networkidle", timeout: Optional[int] = None) -> Callable:
|
|
"""等待页面加载状态"""
|
|
def condition(page: Page) -> bool:
|
|
try:
|
|
page.wait_for_load_state(state, timeout=timeout)
|
|
return True
|
|
except PlaywrightTimeoutError:
|
|
return False
|
|
return condition
|
|
|
|
@staticmethod
|
|
def for_function_return_true(func: Callable[[], bool], timeout: int = 5000, interval: int = 100) -> Callable:
|
|
"""等待函数返回True"""
|
|
def condition(page: Page) -> bool:
|
|
start_time = time.time() * 1000
|
|
while time.time() * 1000 - start_time < timeout:
|
|
try:
|
|
result = page.evaluate(f"() => ({func.__code__.co_code})")
|
|
if result:
|
|
return True
|
|
except Exception:
|
|
pass
|
|
time.sleep(interval / 1000)
|
|
return False
|
|
return condition
|
|
|
|
|
|
class ElementHelper:
|
|
"""元素操作辅助类"""
|
|
|
|
def __init__(self, page: Page):
|
|
self.page = page
|
|
self.logger = get_logger()
|
|
|
|
def find_element(
|
|
self,
|
|
selector: str,
|
|
timeout: Optional[int] = None,
|
|
state: str = "visible"
|
|
) -> Locator:
|
|
"""查找元素"""
|
|
timeout = timeout or get_settings().element_timeout
|
|
locator = self.page.locator(selector).first
|
|
|
|
try:
|
|
locator.wait_for(state=state, timeout=timeout)
|
|
return locator
|
|
except PlaywrightTimeoutError:
|
|
raise PlaywrightTimeoutError(
|
|
f"未找到元素: {selector} (超时: {timeout}ms)"
|
|
)
|
|
|
|
def find_elements(self, selector: str) -> List[Locator]:
|
|
"""查找多个元素"""
|
|
return self.page.locator(selector).all()
|
|
|
|
def click_element(
|
|
self,
|
|
selector: str,
|
|
timeout: Optional[int] = None,
|
|
force: bool = False,
|
|
**kwargs
|
|
) -> None:
|
|
"""点击元素"""
|
|
self.logger.log_action(f"点击元素: {selector}")
|
|
element = self.find_element(selector, timeout)
|
|
element.click(force=force, **kwargs)
|
|
|
|
def fill_input(
|
|
self,
|
|
selector: str,
|
|
value: str,
|
|
timeout: Optional[int] = None,
|
|
clear: bool = True
|
|
) -> None:
|
|
"""填充输入框"""
|
|
self.logger.log_action(f"填充输入框: {selector} = '{value}'")
|
|
element = self.find_element(selector, timeout)
|
|
|
|
if clear:
|
|
element.clear()
|
|
|
|
element.fill(value)
|
|
|
|
def type_text(
|
|
self,
|
|
selector: str,
|
|
text: str,
|
|
timeout: Optional[int] = None,
|
|
delay: Optional[int] = None
|
|
) -> None:
|
|
"""输入文本(逐字符)"""
|
|
self.logger.log_action(f"输入文本: {selector} = '{text}'")
|
|
element = self.find_element(selector, timeout)
|
|
element.type(text, delay=delay)
|
|
|
|
def get_element_text(
|
|
self,
|
|
selector: str,
|
|
timeout: Optional[int] = None,
|
|
strip: bool = True
|
|
) -> str:
|
|
"""获取元素文本"""
|
|
element = self.find_element(selector, timeout)
|
|
text = element.text_content()
|
|
return text.strip() if strip and text else text
|
|
|
|
def get_element_attribute(
|
|
self,
|
|
selector: str,
|
|
attribute: str,
|
|
timeout: Optional[int] = None
|
|
) -> Optional[str]:
|
|
"""获取元素属性"""
|
|
element = self.find_element(selector, timeout)
|
|
return element.get_attribute(attribute)
|
|
|
|
def is_element_visible(
|
|
self,
|
|
selector: str,
|
|
timeout: Optional[int] = None
|
|
) -> bool:
|
|
"""检查元素是否可见"""
|
|
try:
|
|
element = self.find_element(selector, timeout)
|
|
return element.is_visible()
|
|
except PlaywrightTimeoutError:
|
|
return False
|
|
|
|
def is_element_enabled(
|
|
self,
|
|
selector: str,
|
|
timeout: Optional[int] = None
|
|
) -> bool:
|
|
"""检查元素是否可用"""
|
|
try:
|
|
element = self.find_element(selector, timeout)
|
|
return element.is_enabled()
|
|
except PlaywrightTimeoutError:
|
|
return False
|
|
|
|
def wait_for_selector(
|
|
self,
|
|
selector: str,
|
|
timeout: Optional[int] = None,
|
|
state: str = "visible"
|
|
) -> Locator:
|
|
"""等待选择器出现"""
|
|
timeout = timeout or get_settings().element_timeout
|
|
return self.page.wait_for_selector(selector, timeout=timeout, state=state)
|
|
|
|
def wait_for_url(
|
|
self,
|
|
pattern: Union[str, re.Pattern],
|
|
timeout: Optional[int] = None
|
|
) -> None:
|
|
"""等待URL匹配模式"""
|
|
timeout = timeout or get_settings().page_load_timeout
|
|
self.page.wait_for_url(pattern, timeout=timeout)
|
|
|
|
def wait_for_load_state(
|
|
self,
|
|
state: str = "networkidle",
|
|
timeout: Optional[int] = None
|
|
) -> None:
|
|
"""等待加载状态"""
|
|
timeout = timeout or get_settings().page_load_timeout
|
|
self.page.wait_for_load_state(state, timeout=timeout)
|
|
|
|
|
|
class PageHelper:
|
|
"""页面操作辅助类"""
|
|
|
|
def __init__(self, page: Page):
|
|
self.page = page
|
|
self.logger = get_logger()
|
|
self.element_helper = ElementHelper(page)
|
|
|
|
def navigate(
|
|
self,
|
|
url: str,
|
|
wait_until: str = "networkidle",
|
|
timeout: Optional[int] = None
|
|
) -> None:
|
|
"""导航到指定URL"""
|
|
self.logger.log_action(f"导航到: {url}")
|
|
timeout = timeout or get_settings().page_load_timeout
|
|
self.page.goto(url, wait_until=wait_until, timeout=timeout)
|
|
|
|
def reload_page(self, wait_until: str = "networkidle") -> None:
|
|
"""刷新页面"""
|
|
self.logger.log_action("刷新页面")
|
|
self.page.reload(wait_until=wait_until)
|
|
|
|
def go_back(self, wait_until: str = "networkidle") -> None:
|
|
"""返回上一页"""
|
|
self.logger.log_action("返回上一页")
|
|
self.page.go_back(wait_until=wait_until)
|
|
|
|
def go_forward(self, wait_until: str = "networkidle") -> None:
|
|
"""前进到下一页"""
|
|
self.logger.log_action("前进到下一页")
|
|
self.page.go_forward(wait_until=wait_until)
|
|
|
|
def scroll_to_top(self) -> None:
|
|
"""滚动到页面顶部"""
|
|
self.page.evaluate("window.scrollTo(0, 0)")
|
|
|
|
def scroll_to_bottom(self) -> None:
|
|
"""滚动到页面底部"""
|
|
self.page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
|
|
|
|
def scroll_to_element(self, selector: str) -> None:
|
|
"""滚动到指定元素"""
|
|
element = self.page.locator(selector).first
|
|
element.scroll_into_view_if_needed()
|
|
|
|
def get_current_url(self) -> str:
|
|
"""获取当前URL"""
|
|
return self.page.url
|
|
|
|
def get_page_title(self) -> str:
|
|
"""获取页面标题"""
|
|
return self.page.title()
|
|
|
|
def get_page_source(self) -> str:
|
|
"""获取页面源码"""
|
|
return self.page.content()
|
|
|
|
def take_screenshot(
|
|
self,
|
|
filename: str,
|
|
path: Optional[str] = None,
|
|
full_page: bool = False
|
|
) -> str:
|
|
"""截取页面截图"""
|
|
path = path or get_settings().screenshots_dir
|
|
Path(path).mkdir(parents=True, exist_ok=True)
|
|
|
|
filepath = str(Path(path) / filename)
|
|
self.page.screenshot(path=filepath, full_page=full_page)
|
|
|
|
self.logger.log_action(f"截图已保存: {filepath}")
|
|
return filepath
|
|
|
|
def execute_javascript(self, script: str, *args) -> Any:
|
|
"""执行JavaScript代码"""
|
|
return self.page.evaluate(script, *args)
|
|
|
|
def wait_for_load_state(self, state: str = "networkidle", timeout: Optional[int] = None) -> None:
|
|
"""等待页面加载状态"""
|
|
timeout = timeout or get_settings().page_load_timeout
|
|
self.page.wait_for_load_state(state, timeout=timeout)
|
|
|
|
|
|
class AssertionHelper:
|
|
"""断言辅助类"""
|
|
|
|
def __init__(self, page: Page):
|
|
self.page = page
|
|
self.logger = get_logger()
|
|
|
|
def assert_element_visible(
|
|
self,
|
|
selector: str,
|
|
timeout: Optional[int] = None,
|
|
message: Optional[str] = None
|
|
) -> None:
|
|
"""断言元素可见"""
|
|
try:
|
|
element = ElementHelper(self.page).find_element(selector, timeout)
|
|
assert element.is_visible(), message or f"元素不可见: {selector}"
|
|
self.logger.log_assertion(f"元素可见: {selector}", True)
|
|
except (PlaywrightError, PlaywrightTimeoutError, AssertionError):
|
|
self.logger.log_assertion(f"元素可见: {selector}", False)
|
|
raise
|
|
|
|
def assert_element_hidden(
|
|
self,
|
|
selector: str,
|
|
timeout: Optional[int] = None,
|
|
message: Optional[str] = None
|
|
) -> None:
|
|
"""断言元素隐藏"""
|
|
self.logger.log_assertion(f"元素隐藏: {selector}", True)
|
|
element = ElementHelper(self.page).find_element(selector, timeout)
|
|
assert not element.is_visible(), message or f"元素应该隐藏但可见: {selector}"
|
|
|
|
def assert_element_text_contains(
|
|
self,
|
|
selector: str,
|
|
expected_text: str,
|
|
timeout: Optional[int] = None,
|
|
message: Optional[str] = None
|
|
) -> None:
|
|
"""断言元素文本包含预期文本"""
|
|
self.logger.log_assertion(f"文本包含: {selector} 包含 '{expected_text}'", True)
|
|
element = ElementHelper(self.page).find_element(selector, timeout)
|
|
actual_text = element.text_content()
|
|
assert expected_text in actual_text, message or (
|
|
f"元素文本不匹配: 预期包含 '{expected_text}',实际为 '{actual_text}'"
|
|
)
|
|
|
|
def assert_element_text_equals(
|
|
self,
|
|
selector: str,
|
|
expected_text: str,
|
|
timeout: Optional[int] = None,
|
|
message: Optional[str] = None
|
|
) -> None:
|
|
"""断言元素文本等于预期文本"""
|
|
self.logger.log_assertion(f"文本相等: {selector} == '{expected_text}'", True)
|
|
element = ElementHelper(self.page).find_element(selector, timeout)
|
|
actual_text = element.text_content()
|
|
assert actual_text == expected_text, message or (
|
|
f"元素文本不匹配: 预期 '{expected_text}',实际为 '{actual_text}'"
|
|
)
|
|
|
|
def assert_url_contains(self, expected_url: str, message: Optional[str] = None) -> None:
|
|
"""断言URL包含预期文本"""
|
|
self.logger.log_assertion(f"URL包含: {expected_url}", True)
|
|
assert expected_url in self.page.url, message or (
|
|
f"URL不匹配: 预期包含 '{expected_url}',实际为 '{self.page.url}'"
|
|
)
|
|
|
|
def assert_url_equals(self, expected_url: str, message: Optional[str] = None) -> None:
|
|
"""断言URL等于预期URL"""
|
|
self.logger.log_assertion(f"URL相等: {expected_url}", True)
|
|
assert self.page.url == expected_url, message or (
|
|
f"URL不匹配: 预期 '{expected_url}',实际为 '{self.page.url}'"
|
|
)
|
|
|
|
def assert_page_title_contains(self, expected_title: str, message: Optional[str] = None) -> None:
|
|
"""断言页面标题包含预期文本"""
|
|
self.logger.log_assertion(f"标题包含: {expected_title}", True)
|
|
actual_title = self.page.title()
|
|
assert expected_title in actual_title, message or (
|
|
f"页面标题不匹配: 预期包含 '{expected_title}',实际为 '{actual_title}'"
|
|
)
|
|
|
|
def assert_element_count(
|
|
self,
|
|
selector: str,
|
|
expected_count: int,
|
|
message: Optional[str] = None
|
|
) -> None:
|
|
"""断言元素数量"""
|
|
self.logger.log_assertion(f"元素数量: {selector} == {expected_count}", True)
|
|
elements = self.page.locator(selector).all()
|
|
assert len(elements) == expected_count, message or (
|
|
f"元素数量不匹配: 预期 {expected_count},实际 {len(elements)}"
|
|
)
|
|
|
|
def assert_element_attribute_equals(
|
|
self,
|
|
selector: str,
|
|
attribute: str,
|
|
expected_value: str,
|
|
timeout: Optional[int] = None,
|
|
message: Optional[str] = None
|
|
) -> None:
|
|
"""断言元素属性等于预期值"""
|
|
self.logger.log_assertion(f"属性相等: {selector}.{attribute} == '{expected_value}'", True)
|
|
element = ElementHelper(self.page).find_element(selector, timeout)
|
|
actual_value = element.get_attribute(attribute)
|
|
assert actual_value == expected_value, message or (
|
|
f"元素属性不匹配: {attribute} 预期 '{expected_value}',实际 '{actual_value}'"
|
|
)
|
|
|
|
|
|
class UrlHelper:
|
|
"""URL辅助类"""
|
|
|
|
@staticmethod
|
|
def parse_url(url: str) -> Dict[str, Any]:
|
|
"""解析URL"""
|
|
parsed = urlparse(url)
|
|
return {
|
|
"scheme": parsed.scheme,
|
|
"netloc": parsed.netloc,
|
|
"path": parsed.path,
|
|
"params": parse_qs(parsed.query),
|
|
"query": parsed.query,
|
|
"fragment": parsed.fragment
|
|
}
|
|
|
|
@staticmethod
|
|
def get_domain(url: str) -> str:
|
|
"""获取URL域名"""
|
|
parsed = urlparse(url)
|
|
return parsed.netloc
|
|
|
|
@staticmethod
|
|
def get_path(url: str) -> str:
|
|
"""获取URL路径"""
|
|
parsed = urlparse(url)
|
|
return parsed.path
|
|
|
|
@staticmethod
|
|
def is_absolute_url(url: str) -> bool:
|
|
"""判断是否为绝对URL"""
|
|
return bool(urlparse(url).scheme)
|
|
|
|
@staticmethod
|
|
def join_url(base: str, path: str) -> str:
|
|
"""拼接URL"""
|
|
return urljoin(base, path)
|
|
|
|
@staticmethod
|
|
def remove_trailing_slash(url: str) -> str:
|
|
"""移除URL末尾的斜杠"""
|
|
if url.endswith("/") and len(url) > 1:
|
|
return url[:-1]
|
|
return url
|
|
|
|
|
|
class FileHelper:
|
|
"""文件操作辅助类"""
|
|
|
|
@staticmethod
|
|
def read_json(filepath: str) -> Dict[str, Any]:
|
|
"""读取JSON文件"""
|
|
with open(filepath, 'r', encoding='utf-8') as f:
|
|
return json.load(f)
|
|
|
|
@staticmethod
|
|
def write_json(filepath: str, data: Any, indent: int = 2) -> None:
|
|
"""写入JSON文件"""
|
|
Path(filepath).parent.mkdir(parents=True, exist_ok=True)
|
|
with open(filepath, 'w', encoding='utf-8') as f:
|
|
json.dump(data, f, ensure_ascii=False, indent=indent)
|
|
|
|
@staticmethod
|
|
def read_text(filepath: str) -> str:
|
|
"""读取文本文件"""
|
|
with open(filepath, 'r', encoding='utf-8') as f:
|
|
return f.read()
|
|
|
|
@staticmethod
|
|
def write_text(filepath: str, content: str) -> None:
|
|
"""写入文本文件"""
|
|
Path(filepath).parent.mkdir(parents=True, exist_ok=True)
|
|
with open(filepath, 'w', encoding='utf-8') as f:
|
|
f.write(content)
|
|
|
|
@staticmethod
|
|
def create_directory(path: str) -> None:
|
|
"""创建目录"""
|
|
Path(path).mkdir(parents=True, exist_ok=True)
|
|
|
|
@staticmethod
|
|
def delete_directory(path: str) -> None:
|
|
"""删除目录"""
|
|
import shutil
|
|
if Path(path).exists():
|
|
shutil.rmtree(path)
|
|
|
|
@staticmethod
|
|
def cleanup_directory(path: str, pattern: str = "*") -> None:
|
|
"""清理目录中的文件"""
|
|
import glob
|
|
files = glob.glob(str(Path(path) / pattern))
|
|
for file in files:
|
|
try:
|
|
os.remove(file)
|
|
except IsADirectoryError:
|
|
FileHelper.delete_directory(file)
|
|
|
|
|
|
def wait(
|
|
condition: Callable,
|
|
timeout: int = 10000,
|
|
interval: int = 500,
|
|
message: str = "等待条件超时"
|
|
) -> bool:
|
|
"""等待条件满足"""
|
|
start_time = time.time() * 1000
|
|
|
|
while time.time() * 1000 - start_time < timeout:
|
|
if condition():
|
|
return True
|
|
time.sleep(interval / 1000)
|
|
|
|
raise TimeoutError(message)
|
|
|
|
|
|
def retry(
|
|
func: Callable,
|
|
max_retries: int = 3,
|
|
delay: int = 1000,
|
|
exceptions: Tuple[Exception, ...] = (Exception,)
|
|
) -> Any:
|
|
"""重试函数"""
|
|
last_exception = None
|
|
|
|
for attempt in range(max_retries + 1):
|
|
try:
|
|
return func()
|
|
except exceptions as e:
|
|
last_exception = e
|
|
if attempt < max_retries:
|
|
time.sleep(delay / 1000)
|
|
|
|
raise last_exception
|
|
|
|
|
|
def generate_test_id(prefix: str = "test") -> str:
|
|
"""生成测试ID"""
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
random_hash = hashlib.md5(f"{timestamp}_{os.getpid()}".encode()).hexdigest()[:8]
|
|
return f"{prefix}_{timestamp}_{random_hash}"
|
|
|
|
|
|
@lru_cache(maxsize=128)
|
|
def normalize_text(text: str) -> str:
|
|
"""标准化文本(去除多余空白)"""
|
|
return re.sub(r'\s+', ' ', text).strip()
|