Automating AWS Security Group Management with Python and Boto3
For the DR project, each Windows server being replicated into AWS needed its own security group. The recovery instances had to be reachable on specific ports from specific source networks — matching the firewall rules documented on the source machines. 50 servers. Each with a distinct set of rules. Doing this in the AWS console would have taken hours and introduced transcription errors. Python and boto3 did it in about 30 minutes, including a dry run to verify the plan.
The Config File
The input was a JSON file generated by post-processing the Ansible fact collection output. Each entry described a server, its site, and the ingress rules it needed:
[
    {
        "name": "server-01",
        "site": "waterford",
        "vpc_id": "vpc-0abc1234",
        "allowed_ports": [443, 1500],
        "allowed_sources": ["10.0.0.0/8"]
    },
    {
        "name": "server-02",
        "site": "eastworks",
        "vpc_id": "vpc-0abc1234",
        "allowed_ports": [443, 1500, 3389],
        "allowed_sources": ["10.20.0.0/16", "10.30.0.0/16"]
    }
]
Port 1500 is the AWS Elastic Disaster Recovery replication port. Port 443 was needed for agent-to-AWS control plane traffic. RDP (3389) was included for servers where the runbook required console access post-recovery. The source ranges came from the firewall rule enumeration playbook output.
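Because the config is generated and then hand-edited, it can help to check it before any API call is made. The following is a minimal sketch, not part of the original script: the function name validate_servers and the REQUIRED_KEYS set are hypothetical, and the choice to reject CIDRs with host bits set (strict=True) is an assumption made so that the strings in the file match what EC2 reports back.

```python
import ipaddress

# Assumed schema, taken from the example entries above.
REQUIRED_KEYS = {"name", "site", "vpc_id", "allowed_ports", "allowed_sources"}


def validate_servers(servers):
    """Return a list of human-readable problems; an empty list means no issues found."""
    problems = []
    seen_names = set()
    for index, server in enumerate(servers):
        label = server.get("name", f"entry #{index}")
        missing = REQUIRED_KEYS - server.keys()
        if missing:
            problems.append(f"{label}: missing keys {sorted(missing)}")
            continue
        if label in seen_names:
            problems.append(f"{label}: duplicate server name")
        seen_names.add(label)
        for port in server["allowed_ports"]:
            # bool is a subclass of int, so exclude it explicitly.
            if isinstance(port, bool) or not isinstance(port, int) or not 1 <= port <= 65535:
                problems.append(f"{label}: invalid port {port!r}")
        for cidr in server["allowed_sources"]:
            try:
                if not isinstance(cidr, str):
                    raise ValueError("not a string")
                # strict=True rejects values such as 10.0.0.1/8 (host bits set).
                ipaddress.IPv4Network(cidr, strict=True)
            except ValueError as exc:
                problems.append(f"{label}: invalid CIDR {cidr!r} ({exc})")
    return problems
```

Calling validate_servers on the loaded list and aborting when it returns anything would surface bad ports or malformed ranges before the dry run, rather than as a ClientError halfway through an apply.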
Security Group Creation
The core function creates a single security group and returns its ID:
import boto3
import logging
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


def create_security_group(ec2_client, name, site, vpc_id, dryrun=True):
    if dryrun:
        logger.info(f"[DRYRUN] Would create SG: {name} (site: {site}, vpc: {vpc_id})")
        return None
    try:
        response = ec2_client.create_security_group(
            Description=f"DR replication rules for {name}",
            GroupName=name,
            VpcId=vpc_id,
            TagSpecifications=[{
                'ResourceType': 'security-group',
                'Tags': [
                    {'Key': 'Name', 'Value': name},
                    {'Key': 'Site', 'Value': site},
                    {'Key': 'ManagedBy', 'Value': 'dr-automation'}
                ]
            }]
        )
        group_id = response['GroupId']
        logger.info(f"Created security group {name}: {group_id}")
        return group_id
    except ClientError as e:
        if e.response['Error']['Code'] == 'InvalidGroup.Duplicate':
            logger.warning(f"Security group {name} already exists, skipping creation")
            return None
        raise
The TagSpecifications block tags the security group at creation time. This matters for tracking: in a VPC with a lot of security groups, the ManagedBy: dr-automation tag lets you find everything this script created without guessing based on names.
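As an illustration of that lookup, here is a hedged sketch of listing everything the script created in a VPC. The function name find_managed_groups is hypothetical. It relies on the tag:ManagedBy and vpc-id filters of describe_security_groups and follows NextToken in case the response is paginated.

```python
def find_managed_groups(ec2_client, vpc_id, managed_by="dr-automation"):
    """Return (GroupId, GroupName) pairs for groups in the VPC tagged ManagedBy=<managed_by>."""
    kwargs = {
        "Filters": [
            {"Name": "tag:ManagedBy", "Values": [managed_by]},
            {"Name": "vpc-id", "Values": [vpc_id]},
        ]
    }
    found = []
    while True:
        response = ec2_client.describe_security_groups(**kwargs)
        for group in response.get("SecurityGroups", []):
            found.append((group["GroupId"], group["GroupName"]))
        token = response.get("NextToken")
        if not token:
            return found
        # More results are available; request the next page.
        kwargs["NextToken"] = token
```

The same filter is useful at teardown time, when the groups created for a DR test need to be found and removed without touching anything else in the VPC.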
Idempotency: Check Before Create
Running the script twice shouldn't produce duplicate security groups or raise unhandled errors. The idempotency check uses describe_security_groups with a name filter:
def get_existing_sg(ec2_client, name, vpc_id):
    """Return the group ID if a security group with this name exists in the VPC, else None."""
    # Group names are unique within a VPC, so at most one group can match.
    # Any ClientError propagates to the caller unchanged.
    response = ec2_client.describe_security_groups(
        Filters=[
            {'Name': 'group-name', 'Values': [name]},
            {'Name': 'vpc-id', 'Values': [vpc_id]}
        ]
    )
    groups = response.get('SecurityGroups', [])
    if groups:
        return groups[0]['GroupId']
    return None
The main loop checks for an existing group first, creates only if missing:
def ensure_security_group(ec2_client, server, dryrun=True):
    name = server['name']
    site = server['site']
    vpc_id = server['vpc_id']

    existing_id = get_existing_sg(ec2_client, name, vpc_id)
    if existing_id:
        logger.info(f"Security group {name} already exists: {existing_id}")
        group_id = existing_id
    else:
        group_id = create_security_group(ec2_client, name, site, vpc_id, dryrun=dryrun)

    # In a dry run there is no group ID for a group that does not exist yet,
    # but the planned rules should still be logged. On a real run, skip the
    # rules only if creation returned no ID.
    if group_id or dryrun:
        add_ingress_rules(ec2_client, group_id, server['allowed_ports'], server['allowed_sources'], dryrun=dryrun)
    return group_id
Adding Ingress Rules
authorize_security_group_ingress takes an IpPermissions list. Each entry in the list is a protocol/port combination with a list of source ranges:
def add_ingress_rules(ec2_client, group_id, ports, sources, dryrun=True):
    ip_permissions = []
    for port in ports:
        ip_ranges = [{'CidrIp': cidr, 'Description': 'DR automation'} for cidr in sources]
        ip_permissions.append({
            'IpProtocol': 'tcp',
            'FromPort': port,
            'ToPort': port,
            'IpRanges': ip_ranges
        })
    if dryrun:
        logger.info(f"[DRYRUN] Would add {len(ip_permissions)} rules to {group_id}")
        return
    try:
        ec2_client.authorize_security_group_ingress(
            GroupId=group_id,
            IpPermissions=ip_permissions
        )
        logger.info(f"Added ingress rules to {group_id}: ports {ports} from {sources}")
    except ClientError as e:
        if e.response['Error']['Code'] == 'InvalidPermission.Duplicate':
            logger.warning(f"Some rules already exist on {group_id}, skipping")
        else:
            raise
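InvalidPermission.Duplicate is the error code returned when you try to add a rule that already exists. The call is all-or-nothing: if any rule in IpPermissions is a duplicate, none of the rules in that call are added. The warning in the handler therefore does not guarantee that the remaining rules went in. If you're re-running after a partial failure, you may need to revoke first or restructure the call to add only the missing rules. For initial creation this was never an issue, because a freshly created group has no ingress rules to collide with.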
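One way to restructure the call is to compare against the group's current rules and send only what is absent. The sketch below is an assumption about how that could look, not the original script: missing_permissions is a hypothetical helper, and existing_permissions is expected to be the IpPermissions list returned by describe_security_groups for the group.

```python
def missing_permissions(existing_permissions, ports, sources, protocol="tcp"):
    """Build IpPermissions containing only the (port, CIDR) pairs not already present."""
    present = set()
    for perm in existing_permissions:
        if perm.get("IpProtocol") != protocol:
            continue
        if perm.get("FromPort") != perm.get("ToPort"):
            continue  # port ranges are outside the scope of this sketch
        for ip_range in perm.get("IpRanges", []):
            present.add((perm.get("FromPort"), ip_range["CidrIp"]))

    permissions = []
    for port in ports:
        ranges = [
            {"CidrIp": cidr, "Description": "DR automation"}
            for cidr in sources
            if (port, cidr) not in present
        ]
        if ranges:  # omit ports whose rules all exist already
            permissions.append({
                "IpProtocol": protocol,
                "FromPort": port,
                "ToPort": port,
                "IpRanges": ranges,
            })
    return permissions
```

If the returned list is empty there is nothing to add and the authorize call can be skipped entirely. Otherwise the list can be passed as IpPermissions without tripping the duplicate check.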
The Dry-Run Workflow
The script entry point wires everything together with a dryrun flag:
import json
import sys


def main():
    # Dry run is the default; resources are created only when --apply is passed.
    dryrun = '--apply' not in sys.argv
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s %(levelname)s %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    if dryrun:
        logger.info("Running in DRY RUN mode. Pass --apply to create resources.")

    ec2 = boto3.client('ec2', region_name='us-west-2')
    with open('servers.json') as f:
        servers = json.load(f)

    for server in servers:
        ensure_security_group(ec2, server, dryrun=dryrun)

    if dryrun:
        logger.info("Dry run complete. Review output above, then run with --apply.")


if __name__ == '__main__':
    main()
Usage:
python create_sgs.py # dry run, logs what would happen
python create_sgs.py --apply # actually creates the resources
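The sys.argv membership test is deliberately simple, and it fails safe: a mistyped flag leaves the script in dry-run mode. If more options are needed, argparse is one alternative. The sketch below is hypothetical; the --config and --region options are illustrative additions that the original script does not have, with defaults taken from the values hard-coded in main().

```python
import argparse


def parse_args(argv=None):
    """Parse command-line options; dry run remains the default."""
    parser = argparse.ArgumentParser(
        description="Create DR security groups from a JSON config."
    )
    parser.add_argument(
        "--apply", action="store_true",
        help="create resources; without this flag the script only logs the plan",
    )
    parser.add_argument(
        "--config", default="servers.json",
        help="path to the JSON server list (hypothetical option)",
    )
    parser.add_argument(
        "--region", default="us-west-2",
        help="AWS region for the EC2 client (hypothetical option)",
    )
    return parser.parse_args(argv)
```

With this in place, main() would read dryrun = not args.apply, and an unrecognized flag would stop the script with a usage message instead of being ignored.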
The dry-run output looks like:
2019-12-03 14:22:01 INFO Running in DRY RUN mode. Pass --apply to create resources.
2019-12-03 14:22:01 INFO [DRYRUN] Would create SG: server-01 (site: waterford, vpc: vpc-0abc1234)
2019-12-03 14:22:01 INFO [DRYRUN] Would add 2 rules to None
2019-12-03 14:22:01 INFO [DRYRUN] Would create SG: server-02 (site: eastworks, vpc: vpc-0abc1234)
2019-12-03 14:22:01 INFO [DRYRUN] Would add 3 rules to None
Review that output. Confirm the names, sites, and rule counts look right. Then run with --apply. This saved me from a naming mistake on the first real run — I had the wrong site tag on three servers because of a copy-paste error in the input JSON.
Updating Existing Rules: firewall.py
Security groups aren't write-once. Rules change: a new site comes online and needs access, a deprecated network range gets removed. The companion firewall.py script handles updates to existing security groups by diffing current state against desired state:
def update_security_group_rules(ec2_client, group_id, desired_ports, desired_sources, dryrun=True):
    # Get current rules
    response = ec2_client.describe_security_groups(GroupIds=[group_id])
    current_permissions = response['SecurityGroups'][0].get('IpPermissions', [])

    # Build set of current (port, cidr) tuples. Only single-port TCP rules with
    # IPv4 CIDR sources are compared; a port-range rule cannot be represented
    # as one (port, cidr) tuple, so it is left alone.
    current_rules = set()
    for perm in current_permissions:
        if perm.get('IpProtocol') == 'tcp' and perm.get('FromPort') == perm.get('ToPort'):
            port = perm.get('FromPort')
            for ip_range in perm.get('IpRanges', []):
                current_rules.add((port, ip_range['CidrIp']))

    # Build desired set
    desired_rules = {(port, cidr) for port in desired_ports for cidr in desired_sources}
    to_add = desired_rules - current_rules
    to_revoke = current_rules - desired_rules

    if not to_add and not to_revoke:
        logger.info(f"{group_id}: no changes needed")
        return

    # Log the plan in both modes, worded to match what will actually happen.
    verb = "would" if dryrun else "will"

    if to_add:
        logger.info(f"{group_id}: {verb} add rules: {to_add}")
        if not dryrun:
            add_permissions = []
            for port, cidr in to_add:
                add_permissions.append({
                    'IpProtocol': 'tcp',
                    'FromPort': port,
                    'ToPort': port,
                    'IpRanges': [{'CidrIp': cidr}]
                })
            ec2_client.authorize_security_group_ingress(
                GroupId=group_id, IpPermissions=add_permissions
            )

    if to_revoke:
        logger.info(f"{group_id}: {verb} revoke rules: {to_revoke}")
        if not dryrun:
            revoke_permissions = []
            for port, cidr in to_revoke:
                revoke_permissions.append({
                    'IpProtocol': 'tcp',
                    'FromPort': port,
                    'ToPort': port,
                    'IpRanges': [{'CidrIp': cidr}]
                })
            ec2_client.revoke_security_group_ingress(
                GroupId=group_id, IpPermissions=revoke_permissions
            )
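The diff approach means you can run this script as a reconciliation pass: whatever is in the config file is what the security group should contain, no more and no less. A single-port TCP rule with an IPv4 CIDR source that was added manually in the console will be revoked on the next run. That's intentional: the config file is the source of truth. Rules for other protocols, IPv6 ranges, and security-group sources are not examined by the diff and are left in place.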
What This Replaced
The alternative was clicking through the AWS console: New security group, name it, add rule, select TCP, enter port, enter source CIDR, repeat for each port and source combination, for each of 50 servers. At even two minutes per server that's an hour and forty minutes, and the error rate for manual data entry across 50 repetitions is not zero.
The script ran against the full server list in under a minute. The dry run caught three configuration errors before anything was created. The --apply run produced all 50 security groups with correct rules. Total time from starting to write the script to having all groups created: about three hours including testing and the inevitable JSON syntax errors in the config file.
Boto3 is the right tool for this class of problem. The API is consistent, the error codes are documented, and the ClientError exception gives you enough information to handle the cases you care about without catching everything.