TestCaseSequenceMaster.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. from baangt.base.HandleDatabase import HandleDatabase
  2. from baangt.TestCaseSequence.TestCaseSequenceParallel import TestCaseSequenceParallel
  3. from baangt.base.Timing.Timing import Timing
  4. from baangt.base.Utils import utils
  5. import baangt.base.GlobalConstants as GC
  6. import multiprocessing
  7. from pathlib import Path
  8. import time
  9. from datetime import datetime
  10. import sys
  11. import logging
  12. import gevent
  13. import gevent.queue
  14. import gevent.pool
  15. from baangt.base.RuntimeStatistics import Statistic
  16. from CloneXls import CloneXls
  17. logger = logging.getLogger("pyC")
  18. class TestCaseSequenceMaster:
  19. def __init__(self, **kwargs):
  20. self.name = None
  21. self.description = None
  22. self.timing: Timing = kwargs.get(GC.KWARGS_TIMING)
  23. self.testdataDataBase = None
  24. self.testrunAttributes = kwargs.get(GC.KWARGS_TESTRUNATTRIBUTES)
  25. self.testRunInstance = kwargs.get(GC.KWARGS_TESTRUNINSTANCE)
  26. self.testRunName = self.testRunInstance.testRunName
  27. self.dataRecords = {}
  28. # Extract relevant data for this TestSequence:
  29. self.testSequenceData = self.testrunAttributes[GC.STRUCTURE_TESTCASESEQUENCE].get(
  30. kwargs.get(GC.STRUCTURE_TESTCASESEQUENCE))[1]
  31. self.testCases = self.testSequenceData[GC.STRUCTURE_TESTCASE]
  32. self.kwargs = kwargs
  33. self.timingName = self.timing.takeTime(self.__class__.__name__, forceNew=True)
  34. try:
  35. self.statistics = Statistic()
  36. self.prepareExecution()
  37. if int(self.testSequenceData.get(GC.EXECUTION_PARALLEL, 0)) > 1:
  38. self.execute_parallel(self.testSequenceData.get(GC.EXECUTION_PARALLEL, 0))
  39. else:
  40. self.execute()
  41. except Exception as e:
  42. logger.warning(f"Uncought exception {e}")
  43. utils.traceback(exception_in=e)
  44. def prepareExecution(self):
  45. logger.info("Preparing Test Records...")
  46. if self.testSequenceData.get(GC.DATABASE_FROM_LINE) and not self.testSequenceData.get(GC.DATABASE_LINES, None):
  47. # Change old line selection format into new format:
  48. self.testSequenceData[
  49. GC.DATABASE_LINES] = f"{self.testSequenceData.get(GC.DATABASE_FROM_LINE)}-{self.testSequenceData.get(GC.DATABASE_TO_LINE)}"
  50. self.testSequenceData.pop(GC.DATABASE_FROM_LINE)
  51. self.testSequenceData.pop(GC.DATABASE_TO_LINE)
  52. self.__getDatabase()
  53. recordPointer = 0
  54. # Read all Testrecords into self.dataRecords:
  55. while True:
  56. self.dataRecords[recordPointer] = self.getNextRecord()
  57. if not self.dataRecords[recordPointer]:
  58. self.dataRecords.pop(recordPointer)
  59. recordPointer -= 1
  60. break
  61. recordPointer += 1
  62. self.testdataDataBase.update_datarecords(self.dataRecords, fileName=utils.findFileAndPathFromPath(
  63. self.testSequenceData[GC.DATABASE_FILENAME],
  64. basePath=str(Path(self.testRunInstance.globalSettingsFileNameAndPath).parent)),
  65. sheetName=self.testSequenceData[GC.DATABASE_SHEETNAME], noCloneXls=self.testRunInstance.noCloneXls)
  66. logger.info(f"{recordPointer + 1} test records read for processing")
  67. self.statistics.total_testcases(recordPointer + 1)
  68. def execute_parallel(self, parallelInstances):
  69. parallelInstances = int(parallelInstances)
  70. results = gevent.queue.Queue()
  71. records = gevent.queue.Queue()
  72. for n, record in self.dataRecords.items():
  73. records.put((n, record))
  74. def single_thread(sequenceNumber):
  75. # Consume records
  76. while not records.empty():
  77. n, record = records.get()
  78. kwargs = {
  79. GC.KWARGS_DATA: record,
  80. }
  81. kwargs.update(self.kwargs)
  82. logger.info(f"Starting parallel execution with TestRecord {n}, Details: " +
  83. str({k: kwargs[GC.KWARGS_DATA][k] for k in list(kwargs[GC.KWARGS_DATA])[0:5]}))
  84. process = TestCaseSequenceParallel(tcNumber=n,
  85. sequenceNumber=sequenceNumber,
  86. testcaseSequence=self.testCases,
  87. **kwargs)
  88. process.one_sequence(results)
  89. # Create and runconcurrent threads
  90. # 28.5.2020: We temporarily had a timeout here in Joinall as a few weeks back it looked like sometimes
  91. # the complete job hangs indefinitely. The timeout worked for the whole testun, not for a single
  92. # test case (spawn doesn't accept a timeout). Longer running test runs died - the timeout worked :)
  93. threads = gevent.joinall([gevent.spawn(single_thread, num)
  94. for num in range(parallelInstances)])
  95. # after joining all threads
  96. while not results.empty():
  97. result = results.get()
  98. for recordNumber, dataRecordAfterExecution in result[0].items():
  99. self.testRunInstance.setResult(recordNumber, dataRecordAfterExecution)
  100. for sequenceNumber, tcNumberAndTestEnd in result[1].items():
  101. self.testRunInstance.append2DTestCaseEndDateTimes(sequenceNumber, tcNumberAndTestEnd)
  102. def execute(self):
  103. # Execute all Testcases:
  104. for key, value in self.dataRecords.items():
  105. self.kwargs[GC.KWARGS_DATA] = value
  106. self.kwargs[GC.KWARGS_SEQUENCENUMBER] = 0 # There are no more sequences, so is always 0.
  107. logger.info(f"Starting execution with TestRecord {key}, Details: " +
  108. str({k: self.kwargs[GC.KWARGS_DATA][k] for k in list(self.kwargs[GC.KWARGS_DATA])[0:5]}))
  109. # Here the testcase is started.
  110. self.testRunInstance.executeDictSequenceOfClasses(self.testCases, GC.STRUCTURE_TESTCASE,
  111. **self.kwargs)
  112. d_t = datetime.fromtimestamp(time.time())
  113. self.testRunInstance.append1DTestCaseEndDateTimes(d_t)
  114. logger.info("execute append1DTestCaseEndDateTimes, param is {}".format(d_t))
  115. # Write Result back to TestRun for later saving in export format
  116. self.testRunInstance.setResult(key, value)
  117. self.statistics.update_testcase_sequence()
  118. def getNextRecord(self):
  119. if not self.testdataDataBase:
  120. self.__getDatabase()
  121. lDataRecord = self.testdataDataBase.readNextRecord()
  122. return lDataRecord
  123. def __getDatabase(self):
  124. if not self.testdataDataBase:
  125. self.testdataDataBase = HandleDatabase(globalSettings=self.testRunInstance.globalSettings,
  126. linesToRead=self.testSequenceData.get(GC.DATABASE_LINES))
  127. testDataFile = utils.findFileAndPathFromPath(
  128. self.testSequenceData[GC.DATABASE_FILENAME],
  129. basePath=str(Path(self.testRunInstance.globalSettingsFileNameAndPath).parent))
  130. if not self.testRunInstance.noCloneXls:
  131. cloneXls = CloneXls(testDataFile) # , logger=logger)
  132. testDataFile = cloneXls.update_or_make_clone(
  133. ignore_headers=["TestResult", "UseCount"])
  134. self.testSequenceData[GC.DATABASE_FILENAME] = testDataFile
  135. self.testdataDataBase.read_excel(
  136. fileName=testDataFile,
  137. sheetName=self.testSequenceData[GC.DATABASE_SHEETNAME])
  138. return self.testdataDataBase
  139. def tearDown(self):
  140. self.timing.takeTime(self.timingName)