routes.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. from StatisticServer import app, db
  2. from flask import request, jsonify, abort, make_response, Response
  3. from StatisticServer.models import TestRunStatistics, ActivityStatistics
  4. from sqlalchemy import exc
  5. from threading import Thread
  6. import json
  7. def update_activities(activity_headers):
  8. try:
  9. activity_stats = db.session.query(ActivityStatistics).all()
  10. except exc.OperationalError as ex:
  11. if "no such table:" in str(ex):
  12. db.session.rollback()
  13. db.create_all()
  14. activity_stats = db.session.query(ActivityStatistics).all()
  15. for activity_stat in activity_stats:
  16. activity_json = activity_stat.to_json()
  17. for key in activity_json:
  18. if key != "TestRunUUID":
  19. if key not in activity_headers:
  20. activity_headers.append(key)
  21. testrun_headers = [
  22. 'TestRunUUID', 'Browser', 'api', 'soap', 'web', 'TestCaseExecuted', 'TestCasePassed', 'TestCaseFailed',
  23. 'TestCasePaused', 'SequenceClass', 'TestCaseClass', 'TestStepMaster', 'TestStepSequences', 'TestSteps',
  24. 'TestCaseSequences', 'XPATH', 'CSS', 'ID', 'equalTo', 'notEqualTo', 'smallerThan', 'greaterThan', 'timeout',
  25. 'optional', 'release', 'Duration', 'ip', 'sendingTime'
  26. ]
  27. activity_headers = [] # To keep records of every new activity field so we don't have to manage it while creating csv
  28. t = Thread(target=update_activities, args=[activity_headers,])
  29. t.daemon = True
  30. t.start()
  31. @app.route('/', methods=['POST'])
  32. def main():
  33. try:
  34. dic = request.json
  35. if "TestRunUUID" not in dic:
  36. abort(400)
  37. if dic["TestRunUUID"] == "PYTEST":
  38. return make_response(jsonify(dic), 200)
  39. if request.environ.get('HTTP_X_FORWARDED_FOR') is None:
  40. ip = request.environ['REMOTE_ADDR']
  41. else:
  42. ip = request.environ['HTTP_X_FORWARDED_FOR']
  43. process_data(dic, ip)
  44. except Exception as ex:
  45. print(ex)
  46. return abort(400)
  47. return jsonify(success=True)
  48. @app.route('/stats', methods=['GET'])
  49. def stats_download(): # Will convert database data in csv and response csv file to request
  50. global activity_headers, testrun_headers
  51. test_stats = db.session.query(TestRunStatistics).all()
  52. def generate_data():
  53. for _, test_stat in enumerate(test_stats):
  54. data = []
  55. activities = {}
  56. # Iter through every data in TestRunStatistics the get the data for same UUID from ActivityStatistics
  57. activity_stats = db.session.query(ActivityStatistics
  58. ).filter(ActivityStatistics.TestRunUUID == test_stat.TestRunUUID).all()
  59. test_json = test_stat.to_json()
  60. for key in testrun_headers:
  61. data.append(f'"{str(test_json[key])}"')
  62. for activity_stat in activity_stats:
  63. activity_json = activity_stat.to_json()
  64. for key in activity_json:
  65. if key == "TestRunUUID":
  66. continue
  67. activities[key] = activity_json[key]
  68. for key in activity_headers:
  69. if key not in activities:
  70. data.append(str(0))
  71. else:
  72. data.append(f'"{str(activities[key])}"')
  73. if _ == 0:
  74. headers = []
  75. for key in testrun_headers:
  76. headers.append(key)
  77. for key in activity_headers:
  78. headers.append(key)
  79. yield ','.join(headers)+'\n'
  80. yield ','.join(data)+'\n'
  81. resp = Response(
  82. generate_data(),
  83. mimetype="text/csv",
  84. headers={"Content-disposition":
  85. "attachment; filename=statistics_database.csv"})
  86. return resp
  87. def process_data(data, ip):
  88. global activity_headers
  89. dict = {}
  90. for key in data:
  91. if "Activity_" in key:
  92. AS = ActivityStatistics()
  93. Activity_dict = {}
  94. Activity_dict["TestRunUUID"] = data["TestRunUUID"]
  95. Activity_dict["Key"] = key
  96. Activity_dict["Value"] = data[key]
  97. AS.update(Activity_dict)
  98. db.session.add(AS)
  99. if key not in activity_headers:
  100. activity_headers.append(key)
  101. elif "TestStepMaster" == key:
  102. dict[key] = json.dumps(data[key])
  103. else:
  104. dict[key] = data[key]
  105. dict["ip"] = ip
  106. TRS = TestRunStatistics()
  107. TRS.update(dict)
  108. db.session.add(TRS)
  109. try:
  110. db.session.commit()
  111. except exc.OperationalError as ex:
  112. if "no such table:" in str(ex):
  113. db.session.rollback()
  114. db.create_all()
  115. process_data(data, ip)