DataBaseORM.py 6.9 KB


  1. from sqlalchemy import Column, String, Integer, DateTime, Boolean, Table, ForeignKey
  2. from sqlalchemy.orm import relationship
  3. from sqlalchemy import create_engine
  4. from sqlalchemy.ext.declarative import declarative_base
  5. import os, re, uuid
  6. from datetime import timedelta
  7. from baangt.base.PathManagement import ManagedPaths
  8. from baangt.base.GlobalConstants import EXECUTION_STAGE, TIMING_DURATION, TESTCASESTATUS
  9. managedPaths = ManagedPaths()
  10. DATABASE_URL = os.getenv('DATABASE_URL') or 'sqlite:///'+str(managedPaths.derivePathForOSAndInstallationOption().joinpath('testrun.db'))
  11. # handle sqlalchemy dialects
  12. dialect = re.match(r'\w+', DATABASE_URL)
  13. if dialect.group() == 'mysql':
  14. from sqlalchemy.dialects.mysql import BINARY as Binary
  15. else:
  16. from sqlalchemy import LargeBinary as Binary
  17. engine = create_engine(DATABASE_URL)
  18. base = declarative_base()
  19. #
  20. # UUID as bytes
  21. #
  22. def uuidAsBytes():
  23. return uuid.uuid4().bytes
  24. #
  25. # Testrun models
  26. #
  27. class TestrunLog(base):
  28. #
  29. # summary on Testrun results
  30. #
  31. __tablename__ = "testruns"
  32. # columns
  33. id = Column(Binary(16), primary_key=True, default=uuidAsBytes)
  34. testrunName = Column(String(32), nullable=False)
  35. logfileName = Column(String(1024), nullable=False)
  36. startTime = Column(DateTime, nullable=False)
  37. endTime = Column(DateTime, nullable=False)
  38. dataFile = Column(String(1024), nullable=True)
  39. statusOk = Column(Integer, nullable=False)
  40. statusFailed = Column(Integer, nullable=False)
  41. statusPaused = Column(Integer, nullable=False)
  42. RLPJson = Column(String, nullable=True) # ------------------------> New feature: comment for old DB
  43. # relationships
  44. globalVars = relationship('GlobalAttribute')
  45. testcase_sequences = relationship('TestCaseSequenceLog')
  46. @property
  47. def recordCount(self):
  48. return self.statusOk + self.statusPaused + self.statusFailed
  49. @property
  50. def duration(self):
  51. return (self.endTime - self.startTime).seconds
  52. @property
  53. def stage(self):
  54. for gv in self.globalVars:
  55. if gv.name == EXECUTION_STAGE:
  56. return gv.value
  57. return None
  58. def __str__(self):
  59. return str(uuid.UUID(bytes=self.id))
  60. def to_json(self):
  61. return {
  62. 'id': str(self),
  63. 'Name': self.testrunName,
  64. 'Summary': {
  65. 'TestRecords': self.recordCount,
  66. 'Successful': self.statusOk,
  67. 'Paused': self.statusPaused,
  68. 'Error': self.statusFailed,
  69. 'LogFile': self.logfileName,
  70. 'StartTime': self.startTime.strftime('%H:%M:%S'),
  71. 'EndTime': self.endTime.strftime('%H:%M:%S'),
  72. 'Duration': str(self.endTime - self.startTime),
  73. },
  74. 'GlobalSettings': {gv.name: gv.value for gv in self.globalVars},
  75. 'TestSequences': [tsq.to_json() for tsq in self.testcase_sequences],
  76. }
  77. class GlobalAttribute(base):
  78. #
  79. # global vars
  80. #
  81. __tablename__ = 'globals'
  82. # columns
  83. id = Column(Integer, primary_key=True)
  84. name = Column(String(64), nullable=False)
  85. value = Column(String(1024), nullable=True)
  86. testrun_id = Column(Binary(16), ForeignKey('testruns.id'), nullable=False)
  87. # relationships
  88. testrun = relationship('TestrunLog', foreign_keys=[testrun_id])
  89. #
  90. # Test Case Sequence models
  91. #
  92. class TestCaseSequenceLog(base):
  93. #
  94. # TestCase Sequence
  95. #
  96. __tablename__ = 'testCaseSequences'
  97. # columns
  98. id = Column(Binary(16), primary_key=True, default=uuidAsBytes)
  99. number = Column(Integer, nullable=False)
  100. testrun_id = Column(Binary(16), ForeignKey('testruns.id'), nullable=False)
  101. # relationships
  102. testrun = relationship('TestrunLog', foreign_keys=[testrun_id])
  103. testcases = relationship('TestCaseLog')
  104. @property
  105. def duration(self):
  106. #
  107. # duration in seconds
  108. #
  109. return sum([tc.duration for tc in self.testcases if tc.duration])
  110. def __str__(self):
  111. return str(uuid.UUID(bytes=self.id))
  112. def to_json(self):
  113. return {
  114. 'id': str(self),
  115. 'TestCases': [tc.to_json() for tc in self.testcases],
  116. }
  117. #
  118. # Test Case models
  119. #
  120. class TestCaseLog(base):
  121. #
  122. # TestCase results
  123. #
  124. __tablename__ = 'testCases'
  125. # columns
  126. id = Column(Binary(16), primary_key=True, default=uuidAsBytes)
  127. number = Column(Integer, nullable=False)
  128. testcase_sequence_id = Column(Binary(16), ForeignKey('testCaseSequences.id'), nullable=False)
  129. # relationships
  130. testcase_sequence = relationship('TestCaseSequenceLog', foreign_keys=[testcase_sequence_id])
  131. fields = relationship('TestCaseField', lazy='select')
  132. networkInfo = relationship('TestCaseNetworkInfo')
  133. @property
  134. def status(self):
  135. #
  136. # testcase status
  137. #
  138. for field in self.fields:
  139. if field.name == TESTCASESTATUS:
  140. return field.value
  141. return None
  142. @property
  143. def duration(self):
  144. #
  145. # duration in seconds
  146. #
  147. for field in self.fields:
  148. if field.name == TIMING_DURATION:
  149. # parse value from H:M:S.microseconds
  150. m = re.search(r'(?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d[\.\d+]*)', field.value)
  151. if m:
  152. factors = {
  153. 'hours': 3600,
  154. 'minutes': 60,
  155. 'seconds': 1,
  156. }
  157. #duration = {key: float(value) for key, value in m.groupdict().items()}
  158. #return timedelta(**duration)
  159. return sum([factors[key]*float(value) for key, value in m.groupdict().items()])
  160. return None
  161. def __str__(self):
  162. return str(uuid.UUID(bytes=self.id))
  163. def fields_as_dict(self):
  164. return {pr.name: pr.value for pr in self.fields}
  165. def to_json(self):
  166. return {
  167. 'id': str(self),
  168. 'Parameters': {pr.name: pr.value for pr in self.fields},
  169. 'NetworkInfo': [nw.to_json() for nw in self.networkInfo],
  170. }
  171. class TestCaseField(base):
  172. #
  173. # field for a TestCase results
  174. #
  175. __tablename__ = 'testCaseFields'
  176. # columns
  177. id = Column(Integer, primary_key=True)
  178. name = Column(String(64), nullable=False)
  179. value = Column(String(1024), nullable=True)
  180. testcase_id = Column(Binary(16), ForeignKey('testCases.id'), nullable=False)
  181. # relationships
  182. testcase = relationship('TestCaseLog', foreign_keys=[testcase_id])
  183. class TestCaseNetworkInfo(base):
  184. #
  185. # network info for a TestCase
  186. #
  187. __tablename__ = 'networkInfo'
  188. # columns
  189. id = Column(Integer, primary_key=True)
  190. browserName = Column(String(64), nullable=True)
  191. status = Column(Integer, nullable=True)
  192. method = Column(String(16), nullable=True)
  193. url = Column(String(256), nullable=True)
  194. contentType = Column(String(64), nullable=True)
  195. contentSize = Column(Integer, nullable=True)
  196. headers = Column(String(4096), nullable=True)
  197. params = Column(String(1024), nullable=True)
  198. response = Column(String(4096), nullable=True)
  199. startDateTime = Column(DateTime, nullable=True)
  200. duration = Column(Integer, nullable=True)
  201. testcase_id = Column(Binary(16), ForeignKey('testCases.id'), nullable=True)
  202. # relationships
  203. testcase = relationship('TestCaseLog', foreign_keys=[testcase_id])
  204. def to_json(self):
  205. return {
  206. 'BrowserName': self.browserName,
  207. 'Status': self.status,
  208. 'Method': self.method,
  209. 'URL': self.url,
  210. 'ContentType': self.contentType,
  211. 'ContentSize': self.contentSize,
  212. 'Headers': self.headers,
  213. 'Params': self.params,
  214. 'Response': '',#self.response,
  215. 'StartDateTime': str(self.startDateTime),
  216. 'Duration': self.duration,
  217. }
  218. # create tables
  219. #if __name__ == '__main__':
  220. base.metadata.create_all(engine)