Skip to main content

Data Model

BigQuery Dataset Structure

pivot-inc (project)
└── pos_data (dataset)
├── employees # Employee roster from POS
├── time_clocks # Clock in/out records
├── sales # Sales per employee
├── tips # Tips per employee
├── sync_metadata # Sync tracking
└── sync_log # Audit log

Table Schemas

employees

Unified employee roster across all POS systems.

CREATE TABLE `pivot-inc.pos_data.employees` (
-- Identity
id STRING NOT NULL, -- Pivot-generated UUID
pos_employee_id STRING NOT NULL, -- Original ID from POS
company_id STRING NOT NULL, -- Pivot company ID
pos_type STRING NOT NULL, -- 'toast', 'square', 'clover'

-- Employee Data
first_name STRING,
last_name STRING,
full_name STRING, -- Computed or from POS
email STRING,
phone STRING,

-- Employment
job_codes ARRAY<STRING>, -- Role/position codes
job_titles ARRAY<STRING>, -- Role/position names
hire_date DATE,
termination_date DATE,
is_active BOOL NOT NULL,

-- Metadata
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL,
synced_at TIMESTAMP NOT NULL,
raw_data JSON, -- Original POS response

-- Clustering for efficient queries
) PARTITION BY DATE(synced_at)
CLUSTER BY company_id, pos_type;

time_clocks

Clock in/out punch records.

CREATE TABLE `pivot-inc.pos_data.time_clocks` (
-- Identity
id STRING NOT NULL,
pos_punch_id STRING NOT NULL,
company_id STRING NOT NULL,
pos_type STRING NOT NULL,

-- Employee Reference
employee_id STRING NOT NULL, -- FK to employees.id
pos_employee_id STRING NOT NULL, -- Original employee ID

-- Punch Data
clock_in TIMESTAMP NOT NULL,
clock_out TIMESTAMP, -- NULL if still clocked in
break_minutes INT64 DEFAULT 0,

-- Computed
shift_date DATE NOT NULL, -- Date of shift start
hours_worked FLOAT64, -- Computed if clock_out exists

-- Location/Department
location_id STRING,
department STRING,
job_code STRING,

-- Metadata
synced_at TIMESTAMP NOT NULL,
raw_data JSON,
) PARTITION BY shift_date
CLUSTER BY company_id, employee_id;

sales

Individual sales transactions attributed to employees.

CREATE TABLE `pivot-inc.pos_data.sales` (
-- Identity
id STRING NOT NULL,
pos_transaction_id STRING NOT NULL, -- Original transaction ID
pos_line_id STRING, -- Line item within transaction
company_id STRING NOT NULL,
pos_type STRING NOT NULL,

-- Employee Attribution
employee_id STRING, -- FK to employees.id
pos_employee_id STRING, -- Original employee ID

-- Transaction Data
transaction_time TIMESTAMP NOT NULL,
transaction_date DATE NOT NULL,

-- Item Details
item_name STRING,
item_category STRING,
plu_number STRING, -- Product lookup number
quantity FLOAT64 NOT NULL DEFAULT 1,

-- Amounts
gross_amount FLOAT64 NOT NULL, -- Before discounts
discount_amount FLOAT64 DEFAULT 0,
net_amount FLOAT64 NOT NULL, -- After discounts
tax_amount FLOAT64 DEFAULT 0,

-- Flags
is_voided BOOL DEFAULT FALSE,
is_refund BOOL DEFAULT FALSE,
is_modifier BOOL DEFAULT FALSE,
parent_line_id STRING, -- For modifiers

-- Metadata
synced_at TIMESTAMP NOT NULL,
raw_data JSON,
) PARTITION BY transaction_date
CLUSTER BY company_id, employee_id;

tips

Tips attributed to employees.

CREATE TABLE `pivot-inc.pos_data.tips` (
-- Identity
id STRING NOT NULL,
pos_transaction_id STRING NOT NULL,
company_id STRING NOT NULL,
pos_type STRING NOT NULL,

-- Employee Attribution
employee_id STRING NOT NULL,
pos_employee_id STRING NOT NULL,

-- Tip Data
transaction_time TIMESTAMP NOT NULL,
transaction_date DATE NOT NULL,

tip_amount FLOAT64 NOT NULL,
tip_type STRING, -- 'credit_card', 'cash', 'service_charge'

-- Related Transaction
related_check_id STRING,
check_amount FLOAT64,

-- Metadata
synced_at TIMESTAMP NOT NULL,
raw_data JSON,
) PARTITION BY transaction_date
CLUSTER BY company_id, employee_id;

sync_metadata

Tracks sync state for each company.

CREATE TABLE `pivot-inc.pos_data.sync_metadata` (
company_id STRING NOT NULL,
pos_type STRING NOT NULL,

-- Sync State
last_sync_time TIMESTAMP,
last_successful_sync TIMESTAMP,
last_sync_status STRING, -- 'success', 'partial', 'failed'
last_error STRING,

-- Counts from last sync
employees_synced INT64 DEFAULT 0,
time_clocks_synced INT64 DEFAULT 0,
sales_synced INT64 DEFAULT 0,
tips_synced INT64 DEFAULT 0,

-- Schema Version (for migrations)
schema_version INT64 NOT NULL DEFAULT 1,

-- Metadata
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL,

PRIMARY KEY (company_id, pos_type) NOT ENFORCED
);

sync_log

Audit log of all sync operations.

CREATE TABLE `pivot-inc.pos_data.sync_log` (
id STRING NOT NULL,
company_id STRING NOT NULL,
pos_type STRING NOT NULL,

-- Sync Details
started_at TIMESTAMP NOT NULL,
completed_at TIMESTAMP,
status STRING NOT NULL, -- 'started', 'completed', 'failed'
error_message STRING,

-- Stats
duration_seconds FLOAT64,
employees_processed INT64 DEFAULT 0,
time_clocks_processed INT64 DEFAULT 0,
sales_processed INT64 DEFAULT 0,
tips_processed INT64 DEFAULT 0,

-- Request Details
trigger_source STRING, -- 'scheduler', 'manual', 'webhook'
request_id STRING,

-- Metadata
created_at TIMESTAMP NOT NULL,
) PARTITION BY DATE(started_at)
CLUSTER BY company_id;

Data Flow Diagram


Unified Data Models (TypeScript)

Employee

interface UnifiedEmployee {
id: string; // UUID generated by service
posEmployeeId: string; // Original ID from POS
companyId: string;
posType: 'toast' | 'square' | 'clover';

firstName?: string;
lastName?: string;
fullName: string;
email?: string;
phone?: string;

jobCodes: string[];
jobTitles: string[];
hireDate?: Date;
terminationDate?: Date;
isActive: boolean;

createdAt: Date;
updatedAt: Date;
syncedAt: Date;
rawData: object;
}

TimeClock

interface UnifiedTimeClock {
id: string;
posPunchId: string;
companyId: string;
posType: string;

employeeId: string;
posEmployeeId: string;

clockIn: Date;
clockOut?: Date;
breakMinutes: number;

shiftDate: Date;
hoursWorked?: number;

locationId?: string;
department?: string;
jobCode?: string;

syncedAt: Date;
rawData: object;
}

Sale

interface UnifiedSale {
id: string;
posTransactionId: string;
posLineId?: string;
companyId: string;
posType: string;

employeeId?: string;
posEmployeeId?: string;

transactionTime: Date;
transactionDate: Date;

itemName?: string;
itemCategory?: string;
pluNumber?: string;
quantity: number;

grossAmount: number;
discountAmount: number;
netAmount: number;
taxAmount: number;

isVoided: boolean;
isRefund: boolean;
isModifier: boolean;
parentLineId?: string;

syncedAt: Date;
rawData: object;
}

Tip

interface UnifiedTip {
id: string;
posTransactionId: string;
companyId: string;
posType: string;

employeeId: string;
posEmployeeId: string;

transactionTime: Date;
transactionDate: Date;

tipAmount: number;
tipType?: 'credit_card' | 'cash' | 'service_charge';

relatedCheckId?: string;
checkAmount?: number;

syncedAt: Date;
rawData: object;
}

Common Queries

Sales by Employee (Daily)

SELECT
e.full_name,
s.transaction_date,
SUM(s.net_amount) as total_sales,
COUNT(DISTINCT s.pos_transaction_id) as transaction_count
FROM `pivot-inc.pos_data.sales` s
JOIN `pivot-inc.pos_data.employees` e
ON s.employee_id = e.id
WHERE s.company_id = @company_id
AND s.transaction_date BETWEEN @start_date AND @end_date
AND s.is_voided = FALSE
GROUP BY e.full_name, s.transaction_date
ORDER BY s.transaction_date, total_sales DESC;

Tips by Employee (Weekly)

SELECT
e.full_name,
DATE_TRUNC(t.transaction_date, WEEK) as week,
SUM(t.tip_amount) as total_tips,
COUNT(*) as tip_count
FROM `pivot-inc.pos_data.tips` t
JOIN `pivot-inc.pos_data.employees` e
ON t.employee_id = e.id
WHERE t.company_id = @company_id
AND t.transaction_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 4 WEEK)
GROUP BY e.full_name, week
ORDER BY week DESC, total_tips DESC;

Hours Worked by Employee

SELECT
e.full_name,
tc.shift_date,
SUM(tc.hours_worked) as total_hours,
COUNT(*) as shift_count
FROM `pivot-inc.pos_data.time_clocks` tc
JOIN `pivot-inc.pos_data.employees` e
ON tc.employee_id = e.id
WHERE tc.company_id = @company_id
AND tc.shift_date BETWEEN @start_date AND @end_date
AND tc.clock_out IS NOT NULL
GROUP BY e.full_name, tc.shift_date
ORDER BY tc.shift_date, e.full_name;

Labor Cost Analysis

WITH hourly_data AS (
SELECT
e.id as employee_id,
e.full_name,
tc.shift_date,
SUM(tc.hours_worked) as hours_worked
FROM `pivot-inc.pos_data.time_clocks` tc
JOIN `pivot-inc.pos_data.employees` e ON tc.employee_id = e.id
WHERE tc.company_id = @company_id
AND tc.shift_date = @date
GROUP BY e.id, e.full_name, tc.shift_date
),
sales_data AS (
SELECT
employee_id,
transaction_date,
SUM(net_amount) as total_sales
FROM `pivot-inc.pos_data.sales`
WHERE company_id = @company_id
AND transaction_date = @date
AND is_voided = FALSE
GROUP BY employee_id, transaction_date
)
SELECT
h.full_name,
h.hours_worked,
COALESCE(s.total_sales, 0) as total_sales,
SAFE_DIVIDE(COALESCE(s.total_sales, 0), h.hours_worked) as sales_per_hour
FROM hourly_data h
LEFT JOIN sales_data s
ON h.employee_id = s.employee_id
ORDER BY sales_per_hour DESC;

Partitioning & Clustering Strategy

TablePartition ByCluster ByRationale
employeessynced_at (DAY)company_id, pos_typeQuery by company, partition limits scan
time_clocksshift_date (DAY)company_id, employee_idTime-range queries, filter by company
salestransaction_date (DAY)company_id, employee_idDate-range queries, per-employee attribution
tipstransaction_date (DAY)company_id, employee_idSame pattern as sales
sync_logstarted_at (DAY)company_idAudit queries by company and time

Benefits:

  • Partitioning reduces query costs (scan only relevant partitions)
  • Clustering speeds up filtered queries (company_id filter is very common)
  • Date partitioning aligns with typical query patterns

Data Retention

-- Auto-delete sync_log entries older than 90 days
ALTER TABLE `pivot-inc.pos_data.sync_log`
SET OPTIONS (
partition_expiration_days = 90
);

-- Keep transaction data indefinitely (or per customer agreement)
-- Consider archive strategy for data older than 2 years

Schema Migrations

Use version-based migration similar to pivot-kpi:

const SCHEMA_VERSION = 2;

async function checkAndMigrateSchema(companyId: string): Promise<void> {
const metadata = await getSyncMetadata(companyId);

if (!metadata || metadata.schemaVersion < SCHEMA_VERSION) {
logger.info('Schema migration required', {
companyId,
currentVersion: metadata?.schemaVersion || 0,
targetVersion: SCHEMA_VERSION,
});

// Trigger full re-sync with new schema
await triggerFullSync(companyId);

// Update version
await updateSchemaVersion(companyId, SCHEMA_VERSION);
}
}