Fix : make playwright tests idempotent (#13332)

### What problem does this PR solve?

Playwright tests previously depended on cross-file execution order
(`auth -> provider -> dataset -> chat`).
This change makes setup explicit and idempotent via fixtures so tests
can run independently.

- Added/standardized prerequisite fixtures in
`test/playwright/conftest.py`:
- `ensure_auth_context`, `ensure_model_provider_configured`,
`ensure_dataset_ready`, `ensure_chat_ready`
- Made provisioning reusable/idempotent with `RUN_ID`-based resource
naming.
- Synced auth envs (`E2E_ADMIN_EMAIL`, `E2E_ADMIN_PASSWORD`) into seeded
creds.
- Fixed provider cache freshness (`auth_header`/`page` refresh on cache
hit).

Also included minimal stability fixes:
- dataset create stale-element click handling,
- search wait logic for results/empty-state,
- agent create-menu handling,
- agent run-step retry when run UI doesn’t open first click.

### Type of change

- [x] Test fix
- [x] Refactoring

---------

Co-authored-by: Liu An <asiro@qq.com>
This commit is contained in:
Idriss Sbaaoui
2026-03-04 10:07:14 +08:00
committed by GitHub
parent 1c87f97dde
commit 2f4ca38adf
10 changed files with 500 additions and 214 deletions

View File

@ -50,6 +50,8 @@ AUTH_SUBMIT_SELECTOR = (
_PUBLIC_KEY_CACHE = None
_RSA_CIPHER_CACHE = None
_HANG_WATCHDOG_INSTALLED = False
_PROVIDER_READY_CACHE: dict[str, dict] = {}
_DATASET_READY_CACHE: dict[str, dict] = {}
class _RegisterDisabled(RuntimeError):
@ -85,6 +87,15 @@ def _env_int_with_fallback(primary: str, fallback: str | None, default: int) ->
return default
def _sync_seeded_credentials_from_admin_env() -> None:
admin_email = os.getenv("E2E_ADMIN_EMAIL")
admin_password = os.getenv("E2E_ADMIN_PASSWORD")
if admin_email and not os.getenv("SEEDED_USER_EMAIL"):
os.environ["SEEDED_USER_EMAIL"] = admin_email
if admin_password and not os.getenv("SEEDED_USER_PASSWORD"):
os.environ["SEEDED_USER_PASSWORD"] = admin_password
def _sanitize_timeout_ms(value: int | None, fallback: int | None) -> int | None:
if value is None or value <= 0:
return fallback
@ -274,6 +285,73 @@ def _api_post_json(url: str, payload: dict, timeout_s: int = 10) -> tuple[int, d
raise RuntimeError(f"URLError: {exc}") from exc
def _api_request_json(
url: str,
method: str = "GET",
payload: dict | None = None,
headers: dict | None = None,
timeout_s: int = 10,
) -> tuple[int, dict | None]:
data = None
if payload is not None:
data = json.dumps(payload).encode("utf-8")
req_headers = {"Content-Type": "application/json"}
if headers:
req_headers.update(headers)
req = Request(url, data=data, headers=req_headers, method=method)
try:
with urlopen(req, timeout=timeout_s) as resp:
body = resp.read()
if body:
try:
return resp.status, json.loads(body.decode("utf-8"))
except Exception:
return resp.status, None
return resp.status, None
except HTTPError as exc:
body = exc.read()
parsed = None
if body:
try:
parsed = json.loads(body.decode("utf-8"))
except Exception:
parsed = None
raise RuntimeError(
f"{method} {url} failed with HTTPError {exc.code}: {parsed or body!r}"
) from exc
except URLError as exc:
raise RuntimeError(f"{method} {url} failed with URLError: {exc}") from exc
def _response_data(payload: dict | None) -> dict:
if not isinstance(payload, dict):
return {}
if payload.get("code") not in (0, None):
raise RuntimeError(f"API returned failure payload: {payload}")
data = payload.get("data")
return data if isinstance(data, dict) else {}
def _extract_auth_header_from_page(page) -> str:
token = page.evaluate(
"""
() => {
const auth = localStorage.getItem('Authorization');
if (auth && auth.length) return auth;
const token = localStorage.getItem('Token');
if (token && token.length) return token;
return '';
}
"""
)
if not token:
raise AssertionError(
"Missing Authorization/Token in localStorage after login. "
"Cannot provision prerequisites via API."
)
return str(token)
def _rsa_encrypt_password(password: str) -> str:
global _PUBLIC_KEY_CACHE
global _RSA_CIPHER_CACHE
@ -762,6 +840,7 @@ def reg_password() -> str:
@pytest.fixture(scope="session")
def seeded_user_credentials(base_url: str, login_url: str, browser) -> tuple[str, str]:
_sync_seeded_credentials_from_admin_env()
env_email = os.getenv("SEEDED_USER_EMAIL")
env_password = os.getenv("SEEDED_USER_PASSWORD")
if env_email and env_password:
@ -836,6 +915,253 @@ def reg_nickname() -> str:
return REG_NICKNAME_DEFAULT
@pytest.fixture(scope="session")
def run_id() -> str:
value = os.getenv("RUN_ID")
if not value:
value = f"{int(time.time())}_{secrets.token_hex(2)}"
safe = _sanitize_filename(value) or f"{int(time.time())}_{secrets.token_hex(2)}"
os.environ["RUN_ID"] = safe
return safe
@pytest.fixture(scope="module")
def ensure_auth_context(
flow_page,
login_url: str,
seeded_user_credentials,
):
from test.playwright.helpers.auth_waits import wait_for_login_complete
page_instance = flow_page
email, password = seeded_user_credentials
timeout_ms = _playwright_auth_ready_timeout_ms() or DEFAULT_TIMEOUT_MS
token_wait_js = """
() => {
const token = localStorage.getItem('Token');
const auth = localStorage.getItem('Authorization');
return Boolean((token && token.length) || (auth && auth.length));
}
"""
try:
if "/login" not in page_instance.url:
page_instance.wait_for_function(token_wait_js, timeout=1500)
return page_instance
except Exception:
pass
page_instance.goto(login_url, wait_until="domcontentloaded")
active_form = page_instance.locator(AUTH_ACTIVE_FORM_SELECTOR)
expect(active_form).to_have_count(1, timeout=timeout_ms)
email_input = active_form.locator(AUTH_EMAIL_INPUT_SELECTOR).first
password_input = active_form.locator(AUTH_PASSWORD_INPUT_SELECTOR).first
submit_button = active_form.locator(AUTH_SUBMIT_SELECTOR).first
expect(email_input).to_be_visible(timeout=timeout_ms)
expect(password_input).to_be_visible(timeout=timeout_ms)
email_input.fill(email)
password_input.fill(password)
password_input.blur()
try:
submit_button.click(timeout=timeout_ms)
except PlaywrightTimeoutError:
submit_button.click(force=True, timeout=timeout_ms)
wait_for_login_complete(page_instance, timeout_ms=timeout_ms)
return page_instance
def _ensure_model_provider_ready_via_api(base_url: str, auth_header: str) -> dict:
headers = {"Authorization": auth_header}
_, my_llms_payload = _api_request_json(
_build_url(base_url, "/v1/llm/my_llms"), headers=headers
)
my_llms_data = _response_data(my_llms_payload)
has_provider = bool(my_llms_data)
created_provider = False
zhipu_key = os.getenv("ZHIPU_AI_API_KEY")
if not has_provider and zhipu_key:
_, set_key_payload = _api_request_json(
_build_url(base_url, "/v1/llm/set_api_key"),
method="POST",
payload={"llm_factory": "ZHIPU-AI", "api_key": zhipu_key},
headers=headers,
)
_response_data(set_key_payload)
has_provider = True
created_provider = True
_, my_llms_payload = _api_request_json(
_build_url(base_url, "/v1/llm/my_llms"), headers=headers
)
my_llms_data = _response_data(my_llms_payload)
if not has_provider:
pytest.skip("No model provider configured and ZHIPU_AI_API_KEY is not set.")
_, tenant_payload = _api_request_json(
_build_url(base_url, "/v1/user/tenant_info"), headers=headers
)
tenant_data = _response_data(tenant_payload)
tenant_id = tenant_data.get("tenant_id")
if not tenant_id:
raise RuntimeError(f"tenant_info missing tenant_id: {tenant_data}")
if not tenant_data.get("llm_id"):
llm_id = "glm-4-flash@ZHIPU-AI" if "ZHIPU-AI" in my_llms_data else None
if not llm_id:
pytest.skip(
"Provider exists but no default llm_id could be inferred for tenant setup."
)
tenant_payload = {
"tenant_id": tenant_id,
"llm_id": llm_id,
"embd_id": tenant_data.get("embd_id") or "BAAI/bge-small-en-v1.5@Builtin",
"img2txt_id": tenant_data.get("img2txt_id") or "",
"asr_id": tenant_data.get("asr_id") or "",
"tts_id": tenant_data.get("tts_id"),
}
_, set_tenant_payload = _api_request_json(
_build_url(base_url, "/v1/user/set_tenant_info"),
method="POST",
payload=tenant_payload,
headers=headers,
)
_response_data(set_tenant_payload)
return {
"tenant_id": tenant_id,
"has_provider": True,
"created_provider": created_provider,
"llm_factories": list(my_llms_data.keys()) if isinstance(my_llms_data, dict) else [],
}
@pytest.fixture(scope="module")
def ensure_model_provider_configured(
ensure_auth_context,
base_url: str,
seeded_user_credentials,
):
page_instance = ensure_auth_context
auth_header = _extract_auth_header_from_page(page_instance)
email = seeded_user_credentials[0] if seeded_user_credentials else "unknown"
cache_key = f"{base_url}|{email}|provider"
cached = _PROVIDER_READY_CACHE.get(cache_key)
if cached:
cached["page"] = page_instance
cached["auth_header"] = auth_header
return cached
provider_info = _ensure_model_provider_ready_via_api(base_url, auth_header)
payload = {
"page": page_instance,
"auth_header": auth_header,
"email": email,
**provider_info,
}
if _env_bool("PW_FIXTURE_DEBUG", False):
print(
"[prereq] provider_ready "
f"email={email} created_provider={payload.get('created_provider', False)} "
f"llm_factories={payload.get('llm_factories', [])}",
flush=True,
)
_PROVIDER_READY_CACHE[cache_key] = payload
return payload
def _find_dataset_by_name(kbs_payload: dict | None, dataset_name: str) -> dict | None:
data = _response_data(kbs_payload)
kbs = data.get("kbs")
if not isinstance(kbs, list):
return None
for item in kbs:
if isinstance(item, dict) and item.get("name") == dataset_name:
return item
return None
def _ensure_dataset_ready_via_api(
base_url: str, auth_header: str, dataset_name: str
) -> dict:
headers = {"Authorization": auth_header}
list_url = _build_url(base_url, "/v1/kb/list?page=1&page_size=200")
_, list_payload = _api_request_json(list_url, method="POST", payload={}, headers=headers)
existing = _find_dataset_by_name(list_payload, dataset_name)
if existing:
return {
"kb_id": existing.get("id"),
"kb_name": dataset_name,
"reused": True,
}
_, create_payload = _api_request_json(
_build_url(base_url, "/v1/kb/create"),
method="POST",
payload={"name": dataset_name},
headers=headers,
)
created_data = _response_data(create_payload)
kb_id = created_data.get("id")
if kb_id:
return {"kb_id": kb_id, "kb_name": dataset_name, "reused": False}
_, list_payload_after = _api_request_json(
list_url, method="POST", payload={}, headers=headers
)
existing_after = _find_dataset_by_name(list_payload_after, dataset_name)
if not existing_after:
raise RuntimeError(
f"Dataset {dataset_name!r} not found after kb/create response={create_payload}"
)
return {
"kb_id": existing_after.get("id"),
"kb_name": dataset_name,
"reused": False,
}
@pytest.fixture(scope="module")
def ensure_dataset_ready(
ensure_model_provider_configured,
base_url: str,
run_id: str,
):
provider_state = ensure_model_provider_configured
dataset_name = f"e2e-dataset-{run_id}"
cache_key = f"{base_url}|{provider_state.get('email', 'unknown')}|{dataset_name}"
cached = _DATASET_READY_CACHE.get(cache_key)
if cached:
return cached
dataset_info = _ensure_dataset_ready_via_api(
base_url,
provider_state["auth_header"],
dataset_name,
)
payload = {
**dataset_info,
"run_id": run_id,
}
if _env_bool("PW_FIXTURE_DEBUG", False):
print(
"[prereq] dataset_ready "
f"kb_name={payload.get('kb_name')} reused={payload.get('reused')} "
f"kb_id={payload.get('kb_id')}",
flush=True,
)
_DATASET_READY_CACHE[cache_key] = payload
return payload
@pytest.fixture(scope="module")
def ensure_chat_ready(ensure_dataset_ready):
return ensure_dataset_ready
@pytest.fixture
def snap(page, request):
if "flow_page" in request.fixturenames:
@ -1215,17 +1541,36 @@ def _locator_is_topmost(locator) -> bool:
def auth_click():
def _click(locator, label: str = "click") -> None:
timeout_ms = _playwright_auth_ready_timeout_ms()
try:
locator.click(timeout=timeout_ms)
except PlaywrightTimeoutError as exc:
if "intercepts pointer events" in str(exc) and _locator_is_topmost(
locator
):
if _env_bool("PW_FIXTURE_DEBUG", False):
print(f"[auth-click] forcing {label}", flush=True)
locator.click(force=True, timeout=timeout_ms)
attempts = 3
for idx in range(attempts):
try:
locator.click(timeout=timeout_ms)
return
raise
except PlaywrightTimeoutError as exc:
message = str(exc).lower()
can_force = (
"intercepts pointer events" in message
or "element was detached" in message
or "element is not stable" in message
)
if not can_force:
raise
if "intercepts pointer events" in message and not _locator_is_topmost(
locator
):
if idx >= attempts - 1:
raise
time.sleep(0.15)
continue
try:
if _env_bool("PW_FIXTURE_DEBUG", False):
print(f"[auth-click] forcing {label} attempt={idx + 1}", flush=True)
locator.click(force=True, timeout=timeout_ms)
return
except PlaywrightTimeoutError:
if idx >= attempts - 1:
raise
time.sleep(0.15)
return _click