05 — Data layer

ClickHouse over parquet for agent-accessible analytical queries. NFS mount for dev/on-prem, ADLS Gen2 + Dagster for production.

Why ClickHouse over precomputed rollups

The analytical data layer (partner metrics, quote outcomes, commission history, connection-rate rollups) is too large and too query-diverse for precomputed rollup tables alone. ClickHouse over parquet gives agents the ability to ask ad-hoc analytical questions — "what's the 90th-percentile giveaway for this partner's network mix over the last 6 months?" — without predicting every query shape at schema design time.
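That example question translates directly into one ClickHouse query. A sketch — the columns giveaway_pct, network, and event_date are assumptions about the partner_metrics schema, not confirmed names:

```sql
-- Hypothetical schema: giveaway_pct, network, event_date are assumed columns.
SELECT
    network,
    quantile(0.9)(giveaway_pct) AS p90_giveaway
FROM partner_metrics
WHERE partner_id = {partner_id:UInt64}          -- ClickHouse query parameter
  AND event_date >= today() - INTERVAL 6 MONTH
GROUP BY network;
```

No rollup table anticipates this exact shape; the agent composes it at query time.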

Architecture

Two environments, same ClickHouse query surface. The agent writes SQL and gets rows — it never knows whether the parquet files are on a local NFS mount or in Azure Data Lake Storage.

Dev / on-prem — NFS-mounted data lake

  NFS mount (/data/lake/)
    ├── partner_metrics/      (parquet, partitioned by date)
    ├── quote_outcomes/       (parquet, partitioned by date)
    └── commission_history/   (parquet, partitioned by partner_id)
    ClickHouse (single node, compose service)
  └── views over file() → /var/lib/clickhouse/user_files/lake/*
    analytics client (Go / Python)
      └── exposed as query_analytics tool

The NFS mount is the existing local data lake. ClickHouse reads the parquet files in place via the file() table function, wrapped in views so that newly landed files are picked up on every query. No object-store protocol needed — the parquet directory is bind-mounted into the container.

Production — ADLS data lake (Dagster-orchestrated)

  Azure Data Lake Storage Gen2
    └── abfss://analytics@<account>.dfs.core.windows.net/
          ├── partner_metrics/      (parquet, Dagster-materialised)
          ├── quote_outcomes/       (parquet, Dagster-materialised)
          └── commission_history/   (parquet, Dagster-materialised)
    ClickHouse (single node or cluster)
      └── azureBlobStorage() engine → ADLS via ABFS connector
    analytics client (Go / Python)
      └── exposed as query_analytics tool

Dagster owns the pipeline: it materialises parquet assets from MySQL source tables (rev_sci, commercial_engine, V3/VOS) on a schedule or after upstream events. ClickHouse reads ADLS parquet via the AzureBlobStorage table engine, authenticating with a managed identity or SAS token.

Why two paths, not one. The NFS path is zero-cost, zero-latency, and already operational. Dagster + ADLS is the production-grade pipeline with lineage tracking, partition management, freshness sensors, and Azure RBAC. Both produce the same parquet directory structure; ClickHouse table DDL differs only in the engine clause.

Guardrails

LLM-generated SQL is a SQL injection surface. Mitigations:

  • Read-only user. ClickHouse READONLY profile. No DML/DDL.
  • Allowed tables. The analytics client validates that the query references only allowed tables before execution.
  • Row limit. LIMIT 100 appended if absent.
  • Timeout. max_execution_time = 5 seconds.
  • No external access. url() and remote() functions disabled.

Go analytics client

// internal/clients/analytics/client.go
package analytics

import (
    "context"
    "database/sql"
    "fmt"
    "log/slog"
    "regexp"
    "strings"
)

var allowedTables = map[string]bool{
    "partner_metrics":    true,
    "quote_outcomes":     true,
    "commission_history": true,
}

var dmlPattern = regexp.MustCompile(
    `(?i)\b(INSERT|UPDATE|DELETE|DROP|ALTER|CREATE|TRUNCATE)\b`,
)

// Matches table references after FROM/JOIN so they can be checked
// against the allow-list before execution.
var tableRefPattern = regexp.MustCompile(
    `(?i)\b(?:FROM|JOIN)\s+([A-Za-z0-9_.]+)`,
)

// Word-boundary match avoids false positives on identifiers that
// merely contain the substring LIMIT.
var limitPattern = regexp.MustCompile(`(?i)\bLIMIT\b`)

type AnalyticsClient struct {
    db     *sql.DB
    logger *slog.Logger
}

func (c *AnalyticsClient) Query(ctx context.Context, query string) ([]map[string]any, error) {
    if dmlPattern.MatchString(query) {
        return nil, fmt.Errorf("analytics: DML/DDL not allowed")
    }

    // Enforce the table allow-list: every FROM/JOIN target must be known.
    for _, m := range tableRefPattern.FindAllStringSubmatch(query, -1) {
        table := strings.TrimPrefix(m[1], "analytics.")
        if !allowedTables[table] {
            return nil, fmt.Errorf("analytics: table %q not allowed", table)
        }
    }

    if !limitPattern.MatchString(query) {
        query += " LIMIT 100"
    }

    rows, err := c.db.QueryContext(ctx, query)
    if err != nil {
        return nil, fmt.Errorf("analytics: query: %w", err)
    }
    defer rows.Close()

    cols, err := rows.Columns()
    if err != nil {
        return nil, fmt.Errorf("analytics: columns: %w", err)
    }
    var results []map[string]any

    for rows.Next() {
        vals := make([]any, len(cols))
        ptrs := make([]any, len(cols))
        for i := range vals {
            ptrs[i] = &vals[i]
        }
        if err := rows.Scan(ptrs...); err != nil {
            return nil, err
        }
        row := make(map[string]any, len(cols))
        for i, col := range cols {
            row[col] = vals[i]
        }
        results = append(results, row)
    }
    if err := rows.Err(); err != nil {
        return nil, fmt.Errorf("analytics: rows: %w", err)
    }

    c.logger.Info("analytics_query", "rows", len(results), "cols", len(cols))
    return results, nil
}
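The pre-execution guardrails need no ClickHouse connection to exercise. A minimal standalone sketch of the same reject-DML and append-LIMIT logic:

```go
package main

import (
	"errors"
	"fmt"
	"regexp"
	"strings"
)

var (
	dml   = regexp.MustCompile(`(?i)\b(INSERT|UPDATE|DELETE|DROP|ALTER|CREATE|TRUNCATE)\b`)
	limit = regexp.MustCompile(`(?i)\bLIMIT\b`)
)

// guard applies the client's pre-flight checks: reject DML/DDL,
// and append LIMIT 100 when no LIMIT clause is present.
func guard(query string) (string, error) {
	if dml.MatchString(query) {
		return "", errors.New("DML/DDL not allowed")
	}
	if !limit.MatchString(query) {
		query = strings.TrimRight(query, "; \t\n") + " LIMIT 100"
	}
	return query, nil
}

func main() {
	q, _ := guard("SELECT partner_id FROM partner_metrics")
	fmt.Println(q) // SELECT partner_id FROM partner_metrics LIMIT 100

	_, err := guard("DROP TABLE quote_outcomes")
	fmt.Println(err != nil) // true
}
```

Regex screening is defence in depth, not the primary barrier — the read-only ClickHouse profile is what actually prevents writes.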

ClickHouse compose service (dev)

clickhouse:
  image: clickhouse/clickhouse-server:24.8
  ports:
    - "8123:8123"   # HTTP
    - "9000:9000"   # native
  volumes:
    - ./clickhouse/config.xml:/etc/clickhouse-server/config.d/custom.xml
    - ./clickhouse/users.xml:/etc/clickhouse-server/users.d/readonly.xml
    - /data/lake:/var/lib/clickhouse/user_files/lake:ro   # NFS data lake
  environment:
    CLICKHOUSE_DB: analytics
    CLICKHOUSE_USER: agent_reader
    CLICKHOUSE_PASSWORD: readonly
  networks:
    - backoffice

Table DDL

NFS path (dev):

CREATE VIEW analytics.partner_metrics AS
SELECT * FROM file(
    'lake/partner_metrics/*.parquet',
    'Parquet'
);

A view re-reads the parquet on every query, so files that land on the NFS mount later are picked up automatically; a CREATE TABLE ... AS SELECT would copy the data once at creation time and go stale. Paths passed to file() are relative to ClickHouse's user_files_path.

ADLS path (production):

CREATE TABLE analytics.partner_metrics
ENGINE = AzureBlobStorage(
    'DefaultEndpointsProtocol=https;AccountName=<account>;...',
    'analytics',
    'partner_metrics/*.parquet',
    'Parquet'
);

Application code and tool schemas are identical in both environments.

Dagster integration

  MySQL (rev_sci, commercial_engine, V3/VOS)
  Dagster assets (Python, scheduled / sensor-triggered)
      │ extract → transform → partition → write parquet
  ADLS Gen2 (abfss://analytics@...)
  ClickHouse azureBlobStorage() reads on query

Key Dagster features:

  • Partitioned assets. partner_metrics partitioned by date; Dagster tracks materialised partitions and only rebuilds stale ones.
  • Freshness sensors. Watches upstream MySQL tables (CDC or polling) and triggers re-materialisation when source data changes.
  • Lineage. Full dependency graph (MySQL → parquet → ClickHouse query → agent tool result → LLM decision) visible in Dagster UI.
  • Backfills. Schema changes trigger automatic historical rebuild.

Dagster runs as its own stack — not part of the back-office compose. The only interface is the parquet files in ADLS (or NFS for dev).

Open questions

  1. Freshness SLA. Nightly, hourly, or CDC-triggered? What does the review workflow actually need?
  2. Dagster ownership. Data team or back-office team?
  3. What tables? partner_metrics, quote_outcomes, commission_history are proposed. What's missing?
  4. ClickHouse sizing. When does single-node need a cluster?