Define a Service

This guide covers how to define an Aster RPC service: its request/response types, methods, streaming patterns, and configuration options. The concepts are shared across all Aster language bindings. Code examples are Python-first, with other languages to follow.

Wire types

Every request and response type needs a stable wire identity for cross-language serialization. Define your types as dataclasses and tag them with @wire_type:

from dataclasses import dataclass
from aster.codec import wire_type

@wire_type("myapp/TaskRequest")
@dataclass
class TaskRequest:
    task_id: str = ""
    priority: int = 0

@wire_type("myapp/TaskResult")
@dataclass
class TaskResult:
    task_id: str = ""
    status: str = ""
    output: str = ""

The tag string (e.g. "myapp/TaskRequest") is the canonical wire identity. It must be unique within your deployment and stable across releases. The format is namespace/TypeName, where the namespace is typically your package or domain.
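The uniqueness and format rules above can be checked mechanically, for example in a unit test. The following is a minimal sketch, not part of Aster: `check_tags` and `_TAG_PATTERN` are hypothetical names, and the exact character set Aster accepts in a tag is an assumption.

```python
import re

# Hypothetical helper, not an Aster API. The allowed characters are an
# assumption; only the namespace/TypeName shape comes from this guide.
_TAG_PATTERN = re.compile(r"^[A-Za-z_][\w.]*/[A-Za-z_]\w*$")

def check_tags(tags: list[str]) -> list[str]:
    """Return a list of problems found in a collection of wire tags."""
    problems = []
    seen = set()
    for tag in tags:
        if not _TAG_PATTERN.match(tag):
            problems.append(f"malformed tag: {tag!r}")
        if tag in seen:
            problems.append(f"duplicate tag: {tag!r}")
        seen.add(tag)
    return problems

tags = ["myapp/TaskRequest", "myapp/TaskResult", "myapp/TaskRequest", "TaskUpdate"]
problems = check_tags(tags)
for problem in problems:
    print(problem)
```

Running a check like this over every tag in a deployment catches accidental reuse before two types collide on the wire.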

Auto-tagging vs explicit @wire_type

If you omit @wire_type, the @service decorator auto-derives a tag from the module and class name at decoration time (e.g. myapp.tasks.TaskRequest). This works for development, but auto-tags are fragile: renaming a module or moving a class changes the wire identity and breaks existing consumers.

Production recommendation: Always use explicit @wire_type tags. The @service decorator emits a warning when it auto-tags a type.
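To see why derived tags are fragile, consider what happens when a class moves between modules. This sketch does not call Aster: `derived_tag` is a hypothetical function that only mirrors the module-plus-class-name shape described above, and the module names are made up.

```python
def derived_tag(cls: type) -> str:
    # Assumption: mimics the "module.ClassName" shape of an auto-derived tag.
    return f"{cls.__module__}.{cls.__name__}"

# Simulate the same class living in two different modules across releases.
before = type("TaskRequest", (), {"__module__": "myapp.tasks"})
after = type("TaskRequest", (), {"__module__": "myapp.models.tasks"})

print(derived_tag(before))
print(derived_tag(after))
```

The class body is identical in both releases, yet the two tags differ, so a consumer built against the first release would no longer recognize the type. An explicit tag does not depend on where the class lives.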

The @service decorator

The @service decorator marks a class as an Aster RPC service. It scans the class for decorated methods, extracts type information from signatures, and validates wire types.

Calling forms

from aster.decorators import service

# Bare: service name defaults to class name
@service
class TaskService:
    ...

# Explicit name
@service("TaskEngine")
class TaskService:
    ...

# With options
@service(name="TaskEngine", version=2)
class TaskService:
    ...
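For readers curious how one decorator can accept all three calling forms, here is a minimal stand-alone sketch. It is not Aster's implementation: `service_sketch` and the `__service_info__` attribute are hypothetical names used only for illustration.

```python
from typing import Optional, Union

def service_sketch(arg: Union[type, str, None] = None, *,
                   name: Optional[str] = None, version: int = 1):
    """Illustrative decorator that works bare, with a name, or with options."""
    def apply(cls: type) -> type:
        explicit = arg if isinstance(arg, str) else name
        # Hypothetical attribute; fall back to the class name when none is given.
        cls.__service_info__ = {"name": explicit or cls.__name__, "version": version}
        return cls

    # Bare form: the decorator receives the class itself as its first argument.
    if isinstance(arg, type):
        return apply(arg)
    # Called form: return the real decorator to be applied to the class.
    return apply

@service_sketch
class A: ...

@service_sketch("TaskEngine")
class B: ...

@service_sketch(name="TaskEngine", version=2)
class C: ...

print(A.__service_info__, B.__service_info__, C.__service_info__)
```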

Service options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | Class name | Service name used in the wire protocol and discovery. |
| version | int | 1 | Service version. Distinct versions are separate services. |
| serialization | SerializationMode or list | [XLANG] | Supported serialization modes. |
| scoped | str | "shared" | "shared" for singleton, "stream" for per-connection instances. |
| interceptors | list[type] | [] | Interceptor classes applied to all methods in this service. |
| max_concurrent_streams | int or None | None | Limit on concurrent streams for this service. |
| requires | CapabilityRequirement or None | None | Capability required to call any method on this service. |

RPC methods

Unary (@rpc)

A unary method takes one request and returns one response:

from aster.decorators import service, rpc

@service
class TaskService:

    @rpc
    async def assign_task(self, req: TaskRequest) -> TaskResult:
        return TaskResult(task_id=req.task_id, status="assigned")

@rpc supports both bare (@rpc) and parameterized (@rpc(...)) syntax.

Server streaming (@server_stream)

The server sends a stream of responses for a single request. The method must be an async generator:

from typing import AsyncIterator

from aster.decorators import service, server_stream

@service
class TaskService:

    @server_stream
    async def watch_tasks(self, req: WatchRequest) -> AsyncIterator[TaskUpdate]:
        # get_pending_tasks() stands in for your own task source.
        for task in get_pending_tasks():
            yield TaskUpdate(task_id=task.id, status=task.status)

Client streaming (@client_stream)

The client sends a stream of requests; the server returns a single response:

from typing import AsyncIterator

from aster.decorators import service, client_stream

@service
class TaskService:

    @client_stream
    async def upload_artifacts(self, stream: AsyncIterator[ArtifactChunk]) -> UploadResult:
        total_bytes = 0
        async for chunk in stream:
            total_bytes += len(chunk.data)
        return UploadResult(total_bytes=total_bytes)

Bidirectional streaming (@bidi_stream)

Both sides stream concurrently. The method receives an async iterator and yields responses:

from typing import AsyncIterator

from aster.decorators import service, bidi_stream

@service
class TaskService:

    @bidi_stream
    async def interactive_review(
        self, requests: AsyncIterator[ReviewItem]
    ) -> AsyncIterator[ReviewResult]:
        async for item in requests:
            yield ReviewResult(item_id=item.id, approved=True)
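Because the streaming patterns are built on ordinary async iterators, the body of a streaming method can be exercised with plain asyncio before it is wired into a service. The sketch below assumes nothing about Aster: it uses an undecorated function with the same shape as `interactive_review`, since this guide does not say whether decorated methods stay directly callable, and `fake_client` is a hypothetical stand-in for the incoming request stream.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator

@dataclass
class ReviewItem:
    id: str = ""

@dataclass
class ReviewResult:
    item_id: str = ""
    approved: bool = False

async def interactive_review(
    requests: AsyncIterator[ReviewItem],
) -> AsyncIterator[ReviewResult]:
    # Same logic as the bidirectional handler: one result per incoming item.
    async for item in requests:
        yield ReviewResult(item_id=item.id, approved=True)

async def fake_client() -> AsyncIterator[ReviewItem]:
    # Hypothetical request stream; sleep(0) yields control like real I/O would.
    for item_id in ("a", "b", "c"):
        await asyncio.sleep(0)
        yield ReviewItem(id=item_id)

async def main() -> list[str]:
    return [r.item_id async for r in interactive_review(fake_client())]

reviewed = asyncio.run(main())
print(reviewed)
```

Each result is produced as soon as its request arrives, which is the property that distinguishes bidirectional streaming from collecting all requests first.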

Method options

All method decorators accept these keyword arguments:

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| timeout | float or None | None | Default timeout in seconds for this method. |
| idempotent | bool | False | Marks the method as safe to retry. Enables automatic retry by the client. |
| serialization | SerializationMode or None | None | Override the service-level serialization for this method. |
| requires | CapabilityRequirement or None | None | Capability required to call this specific method. |

@rpc(timeout=30.0, idempotent=True)
async def get_task(self, req: TaskRequest) -> TaskResult:
    ...
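The interaction between `timeout` and `idempotent` can be pictured with a small stand-alone sketch. This is not Aster's client code: `call_with_policy`, `max_attempts`, and `flaky_get_task` are hypothetical, and the actual retry count and backoff Aster uses are not described in this guide.

```python
import asyncio

async def call_with_policy(call, *, timeout=None, idempotent=False, max_attempts=3):
    """Apply a per-call timeout; retry on timeout only if the call is idempotent."""
    attempts = max_attempts if idempotent else 1
    for attempt in range(1, attempts + 1):
        try:
            return await asyncio.wait_for(call(), timeout)
        except asyncio.TimeoutError:
            if attempt == attempts:
                raise  # out of attempts: surface the timeout to the caller

state = {"calls": 0}

async def flaky_get_task():
    # Hypothetical call that hangs on the first two attempts.
    state["calls"] += 1
    if state["calls"] < 3:
        await asyncio.sleep(1.0)  # much longer than the timeout below
    return "ok"

result = asyncio.run(call_with_policy(flaky_get_task, timeout=0.05, idempotent=True))
print(result, state["calls"])
```

A non-idempotent method would get a single attempt in this sketch, because repeating a call that may already have taken effect is unsafe.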

Serialization modes

Aster supports three serialization modes, configurable at the service or method level:

| Mode | Description | Use case |
| --- | --- | --- |
| XLANG | Cross-language Fory serialization with @wire_type tags. | Default. Required for cross-language interop. |
| NATIVE | Python-native Fory serialization. No @wire_type required. | Single-language deployments where performance matters. |
| ROW | Row-oriented format with zero-copy random-access fields. | Data-heavy workloads, analytics pipelines. |

from aster.decorators import service, rpc
from aster.types import SerializationMode

@service(serialization=[SerializationMode.XLANG, SerializationMode.NATIVE])
class AnalyticsService:

    @rpc(serialization=SerializationMode.ROW)
    async def query(self, req: QueryRequest) -> QueryResult:
        ...
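One way to read the example above: a method-level setting replaces the service-level list for that method, and methods without an override inherit the service's modes. The sketch below only illustrates that reading. `Mode` is a stand-in for `SerializationMode`, and `effective_modes` is a hypothetical function, not an Aster API.

```python
from enum import Enum
from typing import Optional

class Mode(Enum):
    # Stand-in for aster.types.SerializationMode, for illustration only.
    XLANG = "xlang"
    NATIVE = "native"
    ROW = "row"

def effective_modes(service_modes: list[Mode], method_override: Optional[Mode]) -> list[Mode]:
    """Assumed resolution rule: a method override wins, else inherit."""
    if method_override is not None:
        return [method_override]
    return list(service_modes)

service_modes = [Mode.XLANG, Mode.NATIVE]
print(effective_modes(service_modes, Mode.ROW))   # the query method above
print(effective_modes(service_modes, None))       # a method with no override
```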

Session-scoped services

By default, services are shared: one instance handles all connections. A session-scoped service (scoped="stream") instead gets a new instance per QUIC stream, so each instance can hold per-connection state.

from aster.decorators import service, rpc

@service("ChatRoom", scoped="stream")
class ChatRoomService:

    def __init__(self, peer: str | None = None):
        self.peer = peer or "unknown"
        self.nickname = f"anon-{self.peer[:8]}"
        self.messages: list[str] = []

    @rpc
    async def set_nickname(self, req: SetNicknameRequest) -> SetNicknameResponse:
        self.nickname = req.nickname
        return SetNicknameResponse(greeting=f"Welcome, {req.nickname}!")

    @rpc
    async def send_message(self, req: SendMessageRequest) -> SendMessageResponse:
        self.messages.append(req.text)
        return SendMessageResponse(echo=f"[{self.nickname}] {req.text}")

    async def on_session_close(self) -> None:
        """Called when the session stream ends."""
        print(f"Session closed for {self.nickname}: {len(self.messages)} messages")

Requirements for session-scoped services:

  • __init__ must accept a peer parameter (the remote peer's endpoint ID as a hex string).
  • Instance attributes persist across all RPC calls within the session.
  • on_session_close() is called when the client closes the session or disconnects.
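The lifecycle implied by these requirements can be sketched without Aster. In the code below, `ChatSession` is a simplified stand-in for a session-scoped service and `run_session` is a hypothetical host loop: it creates one instance per session, routes that session's calls to it, and always invokes `on_session_close()` at the end. How Aster actually schedules these steps is not covered by this guide.

```python
import asyncio
from typing import Optional

class ChatSession:
    def __init__(self, peer: Optional[str] = None):
        self.peer = peer or "unknown"
        self.messages: list[str] = []
        self.closed = False

    async def send_message(self, text: str) -> str:
        self.messages.append(text)  # state is private to this session
        return f"[{self.peer[:8]}] {text}"

    async def on_session_close(self) -> None:
        self.closed = True

async def run_session(peer: str, texts: list[str]) -> ChatSession:
    # Hypothetical host loop: one fresh instance per session.
    instance = ChatSession(peer=peer)
    try:
        for text in texts:
            await instance.send_message(text)
    finally:
        # Runs whether the session ends cleanly or with an error.
        await instance.on_session_close()
    return instance

async def main():
    return await asyncio.gather(
        run_session("aaaa1111bbbb", ["hi", "there"]),
        run_session("cccc2222dddd", ["hello"]),
    )

first, second = asyncio.run(main())
print(first.messages, second.messages)
```

The two sessions run concurrently but never see each other's messages, which is the isolation a shared (singleton) service would not provide.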

Full working example

from dataclasses import dataclass
from typing import AsyncIterator

from aster.codec import wire_type
from aster.decorators import service, rpc, server_stream, client_stream, bidi_stream
from aster.types import SerializationMode

# --- Wire types ---

@wire_type("example.tasks/TaskRequest")
@dataclass
class TaskRequest:
    task_id: str = ""
    priority: int = 0

@wire_type("example.tasks/TaskResult")
@dataclass
class TaskResult:
    task_id: str = ""
    status: str = ""

@wire_type("example.tasks/WatchRequest")
@dataclass
class WatchRequest:
    filter: str = ""

@wire_type("example.tasks/TaskUpdate")
@dataclass
class TaskUpdate:
    task_id: str = ""
    status: str = ""

@wire_type("example.tasks/LogEntry")
@dataclass
class LogEntry:
    line: str = ""

@wire_type("example.tasks/IngestResult")
@dataclass
class IngestResult:
    count: int = 0

@wire_type("example.tasks/Command")
@dataclass
class Command:
    action: str = ""

@wire_type("example.tasks/CommandResult")
@dataclass
class CommandResult:
    output: str = ""

# --- Service ---

@service("TaskEngine", version=1)
class TaskService:

    @rpc(timeout=10.0, idempotent=True)
    async def get_task(self, req: TaskRequest) -> TaskResult:
        return TaskResult(task_id=req.task_id, status="running")

    @server_stream
    async def watch_tasks(self, req: WatchRequest) -> AsyncIterator[TaskUpdate]:
        for i in range(5):
            yield TaskUpdate(task_id=f"task-{i}", status="active")

    @client_stream
    async def ingest_logs(self, stream: AsyncIterator[LogEntry]) -> IngestResult:
        count = 0
        async for entry in stream:
            count += 1
        return IngestResult(count=count)

    @bidi_stream
    async def execute(
        self, commands: AsyncIterator[Command]
    ) -> AsyncIterator[CommandResult]:
        async for cmd in commands:
            yield CommandResult(output=f"executed: {cmd.action}")

This service can be hosted with AsterServer(services=[TaskService()]) and consumed via AsterClient. See Hello Service for a runnable end-to-end example.