Перейти к содержанию

Trades Aggregator Service

Назначение

Trades Aggregator — сервис агрегации торговых данных. Получает сделки из Exchange HTTP API, агрегирует тикерную информацию (цена, объём, изменения за 24 часа) и предоставляет актуальные данные другим сервисам через gRPC, в том числе через стриминг изменений в реальном времени.

Количество RPC: 2 (включая серверный стриминг).


Архитектура

Сервис состоит из двух исполняемых файлов:

trades-aggregator/
├── cmd/
│   ├── server/    # gRPC-сервер (обслуживает входящие запросы)
│   └── worker/    # Фоновый воркер (стриминг торгов с биржи)

Общая схема

┌────────────────────────────────────────────┐
│              gRPC Server                   │
│  ListenTickerInfoChanges (stream)          │
│  ListTickerInfo                            │
└──────────────────┬─────────────────────────┘
                   │ читает агрегированные данные
┌──────────────────▼─────────────────────────┐
│           Aggregation Layer                │
│  Агрегирует торги → тикерная информация    │
│  (min/max цена, объёмы, изменения 24h)     │
└──────────────────┬─────────────────────────┘
        ┌──────────▼──────────┐
        │     PostgreSQL       │
        │  trades              │
        │  trades_stat         │
        └──────────▲──────────┘
                   │ записывает торги
┌──────────────────┴─────────────────────────┐
│           Background Worker                │
│  Подписка на Exchange WebSocket-стрим      │
│  Агрегация статистики раз в час            │
└──────────────────┬─────────────────────────┘
                   │ WebSocket
        ┌──────────▼──────────┐
        │  Exchange HTTP API   │
        │  (торговые данные)   │
        └─────────────────────┘

База данных

Миграции (2)

Миграция Таблица Описание
1 trades Сделки, загруженные с биржи
2 trades_stat Агрегированная статистика по тикерам

Схема таблиц

erDiagram
    trades {
        BIGSERIAL   id              PK  "Первичный ключ"
        TEXT        ticker              "Торговая пара, напр. BTC_USDT"
        NUMERIC     price               "Цена сделки"
        NUMERIC     base_volume         "Объём в базовой валюте"
        NUMERIC     quote_volume        "Объём в котировочной валюте"
        BOOLEAN     is_buyer_maker      "true = продажа инициирована покупателем"
        TIMESTAMP   exchange_time       "Время сделки на бирже"
        NUMERIC     price_usdt          "Цена в USDT"
    }

    trades_stat {
        BIGSERIAL   id              PK  "Первичный ключ"
        TEXT        ticker              "Торговая пара"
        NUMERIC     min_price           "Минимальная цена за период"
        NUMERIC     max_price           "Максимальная цена за период"
        TIMESTAMP   created_at          "Время создания записи"
    }

    trades ||--o{ trades_stat : "агрегируется в"

DDL

CREATE TABLE trades (
    id              BIGSERIAL   PRIMARY KEY,
    ticker          TEXT        NOT NULL,
    price           NUMERIC     NOT NULL,
    base_volume     NUMERIC     NOT NULL,
    quote_volume    NUMERIC     NOT NULL,
    is_buyer_maker  BOOLEAN     NOT NULL,
    exchange_time   TIMESTAMP   NOT NULL,
    price_usdt      NUMERIC     NOT NULL
);

CREATE TABLE trades_stat (
    id          BIGSERIAL   PRIMARY KEY,
    ticker      TEXT        NOT NULL,
    min_price   NUMERIC     NOT NULL,
    max_price   NUMERIC     NOT NULL,
    created_at  TIMESTAMP   NOT NULL DEFAULT now()
);

CREATE INDEX idx_trades_ticker ON trades(ticker);
CREATE INDEX idx_trades_exchange_time ON trades(exchange_time);
CREATE INDEX idx_trades_stat_ticker ON trades_stat(ticker);
CREATE INDEX idx_trades_stat_created_at ON trades_stat(created_at);

gRPC-контракт

Прото-файл: proto/trades_aggregator.proto

Сервис

service TradesAggregator {
    // Подписка на изменения тикерной информации (стриминг)
    rpc ListenTickerInfoChanges(ListenTickerInfoChangesRequest)
        returns (stream TickerInfoChanges);

    // Получение текущей тикерной информации (однократный запрос)
    rpc ListTickerInfo(ListTickerInfoRequest)
        returns (ListTickerInfoResponse);
}

Типы сообщений

TickerInfo

Текущее состояние тикера:

message TickerInfo {
    string ticker           = 1;  // Торговая пара (напр. "BTC_USDT")
    string base             = 2;  // Базовая валюта (напр. "BTC")
    string quote            = 3;  // Котировочная валюта (напр. "USDT")
    string min_price        = 4;  // Минимальная цена за 24h
    string max_price        = 5;  // Максимальная цена за 24h
    string price            = 6;  // Текущая цена
    string price_usdt       = 7;  // Текущая цена в USDT
    string base_volume      = 8;  // Объём в базовой валюте за 24h
    string quote_volume     = 9;  // Объём в котировочной валюте за 24h
    bool   is_buyer_maker   = 10; // Направление последней сделки
    string changes_24h_percent = 11; // Изменение цены за 24h в процентах
}

TickerInfoChanges

Сообщение стриминга с дельтой изменений:

message TickerInfoChanges {
    string ticker           = 1;
    string base             = 2;
    string quote            = 3;
    string min_price        = 4;
    string max_price        = 5;
    string price            = 6;
    string price_usdt       = 7;
    string base_volume      = 8;
    string quote_volume     = 9;
    bool   is_buyer_maker   = 10;
    string changes_24h_percent = 11;
    string changes          = 12; // Абсолютное изменение цены
    string changes_percent  = 13; // Изменение цены в процентах
}

Запросы и ответы

message ListenTickerInfoChangesRequest {
    repeated string tickers = 1; // Фильтр по тикерам (пусто = все)
}

message ListTickerInfoRequest {
    repeated string tickers = 1; // Фильтр по тикерам (пусто = все)
}

message ListTickerInfoResponse {
    repeated TickerInfo ticker_info = 1;
}

RPC: описание

ListenTickerInfoChanges

Серверный стриминг. Клиент подписывается и получает обновления тикерной информации в реальном времени по мере появления новых торгов. Соединение удерживается до закрытия клиентом или разрыва.

  • Можно указать фильтр по списку тикеров.
  • Сервер отправляет сообщение TickerInfoChanges при каждом обновлении агрегированных данных.

ListTickerInfo

Унарный RPC. Возвращает текущую тикерную информацию по всем (или заданным) тикерам одним запросом. Используется для первоначальной загрузки состояния.


Фоновый воркер

Воркер (cmd/worker/) подключается к Exchange через WebSocket-стрим (ListenExchangeTrades) и обрабатывает сделки в реальном времени:

  1. При запуске загружает список тикеров через Exchange HTTP API (с повторными попытками при ошибке). Каждые 30 минут список обновляется.
  2. Подписывается на WebSocket-стрим сделок по всем тикерам.
  3. На каждое входящее событие сохраняет сделки в таблицу trades и обновляет in-memory агрегированные данные (min/max цена).
  4. Раз в час сохраняет агрегированную статистику в таблицу trades_stat и удаляет записи старше 2 дней.

Exchange HTTP Client

Сервис использует пакет pkg/clients/exchange/ для взаимодействия с Exchange HTTP API. Клиент сгенерирован из OpenAPI-спецификации (pkg/clients/apigen/).

Основная операция: получение списка торгов за временной диапазон по тикеру или всем тикерам.


Конфигурация

Параметр Описание
DB_HOST, DB_PORT, DB_USERNAME, DB_PASSWORD, DB_DB, DB_SSLMODE Подключение к PostgreSQL
PORT Порт gRPC-сервера
METRICS_PORT Порт Prometheus-метрик (по умолчанию 8000)
EXCHANGE_HTTP_URL Базовый URL Exchange HTTP API