arrow_back

Processamento de dados sem servidor com o Dataflow: pipelines de análise em lote com o Dataflow (Python)

Acesse mais de 700 laboratórios e cursos

Processamento de dados sem servidor com o Dataflow: pipelines de análise em lote com o Dataflow (Python)

Laboratório 2 horas universal_currency_alt 5 créditos show_chart Avançado
info Este laboratório pode incorporar ferramentas de IA para ajudar no seu aprendizado.
Acesse mais de 700 laboratórios e cursos

Visão geral

Neste laboratório, você vai:

  • escrever um pipeline para agregar o tráfego do site por usuário;
  • escrever um pipeline para agregar o tráfego do site por minuto;
  • implementar o janelamento com base em dados de série temporal.

Pré-requisitos

OBSERVAÇÃO: o nível do laboratório é avançado e é exigido conhecimento de Python.

Configuração e requisitos

Para cada laboratório, você recebe um novo projeto do Google Cloud e um conjunto de recursos por um determinado período e sem custos financeiros.

  1. Faça login no Qwiklabs em uma janela anônima.

  2. Confira o tempo de acesso do laboratório (por exemplo, 1:15:00) e finalize todas as atividades nesse prazo.
    Não é possível pausar o laboratório. Você pode reiniciar o desafio, mas vai precisar refazer todas as etapas.

  3. Quando tudo estiver pronto, clique em Começar o laboratório.

  4. Anote as credenciais (Nome de usuário e Senha). É com elas que você vai fazer login no Console do Google Cloud.

  5. Clique em Abrir Console do Google.

  6. Clique em Usar outra conta, depois copie e cole as credenciais deste laboratório nos locais indicados.
    Se você usar outras credenciais, vai receber mensagens de erro ou cobranças.

  7. Aceite os termos e pule a página de recursos de recuperação.

Parte A. Configuração do ambiente de desenvolvimento das instâncias do Workbench

Neste laboratório, você vai executar todos os comandos em um terminal usando a instância do Workbench no notebook.

  1. No console do Google Cloud, no menu de navegação (Menu de navegação), selecione Vertex AI, ou você também pode acessar o painel da Vertex AI.

  2. Selecione Ativar todas as APIs recomendadas. Agora, vamos verificar se a API Notebook está ativada.

  3. No menu de navegação, clique em Workbench.

    Verifique se você está na visualização Instâncias do topo da página do Workbench.

  4. Clique em Caixa "adicionar"Criar.

  5. Configure a instância:

Nome Região Zona Opções avançadas (opcional)
lab-workbench Se necessário, clique em "Opções avançadas" para personalizar ainda mais (por exemplo, tipo de máquina, tamanho do disco).
  1. Clique em Criar.

O processo vai levar alguns minutos, e uma marca de seleção verde vai aparecer ao lado do nome da instância quando ela for criada.

  1. Clique em Abrir o Jupyterlab ao lado do nome da instância para iniciar a interface do ambiente. Uma nova guia será aberta no navegador.

  2. Em seguida, clique em Terminal. Nele, é possível executar todos os comandos deste laboratório.

Faça o download do repositório de código

Agora você precisa dele para usar neste laboratório.

  1. Insira este comando no terminal que você abriu:
git clone https://github.com/GoogleCloudPlatform/training-data-analyst cd /home/jupyter/training-data-analyst/quests/dataflow_python/
  1. No painel à esquerda do ambiente do notebook, no navegador de arquivos, você vai notar que o repositório training-data-analyst foi adicionado.

  2. Acesse o repositório clonado /training-data-analyst/quests/dataflow_python/. Nele, você vai encontrar uma pasta para cada laboratório com duas subpastas: lab, que contém o código que precisa ser concluído, e solution, que inclui um exemplo prático caso você precise de ajuda.

Opção "Explorador" destacada no menu "Visualização" expandido

Observação: caso você queira editar um arquivo, é só clicar nele. O arquivo será aberto, e você poderá adicionar ou modificar o código.

Clique em Verificar meu progresso para conferir o objetivo. Crie uma instância de notebook e clone o repositório do curso

Tarefa 1. Como agregar o tráfego do site por usuário

Nesta parte do laboratório, você vai criar um pipeline que:

  1. lê o tráfego do dia de um arquivo no Cloud Storage;
  2. converte cada evento em um objeto CommonLog;
  3. soma o número de hits de cada usuário único agrupando cada objeto por ID do usuário e combinando os valores para ter o número total de hits desse usuário específico;
  4. realiza agregações extras em cada usuário;
  5. grava os dados resultantes no BigQuery.

Tarefa 2. Gerar dados sintéticos

Assim como nos laboratórios anteriores, a primeira coisa a se fazer é gerar os dados que o pipeline vai processar. Abra o ambiente do laboratório e gere os dados como antes:

Abra o laboratório apropriado

  • No terminal do ambiente de desenvolvimento integrado, execute os seguintes comandos:
# Change directory into the lab cd 3_Batch_Analytics/lab export BASE_DIR=$(pwd)

Como configurar o ambiente virtual e as dependências

Antes de começar a editar o código do pipeline real, você precisa verificar se instalou as dependências necessárias.

  1. Execute o código abaixo e crie um ambiente virtual para realizar o trabalho deste laboratório:
sudo apt-get update && sudo apt-get install -y python3-venv # Criar e ativar ambiente virtual python3 -m venv df-env source df-env/bin/activate
  1. Em seguida, instale os pacotes necessários para executar o pipeline:
python3 -m pip install -q --upgrade pip setuptools wheel python3 -m pip install apache-beam[gcp]
  1. Confira se a API Dataflow está ativada:
gcloud services enable dataflow.googleapis.com

Configure o ambiente de dados

# Create GCS buckets and BQ dataset cd $BASE_DIR/../.. source create_batch_sinks.sh # Generate event dataflow source generate_batch_events.sh # Change to the directory containing the practice version of the code cd $BASE_DIR

O script cria um arquivo chamado events.json com linhas semelhantes às seguintes:

{"user_id": "-6434255326544341291", "ip": "192.175.49.116", "timestamp": "2019-06-19T16:06:45.118306Z", "http_request": "\"GET eucharya.html HTTP/1.0\"", "lat": 37.751, "lng": -97.822, "http_response": 200, "user_agent": "Mozilla/5.0 (compatible; MSIE 7.0; Windows NT 5.01; Trident/5.1)", "num_bytes": 182}

Em seguida, ele copia automaticamente esse arquivo para o bucket do Google Cloud Storage em .

  • Acesse o Google Cloud Storage e confirme se o bucket de armazenamento contém um arquivo chamado events.json.

Clique em Verificar meu progresso para conferir o objetivo. Gerar dados sintéticos

Tarefa 3. Somar as visualizações de página por usuário

Esta tarefa tem dois minidesafios. Você pode consultar a solução

No explorador de arquivos, navegue até o caminho abaixo e abra o arquivo batch_user_traffic_pipeline.py.

training-data-analyst/quests/dataflow_python/3_Batch_Analytics/lab

Minidesafio: #TODO 1:

O pipeline contém o código necessário para aceitar as opções de linha de comando do caminho de entrada e o nome da tabela de saída, além do código para ler eventos no Google Cloud Storage, analisar esses eventos e gravar resultados no BigQuery. No entanto, algumas partes importantes estão ausentes e estão marcadas como #TODOs.

Esta é uma tarefa de modelagem de dados. Nesta etapa, depois de agrupar todos os eventos de registro por usuário, você deve considerar quais informações quer calcular para cada um.

Você precisa definir a estrutura (o esquema) que vai reter os resultados da agregação. Você precisa analisar a classe CommonLog para ver quais campos estão disponíveis para agregação.

Como resolver:

  1. Identifique a chave: o nome da classe é PerUserAggregation. Portanto, a principal informação que você precisa manter é o user_id.

  2. Escolha as métricas a serem calculadas: o que é possível calcular com base na coleta de entradas CommonLog de um usuário?

    • Uma contagem: quantas vezes o usuário acessou o servidor?
    • Uma soma: qual é o número total de bytes (num_bytes) que eles baixaram?
    • Um mín/máx: qual foi o primeiro e o último carimbo de data/hora de atividade?
  3. Por exemplo:

user_id: str page_views: int ...

Minidesafio: #TODO 2:

Este é um requisito técnico do framework Apache Beam. O desafio é testar seu conhecimento sobre como o Beam trata tipos de dados personalizados.

Quando o Apache Beam executa um pipeline, muitas vezes ele precisa enviar dados entre diferentes computadores (chamados de workers). Para isso, ele precisa serializar o objeto Python (como uma instância de PerUserAggregation) em um fluxo de bytes, enviá-lo pela rede e depois desserializá-lo de volta ao objeto do outro lado. Um Coder é o objeto que diz ao Beam como realizar a serialização e a desserialização.

Se você não disser ao Beam como codificar/decodificar sua classe personalizada PerUserAggregation, o pipeline vai apresentar um erro.

Como resolver:

A solução está na linha logo acima dos #TODOs. O Beam fornece um RowCoder que funciona perfeitamente com classes NamedTuple. Basta registrar um RowCoder para a nova classe PerUserAggregation, como foi feito para CommonLog.

  1. Por exemplo:
beam.coders.registry.register_coder(PerUserAggregation, ...)

Tarefa 4. Executar o pipeline

Volte ao terminal e use o comando a seguir para executar o pipeline com o serviço do Cloud Dataflow. Se você tiver problemas, use o DirectRunner ou consulte a solução

  • No snippet de código abaixo, substitua os campos ENTER_REGION_ID e ENTER_ZONE_ID segundo a tabela a seguir
Região Zona
  • Substitua os valores regionais e zonais de acordo com as especificações do laboratório.
# 1. Configurar todas as variáveis de ambiente export PROJECT_ID=$(gcloud config get-value project) export REGION=ENTER_REGION_ID export ZONE=ENTER_ZONE_ID export BUCKET=gs://${PROJECT_ID} export PIPELINE_FOLDER=${BUCKET} export RUNNER=DataflowRunner export INPUT_PATH=${PIPELINE_FOLDER}/events.json export TABLE_NAME=${PROJECT_ID}:logs.user_traffic # 2. Checar se o arquivo de entrada existe # Este comando NÃO deve retornar um erro. echo "Verifying input file exists at ${INPUT_PATH}..." gcloud storage ls ${INPUT_PATH} # 3. Executar o script de pipeline echo "Running the Dataflow pipeline..." python3 batch_user_traffic_pipeline.py \ --project=${PROJECT_ID} \ --region=${REGION} \ --worker_zone=${ZONE} \ --staging_location=${PIPELINE_FOLDER}/staging \ --temp_location=${PIPELINE_FOLDER}/temp \ --runner=${RUNNER} \ --input_path=${INPUT_PATH} \ --table_name=${TABLE_NAME}
  • Você pode verificar o job enviado no painel do Dataflow

  • Quando o status do job mostrar que ele foi bem-sucedido, vamos verificar os resultados no BigQuery. Para terminar essa tarefa, aguarde alguns minutos até que o pipeline seja concluído. Em seguida, navegue até o BigQuery e consulte a tabela user_traffic.

Clique em Verificar meu progresso para ver o objetivo. Como agregar o tráfego do site por usuário e executar o pipeline

Parte B. Como agregar o tráfego do site por minuto

Nesta parte do laboratório, você vai criar um novo pipeline chamado batch_minute_traffic Ele se expande de acordo com os princípios básicos de análise em lote usados no batch_user_traffic e, em vez de agregar por usuários em todo o lote, agrega quando os eventos ocorrem. Há vários #TODOs que você precisa corrigir ou, como alternativa, é possível obter ajuda da solução

No ambiente de desenvolvimento integrado, acesse o caminho abaixo e abra o arquivo batch_minute_traffic_pipeline.py.

3_Batch_Analytics/lab Observação: antes de começar a trabalhar com o script principal (batch_minute_traffic_pipeline.py), não se esqueça de analisar os dois arquivos de ajuda pipeline_utils.py e setup.py. Para executar jobs do Dataflow, vamos usar esses arquivos de ajuda, já que pipeline_utils contém a lógica personalizada do Python (classes e funções) em um módulo separado que pode ser empacotado e enviado para os workers remotos do Dataflow. Já o setup.py funciona como o manual de instruções para o Dataflow, dizendo exatamente como empacotar o arquivo pipeline_utils.py de modo adequado para ser instalado em cada worker.

Tarefa 5. Adicionar carimbos de data/hora a cada elemento

Estes itens #TODOs orientam você na criação de um pipeline clássico de agregação de séries temporais no Apache Beam. Cada um representa um conceito básico no processamento em lote e de stream. O objetivo deste pipeline é processar um arquivo JSON de eventos da Web (events.json), contar quantos eventos ocorreram a cada minuto e gravar essas contagens minuto a minuto em uma tabela do BigQuery.

O fluxo do pipeline é semelhante a: Read Text -> Parse to CommonLog -> TODOs -> Write to BigQuery

Os #TODOs constituem o conceito básico do pipeline, em que a lógica de agregação real acontece.

Minidesafio: #TODO 1:

Você tem uma coleção de objetos CommonLog. A próxima etapa no pipeline é agrupar esses eventos por tempo (WindowByMinute). O Apache Beam não consegue fazer isso, a menos que conheça o horário do evento de cada dado. Seu desafio é dizer ao Beam como encontrar esse carimbo de data/hora no objeto CommonLog.

Como resolver:

  1. A função add_timestamp (definida em pipeline_utils.py) analisa a string do carimbo de data/hora de cada registro e a anexa ao elemento como um carimbo de data/hora adequado do Beam, que é necessário para o janelamento.

  2. Por exemplo:

| 'AddEventTimestamp' >> beam.Map(...)

Tarefa 6. Agrupar em janelas de um minuto

Vamos passar para a segunda tarefa do script, em que transformamos elementos de grupos de acordo com o desafio.

Minidesafio: #TODO 2:

  1. Essa transformação agrupa elementos em janelas de tamanho fixo não sobrepostas de 60 segundos (um minuto) com base nos carimbos de data/hora dos eventos.

  2. Por exemplo:

| "WindowByMinute" >> beam.WindowInto(beam.window.FixedWindows(...))
  1. Para saber mais sobre outros tipos de janelamento, leia a documentação do Apache Beam Seção 8.2. Funções de janelas fornecidas.

Tarefa 7. Contar eventos por janela

Vamos para a terceira tarefa do script, em que contamos eventos por janela.

Minidesafio: #TODO 3:

  1. Esse combinador conta o número de elementos em cada janela de um minuto. Use .without_defaults() para garantir que nenhuma saída seja gerada para janelas vazias.

  2. Por exemplo:

| "CountPerMinute" >> beam.CombineGlobally(CountCombineFn())...()
  1. Para concluir essa tarefa, adicione uma transformação para contagem de todos os elementos de cada janela. Se tiver dificuldades, veja a solução.

Tarefa 8. Converter de volta para uma linha e adicionar o carimbo de data/hora

Vamos para a tarefa final do script, em que convertemos de volta para uma linha e adicionamos o carimbo de data/hora

Minidesafio: #TODO 4:

  1. A GetTimestampFn (definida em pipeline_utils.py) recebe a contagem de números inteiros para cada janela e a formata em um dicionário, adicionando o horário de início da janela como uma string para corresponder ao esquema do BigQuery.

  2. Para concluir essa tarefa, grave uma função do ParDo que aceite elementos do tipo int e transmita o parâmetro adicional para acessar informações da janela. O campo de carimbo de data/hora no esquema da tabela do BigQuery é uma STRING. Portanto, você vai precisar converter o carimbo de data/hora em uma string.

  3. Por exemplo:

| "AddWindowTimestamp" >> beam.ParDo(...()) | 'WriteToBQ' >> beam.io.WriteToBigQuery( table_name, schema=table_schema, create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE )

Tarefa 9. Executar o pipeline

Depois de concluir a programação, execute o pipeline usando o comando abaixo. Importante: ao testar o código, a alteração da variável de ambiente "RUNNER" para o DirectRunner, que executará o pipeline no dispositivo, será muito mais rápida. Por enquanto, vamos executar o pipeline usando o Dataflow.

  • Substitua os valores regionais e zonais de acordo com as especificações do laboratório.
Região
export PROJECT_ID=$(gcloud config get-value project) export REGION=ENTER_REGION_ID export BUCKET=gs://${PROJECT_ID} export PIPELINE_FOLDER=${BUCKET} export RUNNER=DataflowRunner export INPUT_PATH=${PIPELINE_FOLDER}/events.json export TABLE_NAME=${PROJECT_ID}:logs.minute_traffic python3 batch_minute_traffic_pipeline.py \ --project=${PROJECT_ID} \ --region=${REGION} \ --staging_location=${PIPELINE_FOLDER}/staging \ --temp_location=${PIPELINE_FOLDER}/temp \ --runner=${RUNNER} \ --input_path=${INPUT_PATH} \ --table_name=${TABLE_NAME} \ --setup_file=./setup.py

Tarefa 10. Verificar os resultados:

  • Para concluir essa tarefa, aguarde alguns minutos para que o pipeline seja executado, navegue até o BigQuery e consulte a tabela minute_traffic.

Clique em Verificar meu progresso para ver o objetivo. Agregar o tráfego do site por minuto e executar o pipeline

Finalize o laboratório

Clique em Terminar o laboratório após a conclusão. O Google Cloud Ensina remove os recursos usados e limpa a conta por você.

Você vai poder avaliar sua experiência no laboratório. Basta selecionar o número de estrelas, digitar um comentário e clicar em Enviar.

O número de estrelas indica o seguinte:

  • 1 estrela = muito insatisfeito
  • 2 estrelas = insatisfeito
  • 3 estrelas = neutro
  • 4 estrelas = satisfeito
  • 5 estrelas = muito satisfeito

Feche a caixa de diálogo se não quiser enviar feedback.

Para enviar seu feedback, fazer sugestões ou correções, use a guia Suporte.

Copyright 2020 Google LLC. Todos os direitos reservados. Google e o logotipo do Google são marcas registradas da Google LLC. Todos os outros nomes de produtos e empresas podem ser marcas registradas das respectivas empresas a que estão associados.

Antes de começar

  1. Os laboratórios criam um projeto e recursos do Google Cloud por um período fixo
  2. Os laboratórios têm um limite de tempo e não têm o recurso de pausa. Se você encerrar o laboratório, vai precisar recomeçar do início.
  3. No canto superior esquerdo da tela, clique em Começar o laboratório

Usar a navegação anônima

  1. Copie o nome de usuário e a senha fornecidos para o laboratório
  2. Clique em Abrir console no modo anônimo

Fazer login no console

  1. Faça login usando suas credenciais do laboratório. Usar outras credenciais pode causar erros ou gerar cobranças.
  2. Aceite os termos e pule a página de recursos de recuperação
  3. Não clique em Terminar o laboratório a menos que você tenha concluído ou queira recomeçar, porque isso vai apagar seu trabalho e remover o projeto

Este conteúdo não está disponível no momento

Você vai receber uma notificação por e-mail quando ele estiver disponível

Ótimo!

Vamos entrar em contato por e-mail se ele ficar disponível

Um laboratório por vez

Confirme para encerrar todos os laboratórios atuais e iniciar este

Use a navegação anônima para executar o laboratório

Para executar este laboratório, use o modo de navegação anônima ou uma janela anônima do navegador. Isso evita conflitos entre sua conta pessoal e a conta de estudante, o que poderia causar cobranças extras na sua conta pessoal.