Flow Specifications
Flow specifications are declarative interfaces used with FlowProcessor to define how your service integrates with TrustGraph processing flows. They automatically handle the setup and management of Pulsar message queues, consumers, producers, and request/response clients.
Overview
Flow specifications eliminate boilerplate code by:
- Automatically creating and managing Pulsar consumers and producers
- Handling topic configuration from flow definitions
- Providing typed interfaces for message handling
- Managing metrics and monitoring
- Enabling dynamic flow reconfiguration
Core Specification Types
ConsumerSpec
Declares an input message stream for processing.
```python
from trustgraph.base import FlowProcessor, ConsumerSpec

class MyProcessor(FlowProcessor):

    def __init__(self, **params):
        super().__init__(**params)

        self.register_specification(
            ConsumerSpec(
                name="input",                   # Flow interface name
                schema=InputMessageSchema,      # Message schema class
                handler=self.on_input_message,  # Handler method
                concurrency=4                   # Concurrent consumers
            )
        )

    async def on_input_message(self, msg, consumer, flow):
        """Handle incoming messages"""
        value = msg.value()
        # Process the message (self.process is a placeholder for your own logic)
        processed_value = self.process(value)
        # Requires an "output" ProducerSpec to be registered as well
        await flow("output").send(processed_value)
```
Key Features:
- Automatic topic subscription from flow configuration
- Configurable concurrency for parallel processing
- Built-in metrics for message processing
- Automatic message acknowledgment
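To make the concurrency and acknowledgment behaviour concrete, here is a toy model built only on asyncio. It makes no claims about TrustGraph internals: an in-memory `asyncio.Queue` stands in for the Pulsar subscription, and `run_consumers` and the `ack` callback are hypothetical names. The real ConsumerSpec manages all of this for you.

```python
import asyncio

async def run_consumers(queue, handler, concurrency, ack):
    """Toy model: `concurrency` workers drain `queue`, acking only on success."""

    async def worker():
        while True:
            msg = await queue.get()
            try:
                await handler(msg)
                ack(msg)  # acknowledge only after the handler succeeds
            except Exception:
                pass  # a real consumer would log or negative-ack here
            finally:
                queue.task_done()

    workers = [asyncio.create_task(worker()) for _ in range(concurrency)]
    await queue.join()  # wait until every queued message has been handled
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)

async def main():
    queue = asyncio.Queue()
    for i in range(10):
        queue.put_nowait(i)
    seen, acked = [], []

    async def handler(msg):
        await asyncio.sleep(0.01)  # simulate I/O-bound work
        if msg == 3:
            raise ValueError("bad message")
        seen.append(msg)

    await run_consumers(queue, handler, concurrency=4, ack=acked.append)
    return sorted(seen), sorted(acked)

seen, acked = asyncio.run(main())
print(acked)  # [0, 1, 2, 4, 5, 6, 7, 8, 9]
```

Because the handler awaits I/O, four workers overlap their waits; the failing message is never acknowledged.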
ProducerSpec
Declares an output message stream for publishing.
```python
self.register_specification(
    ProducerSpec(
        name="output",               # Flow interface name
        schema=OutputMessageSchema   # Message schema class
    )
)
```
Usage in Message Handlers:
```python
async def on_input_message(self, msg, consumer, flow):
    result = await self.process_data(msg.value())
    # Send via the producer
    await flow("output").send(result)
```
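The call `flow("output")` looks up an endpoint by its interface name rather than by queue name. A minimal sketch of that lookup, assuming a hypothetical `ToyFlow` whose interface map binds names to topics, and a hypothetical `ToyProducer` that records messages instead of publishing them to Pulsar:

```python
import asyncio

class ToyProducer:
    """Stand-in for a producer: records messages instead of publishing."""

    def __init__(self, topic):
        self.topic = topic
        self.sent = []

    async def send(self, message):
        self.sent.append(message)

class ToyFlow:
    """Hypothetical flow object that is callable by interface name."""

    def __init__(self, interfaces):
        # Bind each interface name to a producer for its configured topic
        self.producers = {
            name: ToyProducer(topic) for name, topic in interfaces.items()
        }

    def __call__(self, name):
        return self.producers[name]

async def main():
    flow = ToyFlow({"output": "processed-data-queue"})
    await flow("output").send({"text": "hello"})
    return flow("output")

producer = asyncio.run(main())
print(producer.topic, producer.sent)  # processed-data-queue [{'text': 'hello'}]
```

The handler never names a queue directly, so the same code runs unchanged in any flow that maps `output` to a different topic.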
RequestResponseSpec
Declares a request/response client for calling other services.
```python
self.register_specification(
    RequestResponseSpec(
        request_name="service-request",     # Request topic name
        request_schema=ServiceRequest,      # Request schema
        response_name="service-response",   # Response topic name
        response_schema=ServiceResponse,    # Response schema
        impl=ServiceClient                  # Optional custom client
    )
)
```
Usage in Message Handlers:
```python
async def on_input_message(self, msg, consumer, flow):
    # Make request to another service
    response = await flow("service-request").request(
        ServiceRequest(data="example"),
        timeout=30
    )
    # Process response
    result = self.process_response(response)
    await flow("output").send(result)
```
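Request/response over message queues generally works by tagging each request with a correlation id and matching the reply that carries the same id. The sketch below models that general pattern with asyncio futures. It is an assumption about how such clients are typically built, not TrustGraph's implementation; `ToyRequestResponse` and `echo_service` are hypothetical.

```python
import asyncio
import uuid

class ToyRequestResponse:
    """Hypothetical client: matches each response to its request by id."""

    def __init__(self, service):
        self.service = service  # coroutine standing in for the remote service
        self.pending = {}       # request id -> future awaiting the response
        self.tasks = set()      # keep references so delivery tasks are not GC'd

    async def request(self, payload, timeout):
        req_id = str(uuid.uuid4())
        fut = asyncio.get_running_loop().create_future()
        self.pending[req_id] = fut
        # "Publish" the request; the reply comes back through on_response
        task = asyncio.create_task(self._deliver(req_id, payload))
        self.tasks.add(task)
        task.add_done_callback(self.tasks.discard)
        try:
            return await asyncio.wait_for(fut, timeout)
        finally:
            self.pending.pop(req_id, None)  # clean up on success or timeout

    async def _deliver(self, req_id, payload):
        response = await self.service(payload)
        self.on_response(req_id, response)

    def on_response(self, req_id, response):
        # A late reply (after a timeout) finds no pending entry and is dropped
        fut = self.pending.get(req_id)
        if fut is not None and not fut.done():
            fut.set_result(response)

async def echo_service(payload):
    await asyncio.sleep(0.01)  # simulate network latency
    return {"echo": payload}

async def main():
    client = ToyRequestResponse(echo_service)
    return await asyncio.gather(
        client.request("a", timeout=1),
        client.request("b", timeout=1),
    )

result = asyncio.run(main())
print(result)  # [{'echo': 'a'}, {'echo': 'b'}]
```

Concurrent requests share one client yet each caller receives its own reply, which is why a single request/response pair of topics can serve many handlers at once.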
SettingSpec
Declares configuration parameters from flow definitions.
```python
self.register_specification(
    SettingSpec(name="max_retries")
)
```
Usage in Message Handlers:
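```python
async def on_input_message(self, msg, consumer, flow):
    # int() guards against the setting arriving as a string in the flow definition
    max_retries = int(flow.config["max_retries"].value)
    for attempt in range(max_retries):
        try:
            result = await self.process_data(msg.value())
            break
        except Exception:
            if attempt == max_retries - 1:
                raise  # Out of attempts: re-raise with the original traceback
    await flow("output").send(result)
```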
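The loop above retries immediately. A common variation waits longer after each failure (exponential backoff), which gives a struggling downstream service time to recover. A sketch of a reusable helper; `retry_async` is a hypothetical name, and in a real handler `max_retries` would be read from `flow.config["max_retries"].value`:

```python
import asyncio

async def retry_async(func, max_retries, base_delay=0.1):
    """Await func() up to max_retries times, doubling the delay after each failure."""
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    for attempt in range(max_retries):
        try:
            return await func()
        except Exception:
            if attempt == max_retries - 1:
                raise  # out of attempts: propagate the last error
            await asyncio.sleep(base_delay * 2 ** attempt)

async def main():
    calls = {"n": 0}

    async def flaky():
        # Fails twice with a transient error, then succeeds
        calls["n"] += 1
        if calls["n"] < 3:
            raise ConnectionError("transient failure")
        return "ok"

    result = await retry_async(flaky, max_retries=5, base_delay=0.001)
    return result, calls["n"]

result, attempts = asyncio.run(main())
print(result, attempts)  # ok 3
```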
SubscriberSpec
Declares a pub/sub subscriber for broadcast messages.
```python
self.register_specification(
    SubscriberSpec(
        name="events",                # Topic name
        schema=EventMessageSchema     # Message schema
    )
)
```
Usage:
```python
# Subscribe to events in your flow
subscriber = flow("events")
await subscriber.subscribe("event-id")
event = await subscriber.get_message()
```
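A subscriber delivers each broadcast message to every caller that has registered interest. One way to model that is a dispatcher holding one `asyncio.Queue` per subscription id. This is a hypothetical sketch (`ToySubscriber` and `dispatch` are illustrative names), not TrustGraph's subscriber class:

```python
import asyncio

class ToySubscriber:
    """Hypothetical fan-out: each subscription id gets its own queue."""

    def __init__(self):
        self.queues = {}

    def subscribe(self, sub_id):
        q = asyncio.Queue()
        self.queues[sub_id] = q
        return q

    def unsubscribe(self, sub_id):
        self.queues.pop(sub_id, None)

    def dispatch(self, message):
        # Broadcast: every current subscriber receives the message
        for q in self.queues.values():
            q.put_nowait(message)

async def main():
    sub = ToySubscriber()
    a = sub.subscribe("a")
    b = sub.subscribe("b")
    sub.dispatch("event-1")
    sub.unsubscribe("b")
    sub.dispatch("event-2")  # only "a" is still subscribed
    return [a.get_nowait(), a.get_nowait()], [b.get_nowait()], b.empty()

got_a, got_b, b_empty = asyncio.run(main())
print(got_a, got_b, b_empty)  # ['event-1', 'event-2'] ['event-1'] True
```

Unlike a consumer, where each message goes to one worker, every subscriber sees every message published while it is subscribed.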
Specialized Client Specifications
TrustGraph provides pre-built client specifications for common services:
PromptClientSpec
For LLM prompt services with built-in convenience methods.
```python
self.register_specification(
    PromptClientSpec(
        request_name="prompt-request",
        response_name="prompt-response"
    )
)
```
Usage:
```python
async def on_input_message(self, msg, consumer, flow):
    text = msg.value().text

    # Extract relationships using prompt service
    relationships = await flow("prompt-request").extract_relationships(
        text=text,
        timeout=600
    )

    # Extract definitions
    definitions = await flow("prompt-request").extract_definitions(
        text=text,
        timeout=600
    )

    # Custom prompt
    result = await flow("prompt-request").prompt(
        id="custom-prompt",
        variables={"input": text},
        timeout=600
    )
```
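Convenience methods such as `extract_definitions` can be pictured as thin wrappers that build a request and pass it to one generic request call. The sketch below illustrates that layering under stated assumptions: `PromptRequest`, `ToyPromptClient`, the `"extract-definitions"` prompt id, and `fake_transport` are all hypothetical, not the library's classes or prompt names.

```python
import asyncio
from dataclasses import dataclass, field

@dataclass
class PromptRequest:
    id: str
    variables: dict = field(default_factory=dict)

class ToyPromptClient:
    """Hypothetical client layering named helpers over one request() call."""

    def __init__(self, transport):
        self.transport = transport  # coroutine: PromptRequest -> response

    async def request(self, req, timeout):
        return await asyncio.wait_for(self.transport(req), timeout)

    async def prompt(self, id, variables, timeout=600):
        return await self.request(PromptRequest(id, variables), timeout)

    async def extract_definitions(self, text, timeout=600):
        # A named helper is just a fixed prompt id plus its variables
        return await self.prompt("extract-definitions", {"text": text}, timeout)

async def fake_transport(req):
    # Echo back which prompt ran and with what input
    return f"{req.id}:{req.variables['text']}"

out = asyncio.run(ToyPromptClient(fake_transport).extract_definitions("cats"))
print(out)  # extract-definitions:cats
```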
EmbeddingsClientSpec
For text embedding services.
```python
self.register_specification(
    EmbeddingsClientSpec(
        request_name="embeddings-request",
        response_name="embeddings-response"
    )
)
```
Usage:
```python
async def on_input_message(self, msg, consumer, flow):
    text_chunks = msg.value().chunks

    # Generate embeddings
    vectors = await flow("embeddings-request").embed(
        text=text_chunks,
        timeout=30
    )

    # Process embeddings
    await flow("output").send(EmbeddingsResult(vectors=vectors))
```
TextCompletionClientSpec
For LLM text completion services.
```python
self.register_specification(
    TextCompletionClientSpec(
        request_name="llm-request",
        response_name="llm-response"
    )
)
```
Usage:
```python
async def on_input_message(self, msg, consumer, flow):
    query = msg.value().query

    # Generate text completion
    response = await flow("llm-request").text_completion(
        system="You are a helpful assistant",
        prompt=query,
        timeout=600
    )

    await flow("output").send(TextResult(text=response))
```
ToolClientSpec
For tool invocation services.
```python
self.register_specification(
    ToolClientSpec(
        request_name="tool-request",
        response_name="tool-response"
    )
)
```
Usage:
```python
async def on_input_message(self, msg, consumer, flow):
    # Invoke calculator tool
    result = await flow("tool-request").invoke(
        name="calculator",
        parameters={"operation": "add", "a": 5, "b": 3},
        timeout=30
    )

    await flow("output").send(CalculationResult(result=result))
```
TriplesClientSpec
For RDF triple query services.
```python
self.register_specification(
    TriplesClientSpec(
        request_name="triples-request",
        response_name="triples-response"
    )
)
```
Usage:
```python
async def on_input_message(self, msg, consumer, flow):
    entity = msg.value().entity

    # Query for triples about the entity
    triples = await flow("triples-request").query(
        s=Uri(entity),
        p=None,
        o=None,
        limit=100,
        timeout=30
    )

    await flow("output").send(TriplesResult(triples=triples))
```
Complete Example
Here’s a complete example of a processor that uses multiple specifications:
```python
"""Text analysis processor: extracts definitions and embeddings from text chunks."""

import logging

from trustgraph.base import FlowProcessor, ConsumerSpec, ProducerSpec
from trustgraph.base import PromptClientSpec, EmbeddingsClientSpec, SettingSpec
from trustgraph.schema import TextChunk, ProcessedData

logger = logging.getLogger(__name__)

class TextAnalysisProcessor(FlowProcessor):

    def __init__(self, **params):
        super().__init__(**params)

        # Input consumer
        self.register_specification(
            ConsumerSpec(
                name="input",
                schema=TextChunk,
                handler=self.on_text_chunk,
                concurrency=2
            )
        )

        # Output producer
        self.register_specification(
            ProducerSpec(
                name="output",
                schema=ProcessedData
            )
        )

        # LLM prompt client
        self.register_specification(
            PromptClientSpec(
                request_name="prompt-request",
                response_name="prompt-response"
            )
        )

        # Embeddings client
        self.register_specification(
            EmbeddingsClientSpec(
                request_name="embeddings-request",
                response_name="embeddings-response"
            )
        )

        # Configuration setting
        self.register_specification(
            SettingSpec(name="analysis_mode")
        )

    async def on_text_chunk(self, msg, consumer, flow):
        """Process text chunks with analysis"""
        chunk = msg.value()
        text = chunk.text

        # Get configuration
        mode = flow.config["analysis_mode"].value

        try:
            # Extract entity definitions using the prompt service
            entities = await flow("prompt-request").extract_definitions(
                text=text,
                timeout=300
            )

            # Generate embeddings
            vectors = await flow("embeddings-request").embed(
                text=[text],
                timeout=30
            )

            # Create processed result
            result = ProcessedData(
                text=text,
                entities=entities,
                embeddings=vectors[0],
                metadata=chunk.metadata,
                mode=mode
            )

            # Send to output
            await flow("output").send(result)

        except Exception:
            # Log with traceback; alternatively, publish to an error queue
            logger.exception("Processing error")

def run():
    TextAnalysisProcessor.launch("text-analysis", __doc__)

if __name__ == "__main__":
    run()
```
Flow Configuration
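Flow specifications are automatically configured through the TrustGraph configuration system. Each key under `interfaces` matches a name declared by a specification (`name`, `request_name`, `response_name`, or a SettingSpec name), and its value is the queue to use or, for a setting, the setting's value. Here’s how the configuration maps to the specifications in the example above: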
```json
{
  "flows": {
    "analysis-flow": {
      "interfaces": {
        "input": "text-chunks-queue",
        "output": "processed-data-queue",
        "prompt-request": "prompt-service-request",
        "prompt-response": "prompt-service-response",
        "embeddings-request": "embeddings-service-request",
        "embeddings-response": "embeddings-service-response",
        "analysis_mode": "detailed"
      }
    }
  }
}
```
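To illustrate how declared specification names line up with this interface map, here is a sketch that resolves one flow's `interfaces` block into queue bindings and settings and fails fast if a declared name is missing. The `resolve` function and the split by an explicit list of setting names are assumptions made for the example; TrustGraph's own resolution logic may differ.

```python
import json

CONFIG = """
{
  "flows": {
    "analysis-flow": {
      "interfaces": {
        "input": "text-chunks-queue",
        "output": "processed-data-queue",
        "prompt-request": "prompt-service-request",
        "prompt-response": "prompt-service-response",
        "analysis_mode": "detailed"
      }
    }
  }
}
"""

def resolve(config, flow_name, spec_names, setting_names):
    """Map each declared name to its configured value, failing fast on gaps."""
    interfaces = config["flows"][flow_name]["interfaces"]
    missing = [n for n in spec_names + setting_names if n not in interfaces]
    if missing:
        raise KeyError(f"flow {flow_name!r} lacks interfaces: {missing}")
    topics = {n: interfaces[n] for n in spec_names}
    settings = {n: interfaces[n] for n in setting_names}
    return topics, settings

topics, settings = resolve(
    json.loads(CONFIG),
    "analysis-flow",
    spec_names=["input", "output", "prompt-request", "prompt-response"],
    setting_names=["analysis_mode"],
)
print(topics["input"], settings["analysis_mode"])  # text-chunks-queue detailed
```

Keeping queue names out of the processor code is what lets the same processor run in several flows with different wiring.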
Benefits of Flow Specifications
- Declarative: Define what you need, not how to set it up
- Automatic Management: Queues, consumers, and producers are created automatically
- Type Safety: Strongly typed message schemas
- Error Handling: Built-in error handling and retry logic
- Metrics: Automatic metrics collection for monitoring
- Dynamic Reconfiguration: Flows can be started/stopped without code changes
- Reusable: Specifications can be shared across processors
Error Handling
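Client specifications report failures as exceptions: an error response from the service is raised as a RuntimeError, and a request that exceeds its timeout raises asyncio.TimeoutError. Handlers should catch both:
```python
import asyncio

# Inside an async message handler; `data` is the request object to send
try:
    result = await flow("service-request").request(data, timeout=30)
except RuntimeError as e:
    # Service returned an error response
    print(f"Service error: {e}")
except asyncio.TimeoutError:
    # Request timed out
    print("Request timed out")
```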
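A self-contained sketch of the same pattern shows both failure modes being caught and turned into something the handler can act on. The `call_service` coroutine is a hypothetical stand-in for `flow("service-request").request(...)`, and `asyncio.wait_for` supplies the timeout here.

```python
import asyncio

async def call_service(behaviour):
    """Hypothetical stand-in for a request/response client call."""
    if behaviour == "error":
        raise RuntimeError("service reported an error")
    if behaviour == "slow":
        await asyncio.sleep(1)  # longer than the timeout below
    return "ok"

async def safe_request(behaviour, timeout):
    try:
        return await asyncio.wait_for(call_service(behaviour), timeout)
    except RuntimeError as e:
        return f"service error: {e}"
    except asyncio.TimeoutError:
        return "timed out"

async def main():
    return [await safe_request(b, timeout=0.05) for b in ("ok", "error", "slow")]

results = asyncio.run(main())
print(results)  # ['ok', 'service error: service reported an error', 'timed out']
```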
Best Practices
- Use descriptive names: Choose clear names for your specifications
- Set appropriate timeouts: Configure timeouts based on expected processing times
- Handle errors gracefully: Always handle potential service errors
- Configure concurrency: Set appropriate concurrency levels for your workload
- Use typed schemas: Define proper message schemas for type safety
- Monitor metrics: Use built-in metrics for monitoring and debugging
See Also
- FlowProcessor - Understanding the base class
- Service Base Classes - Pre-built service patterns
- AsyncProcessor - For non-flow services