Elasticsearch & SIEM: Security Monitoring for Beginners
Here’s the reality: Security events happen constantly. Failed login attempts, unusual network traffic, configuration changes - thousands of events every minute. Manually reviewing logs is impossible. SIEM (Security Information and Event Management) systems collect, analyze, and alert on these events automatically. Elasticsearch is often the engine behind SIEM solutions. Let’s learn how it works.
What is SIEM, Really?
SIEM stands for Security Information and Event Management. Think of it as:
Real-world analogy:
- Logs = Security camera footage
- SIEM = The system that watches all cameras 24/7
- Alerts = Notifications when something suspicious happens
- Dashboards = The control room where you see everything
What SIEM does:
- Collects logs from all your systems (servers, network devices, applications)
- Stores them in a searchable database (Elasticsearch)
- Analyzes them for patterns and anomalies
- Alerts you when something suspicious happens
- Visualizes data in dashboards
The ELK Stack
ELK stands for:
- Elasticsearch - Search and analytics engine
- Logstash - Data processing pipeline
- Kibana - Visualization and dashboards
How they work together:
1
Logs → Logstash (processes) → Elasticsearch (stores) → Kibana (visualizes)
Understanding Elasticsearch
What is Elasticsearch?
Elasticsearch is a search engine built on Apache Lucene. Think of it like Google, but for your logs.
Key concepts:
- Index - Like a database (e.g., “security-logs”)
- Document - Like a row in a database (a single log entry)
- Field - Like a column (e.g., “timestamp”, “source_ip”)
- Query - Search for specific data
Example Document
1
2
3
4
5
6
7
{
"timestamp": "2025-11-22T10:30:00Z",
"source_ip": "192.168.1.100",
"event_type": "failed_login",
"username": "admin",
"message": "Authentication failed for user admin"
}
Setting Up Elasticsearch (Docker)
Let’s set up a simple Elasticsearch instance for testing:
docker-compose.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
version: '3.8'
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
container_name: elasticsearch
environment:
- discovery.type=single-node
- xpack.security.enabled=false # Disable for testing
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ports:
- "9200:9200"
volumes:
- es_data:/usr/share/elasticsearch/data
kibana:
image: docker.elastic.co/kibana/kibana:8.11.0
container_name: kibana
environment:
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
ports:
- "5601:5601"
depends_on:
- elasticsearch
volumes:
es_data:
Run it:
1
docker-compose up -d
Access:
- Elasticsearch: http://localhost:9200
- Kibana: http://localhost:5601
Sending Logs to Elasticsearch with Python
Here’s how to send security events from your compliance tool:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
from elasticsearch import Elasticsearch
from datetime import datetime
import json
# Connect to Elasticsearch
es = Elasticsearch(
['http://localhost:9200'],
# For production, add authentication:
# http_auth=('elastic', 'password')
)
def index_security_event(event_type, source_ip, message, severity='info'):
"""Index a security event to Elasticsearch."""
document = {
'timestamp': datetime.utcnow().isoformat(),
'event_type': event_type,
'source_ip': source_ip,
'message': message,
'severity': severity
}
try:
# Index the document
response = es.index(
index='security-events',
document=document
)
print(f"✅ Event indexed: {response['_id']}")
return response['_id']
except Exception as e:
print(f"❌ Error indexing event: {e}")
return None
def index_compliance_finding(finding_type, resource, status, details):
"""Index a compliance finding."""
document = {
'timestamp': datetime.utcnow().isoformat(),
'finding_type': finding_type,
'resource': resource,
'status': status, # PASS, FAIL, WARN
'details': details
}
try:
response = es.index(
index='compliance-findings',
document=document
)
return response['_id']
except Exception as e:
print(f"❌ Error indexing finding: {e}")
return None
# Example: Index compliance findings from scan
def index_scan_results(scan_results):
"""Index all findings from a compliance scan."""
for finding in scan_results:
index_compliance_finding(
finding_type=finding['type'],
resource=finding['resource'],
status=finding['status'],
details=finding['details']
)
# Example usage
if __name__ == "__main__":
# Index a security event
index_security_event(
event_type='failed_login',
source_ip='192.168.1.100',
message='Multiple failed login attempts',
severity='high'
)
# Index a compliance finding
index_compliance_finding(
finding_type='s3_public_access',
resource='s3://my-bucket',
status='FAIL',
details='Bucket has public access enabled'
)
Querying Elasticsearch
Search for Events
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def search_security_events(query, size=10):
"""Search for security events."""
try:
response = es.search(
index='security-events',
body={
'query': {
'match': {
'message': query
}
},
'size': size,
'sort': [
{'timestamp': {'order': 'desc'}}
]
}
)
return response['hits']['hits']
except Exception as e:
print(f"Error searching: {e}")
return []
def get_failed_logins_last_hour():
"""Get all failed login attempts in the last hour."""
from datetime import datetime, timedelta
one_hour_ago = (datetime.utcnow() - timedelta(hours=1)).isoformat()
try:
response = es.search(
index='security-events',
body={
'query': {
'bool': {
'must': [
{'match': {'event_type': 'failed_login'}},
{'range': {'timestamp': {'gte': one_hour_ago}}}
]
}
},
'size': 100
}
)
return response['hits']['hits']
except Exception as e:
print(f"Error querying: {e}")
return []
def get_compliance_failures():
"""Get all compliance failures."""
try:
response = es.search(
index='compliance-findings',
body={
'query': {
'match': {
'status': 'FAIL'
}
},
'size': 1000
}
)
return response['hits']['hits']
except Exception as e:
print(f"Error querying: {e}")
return []
Building a Simple SIEM Dashboard
Create Index Template
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def create_security_index_template():
"""Create an index template for security events."""
template = {
'index_patterns': ['security-events-*'],
'template': {
'settings': {
'number_of_shards': 1,
'number_of_replicas': 0
},
'mappings': {
'properties': {
'timestamp': {'type': 'date'},
'event_type': {'type': 'keyword'},
'source_ip': {'type': 'ip'},
'message': {'type': 'text'},
'severity': {'type': 'keyword'}
}
}
}
}
try:
es.indices.put_index_template(
name='security-events-template',
body=template
)
print("✅ Index template created")
except Exception as e:
print(f"Error creating template: {e}")
Aggregate Statistics
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def get_security_statistics():
"""Get security statistics from Elasticsearch."""
try:
# Count by event type
response = es.search(
index='security-events',
body={
'size': 0,
'aggs': {
'event_types': {
'terms': {
'field': 'event_type',
'size': 10
}
},
'by_severity': {
'terms': {
'field': 'severity',
'size': 5
}
}
}
}
)
return {
'event_types': response['aggregations']['event_types']['buckets'],
'by_severity': response['aggregations']['by_severity']['buckets']
}
except Exception as e:
print(f"Error getting statistics: {e}")
return {}
Integrating with Compliance Tool
Here’s how to integrate Elasticsearch with your compliance scanner:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import boto3
from elasticsearch import Elasticsearch
from datetime import datetime
def scan_and_index_compliance():
"""Scan AWS resources and index findings to Elasticsearch."""
es = Elasticsearch(['http://localhost:9200'])
s3_client = boto3.client('s3')
# Scan S3 buckets
buckets = s3_client.list_buckets()
findings = []
for bucket in buckets['Buckets']:
bucket_name = bucket['Name']
# Check for public access
try:
public_access = s3_client.get_public_access_block(Bucket=bucket_name)
# ... check logic ...
finding = {
'timestamp': datetime.utcnow().isoformat(),
'resource_type': 's3_bucket',
'resource_id': bucket_name,
'check_type': 'public_access',
'status': 'PASS', # or 'FAIL'
'details': 'Public access is blocked'
}
# Index to Elasticsearch
es.index(
index='compliance-findings',
document=finding
)
findings.append(finding)
except Exception as e:
print(f"Error checking bucket {bucket_name}: {e}")
return findings
Security Best Practices
1. Enable Authentication
Always enable authentication in production:
1
2
3
environment:
- xpack.security.enabled=true
- ELASTIC_PASSWORD=your-secure-password
2. Use HTTPS
Configure TLS/SSL for encrypted communication.
3. Limit Access
Use firewall rules to restrict access to Elasticsearch (port 9200).
4. Regular Backups
Backup your Elasticsearch indices regularly.
5. Monitor Performance
Monitor Elasticsearch cluster health and performance.
Key Takeaways
- SIEM = Security Monitoring System - Collects and analyzes logs
- Elasticsearch = Search Engine - Stores and searches logs
- Index = Database - Organizes your data
- Query = Search - Find specific events
- Dashboards = Visualization - See your data
- Always authenticate - Don’t leave Elasticsearch open
- Use proper indexing - Organize data efficiently
Practice Exercise
Try this yourself:
- Set up Elasticsearch with Docker
- Create a Python script that sends security events
- Query for specific events
- Create aggregations (count by type, severity, etc.)
- Build a simple dashboard query
Resources to Learn More
What’s Next?
Now that you understand SIEM basics, you’re ready to:
- Build comprehensive security dashboards
- Create automated alerting rules
- Integrate with other security tools
Remember: SIEM is about turning noise (logs) into signal (actionable intelligence)!
💡 Pro Tip: Start small with Elasticsearch. Index a few events, learn to query them, then gradually scale up. Don’t try to index everything at once - you’ll get overwhelmed!
Ready to secure your deployments? Check out our next post on CI/CD Security, where we’ll learn how to build secure deployment pipelines!