Unit tests for AWS Glue Jobs using StackSpot AI


In this article, we’ll explore how to write unit tests for AWS Glue Jobs using an example project, with the help of generative AI and StackSpot AI. Before we begin, it’s helpful to revisit some essential concepts:

AWS Glue

AWS Glue is a fully managed ETL (Extract, Transform, Load) service that makes it easy to prepare and load data for analysis. However, like any other code, AWS Glue jobs must be tested to ensure they work as expected. 

Before that, we must have sufficient background, as even basic knowledge of what to expect from a piece of code —AI-generated or not— leads to more meaningful conversations and interactions than simply typing a prompt.

After all, any AI will generate code if prompted, but the key lies in understanding what the tool has produced. From there, you can create interactions with more added value (and scripts)!

That’s why I’m here: let’s study a Glue Job project together, examine how StackSpot AI generated such code, and understand its logic. Let’s go.

Our first Glue Job

Let’s start from scratch with a simple Glue Job that performs ETL according to the structure below:

aws-glue-job-project/ 
├── src/ 
│ ├── main.py 
├── tests/ 
│ ├── test_main.py 

The file "src/main.py" contains the script below:

<<<< SCRIPT: PYTHON  >>>>>>>

import boto3
import sys
import json
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
from awsglue.context import GlueContext
from awsglue.job import Job


## -----------------------------------------------------
def get_account_id():
    session = boto3.session.Session()
    sts_client = session.client('sts')
    return sts_client.get_caller_identity()["Account"]

## -----------------------------------------------------
def get_files(bucket_param: str, path: str, nome_trigger: str):
    s3_client = boto3.client("s3")
    arq_controle_grupos = s3_client.get_object(Bucket=bucket_param, Key=path)
    grupos_json = json.loads(arq_controle_grupos['Body'].read().decode('utf-8'))

    grupo_atual = [grupo for grupo in grupos_json if grupo["trigger"] == nome_trigger]  or [{"arquivos": []}]

    return grupo_atual[0]["arquivos"]

# ----------------------------------------------------------------------------------------
# MAIN -----------------------------------------------------------------------------------
def main_glue():

    account_id = get_account_id()

    conf = SparkConf()
    conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    conf.set("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
    conf.set("spark.sql.catalog.glue_catalog.warehouse", f"s3://bucket-geral-{account_id}/icebergs/")
    conf.set("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    conf.set("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    conf.set("spark.sql.iceberg.handle-timestamp-without-timezone", "true")
    conf.set("spark.sql.defaultCatalog", "glue_catalog")

    # JOB parameters
    args = getResolvedOptions(sys.argv, ['JOB_NAME', 'bucket_name', 'filename'])

    # Spark and Glue contexts
    sc = SparkContext(conf=conf)
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session

    job = Job(glueContext)
    job.init(args["JOB_NAME"], args)

    # ----------------------------------------------------------------------------------------
    # MAIN -----------------------------------------------------------------------------------

    database_destino = 'db_geral'
    tabela_destino   = 'tb_clientes'

    # Read the JSON file that contains the parameters 
    s3_client = boto3.client('s3')
    bucket_name = args["bucket_name"]
    file_key = args["filename"]

    response = s3_client.get_object(Bucket=bucket_name, Key=file_key)
    params = json.loads(response['Body'].read().decode('utf-8'))

    # Extract the parameters from JSON
    nome_tabela_origem = params["nome_tabela_origem"]
    database           = params["database_origem"]
    partitions         = params["partitions"]

    partition = partitions[0]

    origem_carga = partition["origem_carga"]
    ano = partition["ano"]
    mes = partition["mes"]

    print(f"Nome Tabela: {nome_tabela_origem}")
    print(f"Database: {database}")
    print(f"Partitions: {partitions}")

    # Creating the Dynamic SQL
    sql_query = f"""
        SELECT '{origem_carga}' as origem_carga
             , tb.*
          FROM {database}.{nome_tabela_origem} tb
         WHERE tb.ano = "{ano}"
           AND tb.mes = "{mes}"
    """

    print(f"SQL >> {sql_query}")

    # Execute SQL with Spark
    result_df = spark.sql(sql_query)

    if result_df.head(1):

        print(f"Recreating partition ... ")

        # Save the results
        result_df.writeTo(f"glue_catalog.{database_destino}.{tabela_destino}").overwritePartitions()

    else:

        print(f"No data found ... ")

    # Remove processed files
    print(f"Removing file ... ")
    s3_client.delete_object(Bucket=bucket_name, Key=file_key)

    # Finish the job
    job.commit()

if __name__ == "__main__": # pragma: no cover
    main_glue()

Understanding the concept behind our example project

This straightforward script performs the following operations:

  • The “get_account_id” function uses AWS STS (Security Token Service) to retrieve the ID of the current AWS account.
  • The “get_files” function reads a JSON file from a given S3 bucket and returns the list of files associated with the provided trigger (or an empty list if no group matches).
  • The “main_glue” function, which is the main script, configures the Spark environment with the extensions and catalogs required by Apache Iceberg, a table format designed for large data volumes. It then reads the job parameters from a JSON file in S3, builds a SQL query dynamically, executes it, and saves the result in another table (such as another layer of your data lake).
  • Finally, the processed parameter file is deleted from S3 and the job is committed.
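One detail of “get_files” is easy to miss: when no group matches the trigger, the `or [{"arquivos": []}]` fallback makes it return an empty list instead of raising an IndexError. The sketch below isolates that selection logic in a hypothetical helper, select_files (it is not part of the project), so it can be run without AWS:

```python
def select_files(grupos_json, nome_trigger):
    # Hypothetical helper: same expression used inside get_files.
    # Keep the groups whose trigger matches, or fall back to one empty group.
    grupo_atual = [g for g in grupos_json if g["trigger"] == nome_trigger] or [{"arquivos": []}]
    return grupo_atual[0]["arquivos"]


grupos = [
    {"trigger": "nome_trigger_01", "arquivos": ["arq0.txt"]},
    {"trigger": "nome_trigger_02", "arquivos": ["arq3.txt", "arq4.txt"]},
]

found = select_files(grupos, "nome_trigger_02")        # matching group
missing = select_files(grupos, "trigger_inexistente")  # no match: fallback
```

Knowing about this fallback helps when asking the AI for extra scenarios, such as a test for a trigger that does not exist.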

Generating tests

With StackSpot AI, we can generate the foundation for our tests and request further development via a prompt. However, our focus here is to understand what has been generated and how it can be used to refine future AI results.

Keep in mind that for our tests, we will follow the four essential steps for effective testing:

  • ARRANGE: prepare the scenario for the test.
  • ACT: run the unit/code under test.
  • ASSERT: verify that the outcome matches expectations.
  • CLEAN UP / TEAR DOWN: clean up and release resources.
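To make the four steps concrete before we reach the Glue code, here is a minimal sketch that uses only the standard library. The load_config function and the file content are assumptions made up for this illustration:

```python
import json
import os
import tempfile


def load_config(path):
    # Unit under test (hypothetical): read a JSON file and return its content.
    with open(path, encoding="utf-8") as handle:
        return json.load(handle)


def test_load_config():
    # 001 - ARRANGE: create a temporary JSON file
    fd, path = tempfile.mkstemp(suffix=".json")
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        json.dump({"database_origem": "meu_db"}, handle)

    try:
        # 002 - ACT
        result = load_config(path)

        # 003 - ASSERT
        assert result == {"database_origem": "meu_db"}
    finally:
        # 004 - CLEAN UP: remove the file even if the assert fails
        os.remove(path)
```

The try/finally is one simple way to guarantee the clean-up step; later in the article, a fixture takes over that role.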

The following prompt directly generates something that can serve as the foundation for the tests:

“As a data engineer, generate the unit tests for this GlueJOB.” This prompt will generate something close to the code below (though it may vary depending on the context).

Python
<<<<<<<<<< SCRIPT: PYTHON>>>>>>>>>>>>>>>>>>
import pytest
import json, boto3
from moto import mock_aws
from unittest.mock import patch, MagicMock
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

import findspark
from src.main import get_account_id, get_files, main_glue

# -------------------------------------------------------------
@pytest.fixture(scope="session")
def mock_spark():
    findspark.init()

    conf = SparkConf()
    conf.set("spark.log.level", "ERROR")

    spark_session = SparkSession.builder \
                    .config(conf=conf) \
                    .appName("TestGlueJOB") \
                    .master("local") \
                    .getOrCreate()
    print(" ***** SPARK >> STARTING TESTS ********* ")
    yield spark_session

    print("\n***** SPARK >> TURNING OFF ********* ")
    spark_session.stop()

# -------------------------------------------------------------
@mock_aws
def test_get_account_id():

    result = get_account_id()
    assert result is not None

# -------------------------------------------------------------
@mock_aws
def test_get_files():

    ## 001 - ARRANGE

    # Preparing the environment ***********************************
    s3_client = boto3.client('s3', region_name='us-east-1')
    bucket_name = 'meu-bucket'
    s3_client.create_bucket(Bucket=bucket_name)

    json_content = json.dumps([{
                    "trigger": "nome_trigger_01",
                    "arquivos": ["arq0.txt", "arq1.txt", "arq2.txt"]
                },
                                {
                    "trigger": "nome_trigger_02",
                    "arquivos": ["arq3.txt", "arq4.txt", "arq5.txt"]
                }]
    ).encode('utf-8')

    s3_client.put_object(Bucket=bucket_name, Key='pasta/config.json', Body=json_content)

    ## 002 - ACT

    result = get_files(bucket_param=bucket_name, path="pasta/config.json", nome_trigger="nome_trigger_02")

    ## 003 - ASSERT

    assert result == ["arq3.txt", "arq4.txt", "arq5.txt"]

    ## 004 - CLEAN UP

# -------------------------------------------------------------
@mock_aws
@pytest.mark.parametrize("tipo_teste", [
    ("DF_OK"),
    ("DF_EMPTY")
])
@patch('src.main.SparkConf')
@patch('src.main.getResolvedOptions')
@patch('src.main.SparkContext')
@patch('src.main.GlueContext')
@patch('src.main.Job')
def test_main_glue(mock_Job, mock_GlueContext, mock_SparkContext, mock_getResolvedOptions, mock_sparkconf, mock_spark, tipo_teste):

    # 001 - ARRANGE
    # Preparing the environment ***********************************
    s3_client = boto3.client('s3', region_name='us-east-1')
    bucket_name = 'meu-bucket'
    s3_client.create_bucket(Bucket=bucket_name)

    json_content = json.dumps({
        "nome_tabela_origem": "minha_tabela_01",
        "database_origem": "meu_db",
        "partitions": [{
            "origem_carga": "valor_01",
            "ano": "valor_01",
            "mes": "valor_02",
            "cod_tipo_publ": "valor_03"
        }]
    }).encode('utf-8')
    s3_client.put_object(Bucket=bucket_name, Key='pasta/subpasta/meu_arquivo.json', Body=json_content)

    mock_getResolvedOptions.return_value = {
        'JOB_NAME': 'test_job',
        'bucket_name': 'meu-bucket',
        'filename': 'pasta/subpasta/meu_arquivo.json'
    }

    mock_Job.return_value = MagicMock()
    mock_Job.return_value.init.return_value = MagicMock()


    schema = StructType([
        StructField("col1", StringType(), True),
        StructField("col2", IntegerType(), True)
    ])

    if tipo_teste == 'DF_OK':
        result_df = mock_spark.createDataFrame([("0", 2)], schema=schema)
    else:
        result_df = mock_spark.createDataFrame([], schema=schema)

    mock_GlueContext.return_value.spark_session = MagicMock()
    mock_GlueContext.return_value.spark_session.sql.side_effect = [result_df,
                                                                   mock_spark.createDataFrame([], schema=schema)]

    with patch.object(result_df, 'writeTo', return_value=MagicMock()) as mock_writeTo:
        mock_writeTo.return_value.overwritePartitions = MagicMock()
        mock_overwritePartitions = mock_writeTo.return_value.overwritePartitions

        ## 002 - ACT
        main_glue()

    # 003 - ASSERT
    if tipo_teste == 'DF_OK':
        mock_writeTo.assert_called_once_with('glue_catalog.db_geral.tb_clientes')
        mock_overwritePartitions.assert_called_once()

    else:
        mock_writeTo.assert_not_called()

    mock_Job.return_value.init.assert_called_once()
    mock_Job.return_value.commit.assert_called_once()

From here, can you “chat” with StackSpot AI to ask for details or even improve parts of the code? Yes. Keep in mind, though, that the code requires supervision, as it may contain hallucinations or rely on outdated libraries.

If you follow suggestions and get an error, can you copy and paste the error back into the prompt? That’s fine —who hasn’t done that? But the real question is: “Why did that happen?” or “Why didn’t I understand what it generated, and why can’t I argue against it?”.

This is what separates good code and effective AI usage from simple “CTRL C / CTRL V.” By investing time to understand what was generated, we can save time during future test applications, provide the AI with more project context, request lib X instead of Y, or even discuss better alternatives for the code.

Shall we dive in?

Test breakdown time!

First, let’s examine what’s done and why, so you can grasp the process and generate the best possible code!

Fixtures with the Spark Session

Python
<<<<< SCRIPT : PYTHON
@pytest.fixture(scope="session")
def mock_spark():
    findspark.init()

    conf = SparkConf()
    conf.set("spark.log.level", "ERROR")

    spark_session = SparkSession.builder \
                    .config(conf=conf) \
                    .appName("TestGlueJOB") \
                    .master("local") \
                    .getOrCreate()
    print(" ***** SPARK >> STARTING TESTS ********* ")
    yield spark_session

    print("\n***** SPARK >> TURNING OFF ********* ")
    spark_session.stop()
>>>>>>>>>>>>>>>>>

At this stage, we use a Python decorator for Fixtures!

A fixture, in unit tests, is a function that sets up the environment needed to run the tests, such as configuring data, initializing resources, and ensuring cleanup after the tests are completed.

In our script, the “mock_spark” function is used as a fixture at the session level (“scope” parameter). This means it is executed only once, before the first test that requests it, and the object it produces is shared by all the tests in this session. In this example, we are creating the Spark session.

The call to “findspark.init()” locates the local Spark installation and makes pyspark importable, setting environment variables such as SPARK_HOME along the way.

The “yield” hands the Spark session back to the caller and suspends the function at that line. The function remains “pending” until execution resumes. The code after the “yield” runs once the tests using the fixture are complete, serving as the hook for testing step 4: CLEAN UP / TEARDOWN.
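If the “pause and resume” behavior of “yield” still feels abstract, the sketch below reproduces it with the standard library only. pytest drives fixtures with its own machinery; contextlib.contextmanager is used here purely as an analogy, and the names fake_resource and events are made up for the illustration:

```python
from contextlib import contextmanager

events = []


@contextmanager
def fake_resource():
    events.append("setup")        # runs before the "test" body
    yield "resource"              # value handed to the caller; function pauses here
    events.append("teardown")     # runs after the caller is done


with fake_resource() as resource:
    events.append(f"test using {resource}")
```

After this runs, the events list records the setup, then the test body, then the teardown, which is the same ordering the fixture gives us around the Spark session.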

Using Mocks for AWS Resources

Python
<<<<< SCRIPT : PYTHON
@mock_aws
def test_get_account_id():

    result = get_account_id()
    assert result is not None
>>>>>>>>>>>>>>>>>

The @mock_aws decorator simulates AWS services during testing. This is done using the moto library, which creates an isolated test environment to interact with AWS services without making real calls to AWS. Here’s our mock!

Since the get_account_id function makes calls to AWS services, this decorator handles “simulating” (or “mocking”) those calls.

“Hold on a second—I don’t understand… what are Mocks?” 

Don’t worry, let’s break it down!

What are Mocks?

Mocking is a unit testing technique that simulates the behavior of complex objects or objects external to the system being tested.

This technique isolates the unit of code being tested, allowing verification of its behavior without relying on external components such as web services, databases, or, in our case, AWS services.
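As a small illustration of that isolation, the sketch below replaces an S3 client with a MagicMock from Python’s unittest.mock. The count_objects function is hypothetical and exists only for this example; the mocked client never talks to AWS:

```python
from unittest.mock import MagicMock


def count_objects(s3_client, bucket):
    # Unit under test (hypothetical): depends on an external service via s3_client.
    response = s3_client.list_objects_v2(Bucket=bucket)
    return response.get("KeyCount", 0)


# The mock stands in for the real client and returns a canned response.
fake_client = MagicMock()
fake_client.list_objects_v2.return_value = {"KeyCount": 3}

total = count_objects(fake_client, "meu-bucket")

# We can also verify how the external dependency was used.
fake_client.list_objects_v2.assert_called_once_with(Bucket="meu-bucket")
```

Only the logic of count_objects is exercised here; the “external” part is fully under our control.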

“Let’s simulate it then, shall we?” 

Right!

“Okay, but how does it know which objects must be returned?”

Excellent question!

That’s where we “prepare our AWS environment using mocks.”

Preparing the AWS environment simulation

As explained, the first stage of a test is to “prepare” the environment. Using @mock_aws, we’ll be able to simulate what we want. Take a look:

Python
<<<<< SCRIPT : PYTHON
# -------------------------------------------------------------
@mock_aws
def test_get_files():

    ## 001 - ARRANGE

    # Preparing the environment ***********************************
    s3_client = boto3.client('s3', region_name='us-east-1')
    bucket_name = 'meu-bucket'
    s3_client.create_bucket(Bucket=bucket_name)

    json_content = json.dumps([{
                    "trigger": "nome_trigger_01",
                    "arquivos": ["arq0.txt", "arq1.txt", "arq2.txt"]
                },
                                {
                    "trigger": "nome_trigger_02",
                    "arquivos": ["arq3.txt", "arq4.txt", "arq5.txt"]
                }]
    ).encode('utf-8')

    s3_client.put_object(Bucket=bucket_name, Key='pasta/config.json', Body=json_content)

    ## 002 - ACT

    result = get_files(bucket_param=bucket_name, path="pasta/config.json", nome_trigger="nome_trigger_02")

    ## 003 - ASSERT

    assert result == ["arq3.txt", "arq4.txt", "arq5.txt"]

    ## 004 - CLEAN UP
>>>>>>>>>>>>>>>>>

Note that in this scenario, using the decorator, we call the AWS boto3 lib to create the client, set up a bucket, and add objects to it.

“But is it actually going to put this in AWS? You said it was a simulation…”

No, and yes! It actually calls the official boto3 methods, but because we’re using the @mock_aws decorator, it intervenes and does it only in memory, returning with “okay—done!”

So, if our script reads files from a bucket, we need to create it in memory so that it’s actually read when we run the ACT step.

Wow! Got it? We prepare the entire scenario before the call and validate its return based on the expected execution.
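To picture what “only in memory” means, here is a deliberately tiny stand-in for an S3 client. It is a hypothetical class written for this illustration, not how moto is implemented (moto is far more complete and intercepts the real boto3 calls); the read_json helper is also an assumption:

```python
import io
import json


class FakeS3Client:
    """Hypothetical in-memory stand-in for illustration only."""

    def __init__(self):
        self._objects = {}

    def put_object(self, Bucket, Key, Body):
        # Store the bytes in a dictionary instead of sending them to AWS.
        self._objects[(Bucket, Key)] = Body
        return {}

    def get_object(self, Bucket, Key):
        # Mimic the response shape used in the article: response["Body"].read()
        return {"Body": io.BytesIO(self._objects[(Bucket, Key)])}


def read_json(s3_client, bucket, key):
    response = s3_client.get_object(Bucket=bucket, Key=key)
    return json.loads(response["Body"].read().decode("utf-8"))


# ARRANGE: the object must exist "in memory" before the ACT step reads it.
client = FakeS3Client()
client.put_object(Bucket="meu-bucket", Key="pasta/config.json",
                  Body=json.dumps({"ok": True}).encode("utf-8"))

content = read_json(client, "meu-bucket", "pasta/config.json")
```

The takeaway is the same as with @mock_aws: whatever the code under test will read has to be prepared first.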

Multiple mocks

Now, let’s mock several libs used by the main script.

Python
<<<<< SCRIPT : PYTHON
# -------------------------------------------------------------
@mock_aws
@pytest.mark.parametrize("tipo_teste", [
    ("DF_OK"),
    ("DF_EMPTY")
])
@patch('src.main.SparkConf')
@patch('src.main.getResolvedOptions')
@patch('src.main.SparkContext')
@patch('src.main.GlueContext')
@patch('src.main.Job')
def test_main_glue(mock_Job, mock_GlueContext, mock_SparkContext, mock_getResolvedOptions, mock_sparkconf, mock_spark, tipo_teste):
>>>>>>>>>>>>>>>>>

The @patch decorator, provided by Python’s unittest.mock library, is used to replace (or “mock”) specific objects or functions during test execution. 

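In our case, we’re mocking most of the libs, and each patch generates a “MagicMock” (the main mock class), which can act as “anything.” We’ll explore this later, but here, we name exactly each object/function we want to mock.

Pay attention to the order of the patches: decorators are applied from the bottom up, so the mocks are passed to the test function “test_main_glue” in the reverse of the order in which the @patch lines are written. The @patch closest to the function ('src.main.Job') becomes the first argument (mock_Job). Be sure to remember this!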
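The ordering rule is easier to trust once you see it run. This sketch patches two standard-library functions (chosen only because they need no extra setup) and shows which mock lands in which argument:

```python
import os
from unittest.mock import patch


@patch("os.getcwd")     # written first (outermost): LAST mock argument
@patch("os.listdir")    # closest to the function: FIRST mock argument
def check_order(mock_listdir, mock_getcwd):
    mock_listdir.return_value = ["a.txt"]
    mock_getcwd.return_value = "/fake/dir"
    # Inside the function, both names on the os module are the mocks.
    return os.listdir("."), os.getcwd()


result = check_order()
```

If the two parameters were swapped, the return values would end up on the wrong mocks, which is a common source of confusing test failures.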

So we put @mock_aws together with the other libs we want to mock.

“But what about mark.parametrize? What’s that? You didn’t comment….”

Same test but with different input parameters

If we’re breaking it down, we need to cover everything!

This is exactly where the @pytest.mark.parametrize decorator comes in.

Python
<<<<< SCRIPT : PYTHON
@pytest.mark.parametrize("valor", [
    (20),
    (30)
])
>>>>>>>>>>>>>>>>>

Let’s understand what the @pytest.mark.parametrize decorator does in the context of unit tests.

The @pytest.mark.parametrize is a decorator provided by the pytest testing framework. It allows you to run the same test several times with different parameters. This is particularly useful for testing the same logic against various inputs and ensuring the code behaves as expected for each dataset.

In this example, at runtime, there will be two tests: one using the value 20 and another using the value 30. Can you see the benefit?
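Here is a complete, minimal version of a parametrized test. The test function and its check are assumptions created for this illustration; only the decorator usage comes from the example above:

```python
import pytest


@pytest.mark.parametrize("valor", [20, 30])
def test_valor_is_multiple_of_ten(valor):
    # pytest calls this function once per item in the list above,
    # passing the item as the "valor" argument.
    assert valor % 10 == 0
```

When run with pytest, each value is reported as its own test, so a failure for one input does not hide the result for the other.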

Mixing Spark with Mocks (or MagicMocks)

Python
<<<<< SCRIPT : PYTHON

    mock_Job.return_value = MagicMock()
    mock_Job.return_value.init.return_value = MagicMock()
	
    schema = StructType([
        StructField("col1", StringType(), True),
        StructField("col2", IntegerType(), True)
    ])

    if tipo_teste == 'DF_OK':
        # col1 is a StringType column, so its value must be a string
        result_df = mock_spark.createDataFrame([("0", 2)], schema=schema)
    else:
        result_df = mock_spark.createDataFrame([], schema=schema)

    mock_GlueContext.return_value.spark_session = MagicMock()
    mock_GlueContext.return_value.spark_session.sql.side_effect = [result_df,
                                                                   mock_spark.createDataFrame([], schema=schema)]
>>>>>>>>>>>>>>>>>

This is where things get exciting: after using “mock_spark” (a fixture passed as a parameter to the test function), we need to mock the following:

Whenever “spark.sql” is invoked, instead of executing it in Glue’s Spark, we want it to return a predefined DataFrame so we can control the tests!

Here’s the golden tip!

Whenever we mock an object, it becomes an instance of the “MagicMock” class. This object can be “anything,” remember? So, to get the behavior we need, we have to control what it returns.

How? “Every time someone invokes a function of an object we’ve mocked, we use the ‘return_value’ attribute to assign the desired return.” 

Example: 

Python
<<<<< SCRIPT : PYTHON
    job = Job(glueContext)
    job.init(args["JOB_NAME"], args)
>>>>>>>>>>	

By using a “Job” mock, we can control the return from the “init” function:

Python
<<<<< SCRIPT : PYTHON
    mock_Job.return_value = MagicMock()
    mock_Job.return_value.init.return_value = MagicMock()
>>>>>>>>>>		

Cool, isn’t it?
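You can try the same chain outside the test, with a plain MagicMock playing the role of the patched “Job” class. The string "initialized" is just an illustrative return value, not something Glue returns:

```python
from unittest.mock import MagicMock

mock_Job = MagicMock()                                    # stands in for the Job class
mock_Job.return_value.init.return_value = "initialized"   # what job.init(...) will return

job = mock_Job("fake_glue_context")      # "instantiating" the mock yields return_value
outcome = job.init("test_job", {})
same_instance = job is mock_Job.return_value
```

Reading the chain from left to right helps: mock_Job.return_value is the “instance”, and .init.return_value is what calling its init method gives back.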

“What if a function is called several times during the code? How can I control each call?”

In this case, we replace “return_value” with “side_effect” and give it a list: each call to the function returns the next item of the list, in order:

Python

<<<<< SCRIPT : PYTHON
    mock_GlueContext.return_value.spark_session.sql.side_effect = [result_df,
                                                                   mock_spark.createDataFrame([], schema=schema)]
>>>>>>>>>>	

Here, we are queuing the results of the “spark.sql” calls so that each call returns the next item in the list.
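A quick sketch with placeholder strings instead of DataFrames shows the behavior, including what happens when the code calls the function more times than the list has items:

```python
from unittest.mock import MagicMock

spark = MagicMock()
spark.sql.side_effect = ["first_df", "second_df"]   # placeholders for DataFrames

first = spark.sql("SELECT 1")     # returns the first item
second = spark.sql("SELECT 2")    # returns the second item

try:
    spark.sql("SELECT 3")         # the list is exhausted
    exhausted = False
except StopIteration:
    exhausted = True
```

That StopIteration is a useful signal in practice: if it shows up in a test, the code under test made more calls than the list prepared for it.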

Mocking part of real objects

In some scenarios, we may prefer to use the native object (an actual DataFrame, for example), as it gives us flexibility and removes the need to mock each of its methods.

However, in certain situations, you might need to mock only specific methods of that object. Here’s one way to do it:

Python
<<<<< SCRIPT : PYTHON
    with patch.object(result_df, 'writeTo', return_value=MagicMock()) as mock_writeTo:
        mock_writeTo.return_value.overwritePartitions = MagicMock()
        mock_overwritePartitions = mock_writeTo.return_value.overwritePartitions
>>>>>>>>>>	

In this example, we mock the existing “writeTo” method of a DataFrame so that it returns only a MagicMock. Since we don’t want it to actually persist the records, we simply verify later, during the ASSERT stage, that it was called.
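The same idea works on any Python object. In the sketch below, Report is a hypothetical class standing in for the DataFrame: one method keeps its real behavior while another is replaced only for the duration of the “with” block:

```python
from unittest.mock import patch


class Report:
    # Hypothetical class used only for this illustration.
    def __init__(self, rows):
        self.rows = rows

    def count(self):
        return len(self.rows)

    def save(self, target):
        raise RuntimeError("would write to a real destination")


report = Report(rows=[1, 2, 3])

with patch.object(report, "save") as mock_save:
    real_count = report.count()           # real method, still works
    report.save("glue_catalog.db.tb")     # mocked, nothing is written

# The mock remains available for assertions after the block ends.
mock_save.assert_called_once_with("glue_catalog.db.tb")
```

Outside the “with” block, the original save method is restored, so the patch cannot leak into other tests.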

Finally, the checks

After properly executing the function to be validated, we must confirm that it performed as expected using “ASSERTs”:

Python
<<<<< SCRIPT : PYTHON
    # 003 - ASSERT
    if tipo_teste == 'DF_OK':
        mock_writeTo.assert_called_once_with('glue_catalog.db_geral.tb_clientes')
        mock_overwritePartitions.assert_called_once()

    else:
        mock_writeTo.assert_not_called()

    mock_Job.return_value.init.assert_called_once()
    mock_Job.return_value.commit.assert_called_once()
>>>>>>>>>>		

The most common asserts are value comparisons, such as “assert value_1 == value_2”, but we can also check whether a mock was called exactly once or not at all, using “mock.function.assert_called_once()” or “mock.function.assert_not_called()”.

To verify the number of calls, we use “assert mock.function.call_count == 2”.

To confirm that the mock was called with specific parameters, we use “mock.function.assert_called_with(‘my_parameter’)”, which checks the most recent call.
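The sketch below puts these checks side by side on a throwaway MagicMock (the attribute names function and other are placeholders). It also shows that assert_called_with looks only at the most recent call, while assert_any_call searches all of them:

```python
from unittest.mock import MagicMock

mock = MagicMock()
mock.function("my_parameter")
mock.function("other_parameter")

assert mock.function.call_count == 2
mock.function.assert_called_with("other_parameter")   # most recent call only
mock.function.assert_any_call("my_parameter")          # any call in the history
mock.other.assert_not_called()                         # never invoked

try:
    mock.function.assert_called_once()   # fails: it was called twice
    failed = False
except AssertionError:
    failed = True
```

When an assertion like assert_called_once fails, the AssertionError message includes the recorded calls, which is often the fastest way to see what the code actually did.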

And let’s not forget CLEAN UP

Before wrapping up, there’s the CLEAN UP step.

Remember the fixture with “yield”? Because it is scoped to the session, once all the tests that use it have finished, the fixture resumes after the “yield” and, as we’ve seen, stops the Spark session, applying the CLEAN UP / TEARDOWN step:

Python
<<<<< SCRIPT : PYTHON
    print("\n***** SPARK >> TURNING OFF ********* ")
    spark_session.stop()
>>>>>>>>>>

Conclusion 

Wow, what a journey through the tests! And most importantly: we’ve understood the reasoning behind every detail.

By breaking down the GlueJob tests (or any Python script), we learned valuable details that will help us in future applications.

Do you now feel more prepared to interact with StackSpot AI? When it generates code for you, engage with it—exercise critical thinking, and clarify the details of your test, your mock, and how you want the tests to operate from now on, whether in form A or form B. Leverage StackSpot AI to your advantage and make it your best coding partner.

You’ve earned a mock badge to use in your upcoming tests. Study hard and go for it!
