from dataclasses import dataclass, field from enum import auto, IntEnum from logging import getLogger from typing import Optional, Self, Union from PySide6.QtCore import QAbstractItemModel, QAbstractListModel, QByteArray, QModelIndex, QObject, QPersistentModelIndex, QStandardPaths, QUrl, Qt, Signal, Slot, Property from PySide6.QtQml import QmlElement from kodereviewer.data import PullRequest, ReviewComment QML_IMPORT_NAME = "org.deprecated.kodereviewer" QML_IMPORT_MAJOR_VERSION = 1 logger = getLogger(__name__) class Roles(IntEnum): Body = Qt.ItemDataRole.UserRole + 1 CreatedAt = auto() UpdatedAt = auto() Username = auto() AvatarUrl = auto() Diff = auto() Line = auto() # List model @dataclass class Thread(QObject): id: int diff: str = '' reviews: list[ReviewComment] = field(default_factory=list) def review_ids(self) -> list[int]: return [review._id for review in self.reviews] class ReviewModel(QAbstractListModel): _pull_request: Optional[PullRequest] threads: list[Thread] class Roles(IntEnum): Id = Qt.ItemDataRole.UserRole + 1 Diff = auto() Reviews = auto() def __init__(self): super().__init__() self._pull_request = None self.threads = [] pullRequestChanged = Signal() def get_pull_request(self) -> Optional[PullRequest]: return self._pull_request def set_pull_request(self, pull_request: Optional[PullRequest]) -> None: if pull_request is None: return if self._pull_request is not None and self._pull_request == pull_request: return self.beginResetModel() self._pull_request = pull_request self._pull_request.reviewsLoaded.connect(self._reset_model) self.endResetModel() self.pullRequestChanged.emit() pullRequest = Property(PullRequest, fget=get_pull_request, fset=set_pull_request, notify=pullRequestChanged) def _reset_model(self): assert self._pull_request is not None self.beginResetModel() self.load_reviews(self._pull_request._reviews) self.endResetModel() def load_reviews(self, reviews: list[ReviewComment]): self.mapping: dict[int, Thread] = {} for review in reviews: logger.info('Processing %d', review.id) if review._in_reply_to is not None: thread = self.mapping[review._in_reply_to] else: if review.id not in self.mapping: self.mapping[review._id] = Thread(id=review._id, diff=review._diff) thread = self.mapping[review._id] else: thread = self.mapping[review._id] thread.reviews.append(review) self.threads = list(self.mapping.values()) def data(self, index: QModelIndex | QPersistentModelIndex, role: int = Qt.ItemDataRole.DisplayRole) -> object: if self._pull_request is None: return None thread = self.threads[index.row()] if role == self.Roles.Id: return thread.id if role == self.Roles.Diff: return thread.diff if role == self.Roles.Reviews: return thread.reviews if role == Qt.ItemDataRole.DisplayRole: return f'Thread #{thread.id}' return None def rowCount(self, parent: QModelIndex | QPersistentModelIndex = QModelIndex()) -> int: if self._pull_request is None: return 0 return len(self.threads) def roleNames(self) -> dict[int, QByteArray]: return { self.Roles.Id: QByteArray(b'id'), self.Roles.Diff: QByteArray(b'diff'), self.Roles.Reviews: QByteArray(b'reviews'), } # Tree Model @dataclass class ThreadItem: parent: "Root" id: int reviews: list["ReviewItem"] = field(default_factory=list) def data(self, role: int): return None @dataclass class ReviewItem: parent: ThreadItem id: int review: ReviewComment def data(self, role: int): if role == Roles.Body: return self.review._body if role == Roles.CreatedAt: return self.review._created_at.strftime("%Y-%M-%d %H:%m") if role == Roles.UpdatedAt: return self.review._updated_at.strftime("%Y-%M-%d %H:%m") if role == Roles.Username: return self.review._.username if role == Roles.AvatarUrl: return self.review._.avatar_url if role == Roles.Diff: return self.review.diff if role == Roles.Line: return self.review.line if role == Qt.ItemDataRole.DisplayRole: return self.review._body return None @dataclass class Root: threads: list[ThreadItem] = field(default_factory=list) def data(self, role: int): return None Item = Union[Root, ThreadItem, ReviewItem] @QmlElement class TreeReviewModel(QAbstractItemModel): _pull_request: Optional[PullRequest] root: Root def __init__(self): super().__init__() self._pull_request = None self.root = Root() def get_pull_request(self) -> Optional[PullRequest]: return self._pull_request def set_pull_request(self, pull_request: Optional[PullRequest]) -> None: if pull_request is None: return if self._pull_request is not None and self._pull_request == pull_request: return self.beginResetModel() self._pull_request = pull_request self._pull_request.reviewsLoaded.connect(self._reset_model) self.endResetModel() self.pullRequestChanged.emit() pullRequestChanged = Signal() pullRequest = Property(PullRequest, fget=get_pull_request, fset=set_pull_request, notify=pullRequestChanged) def load_reviews(self, reviews: list[ReviewComment]): mapping: dict[int, ThreadItem] = {} for review in reviews: if review.id not in mapping: thread = ThreadItem(self.root, review.id) else: thread = mapping[review.id] thread.reviews.append(ReviewItem(thread, review.id, review)) def _reset_model(self) -> None: if self._pull_request is None: return self.beginResetModel() self.load_reviews(self._pull_request._reviews) self.endResetModel() def index(self, row: int, column: int, parent: QModelIndex | QPersistentModelIndex = QModelIndex()) -> QModelIndex: if not self.hasIndex(row, column, parent): return QModelIndex() if not parent.isValid(): if 0 <= row < len(self.root.threads): return self.createIndex(row, column, self.root.threads[row]) else: item: Root | ThreadItem = parent.internalPointer() if isinstance(item, Root): if 0 <= row < len(self.root.threads): return self.createIndex(row, column, self.root.threads[row]) else: if 0 <= row < len(item.reviews): return self.createIndex(row, column, item.reviews[row]) return QModelIndex() def parent(self, index: QModelIndex) -> QModelIndex: if not index.isValid(): return QModelIndex() item: Item = index.internalPointer() if isinstance(item, Root): return QModelIndex() if isinstance(item, ThreadItem): return self.createIndex(self.root.threads.index(item), 0, self.root) if isinstance(item, ReviewItem): return self.createIndex(item.parent.reviews.index(item), 0, item.parent) raise ValueError(f'Unknown {item}') def rowCount(self, index: QModelIndex | QPersistentModelIndex = QModelIndex()) -> int: parent: Item if not index.isValid(): return len(self.root.threads) parent = index.internalPointer() if isinstance(parent, Root): return len(self.root.threads) if isinstance(parent, ThreadItem): return len(parent.reviews) return 0 def columnCount(self, parent: QModelIndex | QPersistentModelIndex = QModelIndex()) -> int: return 1 def data(self, index: QModelIndex | QPersistentModelIndex, role: int = Qt.ItemDataRole.DisplayRole) -> object: if self._pull_request is None: return None reviews: list[ReviewComment] = self._pull_request._reviews review: ReviewComment = reviews[index.row()] item: Item = index.internalPointer() return item.data(role) def roleNames(self) -> dict[int, QByteArray]: return { Roles.Body: QByteArray(b'body'), Roles.CreatedAt: QByteArray(b'createdAt'), Roles.UpdatedAt: QByteArray(b'updatedAt'), Roles.Username: QByteArray(b'username'), Roles.AvatarUrl: QByteArray(b'avatarUrl'), Roles.Diff: QByteArray(b'diff'), Roles.Line: QByteArray(b'line'), }