Technical Architecture
System Overview
The POS Service follows the same architectural patterns as pivot-kpi, adapted for POS data ingestion.
Project Structure
pivot-pos/
├── src/
│ ├── index.ts # Express server entry
│ ├── routes/
│ │ ├── health.ts # Health check endpoints
│ │ ├── sync.ts # Sync trigger endpoints
│ │ └── webhooks.ts # Webhook receivers (future)
│ ├── services/
│ │ ├── sync-engine.ts # Orchestrates sync process
│ │ ├── bigquery.ts # BigQuery client wrapper
│ │ └── firebase.ts # Firebase RTDB client
│ ├── integrations/
│ │ ├── base/
│ │ │ ├── types.ts # Common POS types
│ │ │ ├── fetcher.ts # Base fetcher interface
│ │ │ └── mapper.ts # Base data mapper
│ │ ├── toast/
│ │ │ ├── auth.ts # Toast authentication
│ │ │ ├── fetcher.ts # Toast API client
│ │ │ ├── mapper.ts # Toast to unified model
│ │ │ └── types.ts # Toast-specific types
│ │ ├── square/ # (Future)
│ │ └── clover/ # (Future)
│ ├── models/
│ │ ├── employee.ts # Unified employee model
│ │ ├── time-clock.ts # Unified time clock model
│ │ ├── sale.ts # Unified sale model
│ │ └── tip.ts # Unified tip model
│ └── utils/
│ ├── logger.ts # Structured logging
│ ├── retry.ts # Retry with backoff
│ └── config.ts # Environment config
├── sql/
│ ├── schema/
│ │ ├── employees.sql # BigQuery table DDL
│ │ ├── time_clocks.sql
│ │ ├── sales.sql
│ │ └── tips.sql
│ └── queries/
│ ├── get-last-sync.sql
│ └── aggregate-sales.sql
├── terraform/ # Infrastructure as code
│ ├── main.tf
│ ├── variables.tf
│ └── outputs.tf
├── .github/
│ └── workflows/
│ └── deploy.yml # CI/CD pipeline
├── Dockerfile
├── package.json
├── tsconfig.json
└── CLAUDE.md
Cloud Run Configuration
Based on pivot-kpi patterns, configured via Terraform:
# Cloud Run (v1 / Knative serving) definition for the POS sync service.
resource "google_cloud_run_service" "pivot_pos" {
name = "pivot-pos-service"
location = "us-central1"
template {
spec {
containers {
image = "gcr.io/${var.project_id}/pivot-pos:latest"
resources {
limits = {
memory = "2Gi"
cpu = "2"
}
}
# NOTE(review): project ID is hardcoded here, while the image above is
# derived from var.project_id — confirm whether this should also be
# var.project_id for per-environment deploys.
env {
name = "GOOGLE_CLOUD_PROJECT_ID"
value = "pivot-inc"
}
# Toast credentials are injected from Secret Manager, never stored in code.
env {
name = "TOAST_CLIENT_ID"
value_from {
secret_key_ref {
name = "pivot-pos-toast-client-id"
key = "latest"
}
}
}
env {
name = "TOAST_CLIENT_SECRET"
value_from {
secret_key_ref {
name = "pivot-pos-toast-client-secret"
key = "latest"
}
}
}
}
service_account_name = google_service_account.pivot_pos.email
timeout_seconds = 3600 # 1 hour for long syncs
}
metadata {
annotations = {
# Scale to zero when idle; cap bursts at 10 instances.
"autoscaling.knative.dev/minScale" = "0"
"autoscaling.knative.dev/maxScale" = "10"
}
}
}
# CI/CD deploys sha-tagged images via `gcloud run deploy`; ignoring the image
# here keeps `terraform apply` from rolling the service back to :latest.
lifecycle {
ignore_changes = [
template[0].spec[0].containers[0].image
]
}
}
Service Account Permissions
# Dedicated runtime identity for the Cloud Run service; the bindings below
# grant it only what the sync path needs.
resource "google_service_account" "pivot_pos" {
account_id = "pivot-pos-cloudrun"
display_name = "Pivot POS Service"
}
# BigQuery Data Editor (write data)
resource "google_project_iam_member" "bigquery_editor" {
project = var.project_id
role = "roles/bigquery.dataEditor"
member = "serviceAccount:${google_service_account.pivot_pos.email}"
}
# BigQuery Job User (run queries)
resource "google_project_iam_member" "bigquery_job_user" {
project = var.project_id
role = "roles/bigquery.jobUser"
member = "serviceAccount:${google_service_account.pivot_pos.email}"
}
# Secret Manager Accessor
resource "google_project_iam_member" "secret_accessor" {
project = var.project_id
role = "roles/secretmanager.secretAccessor"
member = "serviceAccount:${google_service_account.pivot_pos.email}"
}
# Firebase RTDB Viewer (read company settings)
# NOTE(review): roles/firebase.viewer grants read access across Firebase
# products; if only RTDB reads are needed, the narrower
# roles/firebasedatabase.viewer may suffice — confirm before tightening.
resource "google_project_iam_member" "firebase_viewer" {
project = var.project_id
role = "roles/firebase.viewer"
member = "serviceAccount:${google_service_account.pivot_pos.email}"
}
Cloud Scheduler Configuration
# Hourly trigger that POSTs to the service's /sync/all endpoint,
# authenticating with a Cloud Scheduler OIDC token.
resource "google_cloud_scheduler_job" "pos_sync_hourly" {
name = "pivot-pos-sync-hourly"
description = "Trigger hourly POS data sync for all active integrations"
schedule = "0 * * * *" # Every hour
time_zone = "America/New_York"
http_target {
http_method = "POST"
uri = "${google_cloud_run_service.pivot_pos.status[0].url}/sync/all"
oidc_token {
# NOTE(review): google_service_account.scheduler is declared elsewhere
# in the Terraform config — not shown in this document.
service_account_email = google_service_account.scheduler.email
}
}
retry_config {
retry_count = 3
min_backoff_duration = "30s"
max_backoff_duration = "300s"
}
# NOTE(review): the Cloud Run service allows 3600s requests but this
# deadline is 600s. The /sync/all response ("Sync initiated ...") suggests
# the endpoint kicks off work asynchronously — confirm it replies within
# 10 minutes, otherwise Scheduler will retry and may start overlapping syncs.
attempt_deadline = "600s" # 10 minute deadline
}
API Endpoints
Health Check
GET /health
Response: { "status": "healthy", "timestamp": "2025-12-02T..." }
Sync All Integrations
POST /sync/all
Authorization: Bearer <Cloud Scheduler OIDC token>
Response: {
"started": 15,
"skipped": 3,
"message": "Sync initiated for 15 companies"
}
Sync Single Company
POST /sync/toast/:companyId
Authorization: Bearer <service token>
Response: {
"companyId": "abc123",
"status": "completed",
"records": {
"employees": 25,
"timeClocks": 150,
"sales": 1200,
"tips": 1200
},
"duration": 45.2
}
Sync Status
GET /sync/status/:companyId
Response: {
"companyId": "abc123",
"lastSync": "2025-12-02T10:00:00Z",
"status": "healthy",
"nextScheduled": "2025-12-02T11:00:00Z"
}
Sync Engine Flow
(TODO: the flow diagram/description for this section is missing from this document — restore it before publishing.)
Error Handling
Retry Strategy
/**
 * Retry policy for outbound POS API calls: exponential backoff starting at
 * 1s, doubling per attempt, capped at 30s, for at most 5 attempts.
 */
const RETRY_CONFIG = {
  maxAttempts: 5,
  initialDelay: 1000, // ms before the first retry
  maxDelay: 30000, // backoff ceiling in ms
  backoffMultiplier: 2, // delay doubles after each failure
  // Error codes treated as transient; anything else fails immediately.
  retryableErrors: ['ECONNRESET', 'ETIMEDOUT', 'RATE_LIMITED', 'SERVICE_UNAVAILABLE'],
} as const;
/**
 * Runs `operation`, retrying transient failures with exponential backoff
 * per RETRY_CONFIG. Throws the last error once an error is non-retryable
 * (per isRetryable) or the attempt budget is exhausted.
 *
 * Fixes vs. previous version: the catch variable is typed `unknown` and
 * narrowed before reading `.message` (required under strict
 * useUnknownInCatchVariables), and the definitely-unassigned
 * `let lastError` + unreachable trailing `throw lastError` are removed.
 *
 * @param operation async thunk to execute
 * @param context   label used in log messages
 */
async function withRetry<T>(
  operation: () => Promise<T>,
  context: string
): Promise<T> {
  let delay = RETRY_CONFIG.initialDelay;
  for (let attempt = 1; attempt <= RETRY_CONFIG.maxAttempts; attempt++) {
    try {
      return await operation();
    } catch (error: unknown) {
      // Give up immediately on non-transient errors, or when this was
      // the final allowed attempt.
      if (!isRetryable(error) || attempt === RETRY_CONFIG.maxAttempts) {
        logger.error(`${context} failed after ${attempt} attempts`, { error });
        throw error;
      }
      logger.warn(`${context} attempt ${attempt} failed, retrying...`, {
        error: error instanceof Error ? error.message : String(error),
        delay,
      });
      await sleep(delay);
      delay = Math.min(delay * RETRY_CONFIG.backoffMultiplier, RETRY_CONFIG.maxDelay);
    }
  }
  // Unreachable: the final loop iteration either returns or rethrows above.
  throw new Error(`${context}: retry loop exited unexpectedly`);
}
Rate Limiting Compliance
Toast API limits: 20 req/sec, 10,000 req/15min
/**
 * Sliding-window rate limiter for the Toast API (documented limits:
 * 20 req/sec and 10,000 req/15min; we target 15 and 9,000 for headroom).
 *
 * Fix vs. previous version: after sleeping for a full window the old code
 * continued with a stale `now` and never re-checked either limit, so a
 * long wait could still let a request through over-limit. The loop below
 * re-evaluates both windows with fresh timestamps after every wait.
 */
class RateLimiter {
  // Timestamps (ms) of requests made within the trailing 15-minute window.
  private requestTimes: number[] = [];
  private readonly maxPerSecond = 15; // Stay under 20
  private readonly maxPer15Min = 9000; // Stay under 10,000

  /** Blocks until a request may be sent, then records it. */
  async throttle(): Promise<void> {
    for (;;) {
      const now = Date.now();
      // Drop entries older than the 15-minute window.
      this.requestTimes = this.requestTimes.filter(
        t => now - t < 15 * 60 * 1000
      );
      // 15-minute window full: wait until the oldest entry expires,
      // then re-check from the top with fresh timestamps.
      if (this.requestTimes.length >= this.maxPer15Min) {
        await sleep(15 * 60 * 1000 - (now - this.requestTimes[0]));
        continue;
      }
      // Per-second window full: wait until the oldest recent entry
      // ages past one second, freeing a slot, then re-check.
      const recent = this.requestTimes.filter(t => now - t < 1000);
      if (recent.length >= this.maxPerSecond) {
        await sleep(1000 - (now - recent[0]));
        continue;
      }
      this.requestTimes.push(Date.now());
      return;
    }
  }
}
Logging
Structured logging for Cloud Logging integration:
// Winston logger emitting one JSON object per line on stdout; Cloud Run
// forwards stdout to Cloud Logging, which parses JSON into structured entries.
import { createLogger, format, transports } from 'winston';
const logger = createLogger({
  // LOG_LEVEL env var overrides the default verbosity.
  level: process.env.LOG_LEVEL || 'info',
  format: format.combine(
    format.timestamp(),
    format.json()
  ),
  // Attached to every entry for filtering/grouping in Cloud Logging.
  defaultMeta: {
    service: 'pivot-pos-service',
    version: process.env.VERSION || 'unknown',
  },
  transports: [
    // Console only: no file transports in a containerized service.
    new transports.Console(),
  ],
});
// Usage
// NOTE(review): `error` in the second example is illustrative — it refers
// to an error caught in the calling scope, not a variable defined here.
logger.info('Starting sync', {
  companyId: 'abc123',
  posType: 'toast',
  lastSync: '2025-12-01T10:00:00Z',
});
logger.error('Sync failed', {
  companyId: 'abc123',
  error: error.message,
  stack: error.stack,
  attempt: 3,
});
Monitoring & Alerting
Key Metrics
| Metric | Type | Alert Threshold |
|---|---|---|
| Sync success rate | Counter | <95% over 1 hour |
| Sync duration | Histogram | P95 > 5 minutes |
| API errors | Counter | >10 in 15 minutes |
| BigQuery insert errors | Counter | Any occurrence |
Cloud Monitoring Alerts
# Alert on the log-based metric `pos_sync_status` emitted by the service.
# NOTE(review): the metric itself (logging.googleapis.com/user/...) must be
# defined separately — its definition is not shown in this document.
resource "google_monitoring_alert_policy" "sync_failures" {
display_name = "POS Sync Failure Rate"
conditions {
display_name = "Sync failure rate > 5%"
condition_threshold {
filter = "metric.type=\"logging.googleapis.com/user/pos_sync_status\" resource.type=\"cloud_run_revision\""
duration = "300s"
comparison = "COMPARISON_GT"
threshold_value = 0.05
aggregations {
# Align the counter into a rate over 5-minute windows before comparing.
alignment_period = "300s"
per_series_aligner = "ALIGN_RATE"
}
}
}
# Pages on-call via the PagerDuty channel when the condition fires.
notification_channels = [var.pagerduty_channel]
}
CI/CD Pipeline
# .github/workflows/deploy.yml
# Runs only when files under pivot-pos/ change on main; deploy gates on tests.
name: Deploy POS Service
on:
push:
branches: [main]
paths:
- 'pivot-pos/**'
jobs:
# Unit test suite; the deploy job depends on it via `needs`.
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-node@v4
with:
node-version: '20'
- run: npm ci
working-directory: pivot-pos
- run: npm test
working-directory: pivot-pos
deploy:
needs: test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
# Authenticate to GCP with a service-account key stored as a repo secret.
# NOTE(review): project pivot-dev-59310 is hardcoded below while the
# Terraform derives it from var.project_id — confirm how prod deploys
# are parameterized.
- uses: google-github-actions/auth@v2
with:
credentials_json: ${{ secrets.GCP_SA_KEY_DEV }}
- name: Build and push Docker image
run: |
gcloud builds submit \
--tag gcr.io/pivot-dev-59310/pivot-pos:${{ github.sha }} \
pivot-pos/
# Deploys the sha-tagged image; the Terraform lifecycle block ignores
# image changes, so this does not conflict with `terraform apply`.
- name: Deploy to Cloud Run
run: |
gcloud run deploy pivot-pos-service \
--image gcr.io/pivot-dev-59310/pivot-pos:${{ github.sha }} \
--platform managed \
--region us-central1 \
--project pivot-dev-59310
Local Development
# Setup
cd pivot-pos
npm install
# Environment
cp .env.example .env
# Edit .env with local credentials
# Run locally
npm run dev # Starts on port 7492
# Test sync manually (7492 matches the dev port above; abc123 is a sample company ID)
curl -X POST http://localhost:7492/sync/toast/abc123
# Run tests
npm test
npm run test:integration
Local BigQuery Access
Uses the same service account pattern as pivot-kpi:
// src/services/bigquery.ts
import { BigQuery } from '@google-cloud/bigquery';
function getBigQueryClient(): BigQuery {
// Check for local service account file first
const saPath = process.env.GOOGLE_APPLICATION_CREDENTIALS;
if (saPath && fs.existsSync(saPath)) {
return new BigQuery({ keyFilename: saPath });
}
// Check for JSON credentials in env (Cloud Run)
const credentialsJson = process.env.GOOGLE_APPLICATION_CREDENTIALS_JSON;
if (credentialsJson) {
const credentials = JSON.parse(credentialsJson);
return new BigQuery({ credentials });
}
// Fall back to ADC
return new BigQuery();
}