diff --git a/.gitignore b/.gitignore index dfdf34dc7..d4a481dd1 100644 --- a/.gitignore +++ b/.gitignore @@ -60,3 +60,4 @@ values-dev.yaml *.tsbuildinfo +*venv/ \ No newline at end of file diff --git a/py_package/browser_agent/.python-version b/py_package/browser_agent/.python-version new file mode 100644 index 000000000..fdcfcfdfc --- /dev/null +++ b/py_package/browser_agent/.python-version @@ -0,0 +1 @@ +3.12 \ No newline at end of file diff --git a/py_package/browser_agent/__init__.py b/py_package/browser_agent/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/py_package/browser_agent/agent.py b/py_package/browser_agent/agent.py new file mode 100644 index 000000000..e3612052f --- /dev/null +++ b/py_package/browser_agent/agent.py @@ -0,0 +1,30 @@ +from pydantic import BaseModel +from browser_agent.index import RunBrowserUseAgentCtx,LLMConfig +from typing import AsyncGenerator,Dict,Optional +from abc import ABC, abstractmethod +from datetime import datetime +from stream_helper.schema import SSEData +from langchain_core.language_models.chat_models import BaseChatModel +class BrowserAgentBase(BaseModel,ABC): + query: str + conversation_id: str = '' + llm: BaseChatModel + browser_session_endpoint: str + endpoint_header: Dict[str, str] = {} + max_steps: int = 20 + system_prompt: str = None + extend_prompt: str = None + @abstractmethod + async def save_cookies(self): + pass + + @abstractmethod + async def get_llm(self)->BaseChatModel: + pass + + @abstractmethod + async def get_system_prompt(self)->str: + pass + + async def run(self)->AsyncGenerator[SSEData,None]: + pass diff --git a/py_package/browser_agent/browser.py b/py_package/browser_agent/browser.py new file mode 100644 index 000000000..d2f337c9c --- /dev/null +++ b/py_package/browser_agent/browser.py @@ -0,0 +1,158 @@ +# Copyright (c) 2025 Bytedance Ltd. and/or its affiliates +# Licensed under the 【火山方舟】原型应用软件自用许可协议 +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://www.volcengine.com/docs/82379/1433703 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import logging +import os +import aiohttp +from playwright.async_api import async_playwright +from playwright.async_api import Browser +from playwright.async_api._generated import Playwright as AsyncPlaywright + +browser_ready_event = asyncio.Event() + +class BrowserWrapper: + def __init__(self, port, browser: Browser, playwright: AsyncPlaywright, remote_browser_id: str = None, endpoint: str = None): + self.port = port + self.browser = browser + self.playwright = playwright + self.remote_browser_id = remote_browser_id + self.endpoint = endpoint + + async def stop(self): + if self.browser: + logging.info(f"Closing browser on port {self.port}...") + # try: + # for context in self.browser.contexts: + # await context.close() + # except Exception as e: + # logging.error(f"Error closing contexts: {e}") + await self.browser.close() + logging.info(f"Browser on port {self.port} closed successfully") + if self.playwright: + logging.info(f"Closing playwright session {self.port}...") + await self.playwright.stop() + logging.info( + f"Paywright session on port {self.port} closed successfully") + if self.remote_browser_id and self.endpoint: + logging.info(f"Closing remote browser {self.remote_browser_id}...") + try: + async with aiohttp.ClientSession() as session: + delete_url = f"{self.endpoint}/{self.remote_browser_id}" + async with session.delete(delete_url, timeout=30) as response: + if response.status == 200: + logging.info(f"Remote browser {self.remote_browser_id} closed successfully") + else: + error_text = await response.text() + logging.error(f"Failed to close remote browser. Status: {response.status}, Error: {error_text}") + except Exception as e: + logging.error(f"Error closing remote browser {self.remote_browser_id}: {e}") + + +async def start_local_browser(port): + logging.info(f"Attempting to start browser on port {port}") + p = None + browser = None + + try: + p = await async_playwright().start() + + w = BrowserWrapper(port, None, p) + + try: + # Configure proxy if environment variables are set + proxy_config = None + proxy_server = os.getenv('PROXY_SERVER') + if proxy_server: + proxy_config = { + 'server': proxy_server, + 'bypass': '127.0.0.1,localhost' + } + + browser = await p.chromium.launch( + # executable_path="/opt/chromium.org/browser_use/chromium/chromium-browser-use", + headless=False, + args=[ + f'--remote-debugging-port={port}', + '--remote-allow-origins=*', + '--remote-debugging-address=0.0.0.0', + '--no-sandbox' + ], + proxy=proxy_config + ) + + w = BrowserWrapper(port, browser, p) + + logging.info(f"Browser launched successfully on port {port}") + + # Verify CDP is actually running + try: + async with aiohttp.ClientSession() as session: + async with session.get(f"http://127.0.0.1:{port}/json/version", timeout=5) as response: + if response.status == 200: + version_info = await response.json() + logging.info( + f"successfully connected to cdp: {version_info}") + else: + logging.error( + f"Failed to get CDP version. Status: {response.status}") + except Exception as cdp_e: + logging.error(f"Error checking CDP availability: {cdp_e}") + + logging.info('closing playwright driver and browser') + await w.stop() + raise + return BrowserWrapper(port, browser, p) + + except Exception as launch_e: + logging.error(f"Failed to launch browser: {launch_e}") + browser_ready_event.clear() + + logging.info('closing playwright driver and browser') + await w.stop() + raise + except Exception as p_e: + logging.error(f"Playwright initialization error: {p_e}") + browser_ready_event.clear() + + logging.info('closing playwright driver and browser') + await w.stop() + raise + +async def start_remote_browser(endpoint): + logging.info(f"Attempting to create browser via remote endpoint: {endpoint}") + + try: + async with aiohttp.ClientSession() as session: + data = {"timeout": 30} + + async with session.post( + endpoint, + json=data, + headers={'Content-Type': 'application/json'}, + timeout=30 + ) as response: + if response.status == 200: + result = await response.json() + logging.info(f"Browser created successfully: {result}") + id_value = result.get('id', None) + if id_value is None: + result['id'] = 'id' + return BrowserWrapper(None, None, None, result['id'], endpoint) + else: + error_text = await response.text() + logging.error(f"Failed to create browser. Status: {response.status}, Error: {error_text}") + raise Exception(f"Failed to create browser: {response.status} - {error_text}") + + except Exception as e: + logging.error(f"Error creating remote browser: {e}") + raise + diff --git a/py_package/browser_agent/browser_use_custom/__init__.py b/py_package/browser_agent/browser_use_custom/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/py_package/browser_agent/browser_use_custom/agent/__init__.py b/py_package/browser_agent/browser_use_custom/agent/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/py_package/browser_agent/browser_use_custom/agent/service.py b/py_package/browser_agent/browser_use_custom/agent/service.py new file mode 100644 index 000000000..b57ee1a00 --- /dev/null +++ b/py_package/browser_agent/browser_use_custom/agent/service.py @@ -0,0 +1,6 @@ +from browser_use import Agent + +class MyAgent(Agent): + # browser use 0.2.5版本 移除模型检测 + def _test_tool_calling_method(self, method: str) -> bool: + return True \ No newline at end of file diff --git a/py_package/browser_agent/browser_use_custom/controller/__init__.py b/py_package/browser_agent/browser_use_custom/controller/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/py_package/browser_agent/browser_use_custom/controller/screen.py b/py_package/browser_agent/browser_use_custom/controller/screen.py new file mode 100644 index 000000000..c50a20efe --- /dev/null +++ b/py_package/browser_agent/browser_use_custom/controller/screen.py @@ -0,0 +1,185 @@ +import asyncio +import io,base64 +import logging,time +from typing import Optional, Tuple +from PIL import Image, ImageChops +import numpy as np +from pydantic import BaseModel + +logger = logging.getLogger(__name__) + +class VisualChangeDetector: + """视觉变化检测器(基于屏幕截图比对)""" + + def __init__( + self, + similarity_threshold: float = 0.95, + pixel_change_threshold: int = 1000, + resize_width: Optional[int] = 800 + ): + """ + Args: + similarity_threshold: 相似度阈值(0-1) + pixel_change_threshold: 变化像素数量阈值 + resize_width: 缩放宽度(提升性能) + """ + self.similarity_threshold = similarity_threshold + self.pixel_threshold = pixel_change_threshold + self.resize_width = resize_width + + async def capture_screenshot(self, page, upload_service=None,logger=None) -> Image.Image: + """捕获页面截图(自动优化)""" + screenshot_bytes = await page.screenshot(timeout=10000,type='jpeg',quality=50,full_page=False,animations='disabled') + img = Image.open(io.BytesIO(screenshot_bytes)) + + # 统一缩放尺寸(提升比对性能) + if self.resize_width: + w, h = img.size + new_h = int(h * (self.resize_width / w)) + img = img.resize((self.resize_width, new_h)) + if upload_service: + img_byte_arr = io.BytesIO() + img.save(img_byte_arr, format='JPEG') + binary_data = img_byte_arr.getvalue() + if not binary_data: + return + file_name = f"screenshot_{int(time.time())}.jpg" + base64_str = base64.b64encode(binary_data).decode('ascii') + if logger: + logger.info(f'upload screenshot to {file_name}') + else: + logging.info(f'upload screenshot to {file_name}') + try: + await upload_service.upload_file('',file_name,base64_content=base64_str) + except Exception as e: + if logger: + logger.error(f"Failed to upload screenshot to {file_name}: {e}") + else: + logging.error(f"Failed to upload screenshot to {file_name}: {e}") + return img.convert('RGB') + return img.convert('RGB') # 确保RGB模式 + + def calculate_change( + self, + img1: Image.Image, + img2: Image.Image + ) -> Tuple[float, Image.Image]: + """ + 计算两图差异 + + Returns: + tuple: (相似度百分比, 差异图) + """ + # 确保尺寸一致 + if img1.size != img2.size: + img2 = img2.resize(img1.size) + + arr1 = np.array(img1) # shape: (height, width, 3) + arr2 = np.array(img2) # shape: (height, width, 3) + + # 计算绝对差异(每个通道单独计算) + diff = np.abs(arr1.astype(int) - arr2.astype(int)) + + # 变化像素计算(任一通道有变化即视为该像素变化) + # 先检查每个像素的3个通道是否有变化 + pixel_changed = np.any(diff > 0, axis=2) # shape: (height, width) + changed_pixels = np.sum(pixel_changed) # 变化像素总数 + + # 总像素数(不需要除以3,因为pixel_changed已经是像素级别的判断) + total_pixels = arr1.shape[0] * arr1.shape[1] + + # 生成差异图(可视化用) + diff_img = ImageChops.difference(img1, img2) + + # 计算相似度 + similarity = 1 - (changed_pixels / total_pixels) + + return similarity, diff_img + + async def detect_change( + self, + browser_session, + reference_img: Optional[Image.Image] = None, + max_attempts: int = 5, + attempt_interval: float = 1.5, + upload_service = None, + logger = None + ) -> Tuple[bool, Optional[Image.Image], Optional[Image.Image]]: + """ + 检测视觉变化 + + Args: + page: 浏览器页面对象 + reference_img: 基准截图(None则自动捕获) + max_attempts: 最大检测次数 + attempt_interval: 检测间隔(秒) + + Returns: + tuple: (是否变化, 基准截图, 差异图) + """ + # 首次捕获基准图 + if reference_img is None: + page = await browser_session.get_current_page() + reference_img = await self.capture_screenshot(page,upload_service=upload_service,logger=logger) + logger.info("start detect change") + for attempt in range(max_attempts): + await asyncio.sleep(attempt_interval) + + # 捕获当前截图 + page = await browser_session.get_current_page() + current_img = await self.capture_screenshot(page,upload_service=upload_service,logger=logger) + if not current_img: + logger.error(f"Failed to capture screenshot on attempt {attempt + 1}") + continue + # 计算变化 + similarity, diff_img = self.calculate_change(reference_img, current_img) + logger.info(f"Attempt {attempt + 1}: 相似度 {similarity:.2f}, 变化像素 {np.sum(np.array(diff_img) > 0)}") + # 判断是否显著变化 + if similarity < self.similarity_threshold: + diff_pixels = np.sum(np.array(diff_img) > 0) + if diff_pixels > self.pixel_threshold: + logger.info(f"视觉变化 detected (相似度: {similarity:.2f}, 变化像素: {diff_pixels})") + return True, reference_img, diff_img + + return False, reference_img, None + +class WaitForVisualChangeAction(BaseModel): + """等待视觉变化的参数模型""" + timeout: int = 30 + check_interval: float = 2 + similarity_threshold: float = 0.95 + pixel_threshold: int = 5000 + +async def wait_for_visual_change( + params: WaitForVisualChangeAction, + browser_session, + initial_screenshot: Optional[Image.Image] = None, + upload_service = None, + logger = None +) -> Tuple[bool, Optional[Image.Image], Optional[Image.Image]]: + """ + 等待页面视觉变化(完整工作流) + + Args: + params: 配置参数 + browser_session: 浏览器会话 + initial_screenshot: 初始截图(可选) + + Returns: + tuple: (是否变化, 初始截图, 差异图) + """ + detector = VisualChangeDetector( + similarity_threshold=params.similarity_threshold, + pixel_change_threshold=params.pixel_threshold + ) + + max_attempts = int(params.timeout / params.check_interval) + + return await detector.detect_change( + browser_session=browser_session, + reference_img=initial_screenshot, + max_attempts=max_attempts, + attempt_interval=params.check_interval, + upload_service=upload_service, + logger=logger, + ) \ No newline at end of file diff --git a/py_package/browser_agent/browser_use_custom/controller/service.py b/py_package/browser_agent/browser_use_custom/controller/service.py new file mode 100644 index 000000000..bbb65983a --- /dev/null +++ b/py_package/browser_agent/browser_use_custom/controller/service.py @@ -0,0 +1,204 @@ +# Copyright (c) 2025 Bytedance Ltd. and/or its affiliates +# Licensed under the 【火山方舟】原型应用软件自用许可协议 +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://www.volcengine.com/docs/82379/1433703 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import io +import logging +from typing import Optional, Tuple +from PIL import Image, ImageChops +import numpy as np +from pydantic import BaseModel +import logging +import asyncio +import re +import markdownify +from bs4 import BeautifulSoup +from typing import Optional +from browser_use.browser import BrowserSession +from browser_use.controller.views import SearchGoogleAction +from browser_use.agent.views import ActionResult +from browser_use.controller.service import Controller +from playwright.async_api import Page +from pydantic import BaseModel,Field +from langchain_core.language_models.chat_models import BaseChatModel +from browser_agent.browser_use_custom.i18n import _ +from langchain_core.prompts import PromptTemplate +from browser_agent.browser_use_custom.controller.screen import VisualChangeDetector,wait_for_visual_change,WaitForVisualChangeAction + +logger = logging.getLogger(__name__) + +class PauseAction(BaseModel): + reason: str + +class WaitForLoginAction(BaseModel): + """Action parameters for waiting for login completion.""" + timeout: int = Field( + default=300, + description="Maximum time to wait for login completion in seconds" + ) + check_interval: int = Field( + default=5, + description="Interval between checks for URL changes in seconds" + ) + +class MyController(Controller): + """Custom controller extending base Controller with additional actions. + + Features: + - Inherits core controller functionality + - Adds custom pause action handler + - Maintains action registry with exclusion support + """ + + def __init__( + self, + exclude_actions: list[str] = [], + output_model: type[BaseModel] | None = None, + ): + super().__init__(exclude_actions, output_model) + # Basic Navigation Actions + @self.registry.action( + _('Search the query in Baidu in the current tab, the query should be a search query like humans search in Baidu, concrete and not vague or super long. More the single most important items.'), + param_model=SearchGoogleAction, + ) + async def search_google(params: SearchGoogleAction, browser_session: BrowserSession): + search_url = f'https://www.baidu.com/s?wd={params.query}' + + page = await browser_session.get_current_page() + await page.goto(search_url) + await page.wait_for_load_state() + msg = _('🔍 Searched for "{query}" in Baidu').format(query=params.query) + logger.info(msg) + return ActionResult(extracted_content=msg, include_in_memory=True) + # Content Actions + @self.registry.action( + _('Extract page content to retrieve specific information from the page, e.g. all company names, a specific description, all information about xyc, 4 links with companies in structured format. Use include_links true if the goal requires links'), + ) + async def extract_content( + goal: str, + page: Page, + page_extraction_llm: BaseChatModel, + include_links: bool = False, + ): + raw_content = await page.content() + soup = BeautifulSoup( + raw_content, 'html.parser') + # remove all unnecessary http metadata + for s in soup.select('script'): + s.decompose() + for s in soup.select('style'): + s.decompose() + for s in soup.select('textarea'): + s.decompose() + for s in soup.select('img'): + s.decompose() + for s in soup.find_all(style=re.compile("background-image.*")): + s.decompose() + content = markdownify.markdownify(str(soup)) + + # manually append iframe text into the content so it's readable by the LLM (includes cross-origin iframes) + for iframe in page.frames: + if iframe.url != page.url and not iframe.url.startswith('data:'): + content += f'\n\nIFRAME {iframe.url}:\n' + content += markdownify.markdownify(await iframe.content()) + + prompt = _('Your task is to extract the content of the page. You will be given a page and a goal and you should extract all relevant information around this goal from the page. If the goal is vague, summarize the page. Respond in json format. Extraction goal: {goal}, Page: {page}') + template = PromptTemplate(input_variables=['goal', 'page'], template=prompt) + try: + output = await page_extraction_llm.ainvoke(template.format(goal=goal, page=content)) + msg = _('📄 Extracted from page\n: {content}\n').format(content=output.content) + logger.info(msg) + return ActionResult(extracted_content=msg, include_in_memory=True) + except Exception as e: + logger.debug(_('Error extracting content: {error}').format(error=e)) + msg = _('📄 Extracted from page\n: {content}\n').format(content=content) + logger.info(msg) + return ActionResult(extracted_content=msg) + + @self.registry.action( + _('Pause agent'), + param_model=PauseAction, + ) + async def pause(params: PauseAction): + msg = _('👩 Pause agent, reason: {reason}').format(reason=params.reason) + logger.info(msg) + return ActionResult(extracted_content=msg, include_in_memory=True) + # Login detection and waiting action + # @self.registry.action( + # _('Detects if current page requires login and waits for authentication to complete.Wait for login completion by monitoring URL changes.'), + # param_model=WaitForLoginAction, + # ) + # async def wait_for_login(params: WaitForLoginAction, browser_session: BrowserSession): + # page = await browser_session.get_current_page() + + # # Get initial URL for comparison + # initial_url = page.url + # logger.info(_('🔐 Starting login detection. Initial URL: {url}').format(url=initial_url)) + + # # Wait for URL change indicating login completion + # msg = _('🔐 Login page detected. Waiting for authentication completion (max {timeout}s)...').format( + # timeout=params.timeout + # ) + # logger.info(msg) + + # final_url = await self._wait_for_url_change( + # page, + # initial_url, + # params.timeout, + # params.check_interval + # ) + + # if final_url and final_url != initial_url: + # success_msg = _('✅ Login completed successfully! URL changed from {initial} to {final}').format( + # initial=initial_url, + # final=final_url + # ) + # logger.info(success_msg) + # return ActionResult( + # extracted_content=success_msg, + # include_in_memory=True, + # success=True + # ) + # else: + # timeout_msg = _('⏰ Login timeout or no URL change detected after {timeout} seconds').format( + # timeout=params.timeout + # ) + # logger.warning(timeout_msg) + # return ActionResult( + # extracted_content=timeout_msg, + # include_in_memory=True, + # success=False + # ) + + # async def _wait_for_url_change( + # self, + # page: Page, + # initial_url: str, + # timeout: int, + # check_interval: int + # ) -> Optional[str]: + # """Wait for URL change indicating login completion.""" + # start_time = asyncio.get_event_loop().time() + + # while (asyncio.get_event_loop().time() - start_time) < timeout: + # try: + # current_url = page.url + + # # If URL has changed, return the new URL + # if current_url != initial_url: + # return current_url + + # # Wait before checking again + # await asyncio.sleep(check_interval) + + # except Exception as e: + # logger.warning(_('Error checking URL change: {error}').format(error=str(e))) + # await asyncio.sleep(check_interval) + + # return None # Timeout reached without URL change \ No newline at end of file diff --git a/py_package/browser_agent/browser_use_custom/i18n/__init__.py b/py_package/browser_agent/browser_use_custom/i18n/__init__.py new file mode 100644 index 000000000..830a0f187 --- /dev/null +++ b/py_package/browser_agent/browser_use_custom/i18n/__init__.py @@ -0,0 +1,127 @@ +# Copyright (c) 2025 Bytedance Ltd. and/or its affiliates +# Licensed under the 【火山方舟】原型应用软件自用许可协议 +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://www.volcengine.com/docs/82379/1433703 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import gettext +import os +import copy +from typing import Optional + +# Default language +_current_language = 'zh-CN' +_translator = None + +def get_locales_dir(): + """Get the locales directory path""" + return os.path.join(os.path.dirname(__file__), 'locales') + +def set_language(language: str = 'zh-CN'): + """Set the current language for translations""" + global _current_language, _translator + _current_language = language + + locales_dir = get_locales_dir() + + try: + # Try to load the translation + translation = gettext.translation( + 'vefaas_browser_use', + localedir=locales_dir, + languages=[language], + fallback=True + ) + _translator = translation + except Exception: + # Fallback to NullTranslations if translation not found + _translator = gettext.NullTranslations() + +def get_language(): + """Get the current language""" + return _current_language + +def _(message: str) -> str: + """Translate a message""" + if _translator is None: + set_language(_current_language) + return _translator.gettext(message) + +def get_available_languages(): + """Get list of available languages""" + locales_dir = get_locales_dir() + if not os.path.exists(locales_dir): + return ['zh-CN'] + + languages = [] + for item in os.listdir(locales_dir): + lang_dir = os.path.join(locales_dir, item) + if os.path.isdir(lang_dir): + mo_file = os.path.join(lang_dir, 'LC_MESSAGES', 'vefaas_browser_use.mo') + if os.path.exists(mo_file): + languages.append(item) + + return languages if languages else ['zh-CN'] + +def translate_planning_step_data(conversation_update: dict) -> dict: + """Translate planning step data fields using pattern matching""" + import re + translated_update = copy.deepcopy(conversation_update) + + # Translate evaluation field + if 'evaluation' in translated_update: + evaluation = translated_update['evaluation'] + # Basic status translations + if evaluation == "Unknown": + translated_update['evaluation'] = _("Unknown") + elif evaluation == "Failed": + translated_update['evaluation'] = _("Failed") + elif evaluation == "Success": + translated_update['evaluation'] = _("Success") + # Specific pattern translations + elif evaluation == "Unknown - The task has just started, and no actions have been taken yet.": + translated_update['evaluation'] = _("Unknown - The task has just started, and no actions have been taken yet.") + elif evaluation == "Unknown - The task has not yet begun as the current page is blank.": + translated_update['evaluation'] = _("Unknown - The task has not yet begun as the current page is blank.") + elif evaluation == "Success - Successfully navigated to Google's homepage.": + translated_update['evaluation'] = _("Success - Successfully navigated to Google's homepage.") + elif evaluation == "Failed - The search was blocked by Google's CAPTCHA verification.": + translated_update['evaluation'] = _("Failed - The search was blocked by Google's CAPTCHA verification.") + # General pattern matching for CAPTCHA-related failures + elif "CAPTCHA" in evaluation and evaluation.startswith("Failed"): + translated_update['evaluation'] = _("Failed - The search was blocked by a CAPTCHA verification page.") + # Partial translation for status prefixes + elif evaluation.startswith("Success - "): + translated_update['evaluation'] = evaluation.replace("Success", _("Success"), 1) + elif evaluation.startswith("Failed - "): + translated_update['evaluation'] = evaluation.replace("Failed", _("Failed"), 1) + elif evaluation.startswith("Unknown - "): + translated_update['evaluation'] = evaluation.replace("Unknown", _("Unknown"), 1) + + # Translate goal field with pattern matching + if 'goal' in translated_update: + goal = translated_update['goal'] + if goal == "Pause execution due to CAPTCHA verification.": + translated_update['goal'] = _("Pause execution due to CAPTCHA verification.") + # Add more general goal patterns as needed + + # Translate actions if they contain pause reasons + if 'actions' in translated_update: + for action in translated_update['actions']: + if 'pause' in action and 'reason' in action['pause']: + reason = action['pause']['reason'] + if reason == "CAPTCHA verification required. Human intervention is needed to proceed.": + action['pause']['reason'] = _("CAPTCHA verification required. Human intervention is needed to proceed.") + elif "CAPTCHA" in reason and "verification required" in reason: + # General CAPTCHA verification pattern + action['pause']['reason'] = _("CAPTCHA verification required. Human intervention is needed to proceed.") + + return translated_update + +# Initialize with default language +set_language('zh-CN') \ No newline at end of file diff --git a/py_package/browser_agent/browser_use_custom/i18n/locales/zh-CN/LC_MESSAGES/vefaas_browser_use.mo b/py_package/browser_agent/browser_use_custom/i18n/locales/zh-CN/LC_MESSAGES/vefaas_browser_use.mo new file mode 100644 index 000000000..2ca970324 Binary files /dev/null and b/py_package/browser_agent/browser_use_custom/i18n/locales/zh-CN/LC_MESSAGES/vefaas_browser_use.mo differ diff --git a/py_package/browser_agent/browser_use_custom/i18n/locales/zh-CN/LC_MESSAGES/vefaas_browser_use.po b/py_package/browser_agent/browser_use_custom/i18n/locales/zh-CN/LC_MESSAGES/vefaas_browser_use.po new file mode 100644 index 000000000..83a6f10b7 --- /dev/null +++ b/py_package/browser_agent/browser_use_custom/i18n/locales/zh-CN/LC_MESSAGES/vefaas_browser_use.po @@ -0,0 +1,178 @@ +msgid "" +msgstr "" +"Project-Id-Version: vefaas-browser-use\n" +"Report-Msgid-Bugs-To: \n" +"POT-Creation-Date: 2025-01-04 12:00+0000\n" +"PO-Revision-Date: 2025-01-04 12:00+0000\n" +"Last-Translator: Auto Generated\n" +"Language-Team: Chinese (Simplified)\n" +"Language: zh-CN\n" +"MIME-Version: 1.0\n" +"Content-Type: text/plain; charset=UTF-8\n" +"Content-Transfer-Encoding: 8bit\n" +"Plural-Forms: nplurals=1; plural=0;\n" + +# Task management messages +msgid "Task auto-stopped after 1 minute in paused state" +msgstr "任务在暂停状态1分钟后自动停止" + +# Controller action descriptions +msgid "Search the query in Baidu in the current tab, the query should be a search query like humans search in Baidu, concrete and not vague or super long. More the single most important items." +msgstr "在当前标签页的百度中搜索查询,查询应该是像人类在百度中搜索的搜索查询,具体而不模糊或过长。更多的单个最重要的项目。" + +msgid "🔍 Searched for \"{query}\" in Baidu" +msgstr "🔍 在百度搜索了\"{query}\"" + +msgid "Pause agent" +msgstr "暂停代理" + +msgid "👩 Pause agent, reason: {reason}" +msgstr "👩 暂停代理,原因:{reason}" + +msgid "Extract page content to retrieve specific information from the page, e.g. all company names, a specific description, all information about xyc, 4 links with companies in structured format. Use include_links true if the goal requires links" +msgstr "提取页面内容以从页面中检索特定信息,例如所有公司名称、特定描述、关于xyc的所有信息、4个具有结构化格式公司的链接。如果目标需要链接,请使用include_links true" + +msgid "Your task is to extract the content of the page. You will be given a page and a goal and you should extract all relevant information around this goal from the page. If the goal is vague, summarize the page. Respond in json format. Extraction goal: {goal}, Page: {page}" +msgstr "您的任务是提取页面的内容。您将获得一个页面和一个目标,您应该从页面中提取围绕此目标的所有相关信息。如果目标模糊,请总结页面。以json格式响应。提取目标:{goal},页面:{page}" + +msgid "📄 Extracted from page\n: {content}\n" +msgstr "📄 从页面提取\n:{content}\n" + +msgid "Error extracting content: {error}" +msgstr "提取内容错误:{error}" + +# SSE messages +msgid "Starting new step..." +msgstr "开始新步骤..." + +msgid "Taken action #{number}: {action}" +msgstr "执行动作 #{number}:{action}" + +msgid "Agent creation failed: {error}" +msgstr "代理创建失败:{error}" + +msgid "Agent execution failed: {error}" +msgstr "代理执行失败:{error}" + +# Planning step status messages +msgid "Unknown" +msgstr "未知" + +msgid "Failed" +msgstr "失败" + +msgid "Success" +msgstr "成功" + +# Common action result messages +msgid "Unknown - The task has just started, and no actions have been taken yet." +msgstr "未知 - 任务刚刚开始,尚未执行任何操作。" + +msgid "Failed - The attempt to search for 'MCP news' was blocked by a CAPTCHA verification page. This requires human intervention to proceed." +msgstr "失败 - 搜索'MCP news'的尝试被验证码页面阻止。需要人工干预才能继续。" + +msgid "CAPTCHA verification required to proceed with the search for MCP news. Please solve the CAPTCHA to continue." +msgstr "需要验证码验证才能继续搜索MCP新闻。请解决验证码以继续。" + +# Dynamic evaluation patterns +msgid "Unknown - The task has not yet begun as the current page is blank." +msgstr "未知 - 任务尚未开始,因为当前页面为空白。" + +msgid "Success - Successfully navigated to Google's homepage." +msgstr "成功 - 成功导航到Google主页。" + +# General pause patterns +msgid "Pause execution due to CAPTCHA verification." +msgstr "由于验证码验证而暂停执行。" + +# General evaluation patterns +msgid "Failed - The search was blocked by Google's CAPTCHA verification." +msgstr "失败 - 搜索被Google的验证码验证阻止。" + +msgid "Failed - The search was blocked by a CAPTCHA verification page." +msgstr "失败 - 搜索被验证码页面阻止。" + +# General pause reasons +msgid "CAPTCHA verification required. Human intervention is needed to proceed." +msgstr "需要验证码验证。需要人工干预才能继续。" + +msgid "pause" +msgstr "暂停" + +msgid "reason" +msgstr "原因" + +msgid "Human intervention required to solve it before proceeding with the task." +msgstr "需要人工干预解决后才能继续执行任务。" + +# CAPTCHA detection messages +msgid "CAPTCHA detected on Amazon.com. Human intervention required to solve it before proceeding with the task." +msgstr "在Amazon.com检测到验证码。需要人工干预解决后才能继续执行任务。" + +msgid "Pause execution and request human intervention to solve the CAPTCHA" +msgstr "暂停执行并请求人工干预解决验证码" + +# Task status messages +msgid "Task started." +msgstr "任务已开始。" + +msgid "Task started:" +msgstr "任务已开始:" + +msgid "Pause the task and wait for human intervention to complete the CAPTCHA verification." +msgstr "暂停任务并等待人工干预完成验证码验证。" + +# Extraction instruction +msgid "Extract the titles, sources, and summaries of" +msgstr "提取标题、来源和摘要" + +# Initial task status messages +msgid "No previous actions have been taken yet." +msgstr "尚未执行任何操作。" + +msgid "Submit the search query for" +msgstr "提交搜索查询" + +# Key instruction messages +msgid "'keys': 'Enter'" +msgstr "'输入键': '回车'" + +msgid "error" +msgstr "失败" + +# Status/State translations +msgid "queued" +msgstr "排队中" + +msgid "starting" +msgstr "开始执行" + +msgid "browser_initialized" +msgstr "浏览器初始化" + +msgid "running" +msgstr "执行中" + +msgid "failed" +msgstr "失败" + +msgid "agent_initialized" +msgstr "agent 初始化" + +msgid "completed" +msgstr "完成" + +msgid "go_to_url" +msgstr "访问网址" + +msgid "click_element_by_index" +msgstr "按索引点击元素" + +msgid "index" +msgstr "索引" + +msgid "include_links" +msgstr "包含链接" + + + diff --git a/py_package/browser_agent/browser_use_custom/i18n/update_translations.py b/py_package/browser_agent/browser_use_custom/i18n/update_translations.py new file mode 100644 index 000000000..59446cbf1 --- /dev/null +++ b/py_package/browser_agent/browser_use_custom/i18n/update_translations.py @@ -0,0 +1,116 @@ +#!/usr/bin/env python3 +# Copyright (c) 2025 Bytedance Ltd. and/or its affiliates +# Licensed under the 【火山方舟】原型应用软件自用许可协议 +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://www.volcengine.com/docs/82379/1433703 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Translation management script for vefaas-browser-use + +Usage: + python scripts/update_translations.py compile # Compile .po files to .mo files + python scripts/update_translations.py test # Test translations +""" + +import sys +import subprocess +from pathlib import Path + +def get_project_root(): + """Get the project root directory""" + return Path(__file__).parent.parent + +def compile_translations(): + """Compile all .po files to .mo files""" + project_root = get_project_root() + locales_dir = project_root / "my_browser_use" / "locales" + + if not locales_dir.exists(): + print(f"Locales directory not found: {locales_dir}") + return False + + success = True + for lang_dir in locales_dir.iterdir(): + if lang_dir.is_dir() and lang_dir.name != "__pycache__": + po_file = lang_dir / "LC_MESSAGES" / "vefaas_browser_use.po" + mo_file = lang_dir / "LC_MESSAGES" / "vefaas_browser_use.mo" + + if po_file.exists(): + print(f"Compiling {lang_dir.name}...") + try: + subprocess.run([ + "msgfmt", "-o", str(mo_file), str(po_file) + ], check=True) + print(f" ✓ Compiled {lang_dir.name}") + except subprocess.CalledProcessError as e: + print(f" ✗ Failed to compile {lang_dir.name}: {e}") + success = False + else: + print(f" ⚠ No .po file found for {lang_dir.name}") + + return success + +def test_translations(): + """Test the translation system""" + sys.path.insert(0, str(get_project_root())) + + try: + from browser_agent.browser_use_custom.i18n import _, set_language, get_available_languages + + print("Testing translation system...") + print(f"Available languages: {get_available_languages()}") + + # Test a few key messages + test_messages = [ + "Task auto-stopped after 1 minute in paused state", + "Pause agent", + "🔍 Searched for \"{query}\" in Baidu", + "Starting new step...", + "Taken action #{number}: {action}", + "Agent creation failed: {error}", + "Pause execution due to CAPTCHA verification.", + "Unknown - The task has not yet begun as the current page is blank.", + "Success - Successfully navigated to Google's homepage.", + "CAPTCHA verification required. Human intervention is needed to proceed." + ] + + for lang in get_available_languages(): + print(f"\n--- Testing {lang} ---") + set_language(lang) + for msg in test_messages: + translated = _(msg) + print(f" {msg[:50]}... -> {translated[:50]}...") + + print("\n✓ Translation test completed") + return True + + except Exception as e: + print(f"✗ Translation test failed: {e}") + return False + +def main(): + if len(sys.argv) != 2: + print(__doc__) + sys.exit(1) + + command = sys.argv[1] + + if command == "compile": + success = compile_translations() + sys.exit(0 if success else 1) + elif command == "test": + success = test_translations() + sys.exit(0 if success else 1) + else: + print(f"Unknown command: {command}") + print(__doc__) + sys.exit(1) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/py_package/browser_agent/cdp.py b/py_package/browser_agent/cdp.py new file mode 100644 index 000000000..f8e43e573 --- /dev/null +++ b/py_package/browser_agent/cdp.py @@ -0,0 +1,241 @@ +# Copyright (c) 2025 Bytedance Ltd. and/or its affiliates +# Licensed under the 【火山方舟】原型应用软件自用许可协议 +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://www.volcengine.com/docs/82379/1433703 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import logging +import os +from urllib.parse import urlparse, urlunparse +import aiohttp +import websockets +from fastapi import HTTPException, WebSocket, WebSocketDisconnect + + +async def websocket_endpoint(websocket: WebSocket, page_id: str, port: str): + await websocket.accept() + + try: + # Construct the WebSocket URL for the specific page + ws_url = f"ws://127.0.0.1:{port}/devtools/page/{page_id}" + + # Establish a connection to the CDP WebSocket + async with websockets.connect(ws_url) as cdp_socket: + # Create tasks for bidirectional communication + receive_task = asyncio.create_task( + receive_from_cdp(cdp_socket, websocket)) + send_task = asyncio.create_task(send_to_cdp(cdp_socket, websocket)) + + # Wait for either task to complete + done, pending = await asyncio.wait( + [receive_task, send_task], + return_when=asyncio.FIRST_COMPLETED + ) + + # Cancel any remaining tasks + for task in pending: + task.cancel() + + except Exception as e: + logging.error(f"WebSocket error: {e}") + await websocket.close() + + +async def websocket_browser_endpoint(websocket: WebSocket, browser_id: str, port: str): + await websocket.accept() + + try: + # Construct the WebSocket URL for the specific page + ws_url = f"ws://0.0.0.0:{port}/devtools/browser/{browser_id}" + + # Establish a connection to the CDP WebSocket + async with websockets.connect(ws_url) as cdp_socket: + # Create tasks for bidirectional communication + receive_task = asyncio.create_task( + receive_from_cdp(cdp_socket, websocket)) + send_task = asyncio.create_task(send_to_cdp(cdp_socket, websocket)) + + # Wait for either task to complete + done, pending = await asyncio.wait( + [receive_task, send_task], + return_when=asyncio.FIRST_COMPLETED + ) + + # Cancel any remaining tasks + for task in pending: + task.cancel() + + except Exception as e: + logging.error(f"WebSocket error: {e}") + await websocket.close() + + +async def receive_from_cdp(cdp_socket, websocket): + try: + while True: + message = await cdp_socket.recv() + await websocket.send_text(message) + except websockets.exceptions.ConnectionClosed: + logging.info("CDP WebSocket connection closed") + except Exception as e: + logging.error(f"Error receiving from CDP: {e}") + + +async def send_to_cdp(cdp_socket, websocket): + try: + while True: + message = await websocket.receive_text() + await cdp_socket.send(message) + except WebSocketDisconnect: + logging.info("Client WebSocket disconnected") + except Exception as e: + logging.error(f"Error sending to CDP: {e}") + + +async def get_websocket_targets(port: str): + endpoint = os.getenv("CDP_ENDPOINT") + logging.info(f"Getting websocket targets for endpoint: {endpoint}") + try: + async with aiohttp.ClientSession() as session: + # Use the cdp_websocket parameter to dynamically construct the URL + base_url = f"http://127.0.0.1:{port}/json/list" + async with session.get(base_url) as response: + if response.status == 200: + result = await response.json() + + # If result is an empty list, return it immediately + if not result: + return result + + # Modify the URLs in the result to use the provided cdp_websocket + for target in result: + # Replace devtoolsFrontendUrl + if 'devtoolsFrontendUrl' in target: + target['devtoolsFrontendUrl'] = target['devtoolsFrontendUrl'].replace( + '127.0.0.1:' + str(port), str(endpoint) + ) + + # Replace webSocketDebuggerUrl + if 'webSocketDebuggerUrl' in target: + target['webSocketDebuggerUrl'] = target['webSocketDebuggerUrl'].replace( + '127.0.0.1:' + str(port), str(endpoint) + ) + + return result + else: + raise HTTPException( + status_code=response.status, detail="Failed to fetch JSON list") + except Exception as e: + raise HTTPException( + status_code=500, detail=f"Error fetching JSON list: {str(e)}") + + +async def get_websocket_version(port: str): + endpoint = os.getenv("CDP_ENDPOINT") + logging.info(f"Getting websocket version for endpoint: {endpoint}") + try: + async with aiohttp.ClientSession() as session: + # Use the cdp_websocket parameter to dynamically construct the URL + base_url = f"http://127.0.0.1:{port}/json/version" + async with session.get(base_url) as response: + if response.status == 200: + result = await response.json() + + if not result: + return result + + if 'webSocketDebuggerUrl' in result: + result['webSocketDebuggerUrl'] = result['webSocketDebuggerUrl'].replace( + '127.0.0.1:' + str(port), str(endpoint) + ) + + return result + else: + raise HTTPException( + status_code=response.status, detail="Failed to fetch JSON list") + except Exception as e: + raise HTTPException( + status_code=500, detail=f"Error fetching JSON list: {str(e)}") + +async def get_remote_websocket_version(browser_session_endpoint: str, browser_id: str): + endpoint = os.getenv("CDP_ENDPOINT") + logging.info(f"Getting websocket version for endpoint: {endpoint}") + try: + async with aiohttp.ClientSession() as session: + # Use the cdp_websocket parameter to dynamically construct the URL + base_url = f"{browser_session_endpoint}/{browser_id}/json/version" + async with session.get(base_url) as response: + if response.status == 200: + result = await response.json() + return result + else: + raise HTTPException( + status_code=response.status, detail="Failed to fetch JSON version") + except Exception as e: + raise HTTPException( + status_code=500, detail=f"Error fetching JSON list: {str(e)}") + + +async def get_inspector(url): + endpoint = os.getenv("CDP_ENDPOINT") + logging.info(f"Getting websocket targets for endpoint: {endpoint}") + + # Convert Starlette URL to string + url_str = str(url) + + # like this: http://127.0.0.1:8000/devtools/inspector.html?ws=scqevor9btoi2t6dnkcpg.apigateway-cn-beijing.volceapi.com/devtools/page/7D574C7E6237CB326E385DD2C3A5C845 + logging.info(f"Getting original inspector for URL: {url_str}") + + # Parse the URL + parsed_url = urlparse(url_str) + + # Check if the current domain matches the endpoint + if parsed_url.netloc == endpoint: + # Replace only the netloc + modified_url = parsed_url._replace(netloc="127.0.0.1:9222") + url_str = urlunparse(modified_url) + + logging.info(f"Converted inspector for URL: {url_str}") + + # if parsed_url contains "127.0.0.1:9222", run a 301 redirect + if "127.0.0.1" in parsed_url.netloc: + url_str = url_str.replace("127.0.0.1:9222", endpoint) + logging.info(f"Redirecting to: {url_str}") + return await get_inspector(url_str) + + try: + async with aiohttp.ClientSession() as session: + async with session.get(url_str) as response: + if response.status == 200: + # Check content type + content_type = response.headers.get( + 'Content-Type', '').lower() + + if 'application/json' not in content_type: + # If not JSON, read the content + content = await response.text() + logging.error( + f"Unexpected content type: {content_type}") + logging.error(f"Response content: {content[:500]}") + + return { + "status": "error", + "content_type": content_type, + "content": content + } + + result = await response.json() + return result + else: + raise HTTPException( + status_code=response.status, detail="Failed to fetch inspector") + except Exception as e: + logging.error(f"Error fetching inspector: {e}") + raise HTTPException( + status_code=500, detail=f"Error fetching inspector: {str(e)}") diff --git a/py_package/browser_agent/index.py b/py_package/browser_agent/index.py new file mode 100644 index 000000000..07f3c1a75 --- /dev/null +++ b/py_package/browser_agent/index.py @@ -0,0 +1,541 @@ +# Copyright (c) 2025 Bytedance Ltd. and/or its affiliates +# Licensed under the 【火山方舟】原型应用软件自用许可协议 +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://www.volcengine.com/docs/82379/1433703 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import Dict,List,Union +import asyncio +import json +import logging +import os +import uuid +from pathlib import Path +from typing import AsyncGenerator,Optional +import aiohttp +import aiohttp +import uvicorn +from dotenv import load_dotenv +from fastapi import FastAPI +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import JSONResponse, StreamingResponse +from pydantic import BaseModel, Field,ConfigDict +from browser_use.browser.views import BrowserStateSummary +from browser_agent.browser import start_local_browser,BrowserWrapper +from browser_agent.browser_use_custom.controller.service import MyController +from browser_agent.utils import enforce_log_format +from browser_use import Agent +from browser_agent.browser_use_custom.agent.service import MyAgent +from browser_agent.browser_use_custom.i18n import _, set_language +from browser_use.agent.views import ( + AgentOutput, +) +from browser_use import Agent, BrowserProfile, BrowserSession +from stream_helper.schema import SSEData,ContentTypeEnum,ReturnTypeEnum,OutputModeEnum,ContextModeEnum,MessageActionInfo,MessageActionItem,ReplyContentType,ContentTypeInReplyEnum,ReplyTypeInReplyEnum +from browser_agent.upload import UploadService +from browser_agent.upload import filter_file_by_time +from stream_helper.schema import FileChangeInfo,FileChangeType,FileChangeData +import base64 +from langchain_core.language_models.chat_models import BaseChatModel +from datetime import datetime +from langchain_openai import ChatOpenAI +from logging import Logger +from browser_agent.browser_use_custom.controller.screen import wait_for_visual_change,WaitForVisualChangeAction,VisualChangeDetector +from browser_use.utils import match_url_with_domain_pattern, time_execution_async, time_execution_sync +app = FastAPI() +load_dotenv() + +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], + expose_headers=["x-faas-instance-name"], +) + +llm_openai = "openai" +llm_deepseek = "deepseek" +llm_ark = "ark" +llm_name = llm_ark + + +# Global variable to track the port +CURRENT_CDP_PORT = 9222 + +browser_session_endpoint = None +set_language(os.getenv("LANGUAGE", "en")) + +class Message(BaseModel): + role: str + content: str + + +class Messages(BaseModel): + messages: list[Message] + + +class TaskRequest(BaseModel): + task: str + +import logging + +class LLMConfig(BaseModel): + llm_type: str + model_id: str + api_key: str + extract_model_id: str + +class RunBrowserUseAgentCtx(BaseModel): + query: str + conversation_id: str + llm: BaseChatModel + browser_session_endpoint: str + max_steps: int = 20 + system_prompt: str | None = None + extend_prompt: str | None = None + upload_service:Optional[UploadService] = None + start_time:int = int(datetime.now().timestamp() * 1000) + logger: Logger = Field(default_factory=lambda: logging.getLogger(__name__)) + model_config = ConfigDict( + arbitrary_types_allowed=True, + ) + +def genSSEData(stream_id:str, + content:str, + content_type:ContentTypeEnum = ContentTypeEnum.TEXT, + return_type:ReturnTypeEnum = ReturnTypeEnum.MODEL, + output_mode:OutputModeEnum = OutputModeEnum.STREAM, + is_last_msg:bool = False, + is_finish:bool = False, + is_last_packet_in_msg:bool = False, + message_title:str = None, + context_mode:ContextModeEnum = ContextModeEnum.NOT_IGNORE, + response_for_model:str = None, + ext:Dict[str,str] = None, + card_body:str = None, + reply_content_type:Optional[ReplyContentType] = None)->SSEData: + return SSEData( + stream_id = stream_id, + content = content, + content_type = content_type, + return_type = return_type, + output_mode = output_mode, + is_last_msg = is_last_msg, + is_finish = is_finish, + is_last_packet_in_msg = is_last_packet_in_msg, + message_title = message_title, + context_mode = context_mode, + response_for_model = response_for_model, + ext = ext, + card_body = card_body, + reply_content_type = reply_content_type + ) +def convert_ws_url(original_url: str) -> str: + """ + 将本地开发环境的 WebSocket URL 转换为生产环境的 URL 格式。 + + 示例输入: + 'ws://127.0.0.1:8001/v1/browsers/devtools/browser/77a025af-5483-46e2-ac57-9360812e4608?faasInstanceName=vefaas-duxz5kfb-jjecq2yuvf-d340et34rnmvf4b2ugd0-sandbox' + + 示例输出: + 'ws://bots-sandbox.bytedance.net/api/sandbox/coze_studio/proxy/v1/browsers/devtools/browser/77a025af-5483-46e2-ac57-9360812e4608' + """ + # 提取 UUID 部分(从路径中截取) + uuid_part = original_url.split("/v1/browsers/devtools/browser/")[1].split("?")[0] + + # 构建新 URL + new_url = ( + "ws://bots-sandbox.bytedance.net" + "/ws/sandbox/coze_studio/proxy" + f"/v1/browsers/devtools/browser/{uuid_part}" + ) + return new_url + +async def get_data_files(directory: str | Path) -> List[Dict[str, str]]: + path = Path(directory) + if not path.is_dir(): + raise ValueError(f"'{directory}' is not a valid directory") + result = [] + for file in path.iterdir(): + if file.is_file(): + try: + with open(file, 'rb') as f: + content = f.read() + base64_encoded = base64.b64encode(content).decode('ascii') + result.append({ + "name": file.name, + "content": base64_encoded + }) + except Exception as e: + result.append({ + "name": file.name, + "content": f"[ERROR READING FILE: {str(e)}]".encode() + }) + + return result + +async def RunBrowserUseAgent(ctx: RunBrowserUseAgentCtx) -> AsyncGenerator[SSEData, None]: + task_id = str(uuid.uuid4()) + event_queue = asyncio.Queue(maxsize=100) + # 初始化日志 + + ctx.logger.info(f"RunBrowserUseAgent with query: {ctx.query},task_id:{task_id}") + + # 浏览器初始化 + try: + if ctx.browser_session_endpoint == "" or ctx.browser_session_endpoint is None: + global CURRENT_CDP_PORT + CURRENT_CDP_PORT += 1 + current_port = CURRENT_CDP_PORT + browser_wrapper = await start_local_browser(current_port) + else: + browser_wrapper = BrowserWrapper(None, None, None, 'id', ctx.browser_session_endpoint) + except Exception as e: + ctx.logger.error(f"[Failed to initialize browser: {e}") + yield genSSEData( + stream_id=ctx.conversation_id, + content=f'init browser_wrapper err:{e}', + is_finish=True, + is_last_msg=True, + is_last_packet_in_msg=True, + output_mode=OutputModeEnum.NOT_STREAM, + return_type=ReturnTypeEnum.MODEL, + response_for_model=f'init browser_wrapper err:{e}', + reply_content_type=ReplyContentType(content_type=ContentTypeInReplyEnum.TXT,reply_type=ReplyTypeInReplyEnum.ANSWER) + ) + return + + # CDP URL 获取 + cdp_url = None + try: + if browser_wrapper.remote_browser_id: + async with aiohttp.ClientSession() as session: + get_url = f"{browser_wrapper.endpoint}/v1/browsers/" + async with session.get(url = get_url, + timeout=aiohttp.ClientTimeout(total=30), + headers={ + 'x-sandbox-taskid':ctx.conversation_id, + 'x-tt-env':'ppe_coze_sandbox', + 'x-use-ppe':'1', + }) as response: + if response.status == 200: + reader = response.content + browser_info = json.loads(await reader.read()) + cdp_url = convert_ws_url(browser_info['ws_url']) + ctx.logger.info(f"[{task_id}] Retrieved remote CDP URL: {cdp_url}") + else: + error_text = await response.text() + raise Exception(f"Failed to get browser info. Status: {response.status}, Error: {error_text}") + else: + current_port = CURRENT_CDP_PORT + ctx.logger.info(f"Starting task with local browser on port: {current_port}") + cdp_url = f"http://127.0.0.1:{current_port}" + except Exception as e: + ctx.logger.error(f"Error getting browser URL: {e}") + yield genSSEData( + stream_id=ctx.conversation_id, + content=f'Failed to get browser cdp_url:{e}', + is_finish=True, + is_last_msg=True, + is_last_packet_in_msg=True, + output_mode=OutputModeEnum.NOT_STREAM, + return_type=ReturnTypeEnum.MODEL, + response_for_model=f'', + reply_content_type=ReplyContentType(content_type=ContentTypeInReplyEnum.TXT,reply_type=ReplyTypeInReplyEnum.ANSWER) + ) + return + + # 目录创建 + base_dir = os.path.join("videos", task_id) + snapshot_dir = os.path.join(base_dir, "snapshots") + Path(snapshot_dir).mkdir(parents=True, exist_ok=True) + browser_session = None + agent = None + agent_task = None + + try: + # 浏览器会话配置 + headless = False if ctx.browser_session_endpoint != "" else True + browser_profile = BrowserProfile( + headless=headless, + disable_security=True, + highlight_elements=False, + wait_between_actions=1, + keep_alive=False, + headers={ + 'x-sandbox-taskid':ctx.conversation_id, + 'x-tt-env':'ppe_coze_sandbox', + 'x-use-ppe':'1', + }, + ) + browser_session = BrowserSession( + browser_profile=browser_profile, + cdp_url=cdp_url, + ) + ctx.logger.info(f"[{task_id}] Browser initialized with CDP URL: {cdp_url}") + async def on_step_end(agent): + try: + ctx.logger.info("Agent step end") + page = await agent.browser_session.get_current_page() + detector = VisualChangeDetector() + current_screenshot = await detector.capture_screenshot(page,upload_service=ctx.upload_service,logger=ctx.logger) + if not current_screenshot: + ctx.logger.error(f"Failed to capture screenshot") + return + agent.context = {'current_screenshot':current_screenshot} + ctx.logger.info(f'captured screenshot success') + except Exception as e: + ctx.logger.error(f"Error in on_step_end: {e}") + # 回调函数定义 + async def new_step_callback_wrapper(browser_state_summary: BrowserStateSummary, + model_output: AgentOutput, + step_number: int): + try: + islogin = False + for ac in model_output.action: + try: + action_data = ac.model_dump(exclude_unset=True) + action_name = next(iter(action_data.keys())) + if action_name == 'pause': + islogin = True + ctx.logger.info("pause action detected, browser task pause") + agent.pause() + except Exception as e: + ctx.logger.error(f"Error in new_step_callback_wrapper: {e}") + agent.resume() + data = '' + content_type = ContentTypeInReplyEnum.TXT + if islogin: + data = data + MessageActionInfo(actions=[MessageActionItem()]).model_dump_json() + content_type = ContentTypeInReplyEnum.ACTION_INFO + else: + data = data + model_output.current_state.next_goal or '' + await event_queue.put(genSSEData( + stream_id=ctx.conversation_id, + content=data, + reply_content_type= ReplyContentType(content_type=content_type) + )) + if islogin: + current_screenshot = None + if agent.context: + current_screenshot = agent.context.get('current_screenshot',None) + if current_screenshot: + ctx.logger.info(f'current screenshot exists') + has_change, _, _ = await wait_for_visual_change( + params=WaitForVisualChangeAction( + timeout=300, + similarity_threshold=0.85, + ), + browser_session=agent.browser_session, + initial_screenshot=current_screenshot, + upload_service=ctx.upload_service, + logger=ctx.logger, + ) + if has_change: + ctx.logger.info('detected visual change,browser task resume') + agent.resume() + data = 'timeout: no visual change detected during login' + await event_queue.put(genSSEData( + stream_id=ctx.conversation_id, + content=data, + output_mode=OutputModeEnum.NOT_STREAM, + is_finish=True, + is_last_msg=True, + is_last_packet_in_msg=True, + response_for_model=data, + reply_content_type= ReplyContentType(content_type=ContentTypeInReplyEnum.TXT,reply_type=ReplyTypeInReplyEnum.ANSWER) + )) + except Exception as e: + ctx.logger.error(f"Error in new_step_callback_wrapper: {e}") + agent.resume() + # Agent 创建 + agent = MyAgent( + task=ctx.query, + browser_session=browser_session, + register_new_step_callback=new_step_callback_wrapper, + llm=ctx.llm, + page_extraction_llm=ctx.llm, + planner_llm=ctx.llm, + controller=MyController(), + planner_interval=int(os.getenv("ARK_PLANNER_INTERVAL", "1")), + override_system_message=ctx.system_prompt, + extend_planner_system_message=ctx.extend_prompt, + language='zh', + enable_memory=False, + ) + + ctx.logger.info(f"[{task_id}] Agent initialized and ready to run") + # 启动 Agent 任务 + agent_task = asyncio.create_task(agent.run(20,on_step_end=on_step_end)) + ctx.logger.info(f"[{task_id}] Agent started running") + + # 事件流生成 + while True: + try: + # 等待事件或检查任务完成 + try: + event = await asyncio.wait_for(event_queue.get(), timeout=0.5) + yield event + + # 检查是否应该结束 + if event == "error" or (isinstance(event, dict) and event.get("status") == "completed"): + break + + except asyncio.TimeoutError: + # 检查 Agent 任务是否完成 + if agent_task.done(): + break + continue + + except Exception as e: + ctx.logger.error(f"[{task_id}] Error in event streaming: {e}") + break + + # 等待 Agent 任务完成 + if agent_task and not agent_task.done(): + await agent_task + # 获取最终结果 + result = await agent_task if agent_task else None + if result: + final_result = None + for history_item in reversed(result.history): + for result_item in history_item.result: + if hasattr(result_item, "is_done") and result_item.is_done: + final_result = result_item.extracted_content + break + if final_result: + break + + if not final_result: + result = [ + [item.extracted_content for item in history_item.result + if hasattr(item, "extracted_content")] + for history_item in result.history + ] + final_result = "\n".join( + item + for sublist in result + for item in sublist + if isinstance(item, str) + ) + if final_result: + ctx.logger.info(f"[{task_id}] final_result: {final_result}") + completion_event = genSSEData( + stream_id=ctx.conversation_id, + content= final_result, + return_type=ReturnTypeEnum.MODEL, + response_for_model=final_result, + content_type=ContentTypeEnum.TEXT, + output_mode=OutputModeEnum.STREAM, + is_finish= True, + is_last_msg=True, + is_last_packet_in_msg=True, + reply_content_type=ReplyContentType(content_type=ContentTypeInReplyEnum.TXT,reply_type=ReplyTypeInReplyEnum.ANSWER) + ) + yield completion_event + + ctx.logger.info(f"[{task_id}] Task completed successfully") + + except Exception as e: + ctx.logger.error(f"[{task_id}] Agent execution failed: {e}") + yield genSSEData( + stream_id=ctx.conversation_id, + content=f'err:{e}', + is_finish=True, + is_last_msg=True, + is_last_packet_in_msg=True, + return_type=ReturnTypeEnum.MODEL, + response_for_model=f'err:{e}', + reply_content_type=ReplyContentType(content_type=ContentTypeInReplyEnum.TXT,reply_type=ReplyTypeInReplyEnum.ANSWER) + ) + finally: + if agent: + if agent.browser_session: + agent.browser_session.cdp_url = None + if agent.browser_session.browser_context: + await agent.browser_session.browser_context.close() + if agent.browser_session.browser: + await agent.browser_session.browser.close() + agent.pause() + ctx.logger.info('agent close success') + +@app.get("/") +async def root(): + return {"message": "Hello World"} + + +@app.post("/tasks") +async def sse_task(request: Messages): + task_id = str(uuid.uuid4()) + + # 提取用户消息 + prompt = "" + for message in request.messages: + if message.role == "user": + prompt = message.content + logging.debug(f"Found user message: {prompt}") + break + + logging.info(f"[{task_id}] Starting SSE task with prompt: {prompt}") + + async def generate_sse_events(): + try: + # 创建SSE格式的生成器 + async for event in RunBrowserUseAgent(ctx=RunBrowserUseAgentCtx( + query=prompt, + conversation_id="", + cookie="", + llm_config=LLMConfig( + ), + browser_session_endpoint="http://127.0.0.1:8001/v1/browsers", + max_steps=20 + )): + # 确保事件是SSE格式 + if isinstance(event, str): + # 如果是字符串,直接作为数据发送 + yield f"data: {event}\n\n" + elif isinstance(event, dict): + # 如果是字典,转换为JSON格式 + event_json = json.dumps(event, ensure_ascii=False) + yield f"data: {event_json}\n\n" + else: + # 其他类型转换为字符串 + yield f"data: {str(event)}\n\n" + + except Exception as e: + logging.error(f"[{task_id}] Error in SSE generation: {e}") + # 发送错误事件 + error_event = json.dumps({ + "task_id": task_id, + "status": "error", + "error": str(e) + }, ensure_ascii=False) + yield f"data: {error_event}\n\n" + finally: + # 可选:发送结束标记 + yield f"data: {json.dumps({'task_id': task_id, 'status': 'stream_completed'}, ensure_ascii=False)}\n\n" + + try: + return StreamingResponse( + generate_sse_events(), + media_type="text/event-stream", + ) + except Exception as e: + logging.error(f"[{task_id}] Error creating StreamingResponse: {e}") + # 返回错误响应 + return JSONResponse( + status_code=500, + content={"error": "Failed to create SSE stream", "task_id": task_id} + ) + +if __name__ == "__main__": + enforce_log_format() + uvicorn.run(app, host="0.0.0.0", port=8000) + + + diff --git a/py_package/browser_agent/upload.py b/py_package/browser_agent/upload.py new file mode 100644 index 000000000..72f3c8f7e --- /dev/null +++ b/py_package/browser_agent/upload.py @@ -0,0 +1,23 @@ +from datetime import datetime +from typing import List +from abc import ABC, abstractmethod +from pydantic import BaseModel +class FileItem(BaseModel): + file_name: str + file_type: str + file_size: int + file_uri: str + file_url: str + upload_type: str + create_time: int + update_time: int + +class UploadService(BaseModel,ABC): + headers:dict[str,str] = {} + async def upload_file(self,file_content:str,file_name:str,base64_content:str=''): + pass + async def list_file(self)->List[FileItem]: + pass + +async def filter_file_by_time(file_list:List[FileItem],start_time:int)->List[FileItem]: + return [file for file in file_list if (start_time <= file.create_time or start_time <= file.update_time) ] \ No newline at end of file diff --git a/py_package/browser_agent/utils.py b/py_package/browser_agent/utils.py new file mode 100644 index 000000000..2b5e9fd68 --- /dev/null +++ b/py_package/browser_agent/utils.py @@ -0,0 +1,86 @@ +# Copyright (c) 2025 Bytedance Ltd. and/or its affiliates +# Licensed under the 【火山方舟】原型应用软件自用许可协议 +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://www.volcengine.com/docs/82379/1433703 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from typing import Any, Dict, List +from langchain_core.callbacks import BaseCallbackHandler +from langchain_core.messages import BaseMessage +from langchain_core.outputs import LLMResult + + +root_logger = logging.getLogger() +for handler in root_logger.handlers: + formatter = logging.Formatter( + '%(asctime)s - %(levelname)s - %(message)s', + '%Y-%m-%d %H:%M:%S' + ) + handler.setFormatter(formatter) + + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' +) + + +def enforce_log_format(): + root_logger = logging.getLogger() + for handler in root_logger.handlers: + formatter = logging.Formatter( + '%(asctime)s - %(levelname)s - %(message)s', + '%Y-%m-%d %H:%M:%S' + ) + handler.setFormatter(formatter) + + +class ModelLoggingCallback(BaseCallbackHandler): + def on_chat_model_start( + self, serialized: Dict[str, Any], messages: List[List[BaseMessage]], **kwargs + ) -> None: + logging.info( + f"[Model] Chat model started\n") + + def on_llm_end(self, response: LLMResult, **kwargs) -> None: + logging.info( + f"[Model] Chat model ended, response: {response}") + + def on_llm_error(self, error: BaseException, **kwargs) -> Any: + logging.info( + f"[Model] Chat model error, response: {error}") + + def on_chain_start( + self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs + ) -> None: + logging.info( + f"[Model] Chain {serialized.get('name')} started") + + def on_chain_end(self, outputs: Dict[str, Any], **kwargs) -> None: + logging.info(f"[Model] Chain ended, outputs: {outputs}") + +# class that wraps another class and logs all function calls being executed + + +class Wrapper: + def __init__(self, wrapped_class): + self.wrapped_class = wrapped_class + + def __getattr__(self, attr): + original_func = getattr(self.wrapped_class, attr) + + def wrapper(*args, **kwargs): + print(f"Calling function: {attr}") + print(f"Arguments: {args}, {kwargs}") + result = original_func(*args, **kwargs) + print(f"Response: {result}") + return result + + return wrapper diff --git a/py_package/pyproject.toml b/py_package/pyproject.toml new file mode 100644 index 000000000..9d100a159 --- /dev/null +++ b/py_package/pyproject.toml @@ -0,0 +1,29 @@ +[build-system] +requires = ["setuptools>=64.0.0", "wheel"] +build-backend = "setuptools.build_meta" + +[tool.setuptools] +include-package-data = true +package-dir = {"browser_agent" = "browser_agent","stream_helper" = "stream_helper"} + +[tool.setuptools.package-data] +"browser_agent" = ["**/*.mo", "**/*.po"] + +[project] +name = "coze-studio-py-package" # 包名,在PyPI上唯一。通常用中划线 +version = "0.1.0" +description = "browser agent" + +# 如果你的包有依赖,在这里声明 +dependencies = [ + "fastapi==0.109.2", + "uvicorn==0.27.0.post1", + "python-multipart==0.0.9", + "MainContentExtractor==0.0.4", + "aiohttp==3.11.16", + "websockets==15.0.1", + "faiss-cpu==1.10.0", + "browser_use @ git+https://github.com/zzxwill/browser-use.git@i18n_system_prompt", + "playwright==1.52.0", + "pillow==11.3.0" +] \ No newline at end of file diff --git a/py_package/stream_helper/__init__.py b/py_package/stream_helper/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/py_package/stream_helper/schema.py b/py_package/stream_helper/schema.py new file mode 100644 index 000000000..5f4a97acb --- /dev/null +++ b/py_package/stream_helper/schema.py @@ -0,0 +1,125 @@ +from enum import Enum, IntEnum +from typing import Optional, Dict, Union +from pydantic import BaseModel + +class StepInfo(BaseModel): + step_number:int + goal:str + +class WebSocketItem(BaseModel): + ws_url:str + +class WebSocketInfo(BaseModel): + items: list[WebSocketItem] + + +class MessageActionTypeEnum(int, Enum): + """MessageActionType 的枚举值""" + WebPageAuthorization = 1 # Web page authorization + +class MessageActionItem(BaseModel): + type: MessageActionTypeEnum = MessageActionTypeEnum.WebPageAuthorization + class Config: + use_enum_values = True + +class MessageActionInfo(BaseModel): + actions: list[MessageActionItem] + +# 定义枚举类型 +class FileType(str, Enum): + DIR = "dir" + FILE = "file" + +class FileChangeType(str, Enum): + CREATE = "create" + DELETE = "delete" + UPDATE = "update" + +class FileChangeData(BaseModel): + file_type: FileType = FileType.FILE + file_path: str = '' + file_name: str + change_type: FileChangeType + uri: str + url: str + + class Config: + use_enum_values = True + +class ErrData(BaseModel): + data: Dict[str, str] + + +class FileChangeInfo(BaseModel): + file_change_list: Optional[list[FileChangeData]] = None + err_list: Optional[list[ErrData]] = None + +class OutputModeEnum(int, Enum): + """SSEData.output_mode 的枚举值""" + NOT_STREAM = 0 # 非流式 + STREAM = 1 # 流式 + +class ReturnTypeEnum(int, Enum): + """SSEData.return_type 的枚举值""" + MODEL = 0 # 输出到模型 + USER_TERMINAL = 1 # 输出到终端 + +class ContentTypeEnum(int, Enum): + """SSEData.content_type 的枚举值""" + TEXT = 0 # 文本 + APPLET_WIDGET = 1 # 小程序组件 + LOADING_TIPS = 2 # 加载提示 + CARD = 3 # 卡片 + VERBOSE = 4 # 详细信息 + USAGE = 10 # 使用情况 + +class ContextModeEnum(int, Enum): + """SSEData.context_mode 的枚举值""" + NOT_IGNORE = 0 # 不忽略上下文 + IGNORE = 1 # 忽略上下文 + +class ReplyTypeInReplyEnum(IntEnum): + """Type of reply (purpose/context).""" + ANSWER = 1 # Standard answer + SUGGEST = 2 # Suggestion (e.g., follow-up questions) + LLM_OUTPUT = 3 # Raw LLM output (before processing) + TOOL_OUTPUT = 4 # Output from an external tool/API + VERBOSE = 100 # Debug/verbose logging + PLACEHOLDER = 101 # Placeholder (e.g., loading state) + TOOL_VERBOSE = 102 # Verbose logs from tools + +class ContentTypeInReplyEnum(IntEnum): + """Format of the content.""" + TXT = 1 # Plain text + IMAGE = 2 # Image + VIDEO = 4 # Video + MUSIC = 7 # Audio + CARD = 50 # Structured card (e.g., rich UI element) + WIDGET = 52 # Interactive widget + APP = 100 # Embedded application + WEBSOCKET_INFO = 101 # Websocket metadata + FILE_CHANGE_INFO = 102 # File system event + ACTION_INFO = 103 # Action/command metadata + +class ReplyContentType(BaseModel): + """Combines reply type and content type for structured responses.""" + reply_type: ReplyTypeInReplyEnum = ReplyTypeInReplyEnum.TOOL_VERBOSE + content_type: ContentTypeInReplyEnum = ContentTypeInReplyEnum.TXT + +class SSEData(BaseModel): + stream_id: str + message_title: Optional[str] = None + context_mode: Union[ContextModeEnum, int] = ContextModeEnum.NOT_IGNORE + output_mode: OutputModeEnum = OutputModeEnum.STREAM # 0=非流式, 1=流式 + return_type: Union[ReturnTypeEnum, int] = ReturnTypeEnum.USER_TERMINAL + content_type: Union[ContentTypeEnum, int] = ContentTypeEnum.TEXT + is_last_msg: bool = False + is_finish: bool = False + is_last_packet_in_msg: bool = False + content: Optional[str] = None + response_for_model: Optional[str] = None + ext: Optional[Dict[str, str]] = None + card_body: Optional[str] = None + reply_content_type :Optional[ReplyContentType] = None + class Config: + use_enum_values = True # 序列化时使用枚举的原始值 \ No newline at end of file