import argparse
import logging
import os
import re
from collections import defaultdict

import graphviz
import requests
from bs4 import BeautifulSoup

logging.basicConfig()
logger = logging.getLogger()


def setup_logging_level(debug=False):
    """Set the root logger to DEBUG when *debug* is true, else ERROR."""
    log_level = logging.DEBUG if debug else logging.ERROR
    logger.setLevel(log_level)
    logger.debug("Debugging enabled")


def parse_args():
    """Parse command-line arguments: freeform query words plus --debug."""
    parser = argparse.ArgumentParser()
    parser.add_argument('query', nargs='*', default="", help="freeform")
    parser.add_argument('--debug', dest='debug', action='store_true')
    return parser.parse_args()


# Local on-disk cache for all downloaded pages.
DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')
# NOTE: placeholder typo fixed ({pakage_name} -> {package_name}).
PACKAGE_DETAILS_URL = 'https://tracker.debian.org/pkg/{package_name}'
PACKAGE_VERSIONS_URL = 'https://qa.debian.org/madison.php?package={package_name}&table=debian'
PACKAGE_NEWS_URL = 'https://tracker.debian.org/pkg/{package_name}/news/?page={page_number}'
PACKAGE_VERSION_CHANGELOG_URL = 'https://tracker.debian.org/media/packages/{package_first_letter}/{package_name}/changelog-{package_version}'

RELEASES = ['sid', 'trixie', 'bookworm', 'bullseye', 'buster']
CHANNELS = ['unstable', 'testing', 'stable', 'oldstable', 'oldoldstable']
SUBRELEASES = ['security', 'backports', 'proposed-updates']
# Left-to-right display order of channels in the rendered graph.
CHANNELS_ORDER = ['experimental', 'unstable', 'testing', 'proposed-updates',
                  'stable', 'stable-security', 'stable-backports',
                  'oldstable', 'oldstable-security', 'oldstable-backports',
                  'oldoldstable', 'oldoldstable-security', 'oldoldstable-backports']
RELEASE_TO_CHANNEL = {
    'sid': 'unstable',
    'trixie': 'testing',
    'bookworm': 'stable',
    'bullseye': 'oldstable',
    'buster': 'oldoldstable',
}
# Channels with no corresponding named release; they pass through unchanged.
EXTRA_CHANNELS = ['experimental', 'proposed-updates']
CHANNEL_TO_RELEASE = {v: k for k, v in RELEASE_TO_CHANNEL.items()}
EDGE_COLORS = ['red', 'blue', 'green', 'yellow', 'orange', 'purple', 'brown',
               'gray', 'pink', 'cyan', 'magenta', 'crimson', 'darkgreen',
               'darkblue', 'darkred', 'darkcyan', 'darkmagenta', 'darkgray']


def map_channel_to_release(full_channel):
    """Translate a channel name (e.g. 'stable-security') into its release
    name (e.g. 'bookworm-security').  EXTRA_CHANNELS pass through unchanged.

    Raises:
        Exception: if the channel is not known.
    """
    if '-' in full_channel and full_channel not in EXTRA_CHANNELS:
        # maxsplit=1 so a subrelease containing '-' cannot blow up the unpack
        channel, _subrelease = full_channel.split('-', 1)
    else:
        channel = full_channel
    if channel not in CHANNEL_TO_RELEASE and channel not in EXTRA_CHANNELS:
        raise Exception("Unknown channel: {}".format(channel))
    if channel in CHANNEL_TO_RELEASE:
        return full_channel.replace(channel, CHANNEL_TO_RELEASE[channel])
    return full_channel


RELEASES_ORDER = [map_channel_to_release(channel) for channel in CHANNELS_ORDER]


def map_release_to_channel(full_release):
    """Translate a release name (e.g. 'bookworm-security') into its channel
    name (e.g. 'stable-security').

    Raises:
        Exception: if the release is not known.
    """
    if '-' in full_release:
        release, _subrelease = full_release.split('-', 1)
    else:
        release = full_release
    if release not in RELEASE_TO_CHANNEL:
        raise Exception("Unknown release: {}".format(release))
    return full_release.replace(release, RELEASE_TO_CHANNEL[release])


def get_cache_or_url(url, cache_file):
    """Return the body of *url*, serving from *cache_file* when it exists
    and writing a fresh download back to the cache otherwise.

    Raises:
        Exception: on a non-200 HTTP response.
    """
    if os.path.exists(cache_file):
        logger.debug("Getting cached response from {}".format(cache_file))
        with open(cache_file, 'r') as f:
            return f.read()
    os.makedirs(os.path.dirname(cache_file), exist_ok=True)
    logger.debug("Getting response from {}".format(url))
    response = requests.get(url)
    if response.status_code != 200:
        raise Exception("Error getting response from {}".format(url))
    response_text = response.text
    with open(cache_file, 'w') as f:
        f.write(response_text)
    return response_text


def parse_changelog(response_text, package_name, max_lines=10):
    """Extract up to *max_lines* versions from a Debian changelog.

    Only entry-header lines (those starting with the package name) are
    considered; the version is the text inside the first parenthesised
    group, or None when the line has no such group.
    """
    versions_history = []
    for line in response_text.split('\n'):
        if not line or not line.startswith(package_name):
            continue
        match = re.search(r'\((.*?)\)', line)  # search once, not twice
        versions_history.append(match.group(1) if match else None)
        max_lines -= 1
        if max_lines == 0:
            break
    return versions_history


def get_package_changelog(package_name, version):
    """Fetch (or read cached) the changelog for *version* and return its
    parsed version history (newest first)."""
    # tracker.debian.org strips these characters from changelog URLs
    sanitized_version = version.replace('+', '').replace('~', '').replace(':', '')
    package_first_letter = package_name[0]
    url = PACKAGE_VERSION_CHANGELOG_URL.format(
        package_first_letter=package_first_letter,
        package_name=package_name,
        package_version=sanitized_version)
    cache_file = os.path.join(DATA_DIR, package_name,
                              'changelog-{}.html'.format(sanitized_version))
    response_text = get_cache_or_url(url, cache_file)
    return parse_changelog(response_text, package_name)


def parse_release_versions(text):
    """Parse madison.php pipe-separated output into {release: version}."""
    release_versions = {}
    for line in text.split('\n'):
        if not line:
            continue
        package, version, release, archs = [e.strip() for e in line.split('|')]
        if '/' in release:
            # e.g. 'stable/non-free' -> 'stable'
            release = release.split('/')[0]
        release_versions[release] = version
    return release_versions


def get_package_versions(package_name, allowed_releases):
    """Return {release: [versions...]} changelog histories for every release
    in *allowed_releases* that madison knows about.

    Raises:
        Exception: when the madison page contains no <pre> block.
    """
    url = PACKAGE_VERSIONS_URL.format(package_name=package_name)
    cache_file = os.path.join(DATA_DIR, package_name, 'versions.html')
    response_text = get_cache_or_url(url, cache_file)
    soup = BeautifulSoup(response_text, 'html.parser')
    pre_tag = soup.find('pre')
    if not pre_tag:
        raise Exception("No <pre> tag found on the page.")
    text = pre_tag.get_text()

    release_versions = parse_release_versions(text)
    logger.debug(release_versions)
    releases_histories = {}
    for release in allowed_releases:
        if release not in release_versions:
            logger.debug("Skipping release: {}".format(release))
            continue
        version = release_versions[release]
        logger.debug("Getting changelog for package: {}, version: {} (release: {})".format(
            package_name, version, release))
        version_history = get_package_changelog(package_name, version)
        releases_histories[release] = version_history
    return releases_histories


def parse_news_title(news_title, package_name):
    """Parse a tracker news title into (version, channel).

    Types of entries:
         curl 8.2.1-1 MIGRATED to testing
         Accepted curl 7.88.1-10+deb12u2 (source) into proposed-updates

    Returns (None, None) for any title that matches neither form.
    """
    news_title = news_title.replace('\n', '')
    # Raw strings so '\(' is a real regex escape, and re.escape so package
    # names containing regex metacharacters (e.g. 'g++') don't break or
    # corrupt the pattern.
    escaped_name = re.escape(package_name)
    migrated_title_re = r'{} (.*) MIGRATED to (.*)'.format(escaped_name)
    accepted_title_re = r'Accepted {} (.*) \(.*\) into (.*)'.format(escaped_name)
    is_migrated = re.search(migrated_title_re, news_title)
    if is_migrated:
        version, branch = is_migrated.group(1), is_migrated.group(2)
        return version, branch
    is_accepted = re.search(accepted_title_re, news_title)
    if is_accepted:
        version, branch = is_accepted.group(1), is_accepted.group(2)
        return version, branch
    logger.warning("Unknown news title: {}".format(news_title))
    return None, None


def parse_news(response_text, package_name, allowed_channels, allowed_releases):
    """Extract (version, date) news entries grouped by channel from one
    tracker.debian.org news page.

    Entries whose channel field is actually a release name are remapped to
    the matching channel; everything else outside the allow-lists is dropped.

    Raises:
        Exception: when the page has no news list.
    """
    soup = BeautifulSoup(response_text, 'html.parser')
    news_list = soup.find('ul', {'class': 'list-group'})
    if not news_list:
        raise Exception("No news-list found on the page.")

    channel_versions = defaultdict(list)
    for item in news_list.find_all('li'):
        date = item.find('span', {'class': 'news-date'}).get_text()
        title = item.find('span', {'class': 'news-title'}).get_text()
        version, channel = parse_news_title(title, package_name)
        if channel in allowed_channels:
            channel_versions[channel].append((version, date))
        elif channel in allowed_releases:
            # The title named a release (e.g. 'bookworm') rather than a channel.
            logger.debug("Used release instead of channel: {}".format(channel))
            channel_versions[map_release_to_channel(channel)].append((version, date))
        else:
            logger.debug("Skipping channel: {}".format(channel))
    return channel_versions


def get_package_news(package_name, allowed_channels, allowed_releases, max_pages=4):
    """Collect (version, date) news entries per release from the tracker
    news feed, fetching up to *max_pages* pages.

    Stops early (with the pages gathered so far) when a page cannot be
    retrieved.
    """
    release_versions = defaultdict(list)
    # Off-by-one fix: range(1, max_pages) fetched only max_pages - 1 pages.
    for page_number in range(1, max_pages + 1):
        url = PACKAGE_NEWS_URL.format(package_name=package_name, page_number=page_number)
        cache_file = os.path.join(DATA_DIR, package_name, 'news_{}.html'.format(page_number))
        try:
            response_text = get_cache_or_url(url, cache_file)
        except Exception as e:
            # Best-effort: a missing page (e.g. past the last one) ends the scan.
            logger.error(e)
            break

        new_channel_versions = parse_news(response_text, package_name, allowed_channels, allowed_releases)
        for channel, versions in new_channel_versions.items():
            release = map_channel_to_release(channel)
            release_versions[release].extend(versions)
    return release_versions


def build_graph(package_name, releases_histories, release_versions):
    """Render a graphviz graph of the package's version history.

    releases_histories: {release: [version, ...]} changelog chains.
    release_versions:   {release: [(version, date), ...]} news timelines.

    Saves and renders DATA_DIR/<package>/package_versions.dot (opens viewer).
    """
    dot = graphviz.Digraph()
    releases_graph = graphviz.Digraph()
    releases_graph.attr(rank='same')
    # Invisible edges pin the release nodes into a fixed display order.
    for pos, release in enumerate(RELEASES_ORDER[:-1]):
        releases_graph.edge(release, RELEASES_ORDER[pos + 1], style='invis')
    dot.attr(rankdir='BT', label='Package: {}'.format(package_name))

    # Changelog chains: newest version -> release node, older -> newer.
    edges = set()
    for release, versions in releases_histories.items():
        releases_graph.node(release, release, shape='box')
        # parse_changelog may yield None for unparsable lines; drop them so
        # version.replace() below cannot crash.
        versions = [v for v in versions if v is not None]
        for idx, version in enumerate(versions):
            version_safe = version.replace(':', '_')
            dot.node(version_safe, version)
            if idx == 0:
                edges.add((version_safe, release))
            else:
                prev_version = versions[idx - 1].replace(':', '_')
                edges.add((version_safe, prev_version))
    for edge in edges:
        dot.edge(*edge)

    # News timelines, one colour per release.  Renamed the loop variable so
    # it no longer shadows the release_versions parameter.
    for color_idx, (release, versions) in enumerate(release_versions.items()):
        edge_color = EDGE_COLORS[color_idx % len(EDGE_COLORS)]
        release_edges = set()
        releases_graph.node(release, release, shape='box', color=edge_color)
        for idx, (version, date) in enumerate(versions):
            version_safe = version.replace(':', '_')
            dot.node(version_safe, version)
            if idx == 0:
                release_edges.add((version_safe, release))
            else:
                prev_version = versions[idx - 1][0].replace(':', '_')
                release_edges.add((version_safe, prev_version))
        for edge in release_edges:
            dot.edge(*edge, color=edge_color, dir="none")
    # Add the subgraph exactly once, after all nodes are in place.  The
    # original also called dot.subgraph() before the loops, which inserted
    # the invisible ordering edges into the output twice.
    dot.subgraph(releases_graph)

    filepath = os.path.join(DATA_DIR, package_name, 'package_versions.dot')
    dot.save(filepath)
    dot.render(filepath, view=True)


def get_package_details(package_name, allowed_releases=None):
    """Fetch histories and news for *package_name* and render its graph.

    When *allowed_releases* is falsy, a default set of recent releases
    (plus their security/backports variants) is used.
    """
    if not allowed_releases:
        allowed_releases = [
            'sid', 'trixie',
            'bookworm', 'bookworm-backports', 'bookworm-security',
            'bullseye', 'bullseye-backports', 'bullseye-security',
        ]
    histories = get_package_versions(package_name, allowed_releases)
    channels = [map_release_to_channel(release) for release in allowed_releases]
    channels += ['experimental', 'proposed-updates']
    news_by_release = get_package_news(package_name, channels, allowed_releases)
    build_graph(package_name, histories, news_by_release)


def main():
    """Entry point: configure logging, ensure the cache dir exists, and
    build the version graph for 'curl'."""
    options = parse_args()
    setup_logging_level(options.debug)
    os.makedirs(DATA_DIR, exist_ok=True)
    get_package_details('curl')


# Run only when executed as a script, not on import.
if __name__ == '__main__':
    main()