Escrito por

Senior Developer at Aquaservice
Artículo Daniel Aguilar · feb 25, 2022 6m read

Enviando mensajes a Kafka

Hola!

Llevaba un tiempo queriendo dedicarle un rato para implementar alguna DLL o algo que pudiese usar desde Caché y al final he sacado un huequillo, si te interesa poder producir mensajes que se envíen a Kafka de una manera rápida estás en el lugar adecuado ;-)

Antes de daros la chapa con lo que vamos a ver os hago un resumen para que decidáis si os puede interesar leer el artículo.

En este artículo nos vamos a centrar "solo" en la parte de producir mensajes y enviarlos a Kafka:

¿Cómo funciona?

Uso una DLL de .Net (Netframework 4.5)  que he hecho (está dentro de la carpeta dll del repositorio)

Después mediante 2 clases que he creado (puedes consultar el código fuente en el repositorio:  aqui ):

  • Kafka.Helper.cls  (Sirve para configurar e instalar todo).
  • Kafka.Producer.cls (Sirve para crear mensajes y enviarlos a Kafka).

Podemos enviar mensajes a Kafka desde nuestras clases o rutinas.

Ejemplo:

SendObjectMessage()topic, message, resSet topic="mitopic" 

    Set message=##class(%ZEN.proxyObject).%New()    Set message.nombre="Dani"    Set message.direccion="C/Falsa, 123" 

    Set res=##class(Kafka.Producer).sendObject(topic, .message) 

    Q

 

¿Te interesa? Pues vamos a verlo en detalle:

La DLL en .Net usa una implementación de Confluent para enviar mensajes a un topic de Kafka (que debemos haber generado previamente, os dejo el fichero docker-compose que he creado para las pruebas, tendréis que modificar la dirección ip para hacerla coincidir con la ip del equipo donde lo despleguéis), (El fichero docker-compose es mejorable pero para las pruebas nos va a servir):

 

 

Fichero docker-compose

version: "3"
services:
  zookeeper:
    image: zookeeper
    restart: always
    container_name: zookeeper
    hostname: zookeeper
    ports:
      - 2181:2181
    volumes:
      - /kafka/zookeeper_data:/zookeeper/data
      - /kafka/zookeeper_logs:/zookeeper/logs
      - /kafka/zookeeper_conf:/zookeeper/conf
    environment:
      ZOO_MY_ID: 1
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - 9092:9092
    volumes:
      - /kafka/data:/kafka/data
      - /kafka/config:/kafka/config
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 172.16.172.10
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  kafka_manager:
    image: hlebalbau/kafka-manager:stable
    container_name: kakfa-manager
    restart: always
    ports:
      - "9000:9000"
    environment:
      ZK_HOSTS: "zookeeper:2181"
      APPLICATION_SECRET: "random-secret"
    command: -Dpidfile.path=/dev/null

Configuración en Caché y envío de mensaje:

Para configurarlo en Caché:

Copiaremos el contenido de la carpeta dll en una carpeta del servidor donde tenemos instalado Caché (yo lo que copiado dentro de c:\kafkaaqs ).

Después importaremos el paquete Kafka y abriremos la clase Kafka.Helper.cls

Dentro de la misma configuraremos los parámetros de configuración del Gateway que ejecuta la DLL

Si no sabes configurar el Gateway consulta este spoiler:

 

Configurar Gateway

Dentro del portal entramos en Gateways de objeto

Pulsamos en Crear Nuevo Gateway y rellenamos la información (Cada uno la configuración que corresponda esto es un ejemplo):

Una vez configurado ejecutamos por terminal el metodo install:

Do ##class(Kafka.Helper).install()

 

A continuación procederemos a configurar el topic (Este topic debe estar definido previamente en Kafa)

 

Definir topic:

Para crear un topic primero necesitamos definir un Cluster que lo albergue:

Con el docker-compose arrancado: 

docker-compose up -d

Abrimos en el navegador web el KafkaManager (También se puede hacer por terminal pero si no estás familiarizado con los comandos de Kafka quizás te resulte mas sencillo así)

Rellenamos el nombre del cluster y la configuración del Zookeepr (para la configuración del docker-compose si no lo habéis tocado es esta)

Una vez guardado nos aparecerá en nuestra lista de Cluster entramos dentro haciendo click sobre el nombre y pasamos a crear nuestro Topic:

En el topic deberemos asignar: el nombre, nº de particiones, factor de replica... etc..  (Si no sabes de que estoy hablando de momento para probar puedes poner el nombre que quieras y dejar todo por defecto.

Y listo! ya tenemos nuestro Topic creado y listo para empezar a recibir los mensajes.

Creamos el topic en caché:

New topic,server,keyIdSet topic = "mitopic"Set server = "172.16.172.5:9092"Set keyId = "miKey"Do ##class(Kafka.Helper).createTopic(topic,server,keyId)

(En server debemos poner el puerto e ip configurados en el docker-compose)

Los topics creados se graban el el global ^KAFKA

La estructura del mismo es ^KAFA("TOPICS", nombreTopicX)

                                                  ^KAFA("TOPICS", nombreTopicX,"keyId") = clave Id enviado junto los mensajes

                                                  ^KAFA("TOPICS", nombreTopicX,"server") = ip kafka : puerto escucha 

Podemos generar tantos topics como queramos.

Una vez tenemos todo configurado por fin podemos empezar a enviar mensajes!!

He implementado 2 metodos para hacerlo

1 - Envía un texto en formato JSON

2 - Envía un objeto %ZEN.proxyObject

3 - No he tenido tiempo pero si os gusta implementaré envío con dynamicObjects y objetos de clases definidas también.

Opción 1 Enviar texto en formato JSON:

Set topic="mitopic"Set message="{""Valor"":""hola""}"Set res=##class(Kafka.Producer).sendMessage(topic,message)

 

Opción 2 Enviar objeto %ZEN.proxyObject

Set topic="mitopic"Set message=##class(%ZEN.proxyObject).%New()Set message.nombre="Dani"Set message.direccion="C/Falsa, 123"Set res=##class(Kafka.Producer).sendObject(topic, .message)

 

Cuando enviamos un mensaje si por algún casual falla la comunicación o no puede enviar el mensaje por cualquier motivo deja registrado el error en un subnivel del global ^KAFKA dentro del topic que ha fallado:

Lo que graba en el nivel "MESSAGE" es el mensaje en formato JSON (ya se haya pasado como objeto o como texto)

En resMessage se graba el resultado devuelto por la DLL de .Net.

Con esto ya podríamos enviar mensajes a Kafka, evidentemente habría que configurar algún subscriptor para los topics para que los mensajes que enviemos puedan ser procesados y ejecuten su función.

¿Que cuales podrían ser? Cualquiera!!, Enviar un e-mail, SMS, generar un PDF, sincronizar la información recibida con otro sistema... en fin cualquier cosa que se nos ocurra.

Personalmente me parece una herramienta muy muy potente y que nos permite desacoplar nuestros desarrollos y mantener las lógicas separadas.

Espero que os guste la idea, voy a seguir desarrollándola en mis ratos libres por lo que si tenéis cualquier sugerencia o mejora será mas que bienvenida.

Gracias por leer el artículo, y espero que os haya gustado.

Gracias por leerlo!!.