123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264 |
- from sqlalchemy import Column, String, Integer, DateTime, Boolean, Table, ForeignKey
- from sqlalchemy.orm import relationship
- from sqlalchemy import create_engine
- from sqlalchemy.ext.declarative import declarative_base
- import os, re, uuid
- from datetime import timedelta
- from baangt.base.PathManagement import ManagedPaths
- from baangt.base.GlobalConstants import EXECUTION_STAGE, TIMING_DURATION, TESTCASESTATUS
- managedPaths = ManagedPaths()
- DATABASE_URL = os.getenv('DATABASE_URL') or 'sqlite:///'+str(managedPaths.derivePathForOSAndInstallationOption().joinpath('testrun.db'))
- # handle sqlalchemy dialects
- dialect = re.match(r'\w+', DATABASE_URL)
- if dialect.group() == 'mysql':
- from sqlalchemy.dialects.mysql import BINARY as Binary
- else:
- from sqlalchemy import LargeBinary as Binary
- engine = create_engine(DATABASE_URL)
- base = declarative_base()
- #
- # UUID as bytes
- #
- def uuidAsBytes():
- return uuid.uuid4().bytes
- #
- # Testrun models
- #
- class TestrunLog(base):
- #
- # summary on Testrun results
- #
- __tablename__ = "testruns"
- # columns
- id = Column(Binary(16), primary_key=True, default=uuidAsBytes)
- testrunName = Column(String(32), nullable=False)
- logfileName = Column(String(1024), nullable=False)
- startTime = Column(DateTime, nullable=False)
- endTime = Column(DateTime, nullable=False)
- dataFile = Column(String(1024), nullable=True)
- statusOk = Column(Integer, nullable=False)
- statusFailed = Column(Integer, nullable=False)
- statusPaused = Column(Integer, nullable=False)
- RLPJson = Column(String, nullable=True) # ------------------------> New feature: comment for old DB
- # relationships
- globalVars = relationship('GlobalAttribute')
- testcase_sequences = relationship('TestCaseSequenceLog')
- @property
- def recordCount(self):
- return self.statusOk + self.statusPaused + self.statusFailed
- @property
- def duration(self):
- return (self.endTime - self.startTime).seconds
- @property
- def stage(self):
- for gv in self.globalVars:
- if gv.name == EXECUTION_STAGE:
- return gv.value
- return None
- def __str__(self):
- return str(uuid.UUID(bytes=self.id))
- def to_json(self):
- return {
- 'id': str(self),
- 'Name': self.testrunName,
- 'Summary': {
- 'TestRecords': self.recordCount,
- 'Successful': self.statusOk,
- 'Paused': self.statusPaused,
- 'Error': self.statusFailed,
- 'LogFile': self.logfileName,
- 'StartTime': self.startTime.strftime('%H:%M:%S'),
- 'EndTime': self.endTime.strftime('%H:%M:%S'),
- 'Duration': str(self.endTime - self.startTime),
- },
- 'GlobalSettings': {gv.name: gv.value for gv in self.globalVars},
- 'TestSequences': [tsq.to_json() for tsq in self.testcase_sequences],
- }
- class GlobalAttribute(base):
- #
- # global vars
- #
- __tablename__ = 'globals'
- # columns
- id = Column(Integer, primary_key=True)
- name = Column(String(64), nullable=False)
- value = Column(String(1024), nullable=True)
- testrun_id = Column(Binary(16), ForeignKey('testruns.id'), nullable=False)
- # relationships
- testrun = relationship('TestrunLog', foreign_keys=[testrun_id])
- #
- # Test Case Sequence models
- #
- class TestCaseSequenceLog(base):
- #
- # TestCase Sequence
- #
- __tablename__ = 'testCaseSequences'
- # columns
- id = Column(Binary(16), primary_key=True, default=uuidAsBytes)
- number = Column(Integer, nullable=False)
- testrun_id = Column(Binary(16), ForeignKey('testruns.id'), nullable=False)
- # relationships
- testrun = relationship('TestrunLog', foreign_keys=[testrun_id])
- testcases = relationship('TestCaseLog')
- @property
- def duration(self):
- #
- # duration in seconds
- #
- return sum([tc.duration for tc in self.testcases if tc.duration])
- def __str__(self):
- return str(uuid.UUID(bytes=self.id))
- def to_json(self):
- return {
- 'id': str(self),
- 'TestCases': [tc.to_json() for tc in self.testcases],
- }
- #
- # Test Case models
- #
- class TestCaseLog(base):
- #
- # TestCase results
- #
- __tablename__ = 'testCases'
- # columns
- id = Column(Binary(16), primary_key=True, default=uuidAsBytes)
- number = Column(Integer, nullable=False)
- testcase_sequence_id = Column(Binary(16), ForeignKey('testCaseSequences.id'), nullable=False)
- # relationships
- testcase_sequence = relationship('TestCaseSequenceLog', foreign_keys=[testcase_sequence_id])
- fields = relationship('TestCaseField', lazy='select')
- networkInfo = relationship('TestCaseNetworkInfo')
- @property
- def status(self):
- #
- # testcase status
- #
- for field in self.fields:
- if field.name == TESTCASESTATUS:
- return field.value
- return None
- @property
- def duration(self):
- #
- # duration in seconds
- #
- for field in self.fields:
- if field.name == TIMING_DURATION:
- # parse value from H:M:S.microseconds
- m = re.search(r'(?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d[\.\d+]*)', field.value)
- if m:
- factors = {
- 'hours': 3600,
- 'minutes': 60,
- 'seconds': 1,
- }
- #duration = {key: float(value) for key, value in m.groupdict().items()}
- #return timedelta(**duration)
- return sum([factors[key]*float(value) for key, value in m.groupdict().items()])
- return None
- def __str__(self):
- return str(uuid.UUID(bytes=self.id))
- def fields_as_dict(self):
- return {pr.name: pr.value for pr in self.fields}
- def to_json(self):
- return {
- 'id': str(self),
- 'Parameters': {pr.name: pr.value for pr in self.fields},
- 'NetworkInfo': [nw.to_json() for nw in self.networkInfo],
- }
- class TestCaseField(base):
- #
- # field for a TestCase results
- #
- __tablename__ = 'testCaseFields'
- # columns
- id = Column(Integer, primary_key=True)
- name = Column(String(64), nullable=False)
- value = Column(String(1024), nullable=True)
- testcase_id = Column(Binary(16), ForeignKey('testCases.id'), nullable=False)
- # relationships
- testcase = relationship('TestCaseLog', foreign_keys=[testcase_id])
- class TestCaseNetworkInfo(base):
- #
- # network info for a TestCase
- #
- __tablename__ = 'networkInfo'
- # columns
- id = Column(Integer, primary_key=True)
- browserName = Column(String(64), nullable=True)
- status = Column(Integer, nullable=True)
- method = Column(String(16), nullable=True)
- url = Column(String(256), nullable=True)
- contentType = Column(String(64), nullable=True)
- contentSize = Column(Integer, nullable=True)
- headers = Column(String(4096), nullable=True)
- params = Column(String(1024), nullable=True)
- response = Column(String(4096), nullable=True)
- startDateTime = Column(DateTime, nullable=True)
- duration = Column(Integer, nullable=True)
- testcase_id = Column(Binary(16), ForeignKey('testCases.id'), nullable=True)
- # relationships
- testcase = relationship('TestCaseLog', foreign_keys=[testcase_id])
- def to_json(self):
- return {
- 'BrowserName': self.browserName,
- 'Status': self.status,
- 'Method': self.method,
- 'URL': self.url,
- 'ContentType': self.contentType,
- 'ContentSize': self.contentSize,
- 'Headers': self.headers,
- 'Params': self.params,
- 'Response': '',#self.response,
- 'StartDateTime': str(self.startDateTime),
- 'Duration': self.duration,
- }
- # create tables
- #if __name__ == '__main__':
- base.metadata.create_all(engine)
|