Advertisement
den4ik2003

Untitled

Jul 4th, 2025
12
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.46 KB | None | 0 0
  1. import asyncio
  2. import sys
  3. import aiopg
  4. import numpy as np
  5. import time
  6. from configparser import ConfigParser
  7. from datetime import timezone, timedelta
  8.  
  9. class DBReader:
  10.  
  11. def __init__(self, path_to_config, max_connections=3):
  12. parser = ConfigParser()
  13. parser.read(path_to_config)
  14.  
  15. self.db_params = {}
  16. if parser.has_section('postgresql'):
  17. params = parser.items('postgresql')
  18. for param in params:
  19. self.db_params[param[0]] = param[1]
  20.  
  21. self.pool = None
  22. self.max_connections = max_connections
  23. self.symbol_id_to_ticket, self.symbols_id_in_group = {}, {}
  24. self.timezones = {'MEI': 9}
  25.  
  26. async def init_pool(self, side=False):
  27. if self.pool is None:
  28. self.pool = await aiopg.create_pool(**self.db_params, minsize=1, maxsize=self.max_connections)
  29. await self.update_symbols_list(side)
  30.  
  31. async def update_symbols_list(self, side=False):
  32. if side:
  33. additional = ""
  34. else:
  35. additional = """
  36. JOIN id_group_relation USING (symbol_id)
  37. JOIN groups USING (group_id)
  38. """
  39. symbols_query = f"""
  40. SELECT symbol_id, base, quote, exchange_name {", username" if side else ''}
  41. FROM symbols JOIN exchanges USING (exchange_id)
  42. {additional}
  43. WHERE exchange_type = 'spot'
  44. """
  45. symbols = await self.execute_select(symbols_query)
  46. names = {}
  47.  
  48. self.symbols_id_in_group, self.symbol_id_to_ticket = {}, {}
  49.  
  50. for symbol in symbols:
  51. name = (symbol[1] + '/' + symbol[2] + ' ' + symbol[3].lower())
  52. group = symbol[1]
  53. if side:
  54. name = (symbol[1] + '/' + symbol[2] + ' ' + symbol[3].lower() + ' ' + symbol[4].lower())
  55. group = (symbol[1] + '_' + symbol[2] + '.' + symbol[4].lower())
  56.  
  57. if group not in self.symbols_id_in_group:
  58. self.symbols_id_in_group[group] = []
  59.  
  60. self.symbols_id_in_group[group].append(symbol[0])
  61. self.symbol_id_to_ticket[symbol[0]] = name
  62.  
  63. print('[UPDATED] symbols_id_in_group:', self.symbols_id_in_group)
  64. print('[UPDATED] symbol_id_to_ticket:', self.symbol_id_to_ticket)
  65.  
  66. async def get_balances_by_group(self, group, strategy=None):
  67. print('A', file=sys.stderr)
  68. balances_answer = ''
  69.  
  70. for symbol in self.symbols_id_in_group[group]:
  71. balance_query = f"""
  72. SELECT time, base, quote
  73. FROM balance_{symbol}
  74. {f"WHERE strategy = '{strategy}'" if strategy else ''}
  75. ORDER BY time DESC
  76. LIMIT 1;
  77. """
  78.  
  79. balance_record = await self.execute_select(balance_query)
  80.  
  81. if not balance_record:
  82. continue
  83.  
  84. balance_record = balance_record[0]
  85. token = self.symbol_id_to_ticket[symbol].split('/')[0]
  86.  
  87. base = str(balance_record[1])
  88. quote = str(balance_record[2])
  89. time = balance_record[0].replace(tzinfo=timezone.utc).astimezone(timezone(timedelta(hours=self.timezones.get(token, 0)))).strftime("%Y-%m-%d %H:%M:%S")
  90.  
  91. balances_answer += self.symbol_id_to_ticket[symbol] + ':\n'
  92. balances_answer += 'time: ' + time + '\n'
  93. balances_answer += f'{token}: ' + base + '\n'
  94. balances_answer += 'USDT: ' + quote + '\n\n'
  95.  
  96. print('B', file=sys.stderr)
  97. return balances_answer
  98.  
  99. async def get_volumes_by_group(self, group):
  100. volumes_answer = ''
  101. for symbol in self.symbols_id_in_group[group]:
  102. public_trades_24h_sum_query = f"""
  103. SELECT SUM(price * quantity)
  104. FROM public_trade_{symbol}
  105. WHERE time > NOW() - interval '1 day'
  106. """
  107. res = await self.execute_select(public_trades_24h_sum_query)
  108. if not res:
  109. continue
  110. res = res[0]
  111. public_trades_24h_sum = 0
  112. if res and res[0]:
  113. public_trades_24h_sum = np.round(res[0], 2)
  114.  
  115. user_trades_24h_sum_query = f"""
  116. SELECT SUM(price * quantity) / 2
  117. FROM user_trade_{symbol}
  118. WHERE strategy = 'V' AND time > NOW() - interval '1 day'
  119. """
  120. res = await self.execute_select(user_trades_24h_sum_query)
  121. if not res:
  122. continue
  123. res = res[0]
  124. user_trades_24h_sum = 0
  125. if res and res[0]:
  126. user_trades_24h_sum = np.round(res[0], 2)
  127.  
  128. volumes_answer += self.symbol_id_to_ticket[symbol] + ':\n'
  129. volumes_answer += f'total liquidity (24h): {public_trades_24h_sum}\n'
  130. volumes_answer += f'volume bot liquidity (24h): {user_trades_24h_sum}\n\n'
  131.  
  132. return volumes_answer
  133.  
  134. async def get_active_orders(self, group):
  135. orders = []
  136. for symbol in self.symbols_id_in_group[group]:
  137. print(f'DEBUG symbol: {symbol} {self.symbol_id_to_ticket[symbol]}')
  138. active_orders_query = f"""
  139. SELECT time, price, quantity, is_bid
  140. FROM active_orders_{symbol}
  141. WHERE strategy = 'M'
  142. """
  143. db_orders = await self.execute_select(active_orders_query)
  144. if not db_orders:
  145. continue
  146. for order in db_orders:
  147. orders.append({'symbol': self.symbol_id_to_ticket[symbol], 'time': str(order[0]), 'price': order[1], 'quantity': order[2], 'side': 'BUY' if order[3] else 'SELL'})
  148.  
  149. return orders
  150.  
  151. async def execute_select(self, query):
  152. try:
  153. async with self.pool.acquire() as conn:
  154. print(f'AWAITED: {int(time.time()*1000)}', file=sys.stderr)
  155. async with conn.cursor() as cursor:
  156. await cursor.execute(query)
  157. print(f'FINISHED: {int(time.time()*1000)}', file=sys.stderr)
  158. return await cursor.fetchall()
  159. except Exception as e:
  160. print('[EXCEPTION] execute_select:', e)
  161. return
  162.  
  163. async def close_connection(self):
  164. if self.pool:
  165. self.pool.close()
  166. await self.pool.wait_closed()
  167. self.pool = None
  168.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement