Linux + Python = "Agentics" Broker.
Query drop from ChatGPT. This will be updated and validated with Grok.
Linux + Python is a great fit for your “agentics” broker. You’ll get simpler ops, huge library support, first-class async I/O for API fan-out, and clean, testable code. Below is a complete, production-leaning starter you can run today.
I’m giving you:
- Concept + architecture (what runs where and why)
- Full file list
- Full, commented code for each file (you said you don’t edit; I’m regenerating everything)
- Runbook + next steps
Concept (what you’re building)
An Agentic Broker that:
- Accepts inbound jobs (HTTP REST) describing data to fetch/push from third-party APIs
- Fans out concurrent API calls (HTTPX/async)
- Writes normalized results/events into MySQL
- Fires notifications (email/SMS/webhook) based on rules
- Enqueues long-running work on a background worker (RQ + Redis)
- Exposes admin/read APIs and health checks
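The fan-out step in the list above can be sketched with the standard library alone. This is a minimal illustration, not the starter's code: `fan_out`, `fake_fetch`, and `max_concurrency` are hypothetical names, and in the real broker the injected `fetch` would presumably wrap an HTTPX call.

```python
import asyncio
from typing import Awaitable, Callable


async def fan_out(
    urls: list[str],
    fetch: Callable[[str], Awaitable[dict]],
    max_concurrency: int = 10,
) -> list[dict]:
    """Run fetch(url) for every URL concurrently, at most max_concurrency at a time.

    Failures are captured per URL instead of aborting the whole batch.
    Results come back in the same order as `urls`.
    """
    sem = asyncio.Semaphore(max_concurrency)

    async def one(url: str) -> dict:
        async with sem:
            try:
                return {"url": url, "ok": True, "data": await fetch(url)}
            except Exception as exc:  # record the failure, keep the batch going
                return {"url": url, "ok": False, "error": str(exc)}

    return await asyncio.gather(*(one(u) for u in urls))


async def fake_fetch(url: str) -> dict:
    """Hypothetical stand-in for a real HTTP call."""
    await asyncio.sleep(0)
    if "bad" in url:
        raise RuntimeError("upstream error")
    return {"echo": url}


results = asyncio.run(
    fan_out(["https://a.example/x", "https://bad.example/y"], fake_fetch)
)
```

Capping concurrency with a semaphore keeps one large batch from exhausting the connection pool of whatever client sits behind `fetch`.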
Tech choices (Linux friendly, minimal drama)
- FastAPI (typed, fast, async REST)
- HTTPX (async HTTP client with timeouts/retries)
- SQLAlchemy 2.x + asyncmy (async MySQL driver)
- Redis + RQ (simple background jobs/ retry/ scheduling)
- Pydantic (validated request/response models)
- Structlog (clean, JSON-ish logs)
- Docker Compose (one command up)
- Alembic ready (migrations later; create_all() for first boot)
- Notification adapters (SMTP email + Twilio SMS + generic outbound webhook)
File/Folder List (drop-in repo)
agentics-broker/
- docker-compose.yml
- .env.example
- requirements.txt
- alembic.ini
- app/
  - __init__.py
  - main.py
  - config.py
  - logging.py
  - db.py
  - models.py
  - schemas.py
  - services/
    - __init__.py
    - broker.py
    - notify.py
    - webhooks.py
  - routes/
    - __init__.py
    - broker.py
    - admin.py
    - health.py
  - workers/
    - rq_worker.py
    - tasks.py
  - utils/
    - __init__.py
    - http.py
    - security.py
- tests/
  - test_smoke.py
- README.md
Code (fully regenerated & commented)
[docker-compose.yml]
version: "3.9"
services:
  api:
    # build: . expects a Dockerfile at the repo root; none appears in the file list above.
    build: .
    image: agentics-broker:latest
    command: bash -lc "uvicorn app.main:app --host 0.0.0.0 --port 8000 --proxy-headers"
    env_file: .env
    depends_on:
      mysql:
        condition: service_healthy  # wait for the healthcheck so init_db() can connect
      redis:
        condition: service_started
    ports:
      - "8000:8000"
  rqworker:
    image: agentics-broker:latest
    command: bash -lc "rq worker agentics --url ${REDIS_URL}"
    env_file: .env
    depends_on:
      - redis
      - mysql
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
  mysql:
    image: mysql:8.0
    environment:
      MYSQL_DATABASE: ${MYSQL_DATABASE}
      MYSQL_ROOT_PASSWORD: ${MYSQL_ROOT_PASSWORD}
      MYSQL_USER: ${MYSQL_USER}
      MYSQL_PASSWORD: ${MYSQL_PASSWORD}
    ports:
      - "3306:3306"
    command: ["mysqld","--default-authentication-plugin=mysql_native_password","--mysql-native-password=ON"]
    healthcheck:
      test: ["CMD-SHELL","mysqladmin ping -h localhost -u${MYSQL_USER} -p${MYSQL_PASSWORD} || exit 1"]
      interval: 5s
      timeout: 5s
      retries: 20
[requirements.txt]
fastapi==0.115.0
uvicorn==0.30.6
pydantic==2.9.2
SQLAlchemy==2.0.35
asyncmy==0.2.9
alembic==1.13.2
httpx==0.27.2
python-dotenv==1.0.1
redis==5.0.8
rq==1.16.2
structlog==24.1.0
email-validator==2.2.0
cryptography==43.0.1
[.env.example]
# HTTP
API_HOST=0.0.0.0
API_PORT=8000
# MySQL (MYSQL_HOST is the compose service name; use localhost only when running outside Docker)
MYSQL_HOST=mysql
MYSQL_PORT=3306
MYSQL_DATABASE=agentics
MYSQL_USER=agentics
MYSQL_PASSWORD=agentics_pw
# Referenced by docker-compose.yml; placeholder value, change it
MYSQL_ROOT_PASSWORD=change_me
DATABASE_URL=mysql+asyncmy://${MYSQL_USER}:${MYSQL_PASSWORD}@${MYSQL_HOST}:${MYSQL_PORT}/${MYSQL_DATABASE}
# Redis
REDIS_URL=redis://redis:6379/0
# Notifications
SMTP_HOST=smtp.example.com
SMTP_PORT=587
SMTP_USERNAME=apikey
SMTP_PASSWORD=secret
SMTP_FROM=[email protected]
TWILIO_ACCOUNT_SID=ACXXXXXXXXXXXXXXXXXXXXXXXX
TWILIO_AUTH_TOKEN=xxxxxxxxxxxxxxxxxxxxxxxxxxx
TWILIO_FROM=+15555550100
# Broker
OUTBOUND_TIMEOUT_SECONDS=15
OUTBOUND_MAX_RETRIES=2
[alembic.ini]
[alembic]
script_location = alembic
sqlalchemy.url = sqlite:///./alembic-placeholder.db
[app/__init__.py]
# Marker; no runtime code needed.
[app/logging.py]
import logging
import structlog

def setup_logging() -> None:
    """Configure structlog + stdlib for JSON logs."""
    logging.basicConfig(level=logging.INFO, format="%(message)s")
    structlog.configure(
        processors=[
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.add_log_level,
            # structlog has no DictRenderer; JSONRenderer emits one JSON object per event
            structlog.processors.JSONRenderer(),
        ],
        wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
        cache_logger_on_first_use=True,
    )
[app/config.py]
from pydantic import BaseModel, Field
import os

class Settings(BaseModel):
    # Defaults are read from the environment once, at import time.
    api_host: str = Field(default=os.getenv("API_HOST","0.0.0.0"))
    api_port: int = Field(default=int(os.getenv("API_PORT","8000")))
    database_url: str = Field(default=os.getenv("DATABASE_URL","mysql+asyncmy://agentics:agentics_pw@localhost:3306/agentics"))
    redis_url: str = Field(default=os.getenv("REDIS_URL","redis://localhost:6379/0"))
    smtp_host: str = Field(default=os.getenv("SMTP_HOST",""))
    smtp_port: int = Field(default=int(os.getenv("SMTP_PORT","587")))
    smtp_username: str = Field(default=os.getenv("SMTP_USERNAME",""))
    smtp_password: str = Field(default=os.getenv("SMTP_PASSWORD",""))
    smtp_from: str = Field(default=os.getenv("SMTP_FROM","[email protected]"))
    twilio_sid: str = Field(default=os.getenv("TWILIO_ACCOUNT_SID",""))
    twilio_token: str = Field(default=os.getenv("TWILIO_AUTH_TOKEN",""))
    twilio_from: str = Field(default=os.getenv("TWILIO_FROM",""))
    outbound_timeout: int = Field(default=int(os.getenv("OUTBOUND_TIMEOUT_SECONDS","15")))
    outbound_retries: int = Field(default=int(os.getenv("OUTBOUND_MAX_RETRIES","2")))

settings = Settings()
[app/db.py]
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy.orm import DeclarativeBase
from app.config import settings

class Base(DeclarativeBase):
    pass

engine: AsyncEngine = create_async_engine(settings.database_url, pool_pre_ping=True)
SessionLocal = async_sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)

async def init_db() -> None:
    """Create tables on first boot. Switch to Alembic for real migrations."""
    from app import models  # import to register models
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
[app/models.py]
from datetime import datetime
from sqlalchemy import String, Text, DateTime, func, Integer, ForeignKey, Enum, JSON
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.db import Base
import enum

class JobStatus(str, enum.Enum):
    queued = "queued"
    running = "running"
    success = "success"
    failed = "failed"

class Endpoint(Base):
    __tablename__ = "endpoints"
    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    name: Mapped[str] = mapped_column(String(120), unique=True)
    base_url: Mapped[str] = mapped_column(String(512))
    auth_header: Mapped[str | None] = mapped_column(String(512), nullable=True)
    # DateTime columns map to datetime, not str
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())

    jobs: Mapped[list["Job"]] = relationship(back_populates="endpoint")

class Job(Base):
    __tablename__ = "jobs"
    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    endpoint_id: Mapped[int] = mapped_column(ForeignKey("endpoints.id"))
    path: Mapped[str] = mapped_column(String(512))
    method: Mapped[str] = mapped_column(String(10), default="GET")
    payload: Mapped[dict | None] = mapped_column(JSON, nullable=True)
    status: Mapped[JobStatus] = mapped_column(Enum(JobStatus), default=JobStatus.queued)
    result: Mapped[dict | None] = mapped_column(JSON, nullable=True)
    error: Mapped[str | None] = mapped_column(Text, nullable=True)
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
    updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())

    endpoint: Mapped["Endpoint"] = relationship(back_populates="jobs")
    events: Mapped[list["Event"]] = relationship(back_populates="job")

class Event(Base):
    __tablename__ = "events"
    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    job_id: Mapped[int] = mapped_column(ForeignKey("jobs.id"))
    kind: Mapped[str] = mapped_column(String(64))  # created|started|success|failed|notified
    details: Mapped[dict | None] = mapped_column(JSON, nullable=True)
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())

    job: Mapped["Job"] = relationship(back_populates="events")
[app/schemas.py]
from pydantic import BaseModel, Field, HttpUrl, EmailStr
from typing import Any, Literal

class EndpointCreate(BaseModel):
    name: str = Field(min_length=2, max_length=120)
    base_url: HttpUrl
    auth_header: str | None = None

class EndpointOut(EndpointCreate):
    id: int

class JobCreate(BaseModel):
    endpoint_name: str
    # Pydantic 2.x takes a list under `examples`; the bare `example` keyword is deprecated
    path: str = Field(examples=["/v1/items"])
    method: Literal["GET","POST","PUT","PATCH","DELETE"] = "GET"
    payload: dict | None = None
    notify_email: EmailStr | None = None
    notify_sms: str | None = None
    notify_webhook: str | None = None

class JobOut(BaseModel):
    id: int
    status: str
    result: dict | None = None
    error: str | None = None

class HealthOut(BaseModel):
    status: str
[app/utils/http.py]
import httpx
from app.config import settings

async def http_request(method: str, url: str, headers: dict | None = None, json: dict | None = None) -> httpx.Response:
    """Wrapper with sane defaults for timeouts and connection retries."""
    timeout = httpx.Timeout(settings.outbound_timeout)
    limits = httpx.Limits(max_keepalive_connections=20, max_connections=100)
    # Transport-level retries cover failed connection attempts only, not HTTP error statuses.
    transport = httpx.AsyncHTTPTransport(retries=settings.outbound_retries, limits=limits)
    async with httpx.AsyncClient(timeout=timeout, transport=transport, follow_redirects=True) as client:
        return await client.request(method, url, headers=headers, json=json)
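The wrapper's docstring mentions retries. One way to retry at the application level, with exponential backoff, is sketched below. `with_retries`, `base_delay`, and `flaky` are hypothetical names that do not appear in the starter; the sketch wraps any zero-argument coroutine function, so it makes no assumption about the HTTP client.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def with_retries(
    call: Callable[[], Awaitable[T]],
    max_retries: int = 2,
    base_delay: float = 0.5,
) -> T:
    """Await call(); on exception, retry up to max_retries times.

    The delay doubles after each failed attempt (base_delay, 2x, 4x, ...).
    """
    attempt = 0
    while True:
        try:
            return await call()
        except Exception:
            if attempt >= max_retries:
                raise  # out of retries: surface the last error
            await asyncio.sleep(base_delay * (2 ** attempt))
            attempt += 1


calls = {"n": 0}


async def flaky() -> str:
    """Hypothetical call that fails twice, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")
    return "ok"


outcome = asyncio.run(with_retries(flaky, max_retries=2, base_delay=0.0))
```

In practice you would likely narrow the `except` clause to transient errors (timeouts, connection resets) so that permanent failures are not retried.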
[app/utils/security.py]
# Placeholder for HMAC signing of outbound webhooks (to verify on receiver side).
import hmac, hashlib, base64

def sign_payload(secret: str, body: bytes) -> str:
    mac = hmac.new(secret.encode(), body, hashlib.sha256).digest()
    return base64.b64encode(mac).decode()
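For the receiver side, a minimal verification sketch follows. It repeats `sign_payload` so the block runs on its own; `verify_signature` and the `shared-secret` value are illustrative assumptions, not part of the starter.

```python
import base64
import hashlib
import hmac


def sign_payload(secret: str, body: bytes) -> str:
    """Same scheme as app/utils/security.py: base64(HMAC-SHA256(secret, body))."""
    mac = hmac.new(secret.encode(), body, hashlib.sha256).digest()
    return base64.b64encode(mac).decode()


def verify_signature(secret: str, body: bytes, signature: str) -> bool:
    """Recompute the signature over the raw body and compare in constant time."""
    expected = sign_payload(secret, body)
    # Compare as bytes so a non-ASCII header value cannot raise TypeError.
    return hmac.compare_digest(expected.encode(), signature.encode())


body = b'{"job_id": 1, "status": "success"}'
sig = sign_payload("shared-secret", body)
```

The receiver has to verify against the exact bytes it received, before any JSON parsing or re-serialization, because re-encoding can change whitespace or key order and invalidate the signature.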
[app/services/notify.py]
"""
Notification adapters.
- Email via SMTP (simple example)
- SMS via Twilio (optional)
- Generic outbound webhook
"""
import smtplib
from email.message import EmailMessage
from app.config import settings
import httpx
import structlog

log = structlog.get_logger()

async def send_email(to_email: str, subject: str, body: str) -> None:
    if not settings.smtp_host:
        log.warning("smtp_not_configured")
        return
    msg = EmailMessage()
    msg["From"] = settings.smtp_from
    msg["To"] = to_email
    msg["Subject"] = subject
    msg.set_content(body)
    # smtplib is blocking; acceptable here because this runs inside the RQ worker.
    with smtplib.SMTP(settings.smtp_host, settings.smtp_port) as s:
        s.starttls()
        s.login(settings.smtp_username, settings.smtp_password)
        s.send_message(msg)
    log.info("email_sent", to=to_email)

async def send_sms(to_number: str, body: str) -> None:
    if not settings.twilio_sid:
        log.warning("twilio_not_configured")
        return
    # Imported lazily: the twilio package is optional and is not listed in requirements.txt.
    from twilio.rest import Client as TwilioClient
    client = TwilioClient(settings.twilio_sid, settings.twilio_token)
    client.messages.create(from_=settings.twilio_from, to=to_number, body=body)
    log.info("sms_sent", to=to_number)

async def send_webhook(url: str, payload: dict) -> None:
    async with httpx.AsyncClient(timeout=10) as c:
        r = await c.post(url, json=payload)
        r.raise_for_status()
[app/services/webhooks.py]
# If you need to call out to partner webhooks with signing, add here.
import json
import httpx
from app.utils.security import sign_payload

async def post_signed_webhook(url: str, payload: dict, secret: str) -> None:
    # httpx has no dumps(); serialize with the stdlib and sign the exact bytes sent on the wire.
    body = json.dumps(payload).encode()
    signature = sign_payload(secret, body)
    async with httpx.AsyncClient(timeout=10) as c:
        await c.post(url, content=body, headers={"X-Signature": signature, "Content-Type":"application/json"})
[app/services/broker.py]
"""
Core broker orchestration:
- Resolve endpoint
- Execute HTTP call
- Persist results
- Emit events
- Enqueue long-running jobs to RQ if requested
"""
from sqlalchemy import select
from app.db import SessionLocal
from app.models import Endpoint, Job, JobStatus, Event
from app.utils.http import http_request
from app.config import settings
import structlog

log = structlog.get_logger()

async def create_endpoint(name: str, base_url: str, auth_header: str | None) -> Endpoint:
    async with SessionLocal() as s:
        ep = Endpoint(name=name, base_url=base_url, auth_header=auth_header)
        s.add(ep)
        await s.commit()
        await s.refresh(ep)
        return ep

async def get_endpoint_by_name(name: str) -> Endpoint | None:
    async with SessionLocal() as s:
        res = await s.execute(select(Endpoint).where(Endpoint.name == name))
        return res.scalar_one_or_none()

async def enqueue_job(endpoint_name: str, path: str, method: str, payload: dict | None,
                      notify_email: str | None, notify_sms: str | None, notify_webhook: str | None) -> Job:
    async with SessionLocal() as s:
        ep = await get_endpoint_by_name(endpoint_name)
        if not ep:
            raise ValueError(f"Unknown endpoint '{endpoint_name}'")
        job = Job(endpoint_id=ep.id, path=path, method=method, payload=payload, status=JobStatus.queued)
        s.add(job)
        await s.flush()
        s.add(Event(job_id=job.id, kind="created", details={"endpoint": endpoint_name}))
        await s.commit()
        await s.refresh(job)
    # push to RQ
    from rq import Queue, Retry
    from redis import Redis
    from app.workers.tasks import run_job
    q = Queue("agentics", connection=Redis.from_url(settings.redis_url))
    # RQ expects a Retry object here, not a bare integer
    q.enqueue(run_job, job.id, notify_email, notify_sms, notify_webhook, retry=Retry(max=3))
    log.info("job_enqueued", job_id=job.id)
    return job

async def _execute_job(job_id: int) -> Job:
    """Called by the worker to actually run the HTTP call."""
    async with SessionLocal() as s:
        job = await s.get(Job, job_id)
        if job is None:
            raise ValueError(f"Unknown job {job_id}")
        ep = await s.get(Endpoint, job.endpoint_id)
        job.status = JobStatus.running
        s.add(job)
        s.add(Event(job_id=job.id, kind="started"))
        await s.commit()

        url = f"{ep.base_url.rstrip('/')}/{job.path.lstrip('/')}"
        headers = {}
        if ep.auth_header:
            headers["Authorization"] = ep.auth_header

        try:
            resp = await http_request(job.method, url, headers=headers, json=job.payload)
            data = {"status_code": resp.status_code, "body": resp.json() if resp.headers.get("content-type","").startswith("application/json") else resp.text}
            job.result = data
            job.status = JobStatus.success if 200 <= resp.status_code < 300 else JobStatus.failed
            if job.status == JobStatus.failed:
                job.error = f"HTTP {resp.status_code}"
            s.add(Event(job_id=job.id, kind="success" if job.status==JobStatus.success else "failed", details={"status_code": resp.status_code}))
        except Exception as ex:
            job.status = JobStatus.failed
            job.error = str(ex)
            s.add(Event(job_id=job.id, kind="failed", details={"error": str(ex)}))

        await s.commit()
        await s.refresh(job)
        return job

# Re-export for worker
async def execute_job(job_id: int) -> Job:
    return await _execute_job(job_id)
[app/routes/health.py]
from fastapi import APIRouter
from app.schemas import HealthOut

router = APIRouter(prefix="/health", tags=["health"])

@router.get("", response_model=HealthOut)
async def health() -> HealthOut:
    return HealthOut(status="ok")
[app/routes/admin.py]
from fastapi import APIRouter, HTTPException
from app.schemas import EndpointCreate, EndpointOut
from app.services.broker import create_endpoint, get_endpoint_by_name

router = APIRouter(prefix="/admin", tags=["admin"])

@router.post("/endpoints", response_model=EndpointOut)
async def register_endpoint(body: EndpointCreate) -> EndpointOut:
    existing = await get_endpoint_by_name(body.name)
    if existing:
        raise HTTPException(status_code=409, detail="Endpoint name already exists")
    ep = await create_endpoint(body.name, str(body.base_url), body.auth_header)
    return EndpointOut(id=ep.id, name=ep.name, base_url=ep.base_url, auth_header=ep.auth_header)
[app/routes/broker.py]
from fastapi import APIRouter, HTTPException
from app.schemas import JobCreate, JobOut
from app.services.broker import enqueue_job
from sqlalchemy import select
from app.db import SessionLocal
from app.models import Job

router = APIRouter(prefix="/broker", tags=["broker"])

@router.post("/jobs", response_model=JobOut)
async def submit_job(body: JobCreate) -> JobOut:
    try:
        job = await enqueue_job(body.endpoint_name, body.path, body.method, body.payload, body.notify_email, body.notify_sms, body.notify_webhook)
        return JobOut(id=job.id, status=job.status.value)
    except ValueError as ve:
        raise HTTPException(status_code=400, detail=str(ve))

@router.get("/jobs/{job_id}", response_model=JobOut)
async def get_job(job_id: int) -> JobOut:
    async with SessionLocal() as s:
        j = await s.get(Job, job_id)
        if not j:
            raise HTTPException(status_code=404, detail="Not found")
        return JobOut(id=j.id, status=j.status.value, result=j.result, error=j.error)
[app/workers/tasks.py]
"""
RQ task wrapper that:
- Executes the job (async -> sync shim)
- Sends notifications if requested
"""
import asyncio
from app.services.broker import execute_job
from app.services.notify import send_email, send_sms, send_webhook

def run_job(job_id: int, notify_email: str | None, notify_sms: str | None, notify_webhook: str | None) -> None:
    job = asyncio.run(execute_job(job_id))
    # Notifications
    subject = f"[Agentics] Job {job.id} {job.status.value}"
    body = f"Job {job.id} finished with {job.status.value}.\n\nResult: {job.result}\nError: {job.error}"

    async def _notify():
        if notify_email:
            await send_email(notify_email, subject, body)
        if notify_sms:
            await send_sms(notify_sms, body)
        if notify_webhook:
            await send_webhook(notify_webhook, {"job_id": job.id, "status": job.status.value})

    asyncio.run(_notify())
[app/workers/rq_worker.py]
# Not used by compose directly (we call rq worker), but handy if you want a Python entrypoint.
from rq import Worker, Queue
from redis import Redis
from app.config import settings

def main():
    conn = Redis.from_url(settings.redis_url)
    q = Queue("agentics", connection=conn)
    # Pass the connection explicitly rather than relying on an implicit default
    Worker([q], connection=conn).work(with_scheduler=True)

if __name__ == "__main__":
    main()
[app/routes/__init__.py]
# Marker
[app/services/__init__.py]
# Marker
[app/utils/__init__.py]
# Marker
[app/main.py]
"""
FastAPI app bootstrap:
- Logging
- DB init on startup
- Routers
"""
from fastapi import FastAPI
from app.logging import setup_logging
from app.db import init_db
from app.routes import broker, admin, health

setup_logging()
app = FastAPI(title="Agentics Broker", version="0.1.0")

@app.on_event("startup")
async def _startup():
    await init_db()

app.include_router(health.router)
app.include_router(admin.router)
app.include_router(broker.router)
[tests/test_smoke.py]
def test_truth():
    assert True
[README.md]
Agentics Broker (Linux + Python)
Run:
- cp .env.example .env (edit secrets)
- docker compose up --build
- Register an endpoint:
  curl -X POST http://localhost:8000/admin/endpoints -H "Content-Type: application/json" -d '{"name":"example","base_url":"https://httpbin.org","auth_header":null}'
- Submit a job:
  curl -X POST http://localhost:8000/broker/jobs -H "Content-Type: application/json" -d '{"endpoint_name":"example","path":"/anything","method":"POST","payload":{"hello":"world"},"notify_email":null}'
- Poll:
  curl http://localhost:8000/broker/jobs/1
Notes:
- RQ worker consumes queue "agentics".
- MySQL stores endpoints, jobs, events.
- Configure SMTP/Twilio to enable notifications.
How this meets your needs
- Off Microsoft: Entirely Linux/Python native; deploy on any VPS or your Pi/NAS/metal.
- Brokerage of data & REST: /admin/endpoints registers targets; /broker/jobs queues one outbound request per job; results are saved in MySQL; RQ retries a job when its task raises.
- MySQL persistence: SQLAlchemy models + async driver; events for audit trail.
- Notifications: Email/SMS/Webhook adapters already wired from worker.
- Scales up: Add more rq workers; tune HTTPX limits; place Nginx in front; add Alembic migrations.
Runbook (quick)
- Copy .env.example → .env and fill in MySQL/Twilio/SMTP secrets.
- docker compose up --build
- POST /admin/endpoints to register partners.
- POST /broker/jobs to create jobs; include notify targets if you want alerts.
- GET /broker/jobs/{id} to read results.
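The last runbook step is usually a polling loop. A minimal sketch follows, assuming a hypothetical `poll_job` helper that takes any callable returning the decoded JSON of GET /broker/jobs/{id}. The HTTP call is injected so the block runs without a server; `responses` below is canned data for illustration only.

```python
import time
from typing import Callable

TERMINAL = {"success", "failed"}  # JobStatus values that mean the job is done


def poll_job(
    get_job: Callable[[], dict],
    interval: float = 1.0,
    timeout: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Call get_job() until its "status" is terminal or `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout
    while True:
        job = get_job()
        if job["status"] in TERMINAL:
            return job
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job still {job['status']} after {timeout}s")
        sleep(interval)


# Canned responses standing in for three successive GET /broker/jobs/1 calls.
responses = iter([
    {"id": 1, "status": "queued"},
    {"id": 1, "status": "running"},
    {"id": 1, "status": "success", "result": {"status_code": 200}},
])
final = poll_job(lambda: next(responses), interval=0.0)
```

Against a running broker, `get_job` could be something like `lambda: httpx.get(url).json()`.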
Ideas to push this further
- Add rules engine (NRules equivalent) via a simple Python rules table (YAML/DB) that decides notifications and next-step jobs.
- Add OAuth2 client credentials per endpoint (store token_url, scope; auto-refresh).
- Add rate limiting per endpoint (redis-leaky-bucket).
- Add signature verification for inbound webhooks (HMAC).
- Add scheduled jobs (rq-scheduler / cron).
- Observability: OpenTelemetry traces + Prometheus metrics.
- Data Lake: Stream results to S3/MinIO for analytics (keep MySQL for quick lookups).
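The rules-engine idea from the list above could start as a plain Python table. The sketch below is one possible shape under stated assumptions: `Rule`, `RULES`, and `channels_for` are hypothetical names, and a finished job is represented as a plain dict with a `status` key.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class Rule:
    name: str
    when: Callable[[dict], bool]  # predicate over a finished job
    notify: tuple[str, ...]       # channels to fire: "email", "sms", "webhook"


RULES = [
    Rule("alert-on-failure", lambda job: job["status"] == "failed", ("email", "sms")),
    Rule("webhook-on-any", lambda job: True, ("webhook",)),
]


def channels_for(job: dict, rules: list[Rule] = RULES) -> list[str]:
    """Return the de-duplicated channels of all matching rules, in rule order."""
    seen: list[str] = []
    for rule in rules:
        if rule.when(job):
            for ch in rule.notify:
                if ch not in seen:
                    seen.append(ch)
    return seen
```

For example, `channels_for({"status": "failed"})` selects email, SMS, and webhook, while a successful job selects only the webhook. Loading the rules from YAML or a DB table would replace the lambdas with declarative conditions.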
This can be packaged as a private Git repo with a Makefile and GitHub Actions CI next.