ResultsBrowser.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. from sqlalchemy import create_engine, desc, and_
  2. from sqlalchemy.orm import sessionmaker
  3. from baangt.base.DataBaseORM import DATABASE_URL, engine, TestrunLog, GlobalAttribute, TestCaseLog, TestCaseSequenceLog, TestCaseField
  4. from baangt.base.ExportResults.ExportResults import ExcelSheetHelperFunctions
  5. from baangt.base.PathManagement import ManagedPaths
  6. import baangt.base.GlobalConstants as GC
  7. import uuid
  8. from datetime import datetime
  9. from xlsxwriter import Workbook
  10. import logging
  11. import os
  12. logger = logging.getLogger("pyC")
  13. class ResultsBrowser:
  14. def __init__(self, db_url=None):
  15. # setup db engine
  16. if db_url:
  17. engine = create_engine(db_url)
  18. else:
  19. engine = create_engine(DATABASE_URL)
  20. self.db = sessionmaker(bind=engine)()
  21. # result query set
  22. self.query_set = []
  23. # tag of the current query set
  24. self.tag = None
  25. # set of stages
  26. self.stages = None
  27. # path management
  28. self.managedPaths = ManagedPaths()
  29. logger.info(f'Initiated with DATABASE_URL: {db_url if db_url else DATABASE_URL}')
  30. def __del__(self):
  31. self.db.close()
  32. def average_duration(self, testcase_sequence=None, testcase=None):
  33. #
  34. # average durationof the testruns or particular testcases within the query set
  35. # testcase values:
  36. # None: the whole testrun
  37. # integer: the specified testcase
  38. #
  39. if testcase_sequence is None:
  40. # whole testrun
  41. durations = [tr.duration for tr in self.query_set]
  42. elif testcase is None:
  43. # specified testcase sequence
  44. durations = [tr.testcase_sequences[testcase_sequence].duration for tr in self.query_set if testcase_sequence < len(tr.testcase_sequences)]
  45. else:
  46. # specific testcase
  47. durations = [
  48. tr.testcase_sequences[testcase_sequence].testcases[testcase].duration for tr in self.query_set \
  49. if testcase_sequence < len(tr.testcase_sequences) and testcase < len(tr.testcase_sequences[testcase_sequence].testcases)
  50. ]
  51. return round(sum(durations) / len(durations), 2)
  52. def size(self, testcase_sequence=None):
  53. #
  54. # the maximum number of testcase sequences
  55. #
  56. # test case sequences
  57. if testcase_sequence is None:
  58. return max([len(tr.testcase_sequences) for tr in self.query_set])
  59. # test cases
  60. return max([len(tr.testcase_sequences[testcase_sequence].testcases) for tr in self.query_set])
  61. def name_list(self):
  62. names = self.db.query(TestrunLog.testrunName).group_by(TestrunLog.testrunName).order_by(TestrunLog.testrunName).all()
  63. return [x[0] for x in names]
  64. def stage_list(self):
  65. stages = self.db.query(GlobalAttribute.value).filter_by(name=GC.EXECUTION_STAGE)\
  66. .group_by(GlobalAttribute.value).order_by(GlobalAttribute.value).all()
  67. return [x[0] for x in stages]
  68. def get(self, ids):
  69. #
  70. # get TestrunLogs by id (list of uuid string)
  71. #
  72. # set the tag
  73. self.tag = {
  74. 'Date': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
  75. }
  76. # get records by id
  77. records = []
  78. for id in ids:
  79. records.append(self.db.query(TestrunLog).get(uuid.UUID(id).bytes))
  80. self.query_set = records
  81. def query(self, name=None, stage=None, start_date=None, end_date=None):
  82. #
  83. # get TestrunLogs by name, stage and dates
  84. #
  85. # set the tag
  86. self.tag = {
  87. 'Name': name,
  88. 'Stage': stage,
  89. 'Date from': start_date.strftime('%Y-%m-%d') if start_date else None,
  90. 'Date to': end_date.strftime('%Y-%m-%d') if end_date else None,
  91. }
  92. # get records
  93. records = []
  94. logger.info(f'Quering: name={self.tag.get("Name")}, stage={self.tag.get("Stage")}, dates=({self.tag.get("Date from")}, {self.tag.get("Date to")})')
  95. # filter by name and stage
  96. if name and stage:
  97. self.stages = {stage}
  98. records = self.db.query(TestrunLog).order_by(TestrunLog.startTime).filter_by(testrunName=name)\
  99. .filter(TestrunLog.globalVars.any(and_(GlobalAttribute.name==GC.EXECUTION_STAGE, GlobalAttribute.value==stage))).all()
  100. # filter by name
  101. elif name:
  102. # get Testrun stages
  103. stages = self.db.query(GlobalAttribute.value).filter(GlobalAttribute.testrun.has(TestrunLog.testrunName==name))\
  104. .filter_by(name=GC.EXECUTION_STAGE).group_by(GlobalAttribute.value).order_by(GlobalAttribute.value).all()
  105. self.stages = {x[0] for x in stages}
  106. for s in self.stages:
  107. logs = self.db.query(TestrunLog).order_by(TestrunLog.startTime).filter_by(testrunName=name)\
  108. .filter(TestrunLog.globalVars.any(and_(GlobalAttribute.name==GC.EXECUTION_STAGE, GlobalAttribute.value==s))).all()
  109. records.extend(logs)
  110. # filter by stage
  111. elif stage:
  112. self.stages = {stage}
  113. # get Testrun names
  114. names = self.db.query(TestrunLog.testrunName)\
  115. .filter(TestrunLog.globalVars.any(and_(GlobalAttribute.name==GC.EXECUTION_STAGE, GlobalAttribute.value==stage)))\
  116. .group_by(TestrunLog.testrunName).order_by(TestrunLog.testrunName).all()
  117. names = [x[0] for x in names]
  118. for n in names:
  119. logs = self.db.query(TestrunLog).order_by(TestrunLog.startTime).filter_by(testrunName=n)\
  120. .filter(TestrunLog.globalVars.any(and_(GlobalAttribute.name==GC.EXECUTION_STAGE, GlobalAttribute.value==stage))).all()
  121. records.extend(logs)
  122. # get all testruns ordered by name and stage
  123. else:
  124. # get Testrun names
  125. names = self.db.query(TestrunLog.testrunName).group_by(TestrunLog.testrunName).order_by(TestrunLog.testrunName).all()
  126. names = [x[0] for x in names]
  127. self.stages = set()
  128. for n in names:
  129. # get Testrun stages
  130. stages = self.db.query(GlobalAttribute.value).filter(GlobalAttribute.testrun.has(TestrunLog.testrunName==n))\
  131. .filter_by(name=GC.EXECUTION_STAGE).group_by(GlobalAttribute.value).order_by(GlobalAttribute.value).all()
  132. stages = [x[0] for x in stages]
  133. self.stages.update(stages)
  134. for s in stages:
  135. logs = self.db.query(TestrunLog).order_by(TestrunLog.startTime).filter_by(testrunName=n)\
  136. .filter(TestrunLog.globalVars.any(and_(GlobalAttribute.name==GC.EXECUTION_STAGE, GlobalAttribute.value==s))).all()
  137. records.extend(logs)
  138. # filter by dates
  139. if start_date and end_date:
  140. self.query_set = [log for log in records if log.startTime > start_date and log.startTime < end_date]
  141. elif start_date:
  142. self.query_set = [log for log in records if log.startTime > start_date]
  143. elif end_date:
  144. self.query_set = [log for log in records if log.startTime < end_date]
  145. else:
  146. self.query_set = records
  147. logger.info(f'Number of found records: {len(self.query_set)}')
  148. def export(self):
  149. #
  150. # export the query set to xlsx
  151. #
  152. # set labels
  153. labelTetsrun = 'TestRun'
  154. labelTestCaseSequence = 'Test Case Sequence'
  155. labelTestCase = 'Test Case'
  156. labelAvgDuration = 'Avg. Duration'
  157. # initialize workbook
  158. path_to_file = self.managedPaths.getOrSetDBExportPath().joinpath(f'TestrunLogs_{"_".join(list(map(str, self.tag.values())))}.xlsx')
  159. workbook = Workbook(str(path_to_file))
  160. # define cell formats
  161. # green background
  162. cellFormatGreen = workbook.add_format({'bg_color': 'green'})
  163. #cellFormatGreen.set_bg_color('green')
  164. # red background
  165. cellFormatRed = workbook.add_format({'bg_color': 'red'})
  166. #cellFormatRed.set_bg_color('red')
  167. # bold font
  168. cellFormatBold = workbook.add_format({'bold': True})
  169. # bold and italic font
  170. cellFormatBoldItalic = workbook.add_format({'bold': True, 'italic': True})
  171. # summary tab
  172. sheet = workbook.add_worksheet('Summary')
  173. sheet.set_column(first_col=0, last_col=0, width=18)
  174. #sheet.set_column(first_col=1, last_col=1, width=12)
  175. # title
  176. sheet.write(0, 0, f'{labelTetsrun}s Summary', cellFormatBold)
  177. # parameters
  178. line = 1
  179. for key, value in self.tag.items():
  180. line += 1
  181. sheet.write(line, 0, key)#, cellFormatBold)
  182. sheet.write(line, 1, value)
  183. # average duration
  184. line += 2
  185. sheet.write(line, 0, labelAvgDuration, cellFormatBold)
  186. sheet.write(line, 1, self.average_duration())
  187. # testcases
  188. line += 2
  189. sheet.write(line, 0, f'{labelTestCase}s', cellFormatBold)
  190. status_style = {
  191. GC.TESTCASESTATUS_SUCCESS: cellFormatGreen,
  192. GC.TESTCASESTATUS_ERROR: cellFormatRed,
  193. GC.TESTCASESTATUS_WAITING: None,
  194. }
  195. for tcs_index in range(self.size()):
  196. # testcase sequence
  197. line += 1
  198. sheet.write(line, 0, labelTestCaseSequence)
  199. sheet.write(line, 1, tcs_index)
  200. line += 1
  201. sheet.write(line, 0, labelAvgDuration)
  202. sheet.write(line, 1, self.average_duration(testcase_sequence=tcs_index))
  203. # test cases
  204. # header
  205. line += 2
  206. sheet.write(line, 0, f'{labelTetsrun} Date', cellFormatBoldItalic)
  207. sheet.write(line, 1, labelTestCase, cellFormatBoldItalic)
  208. line += 1
  209. for i in range(self.size(testcase_sequence=tcs_index)):
  210. sheet.write(line, 1 + i, i)
  211. id_col = i + 3
  212. sheet.write(line - 1, id_col, f'{labelTetsrun} ID', cellFormatBoldItalic)
  213. # status
  214. for tr in self.query_set:
  215. line += 1
  216. sheet.write(line, 0, tr.startTime.strftime('%Y-%m-%d %H:%M:%S'))
  217. col = 1
  218. for tc in tr.testcase_sequences[tcs_index].testcases:
  219. sheet.write(line, col, tc.status, status_style.get(tc.status))
  220. #sheet.write(line, col, tc.duration, status_style.get(tc.status))
  221. col += 1
  222. #sheet.write(line, col, tr.duration)
  223. #sheet.write(line, col+1, tr.testcase_sequences[0].duration)
  224. sheet.write(line, id_col, str(tr))
  225. line += 1
  226. sheet.write(line, 0, labelAvgDuration, cellFormatBoldItalic)
  227. for tc_index in range(self.size(testcase_sequence=tcs_index)):
  228. sheet.write(line, tc_index+1, self.average_duration(testcase_sequence=tcs_index, testcase=tc_index))
  229. # test case tabs
  230. for stage in self.stages:
  231. sheet = workbook.add_worksheet(f'{stage}_JSON')
  232. # write headers
  233. headers = [
  234. 'Stage',
  235. f'{labelTetsrun} ID',
  236. f'{labelTestCase} ID',
  237. 'Attribute',
  238. 'Value',
  239. ]
  240. for index, label in enumerate(headers):
  241. sheet.write(0, index, label, cellFormatBold)
  242. # write data
  243. line = 1
  244. for tr in self.query_set:
  245. # check the stage
  246. if tr.stage == stage:
  247. for tcs in tr.testcase_sequences:
  248. for tc in tcs.testcases:
  249. for field in tc.fields:
  250. sheet.write(line, 0, stage)
  251. sheet.write(line, 1, str(tr))
  252. sheet.write(line, 2, str(tc))
  253. sheet.write(line, 3, field.name)
  254. sheet.write(line, 4, field.value)
  255. line += 1
  256. # autowidth
  257. for i in range(len(headers)):
  258. ExcelSheetHelperFunctions.set_column_autowidth(sheet, i)
  259. workbook.close()
  260. logger.info(f'Query successfully exported to {path_to_file}')
  261. return path_to_file