Journal: 2026-04-08
Anime Tracker
↳ from journal/2026-03-29
Show code
import json
from datetime import datetime, timezone
# AniList access token, obtained through nb.secret under the name
# "ANILIST_TOKEN"; gql() sends it as a Bearer credential.
TOKEN = nb.secret("ANILIST_TOKEN")
# URL that every gql() request is POSTed to.
API = "https://graphql.anilist.co"
# Flip to True to force a fresh fetch_watching() call instead of reusing the
# list cached in this block's metadata.
refresh = False
# Inputs supplied to this run (the "+1" buttons pass their bump keys here).
# Falls back to an empty dict when no `nb_ctx` name is visible in the current
# scope. NOTE(review): presumably `nb_ctx` is injected by the notebook runtime
# on reruns -- confirm.
inputs = nb_ctx.get("inputs", {}) if "nb_ctx" in dir() else {}
# ── AniList ──────────────────────────────────────────────
def gql(query, variables=None):
    """Run one GraphQL operation against AniList and return its ``data`` member.

    Args:
        query: GraphQL document text (query or mutation).
        variables: Optional mapping of GraphQL variables; a falsy value is
            sent as an empty object.

    Returns:
        The value stored under ``"data"`` in the decoded JSON response.

    Raises:
        Exception: If the response's ``ok`` flag is falsy (the message carries
            the status and raw body), or if the decoded response contains an
            ``"errors"`` member (the message is the first error's message).
        json.JSONDecodeError: If a response flagged ``ok`` has a body that is
            not valid JSON.
    """
    request_body = json.dumps({"query": query, "variables": variables or {}})
    request_headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
        "User-Agent": "lifelab/1.0",
        "Authorization": f"Bearer {TOKEN}",
    }
    response = nb.fetch(API, method="POST", headers=request_headers, body=request_body)

    # A missing or empty body is normalised to "" so it can be embedded in the
    # error message below.
    text = response.get("body") or ""
    if not response.get("ok"):
        raise Exception(f"AniList {response.get('status')}: {text}")

    decoded = json.loads(text)
    if "errors" in decoded:
        # Surface only the first reported error.
        raise Exception(decoded["errors"][0]["message"])
    return decoded["data"]
def fetch_watching():
    """Fetch the viewer's CURRENT anime list and summarise each usable entry.

    Makes two gql() calls: one to resolve the authenticated viewer's id, one
    to load that user's anime list entries with status CURRENT.

    An entry is skipped when its media status is neither ``RELEASING`` nor
    ``FINISHED``, or when neither a next-airing episode number nor a total
    episode count is available to derive the latest released episode.

    Returns:
        A list of dicts with keys ``id``, ``title`` (English title, falling
        back to romaji), ``progress``, ``latest``, ``behind``
        (``latest - progress``), ``airing_at``, ``next_ep`` and ``url``,
        sorted by ``behind`` in descending order.
    """
    viewer_id = gql("{ Viewer { id } }")["Viewer"]["id"]
    # The query text is left flush-left so the string sent to AniList is unchanged.
    response = gql("""
query ($userId: Int) {
MediaListCollection(userId: $userId, type: ANIME, status: CURRENT) {
lists { entries {
progress
media {
id
title { romaji english }
status episodes
nextAiringEpisode { airingAt timeUntilAiring episode }
siteUrl
}
}}
}
}""", {"userId": viewer_id})

    # Walk every entry of every returned list as one flat stream.
    all_entries = (
        item
        for media_list in response["MediaListCollection"]["lists"]
        for item in media_list["entries"]
    )

    shows = []
    for item in all_entries:
        media = item["media"]
        watched = item.get("progress") or 0
        upcoming = media.get("nextAiringEpisode")

        if media["status"] not in ("RELEASING", "FINISHED"):
            continue

        if upcoming and upcoming.get("episode"):
            # The episode before the upcoming one is the latest one released.
            latest_released = upcoming["episode"] - 1
            next_air_time = upcoming.get("airingAt")
        elif media.get("episodes"):
            # No upcoming episode reported: fall back to the total episode count.
            latest_released = media["episodes"]
            next_air_time = None
        else:
            continue

        shows.append({
            "id": media["id"],
            "title": media["title"].get("english") or media["title"]["romaji"],
            "progress": watched,
            "latest": latest_released,
            "behind": latest_released - watched,
            "airing_at": next_air_time,
            "next_ep": upcoming.get("episode") if upcoming else None,
            "url": media["siteUrl"],
        })

    return sorted(shows, key=lambda show: show["behind"], reverse=True)
# ── Handle bumps, then load (cached or fresh) ─────────
# The anime list is cached under the "_cache" key of this block's metadata.
block = nb.current_block
block_id = block["id"] if block else None
cache = (block.get("metadata") or {}).get("_cache") if block else None

# A "+1" click reruns the block with an input keyed "b:<media id>:<progress shown>".
# Because the key carries the progress that was on screen, replaying the same
# inputs writes the same value again instead of bumping twice.
cache_dirty = False
for key, val in inputs.items():
    if not (key.startswith("b:") and val):
        continue
    parts = key.split(":")
    try:
        aid = int(parts[1])
        new_progress = int(parts[2]) + 1
    except (IndexError, ValueError):
        # Not a well-formed bump key: ignore it rather than abort the whole render.
        continue
    gql("""mutation ($mediaId: Int, $progress: Int) {
SaveMediaListEntry(mediaId: $mediaId, progress: $progress) { id }
}""", {"mediaId": aid, "progress": new_progress})
    # Update cache in-place so we don't need a full refetch
    if cache:
        for a in cache:
            if a["id"] == aid:
                a["progress"] = new_progress
                a["behind"] = a["latest"] - new_progress
                cache_dirty = True
                break

if refresh or not cache:
    anime_list = fetch_watching()
    if block_id:
        nb.patch_metadata(block_id, {"_cache": anime_list})
else:
    anime_list = cache
    # Write bumped entries back so the next run does not re-read the old
    # progress from metadata.
    # NOTE(review): assumes nb does not auto-persist in-place edits to the
    # metadata object returned by nb.current_block -- confirm.
    if cache_dirty and block_id:
        nb.patch_metadata(block_id, {"_cache": cache})
# ── Visibility filter ───────────────────────────────────
# Keep shows with unwatched episodes, plus caught-up shows whose next episode
# airs within the next three days (those are tagged with "_soon").
#
# "airing_at" is compared against epoch seconds, so "now" must be a true epoch
# value. An aware UTC datetime is used because .timestamp() on a naive
# datetime (such as the one utcnow() returns) is interpreted as local time,
# which skews the result by the host's UTC offset.
now_ts = int(datetime.now(timezone.utc).timestamp())
visible = []
for a in anime_list:
    if a["behind"] > 0:
        visible.append(a)
    elif a.get("airing_at") and (a["airing_at"] - now_ts) <= 3 * 86400:
        a["_soon"] = True
        visible.append(a)
# ── Render ───────────────────────────────────────────────
# Always render something so we can test UI works at all: when no show is
# visible, a single placeholder line is rendered instead of nothing.
rows = []
for a in visible:
    if a["behind"] > 0:
        # Behind on released episodes: show the backlog and a "+1" button. The
        # button's input key embeds the progress currently shown, which the
        # bump handler increments by one.
        rows.append(ui.row([
            ui.page_link(f"anime/{a['title']}", label=a['title']),
            ui.text(f"{a['progress']}/{a['latest']} ({a['behind']} ep behind)"),
            ui.button("+1", on_click=actions.rerun_with({f"b:{a['id']}:{a['progress']}": "true"})),
        ]))
    elif a.get("_soon"):
        secs = a["airing_at"] - now_ts
        if secs > 0:
            d, remainder = divmod(secs, 86400)
            h = remainder // 3600
            label = f"Ep {a['next_ep']} in {d}d {h}h"
        else:
            # A cached air time can already be in the past; floor division on
            # a negative value would otherwise print e.g. "in -1d 23h".
            label = f"Ep {a['next_ep']} aired"
        rows.append(ui.row([
            ui.page_link(f"anime/{a['title']}", label=a['title']),
            ui.badge(label, variant="warning"),
        ]))
if not rows:
    rows.append(ui.text("Nothing to watch right now."))
ui.column(rows)