Search 搜索采集示例¶
本文档提供 Search 模块的使用示例，包括五种搜索类型和分页采集。
目录¶
基础搜索¶
搜索热门帖子¶
import asyncio
from x_api_rs import Twitter


async def search_top_example():
    """Run a "Top" search for a query and print a short preview of the hits."""
    client = await Twitter.create(cookies)
    result = await client.search.search_top("python")
    if not result.success:
        # Surface the API error message on failure.
        print(f"搜索失败: {result.error_msg}")
        return
    print(f"找到 {len(result.tweets)} 条热门帖子")
    # Preview at most the first five tweets.
    for tweet in result.tweets[:5]:
        print(f" @{tweet.author_screen_name}: {tweet.text[:80]}...")
        print(f" likes: {tweet.favorite_count}, retweets: {tweet.retweet_count}")


asyncio.run(search_top_example())
搜索最新帖子¶
async def search_latest_example():
    """Run a "Latest" search and print timestamp, author and text for each hit."""
    client = await Twitter.create(cookies)
    result = await client.search.search_latest("twitter api")
    if not result.success:
        # Original example is silent on failure; keep that behavior.
        return
    print(f"找到 {len(result.tweets)} 条最新帖子")
    for tweet in result.tweets[:5]:
        print(f" [{tweet.created_at}] @{tweet.author_screen_name}")
        print(f" {tweet.text[:100]}")
搜索用户¶
async def search_people_example():
    """Search user accounts and print handle, follower stats and bio."""
    client = await Twitter.create(cookies)
    result = await client.search.search_people("Elon")
    if not result.success:
        return
    print(f"找到 {len(result.users)} 个用户")
    for user in result.users[:5]:
        # Bio may be empty/None; fall back to a placeholder.
        bio = user.description[:80] if user.description else 'N/A'
        print(f" @{user.screen_name} ({user.name})")
        print(f" 粉丝: {user.followers_count}, 关注: {user.following_count}")
        print(f" 简介: {bio}")
搜索媒体¶
async def search_media_example():
    """Search media tweets and print each tweet's text plus attached media URLs."""
    client = await Twitter.create(cookies)
    result = await client.search.search_media("cat")
    if not result.success:
        return
    print(f"找到 {len(result.tweets)} 条媒体帖子")
    for tweet in result.tweets[:5]:
        print(f" @{tweet.author_screen_name}: {tweet.text[:60]}...")
        # media_urls may be empty or None; skip silently in either case.
        for url in tweet.media_urls or ():
            print(f" 媒体: {url}")
搜索 Lists¶
async def search_lists_example():
    """Search public Lists and print name, creator and membership stats."""
    client = await Twitter.create(cookies)
    result = await client.search.search_lists("crypto")
    if not result.success:
        return
    print(f"找到 {len(result.lists)} 个 Lists")
    for lst in result.lists[:5]:
        print(f" {lst.name} (by @{lst.creator_screen_name})")
        print(f" 成员: {lst.member_count}, 订阅: {lst.subscriber_count}")
        if lst.description:
            print(f" 描述: {lst.description[:80]}")
分页采集¶
采集指定数量¶
async def collect_tweets(query: str, max_count: int = 100):
    """Collect up to *max_count* "Latest" tweets for *query* via cursor pagination.

    Args:
        query: Search query string (supports Twitter search operators).
        max_count: Upper bound on the number of tweets returned.

    Returns:
        A list of tweet objects, at most *max_count* long. Returns whatever
        was collected so far (possibly empty) if a request fails.
    """
    client = await Twitter.create(cookies)
    all_tweets = []
    result = await client.search.search_latest(query)
    if not result.success:
        print(f"搜索失败: {result.error_msg}")
        return all_tweets
    all_tweets.extend(result.tweets)
    while result.has_more and len(all_tweets) < max_count:
        result = await client.search.search_latest(query, cursor=result.next_cursor)
        if result.success:
            all_tweets.extend(result.tweets)
            print(f" 已采集 {len(all_tweets)} 条...")
        else:
            print(f"翻页失败: {result.error_msg}")
            break
    # Fix: the last page can overshoot max_count; trim so the function
    # actually honors the "specified count" contract in its docstring.
    all_tweets = all_tweets[:max_count]
    print(f"共采集 {len(all_tweets)} 条帖子")
    return all_tweets
带间隔的采集(避免限流)¶
import asyncio


async def collect_with_delay(query: str, max_pages: int = 10, delay: float = 1.0):
    """Paginated "Latest" search that sleeps *delay* seconds between requests."""
    client = await Twitter.create(cookies)
    collected = []
    result = await client.search.search_latest(query)
    if result.success:
        collected.extend(result.tweets)
    pages = 1
    while result.success and result.has_more and pages < max_pages:
        # Throttle requests to stay under the rate limit.
        await asyncio.sleep(delay)
        result = await client.search.search_latest(query, cursor=result.next_cursor)
        if result.success:
            collected.extend(result.tweets)
            pages += 1
            print(f"第 {pages} 页: +{len(result.tweets)} 条")
    return collected
高级搜索¶
async def advanced_search_examples():
    """Demonstrate Twitter advanced-search operators embedded in query strings."""
    client = await Twitter.create(cookies)

    # Tweets from a specific user since a given date
    res = await client.search.search_latest("from:elonmusk since:2024-01-01")

    # Only high-engagement tweets
    res = await client.search.search_top("python min_faves:100 min_retweets:50")

    # Restrict results to Chinese-language tweets
    res = await client.search.search_latest("AI lang:zh")

    # Exclude retweets from results
    res = await client.search.search_latest("rust -filter:retweets")

    # Combine several operators in one query
    res = await client.search.search_latest(
        "from:elonmusk since:2024-06-01 until:2024-12-31 min_faves:1000 lang:en"
    )
导出数据¶
导出为 JSON¶
import json


async def export_to_json(query: str, filename: str):
    """Run a "Latest" search for *query* and dump the tweets to *filename* as JSON.

    Writes a UTF-8 JSON array (non-ASCII preserved, 2-space indent) with one
    object per tweet. Does nothing if the search fails.

    Args:
        query: Search query string.
        filename: Path of the JSON file to write.
    """
    client = await Twitter.create(cookies)
    result = await client.search.search_latest(query)
    if result.success:
        data = [{
            "id": t.tweet_id,
            "text": t.text,
            "author": t.author_screen_name,
            "likes": t.favorite_count,
            "retweets": t.retweet_count,
            "created_at": t.created_at,
            "media_urls": t.media_urls,
        } for t in result.tweets]
        with open(filename, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        # Fix: message previously printed a literal placeholder instead of
        # the actual output path.
        print(f"已导出 {len(data)} 条到 {filename}")
导出为 CSV¶
import csv


async def export_to_csv(query: str, filename: str):
    """Run a "Latest" search for *query* and write the tweets to *filename* as CSV.

    Writes a UTF-8 CSV with a header row followed by one row per tweet.
    Does nothing if the search fails.

    Args:
        query: Search query string.
        filename: Path of the CSV file to write.
    """
    client = await Twitter.create(cookies)
    result = await client.search.search_latest(query)
    if result.success:
        # newline="" is required so csv handles line endings itself.
        with open(filename, "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            writer.writerow(["ID", "Author", "Text", "Likes", "Retweets", "Created"])
            for t in result.tweets:
                writer.writerow([
                    t.tweet_id, t.author_screen_name,
                    t.text, t.favorite_count, t.retweet_count, t.created_at
                ])
        # Fix: message previously printed a literal placeholder instead of
        # the actual output path.
        print(f"已导出 {len(result.tweets)} 条到 {filename}")