|
@@ -0,0 +1,125 @@
|
|
|
+import requests
|
|
|
+import json
|
|
|
+import random
|
|
|
+from time import sleep
|
|
|
+
|
|
|
+class Customers:
|
|
|
+ #
|
|
|
+ # random customers generator
|
|
|
+ #
|
|
|
+ policy_columns = [
|
|
|
+ {
|
|
|
+ "label": "Policy",
|
|
|
+ "isKey": True,
|
|
|
+ "type": "string",
|
|
|
+ "filter": False,
|
|
|
+ },
|
|
|
+ {
|
|
|
+ "label": "Status",
|
|
|
+ "type": "string",
|
|
|
+ "filter": False,
|
|
|
+ },
|
|
|
+ {
|
|
|
+ "label": "Product",
|
|
|
+ "type": "string",
|
|
|
+ "filter": False,
|
|
|
+ },
|
|
|
+ {
|
|
|
+ "label": "Effective Date",
|
|
|
+ "type": "string",
|
|
|
+ "filter": False,
|
|
|
+ },
|
|
|
+ ]
|
|
|
+
|
|
|
+ def __init__(self, quantity=1000):
|
|
|
+ user_url = f'https://randomuser.me/api/?seed=polzy&results={quantity}&nat=ch,de,dk,es,fi,fr,gb,ie,nl,no,tr&exc=login,id,picture'
|
|
|
+ r = requests.get(user_url)
|
|
|
+ if r.status_code != 200:
|
|
|
+ raise Exception(f'Unable fetch random users:\n{r.status_code}\n{r.text}')
|
|
|
+
|
|
|
+ # reshape fetched users to polzy customers
|
|
|
+ try:
|
|
|
+ self.all = [
|
|
|
+ {
|
|
|
+ 'partnerNumber': ''.join([_ for _ in user['registered'].get('date') if _.isdigit()]),
|
|
|
+ 'firstName': user['name'].get('first'),
|
|
|
+ 'lastName': user['name'].get('last'),
|
|
|
+ 'birthDate': user['dob'].get('date')[:10],
|
|
|
+ 'gender': user['gender'],
|
|
|
+ 'email': user['email'],
|
|
|
+ 'phone': user['phone'],
|
|
|
+ 'postCode': str(user['location'].get('postcode')),
|
|
|
+ 'country': user['location'].get('country'),
|
|
|
+ 'city': user['location'].get('city'),
|
|
|
+ 'street': user['location'].get('street')['name'],
|
|
|
+ 'houseNumber': str(user['location'].get('street')['number']),
|
|
|
+ } for user in r.json().get('results')
|
|
|
+ ]
|
|
|
+ except Exception as e:
|
|
|
+ raise Exception(f'Unable generate customers: {e}')
|
|
|
+
|
|
|
+
|
|
|
+ def filter(self, search_string=None):
|
|
|
+ #
|
|
|
+ # filter customers by search string's words
|
|
|
+ #
|
|
|
+
|
|
|
+ if not search_string:
|
|
|
+ return self.all
|
|
|
+
|
|
|
+ # filter customers
|
|
|
+ strings = search_string.lower().split()
|
|
|
+ search_fields = [
|
|
|
+ 'firstName',
|
|
|
+ 'lastName',
|
|
|
+ 'email',
|
|
|
+ 'city',
|
|
|
+ 'street'
|
|
|
+ ]
|
|
|
+
|
|
|
+ filtered_users = filter(
|
|
|
+ lambda user: any(s in ':'.join((user[field].lower() for field in search_fields)) for s in strings),
|
|
|
+ self.all,
|
|
|
+ )
|
|
|
+
|
|
|
+ return list(filtered_users)
|
|
|
+
|
|
|
+ def get_policies(self, partner):
|
|
|
+ #
|
|
|
+ # returns random policies for the specified partner
|
|
|
+ #
|
|
|
+
|
|
|
+ # dalay
|
|
|
+ sleep(2)
|
|
|
+
|
|
|
+ # get customer
|
|
|
+ try:
|
|
|
+ customer = next(filter(
|
|
|
+ lambda user: user['partnerNumber'] == partner,
|
|
|
+ self.all,
|
|
|
+ ))
|
|
|
+ except Exception:
|
|
|
+ raise Exception(f'Partner Number {partner} not found')
|
|
|
+
|
|
|
+ # load policies
|
|
|
+ from pms import POLICY_DATASET
|
|
|
+ with open(POLICY_DATASET, 'r') as f:
|
|
|
+ all_policies = [[
|
|
|
+ p.get('number'),
|
|
|
+ p.get('status'),
|
|
|
+ p['product_line'].get('name'),
|
|
|
+ p.get('effective_date'),
|
|
|
+ ] for p in json.load(f)]
|
|
|
+ policies = random.sample(all_policies, random.randrange(10))
|
|
|
+
|
|
|
+ return {
|
|
|
+ 'partner': partner,
|
|
|
+ 'policies': {
|
|
|
+ 'columns': self.policy_columns,
|
|
|
+ 'rows': policies,
|
|
|
+ },
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|