Linux + Python = "Agentics” Broker.


Linux + Python = "Agentics” Broker.


Query drop from ChatGPT... this will be updated and validated with Grok.

Linux + Python is a great fit for your “agentics” broker. You’ll get simpler ops, huge library support, first-class async I/O for API fan-out, and clean, testable code. Below is a complete, production-leaning starter you can run today.


I’m giving you:


  • Concept + architecture (what runs where and why)
  • Full file list
  • Full, commented code for each file (you said you don’t edit; I’m regenerating everything)
  • Runbook + next steps

Concept (what you're building)


An Agentic Broker that:


  1. Accepts inbound jobs (HTTP REST) describing data to fetch/push from third-party APIs (example payload below)
  2. Fans out concurrent API calls (HTTPX/async)
  3. Writes normalized results/events into MySQL
  4. Fires notifications (email/SMS/webhook) based on rules
  5. Enqueues long-running work on a background worker (RQ + Redis)
  6. Exposes admin/read APIs and health checks
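
Concretely, an inbound job is a small JSON document naming a registered endpoint plus optional notification targets. A representative payload (field names match the JobCreate schema later in this drop; values are illustrative):

{
  "endpoint_name": "example",
  "path": "/v1/items",
  "method": "GET",
  "payload": null,
  "notify_email": "[email protected]"
}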

Tech choices (Linux friendly, minimal drama)


  • FastAPI (typed, fast, async REST)
  • HTTPX (async HTTP client with timeouts/retries)
  • SQLAlchemy 2.x + asyncmy (async MySQL driver)
  • Redis + RQ (simple background jobs, retries, scheduling)
  • Pydantic (validated request/response models)
  • Structlog (clean, JSON-ish logs)
  • Docker Compose (one command up)
  • Alembic ready (migrations later; create_all() for first boot)
  • Notification adapters (SMTP email + Twilio SMS + generic outbound webhook)



File/Folder List (drop-in repo)


agentics-broker/


  • docker-compose.yml
  • .env.example
  • requirements.txt
  • alembic.ini
  • app/
    • __init__.py
    • main.py
    • config.py
    • logging.py
    • db.py
    • models.py
    • schemas.py
    • services/
      • __init__.py
      • broker.py
      • notify.py
      • webhooks.py
    • routes/
      • __init__.py
      • broker.py
      • admin.py
      • health.py
    • workers/
      • rq_worker.py
      • tasks.py
    • utils/
      • http.py
      • security.py
  • tests/
    • test_smoke.py
  • README.md



Code (fully regenerated & commented)


[docker-compose.yml]
version: "3.9"
services:
api:
build: .
image: agentics-broker:latest
command: bash -lc "uvicorn app.main:app --host 0.0.0.0 --port 8000 --proxy-headers"
env_file: .env
depends_on:
- mysql
- redis
ports:
- "8000:8000"
rqworker:
image: agentics-broker:latest
command: bash -lc "rq worker agentics --url ${REDIS_URL}"
env_file: .env
depends_on:
- redis
- mysql
redis:
image: redis:7-alpine
ports:
- "6379:6379"
mysql:
image: mysql:8.0
environment:
MYSQL_DATABASE: ${MYSQL_DATABASE}
MYSQL_ROOT_PASSWORD: ${MYSQL_ROOT_PASSWORD}
MYSQL_USER: ${MYSQL_USER}
MYSQL_PASSWORD: ${MYSQL_PASSWORD}
ports:
- "3306:3306"
command: ["mysqld","--default-authentication-plugin=mysql_native_password","--mysql-native-password=ON"]
healthcheck:
test: ["CMD-SHELL","mysqladmin ping -h localhost -u${MYSQL_USER} -p${MYSQL_PASSWORD} || exit 1"]
interval: 5s
timeout: 5s
retries: 20
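
Note: the api service uses build: ., but the file list above has no Dockerfile. A minimal sketch to make the build work (base image and layout are assumptions; adjust to taste):

[Dockerfile]
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
# Compose overrides this command per service; this default mirrors the api service.
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]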


[requirements.txt]
fastapi==0.115.0
uvicorn==0.30.6
pydantic==2.9.2
SQLAlchemy==2.0.35
asyncmy==0.2.9
alembic==1.13.2
httpx==0.27.2
python-dotenv==1.0.1
redis==5.0.8
rq==1.16.2
structlog==24.1.0
email-validator==2.2.0
cryptography==43.0.1


[.env.example]


# HTTP
API_HOST=0.0.0.0
API_PORT=8000

# MySQL (use host "mysql" from inside docker compose, "localhost" when running bare)
MYSQL_HOST=mysql
MYSQL_PORT=3306
MYSQL_DATABASE=agentics
MYSQL_USER=agentics
MYSQL_PASSWORD=agentics_pw

# Compose passes env_file values to the container literally (no ${VAR} interpolation),
# so spell the full URL out rather than composing it from the values above.
DATABASE_URL=mysql+asyncmy://agentics:agentics_pw@mysql:3306/agentics

# Redis
REDIS_URL=redis://redis:6379/0

# Notifications
SMTP_HOST=smtp.example.com
SMTP_PORT=587
SMTP_USERNAME=apikey
SMTP_PASSWORD=secret
[email protected]

TWILIO_ACCOUNT_SID=ACXXXXXXXXXXXXXXXXXXXXXXXX
TWILIO_AUTH_TOKEN=xxxxxxxxxxxxxxxxxxxxxxxxxxx
TWILIO_FROM=+15555550100

# Broker
OUTBOUND_TIMEOUT_SECONDS=15
OUTBOUND_MAX_RETRIES=2


[alembic.ini]
[alembic]
script_location = alembic
sqlalchemy.url = sqlite:///./alembic-placeholder.db


[app/__init__.py]


Marker; no runtime code needed.


[app/logging.py]
import logging

import structlog


def setup_logging() -> None:
    """Configure structlog + stdlib for JSON logs."""
    logging.basicConfig(level=logging.INFO, format="%(message)s")
    structlog.configure(
        processors=[
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.add_log_level,
            structlog.processors.JSONRenderer(),  # render the final event dict as one JSON line
        ],
        wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
        cache_logger_on_first_use=True,
    )


[app/config.py]
import os

from dotenv import load_dotenv
from pydantic import BaseModel, Field

load_dotenv()  # pick up .env when running outside docker (python-dotenv is in requirements)


class Settings(BaseModel):
    api_host: str = Field(default=os.getenv("API_HOST", "0.0.0.0"))
    api_port: int = Field(default=int(os.getenv("API_PORT", "8000")))
    database_url: str = Field(default=os.getenv("DATABASE_URL", "mysql+asyncmy://agentics:agentics_pw@localhost:3306/agentics"))
    redis_url: str = Field(default=os.getenv("REDIS_URL", "redis://localhost:6379/0"))
    smtp_host: str = Field(default=os.getenv("SMTP_HOST", ""))
    smtp_port: int = Field(default=int(os.getenv("SMTP_PORT", "587")))
    smtp_username: str = Field(default=os.getenv("SMTP_USERNAME", ""))
    smtp_password: str = Field(default=os.getenv("SMTP_PASSWORD", ""))
    smtp_from: str = Field(default=os.getenv("SMTP_FROM", "[email protected]"))
    twilio_sid: str = Field(default=os.getenv("TWILIO_ACCOUNT_SID", ""))
    twilio_token: str = Field(default=os.getenv("TWILIO_AUTH_TOKEN", ""))
    twilio_from: str = Field(default=os.getenv("TWILIO_FROM", ""))
    outbound_timeout: int = Field(default=int(os.getenv("OUTBOUND_TIMEOUT_SECONDS", "15")))
    outbound_retries: int = Field(default=int(os.getenv("OUTBOUND_MAX_RETRIES", "2")))


settings = Settings()


[app/db.py]
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase

from app.config import settings


class Base(DeclarativeBase):
    pass


engine: AsyncEngine = create_async_engine(settings.database_url, pool_pre_ping=True)
SessionLocal = async_sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)


async def init_db() -> None:
    """Create tables on first boot. Switch to Alembic for real migrations."""
    from app import models  # noqa: F401  (import registers the models with Base.metadata)

    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

[app/models.py]
import enum
from datetime import datetime

from sqlalchemy import JSON, DateTime, Enum, ForeignKey, String, Text, func
from sqlalchemy.orm import Mapped, mapped_column, relationship

from app.db import Base


class JobStatus(str, enum.Enum):
    queued = "queued"
    running = "running"
    success = "success"
    failed = "failed"


class Endpoint(Base):
    __tablename__ = "endpoints"

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    name: Mapped[str] = mapped_column(String(120), unique=True)
    base_url: Mapped[str] = mapped_column(String(512))
    auth_header: Mapped[str | None] = mapped_column(String(512), nullable=True)
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())

    jobs: Mapped[list["Job"]] = relationship(back_populates="endpoint")


class Job(Base):
    __tablename__ = "jobs"

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    endpoint_id: Mapped[int] = mapped_column(ForeignKey("endpoints.id"))
    path: Mapped[str] = mapped_column(String(512))
    method: Mapped[str] = mapped_column(String(10), default="GET")
    payload: Mapped[dict | None] = mapped_column(JSON, nullable=True)
    status: Mapped[JobStatus] = mapped_column(Enum(JobStatus), default=JobStatus.queued)
    result: Mapped[dict | None] = mapped_column(JSON, nullable=True)
    error: Mapped[str | None] = mapped_column(Text, nullable=True)
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
    updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())

    endpoint: Mapped["Endpoint"] = relationship(back_populates="jobs")
    events: Mapped[list["Event"]] = relationship(back_populates="job")


class Event(Base):
    __tablename__ = "events"

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    job_id: Mapped[int] = mapped_column(ForeignKey("jobs.id"))
    kind: Mapped[str] = mapped_column(String(64))  # created|started|success|failed|notified
    details: Mapped[dict | None] = mapped_column(JSON, nullable=True)
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())

    job: Mapped["Job"] = relationship(back_populates="events")

[app/schemas.py]
from typing import Literal

from pydantic import BaseModel, EmailStr, Field, HttpUrl


class EndpointCreate(BaseModel):
    name: str = Field(min_length=2, max_length=120)
    base_url: HttpUrl
    auth_header: str | None = None


class EndpointOut(EndpointCreate):
    id: int


class JobCreate(BaseModel):
    endpoint_name: str
    path: str = Field(examples=["/v1/items"])
    method: Literal["GET", "POST", "PUT", "PATCH", "DELETE"] = "GET"
    payload: dict | None = None
    notify_email: EmailStr | None = None
    notify_sms: str | None = None
    notify_webhook: str | None = None


class JobOut(BaseModel):
    id: int
    status: str
    result: dict | None = None
    error: str | None = None


class HealthOut(BaseModel):
    status: str


[app/utils/http.py]
import httpx

from app.config import settings


async def http_request(method: str, url: str, headers: dict | None = None, json: dict | None = None) -> httpx.Response:
    """Wrapper with sane defaults: timeouts, connection limits, and simple retries on transport errors."""
    timeout = httpx.Timeout(settings.outbound_timeout)
    limits = httpx.Limits(max_keepalive_connections=20, max_connections=100)
    async with httpx.AsyncClient(timeout=timeout, limits=limits, follow_redirects=True) as client:
        last_exc: Exception = RuntimeError("unreachable")
        for _ in range(settings.outbound_retries + 1):
            try:
                return await client.request(method, url, headers=headers, json=json)
            except httpx.TransportError as ex:  # network-level failure; HTTP error statuses are returned, not raised
                last_exc = ex
        raise last_exc


[app/utils/security.py]


Placeholder for HMAC signing of outbound webhooks (to verify on the receiver side).


import base64
import hashlib
import hmac


def sign_payload(secret: str, body: bytes) -> str:
    """Return a base64-encoded HMAC-SHA256 signature for `body`."""
    mac = hmac.new(secret.encode(), body, hashlib.sha256).digest()
    return base64.b64encode(mac).decode()
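
On the receiving side, verification is the same computation plus a constant-time compare. A minimal sketch (verify_signature is a suggested helper, not part of the file list above):

import base64
import hashlib
import hmac


def verify_signature(secret: str, body: bytes, signature: str) -> bool:
    """Recompute the HMAC over the received bytes and compare in constant time."""
    expected = base64.b64encode(hmac.new(secret.encode(), body, hashlib.sha256).digest()).decode()
    return hmac.compare_digest(expected, signature)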


[app/services/notify.py]
"""
Notification adapters.


  • Email via SMTP (simple example)
  • SMS via Twilio (optional)
  • Generic outbound webhook
    """
    import smtplib
    from email.message import EmailMessage
    from app.config import settings
    import httpx
    import structlog
    from twilio.rest import Client as TwilioClient

log = structlog.get_logger()


async def send_email(to_email: str, subject: str, body: str) -> None:
if not settings.smtp_host:
log.warning("smtp_not_configured")
return
msg = EmailMessage()
msg["From"] = settings.smtp_from
msg["To"] = to_email
msg["Subject"] = subject
msg.set_content(body)
with smtplib.SMTP(settings.smtp_host, settings.smtp_port) as s:
s.starttls()
s.login(settings.smtp_username, settings.smtp_password)
s.send_message(msg)
log.info("email_sent", to=to_email)


async def send_sms(to_number: str, body: str) -> None:
if not settings.twilio_sid:
log.warning("twilio_not_configured")
return
client = TwilioClient(settings.twilio_sid, settings.twilio_token)
client.messages.create(from_=settings.twilio_from, to=to_number, body=body)
log.info("sms_sent", to=to_number)


async def send_webhook(url: str, payload: dict) -> None:
async with httpx.AsyncClient(timeout=10) as c:
r = await c.post(url, json=payload)
r.raise_for_status()


[app/services/webhooks.py]


If you need to call out to partner webhooks with signing, add it here.


import json

import httpx

from app.utils.security import sign_payload


async def post_signed_webhook(url: str, payload: dict, secret: str) -> None:
    body = json.dumps(payload).encode()  # serialize once so the signature covers the exact bytes sent
    signature = sign_payload(secret, body)
    async with httpx.AsyncClient(timeout=10) as c:
        await c.post(url, content=body, headers={"X-Signature": signature, "Content-Type": "application/json"})


[app/services/broker.py]
"""
Core broker orchestration:


  • Resolve endpoint
  • Execute HTTP call
  • Persist results
  • Emit events
  • Enqueue long-running jobs to RQ if requested
    """
    from sqlalchemy import select
    from app.db import SessionLocal
    from app.models import Endpoint, Job, JobStatus, Event
    from app.utils.http import http_request
    from app.config import settings
    import structlog

log = structlog.get_logger()


async def create_endpoint(name: str, base_url: str, auth_header: str | None) -> Endpoint:
async with SessionLocal() as s:
ep = Endpoint(name=name, base_url=base_url, auth_header=auth_header)
s.add(ep)
await s.commit()
await s.refresh(ep)
return ep


async def get_endpoint_by_name(name: str) -> Endpoint | None:
async with SessionLocal() as s:
res = await s.execute(select(Endpoint).where(Endpoint.name == name))
return res.scalar_one_or_none()


async def enqueue_job(endpoint_name: str, path: str, method: str, payload: dict | None,
notify_email: str | None, notify_sms: str | None, notify_webhook: str | None) -> Job:
async with SessionLocal() as s:
ep = await get_endpoint_by_name(endpoint_name)
if not ep:
raise ValueError(f"Unknown endpoint '{endpoint_name}'")
job = Job(endpoint_id=ep.id, path=path, method=method, payload=payload, status=JobStatus.queued)
s.add(job)
await s.flush()
s.add(Event(job_id=job.id, kind="created", details={"endpoint": endpoint_name}))
await s.commit()
await s.refresh(job)
# push to RQ
from rq import Queue
from redis import Redis
from app.workers.tasks import run_job
q = Queue("agentics", connection=Redis.from_url(settings.redis_url))
q.enqueue(run_job, job.id, notify_email, notify_sms, notify_webhook, retry=3)
log.info("job_enqueued", job_id=job.id)
return job


async def _execute_job(job_id: int) -> Job:
"""Called by the worker to actually run the HTTP call."""
async with SessionLocal() as s:
job = await s.get(Job, job_id)
ep = await s.get(Endpoint, job.endpoint_id)
job.status = JobStatus.running
s.add(job)
s.add(Event(job_id=job.id, kind="started"))
await s.commit()




<span><span> url = </span><span><span>f"<span>{ep.base_url.rstrip(<span>'/'</span></span></span></span><span>)}/</span><span><span>{job.path.lstrip(<span>'/'</span></span></span><span>)}"<br> headers = {}<br> </span><span><span>if</span></span><span> ep.auth_header:<br> headers[</span><span><span>"Authorization"</span></span><span>] = ep.auth_header<br><br> </span><span><span>try</span></span><span>:<br> resp = </span><span><span>await</span></span><span> http_request(job.method, url, headers=headers, json=job.payload)<br> data = {</span><span><span>"status_code"</span></span><span>: resp.status_code, </span><span><span>"body"</span></span><span>: resp.json() </span><span><span>if</span></span><span> resp.headers.get(</span><span><span>"content-type"</span></span><span>,</span><span><span>""</span></span><span>).startswith(</span><span><span>"application/json"</span></span><span>) </span><span><span>else</span></span><span> resp.text}<br> job.result = data<br> job.status = JobStatus.success </span><span><span>if</span></span><span> </span><span><span>200</span></span><span> &lt;= resp.status_code &lt; </span><span><span>300</span></span><span> </span><span><span>else</span></span><span> JobStatus.failed<br> </span><span><span>if</span></span><span> job.status == JobStatus.failed:<br> job.error = </span><span><span>f"HTTP <span>{resp.status_code}</span></span></span><span>"<br> s.add(Event(job_id=job.</span><span><span>id</span></span><span>, kind=</span><span><span>"success"</span></span><span> </span><span><span>if</span></span><span> job.status==JobStatus.success </span><span><span>else</span></span><span> </span><span><span>"failed"</span></span><span>, details={</span><span><span>"status_code"</span></span><span>: resp.status_code}))<br> </span><span><span>except</span></span><span> Exception </span><span><span>as</span></span><span> ex:<br> job.status = JobStatus.failed<br> job.error = </span><span><span>str</span></span><span>(ex)<br> s.add(Event(job_id=job.</span><span><span>id</span></span><span>, kind=</span><span><span>"failed"</span></span><span>, details={</span><span><span>"error"</span></span><span>: </span><span><span>str</span></span><span>(ex)}))<br><br> </span><span><span>await</span></span><span> s.commit()<br> </span><span><span>await</span></span><span> s.refresh(job)<br> </span><span><span>return</span></span><span> job<br></span></span>

Re-export for worker​


async def execute_job(job_id: int) -> Job:
return await _execute_job(job_id)


[app/routes/health.py]
from fastapi import APIRouter

from app.schemas import HealthOut

router = APIRouter(prefix="/health", tags=["health"])


@router.get("", response_model=HealthOut)
async def health() -> HealthOut:
    return HealthOut(status="ok")


[app/routes/admin.py]
from fastapi import APIRouter, HTTPException

from app.schemas import EndpointCreate, EndpointOut
from app.services.broker import create_endpoint, get_endpoint_by_name

router = APIRouter(prefix="/admin", tags=["admin"])


@router.post("/endpoints", response_model=EndpointOut)
async def register_endpoint(body: EndpointCreate) -> EndpointOut:
    existing = await get_endpoint_by_name(body.name)
    if existing:
        raise HTTPException(status_code=409, detail="Endpoint name already exists")
    ep = await create_endpoint(body.name, str(body.base_url), body.auth_header)
    return EndpointOut(id=ep.id, name=ep.name, base_url=ep.base_url, auth_header=ep.auth_header)


[app/routes/broker.py]
from fastapi import APIRouter, HTTPException

from app.db import SessionLocal
from app.models import Job
from app.schemas import JobCreate, JobOut
from app.services.broker import enqueue_job

router = APIRouter(prefix="/broker", tags=["broker"])


@router.post("/jobs", response_model=JobOut)
async def submit_job(body: JobCreate) -> JobOut:
    try:
        job = await enqueue_job(body.endpoint_name, body.path, body.method, body.payload,
                                body.notify_email, body.notify_sms, body.notify_webhook)
        return JobOut(id=job.id, status=job.status.value)
    except ValueError as ve:
        raise HTTPException(status_code=400, detail=str(ve))


@router.get("/jobs/{job_id}", response_model=JobOut)
async def get_job(job_id: int) -> JobOut:
    async with SessionLocal() as s:
        j = await s.get(Job, job_id)
        if not j:
            raise HTTPException(status_code=404, detail="Not found")
        return JobOut(id=j.id, status=j.status.value, result=j.result, error=j.error)


[app/workers/tasks.py]
"""
RQ task wrapper that:


  • Executes the job (async -> sync shim)
  • Sends notifications if requested
    """
    import asyncio
    from app.services.broker import execute_job
    from app.services.notify import send_email, send_sms, send_webhook

def run_job(job_id: int, notify_email: str | None, notify_sms: str | None, notify_webhook: str | None) -> None:
job = asyncio.run(execute_job(job_id))
# Notifications
subject = f"[Agentics] Job {job.id} {job.status.value}"
body = f"Job {job.id} finished with {job.status.value}.\n\nResult: {job.result}\nError: {job.error}"
async def _notify():
if notify_email:
await send_email(notify_email, subject, body)
if notify_sms:
await send_sms(notify_sms, body)
if notify_webhook:
await send_webhook(notify_webhook, {"job_id": job.id, "status": job.status.value})
asyncio.run(_notify())


[app/workers/rq_worker.py]


Not used by compose directly (we call rq worker), but handy if you want a Python entrypoint.


from redis import Redis
from rq import Queue, Worker

from app.config import settings


def main():
    q = Queue("agentics", connection=Redis.from_url(settings.redis_url))
    Worker([q]).work(with_scheduler=True)


if __name__ == "__main__":
    main()


[app/routes/__init__.py]


Marker


[app/services/__init__.py]


Marker


[app/utils/__init__.py]


Marker


[app/main.py]
"""
FastAPI app bootstrap:


  • Logging
  • DB init on startup
  • Routers
    """
    from fastapi import FastAPI
    from app.logging import setup_logging
    from app.db import init_db
    from app.routes import broker, admin, health

setup_logging()
app = FastAPI(title="Agentics Broker", version="0.1.0")


@app.on_event("startup")
async def _startup():
await init_db()


app.include_router(health.router)
app.include_router(admin.router)
app.include_router(broker.router)


[tests/test_smoke.py]
def test_truth():
    assert True
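
If you want the smoke test to actually touch the app, a sketch using FastAPI's TestClient (assumes app.main imports cleanly; whether startup hooks fire without a `with` block varies by Starlette version, so treat the no-MySQL claim as best-effort):

from fastapi.testclient import TestClient

from app.main import app


def test_health():
    # Instantiating without a `with` block skips startup events (and thus init_db)
    # on most Starlette versions, so MySQL/Redis need not be running.
    client = TestClient(app)
    resp = client.get("/health")
    assert resp.status_code == 200
    assert resp.json() == {"status": "ok"}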


[README.md]


Agentics Broker (Linux + Python)


Run:


  1. cp .env.example .env (edit secrets)
  2. docker compose up --build
  3. Register an endpoint:
    curl -X POST http://localhost:8000/admin/endpoints -H "Content-Type: application/json" -d '{"name":"example","base_url":"https://httpbin.org","auth_header":null}'
  4. Submit a job:
    curl -X POST http://localhost:8000/broker/jobs -H "Content-Type: application/json" -d '{"endpoint_name":"example","path":"/anything","method":"POST","payload":{"hello":"world"},"notify_email":null}'
  5. Poll:
    curl http://localhost:8000/broker/jobs/1

Notes:

  • RQ worker consumes queue "agentics".
  • MySQL stores endpoints, jobs, events.
  • Configure SMTP/Twilio to enable notifications.



How this meets your needs


  • Off Microsoft: Entirely Linux/Python native; deploy on any VPS or your Pi/NAS/metal.
  • Brokerage of data & REST: /admin/endpoints registers targets; /broker/jobs fans out async requests; results saved in MySQL; retries handled by RQ.
  • MySQL persistence: SQLAlchemy models + async driver; events for audit trail.
  • Notifications: Email/SMS/Webhook adapters already wired from worker.
  • Scales up: Add more rq workers; tune HTTPX limits; place Nginx in front; add Alembic migrations.

Runbook (quick)


  1. Copy .env.example → .env and fill in MySQL/Twilio/SMTP secrets.
  2. docker compose up --build
  3. POST /admin/endpoints to register partners.
  4. POST /broker/jobs to create jobs; include notify targets if you want alerts.
  5. GET /broker/jobs/{id} to read results.

Ideas to push this further


  • Add rules engine (NRules equivalent) via a simple Python rules table (YAML/DB) that decides notifications and next-step jobs.
  • Add OAuth2 client credentials per endpoint (store token_url, scope; auto-refresh).
  • Add rate limiting per endpoint (Redis leaky/token bucket; see the sketch after this list).
  • Add signature verification for inbound webhooks (HMAC).
  • Add scheduled jobs (rq-scheduler / cron).
  • Observability: OpenTelemetry traces + Prometheus metrics.
  • Data Lake: Stream results to S3/MinIO for analytics (keep MySQL for quick lookups).
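
To make the rate-limiting idea concrete, a fixed-window counter in Redis is only a few lines (a sketch, not wired into the broker; a true leaky bucket would smooth bursts, and the key scheme here is an assumption):

import time

from redis import Redis


def allow_request(r: Redis, endpoint: str, limit: int = 60, window: int = 60) -> bool:
    """Return True if `endpoint` has made fewer than `limit` calls in the current window."""
    key = f"ratelimit:{endpoint}:{int(time.time() // window)}"
    count = r.incr(key)        # atomic per-window counter
    if count == 1:
        r.expire(key, window)  # first hit sets the window TTL
    return count <= limit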

This can be packaged as a private Git repo with a Makefile and GitHub Actions CI next.
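
For that packaging step, a starter Makefile could be as small as this (targets are suggestions, not part of the repo above):

[Makefile]
up:
	docker compose up --build

test:
	pytest -q tests/

worker:
	rq worker agentics --url $(REDIS_URL)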
 