跳转至

快速开始指南

本指南帮助您快速上手 X API Python 客户端,从安装到发送第一条私信。

目录

安装

pip install x-api-rs

验证安装

from x_api_rs import Twitter

# 验证模块导入成功
client = Twitter(cookies="test")
print("✅ x-api-rs 安装成功")

如果能正常运行,说明安装成功。

获取 Cookies

方法 1: 浏览器开发者工具

  1. 使用 Chrome/Firefox 登录 twitter.com
  2. F12 打开开发者工具
  3. 切换到 Network 标签页
  4. 刷新页面或执行任意操作
  5. 选择任意请求,查看 Request Headers
  6. 找到 Cookie 字段,复制完整内容

示例:

ct0=your_csrf_token_here; auth_token=your_auth_token_here; twid=u%3D1234567890

方法 2: 使用浏览器扩展

安装 Cookie 导出扩展(如 EditThisCookie),一键导出 cookies。

方法 3: 使用 auth_token 转换

如果您只有 auth_token,可以使用转换功能:

import asyncio
from x_api_rs import Twitter

async def main():
    result = await Twitter.auth_token_to_cookies(
        "your_auth_token_here",
        proxy_url="http://proxy:8080"  # 可选
    )

    print(f"用户 ID: {result.user_id}")
    print(f"Cookies: {result.cookies}")

    # 使用转换后的 cookies
    client = Twitter(result.cookies)

asyncio.run(main())

创建客户端

基础用法

from x_api_rs import Twitter

# 创建客户端
client = Twitter(cookies="your_cookies_here")

# 验证 cookies
if client.validate_cookies():
    print("✅ Cookies 有效")
else:
    print("❌ Cookies 无效")

使用代理

# HTTP 代理
client = Twitter(
    cookies="your_cookies_here",
    proxy_url="http://proxy:8080"
)

# SOCKS5 代理
client = Twitter(
    cookies="your_cookies_here",
    proxy_url="socks5://proxy:1080"
)

从环境变量读取

import os
from x_api_rs import Twitter

# 设置环境变量
# export TWITTER_COOKIES="ct0=xxx; auth_token=yyy; twid=u%3D123"

cookies = os.getenv("TWITTER_COOKIES")
if not cookies:
    raise ValueError("请设置 TWITTER_COOKIES 环境变量")

client = Twitter(cookies)

发送第一条私信

单条发送

import asyncio
from x_api_rs import Twitter

async def main():
    client = Twitter(cookies="your_cookies_here")

    # 发送私信
    result = await client.dm.send_message(
        user_id="123456789",  # 替换为真实的用户 ID
        text="Hello! 这是我的第一条私信 🎉"
    )

    # 检查结果
    if result.success:
        print(f"✅ 发送成功!")
        print(f"   事件 ID: {result.event_id}")
        print(f"   用户 ID: {result.user_id}")
    else:
        print(f"❌ 发送失败: {result.error_msg}")
        print(f"   HTTP 状态码: {result.http_status}")

# 运行
asyncio.run(main())

批量发送

import asyncio
from x_api_rs import Twitter

async def batch_send():
    client = Twitter(cookies="your_cookies_here")

    # 准备用户 ID 列表
    user_ids = [
        "123456789",
        "987654321",
        "111222333"
    ]

    # 批量发送相同内容
    result = await client.dm.send_batch(
        user_ids=user_ids,
        text="批量发送测试消息!"
    )

    # 查看结果
    print(f"📊 批量发送完成:")
    print(f"   成功: {result.success_count} 条")
    print(f"   失败: {result.failure_count} 条")

    # 查看详细结果
    for item in result.results:
        if item.success:
            print(f"   ✅ 用户 {item.user_id}: 成功")
        else:
            print(f"   ❌ 用户 {item.user_id}: {item.error_msg}")

asyncio.run(batch_send())

上传图片

上传单张图片

import asyncio
from x_api_rs import Twitter

async def upload_image():
    client = Twitter(cookies="your_cookies_here")

    # 读取图片文件
    with open("image.jpg", "rb") as f:
        image_bytes = f.read()

    # 上传图片
    result = await client.upload.image(
        image_bytes=image_bytes,
        media_category="tweet_image"  # 用于发帖
    )

    if result.success:
        print(f"✅ 上传成功!")
        print(f"   Media ID: {result.media_id_string}")
        return result.media_id_string
    else:
        print(f"❌ 上传失败: {result.error_msg}")
        return None

asyncio.run(upload_image())

媒体类别说明

类别 用途
tweet_image 用于发布帖子
dm_image 用于私信附件
banner_image 用于用户背景图

发送带图片的私信

async def send_image_dm():
    client = Twitter(cookies="your_cookies_here")

    # 1. 上传图片
    with open("product.jpg", "rb") as f:
        upload_result = await client.upload.image(f.read(), "dm_image")

    if not upload_result.success:
        print(f"上传失败: {upload_result.error_msg}")
        return

    # 2. 发送带图片的私信
    result = await client.dm.send_message(
        user_id="123456789",
        text="请看这张图片!",
        media_id=upload_result.media_id_string
    )

    if result.success:
        print(f"✅ 发送成功!")
    else:
        print(f"❌ 发送失败: {result.error_msg}")

asyncio.run(send_image_dm())

发布帖子

发布纯文本帖子

import asyncio
from x_api_rs import Twitter

async def post_tweet():
    client = Twitter(cookies="your_cookies_here")

    # 发布帖子
    result = await client.posts.create_tweet(
        text="Hello World! 这是我的第一条推文 🚀"
    )

    if result.success:
        print(f"✅ 发帖成功!")
        print(f"   Tweet ID: {result.tweet_id}")
        print(f"   查看链接: https://twitter.com/i/web/status/{result.tweet_id}")
    else:
        print(f"❌ 发帖失败: {result.error_msg}")

asyncio.run(post_tweet())

发布带图片的帖子

async def post_tweet_with_image():
    client = Twitter(cookies="your_cookies_here")

    # 1. 上传图片
    with open("image.jpg", "rb") as f:
        upload_result = await client.upload.image(f.read(), "tweet_image")

    if not upload_result.success:
        print(f"上传失败: {upload_result.error_msg}")
        return

    # 2. 发布带图片的帖子
    result = await client.posts.create_tweet(
        text="分享一张图片 📷",
        media_ids=[upload_result.media_id_string]
    )

    if result.success:
        print(f"✅ 发帖成功!")
        print(f"   Tweet ID: {result.tweet_id}")

asyncio.run(post_tweet_with_image())

完整示例

以下是一个综合示例,展示了多种功能的组合使用:

#!/usr/bin/env python3
"""
X API 快速开始完整示例

演示私信、上传、发帖等核心功能
"""

import asyncio
import os
from x_api_rs import Twitter

async def main():
    # 1. 初始化客户端
    print("=" * 50)
    print("🔐 初始化 Twitter 客户端")
    print("=" * 50)

    cookies = os.getenv("TWITTER_COOKIES")
    if not cookies:
        print("❌ 请设置 TWITTER_COOKIES 环境变量")
        print("   示例: export TWITTER_COOKIES='ct0=xxx; auth_token=yyy; twid=u%3D123'")
        return

    client = Twitter(cookies)

    # 验证 cookies
    if not client.validate_cookies():
        print("❌ Cookies 验证失败!")
        return

    print("✅ Cookies 验证成功\n")

    # 2. 发送私信
    print("=" * 50)
    print("📨 发送私信测试")
    print("=" * 50)

    user_id = "123456789"  # 替换为真实用户 ID
    dm_result = await client.dm.send_message(
        user_id=user_id,
        text="Hello! 这是一条测试私信"
    )

    if dm_result.success:
        print(f"✅ 私信发送成功")
        print(f"   事件 ID: {dm_result.event_id}\n")
    else:
        print(f"❌ 私信发送失败: {dm_result.error_msg}\n")

    # 3. 上传图片
    print("=" * 50)
    print("📤 上传图片测试")
    print("=" * 50)

    # 检查图片文件是否存在
    image_path = "test_image.jpg"
    if os.path.exists(image_path):
        with open(image_path, "rb") as f:
            upload_result = await client.upload.image(f.read(), "tweet_image")

        if upload_result.success:
            print(f"✅ 图片上传成功")
            print(f"   Media ID: {upload_result.media_id_string}\n")
            media_id = upload_result.media_id_string
        else:
            print(f"❌ 图片上传失败: {upload_result.error_msg}\n")
            media_id = None
    else:
        print(f"⚠️  图片文件不存在: {image_path}")
        print("   跳过上传测试\n")
        media_id = None

    # 4. 发布帖子
    print("=" * 50)
    print("📝 发布帖子测试")
    print("=" * 50)

    tweet_text = "Hello from X API! 🚀"
    if media_id:
        # 带图片发帖
        tweet_result = await client.posts.create_tweet(
            text=tweet_text,
            media_ids=[media_id]
        )
    else:
        # 纯文本发帖
        tweet_result = await client.posts.create_tweet(text=tweet_text)

    if tweet_result.success:
        print(f"✅ 帖子发布成功")
        print(f"   Tweet ID: {tweet_result.tweet_id}")
        print(f"   链接: https://twitter.com/i/web/status/{tweet_result.tweet_id}\n")
    else:
        print(f"❌ 帖子发布失败: {tweet_result.error_msg}\n")

    # 5. 批量操作
    print("=" * 50)
    print("📦 批量操作测试")
    print("=" * 50)

    user_ids = ["123456789", "987654321", "111222333"]
    batch_result = await client.dm.send_batch(
        user_ids=user_ids,
        text="批量测试消息"
    )

    print(f"批量发送完成:")
    print(f"   成功: {batch_result.success_count} 条")
    print(f"   失败: {batch_result.failure_count}\n")

    # 6. 获取用户资料
    print("=" * 50)
    print("👤 获取用户资料测试")
    print("=" * 50)

    user_result = await client.user.get_profile("elonmusk")
    if user_result.success:
        print(f"✅ 用户资料获取成功")
        print(f"   用户名: {user_result.name}")
        print(f"   用户 ID: {user_result.rest_id}")
        print(f"   粉丝数: {user_result.followers_count:,}")
        print(f"   关注数: {user_result.following_count:,}\n")
    else:
        print(f"❌ 用户资料获取失败: {user_result.error_msg}\n")

    # 完成
    print("=" * 50)
    print("✨ 快速开始示例运行完成!")
    print("=" * 50)

if __name__ == "__main__":
    # 运行主函数
    asyncio.run(main())

运行完整示例

# 设置环境变量
export TWITTER_COOKIES="ct0=xxx; auth_token=yyy; twid=u%3D123"

# 运行示例
python quickstart_full_example.py

错误处理

基础错误处理

async def safe_send():
    client = Twitter(cookies)

    try:
        result = await client.dm.send_message("user_id", "Hello!")

        # 检查业务结果
        if result.success:
            print(f"成功: {result.event_id}")
        else:
            print(f"失败: {result.error_msg} (HTTP {result.http_status})")

    except Exception as e:
        # 捕获异常(网络错误、超时等)
        print(f"异常: {e}")

asyncio.run(safe_send())

高级错误处理(带重试)

import asyncio

async def send_with_retry(client, user_id, text, max_retries=3):
    """带重试的发送函数"""
    for attempt in range(max_retries):
        try:
            result = await client.dm.send_message(user_id, text)

            if result.success:
                return result

            # 业务失败,检查是否需要重试
            if result.http_status == 429:  # 速率限制
                wait_time = 2 ** attempt  # 指数退避
                print(f"触发速率限制,等待 {wait_time} 秒...")
                await asyncio.sleep(wait_time)
                continue
            else:
                # 其他错误,不重试
                return result

        except Exception as e:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)

    raise Exception(f"重试 {max_retries} 次后仍然失败")

常见问题

Q1: 如何获取用户 ID?

# 通过用户名获取资料(包含用户 ID)
result = await client.user.get_profile("username")
if result.success:
    user_id = result.rest_id
    print(f"用户 ID: {user_id}")

Q2: 如何判断 cookies 是否有效?

if client.validate_cookies():
    print("Cookies 有效")
else:
    print("Cookies 无效或已过期")

Q3: 如何设置请求超时?

import asyncio

try:
    result = await asyncio.wait_for(
        client.dm.send_message("user_id", "Hello!"),
        timeout=30.0  # 30秒超时
    )
except asyncio.TimeoutError:
    print("请求超时")

Q4: 如何并发发送多条私信?

import asyncio

# 创建任务列表
tasks = [
    client.dm.send_message("123", "消息1"),
    client.dm.send_message("456", "消息2"),
    client.dm.send_message("789", "消息3"),
]

# 并发执行
results = await asyncio.gather(*tasks, return_exceptions=True)

for result in results:
    if isinstance(result, Exception):
        print(f"异常: {result}")
    elif result.success:
        print(f"成功: {result.event_id}")

下一步

恭喜您完成快速开始!接下来您可以:

  1. 深入学习各模块
  2. DM 模块详解
  3. Upload 模块详解
  4. Posts 模块详解
  5. User 模块详解

  6. 查看更多示例

  7. 批量操作示例
  8. DM 完整示例
  9. 上传示例

  10. 了解架构设计

  11. 模块总览
  12. 架构文档

  13. 参与开发

  14. 开发指南
  15. 测试文档

获取帮助

如果遇到问题:

  1. 查看 常见问题
  2. 阅读 API 文档
  3. 提交 GitHub Issue
  4. 加入讨论区提问

祝您使用愉快!🎉