Skip to content

Config#

Snowflake connection resolution, credential deployment, OAuth flow, and token proxy.

Connection Resolver#

inspect_coco.config.connection #

Snowflake connection resolution from existing TOML configuration files.

SnowflakeConnectionConfig(account, user, host, role=None, warehouse=None, database=None, schema=None, private_key_path=None, token=None, oauth_access_token=None) dataclass #

Resolved Snowflake connection configuration.

ConnectionResolutionError #

Bases: Exception

Raised when a Snowflake connection cannot be resolved.

snowflake_home() #

Return the Snowflake config directory, honouring SNOWFLAKE_HOME.

Source code in src/inspect_coco/config/connection.py
def snowflake_home() -> Path:
    """Locate the directory that holds the Snowflake configuration files.

    Returns:
        The path named by the ``SNOWFLAKE_HOME`` environment variable when it
        is set to a non-empty value, otherwise ``~/.snowflake``.
    """
    override = os.environ.get("SNOWFLAKE_HOME", "")
    # An unset or empty variable falls through to the per-user default.
    return Path(override) if override else Path.home() / ".snowflake"

resolve_connection(connection_name=None) #

Resolve a Snowflake connection from existing TOML files.

Resolution order for connection name
  1. Explicit connection_name parameter
  2. INSPECT_COCO_SNOWFLAKE_CONNECTION environment variable
  3. default_connection_name field in TOML file
  4. Fallback to "default"

File lookup order (using SNOWFLAKE_HOME or ~/.snowflake):
  1. connections.toml (newer format) — top-level [connection_name]
  2. config.toml (older format) — nested [connections.connection_name]

Source code in src/inspect_coco/config/connection.py
def resolve_connection(connection_name: str | None = None) -> SnowflakeConnectionConfig:
    """Build a connection config from the user's existing Snowflake TOML files.

    The connection name is chosen in this order:
        1. The ``connection_name`` argument
        2. The INSPECT_COCO_SNOWFLAKE_CONNECTION environment variable
        3. The ``default_connection_name`` field of the TOML file being read
        4. The literal name "default"

    Files are looked up inside ``snowflake_home()``:
        1. connections.toml (newer format) — top-level [connection_name]
        2. config.toml (older format) — nested [connections.connection_name]

    config.toml is only consulted when connections.toml does not exist; a
    connection missing from an existing connections.toml is an error.

    Args:
        connection_name: Explicit connection name, or None to use the
            fallbacks listed above.

    Returns:
        The parsed configuration for the selected connection.

    Raises:
        ConnectionResolutionError: If the selected connection is absent from
            the file that was read, or if neither file exists.
    """
    sf_home = snowflake_home()
    requested = connection_name or os.environ.get("INSPECT_COCO_SNOWFLAKE_CONNECTION", "")

    # Newer format: every connection is a top-level table.
    connections_path = sf_home / "connections.toml"
    if connections_path.exists():
        document = _load_toml(connections_path)
        target = requested or document.get("default_connection_name", "default")
        if target not in document:
            # Only table-valued keys are connections; skip the default marker.
            known = [
                key
                for key, value in document.items()
                if key != "default_connection_name" and isinstance(value, dict)
            ]
            listing = ", ".join(known)
            raise ConnectionResolutionError(
                f"Connection '{target}' not found in {connections_path}.\n"
                f"Available connections: {listing}\n"
                "Fix: set INSPECT_COCO_SNOWFLAKE_CONNECTION in your .env file to one of the above."
            )
        return _parse_connection_section(document[target], target)

    # Older format: connections live under the [connections] table.
    config_path = sf_home / "config.toml"
    if config_path.exists():
        document = _load_toml(config_path)
        target = requested or document.get("default_connection_name", "default")
        sections = document.get("connections", {})
        if target not in sections:
            listing = ", ".join(sections.keys())
            raise ConnectionResolutionError(
                f"Connection '{target}' not found in {config_path}.\n"
                f"Available connections: {listing}\n"
                "Fix: set INSPECT_COCO_SNOWFLAKE_CONNECTION in your .env file to one of the above."
            )
        return _parse_connection_section(sections[target], target)

    # Neither file is present.
    raise ConnectionResolutionError(
        "No Snowflake connection found. "
        f"Looked for connections in: {connections_path}, {config_path}. "
        "Ensure a valid connection exists in your Snowflake config directory "
        f"({sf_home})."
    )

OAuth#

inspect_coco.config.oauth #

Snowflake Local OAuth flow using SNOWFLAKE$LOCAL_APPLICATION.

Implements the OAuth Authorization Code flow with PKCE for local development. The host machine handles the browser interaction and token management. Containers receive only short-lived access tokens via the token proxy.

OAuthTokens(access_token, refresh_token, expires_at, account, role=None) dataclass #

Cached OAuth tokens.

PkceCodes(verifier, challenge) dataclass #

PKCE verifier and challenge pair.

OAuthError #

Bases: Exception

Raised when OAuth authorization fails.

generate_pkce() #

Generate a PKCE code verifier and S256 challenge.

Source code in src/inspect_coco/config/oauth.py
def generate_pkce() -> PkceCodes:
    """Create a fresh PKCE verifier together with its S256 challenge.

    Returns:
        PkceCodes whose ``verifier`` is a random URL-safe string truncated to
        at most 64 characters and whose ``challenge`` is the unpadded URL-safe
        base64 encoding of the verifier's SHA-256 digest.
    """
    code_verifier = secrets.token_urlsafe(64)[:64]
    hashed = hashlib.sha256(code_verifier.encode("ascii")).digest()
    # S256 challenges are base64url without the trailing "=" padding.
    encoded = urlsafe_b64encode(hashed)
    code_challenge = encoded.rstrip(b"=").decode("ascii")
    return PkceCodes(verifier=code_verifier, challenge=code_challenge)

load_cached_tokens(account=None) #

Load cached tokens from OS keyring (preferred) or file fallback.

Source code in src/inspect_coco/config/oauth.py
def load_cached_tokens(account: str | None = None) -> OAuthTokens | None:
    """Fetch cached tokens, preferring the OS keyring over the file cache.

    Args:
        account: Optional account to look up; it is normalized before use.

    Returns:
        The cached tokens, or None. With the file fallback, tokens cached for
        a different account than the requested one are treated as absent.
    """
    wanted = _normalize_account(account) if account else account
    if _use_keyring():
        return _load_from_keyring(wanted)

    cached = _load_from_file()
    # The file cache is not keyed by account, so check the account here.
    belongs_to_other = bool(cached and wanted and cached.account != wanted)
    return None if belongs_to_other else cached

save_cached_tokens(tokens) #

Save tokens to OS keyring (preferred) or file fallback.

Source code in src/inspect_coco/config/oauth.py
def save_cached_tokens(tokens: OAuthTokens) -> None:
    """Persist tokens to the OS keyring, or to the file cache as a fallback.

    Args:
        tokens: The tokens to store.
    """
    if _use_keyring():
        _save_to_keyring(tokens)
        return

    # No keyring available: warn, then fall back to the file cache.
    logger.warning("Keyring unavailable, falling back to file-based token cache")
    _save_to_file(tokens)

clear_cached_tokens(account=None) #

Remove cached tokens. Returns True if tokens were found and removed.

Source code in src/inspect_coco/config/oauth.py
def clear_cached_tokens(account: str | None = None) -> bool:
    """Remove cached tokens from the OS keyring or the file fallback.

    Args:
        account: Optional account whose keyring entry should be removed. It is
            normalized first, the same way ``load_cached_tokens`` and
            ``authorize`` normalize it, so that the lookup uses the form under
            which tokens were cached.

    Returns:
        True if tokens were found and removed.
    """
    # NOTE(review): assumes _normalize_account is idempotent, as its repeated
    # use in load_cached_tokens after authorize suggests — confirm.
    if account:
        account = _normalize_account(account)
    if _use_keyring():
        return _clear_from_keyring(account)
    # The file fallback ignores the account and clears whatever is cached.
    return _clear_from_file()

exchange_code_for_tokens(account, code, pkce, redirect_uri) #

Exchange an authorization code for access and refresh tokens.

Source code in src/inspect_coco/config/oauth.py
def exchange_code_for_tokens(account: str, code: str, pkce: PkceCodes, redirect_uri: str) -> dict:
    """Trade an authorization code for an access token and a refresh token.

    Args:
        account: Snowflake account identifier used to build the token URL.
        code: Authorization code received on the OAuth callback.
        pkce: PKCE pair whose verifier is sent with the request.
        redirect_uri: Redirect URI that was used in the authorize request.

    Returns:
        The decoded JSON token response, which contains both
        ``access_token`` and ``refresh_token``.

    Raises:
        OAuthError: If the endpoint returns a non-success status, or the
            response lacks ``access_token`` or ``refresh_token``.
    """
    token_endpoint = f"https://{account}.snowflakecomputing.com/oauth/token-request"
    form = {
        "grant_type": "authorization_code",
        "code": code,
        "redirect_uri": redirect_uri,
        "client_id": OAUTH_CLIENT_ID,
        "code_verifier": pkce.verifier,
    }
    response = requests.post(
        token_endpoint,
        data=form,
        auth=(OAUTH_CLIENT_ID, OAUTH_CLIENT_ID),
        headers={"Accept": "application/json"},
        timeout=30,
    )
    if not response.ok:
        raise OAuthError(f"Token exchange failed ({response.status_code}): {response.text}")

    payload = response.json()
    if "access_token" not in payload:
        raise OAuthError("Token response missing access_token")
    if "refresh_token" not in payload:
        raise OAuthError(
            "Token response missing refresh_token. "
            "Ensure the SNOWFLAKE$LOCAL_APPLICATION integration issues refresh tokens."
        )
    return payload

refresh_access_token(account, refresh_token) #

Use a refresh token to obtain a new access token.

Source code in src/inspect_coco/config/oauth.py
def refresh_access_token(account: str, refresh_token: str) -> dict:
    """Request a new access token using a refresh token.

    Args:
        account: Snowflake account identifier used to build the token URL.
        refresh_token: Refresh token obtained from an earlier authorization.

    Returns:
        The decoded JSON token response, which contains ``access_token``.

    Raises:
        OAuthError: If the endpoint returns a non-success status or the
            response lacks ``access_token``.
    """
    token_endpoint = f"https://{account}.snowflakecomputing.com/oauth/token-request"
    form = {
        "grant_type": "refresh_token",
        "refresh_token": refresh_token,
        "client_id": OAUTH_CLIENT_ID,
    }
    response = requests.post(
        token_endpoint,
        data=form,
        auth=(OAUTH_CLIENT_ID, OAUTH_CLIENT_ID),
        headers={"Accept": "application/json"},
        timeout=30,
    )
    if not response.ok:
        raise OAuthError(f"Token refresh failed ({response.status_code}): {response.text}")

    payload = response.json()
    if "access_token" not in payload:
        raise OAuthError("Refresh response missing access_token")
    return payload

get_valid_token(tokens) #

Return tokens with a valid (non-expired) access token, refreshing if needed.

Source code in src/inspect_coco/config/oauth.py
def get_valid_token(tokens: OAuthTokens) -> OAuthTokens:
    """Return tokens whose access token is still valid, refreshing if expired.

    Args:
        tokens: Currently cached tokens.

    Returns:
        ``tokens`` itself when it has not expired; otherwise a new OAuthTokens
        built from the refresh response, which is also saved to the cache.
    """
    if tokens.is_expired:
        logger.info("Access token expired, refreshing...")
        refreshed = refresh_access_token(tokens.account, tokens.refresh_token)

        renewed = OAuthTokens(
            access_token=refreshed["access_token"],
            # Keep the current refresh token if the response has no new one.
            refresh_token=refreshed.get("refresh_token", tokens.refresh_token),
            # Default to 600 seconds when the response omits expires_in.
            expires_at=time.time() + refreshed.get("expires_in", 600),
            account=tokens.account,
            role=tokens.role,
        )
        save_cached_tokens(renewed)
        return renewed

    return tokens

authorize(account, role=None) #

Run the full OAuth authorization code flow with PKCE.

Opens the user's browser to Snowflake's authorize endpoint, captures the callback on localhost, and exchanges the code for tokens.

Parameters:

Name Type Description Default
account str

Snowflake account identifier (e.g., "myorg-myaccount").

required
role str | None

Optional Snowflake role to request.

None

Returns:

Type Description
OAuthTokens

OAuthTokens with access and refresh tokens.

Raises:

Type Description
OAuthError

If authorization fails or times out.

Source code in src/inspect_coco/config/oauth.py
def authorize(account: str, role: str | None = None) -> OAuthTokens:
    """Perform the interactive OAuth authorization code flow with PKCE.

    Starts a local callback server in a background thread, opens the user's
    browser at Snowflake's authorize endpoint, waits for the callback, and
    exchanges the received code for tokens, which are then cached.

    Args:
        account: Snowflake account identifier (e.g., "myorg-myaccount").
        role: Optional Snowflake role to request.

    Returns:
        OAuthTokens with access and refresh tokens.

    Raises:
        OAuthError: If the callback does not arrive within OAUTH_TIMEOUT_SEC,
            no authorization code is received, or the token exchange fails.
            An error recorded by the callback server is re-raised as is.
    """
    account = _normalize_account(account)
    pkce = generate_pkce()
    csrf_state = secrets.token_urlsafe(48)

    callback_server = _OAuthCallbackServer()
    callback_server.expected_state = csrf_state

    authorize_url = _build_authorize_url(
        account, role, csrf_state, pkce, callback_server.redirect_uri
    )

    # The callback server listens in the background while the user logs in.
    listener = Thread(target=callback_server.serve_forever, daemon=True)
    listener.start()

    try:
        logger.info("Opening browser for Snowflake authentication...")
        webbrowser.open(authorize_url)

        # Block until the callback handler signals completion or we time out.
        callback_received = callback_server.done.wait(timeout=OAUTH_TIMEOUT_SEC)
        if not callback_received:
            raise OAuthError(
                f"OAuth callback timed out after {OAUTH_TIMEOUT_SEC}s. "
                "Did you complete the login in your browser?"
            )

        if callback_server.error:
            raise callback_server.error

        if not callback_server.auth_code:
            raise OAuthError("No authorization code received")

        granted = exchange_code_for_tokens(
            account, callback_server.auth_code, pkce, callback_server.redirect_uri
        )

        tokens = OAuthTokens(
            access_token=granted["access_token"],
            refresh_token=granted["refresh_token"],
            # Default to 600 seconds when the response omits expires_in.
            expires_at=time.time() + granted.get("expires_in", 600),
            account=account,
            role=role,
        )

        save_cached_tokens(tokens)
        logger.info("OAuth tokens obtained and cached successfully")
        return tokens

    finally:
        # Always stop the callback server, on success and on failure alike.
        callback_server.shutdown()
        listener.join(timeout=2)

Credential Deployer#

inspect_coco.config.deployer #

Deploy Snowflake credentials into a Docker container.

SandboxExec #

Bases: Protocol

Protocol for executing commands in a sandbox container.

ExecResult(returncode=0, stdout='', stderr='') #

Minimal exec result for type compatibility.

Source code in src/inspect_coco/config/deployer.py
def __init__(self, returncode: int = 0, stdout: str = "", stderr: str = ""):
    """Store the exit status and captured output of an executed command.

    Args:
        returncode: Exit code of the command.
        stdout: Captured standard output.
        stderr: Captured standard error.
    """
    self.returncode, self.stdout, self.stderr = returncode, stdout, stderr

deploy_credentials(config, exec_fn, connection_name='default') async #

Deploy Snowflake credentials into a Docker sandbox container.

Performs
  1. Creates ~/.snowflake/cortex directory structure
  2. Deploys private key (JWT) via base64 transport + chmod 0600
  3. Generates connections.toml with correct authenticator
  4. Generates cortex settings.json
  5. Returns env vars dict for subsequent exec() calls

Parameters:

Name Type Description Default
config SnowflakeConnectionConfig

Resolved Snowflake connection configuration.

required
exec_fn SandboxExec

Callable to execute commands in the sandbox.

required
connection_name str

Name for the connection entry in connections.toml.

'default'

Returns:

Type Description
dict[str, str]

Dict of environment variables to pass to subsequent exec() calls.

Source code in src/inspect_coco/config/deployer.py
async def deploy_credentials(
    config: SnowflakeConnectionConfig,
    exec_fn: SandboxExec,
    connection_name: str = "default",
) -> dict[str, str]:
    """Deploy Snowflake credentials into a Docker sandbox container.

    Performs:
        1. Creates the /root/.snowflake/cortex directory structure
        2. Deploys the private key (JWT) via base64 transport + chmod 0600
        3. Generates connections.toml and restricts it to mode 0600
        4. Generates cortex settings.json
        5. Returns env vars dict for subsequent exec() calls

    Args:
        config: Resolved Snowflake connection configuration.
        exec_fn: Callable to execute commands in the sandbox.
        connection_name: Name for the connection entry in connections.toml.

    Returns:
        Dict of environment variables to pass to subsequent exec() calls.
    """
    # 1. Create directory structure
    await exec_fn(cmd=["mkdir", "-p", "/root/.snowflake/cortex"])

    # 2. Deploy private key if JWT auth
    deployed_key_path: str | None = None
    if config.private_key_path:
        # Read the key through a context manager so the host-side file handle
        # is closed promptly instead of being left to the garbage collector.
        with open(config.private_key_path) as key_file:
            pem_content = normalize_pem(key_file.read())
        deployed_key_path = remote_key_path(connection_name, pem_content)
        payload = pem_to_base64_payload(config.private_key_path)

        await exec_fn(
            cmd=[
                "bash",
                "-c",
                f"echo '{payload}' | base64 -d > {deployed_key_path} && chmod 0600 {deployed_key_path}",
            ]
        )

    # 3. Generate connections.toml (newer format)
    connections_toml = _generate_connections_toml(config, connection_name, deployed_key_path)
    await exec_fn(
        cmd=[
            "bash",
            "-c",
            # Quoted heredoc delimiter: the shell does not expand the content.
            f"cat > /root/.snowflake/connections.toml << 'TOMLEOF'\n{connections_toml}\nTOMLEOF",
        ]
    )
    # Set required 0600 permissions (Snowflake CLI requirement)
    await exec_fn(cmd=["chmod", "0600", "/root/.snowflake/connections.toml"])

    # 4. Generate cortex settings.json
    settings = json.dumps(
        {"cortexAgentConnectionName": connection_name, "autoUpdate": False},
        indent=2,
    )
    await exec_fn(
        cmd=[
            "bash",
            "-c",
            f"cat > /root/.snowflake/cortex/settings.json << 'JSONEOF'\n{settings}\nJSONEOF",
        ]
    )

    # 5. Return env vars for exec calls
    return _build_env_vars(config, connection_name, deployed_key_path)

Token Proxy#

inspect_coco.proxy.server #

OAuth token proxy server (host-process mode).

A lightweight HTTP server that runs as a thread in the inspect-coco process and serves short-lived access tokens to Docker sandbox containers via extra_hosts (host-gateway). It reads refresh tokens from the OS keyring, handles automatic refresh, and triggers browser re-auth if needed.

The proxy binds to 127.0.0.1 on a random available port. The assigned port is passed to Docker compose via TOKEN_PROXY_PORT env var.

TokenState(account, role=None) #

Thread-safe token state backed by keyring.

Source code in src/inspect_coco/proxy/server.py
def __init__(self, account: str, role: str | None = None):
    self._account = account
    self._role = role
    self._access_token: str = ""
    self._expires_at: float = 0.0
    self._lock = threading.Lock()
    self._load_from_keyring()

get_access_token() #

Return a valid access token, refreshing if needed.

Source code in src/inspect_coco/proxy/server.py
def get_access_token(self) -> dict:
    """Return the current access token, refreshing it first if it has expired.

    Returns:
        A dict with ``access_token``, ``token_type`` (always "Bearer") and
        ``expires_in``, the whole seconds left until expiry, never negative.
    """
    with self._lock:
        if self.is_expired:
            self._refresh()

        remaining = int(self._expires_at - time.time())
        payload = {
            "access_token": self._access_token,
            "token_type": "Bearer",
            # Clamp so an already-expired token reports 0, not a negative.
            "expires_in": remaining if remaining > 0 else 0,
        }
    return payload

TokenProxyError #

Bases: Exception

Raised when the proxy cannot serve a token.

ProxyHandler #

Bases: BaseHTTPRequestHandler

HTTP handler serving token and health endpoints.

ProxyServer(token_state) #

Bases: HTTPServer

HTTP server with token state attached.

Source code in src/inspect_coco/proxy/server.py
def __init__(self, token_state: TokenState):
    """Bind the server to loopback and attach the shared token state.

    Args:
        token_state: Token state made available as ``self.token_state``.
    """
    # Bind to 127.0.0.1; port 0 requests a random available port.
    loopback_any_port = ("127.0.0.1", 0)
    super().__init__(loopback_any_port, ProxyHandler)
    self.token_state = token_state

TokenProxy(account, role=None) #

Manages the proxy server lifecycle as a background thread.

Source code in src/inspect_coco/proxy/server.py
def __init__(self, account: str, role: str | None = None):
    """Create the token state and proxy server; the thread is not started.

    Args:
        account: Snowflake account passed to the token state.
        role: Optional Snowflake role passed to the token state.
    """
    state = TokenState(account=account, role=role)
    self._token_state = state
    self._server = ProxyServer(token_state=state)
    # Remains None until start() creates the serving thread.
    self._thread: threading.Thread | None = None

start() #

Start the proxy server in a background daemon thread.

Source code in src/inspect_coco/proxy/server.py
def start(self) -> None:
    """Run the proxy server's serve loop in a background daemon thread."""
    worker = threading.Thread(
        name="token-proxy",
        target=self._server.serve_forever,
        daemon=True,
    )
    # Keep a handle on the thread so it can be joined later.
    self._thread = worker
    worker.start()
    logger.info("Token proxy started on port %d", self.port)

stop() #

Gracefully shut down the proxy server.

Source code in src/inspect_coco/proxy/server.py
def stop(self) -> None:
    """Gracefully shut down the proxy server and release its socket.

    Safe to call when ``start()`` was never called: the serve loop is only
    asked to stop if a serving thread exists.
    """
    # shutdown() waits for serve_forever() to exit, so calling it when the
    # loop was never started would block forever.
    if self._thread is not None:
        self._server.shutdown()
        self._thread.join(timeout=5)
    # shutdown() only stops the loop; server_close() frees the listening socket.
    self._server.server_close()
    logger.info("Token proxy stopped")