123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280 |
- #
- # Sample Antrag class definition
- #
- from flask import url_for, current_app
- from polzybackend.mediators import Antrag
- from polzybackend.models import File
- from polzybackend.utils import date_format
- from pms.fast_offer import get_fast_offer, get_value_lists
- from functools import reduce
- from time import sleep
- import json
- import os
- import zipfile
- from datetime import datetime
class SampleAntrag(Antrag):
    """Sample Antrag implementation.

    The antrag state is kept in ``self.instance``: the dict returned by
    ``get_fast_offer`` and extended in ``initialize``.  ``get`` returns that
    same dict for the front-end.
    """

    @staticmethod
    def _find_named(items, name):
        """Return the first dict in *items* whose ``'name'`` is *name*, else ``None``."""
        return next((item for item in items if item.get('name') == name), None)

    def initialize(self):
        """Initialize the antrag instance within the Policy Management System.

        Fetches the offer for ``self.product_name``, adds id, status and
        product-line data to it, stores it as ``self.instance`` and fills in
        the documents field.

        Raises:
            Exception: if no offer is available for the product.
        """
        result = get_fast_offer(self.product_name)
        if not result:
            raise Exception(f'No offer available for product {self.product_name}')

        # expand result
        result.update({
            'id': self.id,
            'status': 'Neu',
            'product_line': {
                'name': self.product_name,
                'attributes': {
                    'Produkt': "Auto Insurance Ultra",
                },
            },
        })
        self.instance = result
        self.set_documents()

    def get(self):
        """Return the antrag instance as a JSON-serializable object for the front-end."""
        return self.instance

    def get_field_by_name(self, name):
        """Find an antrag field by name.

        Searches ``instance['fields']``, then ``instance['field_groups']``,
        then ``instance[<group name>]`` for every entry of
        ``instance['field_groups']``, in that order.

        Returns:
            The first matching dict, or ``None`` when nothing matches.
        """
        # build list of field sets to search
        field_sets = ['fields', 'field_groups']
        field_sets.extend(group['name'] for group in self.instance['field_groups'])

        # search for field
        for set_name in field_sets:
            target_field = self._find_named(self.instance[set_name], name)
            if target_field is not None:
                return target_field
        return None

    def getValueList(self, valueListName):
        """Return the value list with the given name."""
        # emulate delay
        sleep(2)
        return get_value_lists(valueListName)

    def update_auto_brands(self):
        """Update the brand input range based on the vehicle type.

        Only applies when the product line name is ``'Automobile'``.  Does
        nothing when the ``VehicleType`` or ``Brand`` field is missing, or
        when the vehicle type has no brand list (for example, no value yet),
        so that a partially filled form does not break field updates.
        """
        brand_lists = {
            'Car': 'carBrand',
            'Light Truck': 'lightTruckBrand',
            'Heavy Truck': 'heavyTruckBrand',
            'Motorcycle': 'motorcycleBrand',
            'Tractor': 'tractorBrand',
            'Bus': 'busBrand',
        }
        if self.instance['product_line'].get('name') != 'Automobile':
            return

        vehicle_field = self.get_field_by_name('VehicleType')
        brand_field = self.get_field_by_name('Brand')
        if vehicle_field is None or brand_field is None:
            return

        brand_list = brand_lists.get(vehicle_field.get('value'))
        if brand_list is None:
            return

        # NOTE: only the input range changes; the current Brand value is not reset.
        brand_field['inputRange'] = ['async', brand_list]

    def update_attachments(self):
        """Update the files of the ``Attachments`` field in the ``Documents`` activity.

        The field value is rebuilt from the ``File`` rows whose ``parent_id``
        equals this antrag's id.

        Raises:
            Exception: if the ``Documents`` activity or its ``Attachments``
                field is not found.
        """
        # get documents activity
        document_activity = self._find_named(
            self.instance['possible_activities'],
            "Documents",
        )
        print('Document Activity:')
        print(document_activity)
        if document_activity is None:
            raise Exception('Document Activity not found')

        # get attachment field
        attachments = self._find_named(document_activity['fields'], "Attachments")
        print('Attachment Field:')
        print(attachments)
        if attachments is None:
            raise Exception('Attachment Field not found')

        # get files from db
        antrag_files = File.query.filter_by(parent_id=self.id).all()
        attachments['value'] = [
            {
                "id": file.id,
                "name": file.filename,
                "created": file.created.strftime(date_format),
                "type": file.type,
                "actions": [
                    "edit",
                    "delete",
                ],
            }
            for file in antrag_files
        ]

    def set_documents(self):
        """Update the ``Documents`` field in the ``Documents`` activity.

        Creates the field when the activity does not have it yet, then
        rebuilds its value from the ``File`` rows of type ``"document"``,
        grouped by filename.

        Raises:
            Exception: if the ``Documents`` activity is not found.
        """
        # get documents activity
        document_activity = self._find_named(
            self.instance['possible_activities'],
            "Documents",
        )
        if document_activity is None:
            raise Exception('Document Activity not found')

        # get documents field, create it if it does not exist
        documents = self._find_named(document_activity['fields'], "Documents")
        if documents is None:
            documents = {
                "fieldVisibilityType": 2,
                "name": "Documents",
                "brief": "Documents",
                "tooltip": "Here you can manage the documents",
                "fieldDataType": "Documents",
            }
            document_activity['fields'].append(documents)

        # get files from db
        antrag_docs = File.query.filter_by(type="document").group_by(File.filename).all()
        documents['value'] = [
            {
                "id": file.id,
                "name": file.filename,
                "created": file.created.strftime(date_format),
                "signed": "Yes" if file.processed else "No",
            }
            for file in antrag_docs
        ]

    def updateFields(self, data):
        """Update antrag fields based on ``data['values']``.

        Only fields whose ``fieldVisibilityType`` is ``1`` are written.
        Boolean values are stored as the strings ``"True"``/``"False"``;
        every other value is stored unchanged.  For a ``FlagWithOptions``
        field, each related field takes its value from ``data['values']`` by
        name (``None`` when absent).  Afterwards the brand list, the
        attachments and the documents are refreshed.
        """
        print('\n*** UPDATE FIELDS')
        print(json.dumps(data, indent=2))

        values = data['values']
        for key, value in values.items():
            field = self.get_field_by_name(key)
            if not field or field.get('fieldVisibilityType') != 1:
                continue

            # Convert real booleans only: 1 == True and 0 == False in Python,
            # so a lookup table keyed by True/False would also rewrite numbers.
            field['value'] = str(value) if isinstance(value, bool) else value

            if field.get("fieldDataType") == "FlagWithOptions" and field.get("relatedFields"):
                for sub_field in field['relatedFields']:
                    sub_field['value'] = values.get(sub_field['name'])

        self.update_auto_brands()
        self.update_attachments()
        self.set_documents()

    def executeActivity(self, data):
        """Execute the antrag activity defined in ``data``.

        Fields are updated from ``data`` first.  Supported activities:

        * ``'Berechnen'`` - sets the premium and the status, returns the instance.
        * ``'Print'`` - runs ``AntragDrucken`` and returns a dict with a
          download ``link``.
        * ``'Fields'`` - returns the instance with the updated fields.

        Raises:
            Exception: if the activity is not one of the above, or if the
                ``premium`` field is missing for ``'Berechnen'``.
        """
        # emulate delay
        sleep(1)

        # update fields
        self.updateFields(data)

        # execute activity
        activity = data['activity']

        if activity == 'Berechnen':
            premium = self.get_field_by_name('premium')
            if premium is None:
                raise Exception('Premium field not found')
            premium['value'] = 1347
            self.instance['status'] = 'Calculated'
            return self.get()

        if activity == 'Print':
            # imported here, as in the original code, so the module is only
            # needed when printing is requested
            from AntragDrucken import AntragDrucken
            lPrint = AntragDrucken(antrag=self)
            lPrint.executeActivity()
            return {
                "link": url_for(
                    'end-points.downloads',
                    filename=lPrint.getFolderAndPath(),
                    _external=True,
                ),
            }

        if activity == 'Fields':
            print('\n*** Fields Updated:')
            print(json.dumps(data, indent=2))
            return self.get()

        raise Exception(f'Logic for activity {activity} is not defined')

    def getRemoteDocuments(self, documents_id: list):
        """Return the path to the fetched remote document(s).

        For exactly one id the path of that file in the ``UPLOADS`` folder is
        returned.  For any other number of ids (including none) the files are
        written to a timestamped zip archive in the ``UPLOADS`` folder and
        the path of the archive is returned.

        Raises:
            Exception: if an id does not match any stored file.
        """
        uploads = current_app.config['UPLOADS']

        files = []
        for file_id in documents_id:
            file = File.query.get(file_id)
            if file is None:
                raise Exception(f'Document {file_id} not found')
            ext = file.filename.split('.')[-1]
            files.append({
                'name': file.filename,
                'path': os.path.join(uploads, f'{file_id}.{ext}'),
            })

        if len(files) == 1:
            return files[0]['path']

        # zip files
        path_to_zip = os.path.join(
            uploads,
            f'polzy_documents_{datetime.now().strftime("%Y%m%d_%H%M%S")}.zip',
        )
        # the context manager closes the archive even if a write fails
        with zipfile.ZipFile(path_to_zip, 'w') as archive:
            for entry in files:
                archive.write(entry['path'], arcname=entry['name'])
        return path_to_zip
|