123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604 |
- import baangt.base.GlobalConstants as GC
- from baangt.base.Timing.Timing import Timing
- from baangt.base.BrowserHandling.BrowserHandling import BrowserDriver
- from baangt.base.ApiHandling import ApiHandling
- import sys
- from pkg_resources import parse_version
- import logging
- from baangt.TestSteps.Exceptions import baangtTestStepException
- from baangt.TestSteps.AddressCreation import AddressCreate
- from baangt.base.PDFCompare import PDFCompare, PDFCompareDetails
- from baangt.base.Faker import Faker as baangtFaker
- from baangt.base.Utils import utils
- from baangt.base.RuntimeStatistics import Statistic
- import random
- import itertools
- logger = logging.getLogger("pyC")
- class TestStepMaster:
- def __init__(self, executeDirect=True, **kwargs):
- self.anchor = None
- self.anchorLocator = None
- self.anchorLocatorType = None
- self.testCaseStatus = None
- self.ifActive = False
- self.ifIsTrue = True
- self.elseIsTrue = False
- self.ifLis = [self.ifIsTrue]
- self.elseLis = [self.elseIsTrue]
- self.ifConditions = 0
- self.repeatIsTrue = [False]
- self.repeatDict = [] # used to store steps command of repeat data
- self.repeatData = [] # used to store RLP_ data to be looped in repeat
- self.repeatCount = [] # Used to store count of randomdata in loop
- self.repeatActive = 0 # to sync active repeat counts with repeat done and will be execute when both are equal
- self.repeatDone = 0
- self.baangtFaker = None
- self.statistics = Statistic()
- self.kwargs = kwargs
- self.testRunInstance = kwargs.get(GC.KWARGS_TESTRUNINSTANCE)
- self.testcaseDataDict = kwargs.get(GC.KWARGS_DATA)
- self.timing: Timing = kwargs.get(GC.KWARGS_TIMING)
- self.timingName = self.timing.takeTime(self.__class__.__name__, forceNew=True)
- self.browserSession: BrowserDriver = kwargs.get(GC.KWARGS_BROWSER)
- self.apiSession: ApiHandling = kwargs.get(GC.KWARGS_API_SESSION)
- self.testStepNumber = kwargs.get(GC.STRUCTURE_TESTSTEP) # Set in TestRunData by TestCaseMaster
- self.testRunUtil = self.testRunInstance.testRunUtils
- # Get the release from testrun-customizing if set: (will influence, whether or not elements will be executed)
- self.globalRelease = self.testRunInstance.globalSettings.get("Release", "")
- # check, if this TestStep has additional Parameters and if so, execute
- lSequence = self.testRunUtil.getSequenceByNumber(testRunName=self.testRunInstance.testRunName,
- sequence=kwargs.get(GC.STRUCTURE_TESTCASESEQUENCE))
- self.testCase = self.testRunUtil.getTestCaseByNumber(lSequence, kwargs.get(GC.STRUCTURE_TESTCASE))
- self.testStep = self.testRunUtil.getTestStepByNumber(self.testCase, self.testStepNumber)
- if self.testStep and len(self.testStep) > 1:
- if not isinstance(self.testStep[1], str) and executeDirect:
- # This TestStepMaster-Instance should actually do something - activitites are described
- # in the TestExecutionSteps.
- # Otherwise there's only a classname in TestStep[0]
- self.executeDirect(self.testStep[1][GC.STRUCTURE_TESTSTEPEXECUTION])
- # Teardown makes only sense, when we actually executed something directly in here
- # Otherwise (if it was 1 or 2 Tab-stops more to the left) we'd take execution time without
- # having done anything
- self.teardown()
- self.statistics.update_teststep_sequence()
- def executeDirect(self, executionCommands):
- """
- Executes a sequence of Commands. Will be subclassed in other modules.
- :param executionCommands:
- :return:
- """
- for index, (key, command) in enumerate(executionCommands.items()):
- try:
- self.executeDirectSingle(index, command)
- except Exception as ex:
- # 2020-07-16: This Exception is cought, printed and then nothing. That's not good. The test run
- # continues forever, despite this exception. Correct way would be to set test case to error and stop
- # this test case
- # Before we change something here we should check, why the calling function raises an error.
- logger.critical(ex)
- self.testcaseDataDict[GC.TESTCASESTATUS] = GC.TESTCASESTATUS_ERROR
- return
- def manageNestedCondition(self, condition="", ifis=False):
- if condition.upper() == "IF":
- self.ifConditions += 1
- self.ifLis.append(ifis)
- self.elseLis.append(False)
- elif condition.upper() == "ELSE":
- self.elseLis[-1] = True
- elif condition.upper() == "ENDIF":
- self.ifConditions -= 1
- if self.ifConditions < 0:
- raise BaseException("Numbers of ENDIF are greater than IF.")
- self.ifLis.pop()
- self.elseLis.pop()
- assert len(self.ifLis) == self.ifConditions + 1 and len(self.elseLis) == self.ifConditions + 1
- self.ifIsTrue = self.ifLis[-1]
- self.elseIsTrue = self.elseLis[-1]
- def executeDirectSingle(self, commandNumber, command, replaceFromDict=None):
- """
- This will execute a single instruction
- """
- # when we have an IF-condition and it's condition was not TRUE, then skip whatever comes here until we
- # reach Endif
- if not self.ifIsTrue and not self.elseIsTrue:
- if command["Activity"].upper() == "IF":
- self.manageNestedCondition(condition=command["Activity"].upper(), ifis=self.ifIsTrue)
- return
- if command["Activity"].upper() != "ELSE" and command["Activity"].upper() != "ENDIF":
- return
- if self.repeatIsTrue[-1]: # If repeat statement is active then execute this
- if command["Activity"].upper() == "REPEAT": # To sync active repeat with repeat done
- self.repeatActive += 1
- if command["Activity"].upper() != "REPEAT-DONE": # store command in repeatDict
- self.repeatDict[-1][commandNumber] = command
- return
- else:
- self.repeatDone += 1 # to sync repeat done with active repeat
- if self.repeatDone < self.repeatActive: # if all repeat-done are not synced with repeat store the data
- self.repeatDict[-1][commandNumber] = command
- logger.info(command)
- return
- self.repeatIsTrue[-1] = False
- if self.repeatData[-1]:
- data_list = []
- if type(self.repeatData[-1]) is list:
- for data_dic in self.repeatData[-1]:
- keys, values = zip(*data_dic.items())
- final_values = []
- for value in values: # coverting none list values to list. Useful in make all possible data using itertools
- if type(value) is not list:
- final_values.append([value])
- else:
- final_values.append(value)
- data_l = [dict(zip(keys, v)) for v in itertools.product(*final_values)] # itertools to make all possible combinations
- data_list.extend(data_l)
- else:
- data_list = [self.repeatData[-1]]
- if len(self.repeatCount) > 0 and self.repeatCount[-1]: # get random data from list of data
- try:
- data_list = random.sample(data_list, int(self.repeatCount[-1]))
- except:
- pass
- if data_list:
- for data in data_list:
- temp_dic = dict(self.repeatDict[-1])
- processed_data = {}
- for key in temp_dic:
- try:
- processed_data = dict(temp_dic[key])
- except Exception as ex:
- logger.debug(ex)
- try:
- self.executeDirectSingle(key, processed_data, replaceFromDict=data)
- except Exception as ex:
- logger.info(ex)
- else:
- temp_dic = dict(self.repeatDict[-1])
- for key in temp_dic:
- try:
- processed_data = dict(temp_dic[key])
- except Exception as ex:
- logger.debug(ex)
- for ky in processed_data:
- try:
- processed_data[ky] = self.replaceVariables(processed_data[ky])
- except Exception as ex:
- logger.info(ex)
- try:
- self.executeDirectSingle(key, processed_data)
- except Exception as ex:
- logger.info(ex)
- del self.repeatIsTrue[-1]
- del self.repeatDict[-1]
- del self.repeatData[-1]
- del self.repeatCount[-1]
- self.repeatDone -= 1
- self.repeatActive -= 1
- return
- lActivity = command["Activity"].upper()
- if lActivity == "COMMENT":
- return # Comment's are ignored
- lLocatorType = command["LocatorType"].upper()
- try:
- lLocator = self.replaceVariables(command["Locator"])
- except Exception as ex:
- logger.info(ex)
- if lLocator and not lLocatorType: # If locatorType is empty, default it to XPATH
- lLocatorType = 'XPATH'
- xpath, css, id = self.__setLocator(lLocatorType, lLocator)
- if self.anchor and xpath:
- if xpath[0:3] == '///': # Xpath doesn't want to use Anchor
- xpath = xpath[1:]
- logger.debug(f"Anchor active, but escaped. Using pure xpath: {xpath}")
- else:
- logger.debug(f"Anchor active. combining {self.anchorLocator} with {xpath}")
- xpath = self.anchorLocator + xpath
- lValue = str(command["Value"])
- lValue2 = str(command["Value2"])
- lComparison = command["Comparison"]
- lOptional = utils.anything2Boolean(command["Optional"])
- # check release line
- lRelease = command["Release"]
- # Timeout defaults to 20 seconds, if not set otherwise.
- lTimeout = TestStepMaster._setTimeout(command["Timeout"])
- lTimingString = f"TS {commandNumber} {lActivity.lower()}"
- self.timing.takeTime(lTimingString)
- logger.info(
- f"Executing TestStepDetail {commandNumber} with parameters: act={lActivity}, lType={lLocatorType}, loc={lLocator}, "
- f"Val1={lValue}, comp={lComparison}, Val2={lValue2}, Optional={lOptional}, timeout={lTimeout}")
- original_value = lValue # used as key to make rlp json dict and will be used further to make sheet name
- lValue, lValue2 = self.replaceAllVariables(lValue, lValue2, replaceFromDict=replaceFromDict)
- if not TestStepMaster.ifQualifyForExecution(self.globalRelease, lRelease):
- logger.debug(f"we skipped this line due to {lRelease} disqualifies according to {self.globalRelease} ")
- return
- if lActivity == "GOTOURL":
- self.browserSession.goToUrl(lValue)
- elif lActivity == "SETTEXT":
- self.browserSession.findByAndSetText(xpath=xpath, css=css, id=id, value=lValue, timeout=lTimeout)
- elif lActivity == "SETTEXTIF":
- self.browserSession.findByAndSetTextIf(xpath=xpath, css=css, id=id, value=lValue, timeout=lTimeout,
- optional=lOptional)
- elif lActivity == "FORCETEXT":
- self.browserSession.findByAndForceText(xpath=xpath, css=css, id=id, value=lValue, timeout=lTimeout)
- elif lActivity == "SETANCHOR":
- if not lLocator:
- self.anchor = None
- self.anchorLocator = None
- self.anchorLocatorType = None
- else:
- found = self.browserSession.findBy(xpath=xpath, css=css, id=id, timeout=lTimeout)
- if found:
- self.anchor = self.browserSession.element
- self.anchorLocator = lLocator
- self.anchorLocatorType = lLocatorType
- logger.debug(f"Anchor set: {lLocatorType} {lLocator}")
- else:
- logger.error(f"Anchor should be set, but can't be found in the current page: {lLocatorType}, {lLocator}")
- raise ValueError(f"Anchor should be set, but can't be found in the current page: {lLocatorType}, {lLocator}")
- elif lActivity == 'HANDLEIFRAME':
- self.browserSession.handleIframe(lLocator)
- elif lActivity == 'SWITCHWINDOW':
- lWindow = lValue
- if lWindow.isnumeric():
- lWindow = int(lWindow)
- self.browserSession.handleWindow(windowNumber=lWindow, timeout=lTimeout)
- elif lActivity == "CLICK":
- self.browserSession.findByAndClick(xpath=xpath, css=css, id=id, timeout=lTimeout)
- elif lActivity == "CLICKIF":
- self.browserSession.findByAndClickIf(xpath=xpath, css=css, id=id, timeout=lTimeout, value=lValue)
- elif lActivity == "PAUSE":
- self.browserSession.sleep(seconds=float(lValue))
- elif lActivity == "IF":
- # Originally we had only Comparisons. Now we also want to check for existance of Field
- if not lValue and lLocatorType and lLocator:
- lValue = self.browserSession.findBy(xpath=xpath, css=css, id=id, optional=lOptional,
- timeout=lTimeout)
- self.__doComparisons(lComparison=lComparison, value1=lValue, value2=lValue2)
- logger.debug(f"IF-condition {lValue} {lComparison} {lValue2} evaluated to: {self.ifIsTrue} ")
- elif lActivity == "ELSE":
- if not self.ifIsTrue:
- self.manageNestedCondition(condition=lActivity)
- logger.debug("Executing ELSE-condition")
- elif lActivity == "ENDIF":
- self.manageNestedCondition(condition=lActivity)
- elif lActivity == "REPEAT":
- self.repeatActive += 1
- self.repeatIsTrue.append(True)
- self.repeatDict.append({})
- if original_value not in self.testRunInstance.json_dict:
- self.testRunInstance.json_dict[original_value] = []
- self.testRunInstance.json_dict[original_value].append(lValue)
- self.repeatData.append(lValue)
- self.repeatCount.append(lValue2)
- elif lActivity == 'GOBACK':
- self.browserSession.goBack()
- elif lActivity == 'APIURL':
- self.apiSession.setBaseURL(lValue)
- elif lActivity == 'ENDPOINT':
- self.apiSession.setEndPoint(lValue)
- elif lActivity == 'POST':
- self.apiSession.postURL(content=lValue)
- elif lActivity == 'GET':
- self.apiSession.getURL()
- elif lActivity == 'HEADER':
- self.apiSession.setHeaders(setHeaderData=lValue)
- elif lActivity == 'SAVE':
- self.doSaveData(lValue, lValue2, lLocatorType, lLocator)
- elif lActivity == 'CLEAR':
- # Clear a variable:
- if self.testcaseDataDict.get(lValue):
- del self.testcaseDataDict[lValue]
- elif lActivity == 'SAVETO':
- # In this case, we need to parse the real field, not the representation of the replaced field value
- self.doSaveData(command['Value'], lValue2, lLocatorType, lLocator)
- elif lActivity == 'SUBMIT':
- self.browserSession.submit()
- elif lActivity == "ADDRESS_CREATE":
- # Create Address with option lValue and lValue2
- AddressCreate(lValue, lValue2)
- # Update testcaseDataDict with addressDict returned from
- AddressCreate.returnAddress()
- self.testcaseDataDict.update(AddressCreate.returnAddress())
- elif lActivity == 'ASSERT':
- value_found = self.browserSession.findByAndWaitForValue(xpath=xpath, css=css, id=id, optional=lOptional,
- timeout=lTimeout)
- if not self.__doComparisons(lComparison=lComparison, value1=value_found, value2=lValue):
- raise baangtTestStepException(f"Expected Value: {lValue}, Value found :{value_found} ")
- elif lActivity == 'IBAN':
- # Create Random IBAN. Value1 = Input-Parameter for IBAN-Function. Value2=Fieldname
- self.__getIBAN(lValue, lValue2)
- elif lActivity == 'PDFCOMPARE':
- self.doPDFComparison(lValue)
- elif lActivity == 'CHECKLINKS':
- self.checkLinks()
- elif lActivity == 'ALERTIF':
- self.browserSession.confirmAlertIfAny()
- elif lActivity == GC.TESTCASESTATUS_STOP.upper():
- self.testcaseDataDict[GC.TESTCASESTATUS_STOP] = "X" # will stop the test case
- elif lActivity == GC.TESTCASESTATUS_STOPERROR.upper():
- self.testcaseDataDict[GC.TESTCASESTATUS_STOPERROR] = "X" # will stop the test case and set error
- elif lActivity[0:3] == "ZZ_":
- # custom command. Do nothing and return
- return
- else:
- raise BaseException(f"Unknown command in TestStep {lActivity}")
- self.timing.takeTime(lTimingString)
- def doPDFComparison(self, lValue, lFieldnameForResults="DOC_Compare"):
- lFiles = self.browserSession.findNewFiles()
- if len(lFiles) > 1:
- # fixme: Do something! There were more than 1 files since last check. Damn
- logger.critical(f"There were {len(lFiles)} files new since last check. Can't handle that. ")
- raise Exception
- elif len(lFiles) == 1:
- # Wonderful. Let's do the PDF-Comparison
- lPDFDataClass = PDFCompareDetails()
- lPDFDataClass.fileName = lFiles[0][0]
- lPDFDataClass.referenceID = lValue
- lDict = {"": lPDFDataClass}
- lPDFCompare = PDFCompare()
- lDict = lPDFCompare.compare_multiple(lDict)
- self.testcaseDataDict[lFieldnameForResults + "_Status"] = lDict[""].Status
- self.testcaseDataDict[lFieldnameForResults + "_Results"] = lDict[""].StatusText
- def replaceAllVariables(self, lValue, lValue2, replaceFromDict=None):
- # Replace variables from data file
- try:
- if len(lValue) > 0:
- lValue = self.replaceVariables(lValue, replaceFromDict=replaceFromDict)
- if len(lValue2) > 0:
- lValue2 = self.replaceVariables(lValue2, replaceFromDict=replaceFromDict)
- except Exception as ex:
- logger.info(ex)
- return lValue, lValue2
- def __getIBAN(self, lValue, lValue2):
- from baangt.base.IBAN import IBAN
- if not lValue2:
- logger.critical("IBAN-Method called without destination field name in column 'Value 2'")
- return
- lIBAN = IBAN()
- self.testcaseDataDict[lValue2] = lIBAN.getRandomIBAN()
- pass
- def checkLinks(self):
- """
- Will check all links on the current webpage
- Result will be written into "CheckedLinks" in TestDataDict
- If theres a returncode >= 400 in the list, we'll mark the testcase as failed
- """
- if self.testcaseDataDict.get("CheckedLinks"):
- self.testcaseDataDict["Result_CheckedLinks"] += self.browserSession.checkLinks()
- else:
- self.testcaseDataDict["Result_CheckedLinks"] = self.browserSession.checkLinks()
- for entry in self.testcaseDataDict["Result_CheckedLinks"]:
- if entry[0] > 400:
- self.testcaseDataDict[GC.TESTCASESTATUS] = GC.TESTCASESTATUS_ERROR
- self.testcaseDataDict[GC.TESTCASEERRORLOG] = self.testcaseDataDict.get(GC.TESTCASEERRORLOG, "") \
- + "URL-Checker error"
- break
- @staticmethod
- def ifQualifyForExecution(version_global, version_line):
- """ This function will test version_global and version_line
- @return True or False
- """
- if not version_global.strip():
- # No value is defined in Release, return True
- return True
- if not version_line.strip():
- # we skipped this line
- return True
- # split the version line
- if not len(version_line.strip().split(" ")) == 2:
- logger.debug(f"Invalid release format {version_line} ")
- return True
- comp_operator, version = version_line.strip().split(" ")
- if comp_operator == "<":
- return parse_version(version_global) < parse_version(version)
- elif comp_operator == ">":
- return parse_version(version_global) > parse_version(version)
- elif comp_operator == ">=":
- return parse_version(version_global) >= parse_version(version)
- elif comp_operator == "<=":
- return parse_version(version_global) <= parse_version(version)
- elif comp_operator == "=" or comp_operator == "==":
- return parse_version(version_global) == parse_version(version)
- else:
- logger.debug(f"Global version {version_global}, line version {version_line} ")
- return False
- def doSaveData(self, toField, valueForField, lLocatorType, lLocator):
- """
- Save fields. Either from an existing DICT (usually in API-Mode) or from a Webelement (in Browser-Mode)
- :param toField:
- :param valueForField:
- :param lLocatorType:
- :param lLocator:
- :return: no return parameter. The implicit return is a value in a field.
- """
- if self.testCase[1][GC.KWARGS_TESTCASETYPE] == GC.KWARGS_BROWSER:
- xpath, css, id = TestStepMaster.__setLocator(lLocatorType, lLocator)
- self.testcaseDataDict[toField] = self.browserSession.findByAndWaitForValue(xpath=xpath, css=css, id=id)
- logger.debug(f"Saved {self.testcaseDataDict[toField]} to {toField}")
- elif self.testCase[1][GC.KWARGS_TESTCASETYPE] == GC.KWARGS_API_SESSION:
- self.testcaseDataDict[toField] = valueForField
- else:
- sys.exit("Testcase Type not supported")
- @staticmethod
- def __setLocator(lLocatorType, lLocator):
- return utils.setLocatorFromLocatorType(lLocatorType, lLocator)
- @staticmethod
- def _setTimeout(lTimeout):
- return 20 if not lTimeout else float(lTimeout)
- def __doComparisons(self, lComparison, value1, value2):
- if isinstance(value1, bool) or isinstance(value2, bool):
- value1 = utils.anything2Boolean(value1)
- value2 = utils.anything2Boolean(value2)
- if value2 == 'None':
- value2 = None
- if lComparison == "=":
- if value1 == value2:
- self.manageNestedCondition(condition="IF", ifis=True)
- else:
- self.manageNestedCondition(condition="IF", ifis=False)
- elif lComparison == "!=":
- self.ifIsTrue = False if value1 == value2 else True
- elif lComparison == ">":
- if value1 > value2:
- self.manageNestedCondition(condition="IF", ifis=True)
- else:
- self.manageNestedCondition(condition="IF", ifis=False)
- elif lComparison == "<":
- if value1 < value2:
- self.manageNestedCondition(condition="IF", ifis=True)
- else:
- self.manageNestedCondition(condition="IF", ifis=False)
- elif not lComparison: # Check only, if Value1 has a value.
- val = True if value1 else False
- self.manageNestedCondition(condition="IF", ifis=val)
- else:
- raise BaseException(f"Comparison Operator not supported/unknown {lComparison}")
- return self.ifIsTrue
- def execute(self):
- """Method is overwritten in all children/subclasses"""
- pass
- def teardown(self):
- if self.testCaseStatus:
- if not self.testcaseDataDict[GC.TESTCASESTATUS]:
- self.testcaseDataDict[GC.TESTCASESTATUS] = self.testCaseStatus
- elif self.testCaseStatus == GC.TESTCASESTATUS_SUCCESS:
- # We'll not overwrite a potential Error Status of the testcase
- pass
- elif self.testCaseStatus == GC.TESTCASESTATUS_ERROR:
- self.testcaseDataDict = GC.TESTCASESTATUS_ERROR
- else:
- sys.exit("No idea, what happened here. Unknown condition appeared")
- self.timing.takeTime(self.timingName)
- def replaceVariables(self, expression, replaceFromDict=None):
- """
- The syntax for variables is currently $(<column_name_from_data_file>). Multiple variables can be assigned
- in one cell, for instance perfectly fine: "http://$(BASEURL)/$(ENDPOINT)"
- There's a special syntax for the faker module: $(FAKER.<fakermethod>).
- Also a special syntax for API-Handling: $(APIHandling.<DictElementName>).
- @param expression: the current cell, either as fixed value, e.g. "Franzi" or with a varible $(DATE)
- @return: the replaced value, e.g. if expression was $(DATE) and the value in column "DATE" of data-file was
- "01.01.2020" then return will be "01.01.2020"
- """
- if "$(" not in expression:
- return expression
- while "$(" in expression:
- if expression[0:2] == "$(":
- left_part = ""
- else:
- left_part = expression.split("$(")[0]
- center = expression[len(left_part) + 2:]
- center = center.split(")")[0]
- right_part = expression[len(left_part) + len(center) + 3:]
- centerValue = ""
- if replaceFromDict: # json is supplied with repeat tag, that json is used here to get main data
- dic = replaceFromDict
- for key in center.split('.')[-1:]:
- dic = self.iterate_json(dic, key)
- centerValue = dic
- if centerValue: # if we got value from the passed json then bypass this if else conditions
- pass
- elif "." not in center:
- # Replace the variable with the value from data structure
- centerValue = self.testcaseDataDict.get(center)
- else:
- # This is a reference to a DICT with ".": for instance APIHandling.AnswerContent("<bla>")
- dictVariable = center.split(".")[0]
- dictValue = center.split(".")[1]
- if dictVariable == 'ANSWER_CONTENT':
- centerValue = self.apiSession.session[1].answerJSON.get(dictValue, "Empty")
- elif dictVariable == 'FAKER':
- # This is to call Faker Module with the Method, that is given after the .
- centerValue = self._getFakerData(dictValue)
- elif self.testcaseDataDict.get(dictVariable):
- dic = self.testcaseDataDict.get(dictVariable)
- for key in center.split('.')[1:]:
- dic = self.iterate_json(dic, key)
- centerValue = dic
- else:
- raise BaseException(f"Missing code to replace value for: {center}")
- if not centerValue:
- if center in self.testcaseDataDict.keys():
- # The variable exists, but has no value.
- centerValue = ""
- else:
- raise BaseException(f"Variable not found: {center}, input parameter was: {expression}")
- expression = "".join([left_part, str(centerValue), right_part])
- return expression
- def iterate_json(self, data, key):
- # itereate through list of json and create list of data from dictionary inside those list
- if type(data) is list:
- lis = []
- for dt in data:
- dt = self.iterate_json(dt, key)
- lis.append(dt)
- return lis
- elif type(data) is dict:
- return data.get(key)
- def _getFakerData(self, fakerMethod):
- if not self.baangtFaker:
- self.baangtFaker = baangtFaker()
- logger.debug(f"Calling faker with method: {fakerMethod}")
- return self.baangtFaker.fakerProxy(fakerMethod=fakerMethod)
|