Distributed Query
Multi-active distributed query with Apache Ballista, partition-aware execution, and distributed accelerations for multi-node Spice.ai clusters.
Spice.ai Enterprise supports distributed query execution, built on Apache Ballista and Apache DataFusion, for horizontally scaling SQL workloads across multiple nodes. A cluster runs in multi-active mode with shared state in an S3-compatible object store, executes queries with partition-aware routing across executors, and supports distributed accelerations with per-partition data ownership and write-through semantics.
Architecture
A distributed query cluster is composed of three tiers:
Schedulers: Coordinate query planning, partition discovery and assignment, async job submission, and federated task_history aggregation.
Executors: Hold accelerated data for their assigned partitions, execute query stages, perform shuffles, and stream results.
Object store: S3-compatible storage that holds shared cluster state (cluster.json, scheduler heartbeats, async job results, and Cayenne acceleration snapshots).
     ┌──────────────────────────────────────┐
     │            Load Balancer             │
     └──────────────────────────────────────┘
                        │
     ┌──────────────────┼──────────────────┐
     ▼                  ▼                  ▼
┌──────────┐       ┌──────────┐       ┌──────────┐
│Scheduler │       │Scheduler │       │Scheduler │◄──► Object Store
└──────────┘       └──────────┘       └──────────┘
     ▲                  ▲                  ▲
     │                  │                  │
┌──────────┐       ┌──────────┐       ┌──────────┐
│ Executor │       │ Executor │       │ Executor │
└──────────┘       └──────────┘       └──────────┘

Schedulers register themselves and discover peers through the object store. Executors are shared across all schedulers; they are not bound to or owned by any single scheduler. On startup, an executor connects to its bootstrap scheduler, fetches the full scheduler membership list, and then opens a Ballista task poll_loop plus a persistent bidirectional ControlStream to every scheduler over the internal gRPC port (50052). It refreshes the membership list every 10 s and adds or drops connections as schedulers join or leave. Any scheduler can dispatch tasks to any executor, and an executor's partition assignment is owned by the cluster (persisted in the object store), not by the scheduler that allocated it.
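The membership refresh described above amounts to a set difference between the schedulers an executor is currently connected to and the latest list it fetched. The following is a minimal sketch of that reconciliation step only; the function name and the address strings are hypothetical and are not taken from the Spice.ai codebase.

```python
def diff_membership(connected: set, live: set) -> tuple:
    """Return (to_open, to_close) for one membership refresh.

    connected: scheduler addresses the executor currently has streams to.
    live:      scheduler addresses returned by the latest membership fetch.
    """
    to_open = live - connected    # schedulers that joined since the last refresh
    to_close = connected - live   # schedulers that left or were evicted
    return to_open, to_close


# Hypothetical addresses, for illustration only.
connected = {"sched-a:50052", "sched-b:50052"}
live = {"sched-b:50052", "sched-c:50052"}
to_open, to_close = diff_membership(connected, live)
print("open:", sorted(to_open), "close:", sorted(to_close))
```

Running this once per refresh interval keeps the connection set converged on the live scheduler list without tearing down connections that are still valid.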
Multi-Active High Availability
Multiple schedulers run simultaneously without an external coordinator:
Shared state: Cluster membership and per-table partition metadata are persisted in cluster.json in the configured object store. All mutations use optimistic concurrency control (ETag / If-Match) with up to 8 retry attempts; the schema is versioned (CLUSTER_STATE_SCHEMA_VERSION = 1) and incompatible versions are rejected.
Heartbeats: Each scheduler writes a small heartbeats/{scheduler_id}.json file, independent of cluster.json, to avoid contention. Heartbeats refresh at one third of the configured TTL (default ~10 s for a 30 s TTL).
Discovery: Every scheduler refreshes its peer map every 5 s by reading cluster.json plus live heartbeat files.
Reaper: Schedulers whose last heartbeat is older than ttl_ms plus a 5 s clock-skew tolerance are evicted on the next reaper tick (jittered ±20 % around ttl_ms).
Takeover safety: Each scheduler process generates a fresh instance_id (UUID) at startup, so a restarted process can safely reclaim its previous scheduler_id without colliding with itself.
Executor failover: Executors maintain a poll loop (Fibonacci backoff, max 5 s) and a ControlStream to every scheduler concurrently. If a scheduler dies, the executor's other connections remain live and surviving schedulers continue to drive partition assignment and refresh commands without re-binding the executor.
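The heartbeat and reaper timings above can be worked through with a few lines of arithmetic. This sketch only illustrates the stated numbers (refresh at one third of the TTL, a 5 s skew tolerance, ±20 % jitter). The function names are hypothetical, and drawing the jitter from a uniform distribution is an assumption.

```python
import random

CLOCK_SKEW_TOLERANCE_MS = 5_000  # 5 s tolerance stated in the text


def heartbeat_interval_ms(ttl_ms: int) -> int:
    """Heartbeats refresh at one third of the configured TTL."""
    return ttl_ms // 3


def is_expired(last_heartbeat_ms: int, now_ms: int, ttl_ms: int) -> bool:
    """A scheduler is evictable once its heartbeat age exceeds TTL + tolerance."""
    return now_ms - last_heartbeat_ms > ttl_ms + CLOCK_SKEW_TOLERANCE_MS


def next_reaper_delay_ms(ttl_ms: int, rng: random.Random) -> float:
    """Reaper tick jittered within ±20 % of ttl_ms (uniform jitter is an assumption)."""
    return ttl_ms * rng.uniform(0.8, 1.2)


ttl = 30_000
print(heartbeat_interval_ms(ttl))           # 10000
print(is_expired(0, 35_000, ttl))           # False: exactly at the limit
print(is_expired(0, 35_001, ttl))           # True: past TTL + tolerance
```

With a 30 s TTL, a scheduler therefore misses roughly three heartbeats before it becomes eligible for eviction.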
The object store backing state_location must support conditional writes. Native AWS S3, S3-compatible stores with PutIfNotExists/PutIfMatch semantics, and the file:// backend (for local development) are supported.
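To show why conditional writes matter, here is a minimal sketch of a read-modify-write loop under optimistic concurrency control. InMemoryStore is a toy stand-in for an object store with ETag / If-Match semantics, and update_with_occ is a hypothetical helper; neither reflects the actual Spice.ai implementation. Only the retry budget of 8 comes from the text.

```python
import json

MAX_ATTEMPTS = 8  # retry budget stated in the text


class PreconditionFailed(Exception):
    """Raised when the ETag supplied with a write is stale."""


class InMemoryStore:
    """Toy object store: one document plus a version counter used as its ETag."""

    def __init__(self, body: dict):
        self._body = json.dumps(body)
        self._etag = 0

    def get(self):
        return json.loads(self._body), self._etag

    def put_if_match(self, body: dict, etag: int) -> None:
        if etag != self._etag:
            raise PreconditionFailed()
        self._body = json.dumps(body)
        self._etag += 1


def update_with_occ(store, mutate):
    """Re-read and re-apply `mutate` until the conditional write succeeds."""
    for _ in range(MAX_ATTEMPTS):
        state, etag = store.get()
        new_state = mutate(state)
        try:
            store.put_if_match(new_state, etag)
            return new_state
        except PreconditionFailed:
            continue  # another writer won the race; retry on fresh state
    raise RuntimeError("gave up after %d conflicting writes" % MAX_ATTEMPTS)


store = InMemoryStore({"schedulers": []})
update_with_occ(store, lambda s: {**s, "schedulers": s["schedulers"] + ["sched-a:50052"]})
print(store.get())
```

Because the mutation is re-applied to freshly read state on every attempt, two writers that race never silently overwrite each other; the loser simply retries.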
Configuration
Spicepod (runtime.scheduler)
The presence of runtime.scheduler activates the scheduler role on a node. Setting --scheduler-address at launch time activates the executor role.
| Parameter | Default | Description |
| --- | --- | --- |
| state_location | - | Object-store URL for shared cluster state. s3://, file://, or any object store with conditional writes. |
| params | - | Object-store auth/configuration. auth: iam_role is the default; explicit keys are sourced from secrets. |
| partition_assignment_interval | 30s | How often the partition discovery + assignment loop runs. |
| max_partition_assignments_per_interval | 100 | Cap on partitions assigned per cycle to limit churn during scale events. |
| max_partitions_per_executor | 1000 | Soft cap per executor used by the assignment heuristic. |
| partition_discovery_timeout | 60s | Maximum time to spend discovering partition values from the federated source per cycle. |
CLI flags
The same spiced binary runs as scheduler or executor; the role is selected by flag combination.
| Flag | Default | Description |
| --- | --- | --- |
| --role | inferred | scheduler or executor. Inferred as executor when --scheduler-address is set. |
| --node-bind-address | 0.0.0.0:50052 | Internal gRPC bind address for both roles. |
| --node-advertise-address | required | Hostname/IP this node advertises to peers. Forms scheduler_id as {advertise}:{port}. |
| --scheduler-address | required (executor) | URL of the scheduler's internal gRPC service. Scheme inferred from TLS configuration. |
| --node-mtls-ca-certificate-file | - | PEM CA used to verify peer mTLS certificates. |
| --node-mtls-certificate-file | - | PEM server + client certificate for this node. |
| --node-mtls-key-file | - | Private key for the mTLS certificate. |
| --allow-insecure-connections | false | Disable mTLS. Development / test only. |
Manual launch (non-Kubernetes)
Kubernetes (recommended)
Use the SpicepodCluster CRD. The Spice Operator provisions mTLS certificates, child SpicepodSet resources for the scheduler and executor pools, services, and PodMonitors automatically. See High Availability for AZ spread, anti-affinity, and PodDisruptionBudget guidance.
Internal gRPC (port 50052)
The internal ClusterService gRPC surface is mTLS-protected and never exposed externally. All cluster coordination flows through it:
| RPC | Caller | Purpose |
| --- | --- | --- |
| GetAppDefinition | Executor at startup | Fetches the full Spicepod definition (datasets, catalogs, views, UDFs) so executors do not need a local manifest. |
| ExpandSecret | Executor at startup | Resolves a secret key through the scheduler's secret store. |
| GetSchedulers | Executor at startup | Returns the list of live scheduler advertise addresses; executor opens a poll loop to each. |
| AllocateInitialPartitions | Executor at startup | Returns the executor's assigned per-table partition filter expressions (serialized DataFusion Expr). |
| ControlStream (bidirectional) | Executor → Scheduler | Carries heartbeats and metric responses; receives UpdatePartitions, PollNow, RefreshDataset, and CancelTasks commands. |
| GetTaskHistory | Scheduler → peer schedulers | Federated runtime.task_history fan-out across the cluster. |
| GetMetrics | Scheduler → peers / executors | On-demand OTLP metrics collection. |
Partitioning, Sharding, and Partition-Aware Queries
Distributed Spice clusters shard accelerated tables horizontally across executors using user-declared partition keys. Query planning is partition-aware: each executor only scans the partitions it owns, and the scheduler unions and merges results.
Declaring partitions
Every accelerated dataset and view in cluster mode must declare at least one partition key via acceleration.partition_by. Startup fails otherwise with:
Accelerated {component_type} '{name}' has no partition keys configured. Add 'partition_by' to its acceleration config to participate in cluster partition assignment.
Each entry is a SQL expression over the source schema. Entries can be anonymous, named, or column references.
The bucket(N, col) function is treated as a static partition key: values 0..N-1 are enumerated without querying the source.
Partition discovery and assignment
Partition assignment is coordinated through the object store: cluster.json is the single source of truth for which executor owns which partition. Schedulers never assign partitions purely from in-memory state, and executors never own a partition that is not durably committed to the object store first.
The scheduler runs a PartitionAssignmentTask on partition_assignment_interval (default 30 s):
Discover: For each accelerated table, runs SELECT DISTINCT {expressions} FROM {federated_source} against the source connector (within partition_discovery_timeout). Static bucket(N, col) keys skip the query.
Diff: Compares discovered values against cluster.json (accelerations sub-map). New partitions are recorded as unassigned; stale partitions are removed.
Assign: Picks executors for unassigned partitions using a greedy minimum set-cover algorithm, respecting max_partitions_per_executor and max_partition_assignments_per_interval. Tie-breaks are deterministic by executor ID.
Commit: Writes the assignment (assigned_executors field of each partition entry) to cluster.json via an OCC conditional write (up to 8 retries). Concurrent schedulers serialize on this write: if two schedulers race on the same partition, one commits first and the other re-reads fresh state and retries, so an executor cannot be assigned the same partition twice.
Notify: After the object-store commit succeeds, the scheduler pushes an UpdatePartitions message over the executor's ControlStream so it can immediately materialize the new partitions locally without waiting to re-read cluster.json.
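The Discover and Diff steps above can be sketched as plain set operations. This is an illustration under simplifying assumptions: partition values are modeled as strings, the stored state is a flat map from partition value to its assigned_executors list, and the function names are hypothetical. The real cluster.json layout is not shown in this document.

```python
def static_bucket_values(n: int) -> set:
    """bucket(N, col) is static: values 0..N-1 are enumerated without a source query."""
    return {str(i) for i in range(n)}


def diff_partitions(discovered: set, stored: dict) -> dict:
    """Return the next partition map.

    Stale partitions (stored but no longer discovered) are dropped, and new
    partitions are added with an empty owner list, meaning "unassigned".
    """
    next_state = {p: owners for p, owners in stored.items() if p in discovered}
    for p in sorted(discovered - set(stored)):
        next_state[p] = []  # recorded as unassigned until the Assign step runs
    return next_state


stored = {"2024-01": ["exec-1"], "2024-02": ["exec-2"]}
discovered = {"2024-02", "2024-03"}
print(diff_partitions(discovered, stored))
print(sorted(static_bucket_values(4)))
```

The resulting map is what the Assign and Commit steps would operate on; existing owners are preserved so that unchanged partitions are not reshuffled.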
How executors learn their assignments
| Scenario | Mechanism |
| --- | --- |
| Startup / restart | Executor calls AllocateInitialPartitions (RPC pull) on its bootstrap scheduler, which reads from cluster.json and returns the executor's assigned partition filter expressions per table. |
| Ongoing changes | Scheduler commits the change to cluster.json (OCC), then pushes UpdatePartitions over the ControlStream. The executor updates its in-memory partition_assignments map and registers/removes local accelerations accordingly. |
| Authoritative state | cluster.json in the object store. The executor's in-memory map and the scheduler's PartitionStore cache are derived views; both are rebuilt from the object store after a process restart. |
Partition-aware query planning
The DataFusion analyzer rule PartitionedTableScanRewrite (scheduler-only) rewrites every TableScan on an accelerated table into a UNION ALL over per-executor FlightSQL scans, pushing down the executor's partition filter and any user predicates.
When the plan contains a Limit → Sort → Union pattern (top-K), the Sort is pushed into each union leg and the executor returns at most Limit rows before the scheduler performs a final merge-sort.
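The top-K pushdown works because the global top K rows must be contained in the union of each leg's local top K. A minimal sketch of that idea using Python's heapq module follows; executor_top_k and scheduler_merge are hypothetical names standing in for the per-executor sort and limit and for the scheduler's final merge-sort.

```python
import heapq
from itertools import islice


def executor_top_k(rows, k, key=None):
    """Per-leg work: return at most k rows, already sorted ascending."""
    return heapq.nsmallest(k, rows, key=key)


def scheduler_merge(legs, k, key=None):
    """Final step: merge the pre-sorted legs and keep the first k rows."""
    return list(islice(heapq.merge(*legs, key=key), k))


# Three executors, each holding different rows; the query asks for the 3 smallest.
partitions = ([5, 1, 9, 7], [4, 2, 8], [6, 3])
legs = [executor_top_k(rows, 3) for rows in partitions]
print(scheduler_merge(legs, 3))  # [1, 2, 3]
```

Each leg ships at most K rows to the scheduler, so network transfer is bounded by K times the number of executors rather than by table size.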
Executor selection uses a greedy minimum set-cover algorithm: pick the executor that covers the most still-required partitions, repeat until coverage is complete, break ties deterministically by executor ID. If any required partition is unassigned, the query fails with:
Cannot execute query: N partition(s) not assigned to any executor
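The greedy selection described above can be sketched as follows. This is an illustrative implementation of the textbook greedy set-cover heuristic, not the runtime's code. The function name and data shapes are hypothetical, and breaking ties toward the lowest executor ID is an assumption (the text says only that ties are broken deterministically by executor ID).

```python
def select_executors(required: set, owned: dict) -> list:
    """Greedily choose executors until every required partition is covered.

    required: partition identifiers the query needs.
    owned:    executor ID -> set of partitions that executor holds.
    """
    uncovered = set(required)
    chosen = []
    while uncovered:
        # sorted() + max() returns the first maximum, so ties go to the lowest ID.
        best = max(sorted(owned), key=lambda e: len(owned[e] & uncovered), default=None)
        gain = owned[best] & uncovered if best is not None else set()
        if not gain:
            raise RuntimeError(
                "Cannot execute query: %d partition(s) not assigned to any executor"
                % len(uncovered)
            )
        chosen.append(best)
        uncovered -= gain
    return chosen


owned = {"exec-2": {"p1", "p2"}, "exec-1": {"p1", "p2"}, "exec-3": {"p3"}}
print(select_executors({"p1", "p2", "p3"}, owned))  # ['exec-1', 'exec-3']
```

Greedy set cover does not guarantee the minimum number of executors in every case, but it is deterministic and inexpensive to run during planning.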
On executors, the AcceleratedPartitionProvider resolves partitions to local TableProviders. Row-level partition predicates are not re-evaluated on executors — each executor only holds its own partitions, so filtering happens by ownership.
Partition-aware writes
Write-through INSERT, UPDATE, DELETE, and MERGE INTO flow through the same partition-aware Arrow Flight DoPut path on the scheduler:
The scheduler decodes the inbound DoPut schema header.
For each RecordBatch, it evaluates the table's partition expressions against every row to classify rows into partitions.
Rows for partitions already assigned to an executor are streamed via that executor's FlightSQL DoPut.
Rows whose partition values are new are assigned on-the-fly to the least-loaded executor (recorded in cluster.json via OCC), then forwarded.
All per-executor sub-streams run concurrently. Idle streams emit a keepalive sentinel to avoid the DoPut idle timeout (120 s default; override with SPICE_DO_PUT_IDLE_TIMEOUT_SECS).
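The row classification and routing steps above can be sketched as a single pass over the incoming rows. This is a simplified, single-threaded illustration: route_rows is a hypothetical helper, rows are plain dictionaries rather than Arrow record batches, and "load" is assumed to mean the number of partitions an executor already owns. In the real system the new assignment is committed to cluster.json before rows are forwarded.

```python
from collections import defaultdict


def route_rows(rows, partition_of, owners, load):
    """Group rows by owning executor.

    partition_of: function mapping a row to its partition value.
    owners:       partition value -> executor ID (mutated for new partitions).
    load:         executor ID -> partition count; must contain at least one executor.
    """
    per_executor = defaultdict(list)
    for row in rows:
        partition = partition_of(row)
        if partition not in owners:
            # New partition value: pick the least-loaded executor (lowest ID on ties).
            target = min(sorted(load), key=load.get)
            owners[partition] = target
            load[target] += 1
        per_executor[owners[partition]].append(row)
    return dict(per_executor)


owners = {"us": "exec-1"}
load = {"exec-1": 1, "exec-2": 0}
rows = [{"region": "us", "v": 1}, {"region": "eu", "v": 2}, {"region": "eu", "v": 3}]
print(route_rows(rows, lambda r: r["region"], owners, load))
```

Each value in the returned map corresponds to one per-executor sub-stream; the concurrency and keepalive behavior described in the last step is not modeled here.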
Write-through is currently constrained to the Cayenne accelerator (see Distributed Accelerations).
Constraints
Every accelerated table and view in cluster mode must declare partition_by.
Partition expressions must be deterministic SQL over the source schema.
bucket(N, col) is the supported static hash-partition function.
The scheduler holds no accelerated data; partition discovery queries always go to the federated source.
The DataFusion physical optimizer rule EnsureSupportedFileScan rejects plans containing file-scan nodes that cannot be distributed.
Distributed Accelerations
Full reference: Distributed Accelerations.
In cluster mode, accelerated data is sharded across executors: each executor materializes only the partitions it owns. The scheduler's view of the table is a logical UNION ALL across executors; it never holds row data.
Engine support
| Engine | Cluster partition assignment | write_mode: write_through | Notes |
| --- | --- | --- | --- |
| Cayenne | ✓ | ✓ | Required for write-through. Vortex storage with SQLite metadata. Supports acceleration snapshots. |
| DuckDB | ✓ | - | Read-only partitioned acceleration. |
| Arrow (in-memory) | ✓ | - | Read-only. Data is lost on pod restart unless backed by snapshots. |
| SQLite | ✓ | - | Read-only. |
| Postgres | ✓ | - | Read-only. |
Attempting write_mode: write_through with a non-Cayenne engine fails fast at startup with the error "Write-through acceleration currently requires the Cayenne accelerator."
Per-executor sharding
Each partition has a single owning executor (1:1 assignment in cluster.json). Executor-local accelerations only contain rows that match their assigned partition filter expressions. Read paths fan out across executors; write paths route by partition key.
Refresh in cluster mode
The scheduler is the single source of truth for partition ownership; there is no separate refresh leader election.
After every source refresh cycle, the PartitionAssignmentTask re-runs discovery; new partition values are assigned, and UpdatePartitions is pushed to the receiving executor, which materializes the new partition.
The scheduler can also explicitly issue RefreshDatasetCommand over the ControlStream to trigger a refresh on a specific executor, which is how API-driven refresh requests are routed.
Acceleration snapshots (Cayenne)
Acceleration snapshots are a Spice.ai Enterprise feature. See Acceleration Snapshots for the full reference. They are not available in Spice.ai OSS.
Cayenne supports object-store-backed acceleration snapshots that integrate naturally with cluster mode, allowing a newly started executor to bootstrap from the shared object store rather than re-fetching from the federated source.
Setting snapshots: bootstrap_only is recommended on executors when the source is expensive to scan: executors hydrate from the snapshot at startup but do not produce new snapshots themselves. Cayenne's internal partition metadata supports composite keys (Hive-style key1=v1/key2=v2/... paths).
Sizing
Size each executor's local NVMe to hold its assigned partitions plus a shuffle working set (rule of thumb: ~25 % of dataset volume).
The scheduler does not need fast local storage; provision modest CPU / memory and ensure low-latency access to the object store.
See Storage for instance-type recommendations.
Execution Modes
| Mode | Interface | Description |
| --- | --- | --- |
| Synchronous | /v1/sql, FlightSQL | Client waits for the query to complete and receives results directly. Available in any deployment. |
| Asynchronous | /v1/jobs | Client submits a query and polls a job_id for status. Results are stored as chunked Arrow IPC under jobs/ in the shared object store. Cluster (scheduler) mode only. |
Async jobs require runtime.scheduler.state_location to be configured.
Observability
All distributed-query metrics are emitted through the OTel cluster meter and exposed on the standard Prometheus port (9090). The Grafana dashboard shipped with Spice.ai Enterprise (see Observability) plots the most important signals.
Shared (scheduler and executor)
| Metric | Type | Labels |
| --- | --- | --- |
| node_status | Gauge | node_id, role |
| node_tasks_total | Counter | node_id, role, status |
| node_tasks_active | UpDownCounter | node_id, role |
| node_task_failures | Counter | node_id, role, error_type |
| node_task_retries | Counter | node_id, role |
Scheduler
| Metric | Type |
| --- | --- |
| scheduler_active_executors_count | Gauge |
| scheduler_count | Gauge |
| scheduler_task_queue_depth | Gauge |
| scheduler_task_scheduling_latency_ms | Histogram |
| scheduler_stages_total | Counter |
| scheduler_stage_duration_ms | Histogram |
| scheduler_stage_failures | Counter |
| scheduler_stage_retries | Counter |
| scheduler_tasks_per_stage | Histogram |
| scheduler_planning_duration_ms | Histogram |
| scheduler_executor_assignments | Counter |
| scheduler_job_queue_depth | Gauge |
| scheduler_result_fetch_{bytes,rows,count,duration_ms} | Counter / Histogram |
Executor
| Metric | Type |
| --- | --- |
| executor_tasks_active | UpDownCounter |
| executor_tasks_total | Counter |
| executor_task_failures | Counter |
| executor_task_duration_ms | Histogram |
| executor_memory_available_bytes | Gauge |
| executor_task_slots | Gauge |
| executor_shuffle_write_{bytes,rows,duration_ms} | Counter / Histogram |
| executor_shuffle_read_local_{bytes,rows,count,duration_ms} | Counter / Histogram |
| executor_shuffle_read_remote_{bytes,rows,count,duration_ms} | Counter / Histogram |
Federated runtime.task_history
In cluster mode, runtime.task_history is replaced with a federated view. Queries fan out via GetTaskHistory to all peer schedulers and via FlightSQL to all connected executors; results are merged and sorted by start_time DESC. Every row is annotated with a node_id column identifying the originating advertise address. The local non-federated table remains accessible as runtime.local_task_history for debugging.
If any peer or executor fails, the federated query fails — there are no partial results.
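The merge semantics described above (annotate with node_id, sort by start_time descending, fail as a whole on any node failure) can be sketched as follows. The function name and the callable-per-node interface are hypothetical; the actual fan-out uses GetTaskHistory and FlightSQL as described in the text.

```python
def federated_task_history(fetchers: dict) -> list:
    """Merge task-history rows from every node.

    fetchers: node advertise address -> zero-argument callable returning rows,
              where each row is a dict with at least a "start_time" key.
    """
    merged = []
    for node_id, fetch in fetchers.items():
        rows = fetch()  # any exception propagates: there are no partial results
        merged.extend({**row, "node_id": node_id} for row in rows)
    merged.sort(key=lambda r: r["start_time"], reverse=True)  # start_time DESC
    return merged


fetchers = {
    "sched-a:50052": lambda: [{"task": "plan", "start_time": 10}],
    "exec-1:50052": lambda: [{"task": "scan", "start_time": 12}],
}
for row in federated_task_history(fetchers):
    print(row)
```

Letting the first failure propagate is what produces the all-or-nothing behavior: the caller never sees a result set that is missing a node's rows.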
Limits and Caveats
| Limit | Behavior |
| --- | --- |
| Every accelerated dataset and view in cluster mode must declare partition_by. | Startup error: … has no partition keys configured. |
| write_mode: write_through requires the Cayenne accelerator. | Startup error: Write-through acceleration currently requires the Cayenne accelerator. |
| Two scheduler processes cannot share a scheduler_id while one is live. | Startup error: scheduler id {id} is already registered. |
| All partitions referenced by a query must be assigned to a connected executor at execution time. | Runtime error: Cannot execute query: N partition(s) not assigned to any executor. |
| Default DoPut idle timeout is 120 s; override with SPICE_DO_PUT_IDLE_TIMEOUT_SECS. | Long-running write streams must emit data or rely on the keepalive sentinel. |
| File-scan nodes that cannot be distributed are rejected by the EnsureSupportedFileScan physical optimizer rule. | Plan-time error during distributed planning. |
| cluster.json schema is versioned (schema_version = 1); other versions are rejected. | Cluster state from a different runtime major version is not accepted. |
| Object stores backing state_location must implement conditional writes (ETag / If-Match). | Required for multi-active correctness. |