VPL Syntax
Comments
# Single line comment
/*
Multi-line
comment
*/
Variable Declaration
# Immutable (recommended)
let name = "value"
let count: int = 42
# Mutable
var counter = 0
counter := counter + 1
# Global constant
const MAX_RETRIES = 3
const API_URL = "https://api.example.com"
Stream Declaration
Simple Stream
# From an event source
stream Trades = TradeEvent
# With alias
stream T = Trades
Stream with Filtering
stream HighValueTrades = Trades
.where(price > 10000)
# Multiple conditions
stream FilteredTrades = Trades
.where(price > 1000 and volume > 100)
.where(exchange == "NYSE" or exchange == "NASDAQ")
Stream with Projection
stream SimpleTrades = Trades
.select(
symbol,
price,
total: price * volume
)
Stream with Temporal Window
# Tumbling window (5 minutes)
stream WindowedTrades = Trades
.window(5m)
.aggregate(
count: count(),
avg_price: avg(price)
)
# Sliding window (5 min, slide 1 min)
stream SlidingMetrics = Trades
.window(5m, sliding: 1m)
.aggregate(sum(volume))
Multi-stream Aggregation
stream BuildingMetrics = merge(
stream S1 = SensorEvent .where(sensor_id == "S1"),
stream S2 = SensorEvent .where(sensor_id == "S2"),
stream S3 = SensorEvent .where(sensor_id == "S3")
)
.window(1m, sliding: 10s)
.aggregate(
avg_temp: avg(temperature),
min_temp: min(temperature),
max_temp: max(temperature),
sensor_count: count(distinct(sensor_id))
)
Joins
Inner Join
stream EnrichedOrders = join(
stream Orders = OrderEvent,
stream Customers = CustomerEvent
on Orders.customer_id == Customers.id,
stream Inventory = InventoryEvent
on Orders.product_id == Inventory.product_id
)
.window(5m, policy: "watermark")
.emit(
order_id: Orders.id,
customer_name: Customers.name,
stock: Inventory.quantity
)
Outer Joins
# LEFT JOIN — all left events, null-fill unmatched right fields
stream WithPayments = left_join(
stream Orders = OrderEvent,
stream Payments = PaymentEvent
on Orders.order_id == Payments.order_id
)
.window(5m)
.emit(order_id: Orders.order_id, paid: Payments.amount)
# RIGHT JOIN — all right events, null-fill unmatched left fields
stream AllPayments = right_join(
stream Orders = OrderEvent,
stream Payments = PaymentEvent
on Orders.order_id == Payments.order_id
)
.window(5m)
.emit(order_id: Payments.order_id, order_amount: Orders.amount)
# FULL JOIN — all events from both sides
stream Reconciliation = full_join(
stream Orders = OrderEvent,
stream Payments = PaymentEvent
on Orders.order_id == Payments.order_id
)
.window(5m)
.emit(order_id: Orders.order_id, payment_ref: Payments.order_id)
See the Joins Reference for complete semantics and the Outer Joins Tutorial for step-by-step examples.
Contexts (Multi-Threaded Execution)
Contexts declare isolated execution domains, each running on a dedicated OS thread.
Context Declaration
# Basic context
context ingestion
# With CPU affinity (Linux)
context analytics (cores: [2, 3])
context alerts (cores: [4])
Assigning Streams to Contexts
stream FastFilter = RawEvents
.context(ingestion)
.where(value > 0)
.emit(sensor_id: sensor_id, value: value)
Cross-Context Emit
# Send events to a different context
stream Processed = RawEvents
.context(ingestion)
.where(priority > 5)
.emit(context: analytics, data: data)
When no contexts are declared, the engine runs in single-threaded mode with zero overhead.
See the Contexts Guide for a full tutorial.
Parallelization
Use contexts for parallel execution across CPU cores:
context fast_lane (cores: [0, 1])
context analytics (cores: [2, 3])
stream OrderProcessing = Orders
.context(fast_lane)
.partition_by(customer_id)
.where(quantity > 0 and price > 0)
.emit(order_id: id, customer: customer_id, total: price * quantity)
Functions
fn calculate_total(price: float, quantity: int) -> float:
return price * quantity
fn is_valid_order(order: OrderEvent) -> bool:
return order.quantity > 0 and order.price > 0
# Inline function (lambda)
let double = x => x * 2
let add = (a, b) => a + b
Control Structures
Conditions
if price > 1000:
category = "high"
elif price > 100:
category = "medium"
else:
category = "low"
# Ternary expression
let status = if active then "enabled" else "disabled"
Loops
for item in items:
process(item)
for i in 0..10:
print(i)
while condition:
do_something()
if should_stop:
break
Connectors
MQTT Connector
The MQTT connector allows Varpulis to receive events from and send alerts to an MQTT broker. MQTT support requires the mqtt feature flag.
# Connector declaration - place at the top of your VPL file
connector MqttBroker = mqtt (
host: "localhost",
port: 1883,
client_id: "varpulis-app"
)
# Bind a stream to receive events from the connector
stream Events = SensorReading
.from(MqttBroker, topic: "events/#")
The MQTT connector automatically:
- Subscribes to the specified topic to receive events
- Publishes stream .emit() results via .to(Connector)
- Handles reconnection on connection loss
Note: The config mqtt { } block syntax is deprecated. Use connector declarations with .from() and .to() instead. See Connectors.
Example with streams:
connector MqttBroker = mqtt (
host: "localhost",
port: 1883,
client_id: "fraud-detector"
)
event Transaction:
user_id: str
amount: float
status: str
# Alerts will be published to "alerts/fraud"
stream FraudAlert = Transaction
.where(amount > 10000 and status == "pending")
.emit(
alert_type: "high_value_transaction",
user_id: user_id,
amount: amount
)
HTTP Connector
# HTTP webhook output — declare the connector first
connector AlertWebhook = http (
url: "http://webhook.example.com/alerts",
method: "POST"
)
stream Alerts = DetectedPatterns
.emit(severity: "high")
.to(AlertWebhook)
Kafka Connector
Kafka is available with the kafka feature flag. Use connector declarations:
connector KafkaBroker = kafka (
brokers: ["broker:9092"],
group_id: "varpulis-consumer"
)
stream Output = Processed
.emit(result: value)
.to(KafkaBroker)
Pattern Forecasting
Use .forecast() after a sequence pattern to predict completion probability:
# Zero-config — uses balanced defaults with adaptive warmup
stream SimpleForecast = EventA as a -> EventB as b
.within(5m)
.forecast()
.where(forecast_confidence > 0.8)
.emit(prob: forecast_probability)
# Fully configured
stream FraudForecast = Transaction as t1
-> Transaction as t2 where t2.amount > t1.amount * 5
-> Transaction as t3 where t3.location != t1.location
.within(5m)
.forecast(mode: "accurate", confidence: 0.7)
.where(forecast_confidence > 0.8 and forecast_probability > 0.7)
.emit(
probability: forecast_probability,
stability: forecast_confidence,
expected_time: forecast_time,
state: forecast_state,
confidence_lower: forecast_lower,
confidence_upper: forecast_upper
)
Forecast Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| mode | str | "balanced" | Preset: "fast", "accurate", or "balanced" |
| confidence | float | 0.5 | Minimum probability threshold to emit a forecast |
| horizon | duration | within duration | Forecast time window |
| warmup | int | 100 | Minimum events before forecasting starts |
| max_depth | int | 3 | PST context depth |
| hawkes | bool | true | Enable Hawkes intensity modulation |
| conformal | bool | true | Enable conformal prediction intervals |
Mode presets provide convenient defaults (explicit parameters override preset values):
| Mode | warmup | max_depth | hawkes | conformal | Adaptive warmup |
|---|---|---|---|---|---|
| "fast" | 50 | 3 | off | off | off |
| "accurate" | 200 | 5 | on | on | on |
| "balanced" | 100 | 3 | on | on | on |
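Presets and explicit parameters can be mixed. The sketch below is illustrative (the stream and event names are not from the examples above): it starts from the "fast" preset but overrides its warmup default:

```vpl
# Illustrative: "fast" preset with an explicit warmup override
stream QuickForecast = EventA as a -> EventB as b
.within(2m)
.forecast(mode: "fast", warmup: 150)
.where(forecast_probability > 0.6)
.emit(prob: forecast_probability)
```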
Forecast Built-in Variables
Available in .where() and .emit() after .forecast():
| Variable | Type | Description |
|---|---|---|
| forecast_probability | float | Pattern completion probability (0.0–1.0) |
| forecast_confidence | float | Prediction stability (0.0–1.0) — high = converged |
| forecast_time | int | Expected time to completion (nanoseconds) |
| forecast_state | str | Current NFA state label |
| forecast_context_depth | int | PST context depth used for prediction |
| forecast_lower | float | Lower bound of conformal prediction interval (0.0–1.0) |
| forecast_upper | float | Upper bound of conformal prediction interval (0.0–1.0) |
External Enrichment
Use .enrich() to augment streaming events with data fetched from external connectors:
connector WeatherAPI = http(url: "https://api.weather.com/v1")
stream Enriched = Temperature as t
.enrich(WeatherAPI, key: t.city, fields: [forecast, humidity], cache_ttl: 5m)
.where(forecast == "rain")
.emit(city: t.city, forecast: forecast, humidity: humidity)
Enrich Parameters
| Parameter | Required | Default | Description |
|---|---|---|---|
| connector | Yes | — | Connector name (first positional argument) |
| key | Yes | — | Expression used as the lookup key |
| fields | Yes | — | List of fields to extract from the response |
| cache_ttl | No | none | TTL for caching results (e.g., 5m, 1h) |
| timeout | No | 5s | Maximum time to wait for a response |
| fallback | No | none | Default value when a lookup fails |
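The optional parameters compose with the required ones. A hedged sketch, assuming a hypothetical GeoAPI connector and Login event (neither appears elsewhere in this document), combining timeout and fallback:

```vpl
# Hypothetical connector and event, for illustration only
connector GeoAPI = http(url: "https://geo.example.com/v1")

stream Located = Login as l
.enrich(GeoAPI, key: l.ip, fields: [country], timeout: 2s, fallback: "unknown")
.emit(user: l.user_id, country: country)
```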
Enrich Built-in Variables
Available in .where() and .emit() after .enrich():
| Variable | Type | Description |
|---|---|---|
| enrich_status | str | "ok", "error", "cached", or "timeout" |
| enrich_latency_ms | int | Lookup latency in ms (0 for cache hits) |
Compatible connectors: http, database, redis. See Enrichment Reference for details.
Alert Notifications
Use .alert() to send webhook notifications as a side effect. The event continues downstream — .alert() does not consume it.
stream FraudAlerts = Login as l -> LargeTransfer as t
.within(5m)
.where(t.amount > 10000)
.alert(webhook: "https://hooks.slack.com/...", message: "Fraud: {amount}")
.emit(user: l.user_id, amount: t.amount)
Parameters:
| Parameter | Description |
|---|---|
| webhook | URL to HTTP POST the alert payload (JSON body with event data + message) |
| message | Template string with {field} interpolation for event field values |
The alert is fire-and-forget — failed webhooks are logged as warnings but never block the pipeline. Works with any HTTP endpoint including Slack webhooks, PagerDuty, Datadog, or custom services.
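Message templates can interpolate several fields at once. An illustrative sketch (the DiskFull event and its fields are hypothetical):

```vpl
# Hypothetical DiskFull event with host, mount, usage_pct fields
stream OpsAlerts = DiskFull as d
.where(d.usage_pct > 90)
.alert(
webhook: "https://hooks.slack.com/...",
message: "Disk {mount} at {usage_pct}% on {host}"
)
.emit(host: d.host, mount: d.mount)
```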
Output Routing
Use .to() to route stream output to declared connectors:
connector AlertKafka = kafka (
brokers: ["broker:9092"],
topic: "alerts"
)
stream Alerts = DetectedPatterns
.emit(
alert_id: uuid(),
severity: "high",
timestamp: now()
)
.to(AlertKafka)