AWS ECR Scan-on-Push Slack Notification


AWS ECR has a very nice integration with the open-source Clair project, which scans container images for known vulnerabilities.

They call this “image scanning”. You can trigger an image scan manually, or you can enable “scan on push” on your ECR repository so that every image pushed to it is scanned automatically.

The scan findings can then be retrieved either via the console or via the API, and are sorted according to their severity.
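As a rough illustration of the API route, here is a minimal sketch that collects the findings for one image and orders them from most to least severe. The helper names `sort_findings` and `fetch_findings` are made up for this example, and the ECR client is passed in as a parameter (in practice it would be `boto3.client('ecr')`); the repository name and tag in the usage comment are placeholders.

```python
# Severity names as used by the ECR API, from most to least severe.
SEVERITY_ORDER = ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW', 'INFORMATIONAL', 'UNDEFINED']


def sort_findings(findings):
    """Return the findings ordered from most to least severe."""
    def rank(finding):
        severity = finding.get('severity', 'UNDEFINED')
        if severity in SEVERITY_ORDER:
            return SEVERITY_ORDER.index(severity)
        return len(SEVERITY_ORDER)  # unknown severities go last
    return sorted(findings, key=rank)


def fetch_findings(ecr_client, repository_name, image_tag):
    """Collect all findings for one image, following pagination."""
    paginator = ecr_client.get_paginator('describe_image_scan_findings')
    findings = []
    pages = paginator.paginate(
        repositoryName=repository_name,
        imageId={'imageTag': image_tag},
    )
    for page in pages:
        findings.extend(page['imageScanFindings'].get('findings', []))
    return sort_findings(findings)


# Usage (needs AWS credentials; names below are placeholders):
#   import boto3
#   for finding in fetch_findings(boto3.client('ecr'), 'myapp-repo', 'latest'):
#       print(finding['severity'], finding['name'])
```

Passing the client in keeps the sorting logic easy to try out locally without AWS access.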

So, this is very neat if you are using ECR for your Docker images and want to stay informed about what vulnerabilities they contain.

Let’s see how we could have this in Terragrunt/Terraform: an ECR repository with “scan on push” enabled and a Lambda function that reports the findings to our Slack channel.

 

I will omit the Terraform module’s code in this post. You can use the one Cloud Posse has created, or draw inspiration from it if you want to create your own.

 

An ultra-slim Terragrunt configuration for our example repository would look like this:

terragrunt.hcl

# -------------------
# ECR repo for myapp
# -------------------

terraform {
  source = "./ecr-module" # replace with path to module
}

include {
  path = find_in_parent_folders()
}

inputs = {
  name                = "myapp-repo" # name of the ECR repository
  scan_images_on_push = true         # important for our example
}

 

And now let’s create a Lambda function. For that we can use one of the terraform-aws-modules, which I can’t recommend enough.

The actual Terragrunt file would look like this:

terragrunt.hcl

# -------------------
# Lambda function that sends ECR scan results to Slack
# -------------------
terraform {
  source = "git::git@github.com:terraform-aws-modules/terraform-aws-lambda.git?ref=v1.36.0"
}

include {
  path = find_in_parent_folders()
}

inputs = {
  function_name      = "ecr-scan-report"
  description        = "Send ECR scan results to Slack"
  handler            = "lambda.lambda_handler"
  runtime            = "python3.8"
  timeout            = 30
  source_path        = jsonencode("lambda.py")
  attach_policy_json = true
  policy_json = jsonencode(
    {
      "Version" : "2012-10-17",
      "Statement" : [
        {
          "Effect" : "Allow",
          "Action" : [
            "ecr:DescribeImages",
            "ecr:DescribeImageScanFindings",
            "ecr:DescribeRepositories",
            "ecr:ListImages"
          ],
          "Resource" : "*"
        }
      ]
    }
  )
}

 

And of course, next to it, we need the Python code that sends the scan-on-push results to Slack:

lambda.py

import boto3
import json
from datetime import datetime
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError
from logging import getLogger, INFO

client = boto3.client('ecr')

logger = getLogger()
logger.setLevel(INFO)

## Replace the following with your specific info
region = "eu-west-1"                 # AWS region of repositories
channel = "security-vulnerabilities" # Slack channel to send message
slack_url = "https://hooks.slack.com/services/XXX/YYY/ZZZ"

## Return the color setting of severity
def get_properties(findings_summary):
    if findings_summary['CRITICAL'] != 0:
        properties = {'color': 'danger', 'icon': '🔴'}
    elif findings_summary['HIGH'] != 0:
        properties = {'color': 'warning', 'icon': '🔶'}
    else:
        properties = {'color': 'good', 'icon': '💚'}
    return properties

## Send slack message
def send_slack_message(scan_result, scan_findings, findings_summary, repo, latest_tag, latest_sha):
    ## Severity names as returned by the ECR API
    severity_list = ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW', 'INFORMATIONAL', 'UNDEFINED']
    for severity in severity_list:
        findings_summary.setdefault(severity, 0)

    ## Only send message if high or critical vulnerabilities exist
    if findings_summary['CRITICAL'] == 0 and findings_summary['HIGH'] == 0:
        return None

    text_properties = get_properties(findings_summary)
    description = scan_result['imageScanStatus']['description']
    complete_at = datetime.strftime(
        scan_findings['imageScanCompletedAt'],
        '%Y-%m-%d %H:%M:%S %Z'
    )
    source_update_at = datetime.strftime(
        scan_findings['vulnerabilitySourceUpdatedAt'],
        '%Y-%m-%d %H:%M:%S %Z'
    )

    slack_message = {
        'username': 'Amazon ECR',
        'channel': channel,
        'icon_emoji': ':ecr:',
        'text': "ECR Image Scan findings",
        'attachments': [
            {
                'fallback': 'AmazonECR Image Scan Findings Description.',
                'color': text_properties['color'],
                'title': f"{text_properties['icon']} {repo}:{latest_tag}",
                'title_link': f"https://console.aws.amazon.com/ecr/repositories/{repo}/image/{latest_sha}/scan-results?region={region}",
                'text': f"{description}\nImage Scan Completed at {complete_at}\nVulnerability Source Updated at {source_update_at}",
                'fields': [
                    {'title': 'Critical', 'value': findings_summary['CRITICAL'], 'short': True},
                    {'title': 'High', 'value': findings_summary['HIGH'], 'short': True},
                    {'title': 'Medium', 'value': findings_summary['MEDIUM'], 'short': True},
                    {'title': 'Low', 'value': findings_summary['LOW'], 'short': True},
                    {'title': 'Informational', 'value': findings_summary['INFORMATIONAL'], 'short': True},
                    {'title': 'Undefined', 'value': findings_summary['UNDEFINED'], 'short': True},
                ]
            }
        ]
    }

    req = Request(
        slack_url,
        json.dumps(slack_message).encode('utf-8'),
        headers={'Content-Type': 'application/json'}
    )
    ## Return 0 on success, 1 if the message could not be posted
    try:
        with urlopen(req) as res:
            res.read()
            logger.info("Message posted.")
            return 0
    except HTTPError as err:
        logger.error("Request failed: %d %s", err.code, err.reason)
    except URLError as err:
        logger.error("Server connection failed: %s", err.reason)
    return 1

## Get repository names
def get_repos():
    repos_to_check = []
    txrepositories = client.describe_repositories()

    for repo in txrepositories.get('repositories'):
        if repo['imageScanningConfiguration']['scanOnPush']:
            currentrepo = repo['repositoryName']
            repos_to_check.append(currentrepo)
    repos_to_check.sort()
    return repos_to_check

## Get latest image's tag from each repo
def get_results():
    repos_to_check = get_repos()
    latest_images = []
    for repo in repos_to_check:

        image_tags = []
        image_dates = []
        image_shas = []

        image_details = client.describe_images(repositoryName=repo).get('imageDetails')
        if image_details:
            for detail in image_details:
                ## Handle images with no tags or with multiple tags
                image_tag_list = detail.get('imageTags')
                image_tag = image_tag_list[0] if image_tag_list else "untagged"
                image_tags.append(image_tag)
                
                image_sha = detail.get('imageDigest')
                image_shas.append(image_sha)

                image_date = detail.get('imagePushedAt')
                image_dates.append(image_date)

            ## Pick the most recently pushed image
            latest_index = image_dates.index(max(image_dates))
            latest_tag = image_tags[latest_index]
            latest_sha = image_shas[latest_index]

            ## Look up the scan by digest, which also works for untagged images
            try:
                scan_result = client.describe_image_scan_findings(repositoryName=repo, imageId={'imageDigest': latest_sha})
            except client.exceptions.ScanNotFoundException:
                continue  # this image has never been scanned
            scan_findings = scan_result.get('imageScanFindings')
            if not scan_findings:
                continue  # scan still in progress or failed
            findings_summary = scan_findings.get('findingSeverityCounts', {})

            ## Send results to slack function
            send_slack_message(scan_result, scan_findings, findings_summary, repo, latest_tag, latest_sha)

def lambda_handler(event, context):
    get_results()

So, what this short Python code does is fetch the scan results of the most recently pushed image in every ECR repository that has “scan on push” enabled. If an image contains vulnerabilities of HIGH or CRITICAL severity, a Slack message is sent to the channel you defined, containing a count of the findings per severity and some more details of interest.

Feel free to tweak this as you like.
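One possible tweak is to make the notification threshold configurable instead of hard-coding HIGH and CRITICAL. The sketch below is only an illustration: `should_notify` and its `min_severity` parameter are hypothetical names that are not part of the Lambda code above, and the input is assumed to be the per-severity counts dictionary from the scan findings.

```python
# Severity names as used by the ECR API, from most to least severe.
SEVERITIES = ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW', 'INFORMATIONAL', 'UNDEFINED']


def should_notify(findings_summary, min_severity='HIGH'):
    """Return True if any finding is at least as severe as min_severity."""
    threshold = SEVERITIES.index(min_severity)
    # Every severity up to and including the threshold counts as reportable.
    reportable = SEVERITIES[:threshold + 1]
    return any(findings_summary.get(severity, 0) > 0 for severity in reportable)


# Example: an image with only MEDIUM findings
summary = {'MEDIUM': 3, 'LOW': 7}
print(should_notify(summary))                         # default threshold is HIGH
print(should_notify(summary, min_severity='MEDIUM'))  # stricter threshold
```

With the default threshold the example image would not trigger a message, while lowering the threshold to MEDIUM would.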

 

Example:

Orange (myapp1-repo) has vulnerabilities of HIGH and lower severity, Red (myapp2-repo) has vulnerabilities of CRITICAL and lower severity.

[Screenshot: ECR scan findings in Slack]

 

And there you have it!

Note that the {repository_name}:{image_tag} shown in blue is actually an AWS console link that points directly to the findings for that specific image.

Now, you can create some trigger for the Lambda function (maybe even a CloudWatch event) so that you stay up-to-date with your Docker image vulnerabilities :)
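If the trigger is an event rule that matches ECR scan completion events, the handler could read the repository and severity counts straight from the event instead of listing every repository. The following is a sketch under that assumption: `parse_scan_event` is a hypothetical helper, and the field names reflect the "ECR Image Scan" event format as I understand it, so check them against a real event before relying on them.

```python
def parse_scan_event(event):
    """Extract the interesting fields from an ECR image scan event.

    Returns None for events that are not completed ECR image scans.
    """
    if event.get('source') != 'aws.ecr':
        return None
    if event.get('detail-type') != 'ECR Image Scan':
        return None
    detail = event.get('detail', {})
    if detail.get('scan-status') != 'COMPLETE':
        return None
    return {
        'repo': detail.get('repository-name'),
        'digest': detail.get('image-digest'),
        'tags': detail.get('image-tags', []),
        'summary': detail.get('finding-severity-counts', {}),
    }


# Example event, trimmed to the fields used above (values are made up)
example_event = {
    'source': 'aws.ecr',
    'detail-type': 'ECR Image Scan',
    'detail': {
        'scan-status': 'COMPLETE',
        'repository-name': 'myapp-repo',
        'image-digest': 'sha256:0123abcd',
        'image-tags': ['latest'],
        'finding-severity-counts': {'CRITICAL': 1, 'MEDIUM': 4},
    },
}
print(parse_scan_event(example_event))
```

The returned counts could then be handed to the Slack function in place of a separate findings lookup.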

If this helped you, please feel free to give me a tip! https://ko-fi.com/kostavro