跳转至

Communities 模块 API 文档

Communities 模块提供 Twitter 社区的搜索和管理功能,支持搜索公开社区、加入感兴趣的社区。

目录

模块概述

Communities 模块支持两种使用方式:

方式 1: 通过 Twitter 主入口访问(推荐)

from x_api_rs import Twitter

client = await Twitter.create(cookies)
result = await client.communities.search("python")

方式 2: 独立创建 CommunitiesClient

from x_api_rs.communities import CommunitiesClient

# 异步创建客户端
communities_client = await CommunitiesClient.create(cookies, proxy_url="http://proxy:8080")
result = await communities_client.search("python")

功能列表

功能 方法 说明
搜索社区 search() 搜索公开社区,支持分页
加入社区 join() 加入指定的社区

CommunitiesClient 类

创建方法

@staticmethod
async def create(
    cookies: str,
    proxy_url: str | None = None,
    enable_ja3: bool = True
) -> CommunitiesClient

异步创建新的 Communities 客户端实例。

参数:

  • cookies (str): Twitter 账号的 cookies 字符串
  • proxy_url (str | None): 可选的代理服务器 URL
  • enable_ja3 (bool): 是否启用 JA3/TLS 指纹模拟(默认 True)
  • True: 使用 Chrome 136 TLS 指纹模拟,增强反检测能力
  • False: 使用 rquest 默认 TLS 配置(无指纹模拟)

返回: CommunitiesClient 实例

异常: RuntimeError - 初始化失败

示例:

# 基础使用(启用 JA3,默认行为)
client = await CommunitiesClient.create(cookies)

# 使用代理
client = await CommunitiesClient.create(cookies, proxy_url="http://proxy:8080")

# 禁用 JA3 指纹模拟
client = await CommunitiesClient.create(cookies, enable_ja3=False)

async def search(
    query: str,
    cursor: str | None = None
) -> CommunitySearchResult

搜索公开社区,支持分页获取更多结果。

参数:

  • query (str): 搜索关键词
  • cursor (str | None): 分页游标,用于获取下一页数据(首次请求传 None)

返回: CommunitySearchResult 对象,包含社区列表和分页信息

异常: RuntimeError - 搜索失败

示例:

# 搜索 Python 相关社区
result = await client.communities.search("python")

if result.success:
    print(f"找到 {len(result.communities)} 个社区")

    for community in result.communities:
        print(f"社区: {community.name}")
        print(f"  ID: {community.rest_id}")
        print(f"  成员: {community.member_count:,}")
        if community.description:
            print(f"  简介: {community.description}")

    # 检查是否有更多结果
    if result.has_more():
        # 获取下一页
        next_result = await client.communities.search("python", cursor=result.next_cursor)
else:
    print(f"搜索失败: {result.error_msg}")

join

async def join(
    community_id: str
) -> JoinCommunityResult

加入指定的社区。

参数:

  • community_id (str): 社区的 REST ID

返回: JoinCommunityResult 对象

异常: RuntimeError - 加入失败

示例:

# 加入社区
result = await client.communities.join("1234567890123456789")

if result.success:
    if result.already_member:
        print("你已经是该社区的成员")
    else:
        print("成功加入社区!")
else:
    print(f"加入失败: {result.error_msg}")

类型定义

Community

社区信息。

属性:

  • rest_id (str): 社区 REST ID
  • name (str): 社区名称
  • description (str | None): 社区描述
  • member_count (int): 成员数量

示例:

result = await client.communities.search("rust")

if result.success:
    for community in result.communities:
        print(f"社区: {community.name}")
        print(f"  ID: {community.rest_id}")
        print(f"  成员: {community.member_count:,}")
        if community.description:
            print(f"  简介: {community.description}")

CommunitySearchResult

社区搜索结果。

属性:

  • success (bool): 请求是否成功
  • communities (list[Community]): 社区列表
  • next_cursor (str | None): 下一页游标(如果为 None 表示没有更多数据)
  • error_msg (str): 错误消息
  • http_status (int): HTTP 状态码

方法:

  • has_more() -> bool: 判断是否还有更多数据

示例:

result = await client.communities.search("python")

if result.success:
    print(f"本页找到 {len(result.communities)} 个社区")

    # 检查是否有更多数据
    if result.has_more():
        print("还有更多社区可以加载")
        next_page = await client.communities.search("python", cursor=result.next_cursor)
    else:
        print("已显示所有结果")
else:
    print(f"搜索失败 (HTTP {result.http_status}): {result.error_msg}")

JoinCommunityResult

加入社区结果。

属性:

  • success (bool): 请求是否成功
  • already_member (bool): 是否已经是成员
  • error_msg (str): 错误消息
  • http_status (int): HTTP 状态码

示例:

result = await client.communities.join("1234567890123456789")

if result.success:
    if result.already_member:
        print("⚠️ 你已经在这个社区中")
    else:
        print("✅ 成功加入社区!")
else:
    print(f"❌ 加入失败: {result.error_msg}")

使用示例

搜索社区并加入

import asyncio
from x_api_rs import Twitter

async def search_and_join():
    client = await Twitter.create(cookies)

    # 搜索社区
    result = await client.communities.search("python")

    if result.success and len(result.communities) > 0:
        # 获取第一个社区
        community = result.communities[0]
        print(f"找到社区: {community.name}")
        print(f"成员数: {community.member_count:,}")

        # 加入该社区
        join_result = await client.communities.join(community.rest_id)

        if join_result.success:
            if join_result.already_member:
                print("你已经是成员了")
            else:
                print("成功加入社区!")
        else:
            print(f"加入失败: {join_result.error_msg}")

asyncio.run(search_and_join())

分页搜索所有结果

import asyncio
from x_api_rs import Twitter

async def search_all_communities():
    client = await Twitter.create(cookies)
    query = "rust"
    all_communities = []
    cursor = None

    while True:
        result = await client.communities.search(query, cursor)

        if not result.success:
            print(f"搜索失败: {result.error_msg}")
            break

        print(f"本次获取 {len(result.communities)} 个社区")
        all_communities.extend(result.communities)

        # 检查是否还有更多数据
        if not result.has_more():
            print("已获取所有社区")
            break

        cursor = result.next_cursor
        await asyncio.sleep(1)  # 避免限流

    print(f"\n总计找到 {len(all_communities)} 个社区")

    # 按成员数排序
    all_communities.sort(key=lambda c: c.member_count, reverse=True)

    print("\n成员最多的 5 个社区:")
    for i, community in enumerate(all_communities[:5], 1):
        print(f"{i}. {community.name}: {community.member_count:,} 成员")

asyncio.run(search_all_communities())

批量加入多个社区

import asyncio
from x_api_rs import Twitter

async def join_multiple_communities():
    client = await Twitter.create(cookies)

    # 搜索感兴趣的主题
    topics = ["python", "rust", "javascript"]

    for topic in topics:
        result = await client.communities.search(topic)

        if result.success and len(result.communities) > 0:
            # 加入每个主题的第一个社区
            community = result.communities[0]
            join_result = await client.communities.join(community.rest_id)

            if join_result.success:
                if join_result.already_member:
                    print(f"⚠️ 已在社区: {community.name}")
                else:
                    print(f"✅ 加入社区: {community.name}")
            else:
                print(f"❌ 加入失败: {community.name}")

        await asyncio.sleep(2)  # 避免限流

asyncio.run(join_multiple_communities())

导出社区列表到 CSV

import csv
import asyncio
from x_api_rs import Twitter

async def export_communities_to_csv():
    client = await Twitter.create(cookies)

    # 搜索社区
    result = await client.communities.search("tech")

    if not result.success:
        print("搜索失败")
        return

    # 导出到 CSV
    with open("communities.csv", "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(["ID", "Name", "Members", "Description"])

        for community in result.communities:
            writer.writerow([
                community.rest_id,
                community.name,
                community.member_count,
                community.description or ""
            ])

    print(f"已导出 {len(result.communities)} 个社区到 communities.csv")

asyncio.run(export_communities_to_csv())

最佳实践

1. 验证操作结果

result = await client.communities.search("python")

# 始终检查 success 状态
if result.success:
    # 成功处理
    print(f"找到 {len(result.communities)} 个社区")
else:
    # 失败处理
    print(f"Error: {result.error_msg}")
    print(f"HTTP Status: {result.http_status}")

2. 处理分页

# ✅ 好:检查是否有更多数据
cursor = None
while True:
    result = await client.communities.search(query, cursor)
    # 处理数据...

    if not result.has_more():
        break

    cursor = result.next_cursor
    await asyncio.sleep(1)

# ❌ 不好:假设只有一页数据
result = await client.communities.search(query)
# 只处理第一页...

3. 避免频繁请求

# ✅ 推荐:添加延迟避免限流
for topic in topics:
    result = await client.communities.search(topic)
    # 处理结果...
    await asyncio.sleep(1)  # 每次请求间隔 1 秒

# ❌ 不推荐:连续高速请求容易触发限流
for topic in topics:
    result = await client.communities.search(topic)

4. 检查重复加入

# 利用 already_member 字段避免重复操作
result = await client.communities.join(community_id)

if result.success:
    if result.already_member:
        print("已经在社区中,跳过")
    else:
        print("成功加入新社区")

5. 合理处理空结果

result = await client.communities.search("very_specific_query")

if result.success:
    if len(result.communities) == 0:
        print("未找到相关社区,尝试其他关键词")
    else:
        # 处理结果...
        pass

常见问题

Q1: 如何获取社区 ID?

通过搜索获取社区信息,其中包含 rest_id

result = await client.communities.search("python")
if result.success and len(result.communities) > 0:
    community_id = result.communities[0].rest_id
    print(f"社区 ID: {community_id}")

Q2: 搜索结果为空是什么原因?

可能的原因: - 关键词太具体,没有匹配的社区 - 社区可能是私密的,不出现在搜索结果中 - 尝试使用更通用的关键词

Q3: 加入社区失败的常见原因?

  • 社区可能是私密的,需要审核
  • 账号权限不足(新账号可能有限制)
  • 已达到加入社区数量上限
  • 社区 ID 无效或社区已被删除

Q4: 如何判断是否已经在社区中?

检查 JoinCommunityResult.already_member 字段:

result = await client.communities.join(community_id)

if result.success and result.already_member:
    print("你已经是该社区的成员")

Q5: 搜索支持哪些语言?

Twitter 社区搜索支持多语言,包括中文、英文等。使用对应语言的关键词进行搜索:

# 英文
result = await client.communities.search("python programming")

# 中文
result = await client.communities.search("编程学习")

Q6: 分页游标是什么格式?

分页游标是由 Twitter API 返回的不透明字符串,不需要手动构造:

# ❌ 错误:不要手动构造游标
cursor = "some_random_string"

# ✅ 正确:使用 API 返回的游标
result = await client.communities.search("python")
if result.has_more():
    cursor = result.next_cursor  # 使用返回的游标
    next_result = await client.communities.search("python", cursor=cursor)

Q7: 每次搜索返回多少个社区?

Twitter API 每次搜索通常返回约 20-50 个社区,具体数量由服务器决定。使用分页获取更多结果。

Q8: 如何退出社区?

当前版本暂不支持退出社区功能,该功能将在未来版本中添加。

Q9: 可以搜索私密社区吗?

不可以。搜索功能只能找到公开社区。私密社区需要通过邀请链接加入。

Q10: 如何处理搜索限流?

如果遇到 HTTP 429 错误(请求过于频繁):

result = await client.communities.search("python")

if result.http_status == 429:
    print("请求过于频繁,等待 60 秒后重试")
    await asyncio.sleep(60)
    result = await client.communities.search("python")

建议: - 每次搜索间隔至少 1 秒 - 分页获取时间隔 2-3 秒 - 遇到限流后等待 1 分钟再继续


下一步