|
@@ -0,0 +1,605 @@
|
|
|
+
|
|
|
+from flask import render_template, redirect, flash, request, url_for, send_from_directory, jsonify
|
|
|
+from flask_login import login_required, current_user, login_user, logout_user
|
|
|
+from flask.logging import default_handler
|
|
|
+from app import app, db, models, forms, utils
|
|
|
+from datetime import datetime
|
|
|
+import requests
|
|
|
+import json
|
|
|
+import uuid
|
|
|
+import os
|
|
|
+
|
|
|
+# handle favicon requests
|
|
|
+@app.route('/favicon.ico')
|
|
|
+def favicon():
|
|
|
+ return send_from_directory('static/media', 'favicon.ico', mimetype='image/vnd.microsoft.icon')
|
|
|
+
|
|
|
+@app.route('/')
|
|
|
+@login_required
|
|
|
+def index():
|
|
|
+ return render_template('testrun/index.html', items=utils.getItemCategories())
|
|
|
+
|
|
|
+@app.route('/<string:item_type>')
|
|
|
+@login_required
|
|
|
+def item_list(item_type):
|
|
|
+ # placeholder for import form
|
|
|
+ form = None
|
|
|
+ # get item list by type
|
|
|
+ if item_type == 'testrun':
|
|
|
+ items = models.Testrun.query.all()
|
|
|
+ # build form for importing a testrun
|
|
|
+ form = forms.TestrunImportForm()
|
|
|
+
|
|
|
+ elif item_type == 'testcase_sequence':
|
|
|
+ items = models.TestCaseSequence.query.all()
|
|
|
+ elif item_type == 'testcase':
|
|
|
+ items = models.TestCase.query.all()
|
|
|
+ elif item_type == 'teststep_sequence':
|
|
|
+ items = models.TestStepSequence.query.all()
|
|
|
+ elif item_type == 'teststep':
|
|
|
+ items = models.TestStepExecution.query.all()
|
|
|
+ else:
|
|
|
+ app.logger.warning(f'Item type "{item_type}" does not exist. Requested by "{current_user}".')
|
|
|
+ flash(f'Item type "{item_type}" does not exist.', 'warning')
|
|
|
+ return redirect(url_for('index'))
|
|
|
+
|
|
|
+ return render_template('testrun/item_list.html', type=item_type, items=items, form=form)
|
|
|
+
|
|
|
+
|
|
|
+#
|
|
|
+# TODO:
|
|
|
+# check for outdating
|
|
|
+#
|
|
|
+@app.route('/<string:item_type>/<int:item_id>', methods=['GET', 'POST'])
|
|
|
+@login_required
|
|
|
+def get_item(item_type, item_id):
|
|
|
+ # get item by type and id
|
|
|
+ if item_type == 'testrun':
|
|
|
+ item = models.Testrun.query.get(item_id)
|
|
|
+ elif item_type == 'testcase_sequence':
|
|
|
+ item = models.TestCaseSequence.query.get(item_id)
|
|
|
+ elif item_type == 'testcase':
|
|
|
+ item = models.TestCase.query.get(item_id)
|
|
|
+ elif item_type == 'teststep_sequence':
|
|
|
+ item = models.TestStepSequence.query.get(item_id)
|
|
|
+ elif item_type == 'teststep':
|
|
|
+ item = models.TestStepExecution.query.get(item_id)
|
|
|
+ else:
|
|
|
+ app.logger.warning(f'Item type "{item_type}" does not exist. Requested by "{current_user}".')
|
|
|
+ flash(f'Item type "{item_type}" does not exist.', 'warning')
|
|
|
+ return redirect(url_for('index'))
|
|
|
+
|
|
|
+ return render_template('testrun/item.html', type=item_type, item=item)
|
|
|
+
|
|
|
+
|
|
|
+@app.route('/<string:item_type>/<int:item_id>/delete', methods=['POST', 'DELETE'])
|
|
|
+@login_required
|
|
|
+def delete_item(item_type, item_id):
|
|
|
+ #
|
|
|
+ # delete item
|
|
|
+ #
|
|
|
+
|
|
|
+ # cascade delete
|
|
|
+ if request.method == 'POST':
|
|
|
+ app.logger.info(f'Cascade deletion of {utils.getItemType(item_type)} ID {item_id} requested by "{current_user}".')
|
|
|
+ try:
|
|
|
+ utils.deleteCascade(item_type, item_id)
|
|
|
+ flash(f'Item {utils.getItemType(item_type)} ID #{item_id} and its children have been successfully deleted.', 'success')
|
|
|
+ app.logger.info(
|
|
|
+ f'Item {utils.getItemType(item_type)} ID #{item_id} and its children have been successfully deleted by "{current_user}".'
|
|
|
+ )
|
|
|
+ return jsonify({'success': 'OK'}), 200
|
|
|
+ except Exception as error:
|
|
|
+ app.logger.error(f'Failed to cascade delete {utils.getItemType(item_type)} ID {item_id} requested by "{current_user}": {error}')
|
|
|
+ return jsonify({'error': 'Bad Request'}), 400
|
|
|
+
|
|
|
+ # delete single item
|
|
|
+ elif request.method == 'DELETE':
|
|
|
+ # get item by type and id
|
|
|
+ app.logger.info(f'Deletion of {utils.getItemType(item_type)} ID {item_id} requested by "{current_user}".')
|
|
|
+ if item_type == 'testrun':
|
|
|
+ item = models.Testrun.query.get(item_id)
|
|
|
+ elif item_type == 'testcase_sequence':
|
|
|
+ item = models.TestCaseSequence.query.get(item_id)
|
|
|
+ elif item_type == 'testcase':
|
|
|
+ item = models.TestCase.query.get(item_id)
|
|
|
+ elif item_type == 'teststep_sequence':
|
|
|
+ item = models.TestStepSequence.query.get(item_id)
|
|
|
+ elif item_type == 'teststep':
|
|
|
+ item = models.TestStepExecution.query.get(item_id)
|
|
|
+ else:
|
|
|
+ app.logger.warning(f'Item type "{item_type}" does not exist. Requested by "{current_user}".')
|
|
|
+ flash(f'Item type "{item_type}" does not exist.', 'warning')
|
|
|
+ #return redirect(url_for('index'))
|
|
|
+ return jsonify({'error': 'Bad Request'}), 400
|
|
|
+
|
|
|
+ db.session.delete(item)
|
|
|
+ db.session.commit()
|
|
|
+ app.logger.info(f'Item {utils.getItemType(item_type)} ID {item_id} successfully deleted by {current_user}.')
|
|
|
+ flash(f'Item "{item.name}" has been successfully deleted.', 'success')
|
|
|
+ #return redirect(url_for('item_list', item_type=item_type))
|
|
|
+ return jsonify({'success': 'OK'}), 200
|
|
|
+
|
|
|
+ return jsonify({'error': 'Method Not Allowed'}), 405
|
|
|
+
|
|
|
+
|
|
|
+@app.route('/<string:item_type>/<int:item_id>/edit', methods=['GET', 'POST'])
|
|
|
+@login_required
|
|
|
+def edit_item(item_type, item_id):
|
|
|
+ #
|
|
|
+ # edit item
|
|
|
+ #
|
|
|
+ if item_type == 'testrun':
|
|
|
+ item = models.Testrun.query.get(item_id)
|
|
|
+ form = forms.TestrunCreateForm.new()
|
|
|
+ if request.method == 'GET':
|
|
|
+ form.testcase_sequences.data = [f'{x.id}' for x in item.testcase_sequences]
|
|
|
+ elif item_type == 'testcase_sequence':
|
|
|
+ item = models.TestCaseSequence.query.get(item_id)
|
|
|
+ form = forms.TestCaseSequenceCreateForm.new()
|
|
|
+ if request.method == 'GET':
|
|
|
+ form.classname.data = f'{item.classname.id}'
|
|
|
+ form.datafiles.data = [f'{x.id}' for x in item.datafiles]
|
|
|
+ form.testcases.data = [f'{x.id}' for x in item.testcases]
|
|
|
+ elif item_type == 'testcase':
|
|
|
+ item = models.TestCase.query.get(item_id)
|
|
|
+ form = forms.TestCaseCreateForm.new()
|
|
|
+ if request.method == 'GET':
|
|
|
+ form.classname.data = f'{item.classname.id}'
|
|
|
+ form.browser_type.data = f'{item.browser_type.id}'
|
|
|
+ form.testcase_type.data = f'{item.testcase_type.id}'
|
|
|
+ form.testcase_stepsequences.data = [f'{x.id}' for x in item.teststep_sequences]
|
|
|
+ elif item_type == 'teststep_sequence':
|
|
|
+ item = models.TestStepSequence.query.get(item_id)
|
|
|
+ form = forms.TestStepSequenceCreateForm.new()
|
|
|
+ if request.method == 'GET':
|
|
|
+ form.classname.data = f'{item.classname.id}'
|
|
|
+ form.teststeps.data =[f'{x.id}' for x in item.teststeps]
|
|
|
+ elif item_type == 'teststep':
|
|
|
+ item = models.TestStepExecution.query.get(item_id)
|
|
|
+ form = forms.TestStepCreateForm.new()
|
|
|
+ if request.method == 'GET':
|
|
|
+ form.activity_type.data = f'{item.activity_type.id}'
|
|
|
+ form.locator_type.data = f'{item.locator_type.id}'
|
|
|
+ # model extension
|
|
|
+ form.locator.data = item.locator or ''
|
|
|
+ if item.optional:
|
|
|
+ form.optional.data = '2'
|
|
|
+ else:
|
|
|
+ form.optional.data = '1'
|
|
|
+ if item.timeout:
|
|
|
+ form.timeout.data = f'{item.timeout}'
|
|
|
+ else:
|
|
|
+ form.timeout.data = ''
|
|
|
+ form.release.data = item.release or ''
|
|
|
+ form.value.data = item.value or ''
|
|
|
+ form.value2.data = item.value2 or ''
|
|
|
+ form.comparison.data = utils.getComparisonId(item.comparison)
|
|
|
+
|
|
|
+ else:
|
|
|
+ app.logger.warning(f'Item type "{item_type}" does not exist. Requested by "{current_user}".')
|
|
|
+ flash(f'Item type "{item_type}" does not exist.', 'warning')
|
|
|
+ return redirect(url_for('index'))
|
|
|
+
|
|
|
+ if request.method == 'GET':
|
|
|
+ form.name.data = item.name
|
|
|
+ form.description.data = item.description
|
|
|
+
|
|
|
+ if form.validate_on_submit():
|
|
|
+ # update item data
|
|
|
+ item.name = form.name.data
|
|
|
+ item.description = form.description.data
|
|
|
+ # testrun
|
|
|
+ if item_type == 'testrun':
|
|
|
+ item.editor = current_user
|
|
|
+ item.edited = datetime.utcnow()
|
|
|
+ item.testcase_sequences=[models.TestCaseSequence.query.get(int(x)) for x in form.testcase_sequences.data]
|
|
|
+ # testcase sequence
|
|
|
+ elif item_type == 'testcase_sequence':
|
|
|
+ item.editor = current_user
|
|
|
+ item.edited = datetime.utcnow()
|
|
|
+ item.classname = models.ClassName.query.get(int(form.classname.data))
|
|
|
+ item.datafiles = [models.DataFile.query.get(int(x)) for x in form.datafiles.data]
|
|
|
+ item.testcases = [models.TestCase.query.get(int(x)) for x in form.testcases.data]
|
|
|
+ # testcase
|
|
|
+ elif item_type == 'testcase':
|
|
|
+ item.editor = current_user
|
|
|
+ item.edited = datetime.utcnow()
|
|
|
+ item.classname = models.ClassName.query.get(int(form.classname.data))
|
|
|
+ item.browser_type = models.BrowserType.query.get(int(form.browser_type.data))
|
|
|
+ item.testcase_type = models.TestCaseType.query.get(int(form.testcase_type.data))
|
|
|
+ item.teststep_sequences = [models.TestStepSequence.query.get(int(x)) for x in form.testcase_stepsequences.data]
|
|
|
+ # step sequence
|
|
|
+ elif item_type == 'teststep_sequence':
|
|
|
+ item.editor = current_user
|
|
|
+ item.edited = datetime.utcnow()
|
|
|
+ item.classname = models.ClassName.query.get(int(form.classname.data))
|
|
|
+ item.teststeps = [models.TestStepExecution.query.get(int(x)) for x in form.teststeps.data]
|
|
|
+ # test step
|
|
|
+ elif item_type == 'teststep':
|
|
|
+ item.editor = current_user
|
|
|
+ item.edited = datetime.utcnow()
|
|
|
+ item.activity_type = models.ActivityType.query.get(int(form.activity_type.data))
|
|
|
+ item.locator_type = models.LocatorType.query.get(int(form.locator_type.data))
|
|
|
+ # model extension
|
|
|
+ item.locator = form.locator.data
|
|
|
+ item.optional = [None, False, True][int(form.optional.data)]
|
|
|
+ try:
|
|
|
+ item.timeout = float(form.timeout.data)
|
|
|
+ except ValueError:
|
|
|
+ item.timeout = None
|
|
|
+ item.release = form.release.data or None
|
|
|
+ item.value = form.value.data or None
|
|
|
+ item.value2 = form.value2.data or None
|
|
|
+ if form.comparison.data == '0':
|
|
|
+ item.comparison = None
|
|
|
+ else:
|
|
|
+ item.comparison = utils.COMPARISONS[int(form.comparison.data)-1]
|
|
|
+
|
|
|
+
|
|
|
+ # update item in db
|
|
|
+ db.session.commit()
|
|
|
+ app.logger.info(f'Edited {item_type} id {item_id} by {current_user}.')
|
|
|
+ flash(f'Item "{item.name}" successfully updated.', 'success')
|
|
|
+ return redirect(url_for('item_list', item_type=item_type))
|
|
|
+
|
|
|
+
|
|
|
+ return render_template('testrun/edit_item.html', type=item_type, item=item, form=form)
|
|
|
+
|
|
|
+
|
|
|
+@app.route('/<string:item_type>/new', methods=['GET', 'POST'])
|
|
|
+@login_required
|
|
|
+def new_item(item_type):
|
|
|
+ #
|
|
|
+ # create new item
|
|
|
+ #
|
|
|
+ if item_type == 'testrun':
|
|
|
+ form = forms.TestrunCreateForm.new()
|
|
|
+ chips = ['testcase_sequences']
|
|
|
+ elif item_type == 'testcase_sequence':
|
|
|
+ form = forms.TestCaseSequenceCreateForm.new()
|
|
|
+ chips = ['datafiles', 'testcases']
|
|
|
+ elif item_type == 'testcase':
|
|
|
+ form = forms.TestCaseCreateForm.new()
|
|
|
+ chips = ['testcase_stepsequences']
|
|
|
+ elif item_type == 'teststep_sequence':
|
|
|
+ form = forms.TestStepSequenceCreateForm.new()
|
|
|
+ chips = ['teststeps']
|
|
|
+ elif item_type == 'teststep':
|
|
|
+ form = forms.TestStepCreateForm.new()
|
|
|
+ chips = []
|
|
|
+ else:
|
|
|
+ app.logger.warning(f'Item type "{item_type}" does not exist. Requested by "{current_user}".')
|
|
|
+ flash(f'Item type "{item_type}" does not exist.', 'warning')
|
|
|
+ return redirect(url_for('index'))
|
|
|
+
|
|
|
+ if form.validate_on_submit():
|
|
|
+ # create new item
|
|
|
+ # testrun
|
|
|
+ if item_type == 'testrun':
|
|
|
+ item = models.Testrun(
|
|
|
+ name=form.name.data,
|
|
|
+ description=form.description.data,
|
|
|
+ creator=current_user,
|
|
|
+ testcase_sequences=[models.TestCaseSequence.query.get(int(x)) for x in form.testcase_sequences.data],
|
|
|
+ )
|
|
|
+ # testcase sequence
|
|
|
+ elif item_type == 'testcase_sequence':
|
|
|
+ item = models.TestCaseSequence(
|
|
|
+ name=form.name.data,
|
|
|
+ description=form.description.data,
|
|
|
+ creator=current_user,
|
|
|
+ classname=models.ClassName.query.get(int(form.classname.data)),
|
|
|
+ datafiles=[models.DataFile.query.get(int(x)) for x in form.datafiles.data],
|
|
|
+ testcases=[models.TestCase.query.get(int(x)) for x in form.testcases.data],
|
|
|
+ )
|
|
|
+ # testcase
|
|
|
+ elif item_type == 'testcase':
|
|
|
+ item = models.TestCase(
|
|
|
+ name=form.name.data,
|
|
|
+ description=form.description.data,
|
|
|
+ creator=current_user,
|
|
|
+ classname=models.ClassName.query.get(int(form.classname.data)),
|
|
|
+ browser_type=models.BrowserType.query.get(int(form.browser_type.data)),
|
|
|
+ testcase_type=models.TestCaseType.query.get(int(form.testcase_type.data)),
|
|
|
+ teststep_sequences=[models.TestStepSequence.query.get(int(x)) for x in form.testcase_stepsequences.data],
|
|
|
+ )
|
|
|
+ # step sequence
|
|
|
+ elif item_type == 'teststep_sequence':
|
|
|
+ item = models.TestStepSequence(
|
|
|
+ name=form.name.data,
|
|
|
+ description=form.description.data,
|
|
|
+ creator=current_user,
|
|
|
+ classname=models.ClassName.query.get(int(form.classname.data)),
|
|
|
+ teststeps=[models.TestStepExecution.query.get(int(x)) for x in form.teststeps.data],
|
|
|
+ )
|
|
|
+ # test step
|
|
|
+ elif item_type == 'teststep':
|
|
|
+ item = models.TestStepExecution(
|
|
|
+ name=form.name.data,
|
|
|
+ description=form.description.data,
|
|
|
+ creator=current_user,
|
|
|
+ activity_type=models.ActivityType.query.get(int(form.activity_type.data)),
|
|
|
+ locator_type=models.LocatorType.query.get(int(form.locator_type.data)),
|
|
|
+ # model extension
|
|
|
+ locator=form.locator.data,
|
|
|
+ optional=[None, False, True][int(form.optional.data)],
|
|
|
+ )
|
|
|
+ try:
|
|
|
+ item.timeout = float(form.timeout.data)
|
|
|
+ except ValueError:
|
|
|
+ item.timeout = None
|
|
|
+ item.release = form.release.data or None
|
|
|
+ item.value = form.value.data or None
|
|
|
+ item.value2 = form.value2.data or None
|
|
|
+ if form.comparison.data == '0':
|
|
|
+ item.comparison = None
|
|
|
+ else:
|
|
|
+ item.comparison = utils.COMPARISONS[int(form.comparison.data)-1]
|
|
|
+
|
|
|
+ # save item to db
|
|
|
+ db.session.add(item)
|
|
|
+ db.session.commit()
|
|
|
+ app.logger.info(f'Created {item_type} id {item.id} by {current_user}.')
|
|
|
+ flash(f'Item "{item.name}" successfully created.', 'success')
|
|
|
+ return redirect(url_for('item_list', item_type=item_type))
|
|
|
+
|
|
|
+ return render_template('testrun/create_item.html', type=item_type, chips=chips, form=form)
|
|
|
+ #return render_template('testrun/edit_item.html', type=item_type, item=None, form=form)
|
|
|
+
|
|
|
+
|
|
|
+@app.route('/testrun/<int:item_id>/run')
|
|
|
+@login_required
|
|
|
+def run_testrun(item_id):
|
|
|
+ #
|
|
|
+ # runs Testrun via API Web Service
|
|
|
+ #
|
|
|
+
|
|
|
+ # check for complete set of Testrun's layers
|
|
|
+ try:
|
|
|
+ utils.testrunIntegrity(item_id)
|
|
|
+ except ValueError as error:
|
|
|
+ app.logger.error(f'Failed to execute Testrun ID #{item_id} by {current_user}. {error}.')
|
|
|
+ flash(f'ERROR: Testrun ID #{item_id} cannot be executed. {error}', 'danger')
|
|
|
+ return redirect(url_for('item_list', item_type='testrun'))
|
|
|
+
|
|
|
+ # export to JSON
|
|
|
+ try:
|
|
|
+ json_testrun = utils.exportJSON(item_id, shouldBeSaved=False)
|
|
|
+ except Exception as error:
|
|
|
+ app.logger.error(f'Failed to export Testrun ID #{item_id} by {current_user}. {error}')
|
|
|
+ flash(f'ERROR: Failed to export Testrun ID #{item_id}. {error}', 'danger')
|
|
|
+ return redirect(url_for('item_list', item_type='testrun'))
|
|
|
+
|
|
|
+ # call API
|
|
|
+ url = '/'.join(('http:/', app.config.get('BAANGT_API_HOST'), app.config.get('BAANGT_API_EXECUTE')))
|
|
|
+ app.logger.info(f'Call execution of Testrun ID #{item_id} by {current_user}. API URL: {url}')
|
|
|
+ try:
|
|
|
+ r = requests.post(url, json=json_testrun)
|
|
|
+ except Exception as error:
|
|
|
+ app.logger.error(f'Failed to connect to {app.config.get("BAANGT_API_HOST")} by {current_user}. {error}')
|
|
|
+ flash(f'ERROR: Cannot establish connection: {app.config.get("BAANGT_API_HOST")}', 'danger')
|
|
|
+ return redirect(url_for('item_list', item_type='testrun'))
|
|
|
+
|
|
|
+ # API server error handler
|
|
|
+ if r.status_code == 500:
|
|
|
+ app.logger.error('ERROR: API Server interanal error')
|
|
|
+ flash('ERROR: API Server interanal error', 'danger')
|
|
|
+ return redirect(url_for('item_list', item_type='testrun'))
|
|
|
+
|
|
|
+ jsonResult = json.loads(r.text)
|
|
|
+ app.logger.info(f'API response: status code {r.status_code} JSON {jsonResult}')
|
|
|
+
|
|
|
+ if r.status_code == 202:
|
|
|
+ # store to db
|
|
|
+ tr_call = models.TestRunCall(
|
|
|
+ id=uuid.UUID(jsonResult['id']).bytes,
|
|
|
+ creator=current_user,
|
|
|
+ testrun_id=item_id,
|
|
|
+ )
|
|
|
+ db.session.add(tr_call)
|
|
|
+ db.session.commit()
|
|
|
+ app.logger.info(f'Created Testrun Call id {jsonResult["id"]} by {current_user}.')
|
|
|
+ return render_template('testrun/testrun_status.html', results=jsonResult, status=r.status_code)
|
|
|
+ else:
|
|
|
+ flash(f'ERROR: {r.text}', 'danger')
|
|
|
+ return redirect(url_for('item_list', item_type='testrun'))
|
|
|
+
|
|
|
+@app.route('/results/<string:call_id>')
|
|
|
+@login_required
|
|
|
+def get_results(call_id):
|
|
|
+ #
|
|
|
+ # runs Testrun on web-srvice
|
|
|
+ #
|
|
|
+
|
|
|
+ # call API
|
|
|
+ url = '/'.join(('http:/', app.config.get('BAANGT_API_HOST'), app.config.get('BAANGT_API_STATUS'), call_id))
|
|
|
+ app.logger.info(f'Call results by {current_user}. API URL: {url}')
|
|
|
+ try:
|
|
|
+ r = requests.get(url)
|
|
|
+ except Exception as error:
|
|
|
+ app.logger.error(f'Failed to connect to {app.config.get("TESTRUN_SERVICE_HOST")} by {current_user}. {error}')
|
|
|
+ flash(f'ERROR: Cannot establish connection: {app.config.get("TESTRUN_SERVICE_HOST")}', 'danger')
|
|
|
+ return redirect(url_for('item_list', item_type='testrun'))
|
|
|
+
|
|
|
+ # API server error handler
|
|
|
+ if r.status_code == 500:
|
|
|
+ app.logger.error('ERROR: API Server interanal error')
|
|
|
+ flash('ERROR: API Server interanal error', 'danger')
|
|
|
+ return redirect(url_for('item_list', item_type='testrun'))
|
|
|
+
|
|
|
+ jsonResult = json.loads(r.text)
|
|
|
+ app.logger.info(f'API response: status code {r.status_code} JSON {jsonResult}')
|
|
|
+
|
|
|
+ if r.status_code == 200:
|
|
|
+ return render_template('testrun/testrun_results.html', results=jsonResult)
|
|
|
+
|
|
|
+ return render_template('testrun/testrun_status.html', results=jsonResult, status=r.status_code)
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+@app.route('/results')
|
|
|
+def test_view():
|
|
|
+ #
|
|
|
+ # TEST view
|
|
|
+ #
|
|
|
+
|
|
|
+ with open('data/response.json', 'r') as f:
|
|
|
+ j = json.load(f)
|
|
|
+
|
|
|
+ return render_template('testrun/testrun_results.html', results=j)
|
|
|
+
|
|
|
+
|
|
|
+@app.route('/testrun/<string:export_format>/<int:item_id>', methods=['GET', 'POST'])
|
|
|
+@login_required
|
|
|
+def export_testrun(export_format, item_id):
|
|
|
+ #
|
|
|
+ # export Testrun object to <export_format>
|
|
|
+ # <export_format> is one of the following:
|
|
|
+ # XLSX
|
|
|
+ # JSON
|
|
|
+ #
|
|
|
+
|
|
|
+ if export_format == 'XLSX':
|
|
|
+ # export to XLSX
|
|
|
+ try:
|
|
|
+ result = utils.exportXLSX(item_id)
|
|
|
+ except Exception as e:
|
|
|
+ app.logger.error(f'Failed to export Testrun id {item_id} by {current_user}. {e}')
|
|
|
+ return f'ERROR: {e}'
|
|
|
+
|
|
|
+ elif export_format == 'JSON':
|
|
|
+ # export to JSON
|
|
|
+ try:
|
|
|
+ result = utils.exportJSON(item_id)
|
|
|
+ except Exception as e:
|
|
|
+ app.logger.error(f'Failed to export Testrun id {item_id} by {current_user}. {e}')
|
|
|
+ return f'ERROR: {e}'
|
|
|
+
|
|
|
+ else:
|
|
|
+ # invalid format
|
|
|
+ app.logger.error(f'Failed to export Testrun id {item_id} by {current_user}. Invalid format: {export_format}')
|
|
|
+ return f'ERROR: format {export_format} is not supported.'
|
|
|
+
|
|
|
+ # successful export
|
|
|
+ url = url_for('static', filename=f'files/{result}')
|
|
|
+ app.logger.info(f'Testrun ID #{item_id} exported to {export_format} by {current_user}. Target: static/files/{result}')
|
|
|
+
|
|
|
+ return f'Success: <a href="{url}">{result}</a>'
|
|
|
+
|
|
|
+@app.route('/testrun/import', methods=['POST'])
|
|
|
+@login_required
|
|
|
+def import_testsun():
|
|
|
+ #
|
|
|
+ # imports testrun from file
|
|
|
+ #
|
|
|
+
|
|
|
+ # import only from XLSX is available now
|
|
|
+ form = forms.TestrunImportForm()
|
|
|
+
|
|
|
+ if form.validate_on_submit():
|
|
|
+ #utils.importXLSX(form.file.data)
|
|
|
+ try:
|
|
|
+ utils.importXLSX(form.file.data, datafiles=form.dataFiles)
|
|
|
+ app.logger.info(f'Testrun successfully imported from "{form.file.data.filename}" by {current_user}.')
|
|
|
+ flash(f'Testrun successfully imported from "{form.file.data.filename}"', 'success')
|
|
|
+ except Exception as error:
|
|
|
+ # discard changes
|
|
|
+ db.session.rollback()
|
|
|
+ app.logger.error(f'Failed to import Testrun from "{form.file.data.filename}" by {current_user}. {error}.')
|
|
|
+ flash(f'ERROR: Cannot import Testrun from "{form.file.data.filename}". {error}.', 'danger')
|
|
|
+ else:
|
|
|
+ flash(f'File is required for import', 'warning')
|
|
|
+
|
|
|
+ return redirect(url_for('item_list', item_type='testrun'))
|
|
|
+
|
|
|
+@app.route('/datafile/upload', methods=['POST'])
|
|
|
+@login_required
|
|
|
+def upload_datafile():
|
|
|
+ # get datafile
|
|
|
+ file = request.files.get('datafile')
|
|
|
+ if file is None:
|
|
|
+ return jsonify({'error': 'Request does not contain dataFile'}), 400
|
|
|
+
|
|
|
+ # upload data file
|
|
|
+ try:
|
|
|
+ datafile_id = utils.importDataFile(file)
|
|
|
+ except Exception as error:
|
|
|
+ app.logger.error(f'Failed to uplod DataFile "{file.filename}" by {current_user}. {error}.')
|
|
|
+ return jsonify({'error': f'Failed upload DataFile "{file.filename}". {error}.'}), 400
|
|
|
+
|
|
|
+ return jsonify({'id': datafile_id, 'name': file.filename}), 200
|
|
|
+
|
|
|
+
|
|
|
+@app.route('/testrun/<int:item_id>/import', methods=['POST'])
|
|
|
+@login_required
|
|
|
+def update_testsun(item_id):
|
|
|
+ #
|
|
|
+ # imports testrun from file
|
|
|
+ #
|
|
|
+
|
|
|
+ # update only from XLSX is available now
|
|
|
+ form = forms.TestrunImportForm()
|
|
|
+
|
|
|
+ if form.validate_on_submit():
|
|
|
+
|
|
|
+ # update items
|
|
|
+ try:
|
|
|
+ utils.importXLSX(form.file.data, item_id=item_id)
|
|
|
+ app.logger.info(f'Testrun ID #{item_id} successfully updated from "{form.file.data.filename}" by {current_user}.')
|
|
|
+ flash(f'Testrun ID #{item_id} has been successfully updated from "{form.file.data.filename}"', 'success')
|
|
|
+ except Exception as error:
|
|
|
+ # discard changes
|
|
|
+ db.session.rollback()
|
|
|
+ app.logger.error(f'Failed to update Testrun ID #{item_id} from "{form.file.data.filename}" by {current_user}. {error}.')
|
|
|
+ flash(f'ERROR: Cannot update Testrun ID #{item_id} from "{form.file.data.filename}". {error}.', 'danger')
|
|
|
+ else:
|
|
|
+ flash(f'File is required for import', 'warning')
|
|
|
+
|
|
|
+ return redirect(url_for('item_list', item_type='testrun'))
|
|
|
+
|
|
|
+
|
|
|
+#
|
|
|
+# user authentication
|
|
|
+#
|
|
|
+
|
|
|
+@app.route('/signup', methods=['GET', 'POST'])
|
|
|
+def signup():
|
|
|
+ if current_user.is_authenticated:
|
|
|
+ return redirect(url_for('index'))
|
|
|
+
|
|
|
+ form = forms.SingupForm()
|
|
|
+ if form.validate_on_submit():
|
|
|
+ # create user
|
|
|
+ user = models.User(username=form.username.data)
|
|
|
+ user.set_password(form.password.data)
|
|
|
+ db.session.add(user)
|
|
|
+ db.session.commit()
|
|
|
+ # login
|
|
|
+ login_user(user, remember=True)
|
|
|
+ flash(f'User {user.username.capitalize()} successfully created!', 'succsess')
|
|
|
+ return redirect(url_for('index'))
|
|
|
+
|
|
|
+ return render_template('testrun/signup.html', form=form)
|
|
|
+
|
|
|
+@app.route('/login', methods=['GET', 'POST'])
|
|
|
+def login():
|
|
|
+ if current_user.is_authenticated:
|
|
|
+ return redirect(url_for('index'))
|
|
|
+
|
|
|
+ form = forms.LoginForm()
|
|
|
+ if form.validate_on_submit():
|
|
|
+ user = models.User.query.filter_by(username=form.username.data).first()
|
|
|
+ if user and user.verify_password(form.password.data):
|
|
|
+ login_user(user, remember=True)
|
|
|
+ flash(f'Welcome {user.username.capitalize()}!', 'success')
|
|
|
+ return redirect(url_for('index'))
|
|
|
+
|
|
|
+ return render_template('testrun/login.html', form=form)
|
|
|
+
|
|
|
+@app.route('/logout')
|
|
|
+def logout():
|
|
|
+ logout_user()
|
|
|
+
|
|
|
+ return redirect(url_for('login'))
|
|
|
+
|