|
@@ -10,10 +10,41 @@ from pathlib import Path
|
|
|
import xl2dict
|
|
|
import re
|
|
|
from random import randint
|
|
|
+from openpyxl import load_workbook
|
|
|
+from baangt.TestDataGenerator.TestDataGenerator import TestDataGenerator
|
|
|
|
|
|
logger = logging.getLogger("pyC")
|
|
|
|
|
|
|
|
|
+class Writer:
|
|
|
+ """
|
|
|
+ This class is made to update existing excel file.
|
|
|
+ First it will open the file in python and then we can do multiple writes and once everything is update we can use
|
|
|
+ save method in order to save the updated excel file. Hence, this class is very useful is saving time while updating
|
|
|
+ excel files.
|
|
|
+ """
|
|
|
+ def __init__(self, path):
|
|
|
+ self.path = path
|
|
|
+ self.workbook = load_workbook(self.path)
|
|
|
+
|
|
|
+ def write(self, row, data, sht):
|
|
|
+ # Update the values using row and col number.
|
|
|
+ # Note :- We are using openpyxl so row & column index will start from 1 instead of 0
|
|
|
+ column = 0
|
|
|
+ sheet = self.workbook[sht]
|
|
|
+ headers = next(sheet.rows)
|
|
|
+ for header in headers: # checks if usecount header is present in sheet
|
|
|
+ if "usecount" in str(header.value).lower():
|
|
|
+ column = headers.index(header) + 1
|
|
|
+ if column:
|
|
|
+ sheet.cell(row, column).value = data
|
|
|
+
|
|
|
+ def save(self):
|
|
|
+ # Call this method to save the file once every updates are written
|
|
|
+ self.workbook.save(self.path)
|
|
|
+ self.workbook.close()
|
|
|
+
|
|
|
+
|
|
|
class HandleDatabase:
|
|
|
def __init__(self, linesToRead, globalSettings=None):
|
|
|
self.lineNumber = 3
|
|
@@ -113,6 +144,13 @@ class HandleDatabase:
|
|
|
# read header values into the list
|
|
|
keys = [sheet.cell(0, col_index).value for col_index in range(sheet.ncols)]
|
|
|
|
|
|
+ # if testresult header is present then taking its index, which is later used as column number
|
|
|
+ testrun_index = [keys.index(x) for x in keys if str(x).lower() == "testresult"]
|
|
|
+ if testrun_index:
|
|
|
+ testrun_index = testrun_index[0] + 1 # adding +1 value which is the correct column position
|
|
|
+ else: # if list is empty that means their is no testresult header
|
|
|
+ testrun_index = 0
|
|
|
+
|
|
|
for row_index in range(1, sheet.nrows):
|
|
|
temp_dic = {}
|
|
|
for col_index in range(sheet.ncols):
|
|
@@ -121,10 +159,18 @@ class HandleDatabase:
|
|
|
temp_dic[keys[col_index]] = repr(temp_dic[keys[col_index]])
|
|
|
if temp_dic[keys[col_index]][-2:] == ".0":
|
|
|
temp_dic[keys[col_index]] = temp_dic[keys[col_index]][:-2]
|
|
|
-
|
|
|
+ # row, column, sheetName & fileName which are later used in updating source testrun file
|
|
|
+ temp_dic["testcase_row"] = row_index
|
|
|
+ temp_dic["testcase_sheet"] = sheetName
|
|
|
+ temp_dic["testcase_file"] = fileName
|
|
|
+ temp_dic["testcase_column"] = testrun_index
|
|
|
self.dataDict.append(temp_dic)
|
|
|
+ self.usecount_dict = {} # used to maintain usecount limit record and verify if that non of the data cross limit
|
|
|
+ self.writer = Writer(fileName) # Writer class object to save file only in end, which will save time
|
|
|
|
|
|
- for temp_dic in self.dataDict:
|
|
|
+ def update_datarecords(self, dataDict, fileName):
|
|
|
+ for td in dataDict:
|
|
|
+ temp_dic = dataDict[td]
|
|
|
new_data_dic = {}
|
|
|
for keys in temp_dic:
|
|
|
if type(temp_dic[keys]) != str:
|
|
@@ -150,6 +196,8 @@ class HandleDatabase:
|
|
|
new_data_dic[data] = rre_data[data]
|
|
|
elif str(temp_dic[keys][:4]) == "RLP_":
|
|
|
temp_dic[keys] = self.rlp_process(temp_dic[keys], fileName)
|
|
|
+ elif str(temp_dic[keys][:5]).upper() == "RENV_":
|
|
|
+ temp_dic[keys] = str(TestDataGenerator.get_env_variable(temp_dic[keys][5:]))
|
|
|
else:
|
|
|
try:
|
|
|
js = json.loads(temp_dic[keys])
|
|
@@ -158,6 +206,7 @@ class HandleDatabase:
|
|
|
pass
|
|
|
for key in new_data_dic:
|
|
|
temp_dic[key] = new_data_dic[key]
|
|
|
+ self.writer.save() # saving source input file once everything is done
|
|
|
|
|
|
def rlp_process(self, string, fileName):
|
|
|
# Will get real data from rlp_ prefix string
|
|
@@ -202,7 +251,8 @@ class HandleDatabase:
|
|
|
lAppend = False
|
|
|
return lAppend
|
|
|
|
|
|
- def __processRrd(self, sheet_name, data_looking_for, data_to_match: dict, sheet_dict=None, caller="RRD_"):
|
|
|
+ def __processRrd(self, sheet_name, data_looking_for, data_to_match: dict, sheet_dict=None, caller="RRD_",
|
|
|
+ file_name=None):
|
|
|
"""
|
|
|
For more detail please refer to TestDataGenerator.py
|
|
|
:param sheet_name:
|
|
@@ -213,26 +263,65 @@ class HandleDatabase:
|
|
|
sheet_dict = self.sheet_dict if sheet_dict is None else sheet_dict
|
|
|
matching_data = [list(x) for x in itertools.product(*[data_to_match[key] for key in data_to_match])]
|
|
|
assert sheet_name in sheet_dict, \
|
|
|
- f"Excel file doesn't contain {sheet_name} sheet. Please recheck. Called in '{caller}'"
|
|
|
+ f"Excel file {file_name} doesn't contain {sheet_name} sheet. Please recheck. Called in '{caller}'"
|
|
|
base_sheet = sheet_dict[sheet_name]
|
|
|
data_lis = []
|
|
|
if type(data_looking_for) == str:
|
|
|
data_looking_for = data_looking_for.split(",")
|
|
|
|
|
|
+ usecount, limit, usecount_header = self.check_usecount(base_sheet[0])
|
|
|
+
|
|
|
for data in base_sheet:
|
|
|
+ dt = ""
|
|
|
if len(matching_data) == 1 and len(matching_data[0]) == 0:
|
|
|
if data_looking_for[0] == "*":
|
|
|
- data_lis.append(data)
|
|
|
+ dt = data
|
|
|
+ data_lis.append(dt)
|
|
|
else:
|
|
|
- data_lis.append({keys: data[keys] for keys in data_looking_for})
|
|
|
+ dt = {keys: data[keys] for keys in data_looking_for}
|
|
|
+ data_lis.append(dt)
|
|
|
else:
|
|
|
if [data[key] for key in data_to_match] in matching_data:
|
|
|
if data_looking_for[0] == "*":
|
|
|
- data_lis.append(data)
|
|
|
+ dt = data
|
|
|
+ data_lis.append(dt)
|
|
|
+ else:
|
|
|
+ dt = {keys: data[keys] for keys in data_looking_for}
|
|
|
+ data_lis.append(dt)
|
|
|
+ if dt:
|
|
|
+ if repr(dt) not in self.usecount_dict:
|
|
|
+ if usecount_header:
|
|
|
+ if data[usecount_header]:
|
|
|
+ used_limit = int(data[usecount_header])
|
|
|
+ else:
|
|
|
+ used_limit = 0
|
|
|
else:
|
|
|
- data_lis.append({keys: data[keys] for keys in data_looking_for})
|
|
|
+ used_limit = 0
|
|
|
+ self.usecount_dict[repr(dt)] = {
|
|
|
+ "use": used_limit, "limit": limit, "index": base_sheet.index(data) + 2, "sheet_name": sheet_name
|
|
|
+ }
|
|
|
+ else:
|
|
|
+ if limit:
|
|
|
+ if not self.usecount_dict[repr(dt)]["use"] < self.usecount_dict[repr(dt)]["limit"]:
|
|
|
+ data_lis.remove(dt)
|
|
|
return data_lis
|
|
|
|
|
|
+ def check_usecount(self, data):
|
|
|
+ # used to find and return if their is usecount header and limit in input file
|
|
|
+ usecount = False
|
|
|
+ limit = 0
|
|
|
+ usecount_header = None
|
|
|
+ for header in data:
|
|
|
+ if "usecount" in header.lower():
|
|
|
+ usecount = True
|
|
|
+ usecount_header = header
|
|
|
+ if "usecount_" in header.lower():
|
|
|
+ try:
|
|
|
+ limit = int(header.lower().strip().split("count_")[1])
|
|
|
+ except:
|
|
|
+ limit = 0
|
|
|
+ return usecount, limit, usecount_header
|
|
|
+
|
|
|
def __rrd_string_to_python(self, raw_data, fileName):
|
|
|
"""
|
|
|
Convert string to python data types
|
|
@@ -259,11 +348,18 @@ class HandleDatabase:
|
|
|
second_value = self.__splitList(second_value)
|
|
|
if first_value not in self.sheet_dict:
|
|
|
self.sheet_dict, _ = self.__read_excel(path=fileName)
|
|
|
- processed_datas = self.__processRrd(first_value, second_value, evaluated_dict)
|
|
|
- assert len(processed_datas)>0, f"No matching data for RRD_. Please check the input file. Was searching for " \
|
|
|
+ processed_datas = self.__processRrd(first_value, second_value, evaluated_dict, file_name=fileName)
|
|
|
+ assert len(processed_datas)>0, f"No matching data for RRD_. Please check the input file {fileName}. Was searching for " \
|
|
|
f"{first_value}, {second_value} and {str(evaluated_dict)} " \
|
|
|
- f"but didn't find anything"
|
|
|
- return processed_datas[randint(0, len(processed_datas)-1)]
|
|
|
+ f"but didn't find anything. Also please check the usecount limit if their is any."
|
|
|
+ final_data = processed_datas[randint(0, len(processed_datas)-1)]
|
|
|
+ if repr(final_data) in self.usecount_dict:
|
|
|
+ self.usecount_dict[repr(final_data)]["use"] += 1
|
|
|
+ self.writer.write(
|
|
|
+ self.usecount_dict[repr(final_data)]["index"], self.usecount_dict[repr(final_data)]["use"],
|
|
|
+ self.usecount_dict[repr(final_data)]["sheet_name"]
|
|
|
+ )
|
|
|
+ return final_data
|
|
|
|
|
|
def __rre_string_to_python(self, raw_data):
|
|
|
"""
|
|
@@ -295,7 +391,8 @@ class HandleDatabase:
|
|
|
assert len(processed_datas)>0, f"No matching data for RRD_. Please check the input file. Was searching for " \
|
|
|
f"{first_value}, {second_value} and {str(evaluated_dict)} " \
|
|
|
f"but didn't find anything"
|
|
|
- return processed_datas[randint(0, len(processed_datas)-1)]
|
|
|
+ final_data = randint(0, len(processed_datas)-1)
|
|
|
+ return processed_datas[final_data]
|
|
|
|
|
|
def __rlp_string_to_python(self, raw_data, fileName):
|
|
|
# will convert rlp string to python
|
|
@@ -386,7 +483,7 @@ class HandleDatabase:
|
|
|
if sheet_name == "":
|
|
|
base_sheet = sheet_dict[sheet_lis[0]]
|
|
|
else:
|
|
|
- assert sheet_name in sheet_dict, f"Excel file doesn't contain {sheet_name} sheet. Please recheck."
|
|
|
+ assert sheet_name in sheet_dict, f"Excel file {path} doesn't contain {sheet_name} sheet. Please recheck."
|
|
|
base_sheet = sheet_dict[sheet_name]
|
|
|
self.sheet_dict = sheet_dict
|
|
|
self.base_sheet = base_sheet
|