πŸ›‘οΈ Schema Validation#

Stream DaQ’s schema validation enforces data structure and constraint rules through Pydantic models. This validation system acts as the first line of defense in data quality monitoring, ensuring data conforms to expected formats before quality measures are computed.

Core Principles#

Type Safety and Constraint Enforcement

Schema validation enforces data integrity at multiple levels:

  • Type Validation: Ensures fields match expected data types (int, str, float, etc.)

  • Constraint Checking: Validates ranges, lengths, patterns, and custom business rules

  • Required Field Verification: Confirms presence of mandatory data elements

  • Format Validation: Checks email addresses, URLs, date formats, and custom patterns

Pydantic Integration

Stream DaQ leverages Pydantic’s powerful validation capabilities:

from typing import Optional

from pydantic import BaseModel, Field

class SensorReading(BaseModel):
    sensor_id: str = Field(..., min_length=5, pattern=r'^SENSOR_\d+$')
    temperature: float = Field(..., ge=-50, le=100)
    timestamp: int = Field(..., gt=0)
    location: Optional[str] = Field(None, max_length=100)
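
For instance, a well-formed record validates cleanly, while any constraint violation raises a ValidationError with a detailed report:

from pydantic import ValidationError

# A well-formed reading passes validation.
reading = SensorReading(sensor_id="SENSOR_042", temperature=21.5, timestamp=1700000000)

# A malformed ID and an out-of-range temperature are both reported.
try:
    SensorReading(sensor_id="bad", temperature=250.0, timestamp=1700000000)
except ValidationError as exc:
    for err in exc.errors():
        print(err["loc"], err["msg"])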

Flexible Alert Strategies

Schema validation provides sophisticated control over when and how violations are reported:

  • Persistent Alerts: Always report violations for complete audit trails

  • First-K Windows: Alert only during initial stabilization periods

  • Conditional Alerts: Apply business logic to determine alert criticality

Validation Modes#

Persistent Mode

Always alerts on schema violations, providing complete data quality audit trails:

validator = create_schema_validator(
    schema=SensorReading,
    alert_mode=AlertMode.PERSISTENT,
    raise_on_violation=True,  # Fail fast on critical violations
    log_violations=True       # Maintain complete audit log
)

Use Cases:

  • Financial transaction processing

  • Regulatory compliance systems

  • Safety-critical applications

  • Production data pipelines where data integrity is paramount

First-K Windows Mode

Alerts only during the first K windows with violations, then suppresses further alerts:

validator = create_schema_validator(
    schema=SensorReading,
    alert_mode=AlertMode.ONLY_ON_FIRST_K,
    k_windows=5,              # Alert for first 5 violation windows
    deflect_violating_records=True  # Separate invalid data for analysis
)

Use Cases:

  • System startup and initialization

  • Development and testing environments

  • IoT device deployment where initial calibration issues are expected

  • Migration scenarios with temporary data quality issues

Conditional Mode

Applies custom business logic to determine when violations should trigger alerts:

from datetime import datetime

def alert_condition(window_data: dict) -> bool:
    """Alert only on high transaction volume or during business hours (9:00–17:00)."""
    transaction_volume = window_data.get('total_transactions', 0)
    hour = datetime.now().hour
    return transaction_volume > 1000 or 9 <= hour <= 17

validator = create_schema_validator(
    schema=TransactionRecord,
    alert_mode=AlertMode.ONLY_IF,
    condition_func=alert_condition
)

Use Cases:

  • Business-aware data quality monitoring

  • Dynamic alert prioritization based on operational context

  • Resource-conscious environments where alert volume must be controlled

  • Multi-tenant systems with varying quality requirements

Error Handling Strategies#

Record Deflection

Invalid records can be automatically separated from the main data stream:

import pathway as pw

def write_deflected_records(data):
    # Route invalid records to a separate sink for later analysis
    pw.io.jsonlines.write(data, "invalid_records.jsonl")

validator = create_schema_validator(
    schema=DataModel,
    deflect_violating_records=True,
    deflection_sink=write_deflected_records,
    include_error_messages=True  # Include validation error details
)

Benefits:

  • Prevents invalid data from corrupting downstream analysis

  • Enables a separate processing workflow for data cleaning

  • Maintains detailed error information for debugging

  • Supports data quality investigation and improvement

Fail-Fast Processing

For critical systems, validation can halt processing on violations:

validator = create_schema_validator(
    schema=CriticalData,
    raise_on_violation=True,    # Throw exception on validation failure
    log_violations=True         # Log details before failing
)

Benefits:

  • Ensures no invalid data enters critical processing pipelines

  • Provides immediate feedback on data quality issues

  • Supports strict compliance requirements

  • Enables automated recovery procedures (see the sketch below)
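
As a hedged illustration of the last point, a fail-fast pipeline can be wrapped in a retry loop that invokes recovery logic between attempts. The callables below (`run_pipeline`, `trigger_recovery`) are hypothetical placeholders, not Stream DaQ API:

from typing import Callable

def run_with_recovery(run_pipeline: Callable[[], None],
                      trigger_recovery: Callable[[], None],
                      max_retries: int = 3) -> None:
    """Re-run a fail-fast pipeline, invoking recovery between attempts.

    Both callables are placeholders for this sketch.
    """
    for attempt in range(1, max_retries + 1):
        try:
            run_pipeline()  # raises when raise_on_violation=True fires
            return
        except Exception as exc:
            print(f"Attempt {attempt} halted on validation failure: {exc}")
            trigger_recovery()
    raise RuntimeError("Pipeline still failing after recovery attempts")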

Data Quality Integration#

Pre-Processing Validation

Schema validation occurs before quality measures are computed, ensuring:

  • Clean Input Data: Only valid records contribute to quality statistics

  • Accurate Measures: Quality calculations aren’t skewed by malformed data

  • Reliable Baselines: Historical quality baselines reflect true data patterns

  • Consistent Analysis: Temporal comparisons use consistently structured data
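
A minimal, framework-free sketch of the idea using plain Pydantic and the SensorReading model from above: invalid records are filtered out before any statistic is computed, so a single malformed reading cannot distort the window’s mean.

from pydantic import ValidationError

raw_window = [
    {"sensor_id": "SENSOR_1", "temperature": 21.5, "timestamp": 1700000000},
    {"sensor_id": "SENSOR_2", "temperature": 22.1, "timestamp": 1700000060},
    {"sensor_id": "broken", "temperature": 9999.0, "timestamp": 1700000120},  # invalid
]

valid = []
for record in raw_window:
    try:
        valid.append(SensorReading(**record))
    except ValidationError:
        pass  # excluded from quality statistics

# The mean reflects only structurally valid readings (~21.8, not ~3348).
mean_temp = sum(r.temperature for r in valid) / len(valid)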

Advanced Validation Patterns#

Multi-Level Validation

Combine multiple validation layers for comprehensive data quality:

from pydantic import BaseModel, Field

class BasicValidation(BaseModel):
    """Essential field validation."""
    id: str = Field(..., min_length=1)
    timestamp: int = Field(..., gt=0)

class BusinessValidation(BaseModel):
    """Business rule validation."""
    amount: float = Field(..., ge=0)
    currency: str = Field(..., pattern=r'^[A-Z]{3}$')

class ComplianceValidation(BaseModel):
    """Regulatory compliance validation."""
    customer_id: str = Field(..., min_length=10)
    risk_score: float = Field(..., ge=0, le=1)
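
One way to apply the layers in sequence is sketched below; `validate_layers` is an illustrative helper, not part of Stream DaQ. A record passes only if every layer accepts it, and the names of the failing layers indicate whether the problem is structural, business-level, or compliance-related.

from pydantic import ValidationError

def validate_layers(record: dict) -> list[str]:
    """Return the names of the validation layers the record fails."""
    failed = []
    for layer in (BasicValidation, BusinessValidation, ComplianceValidation):
        try:
            layer(**record)  # extra fields are ignored by default
        except ValidationError:
            failed.append(layer.__name__)
    return failed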

Custom Validators

Implement domain-specific validation logic:

from pydantic import BaseModel, ValidationInfo, field_validator

class TransactionData(BaseModel):
    # Declared before `amount` so it is already validated (and available
    # in info.data) when the `amount` validator runs.
    account_balance: float
    amount: float

    @field_validator('amount')
    @classmethod
    def amount_must_be_reasonable(cls, v: float, info: ValidationInfo) -> float:
        balance = info.data.get('account_balance', 0)
        if v > balance * 1.1:  # Allow a 10% overdraft
            raise ValueError('Amount exceeds available balance')
        return v
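
For example, a withdrawal far beyond the available balance is rejected at construction time:

from pydantic import ValidationError

try:
    TransactionData(account_balance=100.0, amount=250.0)  # > 110% of balance
except ValidationError as exc:
    print(exc.errors()[0]["msg"])  # Value error, Amount exceeds available balance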

Cross-Field Validation

Validate relationships between multiple fields:

from datetime import datetime
from typing import Optional

from pydantic import BaseModel, ValidationInfo, field_validator

class OrderData(BaseModel):
    order_date: datetime
    ship_date: Optional[datetime] = None
    total_amount: float
    discount_amount: float

    @field_validator('ship_date')
    @classmethod
    def ship_date_after_order(cls, v, info: ValidationInfo):
        if v and info.data.get('order_date') and v < info.data['order_date']:
            raise ValueError('Ship date cannot be before order date')
        return v

    @field_validator('discount_amount')
    @classmethod
    def discount_not_exceed_total(cls, v, info: ValidationInfo):
        if v > info.data.get('total_amount', 0):
            raise ValueError('Discount cannot exceed total amount')
        return v
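
The same pattern rejects, for instance, an order shipped before it was placed:

from datetime import datetime
from pydantic import ValidationError

try:
    OrderData(
        order_date=datetime(2024, 5, 2),
        ship_date=datetime(2024, 5, 1),  # before order_date: rejected
        total_amount=100.0,
        discount_amount=10.0,
    )
except ValidationError as exc:
    print(exc.errors()[0]["msg"])  # Value error, Ship date cannot be before order date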

Performance Considerations#

Validation Overhead

Schema validation adds computational overhead proportional to:

  • Record Complexity: Number and complexity of validated fields

  • Constraint Complexity: Custom validators and cross-field validations

  • Error Handling: Logging, deflection, and error message generation
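
A rough way to gauge the per-record cost for a given model is a simple micro-benchmark; this is a sketch, and absolute numbers depend on hardware and model complexity:

import time
from pydantic import BaseModel, Field

class Reading(BaseModel):
    sensor_id: str = Field(..., min_length=5, pattern=r'^SENSOR_\d+$')
    temperature: float = Field(..., ge=-50, le=100)

records = [{"sensor_id": f"SENSOR_{i}", "temperature": 20.0} for i in range(100_000)]

start = time.perf_counter()
for record in records:
    Reading(**record)
elapsed = time.perf_counter() - start
print(f"{len(records) / elapsed:,.0f} validations/second")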

For practical implementation examples, see πŸ§™β€β™‚οΈ Advanced Examples and the examples/schema_validation.py file.