Build a Control Plane
You need two services to talk. So you set up a load balancer, provision TLS certs, write protobuf schemas, compile them, configure a service mesh, deploy to Kubernetes, and pray the health checks converge before the demo tomorrow.
Or: you write one file and run it.
The complete, working source for every chapter in this guide lives in the main repo. Clone, run, read.
- Python: examples/python/mission_control
- TypeScript: examples/typescript/missionControl
- Python
- TypeScript
@service(name="MissionControl", version=1)
class MissionControl:
@rpc()
async def getStatus(self, req: StatusRequest) -> StatusResponse:
return StatusResponse(agent_id=req.agent_id, status="running")
python control.py # that's the server
aster shell aster1Qm... # that's the client — tab completion, typed responses
@Service({ name: "MissionControl", version: 1 })
class MissionControl {
@Rpc()
async getStatus(req: StatusRequest): Promise<StatusResponse> {
return new StatusResponse({ agent_id: req.agent_id, status: "running" });
}
}
npx aster-gen # generate type metadata (one-time build step)
node control.ts # that's the server
aster shell aster1Qm... # that's the client — tab completion, typed responses
No YAML. No protobuf compilation. No port numbers. No cloud account. Encrypted, authenticated, works across NATs, and your colleague on the other language can call it too.
What you're replacing: Traditional RPC means writing .proto files,
compiling them, setting up TLS certificates, configuring a reverse proxy
or service mesh so clients can find your service, managing certificate
rotation, and repeating all of that for every new service. With Aster you
get mTLS-grade mutual authentication (no CA infrastructure), gRPC-style
streaming RPCs (no .proto compilation), and peer-to-peer connectivity
(no port forwarding or load balancers).
This guide builds Mission Control — a control plane for managing remote agents. An agent could be a CI runner, an IoT sensor, an AI worker, or a service on your colleague's laptop across the world.
In under an hour you'll have:
- Agents that check in, push metrics, and stream logs
- Operators that watch, issue commands, and control access
- A cross-language agent talking to your control plane
Everything runs peer-to-peer. No infrastructure beyond a relay for NAT traversal (self-hostable). Once peers find each other, traffic flows direct.
Aster uses Iroh's public relays for discovery and NAT traversal by default. Point to your own with a single environment variable:
IROH_RELAY_URL=https://relay.yourcompany.com.
Install
- Python
- TypeScript
Two packages -- the framework and the CLI:
uv pip install aster-rpc aster-cli
# or:
pip install aster-rpc aster-cli
The framework gives you from aster import ... for your service code. The CLI gives you aster shell, aster trust keygen, aster enroll node, and aster contract gen-client -- the operator tools you'll use throughout this guide.
Requirements: Python 3.9 -- 3.13, macOS / Linux / Windows.
Two pieces -- the TypeScript runtime and the CLI tools:
# In your TypeScript project:
bun add @aster-rpc/aster
# or: npm install @aster-rpc/aster
# The CLI ships as a Python package and is shared across all language
# bindings -- one shell, one trust manager, one contract generator,
# usable against any Aster server regardless of language:
uv tool install aster-cli
# or: pip install aster-cli
Requirements: Node.js 20+ or Bun 1.0+ for the TypeScript runtime. Python 3.9 -- 3.13 for the CLI (one-time install; day-to-day work stays in TypeScript).
Verify:
aster --version
Chapter 1: Your First Agent Check-In (5 min)
Goal: The full working version of what you just saw — define a service, start it, call it.
- Python
- TypeScript
# control.py
from dataclasses import dataclass
from aster import AsterServer, service, rpc, wire_type
@wire_type("mission/StatusRequest")
@dataclass
class StatusRequest:
agent_id: str = ""
@wire_type("mission/StatusResponse")
@dataclass
class StatusResponse:
agent_id: str = ""
status: str = "idle"
uptime_secs: int = 0
@service(name="MissionControl", version=1)
class MissionControl:
@rpc()
async def getStatus(self, req: StatusRequest) -> StatusResponse:
return StatusResponse(
agent_id=req.agent_id,
status="running",
uptime_secs=3600,
)
async def main():
async with AsterServer(services=[MissionControl()]) as srv:
print(srv.address) # compact aster1... address
await srv.serve()
if __name__ == "__main__":
import asyncio
asyncio.run(main())
# Start the control plane
python control.py
# → aster1Qmxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
# ^^^^^^^ this is your control plane's public-key address — copy it for the next step
In another terminal, connect and inspect. Replace aster1Qm... below with the address your control.py just printed (the address is unique to each run; it's an ephemeral key in dev mode):
aster shell aster1Qm...
> cd services/MissionControl
> ./getStatus agent_id="edge-node-7"
// control.ts
import {
AsterServer, Service, Rpc, WireType,
} from '@aster-rpc/aster';
@WireType("mission/StatusRequest")
class StatusRequest {
agent_id: string = "";
constructor(init?: Partial<StatusRequest>) { if (init) Object.assign(this, init); }
}
@WireType("mission/StatusResponse")
class StatusResponse {
agent_id: string = "";
status: string = "idle";
uptime_secs: number = 0;
constructor(init?: Partial<StatusResponse>) { if (init) Object.assign(this, init); }
}
@Service({ name: "MissionControl", version: 1 })
class MissionControl {
@Rpc()
async getStatus(req: StatusRequest): Promise<StatusResponse> {
return new StatusResponse({
agent_id: req.agent_id,
status: "running",
uptime_secs: 3600,
});
}
}
async function main() {
const server = new AsterServer({ services: [new MissionControl()] });
await server.start();
console.log(server.address); // compact aster1... address
await server.serve();
}
main();
# Generate type metadata (run once, re-run after changing types/methods)
npx aster-gen
# Start the control plane
node control.ts
# → aster1Qmxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
# ^^^^^^^ this is your control plane's public-key address — copy it for the next step
In another terminal, connect and inspect. Replace aster1Qm... below with the address your control.ts just printed (the address is unique to each run; it's an ephemeral key in dev mode):
aster shell aster1Qm...
> cd services/MissionControl
> ./getStatus agent_id="edge-node-7"
Or skip the shell entirely — call it straight from the command line. Replace aster1Qm... with your address:
# macOS / Linux
aster call aster1Qm... MissionControl.getStatus '{"agent_id": "edge-node-7"}'
# Windows (PowerShell). The --% operator tells PowerShell to stop parsing
# and pass the rest of the line verbatim — necessary because PowerShell's
# argument parser strips quotes from JSON arguments otherwise.
aster call aster1Qm... MissionControl.getStatus --% {"agent_id": "edge-node-7"}
aster shell vs aster call: Use aster shell for interactive exploration — browsing services, tab-completing methods, streaming. Use aster call for scripting and one-shot invocations. Both use JSON serialization under the hood.
What just happened:
- Decorators defined a typed RPC contract
- @wire_type / @WireType made the types serializable across languages — no .proto files, no separate schema to maintain
- AsterServer created an encrypted QUIC endpoint and started listening — clients discover the service contract on connect
- aster shell connected, discovered the service, and invoked it — with tab completion and typed responses
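As a rough mental model for that cross-language serialization (this is a sketch of the idea, not Aster's actual codec), a `@dataclass` wire type flattens to a plain JSON-able dict and reconstructs from one on the other side:

```python
import json
from dataclasses import dataclass, asdict

@dataclass
class StatusResponse:
    agent_id: str = ""
    status: str = "idle"
    uptime_secs: int = 0

# Serialize: dataclass -> dict -> JSON (roughly what any wire codec does)
resp = StatusResponse(agent_id="edge-node-7", status="running", uptime_secs=3600)
payload = json.dumps(asdict(resp))

# Deserialize on the receiving side: JSON -> dict -> dataclass
decoded = StatusResponse(**json.loads(payload))
print(decoded == resp)  # → True
```

Because every field has a default and a plain type, any language that can produce the same dict shape can call the service — that's all the `mission/StatusResponse` wire name has to agree on.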
Chapter 2: Live Log Streaming (5 min)
Goal: Agents push logs into the control plane. Operators tail them in real time using server streaming.
This chapter extends control.py (or control.ts) with two new methods: submitLog (a unary RPC) and tailLogs (a server-streaming RPC). To save you from puzzling over which lines go where, the full updated file is below — replace your existing control.py / control.ts with it.
- Python
- TypeScript
Replace your control.py with this complete version:
# control.py
import asyncio
from collections.abc import AsyncIterator
from dataclasses import dataclass
from aster import AsterServer, service, rpc, server_stream, wire_type
# ─────────────── Wire types ───────────────
@wire_type("mission/StatusRequest")
@dataclass
class StatusRequest:
agent_id: str = ""
@wire_type("mission/StatusResponse")
@dataclass
class StatusResponse:
agent_id: str = ""
status: str = "idle"
uptime_secs: int = 0
@wire_type("mission/LogEntry")
@dataclass
class LogEntry:
timestamp: float = 0.0
level: str = "info"
message: str = ""
agent_id: str = ""
@wire_type("mission/SubmitLogResult")
@dataclass
class SubmitLogResult:
accepted: bool = True
@wire_type("mission/TailRequest")
@dataclass
class TailRequest:
agent_id: str = ""
level: str = "info" # minimum level filter
# ─────────────── Helpers ───────────────
_LEVEL_ORDER = {"debug": 0, "info": 1, "warn": 2, "error": 3, "fatal": 4}
def _level_rank(level: str) -> int:
return _LEVEL_ORDER.get(level.lower(), 0)
# ─────────────── Service ───────────────
@service(name="MissionControl", version=1)
class MissionControl:
def __init__(self):
self._log_queue: asyncio.Queue[LogEntry] = asyncio.Queue()
@rpc()
async def getStatus(self, req: StatusRequest) -> StatusResponse:
return StatusResponse(
agent_id=req.agent_id,
status="running",
uptime_secs=3600,
)
@rpc()
async def submitLog(self, entry: LogEntry) -> SubmitLogResult:
"""Agents call this to push log entries."""
await self._log_queue.put(entry)
return SubmitLogResult(accepted=True)
@server_stream()
async def tailLogs(self, req: TailRequest) -> AsyncIterator[LogEntry]:
"""Stream log entries as they arrive."""
while True:
entry = await self._log_queue.get()
if req.agent_id and entry.agent_id != req.agent_id:
continue
if _level_rank(entry.level) < _level_rank(req.level):
continue
yield entry
# ─────────────── Main ───────────────
async def main():
async with AsterServer(services=[MissionControl()]) as srv:
print(srv.address) # compact aster1... address
await srv.serve()
if __name__ == "__main__":
asyncio.run(main())
Restart the service. Stop the previous run with Ctrl+C, then start the new version:
python control.py
# → aster1Qmxxxxxxxx... ← this is a NEW address; the old one is gone
The address changes on every restart in dev mode (it's an ephemeral key). Copy the new address — you'll need it for both terminals below.
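One thing to note about the single shared `asyncio.Queue`: each entry is consumed by exactly one `tailLogs` stream, so two operators tailing at once would each see only some of the logs. If you need fan-out, give every subscriber its own queue. A minimal stdlib sketch of that pattern (illustrative only, not part of the chapter's code):

```python
import asyncio

class LogBroker:
    """Fan-out: every subscriber receives every published entry."""
    def __init__(self):
        self._subscribers: set[asyncio.Queue] = set()

    def publish(self, entry) -> None:
        for q in self._subscribers:
            q.put_nowait(entry)  # copy the entry into each subscriber's queue

    async def subscribe(self):
        q: asyncio.Queue = asyncio.Queue()
        self._subscribers.add(q)
        try:
            while True:
                yield await q.get()
        finally:
            self._subscribers.discard(q)  # clean up when the consumer stops

async def demo():
    broker = LogBroker()

    async def tail(n):
        got = []
        async for entry in broker.subscribe():
            got.append(entry)
            if len(got) == n:
                return got

    t1 = asyncio.create_task(tail(2))
    t2 = asyncio.create_task(tail(2))
    await asyncio.sleep(0)  # let both tasks subscribe before publishing
    broker.publish("disk 92% full")
    broker.publish("cpu spike detected")
    return await t1, await t2

a, b = asyncio.run(demo())
print(a == b == ["disk 92% full", "cpu spike detected"])  # → True
```

Inside `tailLogs`, `publish` would replace the direct `self._log_queue.put(...)` and each stream would iterate its own subscription.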
Replace your control.ts with this complete version:
// control.ts
import {
AsterServer, Service, Rpc, ServerStream, WireType,
} from '@aster-rpc/aster';
// ─────────────── Wire types ───────────────
@WireType("mission/StatusRequest")
class StatusRequest {
agent_id: string = "";
constructor(init?: Partial<StatusRequest>) { if (init) Object.assign(this, init); }
}
@WireType("mission/StatusResponse")
class StatusResponse {
agent_id: string = "";
status: string = "idle";
uptime_secs: number = 0;
constructor(init?: Partial<StatusResponse>) { if (init) Object.assign(this, init); }
}
@WireType("mission/LogEntry")
class LogEntry {
timestamp: number = 0.0;
level: string = "info";
message: string = "";
agent_id: string = "";
constructor(init?: Partial<LogEntry>) { if (init) Object.assign(this, init); }
}
@WireType("mission/SubmitLogResult")
class SubmitLogResult {
accepted: boolean = true;
constructor(init?: Partial<SubmitLogResult>) { if (init) Object.assign(this, init); }
}
@WireType("mission/TailRequest")
class TailRequest {
agent_id: string = "";
level: string = "info"; // minimum level filter
constructor(init?: Partial<TailRequest>) { if (init) Object.assign(this, init); }
}
// ─────────────── Helpers ───────────────
const LEVEL_ORDER: Record<string, number> = {
debug: 0, info: 1, warn: 2, error: 3, fatal: 4,
};
function levelRank(level: string): number {
return LEVEL_ORDER[level.toLowerCase()] ?? 0;
}
// ─────────────── Service ───────────────
@Service({ name: "MissionControl", version: 1 })
class MissionControl {
private _logBuffer: LogEntry[] = [];
private _logResolve: ((entry: LogEntry) => void) | null = null;
@Rpc()
async getStatus(req: StatusRequest): Promise<StatusResponse> {
return new StatusResponse({
agent_id: req.agent_id,
status: "running",
uptime_secs: 3600,
});
}
@Rpc()
async submitLog(entry: LogEntry): Promise<SubmitLogResult> {
if (this._logResolve) {
this._logResolve(entry);
this._logResolve = null;
} else {
this._logBuffer.push(entry);
}
return new SubmitLogResult();
}
@ServerStream()
async *tailLogs(req: TailRequest): AsyncGenerator<LogEntry> {
while (true) {
const entry = this._logBuffer.length > 0
? this._logBuffer.shift()!
: await new Promise<LogEntry>(resolve => { this._logResolve = resolve; });
if (req.agent_id && entry.agent_id !== req.agent_id) continue;
if (levelRank(entry.level) < levelRank(req.level)) continue;
yield entry;
}
}
}
// ─────────────── Main ───────────────
async function main() {
const server = new AsterServer({ services: [new MissionControl()] });
await server.start();
console.log(server.address); // compact aster1... address
await server.serve();
}
main();
Restart the service. Stop the previous run with Ctrl+C, re-run the type scanner, and start the new version:
npx aster-gen
node control.ts
# → aster1Qmxxxxxxxx... ← this is a NEW address; the old one is gone
The address changes on every restart in dev mode (it's an ephemeral key). Copy the new address — you'll need it for both terminals below.
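The `_logBuffer`/`_logResolve` handoff above only supports a single waiting consumer. If you'd rather have a reusable primitive, a small promise-based async queue covers the same ground — a stdlib-only sketch, not part of Aster:

```typescript
// AsyncQueue: producers push, a consumer awaits pull(). The buffer/waiter
// handoff generalizes the single-resolver pattern used in tailLogs above.
class AsyncQueue<T> {
  private items: T[] = [];
  private waiters: Array<(item: T) => void> = [];

  push(item: T): void {
    const waiter = this.waiters.shift();
    if (waiter) waiter(item);    // a pull() is already waiting: hand off directly
    else this.items.push(item);  // otherwise buffer until someone pulls
  }

  pull(): Promise<T> {
    if (this.items.length > 0) return Promise.resolve(this.items.shift()!);
    return new Promise<T>((resolve) => this.waiters.push(resolve));
  }
}

async function demo(): Promise<string[]> {
  const q = new AsyncQueue<string>();
  q.push("disk 92% full");       // buffered: no consumer yet
  const first = await q.pull();  // served from the buffer
  const pending = q.pull();      // buffer empty: registers a waiter
  q.push("cpu spike detected");  // resolves the waiting pull
  return [first, await pending];
}

demo().then((entries) => console.log(entries.join(" | ")));
```

With this in place, `submitLog` becomes `this._queue.push(entry)` and `tailLogs` becomes `await this._queue.pull()` in a loop.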
Submit some logs to stream
tailLogs blocks until log entries arrive. For the demo we need a process that's actively pushing logs, so the stream has something to show. Save this as logs.py (or logs.ts) and run it in a second terminal:
- Python
- TypeScript
# logs.py — submits one log entry per second so tailLogs has something to stream.
# Usage: python logs.py <aster1...address>
import asyncio
import sys
import time
from aster import AsterClient
async def main():
if len(sys.argv) < 2:
print("Usage: python logs.py <aster1...address>")
sys.exit(1)
async with AsterClient(address=sys.argv[1]) as client:
mc = client.proxy("MissionControl")
levels = ["info", "warn", "error"]
messages = ["disk 92% full", "health check failed", "cpu spike detected"]
i = 0
while True:
await mc.submitLog({
"timestamp": time.time(),
"level": levels[i % len(levels)],
"message": messages[i % len(messages)],
"agent_id": "edge-node-7",
})
print(f"submitted log #{i + 1}")
i += 1
await asyncio.sleep(1)
if __name__ == "__main__":
asyncio.run(main())
# Replace aster1Qm... with the address from control.py
python logs.py aster1Qm...
// logs.ts — submits one log entry per second so tailLogs has something to stream.
// Usage: bun run logs.ts <aster1...address>
import { AsterClientWrapper } from '@aster-rpc/aster';
async function main() {
const address = process.argv[2];
if (!address) {
console.error("Usage: bun run logs.ts <aster1...address>");
process.exit(1);
}
const client = new AsterClientWrapper({ address });
await client.connect();
const mc = client.proxy("MissionControl");
const levels = ["info", "warn", "error"];
const messages = ["disk 92% full", "health check failed", "cpu spike detected"];
let i = 0;
while (true) {
await mc.submitLog({
timestamp: Date.now() / 1000,
level: levels[i % levels.length],
message: messages[i % messages.length],
agent_id: "edge-node-7",
});
console.log(`submitted log #${i + 1}`);
i++;
await new Promise(resolve => setTimeout(resolve, 1000));
}
}
main();
# Replace aster1Qm... with the address from control.ts
bun run logs.ts aster1Qm...
Tail the stream
In a third terminal, open the shell and start tailing. Replace aster1Qm... with the same address as before:
aster shell aster1Qm...
> cd services/MissionControl
> ./tailLogs agent_id="edge-node-7" level="warn"
#0 {"timestamp": 1712567890.1, "level": "warn", "message": "disk 92% full", ...}
#1 {"timestamp": 1712567891.3, "level": "error", "message": "health check failed", ...}
# Ctrl+C to stop
You should see entries scroll past in real time as logs.py submits them. Note that the level="warn" filter excludes info entries, so you'll see roughly two out of every three submitted logs.
Three terminals? Yes — server (control.py), submitter (logs.py), and tail consumer (aster shell). That's the natural shape of any streaming demo: someone produces, someone consumes, the server brokers between them.
Or from your own code using the proxy client:
- Python
- TypeScript
# Server-streaming methods are called via `.stream(...)` and iterated
# with `async for`. The plain `await mc.tailLogs({...})` form is for
# unary methods only — it will raise on a streaming RPC.
async for entry in mc.tailLogs.stream({"level": "warn"}):
print(entry)
// Server-streaming methods are called via `.stream(...)` and iterated
// with `for await`. Calling `await mc.tailLogs({...})` directly is for
// unary methods only — it will throw on a streaming RPC.
for await (const entry of mc.tailLogs.stream({ level: "warn" })) {
console.log(entry);
}
The proxy uses a different call form per RPC pattern, mirroring what each one actually does:
| Pattern | Python | TypeScript |
|---|---|---|
| Unary | await mc.getStatus({...}) | await mc.getStatus({...}) |
| Server stream | async for x in mc.tailLogs.stream({...}): | for await (const x of mc.tailLogs.stream({...})) |
| Client stream | await mc.ingestMetrics(async_iter) | await mc.ingestMetrics(asyncIter) |
| Bidi stream | ch = mc.runCommand.bidi(); await ch.open(); ... | const ch = mc.runCommand.bidi(); await ch.open(); ... |
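Mechanically, every streaming pattern in that table is just async iterators wired together; bidi is the case where the handler both consumes one and produces another. Stripped of Aster entirely, the shape looks like this (pure-Python sketch):

```python
import asyncio
from collections.abc import AsyncIterator

# Handler side: consume the inbound iterator, yield the outbound one.
async def run_command(commands: AsyncIterator[str]) -> AsyncIterator[str]:
    async for cmd in commands:
        yield f"ran: {cmd}"

# Client side: an async generator feeds the channel, results stream back.
async def demo() -> list:
    async def operator_commands():
        for cmd in ["df -h", "uptime"]:
            yield cmd
    return [result async for result in run_command(operator_commands())]

print(asyncio.run(demo()))  # → ['ran: df -h', 'ran: uptime']
```

The RPC layer's job is to move those iterators across the network; the `.bidi()` channel in the table gives the client explicit control over when each direction opens and closes.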
What just happened:
- @server_stream / @ServerStream turns an async generator into a streaming RPC
- The client receives items as they're yielded — no polling, no websockets
- Under the hood: a single QUIC stream with Aster framing, flowing until either side closes it
Chapter 3: Metric Ingestion (5 min)
Goal: Agents push thousands of metric datapoints per second using client streaming.
This chapter adds two new wire types (MetricPoint, IngestResult) and one new method (ingestMetrics) to your control plane. Add the wire types alongside the existing types and the method inside MissionControl. The full updated file is at the end of the chapter if you'd rather copy-paste.
- Python
- TypeScript
New wire types (add these alongside LogEntry, TailRequest, etc.):
from dataclasses import field
@wire_type("mission/MetricPoint")
@dataclass
class MetricPoint:
name: str = ""
value: float = 0.0
timestamp: float = 0.0
tags: dict = field(default_factory=dict)
@wire_type("mission/IngestResult")
@dataclass
class IngestResult:
accepted: int = 0
dropped: int = 0
New method on MissionControl (add inside the class, alongside getStatus/submitLog/tailLogs):
@client_stream()
async def ingestMetrics(self, stream: AsyncIterator[MetricPoint]) -> IngestResult:
"""Receive a stream of metric points from an agent."""
accepted = 0
async for point in stream:
# Store, forward, aggregate — whatever you need.
accepted += 1
return IngestResult(accepted=accepted)
Don't forget to import client_stream at the top:
from aster import AsterServer, service, rpc, server_stream, client_stream, wire_type
Restart control.py (Ctrl+C then python control.py) and copy the new address.
Save this as metrics.py in a second terminal to push 10,000 metric points:
# metrics.py — streams 10,000 metric points to MissionControl.
# Usage: python metrics.py <aster1...address>
import asyncio
import random
import sys
import time
from aster import AsterClient
async def main():
if len(sys.argv) < 2:
print("Usage: python metrics.py <aster1...address>")
sys.exit(1)
async with AsterClient(address=sys.argv[1]) as client:
mc = client.proxy("MissionControl")
async def metrics():
for i in range(10_000):
yield {
"name": "cpu.usage",
"value": random.random(),
"timestamp": time.time(),
}
result = await mc.ingestMetrics(metrics())
print(f"Accepted: {result['accepted']}")
if __name__ == "__main__":
asyncio.run(main())
# Replace aster1Qm... with the address from control.py
python metrics.py aster1Qm...
# → Accepted: 10000
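The `# Store, forward, aggregate` comment in `ingestMetrics` is where real work goes. Because the handler sees points one at a time, aggregates have to be computed incrementally rather than after buffering the whole batch — for example, a running mean (stdlib sketch, independent of Aster):

```python
import asyncio

async def running_mean(points):
    """Consume a stream of values, tracking count and mean incrementally."""
    count, mean = 0, 0.0
    async for value in points:
        count += 1
        mean += (value - mean) / count  # online mean update, O(1) memory
    return count, mean

async def demo():
    async def points():
        for v in [0.2, 0.4, 0.9]:
            yield v
    return await running_mean(points())

count, mean = asyncio.run(demo())
print(count, round(mean, 2))  # → 3 0.5
```

The same structure extends to per-tag counters or windowed percentiles — anything that can fold one point at a time keeps the handler's memory flat no matter how long the stream runs.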
Full control.py as of Chapter 3
# control.py
import asyncio
from collections.abc import AsyncIterator
from dataclasses import dataclass, field
from aster import (
AsterServer, service, rpc, server_stream, client_stream, wire_type,
)
# ─────────────── Wire types ───────────────
@wire_type("mission/StatusRequest")
@dataclass
class StatusRequest:
agent_id: str = ""
@wire_type("mission/StatusResponse")
@dataclass
class StatusResponse:
agent_id: str = ""
status: str = "idle"
uptime_secs: int = 0
@wire_type("mission/LogEntry")
@dataclass
class LogEntry:
timestamp: float = 0.0
level: str = "info"
message: str = ""
agent_id: str = ""
@wire_type("mission/SubmitLogResult")
@dataclass
class SubmitLogResult:
accepted: bool = True
@wire_type("mission/TailRequest")
@dataclass
class TailRequest:
agent_id: str = ""
level: str = "info"
@wire_type("mission/MetricPoint")
@dataclass
class MetricPoint:
name: str = ""
value: float = 0.0
timestamp: float = 0.0
tags: dict = field(default_factory=dict)
@wire_type("mission/IngestResult")
@dataclass
class IngestResult:
accepted: int = 0
dropped: int = 0
# ─────────────── Helpers ───────────────
_LEVEL_ORDER = {"debug": 0, "info": 1, "warn": 2, "error": 3, "fatal": 4}
def _level_rank(level: str) -> int:
return _LEVEL_ORDER.get(level.lower(), 0)
# ─────────────── Service ───────────────
@service(name="MissionControl", version=1)
class MissionControl:
def __init__(self):
self._log_queue: asyncio.Queue[LogEntry] = asyncio.Queue()
@rpc()
async def getStatus(self, req: StatusRequest) -> StatusResponse:
return StatusResponse(
agent_id=req.agent_id,
status="running",
uptime_secs=3600,
)
@rpc()
async def submitLog(self, entry: LogEntry) -> SubmitLogResult:
await self._log_queue.put(entry)
return SubmitLogResult(accepted=True)
@server_stream()
async def tailLogs(self, req: TailRequest) -> AsyncIterator[LogEntry]:
while True:
entry = await self._log_queue.get()
if req.agent_id and entry.agent_id != req.agent_id:
continue
if _level_rank(entry.level) < _level_rank(req.level):
continue
yield entry
@client_stream()
async def ingestMetrics(self, stream: AsyncIterator[MetricPoint]) -> IngestResult:
accepted = 0
async for point in stream:
accepted += 1
return IngestResult(accepted=accepted)
# ─────────────── Main ───────────────
async def main():
async with AsterServer(services=[MissionControl()]) as srv:
print(srv.address)
await srv.serve()
if __name__ == "__main__":
asyncio.run(main())
New wire types (add these alongside LogEntry, TailRequest, etc.):
@WireType("mission/MetricPoint")
class MetricPoint {
name: string = "";
value: number = 0.0;
timestamp: number = 0.0;
tags: Record<string, string> = {};
constructor(init?: Partial<MetricPoint>) { if (init) Object.assign(this, init); }
}
@WireType("mission/IngestResult")
class IngestResult {
accepted: number = 0;
dropped: number = 0;
constructor(init?: Partial<IngestResult>) { if (init) Object.assign(this, init); }
}
New method on MissionControl (add inside the class, alongside getStatus/submitLog/tailLogs):
@ClientStream()
async ingestMetrics(stream: AsyncIterable<MetricPoint>): Promise<IngestResult> {
let accepted = 0;
for await (const point of stream) {
accepted += 1;
}
return new IngestResult({ accepted });
}
Don't forget to add ClientStream to your imports at the top:
import {
AsterServer, Service, Rpc, ServerStream, ClientStream, WireType,
} from '@aster-rpc/aster';
Re-run npx aster-gen (to pick up the new method), then restart control.ts (Ctrl+C then node control.ts) and copy the new address.
Save this as metrics.ts in a second terminal to push 10,000 metric points:
// metrics.ts — streams 10,000 metric points to MissionControl.
// Usage: bun run metrics.ts <aster1...address>
import { AsterClientWrapper } from '@aster-rpc/aster';
async function main() {
const address = process.argv[2];
if (!address) {
console.error("Usage: bun run metrics.ts <aster1...address>");
process.exit(1);
}
const client = new AsterClientWrapper({ address });
await client.connect();
const mc = client.proxy("MissionControl");
async function* metrics() {
for (let i = 0; i < 10_000; i++) {
yield { name: "cpu.usage", value: Math.random(), timestamp: Date.now() / 1000 };
}
}
const result = await mc.ingestMetrics(metrics());
console.log(`Accepted: ${(result as any).accepted}`);
await client.close();
}
main();
# Replace aster1Qm... with the address from control.ts
bun run metrics.ts aster1Qm...
# → Accepted: 10000
Full control.ts as of Chapter 3 — see the examples repo for the complete version
The full file extends Chapter 2's control.ts with the MetricPoint and IngestResult types and the ingestMetrics method. Add ClientStream to your imports and the method body shown above to your MissionControl class. Everything else stays the same.
The proxy client discovers methods from the contract and sends plain objects over the wire. Great for scripting, prototyping, and generic gateways.
What just happened:
- Client streaming sends many messages, gets one response at the end
- The producer processes items as they arrive — no buffering the entire batch
- The proxy client requires no type imports — it reads the contract from the server and builds method stubs dynamically
- This is how you'd build telemetry ingestion, log shipping, or bulk data upload
Chapter 4: Agent Sessions & Remote Commands (5 min)
Goal: Each agent gets its own session — register, heartbeat, and execute commands. This is where per-agent state and bidi streaming meet.
MissionControl is a shared service — one instance, all clients see the
same state. But each agent needs its own identity, capabilities, and
command channel. That's a session-scoped service.
This chapter adds a brand-new class — AgentSession — alongside MissionControl in the same control.py / control.ts file. Crucially, you'll also need to add AgentSession() to your server's services=[...] list; otherwise it won't be reachable. The full file is at the end of the chapter.
- Python
- TypeScript
Add these wire types alongside the existing ones:
@wire_type("mission/Heartbeat")
@dataclass
class Heartbeat:
agent_id: str = ""
capabilities: list = field(default_factory=list) # ["gpu", "arm64", ...]
load_avg: float = 0.0
@wire_type("mission/Assignment")
@dataclass
class Assignment:
task_id: str = ""
command: str = ""
@wire_type("mission/Command")
@dataclass
class Command:
command: str = ""
@wire_type("mission/CommandResult")
@dataclass
class CommandResult:
stdout: str = ""
stderr: str = ""
exit_code: int = -1
Add this new class anywhere below MissionControl in control.py:
@service(name="AgentSession", version=1, scoped="session")
class AgentSession:
"""Session-scoped: one instance per connected agent."""
def __init__(self, peer: "str | None" = None):  # quoted annotation: parses on Python 3.9
self._peer = peer
self._agent_id = ""
self._capabilities = []
@rpc()
async def register(self, hb: Heartbeat) -> Assignment:
self._agent_id = hb.agent_id
self._capabilities = hb.capabilities
if "gpu" in hb.capabilities:
return Assignment(task_id="train-42", command="python train.py")
return Assignment(task_id="idle", command="sleep 60")
@bidi_stream()
async def runCommand(self, commands: AsyncIterator[Command]) -> AsyncIterator[CommandResult]:
"""Execute commands on this agent — stream in, results stream back."""
async for cmd in commands:
proc = await asyncio.create_subprocess_shell(
cmd.command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate()
yield CommandResult(
stdout=stdout.decode(),
stderr=stderr.decode(),
exit_code=proc.returncode,
)
Add bidi_stream to your imports and register the new service in main():
from aster import (
AsterServer, service, rpc, server_stream, client_stream, bidi_stream, wire_type,
)
# ...
async def main():
async with AsterServer(services=[MissionControl(), AgentSession()]) as srv:
# ^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^
# shared session-scoped
print(srv.address)
await srv.serve()
Restart control.py and copy the new address.
Add these wire types alongside the existing ones:
@WireType("mission/Heartbeat")
class Heartbeat {
agent_id: string = "";
capabilities: string[] = [];
load_avg: number = 0.0;
constructor(init?: Partial<Heartbeat>) { if (init) Object.assign(this, init); }
}
@WireType("mission/Assignment")
class Assignment {
task_id: string = "";
command: string = "";
constructor(init?: Partial<Assignment>) { if (init) Object.assign(this, init); }
}
@WireType("mission/Command")
class Command {
command: string = "";
constructor(init?: Partial<Command>) { if (init) Object.assign(this, init); }
}
@WireType("mission/CommandResult")
class CommandResult {
stdout: string = "";
stderr: string = "";
exit_code: number = -1;
constructor(init?: Partial<CommandResult>) { if (init) Object.assign(this, init); }
}
Add this new class anywhere below MissionControl in control.ts:
@Service({ name: "AgentSession", version: 1, scoped: "session" })
class AgentSession {
private _agent_id: string = "";
private _capabilities: string[] = [];
@Rpc()
async register(hb: Heartbeat): Promise<Assignment> {
this._agent_id = hb.agent_id;
this._capabilities = hb.capabilities;
if (hb.capabilities.includes("gpu")) {
return new Assignment({ task_id: "train-42", command: "python train.py" });
}
return new Assignment({ task_id: "idle", command: "sleep 60" });
}
@BidiStream()
async *runCommand(commands: AsyncIterable<Command>): AsyncGenerator<CommandResult> {
const { execFile } = await import('node:child_process');
const { promisify } = await import('node:util');
const exec = promisify(execFile);
for await (const cmd of commands) {
try {
const { stdout, stderr } = await exec("sh", ["-c", cmd.command]);
yield new CommandResult({ stdout, stderr, exit_code: 0 });
} catch (e: any) {
yield new CommandResult({
stdout: e.stdout ?? "",
stderr: e.stderr ?? e.message,
exit_code: e.code ?? 1,
});
}
}
}
}
Add BidiStream to your imports and register the new service in main():
import {
AsterServer, Service, Rpc, ServerStream, ClientStream, BidiStream, WireType,
} from '@aster-rpc/aster';
// ...
const server = new AsterServer({
services: [new MissionControl(), new AgentSession()],
// ^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^
// shared session-scoped
});
Restart control.ts and copy the new address.
Now connect with the operator CLI. Replace aster1Qm... with the address your control.py / control.ts just printed:
# Operator connects and opens a session subshell:
aster shell aster1Qm...
> cd services
> session AgentSession
# prompt becomes "AgentSession~" — you're now in a dedicated session.
# State persists across calls; the same instance handles every method.
AgentSession~ register agent_id="edge-7" capabilities='["gpu"]'
← {"task_id": "train-42", "command": "python train.py"}
AgentSession~ runCommand command="df -h"
← {"stdout": "Filesystem Size Used ...", "exit_code": 0}
AgentSession~ runCommand command="uptime"
← {"stdout": " 14:32 up 3 days ...", "exit_code": 0}
AgentSession~ end
Why a session subshell? Session-scoped services hold per-connection state. If you tried ./runCommand directly from /services/AgentSession, the shell would open a new stream per call and tear down the state between them. The session command opens one persistent session and routes every method through it. The AgentSession~ prompt makes it obvious you're inside a stateful session. Type end to close it.
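The shared/session distinction boils down to instance lifetime: a shared service is constructed once at startup, while a session-scoped service is constructed per connection. In plain Python terms (a conceptual sketch, not Aster's internals):

```python
class MissionControl:
    pass  # shared: one instance serves all clients

class AgentSession:
    def __init__(self):
        self.agent_id = ""  # per-connection state

# Shared: the server holds a singleton and routes every call to it.
shared = MissionControl()

# Session-scoped: the server effectively holds a *factory* and builds
# one fresh instance per connected client.
session_factory = AgentSession
conn_a = session_factory()
conn_b = session_factory()
conn_a.agent_id = "edge-7"  # state set on one session...

print(conn_a.agent_id, repr(conn_b.agent_id), conn_a is conn_b)  # → edge-7 '' False
```

That's why `register` can stash `agent_id` on `self` and trust that a later `runCommand` on the same session sees it — no other client shares the instance.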
Full control.py as of Chapter 4 — for the TypeScript equivalent see the examples repo
# control.py
import asyncio
from collections.abc import AsyncIterator
from dataclasses import dataclass, field
from aster import (
AsterServer, service, rpc, server_stream, client_stream, bidi_stream, wire_type,
)
# ─────────────── Wire types ───────────────
@wire_type("mission/StatusRequest")
@dataclass
class StatusRequest:
agent_id: str = ""
@wire_type("mission/StatusResponse")
@dataclass
class StatusResponse:
agent_id: str = ""
status: str = "idle"
uptime_secs: int = 0
@wire_type("mission/LogEntry")
@dataclass
class LogEntry:
timestamp: float = 0.0
level: str = "info"
message: str = ""
agent_id: str = ""
@wire_type("mission/SubmitLogResult")
@dataclass
class SubmitLogResult:
accepted: bool = True
@wire_type("mission/TailRequest")
@dataclass
class TailRequest:
agent_id: str = ""
level: str = "info"
@wire_type("mission/MetricPoint")
@dataclass
class MetricPoint:
name: str = ""
value: float = 0.0
timestamp: float = 0.0
tags: dict = field(default_factory=dict)
@wire_type("mission/IngestResult")
@dataclass
class IngestResult:
accepted: int = 0
dropped: int = 0
@wire_type("mission/Heartbeat")
@dataclass
class Heartbeat:
agent_id: str = ""
capabilities: list = field(default_factory=list)
load_avg: float = 0.0
@wire_type("mission/Assignment")
@dataclass
class Assignment:
task_id: str = ""
command: str = ""
@wire_type("mission/Command")
@dataclass
class Command:
command: str = ""
@wire_type("mission/CommandResult")
@dataclass
class CommandResult:
stdout: str = ""
stderr: str = ""
exit_code: int = -1
# ─────────────── Helpers ───────────────
_LEVEL_ORDER = {"debug": 0, "info": 1, "warn": 2, "error": 3, "fatal": 4}
def _level_rank(level: str) -> int:
return _LEVEL_ORDER.get(level.lower(), 0)
# ─────────────── Services ───────────────
@service(name="MissionControl", version=1)
class MissionControl:
def __init__(self):
self._log_queue: asyncio.Queue[LogEntry] = asyncio.Queue()
@rpc()
async def getStatus(self, req: StatusRequest) -> StatusResponse:
return StatusResponse(agent_id=req.agent_id, status="running", uptime_secs=3600)
@rpc()
async def submitLog(self, entry: LogEntry) -> SubmitLogResult:
await self._log_queue.put(entry)
return SubmitLogResult(accepted=True)
@server_stream()
async def tailLogs(self, req: TailRequest) -> AsyncIterator[LogEntry]:
while True:
entry = await self._log_queue.get()
if req.agent_id and entry.agent_id != req.agent_id:
continue
if _level_rank(entry.level) < _level_rank(req.level):
continue
yield entry
@client_stream()
async def ingestMetrics(self, stream: AsyncIterator[MetricPoint]) -> IngestResult:
accepted = 0
async for point in stream:
accepted += 1
return IngestResult(accepted=accepted)
@service(name="AgentSession", version=1, scoped="session")
class AgentSession:
def __init__(self, peer: str | None = None):
self._peer = peer
self._agent_id = ""
self._capabilities = []
@rpc()
async def register(self, hb: Heartbeat) -> Assignment:
self._agent_id = hb.agent_id
self._capabilities = hb.capabilities
if "gpu" in hb.capabilities:
return Assignment(task_id="train-42", command="python train.py")
return Assignment(task_id="idle", command="sleep 60")
@bidi_stream()
async def runCommand(self, commands: AsyncIterator[Command]) -> AsyncIterator[CommandResult]:
async for cmd in commands:
proc = await asyncio.create_subprocess_shell(
cmd.command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate()
yield CommandResult(
stdout=stdout.decode(),
stderr=stderr.decode(),
exit_code=proc.returncode,
)
# ─────────────── Main ───────────────
async def main():
async with AsterServer(services=[MissionControl(), AgentSession()]) as srv:
print(srv.address)
await srv.serve()
if __name__ == "__main__":
asyncio.run(main())
What just happened:
- `scoped="session"` creates a fresh `AgentSession` per connection — each agent gets its own identity, capabilities, and command channel
- `runCommand` uses bidi streaming: commands flow in, results flow back, all on a single multiplexed QUIC stream
- When the agent disconnects, the session is cleaned up automatically
Two service types, two different lifetimes:
- `MissionControl` (shared) — fleet-wide: status, logs, metrics
- `AgentSession` (session) — per-agent: register, heartbeat, commands
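To make the two lifetimes concrete, here is a minimal stdlib-only sketch. This is not Aster's internals; the `Dispatcher` class, its `resolve`/`close` methods, and the connection ids are invented purely to illustrate how a runtime might hand out one shared instance versus one instance per connection:

```python
# Illustrative only: a toy dispatcher showing shared vs session scope.
class MissionControl:
    def __init__(self):
        self.logs = []          # fleet-wide state, visible to every caller

class AgentSession:
    def __init__(self):
        self.agent_id = ""      # per-connection state

class Dispatcher:
    def __init__(self):
        # Shared services: constructed once, reused for every connection.
        self._shared = {"MissionControl": MissionControl()}
        # Session services: one instance per (connection, service) pair.
        self._sessions = {}

    def resolve(self, conn_id, service):
        if service in self._shared:
            return self._shared[service]
        key = (conn_id, service)
        if key not in self._sessions:
            self._sessions[key] = AgentSession()
        return self._sessions[key]

    def close(self, conn_id):
        # On disconnect, drop that connection's session instances.
        self._sessions = {k: v for k, v in self._sessions.items()
                          if k[0] != conn_id}

d = Dispatcher()
# Shared: both connections see the very same instance.
assert d.resolve("conn-a", "MissionControl") is d.resolve("conn-b", "MissionControl")
# Session: stable within a connection, distinct across connections.
s1 = d.resolve("conn-a", "AgentSession")
assert s1 is d.resolve("conn-a", "AgentSession")
assert s1 is not d.resolve("conn-b", "AgentSession")
```

The same split explains the session subshell in the CLI transcript above: one connection, one `AgentSession` instance, many calls.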
Chapter 5: Auth & Capabilities (5 min)
Goal: Not every caller should be able to deploy or run commands on agents. Define roles, compose requirements, and issue scoped credentials.
Up until now the system has run in open-gate mode. That's fine for local experiments, but sooner or later you need control over who can reach your service. In this chapter you'll enable authentication: the service will no longer be open to anyone who contacts your node, and callers will have to prove they're authorized.
The auth flow has three steps:
- Define — declare which capabilities each method requires (in code)
- Issue — create credentials with specific capabilities (CLI)
- Connect — present the credential on connect; the framework enforces access
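Before looking at the decorators, note that the capability check itself is simple set logic. A hedged sketch follows; the `requires` / `any_of` / `all_of` names mirror the guide, but these one-line implementations are illustrative, not Aster's source:

```python
# Illustrative capability checks: a "requirement" is just a predicate
# over the set of capabilities a credential grants.
def requires(cap):
    return lambda granted: cap in granted

def any_of(*caps):
    return lambda granted: any(c in granted for c in caps)

def all_of(*caps):
    return lambda granted: all(c in granted for c in caps)

# The two credentials issued later in this chapter:
edge = {"ops.status", "ops.ingest"}
ops = {"ops.status", "ops.logs", "ops.admin", "ops.ingest"}

tail_logs_ok = any_of("ops.logs", "ops.admin")   # log viewers OR admins
run_command_ok = requires("ops.admin")           # admins only

assert tail_logs_ok(ops) and not tail_logs_ok(edge)
assert run_command_ok(ops) and not run_command_ok(edge)
```

The framework evaluates exactly this kind of predicate per method, which is why no auth middleware appears anywhere in the service code.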
Step 1: Generate a root key
The root key is the trust anchor for your entire deployment. It identifies you personally as the owner of your deployment. Keep it offline — you'll use it to sign credentials, not to run services.
# One-time setup — generates an Ed25519 keypair
aster trust keygen --out-key ~/.aster/root.key
# Output:
# Root private key written to: ~/.aster/root.key
# Root public key written to: ~/.aster/root.pub
# Keep root.key secret. Share root.pub with nodes that need to verify credentials.
Step 2: Define roles in code
- Python
- TypeScript
from enum import Enum
from aster import any_of
# Also available: `all_of(A, B)` — caller must have BOTH roles.
class Role(str, Enum):
STATUS = "ops.status"
LOGS = "ops.logs"
ADMIN = "ops.admin"
INGEST = "ops.ingest"
@service(name="MissionControl", version=1)
class MissionControl:
@rpc(requires=Role.STATUS)
async def getStatus(self, req: StatusRequest) -> StatusResponse: ...
@server_stream(requires=any_of(Role.LOGS, Role.ADMIN))
async def tailLogs(self, req: TailRequest) -> AsyncIterator[LogEntry]: ...
@client_stream(requires=Role.INGEST)
async def ingestMetrics(self, stream: AsyncIterator[MetricPoint]) -> IngestResult: ...
@service(name="AgentSession", version=1, scoped="session")
class AgentSession:
@rpc(requires=Role.INGEST)
async def register(self, hb: Heartbeat) -> Assignment: ...
@bidi_stream(requires=Role.ADMIN)
async def runCommand(self, commands: AsyncIterator[Command]) -> AsyncIterator[CommandResult]: ...
import { anyOf } from '@aster-rpc/aster';
// Also available: `allOf(A, B)` — caller must have BOTH roles.
const Role = {
STATUS: "ops.status",
LOGS: "ops.logs",
ADMIN: "ops.admin",
INGEST: "ops.ingest",
} as const;
@Service({ name: "MissionControl", version: 1 })
class MissionControl {
@Rpc({ requires: Role.STATUS })
async getStatus(req: StatusRequest): Promise<StatusResponse> { ... }
@ServerStream({ requires: anyOf(Role.LOGS, Role.ADMIN) })
async *tailLogs(req: TailRequest): AsyncGenerator<LogEntry> { ... }
@ClientStream({ requires: Role.INGEST })
async ingestMetrics(stream: AsyncIterable<MetricPoint>): Promise<IngestResult> { ... }
}
@Service({ name: "AgentSession", version: 1, scoped: "session" })
class AgentSession {
@Rpc({ requires: Role.INGEST })
async register(hb: Heartbeat): Promise<Assignment> { ... }
@BidiStream({ requires: Role.ADMIN })
async *runCommand(commands: AsyncIterable<Command>): AsyncGenerator<CommandResult> { ... }
}
Step 3: Start the control plane with auth
Update your main() to attach a config that points at the root public key and turns off open admission. You'll also pass an identity file path so the server has a stable endpoint id (without one, every restart gives you a new id and breaks any credentials you've issued).
- Python
- TypeScript
from aster import AsterConfig
async def main():
config = AsterConfig(
root_pubkey_file="~/.aster/root.pub", # owner's public key (yours)
allow_all_consumers=False,
)
async with AsterServer(
services=[MissionControl(), AgentSession()],
identity=".aster-identity", # stable endpoint identity
config=config,
) as srv:
print(srv.address)
await srv.serve()
async function main() {
const server = new AsterServer({
services: [new MissionControl(), new AgentSession()],
identity: ".aster-identity", // stable endpoint identity
config: {
rootPubkeyFile: "~/.aster/root.pub", // owner's public key (yours)
allowAllConsumers: false, // mirrors Python's allow_all_consumers=False
},
});
await server.start();
console.log(server.address);
await server.serve();
}
Restart control.py / control.ts and copy the new address. Because you've now attached a stable .aster-identity file, this address will stay the same across restarts — and any credentials you issue against it will keep working.
Each Aster endpoint has its own identity (a secret keypair) plus the public key of its owner (you), so it knows who administers it.
Why a `.aster-identity` file? Without one, Aster generates a fresh ephemeral keypair on startup. That's fine for experiments, but every restart gives you a new endpoint id — and any credentials you issued to the old one will stop working. Once you start enrolling peers, commit to a persistent identity file.
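The load-or-create pattern behind a persistent identity can be sketched in a few lines of stdlib Python. This is illustrative only: the real `.aster-identity` file is TOML and holds a proper keypair, not a raw token, but the stability argument is the same.

```python
# Illustrative: persist a secret once, derive the same endpoint id forever.
import hashlib
import os
import secrets
import tempfile

def load_or_create_identity(path: str) -> str:
    """Return an endpoint id derived from a persisted secret."""
    if os.path.exists(path):
        secret = bytes.fromhex(open(path).read().strip())
    else:
        secret = secrets.token_bytes(32)  # stand-in for a real keypair
        with open(path, "w") as f:
            f.write(secret.hex())
    return hashlib.sha256(secret).hexdigest()[:16]

path = os.path.join(tempfile.mkdtemp(), "demo-identity")
a = load_or_create_identity(path)  # first start: generates and saves
b = load_or_create_identity(path)  # "restart": loads the same secret
assert a == b  # stable endpoint id, so issued credentials keep working
```

Delete the file and the id changes, which is exactly the failure mode the aside warns about.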
Step 4: Enroll agents
When you want to allow another endpoint to connect to yours, you must grant it permission. You do that by generating a credential for it that lists the roles that endpoint should have.
# Edge agent — status and ingest only
aster enroll node --role consumer --name "edge-node-7" \
--capabilities ops.status,ops.ingest \
--root-key ~/.aster/root.key \
--out edge-node-7.cred
aster enroll node will print a summary like this:
✓ Enrollment credential created
File: /home/you/work/edge-node-7.cred
Format: TOML (.aster-identity) with [node] + [[peers]] sections
Peer: edge-node-7
Role: consumer (policy)
Capabilities: ops.status,ops.ingest
Endpoint ID: 142179f10b7bc606...
Trust root: cd948e4c1456cdbe...
Expires: 2026-05-10T20:20:12+00:00
This file lets a consumer connect to your trusted-mode servers.
It contains a node identity (secret key) AND a signed enrollment
credential. The server validates the credential and grants the
capabilities listed below.
Use it:
aster shell <peer-addr> --rcan edge-node-7.cred
aster call <peer-addr> Service.method '<json>' --rcan edge-node-7.cred
⚠ Keep this file secret -- it is both an identity AND a credential.
Despite the .cred extension, it's a regular .aster-identity TOML
file with two sections:
- `[node]` — the consumer's secret key + endpoint ID. Used by the QUIC layer to prove the consumer's identity.
- `[[peers]]` — the signed enrollment credential. Presented to servers to claim capabilities.
Both sections live in the same file because they're paired: the
server checks that the QUIC peer ID matches the credential's
endpoint_id. If they don't match, admission fails.
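That pairing check can be sketched as plain logic. This is illustrative, not Aster's validation code, and it omits the signature verification against the trust root that a real server would also perform:

```python
# Illustrative admission check: the QUIC-layer peer id must match the
# credential's endpoint_id, and the credential must be unexpired.
# (A real check also verifies the trust-root signature — omitted here.)
from datetime import datetime, timezone

def admit(quic_peer_id: str, cred: dict, now: datetime) -> bool:
    if quic_peer_id != cred["endpoint_id"]:
        return False  # stolen credential on the wrong identity
    return now < datetime.fromisoformat(cred["expires"])

cred = {
    "endpoint_id": "142179f10b7bc606",
    "expires": "2026-05-10T20:20:12+00:00",
}
t = datetime(2025, 6, 1, tzinfo=timezone.utc)

assert admit("142179f10b7bc606", cred, now=t)   # matching pair: admitted
assert not admit("someone-else", cred, now=t)   # mismatch: rejected
```

This is why leaking the `[[peers]]` section alone is not enough to impersonate the consumer: without the matching `[node]` secret key, the QUIC peer id won't line up.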
# Ops team — full access including admin
aster enroll node --role consumer --name "ops-team" \
--capabilities ops.status,ops.logs,ops.admin,ops.ingest \
--root-key ~/.aster/root.key \
--out ops-team.cred
Pass --quiet (or -q) to suppress the educational output. The
command prints exactly one line: <path> <endpoint_id> <expires_iso>
on success and exits non-zero on failure. Easy to parse from CI.
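For example, a CI step in Python could split that single line into its three documented fields (`<path> <endpoint_id> <expires_iso>`); the parser below is a minimal sketch, not part of Aster:

```python
# Minimal parser for `aster enroll node --quiet` output.
def parse_enroll_output(line: str) -> dict:
    path, endpoint_id, expires = line.strip().split()
    return {"path": path, "endpoint_id": endpoint_id, "expires": expires}

# Sample line matching the documented --quiet format:
out = "/home/you/work/edge-node-7.cred 142179f10b7bc606 2026-05-10T20:20:12+00:00"
info = parse_enroll_output(out)
assert info["endpoint_id"] == "142179f10b7bc606"
assert info["path"].endswith(".cred")
```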
Step 5: Connect with credentials
Replace aster1Qm... in the snippets below with the address your control.py / control.ts printed in Step 3.
- Python
- TypeScript
client = AsterClient(
address="aster1Qm...", # ← from control.py output
enrollment_credential_file="edge-node-7.cred",
)
await client.connect()
mc = client.proxy("MissionControl")
await mc.getStatus({"agent_id": "test"}) # ✓ has ops.status
await mc.ingestMetrics(...) # ✓ has ops.ingest
# await agent.runCommand(...) # ✗ AccessDenied — missing ops.admin
const client = new AsterClientWrapper({
address: "aster1Qm...", // ← from control.ts output
enrollmentCredentialFile: "edge-node-7.cred",
});
await client.connect();
const mc = client.proxy("MissionControl");
await mc.getStatus({ agent_id: "test" }); // ✓ has ops.status
await mc.ingestMetrics(...); // ✓ has ops.ingest
// await agent.runCommand(...); // ✗ AccessDenied — missing ops.admin
# Or from the CLI — the shell respects credentials too.
# Replace aster1Qm... with the address from control.py.
aster shell aster1Qm... --rcan ops-team.cred
> cd services
> session AgentSession # opens session subshell
AgentSession~ runCommand command="df" # ✓ ops-team has ops.admin
What just happened:
- `aster trust keygen` created the root of trust — one command
- `aster enroll node` issued scoped credentials — no CA infrastructure
- `requires=` — Aster checks at the method level, no auth middleware to write
- `any_of(A, B)` — caller needs at least one (log viewers OR admins can tail)
- The edge agent can push metrics but can't run commands. The ops team can do both. That's the entire access control model — defined in code, enforced at the wire level
Chapter 6: Cross-Language Interop (5 min)
Goal: Your teammate uses a different language. They don't have your source code — just the server address.
Your control.py / control.ts from Chapter 4 (or 5 with auth) keeps running. The cross-language client below is a brand-new file in a different language — no shared source, no codegen, no .proto file.
- Python
- TypeScript
A TypeScript teammate calls your Python control plane. Save this as ts-agent.ts (anywhere — it doesn't need to live in the same directory as your Python code) and run it in a separate terminal. Replace aster1Qm... with the address your control.py printed.
// ts-agent.ts — TypeScript client calling a Python control plane.
// Usage: bun run ts-agent.ts <aster1...address>
import { AsterClientWrapper } from '@aster-rpc/aster';
async function main() {
const address = process.argv[2];
if (!address) {
console.error("Usage: bun run ts-agent.ts <aster1...address>");
process.exit(1);
}
const client = new AsterClientWrapper({ address });
await client.connect();
const mc = client.proxy("MissionControl");
const status = await mc.getStatus({ agent_id: "ts-worker-1" });
console.log(`Status: ${status.agent_id} is ${status.status}`);
// Stream metrics from TypeScript to the Python control plane
const result = await mc.ingestMetrics(async function*() {
for (let i = 0; i < 1000; i++) {
yield { name: "gpu.temp", value: 72 + Math.random() * 10 };
}
}());
console.log(`Accepted: ${result.accepted}`);
await client.close();
}
main();
bun run ts-agent.ts aster1Qm...
A Python teammate calls your TypeScript control plane. Save this as py-agent.py (anywhere) and run it in a separate terminal. Replace aster1Qm... with the address your control.ts printed.
# py-agent.py — Python client calling a TypeScript control plane.
# Usage: python py-agent.py <aster1...address>
import asyncio
import random
import sys
from aster import AsterClient
async def main():
if len(sys.argv) < 2:
print("Usage: python py-agent.py <aster1...address>")
sys.exit(1)
async with AsterClient(address=sys.argv[1]) as client:
mc = client.proxy("MissionControl")
status = await mc.getStatus({"agent_id": "py-worker-1"})
print(f"Status: {status['agent_id']} is {status['status']}")
# Stream metrics from Python to the TypeScript control plane
async def metrics():
for i in range(1000):
yield {"name": "gpu.temp", "value": 72 + random.random() * 10}
result = await mc.ingestMetrics(metrics())
print(f"Accepted: {result['accepted']}")
if __name__ == "__main__":
asyncio.run(main())
python py-agent.py aster1Qm...
What just happened:
- Your teammate never saw your source code
- The proxy client discovered the contract on connect and built method stubs dynamically — full RPC, no codegen required
- Same wire format, same contract hash — producer and consumer agree on the protocol without sharing a repo
"But there's no .proto file — how does the other language know what you sent?" — The
`@wire_type` / `@WireType` decorator registers each type's schema in Aster's content-addressed contract. The contract is published with the service and discovered on connect. The contract is the shared schema — you just never had to write it by hand.
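One way to picture a content-addressed contract is the stdlib sketch below: serialize the schema canonically and hash it, so producer and consumer can compare a single digest instead of sharing a repo. Note this is a hedged illustration; Aster's actual canonicalization and hash scheme are not specified in this guide.

```python
# Illustrative content-addressing: same schema -> same digest,
# regardless of field declaration order or which language produced it.
import hashlib
import json

def contract_hash(schema: dict) -> str:
    canonical = json.dumps(schema, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode()).hexdigest()

# A toy schema resembling mission/StatusRequest from this guide:
status_request = {
    "wire_type": "mission/StatusRequest",
    "fields": {"agent_id": "str"},
}
# Same content, different key order — canonicalization makes it stable:
reordered = {"fields": {"agent_id": "str"}, "wire_type": "mission/StatusRequest"}

assert contract_hash(status_request) == contract_hash(reordered)
# Any semantic change produces a different address:
changed = {"wire_type": "mission/StatusRequest", "fields": {"agent_id": "int"}}
assert contract_hash(status_request) != contract_hash(changed)
```

Two endpoints that compute the same digest agree on the wire format by construction, which is what lets the Python and TypeScript sides above interoperate without codegen.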
What's Next?
You just built a working control plane with four RPC patterns, session-scoped agents, capability-based auth, and cross-language interop. That's a real system — not a demo.
Next guides in the series:
- Hardening for Production — interceptors for retry, circuit-breaking, rate limiting, and deadlines
- Scaling Out — multiple producers with automatic fail-over
- Artifact Distribution — push builds and model weights to agents with content-addressed blobs
- Shared Fleet State — CRDT documents that sync across your fleet
The full source for this example is in
examples/python/mission_control/
and
examples/typescript/missionControl/.