示例代码索引¶
本目录包含 X API 的各种使用示例,帮助您快速上手并掌握各种使用场景。
目录¶
快速开始¶
最小示例¶
import asyncio
from x_api_rs import Twitter
async def main():
# 创建客户端
client = Twitter(cookies="your_cookies_here")
# 发送私信
result = await client.dm.send_message("123456", "Hello!")
print(f"成功: {result.success}")
asyncio.run(main())
完整示例¶
查看 quickstart.md 了解更详细的入门指南。
示例分类¶
基础示例¶
| 示例 | 文档 | 描述 |
|---|---|---|
| 快速开始 | quickstart.md | 最基础的使用示例 |
| 创建客户端 | quickstart.md#创建客户端 | 各种创建客户端的方式 |
| 错误处理 | quickstart.md#错误处理 | 异常和错误处理示例 |
DM 私信示例¶
| 示例 | 文档 | 描述 |
|---|---|---|
| 单条发送 | dm_demo.md#发送单条私信 | 发送单条私信 |
| 批量发送 | dm_demo.md#批量发送相同内容 | 批量发送相同内容 |
| 自定义文案 | dm_demo.md#批量发送自定义文案 | 每个用户不同内容 |
| 带图片发送 | dm_demo.md#发送带图片的私信 | 发送带媒体附件的私信 |
Upload 上传示例¶
| 示例 | 文档 | 描述 |
|---|---|---|
| 上传图片 | upload_demo.md#上传单张图片 | 上传单张图片 |
| 上传视频 | upload_demo.md#上传视频 | 上传视频文件 |
| 批量上传 | upload_demo.md#批量上传图片 | 批量上传获取多个 media_id |
Posts 帖子示例¶
| 示例 | 文档 | 描述 |
|---|---|---|
| 发布帖子 | posts_demo.md#发布纯文本帖子 | 发布纯文本帖子 |
| 带图片发帖 | posts_demo.md#发布带图片的帖子 | 发布带图片的帖子 |
| 回复帖子 | posts_demo.md#回复帖子 | 回复其他用户的帖子 |
| 点赞转发 | posts_demo.md#点赞和取消点赞 | 点赞和转发帖子 |
User 用户示例¶
| 示例 | 文档 | 描述 |
|---|---|---|
| 获取资料 | user_demo.md#查询用户资料 | 查询用户资料 |
| 编辑资料 | user_demo.md#编辑个人资料 | 更新个人资料 |
| 更换头像 | user_demo.md#更换头像 | 上传并更换头像 |
Inbox 收件箱示例¶
| 示例 | 文档 | 描述 |
|---|---|---|
| 获取消息 | inbox_demo.md#获取最新私信 | 获取收件箱最新消息 |
| 分页获取 | inbox_demo.md#分页获取历史消息 | 使用游标分页获取 |
| 提取实体 | inbox_demo.md#提取消息实体 | 提取链接、话题、@提及 |
| 实时监控 | inbox_demo.md#实时监控收件箱 | 定期检查新消息 |
批量操作示例¶
| 示例 | 文档 | 描述 |
|---|---|---|
| 批量私信 | batch_operations.md#批量发送私信 | 高效的批量私信发送 |
| 并发控制 | batch_operations.md#并发控制与限流 | 控制并发数量和速率限制 |
| 错误重试 | batch_operations.md#错误处理与重试 | 失败重试策略 |
| 大规模操作 | batch_operations.md#大规模批量操作 | 处理数百甚至数千个用户 |
运行示例¶
环境准备¶
# 1. 安装依赖
pip install maturin
# 2. 构建项目
cd /path/to/x_python
maturin develop --features python
# 3. 准备 cookies
export TWITTER_COOKIES="ct0=xxx; auth_token=yyy; twid=u%3D123"
运行示例代码¶
使用自己的代码¶
import asyncio
import os
from x_api_rs import Twitter
async def main():
# 从环境变量获取 cookies
cookies = os.getenv("TWITTER_COOKIES")
if not cookies:
print("请设置 TWITTER_COOKIES 环境变量")
return
client = Twitter(cookies)
# 你的代码...
result = await client.dm.send_message("user_id", "Hello!")
print(result)
if __name__ == "__main__":
asyncio.run(main())
常见场景¶
场景 1: 群发通知消息¶
需求: 向100个用户发送相同的通知消息
解决方案: batch_operations.md#群发通知
user_ids = ["123", "456", ...] # 100个用户
result = await client.dm.send_batch(user_ids, "重要通知:...")
print(f"成功: {result.success_count}, 失败: {result.failure_count}")
场景 2: 个性化营销消息¶
需求: 向每个用户发送包含其名字的个性化消息
解决方案: dm_demo.md#个性化消息
users = [
{"id": "123", "name": "张三"},
{"id": "456", "name": "李四"},
]
user_ids = [u["id"] for u in users]
texts = [f"你好,{u['name']}!" for u in users]
result = await client.dm.send_batch_with_custom_texts(user_ids, texts)
场景 3: 带图片的产品推广¶
需求: 发送产品图片和介绍文字给潜在客户
解决方案: dm_demo.md#产品推广
# 1. 上传产品图片
with open("product.jpg", "rb") as f:
upload_result = await client.upload.image(f.read(), "dm_image")
# 2. 发送带图片的私信
result = await client.dm.send_message(
"user_id",
"查看我们的新产品!",
media_id=upload_result.media_id_string
)
场景 4: 自动发帖机器人¶
需求: 定时发布内容到 Twitter
解决方案: posts_demo.md#定时发帖
import asyncio
from datetime import datetime
async def scheduled_post():
client = Twitter(cookies)
while True:
current_hour = datetime.now().hour
if current_hour == 9: # 每天9点发帖
result = await client.posts.create_tweet(
text=f"早安!今天是 {datetime.now().strftime('%Y-%m-%d')}"
)
print(f"发帖成功: {result.tweet_id}")
await asyncio.sleep(3600) # 每小时检查一次
场景 5: 批量上传图片库¶
需求: 上传多张图片用于后续发帖
解决方案: upload_demo.md#批量上传
import os
async def upload_gallery():
client = Twitter(cookies)
media_ids = []
for filename in os.listdir("images/"):
with open(f"images/{filename}", "rb") as f:
result = await client.upload.image(f.read(), "tweet_image")
if result.success:
media_ids.append(result.media_id_string)
print(f"成功上传 {len(media_ids)} 张图片")
return media_ids
场景 6: 用户资料同步¶
需求: 定期更新个人资料信息
解决方案: user_demo.md#资料同步
from x_api_rs.user import EditUserParams
async def sync_profile():
client = Twitter(cookies)
# 更新资料
params = EditUserParams(
name="Updated Name",
description="Updated bio | Tech enthusiast | Python developer"
)
result = await client.user.edit_profile(params)
print(f"资料已更新: {result.name}")
# 更换头像
with open("new_avatar.jpg", "rb") as f:
upload_result = await client.upload.image(f.read(), "banner_image")
if upload_result.success:
await client.user.change_profile_image(upload_result.media_id_string)
场景 7: 大规模批量操作(避免限流)¶
需求: 向10000个用户发送私信,避免触发速率限制
解决方案: batch_operations.md#大规模操作
from itertools import islice
async def massive_send():
client = Twitter(cookies)
user_ids = [...] # 10000个用户
def batched(iterable, n):
it = iter(iterable)
while True:
batch = list(islice(it, n))
if not batch:
return
yield batch
success_total = 0
failure_total = 0
for i, batch in enumerate(batched(user_ids, 10)):
result = await client.dm.send_batch(batch, "消息")
success_total += result.success_count
failure_total += result.failure_count
print(f"批次 {i+1}: 成功 {result.success_count}, 失败 {result.failure_count}")
await asyncio.sleep(2) # 批次间延迟
print(f"总计: 成功 {success_total}, 失败 {failure_total}")
场景 8: 多账号并行操作¶
需求: 使用多个 Twitter 账号并行发送消息
解决方案: batch_operations.md#多账号
async def multi_account_send():
accounts = [
{"cookies": "cookies_1", "users": ["123", "456"]},
{"cookies": "cookies_2", "users": ["789", "012"]},
]
async def send_for_account(account):
client = Twitter(account["cookies"])
return await client.dm.send_batch(account["users"], "消息")
tasks = [send_for_account(acc) for acc in accounts]
results = await asyncio.gather(*tasks)
for i, result in enumerate(results, 1):
print(f"账号 {i}: 成功 {result.success_count}, 失败 {result.failure_count}")
最佳实践总结¶
1. 使用异步编程¶
# ✅ 推荐:使用 asyncio
import asyncio
async def main():
result = await client.dm.send_message(...)
asyncio.run(main())
# ❌ 不推荐:阻塞调用
# (X API 不提供同步 API)
2. 复用客户端实例¶
# ✅ 推荐:创建一次,多次使用
client = Twitter(cookies)
await client.dm.send_message(...)
await client.posts.create_tweet(...)
# ❌ 不推荐:重复创建
for i in range(10):
client = Twitter(cookies)
await client.dm.send_message(...)
3. 使用批量 API¶
# ✅ 推荐:使用批量 API
result = await client.dm.send_batch(user_ids, "消息")
# ❌ 不推荐:循环调用
for user_id in user_ids:
await client.dm.send_message(user_id, "消息")
4. 添加错误处理¶
# ✅ 推荐:完整的错误处理
try:
result = await client.dm.send_message(...)
if result.success:
log_success()
else:
log_failure(result.error_msg)
except Exception as e:
log_exception(e)
# ❌ 不推荐:忽略错误
result = await client.dm.send_message(...)
5. 控制并发和速率¶
# ✅ 推荐:控制并发
semaphore = asyncio.Semaphore(5)
async def send_limited(user_id):
async with semaphore:
return await client.dm.send_message(user_id, "消息")
# ❌ 不推荐:无限制并发
# 可能触发 API 限流
调试技巧¶
启用详细日志¶
使用 asyncio 调试模式¶
import asyncio
import warnings
# 启用调试模式
warnings.simplefilter('always', ResourceWarning)
asyncio.run(main(), debug=True)
打印请求详情¶
result = await client.dm.send_message("user_id", "Hello!")
print(f"成功: {result.success}")
print(f"状态码: {result.http_status}")
print(f"错误: {result.error_msg}")
print(f"事件ID: {result.event_id}")
性能优化¶
使用连接池¶
# Twitter 客户端自动使用连接池
# 复用客户端实例即可获得连接池的好处
client = Twitter(cookies)
# 多次请求自动复用连接
await client.dm.send_message(...)
await client.dm.send_message(...)
批量操作分组¶
# 将大批量操作分组,避免单次请求过大
from itertools import islice
def batched(iterable, n):
it = iter(iterable)
while True:
batch = list(islice(it, n))
if not batch:
return
yield batch
for batch in batched(large_user_list, 10):
await client.dm.send_batch(batch, "消息")
更多资源¶
贡献示例¶
如果您有好的示例代码,欢迎贡献!
- Fork 本项目
- 在
examples/python/目录添加示例代码 - 在
docs/examples/添加文档说明 - 提交 Pull Request
示例代码应该: - 清晰易懂 - 包含注释 - 遵循最佳实践 - 可以独立运行