browser_use_plugin

name change

name change

name change

name change

name change

name change

stash (WIP)

stash (WIP)

stash (WIP)

version update

version update

version update

align with gateway protocol

align with gateway protocol

align with gateway protocol

upgrade again

upgrade more

polish further

upgrade

final resp

fix

fix

fix

test again

test again

packet order

packet order

packet order

packet order

change to answer

update

update version

use logger

use logger

use

go go go

update version

screen opmot

test

use context

found a problem

gogogo

agent browser

agent browser

screen

resume

gogo

gogo

file upload to debug

file upload base64

screen

screen

fix

fix
This commit is contained in:
liuyunchao.0510
2025-09-01 15:23:09 +08:00
parent 42147df459
commit ee46dd8417
22 changed files with 2051 additions and 0 deletions

1
.gitignore vendored
View File

@@ -60,3 +60,4 @@ values-dev.yaml
*.tsbuildinfo
*venv/

View File

@@ -0,0 +1 @@
3.12

View File

View File

@@ -0,0 +1,30 @@
from pydantic import BaseModel
from browser_agent.index import RunBrowserUseAgentCtx, LLMConfig
from typing import AsyncGenerator, Dict, Optional
from abc import ABC, abstractmethod
from datetime import datetime
from stream_helper.schema import SSEData
from langchain_core.language_models.chat_models import BaseChatModel


class BrowserAgentBase(BaseModel, ABC):
    query: str
    conversation_id: str = ''
    llm: BaseChatModel
    browser_session_endpoint: str
    endpoint_header: Dict[str, str] = {}
    max_steps: int = 20
    system_prompt: Optional[str] = None
    extend_prompt: Optional[str] = None

    @abstractmethod
    async def save_cookies(self):
        pass

    @abstractmethod
    async def get_llm(self) -> BaseChatModel:
        pass

    @abstractmethod
    async def get_system_prompt(self) -> str:
        pass

    async def run(self) -> AsyncGenerator[SSEData, None]:
        pass
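A minimal sketch of how a concrete subclass might satisfy this interface; the class name and bodies below are illustrative, not part of this commit:

# Illustrative sketch only; InMemoryBrowserAgent is not part of the diff.
class InMemoryBrowserAgent(BrowserAgentBase):
    async def save_cookies(self):
        # persist cookies wherever the deployment needs them; no-op in this sketch
        return None

    async def get_llm(self) -> BaseChatModel:
        return self.llm

    async def get_system_prompt(self) -> str:
        return self.system_prompt or "You are a browser automation agent."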

View File

@@ -0,0 +1,158 @@
# Copyright (c) 2025 Bytedance Ltd. and/or its affiliates
# Licensed under the 【火山方舟】原型应用软件自用许可协议
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://www.volcengine.com/docs/82379/1433703
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import logging
import os
import aiohttp
from playwright.async_api import async_playwright
from playwright.async_api import Browser
from playwright.async_api._generated import Playwright as AsyncPlaywright
browser_ready_event = asyncio.Event()
class BrowserWrapper:
def __init__(self, port, browser: Browser, playwright: AsyncPlaywright, remote_browser_id: str = None, endpoint: str = None):
self.port = port
self.browser = browser
self.playwright = playwright
self.remote_browser_id = remote_browser_id
self.endpoint = endpoint
async def stop(self):
if self.browser:
logging.info(f"Closing browser on port {self.port}...")
# try:
# for context in self.browser.contexts:
# await context.close()
# except Exception as e:
# logging.error(f"Error closing contexts: {e}")
await self.browser.close()
logging.info(f"Browser on port {self.port} closed successfully")
if self.playwright:
logging.info(f"Closing playwright session {self.port}...")
await self.playwright.stop()
logging.info(
f"Paywright session on port {self.port} closed successfully")
if self.remote_browser_id and self.endpoint:
logging.info(f"Closing remote browser {self.remote_browser_id}...")
try:
async with aiohttp.ClientSession() as session:
delete_url = f"{self.endpoint}/{self.remote_browser_id}"
async with session.delete(delete_url, timeout=30) as response:
if response.status == 200:
logging.info(f"Remote browser {self.remote_browser_id} closed successfully")
else:
error_text = await response.text()
logging.error(f"Failed to close remote browser. Status: {response.status}, Error: {error_text}")
except Exception as e:
logging.error(f"Error closing remote browser {self.remote_browser_id}: {e}")
async def start_local_browser(port):
logging.info(f"Attempting to start browser on port {port}")
p = None
browser = None
try:
p = await async_playwright().start()
w = BrowserWrapper(port, None, p)
try:
# Configure proxy if environment variables are set
proxy_config = None
proxy_server = os.getenv('PROXY_SERVER')
if proxy_server:
proxy_config = {
'server': proxy_server,
'bypass': '127.0.0.1,localhost'
}
browser = await p.chromium.launch(
# executable_path="/opt/chromium.org/browser_use/chromium/chromium-browser-use",
headless=False,
args=[
f'--remote-debugging-port={port}',
'--remote-allow-origins=*',
'--remote-debugging-address=0.0.0.0',
'--no-sandbox'
],
proxy=proxy_config
)
w = BrowserWrapper(port, browser, p)
logging.info(f"Browser launched successfully on port {port}")
# Verify CDP is actually running
try:
async with aiohttp.ClientSession() as session:
async with session.get(f"http://127.0.0.1:{port}/json/version", timeout=5) as response:
if response.status == 200:
version_info = await response.json()
logging.info(
f"successfully connected to cdp: {version_info}")
else:
logging.error(
f"Failed to get CDP version. Status: {response.status}")
except Exception as cdp_e:
logging.error(f"Error checking CDP availability: {cdp_e}")
logging.info('closing playwright driver and browser')
await w.stop()
raise
return BrowserWrapper(port, browser, p)
except Exception as launch_e:
logging.error(f"Failed to launch browser: {launch_e}")
browser_ready_event.clear()
logging.info('closing playwright driver and browser')
await w.stop()
raise
except Exception as p_e:
logging.error(f"Playwright initialization error: {p_e}")
browser_ready_event.clear()
logging.info('closing playwright driver and browser')
await w.stop()
raise
async def start_remote_browser(endpoint):
logging.info(f"Attempting to create browser via remote endpoint: {endpoint}")
try:
async with aiohttp.ClientSession() as session:
data = {"timeout": 30}
async with session.post(
endpoint,
json=data,
headers={'Content-Type': 'application/json'},
timeout=30
) as response:
if response.status == 200:
result = await response.json()
logging.info(f"Browser created successfully: {result}")
id_value = result.get('id', None)
if id_value is None:
result['id'] = 'id'
return BrowserWrapper(None, None, None, result['id'], endpoint)
else:
error_text = await response.text()
logging.error(f"Failed to create browser. Status: {response.status}, Error: {error_text}")
raise Exception(f"Failed to create browser: {response.status} - {error_text}")
except Exception as e:
logging.error(f"Error creating remote browser: {e}")
raise
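A minimal sketch of driving these helpers locally; the port and URL are illustrative:

# Sketch only: assumes Chromium and Playwright are installed in the environment.
import asyncio

async def demo_local_browser():
    wrapper = await start_local_browser(9223)  # illustrative port
    try:
        page = await wrapper.browser.new_page()
        await page.goto("https://www.baidu.com")
    finally:
        await wrapper.stop()

asyncio.run(demo_local_browser())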

View File

@@ -0,0 +1,6 @@
from browser_use import Agent


class MyAgent(Agent):
    # browser_use 0.2.5: remove the model tool-calling detection check
    def _test_tool_calling_method(self, method: str) -> bool:
        return True

View File

@@ -0,0 +1,185 @@
import asyncio
import io,base64
import logging,time
from typing import Optional, Tuple
from PIL import Image, ImageChops
import numpy as np
from pydantic import BaseModel
logger = logging.getLogger(__name__)
class VisualChangeDetector:
"""视觉变化检测器(基于屏幕截图比对)"""
def __init__(
self,
similarity_threshold: float = 0.95,
pixel_change_threshold: int = 1000,
resize_width: Optional[int] = 800
):
"""
Args:
similarity_threshold: 相似度阈值0-1
pixel_change_threshold: 变化像素数量阈值
resize_width: 缩放宽度(提升性能)
"""
self.similarity_threshold = similarity_threshold
self.pixel_threshold = pixel_change_threshold
self.resize_width = resize_width
async def capture_screenshot(self, page, upload_service=None,logger=None) -> Image.Image:
"""捕获页面截图(自动优化)"""
screenshot_bytes = await page.screenshot(timeout=10000,type='jpeg',quality=50,full_page=False,animations='disabled')
img = Image.open(io.BytesIO(screenshot_bytes))
# Resize to a uniform width (speeds up comparison)
if self.resize_width:
w, h = img.size
new_h = int(h * (self.resize_width / w))
img = img.resize((self.resize_width, new_h))
if upload_service:
img_byte_arr = io.BytesIO()
img.save(img_byte_arr, format='JPEG')
binary_data = img_byte_arr.getvalue()
if not binary_data:
return
file_name = f"screenshot_{int(time.time())}.jpg"
base64_str = base64.b64encode(binary_data).decode('ascii')
if logger:
logger.info(f'upload screenshot to {file_name}')
else:
logging.info(f'upload screenshot to {file_name}')
try:
await upload_service.upload_file('',file_name,base64_content=base64_str)
except Exception as e:
if logger:
logger.error(f"Failed to upload screenshot to {file_name}: {e}")
else:
logging.error(f"Failed to upload screenshot to {file_name}: {e}")
return img.convert('RGB')
return img.convert('RGB') # 确保RGB模式
def calculate_change(
self,
img1: Image.Image,
img2: Image.Image
) -> Tuple[float, Image.Image]:
"""
计算两图差异
Returns:
tuple: (相似度百分比, 差异图)
"""
# Make sure the two images have the same size
if img1.size != img2.size:
img2 = img2.resize(img1.size)
arr1 = np.array(img1) # shape: (height, width, 3)
arr2 = np.array(img2) # shape: (height, width, 3)
# Absolute per-channel difference
diff = np.abs(arr1.astype(int) - arr2.astype(int))
# A pixel counts as changed if any of its 3 channels changed
pixel_changed = np.any(diff > 0, axis=2) # shape: (height, width)
changed_pixels = np.sum(pixel_changed) # total number of changed pixels
# No division by 3 is needed: pixel_changed is already a per-pixel mask
total_pixels = arr1.shape[0] * arr1.shape[1]
# Build a diff image (for visualization)
diff_img = ImageChops.difference(img1, img2)
# Similarity ratio
similarity = 1 - (changed_pixels / total_pixels)
return similarity, diff_img
async def detect_change(
self,
browser_session,
reference_img: Optional[Image.Image] = None,
max_attempts: int = 5,
attempt_interval: float = 1.5,
upload_service = None,
logger = None
) -> Tuple[bool, Optional[Image.Image], Optional[Image.Image]]:
"""
检测视觉变化
Args:
page: 浏览器页面对象
reference_img: 基准截图None则自动捕获
max_attempts: 最大检测次数
attempt_interval: 检测间隔(秒)
Returns:
tuple: (是否变化, 基准截图, 差异图)
"""
# Capture the baseline screenshot on the first pass
if reference_img is None:
page = await browser_session.get_current_page()
reference_img = await self.capture_screenshot(page,upload_service=upload_service,logger=logger)
logger.info("start detect change")
for attempt in range(max_attempts):
await asyncio.sleep(attempt_interval)
# Capture the current screenshot
page = await browser_session.get_current_page()
current_img = await self.capture_screenshot(page,upload_service=upload_service,logger=logger)
if not current_img:
logger.error(f"Failed to capture screenshot on attempt {attempt + 1}")
continue
# Compute the change
similarity, diff_img = self.calculate_change(reference_img, current_img)
logger.info(f"Attempt {attempt + 1}: similarity {similarity:.2f}, changed pixels {np.sum(np.array(diff_img) > 0)}")
# Check whether the change is significant
if similarity < self.similarity_threshold:
diff_pixels = np.sum(np.array(diff_img) > 0)
if diff_pixels > self.pixel_threshold:
logger.info(f"Visual change detected (similarity: {similarity:.2f}, changed pixels: {diff_pixels})")
return True, reference_img, diff_img
return False, reference_img, None
class WaitForVisualChangeAction(BaseModel):
"""等待视觉变化的参数模型"""
timeout: int = 30
check_interval: float = 2
similarity_threshold: float = 0.95
pixel_threshold: int = 5000
async def wait_for_visual_change(
params: WaitForVisualChangeAction,
browser_session,
initial_screenshot: Optional[Image.Image] = None,
upload_service = None,
logger = None
) -> Tuple[bool, Optional[Image.Image], Optional[Image.Image]]:
"""
等待页面视觉变化(完整工作流)
Args:
params: 配置参数
browser_session: 浏览器会话
initial_screenshot: 初始截图(可选)
Returns:
tuple: (是否变化, 初始截图, 差异图)
"""
detector = VisualChangeDetector(
similarity_threshold=params.similarity_threshold,
pixel_change_threshold=params.pixel_threshold
)
max_attempts = int(params.timeout / params.check_interval)
return await detector.detect_change(
browser_session=browser_session,
reference_img=initial_screenshot,
max_attempts=max_attempts,
attempt_interval=params.check_interval,
upload_service=upload_service,
logger=logger,
)
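A minimal usage sketch of the detector above, mirroring how the step callback in index.py later calls it; browser_session and logger are assumed to exist:

async def wait_until_page_changes(browser_session, logger):
    # Sketch only: parameter values mirror the call made from index.py below.
    changed, baseline, diff = await wait_for_visual_change(
        params=WaitForVisualChangeAction(timeout=300, similarity_threshold=0.85),
        browser_session=browser_session,
        logger=logger,
    )
    return changed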

View File

@@ -0,0 +1,204 @@
# Copyright (c) 2025 Bytedance Ltd. and/or its affiliates
# Licensed under the 【火山方舟】原型应用软件自用许可协议
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://www.volcengine.com/docs/82379/1433703
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import io
import logging
from typing import Optional, Tuple
from PIL import Image, ImageChops
import numpy as np
from pydantic import BaseModel
import asyncio
import re
import markdownify
from bs4 import BeautifulSoup
from browser_use.browser import BrowserSession
from browser_use.controller.views import SearchGoogleAction
from browser_use.agent.views import ActionResult
from browser_use.controller.service import Controller
from playwright.async_api import Page
from pydantic import BaseModel,Field
from langchain_core.language_models.chat_models import BaseChatModel
from browser_agent.browser_use_custom.i18n import _
from langchain_core.prompts import PromptTemplate
from browser_agent.browser_use_custom.controller.screen import VisualChangeDetector,wait_for_visual_change,WaitForVisualChangeAction
logger = logging.getLogger(__name__)
class PauseAction(BaseModel):
reason: str
class WaitForLoginAction(BaseModel):
"""Action parameters for waiting for login completion."""
timeout: int = Field(
default=300,
description="Maximum time to wait for login completion in seconds"
)
check_interval: int = Field(
default=5,
description="Interval between checks for URL changes in seconds"
)
class MyController(Controller):
"""Custom controller extending base Controller with additional actions.
Features:
- Inherits core controller functionality
- Adds custom pause action handler
- Maintains action registry with exclusion support
"""
def __init__(
self,
exclude_actions: list[str] = [],
output_model: type[BaseModel] | None = None,
):
super().__init__(exclude_actions, output_model)
# Basic Navigation Actions
@self.registry.action(
_('Search the query in Baidu in the current tab, the query should be a search query like humans search in Baidu, concrete and not vague or super long. More the single most important items.'),
param_model=SearchGoogleAction,
)
async def search_google(params: SearchGoogleAction, browser_session: BrowserSession):
search_url = f'https://www.baidu.com/s?wd={params.query}'
page = await browser_session.get_current_page()
await page.goto(search_url)
await page.wait_for_load_state()
msg = _('🔍 Searched for "{query}" in Baidu').format(query=params.query)
logger.info(msg)
return ActionResult(extracted_content=msg, include_in_memory=True)
# Content Actions
@self.registry.action(
_('Extract page content to retrieve specific information from the page, e.g. all company names, a specific description, all information about xyc, 4 links with companies in structured format. Use include_links true if the goal requires links'),
)
async def extract_content(
goal: str,
page: Page,
page_extraction_llm: BaseChatModel,
include_links: bool = False,
):
raw_content = await page.content()
soup = BeautifulSoup(
raw_content, 'html.parser')
# remove all unnecessary http metadata
for s in soup.select('script'):
s.decompose()
for s in soup.select('style'):
s.decompose()
for s in soup.select('textarea'):
s.decompose()
for s in soup.select('img'):
s.decompose()
for s in soup.find_all(style=re.compile("background-image.*")):
s.decompose()
content = markdownify.markdownify(str(soup))
# manually append iframe text into the content so it's readable by the LLM (includes cross-origin iframes)
for iframe in page.frames:
if iframe.url != page.url and not iframe.url.startswith('data:'):
content += f'\n\nIFRAME {iframe.url}:\n'
content += markdownify.markdownify(await iframe.content())
prompt = _('Your task is to extract the content of the page. You will be given a page and a goal and you should extract all relevant information around this goal from the page. If the goal is vague, summarize the page. Respond in json format. Extraction goal: {goal}, Page: {page}')
template = PromptTemplate(input_variables=['goal', 'page'], template=prompt)
try:
output = await page_extraction_llm.ainvoke(template.format(goal=goal, page=content))
msg = _('📄 Extracted from page\n: {content}\n').format(content=output.content)
logger.info(msg)
return ActionResult(extracted_content=msg, include_in_memory=True)
except Exception as e:
logger.debug(_('Error extracting content: {error}').format(error=e))
msg = _('📄 Extracted from page\n: {content}\n').format(content=content)
logger.info(msg)
return ActionResult(extracted_content=msg)
@self.registry.action(
_('Pause agent'),
param_model=PauseAction,
)
async def pause(params: PauseAction):
msg = _('👩 Pause agent, reason: {reason}').format(reason=params.reason)
logger.info(msg)
return ActionResult(extracted_content=msg, include_in_memory=True)
# Login detection and waiting action
# @self.registry.action(
# _('Detects if current page requires login and waits for authentication to complete.Wait for login completion by monitoring URL changes.'),
# param_model=WaitForLoginAction,
# )
# async def wait_for_login(params: WaitForLoginAction, browser_session: BrowserSession):
# page = await browser_session.get_current_page()
# # Get initial URL for comparison
# initial_url = page.url
# logger.info(_('🔐 Starting login detection. Initial URL: {url}').format(url=initial_url))
# # Wait for URL change indicating login completion
# msg = _('🔐 Login page detected. Waiting for authentication completion (max {timeout}s)...').format(
# timeout=params.timeout
# )
# logger.info(msg)
# final_url = await self._wait_for_url_change(
# page,
# initial_url,
# params.timeout,
# params.check_interval
# )
# if final_url and final_url != initial_url:
# success_msg = _('✅ Login completed successfully! URL changed from {initial} to {final}').format(
# initial=initial_url,
# final=final_url
# )
# logger.info(success_msg)
# return ActionResult(
# extracted_content=success_msg,
# include_in_memory=True,
# success=True
# )
# else:
# timeout_msg = _('⏰ Login timeout or no URL change detected after {timeout} seconds').format(
# timeout=params.timeout
# )
# logger.warning(timeout_msg)
# return ActionResult(
# extracted_content=timeout_msg,
# include_in_memory=True,
# success=False
# )
# async def _wait_for_url_change(
# self,
# page: Page,
# initial_url: str,
# timeout: int,
# check_interval: int
# ) -> Optional[str]:
# """Wait for URL change indicating login completion."""
# start_time = asyncio.get_event_loop().time()
# while (asyncio.get_event_loop().time() - start_time) < timeout:
# try:
# current_url = page.url
# # If URL has changed, return the new URL
# if current_url != initial_url:
# return current_url
# # Wait before checking again
# await asyncio.sleep(check_interval)
# except Exception as e:
# logger.warning(_('Error checking URL change: {error}').format(error=str(e)))
# await asyncio.sleep(check_interval)
# return None # Timeout reached without URL change
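For context, a hedged sketch of plugging this controller into a browser_use Agent; the model name and task string are placeholders, not part of this commit:

from browser_use import Agent
from langchain_openai import ChatOpenAI

async def run_with_custom_controller():
    llm = ChatOpenAI(model="gpt-4o")  # placeholder model
    agent = Agent(
        task="Search Baidu for MCP news and summarize the first result page",
        llm=llm,
        controller=MyController(),
    )
    return await agent.run(max_steps=10)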

View File

@@ -0,0 +1,127 @@
# Copyright (c) 2025 Bytedance Ltd. and/or its affiliates
# Licensed under the 【火山方舟】原型应用软件自用许可协议
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://www.volcengine.com/docs/82379/1433703
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import gettext
import os
import copy
from typing import Optional
# Default language
_current_language = 'zh-CN'
_translator = None
def get_locales_dir():
"""Get the locales directory path"""
return os.path.join(os.path.dirname(__file__), 'locales')
def set_language(language: str = 'zh-CN'):
"""Set the current language for translations"""
global _current_language, _translator
_current_language = language
locales_dir = get_locales_dir()
try:
# Try to load the translation
translation = gettext.translation(
'vefaas_browser_use',
localedir=locales_dir,
languages=[language],
fallback=True
)
_translator = translation
except Exception:
# Fallback to NullTranslations if translation not found
_translator = gettext.NullTranslations()
def get_language():
"""Get the current language"""
return _current_language
def _(message: str) -> str:
"""Translate a message"""
if _translator is None:
set_language(_current_language)
return _translator.gettext(message)
def get_available_languages():
"""Get list of available languages"""
locales_dir = get_locales_dir()
if not os.path.exists(locales_dir):
return ['zh-CN']
languages = []
for item in os.listdir(locales_dir):
lang_dir = os.path.join(locales_dir, item)
if os.path.isdir(lang_dir):
mo_file = os.path.join(lang_dir, 'LC_MESSAGES', 'vefaas_browser_use.mo')
if os.path.exists(mo_file):
languages.append(item)
return languages if languages else ['zh-CN']
def translate_planning_step_data(conversation_update: dict) -> dict:
"""Translate planning step data fields using pattern matching"""
import re
translated_update = copy.deepcopy(conversation_update)
# Translate evaluation field
if 'evaluation' in translated_update:
evaluation = translated_update['evaluation']
# Basic status translations
if evaluation == "Unknown":
translated_update['evaluation'] = _("Unknown")
elif evaluation == "Failed":
translated_update['evaluation'] = _("Failed")
elif evaluation == "Success":
translated_update['evaluation'] = _("Success")
# Specific pattern translations
elif evaluation == "Unknown - The task has just started, and no actions have been taken yet.":
translated_update['evaluation'] = _("Unknown - The task has just started, and no actions have been taken yet.")
elif evaluation == "Unknown - The task has not yet begun as the current page is blank.":
translated_update['evaluation'] = _("Unknown - The task has not yet begun as the current page is blank.")
elif evaluation == "Success - Successfully navigated to Google's homepage.":
translated_update['evaluation'] = _("Success - Successfully navigated to Google's homepage.")
elif evaluation == "Failed - The search was blocked by Google's CAPTCHA verification.":
translated_update['evaluation'] = _("Failed - The search was blocked by Google's CAPTCHA verification.")
# General pattern matching for CAPTCHA-related failures
elif "CAPTCHA" in evaluation and evaluation.startswith("Failed"):
translated_update['evaluation'] = _("Failed - The search was blocked by a CAPTCHA verification page.")
# Partial translation for status prefixes
elif evaluation.startswith("Success - "):
translated_update['evaluation'] = evaluation.replace("Success", _("Success"), 1)
elif evaluation.startswith("Failed - "):
translated_update['evaluation'] = evaluation.replace("Failed", _("Failed"), 1)
elif evaluation.startswith("Unknown - "):
translated_update['evaluation'] = evaluation.replace("Unknown", _("Unknown"), 1)
# Translate goal field with pattern matching
if 'goal' in translated_update:
goal = translated_update['goal']
if goal == "Pause execution due to CAPTCHA verification.":
translated_update['goal'] = _("Pause execution due to CAPTCHA verification.")
# Add more general goal patterns as needed
# Translate actions if they contain pause reasons
if 'actions' in translated_update:
for action in translated_update['actions']:
if 'pause' in action and 'reason' in action['pause']:
reason = action['pause']['reason']
if reason == "CAPTCHA verification required. Human intervention is needed to proceed.":
action['pause']['reason'] = _("CAPTCHA verification required. Human intervention is needed to proceed.")
elif "CAPTCHA" in reason and "verification required" in reason:
# General CAPTCHA verification pattern
action['pause']['reason'] = _("CAPTCHA verification required. Human intervention is needed to proceed.")
return translated_update
# Initialize with default language
set_language('zh-CN')
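A small usage sketch of these helpers; the printed output assumes the zh-CN .mo catalog has been compiled:

# Sketch only.
set_language('zh-CN')
print(_("Pause agent"))  # -> "暂停代理"
step = {
    'evaluation': 'Failed - The search was blocked by a CAPTCHA verification page.',
    'goal': '',
    'actions': [],
}
print(translate_planning_step_data(step)['evaluation'])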

View File

@@ -0,0 +1,178 @@
msgid ""
msgstr ""
"Project-Id-Version: vefaas-browser-use\n"
"Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2025-01-04 12:00+0000\n"
"PO-Revision-Date: 2025-01-04 12:00+0000\n"
"Last-Translator: Auto Generated\n"
"Language-Team: Chinese (Simplified)\n"
"Language: zh-CN\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=UTF-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Plural-Forms: nplurals=1; plural=0;\n"
# Task management messages
msgid "Task auto-stopped after 1 minute in paused state"
msgstr "任务在暂停状态1分钟后自动停止"
# Controller action descriptions
msgid "Search the query in Baidu in the current tab, the query should be a search query like humans search in Baidu, concrete and not vague or super long. More the single most important items."
msgstr "在当前标签页的百度中搜索查询,查询应该是像人类在百度中搜索的搜索查询,具体而不模糊或过长。更多的单个最重要的项目。"
msgid "🔍 Searched for \"{query}\" in Baidu"
msgstr "🔍 在百度搜索了\"{query}\""
msgid "Pause agent"
msgstr "暂停代理"
msgid "👩 Pause agent, reason: {reason}"
msgstr "👩 暂停代理,原因:{reason}"
msgid "Extract page content to retrieve specific information from the page, e.g. all company names, a specific description, all information about xyc, 4 links with companies in structured format. Use include_links true if the goal requires links"
msgstr "提取页面内容以从页面中检索特定信息例如所有公司名称、特定描述、关于xyc的所有信息、4个具有结构化格式公司的链接。如果目标需要链接请使用include_links true"
msgid "Your task is to extract the content of the page. You will be given a page and a goal and you should extract all relevant information around this goal from the page. If the goal is vague, summarize the page. Respond in json format. Extraction goal: {goal}, Page: {page}"
msgstr "您的任务是提取页面的内容。您将获得一个页面和一个目标您应该从页面中提取围绕此目标的所有相关信息。如果目标模糊请总结页面。以json格式响应。提取目标{goal},页面:{page}"
msgid "📄 Extracted from page\n: {content}\n"
msgstr "📄 从页面提取\n{content}\n"
msgid "Error extracting content: {error}"
msgstr "提取内容错误:{error}"
# SSE messages
msgid "Starting new step..."
msgstr "开始新步骤..."
msgid "Taken action #{number}: {action}"
msgstr "执行动作 #{number}{action}"
msgid "Agent creation failed: {error}"
msgstr "代理创建失败:{error}"
msgid "Agent execution failed: {error}"
msgstr "代理执行失败:{error}"
# Planning step status messages
msgid "Unknown"
msgstr "未知"
msgid "Failed"
msgstr "失败"
msgid "Success"
msgstr "成功"
# Common action result messages
msgid "Unknown - The task has just started, and no actions have been taken yet."
msgstr "未知 - 任务刚刚开始,尚未执行任何操作。"
msgid "Failed - The attempt to search for 'MCP news' was blocked by a CAPTCHA verification page. This requires human intervention to proceed."
msgstr "失败 - 搜索'MCP news'的尝试被验证码页面阻止。需要人工干预才能继续。"
msgid "CAPTCHA verification required to proceed with the search for MCP news. Please solve the CAPTCHA to continue."
msgstr "需要验证码验证才能继续搜索MCP新闻。请解决验证码以继续。"
# Dynamic evaluation patterns
msgid "Unknown - The task has not yet begun as the current page is blank."
msgstr "未知 - 任务尚未开始,因为当前页面为空白。"
msgid "Success - Successfully navigated to Google's homepage."
msgstr "成功 - 成功导航到Google主页。"
# General pause patterns
msgid "Pause execution due to CAPTCHA verification."
msgstr "由于验证码验证而暂停执行。"
# General evaluation patterns
msgid "Failed - The search was blocked by Google's CAPTCHA verification."
msgstr "失败 - 搜索被Google的验证码验证阻止。"
msgid "Failed - The search was blocked by a CAPTCHA verification page."
msgstr "失败 - 搜索被验证码页面阻止。"
# General pause reasons
msgid "CAPTCHA verification required. Human intervention is needed to proceed."
msgstr "需要验证码验证。需要人工干预才能继续。"
msgid "pause"
msgstr "暂停"
msgid "reason"
msgstr "原因"
msgid "Human intervention required to solve it before proceeding with the task."
msgstr "需要人工干预解决后才能继续执行任务。"
# CAPTCHA detection messages
msgid "CAPTCHA detected on Amazon.com. Human intervention required to solve it before proceeding with the task."
msgstr "在Amazon.com检测到验证码。需要人工干预解决后才能继续执行任务。"
msgid "Pause execution and request human intervention to solve the CAPTCHA"
msgstr "暂停执行并请求人工干预解决验证码"
# Task status messages
msgid "Task started."
msgstr "任务已开始。"
msgid "Task started:"
msgstr "任务已开始:"
msgid "Pause the task and wait for human intervention to complete the CAPTCHA verification."
msgstr "暂停任务并等待人工干预完成验证码验证。"
# Extraction instruction
msgid "Extract the titles, sources, and summaries of"
msgstr "提取标题、来源和摘要"
# Initial task status messages
msgid "No previous actions have been taken yet."
msgstr "尚未执行任何操作。"
msgid "Submit the search query for"
msgstr "提交搜索查询"
# Key instruction messages
msgid "'keys': 'Enter'"
msgstr "'输入键': '回车'"
msgid "error"
msgstr "失败"
# Status/State translations
msgid "queued"
msgstr "排队中"
msgid "starting"
msgstr "开始执行"
msgid "browser_initialized"
msgstr "浏览器初始化"
msgid "running"
msgstr "执行中"
msgid "failed"
msgstr "失败"
msgid "agent_initialized"
msgstr "agent 初始化"
msgid "completed"
msgstr "完成"
msgid "go_to_url"
msgstr "访问网址"
msgid "click_element_by_index"
msgstr "按索引点击元素"
msgid "index"
msgstr "索引"
msgid "include_links"
msgstr "包含链接"

View File

@@ -0,0 +1,116 @@
#!/usr/bin/env python3
# Copyright (c) 2025 Bytedance Ltd. and/or its affiliates
# Licensed under the 【火山方舟】原型应用软件自用许可协议
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://www.volcengine.com/docs/82379/1433703
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Translation management script for vefaas-browser-use
Usage:
python scripts/update_translations.py compile # Compile .po files to .mo files
python scripts/update_translations.py test # Test translations
"""
import sys
import subprocess
from pathlib import Path
def get_project_root():
"""Get the project root directory"""
return Path(__file__).parent.parent
def compile_translations():
"""Compile all .po files to .mo files"""
project_root = get_project_root()
locales_dir = project_root / "my_browser_use" / "locales"
if not locales_dir.exists():
print(f"Locales directory not found: {locales_dir}")
return False
success = True
for lang_dir in locales_dir.iterdir():
if lang_dir.is_dir() and lang_dir.name != "__pycache__":
po_file = lang_dir / "LC_MESSAGES" / "vefaas_browser_use.po"
mo_file = lang_dir / "LC_MESSAGES" / "vefaas_browser_use.mo"
if po_file.exists():
print(f"Compiling {lang_dir.name}...")
try:
subprocess.run([
"msgfmt", "-o", str(mo_file), str(po_file)
], check=True)
print(f" ✓ Compiled {lang_dir.name}")
except subprocess.CalledProcessError as e:
print(f" ✗ Failed to compile {lang_dir.name}: {e}")
success = False
else:
print(f" ⚠ No .po file found for {lang_dir.name}")
return success
def test_translations():
"""Test the translation system"""
sys.path.insert(0, str(get_project_root()))
try:
from browser_agent.browser_use_custom.i18n import _, set_language, get_available_languages
print("Testing translation system...")
print(f"Available languages: {get_available_languages()}")
# Test a few key messages
test_messages = [
"Task auto-stopped after 1 minute in paused state",
"Pause agent",
"🔍 Searched for \"{query}\" in Baidu",
"Starting new step...",
"Taken action #{number}: {action}",
"Agent creation failed: {error}",
"Pause execution due to CAPTCHA verification.",
"Unknown - The task has not yet begun as the current page is blank.",
"Success - Successfully navigated to Google's homepage.",
"CAPTCHA verification required. Human intervention is needed to proceed."
]
for lang in get_available_languages():
print(f"\n--- Testing {lang} ---")
set_language(lang)
for msg in test_messages:
translated = _(msg)
print(f" {msg[:50]}... -> {translated[:50]}...")
print("\n✓ Translation test completed")
return True
except Exception as e:
print(f"✗ Translation test failed: {e}")
return False
def main():
if len(sys.argv) != 2:
print(__doc__)
sys.exit(1)
command = sys.argv[1]
if command == "compile":
success = compile_translations()
sys.exit(0 if success else 1)
elif command == "test":
success = test_translations()
sys.exit(0 if success else 1)
else:
print(f"Unknown command: {command}")
print(__doc__)
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,241 @@
# Copyright (c) 2025 Bytedance Ltd. and/or its affiliates
# Licensed under the 【火山方舟】原型应用软件自用许可协议
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://www.volcengine.com/docs/82379/1433703
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import logging
import os
from urllib.parse import urlparse, urlunparse
import aiohttp
import websockets
from fastapi import HTTPException, WebSocket, WebSocketDisconnect
async def websocket_endpoint(websocket: WebSocket, page_id: str, port: str):
await websocket.accept()
try:
# Construct the WebSocket URL for the specific page
ws_url = f"ws://127.0.0.1:{port}/devtools/page/{page_id}"
# Establish a connection to the CDP WebSocket
async with websockets.connect(ws_url) as cdp_socket:
# Create tasks for bidirectional communication
receive_task = asyncio.create_task(
receive_from_cdp(cdp_socket, websocket))
send_task = asyncio.create_task(send_to_cdp(cdp_socket, websocket))
# Wait for either task to complete
done, pending = await asyncio.wait(
[receive_task, send_task],
return_when=asyncio.FIRST_COMPLETED
)
# Cancel any remaining tasks
for task in pending:
task.cancel()
except Exception as e:
logging.error(f"WebSocket error: {e}")
await websocket.close()
async def websocket_browser_endpoint(websocket: WebSocket, browser_id: str, port: str):
await websocket.accept()
try:
# Construct the WebSocket URL for the specific page
ws_url = f"ws://0.0.0.0:{port}/devtools/browser/{browser_id}"
# Establish a connection to the CDP WebSocket
async with websockets.connect(ws_url) as cdp_socket:
# Create tasks for bidirectional communication
receive_task = asyncio.create_task(
receive_from_cdp(cdp_socket, websocket))
send_task = asyncio.create_task(send_to_cdp(cdp_socket, websocket))
# Wait for either task to complete
done, pending = await asyncio.wait(
[receive_task, send_task],
return_when=asyncio.FIRST_COMPLETED
)
# Cancel any remaining tasks
for task in pending:
task.cancel()
except Exception as e:
logging.error(f"WebSocket error: {e}")
await websocket.close()
async def receive_from_cdp(cdp_socket, websocket):
try:
while True:
message = await cdp_socket.recv()
await websocket.send_text(message)
except websockets.exceptions.ConnectionClosed:
logging.info("CDP WebSocket connection closed")
except Exception as e:
logging.error(f"Error receiving from CDP: {e}")
async def send_to_cdp(cdp_socket, websocket):
try:
while True:
message = await websocket.receive_text()
await cdp_socket.send(message)
except WebSocketDisconnect:
logging.info("Client WebSocket disconnected")
except Exception as e:
logging.error(f"Error sending to CDP: {e}")
async def get_websocket_targets(port: str):
endpoint = os.getenv("CDP_ENDPOINT")
logging.info(f"Getting websocket targets for endpoint: {endpoint}")
try:
async with aiohttp.ClientSession() as session:
# Use the cdp_websocket parameter to dynamically construct the URL
base_url = f"http://127.0.0.1:{port}/json/list"
async with session.get(base_url) as response:
if response.status == 200:
result = await response.json()
# If result is an empty list, return it immediately
if not result:
return result
# Modify the URLs in the result to use the provided cdp_websocket
for target in result:
# Replace devtoolsFrontendUrl
if 'devtoolsFrontendUrl' in target:
target['devtoolsFrontendUrl'] = target['devtoolsFrontendUrl'].replace(
'127.0.0.1:' + str(port), str(endpoint)
)
# Replace webSocketDebuggerUrl
if 'webSocketDebuggerUrl' in target:
target['webSocketDebuggerUrl'] = target['webSocketDebuggerUrl'].replace(
'127.0.0.1:' + str(port), str(endpoint)
)
return result
else:
raise HTTPException(
status_code=response.status, detail="Failed to fetch JSON list")
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Error fetching JSON list: {str(e)}")
async def get_websocket_version(port: str):
endpoint = os.getenv("CDP_ENDPOINT")
logging.info(f"Getting websocket version for endpoint: {endpoint}")
try:
async with aiohttp.ClientSession() as session:
# Use the cdp_websocket parameter to dynamically construct the URL
base_url = f"http://127.0.0.1:{port}/json/version"
async with session.get(base_url) as response:
if response.status == 200:
result = await response.json()
if not result:
return result
if 'webSocketDebuggerUrl' in result:
result['webSocketDebuggerUrl'] = result['webSocketDebuggerUrl'].replace(
'127.0.0.1:' + str(port), str(endpoint)
)
return result
else:
raise HTTPException(
status_code=response.status, detail="Failed to fetch JSON list")
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Error fetching JSON list: {str(e)}")
async def get_remote_websocket_version(browser_session_endpoint: str, browser_id: str):
endpoint = os.getenv("CDP_ENDPOINT")
logging.info(f"Getting websocket version for endpoint: {endpoint}")
try:
async with aiohttp.ClientSession() as session:
# Use the cdp_websocket parameter to dynamically construct the URL
base_url = f"{browser_session_endpoint}/{browser_id}/json/version"
async with session.get(base_url) as response:
if response.status == 200:
result = await response.json()
return result
else:
raise HTTPException(
status_code=response.status, detail="Failed to fetch JSON version")
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Error fetching JSON list: {str(e)}")
async def get_inspector(url):
endpoint = os.getenv("CDP_ENDPOINT")
logging.info(f"Getting websocket targets for endpoint: {endpoint}")
# Convert Starlette URL to string
url_str = str(url)
# like this: http://127.0.0.1:8000/devtools/inspector.html?ws=scqevor9btoi2t6dnkcpg.apigateway-cn-beijing.volceapi.com/devtools/page/7D574C7E6237CB326E385DD2C3A5C845
logging.info(f"Getting original inspector for URL: {url_str}")
# Parse the URL
parsed_url = urlparse(url_str)
# Check if the current domain matches the endpoint
if parsed_url.netloc == endpoint:
# Replace only the netloc
modified_url = parsed_url._replace(netloc="127.0.0.1:9222")
url_str = urlunparse(modified_url)
logging.info(f"Converted inspector for URL: {url_str}")
# if parsed_url contains "127.0.0.1:9222", run a 301 redirect
if "127.0.0.1" in parsed_url.netloc:
url_str = url_str.replace("127.0.0.1:9222", endpoint)
logging.info(f"Redirecting to: {url_str}")
return await get_inspector(url_str)
try:
async with aiohttp.ClientSession() as session:
async with session.get(url_str) as response:
if response.status == 200:
# Check content type
content_type = response.headers.get(
'Content-Type', '').lower()
if 'application/json' not in content_type:
# If not JSON, read the content
content = await response.text()
logging.error(
f"Unexpected content type: {content_type}")
logging.error(f"Response content: {content[:500]}")
return {
"status": "error",
"content_type": content_type,
"content": content
}
result = await response.json()
return result
else:
raise HTTPException(
status_code=response.status, detail="Failed to fetch inspector")
except Exception as e:
logging.error(f"Error fetching inspector: {e}")
raise HTTPException(
status_code=500, detail=f"Error fetching inspector: {str(e)}")

View File

@@ -0,0 +1,541 @@
# Copyright (c) 2025 Bytedance Ltd. and/or its affiliates
# Licensed under the 【火山方舟】原型应用软件自用许可协议
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://www.volcengine.com/docs/82379/1433703
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Dict,List,Union
import asyncio
import json
import logging
import os
import uuid
from pathlib import Path
from typing import AsyncGenerator,Optional
import aiohttp
import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel, Field,ConfigDict
from browser_use.browser.views import BrowserStateSummary
from browser_agent.browser import start_local_browser,BrowserWrapper
from browser_agent.browser_use_custom.controller.service import MyController
from browser_agent.utils import enforce_log_format
from browser_use import Agent
from browser_agent.browser_use_custom.agent.service import MyAgent
from browser_agent.browser_use_custom.i18n import _, set_language
from browser_use.agent.views import (
AgentOutput,
)
from browser_use import Agent, BrowserProfile, BrowserSession
from stream_helper.schema import SSEData,ContentTypeEnum,ReturnTypeEnum,OutputModeEnum,ContextModeEnum,MessageActionInfo,MessageActionItem,ReplyContentType,ContentTypeInReplyEnum,ReplyTypeInReplyEnum
from browser_agent.upload import UploadService
from browser_agent.upload import filter_file_by_time
from stream_helper.schema import FileChangeInfo,FileChangeType,FileChangeData
import base64
from langchain_core.language_models.chat_models import BaseChatModel
from datetime import datetime
from langchain_openai import ChatOpenAI
from logging import Logger
from browser_agent.browser_use_custom.controller.screen import wait_for_visual_change,WaitForVisualChangeAction,VisualChangeDetector
from browser_use.utils import match_url_with_domain_pattern, time_execution_async, time_execution_sync
app = FastAPI()
load_dotenv()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
expose_headers=["x-faas-instance-name"],
)
llm_openai = "openai"
llm_deepseek = "deepseek"
llm_ark = "ark"
llm_name = llm_ark
# Global variable to track the port
CURRENT_CDP_PORT = 9222
browser_session_endpoint = None
set_language(os.getenv("LANGUAGE", "en"))
class Message(BaseModel):
role: str
content: str
class Messages(BaseModel):
messages: list[Message]
class TaskRequest(BaseModel):
task: str
class LLMConfig(BaseModel):
llm_type: str
model_id: str
api_key: str
extract_model_id: str
class RunBrowserUseAgentCtx(BaseModel):
query: str
conversation_id: str
llm: BaseChatModel
browser_session_endpoint: str
max_steps: int = 20
system_prompt: str | None = None
extend_prompt: str | None = None
upload_service:Optional[UploadService] = None
start_time:int = int(datetime.now().timestamp() * 1000)
logger: Logger = Field(default_factory=lambda: logging.getLogger(__name__))
model_config = ConfigDict(
arbitrary_types_allowed=True,
)
def genSSEData(stream_id:str,
content:str,
content_type:ContentTypeEnum = ContentTypeEnum.TEXT,
return_type:ReturnTypeEnum = ReturnTypeEnum.MODEL,
output_mode:OutputModeEnum = OutputModeEnum.STREAM,
is_last_msg:bool = False,
is_finish:bool = False,
is_last_packet_in_msg:bool = False,
message_title:str = None,
context_mode:ContextModeEnum = ContextModeEnum.NOT_IGNORE,
response_for_model:str = None,
ext:Dict[str,str] = None,
card_body:str = None,
reply_content_type:Optional[ReplyContentType] = None)->SSEData:
return SSEData(
stream_id = stream_id,
content = content,
content_type = content_type,
return_type = return_type,
output_mode = output_mode,
is_last_msg = is_last_msg,
is_finish = is_finish,
is_last_packet_in_msg = is_last_packet_in_msg,
message_title = message_title,
context_mode = context_mode,
response_for_model = response_for_model,
ext = ext,
card_body = card_body,
reply_content_type = reply_content_type
)
def convert_ws_url(original_url: str) -> str:
"""
将本地开发环境的 WebSocket URL 转换为生产环境的 URL 格式。
示例输入:
'ws://127.0.0.1:8001/v1/browsers/devtools/browser/77a025af-5483-46e2-ac57-9360812e4608?faasInstanceName=vefaas-duxz5kfb-jjecq2yuvf-d340et34rnmvf4b2ugd0-sandbox'
示例输出:
'ws://bots-sandbox.bytedance.net/api/sandbox/coze_studio/proxy/v1/browsers/devtools/browser/77a025af-5483-46e2-ac57-9360812e4608'
"""
# 提取 UUID 部分(从路径中截取)
uuid_part = original_url.split("/v1/browsers/devtools/browser/")[1].split("?")[0]
# 构建新 URL
new_url = (
"ws://bots-sandbox.bytedance.net"
"/ws/sandbox/coze_studio/proxy"
f"/v1/browsers/devtools/browser/{uuid_part}"
)
return new_url
async def get_data_files(directory: str | Path) -> List[Dict[str, str]]:
path = Path(directory)
if not path.is_dir():
raise ValueError(f"'{directory}' is not a valid directory")
result = []
for file in path.iterdir():
if file.is_file():
try:
with open(file, 'rb') as f:
content = f.read()
base64_encoded = base64.b64encode(content).decode('ascii')
result.append({
"name": file.name,
"content": base64_encoded
})
except Exception as e:
result.append({
"name": file.name,
"content": f"[ERROR READING FILE: {str(e)}]".encode()
})
return result
async def RunBrowserUseAgent(ctx: RunBrowserUseAgentCtx) -> AsyncGenerator[SSEData, None]:
task_id = str(uuid.uuid4())
event_queue = asyncio.Queue(maxsize=100)
# Log the incoming request
ctx.logger.info(f"RunBrowserUseAgent with query: {ctx.query},task_id:{task_id}")
# Browser initialization
try:
if ctx.browser_session_endpoint == "" or ctx.browser_session_endpoint is None:
global CURRENT_CDP_PORT
CURRENT_CDP_PORT += 1
current_port = CURRENT_CDP_PORT
browser_wrapper = await start_local_browser(current_port)
else:
browser_wrapper = BrowserWrapper(None, None, None, 'id', ctx.browser_session_endpoint)
except Exception as e:
ctx.logger.error(f"[Failed to initialize browser: {e}")
yield genSSEData(
stream_id=ctx.conversation_id,
content=f'init browser_wrapper err:{e}',
is_finish=True,
is_last_msg=True,
is_last_packet_in_msg=True,
output_mode=OutputModeEnum.NOT_STREAM,
return_type=ReturnTypeEnum.MODEL,
response_for_model=f'init browser_wrapper err:{e}',
reply_content_type=ReplyContentType(content_type=ContentTypeInReplyEnum.TXT,reply_type=ReplyTypeInReplyEnum.ANSWER)
)
return
# Fetch the CDP URL
cdp_url = None
try:
if browser_wrapper.remote_browser_id:
async with aiohttp.ClientSession() as session:
get_url = f"{browser_wrapper.endpoint}/v1/browsers/"
async with session.get(url = get_url,
timeout=aiohttp.ClientTimeout(total=30),
headers={
'x-sandbox-taskid':ctx.conversation_id,
'x-tt-env':'ppe_coze_sandbox',
'x-use-ppe':'1',
}) as response:
if response.status == 200:
reader = response.content
browser_info = json.loads(await reader.read())
cdp_url = convert_ws_url(browser_info['ws_url'])
ctx.logger.info(f"[{task_id}] Retrieved remote CDP URL: {cdp_url}")
else:
error_text = await response.text()
raise Exception(f"Failed to get browser info. Status: {response.status}, Error: {error_text}")
else:
current_port = CURRENT_CDP_PORT
ctx.logger.info(f"Starting task with local browser on port: {current_port}")
cdp_url = f"http://127.0.0.1:{current_port}"
except Exception as e:
ctx.logger.error(f"Error getting browser URL: {e}")
yield genSSEData(
stream_id=ctx.conversation_id,
content=f'Failed to get browser cdp_url:{e}',
is_finish=True,
is_last_msg=True,
is_last_packet_in_msg=True,
output_mode=OutputModeEnum.NOT_STREAM,
return_type=ReturnTypeEnum.MODEL,
response_for_model=f'',
reply_content_type=ReplyContentType(content_type=ContentTypeInReplyEnum.TXT,reply_type=ReplyTypeInReplyEnum.ANSWER)
)
return
# Create output directories
base_dir = os.path.join("videos", task_id)
snapshot_dir = os.path.join(base_dir, "snapshots")
Path(snapshot_dir).mkdir(parents=True, exist_ok=True)
browser_session = None
agent = None
agent_task = None
try:
# Browser session configuration
headless = False if ctx.browser_session_endpoint != "" else True
browser_profile = BrowserProfile(
headless=headless,
disable_security=True,
highlight_elements=False,
wait_between_actions=1,
keep_alive=False,
headers={
'x-sandbox-taskid':ctx.conversation_id,
'x-tt-env':'ppe_coze_sandbox',
'x-use-ppe':'1',
},
)
browser_session = BrowserSession(
browser_profile=browser_profile,
cdp_url=cdp_url,
)
ctx.logger.info(f"[{task_id}] Browser initialized with CDP URL: {cdp_url}")
async def on_step_end(agent):
try:
ctx.logger.info("Agent step end")
page = await agent.browser_session.get_current_page()
detector = VisualChangeDetector()
current_screenshot = await detector.capture_screenshot(page,upload_service=ctx.upload_service,logger=ctx.logger)
if not current_screenshot:
ctx.logger.error(f"Failed to capture screenshot")
return
agent.context = {'current_screenshot':current_screenshot}
ctx.logger.info(f'captured screenshot success')
except Exception as e:
ctx.logger.error(f"Error in on_step_end: {e}")
# Callback definition
async def new_step_callback_wrapper(browser_state_summary: BrowserStateSummary,
model_output: AgentOutput,
step_number: int):
try:
islogin = False
for ac in model_output.action:
try:
action_data = ac.model_dump(exclude_unset=True)
action_name = next(iter(action_data.keys()))
if action_name == 'pause':
islogin = True
ctx.logger.info("pause action detected, browser task pause")
agent.pause()
except Exception as e:
ctx.logger.error(f"Error in new_step_callback_wrapper: {e}")
agent.resume()
data = ''
content_type = ContentTypeInReplyEnum.TXT
if islogin:
data = data + MessageActionInfo(actions=[MessageActionItem()]).model_dump_json()
content_type = ContentTypeInReplyEnum.ACTION_INFO
else:
data = data + (model_output.current_state.next_goal or '')
await event_queue.put(genSSEData(
stream_id=ctx.conversation_id,
content=data,
reply_content_type= ReplyContentType(content_type=content_type)
))
if islogin:
current_screenshot = None
if agent.context:
current_screenshot = agent.context.get('current_screenshot',None)
if current_screenshot:
ctx.logger.info(f'current screenshot exists')
has_change, _, _ = await wait_for_visual_change(
params=WaitForVisualChangeAction(
timeout=300,
similarity_threshold=0.85,
),
browser_session=agent.browser_session,
initial_screenshot=current_screenshot,
upload_service=ctx.upload_service,
logger=ctx.logger,
)
if has_change:
ctx.logger.info('detected visual change,browser task resume')
agent.resume()
data = 'timeout: no visual change detected during login'
await event_queue.put(genSSEData(
stream_id=ctx.conversation_id,
content=data,
output_mode=OutputModeEnum.NOT_STREAM,
is_finish=True,
is_last_msg=True,
is_last_packet_in_msg=True,
response_for_model=data,
reply_content_type= ReplyContentType(content_type=ContentTypeInReplyEnum.TXT,reply_type=ReplyTypeInReplyEnum.ANSWER)
))
except Exception as e:
ctx.logger.error(f"Error in new_step_callback_wrapper: {e}")
agent.resume()
# Create the agent
agent = MyAgent(
task=ctx.query,
browser_session=browser_session,
register_new_step_callback=new_step_callback_wrapper,
llm=ctx.llm,
page_extraction_llm=ctx.llm,
planner_llm=ctx.llm,
controller=MyController(),
planner_interval=int(os.getenv("ARK_PLANNER_INTERVAL", "1")),
override_system_message=ctx.system_prompt,
extend_planner_system_message=ctx.extend_prompt,
language='zh',
enable_memory=False,
)
ctx.logger.info(f"[{task_id}] Agent initialized and ready to run")
# Start the agent task
agent_task = asyncio.create_task(agent.run(20,on_step_end=on_step_end))
ctx.logger.info(f"[{task_id}] Agent started running")
# Event stream generation
while True:
try:
# Wait for an event or check whether the task has completed
try:
event = await asyncio.wait_for(event_queue.get(), timeout=0.5)
yield event
# Check whether the stream should end
if event == "error" or (isinstance(event, dict) and event.get("status") == "completed"):
break
except asyncio.TimeoutError:
# Check whether the agent task has finished
if agent_task.done():
break
continue
except Exception as e:
ctx.logger.error(f"[{task_id}] Error in event streaming: {e}")
break
# Wait for the agent task to complete
if agent_task and not agent_task.done():
await agent_task
# Collect the final result
result = await agent_task if agent_task else None
if result:
final_result = None
for history_item in reversed(result.history):
for result_item in history_item.result:
if hasattr(result_item, "is_done") and result_item.is_done:
final_result = result_item.extracted_content
break
if final_result:
break
if not final_result:
result = [
[item.extracted_content for item in history_item.result
if hasattr(item, "extracted_content")]
for history_item in result.history
]
final_result = "\n".join(
item
for sublist in result
for item in sublist
if isinstance(item, str)
)
if final_result:
ctx.logger.info(f"[{task_id}] final_result: {final_result}")
completion_event = genSSEData(
stream_id=ctx.conversation_id,
content= final_result,
return_type=ReturnTypeEnum.MODEL,
response_for_model=final_result,
content_type=ContentTypeEnum.TEXT,
output_mode=OutputModeEnum.STREAM,
is_finish= True,
is_last_msg=True,
is_last_packet_in_msg=True,
reply_content_type=ReplyContentType(content_type=ContentTypeInReplyEnum.TXT,reply_type=ReplyTypeInReplyEnum.ANSWER)
)
yield completion_event
ctx.logger.info(f"[{task_id}] Task completed successfully")
except Exception as e:
ctx.logger.error(f"[{task_id}] Agent execution failed: {e}")
yield genSSEData(
stream_id=ctx.conversation_id,
content=f'err:{e}',
is_finish=True,
is_last_msg=True,
is_last_packet_in_msg=True,
return_type=ReturnTypeEnum.MODEL,
response_for_model=f'err:{e}',
reply_content_type=ReplyContentType(content_type=ContentTypeInReplyEnum.TXT,reply_type=ReplyTypeInReplyEnum.ANSWER)
)
finally:
if agent:
if agent.browser_session:
agent.browser_session.cdp_url = None
if agent.browser_session.browser_context:
await agent.browser_session.browser_context.close()
if agent.browser_session.browser:
await agent.browser_session.browser.close()
agent.pause()
ctx.logger.info('agent close success')
@app.get("/")
async def root():
return {"message": "Hello World"}
@app.post("/tasks")
async def sse_task(request: Messages):
task_id = str(uuid.uuid4())
# Extract the user message
prompt = ""
for message in request.messages:
if message.role == "user":
prompt = message.content
logging.debug(f"Found user message: {prompt}")
break
logging.info(f"[{task_id}] Starting SSE task with prompt: {prompt}")
async def generate_sse_events():
try:
# Generator that yields SSE-formatted events
async for event in RunBrowserUseAgent(ctx=RunBrowserUseAgentCtx(
query=prompt,
conversation_id="",
cookie="",
llm_config=LLMConfig(
),
browser_session_endpoint="http://127.0.0.1:8001/v1/browsers",
max_steps=20
)):
# Make sure the event is emitted in SSE format
if isinstance(event, str):
# Strings are sent as-is as the data payload
yield f"data: {event}\n\n"
elif isinstance(event, dict):
# Dicts are serialized to JSON
event_json = json.dumps(event, ensure_ascii=False)
yield f"data: {event_json}\n\n"
else:
# Anything else is converted to a string
yield f"data: {str(event)}\n\n"
except Exception as e:
logging.error(f"[{task_id}] Error in SSE generation: {e}")
# Emit an error event
error_event = json.dumps({
"task_id": task_id,
"status": "error",
"error": str(e)
}, ensure_ascii=False)
yield f"data: {error_event}\n\n"
finally:
# Optionally emit an end-of-stream marker
yield f"data: {json.dumps({'task_id': task_id, 'status': 'stream_completed'}, ensure_ascii=False)}\n\n"
try:
return StreamingResponse(
generate_sse_events(),
media_type="text/event-stream",
)
except Exception as e:
logging.error(f"[{task_id}] Error creating StreamingResponse: {e}")
# Return an error response
return JSONResponse(
status_code=500,
content={"error": "Failed to create SSE stream", "task_id": task_id}
)
if __name__ == "__main__":
enforce_log_format()
uvicorn.run(app, host="0.0.0.0", port=8000)

View File

@@ -0,0 +1,23 @@
from datetime import datetime
from typing import List
from abc import ABC, abstractmethod
from pydantic import BaseModel


class FileItem(BaseModel):
    file_name: str
    file_type: str
    file_size: int
    file_uri: str
    file_url: str
    upload_type: str
    create_time: int
    update_time: int


class UploadService(BaseModel, ABC):
    headers: dict[str, str] = {}

    async def upload_file(self, file_content: str, file_name: str, base64_content: str = ''):
        pass

    async def list_file(self) -> List[FileItem]:
        pass


async def filter_file_by_time(file_list: List[FileItem], start_time: int) -> List[FileItem]:
    return [file for file in file_list if (start_time <= file.create_time or start_time <= file.update_time)]
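A hedged sketch of one possible concrete UploadService; the HTTP endpoint and payload shape are assumptions, not part of this commit:

# Sketch only: HTTPUploadService and its endpoint are illustrative.
import aiohttp

class HTTPUploadService(UploadService):
    endpoint: str = "http://127.0.0.1:8001/v1/files"  # assumed endpoint

    async def upload_file(self, file_content: str, file_name: str, base64_content: str = ''):
        body = {"name": file_name, "content": base64_content or file_content}
        async with aiohttp.ClientSession(headers=self.headers) as session:
            async with session.post(self.endpoint, json=body) as resp:
                resp.raise_for_status()

    async def list_file(self) -> List[FileItem]:
        async with aiohttp.ClientSession(headers=self.headers) as session:
            async with session.get(self.endpoint) as resp:
                return [FileItem(**item) for item in await resp.json()]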

View File

@@ -0,0 +1,86 @@
# Copyright (c) 2025 Bytedance Ltd. and/or its affiliates
# Licensed under the 【火山方舟】原型应用软件自用许可协议
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://www.volcengine.com/docs/82379/1433703
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Any, Dict, List
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.messages import BaseMessage
from langchain_core.outputs import LLMResult
root_logger = logging.getLogger()
for handler in root_logger.handlers:
formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s',
'%Y-%m-%d %H:%M:%S'
)
handler.setFormatter(formatter)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
def enforce_log_format():
root_logger = logging.getLogger()
for handler in root_logger.handlers:
formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s',
'%Y-%m-%d %H:%M:%S'
)
handler.setFormatter(formatter)
class ModelLoggingCallback(BaseCallbackHandler):
def on_chat_model_start(
self, serialized: Dict[str, Any], messages: List[List[BaseMessage]], **kwargs
) -> None:
logging.info(
f"[Model] Chat model started\n")
def on_llm_end(self, response: LLMResult, **kwargs) -> None:
logging.info(
f"[Model] Chat model ended, response: {response}")
def on_llm_error(self, error: BaseException, **kwargs) -> Any:
logging.info(
f"[Model] Chat model error, response: {error}")
def on_chain_start(
self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs
) -> None:
logging.info(
f"[Model] Chain {serialized.get('name')} started")
def on_chain_end(self, outputs: Dict[str, Any], **kwargs) -> None:
logging.info(f"[Model] Chain ended, outputs: {outputs}")
# class that wraps another class and logs all function calls being executed
class Wrapper:
def __init__(self, wrapped_class):
self.wrapped_class = wrapped_class
def __getattr__(self, attr):
original_func = getattr(self.wrapped_class, attr)
def wrapper(*args, **kwargs):
print(f"Calling function: {attr}")
print(f"Arguments: {args}, {kwargs}")
result = original_func(*args, **kwargs)
print(f"Response: {result}")
return result
return wrapper
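Wrapper proxies attribute access and logs every call it forwards; a tiny usage sketch:

# Sketch only.
class Calculator:
    def add(self, a, b):
        return a + b

logged = Wrapper(Calculator())
logged.add(1, 2)  # prints the call name, its arguments, and the response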

29
py_package/pyproject.toml Normal file
View File

@@ -0,0 +1,29 @@
[build-system]
requires = ["setuptools>=64.0.0", "wheel"]
build-backend = "setuptools.build_meta"
[tool.setuptools]
include-package-data = true
package-dir = {"browser_agent" = "browser_agent","stream_helper" = "stream_helper"}
[tool.setuptools.package-data]
"browser_agent" = ["**/*.mo", "**/*.po"]
[project]
name = "coze-studio-py-package" # 包名在PyPI上唯一。通常用中划线
version = "0.1.0"
description = "browser agent"
# declare the package's dependencies here
dependencies = [
"fastapi==0.109.2",
"uvicorn==0.27.0.post1",
"python-multipart==0.0.9",
"MainContentExtractor==0.0.4",
"aiohttp==3.11.16",
"websockets==15.0.1",
"faiss-cpu==1.10.0",
"browser_use @ git+https://github.com/zzxwill/browser-use.git@i18n_system_prompt",
"playwright==1.52.0",
"pillow==11.3.0"
]

View File

View File

@@ -0,0 +1,125 @@
from enum import Enum, IntEnum
from typing import Optional, Dict, Union
from pydantic import BaseModel
class StepInfo(BaseModel):
step_number:int
goal:str
class WebSocketItem(BaseModel):
ws_url:str
class WebSocketInfo(BaseModel):
items: list[WebSocketItem]
class MessageActionTypeEnum(int, Enum):
"""MessageActionType 的枚举值"""
WebPageAuthorization = 1 # Web page authorization
class MessageActionItem(BaseModel):
type: MessageActionTypeEnum = MessageActionTypeEnum.WebPageAuthorization
class Config:
use_enum_values = True
class MessageActionInfo(BaseModel):
actions: list[MessageActionItem]
# Enum type definitions
class FileType(str, Enum):
DIR = "dir"
FILE = "file"
class FileChangeType(str, Enum):
CREATE = "create"
DELETE = "delete"
UPDATE = "update"
class FileChangeData(BaseModel):
file_type: FileType = FileType.FILE
file_path: str = ''
file_name: str
change_type: FileChangeType
uri: str
url: str
class Config:
use_enum_values = True
class ErrData(BaseModel):
data: Dict[str, str]
class FileChangeInfo(BaseModel):
file_change_list: Optional[list[FileChangeData]] = None
err_list: Optional[list[ErrData]] = None
class OutputModeEnum(int, Enum):
"""Enum values for SSEData.output_mode"""
NOT_STREAM = 0 # non-streaming
STREAM = 1 # streaming
class ReturnTypeEnum(int, Enum):
"""Enum values for SSEData.return_type"""
MODEL = 0 # returned to the model
USER_TERMINAL = 1 # returned to the user terminal
class ContentTypeEnum(int, Enum):
"""Enum values for SSEData.content_type"""
TEXT = 0 # text
APPLET_WIDGET = 1 # applet widget
LOADING_TIPS = 2 # loading hint
CARD = 3 # card
VERBOSE = 4 # verbose info
USAGE = 10 # usage info
class ContextModeEnum(int, Enum):
"""Enum values for SSEData.context_mode"""
NOT_IGNORE = 0 # do not ignore context
IGNORE = 1 # ignore context
class ReplyTypeInReplyEnum(IntEnum):
"""Type of reply (purpose/context)."""
ANSWER = 1 # Standard answer
SUGGEST = 2 # Suggestion (e.g., follow-up questions)
LLM_OUTPUT = 3 # Raw LLM output (before processing)
TOOL_OUTPUT = 4 # Output from an external tool/API
VERBOSE = 100 # Debug/verbose logging
PLACEHOLDER = 101 # Placeholder (e.g., loading state)
TOOL_VERBOSE = 102 # Verbose logs from tools
class ContentTypeInReplyEnum(IntEnum):
"""Format of the content."""
TXT = 1 # Plain text
IMAGE = 2 # Image
VIDEO = 4 # Video
MUSIC = 7 # Audio
CARD = 50 # Structured card (e.g., rich UI element)
WIDGET = 52 # Interactive widget
APP = 100 # Embedded application
WEBSOCKET_INFO = 101 # Websocket metadata
FILE_CHANGE_INFO = 102 # File system event
ACTION_INFO = 103 # Action/command metadata
class ReplyContentType(BaseModel):
"""Combines reply type and content type for structured responses."""
reply_type: ReplyTypeInReplyEnum = ReplyTypeInReplyEnum.TOOL_VERBOSE
content_type: ContentTypeInReplyEnum = ContentTypeInReplyEnum.TXT
class SSEData(BaseModel):
stream_id: str
message_title: Optional[str] = None
context_mode: Union[ContextModeEnum, int] = ContextModeEnum.NOT_IGNORE
output_mode: OutputModeEnum = OutputModeEnum.STREAM # 0 = non-streaming, 1 = streaming
return_type: Union[ReturnTypeEnum, int] = ReturnTypeEnum.USER_TERMINAL
content_type: Union[ContentTypeEnum, int] = ContentTypeEnum.TEXT
is_last_msg: bool = False
is_finish: bool = False
is_last_packet_in_msg: bool = False
content: Optional[str] = None
response_for_model: Optional[str] = None
ext: Optional[Dict[str, str]] = None
card_body: Optional[str] = None
reply_content_type :Optional[ReplyContentType] = None
class Config:
use_enum_values = True # serialize using the enums' raw values
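For reference, a minimal sketch of building one SSE packet with these models; the field values are illustrative:

# Sketch only.
packet = SSEData(
    stream_id="conv-123",
    content="Task started.",
    content_type=ContentTypeEnum.TEXT,
    return_type=ReturnTypeEnum.MODEL,
    output_mode=OutputModeEnum.STREAM,
    reply_content_type=ReplyContentType(
        content_type=ContentTypeInReplyEnum.TXT,
        reply_type=ReplyTypeInReplyEnum.ANSWER,
    ),
)
print(f"data: {packet.model_dump_json()}\n")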