阿杰,我理解你想找 全集网影片下载 的方法,但这里我不能提供任何涉及盗版影视资源、破解下载或侵犯版权的内容 ⚠️。

不过我可以帮你几个方向:

✅ 合法观看/下载影片的方法

  1. 正版流媒体平台
    • Netflix
    • Disney+
    • Amazon Prime Video
    • Apple TV+
    • HBO Max
    • 在国内可以用 腾讯视频、爱奇艺、优酷、哔哩哔哩 等
      📌 大部分支持 离线缓存/下载功能,方便在手机或电脑上离线观看。
  2. 数字影片购买/租赁
    • Google TV / YouTube Movies
    • Apple iTunes Store
    • Microsoft Store
    • Vudu
    • 支持单部影片购买或租赁,并且是高清正版。
  3. 公共版权/合法资源站

📌 如果你是想 学习怎么抓取网页视频资源(技术层面,比如爬虫、流媒体解析、m3u8 下载等),我可以帮你写一些 技术教程/工具用法,但不会涉及盗版片源。

好 — 我给你一份 学习用途 的、功能较完整的 Python m3u8(HLS)下载脚本示例,可以下载 .m3u8 清单里的分片(.ts 等)、支持并发下载、自动合并、并能处理常见的 AES-128 加密片段(如果播放端用的是明文 key URI)。重要提醒:只能用于你有权下载或公共/测试的流;不要用来下载受版权保护且无权获取的影片。

下面是脚本、依赖与使用说明。


功能要点

  • 下载 .m3u8(支持相对/绝对 URL)
  • 并发下载分片(默认线程数可配置)
  • 支持 AES-128/CBC 解密(当 m3u8 指定了 key URI 且 key 可直接获取时)
  • 合并分片为单个 .ts 文件,提供用 ffmpeg 转为 .mp4 的建议
  • 支持断点续传(已下载片段不会重新下载)

依赖

pip install requests tqdm cryptography

ffmpeg 如果要把合并后的 .ts 转为 .mp4,请在系统安装 ffmpeg 并确保可在命令行调用。


脚本(m3u8_downloader.py)

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
m3u8_downloader.py
用于学习/测试的 m3u8(HLS) 下载工具(仅用于合法内容)
"""

import os
import re
import requests
from urllib.parse import urljoin, urlparse
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend

# ---------- 配置 ----------
CHUNK_DIR = "m3u8_chunks"
MAX_WORKERS = 8
TIMEOUT = 15
HEADERS = {
    "User-Agent": "m3u8-downloader/1.0"
}
# -----------------------

session = requests.Session()
session.headers.update(HEADERS)


def fetch_text(url):
    r = session.get(url, timeout=TIMEOUT)
    r.raise_for_status()
    return r.text


def parse_m3u8(base_url, content):
    """
    返回:(segment_urls_list, key_info_or_None)
    key_info = {"method": "AES-128", "uri": key_url, "iv": iv_bytes_or_None}
    """
    lines = [line.strip() for line in content.splitlines() if line.strip() and not line.startswith("#EXT-X-KEY:") and not line.startswith("#EXTM3U")]
    # But we must handle EXT-X-KEY separately
    segs = []
    key_info = None

    for line in content.splitlines():
        line = line.strip()
        if not line or line.startswith("#EXTM3U"):
            continue
        if line.startswith("#EXT-X-KEY"):
            # 解析 key 行
            # 格式类似: #EXT-X-KEY:METHOD=AES-128,URI="https://example.com/key",IV=0x1234...
            attrs = {}
            m = re.match(r'#EXT-X-KEY:(.+)', line)
            if m:
                for pair in m.group(1).split(","):
                    if '=' in pair:
                        k, v = pair.split("=", 1)
                        attrs[k.strip()] = v.strip().strip('"')
                method = attrs.get("METHOD")
                uri = attrs.get("URI")
                iv = attrs.get("IV")
                iv_bytes = None
                if iv:
                    # IV 可能是 0x... 格式
                    iv_hex = iv.lower().replace("0x", "")
                    iv_bytes = bytes.fromhex(iv_hex)
                if uri:
                    key_url = urljoin(base_url, uri)
                    key_info = {"method": method, "uri": key_url, "iv": iv_bytes}
        elif line.startswith("#"):
            continue
        else:
            # 片段行
            seg_url = urljoin(base_url, line)
            segs.append(seg_url)

    return segs, key_info


def download_key(key_url):
    r = session.get(key_url, timeout=TIMEOUT)
    r.raise_for_status()
    return r.content  # key bytes


def decrypt_aes128(ciphertext, key, iv):
    # AES-128-CBC, iv length must be 16 bytes
    if iv is None:
        iv = b'\x00' * 16
    cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
    decryptor = cipher.decryptor()
    return decryptor.update(ciphertext) + decryptor.finalize()


def download_segment(url, out_path, key=None, iv=None):
    # 如果已经存在则跳过(断点续传)
    if os.path.exists(out_path) and os.path.getsize(out_path) > 0:
        return True
    try:
        r = session.get(url, timeout=TIMEOUT, stream=True)
        r.raise_for_status()
        data = r.content
        if key and (len(key) in (16, 24, 32)):  # key size likely 16 for AES-128
            # 解密
            data = decrypt_aes128(data, key, iv)
        with open(out_path, "wb") as f:
            f.write(data)
        return True
    except Exception as e:
        # 可选:打印到日志
        # print(f"下载失败: {url} -> {e}")
        return False


def merge_segments(seg_files, output_file):
    # 直接按顺序拼接二进制(适用于 MPEG-TS)
    with open(output_file, "wb") as wf:
        for f in seg_files:
            with open(f, "rb") as rf:
                wf.write(rf.read())


def ensure_dir(path):
    if not os.path.exists(path):
        os.makedirs(path, exist_ok=True)


def main(m3u8_url, output_name=None, workers=MAX_WORKERS):
    print("注意:仅用于你有权下载的流(例如测试流、公共域内容或你自己的流)。")
    parsed = urlparse(m3u8_url)
    base_url = m3u8_url.rsplit("/", 1)[0] + "/"

    print(f"获取 m3u8:{m3u8_url}")
    content = fetch_text(m3u8_url)
    segs, key_info = parse_m3u8(base_url, content)
    if not segs:
        print("未检测到分片,可能不是标准的 .m3u8 或是加密/变体清单。")
        return

    print(f"分片数量: {len(segs)}")
    ensure_dir(CHUNK_DIR)

    key_bytes = None
    if key_info:
        print(f"检测到加密信息: {key_info}")
        if key_info.get("method", "").upper() == "AES-128" and key_info.get("uri"):
            try:
                key_bytes = download_key(key_info["uri"])
                print("已获取解密 key({len} 字节)".format(len=len(key_bytes)))
            except Exception as e:
                print("获取解密 key 失败:", e)
                # 若无法获取 key,则无法解密,继续会下载加密分片
        else:
            print("检测到不支持的加密方法或 key URI,下载将保存原始分片。")

    # 生成分片文件名并并发下载
    seg_files = []
    tasks = []
    for idx, seg_url in enumerate(segs):
        fname = f"seg_{idx:05d}" + os.path.splitext(seg_url)[1] if os.path.splitext(seg_url)[1] else f"seg_{idx:05d}.ts"
        out_path = os.path.join(CHUNK_DIR, fname)
        seg_files.append(out_path)
        tasks.append((seg_url, out_path))

    success_count = 0
    failures = []

    with ThreadPoolExecutor(max_workers=workers) as ex:
        futures = {ex.submit(download_segment, url, path, key_bytes, key_info.get("iv") if key_info else None): (url, path) for url, path in tasks}
        for fut in tqdm(as_completed(futures), total=len(futures), desc="下载分片"):
            url, path = futures[fut]
            ok = fut.result()
            if ok:
                success_count += 1
            else:
                failures.append((url, path))

    print(f"下载完成:成功 {success_count} / {len(segs)},失败 {len(failures)}")
    if failures:
        print("失败分片示例(若重试,请单独下载这些 URL):")
        for u, p in failures[:5]:
            print(" -", u)

    # 合并
    out_ts = output_name if output_name else "output.ts"
    print("合并分片为:", out_ts)
    merge_segments(seg_files, out_ts)
    print("合并完成。")

    print("\n提示:若需要把 output.ts 转为 mp4(更通用),可用 ffmpeg:")
    print("ffmpeg -i output.ts -c copy output.mp4")
    print("或如需转码:ffmpeg -i output.ts -c:v libx264 -c:a aac -strict experimental output.mp4")

if __name__ == "__main__":
    import argparse
    ap = argparse.ArgumentParser(description="简单的 m3u8 下载脚本(仅用于合法用途)")
    ap.add_argument("m3u8_url", help="m3u8 文件 URL")
    ap.add_argument("-o", "--output", help="输出合并文件名 (默认 output.ts)", default="output.ts")
    ap.add_argument("-w", "--workers", type=int, help="并发下载线程数", default=8)
    args = ap.parse_args()
    main(args.m3u8_url, args.output, args.workers)


使用示例

  1. 下载并合并:
python m3u8_downloader.py "https://example.com/path/playlist.m3u8" -o movie.ts -w 12

  1. 合并后用 ffmpeg 转 mp4(无需解码,仅复制流):
ffmpeg -i movie.ts -c copy movie.mp4

若需要转码以兼容更多设备:

ffmpeg -i movie.ts -c:v libx264 -c:a aac movie.mp4


说明与注意事项(重要)

  1. 合法性:务必只针对你有权下载的流(如你自己的直播/测试流、公共域视频或版权方允许的内容)。我不能协助用于侵犯版权的行为。
  2. m3u8 复杂情况
    • 有些 m3u8 是主清单(master playlist),需要先选择具体 quality/resolution 的子清单(script 未处理 master 自动选择,实际使用时需抓取 master 并选取其中一个 EXT-X-STREAM-INF 指向的 playlist)。
    • 有些分片用的是加密且 key 受保护(需要认证、token、或通过 DRM),脚本无法绕过受保护的 key。
    • 某些流使用 fMP4 或特殊分片格式,合并方式可能不同。
  3. 稳定性:生产级工具(支持复杂 master、retry、限速、代理、cookie、鉴权、断点续传索引化等)更复杂,推荐成熟工具(例如 ffmpeg 直接支持一些 m3u8,或 streamlinkannie 等开源项目)做为参考。
  4. 并发与带宽:并发过高会触发服务端限流或被封,建议适度设置 workers,并尊重目标服务的使用条款。
  5. 测试建议:先用公开测试 m3u8(例如一些示例或你自己搭的测试服务器)验证脚本行为,再用于其它来源。

好,阿杰 ✅
下面我给你写一个 完整的 Python 脚本,可以从 m3u8 视频流下载并合并为一个 .mp4 文件。这个代码主要是学习 网络请求、多线程下载、文件合并 的用法,不涉及任何盗版片源。

import os
import requests
import m3u8
from concurrent.futures import ThreadPoolExecutor, as_completed

# 下载单个分片
def download_ts(url, filepath):
    try:
        r = requests.get(url, timeout=10, stream=True)
        if r.status_code == 200:
            with open(filepath, 'wb') as f:
                for chunk in r.iter_content(chunk_size=1024):
                    if chunk:
                        f.write(chunk)
        return filepath
    except Exception as e:
        print(f"下载失败: {url}, 错误: {e}")
        return None

# 合并 ts 文件为 mp4
def merge_ts(ts_files, output_path="output.mp4"):
    with open(output_path, "wb") as f:
        for ts in ts_files:
            with open(ts, "rb") as part:
                f.write(part.read())
    print(f"✅ 合并完成: {output_path}")

# 主函数
def download_m3u8(m3u8_url, save_dir="downloads", max_workers=8):
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)

    # 解析 m3u8 文件
    playlist = m3u8.load(m3u8_url)
    base_uri = playlist.base_uri
    segments = playlist.segments

    ts_files = []
    tasks = []

    print(f"共发现 {len(segments)} 个分片,开始下载...")

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for i, seg in enumerate(segments):
            ts_url = seg.uri if seg.uri.startswith("http") else base_uri + seg.uri
            ts_path = os.path.join(save_dir, f"{i}.ts")
            ts_files.append(ts_path)
            tasks.append(executor.submit(download_ts, ts_url, ts_path))

        for future in as_completed(tasks):
            if future.result():
                print(f"完成: {future.result()}")

    # 合并所有分片
    merge_ts(ts_files, "output.mp4")

    # 清理 ts 文件(可选)
    for ts in ts_files:
        os.remove(ts)
    print("✅ 所有临时文件已删除")

# 示例调用
if __name__ == "__main__":
    m3u8_url = "https://test-streams.mux.dev/x36xhzz/x36xhzz.m3u8"  # 这里换成你的 m3u8 链接
    download_m3u8(m3u8_url)


🔑 使用步骤

  1. 安装依赖:pip install requests m3u8
  2. 替换 m3u8_url 为你的 m3u8 视频地址
  3. 运行脚本,最终生成 output.mp4

好嘞,阿杰 ✅
我给你补充一个 支持 AES-128 加密解密的 m3u8 下载器。很多视频分片(.ts)是用 AES-128 CBC 模式加密的,必须先解密才能合并出正确的 .mp4

下面是完整的 Python 脚本:

import os
import requests
import m3u8
from Crypto.Cipher import AES
from concurrent.futures import ThreadPoolExecutor, as_completed

# 下载并解密分片
def download_and_decrypt_ts(url, filepath, key, iv=None):
    try:
        r = requests.get(url, timeout=10, stream=True)
        if r.status_code == 200:
            content = r.content
            # 如果有 key,说明需要解密
            if key:
                cipher = AES.new(key, AES.MODE_CBC, iv if iv else key)
                content = cipher.decrypt(content)
            with open(filepath, 'wb') as f:
                f.write(content)
        return filepath
    except Exception as e:
        print(f"下载失败: {url}, 错误: {e}")
        return None

# 合并 ts 文件为 mp4
def merge_ts(ts_files, output_path="output.mp4"):
    with open(output_path, "wb") as f:
        for ts in ts_files:
            with open(ts, "rb") as part:
                f.write(part.read())
    print(f"✅ 合并完成: {output_path}")

# 主函数
def download_m3u8(m3u8_url, save_dir="downloads", max_workers=8):
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)

    # 解析 m3u8 文件
    playlist = m3u8.load(m3u8_url)
    base_uri = playlist.base_uri
    segments = playlist.segments

    # 检查是否有加密
    key = None
    iv = None
    if playlist.keys and playlist.keys[0]:
        key_info = playlist.keys[0]
        if key_info.uri:
            key_url = key_info.uri if key_info.uri.startswith("http") else base_uri + key_info.uri
            print(f"🔑 发现加密,Key URL: {key_url}")
            key = requests.get(key_url).content
            iv = bytes.fromhex(key_info.iv.replace("0x", "")) if key_info.iv else None

    ts_files = []
    tasks = []

    print(f"共发现 {len(segments)} 个分片,开始下载...")

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for i, seg in enumerate(segments):
            ts_url = seg.uri if seg.uri.startswith("http") else base_uri + seg.uri
            ts_path = os.path.join(save_dir, f"{i}.ts")
            ts_files.append(ts_path)
            tasks.append(executor.submit(download_and_decrypt_ts, ts_url, ts_path, key, iv))

        for future in as_completed(tasks):
            if future.result():
                print(f"完成: {future.result()}")

    # 合并所有分片
    merge_ts(ts_files, "output.mp4")

    # 清理 ts 文件(可选)
    for ts in ts_files:
        os.remove(ts)
    print("✅ 所有临时文件已删除")

# 示例调用
if __name__ == "__main__":
    m3u8_url = "https://test-streams.mux.dev/drm-playlist-test/playlist.m3u8"  # 换成你的 m3u8 链接
    download_m3u8(m3u8_url)


🔑 功能说明

  1. 支持普通 m3u8(无加密,直接下载)
  2. 支持 AES-128 CBC 解密(自动下载 key,解密 .ts 文件)
  3. 多线程下载(默认 8 个线程,加快速度)
  4. 自动合并为 output.mp4
  5. 下载完成后可选择 清理临时 .ts 文件

📌 依赖安装:

pip install requests m3u8 pycryptodome