Qorrelate | Integration Docs

Azure Event Hub Integration

Stream events from Azure Event Hub to Qorrelate

Overview

Azure Event Hub is a fully managed, real-time data ingestion service. This integration allows you to stream logs, metrics, and events from Event Hub to Qorrelate for real-time analysis and monitoring.

Prerequisites

  • An Azure Event Hub namespace and event hub
  • Event Hub connection string with Listen permission
  • Consumer group (or use $Default)
  • Your Qorrelate API endpoint and organization ID

1. Create an Event Hub Consumer

Deploy an Azure Function that consumes events and forwards them to Qorrelate:

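import json
import logging
import os
from datetime import timezone
from typing import List

import azure.functions as func
import requests


def main(events: List[func.EventHubEvent]):
    logs = []

    for event in events:
        try:
            body = event.get_body().decode('utf-8')
            try:
                data = json.loads(body)
            except json.JSONDecodeError:
                data = {"message": body}

            # Valid JSON is not always an object (it may be a list or a string),
            # so only read "level" from a dict.
            is_dict = isinstance(data, dict)

            # Emit a UTC timestamp with a single "Z" suffix. Appending "Z" to the
            # isoformat() of a timezone-aware value would give an invalid "+00:00Z".
            enqueued = event.enqueued_time
            if enqueued.tzinfo is None:
                enqueued = enqueued.replace(tzinfo=timezone.utc)  # enqueue times are UTC
            timestamp = enqueued.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")

            logs.append({
                "timestamp": timestamp,
                "body": json.dumps(data) if is_dict else str(data),
                "severity_text": data.get("level", "INFO") if is_dict else "INFO",
                "attributes": {
                    "source": "azure-event-hub",
                    # This value is the publisher's partition key and may be None.
                    "partition_id": event.partition_key,
                    "sequence_number": str(event.sequence_number),
                    "offset": event.offset
                }
            })
        except Exception as e:
            logging.error(f"Error processing event: {e}")

    if logs:
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {os.environ['QORRELATE_API_KEY']}",
            "X-Organization-Id": os.environ["QORRELATE_ORG_ID"]
        }

        response = requests.post(
            f"{os.environ['QORRELATE_ENDPOINT']}/v1/logs",
            headers=headers,
            json={"logs": logs},
            timeout=10  # do not let a stalled connection hold the invocation open
        )
        # Raise on a non-2xx response so a rejected batch shows up as a failed invocation.
        response.raise_for_status()

        logging.info(f"Forwarded {len(logs)} events, status: {response.status_code}")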

2. Configure Function Bindings

Set up the Event Hub trigger in function.json:

{
  "bindings": [
    {
      "type": "eventHubTrigger",
      "name": "events",
      "direction": "in",
      "eventHubName": "your-event-hub-name",
      "connection": "EventHubConnectionString",
      "consumerGroup": "$Default",
      "cardinality": "many",
      "dataType": "binary"
    }
  ]
}

3. Set Application Settings

# Azure Portal > Function App > Configuration
EventHubConnectionString=Endpoint=sb://your-namespace.servicebus.windows.net/;SharedAccessKeyName=...
QORRELATE_API_KEY=your_api_key
QORRELATE_ORG_ID=your_organization_id
QORRELATE_ENDPOINT=https://qorrelate.io

Using Azure Stream Analytics (Alternative)

For complex event processing, use Azure Stream Analytics with an HTTP output:

-- Stream Analytics Query
SELECT
    System.Timestamp() as timestamp,
    *
INTO
    [qorrelate-output]
FROM
    [event-hub-input]
WHERE
    severity IN ('ERROR', 'WARN', 'CRITICAL')

Diagnostic Settings Integration

Route Azure resource logs to Event Hub, then to Qorrelate:

# Enable diagnostic settings to Event Hub
az monitor diagnostic-settings create \
  --name "to-event-hub" \
  --resource "/subscriptions/{sub}/resourceGroups/{rg}/providers/..." \
  --event-hub "your-event-hub" \
  --event-hub-rule "/subscriptions/{sub}/resourceGroups/{rg}/providers/Microsoft.EventHub/namespaces/{ns}/authorizationRules/RootManageSharedAccessKey" \
  --logs '[{"category": "AuditLogs", "enabled": true}]'
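
Diagnostic settings deliver resource logs to Event Hub in batches: each message body is a JSON object whose records array holds the individual log records. The following is an added example, not Qorrelate's own code: the hypothetical helper expand_event splits such a message into one log entry per record, in the shape the consumer in step 1 sends, so each record keeps its own time, level and category. The sample message is constructed, and the field names assume the Azure resource log common schema.

```python
import json

def expand_event(body: bytes, enqueued_iso: str) -> list:
    """Turn one Event Hub message body into Qorrelate log entries.

    Azure diagnostic settings batch resource logs as {"records": [...]};
    any other payload is forwarded as a single entry.
    """
    text = body.decode("utf-8", errors="replace")
    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        data = {"message": text}

    if isinstance(data, dict) and isinstance(data.get("records"), list):
        records = data["records"]
    else:
        records = [data]

    logs = []
    for rec in records:
        is_dict = isinstance(rec, dict)
        attrs = {"source": "azure-event-hub"}
        if is_dict:
            # Common resource-log fields; copy only those present as strings.
            for key in ("resourceId", "category", "operationName"):
                if isinstance(rec.get(key), str):
                    attrs[key] = rec[key]
        logs.append({
            # Prefer the record's own event time over the enqueue time.
            "timestamp": rec.get("time", enqueued_iso) if is_dict else enqueued_iso,
            "body": json.dumps(rec) if is_dict else str(rec),
            "severity_text": str(rec.get("level", "INFO")) if is_dict else "INFO",
            "attributes": attrs,
        })
    return logs

# Constructed sample: one Event Hub message carrying two resource-log records.
SAMPLE = json.dumps({"records": [
    {"time": "2024-01-01T00:00:00Z", "resourceId": "/SUBSCRIPTIONS/EXAMPLE",
     "category": "AuditLogs", "operationName": "Example operation", "level": "Warning"},
    {"time": "2024-01-01T00:00:05Z", "category": "AuditLogs"},
]}).encode()

if __name__ == "__main__":
    for entry in expand_event(SAMPLE, "2024-01-01T00:00:10Z"):
        print(entry["timestamp"], entry["severity_text"], entry["attributes"])
```

Without this split, the whole batch would be forwarded as a single INFO entry, and a filter on severity would miss the records inside it.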

Verifying the Integration

  1. Deploy the Event Hub consumer function
  2. Send test events to your Event Hub
  3. Check Function logs for successful processing
  4. View logs in Qorrelate filtered by source:azure-event-hub

💡 Pro Tip

Use Event Hub checkpointing to ensure exactly-once processing. The Azure Functions runtime handles this automatically when using the Event Hub trigger binding.