Service Base Classes
TrustGraph provides specialized service base classes that extend FlowProcessor
to remove boilerplate code for common service patterns. These classes handle the standard request/response flows and error handling, allowing you to focus on implementing the core business logic.
Overview
Each service base class follows a consistent pattern:
- Extends
FlowProcessor
with pre-configured specifications - Handles message routing and error responses automatically
- Requires you to implement only the core business logic method
- Includes built-in metrics and monitoring
- Supports concurrency where appropriate
Available Service Base Classes
AgentService
Purpose: Multi-step agent processing with thought/action loops
Key Features:
- Supports agent workflows with multiple response types
- Handles
next
actions for chained processing - Built-in error handling for agent-specific errors
Implementation Required:
from trustgraph.base import AgentService
class YourAgentService(AgentService):
async def agent_request(self, request, respond, next, flow):
"""
Process agent requests with multiple possible responses
Args:
request: AgentRequest object
respond: Function to send final response
next: Function to send next action in chain
flow: Flow context for accessing other services
"""
# Your agent logic here
# Send intermediate response
await respond(AgentResponse(
thought="I need to analyze this...",
observation=None,
answer=None
))
# Or send next action
await next(AgentRequest(
operation="next-step",
parameters={"data": "processed"}
))
Message Flow:
- Input:
AgentRequest
on “request” topic - Outputs:
AgentResponse
on “response” topic,AgentRequest
on “next” topic
EmbeddingsService
Purpose: Text-to-vector embedding generation
Key Features:
- Configurable concurrency for parallel processing
- Built-in metrics for processing times
- Rate limiting support
Implementation Required:
from trustgraph.base import EmbeddingsService
class YourEmbeddingsService(EmbeddingsService):
async def on_embeddings(self, text):
"""
Generate embeddings for input text
Args:
text: List of strings to embed
Returns:
List of vectors (list of floats)
"""
# Your embedding logic here
vectors = await self.generate_embeddings(text)
return vectors
Message Flow:
- Input:
EmbeddingsRequest
on “request” topic - Output:
EmbeddingsResponse
on “response” topic
LlmService
Purpose: Large Language Model text completion
Key Features:
- Configurable concurrency
- Built-in timing metrics with detailed buckets
- Token usage tracking
- Rate limiting support
Implementation Required:
from trustgraph.base import LlmService, LlmResult
class YourLlmService(LlmService):
async def generate_content(self, system, prompt):
"""
Generate text completion using LLM
Args:
system: System prompt/instructions
prompt: User prompt
Returns:
LlmResult object with text, token counts, and model info
"""
# Your LLM logic here
response_text = await self.call_llm(system, prompt)
return LlmResult(
text=response_text,
in_token=len(prompt.split()), # Approximate
out_token=len(response_text.split()), # Approximate
model="your-model-name"
)
Message Flow:
- Input:
TextCompletionRequest
on “request” topic - Output:
TextCompletionResponse
on “response” topic
ToolService
Purpose: Tool invocation and execution
Key Features:
- JSON parameter parsing
- Support for string and object responses
- Built-in invocation metrics
- Configurable concurrency
Implementation Required:
from trustgraph.base import ToolService
class YourToolService(ToolService):
async def invoke_tool(self, name, parameters):
"""
Invoke a tool with given parameters
Args:
name: Tool name to invoke
parameters: Dict of tool parameters
Returns:
String response or object that can be JSON serialized
"""
# Your tool logic here
if name == "calculator":
return parameters["a"] + parameters["b"]
elif name == "search":
return {"results": ["result1", "result2"]}
else:
raise ValueError(f"Unknown tool: {name}")
Message Flow:
- Input:
ToolRequest
on “request” topic - Output:
ToolResponse
on “response” topic
Query Services
TriplesQueryService
Purpose: RDF triple querying and retrieval
Implementation Required:
from trustgraph.base import TriplesQueryService
class YourTriplesQueryService(TriplesQueryService):
async def query_triples(self, request):
"""
Query triples matching the request pattern
Args:
request: TriplesQueryRequest with subject, predicate, object patterns
Returns:
List of Triple objects matching the query
"""
# Your triple query logic here
matching_triples = await self.search_triples(
request.subject, request.predicate, request.object
)
return matching_triples
GraphEmbeddingsQueryService
Purpose: Graph entity embedding queries
Implementation Required:
from trustgraph.base import GraphEmbeddingsQueryService
class YourGraphEmbeddingsQueryService(GraphEmbeddingsQueryService):
async def query_graph_embeddings(self, request):
"""
Query graph embeddings by vector similarity
Args:
request: GraphEmbeddingsRequest with query vectors
Returns:
List of entities with similarity scores
"""
# Your graph embedding query logic here
similar_entities = await self.find_similar_entities(
request.vectors, request.limit
)
return similar_entities
DocumentEmbeddingsQueryService
Purpose: Document embedding queries
Implementation Required:
from trustgraph.base import DocumentEmbeddingsQueryService
class YourDocumentEmbeddingsQueryService(DocumentEmbeddingsQueryService):
async def query_document_embeddings(self, request):
"""
Query document embeddings by vector similarity
Args:
request: DocumentEmbeddingsRequest with query vectors
Returns:
List of documents with similarity scores
"""
# Your document embedding query logic here
similar_docs = await self.find_similar_documents(
request.vectors, request.limit
)
return similar_docs
Store Services
TriplesStoreService
Purpose: RDF triple storage
Implementation Required:
from trustgraph.base import TriplesStoreService
class YourTriplesStoreService(TriplesStoreService):
async def store_triples(self, triples):
"""
Store triples in the graph database
Args:
triples: Triples object containing list of Triple objects
"""
# Your triple storage logic here
await self.save_to_database(triples.triples)
GraphEmbeddingsStoreService
Purpose: Graph entity embedding storage
Implementation Required:
from trustgraph.base import GraphEmbeddingsStoreService
class YourGraphEmbeddingsStoreService(GraphEmbeddingsStoreService):
async def store_graph_embeddings(self, embeddings):
"""
Store graph embeddings in the vector database
Args:
embeddings: GraphEmbeddings object with entity vectors
"""
# Your graph embedding storage logic here
await self.save_embeddings(embeddings.entities)
DocumentEmbeddingsStoreService
Purpose: Document embedding storage
Implementation Required:
from trustgraph.base import DocumentEmbeddingsStoreService
class YourDocumentEmbeddingsStoreService(DocumentEmbeddingsStoreService):
async def store_document_embeddings(self, embeddings):
"""
Store document embeddings in the vector database
Args:
embeddings: DocumentEmbeddings object with document vectors
"""
# Your document embedding storage logic here
await self.save_document_embeddings(embeddings.documents)
Common Patterns
Error Handling
All service base classes automatically handle errors:
TooManyRequests
: Re-raised to trigger rate limiting- Other exceptions: Caught and converted to appropriate error responses
Metrics
Most services include built-in metrics:
- LlmService: Timing histograms with detailed buckets
- ToolService: Invocation counters by tool name
- EmbeddingsService: Processing time metrics
Concurrency
Services that support concurrency:
EmbeddingsService
: Configurable with--concurrency
flagLlmService
: Configurable with--concurrency
flagToolService
: Configurable with--concurrency
flag
Command-line Arguments
All services inherit FlowProcessor arguments and some add their own:
@staticmethod
def add_args(parser):
parser.add_argument(
'-c', '--concurrency',
type=int,
default=1,
help='Concurrent processing threads'
)
FlowProcessor.add_args(parser)
Complete Example
Here’s a complete example of implementing a simple calculator tool service:
from trustgraph.base import ToolService
class CalculatorService(ToolService):
async def invoke_tool(self, name, parameters):
"""Calculator tool implementation"""
if name == "add":
return parameters["a"] + parameters["b"]
elif name == "multiply":
return parameters["a"] * parameters["b"]
elif name == "divide":
if parameters["b"] == 0:
raise ValueError("Cannot divide by zero")
return parameters["a"] / parameters["b"]
else:
raise ValueError(f"Unknown operation: {name}")
def run():
CalculatorService.launch("calculator", __doc__)
if __name__ == "__main__":
run()
Benefits
Using service base classes provides:
- Reduced boilerplate: No need to write message handling code
- Consistent patterns: All services follow the same structure
- Built-in monitoring: Metrics and error handling included
- Flow integration: Automatic flow management and configuration
- Type safety: Strongly typed request/response schemas
See Also
- FlowProcessor - Understanding the underlying base class
- AsyncProcessor - For services that don’t fit these patterns
- Service Architecture - Overall service design patterns