Python & Boto3: Automating AWS Like a Security Pro
Here’s the truth: As a security professional, you’ll spend way too much time doing repetitive tasks. Checking 50 S3 buckets for public access? Manually reviewing IAM policies? Scanning EC2 instances for misconfigurations? That’s hours of work that could be automated. Python and Boto3 are your superpowers for automating AWS security. Let me show you how.
Why Python + Boto3?
Python is the language of choice for security automation because:
- Easy to read and write (even if you’re not a full-time developer)
- Huge ecosystem of security libraries
- Great for scripting and automation
- Widely used in the security community
Boto3 is AWS’s official Python SDK. It’s like having a remote control for all AWS services.
Think of it this way:
- AWS Console = Manual control (clicking buttons)
- AWS CLI = Command-line control (typing commands)
- Boto3 = Programmatic control (writing scripts that do the work for you)
Getting Started: Installation and Setup
Step 1: Install Boto3
1
pip install boto3
That’s it! Well, almost. You’ll also need AWS credentials configured.
Step 2: Configure AWS Credentials
You have three options:
Option 1: AWS CLI (Easiest)
1
aws configure
This creates credentials in ~/.aws/credentials that Boto3 automatically uses.
Option 2: Environment Variables
1
2
3
export AWS_ACCESS_KEY_ID="your-access-key"
export AWS_SECRET_ACCESS_KEY="your-secret-key"
export AWS_DEFAULT_REGION="us-east-1"
Option 3: IAM Role (Best for Production) If your script runs on an EC2 instance, use an IAM role. No credentials needed!
Step 3: Your First Boto3 Script
Let’s start simple - list all your S3 buckets:
1
2
3
4
5
6
7
8
9
10
11
import boto3
# Create an S3 client
s3_client = boto3.client('s3')
# List all buckets
response = s3_client.list_buckets()
print("Your S3 Buckets:")
for bucket in response['Buckets']:
print(f" - {bucket['Name']} (created: {bucket['CreationDate']})")
Run it:
1
python list_buckets.py
Real-world example: You need to audit all your S3 buckets. Instead of clicking through the console 50 times, this script does it in seconds!
Understanding Boto3 Clients vs Resources
Boto3 has two ways to interact with AWS:
Clients (Low-Level)
Clients give you direct access to AWS API operations. More control, more verbose:
1
2
3
4
5
6
7
8
9
10
11
12
13
import boto3
# Create a client
ec2_client = boto3.client('ec2')
# Describe instances (returns raw API response)
response = ec2_client.describe_instances()
# Access the data
for reservation in response['Reservations']:
for instance in reservation['Instances']:
print(f"Instance ID: {instance['InstanceId']}")
print(f"State: {instance['State']['Name']}")
Resources (High-Level)
Resources provide a more Pythonic interface. Easier to use, less control:
1
2
3
4
5
6
7
8
9
10
11
import boto3
# Create a resource
ec2_resource = boto3.resource('ec2')
# Get all instances (more Pythonic)
instances = ec2_resource.instances.all()
for instance in instances:
print(f"Instance ID: {instance.id}")
print(f"State: {instance.state['Name']}")
When to use which:
- Clients - When you need specific API operations or more control
- Resources - When you want simpler, more Pythonic code
For security automation, I usually use clients because they give me more control over what I’m doing.
Real-World Example 1: Compliance Scanner for S3
Let’s build a script that checks all S3 buckets for security issues:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import boto3
from botocore.exceptions import ClientError
def check_s3_bucket_security(bucket_name):
"""Check a single S3 bucket for common security issues."""
s3_client = boto3.client('s3')
issues = []
try:
# Check 1: Public Access Block
try:
public_access = s3_client.get_public_access_block(Bucket=bucket_name)
block_config = public_access['PublicAccessBlockConfiguration']
if not all([
block_config.get('BlockPublicAcls', False),
block_config.get('BlockPublicPolicy', False),
block_config.get('RestrictPublicBuckets', False)
]):
issues.append("Public access not fully blocked")
except ClientError:
issues.append("CRITICAL: Public access block not configured!")
# Check 2: Encryption
try:
encryption = s3_client.get_bucket_encryption(Bucket=bucket_name)
if 'ServerSideEncryptionConfiguration' not in encryption:
issues.append("Encryption not configured")
except ClientError:
issues.append("CRITICAL: Encryption not enabled!")
# Check 3: Versioning
versioning = s3_client.get_bucket_versioning(Bucket=bucket_name)
if versioning.get('Status') != 'Enabled':
issues.append("Versioning not enabled")
# Check 4: Bucket Policy (check if it allows public access)
try:
policy = s3_client.get_bucket_policy(Bucket=bucket_name)
policy_doc = eval(policy['Policy']) # Convert string to dict
for statement in policy_doc.get('Statement', []):
if statement.get('Principal') == '*' or statement.get('Principal') == {'AWS': '*'}:
issues.append("WARNING: Bucket policy allows public access")
except ClientError:
pass # No policy is fine
return issues
except ClientError as e:
return [f"Error checking bucket: {str(e)}"]
def scan_all_s3_buckets():
"""Scan all S3 buckets for security issues."""
s3_client = boto3.client('s3')
# Get all buckets
response = s3_client.list_buckets()
print("=" * 60)
print("S3 Bucket Security Scan")
print("=" * 60)
for bucket in response['Buckets']:
bucket_name = bucket['Name']
print(f"\nScanning: {bucket_name}")
print("-" * 60)
issues = check_s3_bucket_security(bucket_name)
if issues:
print("⚠️ Issues Found:")
for issue in issues:
print(f" - {issue}")
else:
print("✅ No issues found!")
if __name__ == "__main__":
scan_all_s3_buckets()
What this does:
- Lists all your S3 buckets
- Checks each one for:
- Public access configuration
- Encryption settings
- Versioning status
- Public bucket policies
- Reports any issues found
Real-world use: Run this daily as part of your compliance checks. It’s exactly what the automated compliance tool does!
Real-World Example 2: EC2 Security Group Auditor
Let’s check EC2 security groups for risky open ports:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import boto3
# Risky ports that shouldn't be open to the internet
RISKY_PORTS = {
22: "SSH",
3389: "RDP",
445: "SMB",
139: "NetBIOS",
1433: "SQL Server",
3306: "MySQL",
5432: "PostgreSQL"
}
def check_security_group(security_group_id, group_name):
"""Check a security group for risky configurations."""
ec2_client = boto3.client('ec2')
issues = []
# Get security group details
response = ec2_client.describe_security_groups(
GroupIds=[security_group_id]
)
sg = response['SecurityGroups'][0]
# Check each rule
for rule in sg.get('IpPermissions', []):
port = rule.get('FromPort')
protocol = rule.get('IpProtocol')
# Check if port is risky
if port in RISKY_PORTS:
# Check if it's open to the internet (0.0.0.0/0)
for ip_range in rule.get('IpRanges', []):
if ip_range.get('CidrIp') == '0.0.0.0/0':
issues.append(
f"CRITICAL: Port {port} ({RISKY_PORTS[port]}) "
f"is open to the internet (0.0.0.0/0)"
)
return issues
def scan_all_security_groups():
"""Scan all security groups for risky configurations."""
ec2_client = boto3.client('ec2')
# Get all security groups
response = ec2_client.describe_security_groups()
print("=" * 60)
print("EC2 Security Group Audit")
print("=" * 60)
for sg in response['SecurityGroups']:
sg_id = sg['GroupId']
sg_name = sg['GroupName']
print(f"\nChecking: {sg_name} ({sg_id})")
print("-" * 60)
issues = check_security_group(sg_id, sg_name)
if issues:
for issue in issues:
print(f"⚠️ {issue}")
else:
print("✅ No risky ports found")
if __name__ == "__main__":
scan_all_security_groups()
What this does:
- Scans all security groups
- Identifies risky ports (SSH, RDP, etc.) open to the internet
- Reports critical findings
Real-world use: This is exactly what compliance auditors look for. Finding these issues automatically saves hours of manual review!
Real-World Example 3: IAM Policy Analyzer
Let’s check IAM policies for dangerous wildcard permissions:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import boto3
import json
def analyze_iam_policy(policy_document):
"""Analyze an IAM policy for security issues."""
issues = []
# Convert policy string to dict if needed
if isinstance(policy_document, str):
policy_doc = json.loads(policy_document)
else:
policy_doc = policy_document
# Check each statement
for statement in policy_doc.get('Statement', []):
effect = statement.get('Effect', 'Allow')
actions = statement.get('Action', [])
resources = statement.get('Resource', [])
# Normalize to lists
if isinstance(actions, str):
actions = [actions]
if isinstance(resources, str):
resources = [resources]
# Check for wildcard actions
if '*' in actions or 's3:*' in actions or 'ec2:*' in actions:
issues.append(
f"WARNING: Wildcard action found: {actions} "
f"(Effect: {effect})"
)
# Check for wildcard resources
if '*' in resources:
issues.append(
f"CRITICAL: Wildcard resource '*' found! "
f"This allows access to ALL resources."
)
# Check for overly permissive actions
dangerous_actions = [
'iam:CreateUser',
'iam:DeleteUser',
'iam:AttachUserPolicy',
's3:DeleteBucket',
'ec2:TerminateInstances'
]
for action in actions:
if any(danger in action for danger in dangerous_actions):
if effect == 'Allow' and '*' in resources:
issues.append(
f"CRITICAL: Dangerous action '{action}' allowed "
f"on all resources!"
)
return issues
def scan_user_policies():
"""Scan all IAM users and their policies."""
iam_client = boto3.client('iam')
print("=" * 60)
print("IAM Policy Security Analysis")
print("=" * 60)
# Get all users
users_response = iam_client.list_users()
for user in users_response['Users']:
username = user['UserName']
print(f"\nAnalyzing user: {username}")
print("-" * 60)
# Get attached policies
attached_policies = iam_client.list_attached_user_policies(
UserName=username
)
# Get inline policies
inline_policies = iam_client.list_user_policies(UserName=username)
all_issues = []
# Check attached policies
for policy in attached_policies['AttachedPolicies']:
policy_arn = policy['PolicyArn']
policy_version = iam_client.get_policy(PolicyArn=policy_arn)
default_version = policy_version['Policy']['DefaultVersionId']
policy_doc = iam_client.get_policy_version(
PolicyArn=policy_arn,
VersionId=default_version
)
issues = analyze_iam_policy(
policy_doc['PolicyVersion']['Document']
)
all_issues.extend(issues)
# Check inline policies
for policy_name in inline_policies['PolicyNames']:
policy_doc = iam_client.get_user_policy(
UserName=username,
PolicyName=policy_name
)
issues = analyze_iam_policy(
policy_doc['PolicyDocument']
)
all_issues.extend(issues)
if all_issues:
for issue in all_issues:
print(f"⚠️ {issue}")
else:
print("✅ No policy issues found")
if __name__ == "__main__":
scan_user_policies()
What this does:
- Scans all IAM users
- Analyzes their policies for:
- Wildcard actions (
*) - Wildcard resources (
*) - Dangerous permissions
- Wildcard actions (
- Reports security issues
Error Handling: The Professional Way
Always handle errors properly. Here’s how:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import boto3
from botocore.exceptions import ClientError, BotoCoreError
def safe_aws_operation():
"""Example of proper error handling."""
s3_client = boto3.client('s3')
try:
response = s3_client.list_buckets()
return response['Buckets']
except ClientError as e:
error_code = e.response['Error']['Code']
if error_code == 'AccessDenied':
print("❌ Access denied. Check your IAM permissions.")
elif error_code == 'InvalidAccessKeyId':
print("❌ Invalid AWS credentials.")
else:
print(f"❌ AWS Error: {error_code} - {e}")
return []
except BotoCoreError as e:
print(f"❌ Boto3 Error: {e}")
return []
except Exception as e:
print(f"❌ Unexpected error: {e}")
return []
Pagination: Handling Large Results
AWS API responses are paginated. Here’s how to handle it:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import boto3
def get_all_ec2_instances():
"""Get all EC2 instances, handling pagination."""
ec2_client = boto3.client('ec2')
all_instances = []
# Use paginator for automatic pagination
paginator = ec2_client.get_paginator('describe_instances')
for page in paginator.paginate():
for reservation in page['Reservations']:
all_instances.extend(reservation['Instances'])
return all_instances
# Or manually handle pagination
def get_all_s3_objects(bucket_name):
"""Get all objects in an S3 bucket."""
s3_client = boto3.client('s3')
all_objects = []
paginator = s3_client.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket=bucket_name):
if 'Contents' in page:
all_objects.extend(page['Contents'])
return all_objects
Best Practices for Security Scripts
1. Use IAM Roles, Not Access Keys
Bad:
1
2
3
4
5
6
# Don't hardcode credentials!
s3_client = boto3.client(
's3',
aws_access_key_id='AKIA...',
aws_secret_access_key='secret...'
)
Good:
1
2
3
# Let Boto3 use default credential chain
# (IAM role, environment variables, or ~/.aws/credentials)
s3_client = boto3.client('s3')
2. Handle Errors Gracefully
Always wrap AWS calls in try/except blocks.
3. Use Paginators
Don’t manually handle pagination. Use paginators!
4. Add Logging
1
2
3
4
5
6
7
8
9
10
11
12
13
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def scan_buckets():
logger.info("Starting S3 bucket scan...")
# ... your code ...
logger.info("Scan complete!")
5. Make Scripts Reusable
Use functions, not just scripts:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def check_bucket_encryption(bucket_name):
"""Check if a bucket has encryption enabled."""
# ... implementation ...
def check_bucket_public_access(bucket_name):
"""Check if a bucket blocks public access."""
# ... implementation ...
# Main function that uses the above
def audit_s3_bucket(bucket_name):
"""Complete audit of an S3 bucket."""
results = {
'encryption': check_bucket_encryption(bucket_name),
'public_access': check_bucket_public_access(bucket_name)
}
return results
Key Takeaways
- Boto3 is your AWS automation toolkit - Learn it well
- Always handle errors - AWS APIs can fail
- Use paginators - For large result sets
- Use IAM roles - Not hardcoded credentials
- Write reusable functions - Not one-off scripts
- Add logging - Know what your scripts are doing
- Test in a dev account first - Don’t break production!
Practice Exercise
Try building this yourself:
- Create a script that lists all EC2 instances
- For each instance, check if it has encryption enabled
- Check if the security groups have risky ports open
- Generate a report in a text file
Resources to Learn More
What’s Next?
Now that you can automate AWS with Python, you’re ready to:
- Build the full compliance scanning tool
- Automate security incident response
- Create custom security monitoring scripts
Remember: Automation is a force multiplier. The time you spend learning Boto3 will save you hundreds of hours in the future!
💡 Pro Tip: Start with the AWS CLI to understand what operations you need, then translate them to Boto3. The AWS CLI commands map directly to Boto3 client methods!
Ready to containerize your scripts? Check out our next post on Docker, where we’ll learn how to package your Python automation tools!