Files
ai-app-database/app/routes/resources.py
T
huty 66ffa9679d
CI — Docker Build & Push / Build & Push Image (push) Failing after 2m28s
修复 word 文件预览问题
2026-04-28 12:45:54 +09:00

476 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import json
from flask import (Blueprint, render_template, redirect, url_for, flash,
request, abort, current_app, send_file, jsonify, Response)
from flask_login import login_required, current_user
from flask_wtf import FlaskForm
from wtforms import (StringField, TextAreaField, SelectField,
FileField, SubmitField, BooleanField)
from wtforms.validators import DataRequired, Optional, URL, Length
from app.extensions import db
from app.models.resource import Resource
from app.models.folder import Folder
from app.models.setting import SystemSetting
from app.utils.file_handler import (save_uploaded_file, delete_resource_file,
allowed_file, guess_resource_type)
from app.utils.downloader import start_url_download, start_magnet_download
resources_bp = Blueprint('resources', __name__)
# ── 表单 ─────────────────────────────────────────────────────────────────────
class UploadForm(FlaskForm):
    """Form used by the upload view to submit a local file as a resource."""

    title = StringField(
        label='标题',
        validators=[DataRequired(), Length(max=255)],
    )
    description = TextAreaField(label='描述', validators=[Optional()])
    # The empty choice lets the upload view guess the type from the file name.
    resource_type = SelectField(
        label='类型',
        choices=[
            ('', '— 自动识别 —'),
            ('text', '文本'),
            ('image', '图片'),
            ('audio', '音频'),
            ('video', '视频'),
        ],
        validators=[Optional()],
    )
    # Comma-separated tag string (see the label); stored as entered.
    tags = StringField(
        label='标签(逗号分隔)',
        validators=[Optional(), Length(max=512)],
    )
    file = FileField(
        label='选择文件',
        validators=[DataRequired(message='请选择文件')],
    )
    submit = SubmitField(label='上传')
class UrlDownloadForm(FlaskForm):
    """Form used by the URL-download view to queue a remote file download."""

    title = StringField(
        label='标题',
        validators=[DataRequired(), Length(max=255)],
    )
    description = TextAreaField(label='描述', validators=[Optional()])
    # Only presence is validated here; the value is not checked to be a URL.
    source_url = StringField(label='下载 URL', validators=[DataRequired()])
    # The empty choice is turned into None by the view before the download
    # is started.
    resource_type = SelectField(
        label='类型',
        choices=[
            ('', '— 自动识别 —'),
            ('text', '文本'),
            ('image', '图片'),
            ('audio', '音频'),
            ('video', '视频'),
        ],
        validators=[Optional()],
    )
    tags = StringField(
        label='标签',
        validators=[Optional(), Length(max=512)],
    )
    submit = SubmitField(label='开始下载')
class MagnetForm(FlaskForm):
    """Form used by the magnet-download view to queue a magnet-link download."""

    title = StringField(
        label='标题',
        validators=[DataRequired(), Length(max=255)],
    )
    description = TextAreaField(label='描述', validators=[Optional()])
    # Only presence is validated here; the view additionally checks that the
    # value starts with 'magnet:'.
    magnet_uri = StringField(label='磁力链接', validators=[DataRequired()])
    # Unlike the other download forms there is no "auto-detect" choice.
    resource_type = SelectField(
        label='类型',
        choices=[
            ('video', '视频'),
            ('audio', '音频'),
            ('image', '图片'),
            ('text', '文本'),
        ],
    )
    tags = StringField(
        label='标签',
        validators=[Optional(), Length(max=512)],
    )
    submit = SubmitField(label='开始下载')
class EditForm(FlaskForm):
    """Form used by the edit view to change a resource's metadata."""

    title = StringField(
        label='标题',
        validators=[DataRequired(), Length(max=255)],
    )
    description = TextAreaField(label='描述', validators=[Optional()])
    tags = StringField(
        label='标签',
        validators=[Optional(), Length(max=512)],
    )
    is_public = BooleanField(label='公开资源')
    submit = SubmitField(label='保存')
# ── 辅助函数 ─────────────────────────────────────────────────────────────────
def _check_setting(key):
    """Return True when the system setting *key* equals the string 'true'.

    'true' is passed as the second argument to ``SystemSetting.get``
    (presumably the fallback for a missing key, which would make features
    enabled by default -- confirm against the model).
    """
    value = SystemSetting.get(key, 'true')
    return value == 'true'
def get_folder_choices(user_id: int) -> list:
    """Build a flat, indented list of a user's folders for a form dropdown.

    The result starts with the root entry ``('', '— 根目录 —')`` followed by
    ``(str(folder.id), label)`` tuples in depth-first order. Siblings are
    sorted by name and each label is prefixed with one ideographic space
    (U+3000) per nesting level.
    """
    choices = [('', '— 根目录 —')]

    def _collect(level_folders, depth):
        # Pre-order traversal: emit the folder, then its subtree.
        for folder in sorted(level_folders, key=lambda item: item.name):
            label = '\u3000' * depth + folder.name
            choices.append((str(folder.id), label))
            _collect(folder.children, depth + 1)

    top_level = (
        Folder.query
        .filter_by(user_id=user_id, parent_id=None)
        .order_by(Folder.name)
        .all()
    )
    _collect(top_level, 0)
    return choices
def _parse_folder_id(user_id: int) -> int | None:
    """Parse and validate the ``folder_id`` field of the submitted form.

    Only ``request.form`` is consulted (query-string arguments are not read).

    Returns:
        The folder id as an int when the field holds an ASCII decimal number
        naming a folder owned by *user_id*; ``None`` when the field is
        missing, blank, or not a plain decimal number.

    Aborts with HTTP 403 when the id is well formed but no folder with that
    id belongs to *user_id*.
    """
    raw = request.form.get('folder_id', '').strip()
    # str.isdigit() alone also accepts characters such as '²' that int()
    # rejects with ValueError, so restrict the check to ASCII digits.
    # An empty string fails isdigit() and therefore also yields None.
    if not (raw.isascii() and raw.isdigit()):
        return None
    folder_id = int(raw)
    owned = Folder.query.filter_by(id=folder_id, user_id=user_id).first()
    if owned is None:
        abort(403)
    return folder_id
# ── 路由 ─────────────────────────────────────────────────────────────────────
@resources_bp.route('/')
@login_required
def list_resources():
    """Render the current user's resource list with filters and pagination.

    Query-string parameters:
        page: page number (int, default 1); 24 items per page.
        q: free-text search matched case-insensitively as a substring of
            title, description or tags.
        type: exact ``resource_type`` filter.
        source: exact ``source_type`` filter.
        folder_id: restrict to one folder; responds 404 when the folder does
            not exist or is not owned by the current user.

    The template also receives per-type and total counts over all of the
    user's resources (independent of the active filters).
    """
    page = request.args.get('page', 1, type=int)
    q = request.args.get('q', '')
    rtype = request.args.get('type', '')
    source = request.args.get('source', '')
    folder_id = request.args.get('folder_id', type=int)

    query = Resource.query.filter_by(user_id=current_user.id)

    # Folder filter
    current_folder = None
    breadcrumb = []
    if folder_id is not None:
        current_folder = Folder.query.filter_by(
            id=folder_id, user_id=current_user.id
        ).first_or_404()
        query = query.filter_by(folder_id=folder_id)
        breadcrumb = current_folder.get_breadcrumb()

    if q:
        # Escape LIKE metacharacters so that a user-typed '%' or '_' is
        # matched literally instead of acting as a wildcard.
        escaped = (q.replace('/', '//')
                    .replace('%', '/%')
                    .replace('_', '/_'))
        pattern = f'%{escaped}%'
        query = query.filter(
            Resource.title.ilike(pattern, escape='/') |
            Resource.description.ilike(pattern, escape='/') |
            Resource.tags.ilike(pattern, escape='/')
        )
    if rtype:
        query = query.filter_by(resource_type=rtype)
    if source:
        query = query.filter_by(source_type=source)

    pagination = query.order_by(Resource.created_at.desc()).paginate(
        page=page, per_page=24, error_out=False)

    # Sidebar counters ignore the filters above on purpose: they are computed
    # from a fresh per-user query.
    base_q = Resource.query.filter_by(user_id=current_user.id)
    counts = {
        t: base_q.filter_by(resource_type=t).count()
        for t in ('text', 'image', 'audio', 'video')
    }
    counts['total'] = base_q.count()

    return render_template('user/resources.html',
                           pagination=pagination, q=q,
                           rtype=rtype, source=source,
                           counts=counts,
                           folder_id=folder_id,
                           current_folder=current_folder,
                           breadcrumb=breadcrumb)
@resources_bp.route('/upload', methods=['GET', 'POST'])
@login_required
def upload():
    """Show the upload form and, on a valid POST, store the uploaded file.

    On success a ``Resource`` row with ``source_type='upload'`` is created
    and the user is redirected to its detail page. On a validation problem
    or a failure to save the file, the form is re-rendered with a flash
    message.

    The target folder is validated before the file is written, and the
    written file is removed again if the database commit fails, so a
    rejected request does not leave an orphaned file behind.
    """
    form = UploadForm()
    folder_choices = get_folder_choices(current_user.id)

    def _render_form():
        return render_template('user/upload.html', form=form, tab='upload',
                               folder_choices=folder_choices)

    if not form.validate_on_submit():
        return _render_form()

    file_obj = form.file.data
    if not allowed_file(file_obj.filename):
        flash('不支持的文件类型', 'danger')
        return _render_form()

    # An explicit choice wins; otherwise guess from the file name.
    rtype = form.resource_type.data or guess_resource_type(file_obj.filename)
    if not rtype:
        flash('无法识别文件类型,请手动选择', 'warning')
        return _render_form()

    # Validate the folder first: _parse_folder_id aborts with 403 for a
    # folder the user does not own, and that must happen before anything is
    # written to disk.
    folder_id = _parse_folder_id(current_user.id)

    try:
        uname, rel_path, size, mime, orig = save_uploaded_file(file_obj, rtype)
    except Exception as e:
        flash(f'文件保存失败: {e}', 'danger')
        return _render_form()

    res = Resource(
        user_id=current_user.id,
        title=form.title.data,
        description=form.description.data,
        resource_type=rtype,
        source_type='upload',
        filename=uname,
        original_name=orig,
        file_path=rel_path,
        file_size=size,
        mime_type=mime,
        tags=form.tags.data,
        download_status='na',
        folder_id=folder_id
    )
    db.session.add(res)
    try:
        db.session.commit()
    except Exception:
        # Keep disk and database consistent, then let the error propagate
        # exactly as it did before.
        db.session.rollback()
        delete_resource_file(rel_path)
        raise

    flash('文件上传成功', 'success')
    return redirect(url_for('resources.detail', resource_id=res.id))
@resources_bp.route('/url-download', methods=['GET', 'POST'])
@login_required
def url_download():
    """Show the URL-download form and, on a valid POST, queue a download.

    Redirects to the resource list with a warning when the
    ``enable_url_download`` setting is off. Otherwise a ``Resource`` row with
    ``download_status='pending'`` is committed, ``start_url_download`` is
    called for it, and the user is redirected to the detail page.
    """
    if not _check_setting('enable_url_download'):
        flash('URL 下载功能已关闭', 'warning')
        return redirect(url_for('resources.list_resources'))

    form = UrlDownloadForm()
    folder_choices = get_folder_choices(current_user.id)

    if not form.validate_on_submit():
        return render_template('user/upload.html', form=form, tab='url',
                               folder_choices=folder_choices)

    # '' (auto-detect) becomes None for the downloader; the row itself is
    # stored with 'text' as a placeholder type in that case.
    requested_type = form.resource_type.data or None
    target_folder = _parse_folder_id(current_user.id)

    # NOTE(review): the URL is handed to the downloader without any scheme or
    # host check in this view -- confirm that start_url_download restricts
    # what the server is allowed to fetch.
    res = Resource(
        user_id=current_user.id,
        title=form.title.data,
        description=form.description.data,
        resource_type=requested_type or 'text',
        source_type='url',
        source_url=form.source_url.data,
        tags=form.tags.data,
        download_status='pending',
        folder_id=target_folder,
    )
    db.session.add(res)
    db.session.commit()

    # Pass the real application object rather than the context-local proxy.
    app = current_app._get_current_object()
    start_url_download(app, res.id, form.source_url.data,
                       requested_type, current_user.id)

    flash('下载任务已启动,请稍后刷新查看进度', 'info')
    return redirect(url_for('resources.detail', resource_id=res.id))
@resources_bp.route('/magnet-download', methods=['GET', 'POST'])
@login_required
def magnet_download():
    """Show the magnet form and, on a valid POST, queue a magnet download.

    Redirects to the resource list with a warning when the ``enable_magnet``
    setting is off. A submitted link that does not start with ``magnet:``
    re-renders the form with an error. Otherwise a ``Resource`` row with
    ``download_status='pending'`` is committed, ``start_magnet_download`` is
    called for it, and the user is redirected to the detail page.
    """
    if not _check_setting('enable_magnet'):
        flash('磁力下载功能已关闭', 'warning')
        return redirect(url_for('resources.list_resources'))

    form = MagnetForm()
    folder_choices = get_folder_choices(current_user.id)

    def _render_form():
        return render_template('user/upload.html', form=form, tab='magnet',
                               folder_choices=folder_choices)

    if not form.validate_on_submit():
        return _render_form()

    if not form.magnet_uri.data.startswith('magnet:'):
        flash('请输入有效的磁力链接(以 magnet: 开头)', 'danger')
        return _render_form()

    target_folder = _parse_folder_id(current_user.id)
    res = Resource(
        user_id=current_user.id,
        title=form.title.data,
        description=form.description.data,
        resource_type=form.resource_type.data,
        source_type='magnet',
        source_url=form.magnet_uri.data,
        tags=form.tags.data,
        download_status='pending',
        folder_id=target_folder,
    )
    db.session.add(res)
    db.session.commit()

    # Pass the real application object rather than the context-local proxy.
    app = current_app._get_current_object()
    start_magnet_download(app, res.id, form.magnet_uri.data,
                          form.resource_type.data)

    flash('磁力下载任务已启动(需要安装 aria2c)', 'info')
    return redirect(url_for('resources.detail', resource_id=res.id))
@resources_bp.route('/<int:resource_id>')
@login_required
def detail(resource_id):
    """Render the detail page of one resource.

    Responds 404 when the resource does not exist and 403 when the current
    user is neither its owner nor an admin.
    """
    res = db.get_or_404(Resource, resource_id)
    is_owner = res.user_id == current_user.id
    if not (is_owner or current_user.is_admin):
        abort(403)
    return render_template('user/detail.html', resource=res)
@resources_bp.route('/<int:resource_id>/edit', methods=['GET', 'POST'])
@login_required
def edit(resource_id):
    """Show and process the metadata edit form of one resource.

    Responds 404 when the resource does not exist and 403 when the current
    user is neither its owner nor an admin. On a valid POST the title,
    description, tags, public flag and folder are updated and the user is
    redirected to the detail page.

    Folder handling: a blank ``folder_id`` moves the resource to the root;
    a decimal id is applied only when that folder belongs to the resource's
    owner; any other value leaves the folder unchanged.
    """
    res = db.get_or_404(Resource, resource_id)
    if res.user_id != current_user.id and not current_user.is_admin:
        abort(403)

    form = EditForm(obj=res)
    # Folders are looked up for the resource's owner, not the editor, so an
    # admin editing someone else's resource cannot file it under one of the
    # admin's own folders. For the owner both ids are the same.
    folder_choices = get_folder_choices(res.user_id)

    if form.validate_on_submit():
        res.title = form.title.data
        res.description = form.description.data
        res.tags = form.tags.data
        res.is_public = form.is_public.data

        # Update the folder
        folder_id_raw = request.form.get('folder_id', '').strip()
        if folder_id_raw == '':
            res.folder_id = None
        # isascii() guards against characters such as '²', which pass
        # str.isdigit() but make int() raise ValueError.
        elif folder_id_raw.isascii() and folder_id_raw.isdigit():
            fid = int(folder_id_raw)
            folder = Folder.query.filter_by(
                id=fid, user_id=res.user_id
            ).first()
            if folder is not None:
                res.folder_id = fid

        db.session.commit()
        flash('资源信息已更新', 'success')
        return redirect(url_for('resources.detail', resource_id=res.id))

    return render_template('user/edit.html', form=form, resource=res,
                           folder_choices=folder_choices)
@resources_bp.route('/<int:resource_id>/move', methods=['POST'])
@login_required
def move_resource(resource_id):
    """Move a resource into a folder and answer with JSON.

    The target is read from the ``folder_id`` form field: blank means the
    root (no folder), an ASCII decimal number names a folder.

    Responses:
        200 ``{"ok": true, "folder_id": ...}`` on success.
        400 ``{"ok": false, "msg": ...}`` when ``folder_id`` is not blank and
            not a decimal number.
        403 when the resource or the target folder is not owned by the
            current user (admins get no exemption here).
        404 when the resource does not exist.
    """
    res = db.get_or_404(Resource, resource_id)
    if res.user_id != current_user.id:
        abort(403)

    folder_id_raw = request.form.get('folder_id', '').strip()
    if folder_id_raw == '':
        res.folder_id = None
    # isascii() guards against characters such as '²', which pass
    # str.isdigit() but make int() raise ValueError; such input now gets the
    # 400 response below.
    elif folder_id_raw.isascii() and folder_id_raw.isdigit():
        fid = int(folder_id_raw)
        folder = Folder.query.filter_by(
            id=fid, user_id=current_user.id
        ).first()
        if folder is None:
            abort(403)
        res.folder_id = fid
    else:
        return jsonify(ok=False, msg='无效的文件夹 ID'), 400

    db.session.commit()
    return jsonify(ok=True, folder_id=res.folder_id)
@resources_bp.route('/<int:resource_id>/delete', methods=['POST'])
@login_required
def delete(resource_id):
    """Delete a resource row and its stored file, then return to the list.

    Responds 404 when the resource does not exist and 403 when the current
    user is neither its owner nor an admin.
    """
    res = db.get_or_404(Resource, resource_id)
    if res.user_id != current_user.id and not current_user.is_admin:
        abort(403)

    # Remember the path before the row is deleted.
    file_path = res.file_path

    # Commit the row deletion first and remove the file afterwards: if the
    # commit fails, the record still points at an existing file instead of a
    # file that has already been removed.
    db.session.delete(res)
    db.session.commit()
    delete_resource_file(file_path)

    flash('资源已删除', 'success')
    return redirect(url_for('resources.list_resources'))
@resources_bp.route('/<int:resource_id>/download')
@login_required
def download(resource_id):
    """Send a resource's stored file to the browser as an attachment.

    Responds 404 when the resource does not exist and 403 when the current
    user is neither its owner nor an admin. When the resource has no file
    path yet, or the file is missing on disk, the user is redirected to the
    detail page with a flash message instead.
    """
    res = db.get_or_404(Resource, resource_id)
    if res.user_id != current_user.id and not current_user.is_admin:
        abort(403)

    def _back_to_detail(message, category):
        flash(message, category)
        return redirect(url_for('resources.detail', resource_id=res.id))

    if not res.file_path:
        return _back_to_detail('文件尚未就绪', 'warning')

    # Stored paths are relative to the application's static directory.
    abs_path = os.path.join(current_app.root_path, 'static', res.file_path)
    if not os.path.exists(abs_path):
        return _back_to_detail('文件不存在', 'danger')

    # Prefer the original file name for the download; fall back to the
    # stored file name.
    shown_name = res.original_name or res.filename
    return send_file(abs_path, as_attachment=True, download_name=shown_name)
@resources_bp.route('/<int:resource_id>/progress')
@login_required
def progress(resource_id):
    """Return the download progress of a resource as JSON (polling endpoint).

    The payload has the keys ``status``, ``progress``, ``error`` and
    ``file_ready``; the last one is true only when a file path is set and
    the status is ``'done'``.

    Responds 404 when the resource does not exist and 403 when the current
    user is neither its owner nor an admin.
    """
    res = db.get_or_404(Resource, resource_id)
    is_owner = res.user_id == current_user.id
    if not (is_owner or current_user.is_admin):
        abort(403)

    has_file = res.file_path is not None
    payload = {
        'status': res.download_status,
        'progress': res.download_progress,
        'error': res.download_error,
        'file_ready': has_file and res.download_status == 'done',
    }
    return jsonify(payload)
@resources_bp.route('/<int:resource_id>/preview')
@login_required
def preview(resource_id):
    """Return a resource's content for inline preview (text / Word documents).

    Behaviour by detected format:
        .docx: converted to HTML with ``mammoth`` and returned as text/html;
            an HTML error fragment is returned when mammoth is missing or
            the conversion fails.
        legacy .doc: an HTML notice that online preview is unsupported.
        anything else: the file decoded as UTF-8 (undecodable bytes
            replaced) and returned as text/plain.

    Responds 403 when the current user is neither owner nor admin, 404 when
    the resource or its file is missing or its type is not ``'text'``, and
    500 when the file cannot be read.

    NOTE(review): the HTML produced by mammoth is returned as-is; confirm
    whether it needs sanitising before being shown for files uploaded by
    other users (e.g. when an admin previews them).
    """
    # Function-scope import, like the optional mammoth import below.
    from html import escape as html_escape

    res = db.get_or_404(Resource, resource_id)
    if res.user_id != current_user.id and not current_user.is_admin:
        abort(403)
    if not res.file_path or res.resource_type != 'text':
        abort(404)
    abs_path = os.path.join(current_app.root_path, 'static', res.file_path)
    if not os.path.exists(abs_path):
        abort(404)

    # splitext() yields '' for a name without a dot, so an extension-less
    # file that happens to be called "doc" or "docx" is not mistaken for a
    # Word document.
    name = res.original_name or res.filename or ''
    ext = os.path.splitext(name)[1].lstrip('.').lower()
    mime = (res.mime_type or '').lower()
    DOCX_MIME = ('application/vnd.openxmlformats-officedocument'
                 '.wordprocessingml.document')
    DOC_MIME = 'application/msword'

    # Some sources (e.g. URL downloads) store names without an extension, so
    # fall back to the MIME type and then to the file's magic bytes.
    is_docx = ext == 'docx' or mime == DOCX_MIME
    is_doc = ext == 'doc' or mime == DOC_MIME
    if not (is_docx or is_doc):
        try:
            with open(abs_path, 'rb') as f:
                head = f.read(8)
        except OSError:
            # Best effort only: an unreadable file is reported by the plain
            # text branch at the end.
            head = b''
        # docx/xlsx/pptx are ZIP containers (PK\x03\x04); the MIME type must
        # additionally hint at an Office document.
        if head.startswith(b'PK\x03\x04') and (
            'wordprocessingml' in mime or 'officedocument' in mime
        ):
            is_docx = True
        # Legacy .doc header: D0 CF 11 E0 A1 B1 1A E1
        elif head.startswith(b'\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1') and (
            'msword' in mime or mime == ''
        ):
            is_doc = True

    # Word documents (.docx) are converted to HTML with mammoth.
    if is_docx:
        try:
            import mammoth
            with open(abs_path, 'rb') as f:
                result = mammoth.convert_to_html(f)
            return Response(result.value, mimetype='text/html; charset=utf-8')
        except ImportError:
            return Response(
                '<div style="padding:1rem;color:#dc3545">'
                '服务器未安装 mammoth 库,无法预览 .docx 文件。请运行 '
                '<code>pip install mammoth</code> 后重试。</div>',
                mimetype='text/html; charset=utf-8'
            )
        except Exception as e:
            # The exception text ends up inside an HTML response, so it must
            # be escaped before interpolation.
            detail_text = html_escape(str(e))
            return Response(
                f'<div style="padding:1rem;color:#dc3545">无法解析 Word 文档:{detail_text}</div>',
                mimetype='text/html; charset=utf-8'
            )

    # The legacy binary .doc format cannot be previewed online.
    if is_doc:
        return Response(
            '<div style="padding:1rem;color:#6c757d">'
            '不支持在线预览旧版 .doc 文件,请下载后查看,或将文件转换为 .docx 格式。</div>',
            mimetype='text/html; charset=utf-8'
        )

    try:
        with open(abs_path, 'r', encoding='utf-8', errors='replace') as f:
            content = f.read()
    except OSError:
        abort(500)
    return Response(content, mimetype='text/plain; charset=utf-8')