# __init__.py

import sys
import json

import xlrd3 as xlrd
from openpyxl import load_workbook
from logging import getLogger

logger = getLogger("pyC")


class Writer:
    """
    Updates an existing Excel file.
    The file is opened once, after which multiple writes can be made; calling the save method then writes the
    updated workbook back to disk. This saves time when an Excel file needs many updates.
    """
    def __init__(self, path, sheet):
        self.path = path
        self.sht = sheet
        self.workbook = load_workbook(self.path)
        self.sheet = self.workbook[self.sht]

    def write(self, row, col, data):
        # Update a value using its row and column number.
        # Note: openpyxl is used, so row and column indices start from 1 instead of 0.
        self.sheet.cell(row, col).value = data

    def save(self):
        # Call this method once all updates are written to save the file.
        self.workbook.save(self.path)
        self.workbook.close()


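# Hedged usage sketch (not part of the original module): how Writer is meant to be used
# according to its docstring - open the workbook once, write several cells, then save.
# The file name "report.xlsx" and sheet name "Sheet1" below are illustrative assumptions.
# Kept as comments so that importing this module has no side effects.
#
#     writer = Writer("report.xlsx", "Sheet1")
#     writer.write(1, 1, "Name")   # openpyxl indexing: row 1, column 1 is cell A1
#     writer.write(2, 1, "Alice")
#     writer.save()                # all pending updates are written back in one save

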
class Mover:
    """
    Copies data from one Excel file to another. Filters can be applied so that only targeted rows are copied.
    """
    def __init__(self, source_file_path, source_sheet, destination_file_path, destination_sheet, lines=0):
        """
        :param source_file_path: path with filename of the source Excel file
        :param source_sheet: sheet name in the source file
        :param destination_file_path: path with filename of the destination Excel file
        :param destination_sheet: sheet name in the destination file
        :param lines: number of lines, starting from line 1, to be considered for moving from the source;
                      0 means all lines
        """
        self.source_file_path = source_file_path
        self.source_sheet = source_sheet
        self.destination_file_path = destination_file_path
        self.destination_sheet = destination_sheet
        self.lines = lines

    def move(self, filters=None, add_missing_columns=False):
        """
        :param filters: dictionary of filters with a header name as key and a value (or list of values) as value;
                        only rows matching every filter are copied
        :param add_missing_columns: True to add the headers from the source file which are currently not present
                                    in the destination file
        :return: None
        """
        if filters is None:  # avoid a mutable default argument
            filters = {}
        try:
            source = self.read_xlsx(self.source_file_path, self.source_sheet)
            destination = self.read_xlsx(self.destination_file_path, self.destination_sheet)
        except FileNotFoundError as e:
            logger.critical(f"File not found: {e}")
            return
        # Writer object used to update the existing destination file
        destination_wb = Writer(self.destination_file_path, self.destination_sheet)
        if add_missing_columns:
            self.add_missing_columns(source, destination, destination_wb)
            # re-read the destination file because it now contains the columns added from the source
            destination = self.read_xlsx(self.destination_file_path, self.destination_sheet)
        end = self.lines or source.nrows  # if the number of lines to consider from the source is not given, take all
        new_data = [source.row(row) for row in range(1, end)]  # all data rows; filtered-out rows are removed below
        remove_data = []  # rows not matching the filters are collected here and removed from new_data afterwards
        for header in filters:  # iterate through the dictionary of filters
            # index of the filter header in the source; the same index is used to check the data
            ind = [x.value for x in source.row(0)].index(header)
            if not isinstance(filters[header], list):
                filters[header] = [filters[header]]
            for row in new_data:
                if row[ind].value not in filters[header]:  # keep only rows whose value matches the filter
                    remove_data.append(row)
        for row in remove_data:  # remove the unmatched rows from new_data
            new_data.remove(row)
        row_num = destination.nrows  # keeps track of the next row number in the destination
        for data in new_data:  # iterate through the rows to be written and write them to the correct cells
            row_num += 1
            for cell in range(len(data)):
                try:
                    # column number in the destination, found by matching the source header in the destination header row
                    ind = [x.value for x in destination.row(0)].index(source.row(0)[cell].value) + 1
                except ValueError:
                    # when add_missing_columns is False, headers missing from the destination raise ValueError; skip them
                    continue
                destination_wb.write(row_num, ind, data[cell].value)
        destination_wb.save()

    def read_xlsx(self, path, sheet):
        # read a sheet from an xlsx file using xlrd
        wb = xlrd.open_workbook(path)
        sht = wb.sheet_by_name(sheet)
        return sht

    def add_missing_columns(self, source, destination, destination_wb):
        # find the headers which are not present in the destination file and append them to its header row
        source_headers = [x.value for x in source.row(0)]
        destination_headers = [x.value for x in destination.row(0)]
        col = len(destination_headers)
        for header in source_headers:
            if header not in destination_headers:
                col += 1
                destination_wb.write(1, col, header)
        destination_wb.save()


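# Hedged usage sketch (not part of the original module): copy only the rows whose
# "Status" column is "Open" or "Pending" from one workbook to another, adding any
# source-only columns to the destination. All file, sheet and header names below
# are illustrative assumptions.
#
#     mover = Mover("source.xlsx", "Sheet1", "destination.xlsx", "Sheet1")
#     mover.move(filters={"Status": ["Open", "Pending"]}, add_missing_columns=True)

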
def parse_json(path):
    """
    Reads the Mover parameters from a JSON file and performs the move.
    :param path: path to the JSON configuration file
    :return: None
    """
    with open(path) as f:
        js = json.load(f)
    lines = int(js["lines"]) or 0
    mover = Mover(js["source_file"], js["source_sheet"], js["destination_file"], js["destination_sheet"], lines)
    add_missing = js["add_missing_columns"].lower() == "true"
    mover.move(js["filters"], add_missing)


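# Hedged example (not part of the original module) of the JSON file parse_json expects.
# Only the keys are taken from the code above; the values are illustrative assumptions.
# Note that "lines" and "add_missing_columns" are read as strings.
#
#     {
#         "source_file": "source.xlsx",
#         "source_sheet": "Sheet1",
#         "destination_file": "destination.xlsx",
#         "destination_sheet": "Sheet1",
#         "lines": "0",
#         "add_missing_columns": "true",
#         "filters": {"Status": ["Open", "Pending"]}
#     }

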
if __name__ == "__main__":
    js = sys.argv[-1]
    if js.lower().endswith(".json"):
        parse_json(js)