Define a Service
This guide covers how to define an Aster RPC service: its request/response types, methods, streaming patterns, and configuration options. The concepts are shared across all Aster language bindings. Code examples are Python-first, with other languages to follow.
Wire types
Every request and response type needs a stable wire identity for cross-language serialization. Define your types as dataclasses and tag them with @wire_type:
- Python
- Rust
- JVM
- .NET
- Go
- JavaScript
from dataclasses import dataclass
from aster.codec import wire_type
@wire_type("myapp/TaskRequest")
@dataclass
class TaskRequest:
task_id: str = ""
priority: int = 0
@wire_type("myapp/TaskResult")
@dataclass
class TaskResult:
task_id: str = ""
status: str = ""
output: str = ""
// Rust binding planned — wire_type equivalent will use a derive macro.
// JVM binding planned — wire_type equivalent will use an annotation.
The tag string (e.g. "myapp/TaskRequest") is the canonical wire identity. It must be unique within your deployment and stable across releases. The format is namespace/TypeName, where the namespace is typically your package or domain.
Auto-tagging vs explicit @wire_type
If you omit @wire_type, the @service decorator auto-derives a tag from the module and class name at decoration time (e.g. myapp.tasks.TaskRequest). This works for development, but auto-tags are fragile: renaming a module or moving a class changes the wire identity and breaks existing consumers.
Production recommendation: Always use explicit @wire_type tags. The @service decorator emits a warning when it auto-tags a type.
The @service decorator
The @service decorator marks a class as an Aster RPC service. It scans the class for decorated methods, extracts type information from signatures, and validates wire types.
Calling forms
- Python
- Rust
- JVM
- .NET
- Go
- JavaScript
from aster.decorators import service
# Bare — service name defaults to class name
@service
class TaskService:
...
# Explicit name
@service("TaskEngine")
class TaskService:
...
# With options
@service(name="TaskEngine", version=2)
class TaskService:
...
// Rust: will use a derive macro, e.g. #[aster::service(name = "TaskEngine")]
// JVM: will use annotations, e.g. @AsterService(name = "TaskEngine")
Service options
| Option | Type | Default | Description |
|---|---|---|---|
name | str | Class name | Service name used in the wire protocol and discovery. |
version | int | 1 | Service version. Distinct versions are separate services. |
serialization | SerializationMode or list | [XLANG] | Supported serialization modes. |
scoped | str | "shared" | "shared" for singleton, "stream" for per-connection instances. |
interceptors | list[type] | [] | Interceptor classes applied to all methods in this service. |
max_concurrent_streams | int or None | None | Limit on concurrent streams for this service. |
requires | CapabilityRequirement or None | None | Capability required to call any method on this service. |
RPC methods
Unary (@rpc)
A unary method takes one request and returns one response:
from aster.decorators import service, rpc
@service
class TaskService:
@rpc
async def assign_task(self, req: TaskRequest) -> TaskResult:
return TaskResult(task_id=req.task_id, status="assigned")
@rpc supports both bare (@rpc) and parameterized (@rpc(...)) syntax.
Server streaming (@server_stream)
The server sends a stream of responses for a single request. The method must be an async generator:
from typing import AsyncIterator
@service
class TaskService:
@server_stream
async def watch_tasks(self, req: WatchRequest) -> AsyncIterator[TaskUpdate]:
for task in get_pending_tasks():
yield TaskUpdate(task_id=task.id, status=task.status)
Client streaming (@client_stream)
The client sends a stream of requests; the server returns a single response:
@service
class TaskService:
@client_stream
async def upload_artifacts(self, stream: AsyncIterator[ArtifactChunk]) -> UploadResult:
total_bytes = 0
async for chunk in stream:
total_bytes += len(chunk.data)
return UploadResult(total_bytes=total_bytes)
Bidirectional streaming (@bidi_stream)
Both sides stream concurrently. The method receives an async iterator and yields responses:
@service
class TaskService:
@bidi_stream
async def interactive_review(
self, requests: AsyncIterator[ReviewItem]
) -> AsyncIterator[ReviewResult]:
async for item in requests:
yield ReviewResult(item_id=item.id, approved=True)
Method options
All method decorators accept these keyword arguments:
| Option | Type | Default | Description |
|---|---|---|---|
timeout | float or None | None | Default timeout in seconds for this method. |
idempotent | bool | False | Marks the method as safe to retry. Enables automatic retry by the client. |
serialization | SerializationMode or None | None | Override the service-level serialization for this method. |
requires | CapabilityRequirement or None | None | Capability required to call this specific method. |
@rpc(timeout=30.0, idempotent=True)
async def get_task(self, req: TaskRequest) -> TaskResult:
...
Serialization modes
Aster supports three serialization modes, configurable at the service or method level:
| Mode | Description | Use case |
|---|---|---|
XLANG | Cross-language Fory serialization with @wire_type tags. | Default. Required for cross-language interop. |
NATIVE | Python-native Fory serialization. No @wire_type required. | Single-language deployments where performance matters. |
ROW | Row-oriented format with zero-copy random-access fields. | Data-heavy workloads, analytics pipelines. |
from aster.types import SerializationMode
@service(serialization=[SerializationMode.XLANG, SerializationMode.NATIVE])
class AnalyticsService:
@rpc(serialization=SerializationMode.ROW)
async def query(self, req: QueryRequest) -> QueryResult:
...
Session-scoped services
By default, services are shared: one instance handles all connections. Session-scoped services create a new instance per QUIC stream, allowing per-connection state.
@service("ChatRoom", scoped="stream")
class ChatRoomService:
def __init__(self, peer: str | None = None):
self.peer = peer or "unknown"
self.nickname = f"anon-{self.peer[:8]}"
self.messages: list[str] = []
@rpc
async def set_nickname(self, req: SetNicknameRequest) -> SetNicknameResponse:
self.nickname = req.nickname
return SetNicknameResponse(greeting=f"Welcome, {req.nickname}!")
@rpc
async def send_message(self, req: SendMessageRequest) -> SendMessageResponse:
self.messages.append(req.text)
return SendMessageResponse(echo=f"[{self.nickname}] {req.text}")
async def on_session_close(self) -> None:
"""Called when the session stream ends."""
print(f"Session closed for {self.nickname}: {len(self.messages)} messages")
Requirements for session-scoped services:
__init__must accept apeerparameter (the remote peer's endpoint ID as a hex string).- Instance attributes persist across all RPC calls within the session.
on_session_close()is called when the client closes the session or disconnects.
Full working example
from dataclasses import dataclass
from typing import AsyncIterator
from aster.codec import wire_type
from aster.decorators import service, rpc, server_stream, client_stream, bidi_stream
from aster.types import SerializationMode
# --- Wire types ---
@wire_type("example.tasks/TaskRequest")
@dataclass
class TaskRequest:
task_id: str = ""
priority: int = 0
@wire_type("example.tasks/TaskResult")
@dataclass
class TaskResult:
task_id: str = ""
status: str = ""
@wire_type("example.tasks/WatchRequest")
@dataclass
class WatchRequest:
filter: str = ""
@wire_type("example.tasks/TaskUpdate")
@dataclass
class TaskUpdate:
task_id: str = ""
status: str = ""
@wire_type("example.tasks/LogEntry")
@dataclass
class LogEntry:
line: str = ""
@wire_type("example.tasks/IngestResult")
@dataclass
class IngestResult:
count: int = 0
@wire_type("example.tasks/Command")
@dataclass
class Command:
action: str = ""
@wire_type("example.tasks/CommandResult")
@dataclass
class CommandResult:
output: str = ""
# --- Service ---
@service("TaskEngine", version=1)
class TaskService:
@rpc(timeout=10.0, idempotent=True)
async def get_task(self, req: TaskRequest) -> TaskResult:
return TaskResult(task_id=req.task_id, status="running")
@server_stream
async def watch_tasks(self, req: WatchRequest) -> AsyncIterator[TaskUpdate]:
for i in range(5):
yield TaskUpdate(task_id=f"task-{i}", status="active")
@client_stream
async def ingest_logs(self, stream: AsyncIterator[LogEntry]) -> IngestResult:
count = 0
async for entry in stream:
count += 1
return IngestResult(count=count)
@bidi_stream
async def execute(
self, commands: AsyncIterator[Command]
) -> AsyncIterator[CommandResult]:
async for cmd in commands:
yield CommandResult(output=f"executed: {cmd.action}")
This service can be hosted with AsterServer(services=[TaskService()]) and consumed via AsterClient. See Hello Service for a runnable end-to-end example.