telemetry storage

Хранилище телеметрии Hive — техническая документация

🎯 Обзор

Система хранения и запроса временных рядов данных датчиков Интернета вещей из ульев. Обрабатывает прием данных с нескольких типов устройств (датчики температуры/влажности, весы, входные камеры) и предоставляет API-интерфейсы GraphQL/REST для получения данных с помощью гибких запросов временного диапазона и функций агрегирования.

🏗️ Архитектура

Компоненты

  • TelemetryChart: компонент React для рендеринга графиков временных рядов.
  • MetricSelector: пользовательский интерфейс для выбора показателей для отображения.
  • TimeRangeFilter: выбор диапазона дат для исторических запросов.
  • HiveTelemetryPanel: виджет информационной панели, показывающий данные датчиков в реальном времени.
  • GrafanaEmbed: интеграция iFrame для расширенной аналитики.

Услуги

  • telemetry-api: основная служба для хранения и извлечения показателей.
  • beehive-sensors: аппаратное устройство отправляет данные о температуре, влажности и весе.
  • entrance-observer: служба видеоаналитики, отправляющая показатели трафика пчел.
  • user-cycle: служба аутентификации для проверки токена API.
  • graphql-router: запросы телеметрии маршрутизации шлюза федерации.
  • grafana: расширенная платформа визуализации и аналитики.

📋 Технические характеристики

Схема базы данных

erDiagram
    users ||--o{ telemetry_metrics : "owns"
    users ||--o{ entrance_traffic : "owns"
    hives ||--o{ telemetry_metrics : "has"
    hives ||--o{ entrance_traffic : "has"
    boxes ||--o{ entrance_traffic : "monitors"
    devices ||--o{ telemetry_metrics : "sends"
    
    telemetry_metrics {
        bigint id PK
        int hive_id FK
        int user_id FK
        enum metric_type "temperature, humidity, weight"
        float value
        datetime timestamp "millisecond precision"
        varchar device_id FK "optional"
    }
    
    entrance_traffic {
        bigint id PK
        int hive_id FK
        int box_id FK
        int user_id FK
        float bees_in
        float bees_out
        float net_flow
        float avg_speed
        float p95_speed
        int stationary_bees
        int detected_bees
        int bee_interactions
        datetime timestamp "millisecond precision"
    }
    
    users {
        int id PK
        varchar email
    }
    
    hives {
        int id PK
        int user_id FK
        varchar name
    }
    
    boxes {
        int id PK
        int hive_id FK
        enum type "DEEP, SUPER, GATE, BOTTOM"
    }
    
    devices {
        varchar device_id PK
        int hive_id FK
        enum type "sensor, camera"
    }

Обоснование конструкции таблицы:

  • Отдельные таблицы для разных категорий метрик (экологический и входной трафик)
  • Метки времени с миллисекундной точностью для высокочастотных данных
    — Составные индексы (hive_id, временная метка DESC) для запросов временного диапазона.
    — Индексы (user_id, hive_id) для изоляции данных.
    – Индексы (metric_type, timestamp DESC) для отфильтрованных запросов.
  • device_id отслеживает источник данных для отладки

GraphQL API

scalar DateTime

enum AggregationType {
  DAILY_AVG
  DAILY_MAX
  DAILY_MIN
}

type TelemetryError {
  message: String
  code: String
}

type MetricFloat {
  t: DateTime!
  v: Float
}

type MetricFloatList {
  metrics: [MetricFloat]
}

union MetricListResult = MetricFloatList | TelemetryError

type BeeMovementInOutResult {
  beesIn: Float
  beesOut: Float
  netFlow: Float
  avgSpeed: Float
  p95Speed: Float
  stationaryBees: Int
  detectedBees: Int
  beeInteractions: Int
  time: DateTime
}

type EntranceMovementRecord {
  id: ID!
  hiveId: ID!
  boxId: ID!
  beesOut: Float
  beesIn: Float
  time: DateTime!
  netFlow: Float
  avgSpeed: Float
  p95Speed: Float
  stationaryBees: Int
  detectedBees: Int
  beeInteractions: Int
}

type EntranceMovementList {
  metrics: [EntranceMovementRecord]
}

union EntranceMovementResult = EntranceMovementList | TelemetryError

input MetricSetInput {
  temperatureCelsius: Float
  humidityPercent: Float
  weightKg: Float
}

type AddMetricMessage {
  message: String
}

union AddMetricResult = AddMetricMessage | TelemetryError

type Query {
  temperatureCelsius(
    hiveId: ID!
    timeRangeMin: Int
  ): MetricListResult
  
  humidityPercent(
    hiveId: ID!
    timeRangeMin: Int
  ): MetricListResult
  
  weightKg(
    hiveId: ID!
    timeRangeMin: Int
  ): MetricListResult
  
  weightKgAggregated(
    hiveId: ID!
    days: Int!
    aggregation: AggregationType
  ): MetricListResult
  
  entranceMovementToday(
    hiveId: ID!
    boxId: ID!
  ): BeeMovementInOutResult
  
  entranceMovement(
    hiveId: ID!
    boxId: ID
    timeFrom: DateTime!
    timeTo: DateTime!
  ): EntranceMovementResult
}

type Mutation {
  addMetric(
    hiveId: ID!
    fields: MetricSetInput!
  ): AddMetricResult
}

Примечания к проектированию:
— Короткие имена полей (t, v) в MetricFloat для уменьшения размера полезной нагрузки.

  • Типы объединения для обработки ошибок.
  • Гибкий диапазон времени (минуты назад) и абсолютные временные метки.
  • Агрегация во время запроса для гибкости.

REST API Конечные точки

POST /v1/metrics/:hiveId
Body: {
  "temperatureCelsius": 34.5,
  "humidityPercent": 65.2,
  "weightKg": 45.3
}
Headers: {
  "Authorization": "Bearer <user_api_token>"
}

POST /v1/entrance/:hiveId/:boxId
Body: {
  "beesIn": 12.5,
  "beesOut": 10.2,
  "netFlow": 2.3,
  "avgSpeed": 1.2,
  "p95Speed": 2.5,
  "stationaryBees": 5,
  "detectedBees": 18,
  "beeInteractions": 3
}
Headers: {
  "Authorization": "Bearer <device_token>"
}

GET /v1/metrics/:hiveId/temperature?from=2024-12-01&to=2024-12-06
GET /v1/metrics/:hiveId/humidity?minutes=60
GET /v1/metrics/:hiveId/weight?days=7&aggregation=DAILY_AVG

REST Дизайн:

  • Префикс версии (/v1) для обратной совместимости.
  • Аутентификация на основе токена через user-cycle.
  • Поддержка как абсолютных временных меток, так и относительных временных диапазонов.
  • Возможность массовой вставки пакетных данных датчиков

🔧 Детали реализации

Интерфейс (web-app)

  • Framework: React 18 с TypeScript.
  • Библиотека графиков: упрощенные графики от TradingView
  • Управление состоянием: клиент Apollo для кэширования GraphQL.
  • Обновления в режиме реального времени: опрос каждые 30 секунд для получения актуальных данных.
  • Форматирование данных: специальные приемы для преобразования показателей.

Ключевые компоненты:

interface TelemetryChartProps {
  hiveId: string;
  metricType: 'temperature' | 'humidity' | 'weight';
  timeRangeMinutes?: number;
  aggregation?: AggregationType;
}

const TelemetryChart: React.FC<TelemetryChartProps> = ({
  hiveId,
  metricType,
  timeRangeMinutes = 60,
  aggregation
}) => {
  const { data, loading, error } = useQuery(GET_METRIC_QUERY, {
    variables: { hiveId, timeRangeMin: timeRangeMinutes },
    pollInterval: 30000
  });
  
  return <LineChart data={data?.metrics} />;
};

Серверная часть (telemetry-api)

  • Язык: Node.js с TypeScript.
  • Среда: сервер Apollo для GraphQL.
  • База данных: MySQL 8.0 с оптимизацией временных рядов.
  • Аутентификация: проверка токена JWT через user-cycle.
  • Ограничение скорости: 100 запросов/мин на устройство, 60 запросов/мин на пользователя.

Поток приема данных:

async function addMetric(
  hiveId: number,
  userId: number,
  fields: MetricSetInput,
  deviceId?: string
): Promise<void> {
  const timestamp = new Date();
  const insertPromises = [];
  
  if (fields.temperatureCelsius !== undefined) {
    insertPromises.push(
      db.telemetry_metrics.insert({
        hive_id: hiveId,
        user_id: userId,
        metric_type: 'temperature',
        value: fields.temperatureCelsius,
        timestamp,
        device_id: deviceId
      })
    );
  }
  
  await Promise.all(insertPromises);
}

Оптимизация запросов:

async function getTemperature(
  hiveId: number,
  timeRangeMin: number = 60
): Promise<MetricFloat[]> {
  const fromTime = new Date(Date.now() - timeRangeMin * 60 * 1000);
  
  return db.query(`
    SELECT 
      timestamp as t,
      value as v
    FROM telemetry_metrics
    WHERE hive_id = ? 
      AND metric_type = 'temperature'
      AND timestamp >= ?
    ORDER BY timestamp ASC
    LIMIT 10000
  `, [hiveId, fromTime]);
}

Логика агрегирования:

async function getWeightAggregated(
  hiveId: number,
  days: number,
  aggregation: AggregationType
): Promise<MetricFloat[]> {
  const aggFunc = {
    DAILY_AVG: 'AVG',
    DAILY_MAX: 'MAX',
    DAILY_MIN: 'MIN'
  }[aggregation];
  
  return db.query(`
    SELECT 
      DATE(timestamp) as t,
      ${aggFunc}(value) as v
    FROM telemetry_metrics
    WHERE hive_id = ?
      AND metric_type = 'weight'
      AND timestamp >= DATE_SUB(NOW(), INTERVAL ? DAY)
    GROUP BY DATE(timestamp)
    ORDER BY t ASC
  `, [hiveId, days]);
}

Поток данных

sequenceDiagram
    participant S as beehive-sensors
    participant T as telemetry-api
    participant U as user-cycle
    participant M as MySQL
    participant W as web-app
    participant G as grafana
    
    S->>T: POST /v1/metrics (with token)
    T->>U: Validate API token
    U-->>T: User ID & permissions
    T->>M: INSERT telemetry_metrics
    M-->>T: Success
    T-->>S: 200 OK
    
    W->>T: GraphQL: temperatureCelsius
    T->>M: SELECT FROM telemetry_metrics
    M-->>T: Time-series data
    T-->>W: MetricFloatList
    
    G->>M: Direct SQL query
    M-->>G: Aggregated results

⚙️ Конфигурация

Переменные среды (telemetry-api)

MYSQL_HOST=localhost
MYSQL_PORT=3306
MYSQL_USER=telemetry
MYSQL_PASSWORD=secret
MYSQL_DATABASE=gratheon

USER_CYCLE_URL=http://user-cycle:8080
GRAPHQL_ROUTER_URL=http://graphql-router:8080

RATE_LIMIT_DEVICE=100
RATE_LIMIT_USER=60

DATA_RETENTION_DAYS_HOBBYIST=365
DATA_RETENTION_DAYS_STARTER=730
DATA_RETENTION_DAYS_PRO=1095

Конфигурация устройства (beehive-sensors)

[telemetry]
api_endpoint=https://telemetry.gratheon.com/v1/metrics
api_token=your_api_token_here
hive_id=123
interval_seconds=5
batch_size=12

🧪 Тестирование

Модульные тесты

Местоположение: /test/unit/

Охват:

  • Проверка приема метрик (обязательные поля, типы данных)
  • Логика запроса временного диапазона (относительная и абсолютная)
  • Агрегационные расчеты (среднее, мин, макс)
  • Обработка ошибок (неверные идентификаторы ульев, несанкционированный доступ)
describe('TelemetryAPI', () => {
  describe('addMetric', () => {
    it('should insert temperature metric', async () => {
      const result = await addMetric(hiveId, userId, {
        temperatureCelsius: 34.5
      });
      expect(result).toBeDefined();
    });
    
    it('should reject invalid temperature values', async () => {
      await expect(
        addMetric(hiveId, userId, { temperatureCelsius: 150 })
      ).rejects.toThrow('Invalid temperature range');
    });
  });
});

Интеграционные тесты

Местоположение: /test/integration/

Сценарии:

  • Сквозной поток данных от устройства к базе данных
  • Проверка формата ответа на запрос GraphQL.
  • Аутентификация токена с помощью user-cycle.
  • Принуждение к ограничению ставок
  • Выполнение политики хранения данных

E2E-тесты

Местоположение: web-app/e2e/telemetry.spec.ts

Потоки пользователя:

  • Просматривайте данные датчиков в реальном времени на панели управления улья.
  • Запрос исторических данных с помощью выбора временного диапазона
  • Экспорт данных телеметрии в формате CSV.
  • Настройка пороговых оповещений на основе показателей.

📊 Вопросы производительности

Оптимизации

  • Индексы базы данных: составные индексы по (hive_id, timestamp) для запросов быстрого диапазона.
  • Кэширование запросов: кэширование сервера Apollo для часто используемых диапазонов времени (5 минут TTL).
  • Пул соединений: пул соединений MySQL (мин.: 10, макс.: 100).
  • Пакетная вставка: группируйте несколько показателей с одного устройства в одну транзакцию.
  • Ограничения запросов: максимум 10 000 точек данных на запрос во избежание проблем с памятью.
  • Агрегация: предварительный расчет ежедневных агрегатов для длительных временных диапазонов (более 30 дней).

Узкие места

  • Высокочастотная запись: датчики отправляют данные каждые 5 секунд, примерно 17 280 вставок на улей в день.
    Запросы с большим временным диапазоном: запросы старше 1 года требуют агрегирования во избежание проблем с памятью.
  • Одновременная запись на устройство: MySQL блокирует запись во время массовой вставки с нескольких устройств.
  • Grafana нагрузка: сложные запросы информационной панели могут генерировать более 20 запросов к базе данных одновременно.

Метрики

  • Время ответа на запрос: менее 500 мс для 24-часового диапазона, менее 2 секунд для 30-дневного диапазона.
  • Пропускная способность записи: более 1000 метрик в секунду на всех устройствах.
  • Размер базы данных: примерно 500 МБ на улей в год (исходные показатели).
  • Успех: более 99,5% (исключая сбои сети)
  • API доступность: время безотказной работы более 99,9 % (отслеживается посредством проверок работоспособности).

Мониторинг производительности:

interface PerformanceMetrics {
  queryDuration: number;
  dataPoints: number;
  cacheHit: boolean;
  dbConnectionTime: number;
}

async function logQueryPerformance(
  operation: string,
  metrics: PerformanceMetrics
) {
  if (metrics.queryDuration > 2000) {
    logger.warn('Slow query detected', { operation, metrics });
  }
}

🚫 Технические ограничения

Текущие ограничения:

  • Максимальный диапазон запросов: 2 года без агрегации.
  • Ограничение количества точек данных на запрос: 10 000 записей.
  • Частота записи: минимум 1 секундный интервал на устройство
  • Поддерживаемые метрики: температура, влажность, вес, только входной трафик.
  • Нет потоковой передачи через веб-сокет в реальном времени (только опрос)
  • Grafana требует отдельной аутентификации (не интегрированной SSO)

Известные проблемы:

  • Переход на летнее время может привести к появлению пробелов в временных метках на диаграммах.
  • Дрейф тактовой частоты устройства может привести к выходу из строя вставок.
  • Разделение даты MySQL еще не реализовано (планируется для большого количества пользователей)
  • Нет автоматического обнаружения выбросов или сглаживания данных.
  • Для показателей входящего трафика требуется box_id (не агрегация на уровне куста).

Будущие улучшения:

  • Добавить поддержку датчиков CO2, давления, звука, вибрации.
  • Внедрение подписок на веб-сокеты для обновлений в реальном времени.
  • Добавьте прогнозную аналитику и обнаружение аномалий.
  • Создание автоматического формирования отчетов.
  • Внедрить сжатие данных для более старых показателей (более 1 года).
  • Добавьте запросы агрегации нескольких ульев для аналитики на уровне пасеки.

🔗 Сопутствующая документация

📚 Ресурсы для разработки

  • Репозиторий GitHub: telemetry-api
  • Схема архитектуры: доступна в репозитории README.
  • API Схема: /schema.graphql в репозитории.
  • Миграция базы данных: каталог /migrations.
  • Grafana Панели: /grafana/dashboards (экспорт JSON)

💬 Технические примечания

Решения по реализации:
– Выберите MySQL вместо InfluxDB/ClickHouse, чтобы снизить сложность эксплуатации (единый механизм базы данных).

  • REST API поддерживается для совместимости с устаревшими устройствами (новые устройства должны использовать GraphQL)
  • Опрос вместо веб-сокетов для упрощения масштабирования и снижения использования ресурсов сервера.
  • Отдельная таблица для входного трафика из-за другой схемы (несколько метрик на одну временную метку)
  • Короткие имена полей (t, v) в ответах GraphQL для оптимизации использования мобильных данных.

Аспекты интеграции:

  • Устройства должны корректно обрабатывать сбои в сети (буферизировать и повторять попытки).
  • Токены API следует менять каждые 90 дней в целях безопасности.
  • Запросы с большим временным диапазоном должны использовать агрегацию, чтобы предотвратить тайм-аут.
  • Для внедрения Grafana требуется настройка CORS на telemetry-api.
  • Политики хранения данных выполняются ежедневно с помощью задания cron (удаляет старые записи).

Журнал изменений

Последнее обновление: 6 декабря 2025 г.