¿Que es Azure Event Hubs?

Azure Event Hubs es una plataforma de streaming de datos y un servicio de ingesta de eventos. Puede recibir y procesar millones de eventos por segundo, almacenarlos en la nube y permite leerlos y usarlos en un enorme abanico de aplicaciones.

Los siguientes escenarios son algunos de los casos donde se puede usar Event Hubs:

  • Detección de anomalías (fraude/valores atípicos)
  • Registro de aplicaciones
  • Canalizaciones de análisis, como secuencias de clics
  • Paneles en vivo
  • Archivado de datos
  • Procesamiento de transacciones
  • Procesamiento de telemetría  de usuario
  • Streaming de telemetría  de dispositivo

¿Que vamos a realizar en esta práctica?

Vamos a realizar dos aplicaciones de consola, una para enviar los mensajes al Hub, y otra para leer los mensajes del mismo.

La practica, sera un «ejemplo practico» de como un profesor, podría enviar las faltas de los alumnos en el periodo de un mes al Hub, y como al leer los mensajes, veríamos las faltas de los alumnos en dicho mes, a la vez, si un alumno ha superado las faltas permitidas en dicho mes, se mostrara un mensaje de alerta con el alumno y las faltas.

Puesta a punto

Antes de empezar, esta disponible el código del proyecto en mi dirección de GitHub, por si tienes alguna duda.

Dividiré la puesta a punto en diferentes secciones:

  • Recursos del portal de Azure
  • Aplicaciones de consola
    • Aplicación de consola para enviar mensajes
    • Aplicación de consola para leer mensajes

Recursos en el portal de Azure

Primero iremos al portal de Azure, creamos un nuevo grupo de recursos, para tener todos los recursos de esta practica en el mismo grupo. Lo llamaremos RecursosEventHub.

Resource group info

Dentro del mismo, añadiremos una cuenta de almacenamiento.

Search storage

Le daremos un nombre único, todo en minúsculas, en mi caso, yo he añadido mis iniciales al final, storageeventhubsjsl.

Storage creation

Acto seguido, en añadiremos un namespace de Event Hubs.

Search event hub

Le daremos un nombre único también, en mi caso EventHubsTajamarJSL.

Creation Event Hubs

Dentro del namespace de Event Hubs, añadiremos nuestro Event Hub. Este no necesita un nombre único. Para esta practica yo lo he nombrado centroeventotajamar.

creation eventhub

Aplicaciones de consola

Crearemos un nuevo proyecto con una aplicacion de tipo Console
Application
. En este caso le he dado el nombre de EventHubsTajamar.

Consola1

Dentro del mismo proyecto, añadiremos una nueva aplicación de tipo Console Application.

add 2 consola

Una de ellas sera la que envía los mensajes, le daremos el nombre de SendMenssagesTajamar, y la otra la que los lee, con el nombre de ReceptorMensagesTajamar. Debería quedar algo parecido a la siguiente imagen.

RENAME

Aplicación de consola para enviar mensajes

Primero configuraremos la consola para enviar mensajes. En el enlace de github, si entráis a la misma, veréis los dos archivos xml que vamos a utilizar, AlumnosTajamar.xml y Meses.xml. Los cuales añadiremos a nuestro proyecto, y modificaremos sus propiedades para que estén como recurso incrustado o Embedded Resource (si tenéis el Visual Studio en ingles como yo)

Embedded Resource

Añadiremos la referencia a System.Configuration

Reference

Y añadimos estos dos Nugets:

nugetc1

Crearemos tres clases:

Esta sera la clase Mensajes.cs

public class Mensaje
    {
        public Mensaje(Alumno alumno, Mes mes, int faltas)
        {
            Alumno = alumno;
            Mes = mes;
            Faltas = faltas;
        }

        public Alumno Alumno { get; set; }
        public Mes Mes { get; set; }
        public int Faltas { get; set; }
    }

Esta sera la clase Alumno.cs

    public class Alumno
    {
        public Alumno(int idAlumno, string nombre, string apellidos, string curso)
        {
            IdAlumno = idAlumno;
            Nombre = nombre;
            Apellidos = apellidos;
            Curso = curso;
        }

        public int IdAlumno { get; set; }
        public String Nombre { get; set; }
        public String Apellidos { get; set; }
        public String Curso { get; set; }
    }

Y esta ultima, sera la clase Mes.cs

public class Mes
    {
        public Mes(int idMes, string nombre, int faltas)
        {
            IdMes = idMes;
            Nombre = nombre;
            Faltas = faltas;
        }

        public int IdMes { get; set; }
        public String Nombre { get; set; }
        public int Faltas { get; set; }
    }

Ahora vamos a modificar Program.cs. Va a consistir de un metodo Main statico, donde enviaremos los mensajes y escribiremos por pantalla. Y de cuatro funciones: GetMeses, GetAlumnos, CrearMensajeUsuario y ConsoleSpiner.

El metodo Main sera el siguiente:

static void Main(string[] args)
        {
            //ACCEDEMOS A LA CLAVE EVENT HUB DE APPSETTINGS
            string claveeventhub = ConfigurationManager.AppSettings["EventHub"];
            //NOMBRE DEL EVENT HUB CONTENEDOR
            String eventhubname = "centroeventostajamar";
            //CREAMOS UN CLIENTE PARA LOS MENSAJES
            EventHubClient cliente = EventHubClient.CreateFromConnectionString(claveeventhub, eventhubname);
            //RECUPERAMOS LAS MESES Y LOS ALUMNOS PARA NUESTRA LOGICA
           
            List<Mes> listaMeses = GetMeses();
            List<Alumno> listaAlumnos = GetAlumnos();
            //VAMOS A REALIZAR UN BUCLE DE LOS MESES Y DE LOS ALUMNOS PARA CREAR MENSAJES
            //Y ENVIARLOS AL PROCESO DE SERVICE BUS


            //RECORREMOS LOS MESES QUE HAY LECTIVOS
            foreach (Mes mes in listaMeses)
            {
                //CAMBIAMOS EL COLOR DE LA CONSOLA PARA QUE SE VEA VERDE EL MES Y LUEGO LO RESETEAMOS PARA QUE EL RESTO DE MENSAJES SE VEAN NORMALES
                Console.ForegroundColor = ConsoleColor.DarkGreen;
                Console.WriteLine("---MENSAJES PARA EL MES: " + mes.Nombre +" MAXIMO NUMERO DE FALTAS: " + mes.Faltas);
                Console.ResetColor();
                //CREAMOS UN SPIN PARA QUE PAREZCA QUE SE ESTAN CARGANDO LOS MENSAJES
                ConsoleSpiner spin = new ConsoleSpiner();
                Console.Write("Cargando mensajes...");
                int c = 0;
                //HACEMOS QUE EL SPIN SE MUEVA CADA SEGUNDO 6 VECES
                while (c < 6)
                {
                    Thread.Sleep(1000);
                    spin.Turn();
                    c++;
                }
                Console.WriteLine();
                //RECORREMOS LOS ALUMNOS Y CREAMOS LOS MENSAJES POR CADA MES
                foreach (Alumno alumno in listaAlumnos)
                {
                    try
                    {
                        
                        //RECUPERAMOS EL MENSAJE QUE DESEAMOS ENVIAR
                        Mensaje mensaje = CrearMensajeUsuario(mes, alumno);
                        //ESCRIBIMOS MENSAJES POR PANTALLA DE LO QUE VAMOS A ENVIAR
                        //AL SERVICE BUS
                        Console.WriteLine("Fecha: "+DateTime.Now + ", Mes: "+ mensaje.Mes.Nombre +", Alumno: "
                            + mensaje.Alumno.Nombre +", con id: " + mensaje.Alumno.IdAlumno + " FALTAS: --> " + mensaje.Faltas);
                        //CONVERTIMOS LA INFORMACION QUE VAMOS A ENVIAR EN UN STING JSON
                        String json = JsonConvert.SerializeObject(mensaje);
                        //ENVIAMOS EL MENSAJE A NUESTRO EVENTO HUB
                        //EL SERVICIO RECIBE LA INFORMACION EN STREAM O BYTES
                        //VAMOS A TRANSFORMAR EL MENSAJE EN BYTE[]
                        cliente.Send(new EventData(Encoding.UTF8.GetBytes(json)));
                        
                        
                    }
                    catch (Exception ex)
                    {
                        throw ex;
                    }
                }
                Console.ForegroundColor = ConsoleColor.DarkRed;
                Console.WriteLine("---FIN MENSAJES DEL MES: " + mes.Nombre);
                Console.ResetColor();
                Console.WriteLine();
            }

        }

La funcion GetMeses:

public static List<Mes> GetMeses()
        {
            String recurso = "EventHubsTajamar.Meses.xml";
            Stream stream = Assembly.GetExecutingAssembly().GetManifestResourceStream(recurso);
            XDocument docxml = XDocument.Load(stream);
            var consulta = from datos in docxml.Descendants("mes")
                           select new Mes( int.Parse(datos.Element("idmes").Value),
                               datos.Element("nombre").Value,
                               int.Parse(datos.Element("faltas").Value));
                               
            return consulta.ToList();
        }

La funcion GetAlumnos:

public static List<Alumno> GetAlumnos()
        {
            String recurso = "EventHubsTajamar.AlumnosTajamar.xml";
            Stream stream = Assembly.GetExecutingAssembly().GetManifestResourceStream(recurso);
            XDocument docxml = XDocument.Load(stream);
            var consulta = from datos in docxml.Descendants("alumno")
                           select new Alumno(
                               int.Parse(datos.Element("idalumno").Value),
                               datos.Element("nombre").Value,
                               datos.Element("apellidos").Value,
                               datos.Element("curso").Value);
            return consulta.ToList();
        }

La funcion CrearMensajeUsuario:

public static Mensaje CrearMensajeUsuario(Mes mes, Alumno alumno )
         {
             Random rndfalta = new Random();
             int faltas = rndfalta.Next(0, 5);
             Mensaje mensaje = new Mensaje(alumno, mes, faltas);
             return mensaje;
         }

Y finalmente el ConsoleSpiner:

public class ConsoleSpiner
         {
             int counter;
             public ConsoleSpiner()
             {
                 counter = 0;
             }
             public void Turn()
             {
                 counter++;
                 switch (counter % 4)
                 {
                     case 0: Console.Write("/"); break;
                     case 1: Console.Write("-"); break;
                     case 2: Console.Write("\"); break;
                     case 3: Console.Write("|"); break;
                 }
                 Console.SetCursorPosition(Console.CursorLeft - 1, Console.CursorTop);
             }
         }

Aparte, tendremos que copiar las claves de acceso del event hub que muestro a continuación:

keys

Y añadirlas en el App.config, de esta manera:

    <appSettings>
        <!-- Service Bus specific app setings for messaging connections -->
      <add key="eventhub" value="Endpoint=sb://eventhubtajamarjsl.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=iOWjqbICJGB/F3deo9Rp+Su03phi/QONZishNqpYRf8=" />
    </appSettings>

Las podemos añadir antes del ultimo cierre de configuracion <\configuration>

Aplicación de consola para leer mensajes

Para poder leer los mensajes con nuestra consola, necesitaremos las clases mensaje.cs, alumno.cs y mes.cs que usamos para la consola que envía mensajes. Ademas, tendemos una clase llamada ProcesadoMensajes.cs con el siguiente código:

public class ProcesadorMensajes : IEventProcessor
    {
        public Task CloseAsync(PartitionContext context, CloseReason reason)
        {
            Console.WriteLine($"Proceso apagando. Particion "
                + context.Lease.PartitionId
                + ", Razon: " + reason + ".");
            return Task.CompletedTask;
        }

        public Task OpenAsync(PartitionContext context)
        {
            Console.WriteLine($"Proceso abriendo. Particion "
                + context.Lease.PartitionId);
            return Task.CompletedTask;
        }

        public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
        {
            //RECORREMOS TODOS LOS MENSAJES
            foreach (EventData mensaje in messages)
            {
                //RECUPERAMOS LOS DATOS DEL MENSAJE
                String json = Encoding.UTF8.GetString(mensaje.GetBytes());
                //DESERIALIZAMOS EN MENSAJE QUE HABIAMOS SERIALIZADO A UN STRING JSON
                //PARA USARLO COMO OBJETO
                Mensaje newmesaje = JsonConvert.DeserializeObject<Mensaje>(json);
                String datos = ""; 
                //COMPROBAMOS QUE ALUMNOS SE HAN PASADO DEL MAXIMO DE FALTAS PERMITIDAS POR MES
                //Y SI ES ASI MOSTRAMOS UN MENSAJE EN ROJO EN LA CONSOLA CON EL ALUMNO
                if (newmesaje.Faltas > newmesaje.Mes.Faltas)
                {
                    datos = "El alumno: " + newmesaje.Alumno.Nombre + ", con id: " + newmesaje.Alumno.IdAlumno + " ha sobrepasa el numero de faltas permitidas en el mes: " + newmesaje.Mes.Nombre + ", con un total de faltas de: " + newmesaje.Faltas + ". Avisar a Juan.";
                    String msj = string.Format("Mensaje recibido. " + datos);
                    Console.ForegroundColor = ConsoleColor.DarkRed;
                    Console.WriteLine(msj);
                    Console.ResetColor();
                }
                //SI NO, MOSTRAMOS LOS ALUMNOS Y LAS FALTAS QUE HA TENIDO ESE MES
                else
                {
                    datos = "El alumno: " + newmesaje.Alumno.Nombre + ", con id: " + newmesaje.Alumno.IdAlumno + " ha faltado en el mes: " + newmesaje.Mes.Nombre + ", " + newmesaje.Faltas + " veces.";
                    String msj = string.Format("Mensaje recibido. "+ datos);
                    Console.WriteLine(msj);
                }
                
            }
            return context.CheckpointAsync();
        }
    }

Y modificaremos la clase Program.cs para crear el procesado de mensajes con el siguiente código:

class Program
    {
        private static async Task ProcesarMensajesEventHub(string[] args)
        {
            //RECUPERAMOS LA CADENA EVENTHUB
            //Y STORAGE DEL CONFIG
            String cadenaeventhub =
                ConfigurationManager.AppSettings["eventhub"];
            String cadenastorage =
                ConfigurationManager.AppSettings["cuentastorage"];
            Console.WriteLine("Comenzando el procesado de mensajes");
            //CREAMOS EL PROCESADOR DE MENSAJES
            EventProcessorHost procesadormensajes =
                 new EventProcessorHost(
                     "PROCESADOR CONSOLA",
                     "centroalumnostajamar",
                     EventHubConsumerGroup.DefaultGroupName,
                     cadenaeventhub, cadenastorage);
            //CREAMOS OPCIONES PARA EL PROCESO
            EventProcessorOptions opcionesproceso = new EventProcessorOptions()
            {
                MaxBatchSize = 100,
                PrefetchCount = 1,
                ReceiveTimeOut = TimeSpan.FromSeconds(20)
            };

            //REGISTRAMOS EL PROCESO CON LA CLASE QUE
            //ADMINISTRARA LA RECEPCION DE MENSAJES
            await procesadormensajes.RegisterEventProcessorAsync<ProcesadorMensajes>();

            Console.WriteLine("Recibiendo. Pulse ENTER cuando quiera finalizar.");
            Console.ReadLine();

            //FINALIZAMOS LA LECTURA DE MENSAJES
            await procesadormensajes.UnregisterEventProcessorAsync();
        }

        static void Main(string[] args)
        {
            //REALIZAMOS LA LLAMADA PARA PROCESAR
            //LOS MENSAJES
            ProcesarMensajesEventHub(args).GetAwaiter().GetResult();
        }
    }

Haremos lo mismo en el App.cofig que hicimos para el envió de mensajes, solo que añadiremos ademas las claves de acceso a nuestro storage, copiaremos de nuestro portal de azure la cadena de conexión:

storagekeys

Y lo añadiremos al App.cofig de esta manera:

<appSettings>
        <!-- Service Bus specific app setings for messaging connections -->
      <add key="eventhub" value="Endpoint=sb://eventhubpracticasjsl.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=hnrCvAKZb/5QRxi6TXRw9LZbbuN2Ayua4ne+qJG6QhA=" />
      <add key="cuentastorage" value="Endpoint=sb://eventhubtajamarjsl.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=iOWjqbICJGB/F3deo9Rp+Su03phi/QONZishNqpYRf8=" />
    </appSettings>

Ademas, añadiremos la referencia como hicimos en la otra consola y los siguientes Nugets:

nugetc1 - Copy

Envió y visualización de los mensajes

Primero, ejecutaremos en una nueva instancia la aplicación de recibir mensajes.

debug new instance

Se nos abrirá una consola como esta:

recibir mensajes vacio

Su función sera mostrarnos los mensajes según los vaya recibiendo de la otra consola, y nos ira mostrando las faltas que ha tenido cada usuario, y en el caso de que hubiera un usuario que haya faltado mas de las permitidas, mostraría un mensaje en rojo, y un aviso. Pero hasta que no iniciemos la otra consola por primera vez, no veremos nada.

También, si hubiera mensajes enviados, los visualizaría primero, y después iría cargando los nuevos mensajes.

Ahora, iniciaremos la consola para enviar mensajes. Iremos viendo, como nos muestra el mes en el que se envían las faltas, el numero de faltas permitidas y los alumnos. Aparte, con el spiner, daría la sensación de que estaría cargando los mensajes y enviándolos de una forma mas atractiva.

envimo mensajes

Y para finalizar, veríamos los mensajes en la consola para recibir mensajes, según van siendo recibidos.

mensajes recibidos

Eso seria todo. Un saludo, y muchas gracias por llegar hasta el final 🙂

Autor/a: Javier Sánchez de Lago

Curso: Microsoft MCSA Web Applications + Microsoft MCSD App Builder + Xamarin

Centro: Tajamar

Año académico: 2018-2019

Código / recursos utilizados / Otros datos de interés: GitHub

Linkedin: Enlace

Leave a Comment

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *

Este sitio usa Akismet para reducir el spam. Aprende cómo se procesan los datos de tus comentarios.