Define a Service
This guide covers how to define an Aster RPC service: its request/response types, methods, streaming patterns, and configuration options. The concepts are shared across all Aster language bindings. Code examples are Python-first, with TypeScript equivalents where available; other languages are to follow.
Wire types
Every request and response type needs a stable wire identity for cross-language serialization. Define your types as dataclasses and tag them with @wire_type:
Python:

    from dataclasses import dataclass
    from aster import wire_type

    @wire_type("myapp/TaskRequest")
    @dataclass
    class TaskRequest:
        task_id: str = ""
        priority: int = 0

    @wire_type("myapp/TaskResult")
    @dataclass
    class TaskResult:
        task_id: str = ""
        status: str = ""
        output: str = ""

TypeScript:

    import { WireType } from '@aster-rpc/aster';

    @WireType("myapp/TaskRequest")
    class TaskRequest {
      taskId = "";
      priority = 0;
      constructor(init?: Partial<TaskRequest>) { if (init) Object.assign(this, init); }
    }

    @WireType("myapp/TaskResult")
    class TaskResult {
      taskId = "";
      status = "";
      output = "";
      constructor(init?: Partial<TaskResult>) { if (init) Object.assign(this, init); }
    }

The tag string (e.g. "myapp/TaskRequest") is the canonical wire identity. It must be unique within your deployment and stable across releases. The format is namespace/TypeName, where the namespace is typically your package or domain.
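As a rough illustration of the two constraints above (the namespace/TypeName shape and uniqueness within a deployment), the sketch below checks tags against a local registry. The `register_tag` helper, the regular expression, and the registry dict are hypothetical and not part of Aster; the library's own validation may be stricter or looser.

```python
import re

# Hypothetical pattern for "namespace/TypeName"; Aster's real rules may differ.
TAG_PATTERN = re.compile(r"^[A-Za-z_][\w.]*/[A-Za-z_]\w*$")

# Maps each tag to the class that claimed it (illustration only).
_registry: dict[str, type] = {}


def register_tag(tag: str, cls: type) -> None:
    """Record a tag, rejecting malformed tags and tags reused by another class."""
    if not TAG_PATTERN.match(tag):
        raise ValueError(f"malformed wire tag: {tag!r}")
    existing = _registry.get(tag)
    if existing is not None and existing is not cls:
        raise ValueError(
            f"duplicate wire tag {tag!r}: already used by {existing.__name__}"
        )
    _registry[tag] = cls
```

Running a check like this in a test suite is one way to catch a copy-pasted tag before it reaches a deployment.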
Auto-tagging vs explicit @wire_type
If you omit @wire_type, the @service decorator auto-derives a tag from the module and class name at decoration time (e.g. myapp.tasks.TaskRequest). This works for development, but auto-tags are fragile: renaming a module or moving a class changes the wire identity and breaks existing consumers.
Production recommendation: Always use explicit @wire_type tags. The @service decorator emits a warning when it auto-tags a type.
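To see why auto-derived tags are fragile, consider the sketch below. The `auto_tag` function is a hypothetical stand-in that builds a tag from the module path and class name, as described above; it is not Aster's actual derivation code. Moving the class to another module is simulated by reassigning `__module__`.

```python
def auto_tag(cls: type) -> str:
    """Hypothetical stand-in for an auto-derived tag: module path plus class name."""
    return f"{cls.__module__}.{cls.__name__}"


class TaskRequest:
    pass


# Pretend the class lives in myapp.tasks ...
TaskRequest.__module__ = "myapp.tasks"
before = auto_tag(TaskRequest)

# ... and is later moved to myapp.models during a refactor.
TaskRequest.__module__ = "myapp.models"
after = auto_tag(TaskRequest)

print(before)  # myapp.tasks.TaskRequest
print(after)   # myapp.models.TaskRequest
```

The class itself did not change, yet its derived identity did. An explicit tag stays constant through the same refactor.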
The @service decorator
The @service decorator marks a class as an Aster RPC service. It scans the class for decorated methods, extracts type information from signatures, and validates wire types.
Calling forms
Python:

    from aster import service

    # Bare -- service name defaults to class name
    @service
    class TaskService:
        ...

    # Explicit name
    @service("TaskEngine")
    class TaskService:
        ...

    # With options
    @service(name="TaskEngine", version=2)
    class TaskService:
        ...

TypeScript:

    import { Service } from '@aster-rpc/aster';

    // With options (name is always explicit in TypeScript)
    @Service({ name: "TaskEngine", version: 2 })
    class TaskService {
      // ...methods
    }

    // Minimal: version defaults to 1
    @Service({ name: "TaskEngine" })
    class TaskService {
      // ...methods
    }

Service options
| Option | Python | TypeScript | Default | Description |
|---|---|---|---|---|
| name | str | string | Class name (Python; always explicit in TypeScript) | Service name used in the wire protocol and discovery. |
| version | int | number | 1 | Service version. Distinct versions are separate services. |
| serialization | SerializationMode or list | SerializationMode[] | [XLANG] | Supported serialization modes. |
| scoped | str | 'shared' \| 'session' | "shared" | "shared" for a singleton, "session" for per-connection instances (the class __init__ receives a peer argument). |
| requires | CapabilityRequirement | CapabilityRequirement | None / undefined | Capability required to call any method on this service. |
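One way to picture the name and version defaults, and the statement that distinct versions are separate services, is a lookup keyed by both fields. The `ServiceKey` dataclass, the `service_key` helper, and the `registry` dict below are hypothetical illustrations; how Aster actually stores and discovers services is not covered here.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ServiceKey:
    """Hypothetical lookup key: a service is identified by name AND version."""
    name: str
    version: int = 1


def service_key(cls: type, name: Optional[str] = None, version: int = 1) -> ServiceKey:
    # Mirrors the Python defaults in the table: class name when no name is
    # given, version 1 when no version is given.
    return ServiceKey(name or cls.__name__, version)


class TaskService:
    pass


# Two versions of "TaskEngine" occupy two separate slots.
registry = {
    service_key(TaskService, "TaskEngine", 1): "handlers for v1",
    service_key(TaskService, "TaskEngine", 2): "handlers for v2",
}
```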
RPC methods
Unary (@rpc / @Rpc)
A unary method takes one request and returns one response:
Python:

    from aster import service, rpc

    @service
    class TaskService:
        @rpc
        async def assign_task(self, req: TaskRequest) -> TaskResult:
            return TaskResult(task_id=req.task_id, status="assigned")

@rpc supports both bare (@rpc) and parameterized (@rpc(...)) syntax.

TypeScript:

    import { Service, Rpc } from '@aster-rpc/aster';

    @Service({ name: "TaskService" })
    class TaskService {
      @Rpc()
      async assignTask(req: TaskRequest): Promise<TaskResult> {
        return new TaskResult({ taskId: req.taskId, status: "assigned" });
      }
    }

@Rpc() accepts optional { timeout, idempotent, requires } options. The scanner reads request/response types from the AST, so run npx aster-gen before starting your server. See TypeScript Build Setup.
Server streaming (@server_stream / @ServerStream)
The server sends a stream of responses for a single request. The method must be an async generator:
Python:

    from typing import AsyncIterator
    from aster import service, server_stream

    @service
    class TaskService:
        @server_stream
        async def watch_tasks(self, req: WatchRequest) -> AsyncIterator[TaskUpdate]:
            # get_pending_tasks() is an application-defined helper
            for task in get_pending_tasks():
                yield TaskUpdate(task_id=task.id, status=task.status)

TypeScript:

    @Service({ name: "TaskService" })
    class TaskService {
      @ServerStream()
      async *watchTasks(req: WatchRequest): AsyncGenerator<TaskUpdate> {
        for (const task of getPendingTasks()) {
          yield new TaskUpdate({ taskId: task.id, status: task.status });
        }
      }
    }

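The async generator requirement can be checked with the standard library alone. The sketch below contrasts an async generator with a plain coroutine using `inspect.isasyncgenfunction`; the function names are illustrative and no Aster decorators are involved, so this says nothing about how Aster itself performs the check.

```python
import asyncio
import inspect
from typing import AsyncIterator


async def watch_tasks(filter_text: str) -> AsyncIterator[str]:
    """Async generator: contains `yield`, so calling it returns an async iterator."""
    for i in range(3):
        await asyncio.sleep(0)  # stand-in for waiting on real task updates
        yield f"{filter_text}-{i}"


async def not_a_stream(filter_text: str) -> list:
    """Plain coroutine: returns everything at once, so it cannot be streamed."""
    return [f"{filter_text}-{i}" for i in range(3)]


async def collect(stream: AsyncIterator[str]) -> list:
    """Drain an async iterator into a list, the way a consumer would."""
    return [item async for item in stream]


print(inspect.isasyncgenfunction(watch_tasks))   # True
print(inspect.isasyncgenfunction(not_a_stream))  # False
```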
Client streaming (@client_stream / @ClientStream)
The client sends a stream of requests; the server returns a single response:
Python:

    from typing import AsyncIterator
    from aster import service, client_stream

    @service
    class TaskService:
        @client_stream
        async def upload_artifacts(self, stream: AsyncIterator[ArtifactChunk]) -> UploadResult:
            total_bytes = 0
            async for chunk in stream:
                total_bytes += len(chunk.data)
            return UploadResult(total_bytes=total_bytes)

TypeScript:

    @Service({ name: "TaskService" })
    class TaskService {
      @ClientStream()
      async uploadArtifacts(stream: AsyncIterable<ArtifactChunk>): Promise<UploadResult> {
        let totalBytes = 0;
        for await (const chunk of stream) {
          totalBytes += chunk.data.length;
        }
        return new UploadResult({ totalBytes });
      }
    }

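Because a client-streaming handler only needs something it can iterate with `async for`, its logic can be exercised without a network. The sketch below is a plain-asyncio approximation: `ArtifactChunk` is a local stand-in for the wire type, `fake_stream` is a hypothetical test helper, and the handler is written as a free function without Aster decorators.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Iterable


@dataclass
class ArtifactChunk:  # local stand-in for the wire type used above
    data: bytes = b""


async def fake_stream(chunks: Iterable[ArtifactChunk]) -> AsyncIterator[ArtifactChunk]:
    """Hypothetical test helper: replay in-memory chunks as an async stream."""
    for chunk in chunks:
        yield chunk


async def upload_artifacts(stream: AsyncIterator[ArtifactChunk]) -> int:
    """Same accumulation logic as the handler above, minus the decorators."""
    total_bytes = 0
    async for chunk in stream:
        total_bytes += len(chunk.data)
    return total_bytes


total = asyncio.run(
    upload_artifacts(fake_stream([ArtifactChunk(b"abc"), ArtifactChunk(b"de")]))
)
print(total)  # 5
```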
Bidirectional streaming (@bidi_stream / @BidiStream)
Both sides stream concurrently. The method receives an async iterator and yields responses:
Python:

    from typing import AsyncIterator
    from aster import service, bidi_stream

    @service
    class TaskService:
        @bidi_stream
        async def interactive_review(
            self, requests: AsyncIterator[ReviewItem]
        ) -> AsyncIterator[ReviewResult]:
            async for item in requests:
                yield ReviewResult(item_id=item.id, approved=True)

TypeScript:

    @Service({ name: "TaskService" })
    class TaskService {
      @BidiStream()
      async *interactiveReview(
        requests: AsyncIterable<ReviewItem>
      ): AsyncGenerator<ReviewResult> {
        for await (const item of requests) {
          yield new ReviewResult({ itemId: item.id, approved: true });
        }
      }
    }

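The interleaving of requests and responses can be approximated in plain asyncio. In the sketch below, an `asyncio.Queue` stands in for the inbound half of the stream and a `None` sentinel marks its end; both choices, and the `queue_stream` and `demo` helpers, are assumptions made for illustration rather than Aster's transport behavior.

```python
import asyncio
from typing import AsyncIterator


async def queue_stream(queue: asyncio.Queue) -> AsyncIterator[str]:
    """Yield items from a queue until a None sentinel arrives."""
    while True:
        item = await queue.get()
        if item is None:
            return
        yield item


async def interactive_review(requests: AsyncIterator[str]) -> AsyncIterator[str]:
    """Same shape as the handler above: one response per inbound item."""
    async for item in requests:
        yield f"approved:{item}"


async def demo() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    responses = interactive_review(queue_stream(queue))
    seen = []
    for item in ["a", "b"]:
        await queue.put(item)                      # send one request ...
        seen.append(await responses.__anext__())   # ... and read its response
    await queue.put(None)                          # close the request side
    async for leftover in responses:               # drain; nothing is left here
        seen.append(leftover)
    return seen
```

Each response is read before the next request is sent, which a client-streaming or unary method could not support.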
Method options
All method decorators accept these keyword arguments:
| Option | Type | Default | Description |
|---|---|---|---|
| timeout | float or None | None | Default timeout in seconds for this method. |
| idempotent | bool | False | Marks the method as safe to retry. Enables automatic retry by the client. |
| serialization | SerializationMode or None | None | Override the service-level serialization for this method. |
| requires | CapabilityRequirement or None | None | Capability required to call this specific method. |

    @rpc(timeout=30.0, idempotent=True)
    async def get_task(self, req: TaskRequest) -> TaskResult:
        ...
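To make the interaction between timeout and idempotent concrete, here is a minimal sketch of what a retrying caller could look like. The `call_with_retry` wrapper, its `max_attempts` parameter, and the choice of which exceptions count as retryable are all assumptions; Aster's actual client retry policy is not described in this guide.

```python
import asyncio


async def call_with_retry(method, request, *, timeout=None, idempotent=False, max_attempts=3):
    """Hypothetical client-side wrapper: apply a timeout, retry only idempotent calls."""
    # A non-idempotent call gets exactly one attempt, because repeating it
    # could apply its side effects twice.
    attempts = max_attempts if idempotent else 1
    last_error = None
    for _ in range(attempts):
        try:
            # timeout=None means "wait indefinitely", matching the table default.
            return await asyncio.wait_for(method(request), timeout)
        except (asyncio.TimeoutError, ConnectionError) as exc:
            last_error = exc
    raise last_error
```

The design point is that retries are opt-in per method: the server author, who knows whether a repeat is harmless, makes that call through the idempotent flag.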
Serialization modes
Aster supports three serialization modes, configurable at the service or method level:
| Mode | Description | Use case |
|---|---|---|
| XLANG | Cross-language Apache Fory serialization with @wire_type tags. | Default. Required for cross-language interop (Python ↔ TypeScript ↔ Java ↔ Go). |
| NATIVE | Language-native Fory serialization. No @wire_type required. | Single-language deployments where performance matters more than interoperability. |
| JSON | Human-readable JSON on the wire. | Debuggability (tcpdump / wireshark show readable payloads), shell exploration via the dynamic proxy client, and any service that wants to be consumable without a typed client. |

    from aster import service, rpc, SerializationMode

    # Accept both the high-performance native mode AND the debuggable JSON mode
    # on the same service. Clients pick one at call time.
    @service(serialization=[SerializationMode.NATIVE, SerializationMode.JSON])
    class TaskService:
        @rpc
        async def get_task(self, req: TaskRequest) -> TaskResult:
            ...

Note: the dynamic proxy client (client.proxy("ServiceName")) always speaks JSON. If you want the proxy to work against a service, include SerializationMode.JSON in its supported modes, or use a typed client for NATIVE/XLANG-only services.
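The compatibility rule in the note can be written as a small check. In this sketch the `SerializationMode` enum is a local stand-in with made-up values (the real one is imported from aster), and `pick_mode` and `proxy_can_call` are hypothetical helpers; how Aster actually negotiates a mode is not specified here.

```python
from enum import Enum


class SerializationMode(Enum):  # local stand-in; the real enum comes from `aster`
    XLANG = "xlang"
    NATIVE = "native"
    JSON = "json"


def pick_mode(client_preference, service_modes):
    """Return the first client-preferred mode the service supports, else raise."""
    for mode in client_preference:
        if mode in service_modes:
            return mode
    raise ValueError("no serialization mode in common")


def proxy_can_call(service_modes) -> bool:
    """The dynamic proxy only speaks JSON, so JSON must be among the service's modes."""
    return SerializationMode.JSON in service_modes


service_modes = [SerializationMode.NATIVE, SerializationMode.JSON]
print(proxy_can_call(service_modes))               # True
print(proxy_can_call([SerializationMode.XLANG]))   # False
```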
Session-scoped services
By default, services are shared: one instance handles all connections. Session-scoped services create a new instance per connection, allowing per-connection state.

    from aster import service, rpc

    # SetNicknameRequest and the other message types are wire types defined elsewhere.
    @service("ChatRoom", scoped="session")
    class ChatRoomService:
        def __init__(self, peer: str):
            self.peer = peer
            self.nickname = f"anon-{peer[:8]}"
            self.messages: list[str] = []

        @rpc
        async def set_nickname(self, req: SetNicknameRequest) -> SetNicknameResponse:
            self.nickname = req.nickname
            return SetNicknameResponse(greeting=f"Welcome, {req.nickname}!")

        @rpc
        async def send_message(self, req: SendMessageRequest) -> SendMessageResponse:
            self.messages.append(req.text)
            return SendMessageResponse(echo=f"[{self.nickname}] {req.text}")

        async def on_session_close(self) -> None:
            """Called when the session stream ends."""
            print(f"Session closed for {self.nickname}: {len(self.messages)} messages")

Requirements for session-scoped services:
- __init__ must accept a peer positional parameter (the remote peer's endpoint id as a hex string). The framework calls cls(peer) when a new session opens.
- Instance attributes persist across all RPC calls within the session.
- on_session_close() is called when the client closes the session or disconnects.
- Session-scoped services are multiplexed over a single bidirectional QUIC stream per consumer: one session, one stream, multiple method calls.
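The lifecycle described by these requirements can be sketched as follows. `SessionRegistry` is a hypothetical piece of host-side bookkeeping, not an Aster class, and `Counter` is a toy service without decorators; the sketch only mirrors the cls(peer) call on open and the on_session_close() call on close.

```python
import asyncio


class SessionRegistry:
    """Hypothetical host-side bookkeeping: one service instance per connected peer."""

    def __init__(self, service_cls):
        self._service_cls = service_cls
        self._sessions = {}

    def open(self, peer: str):
        instance = self._service_cls(peer)  # mirrors the cls(peer) call described above
        self._sessions[peer] = instance
        return instance

    async def close(self, peer: str) -> None:
        instance = self._sessions.pop(peer)
        hook = getattr(instance, "on_session_close", None)
        if hook is not None:
            await hook()


class Counter:
    """Toy session-scoped service: per-peer state lives on the instance."""
    closed = []  # class-level log of (peer, calls) pairs, for inspection only

    def __init__(self, peer: str):
        self.peer = peer
        self.calls = 0

    async def on_session_close(self) -> None:
        Counter.closed.append((self.peer, self.calls))
```

Two peers opening sessions get two independent `Counter` instances, so state set by one connection is never visible to the other.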
Full working example

    from dataclasses import dataclass
    from typing import AsyncIterator
    from aster import (
        service, rpc, server_stream, client_stream, bidi_stream,
        wire_type, SerializationMode,
    )

    # --- Wire types ---

    @wire_type("example.tasks/TaskRequest")
    @dataclass
    class TaskRequest:
        task_id: str = ""
        priority: int = 0

    @wire_type("example.tasks/TaskResult")
    @dataclass
    class TaskResult:
        task_id: str = ""
        status: str = ""

    @wire_type("example.tasks/WatchRequest")
    @dataclass
    class WatchRequest:
        filter: str = ""

    @wire_type("example.tasks/TaskUpdate")
    @dataclass
    class TaskUpdate:
        task_id: str = ""
        status: str = ""

    @wire_type("example.tasks/LogEntry")
    @dataclass
    class LogEntry:
        line: str = ""

    @wire_type("example.tasks/IngestResult")
    @dataclass
    class IngestResult:
        count: int = 0

    @wire_type("example.tasks/Command")
    @dataclass
    class Command:
        action: str = ""

    @wire_type("example.tasks/CommandResult")
    @dataclass
    class CommandResult:
        output: str = ""

    # --- Service ---

    @service("TaskEngine", version=1)
    class TaskService:
        @rpc(timeout=10.0, idempotent=True)
        async def get_task(self, req: TaskRequest) -> TaskResult:
            return TaskResult(task_id=req.task_id, status="running")

        @server_stream
        async def watch_tasks(self, req: WatchRequest) -> AsyncIterator[TaskUpdate]:
            for i in range(5):
                yield TaskUpdate(task_id=f"task-{i}", status="active")

        @client_stream
        async def ingest_logs(self, stream: AsyncIterator[LogEntry]) -> IngestResult:
            count = 0
            async for entry in stream:
                count += 1
            return IngestResult(count=count)

        @bidi_stream
        async def execute(
            self, commands: AsyncIterator[Command]
        ) -> AsyncIterator[CommandResult]:
            async for cmd in commands:
                yield CommandResult(output=f"executed: {cmd.action}")

This service can be hosted with AsterServer(services=[TaskService()]) and consumed via AsterClient. See Hello Service for a runnable end-to-end example.