Data Model
BigQuery Dataset Structure
pivot-inc (project)
└── pos_data (dataset)
├── employees # Employee roster from POS
├── time_clocks # Clock in/out records
├── sales # Sales per employee
├── tips # Tips per employee
├── sync_metadata # Sync tracking
└── sync_log # Audit log
Table Schemas
employees
Unified employee roster across all POS systems.
CREATE TABLE `pivot-inc.pos_data.employees` (
-- Identity
id STRING NOT NULL, -- Pivot-generated UUID
pos_employee_id STRING NOT NULL, -- Original ID from POS
company_id STRING NOT NULL, -- Pivot company ID
pos_type STRING NOT NULL, -- 'toast', 'square', 'clover'
-- Employee Data
first_name STRING,
last_name STRING,
full_name STRING, -- Computed or from POS
email STRING,
phone STRING,
-- Employment
job_codes ARRAY<STRING>, -- Role/position codes
job_titles ARRAY<STRING>, -- Role/position names
hire_date DATE,
termination_date DATE,
is_active BOOL NOT NULL,
-- Metadata
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL,
synced_at TIMESTAMP NOT NULL,
raw_data JSON, -- Original POS response
-- Clustering for efficient queries
) PARTITION BY DATE(synced_at)
CLUSTER BY company_id, pos_type;
time_clocks
Clock in/out punch records.
CREATE TABLE `pivot-inc.pos_data.time_clocks` (
-- Identity
id STRING NOT NULL,
pos_punch_id STRING NOT NULL,
company_id STRING NOT NULL,
pos_type STRING NOT NULL,
-- Employee Reference
employee_id STRING NOT NULL, -- FK to employees.id
pos_employee_id STRING NOT NULL, -- Original employee ID
-- Punch Data
clock_in TIMESTAMP NOT NULL,
clock_out TIMESTAMP, -- NULL if still clocked in
break_minutes INT64 DEFAULT 0,
-- Computed
shift_date DATE NOT NULL, -- Date of shift start
hours_worked FLOAT64, -- Computed if clock_out exists
-- Location/Department
location_id STRING,
department STRING,
job_code STRING,
-- Metadata
synced_at TIMESTAMP NOT NULL,
raw_data JSON,
) PARTITION BY shift_date
CLUSTER BY company_id, employee_id;
sales
Individual sales transactions attributed to employees.
CREATE TABLE `pivot-inc.pos_data.sales` (
-- Identity
id STRING NOT NULL,
pos_transaction_id STRING NOT NULL, -- Original transaction ID
pos_line_id STRING, -- Line item within transaction
company_id STRING NOT NULL,
pos_type STRING NOT NULL,
-- Employee Attribution
employee_id STRING, -- FK to employees.id
pos_employee_id STRING, -- Original employee ID
-- Transaction Data
transaction_time TIMESTAMP NOT NULL,
transaction_date DATE NOT NULL,
-- Item Details
item_name STRING,
item_category STRING,
plu_number STRING, -- Product lookup number
quantity FLOAT64 NOT NULL DEFAULT 1,
-- Amounts
gross_amount FLOAT64 NOT NULL, -- Before discounts
discount_amount FLOAT64 DEFAULT 0,
net_amount FLOAT64 NOT NULL, -- After discounts
tax_amount FLOAT64 DEFAULT 0,
-- Flags
is_voided BOOL DEFAULT FALSE,
is_refund BOOL DEFAULT FALSE,
is_modifier BOOL DEFAULT FALSE,
parent_line_id STRING, -- For modifiers
-- Metadata
synced_at TIMESTAMP NOT NULL,
raw_data JSON,
) PARTITION BY transaction_date
CLUSTER BY company_id, employee_id;
tips
Tips attributed to employees.
CREATE TABLE `pivot-inc.pos_data.tips` (
-- Identity
id STRING NOT NULL,
pos_transaction_id STRING NOT NULL,
company_id STRING NOT NULL,
pos_type STRING NOT NULL,
-- Employee Attribution
employee_id STRING NOT NULL,
pos_employee_id STRING NOT NULL,
-- Tip Data
transaction_time TIMESTAMP NOT NULL,
transaction_date DATE NOT NULL,
tip_amount FLOAT64 NOT NULL,
tip_type STRING, -- 'credit_card', 'cash', 'service_charge'
-- Related Transaction
related_check_id STRING,
check_amount FLOAT64,
-- Metadata
synced_at TIMESTAMP NOT NULL,
raw_data JSON,
) PARTITION BY transaction_date
CLUSTER BY company_id, employee_id;
sync_metadata
Tracks sync state for each company.
CREATE TABLE `pivot-inc.pos_data.sync_metadata` (
company_id STRING NOT NULL,
pos_type STRING NOT NULL,
-- Sync State
last_sync_time TIMESTAMP,
last_successful_sync TIMESTAMP,
last_sync_status STRING, -- 'success', 'partial', 'failed'
last_error STRING,
-- Counts from last sync
employees_synced INT64 DEFAULT 0,
time_clocks_synced INT64 DEFAULT 0,
sales_synced INT64 DEFAULT 0,
tips_synced INT64 DEFAULT 0,
-- Schema Version (for migrations)
schema_version INT64 NOT NULL DEFAULT 1,
-- Metadata
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL,
PRIMARY KEY (company_id, pos_type) NOT ENFORCED
);
sync_log
Audit log of all sync operations.
CREATE TABLE `pivot-inc.pos_data.sync_log` (
id STRING NOT NULL,
company_id STRING NOT NULL,
pos_type STRING NOT NULL,
-- Sync Details
started_at TIMESTAMP NOT NULL,
completed_at TIMESTAMP,
status STRING NOT NULL, -- 'started', 'completed', 'failed'
error_message STRING,
-- Stats
duration_seconds FLOAT64,
employees_processed INT64 DEFAULT 0,
time_clocks_processed INT64 DEFAULT 0,
sales_processed INT64 DEFAULT 0,
tips_processed INT64 DEFAULT 0,
-- Request Details
trigger_source STRING, -- 'scheduler', 'manual', 'webhook'
request_id STRING,
-- Metadata
created_at TIMESTAMP NOT NULL,
) PARTITION BY DATE(started_at)
CLUSTER BY company_id;
Data Flow Diagram
Unified Data Models (TypeScript)
Employee
interface UnifiedEmployee {
id: string; // UUID generated by service
posEmployeeId: string; // Original ID from POS
companyId: string;
posType: 'toast' | 'square' | 'clover';
firstName?: string;
lastName?: string;
fullName: string;
email?: string;
phone?: string;
jobCodes: string[];
jobTitles: string[];
hireDate?: Date;
terminationDate?: Date;
isActive: boolean;
createdAt: Date;
updatedAt: Date;
syncedAt: Date;
rawData: object;
}
TimeClock
interface UnifiedTimeClock {
id: string;
posPunchId: string;
companyId: string;
posType: string;
employeeId: string;
posEmployeeId: string;
clockIn: Date;
clockOut?: Date;
breakMinutes: number;
shiftDate: Date;
hoursWorked?: number;
locationId?: string;
department?: string;
jobCode?: string;
syncedAt: Date;
rawData: object;
}
Sale
interface UnifiedSale {
id: string;
posTransactionId: string;
posLineId?: string;
companyId: string;
posType: string;
employeeId?: string;
posEmployeeId?: string;
transactionTime: Date;
transactionDate: Date;
itemName?: string;
itemCategory?: string;
pluNumber?: string;
quantity: number;
grossAmount: number;
discountAmount: number;
netAmount: number;
taxAmount: number;
isVoided: boolean;
isRefund: boolean;
isModifier: boolean;
parentLineId?: string;
syncedAt: Date;
rawData: object;
}
Tip
interface UnifiedTip {
id: string;
posTransactionId: string;
companyId: string;
posType: string;
employeeId: string;
posEmployeeId: string;
transactionTime: Date;
transactionDate: Date;
tipAmount: number;
tipType?: 'credit_card' | 'cash' | 'service_charge';
relatedCheckId?: string;
checkAmount?: number;
syncedAt: Date;
rawData: object;
}
Common Queries
Sales by Employee (Daily)
SELECT
e.full_name,
s.transaction_date,
SUM(s.net_amount) as total_sales,
COUNT(DISTINCT s.pos_transaction_id) as transaction_count
FROM `pivot-inc.pos_data.sales` s
JOIN `pivot-inc.pos_data.employees` e
ON s.employee_id = e.id
WHERE s.company_id = @company_id
AND s.transaction_date BETWEEN @start_date AND @end_date
AND s.is_voided = FALSE
GROUP BY e.full_name, s.transaction_date
ORDER BY s.transaction_date, total_sales DESC;
Tips by Employee (Weekly)
SELECT
e.full_name,
DATE_TRUNC(t.transaction_date, WEEK) as week,
SUM(t.tip_amount) as total_tips,
COUNT(*) as tip_count
FROM `pivot-inc.pos_data.tips` t
JOIN `pivot-inc.pos_data.employees` e
ON t.employee_id = e.id
WHERE t.company_id = @company_id
AND t.transaction_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 4 WEEK)
GROUP BY e.full_name, week
ORDER BY week DESC, total_tips DESC;
Hours Worked by Employee
SELECT
e.full_name,
tc.shift_date,
SUM(tc.hours_worked) as total_hours,
COUNT(*) as shift_count
FROM `pivot-inc.pos_data.time_clocks` tc
JOIN `pivot-inc.pos_data.employees` e
ON tc.employee_id = e.id
WHERE tc.company_id = @company_id
AND tc.shift_date BETWEEN @start_date AND @end_date
AND tc.clock_out IS NOT NULL
GROUP BY e.full_name, tc.shift_date
ORDER BY tc.shift_date, e.full_name;
Labor Cost Analysis
WITH hourly_data AS (
SELECT
e.id as employee_id,
e.full_name,
tc.shift_date,
SUM(tc.hours_worked) as hours_worked
FROM `pivot-inc.pos_data.time_clocks` tc
JOIN `pivot-inc.pos_data.employees` e ON tc.employee_id = e.id
WHERE tc.company_id = @company_id
AND tc.shift_date = @date
GROUP BY e.id, e.full_name, tc.shift_date
),
sales_data AS (
SELECT
employee_id,
transaction_date,
SUM(net_amount) as total_sales
FROM `pivot-inc.pos_data.sales`
WHERE company_id = @company_id
AND transaction_date = @date
AND is_voided = FALSE
GROUP BY employee_id, transaction_date
)
SELECT
h.full_name,
h.hours_worked,
COALESCE(s.total_sales, 0) as total_sales,
SAFE_DIVIDE(COALESCE(s.total_sales, 0), h.hours_worked) as sales_per_hour
FROM hourly_data h
LEFT JOIN sales_data s
ON h.employee_id = s.employee_id
ORDER BY sales_per_hour DESC;
Partitioning & Clustering Strategy
| Table | Partition By | Cluster By | Rationale |
|---|---|---|---|
| employees | synced_at (DAY) | company_id, pos_type | Query by company, partition limits scan |
| time_clocks | shift_date (DAY) | company_id, employee_id | Time-range queries, filter by company |
| sales | transaction_date (DAY) | company_id, employee_id | Date-range queries, per-employee attribution |
| tips | transaction_date (DAY) | company_id, employee_id | Same pattern as sales |
| sync_log | started_at (DAY) | company_id | Audit queries by company and time |
Benefits:
- Partitioning reduces query costs (scan only relevant partitions)
- Clustering speeds up filtered queries (company_id filter is very common)
- Date partitioning aligns with typical query patterns
Data Retention
-- Auto-delete sync_log entries older than 90 days
ALTER TABLE `pivot-inc.pos_data.sync_log`
SET OPTIONS (
partition_expiration_days = 90
);
-- Keep transaction data indefinitely (or per customer agreement)
-- Consider archive strategy for data older than 2 years
Schema Migrations
Use version-based migration similar to pivot-kpi:
const SCHEMA_VERSION = 2;
async function checkAndMigrateSchema(companyId: string): Promise<void> {
const metadata = await getSyncMetadata(companyId);
if (!metadata || metadata.schemaVersion < SCHEMA_VERSION) {
logger.info('Schema migration required', {
companyId,
currentVersion: metadata?.schemaVersion || 0,
targetVersion: SCHEMA_VERSION,
});
// Trigger full re-sync with new schema
await triggerFullSync(companyId);
// Update version
await updateSchemaVersion(companyId, SCHEMA_VERSION);
}
}