123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129 |
- import sys
- import json
- import xlrd3 as xlrd
- from openpyxl import load_workbook
- class Writer:
- """
- This class is made to update existing excel file.
- First it will open the file in python and then we can do multiple writes and once everything is update we can use
- save method in order to save the updated excel file. Hence, this class is very useful is saving time while updating
- excel files.
- """
- def __init__(self, path, sheet):
- self.path = path
- self.sht = sheet
- self.workbook = load_workbook(self.path)
- self.sheet = self.workbook[self.sht]
- def write(self, row, col, data):
- # Update the values using row and col number.
- # Note :- We are using openpyxl so row & column index will start from 1 instead of 0
- self.sheet.cell(row, col).value = data
- def save(self):
- # Call this method to save the file once every updates are written
- self.workbook.save(self.path)
- self.workbook.close()
- class Mover:
- """
- This class is used to copy data from one excel file to another. We can also use some filters to copy only targeted
- data.
- """
- def __init__(self, source_file_path, source_sheet, destination_file_path, destination_sheet, lines=0):
- """
- :param source_file_path: Path with filename of source excel file
- :param source_sheet: Sheet name of the source
- :param destination_file_path: Path with filename of Destination excel file
- :param destination_sheet: Sheet name of the destination
- :param lines: Number of lines starting from 1 to be considered for moving from source.
- """
- self.source_file_path = source_file_path
- self.source_sheet = source_sheet
- self.destination_file_path = destination_file_path
- self.destination_sheet = destination_sheet
- self.lines = lines
- def move(self, filters={}, add_missing_columns=False):
- """
- :param filters: dictionary of filters with header as key and data as value or list of data as value
- :param add_missing_columns: True if we need all headers from source file to destination file which are currently
- not in destination file.
- :return:
- """
- source = self.read_xlsx(self.source_file_path, self.source_sheet)
- destination = self.read_xlsx(self.destination_file_path, self.destination_sheet)
- destination_wb = Writer(self.destination_file_path, self.destination_sheet) # Writer class object used to update existing file
- if add_missing_columns:
- self.add_missing_columns(source, destination, destination_wb)
- # again opening destination file as it is updated with the source file columns
- destination = self.read_xlsx(self.destination_file_path, self.destination_sheet)
- end = self.lines or source.nrows # if number of rows to be considered from source is not predefined the take all
- new_data = [source.row(row) for row in range(1, end)] # create a new list of all data and remove the filtered data
- remove_data = [] # rows not matching filter are stored here which is used later to remove data from new_data
- for filter in filters: # iterate through the dictionary of filter and
- ind = [x.value for x in source.row(0)].index(filter) # getting index of filter header then use the same index to check data
- if type(filters[filter]) is not list:
- filters[filter] = [filters[filter]]
- for row in new_data:
- if row[ind].value not in filters[filter]: # check if data is matching with filter
- remove_data.append(row)
- for row in remove_data: # removing unmatched data from new_data list
- new_data.remove(row)
- row_num = destination.nrows # used to maintain new row number
- for data in new_data: # iterating through the data to be written and writing then on the correct cells
- row_num += 1
- for cell in range(len(data)):
- try:
- # getting column number where new data is to be written with the help of indexing header in destination file
- ind = [x.value for x in destination.row(0)].index(source.row(0)[cell].value) + 1
- except ValueError:
- # if add_missing_columns is false then ValueError is thrown for the headers which are not present in destination
- continue
- destination_wb.write(row_num, ind, data[cell].value)
- destination_wb.save()
- def read_xlsx(self, path, sheet):
- # reading xlsx file using xlrd
- wb = xlrd.open_workbook(path)
- sht = wb.sheet_by_name(sheet)
- return sht
- def add_missing_columns(self, source, destination, destination_wb):
- # looking for the headers which are not present in destination file and then updating destination file
- source_headers = [x.value for x in source.row(0)]
- destination_headers = [x.value for x in destination.row(0)]
- col = len(destination_headers)
- for headers in source_headers:
- if headers not in destination_headers:
- col += 1
- destination_wb.write(1, col, headers)
- destination_wb.save()
- def parse_json(path):
- """
- Takes parameter from a json file.
- :param path:
- :return:
- """
- js = json.load(open(path))
- lines = int(js["lines"]) or 0
- mover = Mover(js["source_file"], js["source_sheet"], js["destination_file"], js["destination_sheet"], lines)
- if js["add_missing_columns"].lower() == "true":
- add_missing = True
- else:
- add_missing = False
- mover.move(js["filters"], add_missing)
- if __name__ == "__main__":
- js = sys.argv[-1]
- if ".json" in js.lower():
- parse_json(js)
|