ExportResults.py 34 KB


  1. import xlsxwriter
  2. import logging
  3. import json
  4. import baangt.base.GlobalConstants as GC
  5. from baangt.base.Timing.Timing import Timing
  6. import sys
  7. from baangt.base.Utils import utils
  8. from pathlib import Path
  9. from typing import Optional
  10. from xlsxwriter.worksheet import (
  11. Worksheet, cell_number_tuple, cell_string_tuple)
  12. from sqlalchemy import create_engine
  13. from sqlalchemy.orm import sessionmaker
  14. from baangt.base.DataBaseORM import DATABASE_URL, TestrunLog, TestCaseSequenceLog
  15. from baangt.base.DataBaseORM import TestCaseLog, TestCaseField, GlobalAttribute, TestCaseNetworkInfo
  16. from datetime import datetime
  17. import time
  18. from baangt import plugin_manager
  19. import re
  20. import csv
  21. from dateutil.parser import parse
  22. import os
  23. logger = logging.getLogger("pyC")
  24. class ExportResults:
  25. def __init__(self, **kwargs):
  26. self.kwargs = kwargs
  27. self.testList = []
  28. self.testRunInstance = kwargs.get(GC.KWARGS_TESTRUNINSTANCE)
  29. self.testCasesEndDateTimes_1D = kwargs.get('testCasesEndDateTimes_1D')
  30. self.testCasesEndDateTimes_2D = kwargs.get('testCasesEndDateTimes_2D')
  31. self.networkInfo = self._get_network_info(kwargs.get('networkInfo'))
  32. self.testRunName = self.testRunInstance.testRunName
  33. self.dataRecords = self.testRunInstance.dataRecords
  34. try:
  35. self.exportFormat = kwargs.get(GC.KWARGS_TESTRUNATTRIBUTES).get(GC.EXPORT_FORMAT)[GC.EXPORT_FORMAT]
  36. if not self.exportFormat:
  37. self.exportFormat = GC.EXP_XLSX
  38. except KeyError:
  39. self.exportFormat = GC.EXP_XLSX
  40. self.fileName = self.__getOutputFileName()
  41. logger.info("Export-Sheet for results: " + self.fileName)
  42. # export results to DB
  43. self.uuids = {}
  44. self.exportToDataBase()
  45. if self.exportFormat == GC.EXP_XLSX:
  46. self.fieldListExport = kwargs.get(GC.KWARGS_TESTRUNATTRIBUTES).get(GC.EXPORT_FORMAT)["Fieldlist"]
  47. self.workbook = xlsxwriter.Workbook(self.fileName)
  48. self.summarySheet = self.workbook.add_worksheet("Summary")
  49. self.worksheet = self.workbook.add_worksheet("Output")
  50. self.jsonSheet = self.workbook.add_worksheet("JSON")
  51. self.timingSheet = self.workbook.add_worksheet("Timing")
  52. self.cellFormatGreen = self.workbook.add_format()
  53. self.cellFormatGreen.set_bg_color('green')
  54. self.cellFormatRed = self.workbook.add_format()
  55. self.cellFormatRed.set_bg_color('red')
  56. self.cellFormatBold = self.workbook.add_format()
  57. self.cellFormatBold.set_bold(bold=True)
  58. self.summaryRow = 0
  59. self.__setHeaderDetailSheetExcel()
  60. self.makeSummaryExcel()
  61. self.exportResultExcel()
  62. self.exportJsonExcel()
  63. self.exportAdditionalData()
  64. self.exportTiming = ExportTiming(self.dataRecords,
  65. self.timingSheet)
  66. if self.networkInfo:
  67. self.networkSheet = self.workbook.add_worksheet("Network")
  68. self.exportNetWork = ExportNetWork(self.networkInfo,
  69. self.testCasesEndDateTimes_1D,
  70. self.testCasesEndDateTimes_2D,
  71. self.workbook,
  72. self.networkSheet)
  73. self.closeExcel()
  74. elif self.exportFormat == GC.EXP_CSV:
  75. self.export2CSV()
  76. #self.exportToDataBase()
  77. def exportAdditionalData(self):
  78. # Runs only, when KWARGS-Parameter is set.
  79. if self.kwargs.get(GC.EXPORT_ADDITIONAL_DATA):
  80. addExportData = self.kwargs[GC.EXPORT_ADDITIONAL_DATA]
  81. # Loop over the items. KEY = Tabname, Value = Data to be exported.
  82. # For data KEY = Fieldname, Value = Cell-Value
  83. for key, value in addExportData.items():
  84. lExport = ExportAdditionalDataIntoTab(tabname=key, valueDict=value, outputExcelSheet=self.workbook)
  85. lExport.export()
  86. # -- API support --
  87. def getSummary(self):
  88. #
  89. # returns records status as dict
  90. #
  91. summary = {'Testrecords': len(self.dataRecords)}
  92. summary['Successful'] = len([x for x in self.dataRecords.values()
  93. if x[GC.TESTCASESTATUS] == GC.TESTCASESTATUS_SUCCESS])
  94. summary['Paused'] = len([x for x in self.dataRecords.values()
  95. if x[GC.TESTCASESTATUS] == GC.TESTCASESTATUS_WAITING])
  96. summary['Error'] = len([x for x in self.dataRecords.values()
  97. if x[GC.TESTCASESTATUS] == GC.TESTCASESTATUS_ERROR])
  98. # logfile
  99. summary['Logfile'] = logger.handlers[1].baseFilename
  100. # timing
  101. timing: Timing = self.testRunInstance.timing
  102. summary['Starttime'], summary['Endtime'], summary['Duration'] = timing.returnTimeSegment(GC.TIMING_TESTRUN)
  103. summary['Globals'] = {key: value for key, value in self.testRunInstance.globalSettings.items()}
  104. return summary
  105. # -- END of API support --
  106. def export2CSV(self):
  107. """
  108. Writes CSV-File of datarecords
  109. """
  110. f = open(self.fileName, 'w')
  111. writer = csv.DictWriter(f, self.dataRecords[0].keys())
  112. writer.writeheader()
  113. for i in range(0, len(self.dataRecords) - 1):
  114. writer.writerow(self.dataRecords[i])
  115. f.close()
  116. def exportToDataBase(self):
  117. #
  118. # writes results to DB
  119. #
  120. logger.info(f'Export results to database at: {DATABASE_URL}')
  121. engine = create_engine(DATABASE_URL)
  122. # create a Session
  123. Session = sessionmaker(bind=engine)
  124. session = Session()
  125. # get timings
  126. timing: Timing = self.testRunInstance.timing
  127. start, end, duration = timing.returnTimeSegment(GC.TIMING_TESTRUN)
  128. # get status
  129. success = 0
  130. error = 0
  131. waiting = 0
  132. for value in self.dataRecords.values():
  133. if value[GC.TESTCASESTATUS] == GC.TESTCASESTATUS_SUCCESS:
  134. success += 1
  135. elif value[GC.TESTCASESTATUS] == GC.TESTCASESTATUS_ERROR:
  136. error += 1
  137. if value[GC.TESTCASESTATUS] == GC.TESTCASESTATUS_WAITING:
  138. waiting += 1
  139. # get documents
  140. datafiles = self.fileName
  141. # create testrun object
  142. tr_log = TestrunLog(
  143. testrunName=self.testRunName,
  144. logfileName=logger.handlers[1].baseFilename,
  145. startTime=datetime.strptime(start, "%H:%M:%S"),
  146. endTime=datetime.strptime(end, "%H:%M:%S"),
  147. statusOk=success,
  148. statusFailed=error,
  149. statusPaused=waiting,
  150. dataFile=datafiles,
  151. )
  152. # add to DataBase
  153. session.add(tr_log)
  154. # set globals
  155. for key, value in self.testRunInstance.globalSettings.items():
  156. globalVar = GlobalAttribute(
  157. name=key,
  158. value=str(value),
  159. testrun=tr_log,
  160. )
  161. session.add(globalVar)
  162. session.commit()
  163. # create testcase sequence instance
  164. tcs_log = TestCaseSequenceLog(testrun=tr_log)
  165. # create testcases
  166. for tc in self.dataRecords.values():
  167. # create TestCaseLog instances
  168. tc_log = TestCaseLog(testcase_sequence=tcs_log)
  169. session.add(tc_log)
  170. # add TestCase fields
  171. for key, value in tc.items():
  172. field = TestCaseField(name=key, value=str(value), testcase=tc_log)
  173. session.add(field)
  174. session.commit()
  175. # collect uuids
  176. self.uuids['testrun'] = tr_log.id
  177. self.uuids['testcases'] = []
  178. for tc in tcs_log.testcases:
  179. self.uuids['testcases'].append(tc.id)
  180. # network info
  181. if self.networkInfo:
  182. for entry in self.networkInfo:
  183. if type(entry.get('testcase')) == type(1):
  184. nw_info = TestCaseNetworkInfo(
  185. testcase=tcs_log.testcases[entry.get('testcase')-1],
  186. browserName=entry.get('browserName'),
  187. status=entry.get('status'),
  188. method=entry.get('method'),
  189. url=entry.get('url'),
  190. contentType=entry.get('contentType'),
  191. contentSize=entry.get('contentSize'),
  192. headers=str(entry.get('headers')),
  193. params=str(entry.get('params')),
  194. response=entry.get('response'),
  195. startDateTime=datetime.strptime(entry.get('startDateTime')[:19], '%Y-%m-%dT%H:%M:%S'),
  196. duration=entry.get('duration'),
  197. )
  198. session.add(nw_info)
  199. '''
  200. for info in self.networkInfo:
  201. for entry in info['log']['entries']:
  202. # get TestCase number
  203. test_case_num = self.exportNetWork._get_test_case_num(entry['startedDateTime'], entry['pageref'])
  204. # check if number is int
  205. if type(test_case_num) == type(1):
  206. test_case_num -= 1
  207. nw_info = TestCaseNetworkInfo(
  208. testcase=tcs_log.testcases[test_case_num],
  209. browserName=entry.get('pageref'),
  210. status=entry['response'].get('status'),
  211. method=entry['request'].get('method'),
  212. url=entry['request'].get('url'),
  213. contentType=entry['response']['content'].get('mimeType'),
  214. contentSize=entry['response']['content'].get('size'),
  215. headers=str(entry['response']['headers']),
  216. params=str(entry['request']['queryString']),
  217. response=entry['response']['content'].get('text'),
  218. startDateTime=datetime.strptime(entry['startedDateTime'][:19], '%Y-%m-%dT%H:%M:%S'),
  219. duration=entry.get('time'),
  220. )
  221. session.add(nw_info)
  222. '''
  223. session.commit()
  224. def _get_test_case_num(self, start_date_time, browser_name):
  225. d_t = parse(start_date_time)
  226. d_t = d_t.replace(tzinfo=None)
  227. if self.testCasesEndDateTimes_1D:
  228. for index, dt_end in enumerate(self.testCasesEndDateTimes_1D):
  229. if d_t < dt_end:
  230. return index + 1
  231. elif self.testCasesEndDateTimes_2D:
  232. browser_num = re.findall(r"\d+\.?\d*", str(browser_name))[-1] \
  233. if re.findall(r"\d+\.?\d*", str(browser_name)) else 0
  234. dt_list_index = int(browser_num) if int(browser_num) > 0 else 0
  235. for i, tcAndDtEnd in enumerate(self.testCasesEndDateTimes_2D[dt_list_index]):
  236. if d_t < tcAndDtEnd[1]:
  237. return tcAndDtEnd[0] + 1
  238. return 'unknown'
  239. def _get_network_info(self, networkInfoDict):
  240. #
  241. # extracts network info data from the given dict
  242. #
  243. if networkInfoDict:
  244. extractedNetworkInfo = []
  245. for info in networkInfoDict:
  246. #extractedEntry = {}
  247. for entry in info['log']['entries']:
  248. # extract the current entry
  249. extractedNetworkInfo.append({
  250. 'testcase': self._get_test_case_num(entry['startedDateTime'], entry['pageref']),
  251. 'browserName': entry.get('pageref'),
  252. 'status': entry['response'].get('status'),
  253. 'method': entry['request'].get('method'),
  254. 'url': entry['request'].get('url'),
  255. 'contentType': entry['response']['content'].get('mimeType'),
  256. 'contentSize': entry['response']['content'].get('size'),
  257. 'headers': entry['response']['headers'],
  258. 'params': entry['request']['queryString'],
  259. 'response': entry['response']['content'].get('text'),
  260. 'startDateTime': entry['startedDateTime'],
  261. 'duration': entry.get('time'),
  262. })
  263. return extractedNetworkInfo
  264. return None
  265. def exportResultExcel(self, **kwargs):
  266. self._exportData()
  267. def exportJsonExcel(self):
  268. # headers
  269. headers = [
  270. 'Stage',
  271. 'UUID',
  272. 'Attribute',
  273. 'Value',
  274. ]
  275. # header style
  276. header_style = self.workbook.add_format()
  277. header_style.set_bold()
  278. # write header
  279. for index in range(len(headers)):
  280. self.jsonSheet.write(0, index, headers[index], header_style)
  281. # write data
  282. row = 0
  283. for index, testcase in self.dataRecords.items():
  284. # add TestCase fields
  285. for key, value in testcase.items():
  286. row += 1
  287. self.jsonSheet.write(row, 0, 'stage')
  288. self.jsonSheet.write(row, 1, self.uuids['testcases'][index])
  289. self.jsonSheet.write(row, 2, key)
  290. self.jsonSheet.write(row, 3, str(value))
  291. # Autowidth
  292. for n in range(len(headers)):
  293. ExcelSheetHelperFunctions.set_column_autowidth(self.jsonSheet, n)
  294. def makeSummaryExcel(self):
  295. self.summarySheet.write(0, 0, f"Testreport for {self.testRunName}", self.cellFormatBold)
  296. self.summarySheet.set_column(0, last_col=0, width=15)
  297. # get testrunname my
  298. self.testList.append(self.testRunName)
  299. # Testrecords
  300. self.__writeSummaryCell("Testrecords", len(self.dataRecords), row=2, format=self.cellFormatBold)
  301. value = len([x for x in self.dataRecords.values()
  302. if x[GC.TESTCASESTATUS] == GC.TESTCASESTATUS_SUCCESS])
  303. self.testList.append(value) # Ok my
  304. if not value:
  305. value = ""
  306. self.__writeSummaryCell("Successful", value, format=self.cellFormatGreen)
  307. self.testList.append(value) # paused my
  308. self.__writeSummaryCell("Paused", len([x for x in self.dataRecords.values()
  309. if x[GC.TESTCASESTATUS] == GC.TESTCASESTATUS_WAITING]))
  310. value = len([x["Screenshots"] for x in self.dataRecords.values()
  311. if x[GC.TESTCASESTATUS] == GC.TESTCASESTATUS_ERROR])
  312. self.testList.append(value) # error my
  313. if not value:
  314. value = ""
  315. self.__writeSummaryCell("Error", value, format=self.cellFormatRed)
  316. # Logfile
  317. self.__writeSummaryCell("Logfile", logger.handlers[1].baseFilename, row=7)
  318. # get logfilename for database my
  319. self.testList.append(logger.handlers[1].baseFilename)
  320. # database id
  321. self.__writeSummaryCell("Testrun DB UUID", self.uuids.get('testrun'), row=8)
  322. # Timing
  323. timing: Timing = self.testRunInstance.timing
  324. start, end, duration = timing.returnTimeSegment(GC.TIMING_TESTRUN)
  325. self.__writeSummaryCell("Starttime", start, row=10)
  326. # get start end during time my
  327. self.testList.append(start)
  328. self.testList.append(end)
  329. self.__writeSummaryCell("Endtime", end)
  330. self.__writeSummaryCell("Duration", duration, format=self.cellFormatBold)
  331. self.__writeSummaryCell("Avg. Dur", "")
  332. # Globals:
  333. self.__writeSummaryCell("Global settings for this testrun", "", format=self.cellFormatBold, row=15)
  334. for key, value in self.testRunInstance.globalSettings.items():
  335. self.__writeSummaryCell(key, str(value))
  336. # get global data my
  337. self.testList.append(str(value))
  338. # Testcase and Testsequence setting
  339. self.summaryRow += 1
  340. self.__writeSummaryCell("TestSequence settings follow:", "", format=self.cellFormatBold)
  341. lSequence = self.testRunInstance.testRunUtils.getSequenceByNumber(testRunName=self.testRunName, sequence="1")
  342. if lSequence:
  343. for key, value in lSequence[1].items():
  344. if isinstance(value, list) or isinstance(value, dict):
  345. continue
  346. self.__writeSummaryCell(key, str(value))
  347. def __writeSummaryCell(self, lineHeader, lineText, row=None, format=None, image=False):
  348. if not row:
  349. self.summaryRow += 1
  350. else:
  351. self.summaryRow = row
  352. if not lineText:
  353. # If we have no lineText we want to apply format to the Header
  354. self.summarySheet.write(self.summaryRow, 0, lineHeader, format)
  355. else:
  356. self.summarySheet.write(self.summaryRow, 0, lineHeader)
  357. self.summarySheet.write(self.summaryRow, 1, lineText, format)
  358. def __getOutputFileName(self):
  359. if self.testRunInstance.globalSettings[GC.PATH_ROOT]:
  360. basePath = Path(self.testRunInstance.globalSettings[GC.PATH_ROOT])
  361. elif "/" not in self.testRunInstance.globalSettings[GC.DATABASE_EXPORTFILENAMEANDPATH][0:1]:
  362. basePath = Path(sys.modules['__main__'].__file__).parent
  363. else:
  364. basePath = ""
  365. l_file: Path = Path(basePath).joinpath(self.testRunInstance.globalSettings[GC.DATABASE_EXPORTFILENAMEANDPATH])
  366. if "~" in str(l_file.absolute()):
  367. l_file = l_file.expanduser()
  368. if not Path(l_file).is_dir():
  369. logger.info(f"Create directory {l_file}")
  370. Path(l_file).mkdir(parents=True, exist_ok=True)
  371. if self.exportFormat == GC.EXP_XLSX:
  372. lExtension = '.xlsx'
  373. elif self.exportFormat == GC.EXP_CSV:
  374. lExtension = '.csv'
  375. else:
  376. logger.critical(f"wrong export file format: {self.exportFormat}, using 'xlsx' instead")
  377. lExtension = '.xlsx'
  378. l_file = l_file.joinpath("baangt_" + self.testRunName + "_" + utils.datetime_return() + lExtension)
  379. logger.debug(f"Filename for export: {str(l_file)}")
  380. return str(l_file)
  381. def __setHeaderDetailSheetExcel(self):
  382. # the 1st column is DB UUID
  383. self.worksheet.write(0, 0, 'DB UUID')
  384. # Add fields with name "RESULT_*" to output fields.
  385. i = 1
  386. self.__extendFieldList()
  387. for column in self.fieldListExport:
  388. self.worksheet.write(0, i, column)
  389. i += 1
  390. # add JSON field
  391. self.worksheet.write(0, len(self.fieldListExport)+1, "JSON")
  392. def __extendFieldList(self):
  393. """
  394. Fields, that start with "RESULT_" shall always be exported.
  395. Other fields, that shall always be exported are also added (Testcaseerrorlog, etc.)
  396. If global Parameter "TC.ExportAllFields" is set to True ALL fields will be exported
  397. @return:
  398. """
  399. if self.testRunInstance.globalSettings.get("TC.ExportAllFields", False):
  400. self.fieldListExport = [] # Make an empty list, so that we don't have duplicates
  401. for key in self.dataRecords[0].keys():
  402. self.fieldListExport.append(key)
  403. return
  404. try:
  405. for key in self.dataRecords[0].keys():
  406. if "RESULT_" in key.upper():
  407. if not key in self.fieldListExport:
  408. self.fieldListExport.append(key)
  409. except Exception as e:
  410. logger.critical(
  411. f'looks like we have no data in records: {self.dataRecords}, len of dataRecords: {len(self.dataRecords)}')
  412. # They are added here, because they'll not necessarily appear in the first record of the export data:
  413. self.fieldListExport.append(GC.TESTCASEERRORLOG)
  414. self.fieldListExport.append(GC.SCREENSHOTS)
  415. def _exportData(self):
  416. for key, value in self.dataRecords.items():
  417. # write DB UUID
  418. self.worksheet.write(key + 1, 0, self.uuids['testcases'][key])
  419. # write RESULT fields
  420. for (n, column) in enumerate(self.fieldListExport):
  421. self.__writeCell(key + 1, n + 1, value, column)
  422. # Also write everything as JSON-String into the last column
  423. self.worksheet.write(key + 1, len(self.fieldListExport) + 1, json.dumps(value))
  424. # Create autofilter
  425. self.worksheet.autofilter(0, 0, len(self.dataRecords.items()), len(self.fieldListExport))
  426. # Make cells wide enough
  427. for n in range(0, len(self.fieldListExport)):
  428. ExcelSheetHelperFunctions.set_column_autowidth(self.worksheet, n)
  429. def __writeCell(self, line, cellNumber, testRecordDict, fieldName, strip=False):
  430. if fieldName in testRecordDict.keys() and testRecordDict[fieldName]:
  431. # Remove leading New-Line:
  432. if '\n' in testRecordDict[fieldName][0:5] or strip:
  433. testRecordDict[fieldName] = testRecordDict[fieldName].strip()
  434. # Do different stuff for Dicts and Lists:
  435. if isinstance(testRecordDict[fieldName], dict):
  436. self.worksheet.write(line, cellNumber, testRecordDict[fieldName])
  437. elif isinstance(testRecordDict[fieldName], list):
  438. self.worksheet.write(line, cellNumber,
  439. utils.listToString(testRecordDict[fieldName]))
  440. else:
  441. if fieldName == GC.TESTCASESTATUS:
  442. if testRecordDict[GC.TESTCASESTATUS] == GC.TESTCASESTATUS_SUCCESS:
  443. self.worksheet.write(line, cellNumber, testRecordDict[fieldName], self.cellFormatGreen)
  444. elif testRecordDict[GC.TESTCASESTATUS] == GC.TESTCASESTATUS_ERROR:
  445. self.worksheet.write(line, cellNumber, testRecordDict[fieldName], self.cellFormatRed)
  446. elif fieldName == GC.SCREENSHOTS:
  447. if type(testRecordDict[fieldName]) == list:
  448. self.worksheet.insert_image(line, cellNumber, testRecordDict[fieldName][-1], {'x_scale': 0.05,
  449. 'y_scale': 0.05})
  450. for nm in range(len(testRecordDict[fieldName]) - 1):
  451. self.worksheet.insert_image(line, len(self.fieldListExport) + nm + 1,
  452. testRecordDict[fieldName][nm],
  453. {'x_scale': 0.05, 'y_scale': 0.05})
  454. else:
  455. self.worksheet.insert_image(line, cellNumber, testRecordDict[fieldName], {'x_scale': 0.05,
  456. 'y_scale': 0.05})
  457. self.worksheet.set_row(line, 35)
  458. else:
  459. self.worksheet.write(line, cellNumber, testRecordDict[fieldName])
  460. def closeExcel(self):
  461. self.workbook.close()
  462. # Next line doesn't work on MAC. Returns "not authorized"
  463. # subprocess.Popen([self.filename], shell=True)
  464. class ExportAdditionalDataIntoTab:
  465. def __init__(self, tabname, valueDict, outputExcelSheet: xlsxwriter.Workbook):
  466. self.tab = outputExcelSheet.add_worksheet(tabname)
  467. self.values = valueDict
  468. def export(self):
  469. self.makeHeader()
  470. self.writeLines()
  471. def makeHeader(self):
  472. for cellNumber, entries in self.values.items():
  473. for column, (key, value) in enumerate(entries.items()):
  474. self.tab.write(0, column, key)
  475. break # Write header only for first line.
  476. def writeLines(self):
  477. currentLine = 1
  478. for line, values in self.values.items():
  479. for column, (key, value) in enumerate(values.items()):
  480. self.tab.write(currentLine, column, value)
  481. currentLine += 1
  482. class ExcelSheetHelperFunctions:
  483. def __init__(self):
  484. pass
  485. @staticmethod
  486. def set_column_autowidth(worksheet: Worksheet, column: int):
  487. """
  488. Set the width automatically on a column in the `Worksheet`.
  489. !!! Make sure you run this function AFTER having all cells filled in
  490. the worksheet!
  491. """
  492. maxwidth = ExcelSheetHelperFunctions.get_column_width(worksheet=worksheet, column=column)
  493. if maxwidth is None:
  494. return
  495. elif maxwidth > 45:
  496. maxwidth = 45
  497. worksheet.set_column(first_col=column, last_col=column, width=maxwidth)
  498. @staticmethod
  499. def get_column_width(worksheet: Worksheet, column: int) -> Optional[int]:
  500. """Get the max column width in a `Worksheet` column."""
  501. strings = getattr(worksheet, '_ts_all_strings', None)
  502. if strings is None:
  503. strings = worksheet._ts_all_strings = sorted(
  504. worksheet.str_table.string_table,
  505. key=worksheet.str_table.string_table.__getitem__)
  506. lengths = set()
  507. for row_id, colums_dict in worksheet.table.items(): # type: int, dict
  508. data = colums_dict.get(column)
  509. if not data:
  510. continue
  511. if type(data) is cell_string_tuple:
  512. iter_length = len(strings[data.string])
  513. if not iter_length:
  514. continue
  515. lengths.add(iter_length)
  516. continue
  517. if type(data) is cell_number_tuple:
  518. iter_length = len(str(data.number))
  519. if not iter_length:
  520. continue
  521. lengths.add(iter_length)
  522. if not lengths:
  523. return None
  524. return max(lengths)
  525. class ExportNetWork:
  526. headers = ['BrowserName', 'TestCaseNum', 'Status', 'Method', 'URL', 'ContentType', 'ContentSize', 'Headers',
  527. 'Params', 'Response', 'startDateTime', 'Duration/ms']
  528. def __init__(self, networkInfo: dict, testCasesEndDateTimes_1D: list,
  529. testCasesEndDateTimes_2D: list, workbook: xlsxwriter.Workbook, sheet: xlsxwriter.worksheet):
  530. self.networkInfo = networkInfo
  531. #self.testCasesEndDateTimes_1D = testCasesEndDateTimes_1D
  532. #self.testCasesEndDateTimes_2D = testCasesEndDateTimes_2D
  533. self.workbook = workbook
  534. self.sheet = sheet
  535. header_style = self.get_header_style()
  536. self.write_header(style=header_style)
  537. self.set_column_align()
  538. self.write_content()
  539. self.set_column_width()
  540. def set_column_align(self):
  541. right_align_indexes = list()
  542. right_align_indexes.append(ExportNetWork.headers.index('ContentSize'))
  543. right_align_indexes.append(ExportNetWork.headers.index('Duration/ms'))
  544. right_align_style = self.get_column_style(alignment='right')
  545. left_align_style = self.get_column_style(alignment='left')
  546. [self.sheet.set_column(i, i, cell_format=right_align_style) if i in right_align_indexes else
  547. self.sheet.set_column(i, i, cell_format=left_align_style) for i in range(len(ExportNetWork.headers))]
  548. def set_column_width(self):
  549. [ExcelSheetHelperFunctions.set_column_autowidth(self.sheet, i) for i in range(len(ExportNetWork.headers))]
  550. def get_header_style(self):
  551. header_style = self.workbook.add_format()
  552. header_style.set_bg_color("#00CCFF")
  553. header_style.set_color("#FFFFFF")
  554. header_style.set_bold()
  555. header_style.set_border()
  556. return header_style
  557. def get_column_style(self, alignment=None):
  558. column_style = self.workbook.add_format()
  559. column_style.set_color("black")
  560. column_style.set_align('right') if alignment == 'right' \
  561. else column_style.set_align('left') if alignment == 'left' else None
  562. column_style.set_border()
  563. return column_style
  564. def write_header(self, style=None):
  565. for index, value in enumerate(ExportNetWork.headers):
  566. self.sheet.write(0, index, value, style)
  567. def _get_test_case_num(self, start_date_time, browser_name):
  568. d_t = parse(start_date_time)
  569. d_t = d_t.replace(tzinfo=None)
  570. if self.testCasesEndDateTimes_1D:
  571. for index, dt_end in enumerate(self.testCasesEndDateTimes_1D):
  572. if d_t < dt_end:
  573. return index + 1
  574. elif self.testCasesEndDateTimes_2D:
  575. browser_num = re.findall(r"\d+\.?\d*", str(browser_name))[-1] \
  576. if re.findall(r"\d+\.?\d*", str(browser_name)) else 0
  577. dt_list_index = int(browser_num) if int(browser_num) > 0 else 0
  578. for i, tcAndDtEnd in enumerate(self.testCasesEndDateTimes_2D[dt_list_index]):
  579. if d_t < tcAndDtEnd[1]:
  580. return tcAndDtEnd[0] + 1
  581. return 'unknown'
  582. def write_content(self):
  583. if not self.networkInfo:
  584. return
  585. #partition_index = 0
  586. for index in range(len(self.networkInfo)):
  587. data_list = [
  588. self.networkInfo[index]['browserName'],
  589. self.networkInfo[index]['testcase'],
  590. self.networkInfo[index]['status'],
  591. self.networkInfo[index]['method'],
  592. self.networkInfo[index]['url'],
  593. self.networkInfo[index]['contentType'],
  594. self.networkInfo[index]['contentSize'],
  595. self.networkInfo[index]['headers'],
  596. self.networkInfo[index]['params'],
  597. self.networkInfo[index]['response'],
  598. self.networkInfo[index]['startDateTime'],
  599. self.networkInfo[index]['duration'],
  600. ]
  601. for i in range(len(data_list)):
  602. self.sheet.write(index + 1, i, str(data_list[i]) or 'null')
  603. '''
  604. for info in self.networkInfo:
  605. for index, entry in enumerate(info['log']['entries']):
  606. browser_name = entry['pageref']
  607. status = entry['response']['status']
  608. method = entry['request']['method']
  609. url = entry['request']['url']
  610. content_type = entry['response']['content']['mimeType']
  611. content_size = entry['response']['content']['size']
  612. headers = entry['response']['headers']
  613. params = entry['request']['queryString']
  614. response = entry['response']['content']['text'] if 'text' in entry['response']['content'] else ''
  615. start_date_time = entry['startedDateTime']
  616. duration = entry['time']
  617. test_case_num = self._get_test_case_num(start_date_time, browser_name)
  618. data_list = [browser_name, test_case_num, status, method, url, content_type, content_size,
  619. headers, params, response, start_date_time, duration]
  620. [self.sheet.write(index + partition_index + 1, i, str(data_list[i]) or 'null')
  621. for i in range(len(data_list))]
  622. partition_index += len(info['log']['entries'])
  623. '''
  624. class ExportTiming:
  625. def __init__(self, testdataRecords: dict, sheet: xlsxwriter.worksheet):
  626. self.testdataRecords = testdataRecords
  627. self.sheet: xlsxwriter.worksheet = sheet
  628. self.sections = {}
  629. self.findAllTimingSections()
  630. self.writeHeader()
  631. self.writeLines()
  632. # Autowidth
  633. for n in range(0, len(self.sections) + 1):
  634. ExcelSheetHelperFunctions.set_column_autowidth(self.sheet, n)
  635. def writeHeader(self):
  636. self.wc(0, 0, "Testcase#")
  637. for index, key in enumerate(self.sections.keys(), start=1):
  638. self.wc(0, index, key)
  639. def writeLines(self):
  640. for tcNumber, (key, line) in enumerate(self.testdataRecords.items(), start=1):
  641. self.wc(tcNumber, 0, tcNumber)
  642. lSections = self.interpretTimeLog(line[GC.TIMELOG])
  643. for section, timingValue in lSections.items():
  644. # find, in which column this section should be written:
  645. for column, key in enumerate(self.sections.keys(), 1):
  646. if key == section:
  647. self.wc(tcNumber, column,
  648. timingValue[GC.TIMING_DURATION])
  649. continue
  650. @staticmethod
  651. def shortenTimingValue(timingValue):
  652. # TimingValue is seconds in Float. 2 decimals is enough:
  653. timingValue = int(float(timingValue) * 100)
  654. return timingValue / 100
  655. def writeCell(self, row, col, content, format=None):
  656. self.sheet.write(row, col, content, format)
  657. wc = writeCell
  658. def findAllTimingSections(self):
  659. """
  660. We try to have an ordered list of Timing Sequences. As each Testcase might have different sections we'll have
  661. to make guesses
  662. @return:
  663. """
  664. lSections = {}
  665. for key, line in self.testdataRecords.items():
  666. lTiming: dict = ExportTiming.interpretTimeLog(line[GC.TIMELOG])
  667. for key in lTiming.keys():
  668. if lSections.get(key):
  669. continue
  670. else:
  671. lSections[key] = None
  672. self.sections = lSections
  673. @staticmethod
  674. def interpretTimeLog(lTimeLog):
  675. """Example Time Log:
  676. Complete Testrun: Start: 1579553837.241974 - no end recorded
  677. TestCaseSequenceMaster: Start: 1579553837.243414 - no end recorded
  678. CustTestCaseMaster: Start: 1579553838.97329 - no end recorded
  679. Browser Start: , since last call: 2.3161418437957764
  680. Empfehlungen: , since last call: 6.440968036651611, ZIDs:[175aeac023237a73], TS:2020-01-20 21:57:46.525577
  681. Annahme_RABAZ: , since last call: 2.002716064453125e-05, ZIDs:[6be7d0a44e59acf6], TS:2020-01-20 21:58:37.203583
  682. Antrag drucken: , since last call: 9.075241088867188, ZIDs:[6be7d0a44e59acf6, b27c3875ddcbb4fa], TS:2020-01-20 21:58:38.040137
  683. Warten auf Senden an Bestand Button: , since last call: 1.3927149772644043
  684. Senden an Bestand: , since last call: 9.60469913482666, ZIDs:[66b12fa4869cf8a0, ad1f3d47c4694e26], TS:2020-01-20 21:58:49.472288
  685. where the first part before ":" is the section, "since last call:" is the duration, TS: is the timestamp
  686. Update 29.3.2020: Format changed to "since last call: 00:xx:xx,", rest looks identical.
  687. """
  688. lExport = {}
  689. lLines = lTimeLog.split("\n")
  690. for line in lLines:
  691. parts = line.split(",")
  692. if len(parts) < 2:
  693. continue
  694. if "Start:" in line:
  695. # Format <sequence>: <Start>: <time.loctime>
  696. continue
  697. else:
  698. lSection = parts[0].replace(":", "").strip()
  699. lDuration = parts[1].split("since last call: ")[1]
  700. lExport[lSection] = {GC.TIMING_DURATION: lDuration}
  701. return lExport