import re from typing import Optional from PySide6.QtCore import QObject, QSettings, Signal, Slot, Property, qDebug from PySide6.QtNetwork import QHttpHeaders, QNetworkAccessManager, QNetworkReply, QNetworkRequestFactory from PySide6.QtQml import QmlElement from kodereviewer.data import PullRequest from kodereviewer.project import Project QML_IMPORT_NAME = "org.deprecated.kodereviewer" QML_IMPORT_MAJOR_VERSION = 1 PULL_REQUEST_LIST_URL = re.compile(r'/pulls$') COMMENT_LIST_URL = re.compile(r'/issues/(\d+)/comments') FILE_LIST_URL = re.compile(r'/pulls/(\d+)/files') @QmlElement class NetworkManager(QObject): _project: Optional[Project] _manager: QNetworkAccessManager _request_factory: QNetworkRequestFactory projectChanged = Signal() def __init__(self): super().__init__() self._project = None self._manager = QNetworkAccessManager() self._request_factory = QNetworkRequestFactory() settings = QSettings() github_token: str = str(settings.value("githubToken")) headers = QHttpHeaders() headers.append(QHttpHeaders.WellKnownHeader.Accept, "application/vnd.github+json") headers.append(QHttpHeaders.WellKnownHeader.Authorization, f'Bearer {github_token}') headers.append(QHttpHeaders.WellKnownHeader.UserAgent, "kodereviewer") print(f'Authorization: Bearer {github_token}') self._request_factory.setCommonHeaders(headers) self._manager.finished.connect(self.reply_finished) def project(self) -> Optional[Project]: return self._project def set_project(self, project: Optional[Project]): if project is None: return self._project = project self.projectChanged.emit() base_url = f'https://api.github.com/repos/{self._project.owner}/{self._project.name}' self._request_factory.setBaseUrl(base_url) project = Property(Project, fget=project, fset=set_project) def reply_finished(self, reply: QNetworkReply): if self._project is None: print('Project not set') return response_body = reply.readAll() if PULL_REQUEST_LIST_URL.search(reply.url().toString()): self._project.load_pull_requests(response_body) elif (match := COMMENT_LIST_URL.search(reply.url().toString())): pull_request_number = int(match.groups()[0]) pull_request = self._project.find_pull_request(pull_request_number) if pull_request is not None: pull_request.load_comments(response_body) elif (match := FILE_LIST_URL.search(reply.url().toString())): pull_request_number = int(match.groups()[0]) pull_request: Optional[PullRequest] = self._project.find_pull_request(pull_request_number) if pull_request is not None: pull_request.load_files(response_body) else: print(f"Can't handle {reply.url()}") @Slot() def getPullRequests(self) -> None: self._manager.get(self._request_factory.createRequest("/pulls")) @Slot(int) def getPullRequestComments(self, number: int) -> None: self._manager.get(self._request_factory.createRequest(f'/issues/{number}/comments')) @Slot(int) def getFiles(self, pull_request_number) -> None: self._manager.get(self._request_factory.createRequest(f'/pulls/{pull_request_number}/files'))