|
@@ -1,19 +1,83 @@
|
|
|
-import csv
|
|
|
import itertools
|
|
|
-import xlsxwriter
|
|
|
-import xl2dict
|
|
|
-import xlrd
|
|
|
import errno
|
|
|
import os
|
|
|
import logging
|
|
|
import faker
|
|
|
-from random import sample, randint
|
|
|
+from random import choice
|
|
|
import baangt.base.GlobalConstants as GC
|
|
|
import re
|
|
|
+import sys
|
|
|
+import pandas as pd
|
|
|
+from CloneXls import CloneXls
|
|
|
+import json
|
|
|
|
|
|
logger = logging.getLogger("pyC")
|
|
|
|
|
|
|
|
|
+class PrefixDataManager:
|
|
|
+ def __init__(self, dataList: list, prefix: str, tdg_object=None):
|
|
|
+ """
|
|
|
+ This class manages data list as per functionality of their prefix
|
|
|
+ :param dataList: List of data to be managed
|
|
|
+ :param prefix: RRE, RRD, FKR, RND are the prefixs managed by this class
|
|
|
+ :param tdg_object: TestDataGenerator object, only useful in RRE, RRD prefix to update usecount
|
|
|
+ """
|
|
|
+ self.dataList = dataList # Iterable object to be managed
|
|
|
+ self.prefix = prefix # Prefix data
|
|
|
+ self.tdg_object = tdg_object # used to update usecount data, only for rre and rrd
|
|
|
+ self.process()
|
|
|
+
|
|
|
+ def process(self):
|
|
|
+ """
|
|
|
+ It processes the data list with respect to their prefix and save it in dataList attribute.
|
|
|
+ :return:
|
|
|
+ """
|
|
|
+ if self.prefix.lower() == "rrd" or self.prefix.lower() == "rre":
|
|
|
+ self.dataList = [
|
|
|
+ data for data in self.dataList if not self.tdg_object.usecountDataRecords[repr(data)]["limit"] or \
|
|
|
+ self.tdg_object.usecountDataRecords[repr(data)]['use'] < self.tdg_object.usecountDataRecords[repr(data)]['limit']
|
|
|
+ ] # removing data with reached use limit
|
|
|
+
|
|
|
+ elif self.prefix.lower() == "fkr":
|
|
|
+ fake = faker.Faker(self.dataList[1]) # Creating faker class object
|
|
|
+ fake_lis = []
|
|
|
+ if len(self.dataList) == 3: # Checks if their is any predefined number of fake data to be generated
|
|
|
+ if int(self.dataList[2]) == 0: # if number of predefined data is 0 then we need to
|
|
|
+ fake_lis.append([fake, self.dataList[0]]) # generate new data for each call so the required info
|
|
|
+ fake_lis = tuple(fake_lis) # is saved as tuple so program can distinguish
|
|
|
+ else:
|
|
|
+ for x in range(int(self.dataList[2])): # if greater than 0, list is created with defined number of data
|
|
|
+ fake_lis.append(getattr(fake, self.dataList[0])()) # on every call random data is sent from list
|
|
|
+ else:
|
|
|
+ for x in range(5): # if their is no predefined number of data than a list of 5 fake data
|
|
|
+ fake_lis.append(getattr(fake, self.dataList[0])()) # is generated
|
|
|
+ self.dataList = fake_lis
|
|
|
+
|
|
|
+ def return_random(self):
|
|
|
+ """
|
|
|
+ It returns data as per their prefix
|
|
|
+ :return:
|
|
|
+ """
|
|
|
+ if self.prefix == "rre" or self.prefix == "rrd":
|
|
|
+ if not len(self.dataList):
|
|
|
+ raise BaseException(f"Not enough data, please verify if data is present or usecount limit" \
|
|
|
+ "has reached!!")
|
|
|
+ data = choice(self.dataList)
|
|
|
+ self.tdg_object.usecountDataRecords[repr(data)]['use'] += 1 # updates usecount in TDG object
|
|
|
+ if self.tdg_object.usecountDataRecords[repr(data)]['limit'] and \
|
|
|
+ self.tdg_object.usecountDataRecords[repr(data)]['use'] >= self.tdg_object.usecountDataRecords[repr(data)]['limit']:
|
|
|
+ self.dataList.remove(data) # checks usecount after using a data and removes if limit is reached
|
|
|
+ return data
|
|
|
+
|
|
|
+ elif self.prefix.lower() == "fkr":
|
|
|
+ if type(self.dataList) == tuple: # if type is tuple then we need generate new data on every call
|
|
|
+ return getattr(self.dataList[0][0], self.dataList[0][1])()
|
|
|
+ return choice(self.dataList) # else it is a list and we can send any random data from it
|
|
|
+
|
|
|
+ elif self.prefix == 'rnd':
|
|
|
+ return choice(self.dataList)
|
|
|
+
|
|
|
+
|
|
|
class TestDataGenerator:
|
|
|
"""
|
|
|
TestDataGenerator Class is to used to create a TestData file from raw excel file containing all possible values.
|
|
@@ -27,21 +91,31 @@ class TestDataGenerator:
|
|
|
6. List of header = ``[<title1>, <title2>, <title3>]``
|
|
|
7. Faker Prefix = ``FKR_(<type>, <locale>, <number_of_data>)``
|
|
|
8. RRD Prefix = ``RRD_(<sheetName>,<TargetData>,[<Header1>:[<Value1>],<Header2>:[<Value1>,<Value2>]])``
|
|
|
+ 9. RRE Prefix = ``RRE_(<fileName>,<sheetName>,<TargetData>,[<Header1>:[<Value1>],<Header2>:[<Value1>,<Value2>]])``
|
|
|
+ 10. Renv Prefix = ``RENV_(<environmentVariable>,<default>)
|
|
|
|
|
|
:param rawExcelPath: Takes input path for xlsx file containing input data.
|
|
|
:param sheetName: Name of sheet where all base data is located.
|
|
|
:method write: Will write the final processed data in excel/csv file.
|
|
|
"""
|
|
|
- def __init__(self, rawExcelPath=GC.TESTDATAGENERATOR_INPUTFILE, sheetName=""):
|
|
|
- self.path = os.path.abspath(rawExcelPath)
|
|
|
+ def __init__(self, rawExcelPath=GC.TESTDATAGENERATOR_INPUTFILE, sheetName="",
|
|
|
+ from_handleDatabase=False, noUpdate=True):
|
|
|
+ self.fileNameAndPath = os.path.abspath(rawExcelPath)
|
|
|
self.sheet_name = sheetName
|
|
|
- if not os.path.isfile(self.path):
|
|
|
- raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), self.path)
|
|
|
- self.sheet_dict, self.raw_data_json = self.__read_excel(self.path, self.sheet_name)
|
|
|
- self.remove_header = []
|
|
|
- self.processed_datas = self.__process_data(self.raw_data_json)
|
|
|
- self.headers = [x for x in list(self.processed_datas[0].keys()) if x not in self.remove_header]
|
|
|
- self.final_data = self.__generateFinalData(self.processed_datas)
|
|
|
+ if not os.path.isfile(self.fileNameAndPath):
|
|
|
+ raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), self.fileNameAndPath)
|
|
|
+ self.allDataFramesDict, self.mainDataFrame = self.read_excel(self.fileNameAndPath, self.sheet_name)
|
|
|
+ self.openedSheetsDataFrame = {} # stores filenames which contains sheetname: sheetDataFrame
|
|
|
+ self.usecountHeaderName = {} # contains filenames which contains sheetname: usecountHeaderName
|
|
|
+ self.usecountDataRecords = {} # used to maintain usecount limit record and verify non of the data cross limit
|
|
|
+ self.rreRrdProcessedData = {} # stores rre processed data with its criteria as key and used when same criteria
|
|
|
+ self.noUpdateFiles = noUpdate
|
|
|
+ if not from_handleDatabase:
|
|
|
+ self.processed_data = self.processDataFrame(self.mainDataFrame)
|
|
|
+ self.finalDataFrame = self.updatePrefixData(self.processed_data)
|
|
|
+ if self.usecountHeaderName:
|
|
|
+ if not self.noUpdateFiles:
|
|
|
+ self.save_usecount() # saving source input file once everything is done
|
|
|
|
|
|
def write(self, OutputFormat=GC.TESTDATAGENERATOR_OUTPUT_FORMAT, batch_size=0, outputfile=None):
|
|
|
"""
|
|
@@ -51,269 +125,86 @@ class TestDataGenerator:
|
|
|
:param outputfile: name and path of outputfile.
|
|
|
:return:
|
|
|
"""
|
|
|
+ if batch_size > 0:
|
|
|
+ if len(self.finalDataFrame) > batch_size:
|
|
|
+ data_lis = self.finalDataFrame.sample(n=batch_size)
|
|
|
+ else:
|
|
|
+ data_lis = self.finalDataFrame
|
|
|
+ logger.debug("Total final data is smaller than batch size.")
|
|
|
+ else:
|
|
|
+ data_lis = self.finalDataFrame
|
|
|
+
|
|
|
if OutputFormat.lower() == "xlsx":
|
|
|
if outputfile == None:
|
|
|
outputfile = GC.TESTDATAGENERATOR_OUTPUTFILE_XLSX
|
|
|
- self.__write_excel(batch_size=batch_size, outputfile=outputfile)
|
|
|
+ with pd.ExcelWriter(outputfile) as writer:
|
|
|
+ data_lis.to_excel(writer, index=False)
|
|
|
+ writer.save()
|
|
|
+
|
|
|
elif OutputFormat.lower() == "csv":
|
|
|
if outputfile == None:
|
|
|
outputfile = GC.TESTDATAGENERATOR_OUTPUTFILE_CSV
|
|
|
- self.__write_csv(batch_size=batch_size, outputfile=outputfile)
|
|
|
- else:
|
|
|
- logger.debug("Incorrect file format")
|
|
|
+ data_lis.to_csv(outputfile)
|
|
|
|
|
|
- def __write_excel(self, outputfile=GC.TESTDATAGENERATOR_OUTPUTFILE_XLSX, batch_size=0):
|
|
|
- """
|
|
|
- Writes TestData file with final processsed data.
|
|
|
- :param outputfile: Name and path for output file.
|
|
|
- :param batch_size: No. of data to be randomly selected and written in output file.
|
|
|
- :return: None
|
|
|
- """
|
|
|
- if batch_size > 0:
|
|
|
- if len(self.final_data) > batch_size:
|
|
|
- data_lis = sample(self.final_data, batch_size)
|
|
|
- else:
|
|
|
- data_lis = self.final_data
|
|
|
- logger.debug("Total final data is smaller than batch size.")
|
|
|
else:
|
|
|
- data_lis = self.final_data
|
|
|
- with xlsxwriter.Workbook(outputfile) as workbook:
|
|
|
- worksheet = workbook.add_worksheet()
|
|
|
- worksheet.write_row(0, 0, self.headers)
|
|
|
- for row_num, data in enumerate(data_lis):
|
|
|
- worksheet.write_row(row_num+1, 0, data)
|
|
|
-
|
|
|
- def __write_csv(self, outputfile=GC.TESTDATAGENERATOR_OUTPUTFILE_CSV, batch_size=0):
|
|
|
- """
|
|
|
- Writes final data in csv
|
|
|
- :param outputfile: Name and path of output file
|
|
|
- :param batch_size: No. of data to be randomly selected and written in output file.
|
|
|
- :return:
|
|
|
- """
|
|
|
- if batch_size > 0:
|
|
|
- if len(self.final_data) > batch_size:
|
|
|
- data_lis = sample(self.final_data, batch_size)
|
|
|
- else:
|
|
|
- data_lis = self.final_data
|
|
|
- else:
|
|
|
- data_lis = self.final_data
|
|
|
- with open(outputfile, 'w', newline='\n', encoding='utf-8-sig') as file:
|
|
|
- fl = csv.writer(file)
|
|
|
- fl.writerow(self.headers)
|
|
|
- for dt in data_lis:
|
|
|
- fl.writerow(list(dt))
|
|
|
+ logger.debug("Incorrect file format")
|
|
|
|
|
|
- def __generateFinalData(self, processed_data):
|
|
|
+ def updatePrefixData(self, processed_data):
|
|
|
"""
|
|
|
- This method will do the final process on the processed_data. Processed_data contains list of dictionary, each
|
|
|
- dictionary is the row from input file which are processed to be interact able in python as per the requirement.
|
|
|
-
|
|
|
- First loop is of processed_data
|
|
|
- Second loop is of the dictionary(row) and each key:value of that dictionary is header:processed_data
|
|
|
-
|
|
|
- Method will first check the data type of value.
|
|
|
- If it is a string than method will put it inside a list(i.e. ["string"])
|
|
|
- If it is a tuple than it is a data with prefix so it will be sent to ``__prefix_data_processing`` method for
|
|
|
- further processing.
|
|
|
- Else the value is of type list.
|
|
|
-
|
|
|
- Then we store all this lists in a list(can be treat as row). This list contains value of cells. They are evaluted
|
|
|
- i.e. ranges are converted to list, strings are converted to list & list are all ready list. So to generate all
|
|
|
- possible combinations from it we use ``iterable`` module.
|
|
|
-
|
|
|
- Once this list of lists which contains all possible combinations is created we will call
|
|
|
- ``__update_prefix_data_in_final_list`` method. This method will insert the processed prefix data along with the
|
|
|
- data of all combinations list in the final list with the correct position of every value.
|
|
|
-
|
|
|
- Finally it will return the list of lists which is completely processed and ready to be written in output file.
|
|
|
|
|
|
:param processed_data:
|
|
|
:return: Final_data_list
|
|
|
"""
|
|
|
- final_data = []
|
|
|
- for lis in processed_data:
|
|
|
- index = {}
|
|
|
- data_lis = []
|
|
|
- for key in lis:
|
|
|
- if type(lis[key]) == str:
|
|
|
- data = [lis[key]]
|
|
|
- elif type(lis[key]) == tuple:
|
|
|
- if len(lis[key]) > 0:
|
|
|
- self.__prefix_data_processing(lis, key, index)
|
|
|
- continue
|
|
|
+ for dic in processed_data:
|
|
|
+ for key in dic.copy():
|
|
|
+ if type(dic[key]) == PrefixDataManager:
|
|
|
+ data = dic[key].return_random()
|
|
|
+ if type(data) == dict:
|
|
|
+ del dic[key]
|
|
|
+ dic.update(data)
|
|
|
else:
|
|
|
- data = ['']
|
|
|
- else:
|
|
|
- data = lis[key]
|
|
|
- data_lis.append(data)
|
|
|
- datas = list(itertools.product(*data_lis))
|
|
|
- self.__update_prefix_data_in_final_list(datas, index, final_data)
|
|
|
- logger.info(f"Total generated data = {len(final_data)}")
|
|
|
+ dic[key] = data
|
|
|
+ final_data = pd.DataFrame(processed_data)
|
|
|
return final_data
|
|
|
|
|
|
- def __update_prefix_data_in_final_list(self, data_list, dictionary, final_list):
|
|
|
- """
|
|
|
- This method will insert the data from the dictionary to final_list. So further it can be written in the output.
|
|
|
-
|
|
|
- ``data_list`` is the list where all possible combinations generated by the input lists and ranges are stored.
|
|
|
- ``dictionary`` is where the data with prefix are stored with their index value(which will be used to place data)
|
|
|
- ``final_list`` is the list where final data will be stored after merging values from data_list and dictionary
|
|
|
-
|
|
|
- First it will iterate through data_list which is list of lists(Also you can take it as list of rows).
|
|
|
- Second loop will go through dictionary and check the data type of each value. Pairings inside dictionary are of
|
|
|
- ``index: value`` here index is the position from where this value was picked. So it will be used in placing the
|
|
|
- data in their correct position.
|
|
|
-
|
|
|
- List:
|
|
|
- =====
|
|
|
- If value is list then it is a data with ``FKR_`` prefix with number of data 0(i.e. create new fake data for
|
|
|
- every output). So we will create the faker module instance as per the input and will generate fake data for
|
|
|
- every row. And will insert them in the position of index(key of dictionary).
|
|
|
-
|
|
|
-
|
|
|
- Dictionary:
|
|
|
- ==========
|
|
|
- If it is not of type list then we will check that if it is of type dict. If yes then this is a data with
|
|
|
- "RRD_" prefix and we have to select the random data here. So we will start looping through this dictionary.
|
|
|
- Remember this is the third loop. This dictionary contains header:value(TargetDatas from matched data) pair.
|
|
|
- On every loop it will first check that if the same header is stored in done dictionary. If yes then it will
|
|
|
- get value of it from done dictionary. Then it will create a list from the TargetData list. This new list
|
|
|
- will contain only data which has same value for same header stored in done dictionary.
|
|
|
- i.e. If matching Header has value x then from TargetDatas list only data where header=x will be
|
|
|
- considered for random pick.
|
|
|
-
|
|
|
- Then the random value is selected from that list.
|
|
|
- If none of the header is processed before(for the same row). Then it will get random data from the list and
|
|
|
- will store header: value pair in done dictionary so it is used in the checking process as above.
|
|
|
-
|
|
|
- It will also check if the ``self.header`` list contains all the header which are in the random selected data.
|
|
|
- If not then it will add the header there.
|
|
|
-
|
|
|
- At last we will index the position of header inside self.headers list and will insert the value in same
|
|
|
- position and append the row in final_data list.
|
|
|
-
|
|
|
- Tuple:
|
|
|
- ======
|
|
|
- If the type is tuple then we need to simply pick a random value from it and insert it in the same position of
|
|
|
- index(key of dictionary for current value).
|
|
|
-
|
|
|
- :param data_list:
|
|
|
- :param dictionary:
|
|
|
- :param final_list:
|
|
|
- :return: None
|
|
|
- """
|
|
|
- for data in data_list:
|
|
|
- data = list(data)
|
|
|
- done = {}
|
|
|
- for ind in dictionary:
|
|
|
- if type(dictionary[ind]) == list:
|
|
|
- fake = faker.Faker(dictionary[ind][2])
|
|
|
- data.insert(ind, getattr(fake, dictionary[ind][1])())
|
|
|
- else:
|
|
|
- if type(dictionary[ind][0]) == dict:
|
|
|
- sorted_data = False
|
|
|
- for header in dictionary[ind][0]:
|
|
|
- if header in done:
|
|
|
- match = done[header]
|
|
|
- sorted_data = [x for x in dictionary[ind] if x[header] == match]
|
|
|
- break
|
|
|
- if not sorted_data:
|
|
|
- sorted_data = dictionary[ind]
|
|
|
- data_to_insert = sorted_data[randint(0, len(sorted_data) - 1)]
|
|
|
- for keys in data_to_insert:
|
|
|
- if keys not in self.headers:
|
|
|
- self.headers.append(keys)
|
|
|
- if keys not in done:
|
|
|
- data.insert(self.headers.index(keys), data_to_insert[keys])
|
|
|
- done[keys] = data_to_insert[keys]
|
|
|
- else:
|
|
|
- data_to_insert = dictionary[ind][randint(0, len(dictionary[ind]) - 1)]
|
|
|
- data.insert(ind, data_to_insert)
|
|
|
- final_list.append(data)
|
|
|
-
|
|
|
- def __prefix_data_processing(self, dic, key, dictionary: dict):
|
|
|
- """
|
|
|
- This method will process the datas with prefix.
|
|
|
-
|
|
|
- ``dic`` the dictionary where all data which are in final process is stored
|
|
|
- ``key`` the header of the current data which will be used now to call the data.
|
|
|
- ``dictionary`` in which the values will be inserted after performing their process.
|
|
|
-
|
|
|
- First it will check the first value of tuple.
|
|
|
- If it is ``Faker`` then in will continue the process and will check the 4th value of tuple.
|
|
|
- If the 4th value(which is used to determine the number of fake data to be generated and store inside a list)
|
|
|
- is ``0`` then the method will store the values as it is in a list, because ``0`` value means we have to generate
|
|
|
- new fake data for every output data, so it will be done later.
|
|
|
- If it is greater than ``0`` then this method will create tuple with the given number of fake data and store it.
|
|
|
- (If no number is given then default number is 5.)
|
|
|
-
|
|
|
- If first value is not ``Faker`` then no process will be done.
|
|
|
-
|
|
|
- Finally the data will be inserted in the dictionary.
|
|
|
-
|
|
|
- :param dic:
|
|
|
- :param key:
|
|
|
- :param dictionary:
|
|
|
- :return:
|
|
|
- """
|
|
|
- ltuple = dic[key]
|
|
|
- if ltuple[0] == "Faker":
|
|
|
- fake = faker.Faker(ltuple[2])
|
|
|
- fake_lis = []
|
|
|
- if len(ltuple) == 4:
|
|
|
- if int(ltuple[3]) == 0:
|
|
|
- dictionary[list(dic.keys()).index(key)] = list(ltuple)
|
|
|
- return True
|
|
|
- else:
|
|
|
- for x in range(int(ltuple[3])):
|
|
|
- fake_lis.append(getattr(fake, ltuple[1])())
|
|
|
- else:
|
|
|
- for x in range(5):
|
|
|
- fake_lis.append(getattr(fake, ltuple[1])())
|
|
|
- dictionary[list(dic.keys()).index(key)] = tuple(fake_lis)
|
|
|
- return True
|
|
|
- else:
|
|
|
- dictionary[list(dic.keys()).index(key)] = ltuple
|
|
|
- return True
|
|
|
-
|
|
|
- def __process_data(self, raw_json):
|
|
|
+ def processDataFrame(self, dataFrame):
|
|
|
"""
|
|
|
This method is used to Process all the raw unprocessed data read from the excel file.
|
|
|
|
|
|
- It will first send the header to ``__data_generator`` so that if it is a list then it will get converted in
|
|
|
+ It will first send the header to ``__splitList`` so that if it is a list then it will get converted in
|
|
|
individual header.
|
|
|
|
|
|
Later it will process the values using ``__data_generator``.
|
|
|
|
|
|
- It will then check returned iterable type, if it is a tuple that mean input value was with prefix, so, it will
|
|
|
- further check if the tuple contains dict. If True than prefix was RRD_. In that case we will have to deal with
|
|
|
- the original header of the input value. Because if the original value's header is not in the TargetData then this
|
|
|
- header will contain no value in the output file and my cause errors too. So the header will added in
|
|
|
- ``self.remove_header`` list which will be further used to remove it from main header list.
|
|
|
+ It will then check returned data type, and convert it into list if it is not already. Then from every row which
|
|
|
+ are dict contains value/values inside list it will use itertools to create all possible combination, now every
|
|
|
+ data in the value list is converted in a new row with value as string.
|
|
|
|
|
|
- Finally it will return list of dictionarys. Each dictionary contains processed data of a row of input file.
|
|
|
- Processed data are the raw data converted into python data type and iterables. Ranges are converted into list.
|
|
|
+ Finally it will return list of dictionary. Each dictionary contains processed data of a row of input file.
|
|
|
+ Processed data are the raw data converted into python data type and class. Ranges are converted into list.
|
|
|
|
|
|
- :param raw_json:
|
|
|
+ :param dataFrame:
|
|
|
:return:
|
|
|
"""
|
|
|
processed_datas = []
|
|
|
- for raw_data in raw_json:
|
|
|
+ json_df = json.loads(dataFrame.to_json(orient="records"))
|
|
|
+ for raw_data in json_df:
|
|
|
if not list(raw_data.values())[0]:
|
|
|
continue
|
|
|
processed_data = {}
|
|
|
for key in raw_data:
|
|
|
- keys = self.__data_generators(key)
|
|
|
+ keys = self.__splitList(key)
|
|
|
for ke in keys:
|
|
|
- processed_data[ke] = self.__data_generators(raw_data[key])
|
|
|
- if type(processed_data[ke]) == tuple and len(processed_data[ke])>0:
|
|
|
- if type(processed_data[ke][0]) == dict:
|
|
|
- if ke not in processed_data[ke][0]:
|
|
|
- self.remove_header.append(ke)
|
|
|
- processed_datas.append(processed_data)
|
|
|
+ data = self.data_generators(raw_data[key])
|
|
|
+ if type(data) != list:
|
|
|
+ processed_data[ke] = [data]
|
|
|
+ else:
|
|
|
+ processed_data[ke] = data
|
|
|
+ product = list(self.product_dict(**processed_data))
|
|
|
+ processed_datas += product
|
|
|
return processed_datas
|
|
|
|
|
|
- def __data_generators(self, raw_data):
|
|
|
+ def data_generators(self, raw_data_old):
|
|
|
"""
|
|
|
This method first send the data to ``__raw_data_string_process`` method to split the data and remove the unwanted
|
|
|
spaces.
|
|
@@ -324,71 +215,62 @@ class TestDataGenerator:
|
|
|
|
|
|
Later according to the prefix of data and the data_type assigned it will convert them.
|
|
|
Simple list and strings are converted in to ``list`` type.
|
|
|
- Data with prefix will converted in to ``tuple`` type so further it will be helpful in distinguishing. Also it will
|
|
|
- insert the prefix name in first value of tuple if the prefix is ``FKR_`` so it will be helpful in further process.
|
|
|
+ Data with prefix will be processed and the data list generated by them is used to create PrefixDataManager class
|
|
|
+ so later when we need to get the final data this class is used to get the data as per prefix and requirements.
|
|
|
|
|
|
- Finally it will return the iterable for further process.
|
|
|
+ Finally it will return the data for further process.
|
|
|
|
|
|
:param raw_data:
|
|
|
:return: List or Tuple containing necessary data
|
|
|
"""
|
|
|
- raw_data, prefix, data_type = self.__raw_data_string_process(raw_data)
|
|
|
+ raw_data, prefix, data_type = self.__raw_data_string_process(raw_data_old)
|
|
|
if len(raw_data)<=1:
|
|
|
return [""]
|
|
|
- if raw_data[0] == "[" and raw_data[-1] == "]" and prefix == "":
|
|
|
- processed_datas = self.__splitList(raw_data)
|
|
|
- processed_datas = data_type(processed_datas)
|
|
|
+
|
|
|
+ if prefix == "Rnd":
|
|
|
+ if "-" in raw_data:
|
|
|
+ raw_data = raw_data.split('-')
|
|
|
+ start = raw_data[0].strip()
|
|
|
+ end = raw_data[1].strip()
|
|
|
+ step = 1
|
|
|
+ if "," in end:
|
|
|
+ raw_data = end.split(",")
|
|
|
+ end = raw_data[0].strip()
|
|
|
+ step = raw_data[1].strip()
|
|
|
+ processed_datas = [x for x in range(int(start), int(end) + 1, int(step))]
|
|
|
+ else:
|
|
|
+ processed_datas = self.__splitList(raw_data)
|
|
|
+ processed_datas = PrefixDataManager(processed_datas, 'rnd')
|
|
|
|
|
|
elif prefix == "Faker":
|
|
|
- processed_datas = [data.strip() for data in raw_data[1:-1].split(",")]
|
|
|
- processed_datas.insert(0, "Faker")
|
|
|
- processed_datas = data_type(processed_datas)
|
|
|
+ dataList = [data.strip() for data in raw_data[1:-1].split(",")]
|
|
|
+ processed_datas = PrefixDataManager(dataList, prefix="fkr")
|
|
|
|
|
|
elif prefix == "Rrd":
|
|
|
- first_value = raw_data[1:-1].split(',')[0].strip()
|
|
|
- second_value = raw_data[1:-1].split(',')[1].strip()
|
|
|
- if second_value[0] == "[":
|
|
|
- second_value = ','.join(raw_data[1:-1].split(',')[1:]).strip()
|
|
|
- second_value = second_value[:second_value.index(']')+1]
|
|
|
- third_value = [x.strip() for x in ']'.join(raw_data[1:-1].split(']')[1:]).split(',')[1:]]
|
|
|
- else:
|
|
|
- third_value = [x.strip() for x in raw_data[1:-1].split(',')[2:]]
|
|
|
- evaluated_list = ']],'.join(','.join(third_value)[1:-1].strip().split('],')).split('],')
|
|
|
- if evaluated_list[0] == "":
|
|
|
- evaluated_dict = {}
|
|
|
- else:
|
|
|
- evaluated_dict = {
|
|
|
- splited_data.split(':')[0]: self.__splitList(splited_data.split(':')[1]) for splited_data in evaluated_list
|
|
|
- }
|
|
|
- if second_value[0] == "[" and second_value[-1] == "]":
|
|
|
- second_value = self.__splitList(second_value)
|
|
|
- processed_datas = self.__processRrd(first_value, second_value,evaluated_dict)
|
|
|
- processed_datas = data_type(processed_datas)
|
|
|
+ sheet_name, data_looking_for, data_to_match = self.extractDataFromRrd(raw_data)
|
|
|
+ try:
|
|
|
+ dataList = self.__processRrdRre(sheet_name, data_looking_for, data_to_match)
|
|
|
+ processed_datas = PrefixDataManager(dataList, prefix='rrd', tdg_object=self)
|
|
|
+ except KeyError:
|
|
|
+ sys.exit(f"Please check that source files contains all the headers mentioned in : {raw_data_old}")
|
|
|
|
|
|
elif prefix == "Rre":
|
|
|
file_name = raw_data[1:-1].split(',')[0].strip()
|
|
|
- sheet_dict, _ = self.__read_excel(file_name)
|
|
|
- first_value = raw_data[1:-1].split(',')[1].strip()
|
|
|
- second_value = raw_data[1:-1].split(',')[2].strip()
|
|
|
- if second_value[0] == "[":
|
|
|
- second_value = ','.join(raw_data[1:-1].split(',')[2:]).strip()
|
|
|
- second_value = second_value[:second_value.index(']')+1]
|
|
|
- third_value = [x.strip() for x in ']'.join(raw_data[1:-1].split(']')[1:]).split(',')[1:]]
|
|
|
- else:
|
|
|
- third_value = [x.strip() for x in raw_data[1:-1].split(',')[3:]]
|
|
|
- evaluated_list = ']],'.join(','.join(third_value)[1:-1].strip().split('],')).split('],')
|
|
|
- if evaluated_list[0] == "":
|
|
|
- evaluated_dict = {}
|
|
|
- else:
|
|
|
- evaluated_dict = {
|
|
|
- splited_data.split(':')[0]: self.__splitList(splited_data.split(':')[1]) for splited_data in evaluated_list
|
|
|
- }
|
|
|
- if second_value[0] == "[" and second_value[-1] == "]":
|
|
|
- second_value = self.__splitList(second_value)
|
|
|
- processed_datas = self.__processRrd(first_value, second_value, evaluated_dict, sheet_dict)
|
|
|
- processed_datas = data_type(processed_datas)
|
|
|
+ sheet_name, data_looking_for, data_to_match = self.extractDataFromRrd(raw_data, index=1)
|
|
|
+ try:
|
|
|
+ dataList = self.__processRrdRre(sheet_name, data_looking_for, data_to_match, filename=file_name)
|
|
|
+ processed_datas = PrefixDataManager(dataList, prefix="rre", tdg_object=self)
|
|
|
+ except KeyError:
|
|
|
+ sys.exit(f"Please check that source files contains all the headers mentioned in : {raw_data_old}")
|
|
|
+
|
|
|
+ elif prefix == "Renv":
|
|
|
+ processed_datas = self.get_env_variable(raw_data)
|
|
|
+
|
|
|
+ elif raw_data[0] == "[" and raw_data[-1] == "]":
|
|
|
+ processed_datas = self.__splitList(raw_data)
|
|
|
|
|
|
elif "-" in raw_data:
|
|
|
+ raw_data_original = raw_data[:]
|
|
|
raw_data = raw_data.split('-')
|
|
|
start = raw_data[0].strip()
|
|
|
end = raw_data[1].strip()
|
|
@@ -397,14 +279,138 @@ class TestDataGenerator:
|
|
|
raw_data = end.split(",")
|
|
|
end = raw_data[0].strip()
|
|
|
step = raw_data[1].strip()
|
|
|
- processed_datas = [x for x in range(int(start), int(end)+1, int(step))]
|
|
|
- processed_datas = data_type(processed_datas)
|
|
|
+ try:
|
|
|
+ processed_datas = [x for x in range(int(start), int(end)+1, int(step))]
|
|
|
+ except:
|
|
|
+ processed_datas = [raw_data_original.strip()]
|
|
|
|
|
|
else:
|
|
|
- processed_datas = [raw_data.strip()]
|
|
|
- processed_datas = data_type(processed_datas)
|
|
|
+ processed_datas = raw_data.strip()
|
|
|
return processed_datas
|
|
|
|
|
|
+ def extractDataFromRrd(self, raw_data, index=0):
|
|
|
+ """
|
|
|
+ Splits rrd/rre string and used them according to their position
|
|
|
+ :param raw_data:
|
|
|
+ :param index:
|
|
|
+ :return:
|
|
|
+ """
|
|
|
+ first_value = raw_data[1:-1].split(',')[0+index].strip()
|
|
|
+ second_value = raw_data[1:-1].split(',')[1+index].strip()
|
|
|
+ if second_value[0] == "[":
|
|
|
+ second_value = ','.join(raw_data[1:-1].split(',')[1+index:]).strip()
|
|
|
+ second_value = second_value[:second_value.index(']') + 1]
|
|
|
+ third_value = [x.strip() for x in ']'.join(raw_data[1:-1].split(']')[1:]).split(',')[1:]]
|
|
|
+ else:
|
|
|
+ third_value = [x.strip() for x in raw_data[1:-1].split(',')[2+index:]]
|
|
|
+ evaluated_list = ']],'.join(','.join(third_value)[1:-1].strip().split('],')).split('],')
|
|
|
+ if evaluated_list[0] == "":
|
|
|
+ evaluated_dict = {}
|
|
|
+ else:
|
|
|
+ evaluated_dict = {
|
|
|
+ splited_data.split(':')[0]: self.__splitList(splited_data.split(':')[1]) for splited_data in
|
|
|
+ evaluated_list
|
|
|
+ }
|
|
|
+ if second_value[0] == "[" and second_value[-1] == "]":
|
|
|
+ second_value = self.__splitList(second_value)
|
|
|
+ return first_value, second_value, evaluated_dict
|
|
|
+
|
|
|
+ def __processRrdRre(self, sheet_name, data_looking_for, data_to_match: dict, filename=None):
|
|
|
+ if filename: # if filename than it is an rre file so we have to use filename instead of base file
|
|
|
+ filename = os.path.join(os.path.dirname(self.fileNameAndPath), filename)
|
|
|
+ if not self.noUpdateFiles:
|
|
|
+ file_name = ".".join(filename.split(".")[:-1])
|
|
|
+ file_extension = filename.split(".")[-1]
|
|
|
+ file = file_name + "_baangt" + "." + file_extension
|
|
|
+ else:
|
|
|
+ file = filename
|
|
|
+ if not file in self.openedSheetsDataFrame:
|
|
|
+ logger.debug(f"Creating clone file of: {filename}")
|
|
|
+ if not self.noUpdateFiles:
|
|
|
+ filename = CloneXls(filename).update_or_make_clone()
|
|
|
+ self.openedSheetsDataFrame[filename] = {}
|
|
|
+ filename = file
|
|
|
+ if sheet_name in self.openedSheetsDataFrame[filename]:
|
|
|
+ df = self.openedSheetsDataFrame[filename][sheet_name]
|
|
|
+ else:
|
|
|
+ df = pd.read_excel(filename, sheet_name, dtype=str)
|
|
|
+ df.fillna("", inplace=True)
|
|
|
+ self.openedSheetsDataFrame[filename][sheet_name] = df
|
|
|
+ else:
|
|
|
+ df = self.allDataFramesDict[sheet_name]
|
|
|
+ if not self.fileNameAndPath in self.openedSheetsDataFrame:
|
|
|
+ self.openedSheetsDataFrame[self.fileNameAndPath] = {}
|
|
|
+ if not sheet_name in self.openedSheetsDataFrame[self.fileNameAndPath]:
|
|
|
+ self.openedSheetsDataFrame[self.fileNameAndPath][sheet_name] = df
|
|
|
+ df1 = df.copy()
|
|
|
+ for key, value in data_to_match.items():
|
|
|
+ if not isinstance(value, list):
|
|
|
+ value = [value]
|
|
|
+ df1 = df1.loc[df1[key].isin(value)]
|
|
|
+ data_lis = []
|
|
|
+
|
|
|
+ if type(data_looking_for) == str:
|
|
|
+ data_looking_for = data_looking_for.split(",")
|
|
|
+ data_new_header = {}
|
|
|
+ for header in data_looking_for:
|
|
|
+ if ":" in header:
|
|
|
+ old_header = header.split(":")[0].strip()
|
|
|
+ new_header = header.split(":")[1].strip()
|
|
|
+ else:
|
|
|
+ old_header = header
|
|
|
+ new_header = header
|
|
|
+ data_new_header[old_header] = new_header
|
|
|
+
|
|
|
+ key_name = repr(sheet_name) + repr(data_looking_for) + repr(data_to_match) + repr(filename)
|
|
|
+ if key_name in self.rreRrdProcessedData:
|
|
|
+ logger.debug(f"Data Gathered from previously saved data.")
|
|
|
+ return self.rreRrdProcessedData[key_name]
|
|
|
+
|
|
|
+ usecount, limit, usecount_header = self.check_usecount(df.columns.values.tolist())
|
|
|
+ if not filename:
|
|
|
+ if self.fileNameAndPath not in self.usecountHeaderName:
|
|
|
+ self.usecountHeaderName[self.fileNameAndPath] = {}
|
|
|
+ if sheet_name not in self.usecountHeaderName[self.fileNameAndPath] and usecount_header:
|
|
|
+ self.usecountHeaderName[self.fileNameAndPath][sheet_name] = usecount_header
|
|
|
+ else:
|
|
|
+ if filename not in self.usecountHeaderName:
|
|
|
+ self.usecountHeaderName[filename] = {}
|
|
|
+ if sheet_name not in self.usecountHeaderName[filename]:
|
|
|
+ self.usecountHeaderName[filename][sheet_name] = usecount_header
|
|
|
+ df1_dict = df1.to_dict(orient="index")
|
|
|
+ for index in df1_dict:
|
|
|
+ data = df1_dict[index]
|
|
|
+ if usecount_header:
|
|
|
+ try:
|
|
|
+ used_limit = int(data[usecount_header])
|
|
|
+ except:
|
|
|
+ used_limit = 0
|
|
|
+ else:
|
|
|
+ used_limit = 0
|
|
|
+
|
|
|
+ if data_looking_for[0] == "*":
|
|
|
+ if usecount_header:
|
|
|
+ del data[usecount_header]
|
|
|
+ data_lis.append(data)
|
|
|
+ self.usecountDataRecords[repr(data)] = {
|
|
|
+ "use": used_limit, "limit": limit, "index": index,
|
|
|
+ "sheet_name": sheet_name, "file_name": filename
|
|
|
+ }
|
|
|
+ else:
|
|
|
+ dt = {data_new_header[header]: data[header] for header in data_new_header}
|
|
|
+ data_lis.append(dt)
|
|
|
+ self.usecountDataRecords[repr(dt)] = {
|
|
|
+ "use": used_limit, "limit": limit, "index": index,
|
|
|
+ "sheet_name": sheet_name, "file_name": filename
|
|
|
+ }
|
|
|
+
|
|
|
+ if len(data_lis) == 0:
|
|
|
+ logger.info(f"No data matching: {data_to_match}")
|
|
|
+ sys.exit(f"No data matching: {data_to_match}")
|
|
|
+ logger.debug(f"New Data Gathered.")
|
|
|
+ self.rreRrdProcessedData[key_name] = data_lis
|
|
|
+ return data_lis
|
|
|
+
|
|
|
def __raw_data_string_process(self, raw_string):
|
|
|
"""
|
|
|
Returns ``String, prefix, data_type`` which are later used to decided the process to perform on string.
|
|
@@ -430,7 +436,8 @@ class TestDataGenerator:
|
|
|
prefix = ""
|
|
|
if len(raw_string)>4:
|
|
|
if raw_string[3] == "_":
|
|
|
- if raw_string[:4].lower() == "rnd_": # Random
|
|
|
+ if raw_string[:4].lower() == "rnd_":
|
|
|
+ prefix = "Rnd"
|
|
|
raw_string = raw_string[4:]
|
|
|
data_type = tuple
|
|
|
elif raw_string[:4].lower() == "fkr_":
|
|
@@ -450,13 +457,28 @@ class TestDataGenerator:
|
|
|
else:
|
|
|
data_type = list
|
|
|
else:
|
|
|
+ if raw_string[:5].lower() == "renv_":
|
|
|
+ prefix = "Renv"
|
|
|
+ raw_string = raw_string[5:]
|
|
|
data_type = list
|
|
|
else:
|
|
|
data_type = list
|
|
|
return raw_string, prefix, data_type
|
|
|
|
|
|
- @staticmethod
|
|
|
- def __read_excel(path, sheet_name=""):
|
|
|
+ def get_str_sheet(self, excel, sheet):
|
|
|
+ """
|
|
|
+ Returns dataFrame with all data treated as string
|
|
|
+ :param excel:
|
|
|
+ :param sheet:
|
|
|
+ :return:
|
|
|
+ """
|
|
|
+ columns = excel.parse(sheet).columns
|
|
|
+ converters = {column: str for column in columns}
|
|
|
+ data = excel.parse(sheet, converters=converters)
|
|
|
+ data.fillna("", inplace=True)
|
|
|
+ return data
|
|
|
+
|
|
|
+ def read_excel(self, path, sheet_name="", return_json=False):
|
|
|
"""
|
|
|
This method will read the input excel file.
|
|
|
It will read all the sheets inside this excel file and will create a dictionary of dictionary containing all data
|
|
@@ -472,19 +494,21 @@ class TestDataGenerator:
|
|
|
:param sheet_name: Name of base sheet sheet where main input data is located. Default will be the first sheet.
|
|
|
:return: Dictionary of all sheets and data, Dictionary of base sheet.
|
|
|
"""
|
|
|
- wb = xlrd.open_workbook(path)
|
|
|
- sheet_lis = wb.sheet_names()
|
|
|
- sheet_dict = {}
|
|
|
+ wb = pd.ExcelFile(path)
|
|
|
+ sheet_lis = wb.sheet_names
|
|
|
+ sheet_df = {}
|
|
|
for sheet in sheet_lis:
|
|
|
- xl_obj = xl2dict.XlToDict()
|
|
|
- data = xl_obj.fetch_data_by_column_by_sheet_name(path,sheet_name=sheet)
|
|
|
- sheet_dict[sheet] = data
|
|
|
+ sheet_df[sheet] = self.get_str_sheet(wb, sheet)
|
|
|
+ sheet_df[sheet].fillna("", inplace=True)
|
|
|
+ if return_json:
|
|
|
+ for df in sheet_df.keys():
|
|
|
+ sheet_df[df] = json.loads(sheet_df[df].to_json(orient="records"))
|
|
|
if sheet_name == "":
|
|
|
- base_sheet = sheet_dict[sheet_lis[0]]
|
|
|
+ base_sheet = sheet_df[sheet_lis[0]]
|
|
|
else:
|
|
|
- assert sheet_name in sheet_dict, f"Excel file doesn't contain {sheet_name} sheet. Please recheck."
|
|
|
- base_sheet = sheet_dict[sheet_name]
|
|
|
- return sheet_dict, base_sheet
|
|
|
+ assert sheet_name in sheet_df, f"Excel file doesn't contain {sheet_name} sheet. Please recheck."
|
|
|
+ base_sheet = sheet_df[sheet_name]
|
|
|
+ return sheet_df, base_sheet
|
|
|
|
|
|
@staticmethod
|
|
|
def __splitList(raw_data):
|
|
@@ -494,55 +518,71 @@ class TestDataGenerator:
|
|
|
:param raw_data: string of list
|
|
|
:return: Python list
|
|
|
"""
|
|
|
- proccesed_datas = [data.strip() for data in raw_data[1:-1].split(",")]
|
|
|
+ if raw_data[0] == "[" and raw_data[-1] == "]":
|
|
|
+ data = raw_data[1:-1]
|
|
|
+ else:
|
|
|
+ data = raw_data
|
|
|
+ proccesed_datas = [data.strip() for data in data.split(",")]
|
|
|
return proccesed_datas
|
|
|
|
|
|
- def __processRrd(self, sheet_name, data_looking_for, data_to_match: dict, sheet_dict=None, caller="RRD_"):
|
|
|
+ def check_usecount(self, data):
|
|
|
+ # used to find and return if their is usecount header and limit in input file
|
|
|
+ usecount = False
|
|
|
+ limit = 0
|
|
|
+ usecount_header = None
|
|
|
+ for header in data:
|
|
|
+ if "usecount" in header.lower():
|
|
|
+ usecount = True
|
|
|
+ usecount_header = header
|
|
|
+ if "usecount_" in header.lower():
|
|
|
+ try:
|
|
|
+ limit = int(header.lower().strip().split("count_")[1])
|
|
|
+ except:
|
|
|
+ limit = 0
|
|
|
+ return usecount, limit, usecount_header
|
|
|
+
|
|
|
+ def save_usecount(self):
|
|
|
"""
|
|
|
- This function is internal function to process the data wil RRD_ prefix.
|
|
|
- The General input in excel file is like ``RRD_[sheetName,TargetData,[Header:[values**],Header:[values**]]]``
|
|
|
- So this program will take already have processed input i.e. strings converted as python string and list to
|
|
|
- python list, vice versa.
|
|
|
-
|
|
|
- ``sheet_name`` is the python string referring to TargetData containing string.
|
|
|
-
|
|
|
- ``data_looking_for`` is expected to be a string or list of TargetData. When there are multiple values then the
|
|
|
- previous process will send list else it will send string. When a string is received it will be automatically
|
|
|
- converted in list so the program will alwaus have to deal with list. If input is "*" then this method will take
|
|
|
- all value in the matched row as TargetData.
|
|
|
-
|
|
|
- ``data_to_match`` is a python dictionary created by the previous process. It will contain the key value pair
|
|
|
- same as given in input but just converted in python dict. Then all possible combinations will be generated inside
|
|
|
- this method. If an empty list is given by the user in the excel file then this list will get emptu dictionary from
|
|
|
- the previous process. Thus then this method will pick TargetData from all rows of the target sheet.
|
|
|
-
|
|
|
- :param sheet_name:
|
|
|
- :param data_looking_for:
|
|
|
- :param data_to_match:
|
|
|
- :return: dictionary of TargetData
|
|
|
+ Saves the excel file in which have usecount to be updated
|
|
|
+ :return:
|
|
|
"""
|
|
|
- sheet_dict = self.sheet_dict if sheet_dict is None else sheet_dict
|
|
|
- matching_data = [list(x) for x in itertools.product(*[data_to_match[key] for key in data_to_match])]
|
|
|
- assert sheet_name in sheet_dict, \
|
|
|
- f"Excel file doesn't contain {sheet_name} sheet. Please recheck. Called in '{caller}'"
|
|
|
- base_sheet = sheet_dict[sheet_name]
|
|
|
- data_lis = []
|
|
|
- if type(data_looking_for) == str:
|
|
|
- data_looking_for = data_looking_for.split(",")
|
|
|
-
|
|
|
- for data in base_sheet:
|
|
|
- if len(matching_data) == 1 and len(matching_data[0]) == 0:
|
|
|
- if data_looking_for[0] == "*":
|
|
|
- data_lis.append(data)
|
|
|
- else:
|
|
|
- data_lis.append({keys: data[keys] for keys in data_looking_for})
|
|
|
- else:
|
|
|
- if [data[key] for key in data_to_match] in matching_data:
|
|
|
- if data_looking_for[0] == "*":
|
|
|
- data_lis.append(data)
|
|
|
- else:
|
|
|
- data_lis.append({keys: data[keys] for keys in data_looking_for})
|
|
|
- return data_lis
|
|
|
+ if self.noUpdateFiles:
|
|
|
+ return
|
|
|
+ for filename in self.usecountHeaderName:
|
|
|
+ logger.debug(f"Updating file {filename} with usecounts.")
|
|
|
+ sheet_dict = self.openedSheetsDataFrame[filename]
|
|
|
+ ex = pd.ExcelFile(filename)
|
|
|
+ for sheet in ex.sheet_names:
|
|
|
+ if sheet in sheet_dict:
|
|
|
+ continue
|
|
|
+ df = self.get_str_sheet(ex, sheet)
|
|
|
+ sheet_dict[sheet] = df
|
|
|
+ with pd.ExcelWriter(filename) as writer:
|
|
|
+ for sheetname in sheet_dict:
|
|
|
+ sheet_dict[sheetname].to_excel(writer, sheetname, index=False)
|
|
|
+ writer.save()
|
|
|
+ logger.debug(f"File updated {filename}.")
|
|
|
+
|
|
|
+ def update_usecount_in_source(self, data):
|
|
|
+ """
|
|
|
+ Updates usecount in the dataframe in correct position. These function only updates not save the file.
|
|
|
+ :param data:
|
|
|
+ :return:
|
|
|
+ """
|
|
|
+ if self.noUpdateFiles:
|
|
|
+ return
|
|
|
+ filename = self.usecountDataRecords[repr(data)]["file_name"]
|
|
|
+ sheetName = self.usecountDataRecords[repr(data)]["sheet_name"]
|
|
|
+ if not filename:
|
|
|
+ filename = self.fileNameAndPath
|
|
|
+ if filename not in self.usecountHeaderName:
|
|
|
+ return
|
|
|
+ elif sheetName not in self.usecountHeaderName[filename]:
|
|
|
+ return
|
|
|
+ if not self.usecountHeaderName[filename][sheetName]:
|
|
|
+ return
|
|
|
+ self.openedSheetsDataFrame[filename][sheetName][self.usecountHeaderName[filename][sheetName]][
|
|
|
+ self.usecountDataRecords[repr(data)]["index"]] = self.usecountDataRecords[repr(data)]["use"]
|
|
|
|
|
|
def __process_rrd_string(self, rrd_string):
|
|
|
"""
|
|
@@ -594,6 +634,35 @@ class TestDataGenerator:
|
|
|
assert match, err_string
|
|
|
return processed_string
|
|
|
|
|
|
+ @staticmethod
|
|
|
+ def get_env_variable(string):
|
|
|
+ """
|
|
|
+ Returns environment variable or the default value predefined.
|
|
|
+ :param string:
|
|
|
+ :return:
|
|
|
+ """
|
|
|
+ variable = string[1:-1].strip().split(',')[0].strip()
|
|
|
+ data = os.environ.get(variable)
|
|
|
+ try:
|
|
|
+ if not data:
|
|
|
+ data = string[1:-1].strip().split(',')[1].strip()
|
|
|
+ logger.info(f"{variable} not found in environment, using {data} instead")
|
|
|
+ except:
|
|
|
+ raise BaseException(f"Can't find {variable} in envrionment & default value is also not set")
|
|
|
+ return data
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def product_dict(**kwargs):
|
|
|
+ """
|
|
|
+ :param kwargs: Dictionary containing values in a list
|
|
|
+ :return: yield dictionaries with individual value in dictionary from value list
|
|
|
+ """
|
|
|
+ keys = kwargs.keys()
|
|
|
+ vals = kwargs.values()
|
|
|
+ for instance in itertools.product(*vals):
|
|
|
+ yield dict(zip(keys, instance))
|
|
|
+
|
|
|
+
|
|
|
if __name__ == "__main__":
|
|
|
lTestDataGenerator = TestDataGenerator("../../tests/0TestInput/RawTestData.xlsx")
|
|
|
lTestDataGenerator.write()
|