Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import asyncio
- import sys
- import aiopg
- import numpy as np
- import time
- from configparser import ConfigParser
- from datetime import timezone, timedelta
class DBReader:
    """Asynchronous read-only helper around an aiopg PostgreSQL pool.

    Connection parameters are taken from the ``[postgresql]`` section of an
    INI file.  After :meth:`init_pool` has run, two lookup tables are
    available:

    * ``symbols_id_in_group`` -- group key -> list of ``symbol_id`` values
    * ``symbol_id_to_ticket`` -- ``symbol_id`` -> display name built from the
      base, quote and exchange name (plus username in "side" mode)

    The per-symbol queries read from tables whose names end in the symbol id
    (``balance_<id>``, ``public_trade_<id>``, ``user_trade_<id>``,
    ``active_orders_<id>``).
    """

    def __init__(self, path_to_config, max_connections=3):
        """Read DB settings from *path_to_config*; no connection is opened.

        Args:
            path_to_config: Path to an INI file.  Every key of its
                ``[postgresql]`` section is later passed as a keyword
                argument to ``aiopg.create_pool``.  A missing file or
                section leaves the parameters empty.
            max_connections: Upper bound (``maxsize``) for the pool.
        """
        parser = ConfigParser()
        parser.read(path_to_config)
        self.db_params = {}
        if parser.has_section('postgresql'):
            self.db_params = dict(parser.items('postgresql'))
        self.pool = None
        self.max_connections = max_connections
        self.symbol_id_to_ticket, self.symbols_id_in_group = {}, {}
        # Hour offsets from UTC keyed by base token; tokens that are not
        # listed are reported in UTC (see get_balances_by_group).
        self.timezones = {'MEI': 9}

    async def init_pool(self, side=False):
        """Create the connection pool if needed and load the symbol tables.

        Args:
            side: Forwarded to :meth:`update_symbols_list`.
        """
        if self.pool is None:
            self.pool = await aiopg.create_pool(
                **self.db_params, minsize=1, maxsize=self.max_connections
            )
        # NOTE(review): the original indentation was lost, so it is unclear
        # whether this refresh was meant to run only when the pool is first
        # created.  Running it on every call loads the symbols in every case
        # where the original would have -- confirm against the caller.
        await self.update_symbols_list(side)

    async def update_symbols_list(self, side=False):
        """Rebuild ``symbols_id_in_group`` and ``symbol_id_to_ticket``.

        Args:
            side: When false, symbols are additionally joined with
                ``id_group_relation`` and ``groups`` and grouped by base
                token.  When true, those joins are skipped, ``username`` is
                selected too, and the group key is
                ``<base>_<quote>.<username lower-cased>``.

        If the query fails (``execute_select`` returns ``None``) the existing
        mappings are left untouched instead of being wiped.
        """
        if side:
            extra_joins = ""
        else:
            extra_joins = """
                JOIN id_group_relation USING (symbol_id)
                JOIN groups USING (group_id)
            """
        symbols_query = f"""
            SELECT symbol_id, base, quote, exchange_name {", username" if side else ''}
            FROM symbols JOIN exchanges USING (exchange_id)
            {extra_joins}
            WHERE exchange_type = 'spot'
        """
        symbols = await self.execute_select(symbols_query)
        if symbols is None:
            # The query failed; iterating None would raise and the previous
            # (still useful) mappings would already have been discarded.
            print('[EXCEPTION] update_symbols_list: query failed, keeping previous symbols')
            return

        # Build into locals first so the attributes are only replaced once
        # the whole result set has been processed.
        ids_by_group, ticket_by_id = {}, {}
        for symbol in symbols:
            symbol_id, base, quote, exchange = symbol[0], symbol[1], symbol[2], symbol[3]
            name = base + '/' + quote + ' ' + exchange.lower()
            group = base
            if side:
                username = symbol[4].lower()
                name = name + ' ' + username
                group = base + '_' + quote + '.' + username
            ids_by_group.setdefault(group, []).append(symbol_id)
            ticket_by_id[symbol_id] = name

        self.symbols_id_in_group, self.symbol_id_to_ticket = ids_by_group, ticket_by_id
        print('[UPDATED] symbols_id_in_group:', self.symbols_id_in_group)
        print('[UPDATED] symbol_id_to_ticket:', self.symbol_id_to_ticket)

    async def get_balances_by_group(self, group, strategy=None):
        """Return a text report with the latest balance row of each symbol.

        Args:
            group: Key of ``symbols_id_in_group``.
            strategy: Optional value to filter the ``strategy`` column by.
                It is bound as a query parameter, never spliced into the
                SQL text.

        Returns:
            One paragraph per symbol that has a balance row (ticket, time,
            base amount, quote amount), concatenated; ``''`` if none do.
            The stored time is treated as UTC and shifted by the offset in
            ``self.timezones`` for the symbol's base token.

        Raises:
            KeyError: If *group* is not a known group.
        """
        print('A', file=sys.stderr)
        where_clause = 'WHERE strategy = %s' if strategy else ''
        params = (strategy,) if strategy else None
        sections = []
        for symbol in self.symbols_id_in_group[group]:
            # Table names cannot be bound as parameters; `symbol` is an id
            # taken from the mapping built by update_symbols_list.
            balance_query = f"""
                SELECT time, base, quote
                FROM balance_{symbol}
                {where_clause}
                ORDER BY time DESC
                LIMIT 1;
            """
            rows = await self.execute_select(balance_query, params)
            if not rows:
                continue
            record = rows[0]
            ticket = self.symbol_id_to_ticket[symbol]
            token = ticket.split('/')[0]
            local_tz = timezone(timedelta(hours=self.timezones.get(token, 0)))
            stamp = (
                record[0]
                .replace(tzinfo=timezone.utc)
                .astimezone(local_tz)
                .strftime("%Y-%m-%d %H:%M:%S")
            )
            sections.append(
                ticket + ':\n'
                + 'time: ' + stamp + '\n'
                + f'{token}: ' + str(record[1]) + '\n'
                + 'USDT: ' + str(record[2]) + '\n\n'
            )
        print('B', file=sys.stderr)
        return ''.join(sections)

    @staticmethod
    def _rounded_total(rows):
        """Return the first column of the first row rounded to 2 places, or 0 if empty/NULL."""
        first_row = rows[0]
        if first_row and first_row[0]:
            return np.round(first_row[0], 2)
        return 0

    async def get_volumes_by_group(self, group):
        """Return a text report of the last-24h traded value per symbol.

        For each symbol in *group* two sums of ``price * quantity`` over the
        last day are reported: all rows of ``public_trade_<id>``, and half of
        the ``strategy = 'V'`` rows of ``user_trade_<id>``.  A symbol is
        skipped when either query yields no result.

        Raises:
            KeyError: If *group* is not a known group.
        """
        sections = []
        for symbol in self.symbols_id_in_group[group]:
            public_trades_24h_sum_query = f"""
                SELECT SUM(price * quantity)
                FROM public_trade_{symbol}
                WHERE time > NOW() - interval '1 day'
            """
            rows = await self.execute_select(public_trades_24h_sum_query)
            if not rows:
                continue
            public_trades_24h_sum = self._rounded_total(rows)

            user_trades_24h_sum_query = f"""
                SELECT SUM(price * quantity) / 2
                FROM user_trade_{symbol}
                WHERE strategy = 'V' AND time > NOW() - interval '1 day'
            """
            rows = await self.execute_select(user_trades_24h_sum_query)
            if not rows:
                continue
            user_trades_24h_sum = self._rounded_total(rows)

            sections.append(
                self.symbol_id_to_ticket[symbol] + ':\n'
                + f'total liquidity (24h): {public_trades_24h_sum}\n'
                + f'volume bot liquidity (24h): {user_trades_24h_sum}\n\n'
            )
        return ''.join(sections)

    async def get_active_orders(self, group):
        """Return the ``strategy = 'M'`` active orders of every symbol in *group*.

        Returns:
            A list of dicts with keys ``symbol`` (ticket name), ``time``
            (stringified), ``price``, ``quantity`` and ``side`` (``'BUY'``
            when ``is_bid`` is truthy, otherwise ``'SELL'``).

        Raises:
            KeyError: If *group* is not a known group.
        """
        orders = []
        for symbol in self.symbols_id_in_group[group]:
            ticket = self.symbol_id_to_ticket[symbol]
            print(f'DEBUG symbol: {symbol} {ticket}')
            active_orders_query = f"""
                SELECT time, price, quantity, is_bid
                FROM active_orders_{symbol}
                WHERE strategy = 'M'
            """
            db_orders = await self.execute_select(active_orders_query)
            if not db_orders:
                continue
            orders.extend(
                {
                    'symbol': ticket,
                    'time': str(order[0]),
                    'price': order[1],
                    'quantity': order[2],
                    'side': 'BUY' if order[3] else 'SELL',
                }
                for order in db_orders
            )
        return orders

    async def execute_select(self, query, params=None):
        """Run *query* on a pooled connection and return all fetched rows.

        Args:
            query: SQL text.
            params: Optional parameters passed to ``cursor.execute`` so that
                values are bound by the driver.  ``None`` (the default)
                executes the query text as-is.

        Returns:
            The rows from ``cursor.fetchall()``, or ``None`` if anything
            raised (the error is printed, not propagated -- callers treat a
            falsy result as "no data").
        """
        try:
            async with self.pool.acquire() as conn:
                print(f'AWAITED: {int(time.time()*1000)}', file=sys.stderr)
                async with conn.cursor() as cursor:
                    await cursor.execute(query, params)
                    print(f'FINISHED: {int(time.time()*1000)}', file=sys.stderr)
                    return await cursor.fetchall()
        except Exception as e:
            # Deliberately best-effort: report and signal failure with None.
            print('[EXCEPTION] execute_select:', e)
            return None

    async def close_connection(self):
        """Close the pool, wait for it to finish closing, and forget it."""
        if self.pool:
            self.pool.close()
            await self.pool.wait_closed()
            self.pool = None
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement