#!/usr/bin/env python3
"""
Increment 1: fetch ALL logs tagged "My dummy tag" via the BTQL
endpoint, paginating with the cursor until there are no more rows.
Mirrors this curl (the page size and the look-back window are driven by the
LIMIT and DAYS variables below):
POST https://api.braintrust.dev/btql
{
"query": "from: project_logs('<id>')
| filter: tags includes 'My dummy tag'
and created >= now() - interval <DAYS> day
| select: id, tags, metadata.\"~__bt_assignments\"
| limit: <LIMIT>
| cursor: '<cursor>'" # only on pages 2+
}
Pagination (verified):
- The cursor is a BTQL clause INSIDE the query (`| cursor: '...'`), NOT an HTTP
header and NOT a top-level body field.
- The response (fmt defaults to json) returns the next cursor in the body
`cursor` field (and the `x-bt-cursor` header). Stop when no cursor comes back.
Requirements:
pip install requests
export BRAINTRUST_API_KEY=sk-...
# self-hosted only: export BRAINTRUST_API_URL=https://your-data-plane
Usage:
python fetch_discontent_logs.py
"""
import argparse
import json
import os
import requests
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Project whose logs are queried (BTQL `project_logs(...)`) and updated
# (insert endpoint path).
PROJECT_ID = "your-project-id"
# Only logs whose tags include this value are fetched.
TAG = "My dummy tag"
# Look-back window in days, compared against each log's `created` field.
DAYS = 7
# Rows requested per BTQL page; interpolated into the query's `limit` clause.
LIMIT = 1000
# Reviewer user ids. Unassigned rows are split as evenly as possible across
# every id listed here, so replace the placeholders with real user ids (or
# fetch them from the API) before running without --dry-run.
USER_IDS: list[str] = [
    "reviewer-1-uuid",
    "reviewer-2-uuid",
    "reviewer-3-uuid",
    "reviewer-4-uuid",
    "reviewer-5-uuid",
    # "...",
]
# Number of assignment events sent in a single insert request.
BATCH_SIZE = 1000
# Base URL of the API, without a trailing slash; override for self-hosted.
API_URL = os.getenv("BRAINTRUST_API_URL", "https://api.braintrust.dev").rstrip("/")
# Bearer token; None when the environment variable is not set.
API_KEY = os.getenv("BRAINTRUST_API_KEY")
# Metadata field names used for reviewer assignment and review-list status.
ASSIGNMENTS_META_FIELD = "~__bt_assignments"
REVIEW_LISTS_META_FIELD = "~__bt_review_lists"
DEFAULT_REVIEW_LIST = "__bt_default_review_list"
# Key under which the selected `metadata."~__bt_assignments"` expression is
# looked up in each result row (the select expression text itself).
ASSIGNMENTS_COLUMN_KEY = f'metadata."{ASSIGNMENTS_META_FIELD}"'
# Upper bound on pages fetched, so a misbehaving cursor cannot loop forever
# (at most LIMIT * MAX_PAGES rows).
MAX_PAGES = 10_000
def _headers() -> dict[str, str]:
    """Return the HTTP headers sent with every API request.

    The Authorization header carries API_KEY as a bearer token, and the
    Content-Type header declares a JSON request body.
    """
    bearer = f"Bearer {API_KEY}"
    return {"Authorization": bearer, "Content-Type": "application/json"}
def build_query(cursor: str | None) -> str:
    """Assemble the BTQL query text for one page.

    Every page uses the same from/filter/select/limit clauses, built from
    PROJECT_ID, TAG, DAYS and LIMIT. A `cursor` clause is appended only when
    `cursor` is truthy, i.e. on pages after the first.
    """
    clauses = [
        f"from: project_logs('{PROJECT_ID}') ",
        f"| filter: tags includes '{TAG}' and created >= now() - interval {DAYS} day ",
        f'| select: id, tags, metadata."{ASSIGNMENTS_META_FIELD}" ',
        f"| limit: {LIMIT}",
    ]
    if cursor:
        clauses.append(f" | cursor: '{cursor}'")
    return "".join(clauses)
def fetch_all_logs() -> list[dict]:
    """Fetch every matching log row, following the cursor across pages.

    Posts the query from build_query() to the BTQL endpoint once per page and
    accumulates the rows of each response's `data` field.

    Returns:
        All rows from every page, in the order they were received.

    Raises:
        SystemExit: if a request comes back with an error status, or if
            MAX_PAGES pages were read and the server was still returning a
            cursor and full pages (the result would otherwise be silently
            truncated).
    """
    rows: list[dict] = []
    cursor: str | None = None
    for page in range(1, MAX_PAGES + 1):
        resp = requests.post(
            f"{API_URL}/btql",
            headers=_headers(),
            json={"query": build_query(cursor)},
            timeout=120,
        )
        if not resp.ok:
            raise SystemExit(f"BTQL query failed ({resp.status_code}): {resp.text}")
        body = resp.json()
        # Treat a missing or null `data` field as an empty page instead of
        # crashing in rows.extend(None).
        page_rows = body.get("data") or []
        rows.extend(page_rows)
        # Next cursor: prefer the body field, fall back to the header.
        cursor = body.get("cursor") or resp.headers.get("x-bt-cursor")
        print(f" page {page}: {len(page_rows)} rows (total {len(rows)})")
        # Done when the server stops handing back a cursor, or a short page.
        if not cursor or len(page_rows) < LIMIT:
            break
    else:
        # The loop ran out of pages without reaching a stop condition, so the
        # collected rows are incomplete. Abort rather than let the caller act
        # on a partial result as if it were the full set.
        raise SystemExit(
            f"Stopped after {MAX_PAGES} pages with a cursor still pending; "
            "refusing to continue with a truncated result set."
        )
    return rows
def get_assignments(row: dict) -> list:
    """Return the row's assignments as a list.

    Reads the ASSIGNMENTS_COLUMN_KEY entry of `row`. A missing or falsy value
    yields an empty list, a list is returned as-is, and any other truthy
    value is wrapped in a one-element list.
    """
    raw = row.get(ASSIGNMENTS_COLUMN_KEY)
    if raw and isinstance(raw, list):
        return raw
    return [raw] if raw else []
def is_assigned(row: dict) -> bool:
    """Report whether the row already has at least one assignment entry."""
    return bool(get_assignments(row))
def distribute(ids: list[str], users: list[str]) -> list[tuple[str, str]]:
    """Divide ids as evenly as possible across users.

    With N users and M ids, each user gets either floor(M/N) or floor(M/N)+1
    ids; the first (M % N) users get the extra one. Ids are handed out in
    contiguous runs, in input order: the first user takes the first run, the
    second user the next run, and so on.

    Args:
        ids: The ids to hand out.
        users: The users to hand them to.

    Returns:
        A list of (id, user) pairs, one per id, in the order of `ids`.

    Raises:
        ValueError: if there are ids to distribute but `users` is empty.
    """
    if not ids:
        # Nothing to hand out, so the user list does not matter.
        return []
    if not users:
        # Fail with a clear message instead of ZeroDivisionError from divmod.
        raise ValueError("cannot distribute ids: users is empty")

    base, remainder = divmod(len(ids), len(users))
    pairs: list[tuple[str, str]] = []
    start = 0
    for position, user in enumerate(users):
        share = base + (1 if position < remainder else 0)
        pairs.extend((log_id, user) for log_id in ids[start:start + share])
        start += share
    return pairs
def build_event(log_id: str, user_id: str) -> dict:
    """Build the merge-update payload that assigns one row to one reviewer.

    Args:
        log_id: Id of the log row to update.
        user_id: Id of the reviewer the row is assigned to.

    Returns:
        An insert event that sets the row's assignment list to `[user_id]`
        and marks it as PENDING in the default review list.
    """
    return {
        "id": log_id,
        "metadata": {
            ASSIGNMENTS_META_FIELD: [user_id],
            REVIEW_LISTS_META_FIELD: {DEFAULT_REVIEW_LIST: {"status": "PENDING"}},
        },
        "_is_merge": True,
        # Each merge path is its own list of nested field names. The two
        # metadata fields are separate paths; chaining them into one path
        # would point at review lists nested under the assignments list,
        # which is not where either field lives in this payload.
        # NOTE(review): per the insert API, the merge does not descend below
        # a listed path. Confirm that replacing the review-lists field as a
        # whole is the intended behavior.
        "_merge_paths": [
            ["metadata", ASSIGNMENTS_META_FIELD],
            ["metadata", REVIEW_LISTS_META_FIELD],
        ],
    }
def assign_batch(events: list[dict]) -> None:
    """Send one batch of assignment events to the project_logs insert endpoint.

    Raises:
        SystemExit: if the response has an error status; the message includes
            the status code and response text.
    """
    endpoint = f"{API_URL}/v1/project_logs/{PROJECT_ID}/insert"
    payload = {"events": events}
    resp = requests.post(endpoint, headers=_headers(), json=payload, timeout=120)
    if resp.ok:
        return
    raise SystemExit(f"Assign failed ({resp.status_code}): {resp.text}")
def main() -> None:
    """Run the fetch-and-assign workflow from the command line.

    Steps, in order: parse the --dry-run flag, require API_KEY, fetch all
    matching logs, write the ids of unassigned rows to to_assign_ids.json
    next to this file, print the per-reviewer plan, and (unless --dry-run
    is set) send the assignment events in batches of BATCH_SIZE.

    Raises:
        SystemExit: if API_KEY is unset, or if there are rows to assign but
            USER_IDS is empty.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show the assignment plan without updating any rows.",
    )
    args = parser.parse_args()

    if not API_KEY:
        raise SystemExit("Set BRAINTRUST_API_KEY in your environment.")

    rows = fetch_all_logs()
    print(f"\nFetched {len(rows)} logs tagged '{TAG}' from the last {DAYS} days.")

    # Collect the ids of rows that have no assignment yet.
    to_assign_ids = []
    for row in rows:
        if is_assigned(row):
            continue
        to_assign_ids.append(row["id"])
    skipped = len(rows) - len(to_assign_ids)
    print(f" already assigned (skip): {skipped}")
    print(f" not assigned (to assign): {len(to_assign_ids)}")

    # Persist the pending ids beside this script (written on dry runs too).
    script_dir = os.path.dirname(os.path.abspath(__file__))
    out_path = os.path.join(script_dir, "to_assign_ids.json")
    with open(out_path, "w") as f:
        json.dump(to_assign_ids, f, indent=2)

    if not to_assign_ids:
        print("Nothing to assign.")
        return
    if not USER_IDS:
        raise SystemExit("Populate USER_IDS with at least one reviewer id first.")

    # Split the unassigned rows across reviewers and tally each one's share.
    pairs = distribute(to_assign_ids, USER_IDS)
    tally: dict[str, int] = dict.fromkeys(USER_IDS, 0)
    for _, reviewer in pairs:
        tally[reviewer] += 1
    print("\nAssignment plan:")
    for reviewer in USER_IDS:
        print(f" {reviewer}: {tally[reviewer]} rows")

    if args.dry_run:
        print("\n[dry-run] No rows updated.")
        return

    # Send the merge-updates one batch at a time.
    events = [build_event(*pair) for pair in pairs]
    total = len(events)
    for start in range(0, total, BATCH_SIZE):
        stop = start + BATCH_SIZE
        assign_batch(events[start:stop])
        print(f" assigned {min(stop, total)}/{total}")
    print(f"\nDone. Assigned {len(events)} rows across {len(USER_IDS)} reviewers.")
# Run the workflow only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()