Browse Source

Use greenlet for parallel tasks

Ken Mercado 4 years ago
parent
commit
0823ab4f2d

+ 50 - 51
baangt/TestCaseSequence/TestCaseSequenceMaster.py

@@ -9,6 +9,9 @@ import time
 from datetime import datetime
 import sys
 import logging
+import gevent
+import gevent.queue
+import gevent.pool
 
 logger = logging.getLogger("pyC")
 
@@ -58,62 +61,58 @@ class TestCaseSequenceMaster:
         # In this case we need to request them, because the Testcases will run in their own
         # Processes
         parallelInstances = int(parallelInstances)
-        browserInstances = {}
-        for n in range(0, int(parallelInstances)):
+
+        # Browser instances
+        browsers = []
+        results = gevent.queue.Queue()
+        records = gevent.queue.Queue()
+
+        for n, record in self.dataRecords.items():
+            records.put((n, record))
+
+        def single_thread(sequenceNumber):
             # fixme: Browser should come from Testcase definition - not hardcoded. It's not that easy, as we might have many
             # fixme: Testcases, some Browser, some API and we might even have different browsers. For now we'll only
             # fixme: take Browser from globals-file
+            # create browser
+
             lBrowserName = self.testRunInstance.globalSettings.get("TC.Browser", GC.BROWSER_FIREFOX)
             lBrowserAttributes = self.testRunInstance.globalSettings.get("TC." + GC.BROWSER_ATTRIBUTES, None)
-            browserInstances[n] = self.testRunInstance.getBrowser(browserInstance=n,
-                                                                  browserName=lBrowserName,
-                                                                  browserAttributes=lBrowserAttributes)
-
-        processes = {}
-        processExecutions = {}
-        resultQueue = multiprocessing.Queue()
-
-        numberOfRecords = len(self.dataRecords)
-        for n in range(0, numberOfRecords, parallelInstances):
-            for x in range(0, parallelInstances):
-                if self.dataRecords.get(n + x):
-                    logger.debug(f"starting Process and Executions {x}. Value of n+x is {n + x}, "
-                                 f"Record = {str(self.dataRecords[n + x])[0:50]}")
-                    self.kwargs[GC.KWARGS_DATA] = self.dataRecords[n+x]
-                    # Prints the first 5 fields of the data record into the log:
-                    logger.info(f"Starting parallel execution with TestRecord {n+x}, Details: " +
-                        str({k: self.kwargs[GC.KWARGS_DATA][k] for k in list(self.kwargs[GC.KWARGS_DATA])[0:5]}))
-                    self.kwargs[GC.KWARGS_BROWSER] = browserInstances[x]
-                    processes[x] = TestCaseSequenceParallel(sequenceNumber=x,
-                                                            tcNumber=n + x,
-                                                            testcaseSequence=self.testCases,
-                                                            **self.kwargs)
-                    processExecutions[x] = multiprocessing.Process(target=processes[x].one_sequence,
-                                                                   args=(resultQueue,))
-                else:
-                    # This is the case when we have e.g. 4 parallel runs and 5 testcases,
-                    # First iteration: all 4 are used. Second iteration: only 1 used, 3 are empty.
-                    if processExecutions.get(x):
-                        processExecutions.pop(x)
-
-            for x in range(0, parallelInstances):
-                logger.info(f"starting execution of parallel instance {x}")
-                if processExecutions.get(x):
-                    processExecutions[x].start()
-
-            for x in range(0, parallelInstances):
-                if processExecutions.get(x):
-                    # Queue should be filled by now - take entries into Testrun-instance:
-                    while not resultQueue.empty():
-                        resultDictList = resultQueue.get()
-                        for recordNumber, dataRecordAfterExecution in resultDictList[0].items():
-                            self.testRunInstance.setResult(recordNumber, dataRecordAfterExecution)
-                        for sequenceNumber, tcNumberAndTestEnd in resultDictList[1].items():
-                            self.testRunInstance.append2DTestCaseEndDateTimes(sequenceNumber, tcNumberAndTestEnd)
-
-                    # Quit the running parallel process:
-                    logger.info(f"Stopping parallel instance {x}")
-                    processExecutions[x].join()
+            browser = self.testRunInstance.getBrowser(browserInstance=sequenceNumber,
+                                            browserName=lBrowserName,
+                                            browserAttributes=lBrowserAttributes)
+                                            
+            # Consume records
+            while not records.empty():
+                n, record = records.get()
+                kwargs = {
+                    GC.KWARGS_DATA: record,
+                    GC.KWARGS_BROWSER: browser,
+                    **self.kwargs
+                }
+
+                logger.info(f"Starting parallel execution with TestRecord {n}, Details: " +
+                    str({k: kwargs[GC.KWARGS_DATA][k] for k in list(kwargs[GC.KWARGS_DATA])[0:5]}))
+
+                process = TestCaseSequenceParallel(tcNumber=n,
+                                                    sequenceNumber=sequenceNumber,
+                                                    testcaseSequence=self.testCases,
+                                                    **kwargs)
+                process.one_sequence(results)
+
+
+        # Create and runconcurrent threads
+        threads = gevent.joinall([
+            gevent.spawn(single_thread, num) for num in range(parallelInstances)
+        ])
+
+        # after joining all threads
+        while not results.empty():
+            result = results.get()
+            for recordNumber, dataRecordAfterExecution in result[0].items():
+                self.testRunInstance.setResult(recordNumber, dataRecordAfterExecution)
+            for sequenceNumber, tcNumberAndTestEnd in result[1].items():
+                self.testRunInstance.append2DTestCaseEndDateTimes(sequenceNumber, tcNumberAndTestEnd)
 
     def execute(self):
         # Execute all Testcases:

+ 5 - 10
baangt/TestCaseSequence/TestCaseSequenceParallel.py

@@ -4,24 +4,24 @@ from baangt.TestSteps import Exceptions
 from baangt.base import GlobalConstants as GC
 from datetime import datetime
 import time
+import gevent.queue
 logger = logging.getLogger("pyC")
 
 
 class TestCaseSequenceParallel:
-    def __init__(self, sequenceNumber, tcNumber, testcaseSequence=None, **kwargs):
-        self.manager = multiprocessing.Manager()
-        self.process_list = self.manager.list()
+    def __init__(self, sequenceNumber: int, tcNumber: int, testcaseSequence=None, **kwargs):
         self.sequenceNumber = sequenceNumber
         self.dataRecord = kwargs.get(GC.KWARGS_DATA)
         self.tcNumber = tcNumber
         self.testcaseSequence = testcaseSequence
         self.kwargs = kwargs
 
-    def one_sequence(self, resultQueue: multiprocessing.Queue):
+    def one_sequence(self, results: gevent.queue.Queue):
         dataRecord = self.dataRecord
         currentRecordNumber = self.tcNumber
         testcaseSequence = self.testcaseSequence
         parallelizationSequenceNumber = self.sequenceNumber
+        
         logger.info(f"Starting one_sequence with SequenceNumber = {parallelizationSequenceNumber}, "
                     f"CurrentRecordNumber is {currentRecordNumber}")
 
@@ -34,14 +34,9 @@ class TestCaseSequenceParallel:
                                                                                 GC.STRUCTURE_TESTCASE,
                                                                                 **self.kwargs)
 
-
         except Exceptions.baangtTestStepException as e:
             logger.critical(f"Unhandled Error happened in parallel run {parallelizationSequenceNumber}: " + str(e))
             dataRecord[GC.TESTCASESTATUS] = GC.TESTCASESTATUS_ERROR
         finally:
-            # the result must be pushed into the queue:
-            logger.debug(
-                f"Starting to Put value in Queue {currentRecordNumber}. Len of datarecord: {len(str(dataRecord))}")
             d_t = datetime.fromtimestamp(time.time())
-            resultQueue.put([{self.tcNumber: dataRecord}, {self.sequenceNumber:  [self.tcNumber, d_t]}])
-            logger.debug(f"Finished putting Value i Queue for TC {currentRecordNumber}")
+            results.put([{self.tcNumber: dataRecord}, {self.sequenceNumber:  [self.tcNumber, d_t]}])