ChangeLog.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. from openpyxl import load_workbook
  2. from datetime import datetime
  3. from copy import copy
  4. class ChangeLog:
  5. def __init__(
  6. self, source_file, clone_file, log_sheet_name="Change Logs", ignore_headers=[], ignore_sheets=[],
  7. case_sensitive_ignore=False
  8. ):
  9. """
  10. :param source_file: Path of excel file.
  11. :param clone_file: Path of clone version of source file.
  12. :param log_sheet_name: (optional) (default="Change Logs") Name of sheet containing change logs
  13. :param ignore_headers: (optional) list of headers to be ignored for change log
  14. :param ignore_sheets: (optional) list of sheet names to be ignored for change log
  15. :param case_sensitive_ignore: (default=False) If True, ignore sheets & headers list's data will be matched with same case.
  16. """
  17. self.source_file = source_file
  18. self.clone_file = clone_file
  19. self.log_sheet_name = log_sheet_name
  20. self.ignore_headers = ignore_headers
  21. self.ignore_sheets = ignore_sheets
  22. self.ignore_sheets.append(self.log_sheet_name)
  23. self.case_senstive_ignore = case_sensitive_ignore
  24. self.update_ignore_list()
  25. self.headers_column = {}
  26. def update_ignore_list(self):
  27. # Making data inside igonre lists in lower only if case_sensitive_ignore is false, so that it will be later
  28. # used at the time of comparision
  29. if self.case_senstive_ignore:
  30. return None
  31. if self.ignore_headers:
  32. self.ignore_headers = [header.lower() for header in self.ignore_headers]
  33. if self.ignore_sheets:
  34. self.ignore_sheets = [sheet.lower() for sheet in self.ignore_sheets]
  35. def xlsxChangeLog(self):
  36. """
  37. Method to trigger all functionalities including checking changes and updating clone file
  38. :return:
  39. """
  40. # Main method which will do all things including check and updating logs
  41. source = load_workbook(self.source_file)
  42. clone = load_workbook(self.clone_file)
  43. source_sheets = source.get_sheet_names()
  44. clone_sheets = clone.get_sheet_names()
  45. self.check_log_sheet(clone_sheets, clone)
  46. sheets = self.get_sheets(source_sheets, clone_sheets)
  47. logs = self.get_change_logs(sheets, source, clone)
  48. if logs: # if their is no update in the non ignored data than file won't be updated
  49. self.update_change_logs(logs, clone)
  50. self.update_clone_file(source, clone, source_sheets)
  51. clone.save(self.clone_file)
  52. clone.close()
  53. source.close()
  54. def check_log_sheet(self, clone_sheets, clone):
  55. # Checks if log worksheet is inside clone file or not, if not then it will create one
  56. if self.log_sheet_name not in clone_sheets:
  57. sht = clone.create_sheet(self.log_sheet_name)
  58. headers = ["Date & Time", "Sheet Name", "Header Name", "Row Number", "Old Value", "New Value"]
  59. for i in range(1, len(headers)+1): # looping through all headers and inserting them on correct position
  60. sht.cell(1, i).value = headers[i-1]
  61. def get_sheets(self, source_sheets, clone_sheets):
  62. # method is used to get sheets which are not inside ignore list, also deals with case_sensitive functionality
  63. sheets = []
  64. for sheet in source_sheets: # looping through sheets in source file
  65. if sheet in clone_sheets: # if sheet is not already present in clone file than change log is skipped
  66. if self.ignore_sheets: # if ignore_sheets doesn't have any values than sheet name is directly added in list
  67. if self.case_senstive_ignore: # if the name is case_sensitive then check for exact value
  68. if sheet not in self.ignore_sheets:
  69. sheets.append(sheet)
  70. elif sheet.lower() not in self.ignore_sheets: # if checking is not case sensitive than checking value
  71. sheets.append(sheet) # in lower as ignore list is also converted to lower
  72. else:
  73. sheets.append(sheet)
  74. return sheets
  75. def get_change_logs(self, sheets, source, clone):
  76. # method returns a list of change log with all data in correct column position
  77. logs = []
  78. for sheet in sheets:
  79. source_sht = source.get_sheet_by_name(sheet)
  80. clone_sht = clone.get_sheet_by_name(sheet)
  81. headers, new_headers, removed_headers = self.get_headers(source_sht, clone_sht)
  82. for header in new_headers:
  83. # adding new headers in change log list with appropriate values
  84. logs.append([
  85. datetime.now().strftime("%d-%m-%Y %H:%M"),
  86. source_sht.title,
  87. header,
  88. 1,
  89. "-",
  90. "(new header)"
  91. ])
  92. for header in removed_headers:
  93. # adding removed headers in change log list with appropriate values
  94. logs.append([
  95. datetime.now().strftime("%d-%m-%Y %H:%M"),
  96. source_sht.title,
  97. header,
  98. 1,
  99. "(removed header)",
  100. "-"
  101. ])
  102. for header in headers:
  103. # looping through the headers which are to be considered in change log
  104. if header in new_headers or header in removed_headers:
  105. # skipping header which are newly_add or removed so that change log will only have entry of header
  106. # else it will be filled with each row data for the changed header, which might make file very huge
  107. continue
  108. logs += self.find_changes(header, source_sht, clone_sht)
  109. return logs
  110. def update_change_logs(self, logs, clone):
  111. # Updates Chane Log sheet in log file
  112. sht = clone.get_sheet_by_name(self.log_sheet_name)
  113. for log in logs: # looping through logs list which contains a lists of log with data inside in perfect order
  114. row = 1 + sht.max_row # max row + 1 so the row is always added in the end of worksheet
  115. for j in range(1, len(log)+1): # looping through index of log data, which is also used as column number
  116. sht.cell(row, j).value = log[j-1]
  117. def update_clone_file(self, source, clone, source_sheets):
  118. # updating clone file by looping through all sheets of source_file so that the only other data in clone file is
  119. # change log sheet, this updating contains each and every data including the headers and sheet which are ignored
  120. # for change log. If the sheet already present in clone file then we delete it and create a new one with exact
  121. # data from source
  122. for sheet in source_sheets:
  123. if sheet in clone.sheetnames: # checking if the sheet exists in clone file, if it does than we first take its index than delete it
  124. clone_sht = clone.get_sheet_by_name(sheet)
  125. ind = clone.index(clone_sht)
  126. ignored_headers_data = self.get_ignored_data_from_clone(clone, sheet) # ignored data from clone file
  127. clone.remove(clone_sht)
  128. else: # if clone file doesn't has that sheet than make the index containing variable to "False"
  129. ind = "False" # A string is taken instead of any empty value because the original index can contain 0
  130. # which can cause conflict
  131. if ind != "False": # if we have an index value than make a new sheet in that position
  132. clone.create_sheet(sheet, ind)
  133. else: # else we will add the new sheet in second last position behind Change Log sheet
  134. clone.create_sheet(sheet, len(clone.sheetnames)-2)
  135. source_sht = source.get_sheet_by_name(sheet)
  136. clone_sht = clone.get_sheet_by_name(sheet)
  137. skipped = 0
  138. for row in range(1, source_sht.max_row+1): # looping through every row from source
  139. for column in range(1, source_sht.max_column+1): # looping through every column from source
  140. if sheet in self.headers_column:
  141. if column-1 in self.headers_column[sheet]:
  142. if row == 1: # if first row then write header
  143. clone_sht.cell(row, column).value = self.headers_column[sheet][column-1]
  144. elif row > 1: # write data from the ignore data list, column-1 coz the header was index from 0 starting point
  145. clone_sht.cell(row, column).value = ignored_headers_data[
  146. self.headers_column[sheet][column-1]][row-2] # row-2 because here it starts from 2 and list starts from 0
  147. continue
  148. clone_sht.cell(row, column).value = source_sht.cell(row, column-skipped).value #writing everything in clone
  149. if source_sht.cell(row, column-skipped).has_style: # if source cell has style than copy it to clone
  150. clone_sht.cell(row, column-skipped)._style = copy(source_sht.cell(row, column-skipped)._style)
  151. for idx, rd in source_sht.row_dimensions.items(): # copying width and height of rows and columns
  152. clone_sht.row_dimensions[idx] = copy(rd)
  153. def find_changes(self, header, source_sht, clone_sht):
  154. # finding changes for a column(header) from source to clone
  155. changes = []
  156. # indexing header position of source and clone, incase if position of columns are shifted without changing data
  157. # than also our change log will consider it as change as the cell value has changed, so we are looking for
  158. # changes with header index so that less useful data is not stored in change log
  159. source_column = [cell.value for cell in source_sht[1]].index(header) + 1
  160. clone_column = [cell.value for cell in clone_sht[1]].index(header) + 1
  161. # getting max row number of source and clone and using the greater one to loop in all the files so that each
  162. # and every data is processed inside loop
  163. if source_sht.max_row > clone_sht.max_row:
  164. nrow = source_sht.max_row + 1
  165. else:
  166. nrow = clone_sht.max_row + 1
  167. for i in range(2, nrow): # we have already processed headers so we have skipped it and started loop from 2
  168. source_value = source_sht.cell(i, source_column).value
  169. clone_value = clone_sht.cell(i, clone_column).value
  170. if clone_value != source_value:
  171. # if clone and source value didn't matched then creating a list with appropriate data in correct position
  172. changes.append([
  173. datetime.now().strftime("%d-%m-%Y %H:%M"),
  174. source_sht.title,
  175. header,
  176. i,
  177. clone_value,
  178. source_value
  179. ])
  180. return changes
  181. def get_headers(self, source_sht, clone_sht):
  182. # return a list of headers which are to be considered for change log, along with it return 2 others list with
  183. # new headers and removed headers so the change log contain directly entry of changes headers instead of whole
  184. # columns. Logic is same like get_sheets method
  185. source_headers = [cell.value for cell in source_sht[1]]
  186. clone_headers = [cell.value for cell in clone_sht[1]]
  187. self.headers_column[source_sht.title] = {}
  188. headers = []
  189. temp_headers = []
  190. new_headers = []
  191. removed_headers = []
  192. for header in source_headers:
  193. if self.ignore_headers:
  194. if self.case_senstive_ignore:
  195. if header not in self.ignore_headers:
  196. temp_headers.append(header)
  197. else:
  198. self.headers_column[source_sht.title][source_headers.index(header)] = header
  199. elif header.lower() not in self.ignore_headers:
  200. temp_headers.append(header)
  201. else:
  202. self.headers_column[source_sht.title][source_headers.index(header)] = header
  203. else:
  204. temp_headers.append(header)
  205. for header in temp_headers:
  206. if header not in clone_headers:
  207. new_headers.append(header)
  208. headers.append(header)
  209. for header in clone_headers:
  210. if self.ignore_headers:
  211. if self.case_senstive_ignore:
  212. if header not in self.ignore_headers:
  213. if header not in source_headers:
  214. removed_headers.append(header)
  215. elif header.lower() not in self.ignore_headers:
  216. if header not in source_headers:
  217. removed_headers.append(header)
  218. else:
  219. if header not in source_headers:
  220. removed_headers.append(header)
  221. return headers, new_headers, removed_headers
  222. def get_ignored_data_from_clone(self, clone, sheet):
  223. data = {}
  224. sht = clone.get_sheet_by_name(sheet)
  225. headers = [cell.value for cell in sht[1]]
  226. if not sheet in self.headers_column:
  227. return data
  228. for header in self.headers_column[sheet]:
  229. hd = self.headers_column[sheet][header] # getting header from class list storing index of ignored headers
  230. data[hd] = []
  231. for i in range(2, sht.max_row+1):
  232. data[hd].append(sht.cell(i, headers.index(hd)+1).value)
  233. return data