TrustGraph Flow API
This API provides workflow management for TrustGraph components. It manages flow classes (workflow templates) and flow instances (active running workflows) that orchestrate complex data processing pipelines.
Request/response
Request
The request contains the following fields:
operation
: The operation to perform (see operations below)class_name
: Flow class name (for class operations and start-flow)class_definition
: Flow class definition JSON (for put-class)description
: Flow description (for start-flow)flow_id
: Flow instance ID (for flow instance operations)
Response
The response contains the following fields:
class_names
: Array of flow class names (returned by list-classes)flow_ids
: Array of active flow IDs (returned by list-flows)class_definition
: Flow class definition JSON (returned by get-class)flow
: Flow instance JSON (returned by get-flow)description
: Flow description (returned by get-flow)error
: Error information if operation fails
Operations
Flow Class Operations
LIST-CLASSES - List All Flow Classes
Request:
{
"operation": "list-classes"
}
Response:
{
"class_names": ["pdf-processor", "text-analyzer", "knowledge-extractor"]
}
GET-CLASS - Get Flow Class Definition
Request:
{
"operation": "get-class",
"class_name": "pdf-processor"
}
Response:
{
"class_definition": "{\"interfaces\": {\"text-completion\": {\"request\": \"persistent://tg/request/text-completion\", \"response\": \"persistent://tg/response/text-completion\"}}, \"description\": \"PDF processing workflow\"}"
}
PUT-CLASS - Create/Update Flow Class
Request:
{
"operation": "put-class",
"class_name": "pdf-processor",
"class_definition": "{\"interfaces\": {\"text-completion\": {\"request\": \"persistent://tg/request/text-completion\", \"response\": \"persistent://tg/response/text-completion\"}}, \"description\": \"PDF processing workflow\"}"
}
Response:
{}
DELETE-CLASS - Remove Flow Class
Request:
{
"operation": "delete-class",
"class_name": "pdf-processor"
}
Response:
{}
Flow Instance Operations
LIST-FLOWS - List Active Flow Instances
Request:
{
"operation": "list-flows"
}
Response:
{
"flow_ids": ["flow-123", "flow-456", "flow-789"]
}
GET-FLOW - Get Flow Instance
Request:
{
"operation": "get-flow",
"flow_id": "flow-123"
}
Response:
{
"flow": "{\"interfaces\": {\"text-completion\": {\"request\": \"persistent://tg/request/text-completion-flow-123\", \"response\": \"persistent://tg/response/text-completion-flow-123\"}}}",
"description": "PDF processing workflow instance"
}
START-FLOW - Start Flow Instance
Request:
{
"operation": "start-flow",
"class_name": "pdf-processor",
"flow_id": "flow-123",
"description": "Processing document batch 1"
}
Response:
{}
STOP-FLOW - Stop Flow Instance
Request:
{
"operation": "stop-flow",
"flow_id": "flow-123"
}
Response:
{}
REST service
The REST service is available at /api/v1/flow
and accepts the above request formats.
Websocket
Requests have a request
object containing the operation fields. Responses have a response
object containing the response fields.
Request:
{
"id": "unique-request-id",
"service": "flow",
"request": {
"operation": "list-classes"
}
}
Response:
{
"id": "unique-request-id",
"response": {
"class_names": ["pdf-processor", "text-analyzer"]
},
"complete": true
}
Pulsar
The Pulsar schema for the Flow API is defined in Python code here:
https://github.com/trustgraph-ai/trustgraph/blob/master/trustgraph-base/trustgraph/schema/flows.py
Default request queue: non-persistent://tg/request/flow
Default response queue: non-persistent://tg/response/flow
Request schema: trustgraph.schema.FlowRequest
Response schema: trustgraph.schema.FlowResponse
Python SDK
The Python SDK provides convenient access to the Flow API:
from trustgraph.api.flow import FlowClient
client = FlowClient()
# List all flow classes
classes = await client.list_classes()
# Get a flow class definition
definition = await client.get_class("pdf-processor")
# Start a flow instance
await client.start_flow("pdf-processor", "flow-123", "Processing batch 1")
# List active flows
flows = await client.list_flows()
# Stop a flow instance
await client.stop_flow("flow-123")
Features
- Flow Classes: Templates that define workflow structure and interfaces
- Flow Instances: Active running workflows based on flow classes
- Dynamic Management: Flows can be started/stopped dynamically
- Template Processing: Uses template replacement for customizing flow instances
- Integration: Works with TrustGraph ecosystem for data processing pipelines
- Persistent Storage: Flow definitions and instances stored for reliability
Use Cases
- Document Processing: Orchestrating PDF processing through chunking, extraction, and storage
- Knowledge Extraction: Managing workflows for relationship and definition extraction
- Data Pipelines: Coordinating complex multi-step data processing workflows
- Resource Management: Dynamically scaling processing flows based on demand