# Azure Queue Storage Integration

Process queue messages and forward them to Qorrelate.
## Overview
Azure Queue Storage provides cloud messaging between application components. This integration allows you to process queue messages containing logs or events and forward them to Qorrelate for centralized monitoring and analysis.
## Prerequisites
- An Azure Storage account with Queue Storage enabled
- Storage account connection string
- Queue name where messages are stored
- Your Qorrelate API endpoint and organization ID
## Configuration Steps

### 1. Create a Queue Processor Function
Deploy an Azure Function that processes queue messages:
```python
import json
import logging
import os
from datetime import datetime

import azure.functions as func
import requests


def main(msg: func.QueueMessage) -> None:
    message_content = msg.get_body().decode('utf-8')

    # Parse the message (adjust based on your format); wrap non-JSON
    # payloads so nothing is dropped.
    try:
        data = json.loads(message_content)
    except json.JSONDecodeError:
        data = {"message": message_content}

    # Build the log entry. json.loads can also return a list or scalar,
    # so guard the dict lookups.
    log_entry = {
        "timestamp": datetime.utcnow().isoformat() + "Z",
        "body": json.dumps(data) if isinstance(data, dict) else message_content,
        "severity_text": data.get("level", "INFO") if isinstance(data, dict) else "INFO",
        "attributes": {
            "source": "azure-queue-storage",
            "queue_name": os.environ.get("QUEUE_NAME", "logs"),
            "message_id": msg.id,
            "dequeue_count": str(msg.dequeue_count),
            "insertion_time": msg.insertion_time.isoformat() if msg.insertion_time else None
        }
    }

    # Forward to Qorrelate.
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {os.environ['QORRELATE_API_KEY']}",
        "X-Organization-Id": os.environ["QORRELATE_ORG_ID"]
    }
    response = requests.post(
        f"{os.environ['QORRELATE_ENDPOINT']}/v1/logs",
        headers=headers,
        json={"logs": [log_entry]},
        timeout=10
    )
    # Raise on failure so the queue runtime redelivers the message.
    response.raise_for_status()
    logging.info(f"Forwarded message {msg.id}, status: {response.status_code}")
```
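Raising on failure hands retries to the queue runtime (bounded by `maxDequeueCount`), but transient 5xx responses from the ingest endpoint can also be retried in-process before giving up. A minimal sketch of such a helper; `post_with_retry` is a hypothetical name, not part of any Qorrelate SDK, and the `send` callable is assumed to return an HTTP status code:

```python
import time
from typing import Callable


def post_with_retry(send: Callable[[], int], attempts: int = 3,
                    base_delay: float = 0.5) -> int:
    """Call `send` with exponential backoff, retrying on 5xx status codes.

    Raises on exhaustion so the queue runtime redelivers the message.
    """
    for attempt in range(attempts):
        status = send()
        if status < 500:
            return status
        if attempt < attempts - 1:
            # Backoff: 0.5s, 1s, 2s, ...
            time.sleep(base_delay * (2 ** attempt))
    raise RuntimeError(f"ingest failed after {attempts} attempts (status {status})")
```

In the processor above, `send` would be a closure around the `requests.post` call, returning `response.status_code`.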
### 2. Configure Function Bindings
Set up the queue trigger in `function.json`:

```json
{
  "bindings": [
    {
      "name": "msg",
      "type": "queueTrigger",
      "direction": "in",
      "queueName": "logs-queue",
      "connection": "AzureWebJobsStorage"
    }
  ]
}
```
### 3. Set Application Settings
```shell
# Azure Portal > Function App > Configuration
AzureWebJobsStorage=DefaultEndpointsProtocol=https;AccountName=...
QUEUE_NAME=logs-queue
QORRELATE_API_KEY=your_api_key
QORRELATE_ORG_ID=your_organization_id
QORRELATE_ENDPOINT=https://qorrelate.io
```
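A missing setting only surfaces as a `KeyError` once a message arrives. A small startup check (a sketch; `check_settings` and `REQUIRED_SETTINGS` are illustrative names, not part of the function runtime) lets the function fail fast instead:

```python
import os

# Settings the processor reads via os.environ at request time.
REQUIRED_SETTINGS = ("QORRELATE_API_KEY", "QORRELATE_ORG_ID", "QORRELATE_ENDPOINT")


def check_settings(env=os.environ) -> list:
    """Return the names of required settings that are missing or empty."""
    return [name for name in REQUIRED_SETTINGS if not env.get(name)]
```

Calling `check_settings()` at module import and logging any returned names makes misconfiguration visible in the function logs immediately after deployment.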
## Batch Processing
For high-volume queues, tune how many messages the host retrieves and processes concurrently in `host.json`:

```json
{
  "version": "2.0",
  "extensions": {
    "queues": {
      "batchSize": 16,
      "maxDequeueCount": 5,
      "newBatchThreshold": 8,
      "visibilityTimeout": "00:00:30"
    }
  }
}
```
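Note that in the Python programming model each queue trigger invocation still receives a single message; `batchSize` controls how many are fetched and run in parallel. If you accumulate log entries yourself (e.g. when draining a queue from a timer-triggered function), you can cut HTTP round trips by posting several entries per `/v1/logs` request, since the payload is a list. A minimal chunking sketch; `chunk_logs` is an illustrative helper, not a library function:

```python
from typing import Iterable, Iterator, List


def chunk_logs(entries: Iterable[dict], size: int = 16) -> Iterator[List[dict]]:
    """Yield lists of at most `size` log entries, one list per ingest request."""
    batch: List[dict] = []
    for entry in entries:
        batch.append(entry)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:  # flush the final partial batch
        yield batch
```

Each yielded list would then be sent as `json={"logs": batch}` in a single POST.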
## Sending Logs to Queue
Example of sending application logs to the queue:
```python
import json
from datetime import datetime

from azure.storage.queue import QueueClient

queue_client = QueueClient.from_connection_string(
    conn_str="your_connection_string",
    queue_name="logs-queue"
)


def send_log(level: str, message: str, **kwargs):
    log_entry = {
        "level": level,
        "message": message,
        "timestamp": datetime.utcnow().isoformat(),
        **kwargs
    }
    queue_client.send_message(json.dumps(log_entry))


# Usage
send_log("ERROR", "Database connection failed",
         service="api", error_code="DB_001")
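Rather than calling `send_log` directly, existing applications can route their standard `logging` output to the queue. A minimal sketch of a handler, assuming `queue_client` is any object with a `send_message(str)` method (such as the `QueueClient` above); `QueueLogHandler` is an illustrative class name:

```python
import json
import logging
from datetime import datetime, timezone


class QueueLogHandler(logging.Handler):
    """Forward stdlib logging records to a queue as JSON messages."""

    def __init__(self, queue_client):
        super().__init__()
        self.queue_client = queue_client

    def emit(self, record: logging.LogRecord) -> None:
        payload = {
            "level": record.levelname,
            "message": record.getMessage(),
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "logger": record.name,
        }
        try:
            self.queue_client.send_message(json.dumps(payload))
        except Exception:
            # Never let a logging failure crash the application.
            self.handleError(record)
```

Attach it once at startup with `logging.getLogger().addHandler(QueueLogHandler(queue_client))` and every log call in the application flows through the queue to Qorrelate.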
## Verifying the Integration

1. Deploy the queue processor function.
2. Send a test message to your queue.
3. Verify the function processes it (check the function logs).
4. View the log in Qorrelate by searching for `source:azure-queue-storage`.