POST /admin/favorites/import
收藏批量 / 增量导入接口 · 版本 2026-04-24 · 对应 Worker 提交 6c1b1ac
Admin
X-Sync-Key
IP 白名单
D1 DB_FAV
最多 1000 行 / 请求
https://ytk-api.yitongcs.com/admin/favorites/import
本文件面向后端开发 / 数据迁移脚本作者 / AI 工具,描述一同看收藏批量导入接口的完整协议、模式语义、边界条件、最佳实践和排错指南。配套:项目文档(§4.2 admin 速览) · android-api(用户侧收藏 API)。
1. 概览
| 项 | 值 |
|---|---|
| Method | POST |
| Path | /admin/favorites/import |
| Host | https://ytk-api.yitongcs.com |
| 鉴权 | X-Sync-Key + CF-Connecting-IP 白名单(双重) |
| Content-Type | application/json; charset=utf-8 |
| 单次行数上限 | 1000 |
| 批大小(内部) | 50 行 / 一次 db.batch() |
| 代码位置 | workers/src/services/favorites.ts#importFavorites + index.ts 路由分支 |
典型使用场景:
- 从老 PHP
tp_favorite表或其他数据源一次性迁移全量收藏到 D1 - 外部系统 cron 以一定频率把新增收藏同步过来(增量)
- 运维手工修复单个用户的收藏(覆盖式替换)
- 把导出的 JSON 备份恢复到测试环境
2. 鉴权
双重门(和 /admin/sync、/admin/videos/* 等同类):
2.1 X-Sync-Key header
X-Sync-Key: ytk-sync-2026
值来自环境变量 SYNC_KEY(默认 ytk-sync-2026,可 wrangler secret put SYNC_KEY 覆盖)。
2.2 CF-Connecting-IP 必须在白名单内
白名单由 wrangler.toml [vars] 的 SYNC_ALLOWED_IPS 配置,当前生产值:
| IP | 角色 |
|---|---|
149.28.233.43 | 主 MySQL / db-proxy 宿主,跑 ytk_sync.sh 增量同步 |
108.61.23.146 | 视频数据库宿主,另一份 ytk_sync.sh |
173.208.185.170 | 外部收藏同步源(/admin/favorites/import 调用方) |
63.141.249.154 | 外部收藏同步源 |
从白名单外发起(例如本机办公网 IP)会直接 403。
临时放行 / 紧急停用
- 加 IP:改
wrangler.toml→SYNC_ALLOWED_IPS追加 →wrangler deploy - 紧急关闭 IP 限制:
SYNC_ALLOWED_IPS = "*"(只验 key,不验 IP)
2.3 未通过鉴权的响应
HTTP/1.1 403 Forbidden
Content-Type: text/plain
Forbidden
(纯文本 Forbidden,不是 JSON envelope —— 在 gate 层就 short-circuit,不暴露业务逻辑。)
3. 请求契约
3.1 Body 顶层结构
{
"mode": "append",
"uid": 42,
"rows": [ /* ImportRow[] */ ]
}
| 字段 | 类型 | 必填 | 默认 | 说明 |
|---|---|---|---|---|
mode | string | ❌ | "append" | append / upsert / replace;其他值静默降级为 append |
uid | number | mode="replace" 时 ✅ | — | 只在 replace 模式下使用,表示"清空哪个用户" |
rows | array | ✅ | — | 收藏行数组,最多 1000 项(超出立即 ok=false) |
3.2 ImportRow 行结构
{
"uid": 42,
"section": "gv",
"content_id": 109114,
"title": "示例视频",
"cover": "https://pic.yulecdn.com/xxx.jpg",
"year": "2024",
"duration_seconds": 1516,
"added_at": 1776900000
}
| 字段 | 类型 | 必填 | 约束 | 入库列 |
|---|---|---|---|---|
uid | integer | ✅ | > 0 | favorites.uid |
section | string | ✅ | gv / mv / tv / pic | favorites.section |
content_id | integer | ✅ | > 0 | favorites.content_id |
title | string | ❌ | 最多 500 字符(超出截断) | favorites.title(默认 "") |
cover | string | ❌ | 最多 2000 字符(超出截断) | favorites.cover(默认 "") |
year | string \| null | ❌ | 最多 10 字符(超出截断) | favorites.year |
duration_seconds | integer \| null | ❌ | 有限数字或 null | favorites.duration_seconds |
added_at | integer | ❌ | Unix 秒;缺省或 <= 0 用 now | favorites.added_at |
3.3 组合主键
favorites 表复合主键 (uid, section, content_id) —— 同一三元组在表里最多一行。这是后面三种 mode 语义的基础。
3.4 最小可行请求
{
"rows": [
{ "uid": 42, "section": "gv", "content_id": 109114 }
]
}
只要 3 个必填字段就能插入一行,title/cover 为空串,year/duration_seconds 为 NULL,added_at 当前时间。
4. 响应契约
4.1 成功响应(HTTP 200)
{
"ok": true,
"mode": "append",
"total": 100,
"inserted": 87,
"skipped": 13,
"updated": 0,
"deleted": 0,
"errors": []
}
| 字段 | 类型 | 说明 |
|---|---|---|
ok | boolean | 无 errors 或至少一行落库为 true;全量失败为 false |
mode | string | 实际生效的模式(非法值静默降级为 append,此字段反映真实值) |
total | integer | 传入的 rows.length |
inserted | integer | 新插入的行数(含 upsert 模式下的覆盖,见 §4.3) |
skipped | integer | 跳过的行数(仅 append 模式下的冲突跳过) |
updated | integer | 始终为 0(见 §4.3) |
deleted | integer | 仅 replace 模式下清除的行数 |
errors | array | 每条校验/写入失败的行,{row_index, reason} |
4.2 失败响应(HTTP 400)
{
"ok": false,
"mode": "append",
"total": 0,
"inserted": 0,
"skipped": 0,
"updated": 0,
"deleted": 0,
"errors": [
{ "row_index": -1, "reason": "max 1000 rows per call, got 1001" }
]
}
顶层级错误会把 row_index=-1 放在 errors[]:
DB_FAV not bound— Workers 环境配置错(不应在生产出现)rows must be an array— body.rows 不是数组max 1000 rows per call, got <N>— 超上限replace mode requires replaceUid— replace 模式没传 uid
4.3 关于 updated 始终为 0
.meta.changes 对 INSERT ... ON CONFLICT DO UPDATE 在新插入和更新已存在时都返回 1,没有字段区分两者。没做 pre-SELECT(成本高)。
- append:
changes=1→inserted++;changes=0→skipped++ - upsert:
changes=1→inserted++(可能是 INSERT 也可能是 UPDATE)
调用方如果需要严格区分"新插入"与"更新已有",要么 append 前先 SELECT 一次,要么接受 upsert 把两者合并看待。
4.4 HTTP 状态码
| 情况 | HTTP | ok |
|---|---|---|
| 正常导入(至少一行落库或行数=0) | 200 | true |
| 顶层错误(body 格式错、>1000 行、DB 不可用等) | 400 | false |
| 全量行级失败且没一行落库 | 200 | false |
| 鉴权失败 | 403 | — |
| 路径写错 / 方法错 | 404 / 405 | — |
| 服务器异常 | 500 | — |
5. 三种模式语义
5.1 mode = "append"(默认)
用途:增量同步;定时任务周期性重放;幂等补齐。
INSERT OR IGNORE INTO favorites
(uid, section, content_id, title, cover, year, duration_seconds, added_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
- 主键
(uid, section, content_id)冲突 → 跳过,现有行不变 - 计入
skipped - 不会更新 title/cover/year/duration/added_at 任何字段
典型用法:cron 定时把"这段时间里新增的收藏"一股脑 POST 过来,已存在的无感跳过。
5.2 mode = "upsert"
用途:权威数据源同步;覆盖 metadata 但保留收藏关系。
INSERT INTO favorites
(uid, section, content_id, title, cover, year, duration_seconds, added_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(uid, section, content_id) DO UPDATE SET
title = excluded.title,
cover = excluded.cover,
year = excluded.year,
duration_seconds = excluded.duration_seconds,
added_at = excluded.added_at
- 主键冲突 → 覆盖全部非主键字段(包括 added_at)
- 计入
inserted(见 §4.3 为什么不是updated)
5.3 mode = "replace"
用途:权威源的一次性全量覆盖;管理员修复单用户数据。
- 校验
body.uid必须是正整数 DELETE FROM favorites WHERE uid = ?—— 清空该用户所有收藏(deleted反映数量)- 校验每行
row.uid === body.uid,否则进 errors[] 不插入 INSERT OR IGNORE剩下的合法行
- 没有
body.uid直接拒绝,不会清任何数据 - 不能用一次 replace 同时覆盖多个用户
- replace 会真的 DELETE,没有软删或事务回滚保护 —— 先备份再用
典型用法:
- 客服接到投诉"我的收藏丢了一半",拿到外部备份重放:
mode=replace, uid=<客户的> - 从老系统全量迁移单个用户
6. 字段校验规则
按行校验,任一失败该行进 errors[],其它行不受影响。
| 字段 | 校验 | 失败时 reason |
|---|---|---|
| 行本身 | 不是对象 / null | "not an object" |
uid | Number.isInteger(uid) && uid > 0 | "uid must be positive integer" |
section | ∈ {gv, mv, tv, pic} | 'invalid section "<value>"' |
content_id | Number.isInteger(content_id) && content_id > 0 | "content_id must be positive integer" |
| uid 与 replaceUid 一致性(仅 replace) | row.uid === replaceUid | "uid <X> != replaceUid <Y>" |
自动处理(不产生 error):
title/cover/year超长 → 静默截断duration_seconds非有限数字 → 转成nulladded_at缺省或<= 0→ 用当前时间year=null保留 NULL;year=""保留空串
7. 错误分类
7.1 行级错误(row_index >= 0)
出现在 errors[],不影响其他行:
| reason | 原因 |
|---|---|
"not an object" | rows[i] 是 null / 数字 / 字符串 |
"uid must be positive integer" | uid 非整数或 ≤ 0 |
'invalid section "xxx"' | section 不在四个枚举中 |
"content_id must be positive integer" | content_id 非整数或 ≤ 0 |
"uid <N> != replaceUid <M>" | replace 模式下行 uid 与请求 uid 不匹配 |
| D1 batch 异常消息(前 120 字符) | 整批 INSERT 失败(通常 DB 临时问题) |
7.2 顶层级错误(row_index = -1)
出现在 errors[] 且整个请求 ok=false:
| reason | 原因 |
|---|---|
"DB_FAV not bound" | Workers 环境配置错,联系运维 |
"rows must be an array" | body.rows 类型错 |
"max 1000 rows per call, got <N>" | 超出单次 1000 上限 |
"replace mode requires replaceUid" | replace 模式没带 uid |
7.3 完全 HTTP 层错误
| 状态 | 原因 |
|---|---|
403 Forbidden | X-Sync-Key 错 或 IP 不在白名单 |
404 | URL 写错 |
405 | 用了 GET / PUT 等其它方法 |
500 + {ok:false, error:"..."} | JSON 解析失败或其他服务端异常 |
8. 幂等与并发
8.1 幂等性
| 模式 | 幂等性 | 说明 |
|---|---|---|
append | ✅ 严格 | 同一请求重发 N 次,DB 终态一样;inserted 只第一次 > 0,之后均为 0 |
upsert | ⚠️ 最后一次写赢 | 多次调用字段有变时,最后一次覆盖;结果终态可预测 |
replace | ❌ 不幂等 | 每次都 DELETE 再 INSERT,最终行数同;但计数器、日志多次触发 |
8.2 并发
端点层面无锁。D1 SQLite 内部是 serializable,同一 DB 并发写串行化,但跨批语义没保证:
- 两个并发
append调用互不干扰,结果是并集 - 两个并发
upsert最后写的赢 - 两个并发
replace uid相同 结果不可预测(可能一个 DELETE 跑在另一个 INSERT 前面,导致最终只剩半套)
8.3 与用户端 POST /api/member/favorites 的交互
- 用户端写入和 importFavorites 底层都是
favorites表,共享相同的复合主键逻辑 - 管理员 append 和用户正在点收藏 → 两边都是
INSERT OR IGNORE,不冲突 - 管理员
replace uid=X清完瞬间用户 X 恰好点收藏 → 用户 INSERT 成功 → replace 再 INSERT 可能 IGNORE(已被抢先)。概率极低但存在
9. 性能与限制
9.1 单次上限
- 1000 行 / 请求(
IMPORT_MAX_ROWS) - 超过直接
ok=false,不写入任何行 - 要导 10 万行 → 调用方自己切成 100 次 1000 行的请求
9.2 内部分批
- 每批 50 行(
IMPORT_BATCH_SIZE),用env.DB_FAV.batch(stmts)一次事务 - 1000 行 = 20 个 batch
- 单 batch 异常不影响其他 batch,但异常 batch 内所有行整体计入 errors
9.3 延迟参考
实测(从 149 服务器打本地 D1):
| 行数 | 模式 | 大致耗时 |
|---|---|---|
| 3 | append | ~200ms |
| 100 | append | ~600ms |
| 1000 | append | ~3-5s |
| 1000 | upsert | ~3-5s |
| 1000 | replace(含 DELETE) | ~4-6s |
9.4 为什么不支持 > 1000?
- D1 单次
batch()上限约 100 statement - Workers 单次 CPU 预算有限(Paid 30s wall,Bundled 50ms CPU)
- 响应 body 过大影响客户端解析
- 1000 足够增量场景;全量迁移可以切片
10. 副作用
10.1 videos.favorites_count 计数器
- 每次
append或upsert导致 GV 收藏新增一行,主库videos.favorites_count加 1 - 去重:同一请求内重复的 content_id 只加一次(
Array.from(new Set(...))去重) - 一次 UPDATE 打包 50 个 content_id(
WHERE id IN (?, ?, ...)) - best-effort:UPDATE 失败被吞,不影响 importResult
/admin/init-favcount 重建。
10.2 不影响的副作用
- 不发通知(不给 uid 推站内信)
- 不触发 list_epoch bump(
cache:list_epoch只在视频元数据变化时 bump,收藏不影响列表缓存) - 不写 AE(不进 Analytics Engine)
- 不写 audit log(当前没有管理员操作审计)
11. 集成样例
11.1 最小 curl 调用
curl -X POST https://ytk-api.yitongcs.com/admin/favorites/import \
-H 'X-Sync-Key: ytk-sync-2026' \
-H 'Content-Type: application/json' \
-d '{
"mode": "append",
"rows": [
{ "uid": 42, "section": "gv", "content_id": 109114,
"title": "测试视频", "cover": "https://pic.yulecdn.com/x.jpg",
"duration_seconds": 1516, "added_at": 1776900000 }
]
}'
11.2 Python:从老 PHP 迁移全量
import mysql.connector, requests, json
API = 'https://ytk-api.yitongcs.com/admin/favorites/import'
HEADER = { 'X-Sync-Key': 'ytk-sync-2026', 'Content-Type': 'application/json' }
cnx = mysql.connector.connect(host='127.0.0.1', user='root',
password='<...>', database='ytk')
cur = cnx.cursor(dictionary=True)
cur.execute("""
SELECT userid AS uid, section, content_id,
title, cover, year, duration_seconds,
UNIX_TIMESTAMP(created_at) AS added_at
FROM tp_favorite
ORDER BY id
""")
batch = []
batch_no = 0
for row in cur:
batch.append(row)
if len(batch) >= 1000:
r = requests.post(API, headers=HEADER, data=json.dumps({
'mode': 'append', 'rows': batch
}), timeout=30).json()
print(f'batch {batch_no}: inserted={r["inserted"]} skipped={r["skipped"]} errors={len(r["errors"])}')
batch_no += 1
batch = []
if batch:
r = requests.post(API, headers=HEADER, data=json.dumps({
'mode': 'append', 'rows': batch
}), timeout=30).json()
print(f'batch {batch_no} (final): inserted={r["inserted"]} skipped={r["skipped"]}')
cur.close(); cnx.close()
added_at 是 UNIX 秒(数字),不是字符串。section 必须严格 gv/mv/tv/pic;老表里如是中文或数字需要做映射。
11.3 Node.js:定时增量同步
const fetch = require('node-fetch');
async function pullDelta(since) {
// ... 查老库或其他源,返回 [{uid, section, content_id, ...}]
}
async function syncOnce() {
const rows = await pullDelta(lastSyncAt);
if (rows.length === 0) return;
for (let i = 0; i < rows.length; i += 1000) {
const chunk = rows.slice(i, i + 1000);
const res = await fetch('https://ytk-api.yitongcs.com/admin/favorites/import', {
method: 'POST',
headers: {
'X-Sync-Key': process.env.SYNC_KEY,
'Content-Type': 'application/json',
},
body: JSON.stringify({ mode: 'append', rows: chunk }),
}).then(r => r.json());
console.log(`chunk ${i/1000}: inserted=${res.inserted} skipped=${res.skipped}`);
if (res.errors.length) console.warn('errors:', res.errors);
}
lastSyncAt = Date.now() / 1000 | 0;
}
setInterval(syncOnce, 5 * 60 * 1000); // 每 5 分钟同步一次
Node 进程要跑在 149.28.233.43 或 108.61.23.146,否则 IP 门拦住。
11.4 Bash:单用户全量覆盖(修复场景)
#!/bin/bash
# 修复 uid=42 的收藏:从备份 JSON 全量覆盖
set -euo pipefail
UID_TARGET=42
BACKUP=uid-${UID_TARGET}-backup.json # [{"uid":42,"section":"gv",...}, ...]
curl -s -X POST https://ytk-api.yitongcs.com/admin/favorites/import \
-H "X-Sync-Key: $SYNC_KEY" \
-H 'Content-Type: application/json' \
-d @<(jq -c --argjson uid "$UID_TARGET" \
'{mode:"replace", uid:$uid, rows:.}' "$BACKUP") \
| jq '.'
先 dry-run:把 jq 前缀 . 改成 .[:3] 先跑 3 行看结果,没问题再全量。
11.5 TypeScript(Worker 内部调用,不经 HTTP)
import { importFavorites, type ImportRow } from './services/favorites';
const rows: ImportRow[] = [/* ... */];
const r = await importFavorites(env, rows, 'append');
console.log(r);
前提:你的 Worker 有 DB_FAV 和 DB 两个 D1 绑定。这种调用方式不走 IP 白名单 / X-Sync-Key,直接进内部逻辑。
12. 排错手册
12.1 请求直接 403,响应体是纯文本 Forbidden
原因:卡在 isAdminAuthorized 门:
X-Sync-Key缺失或不等于env.SYNC_KEYCF-Connecting-IP不在env.SYNC_ALLOWED_IPS白名单
自查:
curl -s https://api.ipify.org # 查本机公网 IP
grep SYNC_ALLOWED_IPS workers/wrangler.toml # 查白名单
12.2 {ok:false, errors:[{row_index:-1, reason:"max 1000 rows..."}]}
HTTP 400,一条顶层错。修复:把 rows 切成 1000 行以内的 chunk 重发。参考 §11.2 / 11.3。
12.3 {ok:false, errors:[{row_index:-1, reason:"replace mode requires replaceUid"}]}
用了 mode=replace 但没传 uid。
修复:body 顶层加 "uid": <number>,并确保 rows 里每一行 uid 都等于这个值。
12.4 Response inserted=0, skipped=N
不是 bug:append 模式下 rows 全是已存在的三元组,被 INSERT OR IGNORE 跳过。
自查:如果期望插入,说明这些 (uid, section, content_id) 已经在表里。改用 upsert 或先 DELETE。
12.5 D1 报 "too many subrequests"
单个 Worker invocation 打了太多 subrequest(Paid 计划 1000 上限)。1000 行导入通常不会触发,但若同时还打其他 API,可能超。
缓解:缩小单次导入行数到 500,或减少 videos counter 更新频率。
12.6 导入后 videos.favorites_count 不对
- 原因 1:replace 模式清收藏时不减 counter(§10.1)。长期用 replace 会导致 counter 虚高
- 原因 2:并发下 race
修复:运行 /admin/init-favcount 一次性重新扫 favorites 表重建 counter(幂等)。
13. 数据模型与内部行为
13.1 favorites 表
DB 绑定:DB_FAV → 数据库 ytk-favorites(database_id: 4a17aad9-...)。
CREATE TABLE favorites (
uid INTEGER NOT NULL,
section TEXT NOT NULL, -- gv | mv | tv | pic
content_id INTEGER NOT NULL,
title TEXT NOT NULL DEFAULT '',
cover TEXT NOT NULL DEFAULT '',
year TEXT,
duration_seconds INTEGER,
added_at INTEGER NOT NULL,
PRIMARY KEY (uid, section, content_id)
);
CREATE INDEX idx_fav_uid_added ON favorites(uid, added_at DESC);
CREATE INDEX idx_fav_content ON favorites(section, content_id);
1M DAU × 平均 20 收藏 ≈ 20M 行 × 100B ≈ 2 GB —— 远低于 D1 单库 10 GB 限,未来可按 uid % N 水平分片(schema 不变,加绑定 + 改 shards.ts)。
13.2 videos.favorites_count 热度计数器
位于主库 DB.videos。importFavorites 在 GV 收藏实际落库时对应 content_id 加 1。去重:同一次请求里同一个 content_id 只加一次。
驱动 GET /api/content/gv?sort=favorites 的 ORDER BY favorites_count DESC。
13.3 执行流程伪代码
1. 反序列化 body
2. 规范化 mode (非法 → append)
3. 校验 rows 上限
4. 校验每一行 → valid[] 和 errors[]
5. if mode == replace:
4a. 必须有 uid
4b. DELETE FROM favorites WHERE uid = replaceUid
4c. 记录 deleted
6. 构造 INSERT SQL:
- append/replace → INSERT OR IGNORE
- upsert → ON CONFLICT DO UPDATE
7. 按 50 行分批:
- db.batch(stmts)
- 遍历 .meta.changes 归类 inserted / skipped
- 记 GV landed content_id
8. 对去重后的 GV content_id: UPDATE favorites_count += 1(按 50 个一批)
9. 返回 ImportResult
14. 相关端点
| 端点 | 场景 | 本接口差异 |
|---|---|---|
POST /api/member/favorites | 用户自己加一条收藏 | 单条、需 Bearer token、走风控 |
DELETE /api/member/favorites/:section/:id | 用户自己取消一条 | 单条、需 Bearer token |
GET /api/member/favorites | 用户自己查 | 只看 Bearer token 身份的 uid |
POST /admin/init-favcount | 一次性回填所有 GV 的 favorites_count | 批量扫描,幂等 |
15. 变更历史
| 日期 | 提交 | 变化 |
|---|---|---|
| 2026-04-24 | 6c1b1ac | 首次实现:三种模式 + 行级校验 + batch 写入 + 计数器去重 |