Visão geral
Neste laboratório, você irá:
- gravar testes de unidade para
DoFns
e PTransforms
usando ferramentas de teste no Apache Beam;
- realizar um teste de integração de pipeline;
- usar a classe
TestStream
para testar o comportamento do janelamento de um pipeline de streaming.
Testar o pipeline é uma etapa particularmente importante no desenvolvimento de uma solução eficaz de processamento de dados. A natureza indireta do modelo do Beam pode fazer com que as execuções de depuração com falhas sejam tarefas incomuns.
Neste laboratório, vamos ver como realizar testes de unidade localmente com ferramentas no pacote de testes (link em inglês) do SDK do Beam usando o DirectRunner
.
Configuração e requisitos
Antes de clicar no botão "Começar o laboratório"
Importante: leia estas instruções.
Os laboratórios são cronometrados e não podem ser pausados. O timer é iniciado quando você clica em Começar o laboratório e mostra por quanto tempo os recursos do Google Cloud vão ficar disponíveis.
Este laboratório prático do Qwiklabs permite que você realize as atividades em um ambiente real de nuvem, não em uma simulação ou demonstração. Você receberá novas credenciais temporárias para fazer login e acessar o Google Cloud durante o laboratório.
O que é necessário
Veja os requisitos para concluir o laboratório:
- Acesso a um navegador de Internet padrão (recomendamos o Chrome)
- Tempo disponível para concluir as atividades
Observação: não use seu projeto ou conta pessoal do Google Cloud neste laboratório.
Observação: se você estiver usando um Pixelbook, faça o laboratório em uma janela anônima.
Como começar o laboratório e fazer login no console
-
Clique no botão Começar o laboratório. Se for preciso pagar pelo laboratório, você verá um pop-up para selecionar a forma de pagamento.
Um painel aparece à esquerda contendo as credenciais temporárias que você precisa usar no laboratório.

-
Copie o nome de usuário e clique em Abrir console do Google.
O laboratório ativa os recursos e depois abre a página Escolha uma conta em outra guia.
Observação: abra as guias em janelas separadas, lado a lado.
-
Na página "Escolha uma conta", clique em Usar outra conta. A página de login abre.

-
Cole o nome de usuário que foi copiado do painel "Detalhes da conexão". Em seguida, copie e cole a senha.
Observação: é necessário usar as credenciais do painel "Detalhes da conexão". Não use suas credenciais do Google Cloud Ensina. Não use sua conta pessoal do Google Cloud, caso tenha uma neste laboratório (isso evita cobranças).
- Acesse as próximas páginas:
- Aceite os Termos e Condições.
- Não adicione opções de recuperação nem autenticação de dois fatores (porque essa é uma conta temporária).
- Não se inscreva em testes gratuitos.
Depois de alguns instantes, o console do Cloud abre nesta guia.
Observação: para acessar a lista dos produtos e serviços do Google Cloud, clique no Menu de navegação no canto superior esquerdo.
Neste laboratório, você vai executar todos os comandos em um terminal usando seu notebook da instância.
-
No console do Google Cloud, no menu de navegação (
), clique em Vertex AI.
-
Selecione Ativar todas as APIs recomendadas.
-
No menu de navegação, clique em Workbench.
Verifique se você está na visualização Instâncias do topo da página do Workbench.
-
Clique em
Criar.
-
Configure a instância:
-
Nome: lab-workbench
-
Região: configure a região como
-
Zona: configure a zona como
-
Opções avançadas (opcional): se necessário, clique em "Opções avançadas" para personalizar mais (ex.: tipo de máquina, tamanho do disco).

- Clique em Criar.
O processo vai levar alguns minutos, e uma marca de confirmação verde vai aparecer ao lado do nome da instância quando ela for criada.
- Clique em ABRIR O JUPYTERLAB ao lado do nome da instância para iniciar a interface do ambiente. Uma nova guia será aberta no navegador.

- Em seguida, clique em Terminal. Nele, é possível executar todos os comandos deste laboratório.
Faça o download do repositório de código
Agora você precisa dele para usar neste laboratório.
- Insira este comando no terminal que você abriu:
git clone https://github.com/GoogleCloudPlatform/training-data-analyst
cd /home/jupyter/training-data-analyst/quests/dataflow_python/
-
No painel à esquerda do ambiente do notebook no navegador de arquivos, você vai notar que o repositório training-data-analyst foi adicionado.
-
Acesse o repositório clonado /training-data-analyst/quests/dataflow_python/
. Nele, você vai encontrar uma pasta para cada laboratório com duas subpastas: lab
, que contém o código que precisa ser concluído, e solution
, que inclui um exemplo prático caso você precise de ajuda.

Observação: caso você queira editar um arquivo, é só clicar nele. O arquivo será aberto, e você poderá adicionar ou modificar o código.
O código do laboratório é dividido entre duas pastas: 8a_Batch_Testing_Pipeline/lab e 8b_Stream_Testing_Pipeline/lab. Caso haja problemas em algum momento, a solução poderá ser encontrada nas pastas de solução correspondentes.
Clique em Verificar meu progresso para conferir o objetivo.
Crie uma instância de notebook e clone o repositório do curso
Parte 1 do laboratório: como realizar testes de unidade para DoFns e PTransforms
Tarefa 1: preparar o ambiente
Nesta etapa do laboratório, vamos realizar testes de unidade em DoFns e PTransforms para um pipeline em lote calculando estatísticas de sensores climáticos. Para testar as transformações criadas, use o padrão e as transformações a seguir fornecidos pelo Beam:
- Crie um
TestPipeline
.
- Crie alguns dados de entrada de teste e use a transformação
Create
para criar uma PCollection
com esses dados.
- Aplique a transformação à
PCollection
de entrada e salve a PCollection
resultante.
- Use o método
assert_that
do módulo testing.util
, além dos outros métodos dele, para verificar se a PCollection
de saída contém os elementos esperados.
O elemento TestPipeline
é uma classe especial incluída no SDK do Beam especificamente para testar a lógica de pipeline e transformações. Ao fazer os testes, use TestPipeline
em vez de Pipeline
ao criar o objeto de pipeline. A transformação Create
usa uma coleção de objetos na memória (um iterável em Java) e cria uma PCollection
dessa coleção. A meta é ter um pequeno conjunto de dados de entrada de teste das PTransforms
, em que sabemos a PCollection
de saída esperada.
with TestPipeline() as p:
INPUTS = [fake_input_1, fake_input_2]
test_output = p | beam.Create(INPUTS) | # Transformações a serem testadas
Por fim, queremos verificar se a PCollection de saída corresponde à saída esperada. Usamos o método assert_that
para verificar isso. Por exemplo, podemos usar o método equal_to
para verificar se a PCollection de saída tem os elementos corretos:
assert_that(test_output, equal_to(EXPECTED_OUTPUTS))
Assim como nos laboratórios anteriores, a primeira coisa a se fazer é gerar os dados que o pipeline vai processar. Abra o ambiente do laboratório e gere os dados como antes:
Abrir o laboratório apropriado
- No terminal do seu ambiente de desenvolvimento integrado, execute estes comandos, que farão a mudança para o diretório que você vai usar neste laboratório:
# Mude o diretório para o laboratório
cd 8a_Batch_Testing_Pipeline/lab
export BASE_DIR=$(pwd)
Configurar o ambiente virtual e as dependências
Antes de começar a editar o código do pipeline, você precisa confirmar se instalou as dependências necessárias.
- Execute o código a seguir e crie um ambiente virtual para realizar o trabalho deste laboratório:
sudo apt-get install -y python3-venv
# Crie e ative o ambiente virtual
python3 -m venv df-env
source df-env/bin/activate
- Em seguida, instale os pacotes necessários para executar seu pipeline:
python3 -m pip install -q --upgrade pip setuptools wheel
python3 -m pip install apache-beam[gcp]
- Verifique se a API Dataflow está ativada:
gcloud services enable dataflow.googleapis.com
- Por fim, crie um bucket de armazenamento:
export PROJECT_ID=$(gcloud config get-value project)
gcloud storage buckets create gs://$PROJECT_ID --location=US
Clique em Verificar meu progresso para conferir o objetivo.
Preparar o ambiente
Tarefa 2: analisar o código do pipeline principal
-
No explorador de arquivos, acesse 8a_Batch_Testing_Pipeline/lab. Esse diretório contém dois arquivos: weather_statistics_pipeline.py
, que contém o código principal de pipeline, e weather_statistics_pipeline_test.py
, que contém o código de teste.
-
Primeiro, abra 8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline.py.
Vamos analisar o código de pipeline rapidamente antes de trabalhar nos testes de unidade. Primeiro, vemos a classe WeatherRecord
(a partir da linha 6). Como essa é uma subclasse de typing.NamedTuple
, podemos usar transformações com reconhecimento de esquema para trabalhar com objetos dela. Mais tarde, também vamos definir coleções na memória de objetos dessa classe para os testes.
class WeatherRecord(typing.NamedTuple):
loc_id: str
lat: float
lng: float
date: str
low_temp: float
high_temp: float
precip: float
- Role para baixo até o ponto onde começamos a definir as
DoFns
e PTransforms
(linha 17).
Os conceitos desse pipeline foram abordados principalmente em laboratórios anteriores, mas não se esqueça de analisar os itens a seguir com mais atenção:
- As
DoFns
ConvertCsvToWeatherRecord
(a partir da linha 17) e ConvertTempUnits
(a partir da linha 27). Em breve, vamos realizar um teste de unidade nessas DoFns
.
- A
PTransform
ComputeStatistics
(a partir da linha 41). Esse é um exemplo de uma transformação composta que será possível testar da mesma maneira que uma DoFn
.
- A
PTransform
WeatherStatsTransform
(a partir da linha 55). Essa PTransform
contém a lógica de processamento de todo o pipeline, exceto as transformações de origem e de coletor, para podermos realizar um pequeno teste de integração de pipeline em dados sintéticos criados por uma transformação Create
.
Observação: se você encontrar um erro lógico no código de processamento, não o corrija ainda. Mais tarde, vamos aprender a limitar o erro realizando testes.
Tarefa 3: adicionar dependências para testes
- Abra 8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline_test.py no seu explorador de arquivos.
Precisamos adicionar algumas dependências para realizar testes. Vamos usar os utilitários de teste incluídos no Apache Beam e o pacote unittest
no Python.
- Para concluir essa tarefa, adicione as instruções de importação a seguir quando forem solicitadas na parte de cima de weather_statistics_pipeline_test.py:
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import BeamAssertException
from apache_beam.testing.util import assert_that, equal_to
Se você não souber o que fazer, consulte as soluções.
Tarefa 4: gravar o primeiro teste de unidade de DoFn no Apache Beam
Note que o arquivo 8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline_test.py contém o código dos nossos testes de unidade DoFn
e PTransform
. Na maioria das vezes, o código será comentado, mas vamos remover a marca de comentário à medida que continuarmos.
Antes de analisar o código do Beam, definimos um método principal
personalizado para gerenciar a execução dos testes e gravar a saída deles em um arquivo de texto. Dessa forma, temos um registro dos testes para consultar após o término da sessão atual do terminal. Isso também pode ser gerenciado usando o módulo de geração de registros (logging
), por exemplo.
def main(out = sys.stderr, verbosity = 2):
loader = unittest.TestLoader()
suite = loader.loadTestsFromModule(sys.modules[__name__])
unittest.TextTestRunner(out, verbosity = verbosity).run(suite)
# Código de teste omitido
if __name__ == '__main__':
with open('testing.out', 'w') as f:
main(f)
- Vamos começar fazendo um teste de unidade de
DoFn
para a DoFn de ConvertCsvToWeatherRecord
(a partir da linha 43). Primeiro, criamos uma classe para testar o pipeline e criamos um objeto TestPipeline
:
class ConvertToWeatherRecordTest(unittest.TestCase):
def test_convert_to_csv(self):
with TestPipeline() as p:
...
- Agora confira o código (incompleto) do primeiro teste:
LINES = ['x,0.0,0.0,2/2/2021,1.0,2.0,0.1']
EXPECTED_OUTPUT = [WeatherRecord('x', 0.0, 0.0, '2/2/2021', 1.0, 2.0, 0.1)]
input_lines = p | # TASK 4: Create PCollection from LINES
output = input_lines | beam.ParDo(ConvertCsvToWeatherRecord())
# TAREFA 4: escrever a instrução assert_that
Criamos uma única entrada de teste (LINES
) que representa uma linha de um arquivo CSV (o formato de entrada esperado para o pipeline) e a colocamos em uma lista. Também definimos o resultado esperado (EXPECTED_OUTPUT
) como uma lista de WeatherRecords
.
Faltam algumas partes no restante do código para realizar o teste.
-
Para concluir a tarefa, primeiro adicione a transformação Create
para converter LINES
em uma PCollection
.
-
Depois, inclua uma instrução assert_that
usando o método equal_to
para comparar output
com EXPECTED_OUTPUT
.
Se você não souber o que fazer, consulte os testes comentados posteriormente ou as soluções.
Tarefa 5: executar o primeiro teste de unidade de DoFn
- Retorne ao seu terminal e execute o seguinte comando:
python3 weather_statistics_pipeline_test.py
cat testing.out
A saída dos testes será gravada no arquivo testing.out
. É possível visualizar o conteúdo desse arquivo ao executar cat testing.out
no terminal. Se a tarefa anterior tiver sido concluída corretamente, o conteúdo do arquivo testing.out
será o seguinte (o tempo decorrido pode ser diferente):
test_convert_to_csv (__main__.ConvertToWeatherRecordTest) ... ok
----------------------------------------------------------------------
Ran 1 test in 0.918s
OK
Observação: o teste acima pode ter sido executado usando o comando "python3 -m unittest test_script.py". Executamos o script Python diretamente para acessar o método principal, mas a abordagem mencionada aqui tende a ser mais comum na prática.
Tarefa 6: executar o segundo teste de unidade de DoFn e o pipeline de depuração
- Volte para 8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline_test.py e remova a marca de comentário do código do segundo teste de unidade (perto das linhas 33 a 50). É possível fazer isso destacando o código e pressionando Ctrl + / (ou Cmd + / no MacOS). O código é exibido abaixo para referência:
class ConvertTempUnitsTest(unittest.TestCase):
def test_convert_temp_units(self):
with TestPipeline() as p:
RECORDS = [WeatherRecord('x', 0.0, 0.0, '2/2/2021', 1.0, 2.0, 0.1),
WeatherRecord('y', 0.0, 0.0, '2/2/2021', -3.0, -1.0, 0.3)]
EXPECTED_RECORDS = [WeatherRecord('x', 0.0, 0.0, '2/2/2021', 33.8, 35.6, 0.1),
WeatherRecord('y', 0.0, 0.0, '2/2/2021', 26.6, 30.2, 0.3)]
input_records = p | beam.Create(RECORDS)
output = input_records | beam.ParDo(ConvertTempUnits())
assert_that(output, equal_to(EXPECTED_RECORDS))
Esse teste garante que a DoFn
ConvertTempUnits()
esteja funcionando conforme o esperado. Salve weather_statistics_pipeline_test.py e volte ao terminal.
- Use os comandos a seguir para executar os testes e visualizar a saída:
rm testing.out
python3 weather_statistics_pipeline_test.py
cat testing.out
Desta vez, o teste falhou. Se rolarmos pela saída, vamos encontrar as seguintes informações sobre essa falha:
test_compute_statistics (__main__.ComputeStatsTest) ... ok
test_convert_temp_units (__main__.ConvertTempUnitsTest) ... ERROR
...
apache_beam.testing.util.BeamAssertException: Failed assert: [WeatherRecord(loc_id='x', lat=0.0, lng=0.0, date='2/2/2021', low_temp=33.8, high_temp=35.6, precip=0.1), WeatherRecord(loc_id='y', lat=0.0, lng=0.0, date='2/2/2021', low_temp=26.6, high_temp=30.2, precip=0.3)] == [WeatherRecord(loc_id='x', lat=0.0, lng=0.0, date='2/2/2021', low_temp=32.8, high_temp=34.6, precip=0.1), WeatherRecord(loc_id='y', lat=0.0, lng=0.0, date='2/2/2021', low_temp=25.6, high_temp=29.2, precip=0.3)], unexpected elements [WeatherRecord(loc_id='x', lat=0.0, lng=0.0, date='2/2/2021', low_temp=32.8, high_temp=34.6, precip=0.1), WeatherRecord(loc_id='y', lat=0.0, lng=0.0, date='2/2/2021', low_temp=25.6, high_temp=29.2, precip=0.3)], missing elements [WeatherRecord(loc_id='x', lat=0.0, lng=0.0, date='2/2/2021', low_temp=33.8, high_temp=35.6, precip=0.1), WeatherRecord(loc_id='y', lat=0.0, lng=0.0, date='2/2/2021', low_temp=26.6, high_temp=30.2, precip=0.3)] [while running 'assert_that/Match']
Analisando mais de perto o elemento BeamAssertException
, vemos que os valores de low_temp
e high_temp
estão incorretos. Há algo errado na lógica de processamento da DoFn
ConvertTempUnits
.
- Volte para 8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline.py e role para baixo até a definição de
ConvertTempUnits
(perto da linha 32). Para concluir a tarefa, encontre o erro na lógica de processamento da DoFn
e execute novamente estes comandos para confirmar que o teste foi concluído:
rm testing.out
python3 weather_statistics_pipeline_test.py
cat testing.out
Como lembrete, a fórmula para converter graus Celsius em Farenheit é fornecida abaixo:
temp_f = temp_c * 1.8 + 32.0
Se você não souber o que fazer, consulte as soluções.
Tarefa 7: executar o teste de unidade de PTransform e o pipeline completo
- Volte para 8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline_test.py e remova a marca de comentário do código dos dois testes finais (a partir da linha 53, mais ou menos).
O primeiro teste em que removemos a marca de comentário foi o de testar a PTransform
composta ComputeStatistics
. Confira a seguir uma forma truncada do código para referência:
def test_compute_statistics(self):
with TestPipeline() as p:
INPUT_RECORDS = # Test input omitted here
EXPECTED_STATS = # Expected output omitted here
inputs = p | beam.Create(INPUT_RECORDS)
output = inputs | ComputeStatistics()
assert_that(output, equal_to(EXPECTED_STATS))
Perceba que isso é muito semelhante aos testes de unidade de DoFn
anteriores. A única diferença real, além das entradas e saídas de testes que não são as mesmas, é que estamos aplicando PTransform
em vez de beam.ParDo(DoFn())
.
O teste final é para o pipeline completo. No código de pipeline (weather_statistics_pipeline.py), o pipeline completo, exceto a origem e o coletor, foi incluído em uma única PTransform
WeatherStatsTransform
. Para testar o pipeline completo, podemos repetir algo semelhante ao que fizemos acima, mas usando aquela PTransform
.
- Volte ao terminal e execute o comando a seguir para executar os testes mais uma vez:
rm testing.out
python3 weather_statistics_pipeline_test.py
cat testing.out
Se você concluiu as tarefas anteriores, a mensagem a seguir vai aparecer no terminal após o término dos testes:
test_compute_statistics (__main__.ComputeStatsTest) ... ok
test_convert_temp_units (__main__.ConvertTempUnitsTest) ... ok
test_convert_to_csv (__main__.ConvertToWeatherRecordTest) ... ok
test_weather_stats_transform (__main__.WeatherStatsTransformTest) ... ok
----------------------------------------------------------------------
4 testes executados em 2,295s
OK
- Copie o arquivo
testing.out
no bucket de armazenamento:
export PROJECT_ID=$(gcloud config get-value project)
gcloud storage cp testing.out gs://$PROJECT_ID/8a_Batch_Testing_Pipeline/
Clique em Verificar meu progresso para conferir o objetivo.
Realize testes de unidade para DoFns e PTransforms.
Parte 2 do laboratório: como testar a lógica do processamento de stream com o TestStream
Nesta etapa do laboratório, vamos realizar testes de unidade para um pipeline de streaming calculando contagens de janelas de corridas de táxi. Para testar as transformações criadas, use o padrão e as transformações a seguir fornecidos pelo Beam:
- Crie um
TestPipeline
.
- Use a classe
TestStream
para gerar dados de streaming, o que inclui gerar uma série de eventos, avançar a marca-d'água e melhorar o tempo de processamento.
- Use o método
assert_that
do módulo testing.util
, além dos outros métodos dele, para verificar se a PCollection
de saída contém os elementos esperados.
Durante a execução de um pipeline que lê dados de um TestStream
, a leitura aguarda a conclusão de todas as consequências de cada evento antes de passar para o próximo, inclusive quando o tempo de processamento avança e os gatilhos apropriados são acionados. O elemento TestStream
permite que o efeito do acionamento e a lentidão permitida sejam observados e testados em um pipeline. Isso inclui a lógica sobre gatilhos atrasados e dados descartados devido a atrasos.
Tarefa 1: analisar o código do pipeline principal
- No explorador de arquivos, acesse 8b_Stream_Testing_Pipeline/lab.
Esse diretório contém dois arquivos: taxi_streaming_pipeline.py
, que contém o código de pipeline principal, e taxi_streaming_pipeline_test.py
, que contém o código de teste.
- Primeiro, abra 8b_Stream_Testing_Pipeline/lab/taxi_streaming_pipeline.py.
Vamos analisar o código de pipeline rapidamente antes de trabalhar nos testes de unidade. Primeiro, vemos a classe TaxiRide
(a partir da linha 6). Como essa é uma subclasse de typing.NamedTuple
, podemos usar transformações com reconhecimento de esquema para trabalhar com objetos dela. Mais tarde, também vamos definir coleções na memória de objetos dessa classe para os testes.
class TaxiRide(typing.NamedTuple):
ride_id: str
point_idx: int
latitude: float
longitude: float
timestamp: str
meter_reading: float
meter_increment: float
ride_status: str
passenger_count: int
A próxima etapa envolve o código principal do pipeline. Os conceitos desse pipeline foram abordados principalmente em laboratórios anteriores, mas não se esqueça de analisar os itens a seguir com mais atenção:
- A
DoFn
JsonToTaxiRide
(a partir da linha 22) é usada para converter as mensagens recebidas do Pub/Sub em objetos da classe TaxiRide
.
- A
PTransform
TaxiCountTransform
(a partir da linha 36). Essa PTransform
contém a lógica principal de contagem e janelamento do pipeline. Os testes serão focados nessa PTransform
.
A saída de TaxiCountTransform
precisa ser uma contagem de todas as corridas de táxi registradas por janela. No entanto, haverá vários eventos por viagem (embarques, desembarques etc.). Vamos filtrar a propriedade ride_status
para garantir a contagem de cada viagem apenas uma vez. Para isso, vamos manter apenas os elementos com ride_status
igual a "pickup":
... | "FilterForPickups" >> beam.Filter(lambda x : x.ride_status == 'pickup')
Olhando um pouco mais de perto, a lógica de janelamento usada no pipeline está incluída abaixo:
... | "WindowByMinute" >> beam.WindowInto(beam.window.FixedWindows(60),
trigger=AfterWatermark(late=AfterCount(1)),
allowed_lateness=60,
accumulation_mode=AccumulationMode.ACCUMULATING)
Vamos fazer o janelamento em janelas fixas com 60 segundos de duração. Não temos um gatilho antecipado, mas vamos gerar resultados após a marca d'água transmitir o fim da janela. Incluímos disparos atrasados em cada novo elemento recebido, mas isso só será feito com um atraso permitido de 60 segundos. Por fim, vamos acumular o estado nas janelas até que o atraso permitido tenha sido transmitido.
Tarefa 2: analisar o uso do TestStream e executar o primeiro teste
- Abra 8b_Stream_Testing_Pipeline/lab/taxi_streaming_pipeline_test.py.
A primeira meta é entender o uso do TestStream
no código de teste. Vale lembrar que a classe TestStream
permite simular um fluxo de mensagens em tempo real enquanto controla a progressão do tempo de processamento e a marca d'água. O código do primeiro teste (a partir da linha 66) está incluído abaixo:
test_stream = TestStream().advance_watermark_to(0).add_elements([
TimestampedValue(base_json_pickup, 0),
TimestampedValue(base_json_pickup, 0),
TimestampedValue(base_json_enroute, 0),
TimestampedValue(base_json_pickup, 60)
]).advance_watermark_to(60).advance_processing_time(60).add_elements([
TimestampedValue(base_json_pickup, 120)
]).advance_watermark_to_infinity()
- Criamos outro objeto
TestStream
e, depois, transmitimos a mensagem JSON como uma string (chamada base_json_pickup
ou base_json_enroute
, dependendo do status da viagem). O que o TestStream
acima está fazendo?
O TestStream
está realizando estas tarefas:
- definindo a marca-d'água inicial como
0
(todos os carimbos de data/hora são em segundos);
- adicionando três elementos ao stream usando o método
add_elements
com um carimbo de data/hora do evento de 0
. Dois desses eventos serão contados (ride_status = "pickup"
). O outro, não;
- adicionando outro evento "pickup", mas com um carimbo de data/hora do evento de
60
;
- avançando a marca d'água e o tempo de processamento para
60
, o que vai acionar a primeira janela;
- adicionando outro evento "pickup", mas com um carimbo de data/hora do evento de
120
;
- avançando a marca d'água para "infinity". Isso significa que todas as janelas serão fechadas, e os novos dados vão estar além de qualquer atraso permitido.
- O restante do código do primeiro teste é semelhante ao exemplo de lote anterior, mas agora o
TestStream
está sendo usado em vez da transformação Create
:
taxi_counts = (p | test_stream
| TaxiCountTransform()
)
EXPECTED_WINDOW_COUNTS = {IntervalWindow(0,60): [3],
IntervalWindow(60,120): [1],
IntervalWindow(120,180): [1]}
assert_that(taxi_counts, equal_to_per_window(EXPECTED_WINDOW_COUNTS),
reify_windows=True)
No código acima, definimos a PCollection
de saída (taxi_counts
) criando o TestStream
e aplicando a PTransform
TaxiCountTransform
. Usamos a classe InvervalWindow
para definir as janelas que queremos verificar e, em seguida, assert_that
com o método equal_to_per_window
para verificar os resultados por janela.
- Salve o arquivo, volte ao terminal e execute os comandos a seguir para fazer o realocamento para o diretório correto:
# Change directory into the lab
cd $BASE_DIR/../../8b_Stream_Testing_Pipeline/lab
export BASE_DIR=$(pwd)
- Agora, execute o teste acima e visualize a saída executando os comandos a seguir:
python3 taxi_streaming_pipeline_test.py
cat testing.out
A seguinte saída vai aparecer após a conclusão do teste (embora o tempo decorrido possa ser diferente):
test_windowing_behavior (__main__.TaxiWindowingTest) ... ok
----------------------------------------------------------------------
1 teste executado em 1,113 s
OK
Tarefa 3: criar o TestStream para testar o processamento de dados com atraso
Nesta tarefa, você vai criar um código para um TestStream
, o que vai permitir testar a lógica de processamento de dados atrasados.
- Volte para 8b_Stream_Testing_Pipeline/lab/taxi_streaming_pipeline_test.py e role para baixo até onde o método
test_late_data_behavior
é comentado (perto da linha 60). Remova a marca de comentário do código do teste em questão, já que vamos concluir o código desta tarefa.
class TaxiLateDataTest(unittest.TestCase):
def test_late_data_behavior(self):
options = PipelineOptions()
options.view_as(StandardOptions).streaming = True
with TestPipeline(options=options) as p:
base_json_pickup = "{\"ride_id\":\"x\",\"point_idx\":1,\"latitude\":0.0,\"longitude\":0.0," \
"\"timestamp\":\"00:00:00\",\"meter_reading\":1.0,\"meter_increment\":0.1," \
"\"ride_status\":\"pickup\",\"passenger_count\":1}"
test_stream = # TASK 3: Create TestStream Object
EXPECTED_RESULTS = {IntervalWindow(0,60): [2,3]} #Pontualidade e resultado tardio
taxi_counts = (p | test_stream
| TaxiCountTransform()
)
assert_that(taxi_counts, equal_to(EXPECTED_RESULTS))
Perceba que EXPECTED_RESULTS
contém dois resultados para IntervalWindow(0,60)
. Esses valores representam os resultados de gatilhos pontuais e atrasados para esta janela.
O código do teste foi concluído fora do processo de criação do TestStream
.
-
Para concluir a tarefa, crie um objeto TestStream
que execute estes itens:
- avança a marca d'água para
0
(todos os carimbos de data/hora estão em segundos);
- adiciona dois
TimestampedValues
com o valor base_json_pickup
e o carimbo de data/hora 0
;
- avança a marca d'água e o tempo de processamento para
60
;
- adiciona outro
TimestampedValue
com o valor base_json_pickup
e o carimbo de data/hora 0
;
- avança a marca d'água e o tempo de processamento para
300
;
- adiciona outro
TimestampedValue
com o valor base_json_pickup
e o carimbo de data/hora 0
;
- avança a marca d'água até o infinito.
Isso vai criar um TestStream
com quatro elementos que pertencem à primeira janela. Os dois primeiros elementos são pontuais, o segundo está atrasado (mas dentro do atraso permitido) e o elemento final está atrasado e ultrapassa o atraso permitido. Como estamos acumulando painéis disparados, o primeiro gatilho precisa contar dois eventos, e o gatilho final, três. O quarto evento não precisa ser incluído.
Se você não souber o que fazer, consulte as soluções.
Tarefa 4: executar um teste para processar dados atrasados
- Volte ao terminal e use o comando a seguir para executar os testes mais uma vez:
rm testing.out
python3 taxi_streaming_pipeline_test.py
cat testing.out
Se você concluiu as tarefas anteriores, a mensagem a seguir vai aparecer no terminal após o término dos testes:
test_late_data_behavior (__main__.TaxiLateDataTest) ... ok
test_windowing_behavior (__main__.TaxiWindowingTest) ... ok
----------------------------------------------------------------------
2 testes executados em 2,225s
OK
- Copie o arquivo
testing.out
no bucket de armazenamento:
export PROJECT_ID=$(gcloud config get-value project)
gcloud storage cp testing.out gs://$PROJECT_ID/8b_Stream_Testing_Pipeline/
Clique em Verificar meu progresso para conferir o objetivo.
Teste a lógica de processamento de stream com o TestStream
Finalize o laboratório
Clique em Terminar o laboratório após a conclusão. O Google Cloud Ensina remove os recursos usados e limpa a conta por você.
Você vai poder avaliar sua experiência no laboratório. Basta selecionar o número de estrelas, digitar um comentário e clicar em Enviar.
O número de estrelas indica o seguinte:
- 1 estrela = muito insatisfeito
- 2 estrelas = insatisfeito
- 3 estrelas = neutro
- 4 estrelas = satisfeito
- 5 estrelas = muito satisfeito
Feche a caixa de diálogo se não quiser enviar feedback.
Para enviar seu feedback, fazer sugestões ou correções, use a guia Suporte.
Copyright 2025 Google LLC. Todos os direitos reservados. Google e o logotipo do Google são marcas registradas da Google LLC. Todos os outros nomes de empresas e produtos podem ser marcas registradas das empresas a que estão associados.