# 🎯 Multi-Source Tasks

Stream DaQ’s task-based architecture enables monitoring multiple independent data sources within a single StreamDaQ instance. This powerful feature eliminates the need to manage multiple monitoring processes while providing complete isolation and flexibility for each data source.

Why Multi-Source Monitoring?

In real-world scenarios, you often need to monitor diverse data streams simultaneously: IoT sensors, user events, financial transactions, and system metrics. The task-based architecture lets you handle all of them with different configurations, windowing strategies, and quality checks from a unified interface.

## Understanding Tasks

A Task in Stream DaQ represents a complete data quality monitoring configuration for a single data source. Each task encapsulates:

  • Data Source: The stream to monitor

  • Windowing Strategy: How to group data over time

  • Quality Measures: What to check and assess

  • Schema Validation: Optional data validation rules

  • Output Configuration: Where to send results

  • Error Handling: Critical vs non-critical designation
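
Putting these pieces together, here is a minimal sketch of one fully configured task, reusing only the calls shown throughout this page (`orders_data` and `order_validator` are placeholders for your own stream and validator; output configuration is omitted since it depends on your setup):

# One task bundling source, windowing, quality checks, schema validation,
# and a criticality designation. Placeholders: orders_data, order_validator.
daq = StreamDaQ()

orders_task = daq.new_task("orders_monitoring", critical=True)  # error handling: critical
orders_task.configure(
    source=orders_data,                # data source: the stream to monitor
    window=Windows.tumbling(60),       # windowing strategy: 60-second tumbling windows
    time_column="timestamp",           # event-time column used for windowing
    schema_validator=order_validator   # optional schema validation rules
)
orders_task.check(dqm.count('order_id'), must_be=">0", name="order_volume")  # quality measure

daq.watch_out()  # each task emits its own quality results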

Single-Source (Traditional)

One StreamDaQ instance monitors one data source with one configuration.

Multi-Source (Task-Based)

One StreamDaQ instance orchestrates multiple tasks, each monitoring different sources with independent configurations.

## Task Independence

Tasks operate completely independently:

Independent Configuration

Each task has its own windowing, schema validation, compact data handling, and sink operations.

Independent Execution

Tasks run concurrently without interfering with each other’s processing.

Independent Error Handling

Non-critical task failures don’t affect other tasks. Critical task failures can stop all monitoring.

Independent Output

Each task produces its own quality monitoring results at its own intervals.
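
To make this independence concrete, here is a short sketch (reusing only calls shown elsewhere on this page; `stream_a` and `stream_b` are placeholders) of two tasks with unrelated windowing, criticality, and checks running side by side:

daq = StreamDaQ()

# Task A: short sliding windows, critical
task_a = daq.new_task("stream_a_monitoring", critical=True)
task_a.configure(source=stream_a, window=Windows.sliding(300, 60), time_column="timestamp")
task_a.check(dqm.count('value'), must_be=">0", name="a_volume")

# Task B: hourly tumbling windows, non-critical
task_b = daq.new_task("stream_b_monitoring", critical=False)
task_b.configure(source=stream_b, window=Windows.tumbling(3600), time_column="event_time")
task_b.check(dqm.mean('value'), must_be="(0, 100)", name="b_mean")

# Both tasks run concurrently. A failure in task B is logged and task A keeps running;
# a failure in task A stops all monitoring.
daq.watch_out()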

## API Evolution: Single to Multi-Source

Stream DaQ’s API has evolved to support both single-source and multi-source scenarios while maintaining full backward compatibility.

### Traditional Single-Source API

# The original way (still works!)
daq = StreamDaQ().configure(
    source=sensor_data,
    window=Windows.tumbling(60),
    time_column="timestamp"
)
daq.add(dqm.count('readings'), assess=">50", name="volume")
daq.watch_out()

### New Multi-Source Task API

# The new way - explicit tasks
daq = StreamDaQ()

# Task 1: IoT sensors
iot_task = daq.new_task("iot_monitoring")
iot_task.configure(
    source=sensor_data,
    window=Windows.sliding(300, 60),  # Different windowing!
    time_column="sensor_timestamp",
    compact_data=CompactData()
)
iot_task.check(dqm.count('temperature'), must_be=">50", name="temp_volume")

# Task 2: User events
user_task = daq.new_task("user_analytics")
user_task.configure(
    source=user_data,
    window=Windows.tumbling(3600),  # Hourly windows
    time_column="event_time",
    instance="user_id"
)
user_task.check(dqm.distinct_count('action'), must_be=">3", name="engagement")

# Start monitoring all tasks
daq.watch_out()

### Mixed API Usage (Transition Approach)

# Combine old and new approaches
daq = StreamDaQ()

# Keep existing code (backward compatible)
daq.configure(source=legacy_data, window=Windows.tumbling(60), time_column="timestamp")
daq.check(dqm.count('events'), must_be=">10", name="legacy_count")

# Add new tasks
new_task = daq.new_task("additional_monitoring")
new_task.configure(source=new_data, window=Windows.sliding(120, 30), time_column="timestamp")
new_task.check(dqm.mean('values'), must_be="(0, 100)", name="avg_check")

daq.watch_out()  # Monitors both legacy and new tasks

## Task Naming and Management

### Automatic Task Naming

If you don’t provide a name, Stream DaQ automatically generates unique identifiers:

daq = StreamDaQ()
task1 = daq.new_task()  # Automatically named "task_1"
task2 = daq.new_task()  # Automatically named "task_2"
task3 = daq.new_task("custom_name")  # Explicitly named

### Task Management Operations

# List all tasks
task_names = daq.list_tasks()
print(f"Configured tasks: {task_names}")

# Get specific task
iot_task = daq.get_task("iot_monitoring")

# Remove task
daq.remove_task("old_task")

# Get task status
status = daq.get_task_status()
print(f"Total tasks: {status['total_tasks']}")

## Critical vs Non-Critical Tasks

Tasks can be designated as critical or non-critical, affecting error handling behavior:

Critical Tasks 🔴

Failure stops ALL monitoring immediately. Use for safety-critical, security, or financial systems.

Non-Critical Tasks 🟡

Failure is logged but other tasks continue. Use for analytics, reporting, or experimental features.

### Critical Task Examples

# Financial transaction monitoring (critical)
finance_task = daq.new_task("transactions", critical=True)
finance_task.configure(
    source=transaction_data,
    window=Windows.tumbling(60),
    time_column="timestamp",
    wait_for_late=0,  # No tolerance for late financial data
    schema_validator=strict_validator
)

# User analytics (non-critical)
analytics_task = daq.new_task("user_behavior", critical=False)
analytics_task.configure(
    source=user_data,
    window=Windows.tumbling(3600),
    time_column="timestamp"
)

### Error Handling Behavior

try:
    daq.watch_out()
except CriticalTaskFailureError as e:
    print(f"Critical task '{e.task_name}' failed: {e.original_error}")
    print("All monitoring stopped for safety")
    # Implement recovery procedures
except Exception as e:
    print(f"Non-critical task failure: {e}")
    print("Other tasks continue running")

## Real-World Multi-Source Scenarios

### Smart City Monitoring

daq = StreamDaQ()

# Environmental sensors (critical for public safety)
env_task = daq.new_task("environmental", critical=True)
env_task.configure(
    source=sensor_data,
    window=Windows.sliding(300, 60),
    compact_data=CompactData(),
    time_column="sensor_timestamp"
)
env_task.check(dqm.missing_count('air_quality'), must_be="<5", name="air_quality_availability")

# Traffic monitoring (non-critical)
traffic_task = daq.new_task("traffic", critical=False)
traffic_task.configure(
    source=traffic_data,
    window=Windows.tumbling(600),
    time_column="timestamp"
)
traffic_task.check(dqm.count('vehicles'), must_be="(10, 1000)", name="traffic_volume")

### E-commerce Platform

daq = StreamDaQ()

# Payment processing (critical)
payment_task = daq.new_task("payments", critical=True)
payment_task.configure(
    source=payment_stream,
    window=Windows.tumbling(60),
    schema_validator=payment_validator,
    wait_for_late=0
)

# User behavior analytics (non-critical)
behavior_task = daq.new_task("user_analytics", critical=False)
behavior_task.configure(
    source=clickstream_data,
    window=Windows.session(1800),  # 30-minute sessions
    instance="user_id"
)

# Inventory monitoring (critical)
inventory_task = daq.new_task("inventory", critical=True)
inventory_task.configure(
    source=inventory_updates,
    window=Windows.tumbling(300)
)

## Migration Guide

### Step 1: Assess Current Usage

# Current single-source code
daq = StreamDaQ().configure(source=data, window=Windows.tumbling(60), time_column="timestamp")
daq.add(dqm.count('events'), assess=">10", name="count")
daq.watch_out()

### Step 2: Gradual Migration

# Phase 1: Keep existing code, add new tasks
daq = StreamDaQ()
daq.configure(source=legacy_data, window=Windows.tumbling(60), time_column="timestamp")
daq.check(dqm.count('events'), must_be=">10", name="count")  # Use new method name

# Add new monitoring tasks
new_task = daq.new_task("additional_source")
new_task.configure(source=new_data, window=Windows.sliding(120, 30), time_column="timestamp")
new_task.check(dqm.mean('values'), must_be="(0, 100)", name="avg")

daq.watch_out()

### Step 3: Full Migration

# Phase 2: Convert to explicit tasks
daq = StreamDaQ()

# Convert legacy code to explicit task
legacy_task = daq.new_task("legacy_monitoring")
legacy_task.configure(source=legacy_data, window=Windows.tumbling(60), time_column="timestamp")
legacy_task.check(dqm.count('events'), must_be=">10", name="count")

# Additional tasks
new_task = daq.new_task("new_monitoring")
new_task.configure(source=new_data, window=Windows.sliding(120, 30), time_column="timestamp")
new_task.check(dqm.mean('values'), must_be="(0, 100)", name="avg")

daq.watch_out()

## Best Practices

### Task Organization

✅ Good Practices

  • Use descriptive task names: “iot_sensors”, “user_events”, “financial_transactions”

  • Group related monitoring: Keep similar data sources in the same task

  • Separate by criticality: Don’t mix critical and non-critical monitoring in the same task

  • Consider windowing alignment: Tasks with similar time requirements can share window strategies
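
As a concrete (if simplified) illustration of these practices, using placeholder stream names:

daq = StreamDaQ()

# Descriptive names, grouped by data source, separated by criticality
payments = daq.new_task("financial_transactions", critical=True)
payments.configure(source=payment_stream, window=Windows.tumbling(60), time_column="timestamp")

clicks = daq.new_task("user_events", critical=False)
clicks.configure(source=clickstream_data, window=Windows.tumbling(60), time_column="timestamp")  # aligned windowing

# Avoid: a vaguely named task that mixes payment checks and clickstream checks,
# e.g. daq.new_task("task_7") holding both kinds of measures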

### Error Handling Strategy

⚠️ Critical Task Guidelines

  • Use sparingly: Only mark truly essential systems as critical

  • Have fallbacks: Prepare recovery procedures for critical task failures

  • Monitor health: Track critical task performance separately

  • Test failure scenarios: Verify error handling works as expected
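
A simple fallback pattern for the "Have fallbacks" guideline might look like the sketch below. It relies only on the CriticalTaskFailureError behaviour shown earlier; `alert_on_call` is a placeholder for your own recovery hook:

import time

MAX_RETRIES = 3

for attempt in range(1, MAX_RETRIES + 1):
    try:
        daq.watch_out()
        break  # monitoring exited normally
    except CriticalTaskFailureError as e:
        alert_on_call(e.task_name, e.original_error)  # placeholder: page the on-call engineer
        time.sleep(30 * attempt)                      # back off before restarting monitoring
else:
    raise RuntimeError("Critical monitoring kept failing after retries")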

### Performance Considerations

# Efficient: Batch similar monitoring
sensor_task = daq.new_task("all_sensors")
sensor_task.configure(
    source=combined_sensor_data,
    window=Windows.tumbling(60),
    instance="sensor_type"  # Group by sensor type
)

# Less efficient: Too many small tasks
# Avoid creating dozens of tasks for similar data

## What’s Next?

Now that you understand multi-source tasks, here is the key takeaway:

The task-based architecture transforms Stream DaQ from a single-source monitoring tool into a comprehensive multi-source orchestration platform, all while maintaining the simplicity and power that makes Stream DaQ effective.

Made with ❤️ by the Stream DaQ team at Datalab AUTh