from __future__ import annotations
from typing import Any
import httpx
# Default API origin; used as the ``base_url`` default by the client constructors in this module.
DEFAULT_BASE_URL = "https://api.cursor.com"
class CursorAPIError(Exception):
    """Raised when the API returns an error response.

    The human-readable message is passed to :class:`Exception` unchanged, so
    ``str(err)`` returns it.

    Attributes:
        status_code: HTTP status code supplied by the raiser, or ``None``.
        response: The ``httpx.Response`` supplied by the raiser, or ``None``.
    """

    def __init__(
        self,
        message: str,
        *,
        status_code: int | None = None,
        response: httpx.Response | None = None,
    ) -> None:
        super().__init__(message)
        # Kept as plain attributes so callers can branch on the status or
        # inspect the raw response without parsing the message.
        self.status_code = status_code
        self.response = response
def _raise_for_status(response: httpx.Response) -> None:
try:
response.raise_for_status()
except httpx.HTTPStatusError as e:
detail = response.text
try:
body = response.json()
if isinstance(body, dict) and "message" in body:
detail = str(body["message"])
elif isinstance(body, dict) and "error" in body:
detail = str(body["error"])
except ValueError:
pass
raise CursorAPIError(
f"{response.status_code} {response.reason_phrase}: {detail}",
status_code=response.status_code,
response=response,
) from e
def _normalize_agent_source(
repo: str | None,
ref: str | None,
pr_url: str | None,
) -> tuple[str | None, str | None, str | None]:
if repo is not None:
r = repo.strip()
if not r:
raise ValueError("repo must be a non-empty string when provided")
out_repo: str | None = r
else:
out_repo = None
if pr_url is not None:
p = pr_url.strip()
if not p:
raise ValueError("pr_url must be a non-empty string when provided")
out_pr: str | None = p
else:
out_pr = None
if ref is not None:
rf = ref.strip()
out_ref: str | None = rf if rf else None
else:
out_ref = None
return out_repo, out_ref, out_pr
def _launch_agent_json_body(
*,
prompt: str,
repository: str | None = None,
ref: str | None = None,
pr_url: str | None = None,
model: str | None = None,
target: dict[str, Any] | None = None,
webhook: dict[str, Any] | None = None,
images: list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
if pr_url is None and repository is None:
raise ValueError("launch_agent() requires either repository or pr_url")
prompt_body: dict[str, Any] = {"text": prompt}
if images is not None:
prompt_body["images"] = images
source: dict[str, Any] = {}
if pr_url is not None:
source["prUrl"] = pr_url
else:
source["repository"] = repository
if ref is not None:
source["ref"] = ref
body: dict[str, Any] = {"prompt": prompt_body, "source": source}
if model is not None:
body["model"] = model
if target is not None:
body["target"] = target
if webhook is not None:
body["webhook"] = webhook
return body
class SyncClient:
    """
    Synchronous client for the Cursor Cloud Agents API.

    See https://cursor.com/docs/cloud-agent/api/endpoints

    Authentication uses HTTP Basic auth: API key as username, empty password
    (equivalent to ``curl -u YOUR_API_KEY: ...``).

    Use as a context manager or call :meth:`close` when done. Endpoint methods
    return the decoded JSON body (``{}`` when the response body is empty) and
    raise :class:`CursorAPIError` on an HTTP error status.
    """

    def __init__(
        self,
        api_key: str,
        *,
        base_url: str = DEFAULT_BASE_URL,
        timeout: float = 120.0,
    ) -> None:
        """Create a client that owns its own :class:`httpx.Client`.

        Args:
            api_key: Sent as the Basic-auth username with an empty password.
            base_url: API origin; trailing slashes are removed.
            timeout: Passed to :class:`httpx.Client` as its ``timeout``.
        """
        # This instance created the httpx client, so close() must release it.
        self._owns_client = True
        self._client = httpx.Client(
            base_url=base_url.rstrip("/"),
            auth=(api_key, ""),
            timeout=timeout,
            headers={"Accept": "application/json"},
        )

    @classmethod
    def from_httpx_client(cls, client: httpx.Client) -> SyncClient:
        """Wrap an existing :class:`httpx.Client` (you manage base URL and auth)."""
        # Bypass __init__ so no second httpx client is constructed.
        obj = cls.__new__(cls)
        # The caller keeps ownership: close() will not close this client.
        obj._owns_client = False
        obj._client = client
        return obj

    def close(self) -> None:
        """Close the underlying HTTP client if this instance created it."""
        if self._owns_client:
            self._client.close()

    def __enter__(self) -> SyncClient:
        return self

    def __exit__(self, *args: object) -> None:
        self.close()

    def _json(
        self,
        method: str,
        path: str,
        *,
        json: Any | None = None,
        params: dict[str, Any] | None = None,
    ) -> Any:
        """Send one request and return its decoded JSON body.

        Returns ``{}`` when the response has no content.

        Raises:
            CursorAPIError: on an HTTP error status (via ``_raise_for_status``).
        """
        r = self._client.request(method, path, json=json, params=params)
        _raise_for_status(r)
        if not r.content:
            # Empty body: give callers a dict instead of failing to decode.
            return {}
        return r.json()

    def new_agent(
        self,
        repo: str | None = None,
        *,
        ref: str | None = None,
        pr_url: str | None = None,
    ) -> Agent:
        """
        Create an :class:`Agent` bound to a GitHub repo or PR.

        Pass ``repo`` (repository URL) or ``pr_url`` — same rules as ``POST /v0/agents`` ``source``.
        Use :meth:`Agent.create` for the first prompt, then :meth:`Agent.follow_up` for more.

        Raises:
            ValueError: if both ``repo`` and ``pr_url`` are ``None``.
        """
        if repo is None and pr_url is None:
            raise ValueError("new_agent() requires either repo or pr_url")
        return Agent(self, repo=repo, ref=ref, pr_url=pr_url)

    def me(self) -> dict[str, Any]:
        """``GET /v0/me`` — API key metadata."""
        return self._json("GET", "/v0/me")

    def list_models(self) -> dict[str, Any]:
        """``GET /v0/models`` — models available for agents."""
        return self._json("GET", "/v0/models")

    def list_repositories(self) -> dict[str, Any]:
        """``GET /v0/repositories`` — GitHub repositories accessible to the key."""
        return self._json("GET", "/v0/repositories")

    def list_agents(self) -> dict[str, Any]:
        """``GET /v0/agents`` — list cloud agents."""
        return self._json("GET", "/v0/agents")

    def get_agent(self, agent_id: str) -> dict[str, Any]:
        """``GET /v0/agents/{id}`` — agent status and details."""
        return self._json("GET", f"/v0/agents/{agent_id}")

    def get_conversation(self, agent_id: str) -> dict[str, Any]:
        """``GET /v0/agents/{id}/conversation``."""
        return self._json("GET", f"/v0/agents/{agent_id}/conversation")

    def launch_agent(
        self,
        *,
        prompt: str,
        repository: str | None = None,
        ref: str | None = None,
        pr_url: str | None = None,
        model: str | None = None,
        target: dict[str, Any] | None = None,
        webhook: dict[str, Any] | None = None,
        images: list[dict[str, Any]] | None = None,
    ) -> dict[str, Any]:
        """
        ``POST /v0/agents`` — start a new agent.

        Provide either ``repository`` (and optionally ``ref``) or ``pr_url`` for the source.

        Raises:
            ValueError: if both ``repository`` and ``pr_url`` are ``None``.
        """
        body = _launch_agent_json_body(
            prompt=prompt,
            repository=repository,
            ref=ref,
            pr_url=pr_url,
            model=model,
            target=target,
            webhook=webhook,
            images=images,
        )
        return self._json("POST", "/v0/agents", json=body)

    def followup(
        self,
        agent_id: str,
        *,
        prompt: str,
        images: list[dict[str, Any]] | None = None,
    ) -> dict[str, Any]:
        """``POST /v0/agents/{id}/followup``."""
        p: dict[str, Any] = {"text": prompt}
        # "images" is omitted entirely when not supplied.
        if images is not None:
            p["images"] = images
        return self._json("POST", f"/v0/agents/{agent_id}/followup", json={"prompt": p})

    def stop_agent(self, agent_id: str) -> dict[str, Any]:
        """``POST /v0/agents/{id}/stop``."""
        return self._json("POST", f"/v0/agents/{agent_id}/stop")

    def delete_agent(self, agent_id: str) -> dict[str, Any]:
        """``DELETE /v0/agents/{id}``."""
        return self._json("DELETE", f"/v0/agents/{agent_id}")
class AsyncClient:
    """
    Async client for the Cursor Cloud Agents API (:class:`httpx.AsyncClient`).

    Use ``async with AsyncClient(...)`` or call :meth:`aclose` when done.

    Authentication uses HTTP Basic auth: API key as username, empty password.
    Endpoint methods return the decoded JSON body (``{}`` when the response
    body is empty) and raise :class:`CursorAPIError` on an HTTP error status.
    """

    def __init__(
        self,
        api_key: str,
        *,
        base_url: str = DEFAULT_BASE_URL,
        timeout: float = 120.0,
    ) -> None:
        """Create a client that owns its own :class:`httpx.AsyncClient`.

        Args:
            api_key: Sent as the Basic-auth username with an empty password.
            base_url: API origin; trailing slashes are removed.
            timeout: Passed to :class:`httpx.AsyncClient` as its ``timeout``.
        """
        # This instance created the httpx client, so aclose() must release it.
        self._owns_client = True
        self._client = httpx.AsyncClient(
            base_url=base_url.rstrip("/"),
            auth=(api_key, ""),
            timeout=timeout,
            headers={"Accept": "application/json"},
        )

    @classmethod
    def from_httpx_client(cls, client: httpx.AsyncClient) -> AsyncClient:
        """Wrap an existing :class:`httpx.AsyncClient` (you manage base URL and auth)."""
        # Bypass __init__ so no second httpx client is constructed.
        obj = cls.__new__(cls)
        # The caller keeps ownership: aclose() will not close this client.
        obj._owns_client = False
        obj._client = client
        return obj

    async def aclose(self) -> None:
        """Close the underlying HTTP client if this instance created it."""
        if self._owns_client:
            await self._client.aclose()

    async def __aenter__(self) -> AsyncClient:
        return self

    async def __aexit__(self, *args: object) -> None:
        await self.aclose()

    async def _json(
        self,
        method: str,
        path: str,
        *,
        json: Any | None = None,
        params: dict[str, Any] | None = None,
    ) -> Any:
        """Send one request and return its decoded JSON body.

        Returns ``{}`` when the response has no content.

        Raises:
            CursorAPIError: on an HTTP error status (via ``_raise_for_status``).
        """
        r = await self._client.request(method, path, json=json, params=params)
        _raise_for_status(r)
        if not r.content:
            # Empty body: give callers a dict instead of failing to decode.
            return {}
        return r.json()

    def new_agent(
        self,
        repo: str | None = None,
        *,
        ref: str | None = None,
        pr_url: str | None = None,
    ) -> AsyncAgent:
        """
        Create an :class:`AsyncAgent` bound to a GitHub repo or PR.

        This method is synchronous: it only builds the local object and sends no request.

        Raises:
            ValueError: if both ``repo`` and ``pr_url`` are ``None``.
        """
        if repo is None and pr_url is None:
            raise ValueError("new_agent() requires either repo or pr_url")
        return AsyncAgent(self, repo=repo, ref=ref, pr_url=pr_url)

    async def me(self) -> dict[str, Any]:
        """``GET /v0/me``."""
        return await self._json("GET", "/v0/me")

    async def list_models(self) -> dict[str, Any]:
        """``GET /v0/models``."""
        return await self._json("GET", "/v0/models")

    async def list_repositories(self) -> dict[str, Any]:
        """``GET /v0/repositories``."""
        return await self._json("GET", "/v0/repositories")

    async def list_agents(self) -> dict[str, Any]:
        """``GET /v0/agents``."""
        return await self._json("GET", "/v0/agents")

    async def get_agent(self, agent_id: str) -> dict[str, Any]:
        """``GET /v0/agents/{id}``."""
        return await self._json("GET", f"/v0/agents/{agent_id}")

    async def get_conversation(self, agent_id: str) -> dict[str, Any]:
        """``GET /v0/agents/{id}/conversation``."""
        return await self._json("GET", f"/v0/agents/{agent_id}/conversation")

    async def launch_agent(
        self,
        *,
        prompt: str,
        repository: str | None = None,
        ref: str | None = None,
        pr_url: str | None = None,
        model: str | None = None,
        target: dict[str, Any] | None = None,
        webhook: dict[str, Any] | None = None,
        images: list[dict[str, Any]] | None = None,
    ) -> dict[str, Any]:
        """
        ``POST /v0/agents`` — start a new agent.

        Raises:
            ValueError: if both ``repository`` and ``pr_url`` are ``None``.
        """
        body = _launch_agent_json_body(
            prompt=prompt,
            repository=repository,
            ref=ref,
            pr_url=pr_url,
            model=model,
            target=target,
            webhook=webhook,
            images=images,
        )
        return await self._json("POST", "/v0/agents", json=body)

    async def followup(
        self,
        agent_id: str,
        *,
        prompt: str,
        images: list[dict[str, Any]] | None = None,
    ) -> dict[str, Any]:
        """``POST /v0/agents/{id}/followup``."""
        p: dict[str, Any] = {"text": prompt}
        # "images" is omitted entirely when not supplied.
        if images is not None:
            p["images"] = images
        return await self._json(
            "POST", f"/v0/agents/{agent_id}/followup", json={"prompt": p}
        )

    async def stop_agent(self, agent_id: str) -> dict[str, Any]:
        """``POST /v0/agents/{id}/stop``."""
        return await self._json("POST", f"/v0/agents/{agent_id}/stop")

    async def delete_agent(self, agent_id: str) -> dict[str, Any]:
        """``DELETE /v0/agents/{id}``."""
        return await self._json("DELETE", f"/v0/agents/{agent_id}")
class Agent:
    """
    One :class:`Agent` instance maps to one Cursor cloud agent (one id from the API).

    Use with :class:`SyncClient`. Call :meth:`create` once (``POST /v0/agents``), then
    :meth:`follow_up` for further prompts. Or :meth:`attach` and :meth:`follow_up` only.
    """

    def __init__(
        self,
        client: SyncClient,
        *,
        repo: str | None = None,
        ref: str | None = None,
        pr_url: str | None = None,
    ) -> None:
        """Bind the agent to ``client`` and a normalized repo / ref / PR source.

        Raises:
            ValueError: if ``repo`` or ``pr_url`` is provided but blank.
        """
        self._client = client
        # No cloud id until create() succeeds or attach() is called.
        self._id: str | None = None
        self._repo, self._ref, self._pr_url = _normalize_agent_source(repo, ref, pr_url)

    @property
    def id(self) -> str | None:
        """The cloud agent id, or ``None`` before :meth:`create` / :meth:`attach`."""
        return self._id

    def _require_id(self) -> str:
        """Return the bound agent id, raising ``RuntimeError`` when there is none."""
        if not self._id:
            raise RuntimeError("No agent id: call create() or attach().")
        return self._id

    def create(
        self,
        prompt: str,
        *,
        model: str | None = None,
        target: dict[str, Any] | None = None,
        webhook: dict[str, Any] | None = None,
        images: list[dict[str, Any]] | None = None,
    ) -> dict[str, Any]:
        """
        Create the cloud agent (``POST /v0/agents``).

        Requires ``repo`` / ``pr_url`` from :meth:`SyncClient.new_agent`. Call at most once per
        :class:`Agent` instance; for more prompts use :meth:`follow_up`.

        Returns:
            The response from ``launch_agent``. Its ``"id"`` is stored on this
            instance only when it is a string.

        Raises:
            RuntimeError: if this instance already has an id, or has neither a
                repo nor a PR URL.
        """
        if self._id is not None:
            raise RuntimeError(
                "This agent already has a cloud id: use follow_up() for more prompts, "
                "or use client.new_agent(...) for a separate agent."
            )
        if not self._repo and not self._pr_url:
            raise RuntimeError(
                "No repository context: create the agent with client.new_agent(repo=...) or "
                "new_agent(pr_url=...). If you only have an existing cloud id, use attach() and "
                "follow_up()."
            )
        data = self._client.launch_agent(
            prompt=prompt,
            repository=self._repo,
            ref=self._ref,
            pr_url=self._pr_url,
            model=model,
            target=target,
            webhook=webhook,
            images=images,
        )
        aid = data.get("id")
        # Only a string id is adopted; any other value leaves the agent unbound.
        if isinstance(aid, str):
            self._id = aid
        return data

    def follow_up(
        self,
        prompt: str,
        *,
        images: list[dict[str, Any]] | None = None,
    ) -> dict[str, Any]:
        """``POST /v0/agents/{id}/followup`` — send another prompt to the same cloud agent."""
        if not self._id:
            raise RuntimeError(
                "No cloud agent yet: call create() first (with repo from new_agent), "
                "or attach(agent_id)."
            )
        return self._client.followup(self._id, prompt=prompt, images=images)

    def refresh(self) -> dict[str, Any]:
        """``GET /v0/agents/{id}`` — latest status (requires :meth:`create` or :meth:`attach`)."""
        return self._client.get_agent(self._require_id())

    def conversation(self) -> dict[str, Any]:
        """``GET /v0/agents/{id}/conversation``."""
        return self._client.get_conversation(self._require_id())

    def stop(self) -> dict[str, Any]:
        """``POST /v0/agents/{id}/stop``."""
        return self._client.stop_agent(self._require_id())

    def delete(self) -> dict[str, Any]:
        """``DELETE /v0/agents/{id}``."""
        return self._client.delete_agent(self._require_id())

    def attach(self, agent_id: str) -> None:
        """Use an existing agent id (e.g. from a previous session) for follow-ups and status."""
        self._id = agent_id
        # The source fields are cleared once an existing id is adopted.
        self._repo = None
        self._ref = None
        self._pr_url = None
class AsyncAgent:
    """
    Async counterpart to :class:`Agent` for use with :class:`AsyncClient`.

    Methods mirror :class:`Agent` but are async (:meth:`attach` and the ``id``
    property are synchronous because they send no request).
    """

    def __init__(
        self,
        client: AsyncClient,
        *,
        repo: str | None = None,
        ref: str | None = None,
        pr_url: str | None = None,
    ) -> None:
        """Bind the agent to ``client`` and a normalized repo / ref / PR source.

        Raises:
            ValueError: if ``repo`` or ``pr_url`` is provided but blank.
        """
        self._client = client
        # No cloud id until create() succeeds or attach() is called.
        self._id: str | None = None
        self._repo, self._ref, self._pr_url = _normalize_agent_source(repo, ref, pr_url)

    @property
    def id(self) -> str | None:
        """The cloud agent id, or ``None`` before :meth:`create` / :meth:`attach`."""
        return self._id

    def _require_id(self) -> str:
        """Return the bound agent id, raising ``RuntimeError`` when there is none."""
        if not self._id:
            raise RuntimeError("No agent id: call create() or attach().")
        return self._id

    async def create(
        self,
        prompt: str,
        *,
        model: str | None = None,
        target: dict[str, Any] | None = None,
        webhook: dict[str, Any] | None = None,
        images: list[dict[str, Any]] | None = None,
    ) -> dict[str, Any]:
        """
        Create the cloud agent (``POST /v0/agents``).

        Call at most once per instance; for more prompts use :meth:`follow_up`.

        Returns:
            The response from ``launch_agent``. Its ``"id"`` is stored on this
            instance only when it is a string.

        Raises:
            RuntimeError: if this instance already has an id, or has neither a
                repo nor a PR URL.
        """
        if self._id is not None:
            raise RuntimeError(
                "This agent already has a cloud id: use follow_up() for more prompts, "
                "or use client.new_agent(...) for a separate agent."
            )
        if not self._repo and not self._pr_url:
            raise RuntimeError(
                "No repository context: create the agent with client.new_agent(repo=...) or "
                "new_agent(pr_url=...). If you only have an existing cloud id, use attach() and "
                "follow_up()."
            )
        data = await self._client.launch_agent(
            prompt=prompt,
            repository=self._repo,
            ref=self._ref,
            pr_url=self._pr_url,
            model=model,
            target=target,
            webhook=webhook,
            images=images,
        )
        aid = data.get("id")
        # Only a string id is adopted; any other value leaves the agent unbound.
        if isinstance(aid, str):
            self._id = aid
        return data

    async def follow_up(
        self,
        prompt: str,
        *,
        images: list[dict[str, Any]] | None = None,
    ) -> dict[str, Any]:
        """``POST /v0/agents/{id}/followup`` — send another prompt to the same cloud agent."""
        if not self._id:
            raise RuntimeError(
                "No cloud agent yet: call create() first (with repo from new_agent), "
                "or attach(agent_id)."
            )
        return await self._client.followup(self._id, prompt=prompt, images=images)

    async def refresh(self) -> dict[str, Any]:
        """``GET /v0/agents/{id}`` — latest status (requires :meth:`create` or :meth:`attach`)."""
        return await self._client.get_agent(self._require_id())

    async def conversation(self) -> dict[str, Any]:
        """``GET /v0/agents/{id}/conversation``."""
        return await self._client.get_conversation(self._require_id())

    async def stop(self) -> dict[str, Any]:
        """``POST /v0/agents/{id}/stop``."""
        return await self._client.stop_agent(self._require_id())

    async def delete(self) -> dict[str, Any]:
        """``DELETE /v0/agents/{id}``."""
        return await self._client.delete_agent(self._require_id())

    def attach(self, agent_id: str) -> None:
        """Use an existing agent id (e.g. from a previous session) for follow-ups and status."""
        self._id = agent_id
        # The source fields are cleared once an existing id is adopted.
        self._repo = None
        self._ref = None
        self._pr_url = None
# Backward compatibility: ``CursorClient`` was the original name for the sync client.
# It is an alias for the same class object, not a subclass.
CursorClient = SyncClient