arrow_back

Processamento de dados sem servidor com o Dataflow: como fazer testes usando o Apache Beam (Python)

Acesse mais de 700 laboratórios e cursos

Processamento de dados sem servidor com o Dataflow: como fazer testes usando o Apache Beam (Python)

Laboratório 2 horas universal_currency_alt 5 créditos show_chart Avançado
info Este laboratório pode incorporar ferramentas de IA para ajudar no seu aprendizado.
Acesse mais de 700 laboratórios e cursos

Visão geral

Neste laboratório, você irá:

  • gravar testes de unidade para DoFns e PTransforms usando ferramentas de teste no Apache Beam;
  • realizar um teste de integração de pipeline;
  • usar a classe TestStream para testar o comportamento do janelamento de um pipeline de streaming.

Testar o pipeline é uma etapa particularmente importante no desenvolvimento de uma solução eficaz de processamento de dados. A natureza indireta do modelo do Beam pode fazer com que as execuções de depuração com falhas sejam tarefas incomuns.

Neste laboratório, vamos ver como realizar testes de unidade localmente com ferramentas no pacote de testes (link em inglês) do SDK do Beam usando o DirectRunner.

Configuração e requisitos

Antes de clicar no botão "Começar o laboratório"

Importante: leia estas instruções.

Os laboratórios são cronometrados e não podem ser pausados. O timer é iniciado quando você clica em Começar o laboratório e mostra por quanto tempo os recursos do Google Cloud vão ficar disponíveis.

Este laboratório prático do Qwiklabs permite que você realize as atividades em um ambiente real de nuvem, não em uma simulação ou demonstração. Você receberá novas credenciais temporárias para fazer login e acessar o Google Cloud durante o laboratório.

O que é necessário

Veja os requisitos para concluir o laboratório:

  • Acesso a um navegador de Internet padrão (recomendamos o Chrome)
  • Tempo disponível para concluir as atividades
Observação: não use seu projeto ou conta pessoal do Google Cloud neste laboratório. Observação: se você estiver usando um Pixelbook, faça o laboratório em uma janela anônima.

Como começar o laboratório e fazer login no console

  1. Clique no botão Começar o laboratório. Se for preciso pagar pelo laboratório, você verá um pop-up para selecionar a forma de pagamento. Um painel aparece à esquerda contendo as credenciais temporárias que você precisa usar no laboratório.

    Painel de credenciais

  2. Copie o nome de usuário e clique em Abrir console do Google. O laboratório ativa os recursos e depois abre a página Escolha uma conta em outra guia.

    Observação: abra as guias em janelas separadas, lado a lado.
  3. Na página "Escolha uma conta", clique em Usar outra conta. A página de login abre.

    Caixa de diálogo "Escolha uma conta" com a opção "Usar outra conta" destacada

  4. Cole o nome de usuário que foi copiado do painel "Detalhes da conexão". Em seguida, copie e cole a senha.

Observação: é necessário usar as credenciais do painel "Detalhes da conexão". Não use suas credenciais do Google Cloud Ensina. Não use sua conta pessoal do Google Cloud, caso tenha uma neste laboratório (isso evita cobranças).
  1. Acesse as próximas páginas:
  • Aceite os Termos e Condições.
  • Não adicione opções de recuperação nem autenticação de dois fatores (porque essa é uma conta temporária).
  • Não se inscreva em testes gratuitos.

Depois de alguns instantes, o console do Cloud abre nesta guia.

Observação: para acessar a lista dos produtos e serviços do Google Cloud, clique no Menu de navegação no canto superior esquerdo. Menu do console do Cloud

Neste laboratório, você vai executar todos os comandos em um terminal usando seu notebook da instância.

  1. No console do Google Cloud, no menu de navegação (Menu de navegação), clique em Vertex AI.

  2. Selecione Ativar todas as APIs recomendadas.

  3. No menu de navegação, clique em Workbench.

    Verifique se você está na visualização Instâncias do topo da página do Workbench.

  4. Clique em Caixa "adicionar"Criar.

  5. Configure a instância:

    • Nome: lab-workbench
    • Região: configure a região como
    • Zona: configure a zona como
    • Opções avançadas (opcional): se necessário, clique em "Opções avançadas" para personalizar mais (ex.: tipo de máquina, tamanho do disco).

Crie uma instância do Vertex AI Workbench

  1. Clique em Criar.

O processo vai levar alguns minutos, e uma marca de confirmação verde vai aparecer ao lado do nome da instância quando ela for criada.

  1. Clique em ABRIR O JUPYTERLAB ao lado do nome da instância para iniciar a interface do ambiente. Uma nova guia será aberta no navegador.

Instância do Workbench implantada

  1. Em seguida, clique em Terminal. Nele, é possível executar todos os comandos deste laboratório.

Faça o download do repositório de código

Agora você precisa dele para usar neste laboratório.

  1. Insira este comando no terminal que você abriu:
git clone https://github.com/GoogleCloudPlatform/training-data-analyst cd /home/jupyter/training-data-analyst/quests/dataflow_python/
  1. No painel à esquerda do ambiente do notebook no navegador de arquivos, você vai notar que o repositório training-data-analyst foi adicionado.

  2. Acesse o repositório clonado /training-data-analyst/quests/dataflow_python/. Nele, você vai encontrar uma pasta para cada laboratório com duas subpastas: lab, que contém o código que precisa ser concluído, e solution, que inclui um exemplo prático caso você precise de ajuda.

Opção "Explorador" destacada no menu "Visualização" expandido

Observação: caso você queira editar um arquivo, é só clicar nele. O arquivo será aberto, e você poderá adicionar ou modificar o código.

O código do laboratório é dividido entre duas pastas: 8a_Batch_Testing_Pipeline/lab e 8b_Stream_Testing_Pipeline/lab. Caso haja problemas em algum momento, a solução poderá ser encontrada nas pastas de solução correspondentes.

Clique em Verificar meu progresso para conferir o objetivo. Crie uma instância de notebook e clone o repositório do curso

Parte 1 do laboratório: como realizar testes de unidade para DoFns e PTransforms

Tarefa 1: preparar o ambiente

Nesta etapa do laboratório, vamos realizar testes de unidade em DoFns e PTransforms para um pipeline em lote calculando estatísticas de sensores climáticos. Para testar as transformações criadas, use o padrão e as transformações a seguir fornecidos pelo Beam:

  • Crie um TestPipeline.
  • Crie alguns dados de entrada de teste e use a transformação Create para criar uma PCollection com esses dados.
  • Aplique a transformação à PCollection de entrada e salve a PCollection resultante.
  • Use o método assert_that do módulo testing.util, além dos outros métodos dele, para verificar se a PCollection de saída contém os elementos esperados.

O elemento TestPipeline é uma classe especial incluída no SDK do Beam especificamente para testar a lógica de pipeline e transformações. Ao fazer os testes, use TestPipeline em vez de Pipeline ao criar o objeto de pipeline. A transformação Create usa uma coleção de objetos na memória (um iterável em Java) e cria uma PCollection dessa coleção. A meta é ter um pequeno conjunto de dados de entrada de teste das PTransforms, em que sabemos a PCollection de saída esperada.

with TestPipeline() as p: INPUTS = [fake_input_1, fake_input_2] test_output = p | beam.Create(INPUTS) | # Transformações a serem testadas

Por fim, queremos verificar se a PCollection de saída corresponde à saída esperada. Usamos o método assert_that para verificar isso. Por exemplo, podemos usar o método equal_to para verificar se a PCollection de saída tem os elementos corretos:

assert_that(test_output, equal_to(EXPECTED_OUTPUTS))

Assim como nos laboratórios anteriores, a primeira coisa a se fazer é gerar os dados que o pipeline vai processar. Abra o ambiente do laboratório e gere os dados como antes:

Abrir o laboratório apropriado

  • No terminal do seu ambiente de desenvolvimento integrado, execute estes comandos, que farão a mudança para o diretório que você vai usar neste laboratório:
# Mude o diretório para o laboratório cd 8a_Batch_Testing_Pipeline/lab export BASE_DIR=$(pwd)

Configurar o ambiente virtual e as dependências

Antes de começar a editar o código do pipeline, você precisa confirmar se instalou as dependências necessárias.

  1. Execute o código a seguir e crie um ambiente virtual para realizar o trabalho deste laboratório:
sudo apt-get install -y python3-venv # Crie e ative o ambiente virtual python3 -m venv df-env source df-env/bin/activate
  1. Em seguida, instale os pacotes necessários para executar seu pipeline:
python3 -m pip install -q --upgrade pip setuptools wheel python3 -m pip install apache-beam[gcp]
  1. Verifique se a API Dataflow está ativada:
gcloud services enable dataflow.googleapis.com
  1. Por fim, crie um bucket de armazenamento:
export PROJECT_ID=$(gcloud config get-value project) gcloud storage buckets create gs://$PROJECT_ID --location=US

Clique em Verificar meu progresso para conferir o objetivo. Preparar o ambiente

Tarefa 2: analisar o código do pipeline principal

  1. No explorador de arquivos, acesse 8a_Batch_Testing_Pipeline/lab. Esse diretório contém dois arquivos: weather_statistics_pipeline.py, que contém o código principal de pipeline, e weather_statistics_pipeline_test.py, que contém o código de teste.

  2. Primeiro, abra 8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline.py.

Vamos analisar o código de pipeline rapidamente antes de trabalhar nos testes de unidade. Primeiro, vemos a classe WeatherRecord (a partir da linha 6). Como essa é uma subclasse de typing.NamedTuple, podemos usar transformações com reconhecimento de esquema para trabalhar com objetos dela. Mais tarde, também vamos definir coleções na memória de objetos dessa classe para os testes.

class WeatherRecord(typing.NamedTuple): loc_id: str lat: float lng: float date: str low_temp: float high_temp: float precip: float
  1. Role para baixo até o ponto onde começamos a definir as DoFns e PTransforms (linha 17).

Os conceitos desse pipeline foram abordados principalmente em laboratórios anteriores, mas não se esqueça de analisar os itens a seguir com mais atenção:

  • As DoFns ConvertCsvToWeatherRecord (a partir da linha 17) e ConvertTempUnits (a partir da linha 27). Em breve, vamos realizar um teste de unidade nessas DoFns.
  • A PTransform ComputeStatistics (a partir da linha 41). Esse é um exemplo de uma transformação composta que será possível testar da mesma maneira que uma DoFn.
  • A PTransform WeatherStatsTransform (a partir da linha 55). Essa PTransform contém a lógica de processamento de todo o pipeline, exceto as transformações de origem e de coletor, para podermos realizar um pequeno teste de integração de pipeline em dados sintéticos criados por uma transformação Create.
Observação: se você encontrar um erro lógico no código de processamento, não o corrija ainda. Mais tarde, vamos aprender a limitar o erro realizando testes.

Tarefa 3: adicionar dependências para testes

  1. Abra 8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline_test.py no seu explorador de arquivos.

Precisamos adicionar algumas dependências para realizar testes. Vamos usar os utilitários de teste incluídos no Apache Beam e o pacote unittest no Python.

  1. Para concluir essa tarefa, adicione as instruções de importação a seguir quando forem solicitadas na parte de cima de weather_statistics_pipeline_test.py:
from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import BeamAssertException from apache_beam.testing.util import assert_that, equal_to

Se você não souber o que fazer, consulte as soluções.

Tarefa 4: gravar o primeiro teste de unidade de DoFn no Apache Beam

Note que o arquivo 8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline_test.py contém o código dos nossos testes de unidade DoFn e PTransform. Na maioria das vezes, o código será comentado, mas vamos remover a marca de comentário à medida que continuarmos.

Antes de analisar o código do Beam, definimos um método principal personalizado para gerenciar a execução dos testes e gravar a saída deles em um arquivo de texto. Dessa forma, temos um registro dos testes para consultar após o término da sessão atual do terminal. Isso também pode ser gerenciado usando o módulo de geração de registros (logging), por exemplo.

def main(out = sys.stderr, verbosity = 2): loader = unittest.TestLoader() suite = loader.loadTestsFromModule(sys.modules[__name__]) unittest.TextTestRunner(out, verbosity = verbosity).run(suite) # Código de teste omitido if __name__ == '__main__': with open('testing.out', 'w') as f: main(f)
  1. Vamos começar fazendo um teste de unidade de DoFn para a DoFn de ConvertCsvToWeatherRecord (a partir da linha 43). Primeiro, criamos uma classe para testar o pipeline e criamos um objeto TestPipeline:
class ConvertToWeatherRecordTest(unittest.TestCase): def test_convert_to_csv(self): with TestPipeline() as p: ...
  1. Agora confira o código (incompleto) do primeiro teste:
LINES = ['x,0.0,0.0,2/2/2021,1.0,2.0,0.1'] EXPECTED_OUTPUT = [WeatherRecord('x', 0.0, 0.0, '2/2/2021', 1.0, 2.0, 0.1)] input_lines = p | # TASK 4: Create PCollection from LINES output = input_lines | beam.ParDo(ConvertCsvToWeatherRecord()) # TAREFA 4: escrever a instrução assert_that

Criamos uma única entrada de teste (LINES) que representa uma linha de um arquivo CSV (o formato de entrada esperado para o pipeline) e a colocamos em uma lista. Também definimos o resultado esperado (EXPECTED_OUTPUT) como uma lista de WeatherRecords.

Faltam algumas partes no restante do código para realizar o teste.

  1. Para concluir a tarefa, primeiro adicione a transformação Create para converter LINES em uma PCollection.

  2. Depois, inclua uma instrução assert_that usando o método equal_to para comparar output com EXPECTED_OUTPUT.

Se você não souber o que fazer, consulte os testes comentados posteriormente ou as soluções.

Tarefa 5: executar o primeiro teste de unidade de DoFn

  • Retorne ao seu terminal e execute o seguinte comando:
python3 weather_statistics_pipeline_test.py cat testing.out

A saída dos testes será gravada no arquivo testing.out. É possível visualizar o conteúdo desse arquivo ao executar cat testing.out no terminal. Se a tarefa anterior tiver sido concluída corretamente, o conteúdo do arquivo testing.out será o seguinte (o tempo decorrido pode ser diferente):

test_convert_to_csv (__main__.ConvertToWeatherRecordTest) ... ok ---------------------------------------------------------------------- Ran 1 test in 0.918s OK Observação: o teste acima pode ter sido executado usando o comando "python3 -m unittest test_script.py". Executamos o script Python diretamente para acessar o método principal, mas a abordagem mencionada aqui tende a ser mais comum na prática.

Tarefa 6: executar o segundo teste de unidade de DoFn e o pipeline de depuração

  1. Volte para 8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline_test.py e remova a marca de comentário do código do segundo teste de unidade (perto das linhas 33 a 50). É possível fazer isso destacando o código e pressionando Ctrl + / (ou Cmd + / no MacOS). O código é exibido abaixo para referência:
class ConvertTempUnitsTest(unittest.TestCase): def test_convert_temp_units(self): with TestPipeline() as p: RECORDS = [WeatherRecord('x', 0.0, 0.0, '2/2/2021', 1.0, 2.0, 0.1), WeatherRecord('y', 0.0, 0.0, '2/2/2021', -3.0, -1.0, 0.3)] EXPECTED_RECORDS = [WeatherRecord('x', 0.0, 0.0, '2/2/2021', 33.8, 35.6, 0.1), WeatherRecord('y', 0.0, 0.0, '2/2/2021', 26.6, 30.2, 0.3)] input_records = p | beam.Create(RECORDS) output = input_records | beam.ParDo(ConvertTempUnits()) assert_that(output, equal_to(EXPECTED_RECORDS))

Esse teste garante que a DoFn ConvertTempUnits() esteja funcionando conforme o esperado. Salve weather_statistics_pipeline_test.py e volte ao terminal.

  1. Use os comandos a seguir para executar os testes e visualizar a saída:
rm testing.out python3 weather_statistics_pipeline_test.py cat testing.out

Desta vez, o teste falhou. Se rolarmos pela saída, vamos encontrar as seguintes informações sobre essa falha:

test_compute_statistics (__main__.ComputeStatsTest) ... ok test_convert_temp_units (__main__.ConvertTempUnitsTest) ... ERROR ... apache_beam.testing.util.BeamAssertException: Failed assert: [WeatherRecord(loc_id='x', lat=0.0, lng=0.0, date='2/2/2021', low_temp=33.8, high_temp=35.6, precip=0.1), WeatherRecord(loc_id='y', lat=0.0, lng=0.0, date='2/2/2021', low_temp=26.6, high_temp=30.2, precip=0.3)] == [WeatherRecord(loc_id='x', lat=0.0, lng=0.0, date='2/2/2021', low_temp=32.8, high_temp=34.6, precip=0.1), WeatherRecord(loc_id='y', lat=0.0, lng=0.0, date='2/2/2021', low_temp=25.6, high_temp=29.2, precip=0.3)], unexpected elements [WeatherRecord(loc_id='x', lat=0.0, lng=0.0, date='2/2/2021', low_temp=32.8, high_temp=34.6, precip=0.1), WeatherRecord(loc_id='y', lat=0.0, lng=0.0, date='2/2/2021', low_temp=25.6, high_temp=29.2, precip=0.3)], missing elements [WeatherRecord(loc_id='x', lat=0.0, lng=0.0, date='2/2/2021', low_temp=33.8, high_temp=35.6, precip=0.1), WeatherRecord(loc_id='y', lat=0.0, lng=0.0, date='2/2/2021', low_temp=26.6, high_temp=30.2, precip=0.3)] [while running 'assert_that/Match']

Analisando mais de perto o elemento BeamAssertException, vemos que os valores de low_temp e high_temp estão incorretos. Há algo errado na lógica de processamento da DoFn ConvertTempUnits.

  1. Volte para 8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline.py e role para baixo até a definição de ConvertTempUnits (perto da linha 32). Para concluir a tarefa, encontre o erro na lógica de processamento da DoFn e execute novamente estes comandos para confirmar que o teste foi concluído:
rm testing.out python3 weather_statistics_pipeline_test.py cat testing.out

Como lembrete, a fórmula para converter graus Celsius em Farenheit é fornecida abaixo:

temp_f = temp_c * 1.8 + 32.0

Se você não souber o que fazer, consulte as soluções.

Tarefa 7: executar o teste de unidade de PTransform e o pipeline completo

  1. Volte para 8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline_test.py e remova a marca de comentário do código dos dois testes finais (a partir da linha 53, mais ou menos).

O primeiro teste em que removemos a marca de comentário foi o de testar a PTransform composta ComputeStatistics. Confira a seguir uma forma truncada do código para referência:

def test_compute_statistics(self): with TestPipeline() as p: INPUT_RECORDS = # Test input omitted here EXPECTED_STATS = # Expected output omitted here inputs = p | beam.Create(INPUT_RECORDS) output = inputs | ComputeStatistics() assert_that(output, equal_to(EXPECTED_STATS))

Perceba que isso é muito semelhante aos testes de unidade de DoFn anteriores. A única diferença real, além das entradas e saídas de testes que não são as mesmas, é que estamos aplicando PTransform em vez de beam.ParDo(DoFn()).

O teste final é para o pipeline completo. No código de pipeline (weather_statistics_pipeline.py), o pipeline completo, exceto a origem e o coletor, foi incluído em uma única PTransform WeatherStatsTransform. Para testar o pipeline completo, podemos repetir algo semelhante ao que fizemos acima, mas usando aquela PTransform.

  1. Volte ao terminal e execute o comando a seguir para executar os testes mais uma vez:
rm testing.out python3 weather_statistics_pipeline_test.py cat testing.out

Se você concluiu as tarefas anteriores, a mensagem a seguir vai aparecer no terminal após o término dos testes:

test_compute_statistics (__main__.ComputeStatsTest) ... ok test_convert_temp_units (__main__.ConvertTempUnitsTest) ... ok test_convert_to_csv (__main__.ConvertToWeatherRecordTest) ... ok test_weather_stats_transform (__main__.WeatherStatsTransformTest) ... ok ---------------------------------------------------------------------- 4 testes executados em 2,295s OK
  1. Copie o arquivo testing.out no bucket de armazenamento:
export PROJECT_ID=$(gcloud config get-value project) gcloud storage cp testing.out gs://$PROJECT_ID/8a_Batch_Testing_Pipeline/

Clique em Verificar meu progresso para conferir o objetivo. Realize testes de unidade para DoFns e PTransforms.

Parte 2 do laboratório: como testar a lógica do processamento de stream com o TestStream

Nesta etapa do laboratório, vamos realizar testes de unidade para um pipeline de streaming calculando contagens de janelas de corridas de táxi. Para testar as transformações criadas, use o padrão e as transformações a seguir fornecidos pelo Beam:

  • Crie um TestPipeline.
  • Use a classe TestStream para gerar dados de streaming, o que inclui gerar uma série de eventos, avançar a marca-d'água e melhorar o tempo de processamento.
  • Use o método assert_that do módulo testing.util, além dos outros métodos dele, para verificar se a PCollection de saída contém os elementos esperados.

Durante a execução de um pipeline que lê dados de um TestStream, a leitura aguarda a conclusão de todas as consequências de cada evento antes de passar para o próximo, inclusive quando o tempo de processamento avança e os gatilhos apropriados são acionados. O elemento TestStream permite que o efeito do acionamento e a lentidão permitida sejam observados e testados em um pipeline. Isso inclui a lógica sobre gatilhos atrasados e dados descartados devido a atrasos.

Tarefa 1: analisar o código do pipeline principal

  1. No explorador de arquivos, acesse 8b_Stream_Testing_Pipeline/lab.

Esse diretório contém dois arquivos: taxi_streaming_pipeline.py, que contém o código de pipeline principal, e taxi_streaming_pipeline_test.py, que contém o código de teste.

  1. Primeiro, abra 8b_Stream_Testing_Pipeline/lab/taxi_streaming_pipeline.py.

Vamos analisar o código de pipeline rapidamente antes de trabalhar nos testes de unidade. Primeiro, vemos a classe TaxiRide (a partir da linha 6). Como essa é uma subclasse de typing.NamedTuple, podemos usar transformações com reconhecimento de esquema para trabalhar com objetos dela. Mais tarde, também vamos definir coleções na memória de objetos dessa classe para os testes.

class TaxiRide(typing.NamedTuple): ride_id: str point_idx: int latitude: float longitude: float timestamp: str meter_reading: float meter_increment: float ride_status: str passenger_count: int

A próxima etapa envolve o código principal do pipeline. Os conceitos desse pipeline foram abordados principalmente em laboratórios anteriores, mas não se esqueça de analisar os itens a seguir com mais atenção:

  • A DoFn JsonToTaxiRide (a partir da linha 22) é usada para converter as mensagens recebidas do Pub/Sub em objetos da classe TaxiRide.
  • A PTransform TaxiCountTransform (a partir da linha 36). Essa PTransform contém a lógica principal de contagem e janelamento do pipeline. Os testes serão focados nessa PTransform.

A saída de TaxiCountTransform precisa ser uma contagem de todas as corridas de táxi registradas por janela. No entanto, haverá vários eventos por viagem (embarques, desembarques etc.). Vamos filtrar a propriedade ride_status para garantir a contagem de cada viagem apenas uma vez. Para isso, vamos manter apenas os elementos com ride_status igual a "pickup":

... | "FilterForPickups" >> beam.Filter(lambda x : x.ride_status == 'pickup')

Olhando um pouco mais de perto, a lógica de janelamento usada no pipeline está incluída abaixo:

... | "WindowByMinute" >> beam.WindowInto(beam.window.FixedWindows(60), trigger=AfterWatermark(late=AfterCount(1)), allowed_lateness=60, accumulation_mode=AccumulationMode.ACCUMULATING)

Vamos fazer o janelamento em janelas fixas com 60 segundos de duração. Não temos um gatilho antecipado, mas vamos gerar resultados após a marca d'água transmitir o fim da janela. Incluímos disparos atrasados em cada novo elemento recebido, mas isso só será feito com um atraso permitido de 60 segundos. Por fim, vamos acumular o estado nas janelas até que o atraso permitido tenha sido transmitido.

Tarefa 2: analisar o uso do TestStream e executar o primeiro teste

  1. Abra 8b_Stream_Testing_Pipeline/lab/taxi_streaming_pipeline_test.py.

A primeira meta é entender o uso do TestStream no código de teste. Vale lembrar que a classe TestStream permite simular um fluxo de mensagens em tempo real enquanto controla a progressão do tempo de processamento e a marca d'água. O código do primeiro teste (a partir da linha 66) está incluído abaixo:

test_stream = TestStream().advance_watermark_to(0).add_elements([ TimestampedValue(base_json_pickup, 0), TimestampedValue(base_json_pickup, 0), TimestampedValue(base_json_enroute, 0), TimestampedValue(base_json_pickup, 60) ]).advance_watermark_to(60).advance_processing_time(60).add_elements([ TimestampedValue(base_json_pickup, 120) ]).advance_watermark_to_infinity()
  1. Criamos outro objeto TestStream e, depois, transmitimos a mensagem JSON como uma string (chamada base_json_pickup ou base_json_enroute, dependendo do status da viagem). O que o TestStream acima está fazendo?

O TestStream está realizando estas tarefas:

  • definindo a marca-d'água inicial como 0 (todos os carimbos de data/hora são em segundos);
  • adicionando três elementos ao stream usando o método add_elements com um carimbo de data/hora do evento de 0. Dois desses eventos serão contados (ride_status = "pickup"). O outro, não;
  • adicionando outro evento "pickup", mas com um carimbo de data/hora do evento de 60;
  • avançando a marca d'água e o tempo de processamento para 60, o que vai acionar a primeira janela;
  • adicionando outro evento "pickup", mas com um carimbo de data/hora do evento de 120;
  • avançando a marca d'água para "infinity". Isso significa que todas as janelas serão fechadas, e os novos dados vão estar além de qualquer atraso permitido.
  1. O restante do código do primeiro teste é semelhante ao exemplo de lote anterior, mas agora o TestStream está sendo usado em vez da transformação Create:
taxi_counts = (p | test_stream | TaxiCountTransform() ) EXPECTED_WINDOW_COUNTS = {IntervalWindow(0,60): [3], IntervalWindow(60,120): [1], IntervalWindow(120,180): [1]} assert_that(taxi_counts, equal_to_per_window(EXPECTED_WINDOW_COUNTS), reify_windows=True)

No código acima, definimos a PCollection de saída (taxi_counts) criando o TestStream e aplicando a PTransform TaxiCountTransform. Usamos a classe InvervalWindow para definir as janelas que queremos verificar e, em seguida, assert_that com o método equal_to_per_window para verificar os resultados por janela.

  1. Salve o arquivo, volte ao terminal e execute os comandos a seguir para fazer o realocamento para o diretório correto:
# Change directory into the lab cd $BASE_DIR/../../8b_Stream_Testing_Pipeline/lab export BASE_DIR=$(pwd)
  1. Agora, execute o teste acima e visualize a saída executando os comandos a seguir:
python3 taxi_streaming_pipeline_test.py cat testing.out

A seguinte saída vai aparecer após a conclusão do teste (embora o tempo decorrido possa ser diferente):

test_windowing_behavior (__main__.TaxiWindowingTest) ... ok ---------------------------------------------------------------------- 1 teste executado em 1,113 s OK

Tarefa 3: criar o TestStream para testar o processamento de dados com atraso

Nesta tarefa, você vai criar um código para um TestStream, o que vai permitir testar a lógica de processamento de dados atrasados.

  1. Volte para 8b_Stream_Testing_Pipeline/lab/taxi_streaming_pipeline_test.py e role para baixo até onde o método test_late_data_behavior é comentado (perto da linha 60). Remova a marca de comentário do código do teste em questão, já que vamos concluir o código desta tarefa.
class TaxiLateDataTest(unittest.TestCase): def test_late_data_behavior(self): options = PipelineOptions() options.view_as(StandardOptions).streaming = True with TestPipeline(options=options) as p: base_json_pickup = "{\"ride_id\":\"x\",\"point_idx\":1,\"latitude\":0.0,\"longitude\":0.0," \ "\"timestamp\":\"00:00:00\",\"meter_reading\":1.0,\"meter_increment\":0.1," \ "\"ride_status\":\"pickup\",\"passenger_count\":1}" test_stream = # TASK 3: Create TestStream Object EXPECTED_RESULTS = {IntervalWindow(0,60): [2,3]} #Pontualidade e resultado tardio taxi_counts = (p | test_stream | TaxiCountTransform() ) assert_that(taxi_counts, equal_to(EXPECTED_RESULTS))

Perceba que EXPECTED_RESULTS contém dois resultados para IntervalWindow(0,60). Esses valores representam os resultados de gatilhos pontuais e atrasados para esta janela.

O código do teste foi concluído fora do processo de criação do TestStream.

  1. Para concluir a tarefa, crie um objeto TestStream que execute estes itens:

    1. avança a marca d'água para 0 (todos os carimbos de data/hora estão em segundos);
    2. adiciona dois TimestampedValues com o valor base_json_pickup e o carimbo de data/hora 0;
    3. avança a marca d'água e o tempo de processamento para 60;
    4. adiciona outro TimestampedValue com o valor base_json_pickup e o carimbo de data/hora 0;
    5. avança a marca d'água e o tempo de processamento para 300;
    6. adiciona outro TimestampedValue com o valor base_json_pickup e o carimbo de data/hora 0;
    7. avança a marca d'água até o infinito.

Isso vai criar um TestStream com quatro elementos que pertencem à primeira janela. Os dois primeiros elementos são pontuais, o segundo está atrasado (mas dentro do atraso permitido) e o elemento final está atrasado e ultrapassa o atraso permitido. Como estamos acumulando painéis disparados, o primeiro gatilho precisa contar dois eventos, e o gatilho final, três. O quarto evento não precisa ser incluído.

Se você não souber o que fazer, consulte as soluções.

Tarefa 4: executar um teste para processar dados atrasados

  1. Volte ao terminal e use o comando a seguir para executar os testes mais uma vez:
rm testing.out python3 taxi_streaming_pipeline_test.py cat testing.out

Se você concluiu as tarefas anteriores, a mensagem a seguir vai aparecer no terminal após o término dos testes:

test_late_data_behavior (__main__.TaxiLateDataTest) ... ok test_windowing_behavior (__main__.TaxiWindowingTest) ... ok ---------------------------------------------------------------------- 2 testes executados em 2,225s OK
  1. Copie o arquivo testing.out no bucket de armazenamento:
export PROJECT_ID=$(gcloud config get-value project) gcloud storage cp testing.out gs://$PROJECT_ID/8b_Stream_Testing_Pipeline/

Clique em Verificar meu progresso para conferir o objetivo. Teste a lógica de processamento de stream com o TestStream

Finalize o laboratório

Clique em Terminar o laboratório após a conclusão. O Google Cloud Ensina remove os recursos usados e limpa a conta por você.

Você vai poder avaliar sua experiência no laboratório. Basta selecionar o número de estrelas, digitar um comentário e clicar em Enviar.

O número de estrelas indica o seguinte:

  • 1 estrela = muito insatisfeito
  • 2 estrelas = insatisfeito
  • 3 estrelas = neutro
  • 4 estrelas = satisfeito
  • 5 estrelas = muito satisfeito

Feche a caixa de diálogo se não quiser enviar feedback.

Para enviar seu feedback, fazer sugestões ou correções, use a guia Suporte.

Copyright 2025 Google LLC. Todos os direitos reservados. Google e o logotipo do Google são marcas registradas da Google LLC. Todos os outros nomes de empresas e produtos podem ser marcas registradas das empresas a que estão associados.

Antes de começar

  1. Os laboratórios criam um projeto e recursos do Google Cloud por um período fixo
  2. Os laboratórios têm um limite de tempo e não têm o recurso de pausa. Se você encerrar o laboratório, vai precisar recomeçar do início.
  3. No canto superior esquerdo da tela, clique em Começar o laboratório

Usar a navegação anônima

  1. Copie o nome de usuário e a senha fornecidos para o laboratório
  2. Clique em Abrir console no modo anônimo

Fazer login no console

  1. Faça login usando suas credenciais do laboratório. Usar outras credenciais pode causar erros ou gerar cobranças.
  2. Aceite os termos e pule a página de recursos de recuperação
  3. Não clique em Terminar o laboratório a menos que você tenha concluído ou queira recomeçar, porque isso vai apagar seu trabalho e remover o projeto

Este conteúdo não está disponível no momento

Você vai receber uma notificação por e-mail quando ele estiver disponível

Ótimo!

Vamos entrar em contato por e-mail se ele ficar disponível

Um laboratório por vez

Confirme para encerrar todos os laboratórios atuais e iniciar este

Use a navegação anônima para executar o laboratório

Para executar este laboratório, use o modo de navegação anônima ou uma janela anônima do navegador. Isso evita conflitos entre sua conta pessoal e a conta de estudante, o que poderia causar cobranças extras na sua conta pessoal.