123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400 |
- # Mocks
- from requests.models import Response
- import requests
- import time
- import csv
- # Test Env
- import pytest
- from unittest.mock import patch
- # Test functions
- from baangt.base.ProxyRotate import ProxyRotate, proxy_data
- import baangt.base.GlobalConstants as GC
- from dataclasses import dataclass, asdict
- from http import HTTPStatus
- import copy
# Threshold copied onto ProxyRotate.MIN_PROXIES_FOR_FIRST_RUN by init_ProxyRotate;
# the tests also use it as the boundary for the proxy-list sizes they parametrize.
MIN_PROXIES_FOR_FIRST_RUN = 3
def create_init_proxy_list(length):
    """
    Build the rows handed to the mocked csv.DictReader.

    Each row is the dict form of a ``proxy_data`` record whose ip is the
    row index (as a string) and whose port is "0"; every field value is
    converted to ``str``.

    :param length: number of proxy rows to create
    :return: list of ``length`` dicts with string values
    """
    def as_csv_row(index):
        fields = asdict(proxy_data(ip=str(index), port=str(0)))
        return {name: str(value) for name, value in fields.items()}

    return [as_csv_row(index) for index in range(length)]
def init_ProxyRotate(mock_csv_DictReader, proxyListLength, firstRun = True):
    """
    Create a ProxyRotate instance with reset state and a mocked proxy file.

    :param mock_csv_DictReader: the patched ``csv.DictReader``; its return
        value is replaced by ``proxyListLength`` generated proxy rows
    :param proxyListLength: number of proxy rows the mocked reader provides
    :param firstRun: value assigned to the instance's ``firstRun`` attribute
    :return: the prepared ProxyRotate instance
    """
    rotator = ProxyRotate()

    # Overwrite whatever state the constructor left behind.
    rotator.proxy_gather_link = "https://www.sslproxies.org/"
    rotator.proxies = {}
    rotator.all_proxies = {}
    rotator._ProxyRotate__temp_proxies = []
    rotator.firstRun = firstRun
    rotator.MIN_PROXIES_FOR_FIRST_RUN = MIN_PROXIES_FOR_FIRST_RUN

    # The mock must be primed before the (name-mangled) private reader runs.
    csv_rows = create_init_proxy_list(proxyListLength)
    mock_csv_DictReader.return_value = csv_rows
    rotator._ProxyRotate__read_proxies()
    return rotator
def create_proxy_verification_response(status_code, length, deleteString=None):
    """
    Create the mocked responses for verifying ``length`` proxies.

    Two responses are produced per proxy, in this order: one carrying the
    "youtube" page body and one carrying the "google" page body. Both share
    the given status code.

    :param status_code: HTTP status assigned to every response
    :param length: number of proxies to create response pairs for
    :param deleteString: optional substring removed from both page bodies
        (used to produce pages the verification should reject)
    :return: flat list of ``2 * length`` Response objects
    """
    youtube_html = "<!DOCTYPE html><html><head><meta content='Content' property='og:title'/></head><body></body></html>"
    google_html = "<!DOCTYPE html><html></head><body><div class='logo'><img src='/img/favicon.png'></img></div></body></html>"
    if deleteString is not None:
        google_html = google_html.replace(deleteString, "")
        youtube_html = youtube_html.replace(deleteString, "")

    responses = []
    for _ in range(length):
        youtube_response = Response()
        youtube_response._content = youtube_html
        youtube_response.status_code = status_code
        # The second response starts as a shallow copy so it inherits the status code.
        google_response = copy.copy(youtube_response)
        google_response._content = google_html
        responses.extend((youtube_response, google_response))
    return responses
def create_request_proxy_list(status_code, length, deleteString=None):
    """
    Create the mocked response for a "gather more proxies" request.

    The body is an HTML page whose ``<tbody>`` holds one table row per
    proxy: ``<index>.000`` as address, ``40231`` as port and ``KE`` as
    country.

    :param status_code: HTTP status assigned to the response
    :param length: number of proxy rows in the table
    :param deleteString: optional substring removed from the whole body
        (used to produce a page that cannot be parsed as expected)
    :return: a single Response object
    """
    table_rows = "".join(
        f'<tr><td>{index}.000</td><td>40231</td><td>KE</td></tr>'
        for index in range(length)
    )
    html = "<!DOCTYPEhtml><html><head></head><body><tbody>" + table_rows + "</tbody></body></html>"
    if deleteString is not None:
        html = html.replace(deleteString, "")

    res_proxy = Response()
    res_proxy._content = html
    res_proxy.status_code = status_code
    return res_proxy
@patch("requests.get")
@patch('time.sleep', return_value=None)
@patch("csv.DictReader")
@pytest.mark.parametrize("proxyLength", list(range(MIN_PROXIES_FOR_FIRST_RUN)))
@pytest.mark.parametrize("additionalProxyRequestLength", [1])
def test_recheckProxies_init_proxy_with_less_min_proxy(mock_csv_DictReader, mock_time_sleep, mock_get,
                                                       proxyLength, additionalProxyRequestLength):
    """
    Start with fewer proxies than MIN_PROXIES_FOR_FIRST_RUN

    Test Steps:
    - Init a proxy list of length 0 .. MIN_PROXIES_FOR_FIRST_RUN - 1, firstRun = True
    - Queue successful verification responses for the initial proxies
    - Queue one successful proxy list response with additional proxies
    - Queue successful verification responses for initial + additional proxies

    Expected: every queued response is consumed and all proxies
    (initial + additional) end up in the verified list.
    The queued sequence assumes ProxyRotate requests more proxy servers when
    fewer than MIN_PROXIES_FOR_FIRST_RUN are available.
    """
    rotator = init_ProxyRotate(mock_csv_DictReader, proxyLength)
    total_proxies = additionalProxyRequestLength + proxyLength

    queued_responses = [
        # verification of the proxies loaded from the (mocked) csv
        *create_proxy_verification_response(HTTPStatus.OK, proxyLength),
        # request for more proxies
        create_request_proxy_list(HTTPStatus.OK, additionalProxyRequestLength),
        # verification of old and new proxies
        *create_proxy_verification_response(HTTPStatus.OK, total_proxies),
    ]
    mock_get.side_effect = queued_responses

    rotator.recheckProxies(forever=False)

    assert mock_get.call_count == len(queued_responses)
    assert len(rotator.proxies) == total_proxies
@patch("requests.get")
@patch("csv.DictReader")
@pytest.mark.parametrize("proxyLength", list(range(MIN_PROXIES_FOR_FIRST_RUN, 8)))
def test_recheckProxies_init_proxy_list(mock_csv_DictReader, mock_get, proxyLength):
    """
    Start with at least MIN_PROXIES_FOR_FIRST_RUN proxies

    Test Steps:
    - Init a proxy list of length MIN_PROXIES_FOR_FIRST_RUN .. 7, firstRun = True
    - Queue successful verification responses for MIN_PROXIES_FOR_FIRST_RUN proxies only

    Expected: every queued response is consumed, exactly
    MIN_PROXIES_FOR_FIRST_RUN proxies are verified and the full list is
    still known in all_proxies.
    """
    rotator = init_ProxyRotate(mock_csv_DictReader, proxyLength)

    # On the first run only MIN_PROXIES_FOR_FIRST_RUN proxies are expected to be validated.
    queued_responses = create_proxy_verification_response(HTTPStatus.OK, MIN_PROXIES_FOR_FIRST_RUN)
    mock_get.side_effect = queued_responses

    rotator.recheckProxies(forever=False)

    assert mock_get.call_count == len(queued_responses)
    assert len(rotator.proxies) == rotator.MIN_PROXIES_FOR_FIRST_RUN
    assert len(rotator.all_proxies) == proxyLength
@patch("requests.get")
@patch('time.sleep', return_value=None)
@patch("csv.DictReader")
@pytest.mark.parametrize("proxyLengthValid", [10])
@pytest.mark.parametrize("proxyLengthInvalid", [1])
@pytest.mark.parametrize("firstRun", [False])
def test_recheckProxies_proxy_verification_failed_html_status_error(mock_csv_DictReader, mock_time_sleep, mock_get,
                                                                    proxyLengthValid, proxyLengthInvalid, firstRun):
    """
    Proxy verification answers with an HTTP error status

    Test Steps:
    - Init a proxy list (valid + invalid entries), firstRun = False
    - Queue successful verification responses for the valid proxies
    - Queue BAD_REQUEST verification responses for the invalid proxy
    - Queue an empty proxy list response for the request for more proxies
    - Queue successful verification responses for the valid proxies again

    Expected: every queued response is consumed, only the valid proxies are
    verified and all_proxies still holds the complete list.
    """
    total_proxies = proxyLengthValid + proxyLengthInvalid
    rotator = init_ProxyRotate(mock_csv_DictReader, total_proxies, firstRun)

    queued_responses = [
        *create_proxy_verification_response(HTTPStatus.OK, proxyLengthValid),
        *create_proxy_verification_response(HTTPStatus.BAD_REQUEST, proxyLengthInvalid),
        create_request_proxy_list(HTTPStatus.OK, 0),
        # the valid proxies are validated a second time
        *create_proxy_verification_response(HTTPStatus.OK, proxyLengthValid),
    ]
    mock_get.side_effect = queued_responses

    rotator.recheckProxies(forever=False)

    assert mock_get.call_count == len(queued_responses)
    assert len(rotator.proxies) == proxyLengthValid
    assert len(rotator.all_proxies) == total_proxies
@patch("requests.get")
@patch('time.sleep', return_value=None)
@patch("csv.DictReader")
@pytest.mark.parametrize("proxyLengthValid", [10])
@pytest.mark.parametrize("proxyLengthInvalid", [1])
@pytest.mark.parametrize("firstRun", [False])
@pytest.mark.parametrize("htmlBodyErrors", ["property", "logo", "meta", "div", "content"])
def test_recheckProxies_proxy_verification_failed_html_body(mock_csv_DictReader, mock_time_sleep, mock_get,
                                                            proxyLengthValid, proxyLengthInvalid, firstRun,
                                                            htmlBodyErrors):
    """
    Proxy verification answers with a damaged html body

    Test Steps:
    - Init a proxy list (valid + invalid entries), firstRun = False
    - Queue successful verification responses for the valid proxies
    - Queue status-OK verification responses for the invalid proxy whose
      body has the substring ``htmlBodyErrors`` removed
    - Queue an empty proxy list response for the request for more proxies
    - Queue successful verification responses for the valid proxies again

    Expected: every queued response is consumed, only the valid proxies are
    verified and all_proxies still holds the complete list.
    """
    total_proxies = proxyLengthValid + proxyLengthInvalid
    rotator = init_ProxyRotate(mock_csv_DictReader, total_proxies, firstRun)

    queued_responses = [
        *create_proxy_verification_response(HTTPStatus.OK, proxyLengthValid),
        *create_proxy_verification_response(HTTPStatus.OK, proxyLengthInvalid, htmlBodyErrors),
        create_request_proxy_list(HTTPStatus.OK, 0),
        # the valid proxies are validated a second time
        *create_proxy_verification_response(HTTPStatus.OK, proxyLengthValid),
    ]
    mock_get.side_effect = queued_responses

    rotator.recheckProxies(forever=False)

    assert mock_get.call_count == len(queued_responses)
    assert len(rotator.proxies) == proxyLengthValid
    assert len(rotator.all_proxies) == total_proxies
@patch("requests.get")
@patch('time.sleep', return_value=None)
@patch("csv.DictReader")
@pytest.mark.parametrize("proxyLengthValid", [10])
@pytest.mark.parametrize("proxyLengthInvalid", [1])
@pytest.mark.parametrize("firstRun", [False])
def test_recheckProxies_proxy_list_failed_html_status_error(mock_csv_DictReader, mock_time_sleep, mock_get,
                                                            proxyLengthValid, proxyLengthInvalid, firstRun):
    """
    Proxy list request answers with an HTTP error status

    Test Steps:
    - Init a valid proxy list, firstRun = False
    - Queue successful verification responses for all proxies
    - Queue a BAD_REQUEST proxy list response for the request for more proxies
    - Queue successful verification responses for all proxies again

    Expected: every queued response is consumed and neither the verified
    list nor all_proxies grows beyond the initial proxies.
    """
    rotator = init_ProxyRotate(mock_csv_DictReader, proxyLengthValid, firstRun)

    queued_responses = [
        *create_proxy_verification_response(HTTPStatus.OK, proxyLengthValid),
        create_request_proxy_list(HTTPStatus.BAD_REQUEST, proxyLengthInvalid),
        # the proxies are validated a second time
        *create_proxy_verification_response(HTTPStatus.OK, proxyLengthValid),
    ]
    mock_get.side_effect = queued_responses

    rotator.recheckProxies(forever=False)

    assert mock_get.call_count == len(queued_responses)
    assert len(rotator.proxies) == proxyLengthValid
    assert len(rotator.all_proxies) == proxyLengthValid
@patch("requests.get")
@patch('time.sleep', return_value=None)
@patch("csv.DictReader")
@pytest.mark.parametrize("proxyLengthValid", [10])
@pytest.mark.parametrize("proxyLengthInvalid", [1])
@pytest.mark.parametrize("firstRun", [False])
@pytest.mark.parametrize("htmlBodyErrors", ["td", "tr", "tbody"])
def test_recheckProxies_proxy_list_failed_html_body(mock_csv_DictReader, mock_time_sleep, mock_get,
                                                    proxyLengthValid, proxyLengthInvalid, firstRun,
                                                    htmlBodyErrors):
    """
    Proxy list request answers with a damaged html body

    Test Steps:
    - Init a valid proxy list, firstRun = False
    - Queue successful verification responses for all proxies
    - Queue a status-OK proxy list response whose body has the substring
      ``htmlBodyErrors`` removed
    - Queue successful verification responses for all proxies again

    Expected: every queued response is consumed and neither the verified
    list nor all_proxies grows beyond the initial proxies.
    """
    rotator = init_ProxyRotate(mock_csv_DictReader, proxyLengthValid, firstRun)

    queued_responses = [
        *create_proxy_verification_response(HTTPStatus.OK, proxyLengthValid),
        create_request_proxy_list(HTTPStatus.OK, proxyLengthInvalid, htmlBodyErrors),
        # the proxies are validated a second time
        *create_proxy_verification_response(HTTPStatus.OK, proxyLengthValid),
    ]
    mock_get.side_effect = queued_responses

    rotator.recheckProxies(forever=False)

    assert mock_get.call_count == len(queued_responses)
    assert len(rotator.proxies) == proxyLengthValid
    assert len(rotator.all_proxies) == proxyLengthValid
@patch('time.sleep', return_value=None)
@patch("csv.DictReader")
@pytest.mark.parametrize("proxyInitLength", [10, 700])
def test_random_proxy(mock_csv_DictReader, mock_time_sleep, proxyInitLength):
    """
    Test random proxy return function

    With a non-empty proxy list, random_proxy() must return a mapping whose
    "ip", "port" and "type" entries are strings, and the chosen proxy must
    have been marked as called exactly once.
    """
    rotator = init_ProxyRotate(mock_csv_DictReader, proxyInitLength)
    # Treat every loaded proxy as verified.
    rotator.proxies = rotator.all_proxies

    chosen = rotator.random_proxy()

    for field in ("ip", "port", "type"):
        assert isinstance(chosen[field], str)
    assert rotator.proxies[chosen["ip"]].called == 1
@patch('time.sleep', return_value=None)
@patch("csv.DictReader")
@pytest.mark.parametrize("proxyInitLength", [0])
def test_random_proxy_no_proxy_found(mock_csv_DictReader, mock_time_sleep, proxyInitLength):
    """
    Test random proxy return function without any proxy

    With an empty proxy list random_proxy() cannot provide a proxy server
    and is expected to raise.
    """
    rotator = init_ProxyRotate(mock_csv_DictReader, proxyInitLength)
    rotator.proxies = rotator.all_proxies

    # NOTE(review): BaseException is very broad; narrow it once the concrete
    # exception type raised by random_proxy() is confirmed.
    with pytest.raises(BaseException):
        rotator.random_proxy()
@patch("csv.DictReader")
@pytest.mark.parametrize("failedCounter", list(range(GC.PROXY_FAILCOUNTER)))
@pytest.mark.parametrize("proxyInitLength", [10])
def test_remove_proxy_fail_counter_in_range(mock_csv_DictReader, failedCounter, proxyInitLength):
    """
    Report a proxy as failed fewer than GC.PROXY_FAILCOUNTER times.
    Proxy should not be removed
    """
    rotator = init_ProxyRotate(mock_csv_DictReader, proxyInitLength)
    rotator.proxies = rotator.all_proxies

    for _ in range(failedCounter):
        rotator.remove_proxy('0')

    assert '0' in rotator.proxies
    assert len(rotator.proxies) == proxyInitLength
-
@patch("csv.DictReader")
# The upper bound is derived from GC.PROXY_FAILCOUNTER so the parameter set can
# never become empty (which would silently skip this test) when the constant is
# raised to 5 or more; max(5, ...) keeps every value the fixed bound produced.
@pytest.mark.parametrize("failedCounter",
                         list(range(GC.PROXY_FAILCOUNTER, max(5, GC.PROXY_FAILCOUNTER + 2))))
@pytest.mark.parametrize("proxyInitLength", [10])
@pytest.mark.parametrize("testProdyId", ["0"])
def test_remove_proxy_fail_counter_out_of_range(mock_csv_DictReader, failedCounter, proxyInitLength, testProdyId):
    """
    Report a proxy as failed at least GC.PROXY_FAILCOUNTER times.
    Proxy should be removed

    :param failedCounter: number of remove_proxy() calls issued for the proxy
    :param proxyInitLength: number of proxies loaded before the calls
    :param testProdyId: ip (as string) of the proxy that is reported as failed
    """
    # Initialize proxy list and treat every loaded proxy as verified
    proxyRotate = init_ProxyRotate(mock_csv_DictReader, proxyInitLength)
    proxyRotate.proxies = proxyRotate.all_proxies

    for _ in range(failedCounter):
        proxyRotate.remove_proxy(str(testProdyId))

    # Guards the parametrization itself: every case must reach the limit.
    assert failedCounter >= GC.PROXY_FAILCOUNTER
    assert testProdyId not in proxyRotate.proxies
    assert len(proxyRotate.proxies) == (proxyInitLength - 1)
@patch("csv.DictReader")
@pytest.mark.parametrize("proxyInitLength", [10])
def test_remove_proxy_invalid_param(mock_csv_DictReader, proxyInitLength):
    """
    Test remove proxy for invalid parameter

    None of the calls below addresses a loaded proxy (ips are the strings
    "0" .. str(proxyInitLength - 1)), so the proxy list must keep its size.
    """
    rotator = init_ProxyRotate(mock_csv_DictReader, proxyInitLength)
    rotator.proxies = rotator.all_proxies

    invalid_arguments = [
        {"ip": f'{proxyInitLength}'},
        {"ip": proxyInitLength},
        {"ip": 'p'},
        {"ip": 'z', "port": 0},
        {"ip": 0, "port": 'p'},
    ]
    for arguments in invalid_arguments:
        rotator.remove_proxy(**arguments)

    assert len(rotator.proxies) == proxyInitLength
-
- """
- # TODO This test belongs to the integration test
- def test_gather_links():
- ## Checks whether proxy links can be gathered from the website
- proxyRotate = ProxyRotate()
- proxies = proxyRotate.testGatherProxy()
- assert type(proxies)==list and len(proxies)>0
- """
- """
- # TODO This test belongs to the integration test
- def test_verify_proxy():
- ## Verifies single proxy and will give the result
- if os.path.isfile("../proxies.csv"):
- with open("../proxies.csv", 'r') as file:
- data = file.read()
- with open("proxies.csv", 'w') as output_file:
- output_file.write(data)
- elif os.path.isfile(os.path.join(os.path.expanduser("~"),"baangt/proxies.csv")):
- with open(os.path.join(os.path.expanduser("~"),"baangt/proxies.csv"), 'r') as file:
- data = file.read()
- with open("proxies.csv", 'w') as output_file:
- output_file.write(data)
- proxyRotate = ProxyRotate()
- result = proxyRotate.testVerifyProxy()
- print(result)
- assert type(result) == str
- """
- """
- # TODO This test belongs to the integration test
- def test_proxy_full_run():
- proxyRotate = ProxyRotate(reReadProxies=True)
- proxyRotate.recheckProxies(forever=False)
- assert len(proxyRotate.all_proxies) > len(proxyRotate.proxies)
- assert len(proxyRotate.proxies) == proxyRotate.MIN_PROXIES_FOR_FIRST_RUN
- result = proxyRotate.random_proxy()
- assert isinstance(result["ip"], str)
- assert isinstance(result["port"], str)
- assert isinstance(result["type"], str)
- result = proxyRotate.remove_proxy(ip=proxyRotate.random_proxy()["ip"])
- assert 1==1
- """
|