Azure Event Hubs: ejemplo práctico.
¿Que es Azure Event Hubs?
Azure Event Hubs es una plataforma de streaming de datos y un servicio de ingesta de eventos. Puede recibir y procesar millones de eventos por segundo, almacenarlos en la nube y permite leerlos y usarlos en un enorme abanico de aplicaciones.
Los siguientes escenarios son algunos de los casos donde se puede usar Event Hubs:
- Detección de anomalías (fraude/valores atípicos)
- Registro de aplicaciones
- Canalizaciones de análisis, como secuencias de clics
- Paneles en vivo
- Archivado de datos
- Procesamiento de transacciones
- Procesamiento de telemetría de usuario
- Streaming de telemetría de dispositivo
¿Que vamos a realizar en esta práctica?
Vamos a realizar dos aplicaciones de consola, una para enviar los mensajes al Hub, y otra para leer los mensajes del mismo.
La practica, sera un «ejemplo practico» de como un profesor, podría enviar las faltas de los alumnos en el periodo de un mes al Hub, y como al leer los mensajes, veríamos las faltas de los alumnos en dicho mes, a la vez, si un alumno ha superado las faltas permitidas en dicho mes, se mostrara un mensaje de alerta con el alumno y las faltas.
Puesta a punto
Antes de empezar, esta disponible el código del proyecto en mi dirección de GitHub, por si tienes alguna duda.
Dividiré la puesta a punto en diferentes secciones:
- Recursos del portal de Azure
- Aplicaciones de consola
- Aplicación de consola para enviar mensajes
- Aplicación de consola para leer mensajes
Recursos en el portal de Azure
Primero iremos al portal de Azure, creamos un nuevo grupo de recursos, para tener todos los recursos de esta practica en el mismo grupo. Lo llamaremos RecursosEventHub.
Dentro del mismo, añadiremos una cuenta de almacenamiento.
Le daremos un nombre único, todo en minúsculas, en mi caso, yo he añadido mis iniciales al final, storageeventhubsjsl.
Acto seguido, en añadiremos un namespace de Event Hubs.
Le daremos un nombre único también, en mi caso EventHubsTajamarJSL.
Dentro del namespace de Event Hubs, añadiremos nuestro Event Hub. Este no necesita un nombre único. Para esta practica yo lo he nombrado centroeventotajamar.
Aplicaciones de consola
Crearemos un nuevo proyecto con una aplicacion de tipo Console
Application. En este caso le he dado el nombre de EventHubsTajamar.
Dentro del mismo proyecto, añadiremos una nueva aplicación de tipo Console Application.
Una de ellas sera la que envía los mensajes, le daremos el nombre de SendMenssagesTajamar, y la otra la que los lee, con el nombre de ReceptorMensagesTajamar. Debería quedar algo parecido a la siguiente imagen.
Aplicación de consola para enviar mensajes
Primero configuraremos la consola para enviar mensajes. En el enlace de github, si entráis a la misma, veréis los dos archivos xml que vamos a utilizar, AlumnosTajamar.xml y Meses.xml. Los cuales añadiremos a nuestro proyecto, y modificaremos sus propiedades para que estén como recurso incrustado o Embedded Resource (si tenéis el Visual Studio en ingles como yo)
Añadiremos la referencia a System.Configuration
Y añadimos estos dos Nugets:
Crearemos tres clases:
Esta sera la clase Mensajes.cs
public class Mensaje
{
public Mensaje(Alumno alumno, Mes mes, int faltas)
{
Alumno = alumno;
Mes = mes;
Faltas = faltas;
}
public Alumno Alumno { get; set; }
public Mes Mes { get; set; }
public int Faltas { get; set; }
}
Esta sera la clase Alumno.cs
public class Alumno
{
public Alumno(int idAlumno, string nombre, string apellidos, string curso)
{
IdAlumno = idAlumno;
Nombre = nombre;
Apellidos = apellidos;
Curso = curso;
}
public int IdAlumno { get; set; }
public String Nombre { get; set; }
public String Apellidos { get; set; }
public String Curso { get; set; }
}
Y esta ultima, sera la clase Mes.cs
public class Mes
{
public Mes(int idMes, string nombre, int faltas)
{
IdMes = idMes;
Nombre = nombre;
Faltas = faltas;
}
public int IdMes { get; set; }
public String Nombre { get; set; }
public int Faltas { get; set; }
}
Ahora vamos a modificar Program.cs. Va a consistir de un metodo Main statico, donde enviaremos los mensajes y escribiremos por pantalla. Y de cuatro funciones: GetMeses, GetAlumnos, CrearMensajeUsuario y ConsoleSpiner.
El metodo Main sera el siguiente:
static void Main(string[] args)
{
//ACCEDEMOS A LA CLAVE EVENT HUB DE APPSETTINGS
string claveeventhub = ConfigurationManager.AppSettings["EventHub"];
//NOMBRE DEL EVENT HUB CONTENEDOR
String eventhubname = "centroeventostajamar";
//CREAMOS UN CLIENTE PARA LOS MENSAJES
EventHubClient cliente = EventHubClient.CreateFromConnectionString(claveeventhub, eventhubname);
//RECUPERAMOS LAS MESES Y LOS ALUMNOS PARA NUESTRA LOGICA
List<Mes> listaMeses = GetMeses();
List<Alumno> listaAlumnos = GetAlumnos();
//VAMOS A REALIZAR UN BUCLE DE LOS MESES Y DE LOS ALUMNOS PARA CREAR MENSAJES
//Y ENVIARLOS AL PROCESO DE SERVICE BUS
//RECORREMOS LOS MESES QUE HAY LECTIVOS
foreach (Mes mes in listaMeses)
{
//CAMBIAMOS EL COLOR DE LA CONSOLA PARA QUE SE VEA VERDE EL MES Y LUEGO LO RESETEAMOS PARA QUE EL RESTO DE MENSAJES SE VEAN NORMALES
Console.ForegroundColor = ConsoleColor.DarkGreen;
Console.WriteLine("---MENSAJES PARA EL MES: " + mes.Nombre +" MAXIMO NUMERO DE FALTAS: " + mes.Faltas);
Console.ResetColor();
//CREAMOS UN SPIN PARA QUE PAREZCA QUE SE ESTAN CARGANDO LOS MENSAJES
ConsoleSpiner spin = new ConsoleSpiner();
Console.Write("Cargando mensajes...");
int c = 0;
//HACEMOS QUE EL SPIN SE MUEVA CADA SEGUNDO 6 VECES
while (c < 6)
{
Thread.Sleep(1000);
spin.Turn();
c++;
}
Console.WriteLine();
//RECORREMOS LOS ALUMNOS Y CREAMOS LOS MENSAJES POR CADA MES
foreach (Alumno alumno in listaAlumnos)
{
try
{
//RECUPERAMOS EL MENSAJE QUE DESEAMOS ENVIAR
Mensaje mensaje = CrearMensajeUsuario(mes, alumno);
//ESCRIBIMOS MENSAJES POR PANTALLA DE LO QUE VAMOS A ENVIAR
//AL SERVICE BUS
Console.WriteLine("Fecha: "+DateTime.Now + ", Mes: "+ mensaje.Mes.Nombre +", Alumno: "
+ mensaje.Alumno.Nombre +", con id: " + mensaje.Alumno.IdAlumno + " FALTAS: --> " + mensaje.Faltas);
//CONVERTIMOS LA INFORMACION QUE VAMOS A ENVIAR EN UN STING JSON
String json = JsonConvert.SerializeObject(mensaje);
//ENVIAMOS EL MENSAJE A NUESTRO EVENTO HUB
//EL SERVICIO RECIBE LA INFORMACION EN STREAM O BYTES
//VAMOS A TRANSFORMAR EL MENSAJE EN BYTE[]
cliente.Send(new EventData(Encoding.UTF8.GetBytes(json)));
}
catch (Exception ex)
{
throw ex;
}
}
Console.ForegroundColor = ConsoleColor.DarkRed;
Console.WriteLine("---FIN MENSAJES DEL MES: " + mes.Nombre);
Console.ResetColor();
Console.WriteLine();
}
}
La funcion GetMeses:
public static List<Mes> GetMeses()
{
String recurso = "EventHubsTajamar.Meses.xml";
Stream stream = Assembly.GetExecutingAssembly().GetManifestResourceStream(recurso);
XDocument docxml = XDocument.Load(stream);
var consulta = from datos in docxml.Descendants("mes")
select new Mes( int.Parse(datos.Element("idmes").Value),
datos.Element("nombre").Value,
int.Parse(datos.Element("faltas").Value));
return consulta.ToList();
}
La funcion GetAlumnos:
public static List<Alumno> GetAlumnos()
{
String recurso = "EventHubsTajamar.AlumnosTajamar.xml";
Stream stream = Assembly.GetExecutingAssembly().GetManifestResourceStream(recurso);
XDocument docxml = XDocument.Load(stream);
var consulta = from datos in docxml.Descendants("alumno")
select new Alumno(
int.Parse(datos.Element("idalumno").Value),
datos.Element("nombre").Value,
datos.Element("apellidos").Value,
datos.Element("curso").Value);
return consulta.ToList();
}
La funcion CrearMensajeUsuario:
public static Mensaje CrearMensajeUsuario(Mes mes, Alumno alumno )
{
Random rndfalta = new Random();
int faltas = rndfalta.Next(0, 5);
Mensaje mensaje = new Mensaje(alumno, mes, faltas);
return mensaje;
}
Y finalmente el ConsoleSpiner:
public class ConsoleSpiner
{
int counter;
public ConsoleSpiner()
{
counter = 0;
}
public void Turn()
{
counter++;
switch (counter % 4)
{
case 0: Console.Write("/"); break;
case 1: Console.Write("-"); break;
case 2: Console.Write("\"); break;
case 3: Console.Write("|"); break;
}
Console.SetCursorPosition(Console.CursorLeft - 1, Console.CursorTop);
}
}
Aparte, tendremos que copiar las claves de acceso del event hub que muestro a continuación:
Y añadirlas en el App.config, de esta manera:
<appSettings>
<!-- Service Bus specific app setings for messaging connections -->
<add key="eventhub" value="Endpoint=sb://eventhubtajamarjsl.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=iOWjqbICJGB/F3deo9Rp+Su03phi/QONZishNqpYRf8=" />
</appSettings>
Las podemos añadir antes del ultimo cierre de configuracion <\configuration>
Aplicación de consola para leer mensajes
Para poder leer los mensajes con nuestra consola, necesitaremos las clases mensaje.cs, alumno.cs y mes.cs que usamos para la consola que envía mensajes. Ademas, tendemos una clase llamada ProcesadoMensajes.cs con el siguiente código:
public class ProcesadorMensajes : IEventProcessor
{
public Task CloseAsync(PartitionContext context, CloseReason reason)
{
Console.WriteLine($"Proceso apagando. Particion "
+ context.Lease.PartitionId
+ ", Razon: " + reason + ".");
return Task.CompletedTask;
}
public Task OpenAsync(PartitionContext context)
{
Console.WriteLine($"Proceso abriendo. Particion "
+ context.Lease.PartitionId);
return Task.CompletedTask;
}
public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
//RECORREMOS TODOS LOS MENSAJES
foreach (EventData mensaje in messages)
{
//RECUPERAMOS LOS DATOS DEL MENSAJE
String json = Encoding.UTF8.GetString(mensaje.GetBytes());
//DESERIALIZAMOS EN MENSAJE QUE HABIAMOS SERIALIZADO A UN STRING JSON
//PARA USARLO COMO OBJETO
Mensaje newmesaje = JsonConvert.DeserializeObject<Mensaje>(json);
String datos = "";
//COMPROBAMOS QUE ALUMNOS SE HAN PASADO DEL MAXIMO DE FALTAS PERMITIDAS POR MES
//Y SI ES ASI MOSTRAMOS UN MENSAJE EN ROJO EN LA CONSOLA CON EL ALUMNO
if (newmesaje.Faltas > newmesaje.Mes.Faltas)
{
datos = "El alumno: " + newmesaje.Alumno.Nombre + ", con id: " + newmesaje.Alumno.IdAlumno + " ha sobrepasa el numero de faltas permitidas en el mes: " + newmesaje.Mes.Nombre + ", con un total de faltas de: " + newmesaje.Faltas + ". Avisar a Juan.";
String msj = string.Format("Mensaje recibido. " + datos);
Console.ForegroundColor = ConsoleColor.DarkRed;
Console.WriteLine(msj);
Console.ResetColor();
}
//SI NO, MOSTRAMOS LOS ALUMNOS Y LAS FALTAS QUE HA TENIDO ESE MES
else
{
datos = "El alumno: " + newmesaje.Alumno.Nombre + ", con id: " + newmesaje.Alumno.IdAlumno + " ha faltado en el mes: " + newmesaje.Mes.Nombre + ", " + newmesaje.Faltas + " veces.";
String msj = string.Format("Mensaje recibido. "+ datos);
Console.WriteLine(msj);
}
}
return context.CheckpointAsync();
}
}
Y modificaremos la clase Program.cs para crear el procesado de mensajes con el siguiente código:
class Program
{
private static async Task ProcesarMensajesEventHub(string[] args)
{
//RECUPERAMOS LA CADENA EVENTHUB
//Y STORAGE DEL CONFIG
String cadenaeventhub =
ConfigurationManager.AppSettings["eventhub"];
String cadenastorage =
ConfigurationManager.AppSettings["cuentastorage"];
Console.WriteLine("Comenzando el procesado de mensajes");
//CREAMOS EL PROCESADOR DE MENSAJES
EventProcessorHost procesadormensajes =
new EventProcessorHost(
"PROCESADOR CONSOLA",
"centroalumnostajamar",
EventHubConsumerGroup.DefaultGroupName,
cadenaeventhub, cadenastorage);
//CREAMOS OPCIONES PARA EL PROCESO
EventProcessorOptions opcionesproceso = new EventProcessorOptions()
{
MaxBatchSize = 100,
PrefetchCount = 1,
ReceiveTimeOut = TimeSpan.FromSeconds(20)
};
//REGISTRAMOS EL PROCESO CON LA CLASE QUE
//ADMINISTRARA LA RECEPCION DE MENSAJES
await procesadormensajes.RegisterEventProcessorAsync<ProcesadorMensajes>();
Console.WriteLine("Recibiendo. Pulse ENTER cuando quiera finalizar.");
Console.ReadLine();
//FINALIZAMOS LA LECTURA DE MENSAJES
await procesadormensajes.UnregisterEventProcessorAsync();
}
static void Main(string[] args)
{
//REALIZAMOS LA LLAMADA PARA PROCESAR
//LOS MENSAJES
ProcesarMensajesEventHub(args).GetAwaiter().GetResult();
}
}
Haremos lo mismo en el App.cofig que hicimos para el envió de mensajes, solo que añadiremos ademas las claves de acceso a nuestro storage, copiaremos de nuestro portal de azure la cadena de conexión:
Y lo añadiremos al App.cofig de esta manera:
<appSettings>
<!-- Service Bus specific app setings for messaging connections -->
<add key="eventhub" value="Endpoint=sb://eventhubpracticasjsl.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=hnrCvAKZb/5QRxi6TXRw9LZbbuN2Ayua4ne+qJG6QhA=" />
<add key="cuentastorage" value="Endpoint=sb://eventhubtajamarjsl.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=iOWjqbICJGB/F3deo9Rp+Su03phi/QONZishNqpYRf8=" />
</appSettings>
Ademas, añadiremos la referencia como hicimos en la otra consola y los siguientes Nugets:
Envió y visualización de los mensajes
Primero, ejecutaremos en una nueva instancia la aplicación de recibir mensajes.
Se nos abrirá una consola como esta:
Su función sera mostrarnos los mensajes según los vaya recibiendo de la otra consola, y nos ira mostrando las faltas que ha tenido cada usuario, y en el caso de que hubiera un usuario que haya faltado mas de las permitidas, mostraría un mensaje en rojo, y un aviso. Pero hasta que no iniciemos la otra consola por primera vez, no veremos nada.
También, si hubiera mensajes enviados, los visualizaría primero, y después iría cargando los nuevos mensajes.
Ahora, iniciaremos la consola para enviar mensajes. Iremos viendo, como nos muestra el mes en el que se envían las faltas, el numero de faltas permitidas y los alumnos. Aparte, con el spiner, daría la sensación de que estaría cargando los mensajes y enviándolos de una forma mas atractiva.
Y para finalizar, veríamos los mensajes en la consola para recibir mensajes, según van siendo recibidos.
Eso seria todo. Un saludo, y muchas gracias por llegar hasta el final 🙂
Autor/a: Javier Sánchez de Lago
Curso: Microsoft MCSA Web Applications + Microsoft MCSD App Builder + Xamarin
Centro: Tajamar
Año académico: 2018-2019
Código / recursos utilizados / Otros datos de interés: GitHub
Linkedin: Enlace