最近,我使用NetMQ在服务器和客户端之间发送或接收消息。服务器代码如:
void Main() { CreatePullAndPushSocket(); Task.Factory.StartNew(()=> { …
在NetMQ中,非常重要的是线程模型。非常重要的是,您不应该在多个线程中使用套接字。因此,如果线程#1创建了套接字,那么它应该使用它。如果你想通过使用相同的套接字从其他线程发送消息(比如说线程#2)就忘了这个。你应该以某种方式将Messgae从线程#2发送到线程#1然后它应该通过套接字发送到客户端。
所以基本上CreatePullAndPushSocket是错的,奇怪的事情可能发生。您正在一个线程中创建套接字并在其他线程中使用。这完全错了。
另一件事是你的Thread.Sleep。你不应该使用Thread.Sleep,因为你的线程正在休眠1秒然后检查套接字一次,而不是睡眠和检查一次。 NetMQ具有TryReceive功能,具有超时功能。因此它可以检查套接字1秒钟并退出以检查您是否呼叫取消/停止或其他。或者更好的是有一个轮询器,它将一直监听套接字,并允许我们从其他线程调用stop。
让我们看看这段代码:
private Poller _poller; public void Start() { Task.Factory.StartNew(()=> { using(var pullSocket = new PullSocket("tcp://ip1:port1")) using(_poller = new Poller()) { pullSocket.ReceiveReady += (object sender, NetMQSocketEventArgsnetMqSocketEventArgs) => { var message = netMqSocketEventArgs.Socket.ReceiveMultipartMessage(); } _poller.Add(pullSocket); _poller.Run(); } }); } public void Stop() { _poller?.Stop(); }
或者如果您想在没有使用while循环的轮询器的情况下使用代码:
private readonly CancellationTokenSource _cts;
public void Start() { _cts = new CancellationTokenSource(); var token = _cts.Token; Task.Factory.StartNew(()=> { using(var pullSocket = new PullSocket("tcp://ip1:port1")) { while (cancellationToken.IsCancellationRequested == false) { NetMQMessage message = new NetMQMessage(); bool success = workerSocket.TryReceiveMultipartMessage(TimeSpan.FromSeconds(5), ref message); if (success == false) { continue; } //if You reach this line, than You have a message } } }, token, TaskCreationOptions.LongRunning, TaskScheduler.Default); } public void Stop() { _cts.Cancel(); _cts.Token.WaitHandle.WaitOne();//if You want wait until service will stop }
回到你的问题,你应该只在你创建它的线程中使用套接字。好的是总是在using语句中使用socket总是最终释放它。
我看不到SendMessageToClient方法的用法,但我假设你是从某个按钮或其他东西调用它。如果从该线程调用了套接字的构造函数,则可以这样做。如果你能告诉我你在调用这种方法的地方我可以说些什么。