下面是一个基于 ZeroMQ 的 ICommunicationListener 的粗略示例实现。该实现充当 ZeroMQ 的 ResponseSocket,但可以很容易地改为 RequestSocket、SubscriberSocket,或任何你喜欢的 NetMQ.Sockets.* 套接字实现。当然,实际使用时还需要完善更多细节,例如收到消息时不应抛出异常,但这个示例应该足以清楚地说明实现思路。它的灵感来自现有的 dotnetcore 中对 ICommunicationListener 接口的实现。
ICommunicationListener
ResponseSocket
RequestSocket
SubscriberSocket
NetMQ.Sockets.*
/// <summary>
/// Communication listener that binds a NetMQ <see cref="ResponseSocket"/> to the endpoint
/// declared under a given name in the service manifest and answers incoming requests on a
/// dedicated background loop.
/// </summary>
/// <remarks>
/// Override <see cref="HandleRequest"/> to supply the real request handling; the default
/// implementation only logs the request and answers with an empty frame.
/// </remarks>
public class ZeroMqResponseSocketCommunicationListener : ICommunicationListener, IDisposable
{
    // How long one receive attempt blocks before the loop re-checks for cancellation.
    private static readonly TimeSpan ReceivePollInterval = TimeSpan.FromMilliseconds(250);

    // Upper bound on how long Abort/Dispose wait for the receive loop to exit.
    private static readonly TimeSpan StopTimeout = TimeSpan.FromSeconds(2);

    private static readonly byte[] EmptyReply = new byte[0];

    private readonly CancellationTokenSource _cancellationToken = new CancellationTokenSource();
    private readonly ResponseSocket _responseSocket = new ResponseSocket();
    private readonly ServiceContext _serviceContext;
    private readonly string _endpointName;

    private Task _receiveLoop;
    private bool _disposed;

    /// <summary>Creates a listener for the manifest endpoint called <paramref name="endpointName"/>.</summary>
    /// <param name="serviceContext">Context used to look up the endpoint description.</param>
    /// <param name="endpointName">Name of the endpoint resource in the service manifest.</param>
    /// <exception cref="ArgumentNullException"><paramref name="serviceContext"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="endpointName"/> is null or empty.</exception>
    public ZeroMqResponseSocketCommunicationListener(ServiceContext serviceContext, string endpointName)
    {
        if (serviceContext == null)
        {
            throw new ArgumentNullException(nameof(serviceContext));
        }

        if (string.IsNullOrEmpty(endpointName))
        {
            throw new ArgumentException("endpointName cannot be null or empty string.", nameof(endpointName));
        }

        _serviceContext = serviceContext;
        _endpointName = endpointName;
    }

    /// <summary>Binds the socket to the configured endpoint and starts the receive loop.</summary>
    /// <param name="cancellationToken">Not used; binding completes synchronously.</param>
    /// <returns>The address the socket was bound to.</returns>
    /// <exception cref="InvalidOperationException">The endpoint is missing or incompletely configured.</exception>
    public Task<string> OpenAsync(CancellationToken cancellationToken)
    {
        var address = GetListenerUrl();
        _responseSocket.Bind(address);

        // A long-running task keeps the blocking receive loop off the shared thread pool and
        // gives CloseAsync/Abort/Dispose something to wait on before closing the socket.
        var stopToken = _cancellationToken.Token;
        _receiveLoop = Task.Factory.StartNew(
            () => MessageHandler(stopToken),
            CancellationToken.None,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);

        return Task.FromResult(address);
    }

    /// <summary>Stops the receive loop, waits for it to finish, then closes the socket.</summary>
    /// <param name="cancellationToken">Not used.</param>
    public async Task CloseAsync(CancellationToken cancellationToken)
    {
        RequestStop();

        var receiveLoop = _receiveLoop;
        if (receiveLoop != null)
        {
            await receiveLoop.ConfigureAwait(false);
        }

        _responseSocket.Close();
    }

    /// <summary>Stops the receive loop (waiting a bounded time for it) and closes the socket.</summary>
    public void Abort()
    {
        RequestStop();
        WaitForReceiveLoop();
        _responseSocket.Close();
    }

    /// <summary>Releases the socket and the cancellation source.</summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Produces the reply for one received request. The default implementation logs the request
    /// text and returns an empty reply; override it to implement the actual protocol.
    /// </summary>
    /// <param name="request">The bytes of the received frame.</param>
    /// <returns>The reply to send back; <c>null</c> is sent as an empty frame.</returns>
    protected virtual byte[] HandleRequest(byte[] request)
    {
        ServiceEventSource.Current.Message($"Message {Encoding.UTF8.GetString(request)}");
        return EmptyReply;
    }

    /// <summary>Stops the loop and disposes owned resources; failures are logged, never thrown.</summary>
    /// <param name="disposing"><c>true</c> when called from <see cref="Dispose()"/>.</param>
    protected virtual void Dispose(bool disposing)
    {
        if (!disposing || _disposed)
        {
            return;
        }

        _disposed = true;

        try
        {
            RequestStop();
            WaitForReceiveLoop();
            _responseSocket.Close();
            _responseSocket.Dispose();
        }
        catch (Exception ex)
        {
            ServiceEventSource.Current.Message(ex.Message);
        }
        finally
        {
            _cancellationToken.Dispose();
        }
    }

    // Receive loop: polls with a timeout so cancellation is observed, and always answers a
    // received request because a request/reply socket has to reply before it can receive again.
    private void MessageHandler(CancellationToken cancellationToken)
    {
        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                byte[] message;
                if (!_responseSocket.TryReceiveFrameBytes(ReceivePollInterval, out message))
                {
                    continue;
                }

                byte[] reply;
                try
                {
                    reply = HandleRequest(message) ?? EmptyReply;
                }
                catch (Exception ex)
                {
                    // A failing handler must not kill the loop or leave the peer without a reply.
                    ServiceEventSource.Current.Message(ex.Message);
                    reply = EmptyReply;
                }

                _responseSocket.SendFrame(reply);
            }
        }
        catch (Exception ex)
        {
            // Last-chance handler: an unhandled exception on this background thread would
            // otherwise terminate the process (e.g. the socket was closed underneath the loop).
            ServiceEventSource.Current.Message(ex.Message);
        }
    }

    // Signals the receive loop to exit. The guard keeps this safe to call repeatedly, including
    // after Dispose, which always cancels before disposing the source.
    private void RequestStop()
    {
        if (!_cancellationToken.IsCancellationRequested)
        {
            _cancellationToken.Cancel();
        }
    }

    // Gives the receive loop a bounded amount of time to leave the socket alone before it is closed.
    private void WaitForReceiveLoop()
    {
        var receiveLoop = _receiveLoop;
        if (receiveLoop != null)
        {
            receiveLoop.Wait(StopTimeout);
        }
    }

    // Builds "<protocol>://<host>:<port>" from the manifest endpoint named _endpointName.
    private string GetListenerUrl()
    {
        var endpoints = _serviceContext.CodePackageActivationContext.GetEndpoints();
        if (!endpoints.Contains(_endpointName))
        {
            throw new InvalidOperationException($"{_endpointName} not found in Service Manifest.");
        }

        var serviceEndpoint = _serviceContext.CodePackageActivationContext.GetEndpoint(_endpointName);
        if (string.IsNullOrEmpty(serviceEndpoint.IpAddressOrFqdn))
        {
            throw new InvalidOperationException("IpAddressOrFqdn not set on endpoint");
        }

        if (serviceEndpoint.Port <= 0)
        {
            throw new InvalidOperationException("Port not set on endpoint");
        }

        // Invariant lower-casing: the scheme is machine-readable and must not vary with culture.
        var protocol = serviceEndpoint.Protocol.ToString().ToLowerInvariant();
        return $"{protocol}://{serviceEndpoint.IpAddressOrFqdn}:{serviceEndpoint.Port}";
    }
}
然后在您的 Service Fabric 服务中返回 ZeroMqResponseSocketCommunicationListener:
/// <summary>
/// Yields a single <see cref="ServiceInstanceListener"/> whose factory builds a
/// <see cref="ZeroMqResponseSocketCommunicationListener"/> from the argument it is given
/// and the endpoint name "EndpointName".
/// </summary>
protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
{
    var zeroMqListener = new ServiceInstanceListener(
        context => new ZeroMqResponseSocketCommunicationListener(context, "EndpointName"));

    yield return zeroMqListener;
}
确保在服务的ServiceManifest.xml中指定了端点:
<Resources> <Endpoints> <Endpoint Name="EndpointName" Port="80" Protocol="tcp" /> </Endpoints> </Resources>
如果没有现成的实现,您可以自行实现 ICommunicationListener,并在 CreateServiceInstanceListeners 中返回它。使用 OpenAsync 打开通道并开始监听,使用 CloseAsync 停止监听。
CreateServiceInstanceListeners
OpenAsync
CloseAsync
可以参考 Service Bus 的这个实现来获取灵感。