-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgithub_client.py
More file actions
286 lines (240 loc) · 12.3 KB
/
Copy pathgithub_client.py
File metadata and controls
286 lines (240 loc) · 12.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
#!/usr/bin/env python3
"""
GitHub API 客户端封装
封装fork和star查询相关API
"""
import logging
import time
from typing import Optional
import requests
from config import GITHUB_TOKEN, GITHUB_API_BASE
logger = logging.getLogger(__name__)
class GitHubAPIError(Exception):
    """Raised when a GitHub API call fails.

    Attributes:
        status_code: HTTP status code associated with the failure.
        message: Short description of the failure.
    """

    def __init__(self, status_code: int, message: str):
        self.status_code = status_code
        self.message = message
        super().__init__(f"GitHub API 错误 {status_code}: {message}")

    def __reduce__(self):
        # ``args`` only holds the single formatted string, so the default
        # exception reduction would call ``__init__`` with one argument and
        # fail when the error is copied or pickled. Rebuild from the two
        # original constructor arguments instead.
        return (type(self), (self.status_code, self.message))
class GitHubClient:
    """Client for the GitHub REST API (user repositories, forks and stargazers).

    All requests go through one ``requests.Session`` carrying the token and
    default headers. The rate-limit values of the most recent response are
    remembered so that later requests can wait before the quota runs out.
    """

    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"token {GITHUB_TOKEN}",
            "Accept": "application/vnd.github.v3+json",
            "X-GitHub-Api-Version": "2022-11-28",
            "User-Agent": "github-notify-bot/1.0"
        })
        self.api_base = GITHUB_API_BASE
        # Rate-limit state taken from the latest response; None until the
        # first response has been seen.
        self._rate_limit_remaining: Optional[int] = None
        self._rate_limit_reset: Optional[int] = None

    @staticmethod
    def _header_int(headers, name: str, default: int) -> int:
        """Return header ``name`` as an int, or ``default`` if missing or malformed."""
        try:
            return int(headers.get(name, default))
        except (TypeError, ValueError):
            return default

    @staticmethod
    def _has_next_link(link_header: str) -> bool:
        """Return True if a Link header value contains an entry with rel="next".

        Only the ``rel`` parameter of each entry is inspected, so a URL that
        merely contains the text "next" does not count as a next page.
        """
        for entry in link_header.split(","):
            # Everything after the closing ">" of the URL is the parameter list.
            _, _, link_params = entry.partition(">")
            for param in link_params.split(";"):
                key, _, value = param.strip().partition("=")
                if key.strip().lower() == "rel" and "next" in value.strip().strip('"').split():
                    return True
        return False

    def _update_rate_limit(self, response: "requests.Response") -> None:
        """Refresh the tracked rate-limit state from the response headers.

        A missing or unparsable header falls back to 9999 remaining requests
        and a reset time of 0 (unknown).
        """
        self._rate_limit_remaining = self._header_int(response.headers, "X-RateLimit-Remaining", 9999)
        self._rate_limit_reset = self._header_int(response.headers, "X-RateLimit-Reset", 0)

    def _seconds_until_reset(self) -> float:
        """Return how long to sleep before the rate limit should have reset.

        Adds a 5 second margin to the tracked reset time and never returns
        less than 1 second; returns 60 when the reset time is unknown.
        """
        if not self._rate_limit_reset:
            return 60
        return max(self._rate_limit_reset - time.time() + 5, 1)

    def _get(self, url: str, params: Optional[dict] = None, retry: int = 3,
             extra_headers: Optional[dict] = None) -> Optional[dict]:
        """Send a GET request with retries and rate-limit handling.

        Args:
            url: Absolute URL to request.
            params: Optional query parameters.
            retry: Maximum number of attempts.
            extra_headers: Optional headers for this request only; requests
                merges them over the session defaults.

        Returns:
            The decoded JSON body (a list for list endpoints), or None on a
            404 or when the last attempt fails with a timeout or another
            ``requests`` error.

        Raises:
            GitHubAPIError: On 401, on a 403 that is not a rate limit, when
                the rate limit is still exhausted on the last attempt, or
                when no attempt returned or raised (e.g. ``retry`` <= 0).
        """
        for attempt in range(retry):
            try:
                # Wait proactively when the remaining quota is nearly used up.
                if self._rate_limit_remaining is not None and self._rate_limit_remaining < 10:
                    wait_time = self._seconds_until_reset()
                    logger.warning(f"Rate limit 剩余 {self._rate_limit_remaining},等待 {wait_time:.0f}s")
                    time.sleep(wait_time)

                response = self.session.get(url, params=params, headers=extra_headers, timeout=30)
                self._update_rate_limit(response)
                status = response.status_code

                if status == 403:
                    if self._rate_limit_remaining != 0:
                        # Insufficient token permissions or another 403: fail immediately.
                        logger.error(f"API 403(非rate limit): {response.text[:200]}")
                        raise GitHubAPIError(403, response.text[:200])
                    if attempt >= retry - 1:
                        logger.error("Rate limit 用尽,重试次数已耗尽")
                        raise GitHubAPIError(429, "Rate limit 用尽,重试次数已耗尽")
                    # Rate limit exhausted with attempts left: wait for the reset, then retry.
                    wait_time = self._seconds_until_reset()
                    logger.warning(f"Rate limit 用尽,等待 {wait_time:.0f}s 后重试(第 {attempt+1}/{retry} 次)")
                    time.sleep(wait_time)
                    continue
                if status == 401:
                    logger.error("GitHub Token 无效或已过期,请检查 GITHUB_TOKEN 环境变量")
                    raise GitHubAPIError(401, "Token 无效或已过期")
                if status == 404:
                    logger.warning(f"API 404(资源不存在): {url}")
                    return None

                # Any other error status becomes an HTTPError, which is handled
                # by the RequestException branch below (retry with backoff).
                response.raise_for_status()
                return response.json()
            except requests.exceptions.Timeout:
                logger.warning(f"API请求超时(第 {attempt+1}/{retry} 次)[{url}]")
                if attempt < retry - 1:
                    time.sleep(2 ** attempt)  # exponential backoff
                else:
                    logger.error(f"API请求最终超时 [{url}]")
                    return None
            except requests.exceptions.RequestException as e:
                logger.warning(f"API请求失败(第 {attempt+1}/{retry} 次)[{url}]: {e}")
                if attempt < retry - 1:
                    time.sleep(2 ** attempt)  # exponential backoff
                else:
                    logger.error(f"API请求最终失败 [{url}]: {e}")
                    return None
        # Only reached when the loop body never ran (retry <= 0): every
        # executed last attempt returns or raises above.
        raise GitHubAPIError(429, "重试次数耗尽,请求失败")

    def get_user_repos(self, username: str) -> list:
        """Return the repositories owned by ``username``, following pagination.

        Returns an empty list for a blank username. Stops at the first failed
        page (keeping what was collected so far) and after at most 100 pages.
        GitHubAPIError raised by ``_get`` is not caught here.
        """
        if not username or not username.strip():
            logger.error("用户名无效(为空),跳过")
            return []
        # Use the validated, whitespace-free name in the URL and log messages.
        username = username.strip()

        repos = []
        per_page = 100
        url = f"{self.api_base}/users/{username}/repos"
        page = 1
        while True:
            params = {"type": "owner", "sort": "updated", "per_page": per_page, "page": page}
            data = self._get(url, params)
            if data is None:
                break
            repos.extend(data)
            if len(data) < per_page:
                # A short page is the last page.
                break
            page += 1
            if page > 100:
                logger.warning(f"分页超过100页({username}),强制停止,可能有异常")
                break

        if repos:
            logger.info(f"获取用户 {username} 的仓库列表成功,共 {len(repos)} 个")
        else:
            logger.warning(f"用户 {username} 的仓库列表为空或API调用失败(共 {len(repos)} 个)")
        return repos

    def get_forks(self, repo_full_name: str) -> tuple:
        """Fetch one page of forks (up to 100, requested newest first).

        Only a single page is requested; no pagination is performed.

        Returns:
            ``(forks, error)``; ``error`` is None on success and a message
            string when ``_get`` returned None.

        GitHubAPIError raised by ``_get`` is not caught here.
        """
        url = f"{self.api_base}/repos/{repo_full_name}/forks"
        params = {"per_page": 100, "sort": "newest"}
        data = self._get(url, params)
        if data is None:
            return [], "API调用失败,无法获取fork列表"
        return data, None

    def get_stargazers(self, repo_full_name: str, max_retries: int = 3) -> tuple:
        """Fetch a repository's stargazers (with star timestamps), page by page.

        Pagination follows the ``rel="next"`` entry of the Link response
        header and stops after 50 pages.

        Args:
            repo_full_name: Repository in ``owner/name`` form.
            max_retries: Maximum number of consecutive rate-limit waits
                before giving up.

        Returns:
            ``(all_stars, error)``; ``error`` is None on success (including
            "no data") and a message string on an API error, in which case
            ``all_stars`` may hold partial data.
        """
        all_stars = []
        page = 1
        per_page = 100
        has_next_page = True
        consecutive_rate_limit_retries = 0
        url = f"{self.api_base}/repos/{repo_full_name}/stargazers"
        # This media type is requested so entries carry the star timestamp.
        headers = {"Accept": "application/vnd.github.v3.star+json"}

        while has_next_page:
            # NOTE(review): confirm the stargazers endpoint honours
            # sort/direction; if the listing is oldest-first, the 50 page cap
            # below drops the newest stars of very popular repositories.
            params = {"per_page": per_page, "page": page, "sort": "starred", "direction": "desc"}
            try:
                response = self.session.get(url, params=params, headers=headers, timeout=30)
                self._update_rate_limit(response)
                status = response.status_code

                if status == 403:
                    if self._rate_limit_remaining != 0:
                        logger.error(f"API 403(stargazer,非rate limit): {response.text[:200]}")
                        return all_stars, f"403错误(非rate limit): {response.text[:100]}"
                    if consecutive_rate_limit_retries >= max_retries:
                        logger.error(f"Rate limit 连续重试 {max_retries} 次后仍耗尽,放弃该仓库")
                        return all_stars, "Rate limit 重试次数耗尽"
                    wait_time = self._seconds_until_reset()
                    logger.warning(f"Rate limit 用尽(stargazer),等待 {wait_time:.0f}s(第 {consecutive_rate_limit_retries+1}/{max_retries} 次)")
                    time.sleep(wait_time)
                    consecutive_rate_limit_retries += 1
                    continue  # retry the same page
                if status == 401:
                    logger.error("GitHub Token 无效或已过期")
                    return all_stars, "Token 无效或已过期"
                if status == 404:
                    logger.warning(f"仓库不存在: {repo_full_name}")
                    return all_stars, f"仓库不存在: {repo_full_name}"

                response.raise_for_status()
                stars = response.json()
            except requests.exceptions.RequestException as e:
                logger.error(f"API请求失败(stargazer page {page})[{url}]: {e}")
                return all_stars, f"请求异常: {e}"

            all_stars.extend(stars)
            # A successful page resets the consecutive rate-limit counter.
            consecutive_rate_limit_retries = 0

            link = response.headers.get("Link", "")
            has_next_page = self._has_next_link(link)
            # Fallback: a full page without any Link header may mean the
            # header was lost, so keep paging until a short page shows up.
            if not link and stars and len(stars) == per_page:
                logger.warning(f"Link header 缺失但仍有数据,继续翻页,repo={repo_full_name}")
                has_next_page = True

            page += 1
            if page > 50:
                logger.warning(f"Stargazer 分页超过50页,可能有异常,repo={repo_full_name}")
                has_next_page = False

        return all_stars, None

    def check_new_forks(self, repo_full_name: str, last_check: dict) -> tuple:
        """Find forks created after the last recorded check.

        Args:
            repo_full_name: Repository in ``owner/name`` form.
            last_check: Mapping from the short repository name (the part
                after the last "/") to the timestamp of the previous check.

        Returns:
            ``(new_forks, latest_created_at, error)``. ``latest_created_at``
            is the ``created_at`` of the first fork returned by ``get_forks``
            (None without forks). ``error`` is None on success. With no
            previous timestamp for the repository, ``new_forks`` is empty.
        """
        forks, err = self.get_forks(repo_full_name)
        if err:
            return [], None, err

        latest_created_at = forks[0].get('created_at') if forks else None
        last_time = last_check.get(repo_full_name.split('/')[-1])
        if not last_time:
            return [], latest_created_at, None

        # Timestamps are compared as strings, exactly as they are stored.
        new_forks = [
            fork for fork in forks
            if fork.get('created_at') and fork['created_at'] > last_time
        ]
        return new_forks, latest_created_at, None

    def check_new_stars(self, repo_full_name: str, last_check: dict) -> tuple:
        """Find stars added after the last recorded check.

        Args:
            repo_full_name: Repository in ``owner/name`` form.
            last_check: Mapping from the short repository name (the part
                after the last "/") to the timestamp of the previous check.

        Returns:
            ``(new_stars, error)``; ``error`` is None on success. When
            fetching stargazers fails, partial data is discarded and
            ``([], error)`` is returned. Records without ``starred_at`` are
            skipped and counted in a warning.
        """
        stars, err = self.get_stargazers(repo_full_name)
        if err:
            # Partial data is not trusted; skip detection for this round.
            logger.error(f"获取 stargazer 失败({repo_full_name}): {err}")
            return [], err

        new_stars = []
        missing_starred_at = 0
        last_time = last_check.get(repo_full_name.split('/')[-1]) if stars else None
        if last_time:
            for star in stars:
                starred_at = star.get('starred_at')
                if not starred_at:
                    missing_starred_at += 1
                elif starred_at > last_time:
                    new_stars.append(star)
        if missing_starred_at > 0:
            logger.warning(f"有 {missing_starred_at} 条 stargazer 记录缺少 starred_at,已跳过")
        return new_stars, None