Testes unitários para AWS Glue Jobs com uso de StackSpot AI

Capa do artigo sobre Testes unitários para AWS Glue Jobs. Na imagem, temos uma mulher branca usando fones, óculos e uma camisa verde de manga comprida, ela está em frente ao notebook e a um monitor que exibe códigos.

Neste artigo, vamos explorar como escrever testes unitários para AWS Glue Jobs com a ajuda de IA generativa, a StackSpot AI, usando um projeto de exemplo. Antes de começar, vale revisitar alguns conceitos importantes, acompanhe:

AWS Glue

AWS Glue é um serviço de ETL (Extract, Transform, Load) totalmente gerenciado que facilita a preparação e o carregamento de dados para análise. No entanto, como qualquer outro código, os jobs do AWS Glue também precisam ser testados para garantir que funcionem conforme o esperado.

Antes disso, é essencial termos base o suficiente, pois ter um mínimo de conhecimento do que se espera de um código, mesmo gerado por IA, te faz ter conversas e interações muito mais proveitosas que um simples digitar de prompt.

Afinal, qualquer IA vai te gerar um código se você perguntar para ela, mas o diferencial é entender o que a ferramenta gerou. E, a partir daí, sim, gerar interações com um valor agregado (e também scripts) muito mais valioso!

É pra isso que estou aqui: vamos estudar juntos um projeto de um Glue Job, ver como a StackSpot AI gerou tais códigos e entender o racional por trás. Vamos lá?

Nosso primeiro Glue JOB

Vamos partir do ponto zero. Temos um Glue Job simples que faz um ETL, conforme estrutura abaixo:

Python
aws-glue-job-project/ 
├── src/ 
│ ├── main.py 
├── tests/ 
│ ├── test_main.py 

O arquivo "src/main.py" contém o GlueJOB:

<<<< SCRIPT - linguagem PYTHON  >>>>>>>

import boto3
import sys
import json
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
from awsglue.context import GlueContext
from awsglue.job import Job


## -----------------------------------------------------
def get_account_id():
    session = boto3.session.Session()
    sts_client = session.client('sts')
    return sts_client.get_caller_identity()["Account"]

## -----------------------------------------------------
def get_files(bucket_param: str, path: str, nome_trigger: str):
    s3_client = boto3.client("s3")
    arq_controle_grupos = s3_client.get_object(Bucket=bucket_param, Key=path)
    grupos_json = json.loads(arq_controle_grupos['Body'].read().decode('utf-8'))

    grupo_atual = [grupo for grupo in grupos_json if grupo["trigger"] == nome_trigger]  or [{"arquivos": []}]

    return grupo_atual[0]["arquivos"]

# ----------------------------------------------------------------------------------------
# MAIN -----------------------------------------------------------------------------------
def main_glue():

    account_id = get_account_id()

    conf = SparkConf()
    conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    conf.set("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
    conf.set("spark.sql.catalog.glue_catalog.warehouse", f"s3://bucket-geral-{account_id}/icebergs/")
    conf.set("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    conf.set("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    conf.set("spark.sql.iceberg.handle-timestamp-without-timezone", "true")
    conf.set("spark.sql.defaultCatalog", "glue_catalog")

    # Parâmetros do job
    args = getResolvedOptions(sys.argv, ['JOB_NAME', 'bucket_name', 'filename'])

    # Contextos do Spark e Glue
    sc = SparkContext(conf=conf)
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session

    job = Job(glueContext)
    job.init(args["JOB_NAME"], args)

    # ----------------------------------------------------------------------------------------
    # MAIN -----------------------------------------------------------------------------------

    database_destino = 'db_geral'
    tabela_destino   = 'tb_clientes'

    # Ler o arquivo JSON com os parâmetros
    s3_client = boto3.client('s3')
    bucket_name = args["bucket_name"]
    file_key = args["filename"]

    response = s3_client.get_object(Bucket=bucket_name, Key=file_key)
    params = json.loads(response['Body'].read().decode('utf-8'))

    # Extrair os parâmetros do JSON
    nome_tabela_origem = params["nome_tabela_origem"]
    database           = params["database_origem"]
    partitions         = params["partitions"]

    partition = partitions[0]

    origem_carga = partition["origem_carga"]
    ano = partition["ano"]
    mes = partition["mes"]

    print(f"Nome Tabela: {nome_tabela_origem}")
    print(f"Database: {database}")
    print(f"Partitions: {partitions}")

    # Montagem da SQL dinamicamente
    sql_query = f"""
        SELECT '{origem_carga}' as origem_carga
             , tb.*
          FROM {database}.{nome_tabela_origem} tb
         WHERE tb.ano = "{ano}"
           AND tb.mes = "{mes}"
    """

    print(f"SQL >> {sql_query}")

    # Executar a SQL com Spark
    result_df = spark.sql(sql_query)

    if result_df.head(1):

        print(f"Recriando particao ... ")

        # Salvar o resultado como overwrite na tabela SOT
        result_df.writeTo(f"glue_catalog.{database_destino}.{tabela_destino}").overwritePartitions()

    else:

        print(f"Sem dados na Origem ... ")

    # Apagar arquivo processado
    print(f"Removendo arquivo de processamento ... ")
    s3_client.delete_object(Bucket=bucket_name, Key=file_key)

    # Finalizar o job
    job.commit()

if __name__ == "__main__": # pragma: no cover
    main_glue()

Entendendo a ideia do nosso projeto exemplo

Basicamente, este simples script realiza as seguintes operações:

  • A função “get_account_id” utiliza o serviço AWS STS (Security Token Service) para obter o ID da conta AWS atual.
  • A função get_files lê um arquivo JSON de um bucket S3 específico e retorna uma lista de arquivos com base em um trigger fornecido.
  • A função “main_glue”, que é o script principal, configura o ambiente do Spark com extensões e catálogos específicos do Iceberg, que é uma tabela de dados otimizada para grandes volumes de dados. Nesse momento, setamos todas as configurações necessárias para o Glue, onde é montada uma query dinamicamente, posteriormente executada, e o resultado é gravado numa outra tabela (em uma outra camada do seu data lake, por exemplo).
  • O job é finalizado e os recursos são liberados.

Consuma inovação,
comece a transformação

Assine nosso boletim informativo para se manter atualizado sobre as práticas recomendadas mais recentes para aproveitar a tecnologia para gerar impacto nos negócios

Gerando testes unitários para AWS Glue Jobs

Com o auxílio da StackSpot AI podemos gerar a base pros nossos testes unitários para AWS Glue Jobs, basta pedir pra gerar via prompt. Porém, nosso ponto aqui é entender o que foi gerado, e como isso serve para continuar e aprimorar novos resultados da IA.

Lembrando que para nossos testes, aplicaremos as quatro etapas essenciais para um bom teste:

  1. ARRANGE: preparação dos cenários macros para testes.
  2. ACT: executar realmente a unidade/código a ser testada.
  3. ASSERT: comparar e garantir que a execução foi realmente conforme esperado.
  4. CLEAN UP / TEAR DOWN: realizar a limpeza e liberação de recursos.

Com o seguinte prompt, de maneira simples e direta, é gerado algo que pode ser usado como base dos testes:

“Enquanto engenheiro de dados, gere os testes unitários para este GlueJOB.”, ele poderá gerar algo próximo a este código (podendo variar de acordo com o contexto que estiver).

Python
<<<<<<<<<< SCRIPT: PYTHON>>>>>>>>>>>>>>>>>>
import pytest
import json, boto3
from moto import mock_aws
from unittest.mock import patch, MagicMock
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

import findspark
from src.main import get_account_id, get_files, main_glue

# -------------------------------------------------------------
@pytest.fixture(scope="session")
def mock_spark():
    findspark.init()

    conf = SparkConf()
    conf.set("spark.log.level", "ERROR")

    spark_session = SparkSession.builder \
                    .config(conf=conf) \
                    .appName("TestGlueJOB") \
                    .master("local") \
                    .getOrCreate()
    print(" ***** SPARK >> STARTING TESTS ********* ")
    yield spark_session

    print("\n***** SPARK >> TURNING OFF ********* ")
    spark_session.stop()

# -------------------------------------------------------------
@mock_aws
def test_get_account_id():

    result = get_account_id()
    assert result is not None

# -------------------------------------------------------------
@mock_aws
def test_get_files():

    ## 001 - ARRANGE

    # Preparando ambiente ***********************************
    s3_client = boto3.client('s3', region_name='us-east-1')
    bucket_name = 'meu-bucket'
    s3_client.create_bucket(Bucket=bucket_name)

    json_content = json.dumps([{
                    "trigger": "nome_trigger_01",
                    "arquivos": ["arq0.txt", "arq1.txt", "arq2.txt"]
                },
                                {
                    "trigger": "nome_trigger_02",
                    "arquivos": ["arq3.txt", "arq4.txt", "arq5.txt"]
                }]
    ).encode('utf-8')

    s3_client.put_object(Bucket=bucket_name, Key='pasta/config.json', Body=json_content)

    ## 002 - ACT

    result = get_files(bucket_param=bucket_name, path="pasta/config.json", nome_trigger="nome_trigger_02")

    ## 003 - ASSERT

    assert result == ["arq3.txt", "arq4.txt", "arq5.txt"]

    ## 004 - CLEAN UP

# -------------------------------------------------------------
@mock_aws
@pytest.mark.parametrize("tipo_teste", [
    ("DF_OK"),
    ("DF_EMPTY")
])
@patch('src.main.SparkConf')
@patch('src.main.getResolvedOptions')
@patch('src.main.SparkContext')
@patch('src.main.GlueContext')
@patch('src.main.Job')
def test_main_glue(mock_Job, mock_GlueContext, mock_SparkContext, mock_getResolvedOptions, mock_sparkconf, mock_spark, tipo_teste):

    # 001 - ARRANGE
    # Preparando ambiente ***********************************
    s3_client = boto3.client('s3', region_name='us-east-1')
    bucket_name = 'meu-bucket'
    s3_client.create_bucket(Bucket=bucket_name)

    json_content = json.dumps({
        "nome_tabela_origem": "minha_tabela_01",
        "database_origem": "meu_db",
        "partitions": [{
            "origem_carga": "valor_01",
            "ano": "valor_01",
            "mes": "valor_02",
            "cod_tipo_publ": "valor_03"
        }]
    }).encode('utf-8')
    s3_client.put_object(Bucket=bucket_name, Key='pasta/subpasta/meu_arquivo.json', Body=json_content)

    mock_getResolvedOptions.return_value = {
        'JOB_NAME':' test_job',
        'bucket_name': 'meu-bucket',
        'filename': 'pasta/subpasta/meu_arquivo.json'
    }

    mock_Job.return_value = MagicMock()
    mock_Job.return_value.init.return_value = MagicMock()


    schema = StructType([
        StructField("col1", StringType(), True),
        StructField("col2", IntegerType(), True)
    ])

    if tipo_teste == 'DF_OK':
        result_df = mock_spark.createDataFrame([(0, 2)], schema=schema)
    else:
        result_df = mock_spark.createDataFrame([], schema=schema)

    mock_GlueContext.return_value.spark_session = MagicMock()
    mock_GlueContext.return_value.spark_session.sql.side_effect = [result_df,
                                                                   mock_spark.createDataFrame([], schema=schema)]

    with patch.object(result_df, 'writeTo', return_value=MagicMock()) as mock_writeTo:
        mock_writeTo.return_value.overwritePartitions = MagicMock()
        mock_overwritePartitions = mock_writeTo.return_value.overwritePartitions

        ## 002 - ACT
        main_glue()

    # 003 - ASSERT
    if tipo_teste == 'DF_OK':
        mock_writeTo.assert_called_once_with('glue_catalog.db_geral.tb_clientes')
        mock_overwritePartitions.assert_called_once()

    else:
        mock_writeTo.assert_not_called()

    mock_Job.return_value.init.assert_called_once()
    mock_Job.return_value.commit.assert_called_once()

A partir de agora, você consegue “bater um papo” com a StackSpot AI e pedir detalhamentos ou até mesmo melhorar parte do código? Sim, os códigos precisam de supervisão, pois há casos de alucinações ou uso de uma simples biblioteca desativada.

Já se você acatar o que ela respondeu e der erro, pode ir copiando e colando o erro de volta no prompt? Está tudo bem, quem nunca fez isso? Mas o ponto é: “por que é assim mesmo?” ou “por que não entendi nada do que ela fez? E também não tenho argumentos para questionar?”

Eis o ponto que divide um bom código e um bom uso de IA do simples “CTRL C / CTRL V”: se investirmos um tempo entendendo o que foi gerado para, numa próxima oportunidade de aplicação destes testes, você ganhar tempo e dar mais contexto de projetos para a IA, pedir pra lib X e não Y ou ainda debater melhores alternativas para o código.

Bora lá?

Hora do “test breaking down”!

Antes, vamos entender o que está sendo feito e o porquê para você compreender o processo e gerar o melhor código possível!

Fixtures com a Sessão Spark

Python
<<<<< SCRIPT : PYTHON
@pytest.fixture(scope="session")
def mock_spark():
    findspark.init()

    conf = SparkConf()
    conf.set("spark.log.level", "ERROR")

    spark_session = SparkSession.builder \
                    .config(conf=conf) \
                    .appName("TestGlueJOB") \
                    .master("local") \
                    .getOrCreate()
    print(" ***** SPARK >> STARTING TESTS ********* ")
    yield spark_session

    print("\n***** SPARK >> TURNING OFF ********* ")
    spark_session.stop()
>>>>>>>>>>>>>>>>>

Neste ponto, temos o uso de um decorador Python para Fixtures!

Em tempo, uma fixture é uma função em testes unitários que prepara o ambiente necessário para a execução dos testes, como a configuração de dados ou a inicialização de recursos, além de garantir a limpeza após os testes serem executados. 

No nosso script, a função “mock_spark” será utilizada como fixture a nível de sessão (parâmetro “scope”). O que indica que para todos os testes desta seção este script será executado uma única vez, antes da execução do primeiro teste, e o que ele executar valerá para os demais. Neste exemplo, estamos criando a sessão Spark.

O uso do “findspark.init()” garante que o ambiente local está devidamente configurado (variáveis de ambiente, etc.) para uso do Spark.

O “yield” é usado para retornar ao chamador assim que chegar nesta linha. A função fica “pendente” para continuar executando (até finalizar ou encontrar um novo “yield”). Após o yield, o código continua a ser executado quando os testes que utilizam a fixture terminam (eis o gancho para a etapa 4 de testes: CLEAN UP / TEARDOWN)

Usando Mock para recursos AWS

Python
<<<<< SCRIPT : PYTHON
@mock_aws
def test_get_account_id():

    result = get_account_id()
    assert result is not None
>>>>>>>>>>>>>>>>>

O decorador @mock_aws é utilizado para simular os serviços da AWS durante os testes. Isso é feito usando a biblioteca moto, que permite criar um ambiente de teste isolado para interagir com os serviços da AWS sem realmente fazer chamadas para a AWS. Eis o nosso mock!

Então, como a função get_account_id utiliza chamadas de serviços da AWS, este decorador (em documentações técnicas, você pode achar como “decorator”)  se encarrega de “simular” (ou “mockar”) tais chamadas.

“Mas Will, espera aí, não entendi… o que são Mocks?” 

Calma, vamos entender!

O que são Mocks?

Mocking (que resulta num objeto Mock) é uma técnica usada em testes unitários para simular o comportamento de objetos complexos ou externos ao sistema que está sendo testado.

Isso permite que você isole a unidade de código que está sendo testada e verifique seu comportamento sem depender de componentes externos, como serviços web, bancos de dados, ou, no caso do nosso projeto, serviços da AWS.

“Ahh… vamos simular, então, certo?” 

Certo!

“Ok, mas como ele sabe os objetos que precisam ser retornados?”

Ahh, excelente pergunta!

Precisamos então “preparar nosso ambiente AWS usando os mocks”.

Preparando a simulação do ambiente AWS

Como exposto, a primeira etapa de um teste, é “preparar” o ambiente. E, em conjunto com @mock_aws, conseguiremos simular o que queremos. Veja:

Python
<<<<< SCRIPT : PYTHON
# -------------------------------------------------------------
@mock_aws
def test_get_files():

    ## 001 - ARRANGE

    # Preparando ambiente ***********************************
    s3_client = boto3.client('s3', region_name='us-east-1')
    bucket_name = 'meu-bucket'
    s3_client.create_bucket(Bucket=bucket_name)

    json_content = json.dumps([{
                    "trigger": "nome_trigger_01",
                    "arquivos": ["arq0.txt", "arq1.txt", "arq2.txt"]
                },
                                {
                    "trigger": "nome_trigger_02",
                    "arquivos": ["arq3.txt", "arq4.txt", "arq5.txt"]
                }]
    ).encode('utf-8')

    s3_client.put_object(Bucket=bucket_name, Key='pasta/config.json', Body=json_content)

    ## 002 - ACT

    result = get_files(bucket_param=bucket_name, path="pasta/config.json", nome_trigger="nome_trigger_02")

    ## 003 - ASSERT

    assert result == ["arq3.txt", "arq4.txt", "arq5.txt"]

    ## 004 - CLEAN UP
>>>>>>>>>>>>>>>>>

Perceba que neste cenário, usando o decorador, estamos chamando a lib da AWS boto3 para realizar algumas coisas: criação do client, criação de um bucket e inclusão de objetos neste bucket.

“Mas ele está indo colocar isso lá na AWS? Você disse que ia simular…”

Não e sim! Realmente, ele está chamando os métodos oficiais do boto3, mas como estamos usando o decorador @mock_aws, ele intervém e faz isso apenas em memória, e retorna com “ok – feito!”.

Então, se temos um script que vai ler arquivos de um bucket, precisamos criá-lo em memória para que ele seja realmente lido quando executarmos a etapa do ACT.

Wow! Entendeu? Preparamos todo o cenário antes da chamada e, posteriormente, validamos o seu retorno – de acordo com o que precisa ser executado.

Mocks múltiplos

Agora, vamos mockar várias libs que o script principal usa.

Python
<<<<< SCRIPT : PYTHON
# -------------------------------------------------------------
@mock_aws
@pytest.mark.parametrize("tipo_teste", [
    ("DF_OK"),
    ("DF_EMPTY")
])
@patch('src.main.SparkConf')
@patch('src.main.getResolvedOptions')
@patch('src.main.SparkContext')
@patch('src.main.GlueContext')
@patch('src.main.Job')
def test_main_glue(mock_Job, mock_GlueContext, mock_SparkContext, mock_getResolvedOptions, mock_sparkconf, mock_spark, tipo_teste):
>>>>>>>>>>>>>>>>>

O @patch é um decorador fornecido pela biblioteca unittest.mock do Python, que é usado para substituir (ou “mockar”) objetos ou funções específicas durante a execução de um teste. 

Neste nosso caso, estamos mockando grande parte das libs e gerando um “MagiMock” (classe principal dos mocks), que pode ser “qualquer coisa”. Já veremos isso a seguir. Aqui, citamos exatamente cada objeto/função que queremos mockar. 

Se atente a ordem dos patchs: ela é inversamente ao que é repassado para o uso da função de teste “test_main_glue”. Isso é importante não esquecer!

Então juntamos o @mock_aws com outras libs que queremos mockar.

“Mas mark.parametrize? O que é isso ? Você não comentou….”

Mesmo teste, mas com parâmetros de entrada diferentes

Vamos lá então! Se é breakingdown, precisamos saber tudo!

É exatamente aqui que entra o decorador @pytest.mark.parametrize.

Python
<<<<< SCRIPT : PYTHON
@pytest.mark.parametrize("valor", [
    (20),
    (30)
])
>>>>>>>>>>>>>>>>>

Vamos entender o que o decorador @pytest.mark.parametrize faz no contexto dos testes unitários.

O @pytest.mark.parametrize é um decorador fornecido pelo framework de testes pytest. Ele permite que você execute um mesmo teste várias vezes com diferentes conjuntos de parâmetros. Isso é útil para testar a mesma lógica com diferentes entradas e verificar se o comportamento do código é o esperado para cada conjunto de dados.

Neste exemplo, então, no momento da execução, haverão dois testes: um teste usando o valor = 20, e depois um novo teste, usando o valor = 20. Entendeu a vantagem?

Misturando Spark com os Mocks (ou MagicMocks)

Python
<<<<< SCRIPT : PYTHON

    mock_Job.return_value = MagicMock()
    mock_Job.return_value.init.return_value = MagicMock()
	
    schema = StructType([
        StructField("col1", StringType(), True),
        StructField("col2", IntegerType(), True)
    ])

    if tipo_teste == 'DF_OK':
        result_df = mock_spark.createDataFrame([(0, 2)], schema=schema)
    else:
        result_df = mock_spark.createDataFrame([], schema=schema)

    mock_GlueContext.return_value.spark_session.return_value = MagicMock()
    mock_GlueContext.return_value.spark_session.sql.side_effect = [result_df,
                                                                   mock_spark.createDataFrame([], schema=schema)]
>>>>>>>>>>>>>>>>>

Neste trecho, a brincadeira começa a ficar legal: uma vez usando o “mock_spark” (ele é uma fixture e é passado como parâmetro da função de testes), precisamos mockar o seguinte:

Toda vez que alguém invocar o “spark.sql”, ao invés dele executar lá no Spark do Glue, queremos que ele retorne um Dataframe previamente criado. Por que? Para termos o controle dos testes!

A dica de ouro agora!

Sempre que mockamos um objeto, ele é uma instância da classe “MagickMock”. Esse objeto pode ser “qualquer coisa”, lembra? Então, para começar a ter esse comportamento, precisamos controlar ele.

Como? “Toda vez que alguém invocar alguma função de um objeto que a gente mockou, usamos o atributo “return_value” para atribuir o retorno que queremos.” 

Exemplo: 

Python
<<<<< SCRIPT : PYTHON
    job = Job(glueContext)
    job.init(args["JOB_NAME"], args)
>>>>>>>>>>	

Ao usarmos um mock pro “Job”, podemos controlar o retorno da função “init”:

Python
<<<<< SCRIPT : PYTHON
    mock_Job.return_value = MagicMock()
    mock_Job.return_value.init.return_value = MagicMock()
>>>>>>>>>>		

Legal, não é?

“E se um determinada função é chamada várias vezes no decorrer do código? Como controlar cada chamada?”

Aí neste caso, substituímos o “return_value” pelo “side_effect”, que entre suas funções, empilha os resultados: 

Python
<<<<< SCRIPT : PYTHON    
	mock_GlueContext.return_value.spark_session.sql.side_effect = [result_df,
                                                                   mock_spark.createDataFrame([], schema=schema)]
>>>>>>>>>>		

Aqui estamos empilhando as chamadas do “spark.sql” para que retorne um item diferente em cada chamada.

Mockando parte de objetos em tempo real

Em alguns cenários, podemos ter que usar o objeto nativo (um “dataframe real” mesmo, por exemplo), pois nos dá a flexibilidade e a “não necessidade” de mockar métodos dele mesmo.

Porém, em determinados cenários, você pode querer mockar “métodos específicos” de um objeto. Uma das alternativas é fazer o seguinte:

Python
<<<<< SCRIPT : PYTHON
    with patch.object(result_df, 'writeTo', return_value=MagicMock()) as mock_writeTo:
        mock_writeTo.return_value.overwritePartitions = MagicMock()
        mock_overwritePartitions = mock_writeTo.return_value.overwritePartitions
>>>>>>>>>>	

Neste exemplo, estamos mockando o método “writeTo” existente em um Dataframe, para que “só” ele retorne um MagickMock, pois não queremos que ele efetivamente persista os registros, só iremos validar se ele “foi chamado” posteriormente na etapa do ASSERT.

Por fim, as checagens

Após a devida execução da função a ser validada, precisamos confirmar que a execução foi feita conforme esperado usando “ASSERTs”:

Python
<<<<< SCRIPT : PYTHON
    # 003 - ASSERT
    if tipo_teste == 'DF_OK':
        mock_writeTo.assert_called_once_with('glue_catalog.db_geral.tb_clientes')
        mock_overwritePartitions.assert_called_once()

    else:
        mock_writeTo.assert_not_called()

    mock_Job.return_value.init.assert_called_once()
    mock_Job.return_value.commit.assert_called_once()
>>>>>>>>>>		

Os asserts mais comuns são comparativos de valores “assert valor_1 == valor_2”, mas também temos a verificação de “qtde de chamadas”  (ou não chamadas de um mock), usando o “mock.funcao.assert_called_once()”, ou ainda “mock.funcao.assert_not_called()”.

Para quantidade de chamadas, usamos “assert mock.funcao.call_count == 2”.

Para confirmar que o mock foi chamado, “e com parâmetros específicos”, usamos “mock.funcao.assert_called_with(‘meu_parametro’)”

Ahh… sem esquecer do CLEAN UP

E, antes de fechar de vez, temos a etapa do CLEAN UP.

Lembra da fixture com “yield”? Então, neste momento, de finalização dos testes (após a execução de cada função de teste), aquela fixture também continua a ser executada e, como vimos, vai parar a sessão Spark – aplicando a etapa de CLEAN UP / TEARDOWN:

Python
<<<<< SCRIPT : PYTHON
    print("\n***** SPARK >> TURNING OFF ********* ")
    spark_session.stop()
>>>>>>>>>>	

Conclusão sobre testes unitários para AWS Glue Jobs

Wow! Que viagem pelos testes, hein! E o mais importante: entendemos o porquê de cada detalhe.

Após este breaking down nos testes de GlueJob (ou qualquer outro script Python), conseguimos aprender vários detalhes que nos ajudarão em aplicações futuras.

Será que agora, você consegue ter mais base pra conversar com a StackSpot AI? Exercite: quando ela te gerar um código, converse, exercite o pensamento crítico e exponha detalhes do seu teste, do seu mock, como deseja os testes a partir de agora, da forma A ou da forma B. Use a StackSpot AI a seu favor e tenha a melhor parceira para o seu código.

A partir de agora você tem a badge de mock para usar nos próximos testes unitários para AWS Glue Jobs. Bons estudos e manda ver!

Consuma inovação,
comece a transformação

Assine nosso boletim informativo para se manter atualizado sobre as práticas recomendadas mais recentes para aproveitar a tecnologia para gerar impacto nos negócios

Related posts