En esta página, se muestra cómo usar Dataflow para leer datos de Google Cloud Managed Service para Apache Kafka y escribir los registros en una tabla de BigQuery. En este instructivo, se usa la plantilla de Apache Kafka a BigQuery para crear el trabajo de Dataflow.
Descripción general
Apache Kafka es una plataforma de código abierto para eventos de transmisión. Kafka se suele usar en arquitecturas distribuidas para permitir la comunicación entre componentes con acoplamiento bajo. Puedes usar Dataflow para leer eventos de Kafka, procesarlos y escribir los resultados en una tabla de BigQuery para su análisis posterior.
Managed Service para Apache Kafka es un Google Cloud servicio que te ayuda a ejecutar clústeres de Kafka seguros y escalables.
Permisos necesarios
La cuenta de servicio del trabajador de Dataflow debe tener los siguientes roles de Identity and Access Management (IAM):
- Cliente de Kafka administrado (
roles/managedkafka.client) - Editor de datos de BigQuery (
roles/bigquery.dataEditor)
Para obtener más información, consulta Seguridad y permisos de Dataflow.
Crear clúster de Kafka
En este paso, crearás un clúster de Managed Service para Apache Kafka. Para obtener más información, consulta Crea un clúster de Managed Service para Apache Kafka.
Console
Ve a la página Managed Service para Apache Kafka > Clústeres.
Haz clic en Crear.
En el cuadro Nombre del clúster, ingresa un nombre para el clúster.
En la lista Región, selecciona una ubicación para el clúster.
Haz clic en Crear.
gcloud
Usa el comando managed-kafka clusters create:
gcloud managed-kafka clusters create CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME
Reemplaza lo siguiente:
CLUSTER: Es un nombre para el clúster.REGION: Es la región en la que creaste la subred.PROJECT_ID: Es el ID del proyecto.SUBNET_NAME: Es la subred en la que deseas implementar el clúster.
Por lo general, la creación de un clúster tarda entre 20 y 30 minutos.
Crear tema de Kafka
Después de crear el clúster de Managed Service para Apache Kafka, crea un tema.
Console
Ve a la página Managed Service para Apache Kafka > Clústeres.
Haz clic en el nombre del clúster.
En la página de detalles del clúster, haz clic en Crear tema.
En el cuadro Nombre del tema, ingresa un nombre para el tema.
Haz clic en Crear.
gcloud
Usa el comando managed-kafka topics create:
gcloud managed-kafka topics create TOPIC_NAME \
--cluster=CLUSTER \
--location=REGION \
--partitions=10 \
--replication-factor=3
Reemplaza lo siguiente:
TOPIC_NAME: Es el nombre del tema que se creará.
Crea una tabla de BigQuery
En este paso, crearás una tabla de BigQuery con el siguiente esquema:
| Nombre de la columna | Tipo de datos |
|---|---|
name |
STRING |
customer_id |
INTEGER |
Si aún no tienes un conjunto de datos de BigQuery, primero crea uno. Para obtener más información, consulta Crea conjuntos de datos. Luego, crea una tabla vacía nueva:
Console
Ve a la página de BigQuery.
En el panel Explorador , expande tu proyecto y, luego, elige un conjunto de datos.
En la sección Información del conjunto de datos, haz clic en Crear tabla.
En la lista Crear tabla desde, selecciona Tabla vacía.
En el cuadro Tabla, ingresa el nombre de la tabla.
En la sección Esquema, haz clic en Editar como texto.
Pega la siguiente definición de esquema:
name:STRING, customer_id:INTEGERHaz clic en Crear tabla.
gcloud
Usa el bq mk comando.
bq mk --table \
PROJECT_ID:DATASET_NAME.TABLE_NAME \
name:STRING,customer_id:INTEGER
Reemplaza lo siguiente:
PROJECT_ID: Es el ID del proyecto.DATASET_NAME: Es el nombre del conjunto de datos.TABLE_NAME: Es el nombre de la tabla que se creará.
Ejecuta el trabajo de Dataflow:
Después de crear el clúster de Kafka y la tabla de BigQuery, ejecuta la plantilla de Dataflow.
Console
Primero, obtén la dirección del servidor de arranque del clúster:
En la Google Cloud consola de, ve a la página Clústeres.
Haz clic en el nombre del clúster.
Haz clic en la pestaña Configurations.
Copia la dirección del servidor de arranque de Bootstrap URL.
Luego, ejecuta la plantilla para crear el trabajo de Dataflow:
Ve a la página Dataflow > Trabajos.
Haz clic en Crear trabajo a partir de una plantilla.
En el campo Nombre del trabajo, ingresa
kafka-to-bq.En Extremo regional, selecciona la región en la que se encuentra tu clúster de Managed Service para Apache Kafka.
Selecciona la plantilla “Kafka a BigQuery”.
Ingresa los siguientes parámetros de plantilla:
- Servidor de arranque de Kafka: Es la dirección del servidor de arranque.
- Tema de Kafka de origen: Es el nombre del tema que se leerá.
- Modo de autenticación de origen de Kafka:
APPLICATION_DEFAULT_CREDENTIALS - Formato del mensaje de Kafka:
JSON - Estrategia de nombre de tabla:
SINGLE_TABLE_NAME - Tabla de salida de BigQuery: Es la tabla de BigQuery, con el siguiente formato:
PROJECT_ID:DATASET_NAME.TABLE_NAME
En Cola de mensajes no entregados, marca Escribir errores en BigQuery.
Ingresa un nombre de tabla de BigQuery para la cola de mensajes no entregados, con el siguiente formato:
PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAMENo crees esta tabla con anticipación. La canalización la crea.
Haga clic en Ejecutar trabajo.
gcloud
Usa el
dataflow flex-template run
comando.
gcloud dataflow flex-template run kafka-to-bq \ --template-file-gcs-location gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \ --region LOCATION \ --parameters \ readBootstrapServerAndTopic=projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\ persistKafkaKey=false,\ writeMode=SINGLE_TABLE_NAME,\ kafkaReadOffset=earliest,\ kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\ messageFormat=JSON,\ outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME\ useBigQueryDLQ=true,\ outputDeadletterTable=PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME
Reemplaza las siguientes variables:
LOCATION: Es la región en la que se encuentra tu Managed Service para Apache Kafka.PROJECT_ID: Es el nombre de tu Google Cloud proyecto de.CLUSTER_ID: Es el ID del clúster.TOPIC: Es el nombre del tema de Kafka.DATASET_NAME: Es el nombre del conjunto de datos.TABLE_NAME: Es el nombre de la tabla.ERROR_TABLE_NAME: Es un nombre de tabla de BigQuery para la cola de mensajes no entregados.
No crees la tabla para la cola de mensajes no entregados con anticipación. La canalización la crea.
Envía mensajes a Kafka
Después de que se inicia el trabajo de Dataflow, puedes enviar mensajes a Kafka y la canalización los escribe en BigQuery.
Crea una VM en la misma subred que el clúster de Kafka y, luego, instala las herramientas de línea de comandos de Kafka. Para obtener instrucciones detalladas, consulta Configura una máquina cliente en Publica y consume mensajes con la CLI.
Ejecuta el siguiente comando para escribir mensajes en el tema de Kafka:
kafka-console-producer.sh \ --topic TOPIC \ --bootstrap-server bootstrap.CLUSTER_ID.LOCATION.managedkafka.PROJECT_ID.cloud.goog:9092 \ --producer.config client.properties
Reemplaza las siguientes variables:
TOPIC: Es el nombre del tema de Kafka.CLUSTER_ID: Es el nombre del clúster.LOCATION: Es la región en la que se encuentra el clúster.PROJECT_ID: Es el nombre de tu Google Cloud proyecto de.
En la solicitud, ingresa las siguientes líneas de texto para enviar mensajes a Kafka:
{"name": "Alice", "customer_id": 1} {"name": "Bob", "customer_id": 2} {"name": "Charles", "customer_id": 3}
Usa una cola de mensajes no entregados
Mientras se ejecuta el trabajo, es posible que la canalización no pueda escribir mensajes individuales en BigQuery. Los errores posibles incluyen los siguientes:
- Errores de serialización, incluido el JSON con formato incorrecto
- Errores de conversión de tipos, causados por una falta de coincidencia en el esquema de la tabla y los datos JSON
- Campos adicionales en los datos JSON que no están presentes en el esquema de la tabla
Estos errores no hacen que falle el trabajo y no aparecen como errores en el registro de trabajos de Dataflow. En su lugar, la canalización usa una cola de mensajes no entregados para controlar estos tipos de errores.
Para habilitar la cola de mensajes no entregados cuando ejecutas la plantilla, establece los siguientes parámetros de plantilla:
useBigQueryDLQ:trueoutputDeadletterTable: Es un nombre de tabla de BigQuery completamente calificado; por ejemplo,my-project:dataset1.errors.
La canalización crea la tabla automáticamente. Si se produce un error cuando se procesa un mensaje de Kafka, la canalización escribe una entrada de error en la tabla.
Ejemplos de mensajes de error:
| Tipo de error | Datos de eventos | errorMessage |
|---|---|---|
| Error de serialización | "Hello world" | Failed to serialize json to table row: "Hello world" |
| Error en la conversión del tipo | {"name":"Emily","customer_id":"abc"} | { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "Cannot convert value to integer (bad value): abc", "reason" : "invalid" } ], "index" : 0 } |
| Campo desconocido | {"name":"Zoe","age":34} | { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "no such field: customer_id.", "reason" : "invalid" } ], "index" : 0 } |
Trabaja con tipos de datos de BigQuery
De forma interna, el conector de E/S de Kafka convierte las cargas útiles de mensajes JSON en
objetos TableRow de Apache Beam y traduce los valores de los campos TableRow
en tipos de BigQuery.
En la siguiente tabla, se muestran las representaciones JSON de los tipos de datos de BigQuery .
| Tipo de BigQuery | Representación JSON |
|---|---|
ARRAY |
[1.2,3] |
BOOL |
true |
DATE |
"2022-07-01" |
DATETIME |
"2022-07-01 12:00:00.00" |
DECIMAL |
5.2E11 |
FLOAT64 |
3.142 |
GEOGRAPHY |
"POINT(1 2)"Especifica la ubicación geográfica con un texto conocido (WKT) o GeoJSON, con formato como una cadena. Para obtener más información, consulta Carga datos geoespaciales. |
INT64 |
10 |
INTERVAL |
"0-13 370 48:61:61" |
STRING |
"string_val" |
TIMESTAMP |
"2022-07-01T12:00:00.00Z"Usa el método |
Datos estructurados
Si tus mensajes JSON siguen un esquema coherente, puedes representar objetos JSON
con el
STRUCT tipo
de datos en BigQuery.
En el siguiente ejemplo, el campo answers es un objeto JSON con dos
subcampos, a y b:
{"name":"Emily","answers":{"a":"yes","b":"no"}}
La siguiente instrucción de SQL crea una tabla de BigQuery con un esquema compatible:
CREATE TABLE my_dataset.kafka_events (name STRING, answers STRUCT<a STRING, b STRING>);
La tabla resultante se ve de la siguiente manera:
+-------+----------------------+
| name | answers |
+-------+----------------------+
| Emily | {"a":"yes","b":"no"} |
+-------+----------------------+
Datos semiestructurados
Si tus mensajes JSON no siguen un esquema estricto, considera almacenarlos en
BigQuery como un
JSON tipo de datos.
Cuando almacenas datos JSON como un tipo JSON, no necesitas definir el esquema por adelantado. Después de la transferencia de datos, puedes consultar los datos con los operadores de acceso a campos (notación de puntos) y acceso a arrays en GoogleSQL. Para obtener más
información, consulta
Trabaja con datos JSON en GoogleSQL.
Usa una UDF para transformar los datos
En este instructivo, se supone que los mensajes de Kafka tienen formato JSON y que el esquema de la tabla de BigQuery coincide con los datos JSON, sin que se apliquen transformaciones a los datos.
De forma opcional, puedes proporcionar una función definida por el usuario (UDF) de JavaScript que transforme los datos antes de que se escriban en BigQuery. La UDF también puede realizar un procesamiento adicional, como filtrar, quitar información de identificación personal (PII) o enriquecer los datos con campos adicionales.
Para obtener más información, consulta Crea funciones definidas por el usuario para plantillas de Dataflow.