Trades Aggregator Service¶
Назначение¶
Trades Aggregator — сервис агрегации торговых данных. Получает сделки из Exchange HTTP API, агрегирует тикерную информацию (цена, объём, изменения за 24 часа) и предоставляет актуальные данные другим сервисам через gRPC, в том числе через стриминг изменений в реальном времени.
Количество RPC: 2 (включая серверный стриминг).
Архитектура¶
Сервис состоит из двух исполняемых файлов:
trades-aggregator/
├── cmd/
│ ├── server/ # gRPC-сервер (обслуживает входящие запросы)
│ └── worker/ # Фоновый воркер (стриминг торгов с биржи)
Общая схема¶
┌────────────────────────────────────────────┐
│ gRPC Server │
│ ListenTickerInfoChanges (stream) │
│ ListTickerInfo │
└──────────────────┬─────────────────────────┘
│ читает агрегированные данные
┌──────────────────▼─────────────────────────┐
│ Aggregation Layer │
│ Агрегирует торги → тикерная информация │
│ (min/max цена, объёмы, изменения 24h) │
└──────────────────┬─────────────────────────┘
│
┌──────────▼──────────┐
│ PostgreSQL │
│ trades │
│ trades_stat │
└──────────▲──────────┘
│ записывает торги
┌──────────────────┴─────────────────────────┐
│ Background Worker │
│ Подписка на Exchange WebSocket-стрим │
│ Агрегация статистики раз в час │
└──────────────────┬─────────────────────────┘
│ WebSocket
┌──────────▼──────────┐
│ Exchange HTTP API │
│ (торговые данные) │
└─────────────────────┘
База данных¶
Миграции (2)¶
| Миграция | Таблица | Описание |
|---|---|---|
| 1 | trades |
Сделки, загруженные с биржи |
| 2 | trades_stat |
Агрегированная статистика по тикерам |
Схема таблиц¶
erDiagram
trades {
BIGSERIAL id PK "Первичный ключ"
TEXT ticker "Торговая пара, напр. BTC_USDT"
NUMERIC price "Цена сделки"
NUMERIC base_volume "Объём в базовой валюте"
NUMERIC quote_volume "Объём в котировочной валюте"
BOOLEAN is_buyer_maker "true = продажа инициирована покупателем"
TIMESTAMP exchange_time "Время сделки на бирже"
NUMERIC price_usdt "Цена в USDT"
}
trades_stat {
BIGSERIAL id PK "Первичный ключ"
TEXT ticker "Торговая пара"
NUMERIC min_price "Минимальная цена за период"
NUMERIC max_price "Максимальная цена за период"
TIMESTAMP created_at "Время создания записи"
}
trades ||--o{ trades_stat : "агрегируется в"
DDL¶
CREATE TABLE trades (
id BIGSERIAL PRIMARY KEY,
ticker TEXT NOT NULL,
price NUMERIC NOT NULL,
base_volume NUMERIC NOT NULL,
quote_volume NUMERIC NOT NULL,
is_buyer_maker BOOLEAN NOT NULL,
exchange_time TIMESTAMP NOT NULL,
price_usdt NUMERIC NOT NULL
);
CREATE TABLE trades_stat (
id BIGSERIAL PRIMARY KEY,
ticker TEXT NOT NULL,
min_price NUMERIC NOT NULL,
max_price NUMERIC NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT now()
);
CREATE INDEX idx_trades_ticker ON trades(ticker);
CREATE INDEX idx_trades_exchange_time ON trades(exchange_time);
CREATE INDEX idx_trades_stat_ticker ON trades_stat(ticker);
CREATE INDEX idx_trades_stat_created_at ON trades_stat(created_at);
gRPC-контракт¶
Прото-файл: proto/trades_aggregator.proto
Сервис¶
service TradesAggregator {
// Подписка на изменения тикерной информации (стриминг)
rpc ListenTickerInfoChanges(ListenTickerInfoChangesRequest)
returns (stream TickerInfoChanges);
// Получение текущей тикерной информации (однократный запрос)
rpc ListTickerInfo(ListTickerInfoRequest)
returns (ListTickerInfoResponse);
}
Типы сообщений¶
TickerInfo¶
Текущее состояние тикера:
message TickerInfo {
string ticker = 1; // Торговая пара (напр. "BTC_USDT")
string base = 2; // Базовая валюта (напр. "BTC")
string quote = 3; // Котировочная валюта (напр. "USDT")
string min_price = 4; // Минимальная цена за 24h
string max_price = 5; // Максимальная цена за 24h
string price = 6; // Текущая цена
string price_usdt = 7; // Текущая цена в USDT
string base_volume = 8; // Объём в базовой валюте за 24h
string quote_volume = 9; // Объём в котировочной валюте за 24h
bool is_buyer_maker = 10; // Направление последней сделки
string changes_24h_percent = 11; // Изменение цены за 24h в процентах
}
TickerInfoChanges¶
Сообщение стриминга с дельтой изменений:
message TickerInfoChanges {
string ticker = 1;
string base = 2;
string quote = 3;
string min_price = 4;
string max_price = 5;
string price = 6;
string price_usdt = 7;
string base_volume = 8;
string quote_volume = 9;
bool is_buyer_maker = 10;
string changes_24h_percent = 11;
string changes = 12; // Абсолютное изменение цены
string changes_percent = 13; // Изменение цены в процентах
}
Запросы и ответы¶
message ListenTickerInfoChangesRequest {
repeated string tickers = 1; // Фильтр по тикерам (пусто = все)
}
message ListTickerInfoRequest {
repeated string tickers = 1; // Фильтр по тикерам (пусто = все)
}
message ListTickerInfoResponse {
repeated TickerInfo ticker_info = 1;
}
RPC: описание¶
ListenTickerInfoChanges¶
Серверный стриминг. Клиент подписывается и получает обновления тикерной информации в реальном времени по мере появления новых торгов. Соединение удерживается до закрытия клиентом или разрыва.
- Можно указать фильтр по списку тикеров.
- Сервер отправляет сообщение
TickerInfoChangesпри каждом обновлении агрегированных данных.
ListTickerInfo¶
Унарный RPC. Возвращает текущую тикерную информацию по всем (или заданным) тикерам одним запросом. Используется для первоначальной загрузки состояния.
Фоновый воркер¶
Воркер (cmd/worker/) подключается к Exchange через WebSocket-стрим (ListenExchangeTrades) и обрабатывает сделки в реальном времени:
- При запуске загружает список тикеров через Exchange HTTP API (с повторными попытками при ошибке). Каждые 30 минут список обновляется.
- Подписывается на WebSocket-стрим сделок по всем тикерам.
- На каждое входящее событие сохраняет сделки в таблицу
tradesи обновляет in-memory агрегированные данные (min/max цена). - Раз в час сохраняет агрегированную статистику в таблицу
trades_statи удаляет записи старше 2 дней.
Exchange HTTP Client¶
Сервис использует пакет pkg/clients/exchange/ для взаимодействия с Exchange HTTP API. Клиент сгенерирован из OpenAPI-спецификации (pkg/clients/apigen/).
Основная операция: получение списка торгов за временной диапазон по тикеру или всем тикерам.
Конфигурация¶
| Параметр | Описание |
|---|---|
DB_HOST, DB_PORT, DB_USERNAME, DB_PASSWORD, DB_DB, DB_SSLMODE |
Подключение к PostgreSQL |
PORT |
Порт gRPC-сервера |
METRICS_PORT |
Порт Prometheus-метрик (по умолчанию 8000) |
EXCHANGE_HTTP_URL |
Базовый URL Exchange HTTP API |