POST /admin/favorites/import 收藏批量 / 增量导入接口 · 版本 2026-04-24 · 对应 Worker 提交 6c1b1ac

Admin X-Sync-Key IP 白名单 D1 DB_FAV 最多 1000 行 / 请求

POST https://ytk-api.yitongcs.com/admin/favorites/import
本文件面向后端开发 / 数据迁移脚本作者 / AI 工具,描述一同看收藏批量导入接口的完整协议、模式语义、边界条件、最佳实践和排错指南。配套:项目文档(§4.2 admin 速览) · android-api(用户侧收藏 API)。

1. 概览

MethodPOST
Path/admin/favorites/import
Hosthttps://ytk-api.yitongcs.com
鉴权X-Sync-Key + CF-Connecting-IP 白名单(双重)
Content-Typeapplication/json; charset=utf-8
单次行数上限1000
批大小(内部)50 行 / 一次 db.batch()
代码位置workers/src/services/favorites.ts#importFavorites + index.ts 路由分支

典型使用场景

2. 鉴权

双重门(和 /admin/sync/admin/videos/* 等同类):

2.1 X-Sync-Key header

X-Sync-Key: ytk-sync-2026

值来自环境变量 SYNC_KEY(默认 ytk-sync-2026,可 wrangler secret put SYNC_KEY 覆盖)。

2.2 CF-Connecting-IP 必须在白名单内

白名单由 wrangler.toml [vars]SYNC_ALLOWED_IPS 配置,当前生产值:

IP角色
149.28.233.43主 MySQL / db-proxy 宿主,跑 ytk_sync.sh 增量同步
108.61.23.146视频数据库宿主,另一份 ytk_sync.sh
173.208.185.170外部收藏同步源(/admin/favorites/import 调用方)
63.141.249.154外部收藏同步源

从白名单外发起(例如本机办公网 IP)会直接 403。

临时放行 / 紧急停用

2.3 未通过鉴权的响应

HTTP/1.1 403 Forbidden
Content-Type: text/plain

Forbidden

(纯文本 Forbidden,不是 JSON envelope —— 在 gate 层就 short-circuit,不暴露业务逻辑。)

3. 请求契约

3.1 Body 顶层结构

{
  "mode": "append",
  "uid": 42,
  "rows": [ /* ImportRow[] */ ]
}
字段类型必填默认说明
modestring"append"append / upsert / replace;其他值静默降级为 append
uidnumbermode="replace" 时 ✅只在 replace 模式下使用,表示"清空哪个用户"
rowsarray收藏行数组,最多 1000 项(超出立即 ok=false

3.2 ImportRow 行结构

{
  "uid":              42,
  "section":          "gv",
  "content_id":       109114,
  "title":            "示例视频",
  "cover":            "https://pic.yulecdn.com/xxx.jpg",
  "year":             "2024",
  "duration_seconds": 1516,
  "added_at":         1776900000
}
字段类型必填约束入库列
uidinteger> 0favorites.uid
sectionstringgv / mv / tv / picfavorites.section
content_idinteger> 0favorites.content_id
titlestring最多 500 字符(超出截断)favorites.title(默认 ""
coverstring最多 2000 字符(超出截断)favorites.cover(默认 ""
yearstring \| null最多 10 字符(超出截断)favorites.year
duration_secondsinteger \| null有限数字或 nullfavorites.duration_seconds
added_atintegerUnix 秒;缺省或 <= 0nowfavorites.added_at

3.3 组合主键

favorites 表复合主键 (uid, section, content_id) —— 同一三元组在表里最多一行。这是后面三种 mode 语义的基础。

3.4 最小可行请求

{
  "rows": [
    { "uid": 42, "section": "gv", "content_id": 109114 }
  ]
}

只要 3 个必填字段就能插入一行,title/cover 为空串,year/duration_seconds 为 NULL,added_at 当前时间。

4. 响应契约

4.1 成功响应(HTTP 200)

{
  "ok": true,
  "mode": "append",
  "total": 100,
  "inserted": 87,
  "skipped": 13,
  "updated": 0,
  "deleted": 0,
  "errors": []
}
字段类型说明
okboolean无 errors 或至少一行落库为 true;全量失败为 false
modestring实际生效的模式(非法值静默降级为 append,此字段反映真实值)
totalinteger传入的 rows.length
insertedinteger新插入的行数(含 upsert 模式下的覆盖,见 §4.3)
skippedinteger跳过的行数(仅 append 模式下的冲突跳过)
updatedinteger始终为 0(见 §4.3)
deletedinteger仅 replace 模式下清除的行数
errorsarray每条校验/写入失败的行,{row_index, reason}

4.2 失败响应(HTTP 400)

{
  "ok": false,
  "mode": "append",
  "total": 0,
  "inserted": 0,
  "skipped": 0,
  "updated": 0,
  "deleted": 0,
  "errors": [
    { "row_index": -1, "reason": "max 1000 rows per call, got 1001" }
  ]
}

顶层级错误会把 row_index=-1 放在 errors[]:

4.3 关于 updated 始终为 0

这是一个已知的 D1 限制,不是 bug。 D1 / SQLite 的 .meta.changesINSERT ... ON CONFLICT DO UPDATE 在新插入和更新已存在时都返回 1,没有字段区分两者。没做 pre-SELECT(成本高)。

调用方如果需要严格区分"新插入"与"更新已有",要么 append 前先 SELECT 一次,要么接受 upsert 把两者合并看待。

4.4 HTTP 状态码

情况HTTPok
正常导入(至少一行落库或行数=0)200true
顶层错误(body 格式错、>1000 行、DB 不可用等)400false
全量行级失败且没一行落库200false
鉴权失败403
路径写错 / 方法错404 / 405
服务器异常500

5. 三种模式语义

5.1 mode = "append"(默认)

用途:增量同步;定时任务周期性重放;幂等补齐。

INSERT OR IGNORE INTO favorites
  (uid, section, content_id, title, cover, year, duration_seconds, added_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)

典型用法:cron 定时把"这段时间里新增的收藏"一股脑 POST 过来,已存在的无感跳过。

5.2 mode = "upsert"

用途:权威数据源同步;覆盖 metadata 但保留收藏关系。

INSERT INTO favorites
  (uid, section, content_id, title, cover, year, duration_seconds, added_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(uid, section, content_id) DO UPDATE SET
  title            = excluded.title,
  cover            = excluded.cover,
  year             = excluded.year,
  duration_seconds = excluded.duration_seconds,
  added_at         = excluded.added_at
upsert 会覆盖 added_at。如果你只想更新 metadata 但保留用户原来的收藏时间,不要用 upsert;手工先 DELETE + append,或者单独写一个"只改 title/cover"的端点(目前没有)。

5.3 mode = "replace"

用途:权威源的一次性全量覆盖;管理员修复单用户数据。

  1. 校验 body.uid 必须是正整数
  2. DELETE FROM favorites WHERE uid = ? —— 清空该用户所有收藏(deleted 反映数量)
  3. 校验每行 row.uid === body.uid,否则进 errors[] 不插入
  4. INSERT OR IGNORE 剩下的合法行
安全栏杆:
  • 没有 body.uid 直接拒绝,不会清任何数据
  • 不能用一次 replace 同时覆盖多个用户
  • replace 会真的 DELETE,没有软删或事务回滚保护 —— 先备份再用

典型用法

6. 字段校验规则

按行校验,任一失败该行进 errors[],其它行不受影响。

字段校验失败时 reason
行本身不是对象 / null"not an object"
uidNumber.isInteger(uid) && uid > 0"uid must be positive integer"
section∈ {gv, mv, tv, pic}'invalid section "<value>"'
content_idNumber.isInteger(content_id) && content_id > 0"content_id must be positive integer"
uid 与 replaceUid 一致性(仅 replace)row.uid === replaceUid"uid <X> != replaceUid <Y>"

自动处理(不产生 error)

7. 错误分类

7.1 行级错误(row_index >= 0

出现在 errors[],不影响其他行

reason原因
"not an object"rows[i] 是 null / 数字 / 字符串
"uid must be positive integer"uid 非整数或 ≤ 0
'invalid section "xxx"'section 不在四个枚举中
"content_id must be positive integer"content_id 非整数或 ≤ 0
"uid <N> != replaceUid <M>"replace 模式下行 uid 与请求 uid 不匹配
D1 batch 异常消息(前 120 字符)整批 INSERT 失败(通常 DB 临时问题)

7.2 顶层级错误(row_index = -1

出现在 errors[] 且整个请求 ok=false

reason原因
"DB_FAV not bound"Workers 环境配置错,联系运维
"rows must be an array"body.rows 类型错
"max 1000 rows per call, got <N>"超出单次 1000 上限
"replace mode requires replaceUid"replace 模式没带 uid

7.3 完全 HTTP 层错误

状态原因
403 ForbiddenX-Sync-Key 错 或 IP 不在白名单
404URL 写错
405用了 GET / PUT 等其它方法
500 + {ok:false, error:"..."}JSON 解析失败或其他服务端异常

8. 幂等与并发

8.1 幂等性

模式幂等性说明
append✅ 严格同一请求重发 N 次,DB 终态一样;inserted 只第一次 > 0,之后均为 0
upsert⚠️ 最后一次写赢多次调用字段有变时,最后一次覆盖;结果终态可预测
replace❌ 不幂等每次都 DELETE 再 INSERT,最终行数同;但计数器、日志多次触发

8.2 并发

端点层面无锁。D1 SQLite 内部是 serializable,同一 DB 并发写串行化,但跨批语义没保证:

建议:同一 uid 的 replace 请求由调用方做排队串行化(简单的 mutex / 单例 worker 足够)。

8.3 与用户端 POST /api/member/favorites 的交互

9. 性能与限制

9.1 单次上限

9.2 内部分批

9.3 延迟参考

实测(从 149 服务器打本地 D1):

行数模式大致耗时
3append~200ms
100append~600ms
1000append~3-5s
1000upsert~3-5s
1000replace(含 DELETE)~4-6s
Worker CPU 和 subrequest 预算:1000 行导入 + 20 个 batch + videos 计数器更新 还在预算内,但如果 Worker plan 是 Bundled 不是 Paid,建议切小。

9.4 为什么不支持 > 1000?

10. 副作用

10.1 videos.favorites_count 计数器

replace 模式 DELETE 时不减 favorites_count(目前未实现),会造成排名轻微漂移;数据量大时可能要运行 /admin/init-favcount 重建。

10.2 不影响的副作用

11. 集成样例

11.1 最小 curl 调用

curl -X POST https://ytk-api.yitongcs.com/admin/favorites/import \
  -H 'X-Sync-Key: ytk-sync-2026' \
  -H 'Content-Type: application/json' \
  -d '{
    "mode": "append",
    "rows": [
      { "uid": 42, "section": "gv", "content_id": 109114,
        "title": "测试视频", "cover": "https://pic.yulecdn.com/x.jpg",
        "duration_seconds": 1516, "added_at": 1776900000 }
    ]
  }'
必须从白名单 IP 发起(默认 149.28.233.43 / 108.61.23.146)。本地测试要 SSH 到 149 再 curl。

11.2 Python:从老 PHP 迁移全量

import mysql.connector, requests, json

API    = 'https://ytk-api.yitongcs.com/admin/favorites/import'
HEADER = { 'X-Sync-Key': 'ytk-sync-2026', 'Content-Type': 'application/json' }

cnx = mysql.connector.connect(host='127.0.0.1', user='root',
                              password='<...>', database='ytk')
cur = cnx.cursor(dictionary=True)
cur.execute("""
  SELECT userid AS uid, section, content_id,
         title, cover, year, duration_seconds,
         UNIX_TIMESTAMP(created_at) AS added_at
    FROM tp_favorite
ORDER BY id
""")

batch = []
batch_no = 0
for row in cur:
    batch.append(row)
    if len(batch) >= 1000:
        r = requests.post(API, headers=HEADER, data=json.dumps({
            'mode': 'append', 'rows': batch
        }), timeout=30).json()
        print(f'batch {batch_no}: inserted={r["inserted"]} skipped={r["skipped"]} errors={len(r["errors"])}')
        batch_no += 1
        batch = []

if batch:
    r = requests.post(API, headers=HEADER, data=json.dumps({
        'mode': 'append', 'rows': batch
    }), timeout=30).json()
    print(f'batch {batch_no} (final): inserted={r["inserted"]} skipped={r["skipped"]}')

cur.close(); cnx.close()
注意added_at 是 UNIX 秒(数字),不是字符串。section 必须严格 gv/mv/tv/pic;老表里如是中文或数字需要做映射。

11.3 Node.js:定时增量同步

const fetch = require('node-fetch');

async function pullDelta(since) {
  // ... 查老库或其他源,返回 [{uid, section, content_id, ...}]
}

async function syncOnce() {
  const rows = await pullDelta(lastSyncAt);
  if (rows.length === 0) return;
  for (let i = 0; i < rows.length; i += 1000) {
    const chunk = rows.slice(i, i + 1000);
    const res = await fetch('https://ytk-api.yitongcs.com/admin/favorites/import', {
      method: 'POST',
      headers: {
        'X-Sync-Key': process.env.SYNC_KEY,
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({ mode: 'append', rows: chunk }),
    }).then(r => r.json());
    console.log(`chunk ${i/1000}: inserted=${res.inserted} skipped=${res.skipped}`);
    if (res.errors.length) console.warn('errors:', res.errors);
  }
  lastSyncAt = Date.now() / 1000 | 0;
}

setInterval(syncOnce, 5 * 60 * 1000);  // 每 5 分钟同步一次

Node 进程要跑在 149.28.233.43108.61.23.146,否则 IP 门拦住。

11.4 Bash:单用户全量覆盖(修复场景)

#!/bin/bash
# 修复 uid=42 的收藏:从备份 JSON 全量覆盖
set -euo pipefail

UID_TARGET=42
BACKUP=uid-${UID_TARGET}-backup.json  # [{"uid":42,"section":"gv",...}, ...]

curl -s -X POST https://ytk-api.yitongcs.com/admin/favorites/import \
  -H "X-Sync-Key: $SYNC_KEY" \
  -H 'Content-Type: application/json' \
  -d @<(jq -c --argjson uid "$UID_TARGET" \
         '{mode:"replace", uid:$uid, rows:.}' "$BACKUP") \
  | jq '.'

先 dry-run:把 jq 前缀 . 改成 .[:3] 先跑 3 行看结果,没问题再全量。

11.5 TypeScript(Worker 内部调用,不经 HTTP)

import { importFavorites, type ImportRow } from './services/favorites';

const rows: ImportRow[] = [/* ... */];
const r = await importFavorites(env, rows, 'append');
console.log(r);

前提:你的 Worker 有 DB_FAVDB 两个 D1 绑定。这种调用方式不走 IP 白名单 / X-Sync-Key,直接进内部逻辑。

12. 排错手册

12.1 请求直接 403,响应体是纯文本 Forbidden

原因:卡在 isAdminAuthorized 门:

  1. X-Sync-Key 缺失或不等于 env.SYNC_KEY
  2. CF-Connecting-IP 不在 env.SYNC_ALLOWED_IPS 白名单

自查

curl -s https://api.ipify.org                  # 查本机公网 IP
grep SYNC_ALLOWED_IPS workers/wrangler.toml    # 查白名单

12.2 {ok:false, errors:[{row_index:-1, reason:"max 1000 rows..."}]}

HTTP 400,一条顶层错。修复:把 rows 切成 1000 行以内的 chunk 重发。参考 §11.2 / 11.3。

12.3 {ok:false, errors:[{row_index:-1, reason:"replace mode requires replaceUid"}]}

用了 mode=replace 但没传 uid

修复:body 顶层加 "uid": <number>,并确保 rows 里每一行 uid 都等于这个值。

12.4 Response inserted=0, skipped=N

不是 bug:append 模式下 rows 全是已存在的三元组,被 INSERT OR IGNORE 跳过。

自查:如果期望插入,说明这些 (uid, section, content_id) 已经在表里。改用 upsert 或先 DELETE。

12.5 D1 报 "too many subrequests"

单个 Worker invocation 打了太多 subrequest(Paid 计划 1000 上限)。1000 行导入通常不会触发,但若同时还打其他 API,可能超。

缓解:缩小单次导入行数到 500,或减少 videos counter 更新频率。

12.6 导入后 videos.favorites_count 不对

修复:运行 /admin/init-favcount 一次性重新扫 favorites 表重建 counter(幂等)。

13. 数据模型与内部行为

13.1 favorites

DB 绑定:DB_FAV → 数据库 ytk-favoritesdatabase_id: 4a17aad9-...)。

CREATE TABLE favorites (
  uid              INTEGER NOT NULL,
  section          TEXT    NOT NULL,       -- gv | mv | tv | pic
  content_id       INTEGER NOT NULL,
  title            TEXT    NOT NULL DEFAULT '',
  cover            TEXT    NOT NULL DEFAULT '',
  year             TEXT,
  duration_seconds INTEGER,
  added_at         INTEGER NOT NULL,
  PRIMARY KEY (uid, section, content_id)
);

CREATE INDEX idx_fav_uid_added ON favorites(uid, added_at DESC);
CREATE INDEX idx_fav_content   ON favorites(section, content_id);

1M DAU × 平均 20 收藏 ≈ 20M 行 × 100B ≈ 2 GB —— 远低于 D1 单库 10 GB 限,未来可按 uid % N 水平分片(schema 不变,加绑定 + 改 shards.ts)。

13.2 videos.favorites_count 热度计数器

位于主库 DB.videos。importFavorites 在 GV 收藏实际落库时对应 content_id 加 1。去重:同一次请求里同一个 content_id 只加一次。

驱动 GET /api/content/gv?sort=favoritesORDER BY favorites_count DESC

13.3 执行流程伪代码

1. 反序列化 body
2. 规范化 mode (非法 → append)
3. 校验 rows 上限
4. 校验每一行 → valid[] 和 errors[]
5. if mode == replace:
     4a. 必须有 uid
     4b. DELETE FROM favorites WHERE uid = replaceUid
     4c. 记录 deleted
6. 构造 INSERT SQL:
     - append/replace → INSERT OR IGNORE
     - upsert         → ON CONFLICT DO UPDATE
7. 按 50 行分批:
     - db.batch(stmts)
     - 遍历 .meta.changes 归类 inserted / skipped
     - 记 GV landed content_id
8. 对去重后的 GV content_id: UPDATE favorites_count += 1(按 50 个一批)
9. 返回 ImportResult
端点场景本接口差异
POST /api/member/favorites用户自己加一条收藏单条、需 Bearer token、走风控
DELETE /api/member/favorites/:section/:id用户自己取消一条单条、需 Bearer token
GET /api/member/favorites用户自己查只看 Bearer token 身份的 uid
POST /admin/init-favcount一次性回填所有 GV 的 favorites_count批量扫描,幂等

15. 变更历史

日期提交变化
2026-04-246c1b1ac首次实现:三种模式 + 行级校验 + batch 写入 + 计数器去重