debian_pkgs/main.py

301 lines
12 KiB
Python
Raw Permalink Normal View History

2023-10-08 17:45:15 +02:00
import argparse
import logging
import os
import re
from collections import defaultdict
import graphviz
import requests
from bs4 import BeautifulSoup
logging.basicConfig()
logger = logging.getLogger()
def setup_logging_level(debug=False):
log_level = logging.DEBUG if debug else logging.ERROR
logger.setLevel(log_level)
logger.debug("Debugging enabled")
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument('query', nargs='*', default="", help="freeform")
parser.add_argument('--debug', dest='debug', action='store_true')
2024-06-17 12:19:46 +02:00
parser.add_argument('--raise-exceptions', dest='raise_exceptions', action='store_true')
2023-10-08 17:45:15 +02:00
return parser.parse_args()
2024-06-17 12:19:46 +02:00
args = parse_args()
setup_logging_level(args.debug)
2023-10-08 17:45:15 +02:00
DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')
PACKAGE_DETAILS_URL = 'https://tracker.debian.org/pkg/{pakage_name}'
PACKAGE_VERSIONS_URL = 'https://qa.debian.org/madison.php?package={package_name}&table=debian'
PACKAGE_NEWS_URL = 'https://tracker.debian.org/pkg/{package_name}/news/?page={page_number}'
PACKAGE_VERSION_CHANGELOG_URL = 'https://tracker.debian.org/media/packages/{package_first_letter}/{package_name}/changelog-{package_version}'
RELEASES = ['sid', 'trixie', 'bookworm', 'bullseye', 'buster']
CHANNELS = ['unstable', 'testing', 'stable', 'oldstable', 'oldoldstable']
SUBRELEASES = ['security', 'backports', 'proposed-updates']
CHANNELS_ORDER = ['experimental', 'unstable', 'testing', 'proposed-updates',
'stable', 'stable-security', 'stable-backports',
'oldstable', 'oldstable-security', 'oldstable-backports',
'oldoldstable', 'oldoldstable-security', 'oldoldstable-backports']
RELEASE_TO_CHANNEL = {
'sid': 'unstable',
'trixie': 'testing',
'bookworm': 'stable',
'bullseye': 'oldstable',
'buster': 'oldoldstable',
}
EXTRA_CHANNELS = ['experimental', 'proposed-updates']
CHANNEL_TO_RELEASE = {v: k for k, v in RELEASE_TO_CHANNEL.items()}
EDGE_COLORS = ['red', 'blue', 'green', 'yellow', 'orange', 'purple', 'brown', 'gray', 'pink', 'cyan',
'magenta', 'crimson', 'darkgreen', 'darkblue', 'darkred', 'darkcyan', 'darkmagenta', 'darkgray']
def map_channel_to_release(full_channel):
if '-' in full_channel and full_channel not in EXTRA_CHANNELS:
channel, subrelease = full_channel.split('-')
else:
channel = full_channel
if channel not in CHANNEL_TO_RELEASE and channel not in EXTRA_CHANNELS:
raise Exception("Unknown channel: {}".format(channel))
if channel in CHANNEL_TO_RELEASE:
return full_channel.replace(channel, CHANNEL_TO_RELEASE[channel])
return full_channel
RELEASES_ORDER = [map_channel_to_release(channel) for channel in CHANNELS_ORDER]
def map_release_to_channel(full_release):
if '-' in full_release:
release, subrelease = full_release.split('-')
else:
release = full_release
if release not in RELEASE_TO_CHANNEL:
raise Exception("Unknown release: {}".format(release))
return full_release.replace(release, RELEASE_TO_CHANNEL[release])
def get_cache_or_url(url, cache_file):
if os.path.exists(cache_file):
logger.debug("Getting cached response from {}".format(cache_file))
with open(cache_file, 'r') as f:
response_text = f.read()
else:
os.makedirs(os.path.dirname(cache_file), exist_ok=True)
logger.debug("Getting response from {}".format(url))
response = requests.get(url)
if response.status_code != 200:
2024-06-17 12:19:46 +02:00
if args.raise_exceptions:
raise Exception("Error getting response from {}".format(url))
logger.error("Error getting response from {}".format(url))
2023-10-08 17:45:15 +02:00
response_text = response.text
with open(cache_file, 'w+') as f:
f.write(response.text)
return response_text
def parse_changelog(response_text, package_name, max_lines=10):
versions_history = []
for line in response_text.split('\n'):
if not line:
continue
if not line.startswith(package_name):
continue
version = re.search(r'\((.*?)\)', line).group(1) if re.search(r'\((.*?)\)', line) else None
versions_history.append(version)
max_lines -= 1
if max_lines == 0:
break
return versions_history
def get_package_changelog(package_name, version):
sanitized_version = version.replace('+', '').replace('~', '').replace(':', '')
package_first_letter = package_name[0]
url = PACKAGE_VERSION_CHANGELOG_URL.format(package_first_letter=package_first_letter, package_name=package_name,
package_version=sanitized_version)
cache_file = os.path.join(DATA_DIR, package_name, 'changelog-{}.html'.format(sanitized_version))
response_text = get_cache_or_url(url, cache_file)
return parse_changelog(response_text, package_name)
def parse_release_versions(text):
release_versions = {}
for line in text.split('\n'):
if not line:
continue
package, version, release, archs = [e.strip() for e in line.split('|')]
2024-06-17 12:19:46 +02:00
if 'amd64' not in archs and 'source' not in archs:
continue
2023-10-08 17:45:15 +02:00
if '/' in release:
release = release.split('/')[0]
release_versions[release] = version
return release_versions
def get_package_versions(package_name, allowed_releases):
url = PACKAGE_VERSIONS_URL.format(package_name=package_name)
cache_file = os.path.join(DATA_DIR, package_name, 'versions.html')
response_text = get_cache_or_url(url, cache_file)
soup = BeautifulSoup(response_text, 'html.parser')
pre_tag = soup.find('pre')
if not pre_tag:
raise Exception("No <pre> tag found on the page.")
text = pre_tag.get_text()
release_versions = parse_release_versions(text)
logger.debug(release_versions)
releases_histories = {}
for release in allowed_releases:
if release not in release_versions:
logger.debug("Skipping release: {}".format(release))
continue
version = release_versions[release]
logger.debug("Getting changelog for package: {}, version: {}".format(package_name, version, release))
version_history = get_package_changelog(package_name, version)
releases_histories[release] = version_history
return releases_histories
def parse_news_title(news_title, package_name):
"""types of entries:
curl 8.2.1-1 MIGRATED to testing
Accepted curl 7.88.1-10+deb12u2 (source) into proposed-updates
"""
news_title = news_title.replace('\n', '')
migrated_title_re = '{} (.*) MIGRATED to (.*)'.format(package_name)
accepted_title_re = 'Accepted {} (.*) \(.*\) into (.*)'.format(package_name)
is_migrated = re.search(migrated_title_re, news_title)
if is_migrated:
version, branch = is_migrated.group(1), is_migrated.group(2)
return version, branch
is_accepted = re.search(accepted_title_re, news_title)
if is_accepted:
version, branch = is_accepted.group(1), is_accepted.group(2)
return version, branch
logger.warning("Unknown news title: {}".format(news_title))
return None, None
def parse_news(response_text, package_name, allowed_channels, allowed_releases):
soup = BeautifulSoup(response_text, 'html.parser')
news_list = soup.find('ul', {'class': 'list-group'})
if not news_list:
raise Exception("No news-list found on the page.")
channel_versions = defaultdict(list)
for news_item in news_list.find_all('li'):
news_date = news_item.find('span', {'class': 'news-date'}).get_text()
news_title = news_item.find('span', {'class': 'news-title'}).get_text()
version, channel = parse_news_title(news_title, package_name)
if channel in allowed_channels:
channel_versions[channel].append((version, news_date))
elif channel in allowed_releases:
logger.debug("Used release instead of channel: {}".format(channel))
channel_versions[map_release_to_channel(channel)].append((version, news_date))
else:
logger.debug("Skipping channel: {}".format(channel))
return channel_versions
def get_package_news(package_name, allowed_channels, allowed_releases, max_pages=4):
release_versions = defaultdict(list)
for page_number in range(1, max_pages):
url = PACKAGE_NEWS_URL.format(package_name=package_name, page_number=page_number)
cache_file = os.path.join(DATA_DIR, package_name, 'news_{}.html'.format(page_number))
try:
response_text = get_cache_or_url(url, cache_file)
2024-06-17 12:19:46 +02:00
new_channel_versions = parse_news(response_text, package_name, allowed_channels, allowed_releases)
2023-10-08 17:45:15 +02:00
except Exception as e:
logger.error(e)
break
for channel, versions in new_channel_versions.items():
release = map_channel_to_release(channel)
release_versions[release].extend(versions)
return release_versions
def build_graph(package_name, releases_histories, release_versions):
dot = graphviz.Digraph()
releases_graph = graphviz.Digraph()
releases_graph.attr(rank='same')
for idx, release in enumerate(RELEASES_ORDER[:-1]):
2024-06-17 12:19:46 +02:00
if release in releases_histories or release in release_versions:
next_release = RELEASES_ORDER[idx + 1]
if next_release in releases_histories or next_release in release_versions:
releases_graph.edge(release, next_release, style='invis')
2023-10-08 17:45:15 +02:00
dot.attr(rankdir='BT', label='Package: {}'.format(package_name))
edges = set()
for idx, release_histories in enumerate(releases_histories.items()):
release, versions = release_histories
releases_graph.node(release, release, shape='box')
for idx, version in enumerate(versions):
version_safe = version.replace(':', '_')
dot.node(version_safe, version)
if idx == 0:
edges.add((version_safe, release))
else:
prev_version = versions[idx - 1].replace(':', '_')
edges.add((version_safe, prev_version))
for edge in edges:
dot.edge(*edge)
for idx, release_versions in enumerate(release_versions.items()):
release, versions = release_versions
edge_color = EDGE_COLORS[idx % len(EDGE_COLORS)]
release_edges = set()
releases_graph.node(release, release, shape='box', color=edge_color)
for idx, version_date in enumerate(versions):
version, date = version_date
version_safe = version.replace(':', '_')
2024-06-17 12:19:46 +02:00
dot.node(version_safe, "{}\n{}".format(version, date))
2023-10-08 17:45:15 +02:00
if idx == 0:
release_edges.add((version_safe, release))
else:
prev_version = versions[idx - 1][0].replace(':', '_')
release_edges.add((version_safe, prev_version))
for edge in release_edges:
dot.edge(*edge, color=edge_color, dir="none")
dot.subgraph(releases_graph)
filepath = os.path.join(DATA_DIR, package_name, 'package_versions.dot')
dot.save(filepath)
dot.render(filepath, view=True)
def get_package_details(package_name, allowed_releases=None):
if not allowed_releases:
allowed_releases = ['sid', 'trixie',
'bookworm', 'bookworm-backports', 'bookworm-security',
2024-06-17 12:19:46 +02:00
# 'bullseye', 'bullseye-backports', 'bullseye-security',
2023-10-08 17:45:15 +02:00
]
releases_histories = get_package_versions(package_name, allowed_releases)
allowed_channels = [map_release_to_channel(release) for release in allowed_releases]
2024-06-17 12:19:46 +02:00
allowed_channels += [
'experimental',
'proposed-updates',
]
2023-10-08 17:45:15 +02:00
release_versions = get_package_news(package_name, allowed_channels, allowed_releases)
build_graph(package_name, releases_histories, release_versions)
def main():
os.makedirs(DATA_DIR, exist_ok=True)
2024-06-17 12:19:46 +02:00
get_package_details('thunderbird')
2023-10-08 17:45:15 +02:00
if __name__ == '__main__':
main()