arrow_back

Procesamiento de datos sin servidores con Dataflow: cómo realizar pruebas con Apache Beam (Python)

Acceder Unirse
Obtén acceso a más de 700 labs y cursos

Procesamiento de datos sin servidores con Dataflow: cómo realizar pruebas con Apache Beam (Python)

Lab 2 horas universal_currency_alt 5 créditos show_chart Avanzado
info Es posible que este lab incorpore herramientas de IA para facilitar tu aprendizaje.
Obtén acceso a más de 700 labs y cursos

Descripción general

En este lab, aprenderás a hacer lo siguiente:

  • Escribir pruebas de unidades para DoFn y PTransform mediante las herramientas de prueba de Apache Beam
  • Realizar una prueba de integración de la canalización
  • Utilizar la clase TestStream para probar el comportamiento del sistema de ventanas en una canalización de transmisión

La prueba de tu canalización es un paso importante en el desarrollo de una solución de procesamiento de datos efectiva. Debido a la naturaleza indirecta del modelo de Beam, la depuración de ejecuciones con errores puede convertirse en una tarea importante.

En este lab, aprenderemos a realizar pruebas de unidades de forma local con herramientas del paquete de pruebas del SDK de Beam usando DirectRunner.

Configuración y requisitos

Antes de hacer clic en el botón Comenzar lab

Nota: Lee estas instrucciones.

Los labs son cronometrados y no se pueden pausar. El cronómetro, que comienza a funcionar cuando haces clic en Comenzar lab, indica por cuánto tiempo tendrás a tu disposición los recursos de Google Cloud.

En este lab práctico de Qwiklabs, se te proporcionarán credenciales temporales nuevas para acceder a Google Cloud y realizar las actividades en un entorno de nube real, no en uno de simulación o demostración.

Requisitos

Para completar este lab, necesitarás lo siguiente:

  • Acceso a un navegador de Internet estándar (se recomienda el navegador Chrome)
  • Tiempo para completar el lab
Nota: Si ya tienes un proyecto o una cuenta personal de Google Cloud, no los uses para el lab. Nota: Si usas una Pixelbook, abre una ventana de incógnito para ejecutar el lab.

Cómo iniciar tu lab y acceder a la consola

  1. Haz clic en el botón Comenzar lab. Si debes pagar por el lab, se abrirá una ventana emergente para que selecciones tu forma de pago. A la izquierda, verás un panel con las credenciales temporales que debes usar para este lab.

    Panel de credenciales

  2. Copia el nombre de usuario y, luego, haz clic en Abrir la consola de Google. El lab inicia los recursos y abre otra pestaña que muestra la página Elige una cuenta.

    Sugerencia: Abre las pestañas en ventanas separadas, una junto a la otra.
  3. En la página Elige una cuenta, haz clic en Usar otra cuenta. Se abrirá la página de acceso.

    Cuadro de diálogo Elige una cuenta el que se destaca la opción Usar otra cuenta

  4. Pega el nombre de usuario que copiaste del panel Detalles de la conexión. Luego, copia y pega la contraseña.

Nota: Debes usar las credenciales del panel Detalles de la conexión. No uses tus credenciales de Google Cloud Skills Boost. Si tienes una cuenta propia de Google Cloud, no la utilices para este lab para no incurrir en cargos.
  1. Haz clic para avanzar por las páginas siguientes:
  • Acepta los Términos y Condiciones.
  • No agregues opciones de recuperación o autenticación de dos factores (esta es una cuenta temporal).
  • No te registres para obtener pruebas gratuitas.

Después de un momento, se abrirá la consola de Cloud en esta pestaña.

Nota: Para ver el menú con una lista de los productos y servicios de Google Cloud, haz clic en el menú de navegación que se encuentra en la parte superior izquierda de la pantalla. Menú de la consola de Cloud

En este lab, ejecutarás todos los comandos en una terminal de tu notebook de instancia.

  1. En el menú de navegación (Menú de navegación) de la consola de Google Cloud, selecciona Vertex AI.

  2. Haz clic en Habilitar todas las APIs recomendadas.

  3. En el menú de navegación, haz clic en Workbench.

    En la parte superior de la página de Workbench, asegúrate de estar en la vista Instancias.

  4. Haz clic en agregar cuadroCrear nueva.

  5. Configura la instancia:

    • Nombre: lab-workbench
    • Región: Configura la región como
    • Zona: Establece la zona en
    • Opciones avanzadas (opcional): Si es necesario, haz clic en "Opciones avanzadas" para realizar personalizaciones adicionales (p. ej., tipo de máquina, tamaño del disco).

Crea una instancia de Vertex AI Workbench

  1. Haz clic en Crear.

La instancia tardará algunos minutos en crearse. Se mostrará una marca de verificación verde junto a su nombre cuando esté lista.

  1. Haz clic en ABRIR JUPYTERLAB junto al nombre de la instancia para iniciar la interfaz de JupyterLab. Se abrirá una pestaña nueva en el navegador.

Instancia de Workbench implementada

  1. Haz clic en Terminal. Esto abrirá una terminal en la que podrá ejecutar todos los comandos del lab.

Descarga el repositorio de código

A continuación, descargarás un repositorio de código que usarás en este lab.

  1. En la terminal que acabas de abrir, ingresa lo siguiente:
git clone https://github.com/GoogleCloudPlatform/training-data-analyst cd /home/jupyter/training-data-analyst/quests/dataflow_python/
  1. En el panel izquierdo de tu entorno de notebook, en el navegador de archivos, verás que se agregó el repo training-data-analyst.

  2. Navega al repo clonado /training-data-analyst/quests/dataflow_python/. Verás una carpeta para cada lab. Cada una de ellas se divide en una subcarpeta lab con un código que debes completar y una subcarpeta solution con un ejemplo viable que puedes consultar como referencia si no sabes cómo continuar.

Opción Explorador destacada en el menú Ver expandido

Nota: Para abrir un archivo y editarlo, simplemente debes navegar al archivo y hacer clic en él. Se abrirá el archivo, en el que puedes agregar o modificar código.

El código del lab se divide en dos carpetas: 8a_Batch_Testing_Pipeline/lab y 8b_Stream_Testing_Pipeline/lab. Si en algún momento tienes problemas, puedes consultar la solución en las carpetas solution correspondientes.

Haz clic en Revisar mi progreso para verificar el objetivo. Crear una instancia de notebook y clonar el repo del curso.

Parte 1 del lab: Realiza pruebas de unidades para DoFn y PTransform

Tarea 1. Prepara el entorno

En esta parte del lab, realizaremos pruebas de unidades en DoFn y PTransform para obtener estadísticas de procesamiento de una canalización por lotes a partir de sensores meteorológicos. Para probar las transformaciones que tú creaste, puedes usar el siguiente patrón y las transformaciones que proporciona Beam:

  • Crea una TestPipeline.
  • Crea algunos datos de entrada de prueba y usa la transformación Create para crear una PCollection de tus datos de entrada.
  • Aplica tu transformación a la PCollection de entrada y guarda la PCollection resultante.
  • Usa el método Assert_that del módulo testing.util, junto con los demás disponibles, para verificar que la PCollection de salida contenga los elementos previstos.

TestPipeline es una clase especial incluida en el SDK de Beam específicamente para probar tus transformaciones y la lógica de tu canalización. Cuando realices pruebas, usa TestPipeline en lugar de Pipeline para crear el objeto de canalización. La transformación Create toma una colección de objetos en la memoria (un iterable de Java) y crea una PCollection a partir de aquella. El objetivo es tener un pequeño conjunto de datos de entrada de prueba (de los cuales conozcamos la PCollection de salida esperada) a partir de nuestras PTransforms.

with TestPipeline() as p: INPUTS = [fake_input_1, fake_input_2] test_output = p | beam.Create(INPUTS) | # Transforms to be tested

Por último, debemos comprobar que la PCollection de salida coincida con la salida que se espera. Para ello, usamos el método assert_that. Por ejemplo, podemos usar el método equal_to para verificar que la PCollection de salida tenga los elementos correctos:

assert_that(test_output, equal_to(EXPECTED_OUTPUTS))

Al igual que en los labs anteriores, el primer paso es generar datos para que la canalización los procese. Abre el entorno del lab y genera los datos como lo hiciste anteriormente:

Abre el lab adecuado

  • En la terminal del IDE, ejecuta los siguientes comandos para cambiar al directorio que usarás en este lab:
# Change directory into the lab cd 8a_Batch_Testing_Pipeline/lab export BASE_DIR=$(pwd)

Configura el entorno virtual y las dependencias

Antes de comenzar a editar el código de la canalización en sí, debes asegurarte de haber instalado las dependencias necesarias.

  1. Ejecuta el siguiente comando para crear un entorno virtual para tu trabajo en este lab:
sudo apt-get install -y python3-venv # Create and activate virtual environment python3 -m venv df-env source df-env/bin/activate
  1. Luego, instala los paquetes que necesitarás para ejecutar tu canalización:
python3 -m pip install -q --upgrade pip setuptools wheel python3 -m pip install apache-beam[gcp]
  1. Asegúrate de que la API de Dataflow esté habilitada:
gcloud services enable dataflow.googleapis.com
  1. Por último, crea un bucket de almacenamiento:
export PROJECT_ID=$(gcloud config get-value project) gcloud storage buckets create gs://$PROJECT_ID --location=US

Haz clic en Revisar mi progreso para verificar el objetivo. Preparar el entorno

Tarea 2. Explora el código de canalización principal

  1. En el explorador de archivos, dirígete a 8a_Batch_Testing_Pipeline/lab. Este directorio contiene dos archivos: weather_statistics_pipeline.py, que incluye el código de canalización principal, y weather_statistics_pipeline_test.py, que incluye el código de prueba.

  2. En primer lugar, abre 8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline.py.

Exploraremos brevemente el código de nuestra canalización antes de trabajar con las pruebas de unidades. Primero, vemos la clase WeatherRecord (a partir de la línea 6). Esta es una subclase de typing.NamedTuple, por lo que podemos usar transformaciones adaptadas al esquema para trabajar con objetos de esta clase. Más adelante, definiremos las colecciones de objetos de esta clase en la memoria para nuestras pruebas.

class WeatherRecord(typing.NamedTuple): loc_id: str lat: float lng: float date: str low_temp: float high_temp: float precip: float
  1. Ahora, desplázate hacia abajo hasta la línea en la que comenzaremos a definir DoFn y PTransform (línea 17).

Si bien la mayoría de los conceptos de esta canalización se abordaron en labs anteriores, asegúrate de explorar con mayor detalle los que se indican a continuación:

  • Las DoFn ConvertCsvToWeatherRecord (a partir de la línea 17) y ConvertTempUnits (a partir de la línea 27). Realizaremos pruebas de unidades en estas DoFn más adelante.
  • La PTransform ComputeStatistics (a partir de la línea 41). Este es un ejemplo de una transformación compuesta que podremos probar de la misma manera que una DoFn.
  • La PTransform WeatherStatsTransform (a partir de la línea 55). Esta PTransform contiene la lógica de procesamiento de toda nuestra canalización (menos las transformaciones de origen y receptor) para que podamos realizar una prueba de integración de canalización pequeña con datos sintéticos creados con una transformación Create.
Nota: Si observas un error lógico en el código de procesamiento, no lo corrijas todavía. Más adelante, veremos cómo usar pruebas para identificarlo.

Tarea 3: Agrega dependencias para las pruebas

  1. Ahora, abre 8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline_test.py en tu explorador de archivos.

Debemos agregar algunas dependencias para las pruebas. Aprovecharemos las utilidades de prueba incluidas en Apache Beam y el paquete unittest en Python.

  1. Para completar esta tarea, agrega las siguientes sentencias de importación cuando se te solicite en la parte superior de weather_statistics_pipeline_test.py:
from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import BeamAssertException from apache_beam.testing.util import assert_that, equal_to

Si tienes problemas, puedes consultar las soluciones.

Tarea 4. Escribe la primera prueba de unidades de DoFn en Apache Beam

Observa que el archivo 8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline_test.py contiene el código de nuestras pruebas de unidades de DoFn y PTransform. Por ahora, el código está generalmente marcado como comentado, pero quitaremos el comentario a medida que avancemos.

Antes de explorar el código de Beam, ten en cuenta que definimos un método main personalizado para administrar la ejecución de nuestras pruebas y escribir el resultado de estas en un archivo de texto. De esta forma, tendremos un registro de las pruebas que podremos consultar después de que finalice la sesión de la terminal actual. Esto también se puede administrar con el módulo logging, por ejemplo.

def main(out = sys.stderr, verbosity = 2): loader = unittest.TestLoader() suite = loader.loadTestsFromModule(sys.modules[__name__]) unittest.TextTestRunner(out, verbosity = verbosity).run(suite) # Testing code omitted if __name__ == '__main__': with open('testing.out', 'w') as f: main(f)
  1. Comenzaremos explorando una prueba de unidades de DoFn para nuestra ConvertCsvToWeatherRecord DoFn (a partir de la línea 43). Primero, crearemos una clase para probar nuestra canalización y crearemos un objeto TestPipeline:
class ConvertToWeatherRecordTest(unittest.TestCase): def test_convert_to_csv(self): with TestPipeline() as p: ...
  1. Ahora, observa el código (incompleto) de nuestra primera prueba:
LINES = ['x,0.0,0.0,2/2/2021,1.0,2.0,0.1'] EXPECTED_OUTPUT = [WeatherRecord('x', 0.0, 0.0, '2/2/2021', 1.0, 2.0, 0.1)] input_lines = p | # TASK 4: Create PCollection from LINES output = input_lines | beam.ParDo(ConvertCsvToWeatherRecord()) # TASK 4: Write assert_that statement

Crearemos una entrada de prueba única (LINES), que represente una línea de un archivo CSV (el formato de entrada esperado para nuestra canalización), y la colocaremos en una lista. También definiremos el resultado esperado (EXPECTED_OUTPUT) como una lista de WeatherRecords.

Hay partes que faltan en el resto del código para la prueba.

  1. Para completar esta tarea, primero agrega la transformación Create y convierte LINES en una PCollection.

  2. Luego, incluye una sentencia assert_that con el método equal_to para comparar output con EXPECTED_OUTPUT.

Si tienes problemas, puedes consultar las pruebas con comentarios más recientes o las soluciones.

Tarea 5: Ejecuta la primera prueba de unidades de DoFn

  • Regresa a la terminal y ejecuta el siguiente comando:
python3 weather_statistics_pipeline_test.py cat testing.out

El resultado de las pruebas se escribirá en el archivo testing.out. Podemos ver el contenido de este archivo ejecutando cat testing.out en nuestra terminal. Si la tarea anterior se completó correctamente, el contenido del archivo testing.out debería ser el siguiente (el tiempo transcurrido podría variar):

test_convert_to_csv (__main__.ConvertToWeatherRecordTest) ... ok ---------------------------------------------------------------------- Ran 1 test in 0.918s OK Nota: La prueba anterior se podría haber ejecutado con el comando "python3 -m unittest test_script.py". Ejecutamos la secuencia de comandos de Python directamente para acceder al método principal, pero el enfoque que se menciona aquí suele ser más común en la práctica.

Tarea 6: Ejecuta la segunda prueba de unidades de DoFn y la canalización de depuración

  1. Regresa a 8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline_test.py y quita el comentario del código de la segunda prueba de unidades (de la línea 33 a la línea 50). Para ello, destaca el código y presiona Ctrl + / (o Cmd + / en MacOS). A continuación, se muestra el código a modo de referencia:
class ConvertTempUnitsTest(unittest.TestCase): def test_convert_temp_units(self): with TestPipeline() as p: RECORDS = [WeatherRecord('x', 0.0, 0.0, '2/2/2021', 1.0, 2.0, 0.1), WeatherRecord('y', 0.0, 0.0, '2/2/2021', -3.0, -1.0, 0.3)] EXPECTED_RECORDS = [WeatherRecord('x', 0.0, 0.0, '2/2/2021', 33.8, 35.6, 0.1), WeatherRecord('y', 0.0, 0.0, '2/2/2021', 26.6, 30.2, 0.3)] input_records = p | beam.Create(RECORDS) output = input_records | beam.ParDo(ConvertTempUnits()) assert_that(output, equal_to(EXPECTED_RECORDS))

Esta prueba garantiza que la DoFn ConvertTempUnits() funcione según lo previsto. Guarda weather_statistics_pipeline_test.py y regresa a tu terminal.

  1. Ejecuta los siguientes comandos para realizar las pruebas y ver el resultado:
rm testing.out python3 weather_statistics_pipeline_test.py cat testing.out

La prueba falló esta vez. Si nos desplazamos por el resultado, encontraremos la siguiente información sobre la prueba con errores:

test_compute_statistics (__main__.ComputeStatsTest) ... ok test_convert_temp_units (__main__.ConvertTempUnitsTest) ... ERROR ... apache_beam.testing.util.BeamAssertException: Failed assert: [WeatherRecord(loc_id='x', lat=0.0, lng=0.0, date='2/2/2021', low_temp=33.8, high_temp=35.6, precip=0.1), WeatherRecord(loc_id='y', lat=0.0, lng=0.0, date='2/2/2021', low_temp=26.6, high_temp=30.2, precip=0.3)] == [WeatherRecord(loc_id='x', lat=0.0, lng=0.0, date='2/2/2021', low_temp=32.8, high_temp=34.6, precip=0.1), WeatherRecord(loc_id='y', lat=0.0, lng=0.0, date='2/2/2021', low_temp=25.6, high_temp=29.2, precip=0.3)], unexpected elements [WeatherRecord(loc_id='x', lat=0.0, lng=0.0, date='2/2/2021', low_temp=32.8, high_temp=34.6, precip=0.1), WeatherRecord(loc_id='y', lat=0.0, lng=0.0, date='2/2/2021', low_temp=25.6, high_temp=29.2, precip=0.3)], missing elements [WeatherRecord(loc_id='x', lat=0.0, lng=0.0, date='2/2/2021', low_temp=33.8, high_temp=35.6, precip=0.1), WeatherRecord(loc_id='y', lat=0.0, lng=0.0, date='2/2/2021', low_temp=26.6, high_temp=30.2, precip=0.3)] [while running 'assert_that/Match']

Si analizamos con más detalle la BeamAssertException, observamos que los valores de low_temp y high_temp son incorrectos. Hay un error en la lógica de procesamiento de la DoFn ConvertTempUnits.

  1. Regresa a 8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline.py y desplázate hacia abajo hasta la definición de ConvertTempUnits (cerca de la línea 32). Para completar esta tarea, busca el error en la lógica de procesamiento de DoFn y vuelve a ejecutar los siguientes comandos para confirmar que la prueba ahora se realice correctamente:
rm testing.out python3 weather_statistics_pipeline_test.py cat testing.out

A modo de recordatorio, esta es la fórmula para convertir grados Celsius en grados Farenheit:

temp_f = temp_c * 1.8 + 32.0

Si tienes problemas, puedes consultar las soluciones.

Tarea 7: Ejecuta la prueba de unidades de PTransform y prueba la canalización de extremo a extremo

  1. Regresa a 8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline_test.py y quita los comentarios del código de las dos últimas pruebas (a partir de la línea 53).

La primera prueba, a la que acabamos de quitar el comentario, corresponde a la prueba de la PTransform compuesta ComputeStatistics. A modo de referencia, este es un código con formato truncado:

def test_compute_statistics(self): with TestPipeline() as p: INPUT_RECORDS = # Test input omitted here EXPECTED_STATS = # Expected output omitted here inputs = p | beam.Create(INPUT_RECORDS) output = inputs | ComputeStatistics() assert_that(output, equal_to(EXPECTED_STATS))

Ten en cuenta que esto es muy similar a las pruebas de unidades de DoFn que realizamos anteriormente. La única diferencia real (aparte de las entradas y las salidas diferentes de las pruebas) es que aplicamos PTransform en lugar de beam.ParDo(DoFn()).

La prueba final es para la canalización de extremo a extremo. En el código de canalización (weather_statistics_pipeline.py), se incluyó toda la canalización de extremo a extremo (menos la fuente y el receptor) en una sola PTransform WeatherStatsTransform. Para probar la canalización de extremo a extremo, podemos repetir lo que hicimos anteriormente, pero con PTransform.

  1. Ahora, regresa a tu terminal y ejecuta el siguiente comando para ejecutar las pruebas una vez más:
rm testing.out python3 weather_statistics_pipeline_test.py cat testing.out

Si completaste correctamente las tareas anteriores, deberías ver lo siguiente en la terminal una vez que finalice la prueba:

test_compute_statistics (__main__.ComputeStatsTest) ... ok test_convert_temp_units (__main__.ConvertTempUnitsTest) ... ok test_convert_to_csv (__main__.ConvertToWeatherRecordTest) ... ok test_weather_stats_transform (__main__.WeatherStatsTransformTest) ... ok ---------------------------------------------------------------------- Ran 4 tests in 2.295s OK
  1. Ahora, copia y pega el archivo testing.out en el bucket de almacenamiento:
export PROJECT_ID=$(gcloud config get-value project) gcloud storage cp testing.out gs://$PROJECT_ID/8a_Batch_Testing_Pipeline/

Haz clic en Revisar mi progreso para verificar el objetivo. Realizar pruebas de unidades para DoFn y PTransform

Parte 2 del lab: Prueba la lógica de procesamiento de transmisión con TestStream

En esta parte del lab, realizaremos pruebas de unidades para una canalización de transmisión que procesa recuentos de los recorridos de un taxi mediante un sistema de ventanas. Para probar las transformaciones que tú creaste, puedes usar el siguiente patrón y las transformaciones que proporciona Beam:

  • Crea una TestPipeline.
  • Usa la clase TestStream para generar datos de transmisión. Esto incluye generar una serie de eventos y avanzar la marca de agua y el tiempo de procesamiento.
  • Usa el método Assert_that del módulo testing.util, junto con los demás disponibles, para verificar que la PCollection de salida contenga los elementos previstos.

Cuando se ejecuta una canalización que lee de TestStream, la operación de lectura espera a que se completen todas las consecuencias de cada evento antes de pasar al siguiente, incluso cuando se avanza el tiempo de procesamiento y se accionan los activadores correspondientes. TestStream permite que el efecto de la activación y el retraso permitido se observen y prueben en una canalización. Esto incluye la lógica en torno a los activadores tardíos y los datos descartados debido a un retraso.

Tarea 1: Explora el código de canalización principal

  1. Navega a 8b_Stream_Testing_Pipeline/lab en el explorador de archivos.

Este directorio contiene dos archivos: taxi_streaming_pipeline.py, que incluye el código de canalización principal, y taxi_streaming_pipeline_test.py, que incluye el código de prueba.

  1. Primero, abre 8b_Stream_Testing_Pipeline/lab/taxi_streaming_pipeline.py.

Exploraremos brevemente el código de nuestra canalización antes de trabajar con las pruebas de unidades. Primero, vemos la clase TaxiRide (a partir de la línea 6). Esta es una subclase de typing.NamedTuple, por lo que podemos usar transformaciones adaptadas al esquema para trabajar con objetos de esta clase. Más adelante, definiremos las colecciones de objetos de esta clase en la memoria para nuestras pruebas.

class TaxiRide(typing.NamedTuple): ride_id: str point_idx: int latitude: float longitude: float timestamp: str meter_reading: float meter_increment: float ride_status: str passenger_count: int

Luego, se encuentra el código principal de nuestra canalización. Si bien la mayoría de los conceptos de esta canalización se abordaron en labs anteriores, asegúrate de explorar con mayor detalle los que se indican a continuación:

  • La DoFn JsonToTaxiRide (a partir de la línea 22) usada para convertir los mensajes entrantes de Pub/Sub en objetos de la clase TaxiRide.
  • La PTransform TaxiCountTransform (a partir de la línea 36). Esta PTransform contiene la lógica principal de recuento y sistema de ventanas de la canalización. Nuestras pruebas se enfocarán en esta PTransform.

El resultado de la TaxiCountTransform debería ser un recuento de todos los recorridos registrados de taxi por ventana. Sin embargo, tendremos múltiples eventos por recorrido (partida, destino, etcétera). Filtraremos por la propiedad ride_status para garantizar que contemos cada recorrido solo una vez. Para ello, solo mantendremos los elementos cuyo ride_status sea igual a “pickup”:

... | "FilterForPickups" >> beam.Filter(lambda x : x.ride_status == 'pickup')

Si lo analizamos un poco más en detalle, verás que se utilizó la siguiente lógica de renderización en ventanas en nuestra canalización:

... | "WindowByMinute" >> beam.WindowInto(beam.window.FixedWindows(60), trigger=AfterWatermark(late=AfterCount(1)), allowed_lateness=60, accumulation_mode=AccumulationMode.ACCUMULATING)

Estableceremos grupos de ventanas fijas de 60 segundos de duración. No contamos con un activador anticipado, por lo que emitiremos los resultados después de que la marca de agua pase el final de la ventana. Incluiremos activaciones tardías con cada elemento nuevo que ingrese, pero con un retraso permitido de solo 60 segundos. Por último, acumularemos el estado en ventanas hasta que haya pasado el retraso permitido.

Tarea 2. Explora el uso de TestStream y ejecuta la primera prueba

  1. En primer lugar, abre 8b_Stream_Testing_Pipeline/lab/taxi_streaming_pipeline_test.py.

El primer objetivo será comprender el uso de TestStream en nuestro código de prueba. Recuerda que la clase TestStream nos permite simular un flujo de mensajes en tiempo real y controlar el progreso del tiempo de procesamiento y la marca de agua. Este es el código de la primera prueba (a partir de la línea 66):

test_stream = TestStream().advance_watermark_to(0).add_elements([ TimestampedValue(base_json_pickup, 0), TimestampedValue(base_json_pickup, 0), TimestampedValue(base_json_enroute, 0), TimestampedValue(base_json_pickup, 60) ]).advance_watermark_to(60).advance_processing_time(60).add_elements([ TimestampedValue(base_json_pickup, 120) ]).advance_watermark_to_infinity()
  1. Crearemos un nuevo objeto TestStream y, luego, pasaremos el mensaje JSON como una cadena denominada base_json_pickup o base_json_enroute, según el estado del recorrido. ¿Qué hace el TestStream anterior?

El TestStream realiza las siguientes tareas:

  • Se establece la marca de agua de tiempo como 0 (todas las marcas de tiempo son en segundos).
  • Agregar tres elementos al flujo a través del método add_elements con una marca de tiempo de evento de 0 Contar solo dos de estos eventos (ride_status = "pickup")
  • Agregar otro evento de “pickup”, pero con una marca de tiempo de evento correspondiente a 60
  • Avanzar la marca de agua y el tiempo de procesamiento a 60 para activar la primera ventana
  • Agregar otro evento de “pickup”, pero con una marca de tiempo de evento correspondiente a 120
  • Avanzar la marca de agua al “infinito”. Esto hará que todas las ventanas se cierren y permitirá que los datos nuevos superen el retraso permitido
  1. El resto del código de la primera prueba es similar al ejemplo por lotes anterior, pero ahora usamos TestStream en lugar de la transformación Create:
taxi_counts = (p | test_stream | TaxiCountTransform() ) EXPECTED_WINDOW_COUNTS = {IntervalWindow(0,60): [3], IntervalWindow(60,120): [1], IntervalWindow(120,180): [1]} assert_that(taxi_counts, equal_to_per_window(EXPECTED_WINDOW_COUNTS), reify_windows=True)

En el código anterior, definimos nuestra PCollection de salida (taxi_counts) creando el TestStream y aplicando la PTransform TaxiCountTransform. Usamos la clase IntervalWindow para definir las ventanas que queremos verificar y, luego, usamos assert_that con el método equal_to_per_window para verificar los resultados por ventana.

  1. Guarda el archivo, regresa a la terminal y ejecuta los siguientes comandos para reubicar al directorio correcto:
# Change directory into the lab cd $BASE_DIR/../../8b_Stream_Testing_Pipeline/lab export BASE_DIR=$(pwd)
  1. Ahora, ejecuta la prueba anterior y los siguientes comandos para ver el resultado:
python3 taxi_streaming_pipeline_test.py cat testing.out

Deberías ver el siguiente resultado después de la prueba (aunque el tiempo transcurrido puede variar):

test_windowing_behavior (__main__.TaxiWindowingTest) ... ok ---------------------------------------------------------------------- Ran 1 test in 1.113s OK

Tarea 3: Crea TestStream para probar el manejo de datos tardíos

En esta tarea, escribirás código para un TestStream para probar la lógica de la prueba relacionada con el manejo de los datos tardíos.

  1. Regresa a 8b_Stream_Testing_Pipeline/lab/taxi_streaming_pipeline_test.py y desplázate hacia abajo hasta donde se marcó el método test_late_data_Behavior como comentario (línea 60). Quita el comentario del código de esta prueba, ya que completaremos el código para esta tarea.
class TaxiLateDataTest(unittest.TestCase): def test_late_data_behavior(self): options = PipelineOptions() options.view_as(StandardOptions).streaming = True with TestPipeline(options=options) as p: base_json_pickup = "{\"ride_id\":\"x\",\"point_idx\":1,\"latitude\":0.0,\"longitude\":0.0," \ "\"timestamp\":\"00:00:00\",\"meter_reading\":1.0,\"meter_increment\":0.1," \ "\"ride_status\":\"pickup\",\"passenger_count\":1}" test_stream = # TASK 3: Create TestStream Object EXPECTED_RESULTS = {IntervalWindow(0,60): [2,3]} #On Time and Late Result taxi_counts = (p | test_stream | TaxiCountTransform() ) assert_that(taxi_counts, equal_to(EXPECTED_RESULTS))

Ten en cuenta que EXPECTED_RESULTS contiene dos resultados para IntervalWindow(0,60). Estos representan los resultados de los activadores puntuales y tardíos de esta ventana.

El código de la prueba se completará fuera de la creación de TestStream.

  1. Para completar esta tarea, crea un objeto TestStream que realice las siguientes tareas:

    1. Avanzar la marca de agua a 0 (todas las marcas de tiempo están en segundos)
    2. Agregar dos TimestampedValues con el valor base_json_pickup y la marca de tiempo 0
    3. Avanzar la marca de agua y el tiempo de procesamiento a 60
    4. Agregar otro TimestampedValue con el valor base_json_pickup y la marca de tiempo 0
    5. Avanzar la marca de agua y el tiempo de procesamiento a 300
    6. Agregar otro TimestampedValue con el valor base_json_pickup y la marca de tiempo 0
    7. Avanzar la marca de agua al infinito

Se creará un TestStream con cuatro elementos que pertenecen a la primera ventana. Los primeros dos elementos son puntuales, el segundo está retrasado (pero dentro del retraso permitido) y el último está retrasado y supera el retraso permitido. Debido a que acumulamos paneles activados, el primer activador debe contar dos eventos, mientras que el activador final debe contar tres. No se debe incluir el cuarto evento.

Si tienes problemas, puedes consultar las soluciones.

Tarea 4: Ejecuta pruebas para el manejo de datos tardíos

  1. Ahora, regresa a tu terminal y ejecuta el siguiente comando para ejecutar las pruebas una vez más:
rm testing.out python3 taxi_streaming_pipeline_test.py cat testing.out

Si completaste correctamente las tareas anteriores, deberías ver lo siguiente en la terminal una vez que finalice la prueba:

test_late_data_behavior (__main__.TaxiLateDataTest) ... ok test_windowing_behavior (__main__.TaxiWindowingTest) ... ok ---------------------------------------------------------------------- Ran 2 tests in 2.225s OK
  1. Ahora, copia y pega el archivo testing.out en el bucket de almacenamiento:
export PROJECT_ID=$(gcloud config get-value project) gcloud storage cp testing.out gs://$PROJECT_ID/8b_Stream_Testing_Pipeline/

Haz clic en Revisar mi progreso para verificar el objetivo. Prueba la lógica de procesamiento de transmisión con TestStream

Finalice su lab

Cuando haya completado el lab, haga clic en Finalizar lab. Google Cloud Skills Boost quitará los recursos que usó y limpiará la cuenta.

Tendrá la oportunidad de calificar su experiencia en el lab. Seleccione la cantidad de estrellas que corresponda, ingrese un comentario y haga clic en Enviar.

La cantidad de estrellas indica lo siguiente:

  • 1 estrella = Muy insatisfecho
  • 2 estrellas = Insatisfecho
  • 3 estrellas = Neutral
  • 4 estrellas = Satisfecho
  • 5 estrellas = Muy satisfecho

Puede cerrar el cuadro de diálogo si no desea proporcionar comentarios.

Para enviar comentarios, sugerencias o correcciones, use la pestaña Asistencia.

Copyright 2025 Google LLC. Todos los derechos reservados. Google y el logotipo de Google son marcas de Google LLC. El resto de los nombres de productos y empresas pueden ser marcas de las respectivas empresas a las que están asociados.

Antes de comenzar

  1. Los labs crean un proyecto de Google Cloud y recursos por un tiempo determinado
  2. .
  3. Los labs tienen un límite de tiempo y no tienen la función de pausa. Si finalizas el lab, deberás reiniciarlo desde el principio.
  4. En la parte superior izquierda de la pantalla, haz clic en Comenzar lab para empezar

Usa la navegación privada

  1. Copia el nombre de usuario y la contraseña proporcionados para el lab
  2. Haz clic en Abrir la consola en modo privado

Accede a la consola

  1. Accede con tus credenciales del lab. Si usas otras credenciales, se generarán errores o se incurrirá en cargos.
  2. Acepta las condiciones y omite la página de recursos de recuperación
  3. No hagas clic en Finalizar lab, a menos que lo hayas terminado o quieras reiniciarlo, ya que se borrará tu trabajo y se quitará el proyecto

Este contenido no está disponible en este momento

Te enviaremos una notificación por correo electrónico cuando esté disponible

¡Genial!

Nos comunicaremos contigo por correo electrónico si está disponible

Un lab a la vez

Confirma para finalizar todos los labs existentes y comenzar este

Usa la navegación privada para ejecutar el lab

Usa una ventana de navegación privada o de Incógnito para ejecutar el lab. Así evitarás cualquier conflicto entre tu cuenta personal y la cuenta de estudiante, lo que podría generar cargos adicionales en tu cuenta personal.