Olá! Antes de iniciar a última parte desta série, uma notícia boa: a White Fox se tornou parceiro Microsoft Silver em Application Development e Gold em Devices and Deployment. Além do reconhecimento, a parceria com a Microsoft é importante para dar uma visibilidade maior para a White Fox e nos permitir acesso a muito mais recursos para nosso processo de desenvolvimento. Obrigado a todos que nos ajudaram durante a certificação! Em 2016 a White Fox deve focar mais no Microsoft Azure, sempre buscando trazer o melhor da tecnologia para nossos clientes com a melhor relação custo/benefício.
No último post, mostramos a estrutura montada no SQL Service Broker para suportar nossa arquitetura de Microservices. Nesta última parte vamos mostrar as estruturas em .NET que suportam o Thin Client (cliente) e os executores. Como estamos montando uma arquitetura expansível, criamos então uma classe estática, utilizando um container Microsoft Unity, que chamamos de ServiceBus, para registrar cada cliente e executor. O código dela pode ser visto abaixo.
public static class ServiceBus { private static IUnityContainer iocCcontainer; private static bool isInitialized; public static void InitializeSql(string connectionStringName) { isInitialized = true; iocCcontainer = new UnityContainer(); iocCcontainer.RegisterInstance(typeof(IStorage), new SqlStorage(connectionStringName)); } public static T GetService<T>() where T: class, IService { if (!isInitialized) throw new ServiceBusNotInitializedException(); return iocCcontainer.Resolve<T>(); } public static void RegisterService<T>(Type type) where T : class, IService { if (!isInitialized) throw new ServiceBusNotInitializedException(); iocCcontainer.RegisterType(typeof(T), type); } public static void RegisterComponent<T>(T instance) where T : class, IComponent { if (!isInitialized) throw new ServiceBusNotInitializedException(); iocCcontainer.RegisterInstance(typeof(T), instance); } }
As interfaces IService são os serviços de mensageria, tanto do cliente quanto do executor. As interfaces de IComponent são para classes que são responsáveis por implementar as regras de negócio do executor. Abaixo um exemplo de uso do ServiceBus, onde o ResultMessagesReceiver (que implmenta IReceiver) é o responsável por processar as mensagens de retorno recebidas e o ClienteService (que implementa IClientService) é o Thin Client.
ServiceBus.InitializeSql("ServiceBroker"); ServiceBus.RegisterComponent<IReceiver>(new ResultMessagesReceiver()); ServiceBus.RegisterService<IClientService>(typeof(ClientService));
Criamos também um ServiceBase, que é usado por executores e clientes para implementar as rotinas principais de processamento de mensagens. O código está abaixo. É praticamente um loop onde as mensagens são lidas do SQL Server, dentro de uma transação, e processadas. Em caso de qualquer problema, a transação pode ser desfeita e a mensagem permanece lá. É importante notar que caso aconteçam muitos rollbacks em sequência, o SQL Server desativa a Queue (isto nos causou alguns problemas de debug!). Neste caso, a Queue deve ser reativada antes que outras mensagens possam ser lidas.
protected void ProcessMessages(int maxMessages) { var counter = 1; do { if (++counter > maxMessages) break; storage.BeginTransaction(); try { var message = storage.ReadMessage(endPoint); if (message == null) { storage.Commit(); break; } if (string.IsNullOrEmpty(message.Handle)) { storage.Commit(); break; } if (initiator && message.MessageType == EndMessageType) { storage.EndConversation(message.Handle); storage.Commit(); continue; } if (message.MessageType == ErrorMessageType) { // todo: logar o erro storage.EndConversation(message.Handle); storage.Commit(); continue; } message.Date = DateTime.Now; if (!ProcessMessage(message)) { storage.Rollback(); break; } if (!initiator) storage.EndConversation(message.Handle); storage.Commit(); } catch (Exception) { storage.Rollback(); throw; } } while (true); }
Abaixo está a parte relevante do código que implementa o storage do SQL Server. Optamos por simplesmente encapsular chamadas a stored procedures que ficam no banco de dados utilizado para as filas de mensagens. A seguir estão também as 3 procedures de leitura, envio e final de conversação.
public string SendMessage(EndPointConfiguration endPoint, string data, string conversationHandle = null) { var parameters = new Parameters() .Add("initiatorService", endPoint.Initiator).Add("targetService", endPoint.Target) .Add("contract", endPoint.Contract).Add("messageType", endPoint.MessageType) .Add("data", data); if (!string.IsNullOrEmpty(conversationHandle)) parameters.Add("handle", conversationHandle); StoredProcedureFacility.ExecuteScalar<string>(connectionStringName, "SendMessage", parameters) } public Message ReadMessage(EndPointConfiguration endPoint) { var root = StoredProcedureFacility.GetXml(connectionStringName, "ReadMessage", Parameter.Create("queueName", endPoint.QueueName)); return new Deserializer<Message>(root) .Property(m => m.MessageType, "mt") .Property(m => m.Contents, "data") .Property(m => m.Date, "dt") .Property(m => m.Handle, "ch") .Instance(); } public void EndConversation(string handle) { StoredProcedureFacility.ExecuteNoResults(connectionStringName, "EndConversation", Parameter.Create("handle", handle)); }
Stored procedures:
CREATE PROCEDURE [dbo].[SendMessage] ( @initiatorService sysname, @targetService varchar(255), @contract varchar(255), @messageType varchar(255), @data varchar(MAX) = NULL, @handle varchar(255) = null ) AS BEGIN if @handle is null begin declare @id uniqueidentifier BEGIN DIALOG CONVERSATION @id FROM SERVICE @initiatorService TO SERVICE @targetService ON CONTRACT @contract WITH ENCRYPTION = OFF, LIFETIME = 7200; set @handle = cast(@id as varchar(255)); end; send on conversation @handle message type @messageType (@data) select @handle; END GO CREATE procedure [dbo].[ReadMessage](@queueName varchar(255)) as begin declare @ch varchar(255) declare @mt varchar(255) declare @data varchar(max); declare @dt DateTime; set nocount on declare @Sql nvarchar(max) = N'RECEIVE TOP(1) @h = conversation_handle, @messageTypeName = message_type_name, @Packet = message_body, @date = message_enqueue_time FROM ' + @queueName + ';' EXECUTE sp_executesql @Sql, N'@h UNIQUEIDENTIFIER OUTPUT, @messageTypeName varchar(255) OUTPUT, @Packet VARCHAR(max) OUTPUT, @date datetime OUTPUT' ,@h = @ch OUTPUT ,@messageTypeName = @mt OUTPUT ,@Packet = @data OUTPUT ,@date = @dt output; select * from (select @ch ch, @mt mt, @data [data], @dt [dt]) M for xml auto end GO CREATE procedure [dbo].[EndConversation](@handle varchar(255)) as begin ;end conversation @handle; end
Finalmente, a seguir está o exemplo de um Thin Client, com um envio de mensagem e a rotina que faz o processamento das mensagens de retorno. Para cada tipo de ação criada, deve haver um tratamento específico. O construtor recebe, por injection (o Unity cuida disto) o storage e a classe que irá tratar as regras de negócio.
public class ClientService: ServiceBase, IClientService { private readonly IReceiver receiver; public ClientService(IStorage storage, IReceiver receiver) : base(storage, true, Configuration.Requester.Service, Configuration.Executer.Service, Configuration.Contract, Configuration.Requester.Message, Configuration.Requester.Queue) { this.receiver = receiver; } public void RequestAction1() { SendMessage(new MessageContent { Action = Actions.Action1}); } public override bool ProcessMessage(Message message) { var ser = new DataContractJsonSerializer(typeof (MessageContent)); MessageContent content; using (var ms = new MemoryStream(Encoding.UTF8.GetBytes(message.Contents))) { content = (MessageContent) ser.ReadObject(ms); } if (content == null) return true; try { switch (content.Action) { case Actions.Action1: return receiver.ReceiveMessage(content, message.Date); default: return receiver.Error(content.Action, ErrorCodes.NotImplemented, null); } } catch (Exception ex) { return receiver.Error(content.Action, ErrorCodes.Exception, ex); } }
O executor não é diferente, como pode ser visto abaixo. Ele somente recebe a mensagem, faz algum tipo de ação de negócio específica e retorna os dados para o solicitante. Claro que isto é um código simplificado, já que em um código de produção é importante tratar todos os tipos de erros possíveis, evitando rollbacks da fila.
public override bool ProcessMessage(Message message) { var ser = new DataContractJsonSerializer(typeof(MessageContent)); MessageContent message; using (var ms = new MemoryStream(Encoding.UTF8.GetBytes(message.Contents))) { message = (MessageContent)ser.ReadObject(ms); } if (message == null) return true; var returnMessage = new MessageContent {Action = message.Action}; try { switch (message.Action) { case Action.Action1: returnMessage.Data = MyBusinessRulesManager.ExecuteSomething(message).ToString(); break; } } catch (Exception ex) { returnMessage.Error = ex.Message; } SendMessage(returnMessage, message.Handle); return true; }
Bom, espero ter conseguido passar uma visão geral da arquitetura de Microservices que estamos utilizando. Claro que temos muitos outros cenários que fazem com que a complexidade desta arquitetura seja bem maior. Por exemplo, temos situações onde temos mensagens geradas em horários específicos ou que só podem ser executadas em horas úteis. Isto faz com que o Executor tenha toda uma lógica para armazenar as mensagens com agendamento de execução específica em outras filas. Mas tudo isto é feito com o fundamento que mostrei nesta série. Como sempre, fiquem à vontade para entrar em contato para tirar dúvidas ou conversar mais sobre este assunto. Até a próxima!