From ee46dd8417acf1f01edf4ef44b5e156c2eaed864 Mon Sep 17 00:00:00 2001 From: "liuyunchao.0510" Date: Mon, 1 Sep 2025 15:23:09 +0800 Subject: [PATCH] browser_use_plugin MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit name change name change name change name change name change name change 暂存 暂存 暂存 版本更新 版本更新 版本更新 和网关协议对齐 和网关协议对齐 和网关协议对齐 再升级下 再升 再完善下 升级 final resp 修复 修复 修复 再测试下 再测试下 包顺序 包顺序 包顺序 包顺序 修改为answer 更新下 更新版本 使用logger 使用logger 使用 滚滚滚 更新版本 screen opmot test use context 有问题 gogogo agent browser agent browser screen resume gogo gogo file upload to debug file upload base64 screen screen 修复 修复 --- .gitignore | 1 + py_package/browser_agent/.python-version | 1 + py_package/browser_agent/__init__.py | 0 py_package/browser_agent/agent.py | 30 + py_package/browser_agent/browser.py | 158 +++++ .../browser_use_custom/__init__.py | 0 .../browser_use_custom/agent/__init__.py | 0 .../browser_use_custom/agent/service.py | 6 + .../browser_use_custom/controller/__init__.py | 0 .../browser_use_custom/controller/screen.py | 185 ++++++ .../browser_use_custom/controller/service.py | 204 +++++++ .../browser_use_custom/i18n/__init__.py | 127 ++++ .../zh-CN/LC_MESSAGES/vefaas_browser_use.mo | Bin 0 -> 7255 bytes .../zh-CN/LC_MESSAGES/vefaas_browser_use.po | 178 ++++++ .../i18n/update_translations.py | 116 ++++ py_package/browser_agent/cdp.py | 241 ++++++++ py_package/browser_agent/index.py | 541 ++++++++++++++++++ py_package/browser_agent/upload.py | 23 + py_package/browser_agent/utils.py | 86 +++ py_package/pyproject.toml | 29 + py_package/stream_helper/__init__.py | 0 py_package/stream_helper/schema.py | 125 ++++ 22 files changed, 2051 insertions(+) create mode 100644 py_package/browser_agent/.python-version create mode 100644 py_package/browser_agent/__init__.py create mode 100644 py_package/browser_agent/agent.py create mode 100644 py_package/browser_agent/browser.py create mode 100644 py_package/browser_agent/browser_use_custom/__init__.py create mode 100644 py_package/browser_agent/browser_use_custom/agent/__init__.py create mode 100644 py_package/browser_agent/browser_use_custom/agent/service.py create mode 100644 py_package/browser_agent/browser_use_custom/controller/__init__.py create mode 100644 py_package/browser_agent/browser_use_custom/controller/screen.py create mode 100644 py_package/browser_agent/browser_use_custom/controller/service.py create mode 100644 py_package/browser_agent/browser_use_custom/i18n/__init__.py create mode 100644 py_package/browser_agent/browser_use_custom/i18n/locales/zh-CN/LC_MESSAGES/vefaas_browser_use.mo create mode 100644 py_package/browser_agent/browser_use_custom/i18n/locales/zh-CN/LC_MESSAGES/vefaas_browser_use.po create mode 100644 py_package/browser_agent/browser_use_custom/i18n/update_translations.py create mode 100644 py_package/browser_agent/cdp.py create mode 100644 py_package/browser_agent/index.py create mode 100644 py_package/browser_agent/upload.py create mode 100644 py_package/browser_agent/utils.py create mode 100644 py_package/pyproject.toml create mode 100644 py_package/stream_helper/__init__.py create mode 100644 py_package/stream_helper/schema.py diff --git a/.gitignore b/.gitignore index dfdf34dc7..d4a481dd1 100644 --- a/.gitignore +++ b/.gitignore @@ -60,3 +60,4 @@ values-dev.yaml *.tsbuildinfo +*venv/ \ No newline at end of file diff --git a/py_package/browser_agent/.python-version b/py_package/browser_agent/.python-version new file mode 100644 index 000000000..fdcfcfdfc --- /dev/null +++ b/py_package/browser_agent/.python-version @@ -0,0 +1 @@ +3.12 \ No newline at end of file diff --git a/py_package/browser_agent/__init__.py b/py_package/browser_agent/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/py_package/browser_agent/agent.py b/py_package/browser_agent/agent.py new file mode 100644 index 000000000..e3612052f --- /dev/null +++ b/py_package/browser_agent/agent.py @@ -0,0 +1,30 @@ +from pydantic import BaseModel +from browser_agent.index import RunBrowserUseAgentCtx,LLMConfig +from typing import AsyncGenerator,Dict,Optional +from abc import ABC, abstractmethod +from datetime import datetime +from stream_helper.schema import SSEData +from langchain_core.language_models.chat_models import BaseChatModel +class BrowserAgentBase(BaseModel,ABC): + query: str + conversation_id: str = '' + llm: BaseChatModel + browser_session_endpoint: str + endpoint_header: Dict[str, str] = {} + max_steps: int = 20 + system_prompt: str = None + extend_prompt: str = None + @abstractmethod + async def save_cookies(self): + pass + + @abstractmethod + async def get_llm(self)->BaseChatModel: + pass + + @abstractmethod + async def get_system_prompt(self)->str: + pass + + async def run(self)->AsyncGenerator[SSEData,None]: + pass diff --git a/py_package/browser_agent/browser.py b/py_package/browser_agent/browser.py new file mode 100644 index 000000000..d2f337c9c --- /dev/null +++ b/py_package/browser_agent/browser.py @@ -0,0 +1,158 @@ +# Copyright (c) 2025 Bytedance Ltd. and/or its affiliates +# Licensed under the 【火山方舟】原型应用软件自用许可协议 +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://www.volcengine.com/docs/82379/1433703 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import logging +import os +import aiohttp +from playwright.async_api import async_playwright +from playwright.async_api import Browser +from playwright.async_api._generated import Playwright as AsyncPlaywright + +browser_ready_event = asyncio.Event() + +class BrowserWrapper: + def __init__(self, port, browser: Browser, playwright: AsyncPlaywright, remote_browser_id: str = None, endpoint: str = None): + self.port = port + self.browser = browser + self.playwright = playwright + self.remote_browser_id = remote_browser_id + self.endpoint = endpoint + + async def stop(self): + if self.browser: + logging.info(f"Closing browser on port {self.port}...") + # try: + # for context in self.browser.contexts: + # await context.close() + # except Exception as e: + # logging.error(f"Error closing contexts: {e}") + await self.browser.close() + logging.info(f"Browser on port {self.port} closed successfully") + if self.playwright: + logging.info(f"Closing playwright session {self.port}...") + await self.playwright.stop() + logging.info( + f"Paywright session on port {self.port} closed successfully") + if self.remote_browser_id and self.endpoint: + logging.info(f"Closing remote browser {self.remote_browser_id}...") + try: + async with aiohttp.ClientSession() as session: + delete_url = f"{self.endpoint}/{self.remote_browser_id}" + async with session.delete(delete_url, timeout=30) as response: + if response.status == 200: + logging.info(f"Remote browser {self.remote_browser_id} closed successfully") + else: + error_text = await response.text() + logging.error(f"Failed to close remote browser. Status: {response.status}, Error: {error_text}") + except Exception as e: + logging.error(f"Error closing remote browser {self.remote_browser_id}: {e}") + + +async def start_local_browser(port): + logging.info(f"Attempting to start browser on port {port}") + p = None + browser = None + + try: + p = await async_playwright().start() + + w = BrowserWrapper(port, None, p) + + try: + # Configure proxy if environment variables are set + proxy_config = None + proxy_server = os.getenv('PROXY_SERVER') + if proxy_server: + proxy_config = { + 'server': proxy_server, + 'bypass': '127.0.0.1,localhost' + } + + browser = await p.chromium.launch( + # executable_path="/opt/chromium.org/browser_use/chromium/chromium-browser-use", + headless=False, + args=[ + f'--remote-debugging-port={port}', + '--remote-allow-origins=*', + '--remote-debugging-address=0.0.0.0', + '--no-sandbox' + ], + proxy=proxy_config + ) + + w = BrowserWrapper(port, browser, p) + + logging.info(f"Browser launched successfully on port {port}") + + # Verify CDP is actually running + try: + async with aiohttp.ClientSession() as session: + async with session.get(f"http://127.0.0.1:{port}/json/version", timeout=5) as response: + if response.status == 200: + version_info = await response.json() + logging.info( + f"successfully connected to cdp: {version_info}") + else: + logging.error( + f"Failed to get CDP version. Status: {response.status}") + except Exception as cdp_e: + logging.error(f"Error checking CDP availability: {cdp_e}") + + logging.info('closing playwright driver and browser') + await w.stop() + raise + return BrowserWrapper(port, browser, p) + + except Exception as launch_e: + logging.error(f"Failed to launch browser: {launch_e}") + browser_ready_event.clear() + + logging.info('closing playwright driver and browser') + await w.stop() + raise + except Exception as p_e: + logging.error(f"Playwright initialization error: {p_e}") + browser_ready_event.clear() + + logging.info('closing playwright driver and browser') + await w.stop() + raise + +async def start_remote_browser(endpoint): + logging.info(f"Attempting to create browser via remote endpoint: {endpoint}") + + try: + async with aiohttp.ClientSession() as session: + data = {"timeout": 30} + + async with session.post( + endpoint, + json=data, + headers={'Content-Type': 'application/json'}, + timeout=30 + ) as response: + if response.status == 200: + result = await response.json() + logging.info(f"Browser created successfully: {result}") + id_value = result.get('id', None) + if id_value is None: + result['id'] = 'id' + return BrowserWrapper(None, None, None, result['id'], endpoint) + else: + error_text = await response.text() + logging.error(f"Failed to create browser. Status: {response.status}, Error: {error_text}") + raise Exception(f"Failed to create browser: {response.status} - {error_text}") + + except Exception as e: + logging.error(f"Error creating remote browser: {e}") + raise + diff --git a/py_package/browser_agent/browser_use_custom/__init__.py b/py_package/browser_agent/browser_use_custom/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/py_package/browser_agent/browser_use_custom/agent/__init__.py b/py_package/browser_agent/browser_use_custom/agent/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/py_package/browser_agent/browser_use_custom/agent/service.py b/py_package/browser_agent/browser_use_custom/agent/service.py new file mode 100644 index 000000000..b57ee1a00 --- /dev/null +++ b/py_package/browser_agent/browser_use_custom/agent/service.py @@ -0,0 +1,6 @@ +from browser_use import Agent + +class MyAgent(Agent): + # browser use 0.2.5版本 移除模型检测 + def _test_tool_calling_method(self, method: str) -> bool: + return True \ No newline at end of file diff --git a/py_package/browser_agent/browser_use_custom/controller/__init__.py b/py_package/browser_agent/browser_use_custom/controller/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/py_package/browser_agent/browser_use_custom/controller/screen.py b/py_package/browser_agent/browser_use_custom/controller/screen.py new file mode 100644 index 000000000..c50a20efe --- /dev/null +++ b/py_package/browser_agent/browser_use_custom/controller/screen.py @@ -0,0 +1,185 @@ +import asyncio +import io,base64 +import logging,time +from typing import Optional, Tuple +from PIL import Image, ImageChops +import numpy as np +from pydantic import BaseModel + +logger = logging.getLogger(__name__) + +class VisualChangeDetector: + """视觉变化检测器(基于屏幕截图比对)""" + + def __init__( + self, + similarity_threshold: float = 0.95, + pixel_change_threshold: int = 1000, + resize_width: Optional[int] = 800 + ): + """ + Args: + similarity_threshold: 相似度阈值(0-1) + pixel_change_threshold: 变化像素数量阈值 + resize_width: 缩放宽度(提升性能) + """ + self.similarity_threshold = similarity_threshold + self.pixel_threshold = pixel_change_threshold + self.resize_width = resize_width + + async def capture_screenshot(self, page, upload_service=None,logger=None) -> Image.Image: + """捕获页面截图(自动优化)""" + screenshot_bytes = await page.screenshot(timeout=10000,type='jpeg',quality=50,full_page=False,animations='disabled') + img = Image.open(io.BytesIO(screenshot_bytes)) + + # 统一缩放尺寸(提升比对性能) + if self.resize_width: + w, h = img.size + new_h = int(h * (self.resize_width / w)) + img = img.resize((self.resize_width, new_h)) + if upload_service: + img_byte_arr = io.BytesIO() + img.save(img_byte_arr, format='JPEG') + binary_data = img_byte_arr.getvalue() + if not binary_data: + return + file_name = f"screenshot_{int(time.time())}.jpg" + base64_str = base64.b64encode(binary_data).decode('ascii') + if logger: + logger.info(f'upload screenshot to {file_name}') + else: + logging.info(f'upload screenshot to {file_name}') + try: + await upload_service.upload_file('',file_name,base64_content=base64_str) + except Exception as e: + if logger: + logger.error(f"Failed to upload screenshot to {file_name}: {e}") + else: + logging.error(f"Failed to upload screenshot to {file_name}: {e}") + return img.convert('RGB') + return img.convert('RGB') # 确保RGB模式 + + def calculate_change( + self, + img1: Image.Image, + img2: Image.Image + ) -> Tuple[float, Image.Image]: + """ + 计算两图差异 + + Returns: + tuple: (相似度百分比, 差异图) + """ + # 确保尺寸一致 + if img1.size != img2.size: + img2 = img2.resize(img1.size) + + arr1 = np.array(img1) # shape: (height, width, 3) + arr2 = np.array(img2) # shape: (height, width, 3) + + # 计算绝对差异(每个通道单独计算) + diff = np.abs(arr1.astype(int) - arr2.astype(int)) + + # 变化像素计算(任一通道有变化即视为该像素变化) + # 先检查每个像素的3个通道是否有变化 + pixel_changed = np.any(diff > 0, axis=2) # shape: (height, width) + changed_pixels = np.sum(pixel_changed) # 变化像素总数 + + # 总像素数(不需要除以3,因为pixel_changed已经是像素级别的判断) + total_pixels = arr1.shape[0] * arr1.shape[1] + + # 生成差异图(可视化用) + diff_img = ImageChops.difference(img1, img2) + + # 计算相似度 + similarity = 1 - (changed_pixels / total_pixels) + + return similarity, diff_img + + async def detect_change( + self, + browser_session, + reference_img: Optional[Image.Image] = None, + max_attempts: int = 5, + attempt_interval: float = 1.5, + upload_service = None, + logger = None + ) -> Tuple[bool, Optional[Image.Image], Optional[Image.Image]]: + """ + 检测视觉变化 + + Args: + page: 浏览器页面对象 + reference_img: 基准截图(None则自动捕获) + max_attempts: 最大检测次数 + attempt_interval: 检测间隔(秒) + + Returns: + tuple: (是否变化, 基准截图, 差异图) + """ + # 首次捕获基准图 + if reference_img is None: + page = await browser_session.get_current_page() + reference_img = await self.capture_screenshot(page,upload_service=upload_service,logger=logger) + logger.info("start detect change") + for attempt in range(max_attempts): + await asyncio.sleep(attempt_interval) + + # 捕获当前截图 + page = await browser_session.get_current_page() + current_img = await self.capture_screenshot(page,upload_service=upload_service,logger=logger) + if not current_img: + logger.error(f"Failed to capture screenshot on attempt {attempt + 1}") + continue + # 计算变化 + similarity, diff_img = self.calculate_change(reference_img, current_img) + logger.info(f"Attempt {attempt + 1}: 相似度 {similarity:.2f}, 变化像素 {np.sum(np.array(diff_img) > 0)}") + # 判断是否显著变化 + if similarity < self.similarity_threshold: + diff_pixels = np.sum(np.array(diff_img) > 0) + if diff_pixels > self.pixel_threshold: + logger.info(f"视觉变化 detected (相似度: {similarity:.2f}, 变化像素: {diff_pixels})") + return True, reference_img, diff_img + + return False, reference_img, None + +class WaitForVisualChangeAction(BaseModel): + """等待视觉变化的参数模型""" + timeout: int = 30 + check_interval: float = 2 + similarity_threshold: float = 0.95 + pixel_threshold: int = 5000 + +async def wait_for_visual_change( + params: WaitForVisualChangeAction, + browser_session, + initial_screenshot: Optional[Image.Image] = None, + upload_service = None, + logger = None +) -> Tuple[bool, Optional[Image.Image], Optional[Image.Image]]: + """ + 等待页面视觉变化(完整工作流) + + Args: + params: 配置参数 + browser_session: 浏览器会话 + initial_screenshot: 初始截图(可选) + + Returns: + tuple: (是否变化, 初始截图, 差异图) + """ + detector = VisualChangeDetector( + similarity_threshold=params.similarity_threshold, + pixel_change_threshold=params.pixel_threshold + ) + + max_attempts = int(params.timeout / params.check_interval) + + return await detector.detect_change( + browser_session=browser_session, + reference_img=initial_screenshot, + max_attempts=max_attempts, + attempt_interval=params.check_interval, + upload_service=upload_service, + logger=logger, + ) \ No newline at end of file diff --git a/py_package/browser_agent/browser_use_custom/controller/service.py b/py_package/browser_agent/browser_use_custom/controller/service.py new file mode 100644 index 000000000..bbb65983a --- /dev/null +++ b/py_package/browser_agent/browser_use_custom/controller/service.py @@ -0,0 +1,204 @@ +# Copyright (c) 2025 Bytedance Ltd. and/or its affiliates +# Licensed under the 【火山方舟】原型应用软件自用许可协议 +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://www.volcengine.com/docs/82379/1433703 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import io +import logging +from typing import Optional, Tuple +from PIL import Image, ImageChops +import numpy as np +from pydantic import BaseModel +import logging +import asyncio +import re +import markdownify +from bs4 import BeautifulSoup +from typing import Optional +from browser_use.browser import BrowserSession +from browser_use.controller.views import SearchGoogleAction +from browser_use.agent.views import ActionResult +from browser_use.controller.service import Controller +from playwright.async_api import Page +from pydantic import BaseModel,Field +from langchain_core.language_models.chat_models import BaseChatModel +from browser_agent.browser_use_custom.i18n import _ +from langchain_core.prompts import PromptTemplate +from browser_agent.browser_use_custom.controller.screen import VisualChangeDetector,wait_for_visual_change,WaitForVisualChangeAction + +logger = logging.getLogger(__name__) + +class PauseAction(BaseModel): + reason: str + +class WaitForLoginAction(BaseModel): + """Action parameters for waiting for login completion.""" + timeout: int = Field( + default=300, + description="Maximum time to wait for login completion in seconds" + ) + check_interval: int = Field( + default=5, + description="Interval between checks for URL changes in seconds" + ) + +class MyController(Controller): + """Custom controller extending base Controller with additional actions. + + Features: + - Inherits core controller functionality + - Adds custom pause action handler + - Maintains action registry with exclusion support + """ + + def __init__( + self, + exclude_actions: list[str] = [], + output_model: type[BaseModel] | None = None, + ): + super().__init__(exclude_actions, output_model) + # Basic Navigation Actions + @self.registry.action( + _('Search the query in Baidu in the current tab, the query should be a search query like humans search in Baidu, concrete and not vague or super long. More the single most important items.'), + param_model=SearchGoogleAction, + ) + async def search_google(params: SearchGoogleAction, browser_session: BrowserSession): + search_url = f'https://www.baidu.com/s?wd={params.query}' + + page = await browser_session.get_current_page() + await page.goto(search_url) + await page.wait_for_load_state() + msg = _('🔍 Searched for "{query}" in Baidu').format(query=params.query) + logger.info(msg) + return ActionResult(extracted_content=msg, include_in_memory=True) + # Content Actions + @self.registry.action( + _('Extract page content to retrieve specific information from the page, e.g. all company names, a specific description, all information about xyc, 4 links with companies in structured format. Use include_links true if the goal requires links'), + ) + async def extract_content( + goal: str, + page: Page, + page_extraction_llm: BaseChatModel, + include_links: bool = False, + ): + raw_content = await page.content() + soup = BeautifulSoup( + raw_content, 'html.parser') + # remove all unnecessary http metadata + for s in soup.select('script'): + s.decompose() + for s in soup.select('style'): + s.decompose() + for s in soup.select('textarea'): + s.decompose() + for s in soup.select('img'): + s.decompose() + for s in soup.find_all(style=re.compile("background-image.*")): + s.decompose() + content = markdownify.markdownify(str(soup)) + + # manually append iframe text into the content so it's readable by the LLM (includes cross-origin iframes) + for iframe in page.frames: + if iframe.url != page.url and not iframe.url.startswith('data:'): + content += f'\n\nIFRAME {iframe.url}:\n' + content += markdownify.markdownify(await iframe.content()) + + prompt = _('Your task is to extract the content of the page. You will be given a page and a goal and you should extract all relevant information around this goal from the page. If the goal is vague, summarize the page. Respond in json format. Extraction goal: {goal}, Page: {page}') + template = PromptTemplate(input_variables=['goal', 'page'], template=prompt) + try: + output = await page_extraction_llm.ainvoke(template.format(goal=goal, page=content)) + msg = _('📄 Extracted from page\n: {content}\n').format(content=output.content) + logger.info(msg) + return ActionResult(extracted_content=msg, include_in_memory=True) + except Exception as e: + logger.debug(_('Error extracting content: {error}').format(error=e)) + msg = _('📄 Extracted from page\n: {content}\n').format(content=content) + logger.info(msg) + return ActionResult(extracted_content=msg) + + @self.registry.action( + _('Pause agent'), + param_model=PauseAction, + ) + async def pause(params: PauseAction): + msg = _('👩 Pause agent, reason: {reason}').format(reason=params.reason) + logger.info(msg) + return ActionResult(extracted_content=msg, include_in_memory=True) + # Login detection and waiting action + # @self.registry.action( + # _('Detects if current page requires login and waits for authentication to complete.Wait for login completion by monitoring URL changes.'), + # param_model=WaitForLoginAction, + # ) + # async def wait_for_login(params: WaitForLoginAction, browser_session: BrowserSession): + # page = await browser_session.get_current_page() + + # # Get initial URL for comparison + # initial_url = page.url + # logger.info(_('🔐 Starting login detection. Initial URL: {url}').format(url=initial_url)) + + # # Wait for URL change indicating login completion + # msg = _('🔐 Login page detected. Waiting for authentication completion (max {timeout}s)...').format( + # timeout=params.timeout + # ) + # logger.info(msg) + + # final_url = await self._wait_for_url_change( + # page, + # initial_url, + # params.timeout, + # params.check_interval + # ) + + # if final_url and final_url != initial_url: + # success_msg = _('✅ Login completed successfully! URL changed from {initial} to {final}').format( + # initial=initial_url, + # final=final_url + # ) + # logger.info(success_msg) + # return ActionResult( + # extracted_content=success_msg, + # include_in_memory=True, + # success=True + # ) + # else: + # timeout_msg = _('⏰ Login timeout or no URL change detected after {timeout} seconds').format( + # timeout=params.timeout + # ) + # logger.warning(timeout_msg) + # return ActionResult( + # extracted_content=timeout_msg, + # include_in_memory=True, + # success=False + # ) + + # async def _wait_for_url_change( + # self, + # page: Page, + # initial_url: str, + # timeout: int, + # check_interval: int + # ) -> Optional[str]: + # """Wait for URL change indicating login completion.""" + # start_time = asyncio.get_event_loop().time() + + # while (asyncio.get_event_loop().time() - start_time) < timeout: + # try: + # current_url = page.url + + # # If URL has changed, return the new URL + # if current_url != initial_url: + # return current_url + + # # Wait before checking again + # await asyncio.sleep(check_interval) + + # except Exception as e: + # logger.warning(_('Error checking URL change: {error}').format(error=str(e))) + # await asyncio.sleep(check_interval) + + # return None # Timeout reached without URL change \ No newline at end of file diff --git a/py_package/browser_agent/browser_use_custom/i18n/__init__.py b/py_package/browser_agent/browser_use_custom/i18n/__init__.py new file mode 100644 index 000000000..830a0f187 --- /dev/null +++ b/py_package/browser_agent/browser_use_custom/i18n/__init__.py @@ -0,0 +1,127 @@ +# Copyright (c) 2025 Bytedance Ltd. and/or its affiliates +# Licensed under the 【火山方舟】原型应用软件自用许可协议 +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://www.volcengine.com/docs/82379/1433703 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import gettext +import os +import copy +from typing import Optional + +# Default language +_current_language = 'zh-CN' +_translator = None + +def get_locales_dir(): + """Get the locales directory path""" + return os.path.join(os.path.dirname(__file__), 'locales') + +def set_language(language: str = 'zh-CN'): + """Set the current language for translations""" + global _current_language, _translator + _current_language = language + + locales_dir = get_locales_dir() + + try: + # Try to load the translation + translation = gettext.translation( + 'vefaas_browser_use', + localedir=locales_dir, + languages=[language], + fallback=True + ) + _translator = translation + except Exception: + # Fallback to NullTranslations if translation not found + _translator = gettext.NullTranslations() + +def get_language(): + """Get the current language""" + return _current_language + +def _(message: str) -> str: + """Translate a message""" + if _translator is None: + set_language(_current_language) + return _translator.gettext(message) + +def get_available_languages(): + """Get list of available languages""" + locales_dir = get_locales_dir() + if not os.path.exists(locales_dir): + return ['zh-CN'] + + languages = [] + for item in os.listdir(locales_dir): + lang_dir = os.path.join(locales_dir, item) + if os.path.isdir(lang_dir): + mo_file = os.path.join(lang_dir, 'LC_MESSAGES', 'vefaas_browser_use.mo') + if os.path.exists(mo_file): + languages.append(item) + + return languages if languages else ['zh-CN'] + +def translate_planning_step_data(conversation_update: dict) -> dict: + """Translate planning step data fields using pattern matching""" + import re + translated_update = copy.deepcopy(conversation_update) + + # Translate evaluation field + if 'evaluation' in translated_update: + evaluation = translated_update['evaluation'] + # Basic status translations + if evaluation == "Unknown": + translated_update['evaluation'] = _("Unknown") + elif evaluation == "Failed": + translated_update['evaluation'] = _("Failed") + elif evaluation == "Success": + translated_update['evaluation'] = _("Success") + # Specific pattern translations + elif evaluation == "Unknown - The task has just started, and no actions have been taken yet.": + translated_update['evaluation'] = _("Unknown - The task has just started, and no actions have been taken yet.") + elif evaluation == "Unknown - The task has not yet begun as the current page is blank.": + translated_update['evaluation'] = _("Unknown - The task has not yet begun as the current page is blank.") + elif evaluation == "Success - Successfully navigated to Google's homepage.": + translated_update['evaluation'] = _("Success - Successfully navigated to Google's homepage.") + elif evaluation == "Failed - The search was blocked by Google's CAPTCHA verification.": + translated_update['evaluation'] = _("Failed - The search was blocked by Google's CAPTCHA verification.") + # General pattern matching for CAPTCHA-related failures + elif "CAPTCHA" in evaluation and evaluation.startswith("Failed"): + translated_update['evaluation'] = _("Failed - The search was blocked by a CAPTCHA verification page.") + # Partial translation for status prefixes + elif evaluation.startswith("Success - "): + translated_update['evaluation'] = evaluation.replace("Success", _("Success"), 1) + elif evaluation.startswith("Failed - "): + translated_update['evaluation'] = evaluation.replace("Failed", _("Failed"), 1) + elif evaluation.startswith("Unknown - "): + translated_update['evaluation'] = evaluation.replace("Unknown", _("Unknown"), 1) + + # Translate goal field with pattern matching + if 'goal' in translated_update: + goal = translated_update['goal'] + if goal == "Pause execution due to CAPTCHA verification.": + translated_update['goal'] = _("Pause execution due to CAPTCHA verification.") + # Add more general goal patterns as needed + + # Translate actions if they contain pause reasons + if 'actions' in translated_update: + for action in translated_update['actions']: + if 'pause' in action and 'reason' in action['pause']: + reason = action['pause']['reason'] + if reason == "CAPTCHA verification required. Human intervention is needed to proceed.": + action['pause']['reason'] = _("CAPTCHA verification required. Human intervention is needed to proceed.") + elif "CAPTCHA" in reason and "verification required" in reason: + # General CAPTCHA verification pattern + action['pause']['reason'] = _("CAPTCHA verification required. Human intervention is needed to proceed.") + + return translated_update + +# Initialize with default language +set_language('zh-CN') \ No newline at end of file diff --git a/py_package/browser_agent/browser_use_custom/i18n/locales/zh-CN/LC_MESSAGES/vefaas_browser_use.mo b/py_package/browser_agent/browser_use_custom/i18n/locales/zh-CN/LC_MESSAGES/vefaas_browser_use.mo new file mode 100644 index 0000000000000000000000000000000000000000..2ca9703245c96a07144818ffb980bb1eb8f655f0 GIT binary patch literal 7255 zcmb`KZEPIH8ONu*w8e#%0;MlNJA@WP@f~*FNSp>pLSCpE>LjM3Qq{1&8{3EPcCWj8 zNu0=%ZQ{fZehW@QVmrhMabiLo#}G*Diyf&_rGBW?sz{ZJDyZOg@9qNyDpmADKlK01 z%-Lt3FKwl+y!YFkoq6V&XP%ew`*+-QnBm%l`(E6?xPdVP9J~oXxcC@NeL+!2me^Wya8zeE{wQeQ*sJyM?hQz%g(W z_zCzBxT1uyuYd-)4vc|i;5b+fz7I}-OVH^q@MDnXzu{KK?f~!7EY+;k+^qS$rlYw} z^O)v&&1)dZ{iA+Q>G#hyZ@Eoc%@BAGCp&c>sSl82s{A(02~Fs555bofL{Luzk&A(YzFz)M$Itz z9Nt^NE#Q0LO0Wz{!lbMgr2Y>07qAy3|BGRhNuN&emtY^b1pGVrL-0XNM)uqSlAYT@ zl6M*`2gkuu@I#R1D}j;T0G|S(0{ayFHhAMV8T$@+pXLh9ZJMTLhvsR`YntzA{zLQj zZz;KtfFyUlet%wbx8@AnAEGlq5Sm4AOea!TZ7Iz(>H{AoV+=IRR4t z-+?Q^cflWlw?UcD!5@PZ=WAg!8n+dMiI@Os+#ZnZ*s9-qG>_=^x3ikuJ$=-BP+|d9vrjQr;>tnddE_4MIWL~@v&lG=j zQ9dRg*ns;<+#EN>a_(BKAIRPmD;2nDkLkJ@H^miQYjKlLAk^67xMc|_Z4ix4X(caR zZ@R)RWozn$>GF^*3^!t#yw-?BMYxjh5w>mFP5QIgB|?b>zpqQ-kN(cJbsN-WDQSMGGgjgpp*m9RrnT{Fy`L@Z`t z#Q0~D>nR?O`A9A6^C1tAbS zsol!q^@ZMxotW8PrHxiYyW70UHdO(ZrAe1^z?n6#w8AUXbEfgZc)Sy2+g!tzNd&n~rk7xl8bzE}aEf26mM<>P zwUie&EzoOmVXq~HO-l=#mK8QFFKl|OuxUkM)5?OTLbVnbuBTAZLOl!BEYz}4$wD3H zs+g#W!3*VnDQ8lS0Jmh@5+S5ACr2a;-BO!~qAr-mj!2zBg%eIclPv13n2`C1Rm;dz z5y>CeVY=C^(t)p^EwsS-g2?ZYO%OYE36n!|ewfL^ zhb&wZHB1!#pQ9MaGXO^$jHGL)8iIyJu02C_}G(=`y1g8$fr<6$w8(bD4 zdTc~L9m6@9R0eGk6+2{*<&{^%wh|cPQgx65YV+%DkUyPQ;Lt_JH0qctUAt8%EA6%l zCvHI$w&ewf4qUQ?2f40;bU>mBu)~C=GG2vTi#HnG#AM#x9x)?s#E3?Ai!iIPt(}gr z=d^{Qkx;{StRn{FY_Dm=$FSJNa=9>!CliU)kw=H^iFla&LLT^-I{&dcYrAW0PuNj5 zr<&u91WA8l@bLr`z{wHoV0OYZk*k<=4|@3Y#KG60{7khdgJr2F)}kJo(4}v0ZsG7x zX;WHf3Hs1s4B9u9px2=;u97r$LZ)1_N5)svBC?`P)vU_4Ucdn~@N_uvGpr0|tmHdH ztzkF;ZJ_`(DcLG0g$6b|b&+sjZKBQzR9lt2r0S=EttfspEbtVJQHi{{cv+ylB2d1R zS1hh9FMp&QSIJKd#|>25hUr8N*Rm`58U!BSC`@5f4CA|5m%su7)xwBX@^$qQQ=nQt zv<;~{iZh!CKRnZ?lJBk$tb3+p^V6Hx&+1$eEH7E7mL8~Xj6)L_&iQCOYDCOcJXCMk zj&N5$TfHH$VzwVCR4eSjdNX9v*}0OhsEN2GRndfPL<1Y(VNNACIx}M}>7ka7np?Aj&6zVTsmV$2@~}7YX7<$H%-Pqx zeQ$Za-F`<`=EbRW@@zUes^q7VN$<@N$ZRfq7D_(C` z`s~Gznp;#)zo#d2jb6RBw^EbcIH;%l_p1I%2tF-MT{-8qU51Esvd@2Yuh)6RAG+#Y z?NJgjAlo46LW(0*-a`3b7PhN-D*=|dxP5&@@Uy@ZWlp@9*O_0t-T&;Z-o<`@@Z5|Kr=|}3eO;-kLGm8Iztw9QR8gc~bTCc4 z19SW1t%cLgEQO^Hjbq1W-qk~71g7Rmf`0*y*Nso4T({1^Jy$hXwd+$6!G!h}K z9eikidUP5)0$Zk!O?#7P$YJF!`X`1nW2fLB>cN{llo=cLkB)gS_F#_mcoIV94S|q( zW8s~tsXpkQy3*y3oJ_xUz;8d2xwb!h_!{)^2QGQ1`^ibV4#SoG!RGA#F8CE~?8yl@ ztJL*svv>9-M7#<{Ism87pU3G3@6gU`8%HXq7wRm($hU<*U1!wd;sTcx|FpFDr)9-I zEieA*vErXr6#ulc=%?b#QdV2gPQ@2oe7(h&TYR;}7h8O-3ojK}I5l}X-}@L`-#c(Z zgR+Nw_r|Vf+Rve2sT`8Jl7#n@>*xSIFoGDKJ%>}G&mF{7rGne5KujNa$8T<_@Y?rf zUp|4-mf3&KJ1~N{M_xmDl~H;5O;5TnO)#$sk{J?6`oypXRMcJC`#t>0fwVo>OEglz z>`2*lcI3cN>dK^w^jtZbOus&vJ~ky++&Y3tQ*$D)^9vcpq%3#n=eLe{7x!hlFMHFk zq^>k0P?ao{0rje?3N)pjGEK}Wf2w#6TuLVo`=h5-XUJEzEIn|A3Y;v9m@B_bseA#k zlrfu=C~<0DbVG%uqqQz}eO;K96qPgOwCPvJYm8UJXeaZI*xYMBj=X}B$^5r_GH2Vp zV {translated[:50]}...") + + print("\n✓ Translation test completed") + return True + + except Exception as e: + print(f"✗ Translation test failed: {e}") + return False + +def main(): + if len(sys.argv) != 2: + print(__doc__) + sys.exit(1) + + command = sys.argv[1] + + if command == "compile": + success = compile_translations() + sys.exit(0 if success else 1) + elif command == "test": + success = test_translations() + sys.exit(0 if success else 1) + else: + print(f"Unknown command: {command}") + print(__doc__) + sys.exit(1) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/py_package/browser_agent/cdp.py b/py_package/browser_agent/cdp.py new file mode 100644 index 000000000..f8e43e573 --- /dev/null +++ b/py_package/browser_agent/cdp.py @@ -0,0 +1,241 @@ +# Copyright (c) 2025 Bytedance Ltd. and/or its affiliates +# Licensed under the 【火山方舟】原型应用软件自用许可协议 +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://www.volcengine.com/docs/82379/1433703 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import logging +import os +from urllib.parse import urlparse, urlunparse +import aiohttp +import websockets +from fastapi import HTTPException, WebSocket, WebSocketDisconnect + + +async def websocket_endpoint(websocket: WebSocket, page_id: str, port: str): + await websocket.accept() + + try: + # Construct the WebSocket URL for the specific page + ws_url = f"ws://127.0.0.1:{port}/devtools/page/{page_id}" + + # Establish a connection to the CDP WebSocket + async with websockets.connect(ws_url) as cdp_socket: + # Create tasks for bidirectional communication + receive_task = asyncio.create_task( + receive_from_cdp(cdp_socket, websocket)) + send_task = asyncio.create_task(send_to_cdp(cdp_socket, websocket)) + + # Wait for either task to complete + done, pending = await asyncio.wait( + [receive_task, send_task], + return_when=asyncio.FIRST_COMPLETED + ) + + # Cancel any remaining tasks + for task in pending: + task.cancel() + + except Exception as e: + logging.error(f"WebSocket error: {e}") + await websocket.close() + + +async def websocket_browser_endpoint(websocket: WebSocket, browser_id: str, port: str): + await websocket.accept() + + try: + # Construct the WebSocket URL for the specific page + ws_url = f"ws://0.0.0.0:{port}/devtools/browser/{browser_id}" + + # Establish a connection to the CDP WebSocket + async with websockets.connect(ws_url) as cdp_socket: + # Create tasks for bidirectional communication + receive_task = asyncio.create_task( + receive_from_cdp(cdp_socket, websocket)) + send_task = asyncio.create_task(send_to_cdp(cdp_socket, websocket)) + + # Wait for either task to complete + done, pending = await asyncio.wait( + [receive_task, send_task], + return_when=asyncio.FIRST_COMPLETED + ) + + # Cancel any remaining tasks + for task in pending: + task.cancel() + + except Exception as e: + logging.error(f"WebSocket error: {e}") + await websocket.close() + + +async def receive_from_cdp(cdp_socket, websocket): + try: + while True: + message = await cdp_socket.recv() + await websocket.send_text(message) + except websockets.exceptions.ConnectionClosed: + logging.info("CDP WebSocket connection closed") + except Exception as e: + logging.error(f"Error receiving from CDP: {e}") + + +async def send_to_cdp(cdp_socket, websocket): + try: + while True: + message = await websocket.receive_text() + await cdp_socket.send(message) + except WebSocketDisconnect: + logging.info("Client WebSocket disconnected") + except Exception as e: + logging.error(f"Error sending to CDP: {e}") + + +async def get_websocket_targets(port: str): + endpoint = os.getenv("CDP_ENDPOINT") + logging.info(f"Getting websocket targets for endpoint: {endpoint}") + try: + async with aiohttp.ClientSession() as session: + # Use the cdp_websocket parameter to dynamically construct the URL + base_url = f"http://127.0.0.1:{port}/json/list" + async with session.get(base_url) as response: + if response.status == 200: + result = await response.json() + + # If result is an empty list, return it immediately + if not result: + return result + + # Modify the URLs in the result to use the provided cdp_websocket + for target in result: + # Replace devtoolsFrontendUrl + if 'devtoolsFrontendUrl' in target: + target['devtoolsFrontendUrl'] = target['devtoolsFrontendUrl'].replace( + '127.0.0.1:' + str(port), str(endpoint) + ) + + # Replace webSocketDebuggerUrl + if 'webSocketDebuggerUrl' in target: + target['webSocketDebuggerUrl'] = target['webSocketDebuggerUrl'].replace( + '127.0.0.1:' + str(port), str(endpoint) + ) + + return result + else: + raise HTTPException( + status_code=response.status, detail="Failed to fetch JSON list") + except Exception as e: + raise HTTPException( + status_code=500, detail=f"Error fetching JSON list: {str(e)}") + + +async def get_websocket_version(port: str): + endpoint = os.getenv("CDP_ENDPOINT") + logging.info(f"Getting websocket version for endpoint: {endpoint}") + try: + async with aiohttp.ClientSession() as session: + # Use the cdp_websocket parameter to dynamically construct the URL + base_url = f"http://127.0.0.1:{port}/json/version" + async with session.get(base_url) as response: + if response.status == 200: + result = await response.json() + + if not result: + return result + + if 'webSocketDebuggerUrl' in result: + result['webSocketDebuggerUrl'] = result['webSocketDebuggerUrl'].replace( + '127.0.0.1:' + str(port), str(endpoint) + ) + + return result + else: + raise HTTPException( + status_code=response.status, detail="Failed to fetch JSON list") + except Exception as e: + raise HTTPException( + status_code=500, detail=f"Error fetching JSON list: {str(e)}") + +async def get_remote_websocket_version(browser_session_endpoint: str, browser_id: str): + endpoint = os.getenv("CDP_ENDPOINT") + logging.info(f"Getting websocket version for endpoint: {endpoint}") + try: + async with aiohttp.ClientSession() as session: + # Use the cdp_websocket parameter to dynamically construct the URL + base_url = f"{browser_session_endpoint}/{browser_id}/json/version" + async with session.get(base_url) as response: + if response.status == 200: + result = await response.json() + return result + else: + raise HTTPException( + status_code=response.status, detail="Failed to fetch JSON version") + except Exception as e: + raise HTTPException( + status_code=500, detail=f"Error fetching JSON list: {str(e)}") + + +async def get_inspector(url): + endpoint = os.getenv("CDP_ENDPOINT") + logging.info(f"Getting websocket targets for endpoint: {endpoint}") + + # Convert Starlette URL to string + url_str = str(url) + + # like this: http://127.0.0.1:8000/devtools/inspector.html?ws=scqevor9btoi2t6dnkcpg.apigateway-cn-beijing.volceapi.com/devtools/page/7D574C7E6237CB326E385DD2C3A5C845 + logging.info(f"Getting original inspector for URL: {url_str}") + + # Parse the URL + parsed_url = urlparse(url_str) + + # Check if the current domain matches the endpoint + if parsed_url.netloc == endpoint: + # Replace only the netloc + modified_url = parsed_url._replace(netloc="127.0.0.1:9222") + url_str = urlunparse(modified_url) + + logging.info(f"Converted inspector for URL: {url_str}") + + # if parsed_url contains "127.0.0.1:9222", run a 301 redirect + if "127.0.0.1" in parsed_url.netloc: + url_str = url_str.replace("127.0.0.1:9222", endpoint) + logging.info(f"Redirecting to: {url_str}") + return await get_inspector(url_str) + + try: + async with aiohttp.ClientSession() as session: + async with session.get(url_str) as response: + if response.status == 200: + # Check content type + content_type = response.headers.get( + 'Content-Type', '').lower() + + if 'application/json' not in content_type: + # If not JSON, read the content + content = await response.text() + logging.error( + f"Unexpected content type: {content_type}") + logging.error(f"Response content: {content[:500]}") + + return { + "status": "error", + "content_type": content_type, + "content": content + } + + result = await response.json() + return result + else: + raise HTTPException( + status_code=response.status, detail="Failed to fetch inspector") + except Exception as e: + logging.error(f"Error fetching inspector: {e}") + raise HTTPException( + status_code=500, detail=f"Error fetching inspector: {str(e)}") diff --git a/py_package/browser_agent/index.py b/py_package/browser_agent/index.py new file mode 100644 index 000000000..07f3c1a75 --- /dev/null +++ b/py_package/browser_agent/index.py @@ -0,0 +1,541 @@ +# Copyright (c) 2025 Bytedance Ltd. and/or its affiliates +# Licensed under the 【火山方舟】原型应用软件自用许可协议 +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://www.volcengine.com/docs/82379/1433703 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import Dict,List,Union +import asyncio +import json +import logging +import os +import uuid +from pathlib import Path +from typing import AsyncGenerator,Optional +import aiohttp +import aiohttp +import uvicorn +from dotenv import load_dotenv +from fastapi import FastAPI +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import JSONResponse, StreamingResponse +from pydantic import BaseModel, Field,ConfigDict +from browser_use.browser.views import BrowserStateSummary +from browser_agent.browser import start_local_browser,BrowserWrapper +from browser_agent.browser_use_custom.controller.service import MyController +from browser_agent.utils import enforce_log_format +from browser_use import Agent +from browser_agent.browser_use_custom.agent.service import MyAgent +from browser_agent.browser_use_custom.i18n import _, set_language +from browser_use.agent.views import ( + AgentOutput, +) +from browser_use import Agent, BrowserProfile, BrowserSession +from stream_helper.schema import SSEData,ContentTypeEnum,ReturnTypeEnum,OutputModeEnum,ContextModeEnum,MessageActionInfo,MessageActionItem,ReplyContentType,ContentTypeInReplyEnum,ReplyTypeInReplyEnum +from browser_agent.upload import UploadService +from browser_agent.upload import filter_file_by_time +from stream_helper.schema import FileChangeInfo,FileChangeType,FileChangeData +import base64 +from langchain_core.language_models.chat_models import BaseChatModel +from datetime import datetime +from langchain_openai import ChatOpenAI +from logging import Logger +from browser_agent.browser_use_custom.controller.screen import wait_for_visual_change,WaitForVisualChangeAction,VisualChangeDetector +from browser_use.utils import match_url_with_domain_pattern, time_execution_async, time_execution_sync +app = FastAPI() +load_dotenv() + +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], + expose_headers=["x-faas-instance-name"], +) + +llm_openai = "openai" +llm_deepseek = "deepseek" +llm_ark = "ark" +llm_name = llm_ark + + +# Global variable to track the port +CURRENT_CDP_PORT = 9222 + +browser_session_endpoint = None +set_language(os.getenv("LANGUAGE", "en")) + +class Message(BaseModel): + role: str + content: str + + +class Messages(BaseModel): + messages: list[Message] + + +class TaskRequest(BaseModel): + task: str + +import logging + +class LLMConfig(BaseModel): + llm_type: str + model_id: str + api_key: str + extract_model_id: str + +class RunBrowserUseAgentCtx(BaseModel): + query: str + conversation_id: str + llm: BaseChatModel + browser_session_endpoint: str + max_steps: int = 20 + system_prompt: str | None = None + extend_prompt: str | None = None + upload_service:Optional[UploadService] = None + start_time:int = int(datetime.now().timestamp() * 1000) + logger: Logger = Field(default_factory=lambda: logging.getLogger(__name__)) + model_config = ConfigDict( + arbitrary_types_allowed=True, + ) + +def genSSEData(stream_id:str, + content:str, + content_type:ContentTypeEnum = ContentTypeEnum.TEXT, + return_type:ReturnTypeEnum = ReturnTypeEnum.MODEL, + output_mode:OutputModeEnum = OutputModeEnum.STREAM, + is_last_msg:bool = False, + is_finish:bool = False, + is_last_packet_in_msg:bool = False, + message_title:str = None, + context_mode:ContextModeEnum = ContextModeEnum.NOT_IGNORE, + response_for_model:str = None, + ext:Dict[str,str] = None, + card_body:str = None, + reply_content_type:Optional[ReplyContentType] = None)->SSEData: + return SSEData( + stream_id = stream_id, + content = content, + content_type = content_type, + return_type = return_type, + output_mode = output_mode, + is_last_msg = is_last_msg, + is_finish = is_finish, + is_last_packet_in_msg = is_last_packet_in_msg, + message_title = message_title, + context_mode = context_mode, + response_for_model = response_for_model, + ext = ext, + card_body = card_body, + reply_content_type = reply_content_type + ) +def convert_ws_url(original_url: str) -> str: + """ + 将本地开发环境的 WebSocket URL 转换为生产环境的 URL 格式。 + + 示例输入: + 'ws://127.0.0.1:8001/v1/browsers/devtools/browser/77a025af-5483-46e2-ac57-9360812e4608?faasInstanceName=vefaas-duxz5kfb-jjecq2yuvf-d340et34rnmvf4b2ugd0-sandbox' + + 示例输出: + 'ws://bots-sandbox.bytedance.net/api/sandbox/coze_studio/proxy/v1/browsers/devtools/browser/77a025af-5483-46e2-ac57-9360812e4608' + """ + # 提取 UUID 部分(从路径中截取) + uuid_part = original_url.split("/v1/browsers/devtools/browser/")[1].split("?")[0] + + # 构建新 URL + new_url = ( + "ws://bots-sandbox.bytedance.net" + "/ws/sandbox/coze_studio/proxy" + f"/v1/browsers/devtools/browser/{uuid_part}" + ) + return new_url + +async def get_data_files(directory: str | Path) -> List[Dict[str, str]]: + path = Path(directory) + if not path.is_dir(): + raise ValueError(f"'{directory}' is not a valid directory") + result = [] + for file in path.iterdir(): + if file.is_file(): + try: + with open(file, 'rb') as f: + content = f.read() + base64_encoded = base64.b64encode(content).decode('ascii') + result.append({ + "name": file.name, + "content": base64_encoded + }) + except Exception as e: + result.append({ + "name": file.name, + "content": f"[ERROR READING FILE: {str(e)}]".encode() + }) + + return result + +async def RunBrowserUseAgent(ctx: RunBrowserUseAgentCtx) -> AsyncGenerator[SSEData, None]: + task_id = str(uuid.uuid4()) + event_queue = asyncio.Queue(maxsize=100) + # 初始化日志 + + ctx.logger.info(f"RunBrowserUseAgent with query: {ctx.query},task_id:{task_id}") + + # 浏览器初始化 + try: + if ctx.browser_session_endpoint == "" or ctx.browser_session_endpoint is None: + global CURRENT_CDP_PORT + CURRENT_CDP_PORT += 1 + current_port = CURRENT_CDP_PORT + browser_wrapper = await start_local_browser(current_port) + else: + browser_wrapper = BrowserWrapper(None, None, None, 'id', ctx.browser_session_endpoint) + except Exception as e: + ctx.logger.error(f"[Failed to initialize browser: {e}") + yield genSSEData( + stream_id=ctx.conversation_id, + content=f'init browser_wrapper err:{e}', + is_finish=True, + is_last_msg=True, + is_last_packet_in_msg=True, + output_mode=OutputModeEnum.NOT_STREAM, + return_type=ReturnTypeEnum.MODEL, + response_for_model=f'init browser_wrapper err:{e}', + reply_content_type=ReplyContentType(content_type=ContentTypeInReplyEnum.TXT,reply_type=ReplyTypeInReplyEnum.ANSWER) + ) + return + + # CDP URL 获取 + cdp_url = None + try: + if browser_wrapper.remote_browser_id: + async with aiohttp.ClientSession() as session: + get_url = f"{browser_wrapper.endpoint}/v1/browsers/" + async with session.get(url = get_url, + timeout=aiohttp.ClientTimeout(total=30), + headers={ + 'x-sandbox-taskid':ctx.conversation_id, + 'x-tt-env':'ppe_coze_sandbox', + 'x-use-ppe':'1', + }) as response: + if response.status == 200: + reader = response.content + browser_info = json.loads(await reader.read()) + cdp_url = convert_ws_url(browser_info['ws_url']) + ctx.logger.info(f"[{task_id}] Retrieved remote CDP URL: {cdp_url}") + else: + error_text = await response.text() + raise Exception(f"Failed to get browser info. Status: {response.status}, Error: {error_text}") + else: + current_port = CURRENT_CDP_PORT + ctx.logger.info(f"Starting task with local browser on port: {current_port}") + cdp_url = f"http://127.0.0.1:{current_port}" + except Exception as e: + ctx.logger.error(f"Error getting browser URL: {e}") + yield genSSEData( + stream_id=ctx.conversation_id, + content=f'Failed to get browser cdp_url:{e}', + is_finish=True, + is_last_msg=True, + is_last_packet_in_msg=True, + output_mode=OutputModeEnum.NOT_STREAM, + return_type=ReturnTypeEnum.MODEL, + response_for_model=f'', + reply_content_type=ReplyContentType(content_type=ContentTypeInReplyEnum.TXT,reply_type=ReplyTypeInReplyEnum.ANSWER) + ) + return + + # 目录创建 + base_dir = os.path.join("videos", task_id) + snapshot_dir = os.path.join(base_dir, "snapshots") + Path(snapshot_dir).mkdir(parents=True, exist_ok=True) + browser_session = None + agent = None + agent_task = None + + try: + # 浏览器会话配置 + headless = False if ctx.browser_session_endpoint != "" else True + browser_profile = BrowserProfile( + headless=headless, + disable_security=True, + highlight_elements=False, + wait_between_actions=1, + keep_alive=False, + headers={ + 'x-sandbox-taskid':ctx.conversation_id, + 'x-tt-env':'ppe_coze_sandbox', + 'x-use-ppe':'1', + }, + ) + browser_session = BrowserSession( + browser_profile=browser_profile, + cdp_url=cdp_url, + ) + ctx.logger.info(f"[{task_id}] Browser initialized with CDP URL: {cdp_url}") + async def on_step_end(agent): + try: + ctx.logger.info("Agent step end") + page = await agent.browser_session.get_current_page() + detector = VisualChangeDetector() + current_screenshot = await detector.capture_screenshot(page,upload_service=ctx.upload_service,logger=ctx.logger) + if not current_screenshot: + ctx.logger.error(f"Failed to capture screenshot") + return + agent.context = {'current_screenshot':current_screenshot} + ctx.logger.info(f'captured screenshot success') + except Exception as e: + ctx.logger.error(f"Error in on_step_end: {e}") + # 回调函数定义 + async def new_step_callback_wrapper(browser_state_summary: BrowserStateSummary, + model_output: AgentOutput, + step_number: int): + try: + islogin = False + for ac in model_output.action: + try: + action_data = ac.model_dump(exclude_unset=True) + action_name = next(iter(action_data.keys())) + if action_name == 'pause': + islogin = True + ctx.logger.info("pause action detected, browser task pause") + agent.pause() + except Exception as e: + ctx.logger.error(f"Error in new_step_callback_wrapper: {e}") + agent.resume() + data = '' + content_type = ContentTypeInReplyEnum.TXT + if islogin: + data = data + MessageActionInfo(actions=[MessageActionItem()]).model_dump_json() + content_type = ContentTypeInReplyEnum.ACTION_INFO + else: + data = data + model_output.current_state.next_goal or '' + await event_queue.put(genSSEData( + stream_id=ctx.conversation_id, + content=data, + reply_content_type= ReplyContentType(content_type=content_type) + )) + if islogin: + current_screenshot = None + if agent.context: + current_screenshot = agent.context.get('current_screenshot',None) + if current_screenshot: + ctx.logger.info(f'current screenshot exists') + has_change, _, _ = await wait_for_visual_change( + params=WaitForVisualChangeAction( + timeout=300, + similarity_threshold=0.85, + ), + browser_session=agent.browser_session, + initial_screenshot=current_screenshot, + upload_service=ctx.upload_service, + logger=ctx.logger, + ) + if has_change: + ctx.logger.info('detected visual change,browser task resume') + agent.resume() + data = 'timeout: no visual change detected during login' + await event_queue.put(genSSEData( + stream_id=ctx.conversation_id, + content=data, + output_mode=OutputModeEnum.NOT_STREAM, + is_finish=True, + is_last_msg=True, + is_last_packet_in_msg=True, + response_for_model=data, + reply_content_type= ReplyContentType(content_type=ContentTypeInReplyEnum.TXT,reply_type=ReplyTypeInReplyEnum.ANSWER) + )) + except Exception as e: + ctx.logger.error(f"Error in new_step_callback_wrapper: {e}") + agent.resume() + # Agent 创建 + agent = MyAgent( + task=ctx.query, + browser_session=browser_session, + register_new_step_callback=new_step_callback_wrapper, + llm=ctx.llm, + page_extraction_llm=ctx.llm, + planner_llm=ctx.llm, + controller=MyController(), + planner_interval=int(os.getenv("ARK_PLANNER_INTERVAL", "1")), + override_system_message=ctx.system_prompt, + extend_planner_system_message=ctx.extend_prompt, + language='zh', + enable_memory=False, + ) + + ctx.logger.info(f"[{task_id}] Agent initialized and ready to run") + # 启动 Agent 任务 + agent_task = asyncio.create_task(agent.run(20,on_step_end=on_step_end)) + ctx.logger.info(f"[{task_id}] Agent started running") + + # 事件流生成 + while True: + try: + # 等待事件或检查任务完成 + try: + event = await asyncio.wait_for(event_queue.get(), timeout=0.5) + yield event + + # 检查是否应该结束 + if event == "error" or (isinstance(event, dict) and event.get("status") == "completed"): + break + + except asyncio.TimeoutError: + # 检查 Agent 任务是否完成 + if agent_task.done(): + break + continue + + except Exception as e: + ctx.logger.error(f"[{task_id}] Error in event streaming: {e}") + break + + # 等待 Agent 任务完成 + if agent_task and not agent_task.done(): + await agent_task + # 获取最终结果 + result = await agent_task if agent_task else None + if result: + final_result = None + for history_item in reversed(result.history): + for result_item in history_item.result: + if hasattr(result_item, "is_done") and result_item.is_done: + final_result = result_item.extracted_content + break + if final_result: + break + + if not final_result: + result = [ + [item.extracted_content for item in history_item.result + if hasattr(item, "extracted_content")] + for history_item in result.history + ] + final_result = "\n".join( + item + for sublist in result + for item in sublist + if isinstance(item, str) + ) + if final_result: + ctx.logger.info(f"[{task_id}] final_result: {final_result}") + completion_event = genSSEData( + stream_id=ctx.conversation_id, + content= final_result, + return_type=ReturnTypeEnum.MODEL, + response_for_model=final_result, + content_type=ContentTypeEnum.TEXT, + output_mode=OutputModeEnum.STREAM, + is_finish= True, + is_last_msg=True, + is_last_packet_in_msg=True, + reply_content_type=ReplyContentType(content_type=ContentTypeInReplyEnum.TXT,reply_type=ReplyTypeInReplyEnum.ANSWER) + ) + yield completion_event + + ctx.logger.info(f"[{task_id}] Task completed successfully") + + except Exception as e: + ctx.logger.error(f"[{task_id}] Agent execution failed: {e}") + yield genSSEData( + stream_id=ctx.conversation_id, + content=f'err:{e}', + is_finish=True, + is_last_msg=True, + is_last_packet_in_msg=True, + return_type=ReturnTypeEnum.MODEL, + response_for_model=f'err:{e}', + reply_content_type=ReplyContentType(content_type=ContentTypeInReplyEnum.TXT,reply_type=ReplyTypeInReplyEnum.ANSWER) + ) + finally: + if agent: + if agent.browser_session: + agent.browser_session.cdp_url = None + if agent.browser_session.browser_context: + await agent.browser_session.browser_context.close() + if agent.browser_session.browser: + await agent.browser_session.browser.close() + agent.pause() + ctx.logger.info('agent close success') + +@app.get("/") +async def root(): + return {"message": "Hello World"} + + +@app.post("/tasks") +async def sse_task(request: Messages): + task_id = str(uuid.uuid4()) + + # 提取用户消息 + prompt = "" + for message in request.messages: + if message.role == "user": + prompt = message.content + logging.debug(f"Found user message: {prompt}") + break + + logging.info(f"[{task_id}] Starting SSE task with prompt: {prompt}") + + async def generate_sse_events(): + try: + # 创建SSE格式的生成器 + async for event in RunBrowserUseAgent(ctx=RunBrowserUseAgentCtx( + query=prompt, + conversation_id="", + cookie="", + llm_config=LLMConfig( + ), + browser_session_endpoint="http://127.0.0.1:8001/v1/browsers", + max_steps=20 + )): + # 确保事件是SSE格式 + if isinstance(event, str): + # 如果是字符串,直接作为数据发送 + yield f"data: {event}\n\n" + elif isinstance(event, dict): + # 如果是字典,转换为JSON格式 + event_json = json.dumps(event, ensure_ascii=False) + yield f"data: {event_json}\n\n" + else: + # 其他类型转换为字符串 + yield f"data: {str(event)}\n\n" + + except Exception as e: + logging.error(f"[{task_id}] Error in SSE generation: {e}") + # 发送错误事件 + error_event = json.dumps({ + "task_id": task_id, + "status": "error", + "error": str(e) + }, ensure_ascii=False) + yield f"data: {error_event}\n\n" + finally: + # 可选:发送结束标记 + yield f"data: {json.dumps({'task_id': task_id, 'status': 'stream_completed'}, ensure_ascii=False)}\n\n" + + try: + return StreamingResponse( + generate_sse_events(), + media_type="text/event-stream", + ) + except Exception as e: + logging.error(f"[{task_id}] Error creating StreamingResponse: {e}") + # 返回错误响应 + return JSONResponse( + status_code=500, + content={"error": "Failed to create SSE stream", "task_id": task_id} + ) + +if __name__ == "__main__": + enforce_log_format() + uvicorn.run(app, host="0.0.0.0", port=8000) + + + diff --git a/py_package/browser_agent/upload.py b/py_package/browser_agent/upload.py new file mode 100644 index 000000000..72f3c8f7e --- /dev/null +++ b/py_package/browser_agent/upload.py @@ -0,0 +1,23 @@ +from datetime import datetime +from typing import List +from abc import ABC, abstractmethod +from pydantic import BaseModel +class FileItem(BaseModel): + file_name: str + file_type: str + file_size: int + file_uri: str + file_url: str + upload_type: str + create_time: int + update_time: int + +class UploadService(BaseModel,ABC): + headers:dict[str,str] = {} + async def upload_file(self,file_content:str,file_name:str,base64_content:str=''): + pass + async def list_file(self)->List[FileItem]: + pass + +async def filter_file_by_time(file_list:List[FileItem],start_time:int)->List[FileItem]: + return [file for file in file_list if (start_time <= file.create_time or start_time <= file.update_time) ] \ No newline at end of file diff --git a/py_package/browser_agent/utils.py b/py_package/browser_agent/utils.py new file mode 100644 index 000000000..2b5e9fd68 --- /dev/null +++ b/py_package/browser_agent/utils.py @@ -0,0 +1,86 @@ +# Copyright (c) 2025 Bytedance Ltd. and/or its affiliates +# Licensed under the 【火山方舟】原型应用软件自用许可协议 +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://www.volcengine.com/docs/82379/1433703 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from typing import Any, Dict, List +from langchain_core.callbacks import BaseCallbackHandler +from langchain_core.messages import BaseMessage +from langchain_core.outputs import LLMResult + + +root_logger = logging.getLogger() +for handler in root_logger.handlers: + formatter = logging.Formatter( + '%(asctime)s - %(levelname)s - %(message)s', + '%Y-%m-%d %H:%M:%S' + ) + handler.setFormatter(formatter) + + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' +) + + +def enforce_log_format(): + root_logger = logging.getLogger() + for handler in root_logger.handlers: + formatter = logging.Formatter( + '%(asctime)s - %(levelname)s - %(message)s', + '%Y-%m-%d %H:%M:%S' + ) + handler.setFormatter(formatter) + + +class ModelLoggingCallback(BaseCallbackHandler): + def on_chat_model_start( + self, serialized: Dict[str, Any], messages: List[List[BaseMessage]], **kwargs + ) -> None: + logging.info( + f"[Model] Chat model started\n") + + def on_llm_end(self, response: LLMResult, **kwargs) -> None: + logging.info( + f"[Model] Chat model ended, response: {response}") + + def on_llm_error(self, error: BaseException, **kwargs) -> Any: + logging.info( + f"[Model] Chat model error, response: {error}") + + def on_chain_start( + self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs + ) -> None: + logging.info( + f"[Model] Chain {serialized.get('name')} started") + + def on_chain_end(self, outputs: Dict[str, Any], **kwargs) -> None: + logging.info(f"[Model] Chain ended, outputs: {outputs}") + +# class that wraps another class and logs all function calls being executed + + +class Wrapper: + def __init__(self, wrapped_class): + self.wrapped_class = wrapped_class + + def __getattr__(self, attr): + original_func = getattr(self.wrapped_class, attr) + + def wrapper(*args, **kwargs): + print(f"Calling function: {attr}") + print(f"Arguments: {args}, {kwargs}") + result = original_func(*args, **kwargs) + print(f"Response: {result}") + return result + + return wrapper diff --git a/py_package/pyproject.toml b/py_package/pyproject.toml new file mode 100644 index 000000000..9d100a159 --- /dev/null +++ b/py_package/pyproject.toml @@ -0,0 +1,29 @@ +[build-system] +requires = ["setuptools>=64.0.0", "wheel"] +build-backend = "setuptools.build_meta" + +[tool.setuptools] +include-package-data = true +package-dir = {"browser_agent" = "browser_agent","stream_helper" = "stream_helper"} + +[tool.setuptools.package-data] +"browser_agent" = ["**/*.mo", "**/*.po"] + +[project] +name = "coze-studio-py-package" # 包名,在PyPI上唯一。通常用中划线 +version = "0.1.0" +description = "browser agent" + +# 如果你的包有依赖,在这里声明 +dependencies = [ + "fastapi==0.109.2", + "uvicorn==0.27.0.post1", + "python-multipart==0.0.9", + "MainContentExtractor==0.0.4", + "aiohttp==3.11.16", + "websockets==15.0.1", + "faiss-cpu==1.10.0", + "browser_use @ git+https://github.com/zzxwill/browser-use.git@i18n_system_prompt", + "playwright==1.52.0", + "pillow==11.3.0" +] \ No newline at end of file diff --git a/py_package/stream_helper/__init__.py b/py_package/stream_helper/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/py_package/stream_helper/schema.py b/py_package/stream_helper/schema.py new file mode 100644 index 000000000..5f4a97acb --- /dev/null +++ b/py_package/stream_helper/schema.py @@ -0,0 +1,125 @@ +from enum import Enum, IntEnum +from typing import Optional, Dict, Union +from pydantic import BaseModel + +class StepInfo(BaseModel): + step_number:int + goal:str + +class WebSocketItem(BaseModel): + ws_url:str + +class WebSocketInfo(BaseModel): + items: list[WebSocketItem] + + +class MessageActionTypeEnum(int, Enum): + """MessageActionType 的枚举值""" + WebPageAuthorization = 1 # Web page authorization + +class MessageActionItem(BaseModel): + type: MessageActionTypeEnum = MessageActionTypeEnum.WebPageAuthorization + class Config: + use_enum_values = True + +class MessageActionInfo(BaseModel): + actions: list[MessageActionItem] + +# 定义枚举类型 +class FileType(str, Enum): + DIR = "dir" + FILE = "file" + +class FileChangeType(str, Enum): + CREATE = "create" + DELETE = "delete" + UPDATE = "update" + +class FileChangeData(BaseModel): + file_type: FileType = FileType.FILE + file_path: str = '' + file_name: str + change_type: FileChangeType + uri: str + url: str + + class Config: + use_enum_values = True + +class ErrData(BaseModel): + data: Dict[str, str] + + +class FileChangeInfo(BaseModel): + file_change_list: Optional[list[FileChangeData]] = None + err_list: Optional[list[ErrData]] = None + +class OutputModeEnum(int, Enum): + """SSEData.output_mode 的枚举值""" + NOT_STREAM = 0 # 非流式 + STREAM = 1 # 流式 + +class ReturnTypeEnum(int, Enum): + """SSEData.return_type 的枚举值""" + MODEL = 0 # 输出到模型 + USER_TERMINAL = 1 # 输出到终端 + +class ContentTypeEnum(int, Enum): + """SSEData.content_type 的枚举值""" + TEXT = 0 # 文本 + APPLET_WIDGET = 1 # 小程序组件 + LOADING_TIPS = 2 # 加载提示 + CARD = 3 # 卡片 + VERBOSE = 4 # 详细信息 + USAGE = 10 # 使用情况 + +class ContextModeEnum(int, Enum): + """SSEData.context_mode 的枚举值""" + NOT_IGNORE = 0 # 不忽略上下文 + IGNORE = 1 # 忽略上下文 + +class ReplyTypeInReplyEnum(IntEnum): + """Type of reply (purpose/context).""" + ANSWER = 1 # Standard answer + SUGGEST = 2 # Suggestion (e.g., follow-up questions) + LLM_OUTPUT = 3 # Raw LLM output (before processing) + TOOL_OUTPUT = 4 # Output from an external tool/API + VERBOSE = 100 # Debug/verbose logging + PLACEHOLDER = 101 # Placeholder (e.g., loading state) + TOOL_VERBOSE = 102 # Verbose logs from tools + +class ContentTypeInReplyEnum(IntEnum): + """Format of the content.""" + TXT = 1 # Plain text + IMAGE = 2 # Image + VIDEO = 4 # Video + MUSIC = 7 # Audio + CARD = 50 # Structured card (e.g., rich UI element) + WIDGET = 52 # Interactive widget + APP = 100 # Embedded application + WEBSOCKET_INFO = 101 # Websocket metadata + FILE_CHANGE_INFO = 102 # File system event + ACTION_INFO = 103 # Action/command metadata + +class ReplyContentType(BaseModel): + """Combines reply type and content type for structured responses.""" + reply_type: ReplyTypeInReplyEnum = ReplyTypeInReplyEnum.TOOL_VERBOSE + content_type: ContentTypeInReplyEnum = ContentTypeInReplyEnum.TXT + +class SSEData(BaseModel): + stream_id: str + message_title: Optional[str] = None + context_mode: Union[ContextModeEnum, int] = ContextModeEnum.NOT_IGNORE + output_mode: OutputModeEnum = OutputModeEnum.STREAM # 0=非流式, 1=流式 + return_type: Union[ReturnTypeEnum, int] = ReturnTypeEnum.USER_TERMINAL + content_type: Union[ContentTypeEnum, int] = ContentTypeEnum.TEXT + is_last_msg: bool = False + is_finish: bool = False + is_last_packet_in_msg: bool = False + content: Optional[str] = None + response_for_model: Optional[str] = None + ext: Optional[Dict[str, str]] = None + card_body: Optional[str] = None + reply_content_type :Optional[ReplyContentType] = None + class Config: + use_enum_values = True # 序列化时使用枚举的原始值 \ No newline at end of file