476 lines
18 KiB
Python
476 lines
18 KiB
Python
import os
|
||
import json
|
||
from flask import (Blueprint, render_template, redirect, url_for, flash,
|
||
request, abort, current_app, send_file, jsonify, Response)
|
||
from flask_login import login_required, current_user
|
||
from flask_wtf import FlaskForm
|
||
from wtforms import (StringField, TextAreaField, SelectField,
|
||
FileField, SubmitField, BooleanField)
|
||
from wtforms.validators import DataRequired, Optional, URL, Length
|
||
|
||
from app.extensions import db
|
||
from app.models.resource import Resource
|
||
from app.models.folder import Folder
|
||
from app.models.setting import SystemSetting
|
||
from app.utils.file_handler import (save_uploaded_file, delete_resource_file,
|
||
allowed_file, guess_resource_type)
|
||
from app.utils.downloader import start_url_download, start_magnet_download
|
||
|
||
resources_bp = Blueprint('resources', __name__)
|
||
|
||
|
||
# ── 表单 ─────────────────────────────────────────────────────────────────────
|
||
|
||
class UploadForm(FlaskForm):
    """Form for uploading a local file as a new resource.

    The first entry of the type selector has an empty value, which lets
    the submitter leave the resource type unspecified.
    """

    title = StringField(
        '标题',
        validators=[DataRequired(), Length(max=255)],
    )
    description = TextAreaField(
        '描述',
        validators=[Optional()],
    )
    # Empty value first: "auto-detect" is the default selection.
    resource_type = SelectField(
        '类型',
        validators=[Optional()],
        choices=[
            ('', '— 自动识别 —'),
            ('text', '文本'),
            ('image', '图片'),
            ('audio', '音频'),
            ('video', '视频'),
        ],
    )
    # Comma-separated tag list, capped at 512 characters.
    tags = StringField(
        '标签(逗号分隔)',
        validators=[Optional(), Length(max=512)],
    )
    file = FileField(
        '选择文件',
        validators=[DataRequired(message='请选择文件')],
    )
    submit = SubmitField('上传')
|
||
|
||
|
||
class UrlDownloadForm(FlaskForm):
    """Form for creating a resource that is fetched from a remote URL.

    The type selector offers an empty "auto-detect" value in addition to
    the four concrete resource types.
    """

    title = StringField(
        '标题',
        validators=[DataRequired(), Length(max=255)],
    )
    description = TextAreaField(
        '描述',
        validators=[Optional()],
    )
    # NOTE(review): only presence is validated here; the URL format and
    # scheme are not checked by this form.
    source_url = StringField(
        '下载 URL',
        validators=[DataRequired()],
    )
    resource_type = SelectField(
        '类型',
        validators=[Optional()],
        choices=[
            ('', '— 自动识别 —'),
            ('text', '文本'),
            ('image', '图片'),
            ('audio', '音频'),
            ('video', '视频'),
        ],
    )
    tags = StringField(
        '标签',
        validators=[Optional(), Length(max=512)],
    )
    submit = SubmitField('开始下载')
|
||
|
||
|
||
class MagnetForm(FlaskForm):
    """Form for creating a resource that is fetched from a magnet link.

    Unlike the upload and URL forms, the type selector has no empty
    "auto-detect" entry; 'video' is listed first.
    """

    title = StringField(
        '标题',
        validators=[DataRequired(), Length(max=255)],
    )
    description = TextAreaField(
        '描述',
        validators=[Optional()],
    )
    # Only presence is validated by the form itself.
    magnet_uri = StringField(
        '磁力链接',
        validators=[DataRequired()],
    )
    resource_type = SelectField(
        '类型',
        choices=[
            ('video', '视频'),
            ('audio', '音频'),
            ('image', '图片'),
            ('text', '文本'),
        ],
    )
    tags = StringField(
        '标签',
        validators=[Optional(), Length(max=512)],
    )
    submit = SubmitField('开始下载')
|
||
|
||
|
||
class EditForm(FlaskForm):
    """Form for editing the metadata of an existing resource.

    Covers the title, description, tags and the public/private flag.
    """

    title = StringField(
        '标题',
        validators=[DataRequired(), Length(max=255)],
    )
    description = TextAreaField(
        '描述',
        validators=[Optional()],
    )
    tags = StringField(
        '标签',
        validators=[Optional(), Length(max=512)],
    )
    is_public = BooleanField('公开资源')
    submit = SubmitField('保存')
|
||
|
||
|
||
# ── 辅助函数 ─────────────────────────────────────────────────────────────────
|
||
|
||
def _check_setting(key):
    """Return True when the system setting *key* equals the string 'true'.

    'true' is passed as the second argument to ``SystemSetting.get`` --
    presumably the fallback for a missing key, which would make features
    enabled by default (confirm against ``SystemSetting``).
    """
    stored_value = SystemSetting.get(key, 'true')
    return stored_value == 'true'
|
||
|
||
|
||
def get_folder_choices(user_id: int) -> list:
    """Build a flat, depth-indented folder list for a form drop-down.

    Args:
        user_id: Owner whose folders are listed.

    Returns:
        A list of ``(value, label)`` string pairs. The first pair has an
        empty value and stands for the root directory. Every folder
        follows in depth-first order, siblings sorted by name, with one
        ideographic space (U+3000) of indentation per nesting level.
    """
    def _flatten(nodes, level):
        # Depth-first: emit the folder itself, then its subtree.
        for node in sorted(nodes, key=lambda item: item.name):
            yield str(node.id), '\u3000' * level + node.name
            yield from _flatten(node.children, level + 1)

    top_level_query = Folder.query.filter_by(user_id=user_id, parent_id=None)
    top_level = top_level_query.order_by(Folder.name).all()

    choices = [('', '— 根目录 —')]
    choices.extend(_flatten(top_level, 0))
    return choices
|
||
|
||
|
||
def _parse_folder_id(user_id: int) -> int | None:
    """Parse and validate the ``folder_id`` field of the submitted form.

    Args:
        user_id: The user who must own the referenced folder.

    Returns:
        The folder id as an int, or None when the field is missing,
        blank, or not a plain decimal number (meaning "no folder").

    Raises:
        Aborts the request with 403 when the id is numeric but no folder
        with that id belongs to ``user_id``.
    """
    raw = request.form.get('folder_id', '').strip()
    # isdecimal() rather than isdigit(): isdigit() also accepts characters
    # such as '²' that int() cannot convert, which would raise ValueError
    # and surface as a 500 error.
    if not raw or not raw.isdecimal():
        return None

    folder_id = int(raw)
    folder = Folder.query.filter_by(id=folder_id, user_id=user_id).first()
    if folder is None:
        abort(403)
    return folder_id
|
||
|
||
|
||
# ── 路由 ─────────────────────────────────────────────────────────────────────
|
||
|
||
@resources_bp.route('/')
@login_required
def list_resources():
    """List the current user's resources with filtering and pagination.

    Query-string parameters:
        page: Page number (int, defaults to 1).
        q: Free-text search matched case-insensitively against title,
            description and tags.
        type: Restrict to one resource type.
        source: Restrict to one source type.
        folder_id: Restrict to one folder owned by the current user;
            responds with 404 if no such folder exists.

    Renders ``user/resources.html`` with the pagination object, the
    active filters, per-type counts and the folder breadcrumb.
    """
    page = request.args.get('page', 1, type=int)
    q = request.args.get('q', '')
    rtype = request.args.get('type', '')
    source = request.args.get('source', '')
    folder_id = request.args.get('folder_id', type=int)

    query = Resource.query.filter_by(user_id=current_user.id)

    # Folder filter
    current_folder = None
    breadcrumb = []
    if folder_id is not None:
        current_folder = Folder.query.filter_by(
            id=folder_id, user_id=current_user.id
        ).first_or_404()
        query = query.filter_by(folder_id=folder_id)
        breadcrumb = current_folder.get_breadcrumb()

    if q:
        # Escape LIKE metacharacters so the search term is matched
        # literally; otherwise '%' or '_' typed by the user would act as
        # wildcards. '/' is the escape character, so it is doubled first.
        literal = q.replace('/', '//').replace('%', '/%').replace('_', '/_')
        pattern = f'%{literal}%'
        query = query.filter(
            Resource.title.ilike(pattern, escape='/') |
            Resource.description.ilike(pattern, escape='/') |
            Resource.tags.ilike(pattern, escape='/')
        )
    if rtype:
        query = query.filter_by(resource_type=rtype)
    if source:
        query = query.filter_by(source_type=source)

    pagination = query.order_by(Resource.created_at.desc()).paginate(
        page=page, per_page=24, error_out=False)

    # Counts always cover all of the user's resources, independent of the
    # filters applied to the listing above.
    base_q = Resource.query.filter_by(user_id=current_user.id)
    counts = {
        t: base_q.filter_by(resource_type=t).count()
        for t in ('text', 'image', 'audio', 'video')
    }
    counts['total'] = base_q.count()

    return render_template('user/resources.html',
                           pagination=pagination, q=q,
                           rtype=rtype, source=source,
                           counts=counts,
                           folder_id=folder_id,
                           current_folder=current_folder,
                           breadcrumb=breadcrumb)
|
||
|
||
|
||
@resources_bp.route('/upload', methods=['GET', 'POST'])
@login_required
def upload():
    """Show the upload form and store a submitted file as a new resource.

    On a valid POST the file is checked against the allowed types, its
    resource type is taken from the form or guessed from the file name,
    the target folder is validated, the file is saved and a ``Resource``
    row is created. On success the user is redirected to the resource's
    detail page; on any validation problem the form is rendered again
    with a flash message.
    """
    form = UploadForm()
    folder_choices = get_folder_choices(current_user.id)

    def _render_form():
        # Single place for the re-render used by every failure path.
        return render_template('user/upload.html', form=form, tab='upload',
                               folder_choices=folder_choices)

    if not form.validate_on_submit():
        return _render_form()

    file_obj = form.file.data
    if not allowed_file(file_obj.filename):
        flash('不支持的文件类型', 'danger')
        return _render_form()

    rtype = form.resource_type.data or guess_resource_type(file_obj.filename)
    if not rtype:
        flash('无法识别文件类型,请手动选择', 'warning')
        return _render_form()

    # Validate the folder BEFORE writing anything to disk: this call may
    # abort with 403, which would otherwise leave an orphaned file behind.
    folder_id = _parse_folder_id(current_user.id)

    try:
        uname, rel_path, size, mime, orig = save_uploaded_file(file_obj, rtype)
    except Exception as e:
        flash(f'文件保存失败: {e}', 'danger')
        return _render_form()

    res = Resource(
        user_id=current_user.id,
        title=form.title.data,
        description=form.description.data,
        resource_type=rtype,
        source_type='upload',
        filename=uname,
        original_name=orig,
        file_path=rel_path,
        file_size=size,
        mime_type=mime,
        tags=form.tags.data,
        download_status='na',
        folder_id=folder_id,
    )
    db.session.add(res)
    try:
        db.session.commit()
    except Exception:
        # The row was not stored, so remove the file that was just saved
        # and let the original error propagate.
        db.session.rollback()
        delete_resource_file(rel_path)
        raise

    flash('文件上传成功', 'success')
    return redirect(url_for('resources.detail', resource_id=res.id))
|
||
|
||
|
||
@resources_bp.route('/url-download', methods=['GET', 'POST'])
@login_required
def url_download():
    """Show the URL-download form and start a download for a valid POST.

    When the 'enable_url_download' setting is off, the user is sent back
    to the resource list. Otherwise a ``Resource`` row with status
    'pending' is committed first, then ``start_url_download`` is called
    with the application object, the new resource id, the URL, the chosen
    type (None when left on auto-detect) and the current user's id.
    """
    if not _check_setting('enable_url_download'):
        flash('URL 下载功能已关闭', 'warning')
        return redirect(url_for('resources.list_resources'))

    form = UrlDownloadForm()
    folder_choices = get_folder_choices(current_user.id)

    if not form.validate_on_submit():
        return render_template('user/upload.html', form=form, tab='url',
                               folder_choices=folder_choices)

    # An empty selection means "auto-detect" and is passed on as None.
    chosen_type = form.resource_type.data or None
    target_folder = _parse_folder_id(current_user.id)
    url = form.source_url.data

    new_resource = Resource(
        user_id=current_user.id,
        title=form.title.data,
        description=form.description.data,
        # The row stores 'text' as a placeholder when no type was chosen.
        resource_type=chosen_type or 'text',
        source_type='url',
        source_url=url,
        tags=form.tags.data,
        download_status='pending',
        folder_id=target_folder,
    )
    db.session.add(new_resource)
    db.session.commit()

    # Pass the real application object rather than the context-local proxy.
    app = current_app._get_current_object()
    start_url_download(app, new_resource.id, url, chosen_type,
                       current_user.id)

    flash('下载任务已启动,请稍后刷新查看进度', 'info')
    return redirect(url_for('resources.detail', resource_id=new_resource.id))
|
||
|
||
|
||
@resources_bp.route('/magnet-download', methods=['GET', 'POST'])
@login_required
def magnet_download():
    """Show the magnet-link form and start a download for a valid POST.

    When the 'enable_magnet' setting is off, the user is sent back to the
    resource list. A submitted link must start with 'magnet:'. A
    ``Resource`` row with status 'pending' is committed first, then
    ``start_magnet_download`` is called with the application object, the
    new resource id, the magnet URI and the selected resource type.
    """
    if not _check_setting('enable_magnet'):
        flash('磁力下载功能已关闭', 'warning')
        return redirect(url_for('resources.list_resources'))

    form = MagnetForm()
    folder_choices = get_folder_choices(current_user.id)

    def _show_form():
        return render_template('user/upload.html', form=form, tab='magnet',
                               folder_choices=folder_choices)

    if not form.validate_on_submit():
        return _show_form()

    magnet_uri = form.magnet_uri.data
    if not magnet_uri.startswith('magnet:'):
        flash('请输入有效的磁力链接(以 magnet: 开头)', 'danger')
        return _show_form()

    target_folder = _parse_folder_id(current_user.id)
    selected_type = form.resource_type.data

    new_resource = Resource(
        user_id=current_user.id,
        title=form.title.data,
        description=form.description.data,
        resource_type=selected_type,
        source_type='magnet',
        source_url=magnet_uri,
        tags=form.tags.data,
        download_status='pending',
        folder_id=target_folder,
    )
    db.session.add(new_resource)
    db.session.commit()

    # Pass the real application object rather than the context-local proxy.
    app = current_app._get_current_object()
    start_magnet_download(app, new_resource.id, magnet_uri, selected_type)

    flash('磁力下载任务已启动(需要安装 aria2c)', 'info')
    return redirect(url_for('resources.detail', resource_id=new_resource.id))
|
||
|
||
|
||
@resources_bp.route('/<int:resource_id>')
@login_required
def detail(resource_id):
    """Render the detail page of one resource.

    Responds with 404 when the resource does not exist and with 403 when
    the current user is neither its owner nor an admin.
    """
    resource = db.get_or_404(Resource, resource_id)
    is_owner = resource.user_id == current_user.id
    if not (is_owner or current_user.is_admin):
        abort(403)
    return render_template('user/detail.html', resource=resource)
|
||
|
||
|
||
@resources_bp.route('/<int:resource_id>/edit', methods=['GET', 'POST'])
@login_required
def edit(resource_id):
    """Edit a resource's title, description, tags, visibility and folder.

    Responds with 404 when the resource does not exist and with 403 when
    the current user is neither its owner nor an admin. The folder comes
    from the raw ``folder_id`` form field: blank moves the resource to
    the root, a decimal id is applied only if that folder belongs to the
    resource's owner, and any other value leaves the folder unchanged.
    """
    res = db.get_or_404(Resource, resource_id)
    if res.user_id != current_user.id and not current_user.is_admin:
        abort(403)

    form = EditForm(obj=res)

    # Folders are scoped to the resource's owner, not to whoever edits it.
    # An admin editing someone else's resource must not be able to file it
    # under a folder the owner does not own.
    owner_id = res.user_id
    folder_choices = get_folder_choices(owner_id)

    if form.validate_on_submit():
        res.title = form.title.data
        res.description = form.description.data
        res.tags = form.tags.data
        res.is_public = form.is_public.data

        # Update the folder.
        folder_id_raw = request.form.get('folder_id', '').strip()
        if folder_id_raw == '':
            res.folder_id = None
        # isdecimal() rather than isdigit(): isdigit() also accepts
        # characters such as '²' that int() cannot convert.
        elif folder_id_raw.isdecimal():
            fid = int(folder_id_raw)
            folder = Folder.query.filter_by(id=fid, user_id=owner_id).first()
            if folder is not None:
                res.folder_id = fid

        db.session.commit()
        flash('资源信息已更新', 'success')
        return redirect(url_for('resources.detail', resource_id=res.id))

    return render_template('user/edit.html', form=form, resource=res,
                           folder_choices=folder_choices)
|
||
|
||
|
||
@resources_bp.route('/<int:resource_id>/move', methods=['POST'])
@login_required
def move_resource(resource_id):
    """Move a resource into a folder and answer with JSON.

    The target is read from the ``folder_id`` form field: blank means the
    root (no folder), a decimal id must name a folder owned by the
    current user.

    Returns:
        ``{"ok": true, "folder_id": ...}`` on success, or
        ``{"ok": false, "msg": ...}`` with status 400 when ``folder_id``
        is not a decimal number.

    Raises:
        Aborts with 404 when the resource does not exist, and with 403
        when the resource or the target folder does not belong to the
        current user.
    """
    res = db.get_or_404(Resource, resource_id)
    if res.user_id != current_user.id:
        abort(403)

    folder_id_raw = request.form.get('folder_id', '').strip()
    if folder_id_raw == '':
        res.folder_id = None
    # isdecimal() rather than isdigit(): isdigit() also accepts characters
    # such as '²' that int() cannot convert; those now reach the 400
    # branch below instead of raising ValueError.
    elif folder_id_raw.isdecimal():
        fid = int(folder_id_raw)
        folder = Folder.query.filter_by(id=fid, user_id=current_user.id).first()
        if folder is None:
            abort(403)
        res.folder_id = fid
    else:
        return jsonify(ok=False, msg='无效的文件夹 ID'), 400

    db.session.commit()
    return jsonify(ok=True, folder_id=res.folder_id)
|
||
|
||
|
||
@resources_bp.route('/<int:resource_id>/delete', methods=['POST'])
@login_required
def delete(resource_id):
    """Delete a resource's stored file and its database row.

    Responds with 404 when the resource does not exist and with 403 when
    the current user is neither its owner nor an admin. Redirects to the
    resource list afterwards.
    """
    resource = db.get_or_404(Resource, resource_id)
    is_owner = resource.user_id == current_user.id
    if not (is_owner or current_user.is_admin):
        abort(403)

    # NOTE(review): the file is removed before the row is committed, so a
    # failed commit would leave a row pointing at a missing file.
    delete_resource_file(resource.file_path)

    session = db.session
    session.delete(resource)
    session.commit()

    flash('资源已删除', 'success')
    return redirect(url_for('resources.list_resources'))
|
||
|
||
|
||
@resources_bp.route('/<int:resource_id>/download')
@login_required
def download(resource_id):
    """Send a resource's file to the client as an attachment.

    Responds with 404 when the resource does not exist and with 403 when
    the current user is neither its owner nor an admin. If the resource
    has no file path yet, or the file is missing on disk, the user is
    redirected to the detail page with a flash message. The download name
    is the original file name, falling back to the stored file name.
    """
    resource = db.get_or_404(Resource, resource_id)
    is_owner = resource.user_id == current_user.id
    if not (is_owner or current_user.is_admin):
        abort(403)

    def _back_to_detail(message, category):
        flash(message, category)
        return redirect(url_for('resources.detail', resource_id=resource.id))

    if not resource.file_path:
        return _back_to_detail('文件尚未就绪', 'warning')

    # Stored paths are relative to the application's static directory.
    static_dir = os.path.join(current_app.root_path, 'static')
    abs_path = os.path.join(static_dir, resource.file_path)
    if not os.path.exists(abs_path):
        return _back_to_detail('文件不存在', 'danger')

    attachment_name = resource.original_name or resource.filename
    return send_file(abs_path, as_attachment=True,
                     download_name=attachment_name)
|
||
|
||
|
||
@resources_bp.route('/<int:resource_id>/progress')
@login_required
def progress(resource_id):
    """Return the download progress of a resource as JSON (polling endpoint).

    Responds with 404 when the resource does not exist and with 403 when
    the current user is neither its owner nor an admin. The payload holds
    the keys 'status', 'progress', 'error' and 'file_ready'; the last is
    true only when a file path is set and the status is 'done'.
    """
    resource = db.get_or_404(Resource, resource_id)
    is_owner = resource.user_id == current_user.id
    if not (is_owner or current_user.is_admin):
        abort(403)

    status = resource.download_status
    has_file = resource.file_path is not None
    payload = {
        'status': status,
        'progress': resource.download_progress,
        'error': resource.download_error,
        'file_ready': has_file and status == 'done',
    }
    return jsonify(payload)
|
||
|
||
|
||
def _detect_word_format(abs_path, name, mime):
    """Decide whether a stored file is a .docx or a legacy .doc document.

    Args:
        abs_path: Absolute path of the file on disk.
        name: File name used to read the extension (may be empty).
        mime: Lower-cased MIME type recorded for the file (may be empty).

    Returns:
        A ``(is_docx, is_doc)`` tuple of booleans.
    """
    docx_mime = ('application/vnd.openxmlformats-officedocument'
                 '.wordprocessingml.document')
    doc_mime = 'application/msword'

    # rpartition leaves `dot` empty when the name has no '.', so a bare
    # name such as "doc" is not mistaken for a ".doc" extension.
    _, dot, suffix = name.rpartition('.')
    ext = suffix.lower() if dot else ''

    is_docx = ext == 'docx' or mime == docx_mime
    is_doc = ext == 'doc' or mime == doc_mime
    if is_docx or is_doc:
        return is_docx, is_doc

    # Some sources (e.g. URL downloads) produce names without an
    # extension; fall back to the MIME type combined with the file header.
    try:
        with open(abs_path, 'rb') as f:
            head = f.read(8)
    except OSError:
        # Best effort only: an unreadable header means "not a Word file".
        current_app.logger.warning(
            'preview: could not read file header of %s', abs_path,
            exc_info=True)
        return False, False

    # docx/xlsx/pptx are zip containers: PK\x03\x04
    if head.startswith(b'PK\x03\x04') and (
        'wordprocessingml' in mime or 'officedocument' in mime
    ):
        is_docx = True
    # Legacy .doc binary header: D0 CF 11 E0 A1 B1 1A E1
    elif head.startswith(b'\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1') and \
            ('msword' in mime or mime == ''):
        is_doc = True
    return is_docx, is_doc


@resources_bp.route('/<int:resource_id>/preview')
@login_required
def preview(resource_id):
    """Return a resource's content for in-page preview (text / Word).

    Responds with 404 when the resource does not exist, has no file, is
    not of type 'text', or its file is missing on disk, and with 403 when
    the current user is neither its owner nor an admin.

    * .docx files are converted to HTML with ``mammoth``; an HTML notice
      is returned when the library is missing or conversion fails.
    * Legacy .doc files yield an HTML notice that preview is unsupported.
    * Everything else is returned as UTF-8 plain text, with undecodable
      bytes replaced.
    """
    import html

    res = db.get_or_404(Resource, resource_id)
    if res.user_id != current_user.id and not current_user.is_admin:
        abort(403)
    if not res.file_path or res.resource_type != 'text':
        abort(404)
    abs_path = os.path.join(current_app.root_path, 'static', res.file_path)
    if not os.path.exists(abs_path):
        abort(404)

    name = res.original_name or res.filename or ''
    mime = (res.mime_type or '').lower()
    is_docx, is_doc = _detect_word_format(abs_path, name, mime)

    html_mimetype = 'text/html; charset=utf-8'

    # Word documents (.docx) are converted to HTML with mammoth.
    if is_docx:
        try:
            import mammoth
            with open(abs_path, 'rb') as f:
                result = mammoth.convert_to_html(f)
            return Response(result.value, mimetype=html_mimetype)
        except ImportError:
            return Response(
                '<div style="padding:1rem;color:#dc3545">'
                '服务器未安装 mammoth 库,无法预览 .docx 文件。请运行 '
                '<code>pip install mammoth</code> 后重试。</div>',
                mimetype=html_mimetype
            )
        except Exception as e:
            current_app.logger.exception(
                'preview: failed to convert %s with mammoth', abs_path)
            # The exception text can contain content taken from the
            # uploaded file, so escape it before embedding it in HTML.
            detail_text = html.escape(str(e))
            return Response(
                f'<div style="padding:1rem;color:#dc3545">无法解析 Word 文档:{detail_text}</div>',
                mimetype=html_mimetype
            )

    # The legacy binary .doc format cannot be previewed online.
    if is_doc:
        return Response(
            '<div style="padding:1rem;color:#6c757d">'
            '不支持在线预览旧版 .doc 文件,请下载后查看,或将文件转换为 .docx 格式。</div>',
            mimetype=html_mimetype
        )

    try:
        with open(abs_path, 'r', encoding='utf-8', errors='replace') as f:
            content = f.read()
    except OSError:
        current_app.logger.exception('preview: failed to read %s', abs_path)
        abort(500)
    return Response(content, mimetype='text/plain; charset=utf-8')
|