| | import ast
|
| | import asyncio
|
| | import io
|
| | import json
|
| | import logging
|
| | import os
|
| | from typing import Any, Dict, List, Tuple
|
| | from urllib.parse import quote_plus
|
| |
|
| | from openai import AsyncOpenAI
|
| | from PIL import Image
|
| | from playwright.async_api import BrowserContext, Download, Page
|
| | from tenacity import before_sleep_log, retry, stop_after_attempt, wait_exponential
|
| |
|
| | from ._prompts import get_computer_use_system_prompt
|
| | from .browser.playwright_controller import PlaywrightController
|
| | from .types import (
|
| | AssistantMessage,
|
| | FunctionCall,
|
| | ImageObj,
|
| | LLMMessage,
|
| | ModelResponse,
|
| | SystemMessage,
|
| | UserMessage,
|
| | WebSurferEvent,
|
| | message_to_openai_format,
|
| | )
|
| | from .utils import get_trimmed_url
|
| |
|
| |
|
class FaraAgent:
    """Agent that drives a browser by alternating model calls and tool actions.

    Each round the agent sends the (trimmed) chat history plus a system prompt
    to an OpenAI-compatible chat endpoint, parses the ``<tool_call>`` block out
    of the reply, and executes the requested action through the browser
    manager's Playwright controller.
    """

    # Start page used when the caller passes a falsy ``start_page``.
    DEFAULT_START_PAGE = "https://www.bing.com/"

    # Passed unchanged to ``get_computer_use_system_prompt``.
    MLM_PROCESSOR_IM_CFG = {
        "min_pixels": 3136,
        "max_pixels": 12845056,
        "patch_size": 14,
        "merge_size": 2,
    }

    # NOTE(review): not referenced anywhere inside this class.
    SCREENSHOT_TOKENS = 1105
    # Boilerplate text attached to every follow-up screenshot message.
    USER_MESSAGE = "Here is the next screenshot. Think about what to do next."
    # Maximum URL length forwarded to ``get_trimmed_url``.
    MAX_URL_LENGTH = 100

    def __init__(
        self,
        browser_manager: Any,
        client_config: dict,
        downloads_folder: str | None = None,
        start_page: str | None = "about:blank",
        animate_actions: bool = False,
        single_tab_mode: bool = True,
        max_n_images: int = 3,
        fn_call_template: str = "default",
        model_call_timeout: int = 20,
        max_rounds: int = 10,
        save_screenshots: bool = False,
        logger: logging.Logger | None = None,
    ):
        """Store configuration; no browser or network work happens here.

        Args:
            browser_manager: Object exposing ``page``, ``context``,
                ``playwright_controller``, ``init``, ``close``,
                ``set_download_handler`` and the captcha helpers used below.
            client_config: Mapping read for ``api_key``, ``base_url``,
                ``default_headers`` and ``model``.
            downloads_folder: Directory for saved screenshots; created if it
                does not exist.
            start_page: Page handed to ``browser_manager.init``; a falsy value
                selects ``DEFAULT_START_PAGE``.
            animate_actions: Stored on the instance (not read in this class).
            single_tab_mode: Stored on the instance (not read in this class).
            max_n_images: Screenshot budget used when trimming the history.
            fn_call_template: Forwarded to the system-prompt builder.
            model_call_timeout: Stored on the instance.
            max_rounds: Maximum number of model/action rounds per ``run``.
            save_screenshots: Save a PNG into ``downloads_folder`` per action.
            logger: Logger to use; defaults to this module's logger.

        Raises:
            AssertionError: If ``save_screenshots`` is set without a
                ``downloads_folder``.
        """
        # Raised explicitly (instead of ``assert False``) so the check is not
        # stripped when Python runs with -O.
        if save_screenshots and downloads_folder is None:
            raise AssertionError(
                "downloads_folder must be set if save_screenshots is True"
            )

        self.downloads_folder = downloads_folder
        if self.downloads_folder and not os.path.exists(self.downloads_folder):
            # exist_ok guards against another process creating it in between.
            os.makedirs(self.downloads_folder, exist_ok=True)

        self.browser_manager = browser_manager
        self.client_config = client_config
        self.start_page = start_page or self.DEFAULT_START_PAGE
        self.single_tab_mode = single_tab_mode
        self.animate_actions = animate_actions
        self.max_n_images = max_n_images
        self.fn_call_template = fn_call_template
        # NOTE(review): stored but not applied by ``_make_model_call``.
        self.model_call_timeout = model_call_timeout
        self.max_rounds = max_rounds
        self.max_url_chars = self.MAX_URL_LENGTH
        self.save_screenshots = save_screenshots
        self.logger = logger or logging.getLogger(__name__)

        self._facts = []
        self._task_summary = None
        self._num_actions = 0

        # Size of the image shown to the model; overwritten on every call to
        # ``_get_system_message``.
        self._mlm_width = 1440
        self._mlm_height = 900
        # Size that model coordinates are scaled back to before acting.
        self.viewport_height = 900
        self.viewport_width = 1440
        self.include_input_text_key_args = True

        # Defined here as well as in ``initialize`` so the download handler and
        # action code never touch an undefined attribute.
        self._last_download = None
        self._prior_metadata_hash = None

        def _download_handler(download: Download) -> None:
            self._last_download = download

        self._download_handler = _download_handler
        self.did_initialize = False

        self._openai_client: AsyncOpenAI | None = None
        self._chat_history: List[LLMMessage] = []

    async def initialize(self) -> None:
        """Create the OpenAI client and start the browser (runs only once)."""
        if self.did_initialize:
            return
        self._last_download = None
        self._prior_metadata_hash = None

        self._openai_client = AsyncOpenAI(
            api_key=self.client_config.get("api_key"),
            base_url=self.client_config.get("base_url"),
            default_headers=self.client_config.get("default_headers"),
        )

        self.browser_manager.set_download_handler(self._download_handler)

        await self.browser_manager.init(self.start_page)
        self.did_initialize = True

    @property
    def _page(self) -> Page | None:
        """Get the current page from browser manager."""
        return self.browser_manager.page if self.browser_manager else None

    @_page.setter
    def _page(self, value):
        if self.browser_manager:
            self.browser_manager.page = value
        else:
            raise ValueError("Browser manager is not initialized. Cannot set page.")

    @property
    def context(self) -> BrowserContext | None:
        """Get the browser context from browser manager."""
        return self.browser_manager.context if self.browser_manager else None

    @property
    def _playwright_controller(self) -> PlaywrightController | None:
        """Get the playwright controller from browser manager."""
        return (
            self.browser_manager.playwright_controller if self.browser_manager else None
        )

    async def wait_for_captcha_with_timeout(self, timeout_seconds=300):
        """Wait for the captcha to be resolved, giving up after a timeout.

        Returns:
            True if ``wait_for_captcha_resolution`` finished in time. On
            timeout the captcha event is set anyway and False is returned.
        """
        try:
            await asyncio.wait_for(
                self.browser_manager.wait_for_captcha_resolution(),
                timeout=timeout_seconds,
            )
            return True
        except asyncio.TimeoutError:
            self.logger.warning(f"Captcha timeout after {timeout_seconds} seconds!")

            # Mark the event as set so later rounds do not wait again.
            self.browser_manager._captcha_event.set()
            return False

    @retry(
        stop=stop_after_attempt(5),
        wait=wait_exponential(multiplier=5.0, min=5.0, max=60),
        before_sleep=before_sleep_log(logging.getLogger(__name__), logging.WARNING),
        reraise=True,
    )
    async def _make_model_call(
        self,
        history: List[LLMMessage],
        extra_create_args: Dict[str, Any] | None = None,
    ) -> ModelResponse:
        """Send ``history`` to the chat-completions endpoint.

        Args:
            history: Messages to send; each is converted with
                ``message_to_openai_format``.
            extra_create_args: Extra keyword arguments merged into the request
                (they override ``model``/``messages`` on key collision).

        Returns:
            A ``ModelResponse`` holding the first choice's content and, when
            the API reports it, the token usage.
        """
        request_params = {
            "model": self.client_config.get("model", "gpt-4o"),
            "messages": [message_to_openai_format(msg) for msg in history],
        }
        if extra_create_args:
            request_params.update(extra_create_args)

        response = await self._openai_client.chat.completions.create(**request_params)
        content = response.choices[0].message.content
        usage = {}
        if response.usage:
            usage = {
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens,
                "total_tokens": response.usage.total_tokens,
            }
        return ModelResponse(content=content, usage=usage)

    def remove_screenshot_from_message(self, msg: List[Dict[str, Any]] | Any) -> Any:
        """Remove the screenshot from the message content.

        A list content is filtered in place on ``msg`` (so the caller's message
        object is modified). A message whose whole content is an image yields
        ``None``. Any other message is returned unchanged.
        """
        if isinstance(msg.content, list):
            msg.content = [c for c in msg.content if not isinstance(c, ImageObj)]
        elif isinstance(msg.content, ImageObj):
            msg = None
        return msg

    def maybe_remove_old_screenshots(
        self, history: List[LLMMessage], includes_current: bool = False
    ) -> List[LLMMessage]:
        """Remove old screenshots from the chat history. Assuming we have not yet added the current screenshot message.

        The history is walked from newest to oldest so the most recent
        screenshots are the ones that survive.

        Note: Original user messages (marked with is_original=True) have their TEXT preserved,
        but their images may be removed if we exceed max_n_images. Boilerplate messages can be
        completely removed.

        Args:
            history: Messages to filter.
            includes_current: When False, one slot of the image budget is
                reserved for the screenshot that is about to be appended.

        Returns:
            A new list in the original order; ``history`` itself is returned
            untouched when ``max_n_images`` is not positive.
        """
        if self.max_n_images <= 0:
            return history

        max_n_images = self.max_n_images if includes_current else self.max_n_images - 1
        kept_newest_first: List[LLMMessage] = []
        n_images = 0
        for i in range(len(history) - 1, -1, -1):
            msg = history[i]

            is_original_user_message = isinstance(msg, UserMessage) and getattr(
                msg, "is_original", False
            )

            if i == 0 and n_images >= max_n_images:
                # The first message is kept, minus its screenshot, once the
                # budget is already spent by newer messages.
                msg = self.remove_screenshot_from_message(msg)
                if msg is None:
                    continue

            if isinstance(msg.content, list):
                has_image = any(isinstance(c, ImageObj) for c in msg.content)
                if not has_image:
                    kept_newest_first.append(msg)
                    continue
                if n_images < max_n_images:
                    kept_newest_first.append(msg)
                elif is_original_user_message:
                    # Over budget: keep the original request's text only.
                    msg = self.remove_screenshot_from_message(msg)
                    if msg is not None:
                        kept_newest_first.append(msg)
                # Every image-bearing list message counts, kept or not.
                n_images += 1
            elif isinstance(msg.content, ImageObj):
                if n_images < max_n_images:
                    kept_newest_first.append(msg)
                    n_images += 1
            else:
                kept_newest_first.append(msg)

        return kept_newest_first[::-1]

    async def _get_scaled_screenshot(self) -> Image.Image:
        """Get current screenshot and scale it for the model."""
        raw_png = await self._playwright_controller.get_screenshot(self._page)
        screenshot = Image.open(io.BytesIO(raw_png))
        _, scaled_screenshot = self._get_system_message(screenshot)
        return scaled_screenshot

    def _get_system_message(
        self, screenshot: ImageObj | Image.Image
    ) -> Tuple[List[SystemMessage], Image.Image]:
        """Build the system messages and the model-sized screenshot.

        Side effect: updates ``_mlm_width``/``_mlm_height`` from the prompt
        builder's ``im_size``; ``execute_action`` later uses them to scale
        coordinates.

        Returns:
            ``(system_messages, scaled_screenshot)``.
        """
        system_prompt_info = get_computer_use_system_prompt(
            screenshot,
            self.MLM_PROCESSOR_IM_CFG,
            include_input_text_key_args=self.include_input_text_key_args,
            fn_call_template=self.fn_call_template,
        )
        self._mlm_width, self._mlm_height = system_prompt_info["im_size"]
        scaled_screenshot = screenshot.resize((self._mlm_width, self._mlm_height))

        # Each conversation entry's text parts are joined into one message.
        system_message = [
            SystemMessage(content="".join(part["text"] for part in msg["content"]))
            for msg in system_prompt_info["conversation"]
        ]

        return system_message, scaled_screenshot

    def _parse_thoughts_and_action(self, message: str) -> Tuple[str, Dict[str, Any]]:
        """Split a model reply into free-text thoughts and the tool-call dict.

        The text before ``<tool_call>\\n`` is the thoughts; the text up to
        ``\\n</tool_call>`` is parsed as JSON, falling back to a Python literal.

        Raises:
            Exception: Whatever the split/parse raised, after logging it.
        """
        try:
            tmp = message.split("<tool_call>\n")
            thoughts = tmp[0].strip()
            action_text = tmp[1].split("\n</tool_call>")[0]
            try:
                action = json.loads(action_text)
            except json.decoder.JSONDecodeError:
                self.logger.error(f"Invalid action text: {action_text}")
                # Accept Python-literal dicts (e.g. single-quoted keys).
                action = ast.literal_eval(action_text)

            return thoughts, action
        except Exception:
            self.logger.error(
                f"Error parsing thoughts and action: {message}", exc_info=True
            )
            raise

    def convert_resized_coords_to_original(
        self, coords: List[float], rsz_w: int, rsz_h: int, og_w: int, og_h: int
    ) -> List[float]:
        """Scale an ``[x, y]`` pair from the resized image to the original size."""
        scale_x = og_w / rsz_w
        scale_y = og_h / rsz_h
        return [coords[0] * scale_x, coords[1] * scale_y]

    def proc_coords(
        self,
        coords: List[float] | None,
        im_w: int,
        im_h: int,
        og_im_w: int | None = None,
        og_im_h: int | None = None,
    ) -> List[float] | None:
        """Scale model coordinates to the original image size.

        Falsy ``coords`` are returned unchanged. A missing original dimension
        defaults to the resized one (i.e. no scaling on that axis).
        """
        if not coords:
            return coords

        if og_im_w is None:
            og_im_w = im_w
        if og_im_h is None:
            og_im_h = im_h

        tgt_x, tgt_y = coords
        return self.convert_resized_coords_to_original(
            [tgt_x, tgt_y], im_w, im_h, og_im_w, og_im_h
        )

    def _screenshot_path(self) -> str:
        """Return the PNG path for the current action count."""
        return os.path.join(
            self.downloads_folder, f"screenshot{self._num_actions}.png"
        )

    async def _visit_page_and_reset(self, url: str) -> None:
        """Navigate to ``url`` and clear cached state as the controller requests."""
        (
            reset_prior_metadata,
            reset_last_download,
        ) = await self._playwright_controller.visit_page(self._page, url)
        if reset_last_download:
            self._last_download = None
        if reset_prior_metadata:
            self._prior_metadata_hash = None

    async def run(self, user_message: str) -> Tuple:
        """Run the agent with a user message.

        Returns:
            ``(final_answer, all_actions, all_observations)`` where
            ``final_answer`` is the thoughts of the stop action or
            ``"<no_answer>"`` if ``max_rounds`` ran out, ``all_actions`` are
            the raw model replies and ``all_observations`` the action
            descriptions.
        """
        await self.initialize()

        assert self._page is not None, "Page should be initialized"

        scaled_screenshot = await self._get_scaled_screenshot()

        if self.save_screenshots:
            await self._playwright_controller.get_screenshot(
                self._page, path=self._screenshot_path()
            )

        # The task message is flagged so history trimming keeps its text.
        self._chat_history.append(
            UserMessage(
                content=[ImageObj.from_pil(scaled_screenshot), user_message],
                is_original=True,
            )
        )

        all_actions = []
        all_observations = []
        final_answer = "<no_answer>"
        is_stop_action = False
        for i in range(self.max_rounds):
            is_first_round = i == 0
            if not self.browser_manager._captcha_event.is_set():
                self.logger.info("Waiting 60s for captcha to finish...")
                captcha_solved = await self.wait_for_captcha_with_timeout(60)
                if (
                    not captcha_solved
                    and not self.browser_manager._captcha_event.is_set()
                ):
                    raise RuntimeError(
                        "Captcha timed out, unable to proceed with web surfing."
                    )

            function_call, raw_response = await self.generate_model_call(
                is_first_round, scaled_screenshot if is_first_round else None
            )
            assert isinstance(raw_response, str)
            all_actions.append(raw_response)
            # Re-parsed so the logged arguments do not include the "thoughts"
            # key that generate_model_call adds to the function call.
            thoughts, action_dict = self._parse_thoughts_and_action(raw_response)
            action_args = action_dict.get("arguments", {})
            action = action_args["action"]
            self.logger.info(
                f"\nThought #{i + 1}: {thoughts}\nAction #{i + 1}: executing tool '{action}' with arguments {json.dumps(action_args)}"
            )

            (
                is_stop_action,
                _,
                action_description,
            ) = await self.execute_action(function_call)
            all_observations.append(action_description)
            self.logger.info(f"Observation#{i + 1}: {action_description}")
            if is_stop_action:
                final_answer = thoughts
                break
        return final_answer, all_actions, all_observations

    async def generate_model_call(
        self, is_first_round: bool, first_screenshot: Image.Image | None = None
    ) -> Tuple[List[FunctionCall], str]:
        """Ask the model for the next action.

        On follow-up rounds a fresh screenshot plus the current URL is appended
        to the chat history as a new user message. On the first round nothing
        is appended here and ``first_screenshot`` is used for the system
        prompt.

        Returns:
            ``([function_call], raw_model_reply)``.
        """
        history = self.maybe_remove_old_screenshots(self._chat_history)

        screenshot_for_system = first_screenshot
        if not is_first_round:
            scaled_screenshot = await self._get_scaled_screenshot()
            screenshot_for_system = scaled_screenshot

            curr_url = await self._playwright_controller.get_page_url(self._page)
            trimmed_url = get_trimmed_url(curr_url, max_len=self.max_url_chars)
            text_prompt = f"Current URL: {trimmed_url}\n" + self.USER_MESSAGE

            curr_message = UserMessage(
                content=[ImageObj.from_pil(scaled_screenshot), text_prompt]
            )
            self._chat_history.append(curr_message)
            history.append(curr_message)
        elif screenshot_for_system is None:
            # First round without a supplied screenshot: capture one rather
            # than handing None to the system-prompt builder.
            screenshot_for_system = await self._get_scaled_screenshot()

        system_message, _ = self._get_system_message(screenshot_for_system)
        history = system_message + history
        response = await self._make_model_call(
            history, extra_create_args={"temperature": 0}
        )
        message = response.content

        self._chat_history.append(AssistantMessage(content=message))
        thoughts, action = self._parse_thoughts_and_action(message)
        action["arguments"]["thoughts"] = thoughts

        function_call = [FunctionCall(id="dummy", **action)]
        return function_call, message

    async def execute_action(
        self,
        function_call: List[FunctionCall],
    ) -> Tuple[bool, bytes, str]:
        """Execute the first function call against the browser.

        Model coordinates in ``arguments["coordinate"]`` are scaled to the
        viewport in place before dispatch. After any action the method waits
        for the page load state, sleeps 3 seconds and takes a screenshot.

        Returns:
            ``(is_stop_action, new_screenshot, action_description)``.

        Raises:
            ValueError: For an unknown action, or a ``web_search`` without a
                ``query``.
        """
        name = function_call[0].name
        args = function_call[0].arguments
        action_description = ""
        assert self._page is not None
        self.logger.debug(
            WebSurferEvent(
                source="FaraAgent",
                url=await self._playwright_controller.get_page_url(self._page),
                action=name,
                arguments=args,
                message=f"{name}( {json.dumps(args)} )",
            )
        )
        if "coordinate" in args:
            args["coordinate"] = self.proc_coords(
                args["coordinate"],
                self._mlm_width,
                self._mlm_height,
                self.viewport_width,
                self.viewport_height,
            )

        is_stop_action = False
        action = args["action"]

        if action == "visit_url":
            url = str(args["url"])
            action_description = f"I typed '{url}' into the browser address bar."

            if url.startswith(("https://", "http://", "file://", "about:")):
                await self._visit_page_and_reset(url)
            elif " " in url:
                # Text with spaces is treated as a search query.
                await self._visit_page_and_reset(
                    f"https://www.bing.com/search?q={quote_plus(url)}&FORM=QBLH"
                )
            else:
                await self._visit_page_and_reset("https://" + url)
        elif action == "history_back":
            action_description = "I clicked the browser back button."
            await self._playwright_controller.back(self._page)
        elif action == "web_search":
            query = args.get("query")
            if query is None:
                # quote_plus(None) would otherwise fail with an opaque TypeError.
                raise ValueError("web_search requires a 'query' argument")
            query = str(query)
            action_description = f"I typed '{query}' into the browser search bar."
            encoded_query = quote_plus(query)
            await self._visit_page_and_reset(
                f"https://www.bing.com/search?q={encoded_query}&FORM=QBLH"
            )
        elif action == "scroll":
            # Positive pixels page up, negative page down; zero does nothing.
            pixels = int(args.get("pixels", 0))
            if pixels > 0:
                action_description = "I scrolled up one page in the browser."
                await self._playwright_controller.page_up(self._page)
            elif pixels < 0:
                action_description = "I scrolled down one page in the browser."
                await self._playwright_controller.page_down(self._page)
        elif action in ("keypress", "key"):
            keys = args.get("keys", [])
            action_description = f"I pressed the following keys: {keys}"
            await self._playwright_controller.keypress(self._page, keys)
        elif action in ("hover", "mouse_move"):
            if "coordinate" in args:
                tgt_x, tgt_y = args["coordinate"]
                await self._playwright_controller.hover_coords(self._page, tgt_x, tgt_y)
        elif action in ("sleep", "wait"):
            # "time" takes precedence over "duration"; default is 3.0.
            duration = args.get("duration", 3.0)
            duration = args.get("time", duration)
            action_description = (
                "I am waiting a short period of time before taking further action."
            )
            await self._playwright_controller.sleep(self._page, duration)
        elif action in ("click", "left_click"):
            if "coordinate" in args:
                tgt_x, tgt_y = args["coordinate"]
                action_description = f"I clicked at coordinates ({tgt_x}, {tgt_y})."
                new_page_tentative = await self._playwright_controller.click_coords(
                    self._page, tgt_x, tgt_y
                )

                # A non-None result becomes the active page.
                if new_page_tentative is not None:
                    self._page = new_page_tentative
                    self._prior_metadata_hash = None
        elif action in ("input_text", "type"):
            text_value = str(args.get("text", args.get("text_value")))
            action_description = f"I typed '{text_value}'."
            press_enter = args.get("press_enter", True)
            delete_existing_text = args.get("delete_existing_text", False)

            if "coordinate" in args:
                tgt_x, tgt_y = args["coordinate"]
                new_page_tentative = await self._playwright_controller.fill_coords(
                    self._page,
                    tgt_x,
                    tgt_y,
                    text_value,
                    press_enter=press_enter,
                    delete_existing_text=delete_existing_text,
                )
                if new_page_tentative is not None:
                    self._page = new_page_tentative
                    self._prior_metadata_hash = None
        elif action == "pause_and_memorize_fact":
            fact = str(args.get("fact"))
            self._facts.append(fact)
            action_description = f"I memorized the following fact: {fact}"
        elif action in ("stop", "terminate"):
            action_description = args.get("thoughts")
            is_stop_action = True
        else:
            raise ValueError(f"Unknown tool: {args['action']}")

        await self._playwright_controller.wait_for_load_state(self._page)
        await self._playwright_controller.sleep(self._page, 3)

        self._num_actions += 1
        if self.save_screenshots:
            new_screenshot = await self._playwright_controller.get_screenshot(
                self._page, path=self._screenshot_path()
            )
        else:
            new_screenshot = await self._playwright_controller.get_screenshot(
                self._page
            )
        return is_stop_action, new_screenshot, action_description

    async def close(self) -> None:
        """
        Close the browser and the page.
        Should be called when the agent is no longer needed.
        """
        if self._page is not None:
            self._page = None
        await self.browser_manager.close()
|
| |
|