
VPL Syntax

Comments

varpulis
# Single line comment

/* 
   Multi-line
   comment
*/

Variable Declaration

varpulis
# Immutable (recommended)
let name = "value"
let count: int = 42

# Mutable
var counter = 0
counter := counter + 1

# Global constant
const MAX_RETRIES = 3
const API_URL = "https://api.example.com"

Stream Declaration

Simple Stream

varpulis
# From an event source
stream Trades = TradeEvent

# With alias
stream T = Trades

Stream with Filtering

varpulis
stream HighValueTrades = Trades
    .where(price > 10000)

# Multiple conditions
stream FilteredTrades = Trades
    .where(price > 1000 and volume > 100)
    .where(exchange == "NYSE" or exchange == "NASDAQ")

Stream with Projection

varpulis
stream SimpleTrades = Trades
    .select(
        symbol,
        price,
        total: price * volume
    )

Stream with Temporal Window

varpulis
# Tumbling window (5 minutes)
stream WindowedTrades = Trades
    .window(5m)
    .aggregate(
        count: count(),
        avg_price: avg(price)
    )

# Sliding window (5 min, slide 1 min)
stream SlidingMetrics = Trades
    .window(5m, sliding: 1m)
    .aggregate(sum(volume))

Multi-stream Aggregation

varpulis
stream BuildingMetrics = merge(
    stream S1 = SensorEvent .where(sensor_id == "S1"),
    stream S2 = SensorEvent .where(sensor_id == "S2"),
    stream S3 = SensorEvent .where(sensor_id == "S3")
)
.window(1m, sliding: 10s)
.aggregate(
    avg_temp: avg(temperature),
    min_temp: min(temperature),
    max_temp: max(temperature),
    sensor_count: count(distinct(sensor_id))
)

Joins

Inner Join

varpulis
stream EnrichedOrders = join(
    stream Orders = OrderEvent,
    stream Customers = CustomerEvent
        on Orders.customer_id == Customers.id,
    stream Inventory = InventoryEvent
        on Orders.product_id == Inventory.product_id
)
.window(5m, policy: "watermark")
.emit(
    order_id: Orders.id,
    customer_name: Customers.name,
    stock: Inventory.quantity
)

Outer Joins

varpulis
# LEFT JOIN — all left events, null-fill unmatched right fields
stream WithPayments = left_join(
    stream Orders = OrderEvent,
    stream Payments = PaymentEvent
        on Orders.order_id == Payments.order_id
)
.window(5m)
.emit(order_id: Orders.order_id, paid: Payments.amount)

# RIGHT JOIN — all right events, null-fill unmatched left fields
stream AllPayments = right_join(
    stream Orders = OrderEvent,
    stream Payments = PaymentEvent
        on Orders.order_id == Payments.order_id
)
.window(5m)
.emit(order_id: Payments.order_id, order_amount: Orders.amount)

# FULL JOIN — all events from both sides
stream Reconciliation = full_join(
    stream Orders = OrderEvent,
    stream Payments = PaymentEvent
        on Orders.order_id == Payments.order_id
)
.window(5m)
.emit(order_id: Orders.order_id, payment_ref: Payments.order_id)

See the Joins Reference for complete semantics and the Outer Joins Tutorial for step-by-step examples.

Contexts (Multi-Threaded Execution)

Contexts declare isolated execution domains, each running on a dedicated OS thread.

Context Declaration

varpulis
# Basic context
context ingestion

# With CPU affinity (Linux)
context analytics (cores: [2, 3])
context alerts (cores: [4])

Assigning Streams to Contexts

varpulis
stream FastFilter = RawEvents
    .context(ingestion)
    .where(value > 0)
    .emit(sensor_id: sensor_id, value: value)

Cross-Context Emit

varpulis
# Send events to a different context
stream Processed = RawEvents
    .context(ingestion)
    .where(priority > 5)
    .emit(context: analytics, data: data)

When no contexts are declared, the engine runs in single-threaded mode with zero overhead.

See the Contexts Guide for a full tutorial.

Parallelization

Use contexts for parallel execution across CPU cores:

varpulis
context fast_lane (cores: [0, 1])
context analytics (cores: [2, 3])

stream OrderProcessing = Orders
    .context(fast_lane)
    .partition_by(customer_id)
    .where(quantity > 0 and price > 0)
    .emit(order_id: id, customer: customer_id, total: price * quantity)

Functions

varpulis
fn calculate_total(price: float, quantity: int) -> float:
    return price * quantity

fn is_valid_order(order: OrderEvent) -> bool:
    return order.quantity > 0 and order.price > 0

# Inline function (lambda)
let double = x => x * 2
let add = (a, b) => a + b

Control Structures

Conditions

varpulis
if price > 1000:
    category = "high"
elif price > 100:
    category = "medium"
else:
    category = "low"

# Ternary expression
let status = if active then "enabled" else "disabled"

Loops

varpulis
for item in items:
    process(item)

for i in 0..10:
    print(i)

while condition:
    do_something()
    if should_stop:
        break

Connectors

MQTT Connector

The MQTT connector allows Varpulis to receive events from and send alerts to an MQTT broker. MQTT support requires the mqtt feature flag.

varpulis
# Connector declaration - place at the top of your VPL file
connector MqttBroker = mqtt (
    host: "localhost",
    port: 1883,
    client_id: "varpulis-app"
)

# Bind a stream to receive events from the connector
stream Events = SensorReading
    .from(MqttBroker, topic: "events/#")

When a stream is bound to it, the MQTT connector:

  • Subscribes to the specified topic to receive events (via .from())
  • Publishes the stream's .emit() results when the stream is routed with .to(Connector)
  • Reconnects automatically on connection loss

Note: The config mqtt { } block syntax is deprecated. Use connector declarations with .from() and .to() instead. See Connectors.

Example with streams:

varpulis
connector MqttBroker = mqtt (
    host: "localhost",
    port: 1883,
    client_id: "fraud-detector"
)

event Transaction:
    user_id: str
    amount: float
    status: str

# Alerts will be published to "alerts/fraud"
stream FraudAlert = Transaction
    .where(amount > 10000 and status == "pending")
    .emit(
        alert_type: "high_value_transaction",
        user_id: user_id,
        amount: amount
    )
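The example above declares MqttBroker but does not show the stream being bound to it. The following is a minimal sketch of the full round trip, assuming that .from() and .to() can be chained with .where() and .emit() in one stream. The input topic "transactions/#" and the stream name FraudAlertOut are hypothetical, and how the output topic is selected is not covered in this section.

```varpulis
# Sketch only: reuses the MqttBroker connector and Transaction event declared above
stream FraudAlertOut = Transaction
    # Hypothetical input topic
    .from(MqttBroker, topic: "transactions/#")
    .where(amount > 10000 and status == "pending")
    .emit(
        alert_type: "high_value_transaction",
        user_id: user_id,
        amount: amount
    )
    # Publish the emitted results through the same connector
    .to(MqttBroker)
```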

HTTP Connector

varpulis
# HTTP webhook output — declare the connector first
connector AlertWebhook = http (
    url: "http://webhook.example.com/alerts",
    method: "POST"
)

stream Alerts = DetectedPatterns
    .emit(severity: "high")
    .to(AlertWebhook)

Kafka Connector

Kafka is available with the kafka feature flag. Use connector declarations:

varpulis
connector KafkaBroker = kafka (
    brokers: ["broker:9092"],
    group_id: "varpulis-consumer"
)

stream Output = Processed
    .emit(result: value)
    .to(KafkaBroker)

Pattern Forecasting

Use .forecast() after a sequence pattern to predict the probability that the pattern will complete:

varpulis
# Zero-config — uses balanced defaults with adaptive warmup
stream SimpleForecast = EventA as a -> EventB as b
    .within(5m)
    .forecast()
    .where(forecast_confidence > 0.8)
    .emit(prob: forecast_probability)

# Fully configured
stream FraudForecast = Transaction as t1
    -> Transaction as t2 where t2.amount > t1.amount * 5
    -> Transaction as t3 where t3.location != t1.location
    .within(5m)
    .forecast(mode: "accurate", confidence: 0.7)
    .where(forecast_confidence > 0.8 and forecast_probability > 0.7)
    .emit(
        probability: forecast_probability,
        stability: forecast_confidence,
        expected_time: forecast_time,
        state: forecast_state,
        confidence_lower: forecast_lower,
        confidence_upper: forecast_upper
    )

Forecast Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| mode | str | "balanced" | Preset: "fast", "accurate", or "balanced" |
| confidence | float | 0.5 | Minimum probability threshold to emit forecast |
| horizon | duration | within duration | Forecast time window |
| warmup | int | 100 | Min events before forecasting starts |
| max_depth | int | 3 | PST context depth |
| hawkes | bool | true | Enable Hawkes intensity modulation |
| conformal | bool | true | Enable conformal prediction intervals |

Each mode preset sets the following defaults; explicitly passed parameters override them:

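| Mode | warmup | max_depth | hawkes | conformal | Adaptive warmup |
|---|---|---|---|---|---|
| "fast" | 50 | 3 | off | off | off |
| "accurate" | 200 | 5 | on | on | on |
| "balanced" | 100 | 3 | on | on | on |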
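Because explicit parameters override the preset, a preset can serve as a starting point that is adjusted one parameter at a time. The following is a sketch, assuming parameters combine in a single .forecast() call as in the fully configured example above. The stream name and the specific values are illustrative only.

```varpulis
# Start from the "fast" preset, then re-enable conformal intervals
# and raise warmup from the preset's 50 to 100 events
stream QuickForecast = EventA as a -> EventB as b
    .within(5m)
    .forecast(mode: "fast", warmup: 100, conformal: true)
    .emit(
        prob: forecast_probability,
        lower: forecast_lower,
        upper: forecast_upper
    )
```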

Forecast Built-in Variables

Available in .where() and .emit() after .forecast():

| Variable | Type | Description |
|---|---|---|
| forecast_probability | float | Pattern completion probability (0.0–1.0) |
| forecast_confidence | float | Prediction stability (0.0–1.0); high = converged |
| forecast_time | int | Expected time to completion (nanoseconds) |
| forecast_state | str | Current NFA state label |
| forecast_context_depth | int | PST context depth used for prediction |
| forecast_lower | float | Lower bound of conformal prediction interval (0.0–1.0) |
| forecast_upper | float | Upper bound of conformal prediction interval (0.0–1.0) |

External Enrichment

Use .enrich() to add data from external connectors to streaming events:

varpulis
connector WeatherAPI = http(url: "https://api.weather.com/v1")

stream Enriched = Temperature as t
    .enrich(WeatherAPI, key: t.city, fields: [forecast, humidity], cache_ttl: 5m)
    .where(forecast == "rain")
    .emit(city: t.city, forecast: forecast, humidity: humidity)

Enrich Parameters

| Parameter | Required | Default | Description |
|---|---|---|---|
| connector | Yes | | Connector name (first positional arg) |
| key | Yes | | Expression used as lookup key |
| fields | Yes | | List of fields to extract from response |
| cache_ttl | No | none | TTL for caching results (e.g., 5m, 1h) |
| timeout | No | 5s | Max time to wait for response |
| fallback | No | none | Default value when lookup fails |
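The optional parameters can be combined with the required ones. The following is a sketch, assuming that timeout takes a duration literal in the same way cache_ttl does and that fallback accepts a plain value. Neither form is shown elsewhere in this section, so treat both as assumptions. The stream name is illustrative.

```varpulis
connector WeatherAPI = http(url: "https://api.weather.com/v1")

# Cache lookups for an hour, give up after 2 seconds,
# and substitute a default when the lookup fails
stream EnrichedWithFallback = Temperature as t
    .enrich(
        WeatherAPI,
        key: t.city,
        fields: [forecast, humidity],
        cache_ttl: 1h,
        timeout: 2s,
        fallback: "unknown"
    )
    .emit(city: t.city, forecast: forecast)
```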

Enrich Built-in Variables

Available in .where() and .emit() after .enrich():

| Variable | Type | Description |
|---|---|---|
| enrich_status | str | "ok", "error", "cached", or "timeout" |
| enrich_latency_ms | int | Lookup latency in ms (0 for cache hits) |

Compatible connectors: http, database, redis. See Enrichment Reference for details.
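As a sketch of how the built-in variables might be used, the stream below keeps only events whose lookup succeeded or was served from the cache, and records the lookup latency. It assumes the WeatherAPI connector from the example above. The stream name and the lookup_ms output field are illustrative.

```varpulis
stream EnrichedOk = Temperature as t
    .enrich(WeatherAPI, key: t.city, fields: [forecast, humidity], cache_ttl: 5m)
    # Drop events whose lookup failed or timed out
    .where(enrich_status == "ok" or enrich_status == "cached")
    .emit(city: t.city, forecast: forecast, lookup_ms: enrich_latency_ms)
```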

Alert Notifications

Use .alert() to send webhook notifications as a side effect. The event continues downstream; .alert() does not consume it.

varpulis
stream FraudAlerts = Login as l -> LargeTransfer as t
    .within(5m)
    .where(t.amount > 10000)
    .alert(webhook: "https://hooks.slack.com/...", message: "Fraud: {amount}")
    .emit(user: l.user_id, amount: t.amount)

Parameters:

| Parameter | Description |
|---|---|
| webhook | URL that receives the alert payload via HTTP POST (JSON body with the event data and message) |
| message | Template string with {field} interpolation for event field values |

The alert is fire-and-forget: failed webhooks are logged as warnings but never block the pipeline. It works with any HTTP endpoint, including Slack webhooks, PagerDuty, Datadog, or custom services.
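Since .alert() does not consume the event, the same stream can still be emitted and routed to a connector. The following is a sketch that combines .alert() with .to(), assuming an AlertKafka connector declared as in the Output Routing section. The webhook URL and the stream name are placeholders.

```varpulis
stream FraudAlertsRouted = Login as l -> LargeTransfer as t
    .within(5m)
    .where(t.amount > 10000)
    # Side effect only: the event still reaches .emit() and .to()
    .alert(webhook: "https://hooks.example.com/fraud", message: "Fraud: {amount}")
    .emit(user: l.user_id, amount: t.amount)
    .to(AlertKafka)
```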


Output Routing

Use .to() to route stream output to declared connectors:

varpulis
connector AlertKafka = kafka (
    brokers: ["broker:9092"],
    topic: "alerts"
)

stream Alerts = DetectedPatterns
    .emit(
        alert_id: uuid(),
        severity: "high",
        timestamp: now()
    )
    .to(AlertKafka)
