Source code for rtcclient.models

import logging
import os
import re

import xmltodict

from rtcclient import urlunquote, OrderedDict
from rtcclient.base import FieldBase


[docs] class Role(FieldBase): """The role in the project area or team area""" log = logging.getLogger("models.Role") def __str__(self): return self.label
[docs] class Member(FieldBase): """The member in the project area""" log = logging.getLogger("models.Member") def __init__(self, url, rtc_obj, raw_data=None, skip_full_attributes=True): FieldBase.__init__(self, url, rtc_obj, raw_data=raw_data, skip_full_attributes=skip_full_attributes) # add a new attribute mainly for the un-recorded member use self.email = urlunquote(self.url.split("/")[-1]) def __str__(self): if hasattr(self, "title"): return self.title return self.email def _initialize(self): pass def __initialize(self): pass
[docs] class Administrator(Member): """The administrator of the project area""" log = logging.getLogger("models.Administrator")
[docs] class ItemType(FieldBase): """The workitem type""" log = logging.getLogger("models.ItemType") def __str__(self): return self.title
[docs] class TeamArea(FieldBase): """The team area""" log = logging.getLogger("models.TeamArea") def __str__(self): return self.title
[docs] class PlannedFor(FieldBase): """The project plannedfor defines a start and end date along with an iteration breakdown """ log = logging.getLogger("models.PlannedFor") def __str__(self): return self.title
[docs] class FiledAgainst(FieldBase): """Category that identifies the component or functional area that the work item belongs to. """ log = logging.getLogger("models.FiledAgainst") def __str__(self): return self.title
[docs] class FoundIn(FieldBase): """Release in which the issue described in the work item was identified. """ log = logging.getLogger("models.FoundIn") def __str__(self): return self.title
[docs] class Severity(FieldBase): """Indication of the impact of the work item""" log = logging.getLogger("models.Severity") def __str__(self): return self.title
[docs] class Priority(FieldBase): """Ranked importance of a work item""" log = logging.getLogger("models.Priority") def __str__(self): return self.title
[docs] class Action(FieldBase): """The action to change the state of the workitem""" log = logging.getLogger("models.Action") def __str__(self): return self.title
[docs] class State(FieldBase): """Status of the work item. For example, New, In Progress, or Resolved.""" log = logging.getLogger("models.State") def __str__(self): return self.title
[docs] class Comment(FieldBase): """Comment about the work item""" log = logging.getLogger("models.Comment") def __init__(self, url, rtc_obj, raw_data=None, skip_full_attributes=True): self.id = url.split("/")[-1] FieldBase.__init__(self, url, rtc_obj, raw_data, skip_full_attributes=skip_full_attributes) def __str__(self): return self.id
[docs] class SavedQuery(FieldBase): """User saved query""" log = logging.getLogger("models.SavedQuery") def __init__(self, url, rtc_obj, raw_data=None, skip_full_attributes=True): self.id = url.split("/")[-1] FieldBase.__init__(self, url, rtc_obj, raw_data, skip_full_attributes=skip_full_attributes) def __str__(self): return self.title
[docs] class IncludedInBuild(FieldBase): """Which build includes the certain workitem""" log = logging.getLogger("models.IncludedInBuild") def __str__(self): return self.label
[docs] class ChangeSet(FieldBase): """ChangeSet""" log = logging.getLogger("models.ChangeSet") def __str__(self): return self.label
[docs] def getChanges(self): """Get all :class:`rtcclient.models.Change` objects in this changeset :return: a :class:`list` contains all the :class:`rtcclient.models.Change` objects :rtype: list """ identifier = self.url.split("/")[-1] resource_url = "/".join([ "%s" % self.rtc_obj.url, "resource/itemOid", "com.ibm.team.scm.ChangeSet", "%s?_mediaType=text/xml" % identifier ]) resp = self.get(resource_url, verify=self.rtc_obj.verify, proxies=self.rtc_obj.proxies, headers=self.rtc_obj.headers) raw_data = xmltodict.parse(resp.content).get("scm:ChangeSet") common_changes = dict() changes = raw_data.get("changes") for (key, value) in raw_data.items(): if key.startswith("@"): continue if "changes" != key: common_changes[key] = value return self._handle_changes(changes, common_changes)
def _handle_changes(self, changes, common_changes): change_objs = list() if isinstance(changes, OrderedDict): # only one single change changes.update(common_changes) change_objs.append(Change(None, self.rtc_obj, raw_data=changes)) elif isinstance(changes, list): # multiple changes for change in changes: change.update(common_changes) change_objs.append(Change(None, self.rtc_obj, raw_data=change)) return change_objs
[docs] class Change(FieldBase): """Change""" log = logging.getLogger("models.Change") def __init__(self, url, rtc_obj, raw_data=None, skip_full_attributes=True): FieldBase.__init__(self, url, rtc_obj, raw_data, skip_full_attributes=skip_full_attributes) def __str__(self): return self.internalId
[docs] def fetchBeforeStateFile(self, file_folder): """Fetch the initial file (before the change) to a folder If the file is newly added, then `None` will be returned. :param file_folder: the folder to store the file :return: the :class:`string` object :rtype: string """ if u"true" == self.before: self.log.info("This file is newly added. No previous file") else: self.log.info("Fetching initial file of this Change<%s>:" % self) return self._fetchFile(self.before, file_folder, override=False)
[docs] def fetchAfterStateFile(self, file_folder): """Fetch the final file (after the change) to a folder If the file has been deleted, then `None` will be returned. :param file_folder: the folder to store the file :return: the :class:`string` object :rtype: string """ if u"true" == self.after: self.log.info("This file has been deleted successfully.") else: self.log.info("Fetching final file of this Change<%s>:" % self) return self._fetchFile(self.after, file_folder)
[docs] def fetchCurrentFile(self, file_folder): """Fetch the current/final file (after the change) to a folder If the file has been deleted, then `None` will be returned. :param file_folder: the folder to store the file :return: the :class:`string` object :rtype: string """ return self.fetchAfterStateFile(file_folder)
def _fetchFile(self, state_id, file_folder, override=True): if self.raw_data['item']['@xsi:type'] == 'scm:FolderHandle': return file_url = "/".join([ "{0}/service", ("com.ibm.team.filesystem.service.internal." "rest.IFilesystemContentService"), "-", ("{1}?itemId={2}&stateId={3}" "&platformLineDelimiter=CRLF") ]) file_url = file_url.format(self.rtc_obj.url, self.component, self.item, state_id) self.log.debug("Start fetching file from %s ..." % file_url) resp = self.get(file_url, verify=self.rtc_obj.verify, headers=self.rtc_obj.headers) file_name = re.findall(r".+filename\*=UTF-8''(.+)", resp.headers["content-disposition"])[0] file_path = os.path.join(file_folder, file_name) if not override and os.path.exists(file_path): return with open(file_path, "wb") as file_content: file_content.write(resp.content) self.log.info("Successfully Fetching '%s' to '%s'" % (file_name, file_path)) return file_path
class Attachment(FieldBase): """Attachment of the work item""" log = logging.getLogger("models.Attachment") def __init__(self, url, rtc_obj, raw_data=None, skip_full_attributes=True): FieldBase.__init__(self, url, rtc_obj, raw_data, skip_full_attributes=skip_full_attributes) def __str__(self): return self.identifier + ": " + self.title