Overview

This script fetches log records tagged as “My dummy tag” from Braintrust using the BTQL (Braintrust Query Language) endpoint, identifies which rows are not yet assigned to a reviewer, and distributes those rows evenly across a list of specified reviewers. The script performs three main operations:
  1. Fetch: Retrieves all matching logs from a Braintrust project
  2. Filter: Identifies which logs are unassigned
  3. Distribute & Assign: Evenly distributes unassigned rows to reviewers and commits the assignments

Requirements

Before running this script, ensure you have:
  • Python 3.10+ with the requests library installed (the script's str | None type annotations require 3.10 or later)
  • Environment variables:
    • BRAINTRUST_API_KEY - Your Braintrust API key (required)
    • BRAINTRUST_API_URL - Only needed for self-hosted deployments (optional)
Install dependencies and set your API key:
pip install requests
export BRAINTRUST_API_KEY=sk-...

Configuration

The script includes several configuration constants at the top that you can customize:
| Variable | Default | Purpose |
| --- | --- | --- |
| PROJECT_ID | "your-project-id" | The Braintrust project ID to query |
| TAG | "My dummy tag" | The tag filter for selecting logs |
| DAYS | 7 | How many days back to fetch logs |
| LIMIT | 1000 | Page size for each BTQL request |
| USER_IDS | List of 5 UUIDs | Reviewer user IDs to distribute rows to |
| BATCH_SIZE | 1000 | Number of assignments per API request |

How It Works

1. BTQL Query & Pagination

The script builds a BTQL query that:
  • Selects logs from the specified project
  • Filters by tag and creation date (within the last DAYS days)
  • Returns the log id, tags, and current assignments metadata
  • Applies pagination with a cursor-based strategy
Key detail: The cursor is embedded within the BTQL query itself (| cursor: '...'), not as an HTTP header. The API returns the next cursor in the response body’s cursor field or the x-bt-cursor header.
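
As a minimal sketch of how the cursor clause ends up inside the query text, the snippet below assembles the query for a first page and for a follow-up page. The function name sketch_query and its parameters are illustrative and not part of the script; the clause layout follows the query used in the full script, and the cursor value "abc123" is made up.

```python
from typing import Optional


def sketch_query(project_id: str, tag: str, days: int, limit: int,
                 cursor: Optional[str] = None) -> str:
    """Assemble a BTQL query; the cursor, if any, is a clause inside the query."""
    query = (
        f"from: project_logs('{project_id}') "
        f"| filter: tags includes '{tag}' and created >= now() - interval {days} day "
        f'| select: id, tags, metadata."~__bt_assignments" '
        f"| limit: {limit}"
    )
    if cursor:
        # Pages 2+ append the cursor taken from the previous response.
        query += f" | cursor: '{cursor}'"
    return query


first_page = sketch_query("your-project-id", "My dummy tag", 7, 1000)
next_page = sketch_query("your-project-id", "My dummy tag", 7, 1000, cursor="abc123")
print(next_page)
```

The two queries are identical except for the trailing cursor clause, which is why the request body only ever needs the single query field.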

2. Fetching All Logs

The fetch_all_logs() function:
  • Iterates through pages of results (capped at MAX_PAGES = 10,000)
  • Stops when the server returns no new cursor or a page with fewer rows than the LIMIT
  • Returns a complete list of all matching log records
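
The stopping rules above can be sketched without touching the network by passing in the page-fetching step as a function. Everything here is illustrative: fetch_pages, FAKE_PAGES, and the cursor values are hypothetical stand-ins, and the real script reads the cursor from the HTTP response instead.

```python
from typing import Callable, Optional


def fetch_pages(fetch_page: Callable[[Optional[str]], dict],
                limit: int, max_pages: int = 10_000) -> list:
    """Follow cursors until none comes back, a short page arrives, or the cap is hit."""
    rows: list = []
    cursor: Optional[str] = None
    for _ in range(max_pages):
        body = fetch_page(cursor)
        page_rows = body.get("data", [])
        rows.extend(page_rows)
        cursor = body.get("cursor")
        # A missing cursor or a page shorter than the limit means no more rows.
        if not cursor or len(page_rows) < limit:
            break
    return rows


# Hypothetical stand-in for the HTTP call: two full pages, then a short one.
FAKE_PAGES = {
    None: {"data": [{"id": "a"}, {"id": "b"}], "cursor": "c1"},
    "c1": {"data": [{"id": "c"}, {"id": "d"}], "cursor": "c2"},
    "c2": {"data": [{"id": "e"}], "cursor": "c3"},
}

all_rows = fetch_pages(lambda cursor: FAKE_PAGES[cursor], limit=2)
print([row["id"] for row in all_rows])
```

With a page size of 2, the loop stops on the third page because it holds only one row, even though that page still carries a cursor.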

3. Identifying Unassigned Rows

The script uses the ~__bt_assignments metadata field to check which rows already have a reviewer assigned:
  • Calls get_assignments(row) to extract the assignments list
  • Calls is_assigned(row) to test whether a row has at least one reviewer
  • Filters out already-assigned rows
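
A small sketch of the filtering step, run against made-up rows. The column key matches the one the full script reads; assignments_of and the sample rows are illustrative only.

```python
# Column key under which the selected metadata field comes back (per the full script).
ASSIGNMENTS_KEY = 'metadata."~__bt_assignments"'


def assignments_of(row: dict) -> list:
    """Return the row's assignment list, treating missing, None, or empty as []."""
    value = row.get(ASSIGNMENTS_KEY)
    if not value:
        return []
    return value if isinstance(value, list) else [value]


# Made-up rows covering the cases the filter has to handle.
sample_rows = [
    {"id": "row-1", ASSIGNMENTS_KEY: ["reviewer-1-uuid"]},  # already assigned
    {"id": "row-2", ASSIGNMENTS_KEY: None},                 # field present but null
    {"id": "row-3"},                                        # field missing
    {"id": "row-4", ASSIGNMENTS_KEY: []},                   # empty list
]

unassigned_ids = [row["id"] for row in sample_rows if not assignments_of(row)]
print(unassigned_ids)
```

Only row-1 is skipped; a null, missing, or empty assignments value all count as unassigned.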

4. Even Distribution

The distribute() function divides unassigned row IDs as evenly as possible across reviewers:
  • With N users and M rows: each user gets either ⌊M/N⌋ or ⌊M/N⌋ + 1 rows
  • The first (M % N) users receive one extra row to ensure all rows are assigned
  • Returns a list of (row_id, user_id) pairs
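
To make the arithmetic concrete, here is a sketch with M = 7 rows and N = 3 reviewers, so ⌊7/3⌋ = 2 and 7 % 3 = 1. The function split_evenly is an illustrative variant that uses slicing in place of the script's inner loop; the row and user names are placeholders.

```python
from collections import Counter


def split_evenly(ids: list, users: list) -> list:
    """Give each user floor(M/N) ids; the first M % N users get one extra."""
    base, remainder = divmod(len(ids), len(users))
    pairs = []
    idx = 0
    for i, user in enumerate(users):
        count = base + (1 if i < remainder else 0)
        # Each user receives a contiguous slice of the id list.
        pairs.extend((row_id, user) for row_id in ids[idx:idx + count])
        idx += count
    return pairs


pairs = split_evenly([f"row-{n}" for n in range(1, 8)], ["u1", "u2", "u3"])
counts = Counter(user for _, user in pairs)
print(dict(counts))
```

The first reviewer ends up with 3 rows and the other two with 2 each, so all 7 rows are covered.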

5. Assigning to Reviewers

For each unassigned row:
  • Creates a merge-update event using build_event() that:
    • Sets the ~__bt_assignments field to a list containing the reviewer’s user ID
    • Sets the ~__bt_review_lists field with a default review list in “PENDING” status
    • Sets _is_merge: true, along with _merge_paths, so the API merges the update into the existing row rather than replacing it
  • Batches assignments in groups of BATCH_SIZE
  • POSTs each batch to the /v1/project_logs/{PROJECT_ID}/insert endpoint
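
The batching step can be sketched as follows. The helper names make_event and chunked are illustrative, the _merge_paths entry from the full script is left out to keep the sketch short, and no request is sent; each batch would become the events array of one POST.

```python
def make_event(log_id: str, user_id: str) -> dict:
    """Merge-update payload for one row (simplified: no _merge_paths)."""
    return {
        "id": log_id,
        "metadata": {
            "~__bt_assignments": [user_id],
            "~__bt_review_lists": {"__bt_default_review_list": {"status": "PENDING"}},
        },
        "_is_merge": True,
    }


def chunked(items: list, size: int):
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


events = [make_event(f"row-{n}", "reviewer-1-uuid") for n in range(5)]
batches = list(chunked(events, 2))
print([len(batch) for batch in batches])
```

Five events with a batch size of 2 produce batches of 2, 2, and 1, so the last request carries the remainder.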

Usage

Basic Run

python assigning_rows_to_users.py
This fetches logs, shows the assignment plan, and commits assignments.

Dry Run

python assigning_rows_to_users.py --dry-run
Shows the assignment plan without updating any rows. Useful for validation before committing.

Output

The script prints progress to stdout:
  page 1: 1000 rows (total 1000)
  page 2: 500 rows (total 1500)

Fetched 1500 logs tagged 'My dummy tag' from the last 7 days.
  already assigned (skip):   500
  not assigned (to assign):  1000

Assignment plan:
  reviewer-1-uuid: 200 rows
  reviewer-2-uuid: 200 rows
  reviewer-3-uuid: 200 rows
  reviewer-4-uuid: 200 rows
  reviewer-5-uuid: 200 rows
  assigned 1000/1000

Done. Assigned 1000 rows across 5 reviewers.
Additionally, a file named to_assign_ids.json is created in the script’s directory containing the IDs of the unassigned rows selected for assignment (useful for reference or further automation). The file is written before the dry-run check, so it is also produced by a dry run.

Key Functions

build_query(cursor: str | None) -> str

Constructs the BTQL query string, optionally including a pagination cursor.

fetch_all_logs() -> list[dict]

Fetches all log records matching the query criteria, handling multi-page pagination.

get_assignments(row: dict) -> list

Extracts the assignment list from a log row’s metadata; returns an empty list if no assignments exist.

is_assigned(row: dict) -> bool

Returns True if a row has at least one reviewer assigned.

distribute(ids: list[str], users: list[str]) -> list[tuple[str, str]]

Divides row IDs as evenly as possible across reviewers and returns (id, user) pairs.

build_event(log_id: str, user_id: str) -> dict

Creates a merge-update event that assigns a row to a reviewer.

assign_batch(events: list[dict]) -> None

POSTs a batch of assignment events to the Braintrust API.

Error Handling

The script includes error handling for:
  • Missing API key: Exits with an error message if BRAINTRUST_API_KEY is not set
  • BTQL query failures: Exits if the API returns an error status (4xx or 5xx)
  • Empty reviewer list: Exits if no reviewers are configured in USER_IDS
  • Assignment failures: Exits if the insert request for a batch returns an error status

Safety Features

  • Pagination cap: MAX_PAGES = 10,000 prevents infinite loops due to cursor issues
  • Timeout: All HTTP requests have a 120-second timeout
  • Dry-run mode: Test assignments without committing changes
  • Existing assignments: Skips rows already assigned to prevent re-assignment
  • JSON export: Saves the IDs of the rows selected for assignment for audit purposes

Common Customizations

Change the tag or time window

Edit the TAG and DAYS variables:
TAG = "My Custom Tag"
DAYS = 30

Add or remove reviewers

Update the USER_IDS list with the UUIDs of your reviewers:
USER_IDS: list[str] = [
    "uuid-1",
    "uuid-2",
    "uuid-3",
]

Adjust batch sizes for performance

Increase LIMIT (page size) for faster fetching or BATCH_SIZE (assignment batch size) for faster assignment posting:
LIMIT = 5000       # Fetch 5,000 rows per request
BATCH_SIZE = 2000  # Assign 2,000 rows per request

Self-Hosted Deployments

If you’re using a self-hosted Braintrust instance, set the BRAINTRUST_API_URL environment variable:
export BRAINTRUST_API_URL=https://your-data-plane
python assigning_rows_to_users.py
The script will use this URL instead of the default https://api.braintrust.dev.
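
A minimal sketch of how the base URL is chosen. The function resolve_api_url is illustrative; it takes the environment as a mapping so the behavior can be checked without changing real environment variables.

```python
from typing import Mapping

DEFAULT_API_URL = "https://api.braintrust.dev"


def resolve_api_url(env: Mapping[str, str]) -> str:
    """Return the base URL from the environment, without a trailing slash."""
    return env.get("BRAINTRUST_API_URL", DEFAULT_API_URL).rstrip("/")


hosted = resolve_api_url({})
self_hosted = resolve_api_url({"BRAINTRUST_API_URL": "https://your-data-plane/"})
print(hosted)
print(self_hosted)
```

In real use you would pass os.environ. Stripping the trailing slash keeps paths such as /btql from being joined with a double slash.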

Troubleshooting

| Issue | Solution |
| --- | --- |
| BTQL query failed (401) | Check that BRAINTRUST_API_KEY is correct and has not expired |
| BTQL query failed (404) | Verify that PROJECT_ID is correct |
| No cursor returned, but more rows exist | Increase MAX_PAGES or check API logs |
| Nothing to assign | All rows matching the criteria are already assigned to reviewers |
| Populate USER_IDS first | Add at least one reviewer UUID to the USER_IDS list |

Manual Execution with curl

If you prefer to run the equivalent operations manually using curl commands, follow these steps:

Step 1: Fetch Unassigned Logs via BTQL

First, query all logs matching your criteria. Cursor-based pagination lets you retrieve all results across multiple requests.

Initial request (first page):
curl -X POST https://api.braintrust.dev/btql \
  -H "Authorization: Bearer $BRAINTRUST_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "query": "from: project_logs('\''your-project-id'\'') | filter: tags includes '\''My dummy tag'\'' and created >= now() - interval 30 day | select: id, tags, metadata.\"~__bt_assignments\" | limit: 1000 | sort: _pagination_key desc"
  }'
The response includes a cursor field. Use this cursor to fetch the next page.

Subsequent request (with cursor):
curl -X POST https://api.braintrust.dev/btql \
  -H "Authorization: Bearer $BRAINTRUST_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "query": "from: project_logs('\''your-project-id'\'') | filter: tags includes '\''My dummy tag'\'' and created >= now() - interval 30 day | select: id, tags, metadata.\"~__bt_assignments\" | limit: 1000 | sort: _pagination_key desc | cursor: '\''<CURSOR_VALUE>'\''"
  }'
Repeat the subsequent request, replacing <CURSOR_VALUE> with the cursor from the previous response, until the response no longer includes a cursor.

Save the response data: Extract and save all the log IDs from the responses. Filter out rows where metadata."~__bt_assignments" is already populated (these are already assigned).

Step 2: Distribute Row IDs Across Reviewers

Manually distribute the unassigned row IDs across your reviewers. For example, with 1000 unassigned rows and 5 reviewers:
  • Each reviewer gets 200 rows
  • Assign rows 1-200 to reviewer 1, 201-400 to reviewer 2, etc.

Step 3: Assign Rows via Merge-Update

For each row, create a merge-update event and POST it to the insert endpoint. You can batch multiple assignments in one request.

Single assignment:
curl -X POST https://api.braintrust.dev/v1/project_logs/your-project-id/insert \
  -H "Authorization: Bearer $BRAINTRUST_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "events": [
      {
        "id": "<LOG_ID>",
        "metadata": {
          "~__bt_assignments": ["<REVIEWER_USER_ID>"],
          "~__bt_review_lists": {
            "__bt_default_review_list": {
              "status": "PENDING"
            }
          }
        },
        "_is_merge": true,
        "_merge_paths": [["metadata", "~__bt_assignments", "~__bt_review_lists"]]
      }
    ]
  }'
Batch assignment (recommended for performance):
curl -X POST https://api.braintrust.dev/v1/project_logs/your-project-id/insert \
  -H "Authorization: Bearer $BRAINTRUST_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "events": [
      {
        "id": "<LOG_ID_1>",
        "metadata": {
          "~__bt_assignments": ["<REVIEWER_USER_ID_1>"],
          "~__bt_review_lists": {
            "__bt_default_review_list": {
              "status": "PENDING"
            }
          }
        },
        "_is_merge": true,
        "_merge_paths": [["metadata", "~__bt_assignments", "~__bt_review_lists"]]
      },
      {
        "id": "<LOG_ID_2>",
        "metadata": {
          "~__bt_assignments": ["<REVIEWER_USER_ID_2>"],
          "~__bt_review_lists": {
            "__bt_default_review_list": {
              "status": "PENDING"
            }
          }
        },
        "_is_merge": true,
        "_merge_paths": [["metadata", "~__bt_assignments", "~__bt_review_lists"]]
      }
    ]
  }'
Replace the placeholders:
  • <LOG_ID> - The ID from the BTQL response
  • <REVIEWER_USER_ID> - A user ID from your reviewer list
  • Repeat the event object for each row you’re assigning
Repeat this request for each batch of assignments until all rows are assigned.

Full script

#!/usr/bin/env python3
"""
Fetch ALL logs tagged "My dummy tag" via the BTQL endpoint, paginating with
the cursor until there are no more rows, then assign the unassigned rows
evenly across the reviewers in USER_IDS.

The fetch step mirrors this curl (LIMIT and DAYS are the variables below):

    POST https://api.braintrust.dev/btql
    {
      "query": "from: project_logs('<id>')
                | filter: tags includes 'My dummy tag'
                          and created >= now() - interval <DAYS> day
                | select: id, tags, metadata.\"~__bt_assignments\"
                | limit: <LIMIT>
                | cursor: '<cursor>'"   # only on pages 2+
    }

Pagination (verified):
- The cursor is a BTQL clause INSIDE the query (`| cursor: '...'`), NOT an HTTP
  header and NOT a top-level body field.
- The response (fmt defaults to json) returns the next cursor in the body
  `cursor` field (and the `x-bt-cursor` header). Stop when no cursor comes back.

Requirements:
    pip install requests
    export BRAINTRUST_API_KEY=sk-...
    # self-hosted only: export BRAINTRUST_API_URL=https://your-data-plane

Usage:
    python assigning_rows_to_users.py [--dry-run]
"""

import argparse
import json
import os

import requests

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

PROJECT_ID = "your-project-id"
TAG = "My dummy tag"
DAYS = 7

# Page size for each BTQL request. This is the "limit" variable the query uses.
LIMIT = 1000

# Reviewers to assign unassigned rows to. Populate with your actual user IDs or
# dynamically fetch the users using the API. The rows are divided
# as evenly as possible across however many user ids are in this list.
USER_IDS: list[str] = [
    "reviewer-1-uuid",
    "reviewer-2-uuid",
    "reviewer-3-uuid",
    "reviewer-4-uuid",
    "reviewer-5-uuid",
    # "...",
]

# How many events to send per insert request (assignment is batchable).
BATCH_SIZE = 1000

API_URL = os.environ.get("BRAINTRUST_API_URL", "https://api.braintrust.dev").rstrip("/")
API_KEY = os.environ.get("BRAINTRUST_API_KEY")

ASSIGNMENTS_META_FIELD = "~__bt_assignments"
REVIEW_LISTS_META_FIELD = "~__bt_review_lists"
DEFAULT_REVIEW_LIST = "__bt_default_review_list"
# The select expression `metadata."~__bt_assignments"` comes back as a column
# keyed by that exact string in each result row.
ASSIGNMENTS_COLUMN_KEY = f'metadata."{ASSIGNMENTS_META_FIELD}"'

# Safety cap so a bad cursor loop can't run forever (LIMIT * MAX_PAGES rows max).
MAX_PAGES = 10_000


def _headers() -> dict[str, str]:
    return {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json",
    }


def build_query(cursor: str | None) -> str:
    """Build the BTQL query string. Identical across pages except the cursor."""
    query = (
        f"from: project_logs('{PROJECT_ID}') "
        f"| filter: tags includes '{TAG}' and created >= now() - interval {DAYS} day "
        f'| select: id, tags, metadata."{ASSIGNMENTS_META_FIELD}" '
        f"| limit: {LIMIT}"
    )
    if cursor:
        query += f" | cursor: '{cursor}'"
    return query


def fetch_all_logs() -> list[dict]:
    """Fetch every matching log row, following the cursor across pages."""
    rows: list[dict] = []
    cursor: str | None = None

    for page in range(1, MAX_PAGES + 1):
        resp = requests.post(
            f"{API_URL}/btql",
            headers=_headers(),
            json={"query": build_query(cursor)},
            timeout=120,
        )
        if not resp.ok:
            raise SystemExit(f"BTQL query failed ({resp.status_code}): {resp.text}")

        body = resp.json()
        page_rows = body.get("data", [])
        rows.extend(page_rows)

        # Next cursor: prefer the body field, fall back to the header.
        cursor = body.get("cursor") or resp.headers.get("x-bt-cursor")
        print(f"  page {page}: {len(page_rows)} rows (total {len(rows)})")

        # Done when the server stops handing back a cursor, or a short page.
        if not cursor or len(page_rows) < LIMIT:
            break

    return rows


def get_assignments(row: dict) -> list:
    """Return the assignment list for a row, or [] if there is none."""
    value = row.get(ASSIGNMENTS_COLUMN_KEY)
    if not value:
        return []
    return value if isinstance(value, list) else [value]


def is_assigned(row: dict) -> bool:
    """A record is assigned if ~__bt_assignments holds at least one user id."""
    return len(get_assignments(row)) > 0


def distribute(ids: list[str], users: list[str]) -> list[tuple[str, str]]:
    """Divide ids as evenly as possible across users.

    With N users and M ids, each user gets either floor(M/N) or floor(M/N)+1
    ids; the first (M % N) users get the extra one. Returns (id, user) pairs.
    """
    pairs: list[tuple[str, str]] = []
    base, remainder = divmod(len(ids), len(users))
    idx = 0
    for u_i, user in enumerate(users):
        count = base + (1 if u_i < remainder else 0)
        for _ in range(count):
            pairs.append((ids[idx], user))
            idx += 1
    return pairs


def build_event(log_id: str, user_id: str) -> dict:
    """Merge-update payload that assigns one row to one reviewer."""
    return {
        "id": log_id,
        "metadata": {
            ASSIGNMENTS_META_FIELD: [user_id],
            REVIEW_LISTS_META_FIELD: {DEFAULT_REVIEW_LIST: {"status": "PENDING"}},
        },
        "_is_merge": True,
        "_merge_paths": [
            ["metadata", ASSIGNMENTS_META_FIELD, REVIEW_LISTS_META_FIELD],
        ],
    }


def assign_batch(events: list[dict]) -> None:
    """POST a batch of assignment events to the project_logs insert endpoint."""
    resp = requests.post(
        f"{API_URL}/v1/project_logs/{PROJECT_ID}/insert",
        headers=_headers(),
        json={"events": events},
        timeout=120,
    )
    if not resp.ok:
        raise SystemExit(f"Assign failed ({resp.status_code}): {resp.text}")


def main() -> None:
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--dry-run", action="store_true",
        help="Show the assignment plan without updating any rows.",
    )
    args = parser.parse_args()

    if not API_KEY:
        raise SystemExit("Set BRAINTRUST_API_KEY in your environment.")

    rows = fetch_all_logs()
    print(f"\nFetched {len(rows)} logs tagged '{TAG}' from the last {DAYS} days.")

    # Validate which records are already assigned; keep only ids of the rest.
    to_assign_ids = [r["id"] for r in rows if not is_assigned(r)]
    print(f"  already assigned (skip):   {len(rows) - len(to_assign_ids)}")
    print(f"  not assigned (to assign):  {len(to_assign_ids)}")

    here = os.path.dirname(os.path.abspath(__file__))
    with open(os.path.join(here, "to_assign_ids.json"), "w") as f:
        json.dump(to_assign_ids, f, indent=2)

    if not to_assign_ids:
        print("Nothing to assign.")
        return
    if not USER_IDS:
        raise SystemExit("Populate USER_IDS with at least one reviewer id first.")

    # Divide the unassigned rows across reviewers as evenly as possible.
    pairs = distribute(to_assign_ids, USER_IDS)
    per_user: dict[str, int] = {}
    for _, user in pairs:
        per_user[user] = per_user.get(user, 0) + 1
    print("\nAssignment plan:")
    for user in USER_IDS:
        print(f"  {user}: {per_user.get(user, 0)} rows")

    if args.dry_run:
        print("\n[dry-run] No rows updated.")
        return

    # Batch the merge-updates.
    events = [build_event(log_id, user) for log_id, user in pairs]
    for start in range(0, len(events), BATCH_SIZE):
        batch = events[start:start + BATCH_SIZE]
        assign_batch(batch)
        print(f"  assigned {min(start + BATCH_SIZE, len(events))}/{len(events)}")

    print(f"\nDone. Assigned {len(events)} rows across {len(USER_IDS)} reviewers.")


if __name__ == "__main__":
    main()