|
@@ -0,0 +1,691 @@
|
|
|
+'''
|
|
|
+Created on 24.05.2018
|
|
|
+
|
|
|
+@author: Bernhard
|
|
|
+'''
|
|
|
+
|
|
|
+import requests
|
|
|
+import json
|
|
|
+from datetime import datetime
|
|
|
+from random import randint
|
|
|
+import cProfile, pstats
|
|
|
+from io import StringIO
|
|
|
+import os
|
|
|
+import platform
|
|
|
+
|
|
|
+import logging
|
|
|
+import getopt
|
|
|
+import sys
|
|
|
+import timeit
|
|
|
+import csv
|
|
|
+import codecs
|
|
|
+
|
|
|
+
|
|
|
+def args_read(l_search_parameter):
|
|
|
+ l_args = sys.argv[1:]
|
|
|
+
|
|
|
+ try:
|
|
|
+ opts, args = getopt.getopt(l_args,"u:p:s:l:m:",["users=",
|
|
|
+ "products=",
|
|
|
+ "server=",
|
|
|
+ "ll=",
|
|
|
+ "mode="
|
|
|
+ ])
|
|
|
+ except getopt.GetoptError as err_det:
|
|
|
+ print ("Error in reading parameters:" + str(err_det))
|
|
|
+ print_args()
|
|
|
+ sys.exit("Abgebrochen wegen Fehler in Aufrufparametern")
|
|
|
+ if opts:
|
|
|
+ for opt, arg in opts:
|
|
|
+ if l_search_parameter == opt: #in ("-u", "--usage"):
|
|
|
+ return arg
|
|
|
+ return None
|
|
|
+
|
|
|
+def print_args():
|
|
|
+ print("""
|
|
|
+Call: python api_calls.py --parameters
|
|
|
+ --users=<numeric> e.g. 10, 100, Default = 50
|
|
|
+ --products=<numeric> e.g. 1000, 5000. Default = 2000
|
|
|
+ --server="https://app-test.earthsquad.global" Default = app.test
|
|
|
+ --ll=1..4 (1=Error, 2 = warning, 3 = info, 4 = debug. Default 1)
|
|
|
+ --mode=allnew, recycle
|
|
|
+ allnew = create users, create products, recycle, bulk-recycle, call stats
|
|
|
+ recycle = choose random users, choose random material, recycle, bulk-recycle
|
|
|
+ for this to work there needs to be an export from users-table
|
|
|
+ called auth_user.csv (only having E-Mail-Address).
|
|
|
+ Only users with E-Mail-Address @test.com will be selected
|
|
|
+
|
|
|
+ Suggested for standard use:
|
|
|
+ python api_calls.py --users=50 --products=1000
|
|
|
+ or in the EXE-Version: api_calls.exe --users=50 --products=1000
|
|
|
+ will create 50 users, 1000 products on app-test server
|
|
|
+""")
|
|
|
+
|
|
|
+class perf_trace:
|
|
|
+ def __init__(self):
|
|
|
+ self.__pr = ""
|
|
|
+ def perf_trace_start(self):
|
|
|
+ self.__pr = cProfile.Profile()
|
|
|
+ self.__pr.enable() # start profiling
|
|
|
+
|
|
|
+ def perf_trace_end(self):
|
|
|
+ self.__pr.disable() # end profiling
|
|
|
+ s = StringIO()
|
|
|
+ sortby = 'cumulative'
|
|
|
+ ps = pstats.Stats(self.__pr, stream=s).sort_stats(sortby)
|
|
|
+ ps.print_stats()
|
|
|
+ logger.info("Performance-Trace follows:\n" + s.getvalue())
|
|
|
+ #print(s.getvalue())
|
|
|
+
|
|
|
+def g_post(url, json="", headers=None):
|
|
|
+
|
|
|
+ if url[0] != "#":
|
|
|
+ if not headers:
|
|
|
+ resp = requests.post(url, json=json)
|
|
|
+ else:
|
|
|
+ resp = requests.post(url, headers=headers, json=json)
|
|
|
+ else:
|
|
|
+ resp = i_response()
|
|
|
+ resp.status_code = 201
|
|
|
+ l_json = {'key':12345, 'id':12345}
|
|
|
+ resp.set_json(l_json)
|
|
|
+
|
|
|
+ return resp
|
|
|
+
|
|
|
+def g_get(url, json=None, headers=None):
|
|
|
+ if url[0] != "#":
|
|
|
+ if not json:
|
|
|
+ resp = requests.get(url, headers=headers)
|
|
|
+ else:
|
|
|
+ resp = requests.get(url, headers=headers, json=json)
|
|
|
+ else:
|
|
|
+ resp = i_response()
|
|
|
+ resp.status_code = 201
|
|
|
+ l_json = {'key':12345, 'id':12345}
|
|
|
+ resp.set_json(l_json)
|
|
|
+
|
|
|
+ return resp
|
|
|
+
|
|
|
+class i_response:
|
|
|
+ # Class to buffer a response-object into
|
|
|
+ # internal formati
|
|
|
+ # so that we can mock response-object
|
|
|
+ status_code = None
|
|
|
+ text = None
|
|
|
+ def __init__(self, response=None):
|
|
|
+ if response:
|
|
|
+ self.__response = response
|
|
|
+ self.text = response.text
|
|
|
+ else:
|
|
|
+ self.__response = None
|
|
|
+ self.text = "dummy"
|
|
|
+ self.__json = None
|
|
|
+ def json(self):
|
|
|
+ if self.__response:
|
|
|
+ return self.__response.json()
|
|
|
+ else:
|
|
|
+ return self.__json
|
|
|
+ def set_json(self, l_json):
|
|
|
+ self.__json = l_json
|
|
|
+
|
|
|
+
|
|
|
+def g_check_status(resp):
|
|
|
+ if resp.status_code!=200 and resp.status_code != 201:
|
|
|
+ if resp.status_code == 400:
|
|
|
+ logger.debug(resp.status_code)
|
|
|
+ logger.debug(resp)
|
|
|
+ logger.debug(resp.__dict__)
|
|
|
+ return False
|
|
|
+ else:
|
|
|
+ logger.info(resp.status_code)
|
|
|
+ logger.info(resp)
|
|
|
+ logger.info(resp.__dict__)
|
|
|
+ return False
|
|
|
+ else:
|
|
|
+ return True
|
|
|
+
|
|
|
+def g_get_header(i):
|
|
|
+
|
|
|
+ l_token_key = g_users.get_user_and_login(i)
|
|
|
+ if not l_token_key:
|
|
|
+ logger.critical("Token for user not recevied: " + str(i))
|
|
|
+ raise SystemExit()
|
|
|
+
|
|
|
+ headers={'x-geolocation-header': "1:1",
|
|
|
+ "authorization": "Token " + l_token_key,
|
|
|
+ "Content-Type": "application/json",
|
|
|
+ "accept": "application/json"}
|
|
|
+
|
|
|
+ return headers
|
|
|
+
|
|
|
+class api_init:
|
|
|
+ def __init__(self, min_number, iteration):
|
|
|
+ self.min_number = min_number
|
|
|
+ self.max_number = min_number + iteration
|
|
|
+ # BaseURL for API-Calls
|
|
|
+ # Muss spaeter noch ersetzt werden - irgendwie aus INI-File ziehen oda so.
|
|
|
+ self.base_url = args_read("--server")
|
|
|
+ if not self.base_url:
|
|
|
+ self.base_url = "https://app-test.earthsquad.global/api"
|
|
|
+ logger.info("Server: " + self.base_url)
|
|
|
+ # Basis for Barcodes, Usernames, Product-Texts, etc.
|
|
|
+ l_datetime = datetime.now()
|
|
|
+ self.base = str(l_datetime.month + l_datetime.day) + str(l_datetime.hour + l_datetime.minute) + str(l_datetime.second)
|
|
|
+ logger.debug("Base-String: " + self.base)
|
|
|
+ def set_base_url(self, base_url):
|
|
|
+ self.base_url = base_url
|
|
|
+ def get_min_number(self):
|
|
|
+ return self.min_number
|
|
|
+ def get_max_number(self):
|
|
|
+ return self.max_number
|
|
|
+ def get_base_url(self):
|
|
|
+ return self.base_url
|
|
|
+ def get_base(self):
|
|
|
+ return self.base
|
|
|
+
|
|
|
+class api_user:
|
|
|
+ # Creates a number of random users
|
|
|
+ def __init__(self, api_init):
|
|
|
+ self.l_api_init = api_init
|
|
|
+ self.l_min_number = self.l_api_init.get_min_number()
|
|
|
+ self.l_max_number = self.l_api_init.get_max_number()
|
|
|
+ self.l_base_url = self.l_api_init.get_base_url()
|
|
|
+ self.l_base = self.l_api_init.get_base()
|
|
|
+ self.l_users = {}
|
|
|
+ self.l_counter = 0
|
|
|
+ self.l_token_key = ""
|
|
|
+ def set_max_number(self, max_number):
|
|
|
+ self.l_max_number = max_number
|
|
|
+ def set_min_number(self, min_number):
|
|
|
+ self.l_min_number = min_number
|
|
|
+ def get_max_number(self):
|
|
|
+ return self.l_max_number
|
|
|
+ def get_min_number(self):
|
|
|
+ return self.l_min_number
|
|
|
+ def create_users(self):
|
|
|
+ for n in range(self.l_min_number, self.l_max_number):
|
|
|
+ url = self.l_base_url + '/rest-auth/registration/'
|
|
|
+ login_data = {"username" : self.l_base + str(n),
|
|
|
+ "email": self.l_base + str(n) + "@test.com",
|
|
|
+ "password1": "bb140272!",
|
|
|
+ "password2": "bb140272!"
|
|
|
+ }
|
|
|
+ g_timeit.start("users")
|
|
|
+ resp = g_post(url, json=login_data, headers=[])
|
|
|
+ if g_check_status(resp):
|
|
|
+ self.l_users[n] = [self.l_base +str(n), self._get_key_from_resp(resp)]
|
|
|
+ logger.info("Key for user " + self.l_base + str(n) + ": " + self.l_users[n][1])
|
|
|
+ #print("User-Result: " + str(resp.__dict__))
|
|
|
+ g_timeit.end()
|
|
|
+ def get_user_and_login(self, i):
|
|
|
+ # The I can be much higher than our number of l_max_number
|
|
|
+ # if this is the case, then restart counting
|
|
|
+
|
|
|
+ if self.l_counter == 0:
|
|
|
+ l_i = i
|
|
|
+ logger.info("i = " + str(i) + " self.l_max_number = " + str(self.l_max_number) +
|
|
|
+ " self.l_min_number = " + str(self.l_min_number) )
|
|
|
+ while l_i > ( self.l_max_number - self.l_min_number - 1):
|
|
|
+ l_i = l_i - ( self.l_max_number - self.l_min_number)
|
|
|
+
|
|
|
+ self.l_counter = self.l_counter + 1
|
|
|
+ self.l_token_key = self._login_(l_i)
|
|
|
+ return self.l_token_key
|
|
|
+ elif self.l_counter > 2:
|
|
|
+ self.l_counter = 0
|
|
|
+ return self.l_token_key
|
|
|
+ else:
|
|
|
+ self.l_counter += 1
|
|
|
+ return self.l_token_key
|
|
|
+
|
|
|
+ def _login_(self,i):
|
|
|
+ g_timeit.start("login")
|
|
|
+ url = self.l_base_url + '/rest-auth/login/'
|
|
|
+ logger.info("login with Array position: " + str(i))
|
|
|
+ login_data = {"username" : self.l_users[i][0], "password" : "bb140272!"}
|
|
|
+ resp = g_post(url, json=login_data)
|
|
|
+ g_timeit.end()
|
|
|
+
|
|
|
+ if g_check_status(resp):
|
|
|
+ l_token_key = self._get_key_from_resp(resp)
|
|
|
+
|
|
|
+ logger.debug("Logged in as: " + self.l_base + str(i))
|
|
|
+ return l_token_key
|
|
|
+ else:
|
|
|
+ logger.warn("Login not successful with: " + str(login_data))
|
|
|
+ def _get_key_from_resp(self, resp):
|
|
|
+
|
|
|
+ if resp.text == "dummy": #When mocking the interface
|
|
|
+ l_token_key = "123456"
|
|
|
+ elif resp.json():
|
|
|
+ l_token_key = resp.json()["key"]
|
|
|
+ else:
|
|
|
+ l_token_key = None
|
|
|
+ return l_token_key
|
|
|
+ def import_users(self, l_csv):
|
|
|
+ self.l_users = []
|
|
|
+ l_i = 0
|
|
|
+ with codecs.open(filename=l_csv, mode='r') as l_csvfile:
|
|
|
+ l_reader = csv.reader(l_csvfile)
|
|
|
+ for l_row in l_reader:
|
|
|
+ if "test.com" in l_row[0]:
|
|
|
+ self.l_users.append([l_row[0].split("@test.com")[0], ""])
|
|
|
+ l_i += 1
|
|
|
+ logger.info("File with Users Imported")
|
|
|
+ logger.info("Imported: " + str(l_i) + " users from CSV-File " + l_csv)
|
|
|
+ if ( self.l_max_number - self.l_min_number ) > l_i:
|
|
|
+ self.l_max_number = self.l_min_number + l_i
|
|
|
+
|
|
|
+class api_product_full:
|
|
|
+ def __init__(self, l_number, api_init):
|
|
|
+ # uses the EAN-Code and the product API
|
|
|
+ #self.base = api_init.get_base()
|
|
|
+ self.number = l_number
|
|
|
+ self.ean_id = 0
|
|
|
+ self.product_id = 0
|
|
|
+ self.api_init = api_init
|
|
|
+ def get_id(self):
|
|
|
+ return self.number
|
|
|
+ def create_product(self):
|
|
|
+ # creates Barcode and Product
|
|
|
+
|
|
|
+ # create Barcode
|
|
|
+ l_ean = api_ean_code(self.number, api_init=self.api_init)
|
|
|
+ self.ean_id = l_ean.create()
|
|
|
+
|
|
|
+ l_product = api_product(self.number, api_init=self.api_init)
|
|
|
+ self.product_id = l_product.create(self.ean_id)
|
|
|
+
|
|
|
+ for l_bom in range(0, g_random(1,4)):
|
|
|
+ api_bom(self.api_init, self.product_id, g_random(3,17), g_random(10,1000))
|
|
|
+
|
|
|
+ def read_server(self, id):
|
|
|
+ # Checks with this ID, whether a product exists @ server
|
|
|
+ g_timeit.start("Read_Product_ID")
|
|
|
+ l_url = self.api_init.get_base_url()
|
|
|
+ resp = g_get(l_url + '/scanapp/en-us/products/' + str(id) + "/",
|
|
|
+ headers=g_get_header(self.number) )
|
|
|
+ g_timeit.end()
|
|
|
+ l_boolean = g_check_status(resp)
|
|
|
+ if l_boolean:
|
|
|
+ logger.debug("Product found from server: " + str(id))
|
|
|
+ return True
|
|
|
+ else:
|
|
|
+ logger.debug("Product not found @ server: " + str(id))
|
|
|
+ return False
|
|
|
+
|
|
|
+ def recycle(self):
|
|
|
+ g_timeit.start("recycle")
|
|
|
+ recycle_data = {
|
|
|
+ "product": self.product_id
|
|
|
+ }
|
|
|
+
|
|
|
+ resp = g_post(self.api_init.get_base_url() + '/scanapp/en-us/recycled/',
|
|
|
+ headers=g_get_header(self.number),
|
|
|
+ json = recycle_data)
|
|
|
+ g_check_status(resp)
|
|
|
+ logger.debug("Recycled ID: " + str(self.product_id))
|
|
|
+ g_timeit.end()
|
|
|
+
|
|
|
+
|
|
|
+class api_statistics:
|
|
|
+ def __init__(self, l_api_init):
|
|
|
+ self.l_api_init = l_api_init
|
|
|
+
|
|
|
+ def get_statistics(self, l_who, l_duration):
|
|
|
+
|
|
|
+ if l_who == 1:
|
|
|
+ l_user = "me"
|
|
|
+ elif l_who == 2:
|
|
|
+ l_user = "all"
|
|
|
+
|
|
|
+ if l_duration == 1:
|
|
|
+ l_timeduration = "week"
|
|
|
+ elif l_duration == 2:
|
|
|
+ l_timeduration = "month"
|
|
|
+ elif l_duration == 3:
|
|
|
+ l_timeduration = "year"
|
|
|
+
|
|
|
+ self.querystring = "?weight_unit=1&user=" + l_user + "&duration=" + l_timeduration
|
|
|
+
|
|
|
+ g_timeit.start("statistics")
|
|
|
+ l_url = self.l_api_init.get_base_url() + '/scanapp/en-us/statistics/' + self.querystring
|
|
|
+
|
|
|
+ resp = g_get(l_url, headers=g_get_header(i))
|
|
|
+
|
|
|
+ if g_check_status(resp):
|
|
|
+ l_answer = resp.json()
|
|
|
+ logger.debug("Statistics for " + self.querystring + ":\n" + str(l_answer))
|
|
|
+ self.answer = l_answer
|
|
|
+ g_timeit.end()
|
|
|
+
|
|
|
+class api_bom:
|
|
|
+ def __init__(self, l_api_init, l_product_id, l_packaging_material_type_id=3, l_weight=100, l_weight_unit=2):
|
|
|
+ g_timeit.start("BOM")
|
|
|
+ bom_data = {
|
|
|
+ "product": l_product_id,
|
|
|
+ "product_type": l_packaging_material_type_id,
|
|
|
+ "weight": l_weight,
|
|
|
+ "weight_unit": l_weight_unit
|
|
|
+ }
|
|
|
+ resp = g_post(l_api_init.get_base_url() + '/scanapp/en-us/bom/',
|
|
|
+ headers=g_get_header(i),
|
|
|
+ json=bom_data)
|
|
|
+
|
|
|
+ if g_check_status(resp):
|
|
|
+ l_answer = resp.json()
|
|
|
+ logger.debug("BOM: " + str(i) + " " + str(l_answer))
|
|
|
+ self.answer = l_answer
|
|
|
+ else:
|
|
|
+ self.answer = "Error in API-Call"
|
|
|
+ g_timeit.end()
|
|
|
+
|
|
|
+class api_ean_code:
|
|
|
+ def __init__(self, l_number, api_init):
|
|
|
+ self.number = l_number
|
|
|
+ self.l_api_init = api_init
|
|
|
+ self.l_base_url = self.l_api_init.get_base_url()
|
|
|
+ self.l_base = self.l_api_init.get_base()
|
|
|
+ def create(self):
|
|
|
+ g_timeit.start("ean")
|
|
|
+ url = self.l_base_url + '/scanapp/en-us/ean_codes/'
|
|
|
+ ean_data = {"barcode" : self.l_base + str(self.number),
|
|
|
+ "deleted" : "false"
|
|
|
+ }
|
|
|
+
|
|
|
+ resp = g_post(url, headers=g_get_header(self.number), json=ean_data)
|
|
|
+ g_timeit.end()
|
|
|
+ if g_check_status(resp):
|
|
|
+ l_answer = resp.json()
|
|
|
+ logger.debug(str(l_answer))
|
|
|
+ logger.debug("Barcode: " + str(self.number) + " Barcode-ID = " + str(l_answer["id"]) + " Answer = " + str(l_answer))
|
|
|
+ return l_answer["id"]
|
|
|
+
|
|
|
+class api_product:
|
|
|
+ def __init__(self, l_number, api_init):
|
|
|
+ self.number = l_number
|
|
|
+ self.l_api_init = api_init
|
|
|
+ self.l_base_url = self.l_api_init.get_base_url()
|
|
|
+ self.l_base = self.l_api_init.get_base()
|
|
|
+ def create(self, l_barcode_id):
|
|
|
+ g_timeit.start("products")
|
|
|
+ url = self.l_base_url + "/scanapp/en-us/products/"
|
|
|
+ product_data = {
|
|
|
+ "name": self.l_base + str(self.number),
|
|
|
+ "EAN_code": l_barcode_id,
|
|
|
+ "weight": 0,
|
|
|
+ "length": 0,
|
|
|
+ "width": 0,
|
|
|
+ "height": 0,
|
|
|
+ "is_generic": "false",
|
|
|
+ "deleted": "false",
|
|
|
+ "product_type": 1,
|
|
|
+ "weight_unit": 2
|
|
|
+ }
|
|
|
+ resp = g_post(url, headers=g_get_header(self.number), json=product_data)
|
|
|
+
|
|
|
+ g_timeit.end()
|
|
|
+ if g_check_status(resp):
|
|
|
+ l_answer = resp.json()
|
|
|
+ logger.debug("Product: " + str(self.number) + " " + str(l_answer))
|
|
|
+ #x_result[i].append(l_answer["id"])
|
|
|
+ return l_answer["id"]
|
|
|
+
|
|
|
+def g_random(l_min, l_max):
|
|
|
+ return randint(l_min, l_max)
|
|
|
+
|
|
|
+def get_proper_loglevel(loglevel):
|
|
|
+ if not loglevel:
|
|
|
+ return logging.INFO
|
|
|
+
|
|
|
+ if int(loglevel) == 2:
|
|
|
+ return logging.INFO
|
|
|
+ elif int(loglevel) == 4:
|
|
|
+ return logging.DEBUG
|
|
|
+ elif int(loglevel) == 3:
|
|
|
+ return logging.WARNING
|
|
|
+ elif int(loglevel) == 1:
|
|
|
+ return logging.ERROR
|
|
|
+ else:
|
|
|
+ return logging.INFO
|
|
|
+
|
|
|
+class gc_timeit:
|
|
|
+ def __init__(self, l_api_ini):
|
|
|
+ self.apis=[]
|
|
|
+ self.last_start = None
|
|
|
+ self.apiname = None
|
|
|
+ self.iterator = 0
|
|
|
+ self.api_ini = l_api_ini
|
|
|
+ def start(self,apiname):
|
|
|
+ self.last_start = timeit.default_timer()
|
|
|
+ self.apiname = apiname
|
|
|
+ def end(self):
|
|
|
+ l_time = timeit.default_timer() - self.last_start
|
|
|
+ self.apis.append([self.apiname,l_time])
|
|
|
+
|
|
|
+ self.iterator += 1
|
|
|
+ if self.iterator > 100:
|
|
|
+ self.iterator = 0
|
|
|
+ g_timeit.start("zzmakestastistic")
|
|
|
+ l_dict = self.__make_statistics()
|
|
|
+ g_timeit.end()
|
|
|
+ #print('\033[H\033[J')
|
|
|
+ l = os.system('cls')
|
|
|
+ print("Server: " + self.api_ini.get_base_url())
|
|
|
+ print("Computer: " + platform.uname()[1])
|
|
|
+ print("-------------------- Call duration --------- ")
|
|
|
+ print("API-Call # of calls | # Avg | Total ")
|
|
|
+ print("--------------------------------------------")
|
|
|
+ for l_key, l_value in sorted(l_dict.items()):
|
|
|
+ print
|
|
|
+ print(g_strformat(l_key,20) +
|
|
|
+ g_strformat(str(l_value[0]),6) + " | " +
|
|
|
+ g_strformat(str(round(l_value[1],4)),6) + " | " +
|
|
|
+ g_strformat(str(round(l_value[2],4)),6)
|
|
|
+ )
|
|
|
+
|
|
|
+ def __make_statistics(self):
|
|
|
+ l_dict = {}
|
|
|
+ for l_line in self.apis:
|
|
|
+ if l_line[0] in l_dict.keys():
|
|
|
+ l_dict[l_line[0]][0] = l_dict[l_line[0]][0] + 1
|
|
|
+ l_dict[l_line[0]][1] = l_dict[l_line[0]][1] + l_line[1]
|
|
|
+
|
|
|
+ else:
|
|
|
+ l_dict[l_line[0]] = [1, l_line[1]]
|
|
|
+ l_dict[l_line[0]][0] = 1
|
|
|
+ l_dict[l_line[0]][1] = l_line[1]
|
|
|
+ for l_key, l_value in l_dict.items():
|
|
|
+ l_total = l_value[1]
|
|
|
+ l_dict[l_key] = [l_value[0], l_value[1] / l_value[0], l_total]
|
|
|
+ return l_dict
|
|
|
+ def printreport(self):
|
|
|
+ l_dict = self.__make_statistics()
|
|
|
+ logger.info("\n-------------------- Call duration ----------------- ")
|
|
|
+ logger.info("Server: " + self.api_ini.get_base_url())
|
|
|
+ logger.info("Computer: " + platform.uname()[1])
|
|
|
+ logger.info("API-Call # of calls | # Avg | Total ")
|
|
|
+ logger.info("--------------------------------------------")
|
|
|
+ for l_key, l_value in sorted(l_dict.items()):
|
|
|
+ logger.info(g_strformat(l_key,20) +
|
|
|
+ g_strformat(str(l_value[0]),6) + " | " +
|
|
|
+ g_strformat(str(round(l_value[1],4)),6) + " | " +
|
|
|
+ g_strformat(str(round(l_value[2],4)),6)
|
|
|
+ )
|
|
|
+
|
|
|
+def g_strformat(l_string, l_length):
|
|
|
+ # returns a string in the length l_length
|
|
|
+ if len(l_string) > l_length:
|
|
|
+ return l_string[0:l_length]
|
|
|
+ else:
|
|
|
+ l_plus = " "
|
|
|
+ return l_string + l_plus[0:l_length-len(l_string)]
|
|
|
+
|
|
|
+def g_logging_init():
|
|
|
+ # Bit more advanced logging:
|
|
|
+ logger = logging.getLogger('api_test')
|
|
|
+
|
|
|
+ l_log_filename = ( 'log_api_calls_'
|
|
|
+ + datetime.now().strftime("%Y%m%d_%H%M%S") + '.log' )
|
|
|
+ logger.setLevel(logging.DEBUG)
|
|
|
+ # create file handler which logs even debug messages
|
|
|
+ fh = logging.FileHandler(l_log_filename, encoding = "UTF-8")
|
|
|
+ fh.setLevel(level=get_proper_loglevel(args_read("--ll")))
|
|
|
+ # create console handler with a higher log level
|
|
|
+ ch = logging.StreamHandler()
|
|
|
+ ch.setLevel(logging.INFO)
|
|
|
+ # create formatter and add it to the handlers
|
|
|
+ formatter = logging.Formatter('%(asctime)s _ %(levelname)s _ %(module)s _ %(funcName)s : %(message)s')
|
|
|
+ ch.setFormatter(formatter)
|
|
|
+ fh.setFormatter(formatter)
|
|
|
+ # add the handlers to logger
|
|
|
+ logger.addHandler(ch)
|
|
|
+ logger.addHandler(fh)
|
|
|
+ return logger
|
|
|
+
|
|
|
+ #logging.basicConfig(filename=l_log_filename, level=g_ini.get_value("LogLevel"))
|
|
|
+
|
|
|
+##################################################################
|
|
|
+# Her starts the program #########################################
|
|
|
+##################################################################
|
|
|
+
|
|
|
+print_args() #Prints the instructions how to start the program
|
|
|
+
|
|
|
+logger = g_logging_init()
|
|
|
+#g_perf_trace = perf_trace()
|
|
|
+#g_perf_trace.perf_trace_start()
|
|
|
+
|
|
|
+l_iteration = args_read("--products")
|
|
|
+if not l_iteration:
|
|
|
+ l_iteration = 100
|
|
|
+else:
|
|
|
+ l_iteration = int(l_iteration)
|
|
|
+logger.info("Products: " + str(l_iteration))
|
|
|
+
|
|
|
+l_users = args_read("--users")
|
|
|
+if not l_users:
|
|
|
+ l_users = 10
|
|
|
+else:
|
|
|
+ l_users = int(l_users)
|
|
|
+logger.info("Users: " + str(l_users))
|
|
|
+
|
|
|
+g_api_ini = api_init(min_number=1000, iteration=l_iteration) #Usually 1000 or 5000
|
|
|
+g_timeit = gc_timeit(g_api_ini) #Instanziate timer class
|
|
|
+
|
|
|
+# and some more initializations
|
|
|
+g_all_products = [] # Products
|
|
|
+g_api_statistics = api_statistics(g_api_ini) #Statistics
|
|
|
+
|
|
|
+g_users = api_user(g_api_ini)
|
|
|
+# create only e.g. 2 or 10 or whatever users for this run, independently of what was said in g_api_ini :
|
|
|
+g_users.set_max_number(g_api_ini.get_min_number()+l_users) #usually 100 or 1000
|
|
|
+
|
|
|
+g_mode = args_read("--mode")
|
|
|
+if not g_mode:
|
|
|
+ g_mode = "allnew"
|
|
|
+
|
|
|
+if g_mode == "allnew":
|
|
|
+ g_users.create_users()
|
|
|
+ for i in range(g_api_ini.get_min_number(), g_api_ini.get_max_number()):
|
|
|
+ logger.info("Product # " + str(i))
|
|
|
+
|
|
|
+ # Create Product (under the logged in user)
|
|
|
+ g_api_product_full = api_product_full(i, g_api_ini)
|
|
|
+
|
|
|
+ g_api_product_full.create_product()
|
|
|
+
|
|
|
+ g_api_product_full.recycle()
|
|
|
+
|
|
|
+ g_all_products.append(g_api_product_full)
|
|
|
+
|
|
|
+ # All 20 entries request some statistics.
|
|
|
+ if divmod(i,10)[1] == 0:
|
|
|
+ g_api_statistics.get_statistics(g_random(1,2), g_random(1,3))
|
|
|
+
|
|
|
+ # Jetzt sollen noch jeder User jedes Produkt n mal recyclen:
|
|
|
+ for i in range(g_users.get_min_number(), g_users.get_max_number()):
|
|
|
+ logger.info("Mass-Recycling of user " + str(i))
|
|
|
+ g_users._login_(i)
|
|
|
+
|
|
|
+ l_number_1 = g_random(g_api_ini.get_min_number(), g_api_ini.get_max_number())
|
|
|
+ l_number_2 = g_random(l_number_1, l_number_1+200)
|
|
|
+ if l_number_2 > g_api_ini.get_max_number():
|
|
|
+ l_number_2 = g_api_ini.get_max_number()
|
|
|
+
|
|
|
+ for n in range(l_number_1-g_api_ini.get_min_number(), l_number_2-g_api_ini.get_min_number()):
|
|
|
+ if divmod(n,10)[1] == 0:
|
|
|
+ logger.info("Mass-Recycling - items: " + str(n))
|
|
|
+ g_api_statistics.get_statistics(g_random(1,2), g_random(1,3))
|
|
|
+ l_product = g_all_products[n]
|
|
|
+ l_product.recycle()
|
|
|
+
|
|
|
+elif g_mode == "recycling":
|
|
|
+ logger.info("Mode Recycling!")
|
|
|
+ g_users.import_users("auth_user.csv")
|
|
|
+ l_found_mat = 0
|
|
|
+ l_iterator = 0
|
|
|
+ l_not_found_counter = 0
|
|
|
+ l_endless_loop_counter = 0
|
|
|
+ logger.info("Starting to find " +
|
|
|
+ str(g_api_ini.get_max_number() - g_api_ini.get_min_number()) +
|
|
|
+ " Products")
|
|
|
+ while l_found_mat <= ( g_api_ini.get_max_number() - g_api_ini.get_min_number() ):
|
|
|
+ # Brute-force getting material-IDs from database
|
|
|
+ l_iterator = l_iterator + g_random(1, 100)
|
|
|
+ l_product = api_product_full(l_iterator, g_api_ini)
|
|
|
+ l_ok_product = l_product.read_server(l_iterator)
|
|
|
+ if l_endless_loop_counter >= 5:
|
|
|
+ logger.error("Endless loop when trying to find materials. Exiting")
|
|
|
+ raise SystemExit()
|
|
|
+ if l_not_found_counter >= 5:
|
|
|
+ # If we didn't find a product after 5 tries
|
|
|
+ # we should reset the iterator.
|
|
|
+ l_iterator = 1
|
|
|
+ l_endless_loop_counter += 1
|
|
|
+ if l_ok_product:
|
|
|
+ l_found_mat += 1
|
|
|
+ l_not_found_counter = 0
|
|
|
+ l_endless_loop_counter = 0
|
|
|
+ g_all_products.append(l_product)
|
|
|
+ logger.info("Found existing product id " + str(l_product.get_id()))
|
|
|
+ else:
|
|
|
+ l_not_found_counter += 1
|
|
|
+
|
|
|
+ # Wonderful, we've found all the products we need, we've all the users.
|
|
|
+ # Now users will recycle
|
|
|
+ for i in range(0, (g_users.get_max_number() - g_users.get_min_number())):
|
|
|
+ g_users.get_user_and_login(i)
|
|
|
+ logger.info("Daily Recycling for user " + str(i))
|
|
|
+ for n in range(0, (g_api_ini.get_max_number() - g_api_ini.get_min_number())):
|
|
|
+ l_product = g_all_products[n]
|
|
|
+ l_product.recycle()
|
|
|
+ logger.debug("Recycled " + str(l_product) + " for user " + str(i))
|
|
|
+else:
|
|
|
+ print("Mode unknown: " + g_mode)
|
|
|
+ SystemExit()
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+############## Bulk Recycling ################################
|
|
|
+
|
|
|
+url = g_api_ini.get_base_url() + '/scanapp/en-us/bulk_recycling/'
|
|
|
+
|
|
|
+logger.info("Bulk-Recycling starts")
|
|
|
+for i in range(g_api_ini.get_min_number(), g_api_ini.get_max_number()):
|
|
|
+ if divmod(i,20)[1] == 0:
|
|
|
+ logger.info("Bulk recycling - items: " + str(i))
|
|
|
+ g_api_statistics.get_statistics(g_random(1,2), g_random(1,3))
|
|
|
+ g_timeit.start("bulk_recycling")
|
|
|
+ recycle_data = {"product_type" : g_random(3,17), "weight" : g_random(100,1000), "weight_unit" : g_random(1,2)}
|
|
|
+ resp = g_post(url, headers=g_get_header(i), json=recycle_data)
|
|
|
+
|
|
|
+ if g_check_status(resp):
|
|
|
+ l_answer = resp.json()
|
|
|
+ logger.debug(str(i) + " " + str(l_answer))
|
|
|
+
|
|
|
+ g_timeit.end()
|
|
|
+logger.info("Bulk-recycling ends")
|
|
|
+
|
|
|
+g_timeit.printreport()
|
|
|
+#g_perf_trace.perf_trace_end()
|
|
|
+print("\nall done - Thanks")
|