gur111

thread_pool_server_tests.py

Jun 14th, 2021 (edited)
3,071
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 13.45 KB | None | 0 0
  1. import math
  2. import os
  3. import time
  4. import unittest
  5. import subprocess as sp
  6. import asyncio
  7. import requests_async as requests
  8. from h11 import RemoteProtocolError
  9. from threading import Lock
  10.  
  11. SERVER_PATH = os.path.realpath(os.path.join(os.path.curdir, '..', 'server'))
  12. DYNAMIC_REQ_TIME = 0.2
  13. DEFAULT_PORT = 8080
  14. DEFAULT_THREAD_COUNT = 3
  15. DEFAULT_QUEUE_SIZE = 7
  16. DEFAULT_STATIC_PAGE = 'home.html'
  17. DEFAULT_DYNAMIC_PAGE = 'output.cgi'
  18. SERVER_ADDRESS = 'localhost'  # 'localhost'
  19.  
  20.  
  21. class RequestResult:
  22.     def __init__(self, req_ind, res=None, e=None):
  23.         self.res = res
  24.         self.e = e
  25.         self.req_ind = req_ind
  26.  
  27.     def has_exception(self):
  28.         return self.e is not None
  29.  
  30.     def is_exception_of_type(self, exc_type):
  31.         return isinstance(self.e, exc_type)
  32.  
  33.  
  34. class RequestsTest(unittest.TestCase):
  35.     # noinspection HttpUrlsUsage
  36.     def __init__(self, *args, queue_size=DEFAULT_QUEUE_SIZE, thread_count=DEFAULT_THREAD_COUNT, policy='dt', **kwargs):
  37.         super().__init__(*args, **kwargs)
  38.         self.req_ind_mutex = Lock()
  39.         self.dyn_url = f'http://{SERVER_ADDRESS}:{DEFAULT_PORT}/{DEFAULT_DYNAMIC_PAGE}'
  40.         self.static_url = f'http://{SERVER_ADDRESS}:{DEFAULT_PORT}/{DEFAULT_STATIC_PAGE}'
  41.         self.not_found_url = f'http://{SERVER_ADDRESS}:{DEFAULT_PORT}/not_found'
  42.         self.forbidden_url = f'http://{SERVER_ADDRESS}:{DEFAULT_PORT}/forbidden.cgi'
  43.         self.queue_size = queue_size
  44.         self.max_reqs = self.queue_size
  45.         self.server_path = SERVER_PATH
  46.         self.thread_count = thread_count
  47.         self.policy = policy
  48.         if policy == 'random':
  49.             self.per_drop_size = math.ceil(0.25 * (self.queue_size-self.thread_count))
  50.         elif policy in ['dt', 'dh']:
  51.             self.per_drop_size = 1
  52.         elif policy == 'block':
  53.             self.per_drop_size = 0
  54.  
  55.         self.last_req_index = 0
  56.  
  57.     def setUp(self):
  58.         print('Setting up test:')
  59.         print(f'\tthread: {self.thread_count}')
  60.         print(f'\tqueue size: {self.queue_size}')
  61.         print(f'\tpolicy: {self.policy}')
  62.         os.chdir(os.path.dirname(self.server_path))
  63.         self.server = sp.Popen([self.server_path, f'{DEFAULT_PORT}', f'{self.thread_count}', f'{self.queue_size}', self.policy])
  64.         print(f'Server status is: {self.server.poll()}')
  65.         # input('Confirm open port and hit RETURN')
  66.         # print('')
  67.  
  68.     def tearDown(self) -> None:
  69.         self.server.terminate()
  70.  
  71.     async def make_req(self, url, method='get'):
  72.         try:
  73.             self.req_ind_mutex.acquire()
  74.             req_ind = self.last_req_index + 1
  75.             self.last_req_index += 1
  76.             self.req_ind_mutex.release()
  77.             req_headers = {'req_index': f'{req_ind}'}
  78.             # arrival_time = time.time() * 1000  # in milliseconds
  79.             req_ind = self.last_req_index
  80.             if method == 'get':
  81.                 req = requests.get(url, headers=req_headers)
  82.             elif method == 'post':
  83.                 req = requests.post(url, headers=req_headers)
  84.             elif method == 'delete':
  85.                 req = requests.delete(url, headers=req_headers)
  86.             else:
  87.                 self.fail('Unknown request method')
  88.  
  89.             # print(f'Awaiting request {req_ind}')
  90.             response = await req
  91.             headers = response.headers
  92.             for header in headers:
  93.                 if header.lower().startswith('stat'):
  94.                     headers[header] = headers[header][1:]
  95.  
  96.             # response_time = time.time() * 1000
  97.             # self.assertAlmostEqual(arrival_time, float(response.headers['stat-req-arrival']), delta=min(500 * DYNAMIC_REQ_TIME, (response_time - arrival_time) * 0.2))
  98.         except Exception as e:
  99.             r = RequestResult(req_ind=req_ind, e=e)
  100.             return r
  101.         else:
  102.             r = RequestResult(req_ind=req_ind, res=response)
  103.             return r
  104.  
  105.     async def make_requests(self, url, total_reqs):
  106.         print(f'Requesting url: {self.dyn_url}')
  107.         self.last_req_index = 0
  108.         tasks = []
  109.         fail_expected_tasks = []
  110.         thread_stats = [{'count': 0, 'dyn': 0, 'static': 0} for _ in range(self.thread_count)]
  111.         expected_error_count = total_reqs - self.max_reqs + ((-(total_reqs - self.max_reqs)) % self.per_drop_size)
  112.         expected_average_dispatch = DYNAMIC_REQ_TIME * float(total_reqs - expected_error_count - min(self.thread_count, self.max_reqs)) / self.thread_count / 2
  113.  
  114.         if self.policy == 'random':
  115.             for _ in range(total_reqs):
  116.                 task = asyncio.ensure_future(self.make_req(url))
  117.                 tasks.append(task)
  118.         elif self.policy == 'dt':
  119.             for _ in range(total_reqs - expected_error_count):
  120.                 task = asyncio.ensure_future(self.make_req(url))
  121.                 tasks.append(task)
  122.  
  123.             for _ in range(expected_error_count):
  124.                 task = asyncio.ensure_future(self.make_req(url))
  125.                 fail_expected_tasks.append(task)
  126.  
  127.         elif self.policy == 'dh':
  128.             for _ in range(expected_error_count):
  129.                 task = asyncio.ensure_future(self.make_req(url))
  130.                 fail_expected_tasks.append(task)
  131.  
  132.             for _ in range(total_reqs - expected_error_count):
  133.                 task = asyncio.ensure_future(self.make_req(url))
  134.                 tasks.append(task)
  135.  
  136.         responses = await asyncio.gather(*tasks, *fail_expected_tasks, return_exceptions=True)
  137.  
  138.         responses = sorted(responses, key=lambda x: x.req_ind)
  139.  
  140.         error_count = 0
  141.         total_dispatch = 0
  142.         for res in responses:
  143.             if res.is_exception_of_type(RemoteProtocolError):
  144.                 error_count += 1
  145.                 continue
  146.             elif res.has_exception():
  147.                 raise res.e
  148.  
  149.             res = res.res
  150.             total_dispatch += float(res.headers['stat-req-dispatch'])
  151.             count, dyn, static = int(res.headers['stat-thread-count']), int(res.headers['stat-thread-dynamic']), int(res.headers['stat-thread-static'])
  152.  
  153.             tid = int(res.headers['stat-thread-id'])
  154.             thread_stats[tid]['count'] = max(count, thread_stats[tid]['count'])
  155.             thread_stats[tid]['dyn'] = max(dyn, thread_stats[tid]['dyn'])
  156.             thread_stats[tid]['static'] = max(static, thread_stats[tid]['static'])
  157.             self.assertEqual(count, dyn, f'Mismatch total request and dynamic request. Total: {count}. Dynamic: {dyn}')
  158.             self.assertEqual(static, 0, f'Unexpected static requests count. Expected {0}. Actual {static}')
  159.             # self.assertAlmostEqual(float(res.headers['stat-req-arrival']), arrival_time)
  160.  
  161.         total_count = total_dyn = 0
  162.  
  163.         for stat in thread_stats:
  164.             total_count += stat['count']
  165.             total_dyn += stat['dyn']
  166.  
  167.         print(f'Requests succeeded: {total_count}')
  168.         print(f'Requests failed: {error_count}')
  169.  
  170.         self.assertEqual(total_count, total_reqs - error_count)
  171.         self.assertEqual(total_count, total_dyn)
  172.  
  173.         self.assertEqual(expected_error_count, error_count, f'Unexpected error count. Expected {expected_error_count}. Actual: {error_count}')
  174.  
  175.         self.assertNotEqual(0, total_reqs - error_count, 'No request succeeded')
  176.         average_dispatch = total_dispatch / float(total_reqs - error_count)
  177.         self.assertAlmostEqual(expected_average_dispatch, average_dispatch, delta=max(1, expected_average_dispatch * 0.3),
  178.                                msg=f'Unexpected average dispatch time. Expected: {expected_average_dispatch}. Actual: {average_dispatch}')
  179.  
  180.         # TODO: Fix check. Should check that the correct requests failed for each policy type
  181.         # if self.policy == 'dh':
  182.         #     for res in responses[self.thread_count:self.thread_count + error_count]:
  183.         #         self.assertIsInstance(res.e, RemoteProtocolError)
  184.         # elif self.policy == 'dt':
  185.         #     for res in responses[-error_count:]:
  186.         #         self.assertIsInstance(res.e, RemoteProtocolError)
  187.  
  188.  
  189. class TestDropTailRequests(RequestsTest):
  190.     def __init__(self, *args, **kwargs):
  191.         super().__init__(*args, policy='dt', **kwargs)
  192.  
  193.     def test_drop_single(self):
  194.         asyncio.run(self.make_requests(self.dyn_url, self.max_reqs + 1))
  195.  
  196.     def test_drop_double_queue_size(self):
  197.         asyncio.run(self.make_requests(self.dyn_url, self.max_reqs + self.queue_size * 2))
  198.  
  199.  
  200. class TestDropHeadRequests(RequestsTest):
  201.     def __init__(self, *args, **kwargs):
  202.         super().__init__(*args, policy='dh', **kwargs)
  203.  
  204.     def test_drop_single(self):
  205.         asyncio.run(self.make_requests(self.dyn_url, self.max_reqs + 1))
  206.  
  207.     def test_drop_double_queue_size(self):
  208.         asyncio.run(self.make_requests(self.dyn_url, self.max_reqs + self.queue_size * 2))
  209.  
  210.  
  211. class TestDropRandomRequests(RequestsTest):
  212.     def __init__(self, *args, **kwargs):
  213.         super().__init__(*args, queue_size=16, policy='random', **kwargs)
  214.  
  215.     def test_single_drop_random(self):
  216.         asyncio.run(self.make_requests(self.dyn_url, self.max_reqs + 1))
  217.  
  218.     def test_double_drop_random(self):
  219.         asyncio.run(self.make_requests(self.dyn_url, self.max_reqs + 2 * int(0.25 * self.queue_size)))
  220.  
  221.     def test_no_drop(self):
  222.         asyncio.run(self.make_requests(self.dyn_url, self.max_reqs))
  223.  
  224.  
  225. class TestMultiThreaded(RequestsTest):
  226.     def __init__(self, *args, **kwargs):
  227.         super().__init__(*args, queue_size=80, **kwargs)
  228.  
  229.     def test_time_full_queue(self):
  230.         start_time = time.time()
  231.         req_count = self.max_reqs
  232.         asyncio.run(self.make_requests(self.dyn_url, req_count))
  233.         run_time = time.time() - start_time
  234.         expected_runtime = math.ceil(req_count / float(self.thread_count)) * DYNAMIC_REQ_TIME
  235.         # This is optimal so it must be greater
  236.         self.assertGreater(run_time, expected_runtime)
  237.         self.assertLess(run_time, expected_runtime * 2)
  238.  
  239.     def test_better_with_more_threads(self):
  240.         start_time = time.time()
  241.         req_count = self.max_reqs
  242.         asyncio.run(self.make_requests(self.dyn_url, req_count))
  243.         few_threads_run_time = time.time() - start_time
  244.  
  245.         self.server.terminate()
  246.         self.thread_count *= 3
  247.         self.setUp()
  248.  
  249.         start_time = time.time()
  250.         asyncio.run(self.make_requests(self.dyn_url, req_count))
  251.         more_threads_run_time = time.time() - start_time
  252.  
  253.         self.assertTrue(2 * more_threads_run_time < few_threads_run_time < 3 * more_threads_run_time, "Performance doesn't scale as expected with amount of threads")
  254.  
  255.  
  256. class TestStatusCodes(RequestsTest):
  257.     def __init__(self, *args, **kwargs):
  258.         super().__init__(*args, thread_count=1, queue_size=1, **kwargs)
  259.  
  260.     async def _make_req(self, url, expected_status, stat_map, method='get'):
  261.         task = asyncio.ensure_future(self.make_req(url, method=method))
  262.  
  263.         res = await asyncio.ensure_future(task)
  264.  
  265.         headers = res.res.headers
  266.  
  267.         for k in stat_map:
  268.             self.assertIn(k, headers)
  269.             if stat_map[k] is not None:
  270.                 self.assertAlmostEqual(float(headers[k]), stat_map[k], delta=0.03, msg=f'Unexpected value for {k}. Expected: {stat_map[k]}. Actual: {headers[k]}')
  271.  
  272.         self.assertEqual(expected_status, res.res.status_code, f'Unexpected status code. Expected: {expected_status}. Actual: {res.res.status_code}')
  273.  
  274.     def test_404(self):
  275.         stat_map = {
  276.             'stat-req-arrival': None,
  277.             'stat-req-dispatch': 0.00004,
  278.             'stat-thread-id': 0,
  279.             'stat-thread-count': 1,
  280.             'stat-thread-static': 0,
  281.             'stat-thread-dynamic': 0
  282.         }
  283.  
  284.         asyncio.run(self._make_req(self.not_found_url, 404, stat_map))
  285.  
  286.     def test_dynamic(self):
  287.         stat_map = {
  288.             'stat-req-arrival': None,
  289.             'stat-req-dispatch': 0.00004,
  290.             'stat-thread-id': 0,
  291.             'stat-thread-count': 1,
  292.             'stat-thread-static': 0,
  293.             'stat-thread-dynamic': 1
  294.         }
  295.         asyncio.run(self._make_req(self.dyn_url, 200, stat_map))
  296.  
  297.     def test_static(self):
  298.         stat_map = {
  299.             'stat-req-arrival': None,
  300.             'stat-req-dispatch': 0.00004,
  301.             'stat-thread-id': 0,
  302.             'stat-thread-count': 1,
  303.             'stat-thread-static': 1,
  304.             'stat-thread-dynamic': 0
  305.         }
  306.         asyncio.run(self._make_req(self.static_url, 200, stat_map))
  307.  
  308.     def test_forbidden(self):
  309.         stat_map = {
  310.             'stat-req-arrival': None,
  311.             'stat-req-dispatch': 0.00004,
  312.             'stat-thread-id': 0,
  313.             'stat-thread-count': 1,
  314.             'stat-thread-static': 0,
  315.             'stat-thread-dynamic': 0
  316.         }
  317.         asyncio.run(self._make_req(self.forbidden_url, 403, stat_map))
  318.  
  319.     def test_post(self):
  320.         stat_map = {
  321.             'stat-req-arrival': None,
  322.             'stat-req-dispatch': 0.00004,
  323.             'stat-thread-id': 0,
  324.             'stat-thread-count': 1,
  325.             'stat-thread-static': 0,
  326.             'stat-thread-dynamic': 0
  327.         }
  328.         asyncio.run(self._make_req(self.static_url, 501, stat_map, method='post'))
  329.  
  330.  
  331. class TestQueueSmallerThan(RequestsTest):
  332.     def __init__(self, *args, **kwargs):
  333.         super().__init__(*args, thread_count=5, queue_size=2, **kwargs)
  334.  
  335.     def test_small_queue_enough_threads(self):
  336.         asyncio.run(self.make_requests(url=self.dyn_url, total_reqs=self.thread_count))
  337.  
  338.  
  339. if __name__ == '__main__':
  340.     unittest.main()
  341.  
Add Comment
Please, Sign In to add comment