跳转至

DM 模块 API 文档

DM(Direct Message)模块提供完整的 Twitter 私信功能,支持单条发送、批量发送和自定义文案。

目录

模块概述

DM 模块提供两种使用方式:

方式 1: 通过 Twitter 主入口访问(推荐)

from x_api_rs import Twitter

client = Twitter(cookies)
result = await client.dm.send_message("user_id", "Hello!")

方式 2: 独立创建 DMClient

from x_api_rs.dm import DMClient

dm_client = DMClient(cookies, proxy_url="http://proxy:8080")
result = await dm_client.send_message("user_id", "Hello!")

DMClient 类

构造函数

DMClient(
    cookies: str,
    proxy_url: str | None = None,
    enable_ja3: bool = True
)

创建新的 DM 客户端实例。

参数:

  • cookies (str): Twitter 账号的 cookies 字符串
  • proxy_url (str | None): 可选的代理服务器 URL(格式:http://host:portsocks5://host:port
  • enable_ja3 (bool): 是否启用 JA3/TLS 指纹模拟(默认 True,增强反检测能力)

返回: DMClient 实例

异常: RuntimeError - 初始化失败

示例:

# 基础使用
client = DMClient(cookies)

# 使用代理
client = DMClient(cookies, proxy_url="http://proxy:8080")

# 禁用 JA3 指纹模拟
client = DMClient(cookies, enable_ja3=False)

send_message

async def send_message(
    user_id: str,
    text: str,
    media_id: str | None = None
) -> DMResult

发送单条私信到指定用户。

参数:

  • user_id (str): 目标用户的 Twitter ID
  • text (str): 消息内容,最大 10000 字符
  • media_id (str | None): 可选的媒体附件 ID(来自 upload.image 返回的 media_id_string

返回: DMResult 对象,包含发送状态、事件 ID 等信息

异常: RuntimeError - 发送失败

示例:

# 发送纯文本消息
result = await client.dm.send_message("123456", "Hello!")
if result.success:
    print(f"发送成功,事件ID: {result.event_id}")
else:
    print(f"发送失败: {result.error_msg}")

# 发送带图片的消息
upload_result = await client.upload.image(image_bytes, "dm_image")
if upload_result.success:
    result = await client.dm.send_message(
        "123456",
        "请看这张图片",
        media_id=upload_result.media_id_string
    )

send_message_v2

async def send_message_v2(
    user_id: str,
    text: str
) -> DMResult

使用新版 GraphQL API 发送单条私信,提供更稳定的发送体验。

参数:

  • user_id (str): 目标用户的 Twitter ID
  • text (str): 消息内容,最大 10000 字符

返回: DMResult 对象

异常: RuntimeError - 发送失败

注意: v2 API 暂不支持媒体附件

示例:

result = await client.dm.send_message_v2("123456", "Hello from v2!")
if result.success:
    print(f"发送成功: {result.event_id}")

send_batch

async def send_batch(
    user_ids: list[str],
    text: str,
    client_transaction_ids: list[str] | None = None,
    media_ids: list[str | None] | None = None
) -> BatchDMResult

批量发送私信(相同内容)到多个用户。

参数:

  • user_ids (list[str]): 目标用户 ID 列表
  • text (str): 消息内容
  • client_transaction_ids (list[str] | None): 可选的客户端事务 ID 列表(长度必须与 user_ids 一致)
  • media_ids (list[str | None] | None): 可选的媒体 ID 列表(长度必须与 user_ids 一致)

返回: BatchDMResult 对象,包含批量发送结果

异常: RuntimeError - 批量发送失败或参数验证失败

示例:

# 基础批量发送
user_ids = ["123", "456", "789"]
result = await client.dm.send_batch(user_ids, "批量消息")
print(f"成功: {result.success_count}, 失败: {result.failure_count}")

# 查看详细结果
for item in result.results:
    if item.success:
        print(f"✓ 用户 {item.user_id}: {item.event_id}")
    else:
        print(f"✗ 用户 {item.user_id}: {item.error_msg}")

# 带自定义事务 ID
transaction_ids = ["tx_001", "tx_002", "tx_003"]
result = await client.dm.send_batch(user_ids, "消息", transaction_ids)

# 每个用户发送不同的图片
media_ids = ["media_1", "media_2", None]  # 第三个用户不发图片
result = await client.dm.send_batch(user_ids, "请看图片", media_ids=media_ids)

send_batch_v2

async def send_batch_v2(
    user_ids: list[str],
    text: str
) -> BatchDMResult

使用新版 GraphQL API 批量发送相同内容的私信。

参数:

  • user_ids (list[str]): 目标用户 ID 列表
  • text (str): 消息内容

返回: BatchDMResult 对象

异常: RuntimeError - 批量发送失败

示例:

user_ids = ["123", "456", "789"]
result = await client.dm.send_batch_v2(user_ids, "批量消息 v2")
print(f"成功: {result.success_count}, 失败: {result.failure_count}")

send_batch_with_custom_texts

async def send_batch_with_custom_texts(
    user_ids: list[str],
    texts: list[str],
    client_transaction_ids: list[str] | None = None,
    media_ids: list[str | None] | None = None
) -> BatchDMResult

批量发送自定义文案私信,每个用户接收不同的消息内容。

参数:

  • user_ids (list[str]): 目标用户 ID 列表
  • texts (list[str]): 消息内容列表(长度必须与 user_ids 一致)
  • client_transaction_ids (list[str] | None): 可选的客户端事务 ID 列表
  • media_ids (list[str | None] | None): 可选的媒体 ID 列表

返回: BatchDMResult 对象

异常: RuntimeError - 批量发送失败或参数长度不匹配

示例:

user_ids = ["123", "456", "789"]
texts = [
    "你好,张三!欢迎使用我们的服务。",
    "你好,李四!感谢您的支持。",
    "你好,王五!祝您使用愉快。"
]

result = await client.dm.send_batch_with_custom_texts(user_ids, texts)
print(f"成功: {result.success_count}, 失败: {result.failure_count}")

# 带媒体附件
media_ids = ["media_1", "media_2", None]
result = await client.dm.send_batch_with_custom_texts(
    user_ids,
    texts,
    media_ids=media_ids
)

send_batch_with_custom_texts_v2

async def send_batch_with_custom_texts_v2(
    user_ids: list[str],
    texts: list[str]
) -> BatchDMResult

使用新版 GraphQL API 批量发送自定义文案私信。

参数:

  • user_ids (list[str]): 目标用户 ID 列表
  • texts (list[str]): 消息内容列表(长度必须与 user_ids 一致)

返回: BatchDMResult 对象

异常: RuntimeError - 批量发送失败或参数长度不匹配

示例:

user_ids = ["123", "456"]
texts = ["你好,张三!", "你好,李四!"]
result = await client.dm.send_batch_with_custom_texts_v2(user_ids, texts)

类型定义

DMResult

单条私信发送结果。

属性:

  • success (bool): 是否发送成功
  • user_id (str): 目标用户 ID
  • message (str): 成功消息(通常为 "Message sent successfully")
  • error_msg (str): 错误消息(失败时有值)
  • http_status (int): HTTP 状态码
  • event_id (str | None): 私信事件 ID(成功时有值)
  • media_id (str | None): 媒体 ID(如果发送了媒体附件)

示例:

result = await client.dm.send_message("123", "Hello!")
print(result.success)      # True
print(result.user_id)      # "123"
print(result.event_id)     # "1234567890123456789"
print(result.http_status)  # 200

BatchDMResult

批量私信发送结果。

属性:

  • success_count (int): 成功发送的数量
  • failure_count (int): 失败发送的数量
  • results (list[DMResult]): 详细结果列表

示例:

result = await client.dm.send_batch(user_ids, "批量消息")
print(f"成功: {result.success_count}")
print(f"失败: {result.failure_count}")

for item in result.results:
    if item.success:
        print(f"✓ {item.user_id}")
    else:
        print(f"✗ {item.user_id}: {item.error_msg}")

DecodedMessage

解码的私信内容(用于消息解析)。

属性:

  • text (str): 文本内容
  • media (list[MediaAttachment]): 媒体附件列表

方法:

  • has_media() -> bool: 是否有媒体附件
  • has_text() -> bool: 是否有文本内容

MediaAttachment

媒体附件信息。

属性:

  • media_id (str): 媒体 ID
  • filename (str | None): 文件名(可选)

MediaInput

用于编码消息时传入的媒体信息。

属性:

  • media_id (str): 媒体 ID
  • filename (str | None): 文件名(可选)

使用示例

基础示例

import asyncio
from x_api_rs import Twitter

async def main():
    client = Twitter(cookies)

    # 单条发送
    result = await client.dm.send_message("123456", "Hello!")
    if result.success:
        print(f"成功: {result.event_id}")

asyncio.run(main())

批量发送示例

async def batch_send():
    client = Twitter(cookies)

    user_ids = ["123", "456", "789"]
    result = await client.dm.send_batch(user_ids, "批量测试消息")

    print(f"总计: {len(user_ids)}")
    print(f"成功: {result.success_count}")
    print(f"失败: {result.failure_count}")

    # 重试失败的
    failed_ids = [r.user_id for r in result.results if not r.success]
    if failed_ids:
        print(f"重试 {len(failed_ids)} 个失败的...")
        retry_result = await client.dm.send_batch(failed_ids, "重试消息")

自定义文案示例

async def personalized_messages():
    client = Twitter(cookies)

    # 准备用户数据
    users = [
        {"id": "123", "name": "张三"},
        {"id": "456", "name": "李四"},
        {"id": "789", "name": "王五"},
    ]

    user_ids = [u["id"] for u in users]
    texts = [f"你好,{u['name']}!欢迎使用我们的服务。" for u in users]

    result = await client.dm.send_batch_with_custom_texts(user_ids, texts)

    for r in result.results:
        user_name = next(u["name"] for u in users if u["id"] == r.user_id)
        if r.success:
            print(f"✓ {user_name} ({r.user_id}): 发送成功")
        else:
            print(f"✗ {user_name} ({r.user_id}): {r.error_msg}")

带媒体附件示例

async def send_with_media():
    client = Twitter(cookies)

    # 上传图片
    with open("image.jpg", "rb") as f:
        image_bytes = f.read()

    upload_result = await client.upload.image(image_bytes, "dm_image")
    if not upload_result.success:
        print(f"上传失败: {upload_result.error_msg}")
        return

    # 发送带图片的私信
    result = await client.dm.send_message(
        "123456",
        "请看这张图片",
        media_id=upload_result.media_id_string
    )

    if result.success:
        print(f"发送成功: {result.event_id}")

并发控制示例

import asyncio
from itertools import islice

async def send_with_rate_limit():
    client = Twitter(cookies)

    user_ids = ["123", "456", "789", ...]  # 大量用户
    batch_size = 5  # 每批次 5 个用户
    delay = 2  # 每批次间隔 2 秒

    def batched(iterable, n):
        """将列表分批"""
        it = iter(iterable)
        while True:
            batch = list(islice(it, n))
            if not batch:
                return
            yield batch

    for batch in batched(user_ids, batch_size):
        result = await client.dm.send_batch(batch, "批量消息")
        print(f"批次完成: 成功 {result.success_count}, 失败 {result.failure_count}")
        await asyncio.sleep(delay)  # 等待避免限流

最佳实践

1. 错误处理

try:
    result = await client.dm.send_message("user_id", "Hello!")
    if result.success:
        # 成功处理
        log_success(result.event_id)
    else:
        # 业务失败处理
        log_failure(result.error_msg, result.http_status)
except Exception as e:
    # 异常处理(网络错误、超时等)
    log_exception(str(e))

2. 批量操作

  • 使用 send_batch 而不是循环调用 send_message(更高效)
  • 控制并发数量,避免触发速率限制
  • 添加批次间延迟(建议 1-2 秒)
  • 记录失败结果并实施重试策略

3. 速率限制

Twitter API 有速率限制,建议:

  • 每批次不超过 10 个用户
  • 每批次间隔 1-2 秒
  • 监听 HTTP 429 错误并实施指数退避
  • 使用多个账号分散请求

4. 媒体附件

  • 先上传图片获取 media_id_string
  • 每条私信最多 1 张图片
  • 使用正确的媒体类别(dm_image
  • 验证上传成功后再发送私信

5. 日志记录

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

result = await client.dm.send_message("user_id", "Hello!")
if result.success:
    logger.info(f"私信发送成功: user_id={result.user_id}, event_id={result.event_id}")
else:
    logger.error(f"私信发送失败: user_id={result.user_id}, error={result.error_msg}")

常见问题

Q1: 如何获取用户 ID?

用户 ID 可以通过以下方式获取:

  • 使用 client.user.get_profile("username") 查询用户资料
  • 从 Twitter 网页端查看用户主页的 URL
  • 使用第三方工具

Q2: 批量发送时部分失败怎么办?

result = await client.dm.send_batch(user_ids, "消息")

# 提取失败的用户 ID
failed_ids = [r.user_id for r in result.results if not r.success]

# 重试失败的用户
if failed_ids:
    retry_result = await client.dm.send_batch(failed_ids, "消息")

Q3: 如何避免被限流?

  • 控制并发数量(建议 3-5 个并发)
  • 添加批次间延迟(建议 1-2 秒)
  • 使用指数退避策略处理 429 错误
  • 使用多个账号分散请求

Q4: v1 和 v2 API 有什么区别?

特性 v1 API v2 API
媒体附件 ✅ 支持 ❌ 不支持
自定义事务 ID ✅ 支持 ❌ 不支持
稳定性 较好 更好
推荐场景 需要媒体附件 纯文本消息

Q5: 如何处理超长文本?

Twitter 私信最大支持 10000 字符。如果需要发送更长内容:

def split_message(text: str, max_length: int = 9000) -> list[str]:
    """分割长文本"""
    return [text[i:i+max_length] for i in range(0, len(text), max_length)]

# 发送多条消息
messages = split_message(long_text)
for msg in messages:
    await client.dm.send_message("user_id", msg)
    await asyncio.sleep(1)  # 避免限流

Q6: 如何判断用户是否可以接收私信?

发送私信前无法直接判断。建议:

  • 捕获发送失败的情况
  • 记录失败的用户 ID
  • 避免重复发送给失败的用户

下一步