Post

Elasticsearch & SIEM: Security Monitoring for Beginners

Elasticsearch & SIEM: Security Monitoring for Beginners

Here’s the reality: Security events happen constantly. Failed login attempts, unusual network traffic, configuration changes - thousands of events every minute. Manually reviewing logs is impossible. SIEM (Security Information and Event Management) systems collect, analyze, and alert on these events automatically. Elasticsearch is often the engine behind SIEM solutions. Let’s learn how it works.

What is SIEM, Really?

SIEM stands for Security Information and Event Management. Think of it as:

Real-world analogy:

  • Logs = Security camera footage
  • SIEM = The system that watches all cameras 24/7
  • Alerts = Notifications when something suspicious happens
  • Dashboards = The control room where you see everything

What SIEM does:

  1. Collects logs from all your systems (servers, network devices, applications)
  2. Stores them in a searchable database (Elasticsearch)
  3. Analyzes them for patterns and anomalies
  4. Alerts you when something suspicious happens
  5. Visualizes data in dashboards

The ELK Stack

ELK stands for:

  • Elasticsearch - Search and analytics engine
  • Logstash - Data processing pipeline
  • Kibana - Visualization and dashboards

How they work together:

1
Logs → Logstash (processes) → Elasticsearch (stores) → Kibana (visualizes)

Understanding Elasticsearch

What is Elasticsearch?

Elasticsearch is a search engine built on Apache Lucene. Think of it like Google, but for your logs.

Key concepts:

  1. Index - Like a database (e.g., “security-logs”)
  2. Document - Like a row in a database (a single log entry)
  3. Field - Like a column (e.g., “timestamp”, “source_ip”)
  4. Query - Search for specific data

Example Document

1
2
3
4
5
6
7
{
  "timestamp": "2025-11-22T10:30:00Z",
  "source_ip": "192.168.1.100",
  "event_type": "failed_login",
  "username": "admin",
  "message": "Authentication failed for user admin"
}

Setting Up Elasticsearch (Docker)

Let’s set up a simple Elasticsearch instance for testing:

docker-compose.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
version: '3.8'

services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false  # Disable for testing
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ports:
      - "9200:9200"
    volumes:
      - es_data:/usr/share/elasticsearch/data

  kibana:
    image: docker.elastic.co/kibana/kibana:8.11.0
    container_name: kibana
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    ports:
      - "5601:5601"
    depends_on:
      - elasticsearch

volumes:
  es_data:

Run it:

1
docker-compose up -d

Access:

  • Elasticsearch: http://localhost:9200
  • Kibana: http://localhost:5601

Sending Logs to Elasticsearch with Python

Here’s how to send security events from your compliance tool:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
from elasticsearch import Elasticsearch
from datetime import datetime
import json

# Connect to Elasticsearch
es = Elasticsearch(
    ['http://localhost:9200'],
    # For production, add authentication:
    # http_auth=('elastic', 'password')
)

def index_security_event(event_type, source_ip, message, severity='info'):
    """Index a security event to Elasticsearch."""
    document = {
        'timestamp': datetime.utcnow().isoformat(),
        'event_type': event_type,
        'source_ip': source_ip,
        'message': message,
        'severity': severity
    }
    
    try:
        # Index the document
        response = es.index(
            index='security-events',
            document=document
        )
        print(f"✅ Event indexed: {response['_id']}")
        return response['_id']
    except Exception as e:
        print(f"❌ Error indexing event: {e}")
        return None

def index_compliance_finding(finding_type, resource, status, details):
    """Index a compliance finding."""
    document = {
        'timestamp': datetime.utcnow().isoformat(),
        'finding_type': finding_type,
        'resource': resource,
        'status': status,  # PASS, FAIL, WARN
        'details': details
    }
    
    try:
        response = es.index(
            index='compliance-findings',
            document=document
        )
        return response['_id']
    except Exception as e:
        print(f"❌ Error indexing finding: {e}")
        return None

# Example: Index compliance findings from scan
def index_scan_results(scan_results):
    """Index all findings from a compliance scan."""
    for finding in scan_results:
        index_compliance_finding(
            finding_type=finding['type'],
            resource=finding['resource'],
            status=finding['status'],
            details=finding['details']
        )

# Example usage
if __name__ == "__main__":
    # Index a security event
    index_security_event(
        event_type='failed_login',
        source_ip='192.168.1.100',
        message='Multiple failed login attempts',
        severity='high'
    )
    
    # Index a compliance finding
    index_compliance_finding(
        finding_type='s3_public_access',
        resource='s3://my-bucket',
        status='FAIL',
        details='Bucket has public access enabled'
    )

Querying Elasticsearch

Search for Events

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def search_security_events(query, size=10):
    """Search for security events."""
    try:
        response = es.search(
            index='security-events',
            body={
                'query': {
                    'match': {
                        'message': query
                    }
                },
                'size': size,
                'sort': [
                    {'timestamp': {'order': 'desc'}}
                ]
            }
        )
        
        return response['hits']['hits']
    except Exception as e:
        print(f"Error searching: {e}")
        return []

def get_failed_logins_last_hour():
    """Get all failed login attempts in the last hour."""
    from datetime import datetime, timedelta
    
    one_hour_ago = (datetime.utcnow() - timedelta(hours=1)).isoformat()
    
    try:
        response = es.search(
            index='security-events',
            body={
                'query': {
                    'bool': {
                        'must': [
                            {'match': {'event_type': 'failed_login'}},
                            {'range': {'timestamp': {'gte': one_hour_ago}}}
                        ]
                    }
                },
                'size': 100
            }
        )
        
        return response['hits']['hits']
    except Exception as e:
        print(f"Error querying: {e}")
        return []

def get_compliance_failures():
    """Get all compliance failures."""
    try:
        response = es.search(
            index='compliance-findings',
            body={
                'query': {
                    'match': {
                        'status': 'FAIL'
                    }
                },
                'size': 1000
            }
        )
        
        return response['hits']['hits']
    except Exception as e:
        print(f"Error querying: {e}")
        return []

Building a Simple SIEM Dashboard

Create Index Template

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def create_security_index_template():
    """Create an index template for security events."""
    template = {
        'index_patterns': ['security-events-*'],
        'template': {
            'settings': {
                'number_of_shards': 1,
                'number_of_replicas': 0
            },
            'mappings': {
                'properties': {
                    'timestamp': {'type': 'date'},
                    'event_type': {'type': 'keyword'},
                    'source_ip': {'type': 'ip'},
                    'message': {'type': 'text'},
                    'severity': {'type': 'keyword'}
                }
            }
        }
    }
    
    try:
        es.indices.put_index_template(
            name='security-events-template',
            body=template
        )
        print("✅ Index template created")
    except Exception as e:
        print(f"Error creating template: {e}")

Aggregate Statistics

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def get_security_statistics():
    """Get security statistics from Elasticsearch."""
    try:
        # Count by event type
        response = es.search(
            index='security-events',
            body={
                'size': 0,
                'aggs': {
                    'event_types': {
                        'terms': {
                            'field': 'event_type',
                            'size': 10
                        }
                    },
                    'by_severity': {
                        'terms': {
                            'field': 'severity',
                            'size': 5
                        }
                    }
                }
            }
        )
        
        return {
            'event_types': response['aggregations']['event_types']['buckets'],
            'by_severity': response['aggregations']['by_severity']['buckets']
        }
    except Exception as e:
        print(f"Error getting statistics: {e}")
        return {}

Integrating with Compliance Tool

Here’s how to integrate Elasticsearch with your compliance scanner:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import boto3
from elasticsearch import Elasticsearch
from datetime import datetime

def scan_and_index_compliance():
    """Scan AWS resources and index findings to Elasticsearch."""
    es = Elasticsearch(['http://localhost:9200'])
    s3_client = boto3.client('s3')
    
    # Scan S3 buckets
    buckets = s3_client.list_buckets()
    
    findings = []
    
    for bucket in buckets['Buckets']:
        bucket_name = bucket['Name']
        
        # Check for public access
        try:
            public_access = s3_client.get_public_access_block(Bucket=bucket_name)
            # ... check logic ...
            
            finding = {
                'timestamp': datetime.utcnow().isoformat(),
                'resource_type': 's3_bucket',
                'resource_id': bucket_name,
                'check_type': 'public_access',
                'status': 'PASS',  # or 'FAIL'
                'details': 'Public access is blocked'
            }
            
            # Index to Elasticsearch
            es.index(
                index='compliance-findings',
                document=finding
            )
            
            findings.append(finding)
            
        except Exception as e:
            print(f"Error checking bucket {bucket_name}: {e}")
    
    return findings

Security Best Practices

1. Enable Authentication

Always enable authentication in production:

1
2
3
environment:
  - xpack.security.enabled=true
  - ELASTIC_PASSWORD=your-secure-password

2. Use HTTPS

Configure TLS/SSL for encrypted communication.

3. Limit Access

Use firewall rules to restrict access to Elasticsearch (port 9200).

4. Regular Backups

Backup your Elasticsearch indices regularly.

5. Monitor Performance

Monitor Elasticsearch cluster health and performance.

Key Takeaways

  1. SIEM = Security Monitoring System - Collects and analyzes logs
  2. Elasticsearch = Search Engine - Stores and searches logs
  3. Index = Database - Organizes your data
  4. Query = Search - Find specific events
  5. Dashboards = Visualization - See your data
  6. Always authenticate - Don’t leave Elasticsearch open
  7. Use proper indexing - Organize data efficiently

Practice Exercise

Try this yourself:

  1. Set up Elasticsearch with Docker
  2. Create a Python script that sends security events
  3. Query for specific events
  4. Create aggregations (count by type, severity, etc.)
  5. Build a simple dashboard query

Resources to Learn More

What’s Next?

Now that you understand SIEM basics, you’re ready to:

  • Build comprehensive security dashboards
  • Create automated alerting rules
  • Integrate with other security tools

Remember: SIEM is about turning noise (logs) into signal (actionable intelligence)!

💡 Pro Tip: Start small with Elasticsearch. Index a few events, learn to query them, then gradually scale up. Don’t try to index everything at once - you’ll get overwhelmed!


Ready to secure your deployments? Check out our next post on CI/CD Security, where we’ll learn how to build secure deployment pipelines!

This post is licensed under CC BY 4.0 by the author.