Kapitel 2: Kernbausteine – Workflows, Activities, Worker
Nach der Einführung in Temporal im vorherigen Kapitel tauchen wir nun tief in die drei fundamentalen Bausteine ein, die das Herzstück jeder Temporal-Anwendung bilden: Workflows, Activities und Worker. Das Verständnis dieser Komponenten und ihres Zusammenspiels ist entscheidend für die erfolgreiche Entwicklung mit Temporal.
2.1 Überblick: Die drei Säulen von Temporal
Temporal basiert auf einer klaren Trennung der Verantwortlichkeiten (Separation of Concerns), die in drei Hauptkomponenten unterteilt ist:
graph TB
subgraph "Temporal Application"
W[Workflows<br/>Orchestrierung]
A[Activities<br/>Ausführung]
WK[Workers<br/>Hosting]
end
subgraph "Charakteristika"
W --> W1[Deterministisch]
W --> W2[Koordinieren]
W --> W3[Event Sourcing]
A --> A1[Nicht-deterministisch]
A --> A2[Ausführen]
A --> A3[Side Effects]
WK --> WK1[Stateless]
WK --> WK2[Polling]
WK --> WK3[Skalierbar]
end
style W fill:#e1f5ff
style A fill:#ffe1e1
style WK fill:#e1ffe1
Die Rollen im Detail:
-
Workflows: Die Dirigenten des Orchesters – sie definieren was passieren soll und in welcher Reihenfolge, führen aber selbst keine Business Logic aus.
-
Activities: Die Musiker – sie führen die eigentliche Arbeit aus, von Datenbankzugriffen über API-Aufrufe bis hin zu komplexen Berechnungen.
-
Workers: Die Konzerthalle – sie bieten die Infrastruktur, in der Workflows und Activities ausgeführt werden, und kommunizieren mit dem Temporal Service.
2.2 Workflows: Die Orchestrierungslogik
2.2.1 Was ist ein Workflow?
Ein Workflow in Temporal ist eine Funktion oder Methode, die die Orchestrierungslogik Ihrer Anwendung definiert. Anders als in vielen anderen Workflow-Engines wird ein Temporal-Workflow in einer echten Programmiersprache geschrieben – nicht in YAML, XML oder einer DSL.
Fundamentale Eigenschaften:
- Deterministisch: Bei gleichen Inputs immer gleiche Outputs und Commands
- Langlebig: Kann Tage, Monate oder Jahre laufen
- Ausfallsicher: Übersteht Infrastruktur- und Code-Deployments
- Versionierbar: Unterstützt Code-Änderungen bei laufenden Workflows
Ein einfaches Beispiel aus dem Code:
from temporalio import workflow
from datetime import timedelta
@workflow.defn
class DataProcessingWorkflow:
"""
Ein Workflow orchestriert Activities - er führt sie nicht selbst aus.
"""
@workflow.run
async def run(self, data: str) -> dict:
# Ruft Activity auf - delegiert die eigentliche Arbeit
processed = await workflow.execute_activity(
process_data,
data,
start_to_close_timeout=timedelta(seconds=30),
retry_policy=RetryPolicy(
maximum_attempts=3,
initial_interval=timedelta(seconds=1),
),
)
# Orchestriert weitere Schritte
await workflow.execute_activity(
send_notification,
f"Processed: {processed}",
start_to_close_timeout=timedelta(seconds=10),
)
return {"input": data, "output": processed, "status": "completed"}
📁 Code-Beispiel:
../examples/part-01/chapter-02/workflow.py
2.2.2 Der Determinismus-Constraint
Warum Determinismus?
Temporal nutzt einen Replay-Mechanismus, um Workflow-State zu rekonstruieren. Stellen Sie sich vor, ein Worker-Prozess stürzt ab, während ein Workflow läuft. Wenn der Workflow später auf einem anderen Worker fortgesetzt wird, muss Temporal den exakten State wiederherstellen. Dies geschieht durch:
- Laden der Event History (alle bisherigen Events)
- Replay des Workflow-Codes gegen diese History
- Vergleich der generierten Commands mit der History
- Bei Übereinstimmung: Workflow kann fortgesetzt werden
sequenceDiagram
participant WC as Workflow Code
participant Worker
participant History as Event History
participant Service as Temporal Service
Note over Worker: Worker startet neu nach Crash
Worker->>Service: Poll für Workflow Task
Service->>Worker: Workflow Task + Event History
Worker->>History: Lade alle Events
History-->>Worker: [Start, Activity1Scheduled, Activity1Complete, ...]
Worker->>WC: Replay Code von Anfang
WC->>WC: Führe Code aus
WC-->>Worker: Commands [ScheduleActivity1, ...]
Worker->>Worker: Validiere Commands gegen History
alt Commands stimmen überein
Worker->>Service: Workflow Task Complete
Note over Worker: State erfolgreich rekonstruiert
else Commands weichen ab
Worker->>Service: Non-Deterministic Error
Note over Worker: Code hat sich geändert!
end
Was ist verboten in Workflows?
# ❌ FALSCH: Nicht-deterministisch
@workflow.defn
class BadWorkflow:
@workflow.run
async def run(self):
# ❌ Zufallszahlen
random_value = random.random()
# ❌ Aktuelle Zeit
now = datetime.now()
# ❌ Direkte I/O-Operationen
with open('file.txt') as f:
data = f.read()
# ❌ Direkte API-Aufrufe
response = requests.get('https://api.example.com')
return random_value
# ✅ RICHTIG: Deterministisch
@workflow.defn
class GoodWorkflow:
@workflow.run
async def run(self):
# ✅ Temporal's Zeit-API
now = workflow.now()
# ✅ Temporal's Sleep
await workflow.sleep(timedelta(hours=1))
# ✅ I/O in Activities auslagern
data = await workflow.execute_activity(
read_file,
'file.txt',
start_to_close_timeout=timedelta(seconds=10)
)
# ✅ API-Aufrufe in Activities
response = await workflow.execute_activity(
call_external_api,
'https://api.example.com',
start_to_close_timeout=timedelta(seconds=30)
)
return {"data": data, "response": response}
Die goldene Regel: Workflows orchestrieren, Activities führen aus.
2.2.3 Long-Running Workflows und Continue-As-New
Temporal-Workflows können theoretisch unbegrenzt lange laufen. In der Praxis gibt es jedoch technische Grenzen:
Event History Limits:
- Maximum 50.000 Events (technisch 51.200)
- Maximum 50 MB History-Größe
- Performance-Degradation ab ~10.000 Events
Continue-As-New Pattern:
Für langlebige Workflows nutzt man das Continue-As-New Pattern:
@workflow.defn
class LongRunningWorkflow:
@workflow.run
async def run(self, iteration: int = 0) -> str:
# Führe Batch von Arbeit aus
for i in range(100):
await workflow.execute_activity(
process_item,
f"item-{iteration}-{i}",
start_to_close_timeout=timedelta(minutes=1)
)
# Nach 100 Items: Continue-As-New
# Neue Workflow-Instanz mit frischer Event History
workflow.continue_as_new(iteration + 1)
Wie Continue-As-New funktioniert:
timeline
title Workflow Lifecycle mit Continue-As-New
section Run 1
Start Workflow : Event History [0-100 Events]
Process Items : 100 Activities
Decision Point : Continue-As-New?
section Run 2
New Run ID : Neue Event History [0 Events]
Transfer State : iteration = 1
Process Items : 100 Activities
Decision Point : Continue-As-New?
section Run 3
New Run ID : Neue Event History [0 Events]
Transfer State : iteration = 2
Continue... : Unbegrenzt möglich
Wann Continue-As-New nutzen?
- Bei regelmäßigen Checkpoints (täglich, wöchentlich)
- Wenn Event History >10.000 Events erreicht
- Bei häufigen Code-Deployments (vermeidet Versioning-Probleme)
2.2.4 Workflow Versioning
Code ändert sich. Workflows laufen lange. Was passiert, wenn laufende Workflows auf neue Code-Versionen treffen?
Problem:
# Version 1 (deployed, Workflows laufen)
@workflow.defn
class OrderWorkflow:
@workflow.run
async def run(self, order_id: str):
await workflow.execute_activity(process_payment, ...)
return "done"
# Version 2 (neues Deployment)
@workflow.defn
class OrderWorkflow:
@workflow.run
async def run(self, order_id: str):
# NEU: Validierung hinzugefügt
await workflow.execute_activity(validate_order, ...)
await workflow.execute_activity(process_payment, ...)
return "done"
Beim Replay eines alten Workflows würde der neue Code eine zusätzliche Activity schedulen – Non-Deterministic Error!
Lösung: Patching API
from temporalio import workflow
@workflow.defn
class OrderWorkflow:
@workflow.run
async def run(self, order_id: str):
# Patching: Unterstütze alte UND neue Workflows
if workflow.patched("add-validation"):
# Neuer Code
await workflow.execute_activity(validate_order, ...)
# Alter Code (läuft in beiden Versionen)
await workflow.execute_activity(process_payment, ...)
return "done"
Patching-Workflow:
- Patch hinzufügen mit neuem und altem Code
- Warten bis alle alten Workflows abgeschlossen sind
- deprecate_patch() verwenden
- Patch-Code entfernen im nächsten Deployment
2.2.5 Workflow-Timeouts
Temporal bietet verschiedene Timeout-Typen:
graph LR
subgraph "Workflow Execution Timeouts"
WET[Workflow Execution Timeout<br/>Gesamte Execution inkl. Retries]
WRT[Workflow Run Timeout<br/>Ein einzelner Run]
WTT[Workflow Task Timeout<br/>Worker Task Processing]
end
WET --> WRT
WRT --> WTT
style WET fill:#ffcccc
style WRT fill:#ffffcc
style WTT fill:#ccffcc
Empfehlung: Workflow-Timeouts werden generell nicht empfohlen. Workflows sind für langlebige, resiliente Ausführung konzipiert. Timeouts sollten nur in Ausnahmefällen gesetzt werden.
2.3 Activities: Die Business Logic
2.3.1 Was sind Activities?
Activities sind normale Funktionen, die einzelne, klar definierte Aktionen ausführen. Im Gegensatz zu Workflows dürfen Activities:
- ✅ I/O-Operationen durchführen
- ✅ Externe APIs aufrufen
- ✅ Datenbanken lesen/schreiben
- ✅ Zufallszahlen generieren
- ✅ Aktuelle Systemzeit verwenden
- ✅ Side Effects haben
Activities sind der Ort für die eigentliche Business Logic.
Beispiel aus dem Code:
from temporalio import activity
@activity.defn
async def process_data(data: str) -> str:
"""
Activity für Datenverarbeitung.
Darf nicht-deterministische Operationen durchführen.
"""
activity.logger.info(f"Processing data: {data}")
# Simuliert externe API-Aufrufe, DB-Zugriffe, etc.
result = data.upper()
activity.logger.info(f"Data processed: {result}")
return result
@activity.defn
async def send_notification(message: str) -> None:
"""
Activity für Side Effects (E-Mail, Webhook, etc.)
"""
activity.logger.info(f"Sending notification: {message}")
# In der Praxis: Echter API-Aufruf
# await email_service.send(message)
# await webhook.post(message)
print(f"📧 Notification: {message}")
📁 Code-Beispiel:
../examples/part-01/chapter-02/activities.py
2.3.2 Activity-Timeouts
Activities haben vier verschiedene Timeout-Typen:
graph TB
subgraph "Activity Lifecycle"
Scheduled[Activity Scheduled<br/>in Queue]
Started[Activity Started<br/>by Worker]
Running[Activity Executing]
Complete[Activity Complete]
Scheduled -->|Schedule-To-Start| Started
Started -->|Start-To-Close| Complete
Running -->|Heartbeat| Running
Scheduled -->|Schedule-To-Close| Complete
end
style Scheduled fill:#e1f5ff
style Started fill:#fff4e1
style Running fill:#ffe1e1
style Complete fill:#e1ffe1
1. Start-To-Close Timeout (wichtigster):
await workflow.execute_activity(
process_data,
data,
start_to_close_timeout=timedelta(minutes=5), # Max. 5 Min pro Versuch
)
2. Schedule-To-Close Timeout (inkl. Retries):
await workflow.execute_activity(
process_data,
data,
schedule_to_close_timeout=timedelta(minutes=30), # Max. 30 Min total
)
3. Schedule-To-Start Timeout (selten benötigt):
await workflow.execute_activity(
process_data,
data,
schedule_to_start_timeout=timedelta(minutes=10), # Max. 10 Min in Queue
)
4. Heartbeat Timeout (für langlebige Activities):
await workflow.execute_activity(
long_running_task,
params,
heartbeat_timeout=timedelta(seconds=30), # Heartbeat alle 30s
)
Mindestens ein Timeout erforderlich: Start-To-Close ODER Schedule-To-Close.
2.3.3 Retry-Policies und Error Handling
Default Retry Policy (wenn nicht anders konfiguriert):
RetryPolicy(
initial_interval=timedelta(seconds=1),
backoff_coefficient=2.0,
maximum_interval=timedelta(seconds=100),
maximum_attempts=0, # 0 = unbegrenzt
)
Retry-Berechnung:
retry_wait = min(
initial_interval × (backoff_coefficient ^ retry_count),
maximum_interval
)
Beispiel: Bei initial_interval=1s und backoff_coefficient=2:
- Retry 1: nach 1 Sekunde
- Retry 2: nach 2 Sekunden
- Retry 3: nach 4 Sekunden
- Retry 4: nach 8 Sekunden
- …
Custom Retry Policy:
from temporalio.common import RetryPolicy
@workflow.defn
class RobustWorkflow:
@workflow.run
async def run(self):
result = await workflow.execute_activity(
flaky_external_api,
params,
start_to_close_timeout=timedelta(seconds=30),
retry_policy=RetryPolicy(
initial_interval=timedelta(seconds=1),
maximum_interval=timedelta(minutes=1),
backoff_coefficient=2,
maximum_attempts=5,
# Diese Fehler NICHT wiederholen
non_retryable_error_types=["InvalidInputError", "AuthError"],
),
)
return result
Non-Retryable Errors:
from temporalio.exceptions import ApplicationError
@activity.defn
async def validate_input(data: str) -> str:
if not data:
# Dieser Fehler wird NICHT wiederholt
raise ApplicationError(
"Input cannot be empty",
non_retryable=True
)
return data
2.3.4 Heartbeats für langlebige Activities
Für Activities, die lange laufen (mehrere Minuten oder länger), bieten Heartbeats zwei Vorteile:
- Schnellere Failure Detection: Service erkennt Worker-Crashes sofort
- Progress Tracking: Bei Restart kann Activity von letztem Checkpoint fortsetzen
from temporalio import activity
@activity.defn
async def process_large_file(file_path: str, total_items: int) -> str:
"""
Verarbeitet große Datei mit Progress-Tracking.
"""
start_index = 0
# Recover von vorherigem Progress
if activity.info().is_heartbeat_details_available():
start_index = activity.heartbeat_details()[0]
activity.logger.info(f"Resuming from index {start_index}")
for i in range(start_index, total_items):
# Verarbeite Item
await process_item(i)
# Heartbeat mit Progress
activity.heartbeat(i)
return f"Processed {total_items} items"
Workflow-Seite:
result = await workflow.execute_activity(
process_large_file,
args=["big_file.csv", 10000],
start_to_close_timeout=timedelta(hours=2),
heartbeat_timeout=timedelta(seconds=30), # Erwarte Heartbeat alle 30s
)
Wann Heartbeats nutzen?
- ✅ Große Datei-Downloads oder -Verarbeitung
- ✅ ML-Model-Training
- ✅ Batch-Processing mit vielen Items
- ❌ Schnelle API-Aufrufe (< 1 Minute)
2.3.5 Idempotenz – Die wichtigste Best Practice
Activities sollten IMMER idempotent sein: Mehrfache Ausführung = gleiches Ergebnis.
Warum?
- Temporal garantiert At-Least-Once Execution für Activities
- Bei Netzwerkfehlern kann unklar sein, ob Activity erfolgreich war
- Temporal wiederholt die Activity im Zweifel
Beispiel: Geldüberweisung (nicht idempotent):
# ❌ GEFÄHRLICH: Nicht idempotent
@activity.defn
async def transfer_money(from_account: str, to_account: str, amount: float):
# Was passiert bei Retry?
# → Geld wird mehrfach überwiesen!
await bank_api.transfer(from_account, to_account, amount)
Lösung: Idempotency Keys
# ✅ SICHER: Idempotent
@activity.defn
async def transfer_money(
from_account: str,
to_account: str,
amount: float,
idempotency_key: str
):
# Prüfe ob bereits ausgeführt
if await bank_api.is_processed(idempotency_key):
return await bank_api.get_result(idempotency_key)
# Führe Überweisung aus
result = await bank_api.transfer(
from_account,
to_account,
amount,
idempotency_key=idempotency_key
)
return result
Idempotency Key Generierung im Workflow:
@workflow.defn
class PaymentWorkflow:
@workflow.run
async def run(self, order_id: str, amount: float):
# Generiere deterministischen Idempotency Key
idempotency_key = f"payment-{order_id}-{workflow.info().run_id}"
await workflow.execute_activity(
transfer_money,
args=[
"account-A",
"account-B",
amount,
idempotency_key
],
start_to_close_timeout=timedelta(minutes=5),
)
2.3.6 Local Activities – Der Spezialfall
Local Activities werden im gleichen Prozess wie der Workflow ausgeführt, ohne separate Task Queue:
result = await workflow.execute_local_activity(
quick_calculation,
params,
start_to_close_timeout=timedelta(seconds=5),
)
Wann nutzen?
- ✅ Sehr kurze Activities (< 1 Sekunde)
- ✅ Hoher Throughput erforderlich (1000+ Activities/Sekunde)
- ✅ Einfache Berechnungen ohne externe Dependencies
Limitierungen:
- ❌ Keine Heartbeats
- ❌ Bei Retry wird gesamte Activity wiederholt (kein Checkpoint)
- ❌ Höheres Risiko bei nicht-idempotenten Operationen
Empfehlung: Nutze reguläre Activities als Default. Local Activities nur für sehr spezifische Performance-Optimierungen.
2.4 Workers: Die Laufzeitumgebung
2.4.1 Worker-Architektur
Workers sind eigenständige Prozesse, die außerhalb des Temporal Service laufen und:
- Task Queues pollen (long-polling RPC)
- Workflow- und Activity-Code ausführen
- Ergebnisse zurück an Temporal Service senden
graph TB
subgraph "Worker Process"
WW[Workflow Worker<br/>Führt Workflow-Code aus]
AW[Activity Worker<br/>Führt Activity-Code aus]
Poller1[Workflow Task Poller]
Poller2[Activity Task Poller]
end
subgraph "Temporal Service"
WQ[Workflow Task Queue]
AQ[Activity Task Queue]
end
Poller1 -.->|Long Poll| WQ
WQ -.->|Task| Poller1
Poller1 --> WW
Poller2 -.->|Long Poll| AQ
AQ -.->|Task| Poller2
Poller2 --> AW
style WW fill:#e1f5ff
style AW fill:#ffe1e1
Worker Setup – Beispiel aus dem Code:
from temporalio.worker import Worker
from shared.temporal_helpers import create_temporal_client
async def main():
# 1. Verbinde zu Temporal
client = await create_temporal_client()
# 2. Erstelle Worker
worker = Worker(
client,
task_queue="book-examples",
workflows=[DataProcessingWorkflow], # Registriere Workflows
activities=[process_data, send_notification], # Registriere Activities
)
# 3. Starte Worker (blockiert bis Ctrl+C)
await worker.run()
📁 Code-Beispiel:
../examples/part-01/chapter-02/worker.py
2.4.2 Task Queues und Polling
Task Queue Eigenschaften:
- Lightweight: Dynamisch erstellt, keine explizite Registration
- On-Demand: Wird beim ersten Workflow/Activity-Start erstellt
- Persistent: Tasks bleiben erhalten bei Worker-Ausfällen
- Load Balancing: Automatische Verteilung über alle Worker
Long-Polling Mechanismus:
sequenceDiagram
participant Worker
participant Service as Temporal Service
participant Queue as Task Queue
loop Kontinuierliches Polling
Worker->>Service: Poll für Tasks (RPC)
alt Task verfügbar
Queue->>Service: Task
Service-->>Worker: Task
Worker->>Worker: Execute Task
Worker->>Service: Complete Task
else Keine Tasks
Note over Service: Verbindung bleibt offen
Note over Service: Wartet bis Task oder Timeout
Service-->>Worker: Keine Tasks (nach Timeout)
end
end
Pull-basiert, nicht Push:
- Worker holen Tasks nur, wenn Kapazität vorhanden
- Verhindert Überlastung
- Automatisches Backpressure-Handling
2.4.3 Task Queue Routing und Partitioning
Routing-Strategien:
# 1. Standard: Ein Task Queue für alles
worker = Worker(
client,
task_queue="default",
workflows=[WorkflowA, WorkflowB],
activities=[activity1, activity2, activity3],
)
# 2. Separierung nach Funktion
workflow_worker = Worker(
client,
task_queue="workflows",
workflows=[WorkflowA, WorkflowB],
)
activity_worker = Worker(
client,
task_queue="activities",
activities=[activity1, activity2, activity3],
)
# 3. Isolation kritischer Activities (Bulkheading)
critical_worker = Worker(
client,
task_queue="critical-activities",
activities=[payment_activity],
)
background_worker = Worker(
client,
task_queue="background-activities",
activities=[send_email, generate_report],
)
Warum Isolation?
- Verhindert, dass langsame Activities kritische blockieren
- Bessere Ressourcen-Allokation
- Dedizierte Skalierung möglich
Task Queue Partitioning:
# Default: 4 Partitionen
# → Höherer Throughput, keine FIFO-Garantie
# Single Partition für FIFO-Garantie
# (via Temporal Server Config)
2.4.4 Sticky Execution – Performance-Optimierung
Problem: Bei jedem Workflow Task muss Worker die komplette Event History laden und Workflow replayed.
Lösung: Sticky Execution
sequenceDiagram
participant W1 as Worker 1
participant W2 as Worker 2
participant Service
participant NQ as Normal Queue
participant SQ as Sticky Queue (Worker 1)
W1->>Service: Poll Normal Queue
Service-->>W1: Workflow Task (WF-123)
W1->>W1: Execute + Cache State
W1->>Service: Complete
Service->>SQ: Nächster Task für WF-123 → Sticky Queue
W1->>Service: Poll Sticky Queue
Service-->>W1: Workflow Task (WF-123)
Note over W1: State im Cache!<br/>Kein History Reload
W1->>W1: Execute (sehr schnell)
W1->>Service: Complete
Note over Service: Timeout (5s default)
Service->>NQ: Task zurück zu Normal Queue
W2->>Service: Poll Normal Queue
Service-->>W2: Workflow Task (WF-123)
Note over W2: Kein Cache<br/>History Reload + Replay
Vorteile:
- 10-100x schnellere Task-Verarbeitung
- Reduzierte Last auf History Service
- Geringere Latenz
Automatisch aktiviert – keine Konfiguration erforderlich!
2.4.5 Worker Scaling und Deployment
Horizontal Scaling:
Workers sind stateless – Workflow-State ist im Temporal Service, nicht im Worker.
# Gleicher Code auf allen Workers
# Kann beliebig skaliert werden
# Worker 1 (Server A)
worker1 = Worker(client, task_queue="production", ...)
await worker1.run()
# Worker 2 (Server B)
worker2 = Worker(client, task_queue="production", ...)
await worker2.run()
# Worker 3 (Server C)
worker3 = Worker(client, task_queue="production", ...)
await worker3.run()
Deployment Patterns:
- Dedizierte Worker Processes (empfohlen für Production):
# Separate Prozesse nur für Temporal
python worker.py
- Combined Worker + Application:
# Im gleichen Prozess wie Web Server
# Nur für Development/kleine Apps
async def start_services():
# Starte Web Server
web_server = await start_web_server()
# Starte Worker (im Hintergrund)
worker = Worker(...)
asyncio.create_task(worker.run())
- Worker Fleets (High Availability):
Kubernetes Deployment:
- 10+ Worker Pods
- Auto-Scaling basierend auf Task Queue Länge
- Rolling Updates ohne Downtime
Skalierungs-Strategien:
| Szenario | Lösung |
|---|---|
| Höherer Workflow-Throughput | Mehr Worker Processes |
| Langlebige Activities | Mehr Activity Task Slots pro Worker |
| CPU-intensive Activities | Weniger Slots, mehr CPU pro Worker |
| I/O-bound Activities | Mehr Slots, weniger CPU pro Worker |
| Kritische Activities isolieren | Separate Task Queue + dedizierte Worker |
2.4.6 Worker Tuning und Konfiguration
Task Slots – Concurrency Control:
worker = Worker(
client,
task_queue="production",
workflows=[...],
activities=[...],
max_concurrent_workflow_tasks=100, # Max. parallele Workflow Tasks
max_concurrent_activities=50, # Max. parallele Activities
max_concurrent_local_activities=100, # Max. parallele Local Activities
)
Resource-Based Auto-Tuning (empfohlen):
from temporalio.worker import ResourceBasedTunerConfig, ResourceBasedSlotConfig
worker = Worker(
client,
task_queue="production",
workflows=[...],
activities=[...],
tuner=ResourceBasedTunerConfig(
# Workflow Task Slots
workflow_task_slot_supplier=ResourceBasedSlotConfig(
target_cpu_usage=0.8, # Ziel: 80% CPU
target_memory_usage=0.8, # Ziel: 80% Memory
minimum_slots=5,
maximum_slots=100,
),
# Activity Task Slots
activity_task_slot_supplier=ResourceBasedSlotConfig(
target_cpu_usage=0.7,
target_memory_usage=0.7,
),
),
)
Vorteile:
- Verhindert Out-of-Memory Errors
- Optimiert Durchsatz automatisch
- Passt sich an Workload an
2.5 Das Zusammenspiel: Ein komplettes Beispiel
Betrachten wir ein vollständiges Beispiel: Datenverarbeitung mit Benachrichtigung.
2.5.1 Der komplette Flow
sequenceDiagram
participant Client
participant Service as Temporal Service
participant WQ as Workflow Task Queue
participant AQ as Activity Task Queue
participant Worker
Client->>Service: Start Workflow "DataProcessing"
Service->>Service: Create Event History
Service->>Service: Write WorkflowExecutionStarted
Service->>WQ: Create Workflow Task
Worker->>WQ: Poll
WQ-->>Worker: Workflow Task
Worker->>Worker: Execute Workflow Code
Note over Worker: Code: execute_activity(process_data)
Worker->>Service: Commands [ScheduleActivity(process_data)]
Service->>Service: Write ActivityTaskScheduled
Service->>AQ: Create Activity Task
Worker->>AQ: Poll
AQ-->>Worker: Activity Task (process_data)
Worker->>Worker: Execute Activity Function
Note over Worker: Actual data processing
Worker->>Service: Activity Result
Service->>Service: Write ActivityTaskCompleted
Service->>WQ: Create new Workflow Task
Worker->>WQ: Poll
WQ-->>Worker: Workflow Task
Worker->>Worker: Replay + Continue
Note over Worker: Code: execute_activity(send_notification)
Worker->>Service: Commands [ScheduleActivity(send_notification)]
Service->>AQ: Create Activity Task
Worker->>AQ: Poll
AQ-->>Worker: Activity Task (send_notification)
Worker->>Worker: Execute send_notification
Worker->>Service: Activity Result
Service->>Service: Write ActivityTaskCompleted
Service->>WQ: Create Workflow Task
Worker->>WQ: Poll
WQ-->>Worker: Workflow Task
Worker->>Worker: Replay + Complete
Worker->>Service: Commands [CompleteWorkflow]
Service->>Service: Write WorkflowExecutionCompleted
Client->>Service: Get Result
Service-->>Client: {"status": "completed", ...}
2.5.2 Event History Timeline
Die Event History für diesen Flow:
1. WorkflowExecutionStarted
- WorkflowType: DataProcessingWorkflow
- Input: "Sample Data"
2. WorkflowTaskScheduled
3. WorkflowTaskStarted
4. WorkflowTaskCompleted
- Commands: [ScheduleActivityTask(process_data)]
5. ActivityTaskScheduled
- ActivityType: process_data
6. ActivityTaskStarted
7. ActivityTaskCompleted
- Result: "SAMPLE DATA"
8. WorkflowTaskScheduled
9. WorkflowTaskStarted
10. WorkflowTaskCompleted
- Commands: [ScheduleActivityTask(send_notification)]
11. ActivityTaskScheduled
- ActivityType: send_notification
12. ActivityTaskStarted
13. ActivityTaskCompleted
14. WorkflowTaskScheduled
15. WorkflowTaskStarted
16. WorkflowTaskCompleted
- Commands: [CompleteWorkflowExecution]
17. WorkflowExecutionCompleted
- Result: {"input": "Sample Data", "output": "SAMPLE DATA", "status": "completed"}
2.5.3 Code-Beispiel ausführen
Voraussetzungen:
# 1. Temporal Server starten (Docker)
docker compose up -d
# 2. Dependencies installieren
cd ../examples/part-01/chapter-02
uv sync
Terminal 1 – Worker starten:
uv run python worker.py
Ausgabe:
INFO - Starting Temporal Worker...
INFO - Worker registered workflows and activities:
INFO - - Workflows: ['DataProcessingWorkflow']
INFO - - Activities: ['process_data', 'send_notification']
INFO - Worker is running and polling for tasks...
INFO - Press Ctrl+C to stop
Terminal 2 – Workflow starten:
uv run python workflow.py
Ausgabe:
INFO - Processing data: Sample Data
INFO - Data processed successfully: SAMPLE DATA
INFO - Sending notification: Processed: SAMPLE DATA
📧 Notification: Processed: SAMPLE DATA
INFO - Notification sent successfully
✅ Workflow Result: {'input': 'Sample Data', 'output': 'SAMPLE DATA', 'status': 'completed'}
2.6 Best Practices
2.6.1 Workflow Best Practices
-
Orchestrieren, nicht Implementieren
# ❌ Schlecht: Business Logic im Workflow @workflow.defn class BadWorkflow: @workflow.run async def run(self, data: list): result = [] for item in data: # Komplexe Business Logic processed = item.strip().upper().replace("_", "-") result.append(processed) return result # ✅ Gut: Logic in Activity @workflow.defn class GoodWorkflow: @workflow.run async def run(self, data: list): return await workflow.execute_activity( process_items, data, start_to_close_timeout=timedelta(minutes=5) ) -
Kurze Workflow-Funktionen
- Lange Workflows in kleinere Child Workflows aufteilen
- Verbessert Wartbarkeit und Testbarkeit
-
Continue-As-New bei langen Laufzeiten
- Spätestens bei 10.000 Events
- Oder: Regelmäßige Checkpoints (täglich/wöchentlich)
-
Determinismus-Tests schreiben
from temporalio.testing import WorkflowEnvironment async def test_workflow_determinism(): async with await WorkflowEnvironment.start_time_skipping() as env: # Teste Workflow mit verschiedenen Szenarien ...
2.6.2 Activity Best Practices
-
IMMER idempotent
- Nutze Idempotency Keys
- Prüfe ob Operation bereits durchgeführt wurde
-
Passende Granularität
- Nicht zu fein: Bloated History
- Nicht zu grob: Schwierige Idempotenz, ineffiziente Retries
-
Timeouts immer setzen
- Mindestens Start-To-Close
- Heartbeats für langlebige Activities
-
Error Handling
@activity.defn async def robust_activity(params): try: return await external_api.call(params) except TemporaryError as e: # Retry durch Temporal raise except PermanentError as e: # Nicht wiederholen raise ApplicationError(str(e), non_retryable=True)
2.6.3 Worker Best Practices
-
Dedizierte Worker Processes in Production
- Nicht im gleichen Prozess wie Web Server
-
Task Queue Isolation für kritische Activities
# Zahlungen isoliert payment_worker = Worker( client, task_queue="payments", activities=[payment_activity], ) # Background Jobs separat background_worker = Worker( client, task_queue="background", activities=[email_activity, report_activity], ) -
Resource-Based Tuning nutzen
- Verhindert Out-of-Memory
- Optimiert Throughput automatisch
-
Monitoring und Metriken
# Wichtige Metriken überwachen: # - worker_task_slots_available (sollte >0 sein) # - temporal_sticky_cache_hit_total # - temporal_activity_execution_failed_total
2.7 Zusammenfassung
In diesem Kapitel haben wir die drei Kernbausteine von Temporal kennengelernt:
Workflows orchestrieren den gesamten Prozess:
- Deterministisch und replay-fähig
- Langlebig (Tage bis Jahre)
- Geschrieben in normalen Programmiersprachen
- Dürfen KEINE I/O-Operationen durchführen
Activities führen die eigentliche Arbeit aus:
- Nicht-deterministisch
- Dürfen I/O, externe APIs, Side Effects
- Automatische Retries mit konfigurierbaren Policies
- Sollten IMMER idempotent sein
Workers hostet Workflow- und Activity-Code:
- Pollen Task Queues via long-polling
- Stateless und horizontal skalierbar
- Führen Workflow-Replay und Activity-Execution aus
- Sticky Execution für Performance
Das große Bild:
graph TB
Client[Client Code]
Service[Temporal Service]
Worker[Worker Process]
Client -->|Start Workflow| Service
Service -->|Tasks via Queue| Worker
Worker -->|Workflow Code| WF[Workflows<br/>Orchestrierung]
Worker -->|Activity Code| AC[Activities<br/>Ausführung]
WF -->|Schedule| AC
Worker -->|Results| Service
Service -->|History| DB[(Event<br/>History)]
style WF fill:#e1f5ff
style AC fill:#ffe1e1
style Worker fill:#e1ffe1
Mit diesem Verständnis der Kernbausteine können wir im nächsten Kapitel tiefer in die Architektur des Temporal Service eintauchen und verstehen, wie Frontend, History Service, Matching Service und Persistence Layer zusammenarbeiten.
⬆ Zurück zum Inhaltsverzeichnis
Nächstes Kapitel: Kapitel 3: Architektur des Temporal Service
Code-Beispiele für dieses Kapitel: examples/part-01/chapter-02/