
Build a Control Plane

You need two services to talk. So you set up a load balancer, provision TLS certs, write protobuf schemas, compile them, configure a service mesh, deploy to Kubernetes, and pray the health checks converge before the demo tomorrow.

Or: you write one file and run it.

Don't want to follow along? Skip to the code.

The complete, working source for every chapter in this guide lives in the main repo. Clone, run, read.

@service(name="MissionControl", version=1)
class MissionControl:
    @rpc()
    async def getStatus(self, req: StatusRequest) -> StatusResponse:
        return StatusResponse(agent_id=req.agent_id, status="running")

python control.py       # that's the server
aster shell aster1Qm... # that's the client — tab completion, typed responses

No YAML. No protobuf compilation. No port numbers. No cloud account. Encrypted, authenticated, works across NATs, and your colleague using another language can call it too.

What you're replacing: Traditional RPC means writing .proto files, compiling them, setting up TLS certificates, configuring a reverse proxy or service mesh so clients can find your service, managing certificate rotation, and repeating all of that for every new service. With Aster you get mTLS-grade mutual authentication (no CA infrastructure), gRPC-style streaming RPCs (no .proto compilation), and peer-to-peer connectivity (no port forwarding or load balancers).

This guide builds Mission Control — a control plane for managing remote agents. An agent could be a CI runner, an IoT sensor, an AI worker, or a service on your colleague's laptop across the world.

In under an hour you'll have:

  • Agents that check in, push metrics, and stream logs
  • Operators that watch, issue commands, and control access
  • A cross-language agent talking to your control plane

Everything runs peer-to-peer. No infrastructure beyond a relay for NAT traversal (self-hostable). Once peers find each other, traffic flows direct.

Aster uses Iroh's public relays for discovery and NAT traversal by default. Point to your own with a single environment variable: IROH_RELAY_URL=https://relay.yourcompany.com.


Install

Two packages -- the framework and the CLI:

uv pip install aster-rpc aster-cli
# or:
pip install aster-rpc aster-cli

The framework gives you from aster import ... for your service code. The CLI gives you aster shell, aster trust keygen, aster enroll node, and aster contract gen-client -- the operator tools you'll use throughout this guide.

Requirements: Python 3.9 -- 3.13, macOS / Linux / Windows.

Verify:

aster --version

Chapter 1: Your First Agent Check-In (5 min)

Goal: The full working version of what you just saw — define a service, start it, call it.

# control.py
from dataclasses import dataclass
from aster import AsterServer, service, rpc, wire_type

@wire_type("mission/StatusRequest")
@dataclass
class StatusRequest:
    agent_id: str = ""

@wire_type("mission/StatusResponse")
@dataclass
class StatusResponse:
    agent_id: str = ""
    status: str = "idle"
    uptime_secs: int = 0

@service(name="MissionControl", version=1)
class MissionControl:
    @rpc()
    async def getStatus(self, req: StatusRequest) -> StatusResponse:
        return StatusResponse(
            agent_id=req.agent_id,
            status="running",
            uptime_secs=3600,
        )

async def main():
    async with AsterServer(services=[MissionControl()]) as srv:
        print(srv.address)  # compact aster1... address
        await srv.serve()

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

# Start the control plane
python control.py
# → aster1Qmxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
# ^^^^^^^ this is your control plane's public-key address — copy it for the next step

In another terminal, connect and inspect. Replace aster1Qm... below with the address your control.py just printed (the address is unique to each run; it's an ephemeral key in dev mode):

aster shell aster1Qm...
> cd services/MissionControl
> ./getStatus agent_id="edge-node-7"

Or skip the shell entirely — call it straight from the command line. Replace aster1Qm... with your address:

# macOS / Linux
aster call aster1Qm... MissionControl.getStatus '{"agent_id": "edge-node-7"}'
# Windows (PowerShell). The --% operator tells PowerShell to stop parsing
# and pass the rest of the line verbatim — necessary because PowerShell's
# argument parser strips quotes from JSON arguments otherwise.
aster call aster1Qm... MissionControl.getStatus --% {"agent_id": "edge-node-7"}

aster shell vs aster call: Use aster shell for interactive exploration — browsing services, tab-completing methods, streaming. Use aster call for scripting and one-shot invocations. Both use JSON serialization under the hood.

What just happened:

  • Decorators defined a typed RPC contract
  • @wire_type / @WireType made the types serializable across languages — no .proto files, no separate schema to maintain
  • AsterServer created an encrypted QUIC endpoint and started listening — clients discover the service contract on connect
  • aster shell connected, discovered the service, and invoked it — with tab completion and typed responses

Chapter 2: Live Log Streaming (5 min)

Goal: Agents push logs into the control plane. Operators tail them in real time using server streaming.

This chapter extends control.py (or control.ts) with two new methods: submitLog (a unary RPC) and tailLogs (a server-streaming RPC). To save you from puzzling over which lines go where, replace your existing control.py with this complete version:

# control.py
import asyncio
from collections.abc import AsyncIterator
from dataclasses import dataclass
from aster import AsterServer, service, rpc, server_stream, wire_type

# ─────────────── Wire types ───────────────

@wire_type("mission/StatusRequest")
@dataclass
class StatusRequest:
    agent_id: str = ""

@wire_type("mission/StatusResponse")
@dataclass
class StatusResponse:
    agent_id: str = ""
    status: str = "idle"
    uptime_secs: int = 0

@wire_type("mission/LogEntry")
@dataclass
class LogEntry:
    timestamp: float = 0.0
    level: str = "info"
    message: str = ""
    agent_id: str = ""

@wire_type("mission/SubmitLogResult")
@dataclass
class SubmitLogResult:
    accepted: bool = True

@wire_type("mission/TailRequest")
@dataclass
class TailRequest:
    agent_id: str = ""
    level: str = "info"  # minimum level filter

# ─────────────── Helpers ───────────────

_LEVEL_ORDER = {"debug": 0, "info": 1, "warn": 2, "error": 3, "fatal": 4}

def _level_rank(level: str) -> int:
    return _LEVEL_ORDER.get(level.lower(), 0)

# ─────────────── Service ───────────────

@service(name="MissionControl", version=1)
class MissionControl:
    def __init__(self):
        self._log_queue: asyncio.Queue[LogEntry] = asyncio.Queue()

    @rpc()
    async def getStatus(self, req: StatusRequest) -> StatusResponse:
        return StatusResponse(
            agent_id=req.agent_id,
            status="running",
            uptime_secs=3600,
        )

    @rpc()
    async def submitLog(self, entry: LogEntry) -> SubmitLogResult:
        """Agents call this to push log entries."""
        await self._log_queue.put(entry)
        return SubmitLogResult(accepted=True)

    @server_stream()
    async def tailLogs(self, req: TailRequest) -> AsyncIterator[LogEntry]:
        """Stream log entries as they arrive."""
        while True:
            entry = await self._log_queue.get()
            if req.agent_id and entry.agent_id != req.agent_id:
                continue
            if _level_rank(entry.level) < _level_rank(req.level):
                continue
            yield entry

# ─────────────── Main ───────────────

async def main():
    async with AsterServer(services=[MissionControl()]) as srv:
        print(srv.address)  # compact aster1... address
        await srv.serve()

if __name__ == "__main__":
    asyncio.run(main())

Restart the service. Stop the previous run with Ctrl+C, then start the new version:

python control.py
# → aster1Qmxxxxxxxx... ← this is a NEW address; the old one is gone

The address changes on every restart in dev mode (it's an ephemeral key). Copy the new address — you'll need it for both terminals below.

Submit some logs to stream

tailLogs blocks until log entries arrive. For the demo we need a process that's actively pushing logs, so the stream has something to show. Save this as logs.py (or logs.ts) and run it in a second terminal:

# logs.py — submits one log entry per second so tailLogs has something to stream.
# Usage: python logs.py <aster1...address>
import asyncio
import sys
import time
from aster import AsterClient

async def main():
    if len(sys.argv) < 2:
        print("Usage: python logs.py <aster1...address>")
        sys.exit(1)

    async with AsterClient(address=sys.argv[1]) as client:
        mc = client.proxy("MissionControl")

        levels = ["info", "warn", "error"]
        messages = ["disk 92% full", "health check failed", "cpu spike detected"]

        i = 0
        while True:
            await mc.submitLog({
                "timestamp": time.time(),
                "level": levels[i % len(levels)],
                "message": messages[i % len(messages)],
                "agent_id": "edge-node-7",
            })
            print(f"submitted log #{i + 1}")
            i += 1
            await asyncio.sleep(1)

if __name__ == "__main__":
    asyncio.run(main())

# Replace aster1Qm... with the address from control.py
python logs.py aster1Qm...

Tail the stream

In a third terminal, open the shell and start tailing. Replace aster1Qm... with the same address as before:

aster shell aster1Qm...
> cd services/MissionControl
> ./tailLogs agent_id="edge-node-7" level="warn"
#0 {"timestamp": 1712567890.1, "level": "warn", "message": "disk 92% full", ...}
#1 {"timestamp": 1712567891.3, "level": "error", "message": "health check failed", ...}
# Ctrl+C to stop

You should see entries scroll past in real time as logs.py submits them. Note the level="warn" filter excludes info entries, so you'll see roughly two out of every three submitted logs.
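You can check that arithmetic with the same rank table the server uses — a standalone snippet, independent of the running demo:

```python
# Standalone sketch of the tailLogs level filter — mirrors _LEVEL_ORDER
# from control.py; runs on its own, no server needed.
_LEVEL_ORDER = {"debug": 0, "info": 1, "warn": 2, "error": 3, "fatal": 4}

def passes(entry_level: str, min_level: str) -> bool:
    return _LEVEL_ORDER.get(entry_level, 0) >= _LEVEL_ORDER.get(min_level, 0)

# logs.py cycles info → warn → error; a level="warn" tail keeps 2 of 3:
cycle = ["info", "warn", "error"]
kept = [lvl for lvl in cycle if passes(lvl, "warn")]
print(kept)  # → ['warn', 'error']
```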

Three terminals? Yes — server (control.py), submitter (logs.py), and tail consumer (aster shell). That's the natural shape of any streaming demo: someone produces, someone consumes, the server brokers between them.

Or from your own code using the proxy client:

# Server-streaming methods are called via `.stream(...)` and iterated
# with `async for`. The plain `await mc.tailLogs({...})` form is for
# unary methods only — it will raise on a streaming RPC.
async for entry in mc.tailLogs.stream({"level": "warn"}):
    print(entry)

Proxy method shapes

The proxy uses a different call form per RPC pattern, mirroring what each one actually does:

Pattern        Python                                        TypeScript
Unary          await mc.getStatus({...})                     await mc.getStatus({...})
Server stream  async for x in mc.tailLogs.stream({...}):     for await (const x of mc.tailLogs.stream({...}))
Client stream  await mc.ingestMetrics(async_iter)            await mc.ingestMetrics(asyncIter)
Bidi stream    ch = mc.runCommand.bidi(); await ch.open()    const ch = mc.runCommand.bidi(); await ch.open()

What just happened:

  • @server_stream / @ServerStream turns an async generator into a streaming RPC
  • The client receives items as they're yielded — no polling, no websockets
  • Under the hood: a single QUIC stream with Aster framing, flowing until either side closes it

Chapter 3: Metric Ingestion (5 min)

Goal: Agents push thousands of metric datapoints per second using client streaming.

This chapter adds two new wire types (MetricPoint, IngestResult) and one new method (ingestMetrics) to your control plane. Add the wire types alongside the existing types and the method inside MissionControl. The full updated file is at the end of the chapter if you'd rather copy-paste.

New wire types (add these alongside LogEntry, TailRequest, etc.):

from dataclasses import field

@wire_type("mission/MetricPoint")
@dataclass
class MetricPoint:
    name: str = ""
    value: float = 0.0
    timestamp: float = 0.0
    tags: dict = field(default_factory=dict)

@wire_type("mission/IngestResult")
@dataclass
class IngestResult:
    accepted: int = 0
    dropped: int = 0
New method on MissionControl (add inside the class, alongside getStatus/submitLog/tailLogs):

@client_stream()
async def ingestMetrics(self, stream: AsyncIterator[MetricPoint]) -> IngestResult:
    """Receive a stream of metric points from an agent."""
    accepted = 0
    async for point in stream:
        # Store, forward, aggregate — whatever you need.
        accepted += 1
    return IngestResult(accepted=accepted)

Don't forget to import client_stream at the top:

from aster import AsterServer, service, rpc, server_stream, client_stream, wire_type

Restart control.py (Ctrl+C then python control.py) and copy the new address.

Save this as metrics.py and run it in a second terminal; it pushes 10,000 metric points:

# metrics.py — streams 10,000 metric points to MissionControl.
# Usage: python metrics.py <aster1...address>
import asyncio
import random
import sys
import time
from aster import AsterClient

async def main():
    if len(sys.argv) < 2:
        print("Usage: python metrics.py <aster1...address>")
        sys.exit(1)

    async with AsterClient(address=sys.argv[1]) as client:
        mc = client.proxy("MissionControl")

        async def metrics():
            for i in range(10_000):
                yield {
                    "name": "cpu.usage",
                    "value": random.random(),
                    "timestamp": time.time(),
                }

        result = await mc.ingestMetrics(metrics())
        print(f"Accepted: {result['accepted']}")

if __name__ == "__main__":
    asyncio.run(main())

# Replace aster1Qm... with the address from control.py
python metrics.py aster1Qm...
# → Accepted: 10000

Full control.py as of Chapter 3
# control.py
import asyncio
from collections.abc import AsyncIterator
from dataclasses import dataclass, field
from aster import (
    AsterServer, service, rpc, server_stream, client_stream, wire_type,
)

# ─────────────── Wire types ───────────────

@wire_type("mission/StatusRequest")
@dataclass
class StatusRequest:
    agent_id: str = ""

@wire_type("mission/StatusResponse")
@dataclass
class StatusResponse:
    agent_id: str = ""
    status: str = "idle"
    uptime_secs: int = 0

@wire_type("mission/LogEntry")
@dataclass
class LogEntry:
    timestamp: float = 0.0
    level: str = "info"
    message: str = ""
    agent_id: str = ""

@wire_type("mission/SubmitLogResult")
@dataclass
class SubmitLogResult:
    accepted: bool = True

@wire_type("mission/TailRequest")
@dataclass
class TailRequest:
    agent_id: str = ""
    level: str = "info"

@wire_type("mission/MetricPoint")
@dataclass
class MetricPoint:
    name: str = ""
    value: float = 0.0
    timestamp: float = 0.0
    tags: dict = field(default_factory=dict)

@wire_type("mission/IngestResult")
@dataclass
class IngestResult:
    accepted: int = 0
    dropped: int = 0

# ─────────────── Helpers ───────────────

_LEVEL_ORDER = {"debug": 0, "info": 1, "warn": 2, "error": 3, "fatal": 4}

def _level_rank(level: str) -> int:
    return _LEVEL_ORDER.get(level.lower(), 0)

# ─────────────── Service ───────────────

@service(name="MissionControl", version=1)
class MissionControl:
    def __init__(self):
        self._log_queue: asyncio.Queue[LogEntry] = asyncio.Queue()

    @rpc()
    async def getStatus(self, req: StatusRequest) -> StatusResponse:
        return StatusResponse(
            agent_id=req.agent_id,
            status="running",
            uptime_secs=3600,
        )

    @rpc()
    async def submitLog(self, entry: LogEntry) -> SubmitLogResult:
        await self._log_queue.put(entry)
        return SubmitLogResult(accepted=True)

    @server_stream()
    async def tailLogs(self, req: TailRequest) -> AsyncIterator[LogEntry]:
        while True:
            entry = await self._log_queue.get()
            if req.agent_id and entry.agent_id != req.agent_id:
                continue
            if _level_rank(entry.level) < _level_rank(req.level):
                continue
            yield entry

    @client_stream()
    async def ingestMetrics(self, stream: AsyncIterator[MetricPoint]) -> IngestResult:
        accepted = 0
        async for point in stream:
            accepted += 1
        return IngestResult(accepted=accepted)

# ─────────────── Main ───────────────

async def main():
    async with AsterServer(services=[MissionControl()]) as srv:
        print(srv.address)
        await srv.serve()

if __name__ == "__main__":
    asyncio.run(main())

The proxy client discovers methods from the contract and sends plain objects over the wire. Great for scripting, prototyping, and generic gateways.

What just happened:

  • Client streaming sends many messages, gets one response at the end
  • The producer processes items as they arrive — no buffering the entire batch
  • The proxy client requires no type imports — it reads the contract from the producer and builds method stubs dynamically
  • This is how you'd build telemetry ingestion, log shipping, or bulk data upload

Chapter 4: Agent Sessions & Remote Commands (5 min)

Goal: Each agent gets its own session — register, heartbeat, and execute commands. This is where per-agent state and bidi streaming meet.

MissionControl is a shared service — one instance, all clients see the same state. But each agent needs its own identity, capabilities, and command channel. That's a session-scoped service.

This chapter adds a brand-new class — AgentSession — alongside MissionControl in the same control.py / control.ts file. Crucially, you'll also need to add AgentSession() to your server's services=[...] list, otherwise it won't be reachable. The full file is at the end of the chapter.

Add these wire types alongside the existing ones:

@wire_type("mission/Heartbeat")
@dataclass
class Heartbeat:
    agent_id: str = ""
    capabilities: list = field(default_factory=list)  # ["gpu", "arm64", ...]
    load_avg: float = 0.0

@wire_type("mission/Assignment")
@dataclass
class Assignment:
    task_id: str = ""
    command: str = ""

@wire_type("mission/Command")
@dataclass
class Command:
    command: str = ""

@wire_type("mission/CommandResult")
@dataclass
class CommandResult:
    stdout: str = ""
    stderr: str = ""
    exit_code: int = -1

Add this new class anywhere below MissionControl in control.py:

from typing import Optional  # `str | None` syntax needs 3.10+; Optional keeps 3.9 working

@service(name="AgentSession", version=1, scoped="session")
class AgentSession:
    """Session-scoped: one instance per connected agent."""

    def __init__(self, peer: Optional[str] = None):
        self._peer = peer
        self._agent_id = ""
        self._capabilities = []

    @rpc()
    async def register(self, hb: Heartbeat) -> Assignment:
        self._agent_id = hb.agent_id
        self._capabilities = hb.capabilities
        if "gpu" in hb.capabilities:
            return Assignment(task_id="train-42", command="python train.py")
        return Assignment(task_id="idle", command="sleep 60")

    @bidi_stream()
    async def runCommand(self, commands: AsyncIterator[Command]) -> AsyncIterator[CommandResult]:
        """Execute commands on this agent — stream in, results stream back."""
        async for cmd in commands:
            proc = await asyncio.create_subprocess_shell(
                cmd.command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            stdout, stderr = await proc.communicate()
            yield CommandResult(
                stdout=stdout.decode(),
                stderr=stderr.decode(),
                exit_code=proc.returncode,
            )

Add bidi_stream to your imports and register the new service in main():

from aster import (
    AsterServer, service, rpc, server_stream, client_stream, bidi_stream, wire_type,
)

# ...

async def main():
    # MissionControl is shared; AgentSession is session-scoped.
    async with AsterServer(services=[MissionControl(), AgentSession()]) as srv:
        print(srv.address)
        await srv.serve()

Restart control.py and copy the new address.

Now connect with the operator CLI. Replace aster1Qm... with the address your control.py / control.ts just printed:

# Operator connects and opens a session subshell:
aster shell aster1Qm...
> cd services
> session AgentSession
# prompt becomes "AgentSession~" — you're now in a dedicated session.
# State persists across calls; the same instance handles every method.
AgentSession~ register agent_id="edge-7" capabilities='["gpu"]'
{"task_id": "train-42", "command": "python train.py"}
AgentSession~ runCommand command="df -h"
{"stdout": "Filesystem Size Used ...", "exit_code": 0}
AgentSession~ runCommand command="uptime"
{"stdout": " 14:32 up 3 days ...", "exit_code": 0}
AgentSession~ end

Why a session subshell? Session-scoped services hold per-connection state. If you tried ./runCommand directly from /services/AgentSession, the shell would open a new stream per call and tear down the state between them. The session command opens one persistent session and routes every method through it. The AgentSession~ prompt makes it obvious you're inside a stateful session. Type end to close it.
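The lifecycle difference is easy to see with plain Python — illustrative only, not Aster internals:

```python
# Illustrative only — NOT Aster internals. Shows why session scoping matters:
# a fresh instance per call forgets state; one instance per session keeps it.
class AgentSession:
    def __init__(self):
        self._agent_id = ""

    def register(self, agent_id: str) -> None:
        self._agent_id = agent_id

    def whoami(self) -> str:
        return self._agent_id

# Per-call scoping: each RPC hits a brand-new instance, so state is lost.
per_call = AgentSession()
per_call.register("edge-7")
print(AgentSession().whoami())  # new instance — the registration is gone

# Session scoping: every call routes to the same instance.
session = AgentSession()
session.register("edge-7")
print(session.whoami())  # → edge-7
```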

Full control.py as of Chapter 4 — for the TypeScript equivalent see the examples repo
# control.py
import asyncio
from collections.abc import AsyncIterator
from dataclasses import dataclass, field
from typing import Optional
from aster import (
    AsterServer, service, rpc, server_stream, client_stream, bidi_stream, wire_type,
)

# ─────────────── Wire types ───────────────

@wire_type("mission/StatusRequest")
@dataclass
class StatusRequest:
    agent_id: str = ""

@wire_type("mission/StatusResponse")
@dataclass
class StatusResponse:
    agent_id: str = ""
    status: str = "idle"
    uptime_secs: int = 0

@wire_type("mission/LogEntry")
@dataclass
class LogEntry:
    timestamp: float = 0.0
    level: str = "info"
    message: str = ""
    agent_id: str = ""

@wire_type("mission/SubmitLogResult")
@dataclass
class SubmitLogResult:
    accepted: bool = True

@wire_type("mission/TailRequest")
@dataclass
class TailRequest:
    agent_id: str = ""
    level: str = "info"

@wire_type("mission/MetricPoint")
@dataclass
class MetricPoint:
    name: str = ""
    value: float = 0.0
    timestamp: float = 0.0
    tags: dict = field(default_factory=dict)

@wire_type("mission/IngestResult")
@dataclass
class IngestResult:
    accepted: int = 0
    dropped: int = 0

@wire_type("mission/Heartbeat")
@dataclass
class Heartbeat:
    agent_id: str = ""
    capabilities: list = field(default_factory=list)
    load_avg: float = 0.0

@wire_type("mission/Assignment")
@dataclass
class Assignment:
    task_id: str = ""
    command: str = ""

@wire_type("mission/Command")
@dataclass
class Command:
    command: str = ""

@wire_type("mission/CommandResult")
@dataclass
class CommandResult:
    stdout: str = ""
    stderr: str = ""
    exit_code: int = -1

# ─────────────── Helpers ───────────────

_LEVEL_ORDER = {"debug": 0, "info": 1, "warn": 2, "error": 3, "fatal": 4}

def _level_rank(level: str) -> int:
    return _LEVEL_ORDER.get(level.lower(), 0)

# ─────────────── Services ───────────────

@service(name="MissionControl", version=1)
class MissionControl:
    def __init__(self):
        self._log_queue: asyncio.Queue[LogEntry] = asyncio.Queue()

    @rpc()
    async def getStatus(self, req: StatusRequest) -> StatusResponse:
        return StatusResponse(agent_id=req.agent_id, status="running", uptime_secs=3600)

    @rpc()
    async def submitLog(self, entry: LogEntry) -> SubmitLogResult:
        await self._log_queue.put(entry)
        return SubmitLogResult(accepted=True)

    @server_stream()
    async def tailLogs(self, req: TailRequest) -> AsyncIterator[LogEntry]:
        while True:
            entry = await self._log_queue.get()
            if req.agent_id and entry.agent_id != req.agent_id:
                continue
            if _level_rank(entry.level) < _level_rank(req.level):
                continue
            yield entry

    @client_stream()
    async def ingestMetrics(self, stream: AsyncIterator[MetricPoint]) -> IngestResult:
        accepted = 0
        async for point in stream:
            accepted += 1
        return IngestResult(accepted=accepted)

@service(name="AgentSession", version=1, scoped="session")
class AgentSession:
    def __init__(self, peer: Optional[str] = None):
        self._peer = peer
        self._agent_id = ""
        self._capabilities = []

    @rpc()
    async def register(self, hb: Heartbeat) -> Assignment:
        self._agent_id = hb.agent_id
        self._capabilities = hb.capabilities
        if "gpu" in hb.capabilities:
            return Assignment(task_id="train-42", command="python train.py")
        return Assignment(task_id="idle", command="sleep 60")

    @bidi_stream()
    async def runCommand(self, commands: AsyncIterator[Command]) -> AsyncIterator[CommandResult]:
        async for cmd in commands:
            proc = await asyncio.create_subprocess_shell(
                cmd.command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            stdout, stderr = await proc.communicate()
            yield CommandResult(
                stdout=stdout.decode(),
                stderr=stderr.decode(),
                exit_code=proc.returncode,
            )

# ─────────────── Main ───────────────

async def main():
    async with AsterServer(services=[MissionControl(), AgentSession()]) as srv:
        print(srv.address)
        await srv.serve()

if __name__ == "__main__":
    asyncio.run(main())

What just happened:

  • scoped="session" creates a fresh AgentSession per connection — each agent gets its own identity, capabilities, and command channel
  • runCommand uses bidi streaming: commands flow in, results flow back, all on a single multiplexed QUIC stream
  • When the agent disconnects, the session is cleaned up automatically

Two service types, two different lifetimes:

  • MissionControl (shared) — fleet-wide: status, logs, metrics
  • AgentSession (session) — per-agent: register, heartbeat, commands

Chapter 5: Auth & Capabilities (5 min)

Goal: Not every caller should be able to deploy or run commands on agents. Define roles, compose requirements, and issue scoped credentials.

Up until now the system has run in open-gate mode: anyone who knows the address can call it. That's fine for experiments, but a real control plane needs to decide who gets in. In this chapter you'll enable authentication, so your service is no longer open to anyone who contacts your node — callers must prove they're authorized.

The auth flow has three steps:

  1. Define — declare which capabilities each method requires (in code)
  2. Issue — create credentials with specific capabilities (CLI)
  3. Connect — present the credential on connect; the framework enforces access

Step 1: Generate a root key

The root key is the trust anchor for your entire deployment. It identifies you personally as the owner of your deployment. Keep it offline — you'll use it to sign credentials, not to run services.

# One-time setup — generates an Ed25519 keypair
aster trust keygen --out-key ~/.aster/root.key

# Output:
# Root private key written to: ~/.aster/root.key
# Root public key written to: ~/.aster/root.pub
# Keep root.key secret. Share root.pub with nodes that need to verify credentials.

Step 2: Define roles in code

from enum import Enum
from aster import any_of
# Also available: `all_of(A, B)` — caller must have BOTH roles.

class Role(str, Enum):
    STATUS = "ops.status"
    LOGS = "ops.logs"
    ADMIN = "ops.admin"
    INGEST = "ops.ingest"

@service(name="MissionControl", version=1)
class MissionControl:

    @rpc(requires=Role.STATUS)
    async def getStatus(self, req: StatusRequest) -> StatusResponse: ...

    @server_stream(requires=any_of(Role.LOGS, Role.ADMIN))
    async def tailLogs(self, req: TailRequest) -> AsyncIterator[LogEntry]: ...

    @client_stream(requires=Role.INGEST)
    async def ingestMetrics(self, stream: AsyncIterator[MetricPoint]) -> IngestResult: ...

@service(name="AgentSession", version=1, scoped="session")
class AgentSession:

    @rpc(requires=Role.INGEST)
    async def register(self, hb: Heartbeat) -> Assignment: ...

    @bidi_stream(requires=Role.ADMIN)
    async def runCommand(self, commands: AsyncIterator[Command]) -> AsyncIterator[CommandResult]: ...
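An `any_of` requirement reduces to simple set logic: grant passes if the caller holds at least one listed capability. A standalone sketch of that check (not Aster's implementation):

```python
# NOT Aster's code — a set-logic sketch of how an any_of gate resolves
# against the capabilities a caller's credential grants.
def allowed(granted: set, required_any: set) -> bool:
    # any_of: the caller needs at least one of the listed capabilities.
    return bool(granted & required_any)

edge = {"ops.status", "ops.ingest"}
ops_team = {"ops.status", "ops.logs", "ops.admin", "ops.ingest"}

print(allowed(edge, {"ops.logs", "ops.admin"}))      # → False — can't tail logs
print(allowed(ops_team, {"ops.logs", "ops.admin"}))  # → True
```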

Step 3: Start the control plane with auth

Update your main() to attach a config that points at the root public key and turns off open admission. You'll also pass an identity file path so the server has a stable endpoint id (without one, every restart gives you a new id and breaks any credentials you've issued).

from aster import AsterConfig

async def main():
    config = AsterConfig(
        root_pubkey_file="~/.aster/root.pub",  # owner's public key (yours)
        allow_all_consumers=False,
    )
    async with AsterServer(
        services=[MissionControl(), AgentSession()],
        identity=".aster-identity",  # stable endpoint identity
        config=config,
    ) as srv:
        print(srv.address)
        await srv.serve()

Restart control.py / control.ts and copy the new address. Because you've now attached a stable .aster-identity file, this address will stay the same across restarts — and any credentials you issue against it will keep working.

Each Aster endpoint has its own identity (a secret keypair) plus the public key of its owner (you), so it knows who administers it.

No .aster-identity file?

Aster generates a fresh ephemeral keypair on startup. That's fine for experiments, but every restart gives you a new endpoint id — and any credentials you issued to the old one will stop working. Once you start enrolling peers, commit to a persistent identity file.

Step 4: Enroll agents

To allow another endpoint to connect to yours, you grant it permission explicitly: generate a credential for it that lists the roles that endpoint should hold.

# Edge agent — status and ingest only
aster enroll node --role consumer --name "edge-node-7" \
--capabilities ops.status,ops.ingest \
--root-key ~/.aster/root.key \
--out edge-node-7.cred

aster enroll node will print a summary like this:

✓ Enrollment credential created

File: /home/you/work/edge-node-7.cred
Format: TOML (.aster-identity) with [node] + [[peers]] sections

Peer: edge-node-7
Role: consumer (policy)
Capabilities: ops.status,ops.ingest
Endpoint ID: 142179f10b7bc606...
Trust root: cd948e4c1456cdbe...
Expires: 2026-05-10T20:20:12+00:00

This file lets a consumer connect to your trusted-mode servers.
It contains a node identity (secret key) AND a signed enrollment
credential. The server validates the credential and grants the
capabilities listed below.

Use it:
aster shell <peer-addr> --rcan edge-node-7.cred
aster call <peer-addr> Service.method '<json>' --rcan edge-node-7.cred

⚠ Keep this file secret -- it is both an identity AND a credential.

What's in the file?

Despite the .cred extension, it's a regular .aster-identity TOML file with two sections:

  • [node] — the consumer's secret key + endpoint ID. Used by the QUIC layer to prove the consumer's identity.
  • [[peers]] — the signed enrollment credential. Presented to servers to claim capabilities.

Both sections live in the same file because they're paired: the server checks that the QUIC peer ID matches the credential's endpoint_id. If they don't match, admission fails.
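In TOML terms, the layout looks roughly like this — the section names come from the enrollment summary above, but the individual key names here are illustrative assumptions, not the exact schema:

```toml
# Sketch of a .cred file — section names from the enrollment summary;
# key names are illustrative assumptions, not the exact on-disk schema.
[node]
endpoint_id = "142179f10b7bc606..."
secret_key = "..."                       # the consumer's private key

[[peers]]
endpoint_id = "142179f10b7bc606..."      # must match [node] at admission
capabilities = ["ops.status", "ops.ingest"]
expires = "2026-05-10T20:20:12+00:00"
signature = "..."                        # signed by the root key
```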

# Ops team — full access including admin
aster enroll node --role consumer --name "ops-team" \
  --capabilities ops.status,ops.logs,ops.admin,ops.ingest \
  --root-key ~/.aster/root.key \
  --out ops-team.cred

Quiet mode for scripts

Pass --quiet (or -q) to suppress the educational output. The command prints exactly one line: <path> <endpoint_id> <expires_iso> on success and exits non-zero on failure. Easy to parse from CI.
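That one line splits cleanly on whitespace. A minimal bash sketch — the sample line below is hardcoded for illustration; in CI you'd capture the real output of `aster enroll node ... --quiet` instead:

```shell
# Parse the one-line --quiet output: "<path> <endpoint_id> <expires_iso>".
# Sample line hardcoded for illustration; in CI you'd use:
#   line="$(aster enroll node ... --quiet)" || exit 1
line="/home/you/work/edge-node-7.cred 142179f10b7bc606 2026-05-10T20:20:12+00:00"
read -r cred_path endpoint_id expires <<< "$line"
echo "credential: $cred_path (endpoint $endpoint_id, expires $expires)"
```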

Step 5: Connect with credentials

Replace aster1Qm... in the snippets below with the address your control.py / control.ts printed in Step 3.

client = AsterClient(
    address="aster1Qm...",  # ← from control.py output
    enrollment_credential_file="edge-node-7.cred",
)
await client.connect()
mc = client.proxy("MissionControl")

await mc.getStatus({"agent_id": "test"})  # ✓ has ops.status
await mc.ingestMetrics(...)               # ✓ has ops.ingest
# Calling AgentSession.runCommand would raise AccessDenied — missing ops.admin

# Or from the CLI — the shell respects credentials too.
# Replace aster1Qm... with the address from control.py.
aster shell aster1Qm... --rcan ops-team.cred
> cd services
> session AgentSession                 # opens session subshell
AgentSession~ runCommand command="df"  # ✓ ops-team has ops.admin

What just happened:

  • aster trust keygen created the root of trust — one command
  • aster enroll node issued scoped credentials — no CA infrastructure
  • requires= — Aster checks at the method level, no auth middleware to write
  • any_of(A, B) — caller needs at least one (log viewers OR admins can tail)
  • The edge agent can push metrics but can't run commands. The ops team can do both. That's the entire access control model — defined in code, enforced at the wire level

Chapter 6: Cross-Language Interop (5 min)

Goal: Your teammate uses a different language. They don't have your source code — just the server address.

Your control.py / control.ts from Chapter 4 (or 5 with auth) keeps running. The cross-language client below is a brand-new file in a different language — no shared source, no codegen, no .proto file.

A TypeScript teammate calls your Python control plane. Save this as ts-agent.ts (anywhere — it doesn't need to live in the same directory as your Python code) and run it in a separate terminal. Replace aster1Qm... with the address your control.py printed.

// ts-agent.ts — TypeScript client calling a Python control plane.
// Usage: bun run ts-agent.ts <aster1...address>
import { AsterClientWrapper } from '@aster-rpc/aster';

async function main() {
  const address = process.argv[2];
  if (!address) {
    console.error("Usage: bun run ts-agent.ts <aster1...address>");
    process.exit(1);
  }

  const client = new AsterClientWrapper({ address });
  await client.connect();

  const mc = client.proxy("MissionControl");
  const status = await mc.getStatus({ agent_id: "ts-worker-1" });
  console.log(`Status: ${status.agent_id} is ${status.status}`);

  // Stream metrics from TypeScript to the Python control plane
  const result = await mc.ingestMetrics((async function* () {
    for (let i = 0; i < 1000; i++) {
      yield { name: "gpu.temp", value: 72 + Math.random() * 10 };
    }
  })());
  console.log(`Accepted: ${result.accepted}`);

  await client.close();
}

main();

bun run ts-agent.ts aster1Qm...

What just happened:

  • Your teammate never saw your source code
  • The proxy client discovered the contract on connect and built method stubs dynamically — full RPC, no codegen required
  • Same wire format, same contract hash — producer and consumer agree on the protocol without sharing a repo

"But there's no .proto file — how does the other language know what you sent?" — The @wire_type / @WireType decorator registers each type's schema in Aster's content-addressed contract. The contract is published with the service and discovered on connect. The contract is the shared schema — you just never had to write it by hand.
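"Content-addressed" can be made concrete with a small illustration — this is not Aster's actual algorithm, just the general idea: hash a canonical serialization of the schema, and two peers agree on the contract by comparing digests.

```python
# Illustration of "content-addressed" — NOT Aster's actual hashing scheme.
# Same fields → same digest; any schema drift → a different digest.
import hashlib
import json

schema = {
    "mission/StatusRequest": {"agent_id": "str"},
    "mission/StatusResponse": {"agent_id": "str", "status": "str", "uptime_secs": "int"},
}
# Canonical form: sorted keys, no whitespace — so both peers serialize identically.
canonical = json.dumps(schema, sort_keys=True, separators=(",", ":"))
contract_hash = hashlib.sha256(canonical.encode()).hexdigest()
print(contract_hash[:16])  # peers compare this digest to agree on the contract
```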


What's Next?

You just built a working control plane with four RPC patterns, session-scoped agents, capability-based auth, and cross-language interop. That's a real system — not a demo.

Next guides in the series:

  • Hardening for Production — interceptors for retry, circuit-breaking, rate limiting, and deadlines
  • Scaling Out — multiple producers with automatic fail-over
  • Artifact Distribution — push builds and model weights to agents with content-addressed blobs
  • Shared Fleet State — CRDT documents that sync across your fleet

The full source for this example is in examples/python/mission_control/ and examples/typescript/missionControl/.