Files
ai-app-database/app/utils/downloader.py
T
huty f103148ebf feat: 初始化个人资料库 Web 应用
基于 Flask + MySQL + Bootstrap 5 的全栈个人资料库管理系统。

主要功能:
- 管理员/普通用户双角色权限体系,全站登录保护
- 资源管理:文本、图片、音频、视频四类资源
- 三种添加方式:本地上传(拖拽)、URL 后台下载、磁力下载(aria2c)
- 在线预览:文本、图片、HTML5 音视频播放器
- 安全:bcrypt 加盐密码哈希、CSRF 防护、SQLAlchemy ORM 防注入

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-23 00:16:59 +09:00

195 lines
6.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
后台下载模块:支持 HTTP URL 下载和磁力链接下载(需要安装 aria2c)
"""
import os
import uuid
import mimetypes
import threading
import subprocess
import requests
from urllib.parse import urlparse, unquote
from flask import current_app
from app.extensions import db
from app.utils.file_handler import guess_resource_type
def _get_filename_from_url(url):
"""从 URL 猜测文件名"""
path = urlparse(url).path
name = unquote(os.path.basename(path))
return name if name else 'download'
def _update_resource(app, resource_id, **kwargs):
"""在 app context 内更新 Resource 记录"""
with app.app_context():
from app.models.resource import Resource
res = db.session.get(Resource, resource_id)
if res:
for k, v in kwargs.items():
setattr(res, k, v)
db.session.commit()
def download_url(app, resource_id, url, resource_type, user_id):
"""后台线程:HTTP 下载"""
try:
_update_resource(app, resource_id,
download_status='downloading', download_progress=0)
headers = {'User-Agent': 'Mozilla/5.0 (compatible; ResourceLibrary/1.0)'}
response = requests.get(url, stream=True, timeout=60, headers=headers)
response.raise_for_status()
# 确定文件名和类型
orig_name = _get_filename_from_url(url)
content_type = response.headers.get('content-type', '').split(';')[0].strip()
ext = mimetypes.guess_extension(content_type) or \
('.' + orig_name.rsplit('.', 1)[-1] if '.' in orig_name else '')
ext = ext.lstrip('.')
if not resource_type:
resource_type = guess_resource_type(orig_name, content_type) or 'text'
unique_name = f"{uuid.uuid4().hex}.{ext}" if ext else uuid.uuid4().hex
save_dir = os.path.join(app.config['UPLOAD_FOLDER'], resource_type)
os.makedirs(save_dir, exist_ok=True)
save_path = os.path.join(save_dir, unique_name)
total = int(response.headers.get('content-length', 0))
downloaded = 0
chunk_size = 65536 # 64KB
with open(save_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=chunk_size):
if chunk:
f.write(chunk)
downloaded += len(chunk)
if total > 0:
progress = int(downloaded * 100 / total)
_update_resource(app, resource_id,
download_progress=progress)
file_size = os.path.getsize(save_path)
rel_path = f"uploads/{resource_type}/{unique_name}"
_update_resource(app, resource_id,
download_status='done',
download_progress=100,
filename=unique_name,
original_name=orig_name,
file_path=rel_path,
file_size=file_size,
mime_type=content_type,
resource_type=resource_type)
except Exception as e:
_update_resource(app, resource_id,
download_status='failed',
download_error=str(e)[:500])
def start_url_download(app, resource_id, url, resource_type, user_id):
"""启动 URL 下载线程"""
t = threading.Thread(
target=download_url,
args=(app, resource_id, url, resource_type, user_id),
daemon=True
)
t.start()
def start_magnet_download(app, resource_id, magnet_uri, resource_type):
"""启动磁力下载线程(需要 aria2c"""
t = threading.Thread(
target=_magnet_download_worker,
args=(app, resource_id, magnet_uri, resource_type),
daemon=True
)
t.start()
def _magnet_download_worker(app, resource_id, magnet_uri, resource_type):
"""使用 aria2c 下载磁力链接"""
if not resource_type:
resource_type = 'video' # 磁力下载默认视频
save_dir = os.path.join(app.config['UPLOAD_FOLDER'], resource_type)
os.makedirs(save_dir, exist_ok=True)
try:
_update_resource(app, resource_id,
download_status='downloading', download_progress=0)
# 检查 aria2c 是否可用
result = subprocess.run(
['aria2c', '--version'],
capture_output=True, text=True, timeout=5
)
if result.returncode != 0:
raise RuntimeError('aria2c 未安装或不可用,请先安装 aria2c')
cmd = [
'aria2c',
'--dir', save_dir,
'--seed-time=0',
'--max-connection-per-server=4',
'--console-log-level=warn',
magnet_uri
]
proc = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
text=True, encoding='utf-8', errors='replace'
)
# 实时读取输出更新进度(简单估算)
progress = 10
for line in proc.stdout:
line = line.strip()
if '[' in line and '%' in line:
try:
pct_str = line.split('%')[0].split()[-1]
progress = int(float(pct_str))
_update_resource(app, resource_id,
download_progress=min(progress, 99))
except (ValueError, IndexError):
pass
proc.wait()
if proc.returncode == 0:
# 查找下载的文件
files = []
for root, dirs, fnames in os.walk(save_dir):
for fname in fnames:
fpath = os.path.join(root, fname)
files.append((fpath, os.path.getmtime(fpath)))
if files:
latest = max(files, key=lambda x: x[1])[0]
orig_name = os.path.basename(latest)
rel_path = os.path.relpath(
latest,
os.path.join(app.root_path, 'static')
).replace('\\', '/')
_update_resource(app, resource_id,
download_status='done',
download_progress=100,
filename=orig_name,
original_name=orig_name,
file_path=rel_path,
file_size=os.path.getsize(latest))
else:
raise RuntimeError('下载完成但未找到文件')
else:
raise RuntimeError(f'aria2c 退出码: {proc.returncode}')
except FileNotFoundError:
_update_resource(app, resource_id,
download_status='failed',
download_error='aria2c 未安装,请安装后重试')
except Exception as e:
_update_resource(app, resource_id,
download_status='failed',
download_error=str(e)[:500])