1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
|
import json
import logging
import re
from enum import StrEnum
from typing import Optional
from PySide6.QtCore import QObject, QSettings, Signal, Slot, Property, qDebug
from PySide6.QtNetwork import QHttpHeaders, QNetworkAccessManager, QNetworkReply, QNetworkRequestFactory
from PySide6.QtQml import QmlElement
from kodereviewer.data import PullRequest
from kodereviewer.project import Project
QML_IMPORT_NAME = "org.deprecated.kodereviewer"
QML_IMPORT_MAJOR_VERSION = 1
PULL_REQUEST_LIST_URL = re.compile(r'/pulls$')
COMMENT_LIST_URL = re.compile(r'/issues/(\d+)/comments')
FILE_LIST_URL = re.compile(r'/pulls/(\d+)/files')
CREATE_REVIEW_URL = re.compile(r'/pulls/(\d+)/reviews')
logger = logging.getLogger(__name__)
class ReviewEvent(StrEnum):
APPROVE = 'APPROVE'
REQUEST_CHANGES = 'REQUEST_CHANGES'
COMMENT = 'COMMENT'
@QmlElement
class NetworkManager(QObject):
_project: Optional[Project]
_manager: QNetworkAccessManager
_request_factory: QNetworkRequestFactory
projectChanged = Signal()
def __init__(self):
super().__init__()
self._project = None
self._manager = QNetworkAccessManager()
self._request_factory = QNetworkRequestFactory()
settings = QSettings()
github_token: str = str(settings.value("githubToken"))
headers = QHttpHeaders()
headers.append(QHttpHeaders.WellKnownHeader.Accept, "application/vnd.github+json")
headers.append(QHttpHeaders.WellKnownHeader.Authorization,
f'Bearer {github_token}')
headers.append(QHttpHeaders.WellKnownHeader.UserAgent, "kodereviewer")
print(f'Authorization: Bearer {github_token}')
self._request_factory.setCommonHeaders(headers)
self._manager.finished.connect(self.reply_finished)
def project(self) -> Optional[Project]:
return self._project
def set_project(self, project: Optional[Project]):
if project is None:
return
self._project = project
self.projectChanged.emit()
base_url = f'https://api.github.com/repos/{self._project.owner}/{self._project.name}'
self._request_factory.setBaseUrl(base_url)
project = Property(Project, fget=project, fset=set_project)
def reply_finished(self, reply: QNetworkReply):
if self._project is None:
print('Project not set')
return
response_body = reply.readAll()
if PULL_REQUEST_LIST_URL.search(reply.url().toString()):
self._project.load_pull_requests(response_body)
elif (match := COMMENT_LIST_URL.search(reply.url().toString())):
pull_request_number = int(match.groups()[0])
pull_request = self._project.find_pull_request(pull_request_number)
if pull_request is not None:
pull_request.load_comments(response_body)
elif (match := FILE_LIST_URL.search(reply.url().toString())):
pull_request_number = int(match.groups()[0])
pull_request: Optional[PullRequest] = self._project.find_pull_request(pull_request_number)
if pull_request is not None:
pull_request.load_files(response_body)
elif (match := CREATE_REVIEW_URL.search(reply.url().toString())):
logger.info(f'Got review reply: {response_body}')
else:
logger.info(f"Can't handle {reply.url()}")
@Slot()
def getPullRequests(self) -> None:
self._manager.get(self._request_factory.createRequest("/pulls"))
@Slot(int)
def getPullRequestComments(self, number: int) -> None:
self._manager.get(
self._request_factory.createRequest(f'/issues/{number}/comments')
)
@Slot(int)
def getFiles(self, pull_request_number) -> None:
self._manager.get(
self._request_factory.createRequest(
f'/pulls/{pull_request_number}/files'
)
)
@Slot(int, str, str, str)
def createReview(
self, pull_request_number: str, commit_id: str,
body: str, event: ReviewEvent
) -> None:
request = self._request_factory.createRequest(
f'/pulls/{pull_request_number}/reviews'
)
data = {
'commit_id': commit_id,
'body': body,
'event': event,
'comments': []
}
self._manager.post(request, json.dumps(data).encode())
|