commit af910869b818858875f0a286a138ebc2c8536d8a Author: Daniel Tsvetkov Date: Sun Oct 8 17:45:15 2023 +0200 initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..53d8677 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +data +venv +.idea + diff --git a/main.py b/main.py new file mode 100644 index 0000000..64f297b --- /dev/null +++ b/main.py @@ -0,0 +1,290 @@ +import argparse +import logging +import os +import re +from collections import defaultdict + +import graphviz +import requests +from bs4 import BeautifulSoup + +logging.basicConfig() +logger = logging.getLogger() + + +def setup_logging_level(debug=False): + log_level = logging.DEBUG if debug else logging.ERROR + logger.setLevel(log_level) + logger.debug("Debugging enabled") + + +def parse_args(): + parser = argparse.ArgumentParser() + parser.add_argument('query', nargs='*', default="", help="freeform") + parser.add_argument('--debug', dest='debug', action='store_true') + return parser.parse_args() + + +DATA_DIR = os.path.join(os.path.dirname(__file__), 'data') +PACKAGE_DETAILS_URL = 'https://tracker.debian.org/pkg/{pakage_name}' + +PACKAGE_VERSIONS_URL = 'https://qa.debian.org/madison.php?package={package_name}&table=debian' +PACKAGE_NEWS_URL = 'https://tracker.debian.org/pkg/{package_name}/news/?page={page_number}' +PACKAGE_VERSION_CHANGELOG_URL = 'https://tracker.debian.org/media/packages/{package_first_letter}/{package_name}/changelog-{package_version}' + +RELEASES = ['sid', 'trixie', 'bookworm', 'bullseye', 'buster'] +CHANNELS = ['unstable', 'testing', 'stable', 'oldstable', 'oldoldstable'] +SUBRELEASES = ['security', 'backports', 'proposed-updates'] + +CHANNELS_ORDER = ['experimental', 'unstable', 'testing', 'proposed-updates', + 'stable', 'stable-security', 'stable-backports', + 'oldstable', 'oldstable-security', 'oldstable-backports', + 'oldoldstable', 'oldoldstable-security', 'oldoldstable-backports'] + +RELEASE_TO_CHANNEL = { + 'sid': 'unstable', + 'trixie': 'testing', + 'bookworm': 'stable', + 'bullseye': 'oldstable', + 'buster': 'oldoldstable', +} + +EXTRA_CHANNELS = ['experimental', 'proposed-updates'] + +CHANNEL_TO_RELEASE = {v: k for k, v in RELEASE_TO_CHANNEL.items()} + +EDGE_COLORS = ['red', 'blue', 'green', 'yellow', 'orange', 'purple', 'brown', 'gray', 'pink', 'cyan', + 'magenta', 'crimson', 'darkgreen', 'darkblue', 'darkred', 'darkcyan', 'darkmagenta', 'darkgray'] + + +def map_channel_to_release(full_channel): + if '-' in full_channel and full_channel not in EXTRA_CHANNELS: + channel, subrelease = full_channel.split('-') + else: + channel = full_channel + if channel not in CHANNEL_TO_RELEASE and channel not in EXTRA_CHANNELS: + raise Exception("Unknown channel: {}".format(channel)) + if channel in CHANNEL_TO_RELEASE: + return full_channel.replace(channel, CHANNEL_TO_RELEASE[channel]) + return full_channel + + +RELEASES_ORDER = [map_channel_to_release(channel) for channel in CHANNELS_ORDER] + + +def map_release_to_channel(full_release): + if '-' in full_release: + release, subrelease = full_release.split('-') + else: + release = full_release + if release not in RELEASE_TO_CHANNEL: + raise Exception("Unknown release: {}".format(release)) + return full_release.replace(release, RELEASE_TO_CHANNEL[release]) + + +def get_cache_or_url(url, cache_file): + if os.path.exists(cache_file): + logger.debug("Getting cached response from {}".format(cache_file)) + with open(cache_file, 'r') as f: + response_text = f.read() + else: + os.makedirs(os.path.dirname(cache_file), exist_ok=True) + logger.debug("Getting response from {}".format(url)) + response = requests.get(url) + if response.status_code != 200: + raise Exception("Error getting response from {}".format(url)) + response_text = response.text + with open(cache_file, 'w+') as f: + f.write(response.text) + return response_text + + +def parse_changelog(response_text, package_name, max_lines=10): + versions_history = [] + for line in response_text.split('\n'): + if not line: + continue + if not line.startswith(package_name): + continue + version = re.search(r'\((.*?)\)', line).group(1) if re.search(r'\((.*?)\)', line) else None + + versions_history.append(version) + max_lines -= 1 + if max_lines == 0: + break + return versions_history + + +def get_package_changelog(package_name, version): + sanitized_version = version.replace('+', '').replace('~', '').replace(':', '') + package_first_letter = package_name[0] + url = PACKAGE_VERSION_CHANGELOG_URL.format(package_first_letter=package_first_letter, package_name=package_name, + package_version=sanitized_version) + cache_file = os.path.join(DATA_DIR, package_name, 'changelog-{}.html'.format(sanitized_version)) + response_text = get_cache_or_url(url, cache_file) + return parse_changelog(response_text, package_name) + + +def parse_release_versions(text): + release_versions = {} + for line in text.split('\n'): + if not line: + continue + package, version, release, archs = [e.strip() for e in line.split('|')] + if '/' in release: + release = release.split('/')[0] + release_versions[release] = version + return release_versions + + +def get_package_versions(package_name, allowed_releases): + url = PACKAGE_VERSIONS_URL.format(package_name=package_name) + cache_file = os.path.join(DATA_DIR, package_name, 'versions.html') + response_text = get_cache_or_url(url, cache_file) + + soup = BeautifulSoup(response_text, 'html.parser') + pre_tag = soup.find('pre') + if not pre_tag: + raise Exception("No
 tag found on the page.")
+    text = pre_tag.get_text()
+
+    release_versions = parse_release_versions(text)
+    logger.debug(release_versions)
+    releases_histories = {}
+    for release in allowed_releases:
+        if release not in release_versions:
+            logger.debug("Skipping release: {}".format(release))
+            continue
+        version = release_versions[release]
+        logger.debug("Getting changelog for package: {}, version: {}".format(package_name, version, release))
+        version_history = get_package_changelog(package_name, version)
+        releases_histories[release] = version_history
+    return releases_histories
+
+
+def parse_news_title(news_title, package_name):
+    """types of entries:
+         curl 8.2.1-1 MIGRATED to testing
+         Accepted curl 7.88.1-10+deb12u2 (source) into proposed-updates
+    """
+    news_title = news_title.replace('\n', '')
+    migrated_title_re = '{} (.*) MIGRATED to (.*)'.format(package_name)
+    accepted_title_re = 'Accepted {} (.*) \(.*\) into (.*)'.format(package_name)
+    is_migrated = re.search(migrated_title_re, news_title)
+    if is_migrated:
+        version, branch = is_migrated.group(1), is_migrated.group(2)
+        return version, branch
+    is_accepted = re.search(accepted_title_re, news_title)
+    if is_accepted:
+        version, branch = is_accepted.group(1), is_accepted.group(2)
+        return version, branch
+    logger.warning("Unknown news title: {}".format(news_title))
+    return None, None
+
+
+def parse_news(response_text, package_name, allowed_channels, allowed_releases):
+    soup = BeautifulSoup(response_text, 'html.parser')
+    news_list = soup.find('ul', {'class': 'list-group'})
+    if not news_list:
+        raise Exception("No news-list found on the page.")
+    channel_versions = defaultdict(list)
+    for news_item in news_list.find_all('li'):
+        news_date = news_item.find('span', {'class': 'news-date'}).get_text()
+        news_title = news_item.find('span', {'class': 'news-title'}).get_text()
+        version, channel = parse_news_title(news_title, package_name)
+        if channel in allowed_channels:
+            channel_versions[channel].append((version, news_date))
+        elif channel in allowed_releases:
+            logger.debug("Used release instead of channel: {}".format(channel))
+            channel_versions[map_release_to_channel(channel)].append((version, news_date))
+        else:
+            logger.debug("Skipping channel: {}".format(channel))
+    return channel_versions
+
+
+def get_package_news(package_name, allowed_channels, allowed_releases, max_pages=4):
+    release_versions = defaultdict(list)
+    for page_number in range(1, max_pages):
+        url = PACKAGE_NEWS_URL.format(package_name=package_name, page_number=page_number)
+        cache_file = os.path.join(DATA_DIR, package_name, 'news_{}.html'.format(page_number))
+        try:
+            response_text = get_cache_or_url(url, cache_file)
+        except Exception as e:
+            logger.error(e)
+            break
+
+        new_channel_versions = parse_news(response_text, package_name, allowed_channels, allowed_releases)
+        for channel, versions in new_channel_versions.items():
+            release = map_channel_to_release(channel)
+            release_versions[release].extend(versions)
+    return release_versions
+
+
+def build_graph(package_name, releases_histories, release_versions):
+    dot = graphviz.Digraph()
+    releases_graph = graphviz.Digraph()
+    releases_graph.attr(rank='same')
+    for idx, release in enumerate(RELEASES_ORDER[:-1]):
+        releases_graph.edge(release, RELEASES_ORDER[idx + 1], style='invis')
+    dot.subgraph(releases_graph)
+    dot.attr(rankdir='BT', label='Package: {}'.format(package_name))
+    edges = set()
+    for idx, release_histories in enumerate(releases_histories.items()):
+        release, versions = release_histories
+        releases_graph.node(release, release, shape='box')
+        for idx, version in enumerate(versions):
+            version_safe = version.replace(':', '_')
+            dot.node(version_safe, version)
+            if idx == 0:
+                edges.add((version_safe, release))
+            else:
+                prev_version = versions[idx - 1].replace(':', '_')
+                edges.add((version_safe, prev_version))
+    for edge in edges:
+        dot.edge(*edge)
+
+    for idx, release_versions in enumerate(release_versions.items()):
+        release, versions = release_versions
+        edge_color = EDGE_COLORS[idx % len(EDGE_COLORS)]
+        release_edges = set()
+        releases_graph.node(release, release, shape='box', color=edge_color)
+        for idx, version_date in enumerate(versions):
+            version, date = version_date
+            version_safe = version.replace(':', '_')
+            dot.node(version_safe, version)
+            if idx == 0:
+                release_edges.add((version_safe, release))
+            else:
+                prev_version = versions[idx - 1][0].replace(':', '_')
+                release_edges.add((version_safe, prev_version))
+        for edge in release_edges:
+            dot.edge(*edge, color=edge_color, dir="none")
+    dot.subgraph(releases_graph)
+
+    filepath = os.path.join(DATA_DIR, package_name, 'package_versions.dot')
+    dot.save(filepath)
+    dot.render(filepath, view=True)
+
+
+def get_package_details(package_name, allowed_releases=None):
+    if not allowed_releases:
+        allowed_releases = ['sid', 'trixie',
+                            'bookworm', 'bookworm-backports', 'bookworm-security',
+                            'bullseye', 'bullseye-backports', 'bullseye-security',
+                            ]
+    releases_histories = get_package_versions(package_name, allowed_releases)
+    allowed_channels = [map_release_to_channel(release) for release in allowed_releases]
+    allowed_channels += ['experimental', 'proposed-updates']
+    release_versions = get_package_news(package_name, allowed_channels, allowed_releases)
+    build_graph(package_name, releases_histories, release_versions)
+
+
+def main():
+    args = parse_args()
+    setup_logging_level(args.debug)
+    os.makedirs(DATA_DIR, exist_ok=True)
+    get_package_details('curl')
+
+
+if __name__ == '__main__':
+    main()
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..4f65c22
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,8 @@
+beautifulsoup4==4.12.2
+certifi==2023.7.22
+charset-normalizer==3.3.0
+graphviz==0.20.1
+idna==3.4
+requests==2.31.0
+soupsieve==2.5
+urllib3==2.0.6