HandleDatabase.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459
  1. import logging
  2. from xlrd3 import open_workbook
  3. import itertools
  4. import json
  5. import baangt.base.CustGlobalConstants as CGC
  6. import baangt.base.GlobalConstants as GC
  7. from baangt.base.Utils import utils
  8. import baangt.TestSteps.Exceptions
  9. from pathlib import Path
  10. import xl2dict
  11. import re
  12. from random import randint
  13. logger = logging.getLogger("pyC")
  14. class HandleDatabase:
  15. def __init__(self, linesToRead, globalSettings=None):
  16. self.lineNumber = 3
  17. # FIXME: This is still not clean. GlobalSettings shouldn't be predefined in CustomConstants-Class
  18. self.globals = {
  19. CGC.CUST_TOASTS: "",
  20. GC.EXECUTION_STAGE: "",
  21. GC.TESTCASEERRORLOG: "",
  22. CGC.VIGOGFNUMMER: "",
  23. CGC.SAPPOLNR: "",
  24. CGC.PRAEMIE: "",
  25. CGC.POLNRHOST: "",
  26. GC.TESTCASESTATUS: "",
  27. GC.TIMING_DURATION: "",
  28. GC.SCREENSHOTS: "",
  29. GC.TIMELOG: "",
  30. }
  31. if globalSettings:
  32. for setting, value in globalSettings.items():
  33. self.globals[setting] = value
  34. self.range = self.__buildRangeOfRecords(linesToRead)
  35. self.rangeDict = {}
  36. self.__buildRangeDict()
  37. self.df_json = None
  38. self.dataDict = []
  39. self.recordPointer = 0
  40. self.sheet_dict = {}
  41. def __buildRangeDict(self):
  42. """
  43. Interprets the Range and creates a DICT of values, that we can loop over later
  44. @return: Creates empty self.rangeDict
  45. """
  46. for lRangeLine in self.range:
  47. for x in range(lRangeLine[0], lRangeLine[1]+1):
  48. self.rangeDict[x] = ""
  49. def __buildRangeOfRecords(self, rangeFromConfigFile):
  50. lRange = []
  51. if not rangeFromConfigFile:
  52. # No selection - means all records
  53. return [[0,99999]]
  54. else:
  55. # Format: 4;6-99;17-200;203 or
  56. # Format: 4,6-100,800-1000
  57. if ";" in rangeFromConfigFile:
  58. for kombination in rangeFromConfigFile.split(";"):
  59. lRange.append(HandleDatabase.__buildRangeOfRecordsOneEntry(kombination))
  60. elif "," in rangeFromConfigFile:
  61. for kombination in rangeFromConfigFile.split(","):
  62. lRange.append(HandleDatabase.__buildRangeOfRecordsOneEntry(kombination))
  63. else:
  64. lRange.append(HandleDatabase.__buildRangeOfRecordsOneEntry(rangeFromConfigFile))
  65. # Make sure these are numbers:
  66. for lRangeLine in lRange:
  67. lRangeLine[0] = HandleDatabase.__sanitizeNumbers(lRangeLine[0])
  68. lRangeLine[1] = HandleDatabase.__sanitizeNumbers(lRangeLine[1])
  69. return lRange
  70. @staticmethod
  71. def __sanitizeNumbers(numberIn):
  72. if isinstance(numberIn, dict):
  73. numberIn = numberIn['default']
  74. try:
  75. return int(numberIn.strip())
  76. except:
  77. return 0
  78. numberIn = numberIn.strip()
  79. return int(numberIn)
  80. @staticmethod
  81. def __buildRangeOfRecordsOneEntry(rangeIn):
  82. if "-" in rangeIn:
  83. # This is a range (17-22)
  84. return HandleDatabase.__buildRangeOfRecordsSingleRange(rangeIn)
  85. else:
  86. # This is a single entry:
  87. return [rangeIn, rangeIn]
  88. @staticmethod
  89. def __buildRangeOfRecordsSingleRange(rangeIn):
  90. lSplit = rangeIn.split("-")
  91. return [lSplit[0], lSplit[1]]
  92. def read_excel(self, fileName, sheetName):
  93. fileName = utils.findFileAndPathFromPath(fileName)
  94. if not fileName:
  95. logger.critical(f"Can't open file: {fileName}")
  96. return
  97. book = open_workbook(fileName)
  98. sheet = book.sheet_by_name(sheetName)
  99. # read header values into the list
  100. keys = [sheet.cell(0, col_index).value for col_index in range(sheet.ncols)]
  101. # if testresult header is present then taking its index, which is later used as column number
  102. testrun_index = [keys.index(x) for x in keys if str(x).lower() == "testresult"]
  103. if testrun_index:
  104. testrun_index = testrun_index[0] + 1 # adding +1 value which is the correct column position
  105. else: # if list is empty that means their is no testresult header
  106. testrun_index = 0
  107. for row_index in range(1, sheet.nrows):
  108. temp_dic = {}
  109. for col_index in range(sheet.ncols):
  110. temp_dic[keys[col_index]] = sheet.cell(row_index, col_index).value
  111. if type(temp_dic[keys[col_index]]) == float:
  112. temp_dic[keys[col_index]] = repr(temp_dic[keys[col_index]])
  113. if temp_dic[keys[col_index]][-2:] == ".0":
  114. temp_dic[keys[col_index]] = temp_dic[keys[col_index]][:-2]
  115. # row, column, sheetName & fileName which are later used in updating source testrun file
  116. temp_dic["testcase_row"] = row_index
  117. temp_dic["testcase_sheet"] = sheetName
  118. temp_dic["testcase_file"] = fileName
  119. temp_dic["testcase_column"] = testrun_index
  120. self.dataDict.append(temp_dic)
  121. for temp_dic in self.dataDict:
  122. new_data_dic = {}
  123. for keys in temp_dic:
  124. if type(temp_dic[keys]) != str:
  125. continue
  126. if '$(' in str(temp_dic[keys]):
  127. while '$(' in str(temp_dic[keys]):
  128. start_index = temp_dic[keys].index('$(')
  129. end_index = temp_dic[keys][start_index:].index(')')+start_index
  130. data_to_replace_with = temp_dic[temp_dic[keys][start_index+2:end_index]]
  131. temp_dic[keys] = temp_dic[keys].replace(
  132. temp_dic[keys][start_index:end_index+1], data_to_replace_with
  133. )
  134. if str(temp_dic[keys])[:4] == "RRD_":
  135. rrd_string = self.__process_rrd_string(temp_dic[keys])
  136. rrd_data = self.__rrd_string_to_python(rrd_string[4:], fileName)
  137. for data in rrd_data:
  138. new_data_dic[data] = rrd_data[data]
  139. elif str(temp_dic[keys][:4]) == "RRE_":
  140. rre_string = self.__process_rre_string(temp_dic[keys])
  141. rre_data = self.__rre_string_to_python(rre_string[4:])
  142. for data in rre_data:
  143. new_data_dic[data] = rre_data[data]
  144. elif str(temp_dic[keys][:4]) == "RLP_":
  145. temp_dic[keys] = self.rlp_process(temp_dic[keys], fileName)
  146. else:
  147. try:
  148. js = json.loads(temp_dic[keys])
  149. temp_dic[keys] = js
  150. except:
  151. pass
  152. for key in new_data_dic:
  153. temp_dic[key] = new_data_dic[key]
  154. def rlp_process(self, string, fileName):
  155. # Will get real data from rlp_ prefix string
  156. rlp_string = self.__process_rlp_string(string)[5:-1]
  157. rlp_data = self.__rlp_string_to_python(rlp_string, fileName)
  158. data = rlp_data
  159. self.rlp_iterate(data, fileName)
  160. return data
  161. def rlp_iterate(self, data, fileName):
  162. # Rlp datas are stored in either json or list. This function will loop on every data and convert every
  163. # Rlp string to data
  164. if type(data) is list:
  165. for dt in data:
  166. if type(dt) is str:
  167. dt = self.rlp_iterate(dt, fileName)
  168. elif type(dt) is dict or type(dt) is list:
  169. dt = self.rlp_iterate(dt, fileName)
  170. elif type(data) is dict:
  171. for key in data:
  172. if type(data[key]) is list or type(data[key]) is dict:
  173. data[key] = self.rlp_iterate(data[key], fileName)
  174. elif type(data[key]) is str:
  175. data[key] = self.rlp_iterate(data[key], fileName)
  176. elif type(data) is str:
  177. if data[:4] == "RLP_":
  178. data = self.rlp_process(data, fileName)
  179. return data
  180. def __compareEqualStageInGlobalsAndDataRecord(self, currentNewRecordDict:dict) -> bool:
  181. """
  182. As method name says, compares, whether Stage in Global-settings is equal to stage in Data Record,
  183. so that this record might be excluded, if it's for the wrong Stage.
  184. :param currentNewRecordDict: The current Record
  185. :return: Boolean
  186. """
  187. lAppend = True
  188. if self.globals.get(GC.EXECUTION_STAGE):
  189. if currentNewRecordDict.get(GC.EXECUTION_STAGE):
  190. if currentNewRecordDict[GC.EXECUTION_STAGE] != self.globals[GC.EXECUTION_STAGE]:
  191. lAppend = False
  192. return lAppend
  193. def __processRrd(self, sheet_name, data_looking_for, data_to_match: dict, sheet_dict=None, caller="RRD_"):
  194. """
  195. For more detail please refer to TestDataGenerator.py
  196. :param sheet_name:
  197. :param data_looking_for:
  198. :param data_to_match:
  199. :return: dictionary of TargetData
  200. """
  201. sheet_dict = self.sheet_dict if sheet_dict is None else sheet_dict
  202. matching_data = [list(x) for x in itertools.product(*[data_to_match[key] for key in data_to_match])]
  203. assert sheet_name in sheet_dict, \
  204. f"Excel file doesn't contain {sheet_name} sheet. Please recheck. Called in '{caller}'"
  205. base_sheet = sheet_dict[sheet_name]
  206. data_lis = []
  207. if type(data_looking_for) == str:
  208. data_looking_for = data_looking_for.split(",")
  209. for data in base_sheet:
  210. if len(matching_data) == 1 and len(matching_data[0]) == 0:
  211. if data_looking_for[0] == "*":
  212. data_lis.append(data)
  213. else:
  214. data_lis.append({keys: data[keys] for keys in data_looking_for})
  215. else:
  216. if [data[key] for key in data_to_match] in matching_data:
  217. if data_looking_for[0] == "*":
  218. data_lis.append(data)
  219. else:
  220. data_lis.append({keys: data[keys] for keys in data_looking_for})
  221. return data_lis
  222. def __rrd_string_to_python(self, raw_data, fileName):
  223. """
  224. Convert string to python data types
  225. :param raw_data:
  226. :return:
  227. """
  228. first_value = raw_data[1:-1].split(',')[0].strip()
  229. second_value = raw_data[1:-1].split(',')[1].strip()
  230. if second_value[0] == "[":
  231. second_value = ','.join(raw_data[1:-1].split(',')[1:]).strip()
  232. second_value = second_value[:second_value.index(']') + 1]
  233. third_value = [x.strip() for x in ']'.join(raw_data[1:-1].split(']')[1:]).split(',')[1:]]
  234. else:
  235. third_value = [x.strip() for x in raw_data[1:-1].split(',')[2:]]
  236. evaluated_list = ']],'.join(','.join(third_value)[1:-1].strip().split('],')).split('],')
  237. if evaluated_list[0] == "":
  238. evaluated_dict = {}
  239. else:
  240. evaluated_dict = {
  241. splited_data.split(':')[0]: self.__splitList(splited_data.split(':')[1]) for splited_data in
  242. evaluated_list
  243. }
  244. if second_value[0] == "[" and second_value[-1] == "]":
  245. second_value = self.__splitList(second_value)
  246. if first_value not in self.sheet_dict:
  247. self.sheet_dict, _ = self.__read_excel(path=fileName)
  248. processed_datas = self.__processRrd(first_value, second_value, evaluated_dict)
  249. assert len(processed_datas)>0, f"No matching data for RRD_. Please check the input file. Was searching for " \
  250. f"{first_value}, {second_value} and {str(evaluated_dict)} " \
  251. f"but didn't find anything"
  252. return processed_datas[randint(0, len(processed_datas)-1)]
  253. def __rre_string_to_python(self, raw_data):
  254. """
  255. Convert string to python data types
  256. :param raw_data:
  257. :return:
  258. """
  259. file_name = raw_data[1:-1].split(',')[0].strip()
  260. sheet_dict, _ = self.__read_excel(file_name)
  261. first_value = raw_data[1:-1].split(',')[1].strip()
  262. second_value = raw_data[1:-1].split(',')[2].strip()
  263. if second_value[0] == "[":
  264. second_value = ','.join(raw_data[1:-1].split(',')[2:]).strip()
  265. second_value = second_value[:second_value.index(']') + 1]
  266. third_value = [x.strip() for x in ']'.join(raw_data[1:-1].split(']')[1:]).split(',')[1:]]
  267. else:
  268. third_value = [x.strip() for x in raw_data[1:-1].split(',')[3:]]
  269. evaluated_list = ']],'.join(','.join(third_value)[1:-1].strip().split('],')).split('],')
  270. if evaluated_list[0] == "":
  271. evaluated_dict = {}
  272. else:
  273. evaluated_dict = {
  274. splited_data.split(':')[0]: self.__splitList(splited_data.split(':')[1]) for splited_data in
  275. evaluated_list
  276. }
  277. if second_value[0] == "[" and second_value[-1] == "]":
  278. second_value = self.__splitList(second_value)
  279. processed_datas = self.__processRrd(first_value, second_value, evaluated_dict, sheet_dict, caller="RRE_")
  280. assert len(processed_datas)>0, f"No matching data for RRD_. Please check the input file. Was searching for " \
  281. f"{first_value}, {second_value} and {str(evaluated_dict)} " \
  282. f"but didn't find anything"
  283. return processed_datas[randint(0, len(processed_datas)-1)]
  284. def __rlp_string_to_python(self, raw_data, fileName):
  285. # will convert rlp string to python
  286. sheetName = raw_data.split(',')[0].strip()
  287. headerName = raw_data.split(',')[1].strip().split('=')[0].strip()
  288. headerValue = raw_data.split(',')[1].strip().split('=')[1].strip()
  289. all_sheets, main_sheet = self.__read_excel(path=fileName, sheet_name=sheetName)
  290. data_list = []
  291. for data in main_sheet:
  292. main_value = data[headerName]
  293. if type(main_value) == float and str(main_value)[-2:] == '.0':
  294. main_value = str(int(main_value))
  295. if main_value.strip() == headerValue:
  296. for key in data:
  297. try:
  298. js = json.loads(data[key])
  299. data[key] = js
  300. except:
  301. pass
  302. data_list.append(data)
  303. return data_list
  304. def __process_rre_string(self, rre_string):
  305. """
  306. For more detail please refer to TestDataGenerator.py
  307. :param rre_string:
  308. :return:
  309. """
  310. processed_string = ','.join([word.strip() for word in rre_string.split(', ')])
  311. match = re.match(
  312. r"(RRE_(\(|\[))[\w\d\s\-./\\]+\.(xlsx|xls),[a-zA-z0-9\s]+,(\[?[a-zA-z\s,]+\]?|)|\*,\[([a-zA-z0-9\s]+:\[[a-zA-z0-9,\s]+\](,?))*\]",
  313. processed_string)
  314. err_string = f"{rre_string} not matching pattern RRE_(fileName, sheetName, TargetData," \
  315. f"[Header1:[Value1],Header2:[Value1,Value2]])"
  316. assert match, err_string
  317. return processed_string
  318. def __process_rrd_string(self, rrd_string):
  319. """
  320. For more detail please refer to TestDataGenerator.py
  321. :param rrd_string:
  322. :return:
  323. """
  324. processed_string = ','.join([word.strip() for word in rrd_string.split(', ')])
  325. match = re.match(
  326. r"(RRD_(\(|\[))[a-zA-z0-9\s]+,(\[?[a-zA-z\s,]+\]?|)|\*,\[([a-zA-z0-9\s]+:\[[a-zA-z0-9,\s]+\](,?))*\]",
  327. processed_string
  328. )
  329. err_string = f"{rrd_string} not matching pattern RRD_(sheetName,TargetData," \
  330. f"[Header1:[Value1],Header2:[Value1,Value2]])"
  331. assert match, err_string
  332. return processed_string
  333. def __process_rlp_string(self, rlp_string):
  334. processed_string = ','.join([word.strip() for word in rlp_string.split(', ')])
  335. match = re.match(
  336. r"(RLP_(\(|\[)).+,.+=.+(\]|\))",
  337. processed_string
  338. )
  339. err_string = f"{rlp_string} not matching pattern RLP_(sheetName,HeaderName=DataToMatch"
  340. assert match, err_string
  341. return processed_string
  342. def __splitList(self, raw_data):
  343. """
  344. Will convert string list to python list.
  345. i.e. "[value1,value2,value3]" ==> ["value1","value2","value3"]
  346. :param raw_data: string of list
  347. :return: Python list
  348. """
  349. proccesed_datas = [data.strip() for data in raw_data[1:-1].split(",")]
  350. return proccesed_datas
  351. def __read_excel(self, path, sheet_name=""):
  352. """
  353. For more detail please refer to TestDataGenerator.py
  354. :param path: Path to raw data xlsx file.
  355. :param sheet_name: Name of base sheet sheet where main input data is located. Default will be the first sheet.
  356. :return: Dictionary of all sheets and data, Dictionary of base sheet.
  357. """
  358. wb = open_workbook(path)
  359. sheet_lis = wb.sheet_names()
  360. sheet_dict = {}
  361. for sheet in sheet_lis:
  362. xl_obj = xl2dict.XlToDict()
  363. data = xl_obj.fetch_data_by_column_by_sheet_name(path,sheet_name=sheet)
  364. sheet_dict[sheet] = data
  365. if sheet_name == "":
  366. base_sheet = sheet_dict[sheet_lis[0]]
  367. else:
  368. assert sheet_name in sheet_dict, f"Excel file doesn't contain {sheet_name} sheet. Please recheck."
  369. base_sheet = sheet_dict[sheet_name]
  370. self.sheet_dict = sheet_dict
  371. self.base_sheet = base_sheet
  372. return sheet_dict, base_sheet
  373. def readNextRecord(self):
  374. """
  375. We built self.range during init. Now we need to iterate over the range(s) in range,
  376. find appropriate record and return that - one at a time
  377. @return:
  378. """
  379. if len(self.rangeDict) == 0:
  380. # All records were processed
  381. return None
  382. try:
  383. # the topmost record of the RangeDict (RangeDict was built by the range(s) from the TestRun
  384. # - 1 because there's a header line in the Excel-Sheet.
  385. lRecord = self.dataDict[(list(self.rangeDict.keys())[0])]
  386. while not self.__compareEqualStageInGlobalsAndDataRecord(lRecord):
  387. logger.debug(f"Skipped record {str(lRecord)[:30]} due to wrong stage: {lRecord[GC.EXECUTION_STAGE]} vs. "
  388. f"{self.globals[GC.EXECUTION_STAGE]}")
  389. self.rangeDict.pop(list(self.rangeDict.keys())[0])
  390. lRecord = self.dataDict[(list(self.rangeDict.keys())[0])]
  391. except Exception as e:
  392. logger.debug(f"Couldn't read record from database: {list(self.rangeDict.keys())[0]}")
  393. self.rangeDict.pop(list(self.rangeDict.keys())[0])
  394. return None
  395. # Remove the topmost entry from the rangeDict, so that next time we read the next entry in the lines above
  396. self.rangeDict.pop(list(self.rangeDict.keys())[0])
  397. return self.updateGlobals(lRecord)
  398. def readTestRecord(self, lineNumber=None):
  399. if lineNumber:
  400. self.lineNumber = lineNumber -1 # Base 0 vs. Base 1
  401. else:
  402. self.lineNumber += 1 # add 1 to read next line number
  403. try:
  404. record = self.dataDict[self.lineNumber]
  405. logger.info(f"Starting with Testrecord {self.lineNumber}, Details: " +
  406. str({k: record[k] for k in list(record)[0:5]}))
  407. return self.updateGlobals(record)
  408. except Exception as e:
  409. logger.critical(f"Couldn't read record# {self.lineNumber}")
  410. def updateGlobals(self, record):
  411. self.globals[CGC.CUST_TOASTS] = ""
  412. self.globals[GC.TESTCASEERRORLOG] = ""
  413. self.globals[GC.TIMING_DURATION] = ""
  414. self.globals[GC.TIMELOG] = ""
  415. self.globals[GC.TESTCASESTATUS] = ""
  416. self.globals[GC.SCREENSHOTS] = ""
  417. record.update(self.globals)
  418. return record