Skip to main content

Temporal

Temporal does two things for MIA:

  1. Schedules cron-fired reports — every MIA "task" the user creates becomes a Temporal Schedule.
  2. Provides observability for in-flight chat pipelines — every /api/chat request signals a tracking workflow whose timeline reproduces the pipeline phases.

It does not execute tools. Tools are plain async functions called by executeTool() — see Agent SDK.

Connection

  • Address: process.env.TEMPORAL_ADDRESS (defaults to localhost:7233).
  • Task queue: pivot-ai-agent (single queue).
  • TLS is off for the dev-2 self-hosted server. Production setup is TBD — see TODOs.

Client setup: functions/modules/ai-memory/services/temporal-scheduler.service.ts (creates Connection + Client from @temporalio/client). Worker setup: functions/pivotAiAgent/temporal/worker.ts.

Workflows

WorkflowTriggerPurpose
scheduledReportWorkflowTemporal Scheduler (cron fire)Run the data pipeline for a saved task and email the result.
aiPipelineWorkflowOptional / batch useAlternate execution wrapper for the chat pipeline. Same logic as /api/chat, but as a workflow.
invitationReminderWorkflowDaily schedule (created at worker startup)Send reminders to employees invited 2+ days ago.
pipelineTrackingWorkflowStarted for every /api/chat requestObservability — receives signals from the SSE pipeline. Does not run tools.

Source: functions/pivotAiAgent/temporal/workflows.ts. Activity registration: worker.ts.

Activity retry / timeout defaults

  • Max 3 attempts.
  • Initial backoff 3s, coefficient 2x.
  • StartToClose timeout: 3 minutes for most activities (also for callModel).

Activities

Source: functions/pivotAiAgent/temporal/activities.ts. Note these are workflow-side activities, distinct from the chat-pipeline tools. They support the workflows above, not tool_use dispatch.

Data fetching (used by scheduledReportWorkflow)

  • fetchUserRulesActivitypivotAiAgentRules/{cid}/userRules
  • fetchCompanyMemoryActivitypivotAiAgentMemory/{cid}/companyProfile
  • fetchCompanyTimezoneActivityCompanies/{cid}.timezone
  • buildNameMapActivity — turn collected tool results' IDs into human-readable names

Models

  • callModel — unified LLM dispatcher (delegates to providers/anthropic.ts or providers/openrouter.ts). See Provider Selection.

Email + session

  • sendReportEmailActivity — markdown → HTML, dispatch via the configured email provider.
  • saveSessionActivityAIZackboardSessions/{cid}/{uid}/{sessionId} write.

Auth

  • getServiceTokenActivity — refresh the Firebase ID token mid-workflow when the user-supplied one has expired (user tokens last 1 hour; cron workflows often run later).

Reminders

  • sendInvitationRemindersActivity — scan invites > 2 days old and send.

Schedules

The ai-memory module creates per-task Temporal Schedules. Schedule ID format:

pivot-ai-report-{sanitizedCompanyId}-{taskId}

Schedule action:

{
type: 'startWorkflow',
workflowType: 'scheduledReportWorkflow',
taskQueue: 'pivot-ai-agent',
workflowTaskTimeout: '30s',
args: [{
taskId, title, reportQuery, companyId,
companyName, userId, userEmail, userName,
personaName: 'Mia',
firebaseToken: '' // workflow refreshes a service token if needed
}]
}

spec.timezone is the company's timezone so cron expressions fire in local time.

Tracking workflow

pipelineTrackingWorkflow is started fresh for every /api/chat request. The SSE handler signals it:

  • pipelineStep — once per pipeline phase (intent, tool iteration N, fan-out branch, etc.).
  • pipelineComplete — terminal success.
  • pipelineError — terminal failure.

The workflow replays each signaled step as a named activity on its own timeline, so an engineer can open Temporal Web UI and see the full per-request trace. Signal failures are logged but never fail the user request — observability is best-effort.

Graceful degradation

If Temporal is unreachable, the ai-memory module's scheduler service caches the last failure timestamp and waits 30 seconds before retrying. RTDB writes still succeed — cron tasks land in the Orphan state instead of failing outright.