好的,阿杰 👍 我给你整理一份 MongoDB 变更流(Change Streams)使用说明,并附上 Windows 下快速搭建副本集(5分钟版本) 的步骤,保证你能本地跑通。


一、MongoDB 变更流(Change Streams)简介

变更流可以让你在不使用 oplog 的情况下实时监听 MongoDB 的数据变化(插入、更新、删除等),常用于 实时日志分析、消息队列替代、缓存同步 等场景。

⚠️ 注意:变更流必须基于 副本集 或 分片集群,单机模式无法使用。


1.1 变更流监听示例(C# .NET Core)

using MongoDB.Driver;
using System;
using System.Threading.Tasks;

class Program
{
    static async Task Main(string[] args)
    {
        var client = new MongoClient("mongodb://localhost:27017/?replicaSet=rs0");
        var database = client.GetDatabase("testdb");
        var collection = database.GetCollection<MyData>("mycollection");

        using (var cursor = collection.Watch())
        {
            await foreach (var change in cursor.ToAsyncEnumerable())
            {
                Console.WriteLine($"操作类型: {change.OperationType}");
                if (change.FullDocument != null)
                {
                    Console.WriteLine($"新文档: {change.FullDocument.ToJson()}");
                }
            }
        }
    }
}

public class MyData
{
    public int Id { get; set; }
    public string Name { get; set; }
}

👉 这里 collection.Watch() 就是变更流监听,会在有 Insert / Update / Delete 时实时触发。


1.2 监听整个数据库

using (var cursor = database.Watch())
{
    await foreach (var change in cursor.ToAsyncEnumerable())
    {
        Console.WriteLine($"[DB Change] {change.OperationType}");
    }
}


二、Windows 下快速搭建副本集(5分钟)

假设已安装 MongoDB 8.x(路径示例:C:\mongodb\bin

2.1 新建目录

mkdir C:\mongodb\data1
mkdir C:\mongodb\data2
mkdir C:\mongodb\data3

2.2 启动三个实例

在三个不同的 PowerShell 窗口运行:

C:\mongodb\bin\mongod.exe --port 27017 --dbpath C:\mongodb\data1 --replSet rs0
C:\mongodb\bin\mongod.exe --port 27018 --dbpath C:\mongodb\data2 --replSet rs0
C:\mongodb\bin\mongod.exe --port 27019 --dbpath C:\mongodb\data3 --replSet rs0

2.3 初始化副本集

在另一个窗口执行 mongo shell:

C:\mongodb\bin\mongosh.exe --port 27017

然后:

rs.initiate({
  _id: "rs0",
  members: [
    { _id: 0, host: "localhost:27017" },
    { _id: 1, host: "localhost:27018" },
    { _id: 2, host: "localhost:27019" }
  ]
})

查看状态:

rs.status()


三、测试变更流

use testdb
db.mycollection.insertOne({ name: "阿杰", age: 25 })
db.mycollection.updateOne({ name: "阿杰" }, { $set: { age: 26 } })
db.mycollection.deleteOne({ name: "阿杰" })

你在 .NET Core 程序里会实时看到对应事件被捕捉。🚀