Skip to content

Tool registry

ToolRegistry holds the Tool instances available to a processor, exposes their JSON schemas to the model, and dispatches tool calls during the agentic loop. For the catalogue of built-in tools, see Tools.

registry

Tool registry with image management, execution tracking, and documentation.

This module provides:

- ToolDocumenter: Schema generation and documentation formatting
- ToolRegistry: Complete tool management with image handling and execution
- EncodedImage: Container for base64-encoded image data
- encode_image: Utility to encode PIL Images

ImageManager

Manages image loading, transformation, and state.

Example

    manager = ImageManager()
    manager.set_image(Path("scan.png"))
    manager.transform_image(lambda img: zoom_image(img, 2.0))
    current = manager.current_image

Source code in src/gaze/tools/image_manager.py
@beartype
class ImageManager:
    """Manages image loading, transformation, and state.

    The manager keeps two independent PIL images: the original as loaded,
    and a "current" working copy that :meth:`transform_image` replaces and
    :meth:`reset_to_original` restores.

    Example:
        manager = ImageManager()
        manager.set_image(Path("scan.png"))
        manager.transform_image(lambda img: zoom_image(img, 2.0))
        current = manager.current_image
    """

    @beartype
    def __init__(self) -> None:
        """Initialize an empty image manager (no path, no image loaded)."""
        self._image_path: Path | None = None
        self._current_image: Image.Image | None = None
        self._original_image: Image.Image | None = None
        self._original_encoding: EncodedImage | None = None
        # Serializes the lazy load performed by ensure_loaded().
        self._image_lock = asyncio.Lock()
        self._modified: bool = False

    @property
    def current_image(self) -> Image.Image | None:
        """Get the currently loaded image (may be None if not yet loaded)."""
        return self._current_image

    @property
    def image_path(self) -> Path | None:
        """Get the path to the source image."""
        return self._image_path

    @property
    def has_image(self) -> bool:
        """Check if an image is currently loaded."""
        return self._current_image is not None

    @property
    def is_modified(self) -> bool:
        """Whether the current image has been modified from the original.

        Set ``True`` by :meth:`transform_image`, cleared by
        :meth:`reset_to_original`, :meth:`set_image`,
        :meth:`set_preloaded_image`, and :meth:`close`.
        """
        return self._modified

    @property
    def original_encoding(self) -> Any:
        """Cached base64 encoding of the *original* (unmodified) image.

        Set once after the first encode and reused by reset to skip
        redundant JPEG→base64 work.

        Typed as ``Any`` at runtime to avoid circular-import forward-reference
        issues with beartype.  Static checkers see ``EncodedImage | None`` via
        the TYPE_CHECKING guard.
        """
        return self._original_encoding

    @original_encoding.setter
    def original_encoding(self, value: Any) -> None:
        self._original_encoding = value

    @staticmethod
    def _validate_image_path(image_path: Path) -> Path:
        """Resolve and validate an image path against traversal patterns.

        Args:
            image_path: Path as supplied by the caller.

        Returns:
            The resolved (absolute, symlink-free) path.

        Raises:
            ToolExecutionError: If the path contains a ``..`` component,
                cannot be resolved, or resolves to something that exists
                but is not a regular file (directory, device, ...).
        """
        # Reject '..' on the *unresolved* path first: resolve() collapses
        # these components, and there is no reason to touch the filesystem
        # for a path that is going to be refused anyway.
        if ".." in image_path.parts:
            raise ToolExecutionError(f"Path traversal detected: {image_path}")
        try:
            resolved = image_path.resolve()
        except (OSError, RuntimeError, ValueError) as e:
            # resolve() can fail on symlink loops (RuntimeError before
            # Python 3.13), OS-level errors, or embedded NUL bytes.  Surface
            # these through the same exception type as every other path
            # problem so callers only need to handle ToolExecutionError.
            raise ToolExecutionError(f"Invalid image path {image_path}: {e}") from e
        # Block device files and other non-regular files (if they already exist)
        if resolved.exists() and not resolved.is_file():
            raise ToolExecutionError(f"Path is not a regular file: {image_path}")
        return resolved

    @beartype
    def set_image(self, image_path: Path) -> None:
        """Set the source image for operations.

        Any previously held images are released *before* the new file is
        opened, so a failed load leaves the manager empty.

        Args:
            image_path: Path to the image file

        Raises:
            ToolExecutionError: If image cannot be loaded or path is invalid
        """
        resolved = self._validate_image_path(image_path)
        self.close()

        try:
            with Image.open(resolved) as img:
                # copy() detaches the pixel data from the file handle that
                # the context manager is about to close.
                self._original_image = img.copy()
                self._current_image = self._original_image.copy()
        except FileNotFoundError as e:
            raise ToolExecutionError(f"Image file not found: {image_path}") from e
        except UnidentifiedImageError as e:
            raise ToolExecutionError(f"File is not a valid image: {image_path}") from e
        except OSError as e:
            raise ToolExecutionError(f"Failed to read image file {image_path}: {e}") from e

        self._image_path = resolved
        self._modified = False
        logger.debug(f"Loaded image: {resolved} ({self._current_image.size})")

    @beartype
    async def ensure_loaded(self) -> None:
        """Ensure image is loaded, loading from path if necessary.

        Safe to call repeatedly and from several coroutines on the same
        event loop: the load is serialized by an ``asyncio.Lock`` and the
        blocking file read runs in a worker thread.  ``asyncio.Lock`` is not
        thread-safe, so this must not be called from multiple OS threads.

        Raises:
            ToolExecutionError: If no image path is set or image cannot be loaded.
        """
        # Fast path - if already loaded, return immediately
        if self._current_image is not None:
            return

        # Acquire lock before checking again to prevent race conditions
        async with self._image_lock:
            # Another coroutine may have finished loading while we waited.
            if self._current_image is not None:
                return

            if self._image_path is None:
                raise ToolExecutionError("No image path set - call set_image() first")

            # Re-validate in case the path was set before validation was added
            image_path = self._validate_image_path(self._image_path)

            def _load_image() -> Image.Image:
                with Image.open(image_path) as img:
                    return img.copy()

            try:
                loaded = await asyncio.to_thread(_load_image)
            except FileNotFoundError as e:
                raise ToolExecutionError(f"Image file not found: {self._image_path}") from e
            except UnidentifiedImageError as e:
                raise ToolExecutionError(f"File is not a valid image: {self._image_path}") from e
            except OSError as e:
                raise ToolExecutionError(
                    f"Failed to read image file {self._image_path}: {e}"
                ) from e

            self._original_image = loaded
            self._current_image = loaded.copy()
            self._modified = False

    @beartype
    def transform_image(self, operation: Callable[[Image.Image], Image.Image]) -> None:
        """Apply a transformation to the current image with automatic cleanup.

        The previous ``_current_image`` is closed after the operation unless
        the operation returned that very same object (an in-place edit).
        Closing is safe because current and original are independent copies.
        If ``operation`` raises, the current image is left untouched.

        This method is synchronous and does **not** acquire ``_image_lock``.
        It is safe to call from the single-threaded asyncio event loop (the
        normal execution path via ``ToolRegistry.execute``), but callers must
        not invoke it concurrently from multiple coroutines on the same
        ``ImageManager`` instance.

        Args:
            operation: Function that takes current image and returns new image

        Raises:
            ToolExecutionError: If no image is loaded

        Example:
            manager.transform_image(lambda img: zoom_image(img, 2.0))
        """
        if self._current_image is None:
            raise ToolExecutionError("No image loaded to transform")

        old_image = self._current_image
        self._current_image = operation(old_image)
        self._modified = True
        # Closing never touches the original (current is never aliased to
        # it); skip the close only when the operation handed back the same
        # object, which is still in use as the new current image.
        if old_image is not self._current_image:
            old_image.close()

    @beartype
    def set_preloaded_image(
        self,
        image: Image.Image,
        image_path: Path,
        *,
        transfer_ownership: bool = False,
    ) -> None:
        """Set a pre-loaded PIL Image, avoiding a redundant disk read.

        Args:
            image: Already-loaded PIL Image (must have pixel data in memory).
            image_path: Path to the source file (stored for reset/logging).
            transfer_ownership: If True, the manager takes ownership of
                *image* directly (used as ``_original_image`` without
                copying).  The caller **must not** use or close *image*
                after this call.  When False (default), *image* is copied
                so the caller retains ownership.

        Raises:
            ToolExecutionError: If image_path contains traversal patterns.
        """
        resolved = self._validate_image_path(image_path)
        self.close()
        if transfer_ownership:
            self._original_image = image
        else:
            self._original_image = image.copy()
        # The working copy is always distinct from the original.
        self._current_image = self._original_image.copy()
        self._image_path = resolved
        self._modified = False

    @beartype
    def reset_to_original(self) -> None:
        """Reset the current image to the originally loaded state.

        Replaces the working copy with a fresh copy of the original and
        closes the discarded working copy.

        Raises:
            ToolExecutionError: If no image is loaded
        """
        if self._original_image is None:
            raise ToolExecutionError("No original image to reset to")

        # Defensive: if current somehow is the original, there is nothing
        # to reset and closing it would destroy the original.
        if self._current_image is self._original_image:
            return

        old_image = self._current_image
        self._current_image = self._original_image.copy()
        self._modified = False
        if old_image is not None:
            old_image.close()
        logger.debug("Reset image to original")

    @beartype
    def close(self) -> None:
        """Close and release all image resources.

        Also forgets the image path and the cached original encoding and
        clears the modified flag, returning the manager to its initial state.
        """
        if self._current_image is not None:
            self._current_image.close()
        self._current_image = None
        if self._original_image is not None:
            self._original_image.close()
        self._original_image = None
        self._image_path = None
        self._original_encoding = None
        self._modified = False

current_image property

current_image: Image | None

Get the currently loaded image (may be None if not yet loaded).

image_path property

image_path: Path | None

Get the path to the source image.

has_image property

has_image: bool

Check if an image is currently loaded.

is_modified property

is_modified: bool

Whether the current image has been modified from the original.

Set True by transform_image(); cleared by reset_to_original(), set_image(), set_preloaded_image(), and close().

original_encoding property writable

original_encoding: Any

Cached base64 encoding of the original (unmodified) image.

Set once after the first encode and reused by reset to skip redundant JPEG→base64 work.

Typed as Any at runtime to avoid circular-import forward-reference issues with beartype. Static checkers see EncodedImage | None via the TYPE_CHECKING guard.

__init__

__init__() -> None

Initialize image manager.

Source code in src/gaze/tools/image_manager.py
@beartype
def __init__(self) -> None:
    """Create an empty manager: no source path and no images held."""
    # Image state: the pristine original and the working copy.
    self._original_image: Image.Image | None = None
    self._current_image: Image.Image | None = None
    self._modified: bool = False
    # Source bookkeeping and the cached encoding of the original.
    self._image_path: Path | None = None
    self._original_encoding: EncodedImage | None = None
    # Guards the lazy load in ensure_loaded().
    self._image_lock = asyncio.Lock()

set_image

set_image(image_path: Path) -> None

Set the source image for operations.

Parameters:

Name Type Description Default
image_path Path

Path to the image file

required

Raises:

Type Description
ToolExecutionError

If image cannot be loaded or path is invalid

Source code in src/gaze/tools/image_manager.py
@beartype
def set_image(self, image_path: Path) -> None:
    """Load ``image_path`` from disk and make it the managed image.

    Whatever the manager held before is released first, so a failed load
    leaves the manager without an image.

    Args:
        image_path: Path to the image file

    Raises:
        ToolExecutionError: If image cannot be loaded or path is invalid
    """
    resolved = self._validate_image_path(image_path)

    # Release the previous images before opening the new file.
    self.close()

    try:
        with Image.open(resolved) as opened:
            # Copy the pixels out so nothing depends on the file handle.
            original = opened.copy()
            self._original_image = original
            self._current_image = original.copy()
    except FileNotFoundError as e:
        raise ToolExecutionError(f"Image file not found: {image_path}") from e
    except UnidentifiedImageError as e:
        raise ToolExecutionError(f"File is not a valid image: {image_path}") from e
    except OSError as e:
        raise ToolExecutionError(f"Failed to read image file {image_path}: {e}") from e

    self._modified = False
    self._image_path = resolved
    logger.debug(f"Loaded image: {resolved} ({self._current_image.size})")

ensure_loaded async

ensure_loaded() -> None

Ensure image is loaded, loading from path if necessary.

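Concurrent calls from coroutines on the same event loop are serialized by an asyncio lock (which does not protect against calls from multiple OS threads), and the method can be called multiple times.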

Raises:

Type Description
ToolExecutionError

If no image path is set or image cannot be loaded.

Source code in src/gaze/tools/image_manager.py
@beartype
async def ensure_loaded(self) -> None:
    """Load the image from the stored path unless one is already loaded.

    Repeated calls are cheap once an image is present.  Callers that arrive
    while a load is in progress wait on ``_image_lock`` and then return
    without loading again.

    Raises:
        ToolExecutionError: If no image path is set or image cannot be loaded.
    """
    # Nothing to do when an image is already in memory.
    if self._current_image is not None:
        return

    async with self._image_lock:
        # Someone else may have completed the load while we were waiting.
        if self._current_image is not None:
            return

        stored_path = self._image_path
        if stored_path is None:
            raise ToolExecutionError("No image path set - call set_image() first")

        # Validate again: the stored path may predate validation.
        image_path = self._validate_image_path(stored_path)

        def _read_detached_copy() -> Image.Image:
            # Runs in a worker thread; copy() detaches from the file handle.
            with Image.open(image_path) as handle:
                return handle.copy()

        try:
            loaded = await asyncio.to_thread(_read_detached_copy)
        except FileNotFoundError as e:
            raise ToolExecutionError(f"Image file not found: {self._image_path}") from e
        except UnidentifiedImageError as e:
            raise ToolExecutionError(f"File is not a valid image: {self._image_path}") from e
        except OSError as e:
            raise ToolExecutionError(
                f"Failed to read image file {self._image_path}: {e}"
            ) from e

        # Keep the loaded image as the original; work on a separate copy.
        self._original_image = loaded
        self._current_image = loaded.copy()
        self._modified = False

transform_image

transform_image(
    operation: Callable[[Image], Image],
) -> None

Apply a transformation to the current image with automatic cleanup.

The previous _current_image is closed after the operation unless the operation returned that same object; closing is safe because current and original are always independent copies.

This method is synchronous and does not acquire _image_lock. It is safe to call from the single-threaded asyncio event loop (the normal execution path via ToolRegistry.execute), but callers must not invoke it concurrently from multiple coroutines on the same ImageManager instance.

Parameters:

Name Type Description Default
operation Callable[[Image], Image]

Function that takes current image and returns new image

required

Raises:

Type Description
ToolExecutionError

If no image is loaded

Example

manager.transform_image(lambda img: zoom_image(img, 2.0))

Source code in src/gaze/tools/image_manager.py
@beartype
def transform_image(self, operation: Callable[[Image.Image], Image.Image]) -> None:
    """Replace the current image with the result of ``operation``.

    The image that was current before the call is closed afterwards,
    unless ``operation`` returned that same object.  The original image is
    a separate copy and is never affected.

    The method is synchronous and does **not** take ``_image_lock``; it is
    meant to be called from the single-threaded asyncio event loop (the
    normal path via ``ToolRegistry.execute``).  Do not call it concurrently
    from multiple coroutines on the same ``ImageManager`` instance.

    Args:
        operation: Function that takes current image and returns new image

    Raises:
        ToolExecutionError: If no image is loaded

    Example:
        manager.transform_image(lambda img: zoom_image(img, 2.0))
    """
    previous = self._current_image
    if previous is None:
        raise ToolExecutionError("No image loaded to transform")

    # If the operation raises, nothing below runs and state is unchanged.
    replacement = operation(previous)
    self._current_image = replacement
    self._modified = True

    # Release the superseded image, but not when it was returned as-is.
    if replacement is not previous:
        previous.close()

set_preloaded_image

set_preloaded_image(
    image: Image,
    image_path: Path,
    *,
    transfer_ownership: bool = False,
) -> None

Set a pre-loaded PIL Image, avoiding a redundant disk read.

Parameters:

Name Type Description Default
image Image

Already-loaded PIL Image (must have pixel data in memory).

required
image_path Path

Path to the source file (stored for reset/logging).

required
transfer_ownership bool

If True, the manager takes ownership of image directly (used as _original_image without copying). The caller must not use or close image after this call. When False (default), image is copied so the caller retains ownership.

False

Raises:

Type Description
ToolExecutionError

If image_path contains traversal patterns.

Source code in src/gaze/tools/image_manager.py
@beartype
def set_preloaded_image(
    self,
    image: Image.Image,
    image_path: Path,
    *,
    transfer_ownership: bool = False,
) -> None:
    """Adopt an in-memory PIL image instead of reading it from disk.

    Args:
        image: Already-loaded PIL Image (must have pixel data in memory).
        image_path: Path to the source file (stored for reset/logging).
        transfer_ownership: When True, *image* itself becomes the
            manager's original (no copy is made) and the caller **must
            not** use or close it afterwards.  When False (default), the
            manager stores a copy and the caller keeps ownership.

    Raises:
        ToolExecutionError: If image_path contains traversal patterns.
    """
    resolved = self._validate_image_path(image_path)

    # Drop whatever was held before adopting the new image.
    self.close()

    original = image if transfer_ownership else image.copy()
    self._original_image = original
    # The working image is always a distinct copy of the original.
    self._current_image = original.copy()
    self._modified = False
    self._image_path = resolved

reset_to_original

reset_to_original() -> None

Reset the current image to the originally loaded state.

Raises:

Type Description
ToolExecutionError

If no image is loaded

Source code in src/gaze/tools/image_manager.py
@beartype
def reset_to_original(self) -> None:
    """Discard the working image and start again from the original.

    Raises:
        ToolExecutionError: If no image is loaded
    """
    original = self._original_image
    if original is None:
        raise ToolExecutionError("No original image to reset to")

    discarded = self._current_image
    # Nothing to reset when the working image *is* the original.
    if discarded is original:
        return

    self._current_image = original.copy()
    self._modified = False
    # Release the replaced working image, if there was one.
    if discarded is not None:
        discarded.close()
    logger.debug("Reset image to original")

close

close() -> None

Close and release all image resources.

Source code in src/gaze/tools/image_manager.py
@beartype
def close(self) -> None:
    """Release both images and return the manager to its empty state."""
    # Working copy first, then the original; each slot is cleared right
    # after its image has been closed.
    held = self._current_image
    if held is not None:
        held.close()
    self._current_image = None

    held = self._original_image
    if held is not None:
        held.close()
    self._original_image = None

    # Forget everything that described the released image.
    self._modified = False
    self._original_encoding = None
    self._image_path = None

Tool dataclass

Tool definition for agentic processing.

Immutable: a tool is constructed once (e.g. in create_visual_tools) and never mutated, consistent with the frozen data types in gaze.types.

Source code in src/gaze/tools/tool.py
@beartype
@dataclass(frozen=True)
class Tool:
    """Tool definition for agentic processing.

    Instances are frozen: a tool is built once (e.g. in
    ``create_visual_tools``) and never mutated afterwards, in line with the
    frozen data types in ``gaze.types``.
    """

    # Identifier and human-readable summary of the tool.
    name: str
    description: str
    # Maps parameter name -> definition dict ("type", "description",
    # "default", ...).  A parameter without a "default" key is required.
    parameters: dict[str, Any]
    # Async callable that performs the tool call.
    execute: Callable[..., Awaitable[ToolResult]]
    requires_image: bool = False
    category: str | None = None
    # Hand-written documentation; when set, used instead of generated text.
    prompt_documentation: str | None = None

    def get_prompt_documentation(self, *, compact: bool = False) -> str:
        """Build the text that documents this tool inside a prompt.

        Args:
            compact: If True, emit a single-line summary that lists only the
                required parameters, to reduce token overhead for
                small-context models.

        Returns:
            The one-line summary when ``compact`` is set; otherwise the
            custom ``prompt_documentation`` if provided, else documentation
            generated from the description and parameters.
        """
        if compact:
            # Only parameters without a default appear in the summary.
            required_signatures = [
                f"{p}:{info.get('type', '?')}"
                for p, info in self.parameters.items()
                if "default" not in info
            ]
            params = ", ".join(required_signatures)
            return f"- {self.name}({params}): {self.description}"

        custom_text = self.prompt_documentation
        if custom_text:
            return custom_text

        pieces = [f"**{self.name}**: {self.description}\n"]
        if self.parameters:
            pieces.append("Parameters:\n")
            for param, info in self.parameters.items():
                required = "Optional" if "default" in info else "Required"
                param_type = info.get("type", "unknown")
                param_desc = info.get("description", "")
                pieces.append(f"- {param} ({param_type}, {required}): {param_desc}\n")
        return "".join(pieces)

get_prompt_documentation

get_prompt_documentation(*, compact: bool = False) -> str

Generate documentation for prompt inclusion.

Parameters:

Name Type Description Default
compact bool

If True, emit a single-line summary per tool to reduce token overhead for small-context models.

False

Returns custom prompt_documentation if provided (unless compact), otherwise generates documentation from the tool's description and parameters.

Source code in src/gaze/tools/tool.py
def get_prompt_documentation(self, *, compact: bool = False) -> str:
    """Build the text that documents this tool inside a prompt.

    Args:
        compact: If True, emit a single-line summary that lists only the
            required parameters, to reduce token overhead for
            small-context models.

    Returns:
        The one-line summary when ``compact`` is set; otherwise the custom
        ``prompt_documentation`` if provided, else documentation generated
        from the tool's description and parameters.
    """
    if compact:
        # A parameter is required exactly when it declares no default.
        required_signatures = [
            f"{p}:{info.get('type', '?')}"
            for p, info in self.parameters.items()
            if "default" not in info
        ]
        params = ", ".join(required_signatures)
        return f"- {self.name}({params}): {self.description}"

    custom_text = self.prompt_documentation
    if custom_text:
        return custom_text

    # Collect the fragments and join once at the end.
    pieces = [f"**{self.name}**: {self.description}\n"]
    if self.parameters:
        pieces.append("Parameters:\n")
        for param, info in self.parameters.items():
            required = "Optional" if "default" in info else "Required"
            param_type = info.get("type", "unknown")
            param_desc = info.get("description", "")
            pieces.append(f"- {param} ({param_type}, {required}): {param_desc}\n")
    return "".join(pieces)

EncodedImage dataclass

Container for encoded image data.

Source code in src/gaze/tools/registry.py
@dataclass(frozen=True)
class EncodedImage:
    """Immutable pair of base64 image payload and MIME type.

    The matching ``data:`` URL is built once at construction time and
    handed out by :meth:`to_data_url`.
    """

    # Base64 text of the image bytes.
    data: str
    # MIME type placed in the data URL, e.g. "image/jpeg".
    mime_type: str
    # Precomputed data URL; always overwritten in __post_init__ and left out
    # of repr() and equality comparisons.
    _data_url: str = field(default="", repr=False, compare=False)

    def __post_init__(self) -> None:
        # Build the URL a single time: the payload can be hundreds of
        # kilobytes, so re-formatting it per call would be wasteful.
        url = f"data:{self.mime_type};base64,{self.data}"
        # A frozen dataclass rejects normal assignment, hence the bypass.
        object.__setattr__(self, "_data_url", url)

    def to_data_url(self) -> str:
        """Return the precomputed data URL for embedding in HTML/messages."""
        return self._data_url

to_data_url

to_data_url() -> str

Convert to a data URL for embedding in HTML/messages.

Source code in src/gaze/tools/registry.py
def to_data_url(self) -> str:
    """Return the stored data URL for embedding in HTML/messages."""
    data_url = self._data_url
    return data_url

ToolDocumenter

Generates tool schemas and documentation.

Handles:

- OpenAI-compatible tool schema generation
- Prompt documentation formatting
- Tool categorization and filtering
- Schema validation

Example

    documenter = ToolDocumenter(tools=[zoom_tool, crop_tool])
    schemas = documenter.get_tool_schemas()
    docs = documenter.generate_prompt_documentation()

Source code in src/gaze/tools/registry.py
@beartype
class ToolDocumenter:
    """Generates tool schemas and documentation.

    Handles:
    - OpenAI-compatible tool schema generation
    - Prompt documentation formatting
    - Tool categorization and filtering
    - Schema validation

    Example:
        documenter = ToolDocumenter(tools=[zoom_tool, crop_tool])
        schemas = documenter.get_tool_schemas()
        docs = documenter.generate_prompt_documentation()
    """

    @beartype
    def __init__(self, tools: list[Tool] | None = None) -> None:
        """Create a documenter, optionally pre-populated with tools.

        Args:
            tools: Tools to register up front.  May be omitted or empty;
                more tools can be added later with :meth:`register`.
        """
        self._tools: dict[str, Tool] = {}
        self._schemas_cache: list[dict[str, Any]] | None = None
        if tools:
            for initial_tool in tools:
                self.register(initial_tool)

    @beartype
    def register(self, tool: Tool) -> None:
        """Add ``tool`` under its name, replacing any tool with that name.

        Args:
            tool: Tool to register
        """
        # The cached schemas no longer reflect the tool set.
        self._schemas_cache = None
        self._tools[tool.name] = tool

    @beartype
    def get_tool(self, name: str) -> Tool | None:
        """Look up a registered tool.

        Args:
            name: Tool name to look up

        Returns:
            Tool if found, None otherwise
        """
        try:
            return self._tools[name]
        except KeyError:
            return None

    @beartype
    def get_tool_names(self) -> list[str]:
        """Return the names of all registered tools.

        Returns:
            List of tool names, in registration order
        """
        return list(self._tools)

    @beartype
    def get_tool_schemas(self) -> list[dict[str, Any]]:
        """Build OpenAI-compatible schemas for every registered tool.

        The result is cached; the cache is dropped whenever a tool is
        registered.  The cached list object itself is returned.

        Returns:
            List of tool schemas in OpenAI function-calling format

        Raises:
            ValueError: If tool has invalid schema configuration
        """
        cached = self._schemas_cache
        if cached is not None:
            return cached

        # Keywords copied verbatim from a parameter definition, in the
        # order they appear in the emitted property.
        copied_keywords = (
            "description",
            "enum",
            "default",
            "minimum",
            "maximum",
            "minItems",
            "maxItems",
            "pattern",
            "format",
        )

        built: list[dict[str, Any]] = []
        for tool in self._tools.values():
            properties: dict[str, Any] = {}
            required_params: list[str] = []

            for param_name, param_def in tool.parameters.items():
                param_type = param_def.get("type")

                # Every parameter needs a known type.
                if not param_type:
                    raise ValueError(
                        f"Tool '{tool.name}' parameter '{param_name}' is missing required 'type'"
                    )
                if param_type not in _VALID_PARAM_TYPES:
                    raise ValueError(
                        f"Tool '{tool.name}' parameter "
                        f"'{param_name}' has invalid type "
                        f"'{param_type}'. Must be one of: "
                        f"{', '.join(sorted(_VALID_PARAM_TYPES))}"
                    )

                prop: dict[str, Any] = {"type": param_type}
                for keyword in copied_keywords:
                    if keyword not in param_def:
                        continue
                    keyword_value = param_def[keyword]
                    # A None default is not emitted: {"type": "integer",
                    # "default": null} is invalid JSON Schema.  The mere
                    # presence of the key still makes the parameter optional.
                    if keyword == "default" and keyword_value is None:
                        continue
                    prop[keyword] = keyword_value

                # Arrays may carry an element schema.
                if param_type == "array" and "items" in param_def:
                    prop["items"] = param_def["items"]

                # No "default" key at all means the parameter is required.
                if "default" not in param_def:
                    required_params.append(param_name)

                properties[param_name] = prop

            built.append(
                {
                    "type": "function",
                    "function": {
                        "name": tool.name,
                        "description": tool.description,
                        "parameters": {
                            "type": "object",
                            "properties": properties,
                            "required": required_params,
                            "additionalProperties": False,
                        },
                    },
                }
            )

        self._schemas_cache = built
        return built

    @beartype
    def _get_tools_by_category(self) -> dict[str, list[Tool]]:
        """Bucket the registered tools by their category.

        Returns:
            Dictionary mapping category names to lists of tools.
            Tools with no category are grouped under "other".
        """
        grouped: dict[str, list[Tool]] = {}
        for tool in self._tools.values():
            grouped.setdefault(tool.category or "other", []).append(tool)
        return grouped

    @beartype
    def generate_prompt_documentation(
        self,
        group_by_category: bool = True,
        include_categories: set[str] | None = None,
        exclude_categories: set[str] | None = None,
        compact: bool = False,
    ) -> str:
        """Render documentation for the registered tools as prompt text.

        The output is meant for system prompts.  Tools are sorted by name,
        and, when grouped, categories are sorted as well.

        Args:
            group_by_category: If True, group tools by category with headers
            include_categories: If set, only include tools from these categories
            exclude_categories: If set, exclude tools from these categories
            compact: If True, emit one-line-per-tool summaries to reduce token
                overhead for small-context models (<=8K).

        Returns:
            Formatted documentation string for system prompts (empty when no
            tools are registered)
        """
        if not self._tools:
            return ""

        lines: list[str] = []

        def _is_selected(category: str) -> bool:
            # An empty or missing filter set means "no filtering".
            if include_categories and category not in include_categories:
                return False
            if exclude_categories and category in exclude_categories:
                return False
            return True

        def _append_docs(batch: list[Tool]) -> None:
            for tool in sorted(batch, key=lambda t: t.name):
                lines.append(tool.get_prompt_documentation(compact=compact))
                if not compact:
                    lines.append("")  # Blank line between tools

        if group_by_category:
            grouped = self._get_tools_by_category()
            for category in sorted(grouped):
                if not _is_selected(category):
                    continue
                members = grouped[category]
                if not members:
                    continue

                # Category header, e.g. "image_ops" -> "Image Ops".
                category_title = category.replace("_", " ").title()
                lines.append(
                    f"[{category_title}]" if compact else f"**{category_title} Tools:**\n"
                )
                _append_docs(members)
        else:
            # Flat list; tools without a category count as "other".
            _append_docs(
                [t for t in self._tools.values() if _is_selected(t.category or "other")]
            )

        return "\n".join(lines).strip()

__init__

__init__(tools: list[Tool] | None = None) -> None

Initialize tool documenter.

Parameters:

Name Type Description Default
tools list[Tool] | None

List of tools to document. Can be empty and tools added later.

None
Source code in src/gaze/tools/registry.py
@beartype
def __init__(self, tools: list[Tool] | None = None) -> None:
    """Create a documenter, optionally pre-populated with tools.

    Args:
        tools: Tools to register immediately. ``None`` or an empty list
            leaves the documenter empty; tools can be added later with
            ``register``.
    """
    self._tools: dict[str, Tool] = {}
    self._schemas_cache: list[dict[str, Any]] | None = None
    if tools:
        # Route every initial tool through register() rather than writing
        # to the dict directly, so there is a single insertion path.
        for initial_tool in tools:
            self.register(initial_tool)

register

register(tool: Tool) -> None

Register a tool for documentation.

Parameters:

Name Type Description Default
tool Tool

Tool to register

required
Source code in src/gaze/tools/registry.py
@beartype
def register(self, tool: Tool) -> None:
    """Register a tool for documentation, keyed by its ``name``.

    A tool registered under a name that is already present replaces the
    earlier entry. The cached schema list is dropped so that it is rebuilt
    the next time schemas are requested.

    Args:
        tool: Tool to register
    """
    self._tools[tool.name] = tool
    self._schemas_cache = None  # Any registry change makes cached schemas stale

get_tool

get_tool(name: str) -> Tool | None

Get a registered tool by name.

Parameters:

Name Type Description Default
name str

Tool name to look up

required

Returns:

Type Description
Tool | None

Tool if found, None otherwise

Source code in src/gaze/tools/registry.py
@beartype
def get_tool(self, name: str) -> Tool | None:
    """Look up a registered tool by its exact name.

    Args:
        name: Tool name to look up

    Returns:
        The tool stored under ``name``, or ``None`` when no tool is
        registered under that name (no exception is raised).
    """
    return self._tools.get(name)

get_tool_names

get_tool_names() -> list[str]

Get list of all registered tool names.

Returns:

Type Description
list[str]

List of tool names

Source code in src/gaze/tools/registry.py
@beartype
def get_tool_names(self) -> list[str]:
    """Return the names of every registered tool.

    Returns:
        A new list of tool names, in the iteration order of the internal
        tool mapping. Mutating it does not affect the documenter.
    """
    # Iterating a dict yields its keys, so no explicit .keys() is needed.
    return list(self._tools)

get_tool_schemas

get_tool_schemas() -> list[dict[str, Any]]

Get OpenAI-compatible tool schemas for all registered tools.

Results are cached and invalidated when tools are registered.

Returns:

Type Description
list[dict[str, Any]]

List of tool schemas in OpenAI function-calling format

Raises:

Type Description
ValueError

If tool has invalid schema configuration

Source code in src/gaze/tools/registry.py
@beartype
def get_tool_schemas(self) -> list[dict[str, Any]]:
    """Build OpenAI function-calling schemas for every registered tool.

    The built list is memoised on the instance; registering a tool resets
    the memo, so the list is rebuilt on the next call after a change.

    Returns:
        One ``{"type": "function", "function": {...}}`` dict per tool.

    Raises:
        ValueError: If a parameter definition has no ``type`` or a type
            that is not in ``_VALID_PARAM_TYPES``.
    """
    cached = self._schemas_cache
    if cached is not None:
        return cached

    # Keywords copied verbatim from a parameter definition when present.
    passthrough_keywords = (
        "description",
        "enum",
        "default",
        "minimum",
        "maximum",
        "minItems",
        "maxItems",
        "pattern",
        "format",
    )

    built: list[dict[str, Any]] = []
    for tool in self._tools.values():
        props: dict[str, Any] = {}
        mandatory: list[str] = []

        for pname, pdef in tool.parameters.items():
            ptype = pdef.get("type")
            if not ptype:
                raise ValueError(
                    f"Tool '{tool.name}' parameter '{pname}' is missing required 'type'"
                )
            if ptype not in _VALID_PARAM_TYPES:
                allowed = ", ".join(sorted(_VALID_PARAM_TYPES))
                raise ValueError(
                    f"Tool '{tool.name}' parameter '{pname}' has invalid type "
                    f"'{ptype}'. Must be one of: {allowed}"
                )

            entry: dict[str, Any] = {"type": ptype}
            for keyword in passthrough_keywords:
                if keyword not in pdef:
                    continue
                keyword_value = pdef[keyword]
                # A null default is never emitted into the schema; the
                # parameter still counts as optional because the "default"
                # key is present in the definition (checked below).
                if keyword == "default" and keyword_value is None:
                    continue
                entry[keyword] = keyword_value

            # Element schema is only meaningful for array parameters.
            if ptype == "array" and "items" in pdef:
                entry["items"] = pdef["items"]

            # Presence of a "default" key (even a null one) means optional.
            if "default" not in pdef:
                mandatory.append(pname)

            props[pname] = entry

        built.append(
            {
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": {
                        "type": "object",
                        "properties": props,
                        "required": mandatory,
                        "additionalProperties": False,
                    },
                },
            }
        )

    self._schemas_cache = built
    return built

generate_prompt_documentation

generate_prompt_documentation(
    group_by_category: bool = True,
    include_categories: set[str] | None = None,
    exclude_categories: set[str] | None = None,
    compact: bool = False,
) -> str

Generate prompt documentation for all registered tools.

This creates formatted text suitable for inclusion in system prompts, documenting all available tools with their parameters and usage.

Parameters:

Name Type Description Default
group_by_category bool

If True, group tools by category with headers

True
include_categories set[str] | None

If set, only include tools from these categories

None
exclude_categories set[str] | None

If set, exclude tools from these categories

None
compact bool

If True, emit one-line-per-tool summaries to reduce token overhead for small-context models (<=8K).

False

Returns:

Type Description
str

Formatted documentation string for system prompts

Source code in src/gaze/tools/registry.py
@beartype
def generate_prompt_documentation(
    self,
    group_by_category: bool = True,
    include_categories: set[str] | None = None,
    exclude_categories: set[str] | None = None,
    compact: bool = False,
) -> str:
    """Render documentation for the registered tools as prompt text.

    Args:
        group_by_category: When True, emit a header per category followed
            by that category's tools; when False, emit one flat list.
        include_categories: If non-empty, keep only these categories.
        exclude_categories: If non-empty, drop these categories.
        compact: When True, request compact per-tool documentation and
            use bracketed category headers without blank separator lines.

    Returns:
        The joined documentation with surrounding whitespace stripped, or
        an empty string when no tools are registered.
    """
    if not self._tools:
        return ""

    lines: list[str] = []

    def emit(batch: Any) -> None:
        """Append the docs for ``batch`` to ``lines``, ordered by tool name."""
        for entry in sorted(batch, key=lambda t: t.name):
            lines.append(entry.get_prompt_documentation(compact=compact))
            if not compact:
                lines.append("")  # Blank line between tools

    if not group_by_category:
        # Flat list: uncategorised tools are filtered under the name "other".
        flat = list(self._tools.values())
        if include_categories:
            flat = [t for t in flat if (t.category or "other") in include_categories]
        if exclude_categories:
            flat = [t for t in flat if (t.category or "other") not in exclude_categories]
        emit(flat)
        return "\n".join(lines).strip()

    grouped = self._get_tools_by_category()

    # Filter category keys first, then sort, so output order is stable.
    selected = list(grouped)
    if include_categories:
        selected = [key for key in selected if key in include_categories]
    if exclude_categories:
        selected = [key for key in selected if key not in exclude_categories]

    for cat_key in sorted(selected):
        members = grouped[cat_key]
        if not members:
            continue

        title = cat_key.replace("_", " ").title()
        if compact:
            lines.append(f"[{title}]")
        else:
            lines.append(f"**{title} Tools:**\n")
        emit(members)

    return "\n".join(lines).strip()

ToolRegistry

Refactored tool registry with separated responsibilities.

This implementation delegates to specialized managers:

- ImageManager: Handles image loading and transformation
- ToolDocumenter: Handles schema generation and documentation

Architecture

ToolRegistry
├── ImageManager (image loading, transformation, state)
├── ToolDocumenter (schemas, documentation, validation)
└── Tool execution (actual tool calling)

Source code in src/gaze/tools/registry.py
class ToolRegistry:
    """Refactored tool registry with separated responsibilities.

    This implementation delegates to specialized managers:
    - ImageManager: Handles image loading and transformation
    - ToolDocumenter: Handles schema generation and documentation

    Architecture:
        ToolRegistry
        ├── ImageManager (image loading, transformation, state)
        ├── ToolDocumenter (schemas, documentation, validation)
        └── Tool execution (actual tool calling)
    """

    @beartype
    def __init__(
        self,
        image_path: Path | None = None,
        tools: list[Tool] | None = None,
        max_history: int = 100,
        web_search_manager: Any | None = None,
        image_search_manager: Any | None = None,
    ) -> None:
        """Initialize the registry and its helper managers.

        Args:
            image_path: Optional image to hand to the image manager.
            tools: Tools to register with the documenter up front.
            max_history: Maximum number of tool results kept in the history;
                older results are dropped once the limit is reached.
            web_search_manager: Externally supplied web search manager. When
                given, the registry does not close it in :meth:`aclose`.
            image_search_manager: Externally supplied image search manager.
                When given, the registry does not close it in :meth:`aclose`.
        """
        # Initialize specialized managers
        self._image_manager = ImageManager()
        self._documenter = ToolDocumenter(tools)

        # Tool execution history (bounded by max_history)
        self._tool_history: deque[ToolResult] = deque(maxlen=max_history)
        self.max_history = max_history

        # Lazily-created search managers — reused across tool calls within a
        # single agentic session to keep TCP connections alive.
        self._web_search_manager: WebSearchManager | None = web_search_manager
        self._image_search_manager: MedicalImageSearchManager | None = image_search_manager
        # Only managers the registry creates itself are closed by aclose().
        self._owns_web_search_manager = web_search_manager is None
        self._owns_image_search_manager = image_search_manager is None

        # Set initial image if provided
        if image_path:
            self._image_manager.set_image(image_path)

    def __enter__(self) -> ToolRegistry:
        """Sync context manager entry; returns this registry."""
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Sync context manager exit; always runs :meth:`close`."""
        self.close()

    @beartype
    def close(self) -> None:
        """Close and clean up synchronous resources.

        Closes the image manager and clears the tool history. Search
        managers are not touched here; prefer :meth:`aclose` from async
        contexts so that search manager sessions are properly awaited.
        """
        self._image_manager.close()
        self._tool_history.clear()

    async def aclose(self) -> None:
        """Close all resources including async search manager sessions.

        Only search managers created by this registry are closed; managers
        passed in by the caller are merely dereferenced. The synchronous
        cleanup in :meth:`close` runs even if a manager fails to close.

        Raises:
            BaseException: The first exception raised by a search manager's
                ``close()``, re-raised after all cleanup has been attempted.
        """
        owned: list[Any] = []
        if self._web_search_manager is not None and self._owns_web_search_manager:
            owned.append(self._web_search_manager)
        if self._image_search_manager is not None and self._owns_image_search_manager:
            owned.append(self._image_search_manager)
        self._web_search_manager = None
        self._image_search_manager = None
        try:
            if owned:
                # return_exceptions=True lets every manager finish closing
                # before a failure from any one of them is reported.
                outcomes = await asyncio.gather(
                    *(manager.close() for manager in owned),
                    return_exceptions=True,
                )
                for outcome in outcomes:
                    if isinstance(outcome, BaseException):
                        raise outcome
        finally:
            # Must run regardless of search-manager failures, otherwise the
            # image manager and history would be left uncleaned.
            self.close()

    def get_web_search_manager(self) -> WebSearchManager:
        """Get or create a reusable WebSearchManager for the session."""
        if self._web_search_manager is None:
            from gaze.retrieval.web_search import WebSearchManager

            self._web_search_manager = WebSearchManager()
        return self._web_search_manager

    def get_image_search_manager(self) -> MedicalImageSearchManager:
        """Get or create a reusable MedicalImageSearchManager for the session."""
        if self._image_search_manager is None:
            from gaze.retrieval.image_search import MedicalImageSearchManager

            self._image_search_manager = MedicalImageSearchManager()
        return self._image_search_manager

    @beartype
    def register(self, tool: Tool) -> None:
        """Register a tool in the registry (delegates to the documenter)."""
        self._documenter.register(tool)

    @beartype
    def get_tool_schemas(self) -> list[dict[str, Any]]:
        """Get OpenAI-compatible tool schemas for all registered tools."""
        return self._documenter.get_tool_schemas()

    @beartype
    async def execute(self, tool_name: str, **kwargs: Any) -> ToolResult:
        """Execute a tool by name with given arguments and record the result.

        Before dispatch, numeric arguments are aligned with the declared
        parameter type: ints become floats for ``"number"`` parameters and
        whole-valued floats become ints for ``"integer"`` parameters. The
        same applies element-wise to arrays whose ``items`` declare one of
        those types. Values that cannot be converted losslessly (including
        NaN and infinities) are passed through unchanged.

        Args:
            tool_name: Name the tool was registered under.
            **kwargs: Arguments forwarded to the tool's ``execute``.

        Returns:
            The ``ToolResult`` returned by the tool.

        Raises:
            UnknownToolError: If ``tool_name`` is not registered.
            ToolExecutionError: If the tool requires an image and none is
                loaded, if the tool raises a ``ToolExecutionError``, or if
                the call fails due to invalid arguments (``TypeError`` or a
                beartype violation).
        """
        tool = self._documenter.get_tool(tool_name)
        if tool is None:
            raise UnknownToolError(tool_name, self._documenter.get_tool_names())

        if tool.requires_image:
            await self._image_manager.ensure_loaded()
            if not self._image_manager.has_image:
                raise ToolExecutionError(
                    f"Tool '{tool_name}' requires an image, but no image path was provided"
                )

        def coerce(value: Any, declared: Any) -> Any:
            """Align one JSON scalar with a declared "number"/"integer" type."""
            if declared == "number":
                # bool is an int subclass but must stay a bool.
                if isinstance(value, int) and not isinstance(value, bool):
                    return float(value)
            elif declared == "integer":
                # is_integer() is False for NaN and infinities, for which
                # int(value) would raise instead of reporting a bad argument.
                if isinstance(value, float) and value.is_integer():
                    return int(value)
            return value

        # Coerce JSON-native types to match Python type hints that beartype enforces.
        # JSON deserializes 2 as int, but beartype rejects int for float hints;
        # similarly some models send 50.0 for integer-declared params.
        for param_name, param_info in tool.parameters.items():
            if param_name not in kwargs:
                continue
            val = kwargs[param_name]
            param_type = param_info.get("type")

            if param_type in ("number", "integer"):
                kwargs[param_name] = coerce(val, param_type)
            elif param_type == "array" and isinstance(val, list):
                elements = cast("list[Any]", val)
                # "items" may be absent or not a mapping; then nothing is coerced.
                items_schema = param_info.get("items")
                items_type = items_schema.get("type") if isinstance(items_schema, dict) else None
                if items_type in ("number", "integer"):
                    kwargs[param_name] = [coerce(v, items_type) for v in elements]

        try:
            result = await tool.execute(self, **kwargs)
        except (TypeError, BeartypeException) as e:
            raise ToolExecutionError(f"Tool '{tool_name}' received invalid arguments: {e}") from e

        self._tool_history.append(result)
        return result

    @property
    def history(self) -> list[ToolResult]:
        """Get the history of tool executions as a new list (oldest first)."""
        return list(self._tool_history)

    @beartype
    def get_image_manager(self) -> ImageManager:
        """Get the image manager instance."""
        return self._image_manager

    @beartype
    def get_documenter(self) -> ToolDocumenter:
        """Get the tool documenter instance."""
        return self._documenter

history property

history: list[ToolResult]

Get the history of tool executions.

__init__

__init__(
    image_path: Path | None = None,
    tools: list[Tool] | None = None,
    max_history: int = 100,
    web_search_manager: Any | None = None,
    image_search_manager: Any | None = None,
) -> None

Initialize refactored tool registry.

Source code in src/gaze/tools/registry.py
@beartype
def __init__(
    self,
    image_path: Path | None = None,
    tools: list[Tool] | None = None,
    max_history: int = 100,
    web_search_manager: Any | None = None,
    image_search_manager: Any | None = None,
) -> None:
    """Initialize the registry and its helper managers.

    Args:
        image_path: Optional image to hand to the image manager.
        tools: Tools passed to the ``ToolDocumenter`` up front.
        max_history: Maximum length of the tool-result history deque.
        web_search_manager: Externally supplied web search manager. When
            provided, the registry records that it does not own it.
        image_search_manager: Externally supplied image search manager.
            When provided, the registry records that it does not own it.
    """
    # Initialize specialized managers
    self._image_manager = ImageManager()
    self._documenter = ToolDocumenter(tools)

    # Tool execution history (bounded: the deque drops the oldest entries)
    self._tool_history: deque[ToolResult] = deque(maxlen=max_history)
    self.max_history = max_history

    # Lazily-created search managers — reused across tool calls within a
    # single agentic session to keep TCP connections alive.
    self._web_search_manager: WebSearchManager | None = web_search_manager
    self._image_search_manager: MedicalImageSearchManager | None = image_search_manager
    # Ownership flags: True only when no manager was injected by the caller.
    self._owns_web_search_manager = web_search_manager is None
    self._owns_image_search_manager = image_search_manager is None

    # Set initial image if provided
    if image_path:
        self._image_manager.set_image(image_path)

__enter__

__enter__() -> ToolRegistry

Sync context manager entry.

Source code in src/gaze/tools/registry.py
def __enter__(self) -> ToolRegistry:
    """Enter the sync context manager and return this registry unchanged."""
    return self

__exit__

__exit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Sync context manager exit with cleanup.

Source code in src/gaze/tools/registry.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Leave the sync context manager, always running :meth:`close`.

    The exception arguments are ignored and ``None`` is returned, so an
    exception raised inside the ``with`` body still propagates.
    """
    self.close()

close

close() -> None

Close and clean up synchronous resources.

Prefer `aclose` from async contexts so that search manager sessions are properly awaited.

Source code in src/gaze/tools/registry.py
@beartype
def close(self) -> None:
    """Close and clean up synchronous resources.

    Closes the image manager and empties the tool-execution history.
    Search managers are not closed here.

    Prefer :meth:`aclose` from async contexts so that search manager
    sessions are properly awaited.
    """
    self._image_manager.close()
    self._tool_history.clear()

aclose async

aclose() -> None

Close all resources including async search manager sessions.

Source code in src/gaze/tools/registry.py
async def aclose(self) -> None:
    """Close all resources including async search manager sessions.

    Only search managers for which the ownership flag is set are closed;
    other managers are merely dereferenced. The synchronous cleanup in
    ``close()`` runs even if a manager fails to close.

    Raises:
        BaseException: The first exception raised by a search manager's
            ``close()``, re-raised after all cleanup has been attempted.
    """
    owned: list[Any] = []
    if self._web_search_manager is not None and self._owns_web_search_manager:
        owned.append(self._web_search_manager)
    if self._image_search_manager is not None and self._owns_image_search_manager:
        owned.append(self._image_search_manager)
    self._web_search_manager = None
    self._image_search_manager = None
    try:
        if owned:
            # return_exceptions=True lets every manager finish closing
            # before a failure from any one of them is reported.
            outcomes = await asyncio.gather(
                *(manager.close() for manager in owned),
                return_exceptions=True,
            )
            for outcome in outcomes:
                if isinstance(outcome, BaseException):
                    raise outcome
    finally:
        # Must run regardless of search-manager failures, otherwise the
        # image manager and history would be left uncleaned.
        self.close()

get_web_search_manager

get_web_search_manager() -> WebSearchManager

Get or create a reusable WebSearchManager for the session.

Source code in src/gaze/tools/registry.py
def get_web_search_manager(self) -> WebSearchManager:
    """Return the session's WebSearchManager, creating it on first use."""
    manager = self._web_search_manager
    if manager is not None:
        return manager

    # Imported lazily, only when a manager actually has to be built.
    from gaze.retrieval.web_search import WebSearchManager

    manager = WebSearchManager()
    self._web_search_manager = manager
    return manager

get_image_search_manager

get_image_search_manager() -> MedicalImageSearchManager

Get or create a reusable MedicalImageSearchManager for the session.

Source code in src/gaze/tools/registry.py
def get_image_search_manager(self) -> MedicalImageSearchManager:
    """Return the session's MedicalImageSearchManager, creating it on first use."""
    manager = self._image_search_manager
    if manager is not None:
        return manager

    # Imported lazily, only when a manager actually has to be built.
    from gaze.retrieval.image_search import MedicalImageSearchManager

    manager = MedicalImageSearchManager()
    self._image_search_manager = manager
    return manager

register

register(tool: Tool) -> None

Register a tool in the registry.

Source code in src/gaze/tools/registry.py
@beartype
def register(self, tool: Tool) -> None:
    """Register a tool in the registry by delegating to the documenter.

    Args:
        tool: Tool to register.
    """
    self._documenter.register(tool)

get_tool_schemas

get_tool_schemas() -> list[dict[str, Any]]

Get OpenAI-compatible tool schemas for all registered tools.

Source code in src/gaze/tools/registry.py
@beartype
def get_tool_schemas(self) -> list[dict[str, Any]]:
    """Get OpenAI-compatible tool schemas for all registered tools.

    Returns:
        The schema list produced by the documenter's ``get_tool_schemas``.
    """
    return self._documenter.get_tool_schemas()

execute async

execute(tool_name: str, **kwargs: Any) -> ToolResult

Execute a tool by name with given arguments.

Raises:

Type Description
UnknownToolError

If tool_name is not registered.

ToolExecutionError

If the tool raises a ToolExecutionError, or if the call fails due to invalid arguments (a TypeError or a beartype violation).

Source code in src/gaze/tools/registry.py
@beartype
async def execute(self, tool_name: str, **kwargs: Any) -> ToolResult:
    """Execute a tool by name with given arguments and record the result.

    Before dispatch, numeric arguments are aligned with the declared
    parameter type: ints become floats for ``"number"`` parameters and
    whole-valued floats become ints for ``"integer"`` parameters. The same
    applies element-wise to arrays whose ``items`` declare one of those
    types. Values that cannot be converted losslessly (including NaN and
    infinities) are passed through unchanged.

    Args:
        tool_name: Name the tool was registered under.
        **kwargs: Arguments forwarded to the tool's ``execute``.

    Returns:
        The ``ToolResult`` returned by the tool.

    Raises:
        UnknownToolError: If ``tool_name`` is not registered.
        ToolExecutionError: If the tool requires an image and none is
            loaded, if the tool raises a ``ToolExecutionError``, or if the
            call fails due to invalid arguments (``TypeError`` or a
            beartype violation).
    """
    tool = self._documenter.get_tool(tool_name)
    if tool is None:
        raise UnknownToolError(tool_name, self._documenter.get_tool_names())

    if tool.requires_image:
        await self._image_manager.ensure_loaded()
        if not self._image_manager.has_image:
            raise ToolExecutionError(
                f"Tool '{tool_name}' requires an image, but no image path was provided"
            )

    def coerce(value: Any, declared: Any) -> Any:
        """Align one JSON scalar with a declared "number"/"integer" type."""
        if declared == "number":
            # bool is an int subclass but must stay a bool.
            if isinstance(value, int) and not isinstance(value, bool):
                return float(value)
        elif declared == "integer":
            # is_integer() is False for NaN and infinities, for which
            # int(value) would raise instead of reporting a bad argument.
            if isinstance(value, float) and value.is_integer():
                return int(value)
        return value

    # Coerce JSON-native types to match Python type hints that beartype enforces.
    # JSON deserializes 2 as int, but beartype rejects int for float hints;
    # similarly some models send 50.0 for integer-declared params.
    for param_name, param_info in tool.parameters.items():
        if param_name not in kwargs:
            continue
        val = kwargs[param_name]
        param_type = param_info.get("type")

        if param_type in ("number", "integer"):
            kwargs[param_name] = coerce(val, param_type)
        elif param_type == "array" and isinstance(val, list):
            elements = cast("list[Any]", val)
            # "items" may be absent or not a mapping; then nothing is coerced.
            items_schema = param_info.get("items")
            items_type = items_schema.get("type") if isinstance(items_schema, dict) else None
            if items_type in ("number", "integer"):
                kwargs[param_name] = [coerce(v, items_type) for v in elements]

    try:
        result = await tool.execute(self, **kwargs)
    except (TypeError, BeartypeException) as e:
        raise ToolExecutionError(f"Tool '{tool_name}' received invalid arguments: {e}") from e

    self._tool_history.append(result)
    return result

get_image_manager

get_image_manager() -> ImageManager

Get the image manager instance.

Source code in src/gaze/tools/registry.py
@beartype
def get_image_manager(self) -> ImageManager:
    """Get the image manager instance held by this registry.

    Returns:
        The same ``ImageManager`` object on every call (no copy is made).
    """
    return self._image_manager

get_documenter

get_documenter() -> ToolDocumenter

Get the tool documenter instance.

Source code in src/gaze/tools/registry.py
@beartype
def get_documenter(self) -> ToolDocumenter:
    """Get the tool documenter instance held by this registry.

    Returns:
        The same ``ToolDocumenter`` object on every call (no copy is made).
    """
    return self._documenter

encode_image

encode_image(
    image: Image,
    *,
    format: str = "JPEG",
    quality: int | None = None,
) -> EncodedImage

Encode a PIL Image to a base64 string.

Parameters:

Name Type Description Default
image Image

PIL Image to encode.

required
format str

Image format — "JPEG" (default, much smaller) or "PNG".

'JPEG'
quality int | None

JPEG quality 1-100. Ignored for PNG. When None, uses ImageProcessingConfig.default_jpeg_quality (default 85).

None

Returns:

Type Description
EncodedImage

EncodedImage with base64 data and correct MIME type.

Source code in src/gaze/tools/registry.py
@beartype
def encode_image(
    image: Image.Image,
    *,
    format: str = "JPEG",
    quality: int | None = None,
) -> EncodedImage:
    """Serialize a PIL image and wrap it as base64 text with its MIME type.

    Args:
        image: PIL Image to encode.
        format: Target format name, matched case-insensitively against the
            supported formats (``"JPEG"`` by default, or ``"PNG"``).
        quality: JPEG quality setting. Not used for PNG. When *None*, the
            configured ``image.default_jpeg_quality`` is used.

    Returns:
        EncodedImage holding the base64 payload and ``image/png`` or
        ``image/jpeg`` as MIME type.

    Raises:
        ValueError: If ``format`` is not one of the supported formats.
    """
    from gaze.config import get_config

    target = format.upper()
    if target not in _SUPPORTED_IMAGE_FORMATS:
        raise ValueError(f"Unsupported image format: {format!r}. Use 'JPEG' or 'PNG'.")

    as_png = target == "PNG"

    # JPEG can only store a few modes; anything outside the safe set
    # (e.g. integer/float medical modes, alpha or palette modes) goes to RGB.
    if target == "JPEG" and image.mode not in _JPEG_SAFE_MODES:
        image = image.convert("RGB")

    # Modes PNG cannot store are reduced to 8-bit grayscale.
    if as_png and image.mode in _PNG_UNSAFE_MODES:
        image = image.convert("L")

    payload = BytesIO()
    if as_png:
        image.save(payload, format="PNG")
        mime = "image/png"
    else:
        # Fall back to the configured default only when no quality was given.
        jpeg_quality = get_config().image.default_jpeg_quality if quality is None else quality
        image.save(payload, format="JPEG", quality=jpeg_quality)
        mime = "image/jpeg"

    encoded = base64.b64encode(payload.getvalue()).decode("utf-8")
    return EncodedImage(data=encoded, mime_type=mime)