Anime Tracker
Show code
import json
from datetime import datetime, timezone

# Credential sent as the Bearer token on every AniList request (see gql).
TOKEN = nb.secret("ANILIST_TOKEN")
# AniList GraphQL endpoint that every request is POSTed to.
API = "https://graphql.anilist.co"
# When True, the cached list is ignored and refetched from AniList on this run.
refresh = False
# Inputs for this run; falls back to {} when no `nb_ctx` name is defined.
# Truthy keys of the form "b:<media id>:<progress>" are handled as "+1" bumps below.
inputs = nb_ctx.get("inputs", {}) if "nb_ctx" in dir() else {}

# ── AniList ──────────────────────────────────────────────

def gql(query, variables=None):
    """POST a GraphQL document to the AniList endpoint and return its data.

    Args:
        query: GraphQL query or mutation text.
        variables: Optional mapping of GraphQL variables; ``None`` sends ``{}``.

    Returns:
        The value stored under the response's top-level ``"data"`` key.

    Raises:
        Exception: If the HTTP response is not ok, or the response reports
            one or more GraphQL errors (all messages joined with ``"; "``).
        json.JSONDecodeError: If an ok response body is not valid JSON.
    """
    payload = json.dumps({"query": query, "variables": variables or {}})
    resp = nb.fetch(API, method="POST",
        headers={
            "Content-Type": "application/json",
            "Accept": "application/json",
            "User-Agent": "lifelab/1.0",
            "Authorization": f"Bearer {TOKEN}",
        },
        body=payload)
    raw = resp.get("body") or ""
    if not resp.get("ok"):
        raise Exception(f"AniList {resp.get('status')}: {raw}")
    data = json.loads(raw)

    # Treat a missing, null or empty "errors" value as success; indexing [0]
    # unconditionally would turn those cases into an IndexError/TypeError.
    errors = data.get("errors")
    if errors:
        # Report every error, and fall back to the raw error object when it
        # carries no "message" so the real failure is never masked.
        messages = [
            (err.get("message") if isinstance(err, dict) else None) or str(err)
            for err in errors
        ]
        raise Exception("; ".join(messages))
    return data["data"]

def fetch_watching():
    """Fetch the viewer's CURRENT anime entries and how far behind each one is.

    Looks up the ``Viewer`` id, loads that user's ANIME list entries with
    status CURRENT, and keeps only shows whose media status is RELEASING or
    FINISHED and whose latest aired episode can be determined (from the next
    airing episode, or else from the total episode count).

    Returns:
        A list of dicts sorted by ``behind`` (largest first), each with the
        keys ``id``, ``title``, ``progress``, ``latest``, ``behind``,
        ``airing_at``, ``next_ep`` and ``url``. Each media id appears once.

    Raises:
        Exception: Propagated from ``gql`` on HTTP or GraphQL failure.
    """
    user_id = gql("{ Viewer { id } }")["Viewer"]["id"]
    data = gql("""
    query ($userId: Int) {
      MediaListCollection(userId: $userId, type: ANIME, status: CURRENT) {
        lists { entries {
          progress
          media {
            id
            title { romaji english }
            status episodes
            nextAiringEpisode { airingAt timeUntilAiring episode }
            siteUrl
          }
        }}
      }
    }""", {"userId": user_id})

    # Tolerate a null collection / lists / entries instead of crashing.
    collection = data.get("MediaListCollection") or {}

    results = []
    seen_ids = set()
    for lst in collection.get("lists") or []:
        for entry in lst.get("entries") or []:
            m = entry["media"]

            # The same show can be returned in more than one list group
            # (e.g. a custom list as well as the status list); keep only the
            # first occurrence so it is not rendered twice.
            if m["id"] in seen_ids:
                continue
            seen_ids.add(m["id"])

            progress = entry.get("progress") or 0
            nxt = m.get("nextAiringEpisode")

            if m["status"] not in ("RELEASING", "FINISHED"):
                continue

            if nxt and nxt.get("episode"):
                # The episode before the next scheduled one is the latest aired.
                latest = nxt["episode"] - 1
                airing_at = nxt.get("airingAt")
            elif m.get("episodes"):
                # Nothing scheduled: treat the full episode count as aired.
                latest = m["episodes"]
                airing_at = None
            else:
                # No way to tell how many episodes exist; skip the show.
                continue

            results.append({
                "id": m["id"],
                "title": m["title"].get("english") or m["title"]["romaji"],
                "progress": progress,
                "latest": latest,
                "behind": latest - progress,
                "airing_at": airing_at,
                "next_ep": nxt.get("episode") if nxt else None,
                "url": m["siteUrl"],
            })

    results.sort(key=lambda x: x["behind"], reverse=True)
    return results

# ── Handle bumps, then load (cached or fresh) ─────────

# Block this code runs in; its metadata carries the cached list under "_cache".
block = nb.current_block
block_id = block["id"] if block else None
cache = (block.get("metadata") or {}).get("_cache") if block else None

# Apply "+1" requests. Keys look like "b:<media id>:<progress when clicked>".
# The new value is derived from the progress embedded in the key, so handling
# the same key twice writes the same progress rather than bumping twice.
cache_dirty = False
for key, val in inputs.items():
    if key.startswith("b:") and val:
        parts = key.split(":")
        try:
            aid = int(parts[1])
            new_progress = int(parts[2]) + 1
        except (IndexError, ValueError):
            # Ignore a malformed bump key instead of failing the whole run.
            continue
        gql("""mutation ($mediaId: Int, $progress: Int) {
          SaveMediaListEntry(mediaId: $mediaId, progress: $progress) { id }
        }""", {"mediaId": aid, "progress": new_progress})
        # Update cache in-place so we don't need a full refetch
        for a in cache or []:
            if a["id"] == aid:
                a["progress"] = new_progress
                a["behind"] = a["latest"] - new_progress
                cache_dirty = True
                break

if refresh or not cache:
    anime_list = fetch_watching()
    needs_save = True
else:
    anime_list = cache
    # Write bumped progress back; otherwise the next run would reload the
    # stale cached values from the block metadata.
    needs_save = cache_dirty

if needs_save and block_id:
    nb.patch_metadata(block_id, {"_cache": anime_list})

# ── Visibility filter ───────────────────────────────────

# Current Unix time in seconds. This must come from an aware UTC datetime:
# .timestamp() on the naive value returned by datetime.utcnow() is interpreted
# as local time, which skews the result by the host's UTC offset.
now_ts = int(datetime.now(timezone.utc).timestamp())

# Shows to display: anything with unwatched episodes, plus caught-up shows
# whose next episode airs within three days (those are flagged with "_soon").
visible = []
for a in anime_list:
    if a["behind"] > 0:
        visible.append(a)
    elif a.get("airing_at") and (a["airing_at"] - now_ts) <= 3 * 86400:
        a["_soon"] = True
        visible.append(a)

# ── Render ───────────────────────────────────────────────

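# Intent: always render something so we can verify that the UI works at all.
# NOTE(review): the branch below draws rows only when `visible` is non-empty;
# confirm whether an empty-state fallback is expected.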

if visible:
    # One row per visible show: a progress line with a "+1" button when there
    # are unwatched episodes, otherwise a countdown badge to the next episode.
    rows = []
    for a in visible:
        if a["behind"] > 0:
            rows.append(ui.row([
                ui.page_link(f"anime/{a['title']}", label=a['title']),
                ui.text(f"{a['progress']}/{a['latest']} ({a['behind']} ep behind)"),
                # The key carries the media id and the progress currently shown.
                ui.button("+1", on_click=actions.rerun_with({f"b:{a['id']}:{a['progress']}": "true"})),
            ]))
        elif a.get("_soon"):
            # Clamp at zero: a cached airing time may already be in the past,
            # and floor-dividing a negative difference would show "-1d 23h".
            secs = max(0, a["airing_at"] - now_ts)
            d, remainder = divmod(secs, 86400)
            h = remainder // 3600
            rows.append(ui.row([
                ui.page_link(f"anime/{a['title']}", label=a['title']),
                ui.badge(f"Ep {a['next_ep']} in {d}d {h}h", variant="warning"),
            ]))

    ui.column(rows)