DM 模块 API 文档¶
DM(Direct Message)模块提供完整的 Twitter 私信功能,支持单条发送、批量发送和自定义文案。
目录¶
模块概述¶
DM 模块提供两种使用方式:
方式 1: 通过 Twitter 主入口访问(推荐)¶
from x_api_rs import Twitter
client = Twitter(cookies)
result = await client.dm.send_message("user_id", "Hello!")
方式 2: 独立创建 DMClient¶
from x_api_rs.dm import DMClient
dm_client = DMClient(cookies, proxy_url="http://proxy:8080")
result = await dm_client.send_message("user_id", "Hello!")
DMClient 类¶
构造函数¶
创建新的 DM 客户端实例。
参数:
cookies(str): Twitter 账号的 cookies 字符串proxy_url(str | None): 可选的代理服务器 URL(格式:http://host:port或socks5://host:port)enable_ja3(bool): 是否启用 JA3/TLS 指纹模拟(默认 True,增强反检测能力)
返回: DMClient 实例
异常: RuntimeError - 初始化失败
示例:
# 基础使用
client = DMClient(cookies)
# 使用代理
client = DMClient(cookies, proxy_url="http://proxy:8080")
# 禁用 JA3 指纹模拟
client = DMClient(cookies, enable_ja3=False)
send_message¶
发送单条私信到指定用户。
参数:
user_id(str): 目标用户的 Twitter IDtext(str): 消息内容,最大 10000 字符media_id(str | None): 可选的媒体附件 ID(来自upload.image返回的media_id_string)
返回: DMResult 对象,包含发送状态、事件 ID 等信息
异常: RuntimeError - 发送失败
示例:
# 发送纯文本消息
result = await client.dm.send_message("123456", "Hello!")
if result.success:
print(f"发送成功,事件ID: {result.event_id}")
else:
print(f"发送失败: {result.error_msg}")
# 发送带图片的消息
upload_result = await client.upload.image(image_bytes, "dm_image")
if upload_result.success:
result = await client.dm.send_message(
"123456",
"请看这张图片",
media_id=upload_result.media_id_string
)
send_message_v2¶
使用新版 GraphQL API 发送单条私信,提供更稳定的发送体验。
参数:
user_id(str): 目标用户的 Twitter IDtext(str): 消息内容,最大 10000 字符
返回: DMResult 对象
异常: RuntimeError - 发送失败
注意: v2 API 暂不支持媒体附件
示例:
result = await client.dm.send_message_v2("123456", "Hello from v2!")
if result.success:
print(f"发送成功: {result.event_id}")
send_batch¶
async def send_batch(
user_ids: list[str],
text: str,
client_transaction_ids: list[str] | None = None,
media_ids: list[str | None] | None = None
) -> BatchDMResult
批量发送私信(相同内容)到多个用户。
参数:
user_ids(list[str]): 目标用户 ID 列表text(str): 消息内容client_transaction_ids(list[str] | None): 可选的客户端事务 ID 列表(长度必须与 user_ids 一致)media_ids(list[str | None] | None): 可选的媒体 ID 列表(长度必须与 user_ids 一致)
返回: BatchDMResult 对象,包含批量发送结果
异常: RuntimeError - 批量发送失败或参数验证失败
示例:
# 基础批量发送
user_ids = ["123", "456", "789"]
result = await client.dm.send_batch(user_ids, "批量消息")
print(f"成功: {result.success_count}, 失败: {result.failure_count}")
# 查看详细结果
for item in result.results:
if item.success:
print(f"✓ 用户 {item.user_id}: {item.event_id}")
else:
print(f"✗ 用户 {item.user_id}: {item.error_msg}")
# 带自定义事务 ID
transaction_ids = ["tx_001", "tx_002", "tx_003"]
result = await client.dm.send_batch(user_ids, "消息", transaction_ids)
# 每个用户发送不同的图片
media_ids = ["media_1", "media_2", None] # 第三个用户不发图片
result = await client.dm.send_batch(user_ids, "请看图片", media_ids=media_ids)
send_batch_v2¶
使用新版 GraphQL API 批量发送相同内容的私信。
参数:
user_ids(list[str]): 目标用户 ID 列表text(str): 消息内容
返回: BatchDMResult 对象
异常: RuntimeError - 批量发送失败
示例:
user_ids = ["123", "456", "789"]
result = await client.dm.send_batch_v2(user_ids, "批量消息 v2")
print(f"成功: {result.success_count}, 失败: {result.failure_count}")
send_batch_with_custom_texts¶
async def send_batch_with_custom_texts(
user_ids: list[str],
texts: list[str],
client_transaction_ids: list[str] | None = None,
media_ids: list[str | None] | None = None
) -> BatchDMResult
批量发送自定义文案私信,每个用户接收不同的消息内容。
参数:
user_ids(list[str]): 目标用户 ID 列表texts(list[str]): 消息内容列表(长度必须与 user_ids 一致)client_transaction_ids(list[str] | None): 可选的客户端事务 ID 列表media_ids(list[str | None] | None): 可选的媒体 ID 列表
返回: BatchDMResult 对象
异常: RuntimeError - 批量发送失败或参数长度不匹配
示例:
user_ids = ["123", "456", "789"]
texts = [
"你好,张三!欢迎使用我们的服务。",
"你好,李四!感谢您的支持。",
"你好,王五!祝您使用愉快。"
]
result = await client.dm.send_batch_with_custom_texts(user_ids, texts)
print(f"成功: {result.success_count}, 失败: {result.failure_count}")
# 带媒体附件
media_ids = ["media_1", "media_2", None]
result = await client.dm.send_batch_with_custom_texts(
user_ids,
texts,
media_ids=media_ids
)
send_batch_with_custom_texts_v2¶
使用新版 GraphQL API 批量发送自定义文案私信。
参数:
user_ids(list[str]): 目标用户 ID 列表texts(list[str]): 消息内容列表(长度必须与 user_ids 一致)
返回: BatchDMResult 对象
异常: RuntimeError - 批量发送失败或参数长度不匹配
示例:
user_ids = ["123", "456"]
texts = ["你好,张三!", "你好,李四!"]
result = await client.dm.send_batch_with_custom_texts_v2(user_ids, texts)
类型定义¶
DMResult¶
单条私信发送结果。
属性:
success(bool): 是否发送成功user_id(str): 目标用户 IDmessage(str): 成功消息(通常为 "Message sent successfully")error_msg(str): 错误消息(失败时有值)http_status(int): HTTP 状态码event_id(str | None): 私信事件 ID(成功时有值)media_id(str | None): 媒体 ID(如果发送了媒体附件)
示例:
result = await client.dm.send_message("123", "Hello!")
print(result.success) # True
print(result.user_id) # "123"
print(result.event_id) # "1234567890123456789"
print(result.http_status) # 200
BatchDMResult¶
批量私信发送结果。
属性:
success_count(int): 成功发送的数量failure_count(int): 失败发送的数量results(list[DMResult]): 详细结果列表
示例:
result = await client.dm.send_batch(user_ids, "批量消息")
print(f"成功: {result.success_count}")
print(f"失败: {result.failure_count}")
for item in result.results:
if item.success:
print(f"✓ {item.user_id}")
else:
print(f"✗ {item.user_id}: {item.error_msg}")
DecodedMessage¶
解码的私信内容(用于消息解析)。
属性:
text(str): 文本内容media(list[MediaAttachment]): 媒体附件列表
方法:
has_media()-> bool: 是否有媒体附件has_text()-> bool: 是否有文本内容
MediaAttachment¶
媒体附件信息。
属性:
media_id(str): 媒体 IDfilename(str | None): 文件名(可选)
MediaInput¶
用于编码消息时传入的媒体信息。
属性:
media_id(str): 媒体 IDfilename(str | None): 文件名(可选)
使用示例¶
基础示例¶
import asyncio
from x_api_rs import Twitter
async def main():
client = Twitter(cookies)
# 单条发送
result = await client.dm.send_message("123456", "Hello!")
if result.success:
print(f"成功: {result.event_id}")
asyncio.run(main())
批量发送示例¶
async def batch_send():
client = Twitter(cookies)
user_ids = ["123", "456", "789"]
result = await client.dm.send_batch(user_ids, "批量测试消息")
print(f"总计: {len(user_ids)}")
print(f"成功: {result.success_count}")
print(f"失败: {result.failure_count}")
# 重试失败的
failed_ids = [r.user_id for r in result.results if not r.success]
if failed_ids:
print(f"重试 {len(failed_ids)} 个失败的...")
retry_result = await client.dm.send_batch(failed_ids, "重试消息")
自定义文案示例¶
async def personalized_messages():
client = Twitter(cookies)
# 准备用户数据
users = [
{"id": "123", "name": "张三"},
{"id": "456", "name": "李四"},
{"id": "789", "name": "王五"},
]
user_ids = [u["id"] for u in users]
texts = [f"你好,{u['name']}!欢迎使用我们的服务。" for u in users]
result = await client.dm.send_batch_with_custom_texts(user_ids, texts)
for r in result.results:
user_name = next(u["name"] for u in users if u["id"] == r.user_id)
if r.success:
print(f"✓ {user_name} ({r.user_id}): 发送成功")
else:
print(f"✗ {user_name} ({r.user_id}): {r.error_msg}")
带媒体附件示例¶
async def send_with_media():
client = Twitter(cookies)
# 上传图片
with open("image.jpg", "rb") as f:
image_bytes = f.read()
upload_result = await client.upload.image(image_bytes, "dm_image")
if not upload_result.success:
print(f"上传失败: {upload_result.error_msg}")
return
# 发送带图片的私信
result = await client.dm.send_message(
"123456",
"请看这张图片",
media_id=upload_result.media_id_string
)
if result.success:
print(f"发送成功: {result.event_id}")
并发控制示例¶
import asyncio
from itertools import islice
async def send_with_rate_limit():
client = Twitter(cookies)
user_ids = ["123", "456", "789", ...] # 大量用户
batch_size = 5 # 每批次 5 个用户
delay = 2 # 每批次间隔 2 秒
def batched(iterable, n):
"""将列表分批"""
it = iter(iterable)
while True:
batch = list(islice(it, n))
if not batch:
return
yield batch
for batch in batched(user_ids, batch_size):
result = await client.dm.send_batch(batch, "批量消息")
print(f"批次完成: 成功 {result.success_count}, 失败 {result.failure_count}")
await asyncio.sleep(delay) # 等待避免限流
最佳实践¶
1. 错误处理¶
try:
result = await client.dm.send_message("user_id", "Hello!")
if result.success:
# 成功处理
log_success(result.event_id)
else:
# 业务失败处理
log_failure(result.error_msg, result.http_status)
except Exception as e:
# 异常处理(网络错误、超时等)
log_exception(str(e))
2. 批量操作¶
- 使用
send_batch而不是循环调用send_message(更高效) - 控制并发数量,避免触发速率限制
- 添加批次间延迟(建议 1-2 秒)
- 记录失败结果并实施重试策略
3. 速率限制¶
Twitter API 有速率限制,建议:
- 每批次不超过 10 个用户
- 每批次间隔 1-2 秒
- 监听 HTTP 429 错误并实施指数退避
- 使用多个账号分散请求
4. 媒体附件¶
- 先上传图片获取
media_id_string - 每条私信最多 1 张图片
- 使用正确的媒体类别(
dm_image) - 验证上传成功后再发送私信
5. 日志记录¶
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
result = await client.dm.send_message("user_id", "Hello!")
if result.success:
logger.info(f"私信发送成功: user_id={result.user_id}, event_id={result.event_id}")
else:
logger.error(f"私信发送失败: user_id={result.user_id}, error={result.error_msg}")
常见问题¶
Q1: 如何获取用户 ID?¶
用户 ID 可以通过以下方式获取:
- 使用
client.user.get_profile("username")查询用户资料 - 从 Twitter 网页端查看用户主页的 URL
- 使用第三方工具
Q2: 批量发送时部分失败怎么办?¶
result = await client.dm.send_batch(user_ids, "消息")
# 提取失败的用户 ID
failed_ids = [r.user_id for r in result.results if not r.success]
# 重试失败的用户
if failed_ids:
retry_result = await client.dm.send_batch(failed_ids, "消息")
Q3: 如何避免被限流?¶
- 控制并发数量(建议 3-5 个并发)
- 添加批次间延迟(建议 1-2 秒)
- 使用指数退避策略处理 429 错误
- 使用多个账号分散请求
Q4: v1 和 v2 API 有什么区别?¶
| 特性 | v1 API | v2 API |
|---|---|---|
| 媒体附件 | ✅ 支持 | ❌ 不支持 |
| 自定义事务 ID | ✅ 支持 | ❌ 不支持 |
| 稳定性 | 较好 | 更好 |
| 推荐场景 | 需要媒体附件 | 纯文本消息 |
Q5: 如何处理超长文本?¶
Twitter 私信最大支持 10000 字符。如果需要发送更长内容:
def split_message(text: str, max_length: int = 9000) -> list[str]:
"""分割长文本"""
return [text[i:i+max_length] for i in range(0, len(text), max_length)]
# 发送多条消息
messages = split_message(long_text)
for msg in messages:
await client.dm.send_message("user_id", msg)
await asyncio.sleep(1) # 避免限流
Q6: 如何判断用户是否可以接收私信?¶
发送私信前无法直接判断。建议:
- 捕获发送失败的情况
- 记录失败的用户 ID
- 避免重复发送给失败的用户
下一步¶
- 查看 Upload 模块文档 了解媒体上传
- 查看 示例代码 了解更多用法
- 查看 测试文档 了解如何测试