批量操作示例¶
本文档提供各种批量操作的完整示例,包括批量上传、批量私信、并发控制等高级用法。
目录¶
前置准备¶
安装依赖¶
创建客户端¶
import x_api
cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
twitter = x_api.Twitter(cookies)
批量上传图片¶
上传同一张图片多次,获取多个独立的 media_id。
基础批量上传¶
import asyncio
import x_api
from pathlib import Path
async def batch_upload_basic():
cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
twitter = x_api.Twitter(cookies)
# 读取图片
image_bytes = Path("photo.jpg").read_bytes()
# 批量上传 10 次
result = await twitter.upload_image_multiple_times(
image_bytes=image_bytes,
media_category="dm_image",
count=10,
)
print(f"📊 上传结果:")
print(f" 成功: {result.success_count}")
print(f" 失败: {result.failure_count}")
print(f" Media IDs: {result.media_ids}")
asyncio.run(batch_upload_basic())
为不同用途批量上传¶
import asyncio
import x_api
from pathlib import Path
async def batch_upload_for_different_uses():
cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
twitter = x_api.Twitter(cookies)
image_bytes = Path("photo.jpg").read_bytes()
# 为私信准备图片
dm_uploads = await twitter.upload_image_multiple_times(
image_bytes, "dm_image", 5
)
print(f"私信图片: {dm_uploads.success_count} 个")
# 为发帖准备图片
tweet_uploads = await twitter.upload_image_multiple_times(
image_bytes, "tweet_image", 3
)
print(f"发帖图片: {tweet_uploads.success_count} 个")
asyncio.run(batch_upload_for_different_uses())
批量发送私信¶
向多个用户发送相同内容的私信。
纯文本批量发送¶
import asyncio
import x_api
async def batch_send_text():
cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
twitter = x_api.Twitter(cookies)
# 目标用户列表
user_ids = [
"111111111",
"222222222",
"333333333",
"444444444",
"555555555",
]
message = "感谢关注!这是一条群发消息 🎉"
print(f"📤 发送私信到 {len(user_ids)} 个用户...")
result = await twitter.send_batch_direct_messages(
user_ids=user_ids,
text=message,
)
print(f"\n📊 发送结果:")
print(f" 成功: {result.success_count}/{len(user_ids)}")
print(f" 失败: {result.failure_count}/{len(user_ids)}")
# 统计成功和失败
success_users = []
failed_users = []
for r in result.results:
if r.success:
success_users.append(r.user_id)
else:
failed_users.append((r.user_id, r.error_msg))
if success_users:
print(f"\n✅ 成功用户: {', '.join(success_users)}")
if failed_users:
print(f"\n❌ 失败用户:")
for user_id, error in failed_users:
print(f" - {user_id}: {error}")
asyncio.run(batch_send_text())
批量发送带图片私信¶
每个用户发送相同文案 + 独立的图片。
import asyncio
import x_api
from pathlib import Path
async def batch_send_with_images():
cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
twitter = x_api.Twitter(cookies)
user_ids = ["111111111", "222222222", "333333333"]
message = "看看这张图片!📷"
# 步骤1:批量上传图片
print("📤 上传图片...")
image_bytes = Path("photo.jpg").read_bytes()
upload_result = await twitter.upload_image_multiple_times(
image_bytes=image_bytes,
media_category="dm_image",
count=len(user_ids),
)
if upload_result.success_count < len(user_ids):
print(f"⚠️ 只上传成功 {upload_result.success_count} 张,需要 {len(user_ids)} 张")
# 调整用户数量
user_ids = user_ids[:upload_result.success_count]
print(f"✅ 上传完成: {len(upload_result.media_ids)} 个 media_id")
# 步骤2:批量发送私信
print(f"\n📤 发送私信到 {len(user_ids)} 个用户...")
result = await twitter.send_batch_direct_messages(
user_ids=user_ids,
text=message,
media_ids=upload_result.media_ids,
)
print(f"\n📊 发送结果:")
print(f" 成功: {result.success_count}/{len(user_ids)}")
print(f" 失败: {result.failure_count}/{len(user_ids)}")
asyncio.run(batch_send_with_images())
批量发送自定义文案¶
每个用户发送不同的个性化内容。
纯文本自定义文案¶
import asyncio
import x_api
async def batch_send_custom_texts():
cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
twitter = x_api.Twitter(cookies)
# 用户和对应的文案
users_data = [
("111111111", "你好,张三!感谢你一直以来的支持!"),
("222222222", "你好,李四!很高兴认识你!"),
("333333333", "你好,王五!欢迎加入我们的社区!"),
("444444444", "你好,赵六!期待与你更多交流!"),
]
user_ids = [u[0] for u in users_data]
texts = [u[1] for u in users_data]
print(f"📤 发送个性化私信到 {len(user_ids)} 个用户...")
result = await twitter.send_batch_direct_messages_with_custom_texts(
user_ids=user_ids,
texts=texts,
)
print(f"\n📊 发送结果:")
print(f" 成功: {result.success_count}")
print(f" 失败: {result.failure_count}")
# 显示详细结果
for idx, r in enumerate(result.results):
status = "✅" if r.success else "❌"
preview = texts[idx][:20] + "..." if len(texts[idx]) > 20 else texts[idx]
print(f" {status} {r.user_id}: {preview}")
asyncio.run(batch_send_custom_texts())
带图片的自定义文案¶
import asyncio
import x_api
from pathlib import Path
async def batch_send_custom_with_images():
cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
twitter = x_api.Twitter(cookies)
# 用户数据
users_data = [
("111111111", "张三,这是专属于你的图片!🎁"),
("222222222", "李四,送你一张特别的照片!📸"),
("333333333", "王五,希望你喜欢这个!✨"),
]
user_ids = [u[0] for u in users_data]
texts = [u[1] for u in users_data]
# 批量上传图片
image_bytes = Path("photo.jpg").read_bytes()
upload_result = await twitter.upload_image_multiple_times(
image_bytes, "dm_image", len(user_ids)
)
if upload_result.success_count != len(user_ids):
print(f"❌ 上传失败: {upload_result.failure_count}")
return
print(f"✅ 图片上传完成")
# 发送自定义私信
result = await twitter.send_batch_direct_messages_with_custom_texts(
user_ids=user_ids,
texts=texts,
media_ids=upload_result.media_ids,
)
print(f"\n📊 发送结果: {result.success_count}/{len(user_ids)} 成功")
asyncio.run(batch_send_custom_with_images())
并发控制与限流¶
处理大量请求时的并发控制策略。
分批处理¶
import asyncio
import x_api
async def batch_with_chunks():
"""分批处理大量用户"""
cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
twitter = x_api.Twitter(cookies)
# 大量用户列表
all_user_ids = [f"{i:09d}" for i in range(1, 101)] # 100 个用户
# 分批参数
batch_size = 20
delay_between_batches = 2.0 # 批次间隔(秒)
message = "批量发送测试消息"
total_success = 0
total_failure = 0
# 分批处理
for i in range(0, len(all_user_ids), batch_size):
batch = all_user_ids[i:i + batch_size]
batch_num = i // batch_size + 1
total_batches = (len(all_user_ids) + batch_size - 1) // batch_size
print(f"\n📤 处理批次 {batch_num}/{total_batches} ({len(batch)} 个用户)")
result = await twitter.send_batch_direct_messages(
user_ids=batch,
text=message,
)
total_success += result.success_count
total_failure += result.failure_count
print(f" 本批次: {result.success_count} 成功, {result.failure_count} 失败")
# 批次间延迟(最后一批不需要)
if i + batch_size < len(all_user_ids):
print(f" ⏳ 等待 {delay_between_batches} 秒...")
await asyncio.sleep(delay_between_batches)
print(f"\n📊 总计:")
print(f" 成功: {total_success}/{len(all_user_ids)}")
print(f" 失败: {total_failure}/{len(all_user_ids)}")
asyncio.run(batch_with_chunks())
限速处理¶
import asyncio
import x_api
import time
class RateLimiter:
"""简单的限速器"""
def __init__(self, max_requests: int, time_window: float):
self.max_requests = max_requests
self.time_window = time_window
self.requests = []
async def acquire(self):
"""获取请求许可"""
now = time.time()
# 清理过期的请求记录
self.requests = [t for t in self.requests if now - t < self.time_window]
# 如果达到限制,等待
if len(self.requests) >= self.max_requests:
wait_time = self.time_window - (now - self.requests[0])
if wait_time > 0:
print(f" ⏳ 限速等待 {wait_time:.1f} 秒...")
await asyncio.sleep(wait_time)
self.requests = []
self.requests.append(time.time())
async def batch_with_rate_limit():
"""带限速的批量发送"""
cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
twitter = x_api.Twitter(cookies)
# 限速器:每 60 秒最多 30 个请求
limiter = RateLimiter(max_requests=30, time_window=60)
user_ids = [f"{i:09d}" for i in range(1, 51)] # 50 个用户
message = "限速批量发送测试"
results = []
for i, user_id in enumerate(user_ids, 1):
await limiter.acquire()
print(f"[{i}/{len(user_ids)}] 发送到 {user_id}...")
result = await twitter.send_direct_message(
user_id=user_id,
text=message,
)
results.append(result)
if result.success:
print(f" ✅ 成功")
else:
print(f" ❌ 失败: {result.error_msg}")
success = sum(1 for r in results if r.success)
print(f"\n📊 总计: {success}/{len(user_ids)} 成功")
asyncio.run(batch_with_rate_limit())
大规模批量操作¶
处理数百甚至数千个用户的场景。
从文件读取用户列表¶
import asyncio
import x_api
from pathlib import Path
async def batch_from_file():
"""从文件读取用户列表并批量发送"""
cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
twitter = x_api.Twitter(cookies)
# 读取用户列表文件(每行一个用户 ID)
users_file = Path("users.txt")
if not users_file.exists():
print("❌ users.txt 文件不存在")
return
user_ids = users_file.read_text().strip().split("\n")
user_ids = [uid.strip() for uid in user_ids if uid.strip()]
print(f"📋 加载 {len(user_ids)} 个用户")
# 读取消息模板
message_file = Path("message.txt")
if message_file.exists():
message = message_file.read_text().strip()
else:
message = "这是一条批量发送的消息"
# 分批发送
batch_size = 50
total_success = 0
for i in range(0, len(user_ids), batch_size):
batch = user_ids[i:i + batch_size]
print(f"\n📤 批次 {i // batch_size + 1}: 发送到 {len(batch)} 个用户")
result = await twitter.send_batch_direct_messages(batch, message)
total_success += result.success_count
print(f" 结果: {result.success_count} 成功, {result.failure_count} 失败")
# 批次间延迟
if i + batch_size < len(user_ids):
await asyncio.sleep(3)
print(f"\n📊 总计: {total_success}/{len(user_ids)} 成功")
asyncio.run(batch_from_file())
从 CSV 读取用户和文案¶
import asyncio
import csv
import x_api
from pathlib import Path
async def batch_from_csv():
"""从 CSV 读取用户和自定义文案"""
cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
twitter = x_api.Twitter(cookies)
# CSV 格式: user_id,name,message
csv_file = Path("users_messages.csv")
if not csv_file.exists():
print("❌ CSV 文件不存在")
return
user_ids = []
texts = []
with open(csv_file, "r", encoding="utf-8") as f:
reader = csv.DictReader(f)
for row in reader:
user_ids.append(row["user_id"])
# 使用模板替换
text = row["message"].replace("{name}", row["name"])
texts.append(text)
print(f"📋 加载 {len(user_ids)} 条记录")
# 分批发送
batch_size = 30
for i in range(0, len(user_ids), batch_size):
batch_users = user_ids[i:i + batch_size]
batch_texts = texts[i:i + batch_size]
print(f"\n📤 批次 {i // batch_size + 1}")
result = await twitter.send_batch_direct_messages_with_custom_texts(
user_ids=batch_users,
texts=batch_texts,
)
print(f" 结果: {result.success_count} 成功")
if i + batch_size < len(user_ids):
await asyncio.sleep(2)
asyncio.run(batch_from_csv())
CSV 文件示例 (users_messages.csv):
user_id,name,message
111111111,张三,你好 {name}!感谢关注!
222222222,李四,你好 {name}!欢迎加入!
333333333,王五,你好 {name}!期待交流!
错误处理与重试¶
处理失败的请求并实现重试机制。
收集失败并重试¶
import asyncio
import x_api
async def batch_with_retry():
"""带重试的批量发送"""
cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
twitter = x_api.Twitter(cookies)
user_ids = ["111111111", "222222222", "333333333", "444444444"]
message = "测试消息"
max_retries = 3
retry_delay = 5.0
# 首次发送
print("📤 首次批量发送...")
result = await twitter.send_batch_direct_messages(user_ids, message)
print(f" 成功: {result.success_count}, 失败: {result.failure_count}")
# 收集失败的用户
failed_users = [r.user_id for r in result.results if not r.success]
# 重试失败的用户
retry_count = 0
while failed_users and retry_count < max_retries:
retry_count += 1
print(f"\n🔄 重试 {retry_count}/{max_retries}: {len(failed_users)} 个用户")
await asyncio.sleep(retry_delay)
result = await twitter.send_batch_direct_messages(failed_users, message)
print(f" 成功: {result.success_count}, 失败: {result.failure_count}")
# 更新失败列表
failed_users = [r.user_id for r in result.results if not r.success]
if failed_users:
print(f"\n❌ 最终失败: {len(failed_users)} 个用户")
for user_id in failed_users:
print(f" - {user_id}")
else:
print(f"\n✅ 全部发送成功!")
asyncio.run(batch_with_retry())
保存失败记录¶
import asyncio
import json
import x_api
from datetime import datetime
from pathlib import Path
async def batch_with_failure_log():
"""批量发送并保存失败记录"""
cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
twitter = x_api.Twitter(cookies)
user_ids = ["111111111", "222222222", "333333333"]
message = "测试消息"
result = await twitter.send_batch_direct_messages(user_ids, message)
# 收集失败记录
failures = []
for r in result.results:
if not r.success:
failures.append({
"user_id": r.user_id,
"error_msg": r.error_msg,
"http_status": r.http_status,
"timestamp": datetime.now().isoformat(),
})
# 保存失败记录
if failures:
log_file = Path("failed_sends.json")
# 追加到现有记录
existing = []
if log_file.exists():
existing = json.loads(log_file.read_text())
existing.extend(failures)
log_file.write_text(json.dumps(existing, indent=2, ensure_ascii=False))
print(f"❌ {len(failures)} 个失败记录已保存到 {log_file}")
else:
print("✅ 全部发送成功,无失败记录")
asyncio.run(batch_with_failure_log())
完整示例¶
#!/usr/bin/env python3
"""
批量操作完整示例
"""
import asyncio
import csv
import json
import os
import sys
from datetime import datetime
from pathlib import Path
try:
import x_api
except ImportError:
print("❌ 请先安装 x_api: maturin develop")
sys.exit(1)
class BatchOperations:
"""批量操作管理器"""
def __init__(self, cookies: str, proxy_url: str | None = None):
self.twitter = x_api.Twitter(cookies, proxy_url)
self.failures = []
async def send_batch(
self,
user_ids: list[str],
text: str,
batch_size: int = 50,
delay: float = 2.0,
) -> tuple[int, int]:
"""分批发送私信"""
total_success = 0
total_failure = 0
for i in range(0, len(user_ids), batch_size):
batch = user_ids[i:i + batch_size]
result = await self.twitter.send_batch_direct_messages(batch, text)
total_success += result.success_count
total_failure += result.failure_count
# 记录失败
for r in result.results:
if not r.success:
self.failures.append({
"user_id": r.user_id,
"error": r.error_msg,
"time": datetime.now().isoformat(),
})
if i + batch_size < len(user_ids):
await asyncio.sleep(delay)
return total_success, total_failure
def save_failures(self, file_path: str = "failures.json"):
"""保存失败记录"""
if self.failures:
Path(file_path).write_text(
json.dumps(self.failures, indent=2, ensure_ascii=False)
)
async def main():
cookies = os.getenv("TWITTER_COOKIES")
if not cookies:
print("❌ 请设置 TWITTER_COOKIES")
return
ops = BatchOperations(cookies)
# 示例用户
user_ids = ["111111111", "222222222", "333333333"]
success, failure = await ops.send_batch(
user_ids=user_ids,
text="批量测试消息",
batch_size=10,
)
print(f"📊 结果: {success} 成功, {failure} 失败")
ops.save_failures()
if __name__ == "__main__":
asyncio.run(main())