# __init__.py (7.3 KB)

import json
import sys
from logging import getLogger

import xlrd3 as xlrd
from openpyxl import load_workbook

logger = getLogger("pyC")
  7. class Writer:
  8. """
  9. This class is made to update existing excel file.
  10. First it will open the file in python and then we can do multiple writes and once everything is update we can use
  11. save method in order to save the updated excel file. Hence, this class is very useful is saving time while updating
  12. excel files.
  13. """
  14. def __init__(self, path, sheet):
  15. self.path = path
  16. self.sht = sheet
  17. self.workbook = load_workbook(self.path)
  18. self.sheet = self.workbook[self.sht]
  19. def write(self, row, col, data):
  20. # Update the values using row and col number.
  21. # Note :- We are using openpyxl so row & column index will start from 1 instead of 0
  22. self.sheet.cell(row, col).value = data
  23. def save(self):
  24. # Call this method to save the file once every updates are written
  25. self.workbook.save(self.path)
  26. self.workbook.close()
  27. class Mover:
  28. """
  29. This class is used to copy data from one excel file to another. We can also use some filters to copy only targeted
  30. data.
  31. """
  32. def __init__(self, source_file_path, source_sheet, destination_file_path, destination_sheet, lines=0):
  33. """
  34. :param source_file_path: Path with filename of source excel file
  35. :param source_sheet: Sheet name of the source
  36. :param destination_file_path: Path with filename of Destination excel file
  37. :param destination_sheet: Sheet name of the destination
  38. :param lines: Number of lines starting from 1 to be considered for moving from source.
  39. """
  40. self.source_file_path = source_file_path
  41. if not isinstance(self.source_file_path, list):
  42. self.source_file_path = [self.source_file_path]
  43. self.source_sheet = source_sheet
  44. self.destination_file_path = destination_file_path
  45. self.destination_sheet = destination_sheet
  46. self.lines = self.process_lines(lines)
  47. def move(self, filters={}, add_missing_columns=False):
  48. """
  49. :param filters: dictionary of filters with header as key and data as value or list of data as value
  50. :param add_missing_columns: True if we need all headers from source file to destination file which are currently
  51. not in destination file.
  52. :return:
  53. """
  54. for source_file_path in self.source_file_path:
  55. try:
  56. source = self.read_xlsx(source_file_path, self.source_sheet)
  57. destination = self.read_xlsx(self.destination_file_path, self.destination_sheet)
  58. except FileNotFoundError as e:
  59. logger.critical(f"File not found: {e}")
  60. return
  61. destination_wb = Writer(self.destination_file_path, self.destination_sheet) # Writer class object used to update existing file
  62. if add_missing_columns:
  63. self.add_missing_columns(source, destination, destination_wb)
  64. # again opening destination file as it is updated with the source file columns
  65. destination = self.read_xlsx(self.destination_file_path, self.destination_sheet)
  66. end = self.lines # if number of rows to be considered from source is not predefined the take all
  67. if not end:
  68. for i in range(source.nrows):
  69. end.append(i)
  70. new_data = [source.row(row) for row in end] # create a new list of all data and remove the filtered data
  71. remove_data = [] # rows not matching filter are stored here which is used later to remove data from new_data
  72. for filter in filters: # iterate through the dictionary of filter and
  73. ind = [x.value for x in source.row(0)].index(filter) # getting index of filter header then use the same index to check data
  74. if type(filters[filter]) is not list:
  75. filters[filter] = [filters[filter]]
  76. for row in new_data:
  77. if row[ind].value not in filters[filter]: # check if data is matching with filter
  78. remove_data.append(row)
  79. for row in remove_data: # removing unmatched data from new_data list
  80. try:
  81. new_data.remove(row)
  82. except ValueError:
  83. pass
  84. row_num = destination.nrows # used to maintain new row number
  85. for data in new_data: # iterating through the data to be written and writing then on the correct cells
  86. row_num += 1
  87. for cell in range(len(data)):
  88. try:
  89. # getting column number where new data is to be written with the help of indexing header in destination file
  90. ind = [x.value for x in destination.row(0)].index(source.row(0)[cell].value) + 1
  91. except ValueError:
  92. # if add_missing_columns is false then ValueError is thrown for the headers which are not present in destination
  93. continue
  94. destination_wb.write(row_num, ind, data[cell].value)
  95. destination_wb.save()
  96. def read_xlsx(self, path, sheet):
  97. # reading xlsx file using xlrd
  98. wb = xlrd.open_workbook(path)
  99. sht = wb.sheet_by_name(sheet)
  100. return sht
  101. def add_missing_columns(self, source, destination, destination_wb):
  102. # looking for the headers which are not present in destination file and then updating destination file
  103. source_headers = [x.value for x in source.row(0)]
  104. destination_headers = [x.value for x in destination.row(0)]
  105. col = len(destination_headers)
  106. for headers in source_headers:
  107. if headers not in destination_headers:
  108. col += 1
  109. destination_wb.write(1, col, headers)
  110. destination_wb.save()
  111. def process_lines(self, lines):
  112. line_lis = []
  113. lis = [x.strip() for x in lines.strip().split(',')]
  114. while "" in lis:
  115. lis.remove("")
  116. for l in lis:
  117. if "-" in l:
  118. start = int(l.split("-")[0].strip())
  119. end = int(l.split("-")[1].strip())
  120. for i in range(int(start), int(end)+1):
  121. if i not in line_lis:
  122. line_lis.append(i)
  123. elif l.strip().isnumeric():
  124. i = int(l.strip())
  125. if i not in line_lis:
  126. line_lis.append(i)
  127. else:
  128. print(l)
  129. raise BaseException('Lines structure is wrong. For multiple values "," is used & for range "-" is used'\
  130. 'please verify it again!')
  131. return line_lis
  132. def parse_json(path):
  133. """
  134. Takes parameter from a json file.
  135. :param path:
  136. :return:
  137. """
  138. js = json.load(open(path))
  139. lines = int(js["lines"]) or 0
  140. mover = Mover(js["source_file"], js["source_sheet"], js["destination_file"], js["destination_sheet"], lines)
  141. if js["add_missing_columns"].lower() == "true":
  142. add_missing = True
  143. else:
  144. add_missing = False
  145. mover.move(js["filters"], add_missing)
  146. if __name__ == "__main__":
  147. js = sys.argv[-1]
  148. if ".json" in js.lower():
  149. parse_json(js)