TestCaseSequenceMaster.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. from baangt.base.HandleDatabase import HandleDatabase
  2. from baangt.TestCaseSequence.TestCaseSequenceParallel import TestCaseSequenceParallel
  3. from baangt.base.Timing.Timing import Timing
  4. from baangt.base.Utils import utils
  5. import baangt.base.GlobalConstants as GC
  6. import multiprocessing
  7. from pathlib import Path
  8. import time
  9. from datetime import datetime
  10. import sys
  11. import logging
  12. import gevent
  13. import gevent.queue
  14. import gevent.pool
  15. from baangt.base.RuntimeStatistics import Statistic
  16. logger = logging.getLogger("pyC")
  17. class TestCaseSequenceMaster:
  18. def __init__(self, **kwargs):
  19. self.name = None
  20. self.description = None
  21. self.timing: Timing = kwargs.get(GC.KWARGS_TIMING)
  22. self.testdataDataBase = None
  23. self.testrunAttributes = kwargs.get(GC.KWARGS_TESTRUNATTRIBUTES)
  24. self.testRunInstance = kwargs.get(GC.KWARGS_TESTRUNINSTANCE)
  25. self.testRunName = self.testRunInstance.testRunName
  26. self.dataRecords = {}
  27. # Extract relevant data for this TestSequence:
  28. self.testSequenceData = self.testrunAttributes[GC.STRUCTURE_TESTCASESEQUENCE].get(
  29. kwargs.get(GC.STRUCTURE_TESTCASESEQUENCE))[1]
  30. self.testCases = self.testSequenceData[GC.STRUCTURE_TESTCASE]
  31. self.kwargs = kwargs
  32. self.timingName = self.timing.takeTime(self.__class__.__name__, forceNew=True)
  33. self.statistics = Statistic()
  34. self.prepareExecution()
  35. if int(self.testSequenceData.get(GC.EXECUTION_PARALLEL, 0)) > 1:
  36. self.execute_parallel(self.testSequenceData.get(GC.EXECUTION_PARALLEL, 0))
  37. else:
  38. self.execute()
  39. def prepareExecution(self):
  40. if self.testSequenceData.get(GC.DATABASE_FROM_LINE) and not self.testSequenceData.get(GC.DATABASE_LINES, None):
  41. # Change old line selection format into new format:
  42. self.testSequenceData[
  43. GC.DATABASE_LINES] = f"{self.testSequenceData.get(GC.DATABASE_FROM_LINE)}-{self.testSequenceData.get(GC.DATABASE_TO_LINE)}"
  44. self.testSequenceData.pop(GC.DATABASE_FROM_LINE)
  45. self.testSequenceData.pop(GC.DATABASE_TO_LINE)
  46. self.__getDatabase()
  47. recordPointer = 0
  48. # Read all Testrecords into self.dataRecords:
  49. while True:
  50. self.dataRecords[recordPointer] = self.getNextRecord()
  51. if not self.dataRecords[recordPointer]:
  52. self.dataRecords.pop(recordPointer)
  53. recordPointer -= 1
  54. break
  55. recordPointer += 1
  56. logger.info(f"{recordPointer + 1} test records read for processing")
  57. self.statistics.total_testcases(recordPointer + 1)
  58. def execute_parallel(self, parallelInstances):
  59. parallelInstances = int(parallelInstances)
  60. results = gevent.queue.Queue()
  61. records = gevent.queue.Queue()
  62. for n, record in self.dataRecords.items():
  63. records.put((n, record))
  64. def single_thread(sequenceNumber):
  65. # Consume records
  66. while not records.empty():
  67. n, record = records.get()
  68. kwargs = {
  69. GC.KWARGS_DATA: record,
  70. }
  71. kwargs.update(self.kwargs)
  72. logger.info(f"Starting parallel execution with TestRecord {n}, Details: " +
  73. str({k: kwargs[GC.KWARGS_DATA][k] for k in list(kwargs[GC.KWARGS_DATA])[0:5]}))
  74. process = TestCaseSequenceParallel(tcNumber=n,
  75. sequenceNumber=sequenceNumber,
  76. testcaseSequence=self.testCases,
  77. **kwargs)
  78. process.one_sequence(results)
  79. # Create and runconcurrent threads
  80. # 28.5.2020: We temporarily had a timeout here in Joinall as a few weeks back it looked like sometimes
  81. # the complete job hangs indefinitely. The timeout worked for the whole testun, not for a single
  82. # test case (spawn doesn't accept a timeout). Longer running test runs died - the timeout worked :)
  83. threads = gevent.joinall([gevent.spawn(single_thread, num)
  84. for num in range(parallelInstances)])
  85. # after joining all threads
  86. while not results.empty():
  87. result = results.get()
  88. for recordNumber, dataRecordAfterExecution in result[0].items():
  89. self.testRunInstance.setResult(recordNumber, dataRecordAfterExecution)
  90. for sequenceNumber, tcNumberAndTestEnd in result[1].items():
  91. self.testRunInstance.append2DTestCaseEndDateTimes(sequenceNumber, tcNumberAndTestEnd)
  92. def execute(self):
  93. # Execute all Testcases:
  94. for key, value in self.dataRecords.items():
  95. self.kwargs[GC.KWARGS_DATA] = value
  96. self.kwargs[GC.KWARGS_SEQUENCENUMBER] = 0 # There are no more sequences, so is always 0.
  97. logger.info(f"Starting execution with TestRecord {key}, Details: " +
  98. str({k: self.kwargs[GC.KWARGS_DATA][k] for k in list(self.kwargs[GC.KWARGS_DATA])[0:5]}))
  99. # Here the testcase is started.
  100. self.testRunInstance.executeDictSequenceOfClasses(self.testCases, GC.STRUCTURE_TESTCASE,
  101. **self.kwargs)
  102. d_t = datetime.fromtimestamp(time.time())
  103. self.testRunInstance.append1DTestCaseEndDateTimes(d_t)
  104. logger.info("execute append1DTestCaseEndDateTimes, param is {}".format(d_t))
  105. # Write Result back to TestRun for later saving in export format
  106. self.testRunInstance.setResult(key, value)
  107. self.statistics.update_testcase_sequence()
  108. def getNextRecord(self):
  109. if not self.testdataDataBase:
  110. self.__getDatabase()
  111. lDataRecord = self.testdataDataBase.readNextRecord()
  112. return lDataRecord
  113. def __getDatabase(self):
  114. if not self.testdataDataBase:
  115. self.testdataDataBase = HandleDatabase(globalSettings=self.testRunInstance.globalSettings,
  116. linesToRead=self.testSequenceData.get(GC.DATABASE_LINES))
  117. self.testdataDataBase.read_excel(
  118. fileName=utils.findFileAndPathFromPath(
  119. self.testSequenceData[GC.DATABASE_FILENAME],
  120. basePath=str(Path(self.testRunInstance.globalSettingsFileNameAndPath).parent)),
  121. sheetName=self.testSequenceData[GC.DATABASE_SHEETNAME])
  122. return self.testdataDataBase
  123. def tearDown(self):
  124. self.timing.takeTime(self.timingName)