跳转至

批量操作示例

本文档提供各种批量操作的完整示例,包括批量上传、批量私信、并发控制等高级用法。

目录

前置准备

安装依赖

maturin develop

创建客户端

import x_api

cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
twitter = x_api.Twitter(cookies)

批量上传图片

上传同一张图片多次,获取多个独立的 media_id

基础批量上传

import asyncio
import x_api
from pathlib import Path

async def batch_upload_basic():
    cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
    twitter = x_api.Twitter(cookies)

    # 读取图片
    image_bytes = Path("photo.jpg").read_bytes()

    # 批量上传 10 次
    result = await twitter.upload_image_multiple_times(
        image_bytes=image_bytes,
        media_category="dm_image",
        count=10,
    )

    print(f"📊 上传结果:")
    print(f"   成功: {result.success_count}")
    print(f"   失败: {result.failure_count}")
    print(f"   Media IDs: {result.media_ids}")

asyncio.run(batch_upload_basic())

为不同用途批量上传

import asyncio
import x_api
from pathlib import Path

async def batch_upload_for_different_uses():
    cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
    twitter = x_api.Twitter(cookies)

    image_bytes = Path("photo.jpg").read_bytes()

    # 为私信准备图片
    dm_uploads = await twitter.upload_image_multiple_times(
        image_bytes, "dm_image", 5
    )
    print(f"私信图片: {dm_uploads.success_count} 个")

    # 为发帖准备图片
    tweet_uploads = await twitter.upload_image_multiple_times(
        image_bytes, "tweet_image", 3
    )
    print(f"发帖图片: {tweet_uploads.success_count} 个")

asyncio.run(batch_upload_for_different_uses())

批量发送私信

向多个用户发送相同内容的私信。

纯文本批量发送

import asyncio
import x_api

async def batch_send_text():
    cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
    twitter = x_api.Twitter(cookies)

    # 目标用户列表
    user_ids = [
        "111111111",
        "222222222",
        "333333333",
        "444444444",
        "555555555",
    ]

    message = "感谢关注!这是一条群发消息 🎉"

    print(f"📤 发送私信到 {len(user_ids)} 个用户...")

    result = await twitter.send_batch_direct_messages(
        user_ids=user_ids,
        text=message,
    )

    print(f"\n📊 发送结果:")
    print(f"   成功: {result.success_count}/{len(user_ids)}")
    print(f"   失败: {result.failure_count}/{len(user_ids)}")

    # 统计成功和失败
    success_users = []
    failed_users = []

    for r in result.results:
        if r.success:
            success_users.append(r.user_id)
        else:
            failed_users.append((r.user_id, r.error_msg))

    if success_users:
        print(f"\n✅ 成功用户: {', '.join(success_users)}")

    if failed_users:
        print(f"\n❌ 失败用户:")
        for user_id, error in failed_users:
            print(f"   - {user_id}: {error}")

asyncio.run(batch_send_text())

批量发送带图片私信

每个用户发送相同文案 + 独立的图片。

import asyncio
import x_api
from pathlib import Path

async def batch_send_with_images():
    cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
    twitter = x_api.Twitter(cookies)

    user_ids = ["111111111", "222222222", "333333333"]
    message = "看看这张图片!📷"

    # 步骤1:批量上传图片
    print("📤 上传图片...")
    image_bytes = Path("photo.jpg").read_bytes()

    upload_result = await twitter.upload_image_multiple_times(
        image_bytes=image_bytes,
        media_category="dm_image",
        count=len(user_ids),
    )

    if upload_result.success_count < len(user_ids):
        print(f"⚠️  只上传成功 {upload_result.success_count} 张,需要 {len(user_ids)} 张")
        # 调整用户数量
        user_ids = user_ids[:upload_result.success_count]

    print(f"✅ 上传完成: {len(upload_result.media_ids)} 个 media_id")

    # 步骤2:批量发送私信
    print(f"\n📤 发送私信到 {len(user_ids)} 个用户...")

    result = await twitter.send_batch_direct_messages(
        user_ids=user_ids,
        text=message,
        media_ids=upload_result.media_ids,
    )

    print(f"\n📊 发送结果:")
    print(f"   成功: {result.success_count}/{len(user_ids)}")
    print(f"   失败: {result.failure_count}/{len(user_ids)}")

asyncio.run(batch_send_with_images())

批量发送自定义文案

每个用户发送不同的个性化内容。

纯文本自定义文案

import asyncio
import x_api

async def batch_send_custom_texts():
    cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
    twitter = x_api.Twitter(cookies)

    # 用户和对应的文案
    users_data = [
        ("111111111", "你好,张三!感谢你一直以来的支持!"),
        ("222222222", "你好,李四!很高兴认识你!"),
        ("333333333", "你好,王五!欢迎加入我们的社区!"),
        ("444444444", "你好,赵六!期待与你更多交流!"),
    ]

    user_ids = [u[0] for u in users_data]
    texts = [u[1] for u in users_data]

    print(f"📤 发送个性化私信到 {len(user_ids)} 个用户...")

    result = await twitter.send_batch_direct_messages_with_custom_texts(
        user_ids=user_ids,
        texts=texts,
    )

    print(f"\n📊 发送结果:")
    print(f"   成功: {result.success_count}")
    print(f"   失败: {result.failure_count}")

    # 显示详细结果
    for idx, r in enumerate(result.results):
        status = "✅" if r.success else "❌"
        preview = texts[idx][:20] + "..." if len(texts[idx]) > 20 else texts[idx]
        print(f"   {status} {r.user_id}: {preview}")

asyncio.run(batch_send_custom_texts())

带图片的自定义文案

import asyncio
import x_api
from pathlib import Path

async def batch_send_custom_with_images():
    cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
    twitter = x_api.Twitter(cookies)

    # 用户数据
    users_data = [
        ("111111111", "张三,这是专属于你的图片!🎁"),
        ("222222222", "李四,送你一张特别的照片!📸"),
        ("333333333", "王五,希望你喜欢这个!✨"),
    ]

    user_ids = [u[0] for u in users_data]
    texts = [u[1] for u in users_data]

    # 批量上传图片
    image_bytes = Path("photo.jpg").read_bytes()
    upload_result = await twitter.upload_image_multiple_times(
        image_bytes, "dm_image", len(user_ids)
    )

    if upload_result.success_count != len(user_ids):
        print(f"❌ 上传失败: {upload_result.failure_count}")
        return

    print(f"✅ 图片上传完成")

    # 发送自定义私信
    result = await twitter.send_batch_direct_messages_with_custom_texts(
        user_ids=user_ids,
        texts=texts,
        media_ids=upload_result.media_ids,
    )

    print(f"\n📊 发送结果: {result.success_count}/{len(user_ids)} 成功")

asyncio.run(batch_send_custom_with_images())

并发控制与限流

处理大量请求时的并发控制策略。

分批处理

import asyncio
import x_api

async def batch_with_chunks():
    """分批处理大量用户"""
    cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
    twitter = x_api.Twitter(cookies)

    # 大量用户列表
    all_user_ids = [f"{i:09d}" for i in range(1, 101)]  # 100 个用户

    # 分批参数
    batch_size = 20
    delay_between_batches = 2.0  # 批次间隔(秒)

    message = "批量发送测试消息"

    total_success = 0
    total_failure = 0

    # 分批处理
    for i in range(0, len(all_user_ids), batch_size):
        batch = all_user_ids[i:i + batch_size]
        batch_num = i // batch_size + 1
        total_batches = (len(all_user_ids) + batch_size - 1) // batch_size

        print(f"\n📤 处理批次 {batch_num}/{total_batches} ({len(batch)} 个用户)")

        result = await twitter.send_batch_direct_messages(
            user_ids=batch,
            text=message,
        )

        total_success += result.success_count
        total_failure += result.failure_count

        print(f"   本批次: {result.success_count} 成功, {result.failure_count} 失败")

        # 批次间延迟(最后一批不需要)
        if i + batch_size < len(all_user_ids):
            print(f"   ⏳ 等待 {delay_between_batches} 秒...")
            await asyncio.sleep(delay_between_batches)

    print(f"\n📊 总计:")
    print(f"   成功: {total_success}/{len(all_user_ids)}")
    print(f"   失败: {total_failure}/{len(all_user_ids)}")

asyncio.run(batch_with_chunks())

限速处理

import asyncio
import x_api
import time

class RateLimiter:
    """简单的限速器"""

    def __init__(self, max_requests: int, time_window: float):
        self.max_requests = max_requests
        self.time_window = time_window
        self.requests = []

    async def acquire(self):
        """获取请求许可"""
        now = time.time()

        # 清理过期的请求记录
        self.requests = [t for t in self.requests if now - t < self.time_window]

        # 如果达到限制,等待
        if len(self.requests) >= self.max_requests:
            wait_time = self.time_window - (now - self.requests[0])
            if wait_time > 0:
                print(f"   ⏳ 限速等待 {wait_time:.1f} 秒...")
                await asyncio.sleep(wait_time)
                self.requests = []

        self.requests.append(time.time())


async def batch_with_rate_limit():
    """带限速的批量发送"""
    cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
    twitter = x_api.Twitter(cookies)

    # 限速器:每 60 秒最多 30 个请求
    limiter = RateLimiter(max_requests=30, time_window=60)

    user_ids = [f"{i:09d}" for i in range(1, 51)]  # 50 个用户
    message = "限速批量发送测试"

    results = []

    for i, user_id in enumerate(user_ids, 1):
        await limiter.acquire()

        print(f"[{i}/{len(user_ids)}] 发送到 {user_id}...")

        result = await twitter.send_direct_message(
            user_id=user_id,
            text=message,
        )

        results.append(result)

        if result.success:
            print(f"   ✅ 成功")
        else:
            print(f"   ❌ 失败: {result.error_msg}")

    success = sum(1 for r in results if r.success)
    print(f"\n📊 总计: {success}/{len(user_ids)} 成功")

asyncio.run(batch_with_rate_limit())

大规模批量操作

处理数百甚至数千个用户的场景。

从文件读取用户列表

import asyncio
import x_api
from pathlib import Path

async def batch_from_file():
    """从文件读取用户列表并批量发送"""
    cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
    twitter = x_api.Twitter(cookies)

    # 读取用户列表文件(每行一个用户 ID)
    users_file = Path("users.txt")
    if not users_file.exists():
        print("❌ users.txt 文件不存在")
        return

    user_ids = users_file.read_text().strip().split("\n")
    user_ids = [uid.strip() for uid in user_ids if uid.strip()]

    print(f"📋 加载 {len(user_ids)} 个用户")

    # 读取消息模板
    message_file = Path("message.txt")
    if message_file.exists():
        message = message_file.read_text().strip()
    else:
        message = "这是一条批量发送的消息"

    # 分批发送
    batch_size = 50
    total_success = 0

    for i in range(0, len(user_ids), batch_size):
        batch = user_ids[i:i + batch_size]
        print(f"\n📤 批次 {i // batch_size + 1}: 发送到 {len(batch)} 个用户")

        result = await twitter.send_batch_direct_messages(batch, message)
        total_success += result.success_count

        print(f"   结果: {result.success_count} 成功, {result.failure_count} 失败")

        # 批次间延迟
        if i + batch_size < len(user_ids):
            await asyncio.sleep(3)

    print(f"\n📊 总计: {total_success}/{len(user_ids)} 成功")

asyncio.run(batch_from_file())

从 CSV 读取用户和文案

import asyncio
import csv
import x_api
from pathlib import Path

async def batch_from_csv():
    """从 CSV 读取用户和自定义文案"""
    cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
    twitter = x_api.Twitter(cookies)

    # CSV 格式: user_id,name,message
    csv_file = Path("users_messages.csv")
    if not csv_file.exists():
        print("❌ CSV 文件不存在")
        return

    user_ids = []
    texts = []

    with open(csv_file, "r", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for row in reader:
            user_ids.append(row["user_id"])
            # 使用模板替换
            text = row["message"].replace("{name}", row["name"])
            texts.append(text)

    print(f"📋 加载 {len(user_ids)} 条记录")

    # 分批发送
    batch_size = 30

    for i in range(0, len(user_ids), batch_size):
        batch_users = user_ids[i:i + batch_size]
        batch_texts = texts[i:i + batch_size]

        print(f"\n📤 批次 {i // batch_size + 1}")

        result = await twitter.send_batch_direct_messages_with_custom_texts(
            user_ids=batch_users,
            texts=batch_texts,
        )

        print(f"   结果: {result.success_count} 成功")

        if i + batch_size < len(user_ids):
            await asyncio.sleep(2)

asyncio.run(batch_from_csv())

CSV 文件示例 (users_messages.csv):

user_id,name,message
111111111,张三,你好 {name}!感谢关注!
222222222,李四,你好 {name}!欢迎加入!
333333333,王五,你好 {name}!期待交流!

错误处理与重试

处理失败的请求并实现重试机制。

收集失败并重试

import asyncio
import x_api

async def batch_with_retry():
    """带重试的批量发送"""
    cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
    twitter = x_api.Twitter(cookies)

    user_ids = ["111111111", "222222222", "333333333", "444444444"]
    message = "测试消息"

    max_retries = 3
    retry_delay = 5.0

    # 首次发送
    print("📤 首次批量发送...")
    result = await twitter.send_batch_direct_messages(user_ids, message)

    print(f"   成功: {result.success_count}, 失败: {result.failure_count}")

    # 收集失败的用户
    failed_users = [r.user_id for r in result.results if not r.success]

    # 重试失败的用户
    retry_count = 0
    while failed_users and retry_count < max_retries:
        retry_count += 1
        print(f"\n🔄 重试 {retry_count}/{max_retries}: {len(failed_users)} 个用户")

        await asyncio.sleep(retry_delay)

        result = await twitter.send_batch_direct_messages(failed_users, message)

        print(f"   成功: {result.success_count}, 失败: {result.failure_count}")

        # 更新失败列表
        failed_users = [r.user_id for r in result.results if not r.success]

    if failed_users:
        print(f"\n❌ 最终失败: {len(failed_users)} 个用户")
        for user_id in failed_users:
            print(f"   - {user_id}")
    else:
        print(f"\n✅ 全部发送成功!")

asyncio.run(batch_with_retry())

保存失败记录

import asyncio
import json
import x_api
from datetime import datetime
from pathlib import Path

async def batch_with_failure_log():
    """批量发送并保存失败记录"""
    cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
    twitter = x_api.Twitter(cookies)

    user_ids = ["111111111", "222222222", "333333333"]
    message = "测试消息"

    result = await twitter.send_batch_direct_messages(user_ids, message)

    # 收集失败记录
    failures = []
    for r in result.results:
        if not r.success:
            failures.append({
                "user_id": r.user_id,
                "error_msg": r.error_msg,
                "http_status": r.http_status,
                "timestamp": datetime.now().isoformat(),
            })

    # 保存失败记录
    if failures:
        log_file = Path("failed_sends.json")

        # 追加到现有记录
        existing = []
        if log_file.exists():
            existing = json.loads(log_file.read_text())

        existing.extend(failures)
        log_file.write_text(json.dumps(existing, indent=2, ensure_ascii=False))

        print(f"❌ {len(failures)} 个失败记录已保存到 {log_file}")
    else:
        print("✅ 全部发送成功,无失败记录")

asyncio.run(batch_with_failure_log())

完整示例

#!/usr/bin/env python3
"""
批量操作完整示例
"""

import asyncio
import csv
import json
import os
import sys
from datetime import datetime
from pathlib import Path

try:
    import x_api
except ImportError:
    print("❌ 请先安装 x_api: maturin develop")
    sys.exit(1)


class BatchOperations:
    """批量操作管理器"""

    def __init__(self, cookies: str, proxy_url: str | None = None):
        self.twitter = x_api.Twitter(cookies, proxy_url)
        self.failures = []

    async def send_batch(
        self,
        user_ids: list[str],
        text: str,
        batch_size: int = 50,
        delay: float = 2.0,
    ) -> tuple[int, int]:
        """分批发送私信"""
        total_success = 0
        total_failure = 0

        for i in range(0, len(user_ids), batch_size):
            batch = user_ids[i:i + batch_size]
            result = await self.twitter.send_batch_direct_messages(batch, text)

            total_success += result.success_count
            total_failure += result.failure_count

            # 记录失败
            for r in result.results:
                if not r.success:
                    self.failures.append({
                        "user_id": r.user_id,
                        "error": r.error_msg,
                        "time": datetime.now().isoformat(),
                    })

            if i + batch_size < len(user_ids):
                await asyncio.sleep(delay)

        return total_success, total_failure

    def save_failures(self, file_path: str = "failures.json"):
        """保存失败记录"""
        if self.failures:
            Path(file_path).write_text(
                json.dumps(self.failures, indent=2, ensure_ascii=False)
            )


async def main():
    cookies = os.getenv("TWITTER_COOKIES")
    if not cookies:
        print("❌ 请设置 TWITTER_COOKIES")
        return

    ops = BatchOperations(cookies)

    # 示例用户
    user_ids = ["111111111", "222222222", "333333333"]

    success, failure = await ops.send_batch(
        user_ids=user_ids,
        text="批量测试消息",
        batch_size=10,
    )

    print(f"📊 结果: {success} 成功, {failure} 失败")

    ops.save_failures()


if __name__ == "__main__":
    asyncio.run(main())

相关链接