Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion app_gradio/demo_gradio.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ def create_config_dict(
"viewport": {"width": 1280, "height": 720},
"headless": True,
"language": "zh-CN",
"cookies": []
"cookies": [],
"save_screenshots": False # Always not save screenshots in Gradio demo
}
}

Expand Down Expand Up @@ -273,6 +274,11 @@ def build_test_configurations(config: Dict[str, Any]) -> list:
async def run_webqa_test(config: Dict[str, Any], lang: str = "zh-CN") -> Tuple[Optional[str], Optional[str], Optional[str]]:
"""Run WebQA test"""
try:
# Configure screenshot saving behavior
from webqa_agent.actions.action_handler import ActionHandler
save_screenshots = config.get("browser_config", {}).get("save_screenshots", False)
ActionHandler.set_screenshot_config(save_screenshots=save_screenshots)

# Validate LLM configuration
llm_config = {
"api": "openai",
Expand Down
1 change: 1 addition & 0 deletions config/config.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ browser_config:
headless: False # Docker environment will automatically override to True
language: zh-CN
cookies: []
save_screenshots: False # Whether to save screenshots to local disk (default: False)

report:
language: en-US # zh-CN, en-US
Expand Down
9 changes: 9 additions & 0 deletions webqa-agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,15 @@ async def run_tests(cfg):
if is_docker:
print("🐳 Docker mode: automatically enable headless browser")

# 0.1. Configure screenshot saving behavior
from webqa_agent.actions.action_handler import ActionHandler
save_screenshots = cfg.get("browser_config", {}).get("save_screenshots", False)
ActionHandler.set_screenshot_config(save_screenshots=save_screenshots)
if not save_screenshots:
print("📸 Screenshot saving: disabled (screenshots will be captured but not saved to disk)")
else:
print("📸 Screenshot saving: enabled")

# 1. Check required tools based on configuration
tconf = cfg.get("test_config", {})

Expand Down
180 changes: 142 additions & 38 deletions webqa_agent/actions/action_handler.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import base64
import datetime
import json
import os
import re
from contextvars import ContextVar
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

from playwright.async_api import Page
Expand Down Expand Up @@ -64,6 +66,40 @@ def reset(self):


class ActionHandler:
# Session management for screenshot organization
_screenshot_session_dir: Optional[Path] = None
_screenshot_session_timestamp: Optional[str] = None
_save_screenshots: bool = False # Default: not save screenshots to disk

@classmethod
def set_screenshot_config(cls, save_screenshots: bool = False):
    """Record whether captured screenshots should also be written to disk.

    The flag is stored on the class itself, so it applies to every
    handler instance that reads ``_save_screenshots``.

    Args:
        save_screenshots: True to persist screenshots to local disk,
            False (the default) to keep them in memory only.
    """
    message = f"Screenshot saving config set to: {save_screenshots}"
    setattr(cls, "_save_screenshots", save_screenshots)
    logging.debug(message)

@classmethod
def init_screenshot_session(cls) -> Path:
    """Return the screenshot directory for this run, creating it on first use.

    On the first call a directory named after the current local time
    (``YYYYmmdd_HHMMSS``) is created under ``crawler/screenshots``, resolved
    relative to the parent of this module's directory. The path and its
    timestamp are cached on the class, so later calls return the same
    directory without touching the filesystem.

    Returns:
        Path: The session directory path.
    """
    cached_dir = cls._screenshot_session_dir
    if cached_dir is not None:
        return cached_dir

    stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    screenshots_root = Path(__file__).parent.parent / "crawler" / "screenshots"

    # Cache before creating the directory, matching the established order.
    cls._screenshot_session_dir = screenshots_root / stamp
    cls._screenshot_session_timestamp = stamp
    cls._screenshot_session_dir.mkdir(parents=True, exist_ok=True)
    logging.info(f"Initialized screenshot session directory: {cls._screenshot_session_dir}")
    return cls._screenshot_session_dir

def __init__(self):
self.page_data = {}
self.page_element_buffer = {} # page element buffer
Expand Down Expand Up @@ -348,12 +384,18 @@ async def ensure_element_in_viewport(self, element_id: str, max_retries: int = 3
"""
# Get current active page
page = self._get_current_page()

# Initialize action context for error propagation
ctx = ActionContext()
action_context_var.set(ctx)

# Get existing context or create new one (preserves parent context)
ctx = action_context_var.get()
if ctx is None:
ctx = ActionContext()
action_context_var.set(ctx)

# Update scroll-specific context info
ctx.max_scroll_attempts = max_retries
ctx.element_info = {"element_id": element_id, "action": "ensure_viewport"}
# Only set element_info if not already set by parent method
if not ctx.element_info.get("element_id"):
ctx.element_info = {"element_id": element_id, "action": "ensure_viewport"}

element = self.page_element_buffer.get(str(element_id))
if not element:
Expand Down Expand Up @@ -924,9 +966,11 @@ async def wait(self, timeMs) -> bool:
async def type(self, id, text, clear_before_type: bool = False) -> bool:
"""Types text into the specified element, optionally clearing it
first."""
# Initialize action context for error propagation
ctx = ActionContext()
action_context_var.set(ctx)
# Get existing context or create new one (preserves context from helpers)
ctx = action_context_var.get()
if ctx is None:
ctx = ActionContext()
action_context_var.set(ctx)
ctx.element_info = {"element_id": str(id), "action": "type", "text_length": len(text), "clear_before_type": clear_before_type}

try:
Expand Down Expand Up @@ -1071,8 +1115,12 @@ async def _fill_element_text(
"""
# Get current active page
page = self._get_current_page()


# Get existing context or create new one if none exists
ctx = action_context_var.get()
if ctx is None:
ctx = ActionContext()
action_context_var.set(ctx)

# Strategy 1: Try CSS selector if format is valid
if self._is_valid_css_selector(selector):
Expand Down Expand Up @@ -1154,9 +1202,11 @@ async def _fill_element_text(

async def clear(self, id) -> bool:
"""Clears the text in the specified input element."""
# Initialize action context for error propagation
ctx = ActionContext()
action_context_var.set(ctx)
# Get existing context or create new one (preserves context from helpers)
ctx = action_context_var.get()
if ctx is None:
ctx = ActionContext()
action_context_var.set(ctx)
ctx.element_info = {"element_id": str(id), "action": "clear"}

try:
Expand Down Expand Up @@ -1236,27 +1286,67 @@ async def keyboard_press(self, key) -> bool:
)
return False

async def b64_page_screenshot(
    self,
    full_page: bool = False,
    file_name: Optional[str] = None,
    context: str = 'default'
) -> Optional[str]:
    """Capture the current page as a Base64 data URI, optionally saving a PNG.

    Args:
        full_page: Whether to capture the whole page instead of the viewport.
        file_name: Descriptive screenshot name (e.g. "marker",
            "action_click_button"). Falls back to "screenshot" when empty.
        context: Test context category (e.g. 'test', 'agent', 'scroll',
            'error'); it becomes part of the saved file name.

    Returns:
        The screenshot as a ``data:image/png;base64,...`` string, or None if
        capturing fails for any reason (the failure is logged as a warning).

    Note:
        The Base64 string is always returned. A file path is only built and
        handed to ``take_screenshot`` when ``_save_screenshots`` is true.
    """
    try:
        # Get current active page (dynamically resolves to latest page)
        current_page = self._get_current_page()
        timeout = 90000 if full_page else 60000  # 90s for full page, 60s for viewport

        # Prepare file path only if saving is enabled
        file_path_str = None
        if self._save_screenshots:
            # Initialize session directory if needed
            session_dir = self.init_screenshot_session()

            # Microsecond resolution keeps two screenshots taken within the
            # same second (same context and name) from overwriting each other.
            timestamp = datetime.datetime.now().strftime("%H%M%S_%f")

            # Path separators and characters that are illegal in file names
            # are replaced so the file always lands inside session_dir.
            unsafe_chars = r'[\\/:*?"<>|\x00-\x1f]+'
            safe_context = re.sub(unsafe_chars, '_', str(context))
            safe_name = re.sub(unsafe_chars, '_', str(file_name)) if file_name else 'screenshot'

            # Build filename: {timestamp}_{context}_{file_name}.png
            filename = f"{timestamp}_{safe_context}_{safe_name}.png"
            file_path_str = str(session_dir / filename)

        # Capture screenshot (with or without file saving based on config)
        screenshot_bytes = await self.take_screenshot(
            current_page,
            full_page=full_page,
            file_path=file_path_str,
            timeout=timeout
        )

        # Convert to Base64 for HTML reports
        screenshot_base64 = base64.b64encode(screenshot_bytes).decode('utf-8')
        base64_data = f'data:image/png;base64,{screenshot_base64}'

        if self._save_screenshots and file_path_str:
            logging.debug(f"Screenshot saved to {file_path_str}")
        else:
            logging.debug("Screenshot captured (not saved to disk)")

        return base64_data

    except Exception as e:
        # Best-effort by design: a failed screenshot must not abort the test run.
        logging.warning(f"Failed to capture screenshot: {e}")
        return None
Expand All @@ -1273,32 +1363,46 @@ async def take_screenshot(
Args:
page: page object
full_page: whether to capture the whole page
file_path: screenshot save path (only used for direct saving, not recommended in test flow)
timeout: timeout
file_path: screenshot save path (only used when save_screenshots=True)
timeout: timeout (milliseconds)

Returns:
bytes: screenshot binary data

Note:
If save_screenshots is False, the screenshot will not be saved to disk
regardless of the file_path parameter. The method always returns the
screenshot bytes for in-memory use (e.g., Base64 encoding).
"""
try:
# Shortened and more lenient load state check
# Note: page.screenshot() already waits for fonts and basic rendering internally
try:
await page.wait_for_load_state(timeout=60000)
await page.wait_for_load_state('domcontentloaded', timeout=10000)
except Exception as e:
logging.warning(f'wait_for_load_state before screenshot failed: {e}; attempting screenshot anyway')
logging.debug('Page is fully loaded or skipped wait; taking screenshot')

# Directly capture screenshot as binary data
if file_path:
screenshot: bytes = await page.screenshot(
path=file_path,
full_page=full_page,
timeout=timeout,
)
else:
screenshot: bytes = await page.screenshot(
full_page=full_page,
timeout=timeout,
)
logging.debug(f'Load state check: {e}; proceeding with screenshot')

logging.debug(f'Taking screenshot (full_page={full_page}, save={self._save_screenshots}, timeout={timeout}ms)')

# Prepare screenshot options with Playwright best practices
screenshot_options = {
'full_page': full_page,
'timeout': timeout,
'animations': 'disabled', # Skip waiting for CSS animations/transitions (Playwright 1.25+)
'caret': 'hide', # Hide text input cursor for cleaner screenshots
}

# Only save to disk if _save_screenshots is True and file_path is provided
if self._save_screenshots and file_path:
screenshot_options['path'] = file_path
logging.debug(f'Screenshot will be saved to: {file_path}')
elif not self._save_screenshots:
logging.debug('Screenshot saving disabled, returning bytes only')

# Capture screenshot with optimized options
screenshot: bytes = await page.screenshot(**screenshot_options)

logging.debug(f'Screenshot captured successfully ({len(screenshot)} bytes)')
return screenshot

except Exception as e:
Expand Down
6 changes: 4 additions & 2 deletions webqa_agent/actions/click_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ def handle_new_page(page_obj):
new_page_action_handler = ActionHandler()
new_page_action_handler.page = new_page
screenshot_b64 = await new_page_action_handler.b64_page_screenshot(
file_name=f"element_{element_index}_new_page"
file_name=f"element_{element_index}_new_page",
context="test"
)
click_result["new_page_screenshot"] = screenshot_b64
logging.debug("New page screenshot saved")
Expand All @@ -135,7 +136,8 @@ def handle_new_page(page_obj):
await page.wait_for_load_state("networkidle", timeout=30000)
else:
screenshot_b64 = await action_handler.b64_page_screenshot(
file_name=f"element_{element_index}_after_click"
file_name=f"element_{element_index}_after_click",
context="test"
)
click_result["screenshot_after"] = screenshot_b64
logging.debug("After click screenshot saved")
Expand Down
Loading