阿杰,我理解你想找 全集网影片下载 的方法,但这里我不能提供任何涉及盗版影视资源、破解下载或侵犯版权的内容 ⚠️。
不过我可以帮你几个方向:
✅ 合法观看/下载影片的方法
- 正版流媒体平台
- Netflix
- Disney+
- Amazon Prime Video
- Apple TV+
- HBO Max
- 在国内可以用 腾讯视频、爱奇艺、优酷、哔哩哔哩 等
📌 大部分支持 离线缓存/下载功能,方便在手机或电脑上离线观看。
- 数字影片购买/租赁
- Google TV / YouTube Movies
- Apple iTunes Store
- Microsoft Store
- Vudu
- 支持单部影片购买或租赁,并且是高清正版。
- 公共版权/合法资源站
- Internet Archive (大量公共领域影片、经典老片免费下载)
- Public Domain Torrents (合法公版电影)
📌 如果你是想 学习怎么抓取网页视频资源(技术层面,比如爬虫、流媒体解析、m3u8 下载等),我可以帮你写一些 技术教程/工具用法,但不会涉及盗版片源。
好 — 我给你一份 学习用途 的、功能较完整的 Python m3u8(HLS)下载脚本示例,可以下载 .m3u8
清单里的分片(.ts
等)、支持并发下载、自动合并、并能处理常见的 AES-128 加密片段(如果播放端用的是明文 key URI)。重要提醒:只能用于你有权下载或公共/测试的流;不要用来下载受版权保护且无权获取的影片。
下面是脚本、依赖与使用说明。
功能要点
- 下载
.m3u8
(支持相对/绝对 URL) - 并发下载分片(默认线程数可配置)
- 支持 AES-128/CBC 解密(当 m3u8 指定了 key URI 且 key 可直接获取时)
- 合并分片为单个
.ts
文件,提供用ffmpeg
转为.mp4
的建议 - 支持断点续传(已下载片段不会重新下载)
依赖
pip install requests tqdm cryptography
ffmpeg
如果要把合并后的 .ts
转为 .mp4
,请在系统安装 ffmpeg
并确保可在命令行调用。
脚本(m3u8_downloader.py)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
m3u8_downloader.py
用于学习/测试的 m3u8(HLS) 下载工具(仅用于合法内容)
"""
import os
import re
import requests
from urllib.parse import urljoin, urlparse
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
# ---------- 配置 ----------
CHUNK_DIR = "m3u8_chunks"
MAX_WORKERS = 8
TIMEOUT = 15
HEADERS = {
"User-Agent": "m3u8-downloader/1.0"
}
# -----------------------
session = requests.Session()
session.headers.update(HEADERS)
def fetch_text(url):
r = session.get(url, timeout=TIMEOUT)
r.raise_for_status()
return r.text
def parse_m3u8(base_url, content):
"""
返回:(segment_urls_list, key_info_or_None)
key_info = {"method": "AES-128", "uri": key_url, "iv": iv_bytes_or_None}
"""
lines = [line.strip() for line in content.splitlines() if line.strip() and not line.startswith("#EXT-X-KEY:") and not line.startswith("#EXTM3U")]
# But we must handle EXT-X-KEY separately
segs = []
key_info = None
for line in content.splitlines():
line = line.strip()
if not line or line.startswith("#EXTM3U"):
continue
if line.startswith("#EXT-X-KEY"):
# 解析 key 行
# 格式类似: #EXT-X-KEY:METHOD=AES-128,URI="https://example.com/key",IV=0x1234...
attrs = {}
m = re.match(r'#EXT-X-KEY:(.+)', line)
if m:
for pair in m.group(1).split(","):
if '=' in pair:
k, v = pair.split("=", 1)
attrs[k.strip()] = v.strip().strip('"')
method = attrs.get("METHOD")
uri = attrs.get("URI")
iv = attrs.get("IV")
iv_bytes = None
if iv:
# IV 可能是 0x... 格式
iv_hex = iv.lower().replace("0x", "")
iv_bytes = bytes.fromhex(iv_hex)
if uri:
key_url = urljoin(base_url, uri)
key_info = {"method": method, "uri": key_url, "iv": iv_bytes}
elif line.startswith("#"):
continue
else:
# 片段行
seg_url = urljoin(base_url, line)
segs.append(seg_url)
return segs, key_info
def download_key(key_url):
r = session.get(key_url, timeout=TIMEOUT)
r.raise_for_status()
return r.content # key bytes
def decrypt_aes128(ciphertext, key, iv):
# AES-128-CBC, iv length must be 16 bytes
if iv is None:
iv = b'\x00' * 16
cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
decryptor = cipher.decryptor()
return decryptor.update(ciphertext) + decryptor.finalize()
def download_segment(url, out_path, key=None, iv=None):
# 如果已经存在则跳过(断点续传)
if os.path.exists(out_path) and os.path.getsize(out_path) > 0:
return True
try:
r = session.get(url, timeout=TIMEOUT, stream=True)
r.raise_for_status()
data = r.content
if key and (len(key) in (16, 24, 32)): # key size likely 16 for AES-128
# 解密
data = decrypt_aes128(data, key, iv)
with open(out_path, "wb") as f:
f.write(data)
return True
except Exception as e:
# 可选:打印到日志
# print(f"下载失败: {url} -> {e}")
return False
def merge_segments(seg_files, output_file):
# 直接按顺序拼接二进制(适用于 MPEG-TS)
with open(output_file, "wb") as wf:
for f in seg_files:
with open(f, "rb") as rf:
wf.write(rf.read())
def ensure_dir(path):
if not os.path.exists(path):
os.makedirs(path, exist_ok=True)
def main(m3u8_url, output_name=None, workers=MAX_WORKERS):
print("注意:仅用于你有权下载的流(例如测试流、公共域内容或你自己的流)。")
parsed = urlparse(m3u8_url)
base_url = m3u8_url.rsplit("/", 1)[0] + "/"
print(f"获取 m3u8:{m3u8_url}")
content = fetch_text(m3u8_url)
segs, key_info = parse_m3u8(base_url, content)
if not segs:
print("未检测到分片,可能不是标准的 .m3u8 或是加密/变体清单。")
return
print(f"分片数量: {len(segs)}")
ensure_dir(CHUNK_DIR)
key_bytes = None
if key_info:
print(f"检测到加密信息: {key_info}")
if key_info.get("method", "").upper() == "AES-128" and key_info.get("uri"):
try:
key_bytes = download_key(key_info["uri"])
print("已获取解密 key({len} 字节)".format(len=len(key_bytes)))
except Exception as e:
print("获取解密 key 失败:", e)
# 若无法获取 key,则无法解密,继续会下载加密分片
else:
print("检测到不支持的加密方法或 key URI,下载将保存原始分片。")
# 生成分片文件名并并发下载
seg_files = []
tasks = []
for idx, seg_url in enumerate(segs):
fname = f"seg_{idx:05d}" + os.path.splitext(seg_url)[1] if os.path.splitext(seg_url)[1] else f"seg_{idx:05d}.ts"
out_path = os.path.join(CHUNK_DIR, fname)
seg_files.append(out_path)
tasks.append((seg_url, out_path))
success_count = 0
failures = []
with ThreadPoolExecutor(max_workers=workers) as ex:
futures = {ex.submit(download_segment, url, path, key_bytes, key_info.get("iv") if key_info else None): (url, path) for url, path in tasks}
for fut in tqdm(as_completed(futures), total=len(futures), desc="下载分片"):
url, path = futures[fut]
ok = fut.result()
if ok:
success_count += 1
else:
failures.append((url, path))
print(f"下载完成:成功 {success_count} / {len(segs)},失败 {len(failures)}")
if failures:
print("失败分片示例(若重试,请单独下载这些 URL):")
for u, p in failures[:5]:
print(" -", u)
# 合并
out_ts = output_name if output_name else "output.ts"
print("合并分片为:", out_ts)
merge_segments(seg_files, out_ts)
print("合并完成。")
print("\n提示:若需要把 output.ts 转为 mp4(更通用),可用 ffmpeg:")
print("ffmpeg -i output.ts -c copy output.mp4")
print("或如需转码:ffmpeg -i output.ts -c:v libx264 -c:a aac -strict experimental output.mp4")
if __name__ == "__main__":
import argparse
ap = argparse.ArgumentParser(description="简单的 m3u8 下载脚本(仅用于合法用途)")
ap.add_argument("m3u8_url", help="m3u8 文件 URL")
ap.add_argument("-o", "--output", help="输出合并文件名 (默认 output.ts)", default="output.ts")
ap.add_argument("-w", "--workers", type=int, help="并发下载线程数", default=8)
args = ap.parse_args()
main(args.m3u8_url, args.output, args.workers)
使用示例
- 下载并合并:
python m3u8_downloader.py "https://example.com/path/playlist.m3u8" -o movie.ts -w 12
- 合并后用 ffmpeg 转 mp4(无需解码,仅复制流):
ffmpeg -i movie.ts -c copy movie.mp4
若需要转码以兼容更多设备:
ffmpeg -i movie.ts -c:v libx264 -c:a aac movie.mp4
说明与注意事项(重要)
- 合法性:务必只针对你有权下载的流(如你自己的直播/测试流、公共域视频或版权方允许的内容)。我不能协助用于侵犯版权的行为。
- m3u8 复杂情况:
- 有些 m3u8 是主清单(master playlist),需要先选择具体 quality/resolution 的子清单(script 未处理 master 自动选择,实际使用时需抓取 master 并选取其中一个
EXT-X-STREAM-INF
指向的 playlist)。 - 有些分片用的是加密且 key 受保护(需要认证、token、或通过 DRM),脚本无法绕过受保护的 key。
- 某些流使用 fMP4 或特殊分片格式,合并方式可能不同。
- 有些 m3u8 是主清单(master playlist),需要先选择具体 quality/resolution 的子清单(script 未处理 master 自动选择,实际使用时需抓取 master 并选取其中一个
- 稳定性:生产级工具(支持复杂 master、retry、限速、代理、cookie、鉴权、断点续传索引化等)更复杂,推荐成熟工具(例如
ffmpeg
直接支持一些 m3u8,或streamlink
、annie
等开源项目)做为参考。 - 并发与带宽:并发过高会触发服务端限流或被封,建议适度设置
workers
,并尊重目标服务的使用条款。 - 测试建议:先用公开测试 m3u8(例如一些示例或你自己搭的测试服务器)验证脚本行为,再用于其它来源。
好,阿杰 ✅
下面我给你写一个 完整的 Python 脚本,可以从 m3u8 视频流下载并合并为一个 .mp4
文件。这个代码主要是学习 网络请求、多线程下载、文件合并 的用法,不涉及任何盗版片源。
import os
import requests
import m3u8
from concurrent.futures import ThreadPoolExecutor, as_completed
# 下载单个分片
def download_ts(url, filepath):
try:
r = requests.get(url, timeout=10, stream=True)
if r.status_code == 200:
with open(filepath, 'wb') as f:
for chunk in r.iter_content(chunk_size=1024):
if chunk:
f.write(chunk)
return filepath
except Exception as e:
print(f"下载失败: {url}, 错误: {e}")
return None
# 合并 ts 文件为 mp4
def merge_ts(ts_files, output_path="output.mp4"):
with open(output_path, "wb") as f:
for ts in ts_files:
with open(ts, "rb") as part:
f.write(part.read())
print(f"✅ 合并完成: {output_path}")
# 主函数
def download_m3u8(m3u8_url, save_dir="downloads", max_workers=8):
if not os.path.exists(save_dir):
os.makedirs(save_dir)
# 解析 m3u8 文件
playlist = m3u8.load(m3u8_url)
base_uri = playlist.base_uri
segments = playlist.segments
ts_files = []
tasks = []
print(f"共发现 {len(segments)} 个分片,开始下载...")
with ThreadPoolExecutor(max_workers=max_workers) as executor:
for i, seg in enumerate(segments):
ts_url = seg.uri if seg.uri.startswith("http") else base_uri + seg.uri
ts_path = os.path.join(save_dir, f"{i}.ts")
ts_files.append(ts_path)
tasks.append(executor.submit(download_ts, ts_url, ts_path))
for future in as_completed(tasks):
if future.result():
print(f"完成: {future.result()}")
# 合并所有分片
merge_ts(ts_files, "output.mp4")
# 清理 ts 文件(可选)
for ts in ts_files:
os.remove(ts)
print("✅ 所有临时文件已删除")
# 示例调用
if __name__ == "__main__":
m3u8_url = "https://test-streams.mux.dev/x36xhzz/x36xhzz.m3u8" # 这里换成你的 m3u8 链接
download_m3u8(m3u8_url)
🔑 使用步骤
- 安装依赖:
pip install requests m3u8
- 替换
m3u8_url
为你的 m3u8 视频地址 - 运行脚本,最终生成
output.mp4
好嘞,阿杰 ✅
我给你补充一个 支持 AES-128 加密解密的 m3u8 下载器。很多视频分片(.ts
)是用 AES-128 CBC 模式加密的,必须先解密才能合并出正确的 .mp4
。
下面是完整的 Python 脚本:
import os
import requests
import m3u8
from Crypto.Cipher import AES
from concurrent.futures import ThreadPoolExecutor, as_completed
# 下载并解密分片
def download_and_decrypt_ts(url, filepath, key, iv=None):
try:
r = requests.get(url, timeout=10, stream=True)
if r.status_code == 200:
content = r.content
# 如果有 key,说明需要解密
if key:
cipher = AES.new(key, AES.MODE_CBC, iv if iv else key)
content = cipher.decrypt(content)
with open(filepath, 'wb') as f:
f.write(content)
return filepath
except Exception as e:
print(f"下载失败: {url}, 错误: {e}")
return None
# 合并 ts 文件为 mp4
def merge_ts(ts_files, output_path="output.mp4"):
with open(output_path, "wb") as f:
for ts in ts_files:
with open(ts, "rb") as part:
f.write(part.read())
print(f"✅ 合并完成: {output_path}")
# 主函数
def download_m3u8(m3u8_url, save_dir="downloads", max_workers=8):
if not os.path.exists(save_dir):
os.makedirs(save_dir)
# 解析 m3u8 文件
playlist = m3u8.load(m3u8_url)
base_uri = playlist.base_uri
segments = playlist.segments
# 检查是否有加密
key = None
iv = None
if playlist.keys and playlist.keys[0]:
key_info = playlist.keys[0]
if key_info.uri:
key_url = key_info.uri if key_info.uri.startswith("http") else base_uri + key_info.uri
print(f"🔑 发现加密,Key URL: {key_url}")
key = requests.get(key_url).content
iv = bytes.fromhex(key_info.iv.replace("0x", "")) if key_info.iv else None
ts_files = []
tasks = []
print(f"共发现 {len(segments)} 个分片,开始下载...")
with ThreadPoolExecutor(max_workers=max_workers) as executor:
for i, seg in enumerate(segments):
ts_url = seg.uri if seg.uri.startswith("http") else base_uri + seg.uri
ts_path = os.path.join(save_dir, f"{i}.ts")
ts_files.append(ts_path)
tasks.append(executor.submit(download_and_decrypt_ts, ts_url, ts_path, key, iv))
for future in as_completed(tasks):
if future.result():
print(f"完成: {future.result()}")
# 合并所有分片
merge_ts(ts_files, "output.mp4")
# 清理 ts 文件(可选)
for ts in ts_files:
os.remove(ts)
print("✅ 所有临时文件已删除")
# 示例调用
if __name__ == "__main__":
m3u8_url = "https://test-streams.mux.dev/drm-playlist-test/playlist.m3u8" # 换成你的 m3u8 链接
download_m3u8(m3u8_url)
🔑 功能说明
- 支持普通 m3u8(无加密,直接下载)
- 支持 AES-128 CBC 解密(自动下载 key,解密
.ts
文件) - 多线程下载(默认 8 个线程,加快速度)
- 自动合并为
output.mp4
- 下载完成后可选择 清理临时
.ts
文件
📌 依赖安装:
pip install requests m3u8 pycryptodome
发表回复