Files
ragflow/test/playwright/conftest.py
Idriss Sbaaoui 2f4ca38adf Fix : make playwright tests idempotent (#13332)
### What problem does this PR solve?

Playwright tests previously depended on cross-file execution order
(`auth -> provider -> dataset -> chat`).
This change makes setup explicit and idempotent via fixtures so tests
can run independently.

- Added/standardized prerequisite fixtures in
`test/playwright/conftest.py`:
- `ensure_auth_context`, `ensure_model_provider_configured`,
`ensure_dataset_ready`, `ensure_chat_ready`
- Made provisioning reusable/idempotent with `RUN_ID`-based resource
naming.
- Synced auth envs (`E2E_ADMIN_EMAIL`, `E2E_ADMIN_PASSWORD`) into seeded
creds.
- Fixed provider cache freshness (`auth_header`/`page` refresh on cache
hit).

Also included minimal stability fixes:
- dataset create stale-element click handling,
- search wait logic for results/empty-state,
- agent create-menu handling,
- agent run-step retry when run UI doesn’t open first click.

### Type of change

- [x] Test fix
- [x] Refactoring

---------

Co-authored-by: Liu An <asiro@qq.com>
2026-03-04 10:07:14 +08:00

1717 lines
60 KiB
Python

import sys
from pathlib import Path
_PW_DIR = Path(__file__).resolve().parent
if str(_PW_DIR) not in sys.path:
sys.path.insert(0, str(_PW_DIR))
import base64
import faulthandler
import json
import os
import re
import secrets
import signal
import time
from contextlib import contextmanager
from pathlib import Path
from urllib.error import HTTPError, URLError
from urllib.parse import urljoin
from urllib.request import Request, urlopen
import pytest
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
from playwright.sync_api import expect, sync_playwright
ROOT_DIR = Path(__file__).resolve().parents[2]
PLAYWRIGHT_TEST_DIR = Path(__file__).resolve().parent
ARTIFACTS_DIR = Path(__file__).resolve().parent / "artifacts"
BASE_URL_DEFAULT = "http://127.0.0.1"
LOGIN_PATH_DEFAULT = "/login"
DEFAULT_TIMEOUT_MS = 30000
DEFAULT_HANG_TIMEOUT_S = 1800
AUTH_READY_TIMEOUT_MS_DEFAULT = 15000
REG_EMAIL_BASE_DEFAULT = "qa@infiniflow.org"
REG_NICKNAME_DEFAULT = "qa"
REG_PASSWORD_DEFAULT = "123"
REG_EMAIL_LOCAL_RE = re.compile(r"^[A-Za-z0-9_.-]+$")
REG_EMAIL_BACKEND_RE = re.compile(r"^[\w\._-]{1,}@([\w_-]+\.)+[\w-]{2,}$")
AUTH_FORM_SELECTOR = "form[data-testid='auth-form']"
AUTH_ACTIVE_FORM_SELECTOR = "form[data-testid='auth-form'][data-active='true']"
AUTH_EMAIL_INPUT_SELECTOR = (
"input[data-testid='auth-email'], [data-testid='auth-email'] input"
)
AUTH_PASSWORD_INPUT_SELECTOR = (
"input[data-testid='auth-password'], [data-testid='auth-password'] input"
)
AUTH_SUBMIT_SELECTOR = (
"button[data-testid='auth-submit'], [data-testid='auth-submit'] button, [data-testid='auth-submit']"
)
_PUBLIC_KEY_CACHE = None
_RSA_CIPHER_CACHE = None
_HANG_WATCHDOG_INSTALLED = False
_PROVIDER_READY_CACHE: dict[str, dict] = {}
_DATASET_READY_CACHE: dict[str, dict] = {}
class _RegisterDisabled(RuntimeError):
pass
def _env_bool(name: str, default: bool = False) -> bool:
value = os.getenv(name)
if value is None:
return default
return value.strip().lower() in {"1", "true", "yes", "on"}
def _env_int(name: str, default: int) -> int:
value = os.getenv(name)
if not value:
return default
try:
return int(value)
except ValueError:
return default
def _env_int_with_fallback(primary: str, fallback: str | None, default: int) -> int:
value = os.getenv(primary)
if not value and fallback:
value = os.getenv(fallback)
if not value:
return default
try:
return int(value)
except ValueError:
return default
def _sync_seeded_credentials_from_admin_env() -> None:
admin_email = os.getenv("E2E_ADMIN_EMAIL")
admin_password = os.getenv("E2E_ADMIN_PASSWORD")
if admin_email and not os.getenv("SEEDED_USER_EMAIL"):
os.environ["SEEDED_USER_EMAIL"] = admin_email
if admin_password and not os.getenv("SEEDED_USER_PASSWORD"):
os.environ["SEEDED_USER_PASSWORD"] = admin_password
def _sanitize_timeout_ms(value: int | None, fallback: int | None) -> int | None:
if value is None or value <= 0:
return fallback
return value
def _playwright_action_timeout_ms() -> int | None:
raw = _env_int_with_fallback(
"PLAYWRIGHT_ACTION_TIMEOUT_MS", "PW_TIMEOUT_MS", DEFAULT_TIMEOUT_MS
)
return _sanitize_timeout_ms(raw, DEFAULT_TIMEOUT_MS)
def _playwright_auth_ready_timeout_ms() -> int | None:
raw = _env_int_with_fallback(
"PLAYWRIGHT_AUTH_READY_TIMEOUT_MS",
"AUTH_READY_TIMEOUT_MS",
AUTH_READY_TIMEOUT_MS_DEFAULT,
)
return _sanitize_timeout_ms(raw, AUTH_READY_TIMEOUT_MS_DEFAULT)
def _playwright_hang_timeout_s() -> int:
raw = _env_int_with_fallback(
"PLAYWRIGHT_HANG_TIMEOUT_S", "HANG_TIMEOUT_S", DEFAULT_HANG_TIMEOUT_S
)
return raw if raw > 0 else 0
def _failure_text(req) -> str:
failure = getattr(req, "failure", None)
if callable(failure):
try:
failure = failure()
except Exception:
return "unknown"
if failure is None:
return "unknown"
if isinstance(failure, str):
return failure or "unknown"
try:
error_text = getattr(failure, "error_text", None)
if error_text:
return str(error_text)
except Exception:
pass
try:
if isinstance(failure, dict):
for key in ("errorText", "error_text"):
value = failure.get(key)
if value:
return str(value)
except Exception:
pass
try:
getter = getattr(failure, "get", None)
if callable(getter):
for key in ("errorText", "error_text"):
value = getter(key)
if value:
return str(value)
except Exception:
pass
try:
return str(failure)
except Exception:
return "unknown"
def _build_url(base_url: str, path: str) -> str:
if not base_url:
return path
base = base_url.rstrip("/") + "/"
return urljoin(base, path.lstrip("/"))
def _sanitize_filename(value: str) -> str:
return re.sub(r"[^A-Za-z0-9_.-]+", "_", value).strip("_")
def _request_test_file(request) -> Path | None:
node = getattr(request, "node", None)
if node is None:
return None
node_path = getattr(node, "path", None)
if node_path is not None:
return Path(str(node_path))
fspath = getattr(node, "fspath", None)
if fspath is not None:
return Path(str(fspath))
nodeid = getattr(node, "nodeid", "")
if nodeid:
return Path(nodeid.split("::", 1)[0])
return None
def _request_artifacts_dir(request) -> Path:
test_file = _request_test_file(request)
if test_file is None:
base_dir = ARTIFACTS_DIR / "unknown"
base_dir.mkdir(parents=True, exist_ok=True)
return base_dir
try:
rel_path = test_file.resolve().relative_to(PLAYWRIGHT_TEST_DIR.resolve())
base_dir = ARTIFACTS_DIR / rel_path.with_suffix("")
except Exception:
file_stem = _sanitize_filename(test_file.stem or str(test_file))
base_dir = ARTIFACTS_DIR / (file_stem or "unknown")
base_dir.mkdir(parents=True, exist_ok=True)
return base_dir
def _request_artifact_prefix(request) -> str:
node = getattr(request, "node", None)
node_name = getattr(node, "name", "") if node is not None else ""
safe_name = _sanitize_filename(node_name)
if safe_name:
return safe_name
nodeid = getattr(node, "nodeid", "") if node is not None else ""
fallback = _sanitize_filename(nodeid)
return fallback or "node"
def _split_email_base(value: str) -> tuple[str, str]:
if value.count("@") != 1:
raise ValueError("REG_EMAIL_BASE must be a single email address")
local, domain = value.split("@", 1)
if not local or not domain:
raise ValueError("REG_EMAIL_BASE must include local part and domain")
return local, domain
def _unique_email(base: str, suffix: str) -> str:
local, domain = _split_email_base(base)
if "+" in local:
local = local.split("+", 1)[0]
return f"{local}_{suffix}@{domain}"
def _assert_reg_email(email: str) -> None:
if "+" in email:
raise AssertionError(f"Registration email contains '+': {email}")
try:
local, _ = _split_email_base(email)
except ValueError as exc:
raise AssertionError(f"Registration email is invalid: {email}") from exc
if not REG_EMAIL_LOCAL_RE.match(local):
raise AssertionError(f"Registration email local part invalid: {email}")
if not REG_EMAIL_BACKEND_RE.match(email):
raise AssertionError(f"Registration email fails backend regex: {email}")
def _api_post_json(url: str, payload: dict, timeout_s: int = 10) -> tuple[int, dict | None]:
data = json.dumps(payload).encode("utf-8")
req = Request(
url,
data=data,
headers={"Content-Type": "application/json"},
method="POST",
)
try:
with urlopen(req, timeout=timeout_s) as resp:
body = resp.read()
if body:
try:
return resp.status, json.loads(body.decode("utf-8"))
except Exception:
return resp.status, None
return resp.status, None
except HTTPError as exc:
body = exc.read()
parsed = None
if body:
try:
parsed = json.loads(body.decode("utf-8"))
except Exception:
parsed = None
raise RuntimeError(f"HTTPError {exc.code}: {parsed or body!r}") from exc
except URLError as exc:
raise RuntimeError(f"URLError: {exc}") from exc
def _api_request_json(
url: str,
method: str = "GET",
payload: dict | None = None,
headers: dict | None = None,
timeout_s: int = 10,
) -> tuple[int, dict | None]:
data = None
if payload is not None:
data = json.dumps(payload).encode("utf-8")
req_headers = {"Content-Type": "application/json"}
if headers:
req_headers.update(headers)
req = Request(url, data=data, headers=req_headers, method=method)
try:
with urlopen(req, timeout=timeout_s) as resp:
body = resp.read()
if body:
try:
return resp.status, json.loads(body.decode("utf-8"))
except Exception:
return resp.status, None
return resp.status, None
except HTTPError as exc:
body = exc.read()
parsed = None
if body:
try:
parsed = json.loads(body.decode("utf-8"))
except Exception:
parsed = None
raise RuntimeError(
f"{method} {url} failed with HTTPError {exc.code}: {parsed or body!r}"
) from exc
except URLError as exc:
raise RuntimeError(f"{method} {url} failed with URLError: {exc}") from exc
def _response_data(payload: dict | None) -> dict:
if not isinstance(payload, dict):
return {}
if payload.get("code") not in (0, None):
raise RuntimeError(f"API returned failure payload: {payload}")
data = payload.get("data")
return data if isinstance(data, dict) else {}
def _extract_auth_header_from_page(page) -> str:
token = page.evaluate(
"""
() => {
const auth = localStorage.getItem('Authorization');
if (auth && auth.length) return auth;
const token = localStorage.getItem('Token');
if (token && token.length) return token;
return '';
}
"""
)
if not token:
raise AssertionError(
"Missing Authorization/Token in localStorage after login. "
"Cannot provision prerequisites via API."
)
return str(token)
def _rsa_encrypt_password(password: str) -> str:
global _PUBLIC_KEY_CACHE
global _RSA_CIPHER_CACHE
try:
from Cryptodome.PublicKey import RSA
from Cryptodome.Cipher import PKCS1_v1_5 as Cipher_pkcs1_v1_5
except Exception as exc:
raise RuntimeError(
"Cryptodome is required to encrypt passwords for API seeding. "
"Set RAGFLOW_SEEDING_MODE=ui to skip API seeding."
) from exc
if _PUBLIC_KEY_CACHE is None:
public_key_path = ROOT_DIR / "conf" / "public.pem"
if not public_key_path.exists():
raise RuntimeError(f"Missing RSA public key at {public_key_path}")
_PUBLIC_KEY_CACHE = public_key_path.read_text(encoding="utf-8")
if _RSA_CIPHER_CACHE is None:
rsa_key = RSA.importKey(_PUBLIC_KEY_CACHE, "Welcome")
_RSA_CIPHER_CACHE = Cipher_pkcs1_v1_5.new(rsa_key)
password_base64 = base64.b64encode(password.encode("utf-8")).decode("utf-8")
encrypted_password = _RSA_CIPHER_CACHE.encrypt(password_base64.encode("utf-8"))
return base64.b64encode(encrypted_password).decode("utf-8")
def _is_register_disabled_message(message: str) -> bool:
lowered = (message or "").lower()
return "registration is disabled" in lowered or "register disabled" in lowered
def _api_register_user(base_url: str, email: str, password: str, nickname: str) -> None:
url = _build_url(base_url, "/v1/user/register")
encrypted_password = _rsa_encrypt_password(password)
status, payload = _api_post_json(
url,
{"email": email, "password": encrypted_password, "nickname": nickname},
timeout_s=10,
)
if status >= 400:
raise RuntimeError(f"register failed status={status}")
if isinstance(payload, dict) and payload.get("code") not in (0, None):
message = str(payload.get("message") or payload)
if _is_register_disabled_message(message):
raise _RegisterDisabled(message)
raise RuntimeError(f"register failed payload={payload}")
def _api_login_user(base_url: str, email: str, password: str) -> None:
url = _build_url(base_url, "/v1/user/login")
encrypted_password = _rsa_encrypt_password(password)
status, payload = _api_post_json(
url,
{"email": email, "password": encrypted_password},
timeout_s=10,
)
if status >= 400:
raise RuntimeError(f"login failed status={status}")
if isinstance(payload, dict) and payload.get("code") not in (0, None):
raise RuntimeError(f"login failed payload={payload}")
def _generate_seeded_email(base_email: str) -> str:
local, domain = _split_email_base(base_email)
if "+" in local:
local = local.split("+", 1)[0]
suffix = f"{int(time.time() * 1000)}_{secrets.token_hex(3)}"
return f"{local}_{suffix}@{domain}"
def _auth_form_locator(card, require_nickname: bool = False):
form = card.locator("form[data-testid='auth-form']")
form = form.filter(has=card.locator("[data-testid='auth-email']"))
form = form.filter(has=card.locator("[data-testid='auth-submit']"))
if require_nickname:
form = form.filter(has=card.locator("[data-testid='auth-nickname']"))
return form
def _describe_auth_ui(page, card, register_toggle) -> str:
lines = []
if card is None:
lines.append("auth_card_count=unavailable")
else:
try:
lines.append(f"auth_card_count={card.count()}")
except Exception as exc:
lines.append(f"auth_card_count_error={exc}")
if register_toggle is None:
lines.append("register_toggle_count=unavailable")
else:
try:
toggle_count = register_toggle.count()
toggle_visible = False
if toggle_count:
try:
toggle_visible = register_toggle.first.is_visible()
except Exception:
toggle_visible = False
lines.append(f"register_toggle_count={toggle_count}")
lines.append(f"register_toggle_visible={toggle_visible}")
except Exception as exc:
lines.append(f"register_toggle_error={exc}")
try:
summary = _auth_ready_summary(page)
lines.append(_format_auth_ready_summary(summary).strip())
except Exception as exc:
lines.append(f"auth_summary_error={exc}")
return "\n".join(line for line in lines if line)
def _wait_for_auth_success(page, card, form) -> None:
timeout_ms = _playwright_auth_ready_timeout_ms()
status_marker = page.locator("[data-testid='auth-status']")
if status_marker.count() > 0:
try:
expect(status_marker).to_have_attribute(
"data-state", "success", timeout=timeout_ms
)
return
except AssertionError:
pass
try:
page.wait_for_function(
"() => Boolean(localStorage.getItem('token') || localStorage.getItem('Authorization'))",
timeout=timeout_ms,
)
return
except PlaywrightTimeoutError:
pass
try:
expect(card.locator("[data-testid='auth-nickname']")).to_have_count(
0, timeout=timeout_ms
)
except AssertionError as exc:
raise RuntimeError(
"Auth success marker not detected after registration."
) from exc
def _ui_register_user(
browser,
login_url: str,
email: str,
password: str,
nickname: str,
) -> None:
context_instance = browser.new_context(ignore_https_errors=True)
page = _configure_page(context_instance.new_page())
card = None
register_toggle = None
try:
page.goto(login_url, wait_until="domcontentloaded")
timeout_ms = _playwright_auth_ready_timeout_ms()
card = page.locator("[data-testid='auth-card-active']")
expect(card).to_have_count(1, timeout=timeout_ms)
register_toggle = card.locator("[data-testid='auth-toggle-register']")
if register_toggle.count() == 0:
raise _RegisterDisabled("Register toggle not found; registration disabled?")
register_toggle.first.click()
register_form = _auth_form_locator(card, require_nickname=True)
expect(register_form).to_have_count(1, timeout=timeout_ms)
nickname_input = register_form.locator("[data-testid='auth-nickname']")
email_input = register_form.locator("[data-testid='auth-email']")
password_input = register_form.locator("[data-testid='auth-password']")
expect(nickname_input).to_have_count(1, timeout=timeout_ms)
expect(email_input).to_have_count(1, timeout=timeout_ms)
expect(password_input).to_have_count(1, timeout=timeout_ms)
nickname_input.fill(nickname)
email_input.fill(email)
password_input.fill(password)
password_input.blur()
submit_button = register_form.locator(AUTH_SUBMIT_SELECTOR)
expect(submit_button).to_have_count(1, timeout=timeout_ms)
submit_button.click()
_wait_for_auth_success(page, card, register_form)
except _RegisterDisabled:
raise
except Exception as _:
diagnostics = _describe_auth_ui(page, card, register_toggle)
if diagnostics:
print(f"[seeded-ui-register] diagnostics:\n{diagnostics}", flush=True)
raise
finally:
try:
page.close()
finally:
context_instance.close()
def _make_reg_email(base: str, unique: bool) -> str:
if not unique:
email = base
else:
suffix = f"{int(time.time() * 1000)}_{os.getpid()}_{secrets.randbelow(1000000)}"
email = _unique_email(base, suffix)
_assert_reg_email(email)
return email
@contextmanager
def _step(label: str, enabled: bool) -> None:
start = time.perf_counter()
if enabled:
print(f"[STEP] {label}", flush=True)
try:
yield
finally:
if enabled:
elapsed = time.perf_counter() - start
print(f"[STEP] done in {elapsed:.2f}s: {label}", flush=True)
@pytest.hookimpl(hookwrapper=True)
def pytest_runtest_makereport(item, call):
outcome = yield
report = outcome.get_result()
setattr(item, f"_rep_{report.when}", report)
def pytest_sessionstart(session):
ARTIFACTS_DIR.mkdir(parents=True, exist_ok=True)
faulthandler.enable()
global _HANG_WATCHDOG_INSTALLED
hang_timeout = _playwright_hang_timeout_s()
if hang_timeout > 0:
if not _HANG_WATCHDOG_INSTALLED:
faulthandler.dump_traceback_later(hang_timeout, repeat=True)
_HANG_WATCHDOG_INSTALLED = True
print(
"Playwright hang watchdog enabled: dumps after "
f"{hang_timeout}s (set PLAYWRIGHT_HANG_TIMEOUT_S=0 to disable)",
flush=True,
)
else:
print(
"Playwright hang watchdog disabled (PLAYWRIGHT_HANG_TIMEOUT_S=0)",
flush=True,
)
try:
faulthandler.register(signal.SIGUSR1, all_threads=True)
except (AttributeError, ValueError):
pass
def pytest_sessionfinish(session, exitstatus):
try:
faulthandler.cancel_dump_traceback_later()
except Exception:
pass
def pytest_collection_modifyitems(session, config, items):
ordered_paths = [
"test/playwright/auth/test_smoke_auth_page.py",
"test/playwright/auth/test_toggle_login_register.py",
"test/playwright/auth/test_validation_presence.py",
"test/playwright/auth/test_sso_optional.py",
"test/playwright/auth/test_register_success_optional.py",
"test/playwright/auth/test_login_success_optional.py",
"test/playwright/e2e/test_model_providers_zhipu_ai_defaults.py",
"test/playwright/e2e/test_dataset_upload_parse.py",
"test/playwright/e2e/test_next_apps_chat.py",
"test/playwright/e2e/test_next_apps_search.py",
"test/playwright/e2e/test_next_apps_agent.py",
]
order_map = {path: idx for idx, path in enumerate(ordered_paths)}
def _rel_path(item) -> str:
try:
return Path(str(item.fspath)).resolve().relative_to(ROOT_DIR).as_posix()
except Exception:
return str(item.fspath)
indexed = list(enumerate(items))
def _sort_key(entry):
orig_idx, item = entry
rel_path = _rel_path(item)
order_idx = order_map.get(rel_path)
if order_idx is not None:
return (0, order_idx, orig_idx)
return (1, rel_path, item.name, orig_idx)
items[:] = [item for _, item in sorted(indexed, key=_sort_key)]
@pytest.fixture(scope="session")
def base_url() -> str:
value = os.getenv("RAGFLOW_BASE_URL") or os.getenv("BASE_URL")
if not value:
value = BASE_URL_DEFAULT
return value.rstrip("/")
@pytest.fixture(scope="session")
def login_path() -> str:
value = os.getenv("LOGIN_PATH")
if not value:
value = LOGIN_PATH_DEFAULT
if not value.startswith("/"):
value = "/" + value
return value
@pytest.fixture(scope="session")
def login_url(base_url: str, login_path: str) -> str:
return _build_url(base_url, login_path)
@pytest.fixture(scope="session")
def smoke_login_url(login_url: str) -> str:
return login_url
@pytest.fixture(scope="session")
def browser():
browser_name = os.getenv("PW_BROWSER", "chromium")
headless = _env_bool("PW_HEADLESS", True)
slow_mo = _env_int("PW_SLOWMO_MS", 0)
with sync_playwright() as playwright:
if not hasattr(playwright, browser_name):
raise ValueError(f"Unsupported browser: {browser_name}")
browser_type = getattr(playwright, browser_name)
browser_instance = browser_type.launch(headless=headless, slow_mo=slow_mo)
try:
yield browser_instance
finally:
browser_instance.close()
@pytest.fixture
def context(browser):
context_instance = browser.new_context(ignore_https_errors=True)
trace_enabled = _env_bool("PW_TRACE", False)
if trace_enabled:
context_instance.tracing.start(screenshots=True, snapshots=True, sources=True)
context_instance._trace_enabled = True
context_instance._trace_saved = False
try:
yield context_instance
finally:
if getattr(context_instance, "_trace_enabled", False) and not getattr(
context_instance, "_trace_saved", False
):
try:
context_instance.tracing.stop()
except Exception:
pass
context_instance.close()
def _configure_page(page_instance):
timeout_ms = _playwright_action_timeout_ms()
if timeout_ms is not None:
page_instance.set_default_timeout(timeout_ms)
page_instance.set_default_navigation_timeout(timeout_ms)
page_instance._diag = {
"console_errors": [],
"page_errors": [],
"request_failed": [],
}
net_log = _env_bool("PW_NET_LOG", False)
def on_console(msg):
if msg.type != "error":
return
entry = f"console[{msg.type}]: {msg.text}"
page_instance._diag["console_errors"].append(entry)
if net_log:
print(entry, flush=True)
def on_page_error(err):
entry = f"pageerror: {err}"
page_instance._diag["page_errors"].append(entry)
if net_log:
print(entry, flush=True)
def on_request_failed(req):
try:
failure_text = _failure_text(req)
entry = f"requestfailed: {req.method} {req.url} -> {failure_text}"
page_instance._diag["request_failed"].append(entry)
if net_log:
print(entry, flush=True)
except Exception as exc:
if net_log:
print(f"requestfailed: <handler_error> {exc}", flush=True)
return
page_instance.on("console", on_console)
page_instance.on("pageerror", on_page_error)
page_instance.on("requestfailed", on_request_failed)
return page_instance
@pytest.fixture
def page(context, request):
page_instance = _configure_page(context.new_page())
try:
yield page_instance
finally:
_write_artifacts_if_failed(page_instance, context, request)
page_instance.close()
@pytest.fixture(scope="module")
def flow_context(browser, request):
try:
browser_context_args = request.getfixturevalue("browser_context_args")
except Exception:
browser_context_args = {}
if browser_context_args is None:
browser_context_args = {}
args = dict(browser_context_args)
args.setdefault("ignore_https_errors", True)
ctx = browser.new_context(**args)
yield ctx
ctx.close()
@pytest.fixture(scope="module")
def flow_page(flow_context):
page_instance = _configure_page(flow_context.new_page())
yield page_instance
page_instance.close()
@pytest.fixture(scope="module")
def flow_state():
return {}
@pytest.fixture(autouse=True)
def _flow_artifacts(request):
if "flow_page" not in request.fixturenames:
yield
return
yield
try:
page_instance = request.getfixturevalue("flow_page")
context = request.getfixturevalue("flow_context")
except Exception:
return
_write_artifacts_if_failed(page_instance, context, request)
@pytest.fixture
def step():
enabled = _env_bool("PW_STEP_LOG", False)
def _stepper(label: str):
return _step(label, enabled)
return _stepper
@pytest.fixture
def reg_email_base() -> str:
return os.getenv("REG_EMAIL_BASE", REG_EMAIL_BASE_DEFAULT)
@pytest.fixture
def reg_email_unique() -> bool:
return _env_bool("REG_EMAIL_UNIQUE", False)
@pytest.fixture
def reg_email_generator(reg_email_base: str, reg_email_unique: bool):
def _generate(force_unique: bool = False) -> str:
unique = reg_email_unique or force_unique
return _make_reg_email(reg_email_base, unique)
return _generate
@pytest.fixture
def reg_email(reg_email_generator) -> str:
return reg_email_generator()
@pytest.fixture
def reg_password() -> str:
return REG_PASSWORD_DEFAULT
@pytest.fixture(scope="session")
def seeded_user_credentials(base_url: str, login_url: str, browser) -> tuple[str, str]:
_sync_seeded_credentials_from_admin_env()
env_email = os.getenv("SEEDED_USER_EMAIL")
env_password = os.getenv("SEEDED_USER_PASSWORD")
if env_email and env_password:
return env_email, env_password
seeding_mode = os.getenv("RAGFLOW_SEEDING_MODE", "auto").strip().lower()
if seeding_mode not in {"auto", "api", "ui"}:
if _env_bool("PW_FIXTURE_DEBUG", False):
print(
f"[seeded] Unknown RAGFLOW_SEEDING_MODE={seeding_mode!r}; using auto.",
flush=True,
)
seeding_mode = "auto"
base_email = os.getenv("REG_EMAIL_BASE", REG_EMAIL_BASE_DEFAULT)
password = os.getenv("SEEDED_USER_PASSWORD") or REG_PASSWORD_DEFAULT
nickname = os.getenv("REG_NICKNAME", REG_NICKNAME_DEFAULT)
email = _generate_seeded_email(base_email)
_assert_reg_email(email)
seed_errors = []
seeded_via = None
if seeding_mode in {"auto", "api"}:
seeded_via = "api"
try:
_api_register_user(base_url, email, password, nickname)
try:
_api_login_user(base_url, email, password)
except Exception as exc:
if _env_bool("PW_FIXTURE_DEBUG", False):
print(f"[seeded] api login verification failed: {exc}", flush=True)
except _RegisterDisabled as exc:
seed_errors.append(f"api: {exc}")
seeded_via = None
except Exception as exc:
seed_errors.append(f"api: {exc}")
seeded_via = None
if seeding_mode == "api":
details = "; ".join(seed_errors)
raise RuntimeError(
f"Failed to seed user via API registration. {details}"
) from exc
if seeded_via is None and seeding_mode in {"auto", "ui"}:
seeded_via = "ui"
try:
_ui_register_user(browser, login_url, email, password, nickname)
except _RegisterDisabled as exc:
seed_errors.append(f"ui: {exc}")
default_email = os.getenv("DEFAULT_SUPERUSER_EMAIL", "admin@ragflow.io")
raise RuntimeError(
"User registration is disabled and no default account is available. "
f"Known superuser defaults ({default_email}) cannot be used with the "
"normal login endpoint. Enable registration or seed a test account."
) from exc
except Exception as ui_exc:
seed_errors.append(f"ui: {ui_exc}")
details = "; ".join(seed_errors)
raise RuntimeError(
f"Failed to seed user via API or UI registration. {details}"
) from ui_exc
os.environ["SEEDED_USER_EMAIL"] = email
os.environ["SEEDED_USER_PASSWORD"] = password
if _env_bool("PW_FIXTURE_DEBUG", False):
print(f"[seeded] created user via {seeded_via}: {email}", flush=True)
return email, password
@pytest.fixture
def reg_nickname() -> str:
return REG_NICKNAME_DEFAULT
@pytest.fixture(scope="session")
def run_id() -> str:
value = os.getenv("RUN_ID")
if not value:
value = f"{int(time.time())}_{secrets.token_hex(2)}"
safe = _sanitize_filename(value) or f"{int(time.time())}_{secrets.token_hex(2)}"
os.environ["RUN_ID"] = safe
return safe
@pytest.fixture(scope="module")
def ensure_auth_context(
flow_page,
login_url: str,
seeded_user_credentials,
):
from test.playwright.helpers.auth_waits import wait_for_login_complete
page_instance = flow_page
email, password = seeded_user_credentials
timeout_ms = _playwright_auth_ready_timeout_ms() or DEFAULT_TIMEOUT_MS
token_wait_js = """
() => {
const token = localStorage.getItem('Token');
const auth = localStorage.getItem('Authorization');
return Boolean((token && token.length) || (auth && auth.length));
}
"""
try:
if "/login" not in page_instance.url:
page_instance.wait_for_function(token_wait_js, timeout=1500)
return page_instance
except Exception:
pass
page_instance.goto(login_url, wait_until="domcontentloaded")
active_form = page_instance.locator(AUTH_ACTIVE_FORM_SELECTOR)
expect(active_form).to_have_count(1, timeout=timeout_ms)
email_input = active_form.locator(AUTH_EMAIL_INPUT_SELECTOR).first
password_input = active_form.locator(AUTH_PASSWORD_INPUT_SELECTOR).first
submit_button = active_form.locator(AUTH_SUBMIT_SELECTOR).first
expect(email_input).to_be_visible(timeout=timeout_ms)
expect(password_input).to_be_visible(timeout=timeout_ms)
email_input.fill(email)
password_input.fill(password)
password_input.blur()
try:
submit_button.click(timeout=timeout_ms)
except PlaywrightTimeoutError:
submit_button.click(force=True, timeout=timeout_ms)
wait_for_login_complete(page_instance, timeout_ms=timeout_ms)
return page_instance
def _ensure_model_provider_ready_via_api(base_url: str, auth_header: str) -> dict:
headers = {"Authorization": auth_header}
_, my_llms_payload = _api_request_json(
_build_url(base_url, "/v1/llm/my_llms"), headers=headers
)
my_llms_data = _response_data(my_llms_payload)
has_provider = bool(my_llms_data)
created_provider = False
zhipu_key = os.getenv("ZHIPU_AI_API_KEY")
if not has_provider and zhipu_key:
_, set_key_payload = _api_request_json(
_build_url(base_url, "/v1/llm/set_api_key"),
method="POST",
payload={"llm_factory": "ZHIPU-AI", "api_key": zhipu_key},
headers=headers,
)
_response_data(set_key_payload)
has_provider = True
created_provider = True
_, my_llms_payload = _api_request_json(
_build_url(base_url, "/v1/llm/my_llms"), headers=headers
)
my_llms_data = _response_data(my_llms_payload)
if not has_provider:
pytest.skip("No model provider configured and ZHIPU_AI_API_KEY is not set.")
_, tenant_payload = _api_request_json(
_build_url(base_url, "/v1/user/tenant_info"), headers=headers
)
tenant_data = _response_data(tenant_payload)
tenant_id = tenant_data.get("tenant_id")
if not tenant_id:
raise RuntimeError(f"tenant_info missing tenant_id: {tenant_data}")
if not tenant_data.get("llm_id"):
llm_id = "glm-4-flash@ZHIPU-AI" if "ZHIPU-AI" in my_llms_data else None
if not llm_id:
pytest.skip(
"Provider exists but no default llm_id could be inferred for tenant setup."
)
tenant_payload = {
"tenant_id": tenant_id,
"llm_id": llm_id,
"embd_id": tenant_data.get("embd_id") or "BAAI/bge-small-en-v1.5@Builtin",
"img2txt_id": tenant_data.get("img2txt_id") or "",
"asr_id": tenant_data.get("asr_id") or "",
"tts_id": tenant_data.get("tts_id"),
}
_, set_tenant_payload = _api_request_json(
_build_url(base_url, "/v1/user/set_tenant_info"),
method="POST",
payload=tenant_payload,
headers=headers,
)
_response_data(set_tenant_payload)
return {
"tenant_id": tenant_id,
"has_provider": True,
"created_provider": created_provider,
"llm_factories": list(my_llms_data.keys()) if isinstance(my_llms_data, dict) else [],
}
@pytest.fixture(scope="module")
def ensure_model_provider_configured(
ensure_auth_context,
base_url: str,
seeded_user_credentials,
):
page_instance = ensure_auth_context
auth_header = _extract_auth_header_from_page(page_instance)
email = seeded_user_credentials[0] if seeded_user_credentials else "unknown"
cache_key = f"{base_url}|{email}|provider"
cached = _PROVIDER_READY_CACHE.get(cache_key)
if cached:
cached["page"] = page_instance
cached["auth_header"] = auth_header
return cached
provider_info = _ensure_model_provider_ready_via_api(base_url, auth_header)
payload = {
"page": page_instance,
"auth_header": auth_header,
"email": email,
**provider_info,
}
if _env_bool("PW_FIXTURE_DEBUG", False):
print(
"[prereq] provider_ready "
f"email={email} created_provider={payload.get('created_provider', False)} "
f"llm_factories={payload.get('llm_factories', [])}",
flush=True,
)
_PROVIDER_READY_CACHE[cache_key] = payload
return payload
def _find_dataset_by_name(kbs_payload: dict | None, dataset_name: str) -> dict | None:
data = _response_data(kbs_payload)
kbs = data.get("kbs")
if not isinstance(kbs, list):
return None
for item in kbs:
if isinstance(item, dict) and item.get("name") == dataset_name:
return item
return None
def _ensure_dataset_ready_via_api(
base_url: str, auth_header: str, dataset_name: str
) -> dict:
headers = {"Authorization": auth_header}
list_url = _build_url(base_url, "/v1/kb/list?page=1&page_size=200")
_, list_payload = _api_request_json(list_url, method="POST", payload={}, headers=headers)
existing = _find_dataset_by_name(list_payload, dataset_name)
if existing:
return {
"kb_id": existing.get("id"),
"kb_name": dataset_name,
"reused": True,
}
_, create_payload = _api_request_json(
_build_url(base_url, "/v1/kb/create"),
method="POST",
payload={"name": dataset_name},
headers=headers,
)
created_data = _response_data(create_payload)
kb_id = created_data.get("id")
if kb_id:
return {"kb_id": kb_id, "kb_name": dataset_name, "reused": False}
_, list_payload_after = _api_request_json(
list_url, method="POST", payload={}, headers=headers
)
existing_after = _find_dataset_by_name(list_payload_after, dataset_name)
if not existing_after:
raise RuntimeError(
f"Dataset {dataset_name!r} not found after kb/create response={create_payload}"
)
return {
"kb_id": existing_after.get("id"),
"kb_name": dataset_name,
"reused": False,
}
@pytest.fixture(scope="module")
def ensure_dataset_ready(
ensure_model_provider_configured,
base_url: str,
run_id: str,
):
provider_state = ensure_model_provider_configured
dataset_name = f"e2e-dataset-{run_id}"
cache_key = f"{base_url}|{provider_state.get('email', 'unknown')}|{dataset_name}"
cached = _DATASET_READY_CACHE.get(cache_key)
if cached:
return cached
dataset_info = _ensure_dataset_ready_via_api(
base_url,
provider_state["auth_header"],
dataset_name,
)
payload = {
**dataset_info,
"run_id": run_id,
}
if _env_bool("PW_FIXTURE_DEBUG", False):
print(
"[prereq] dataset_ready "
f"kb_name={payload.get('kb_name')} reused={payload.get('reused')} "
f"kb_id={payload.get('kb_id')}",
flush=True,
)
_DATASET_READY_CACHE[cache_key] = payload
return payload
@pytest.fixture(scope="module")
def ensure_chat_ready(ensure_dataset_ready):
return ensure_dataset_ready
@pytest.fixture
def snap(page, request):
if "flow_page" in request.fixturenames:
page = request.getfixturevalue("flow_page")
base_dir = _request_artifacts_dir(request)
node_prefix = _request_artifact_prefix(request)
counter = {"value": 0}
def _snap(label: str):
counter["value"] += 1
safe_label = _sanitize_filename(label) or "step"
filename = f"{node_prefix}__{counter['value']:02d}_{safe_label}.png"
path = base_dir / filename
page.screenshot(path=str(path), full_page=True)
if _env_bool("PW_FIXTURE_DEBUG", False):
print(f"[artifact] snapshot: {path}", flush=True)
return path
_snap.dir = base_dir
return _snap
def _debug_dump_auth_state(page, label: str, submit_locator=None) -> None:
if not _env_bool("PW_DEBUG_DUMP", False):
return
print(f"[auth-debug] label={label}", flush=True)
form_count = page.locator("form").count()
visible_form_count = page.locator("form:visible").count()
print(
f"[auth-debug] forms total={form_count} visible={visible_form_count}",
flush=True,
)
forms_info = page.evaluate(
"""
() => {
const forms = Array.from(document.querySelectorAll('form'));
const getFace = (el) => {
let node = el;
while (node && node !== document.body) {
const style = window.getComputedStyle(node);
if (style && style.backfaceVisibility === 'hidden') {
return node;
}
node = node.parentElement;
}
return el;
};
const getFlip = (el) => {
let node = el;
while (node && node !== document.body) {
const style = window.getComputedStyle(node);
if (style && style.transformStyle === 'preserve-3d') {
return node;
}
node = node.parentElement;
}
return null;
};
const isVisible = (el) => {
const style = window.getComputedStyle(el);
if (style && (style.visibility === 'hidden' || style.display === 'none')) {
return false;
}
const rect = el.getBoundingClientRect();
return rect.width > 0 && rect.height > 0;
};
return forms.filter(isVisible).map((form, idx) => {
const rect = form.getBoundingClientRect();
const button = form.querySelector('button[type="submit"]');
const buttonText = button ? (button.textContent || '').trim() : '';
const face = getFace(form);
const flip = getFlip(face);
return {
index: idx,
authMode: form.getAttribute('data-auth-mode') || '',
isActive: form.getAttribute('data-active') === 'true',
rect: {
x: rect.x,
y: rect.y,
width: rect.width,
height: rect.height,
},
submitText: buttonText.slice(0, 60),
submitHasContinue: buttonText.toLowerCase().includes('continue'),
faceTransform: window.getComputedStyle(face).transform,
faceBackface: window.getComputedStyle(face).backfaceVisibility,
flipTransform: flip ? window.getComputedStyle(flip).transform : null,
flipTransformStyle: flip ? window.getComputedStyle(flip).transformStyle : null,
};
});
}
"""
)
for info in forms_info:
print(f"[auth-debug] visible_form={info}", flush=True)
if submit_locator is None or submit_locator.count() == 0:
print("[auth-debug] submit button not found", flush=True)
return
try:
bbox = submit_locator.bounding_box()
except Exception as exc:
print(f"[auth-debug] submit bounding box failed: {exc}", flush=True)
return
if not bbox:
print("[auth-debug] submit bounding box empty", flush=True)
return
center_x = bbox["x"] + bbox["width"] / 2
center_y = bbox["y"] + bbox["height"] / 2
element_html = page.evaluate(
"""
({ x, y }) => {
const el = document.elementFromPoint(x, y);
if (!el) return null;
return el.outerHTML ? el.outerHTML.slice(0, 500) : String(el);
}
""",
{"x": center_x, "y": center_y},
)
print(f"[auth-debug] elementFromPoint={element_html}", flush=True)
@pytest.fixture
def auth_debug_dump(page, request):
if "flow_page" in request.fixturenames:
page = request.getfixturevalue("flow_page")
def _dump(label: str, submit_locator=None) -> None:
_debug_dump_auth_state(page, label, submit_locator)
return _dump
def _write_artifacts_if_failed(page, context, request) -> None:
report = getattr(request.node, "_rep_call", None)
if not report or not report.failed:
return
timestamp = time.strftime("%Y%m%d-%H%M%S")
base_dir = _request_artifacts_dir(request)
safe_name = _request_artifact_prefix(request)
screenshot_path = base_dir / f"{safe_name}_{timestamp}.png"
html_path = base_dir / f"{safe_name}_{timestamp}.html"
events_path = base_dir / f"{safe_name}_{timestamp}.log"
trace_path = base_dir / f"{safe_name}_{timestamp}.zip"
try:
page.screenshot(path=str(screenshot_path), full_page=True)
except Exception as exc:
print(f"[artifact] screenshot failed: {exc}", flush=True)
try:
html_path.write_text(page.content(), encoding="utf-8")
except Exception as exc:
print(f"[artifact] html dump failed: {exc}", flush=True)
try:
lines = []
diag = getattr(page, "_diag", {})
for key in ("console_errors", "page_errors", "request_failed"):
entries = diag.get(key, [])
if entries:
lines.append(f"{key}:")
lines.extend(entries)
if lines:
events_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
except Exception as exc:
print(f"[artifact] events dump failed: {exc}", flush=True)
if getattr(context, "_trace_enabled", False) and not getattr(
context, "_trace_saved", False
):
try:
context.tracing.stop(path=str(trace_path))
context._trace_saved = True
except Exception as exc:
print(f"[artifact] trace dump failed: {exc}", flush=True)
def _auth_ready_summary(page) -> dict:
return page.evaluate(
"""
() => {
const summarizeInputs = (form) => {
const inputs = Array.from(form.querySelectorAll('input'));
return inputs.map((input) => ({
type: input.getAttribute('type') || '',
name: input.getAttribute('name') || '',
autocomplete: input.getAttribute('autocomplete') || '',
placeholder: input.getAttribute('placeholder') || '',
}));
};
const allForms = Array.from(document.querySelectorAll('form'));
const visibleForms = allForms.filter((el) => {
const style = window.getComputedStyle(el);
if (style && (style.visibility === 'hidden' || style.display === 'none')) {
return false;
}
const rect = el.getBoundingClientRect();
return rect.width > 0 && rect.height > 0;
});
return {
formCount: allForms.length,
visibleFormCount: visibleForms.length,
visibleFormInputs: visibleForms.map(summarizeInputs),
};
}
"""
)
def _format_auth_ready_summary(summary: dict) -> str:
lines = [
f"form_count: {summary.get('formCount')}",
f"visible_form_count: {summary.get('visibleFormCount')}",
]
visible_inputs = summary.get("visibleFormInputs") or []
for idx, inputs in enumerate(visible_inputs, start=1):
input_parts = []
for item in inputs:
parts = []
for key in ("type", "name", "autocomplete", "placeholder"):
value = item.get(key)
if value:
parts.append(f"{key}={value}")
input_parts.append("{" + ", ".join(parts) + "}")
lines.append(f"visible_form_{idx}_inputs: {input_parts}")
return "\n".join(lines) + "\n"
def _write_auth_ready_diagnostics(page, request, reason: str) -> None:
timestamp = time.strftime("%Y%m%d-%H%M%S")
base_dir = _request_artifacts_dir(request)
safe_name = _request_artifact_prefix(request)
screenshot_path = base_dir / f"{safe_name}_auth_ready_{timestamp}.png"
html_path = base_dir / f"{safe_name}_auth_ready_{timestamp}.html"
summary_path = base_dir / f"{safe_name}_auth_ready_{timestamp}.log"
try:
page.screenshot(path=str(screenshot_path), full_page=True)
except Exception as exc:
print(f"[auth_ready] screenshot failed: {exc}", flush=True)
try:
html_path.write_text(page.content(), encoding="utf-8")
except Exception as exc:
print(f"[auth_ready] html dump failed: {exc}", flush=True)
try:
summary = _auth_ready_summary(page)
summary_text = (
f"reason: {reason}\nurl: {page.url}\ntitle: {page.title()}\n"
+ _format_auth_ready_summary(summary)
)
summary_path.write_text(summary_text, encoding="utf-8")
print(summary_text, flush=True)
except Exception as exc:
print(f"[auth_ready] summary failed: {exc}", flush=True)
def _wait_for_auth_ui_ready(page, request) -> None:
timeout_ms = _playwright_auth_ready_timeout_ms()
email_selector = AUTH_EMAIL_INPUT_SELECTOR
password_selector = AUTH_PASSWORD_INPUT_SELECTOR
submit_selector = AUTH_SUBMIT_SELECTOR
active_forms = page.locator(AUTH_ACTIVE_FORM_SELECTOR)
try:
expect(active_forms).to_have_count(1, timeout=timeout_ms)
except AssertionError as exc:
_write_auth_ready_diagnostics(page, request, "auth active form not unique")
raise AssertionError(
"Auth UI not ready within "
f"{timeout_ms}ms. Expected a single active auth form."
) from exc
ready_forms = active_forms.filter(
has=page.locator(password_selector)
).filter(has=page.locator(email_selector)).filter(
has=page.locator(submit_selector)
)
try:
expect(ready_forms).not_to_have_count(0, timeout=timeout_ms)
except AssertionError as exc:
_write_auth_ready_diagnostics(page, request, "auth UI readiness timeout")
raise AssertionError(
"Auth UI not ready within "
f"{timeout_ms}ms. Expected a visible form with email-like and password inputs."
) from exc
def _wait_for_active_form_clickable(page, request, form) -> None:
timeout_ms = _playwright_auth_ready_timeout_ms()
active_forms = page.locator(AUTH_ACTIVE_FORM_SELECTOR)
submit_buttons = form.locator(AUTH_SUBMIT_SELECTOR)
try:
expect(active_forms).to_have_count(1, timeout=timeout_ms)
expect(submit_buttons).to_have_count(1, timeout=timeout_ms)
expect(submit_buttons).to_be_visible()
expect(submit_buttons).to_be_enabled()
status = page.locator("[data-testid='auth-status']")
if status.count() > 0:
expect(status).not_to_have_attribute("data-state", "loading")
except AssertionError as exc:
try:
total_forms = page.locator(AUTH_FORM_SELECTOR).count()
active_form_count = active_forms.count()
forms_info = []
for idx in range(min(total_forms, 5)):
form_node = page.locator(AUTH_FORM_SELECTOR).nth(idx)
try:
info = form_node.evaluate(
"""
(el) => {
const submit = el.querySelector("button[type='submit'], [data-testid='auth-submit']");
const isVisible = (node) => {
const style = window.getComputedStyle(node);
if (style && (style.visibility === 'hidden' || style.display === 'none')) {
return false;
}
const rect = node.getBoundingClientRect();
return rect.width > 0 && rect.height > 0;
};
return {
authMode: el.getAttribute('data-auth-mode') || '',
active: el.getAttribute('data-active') || '',
submit: submit
? {
tag: submit.tagName,
type: submit.getAttribute('type'),
text: (submit.innerText || '').trim(),
testid: submit.getAttribute('data-testid'),
visible: isVisible(submit),
enabled: !submit.disabled,
}
: null,
};
}
"""
)
except Exception as inner_exc:
info = {"error": str(inner_exc)}
forms_info.append(info)
print(
f"[auth-debug] forms total={total_forms} active_forms={active_form_count} details={forms_info}",
flush=True,
)
except Exception:
pass
_write_auth_ready_diagnostics(
page, request, "active auth form submit not clickable"
)
_debug_dump_auth_state(page, "active_form_not_clickable", submit_buttons)
raise AssertionError(
"Active auth form submit button not clickable within "
f"{timeout_ms}ms. The flip animation may still be in progress."
) from exc
def _locator_is_topmost(locator) -> bool:
try:
return bool(
locator.evaluate(
"""
(el) => {
const rect = el.getBoundingClientRect();
const x = rect.left + rect.width / 2;
const y = rect.top + rect.height / 2;
const top = document.elementFromPoint(x, y);
return top && (top === el || el.contains(top));
}
"""
)
)
except Exception:
return False
@pytest.fixture
def auth_click():
def _click(locator, label: str = "click") -> None:
timeout_ms = _playwright_auth_ready_timeout_ms()
attempts = 3
for idx in range(attempts):
try:
locator.click(timeout=timeout_ms)
return
except PlaywrightTimeoutError as exc:
message = str(exc).lower()
can_force = (
"intercepts pointer events" in message
or "element was detached" in message
or "element is not stable" in message
)
if not can_force:
raise
if "intercepts pointer events" in message and not _locator_is_topmost(
locator
):
if idx >= attempts - 1:
raise
time.sleep(0.15)
continue
try:
if _env_bool("PW_FIXTURE_DEBUG", False):
print(f"[auth-click] forcing {label} attempt={idx + 1}", flush=True)
locator.click(force=True, timeout=timeout_ms)
return
except PlaywrightTimeoutError:
if idx >= attempts - 1:
raise
time.sleep(0.15)
return _click
@pytest.fixture
def active_auth_context(page, request):
if "flow_page" in request.fixturenames:
page = request.getfixturevalue("flow_page")
def _mark_active_form() -> None:
timeout_ms = _playwright_auth_ready_timeout_ms()
try:
page.wait_for_function(
"""
() => {
const forms = Array.from(document.querySelectorAll("form[data-testid='auth-form']"))
.filter((el) => el.querySelector("[data-testid='auth-email']"));
const getFace = (el) => {
let node = el;
while (node && node !== document.body) {
const style = window.getComputedStyle(node);
if (style && style.backfaceVisibility === 'hidden') {
return node;
}
node = node.parentElement;
}
return el;
};
const getFlip = (el) => {
let node = el;
while (node && node !== document.body) {
const style = window.getComputedStyle(node);
if (style && style.transformStyle === 'preserve-3d') {
return node;
}
node = node.parentElement;
}
return null;
};
const parseSign = (transform) => {
if (!transform || transform === 'none') return 1;
const match3d = transform.match(/^matrix3d\\((.+)\\)$/);
if (match3d) {
const parts = match3d[1].split(',').map((v) => parseFloat(v.trim()));
return Number.isFinite(parts[0]) ? Math.sign(parts[0]) : 0;
}
const match2d = transform.match(/^matrix\\((.+)\\)$/);
if (match2d) {
const parts = match2d[1].split(',').map((v) => parseFloat(v.trim()));
return Number.isFinite(parts[0]) ? Math.sign(parts[0]) : 0;
}
return 0;
};
const computeFacing = (el) => {
const face = getFace(el);
const faceTransform = window.getComputedStyle(face).transform;
const faceSign = parseSign(faceTransform);
const flip = getFlip(face);
const flipTransform = flip
? window.getComputedStyle(flip).transform
: 'none';
const flipSign = parseSign(flipTransform);
return faceSign * flipSign;
};
if (forms.length > 0) {
const firstFace = getFace(forms[0]);
const flip = getFlip(firstFace);
if (flip) {
const flipTransform = window.getComputedStyle(flip).transform;
const now = performance.now();
const state = window.__qa_flip_state || { transform: null, time: 0 };
if (state.transform !== flipTransform) {
window.__qa_flip_state = { transform: flipTransform, time: now };
return false;
}
if (now - state.time < 150) {
return false;
}
}
}
const candidates = forms
.map((el) => {
const rect = el.getBoundingClientRect();
if (!rect.width || !rect.height) return null;
return { el, facing: computeFacing(el) };
})
.filter(Boolean);
candidates.sort((a, b) => b.facing - a.facing);
let pick = null;
if (candidates.length === 1) {
pick = candidates[0];
} else if (candidates.length > 1 && candidates[0].facing !== candidates[1].facing) {
pick = candidates[0];
}
if (!pick) {
const fallback = forms.find((el) => {
const rect = el.getBoundingClientRect();
if (!rect.width || !rect.height) return false;
const x = rect.left + rect.width / 2;
const y = rect.top + Math.min(rect.height / 2, 10);
const top = document.elementFromPoint(x, y);
return top && el.contains(top);
});
if (fallback) {
pick = { el: fallback, facing: computeFacing(fallback) };
}
}
forms.forEach((el) => el.removeAttribute('data-qa-active'));
if (!pick || !pick.el) return false;
pick.el.setAttribute('data-qa-active', 'true');
const submit = pick.el.querySelector("[data-testid='auth-submit']");
return Boolean(submit) && pick.facing > 0;
}
""",
timeout=timeout_ms,
)
except Exception as exc:
_write_auth_ready_diagnostics(
page, request, "active auth form did not become front-facing"
)
_debug_dump_auth_state(page, "active_form_not_front_facing")
raise AssertionError(
"Active auth form not ready within "
f"{timeout_ms}ms. The flip animation may not have settled."
) from exc
def _get():
_wait_for_auth_ui_ready(page, request)
card = page.locator("[data-testid='auth-card-active']")
form = page.locator(AUTH_ACTIVE_FORM_SELECTOR)
timeout_ms = _playwright_auth_ready_timeout_ms()
try:
expect(form).to_have_count(1, timeout=timeout_ms)
except AssertionError as exc:
_write_auth_ready_diagnostics(
page, request, "active auth form selection failed"
)
raise AssertionError(
"Active auth form not found. The login card may not be visible or the DOM changed."
) from exc
_wait_for_active_form_clickable(page, request, form)
return form, card
return _get