Azure

Azure Queue Storage Integration

Process queue messages and forward to Qorrelate

Overview

Azure Queue Storage provides cloud messaging between application components. This integration allows you to process queue messages containing logs or events and forward them to Qorrelate for centralized monitoring and analysis.

Prerequisites

  • An Azure Storage account with Queue Storage enabled
  • Storage account connection string
  • Queue name where messages are stored
  • Your Qorrelate API endpoint and organization ID

Configuration Steps

1. Create a Queue Processor Function

Deploy an Azure Function that processes queue messages:

import logging
import json
import azure.functions as func
import requests
import os
from datetime import datetime

def main(msg: func.QueueMessage) -> None:
    message_content = msg.get_body().decode('utf-8')
    
    # Parse message (adjust based on your format)
    try:
        data = json.loads(message_content)
    except json.JSONDecodeError:
        data = {"message": message_content}
    
    # Create log entry
    log_entry = {
        "timestamp": datetime.utcnow().isoformat() + "Z",
        "body": json.dumps(data) if isinstance(data, dict) else message_content,
        "severity_text": data.get("level", "INFO"),
        "attributes": {
            "source": "azure-queue-storage",
            "queue_name": os.environ.get("QUEUE_NAME", "logs"),
            "message_id": msg.id,
            "dequeue_count": str(msg.dequeue_count),
            "insertion_time": msg.insertion_time.isoformat() if msg.insertion_time else None
        }
    }
    
    # Forward to Qorrelate
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {os.environ['QORRELATE_API_KEY']}",
        "X-Organization-Id": os.environ["QORRELATE_ORG_ID"]
    }
    
    response = requests.post(
        f"{os.environ['QORRELATE_ENDPOINT']}/v1/logs",
        headers=headers,
        json={"logs": [log_entry]}
    )
    
    logging.info(f"Forwarded message {msg.id}, status: {response.status_code}")

2. Configure Function Bindings

Set up the Queue trigger in function.json:

{
  "bindings": [
    {
      "name": "msg",
      "type": "queueTrigger",
      "direction": "in",
      "queueName": "logs-queue",
      "connection": "AzureWebJobsStorage"
    }
  ]
}

3. Set Application Settings

# Azure Portal > Function App > Configuration
AzureWebJobsStorage=DefaultEndpointsProtocol=https;AccountName=...
QUEUE_NAME=logs-queue
QORRELATE_API_KEY=your_api_key
QORRELATE_ORG_ID=your_organization_id
QORRELATE_ENDPOINT=https://qorrelate.io

Batch Processing

For high-volume queues, process messages in batches:

# In host.json - configure batch processing
{
  "version": "2.0",
  "extensions": {
    "queues": {
      "batchSize": 16,
      "maxDequeueCount": 5,
      "newBatchThreshold": 8,
      "visibilityTimeout": "00:00:30"
    }
  }
}

Sending Logs to Queue

Example of sending application logs to the queue:

from azure.storage.queue import QueueClient
import json
from datetime import datetime

queue_client = QueueClient.from_connection_string(
    conn_str="your_connection_string",
    queue_name="logs-queue"
)

def send_log(level: str, message: str, **kwargs):
    log_entry = {
        "level": level,
        "message": message,
        "timestamp": datetime.utcnow().isoformat(),
        **kwargs
    }
    queue_client.send_message(json.dumps(log_entry))

# Usage
send_log("ERROR", "Database connection failed", 
         service="api", error_code="DB_001")

Verifying the Integration

  1. Deploy the queue processor function
  2. Send a test message to your queue
  3. Verify the function processes it (check function logs)
  4. View the log in Qorrelate with source:azure-queue-storage