How to Batch-Download Every Release of a Project from GitHub

1. Overview of the Script

This Python script downloads the release archives (.zip or .tar.gz) of every tagged version of the specified GitHub projects. The user supplies two arguments: a CSV file containing the project information, and a CSV file in which the downloaded version information will be recorded. The script walks through the project list, visits each project's tags page, downloads every available release archive, and writes the related information to the specified CSV file.
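The download links the script collects from a project's tags page are relative paths, typically of the form /&lt;owner&gt;/&lt;repo&gt;/archive/refs/tags/&lt;tag&gt;.zip or .tar.gz; prefixing them with https://github.com gives the full download URL, and the file name with its archive extension stripped is used as the version number. A small sketch of that step, using a hypothetical href value, looks like this:

```python
import os

# Hypothetical link scraped from a tags page
href = "/python/cpython/archive/refs/tags/v3.12.0.tar.gz"

download_url = "https://github.com" + href      # full download URL
file_name = os.path.basename(href)              # "v3.12.0.tar.gz"
version, _ = os.path.splitext(file_name)        # strips ".gz"
if version.endswith(".tar"):                    # strip a remaining ".tar"
    version, _ = os.path.splitext(version)
print(download_url, file_name, version)
```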

2. How to Use the Script

Before running the script, make sure the requests and beautifulsoup4 libraries are installed. If they are not, install them with the following command:

```bash
pip install requests beautifulsoup4
```

To run the script, enter a command of the following form on the command line:

```bash
python script.py project_list.csv save_info.csv
```

Here, script.py is the script file name, project_list.csv is the CSV file containing the project information, and save_info.csv is the CSV file used to record the downloaded version information.
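The input CSV is expected to contain at least the columns Name, HTML URL, and ifcheck; the script only processes rows whose ifcheck value is 0 and marks them with 1 once they are done. A minimal sketch that creates such an input file, with a single hypothetical project as its only entry, is shown below:

```python
import csv

# Hypothetical example row; the column names match what the script reads
rows = [
    {"Name": "cpython", "HTML URL": "https://github.com/python/cpython", "ifcheck": "0"},
]

with open("project_list.csv", "w", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=["Name", "HTML URL", "ifcheck"])
    writer.writeheader()
    writer.writerows(rows)
```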

The complete script is shown below.

```python
import requests
from bs4 import BeautifulSoup
import os
import csv
import sys
import time
import random
from urllib.error import HTTPError
import signal

# Personal access token for the GitHub API
# Get one here: https://github.com/settings/tokens
access_token = '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'

# Pagination state
page = 1
num = 0
savelastfilename = ""
lastfilename = ""
url_with_page = ""
fieldnames = ['Project Name', 'tags', 'Version', 'Archive Name', 'Has Release']
file = None
writer = None

proxies = {
    "https1": "https://182.204.177.61:4331",
    "https2": "https://140.255.150.253:4361",
    "https3": "https://113.231.18.51:4334",
    "https4": "https://116.7.192.240:43581",
    "https5": "https://121.61.160.170:43311",
    "https6": "https://124.231.69.245:43311",
    "https7": "https://183.128.97.139:43251",
    "https8": "https://124.94.188.113:43341",
    "https9": "https://1.82.107.78:4389",
    "https10": "https://1.82.107.49:4379",
}

headers = {
    'User-Agent': 'Mozilla/5.0',
    'Authorization': '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',
    'Content-Type': 'text/html',
    'Accept': 'application/json'
}

# Signal handler: exit cleanly on Ctrl+C
def signal_handler(sig, frame):
    print("\nInterrupt signal received, exiting!!!")
    sys.exit(0)

# Register the signal handler
signal.signal(signal.SIGINT, signal_handler)

def get_html_url_with_tags(file_name):
    html_urls = []
    Name = ''
    with open(file_name, 'r', newline='') as file:
        reader = csv.DictReader(file)
        for row in reader:
            Name = row.get('Name')
            html_url = row.get('HTML URL')
            if html_url:
                new_html_url = html_url + "/tags"
                html_urls.append(new_html_url)
                # print("Name=" + Name + "  url=" + new_html_url)
    return Name, html_urls

def download_file(url, save_path, timeout=20, max_retries=3):
    retries = 0
    while retries < max_retries:
        try:
            # Send a GET request for the file content, with a timeout
            # @retry(tries=3, delay=2)  # retry 3 times, 2 seconds apart
            # response = requests.get(url, proxies=proxies, headers=headers, timeout=15)
            response = requests.get(url, proxies=proxies, timeout=15)
            # Check the response status code
            if response.status_code == 200:
                # Write the file to disk
                print(f"Downloading {save_path}...", end='')
                with open(save_path, 'wb') as f:
                    f.write(response.content)
                print("...download complete")
                return True
            else:
                print(f"Download failed: status code {response.status_code}")
                # print("Download failed: maximum retries exceeded")
                check_connection_error(response)
        except requests.exceptions.Timeout:
            print("Request timed out, retrying...", end="")
        except requests.exceptions.RequestException as e:
            print(f"Request exception: {e}", end="")
        retries += 1
        print(f"Retry count: {retries}")
    return False

def check_connection_error(response):
    """Check whether GitHub is unreachable due to a connection problem."""
    if response.status_code == 200:
        print("Got 200!")
        return True
    if response.status_code == 403:
        print("GitHub API rate limit reached!")
        return False
    elif response.status_code == 400:
        print("The server could not understand the request!")
        return False
    elif response.status_code == 502:
        print("The remote server closed the connection.")
        return False
    elif response.status_code == 404:
        print("The requested resource was not found.")
        return False
    elif response.status_code == 407:
        print("The proxy server requires authentication!")
        return False
    elif response.status_code == 406:
        print("Client request problem, possibly a dead proxy; consider refreshing the proxy IP list")
        return False
    elif response.status_code == 429:
        print("Too many requests, please retry later.")
        return False
    elif response.status_code == 504:
        print("Gateway timeout; wait and retry, or switch to another proxy!")
        return False
    elif response.status_code >= 500:
        print("Remote server internal error.")
        return False
    else:
        try:
            print("The remote server returned an unknown error, trying to get more details...")
            response.raise_for_status()  # raises an HTTPError if the request failed
            print("Could not obtain further details; current status code:", response.status_code)
            return False  # no details available, treat the request as failed
        except HTTPError as e:
            print("HTTPError raised:", e)
            return False  # an HTTPError means the request failed
        except ConnectionError as ce:
            print("The connection was closed by the remote server with no response.")
            return False  # a ConnectionError means the request failed
        except requests.exceptions.Timeout:
            print("Connection timed out: possibly caused by a network problem")
            return False
        except requests.exceptions.ConnectionError:
            print("Connection error: unable to reach the server")
            return False
        except requests.exceptions.RequestException as e:
            print("Request exception, most likely a proxy problem:", e)
            return False

def wait_and_retry(wait_time=30):
    """Wait for a while before retrying the request."""
    print(f"Waiting {wait_time} seconds before retrying...")
    time.sleep(wait_time)

def get_github_rate_limit(headers):
    # GitHub API rate-limit endpoint
    url = "https://api.github.com/rate_limit"
    headers = {
        'User-Agent': 'Mozilla/5.0',
        'Authorization': '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',
        'Content-Type': 'text/html',
        'Accept': 'application/json'
    }
    # response = requests.get(url, proxies=proxies, headers=headers)
    response = requests.get(url, proxies=proxies)
    data = response.json()
    limit = data["rate"]["limit"]
    remaining = data["rate"]["remaining"]
    reset_time = data["rate"]["reset"]
    print("Rate-limit check finished...")
    return limit, remaining, reset_time

def update_ifcheck_value(file_name, Name):
    """Open the CSV file and set the ifcheck field of the row whose Name matches to 1."""
    rows = []
    with open(file_name, 'r', newline='') as file:
        reader = csv.DictReader(file)
        fieldnames = reader.fieldnames  # column headers
        for row in reader:
            if row.get('Name') == Name:
                row['ifcheck'] = '1'  # mark this project as processed
            rows.append(row)
    # Write the rows back to the CSV file
    with open(file_name, 'w', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)

def renamefile(name, filename):
    current_path = os.getcwd()
    old_name = filename
    # Build the new file name
    new_name = name + "_" + filename
    # Full path of the new file
    new_path = os.path.join(current_path, new_name)
    # Full path of the old file
    old_path = os.path.join(current_path, old_name)
    # Rename the file unless the target already exists
    if os.path.isfile(new_path):
        return False
    else:
        os.rename(old_path, new_path)
        return True

def analyze_download_links(Name, url):
    global lastfilename, num, headers
    global writer, file
    while True:
        # Decide whether we need the next page of tags
        if num == 20:
            url_with_page = url + "?after=" + lastfilename
            num = 0
        else:
            url_with_page = url
        print("Requesting url=" + url_with_page)

        # Check whether we are rate-limited; if so, wait 10 to 20 seconds
        limit, remaining, reset_time = get_github_rate_limit(headers)
        if remaining == 0:
            count = 1
            time.sleep(random.randint(10, 20))
            print("No API requests remaining, switching headers now")
            headers = {
                'User-Agent': 'Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)',
                'Authorization': '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',
                'Content-Type': 'text/html',
                'Accept': 'application/json'
            }

        # Fetch the current project's tags page through one of the proxies in the list
        # response = requests.get(url_with_page, proxies=proxies, headers=headers)
        # Fetch the current project's tags page directly
        response = requests.get(url_with_page, proxies=proxies)

        # Error handling: if the connection failed, wait a while and reconnect
        print("Checking the server response...")
        if not check_connection_error(response):
            wait_and_retry()
            print("The server returned an error, waiting a moment...")
            # time.sleep(random.randint(10, 20))
            headers = {
                'User-Agent': 'Opera/9.80 (Macintosh; Intel Mac OS X 10.6.8; U; en) Presto/2.8.131 Version/11.11',
                'Authorization': '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',
                'Content-Type': 'text/html',
                'Accept': 'application/json'
            }
            continue

        # If there was no response at all, wait 10 to 20 seconds, switch headers and reconnect
        if response is None:
            # Exit loop if HTTP Error 422
            print("GitHub did not respond")
            time.sleep(random.randint(10, 20))
            headers = {
                'User-Agent': 'Opera/9.80 (Macintosh; Intel Mac OS X 10.6.8; U; en) Presto/2.8.131 Version/11.11',
                'Authorization': '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',
                'Content-Type': 'text/html',
                'Accept': 'application/json'
            }
            continue

        # A 200 response means the request succeeded; process the returned page
        if response.status_code == 200:
            soup = BeautifulSoup(response.text, 'html.parser')
            # If the project has no versions under tags, record it and stop
            if "There aren't any releases here" in response.text:
                with open(save_file_name, 'a', newline='') as file:
                    writer = csv.writer(file)
                    writer.writerow([Name, url, '', '', '0'])
                print("This project has no released versions under tags")
                break

            # Scrape every download link on the tags page
            download_links = soup.find_all('a', href=lambda href: href and (href.endswith('.zip') or href.endswith('.tar.gz')))
            # print("All links:", download_links)
            if len(download_links) == 0:
                print("This project has no released versions under tags")
                break

            # Process each download link
            for link in download_links:
                # print("Found download link:", link['href'])
                file_name = os.path.basename(link['href'])  # name of the archive
                if os.path.isfile(file_name):
                    print("This version has already been downloaded, skipping!")
                    continue
                if download_file("https://github.com" + link['href'], file_name):  # build the full download URL and fetch the file
                    # print("File download confirmed")
                    renamefile(Name, file_name)
                    lastfilename, _ = os.path.splitext(file_name)  # file name without the .zip or .gz extension
                    if lastfilename.endswith('.tar'):  # strip a remaining .tar suffix
                        lastfilename, _ = os.path.splitext(lastfilename)  # lastfilename is effectively the version number
                    # Write project name, tags URL, version, archive name and release flag (1 = release found) to the CSV
                    with open(save_file_name, 'a', newline='') as file:
                        writer = csv.writer(file)
                        writer.writerow([Name, url, lastfilename, file_name, '1'])
                    num = num + 1  # one more archive downloaded
                else:
                    headers = {
                        'User-Agent': 'Opera/9.80 (Macintosh; Intel Mac OS X 10.6.8; U; en) Presto/2.8.131 Version/11.11',
                        'Authorization': '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',
                        'Content-Type': 'text/html',
                        'Accept': 'application/json'
                    }
                    print("Switching headers and retrying the download...")
                    download_file("https://github.com" + link['href'], file_name)  # build the full download URL and fetch the file

            if num == 20:
                continue
            # Each page lists 10 versions (20 archives in total); fewer than 20 means there are no more pages
            if num < 20:
                break
            # user_input = input("Press any key and Enter to stop the program:")
            # if user_input:
            #     print("The user pressed Ctrl+C:", user_input)
            # break
        else:
            print(f"The page did not return 200; status code: {response.status_code}")
            continue

# Test the program
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Please provide two arguments: the first is the project list; the second is the file to create for saving the projects' version information")
        sys.exit(1)
    file_name = sys.argv[1]
    if not file_name.endswith('.csv'):
        print("Please provide a CSV file.")
        sys.exit(1)
    if os.path.exists(file_name):
        print("Opening the file and starting the analysis...")
    else:
        print("The input file does not exist, please check")
        sys.exit(1)
    save_file_name = sys.argv[2]
    if not save_file_name.endswith('.csv'):
        print("Please provide a CSV save file.")
        sys.exit(1)
    # If the save file does not exist, create it and write the header row
    if os.path.exists(save_file_name):
        print("The save file already exists; new data will be appended to it")
    else:
        with open(save_file_name, 'a', newline='') as file:
            writer = csv.writer(file)
            writer.writerow(fieldnames)
    # Name, html_urls = get_html_url_with_tags(file_name)
    # for url in html_urls:
    Name = ''
    with open(file_name, 'r', newline='') as file:
        reader = csv.DictReader(file)
        for row in reader:
            ifcheck = row.get('ifcheck')
            Name = row.get('Name')
            html_url = row.get('HTML URL')
            if ifcheck == '0' and html_url:
                new_html_url = html_url + "/tags"
                analyze_download_links(Name, new_html_url)
                update_ifcheck_value(file_name, Name)
                # user_input = input("Your input paused the download; press Ctrl+C to quit, any other key to continue")
                # if user_input:
                #     print("The user entered:", user_input)
                #     break
    print("Check complete!")
```
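Two details are worth keeping in mind when adapting the script. The requests library only routes traffic through proxies whose dictionary keys name a URL scheme (for example "http" or "https"), so keys such as "https1" through "https10" are effectively ignored; and when the headers dictionary is actually attached to a GitHub API request, the token is conventionally sent as 'Authorization': 'token &lt;TOKEN&gt;'. A minimal sketch of both points, using placeholder token and proxy values, is given below; with real values, the rate_limit call is a cheap way to confirm that the token and proxy work before starting a long batch download.

```python
import requests

# Placeholder values: substitute your own token and proxy address
access_token = "ghp_xxxxxxxxxxxxxxxxxxxx"
proxies = {
    "http": "http://127.0.0.1:8080",    # requests only honours scheme keys
    "https": "http://127.0.0.1:8080",
}
headers = {
    "User-Agent": "Mozilla/5.0",
    "Authorization": f"token {access_token}",   # usual GitHub token format
    "Accept": "application/vnd.github+json",
}

# Quick check of the remaining API quota
resp = requests.get("https://api.github.com/rate_limit",
                    headers=headers, proxies=proxies, timeout=15)
data = resp.json()
print(data["rate"]["remaining"], "requests remaining")
```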
