名称
zmq_socket_monitor_versioned - 监控套接字事件
概要
int zmq_socket_monitor_versioned (void '*socket', char '*endpoint', uint64_t 'events', int 'event_version', int 'type');
int zmq_socket_monitor_pipes_stats (void '*socket');
描述
zmq_socket_monitor_versioned() 方法允许应用线程跟踪 ZeroMQ 套接字上的套接字事件(例如连接)。每次调用此方法都会创建一个 'ZMQ_PAIR' 套接字,并将其绑定到指定的 inproc:// 'endpoint'。要收集套接字事件,您必须创建自己的 'ZMQ_PAIR' 套接字,并将其连接到该 endpoint。
'events' 参数是您希望监控的套接字事件的位掩码,详见下方的“支持的事件”。要监控某个版本的所有事件,请使用事件值 ZMQ_EVENT_ALL_V<version>,例如 ZMQ_EVENT_ALL_V1。对于非 DRAFT 事件版本,此值将来不会改变,因此新的事件类型只会添加到新版本中(请注意,这与 zmq_socket_monitor 和未版本化的 ZMQ_EVENT_ALL 有所不同)。
请注意,event_version 2 当前处于 DRAFT(草案)模式。DRAFT 中的 event_versions 的协议随时可能更改。
ZMQ_CURRENT_EVENT_VERSION 和 ZMQ_CURRENT_EVENT_VERSION_DRAFT 始终定义为最新的稳定版或 DRAFT 事件版本,目前分别是 1 和 2。
本页仅描述 'event_version' 2 的协议。对于与 'event_version' 1 一起使用的协议,请参阅 zmq_socket_monitor
每个事件以多帧发送。第一帧包含事件号(64 位)。后续帧的数量和内容取决于此事件号。
除非另有说明,第二帧包含紧随其后的值帧的数量,表示为一个 64 位整数。第三帧到第 N 帧包含一个事件值(64 位),根据事件号提供附加数据。每种事件类型可能具有不同数量的值。倒数第二帧和最后一帧包含字符串,指定受影响的连接或 endpoint。前一帧包含表示本地 endpoint 的字符串,而后一帧包含表示远程 endpoint 的字符串。根据事件类型以及连接是否使用绑定的或连接的本地 endpoint,这两者中的任一个可能为空。
请注意,第二帧及后续帧的格式以及帧的数量,对于将来添加的事件可能会有所不同。
'type' 参数用于指定监控套接字的类型。支持的类型有 'ZMQ_PAIR'、'ZMQ_PUB' 和 'ZMQ_PUSH'。请注意,事件的消费者必须与套接字类型兼容,例如,类型为 'ZMQ_PUB' 的监控套接字将要求消费者类型为 'ZMQ_SUB'。如果监控套接字类型为 'ZMQ_PUB',多部分消息的主题是事件号,因此消费者应订阅他们希望接收的事件。
zmq_socket_monitor_pipes_stats() 方法为被监控套接字的每个连接对端触发一个类型为 ZMQ_EVENT_PIPES_STATS 的事件。注意:zmq_socket_monitor_pipes_stats() 处于 DRAFT(草案)状态。
Monitoring events are only generated by some transports: At the moment these are SOCKS, TCP, IPC, and TIPC. Note that it is not an error to call 'zmq_socket_monitor_versioned' on a socket that is connected or bound to some other transport, as this may not be known at the time 'zmq_socket_monitor_versioned' is called. Also, a socket can be connected/bound to multiple endpoints using different transports.
支持的事件 (v1)
ZMQ_EVENT_CONNECTED ~~~~~~~~~~~~~~~~~~~ The socket has successfully connected to a remote peer. The event value is the file descriptor (FD) of the underlying network socket. Warning: there is no guarantee that the FD is still valid by the time your code receives this event. ZMQ_EVENT_CONNECT_DELAYED ~~~~~~~~~~~~~~~~~~~~~~~~~ A connect request on the socket is pending. The event value is unspecified. ZMQ_EVENT_CONNECT_RETRIED ~~~~~~~~~~~~~~~~~~~~~~~~~ A connect request failed, and is now being retried. The event value is the reconnect interval in milliseconds. Note that the reconnect interval is recalculated for each retry. ZMQ_EVENT_LISTENING ~~~~~~~~~~~~~~~~~~~ The socket was successfully bound to a network interface. The event value is the FD of the underlying network socket. Warning: there is no guarantee that the FD is still valid by the time your code receives this event. ZMQ_EVENT_BIND_FAILED ~~~~~~~~~~~~~~~~~~~~~ The socket could not bind to a given interface. The event value is the errno generated by the system bind call. ZMQ_EVENT_ACCEPTED ~~~~~~~~~~~~~~~~~~ The socket has accepted a connection from a remote peer. The event value is the FD of the underlying network socket. Warning: there is no guarantee that the FD is still valid by the time your code receives this event. ZMQ_EVENT_ACCEPT_FAILED ~~~~~~~~~~~~~~~~~~~~~~~ The socket has rejected a connection from a remote peer. The event value is the errno generated by the accept call. ZMQ_EVENT_CLOSED ~~~~~~~~~~~~~~~~ The socket was closed. The event value is the FD of the (now closed) network socket. ZMQ_EVENT_CLOSE_FAILED ~~~~~~~~~~~~~~~~~~~~~~ The socket close failed. The event value is the errno returned by the system call. Note that this event occurs only on IPC transports. ZMQ_EVENT_DISCONNECTED ~~~~~~~~~~~~~~~~~~~~~~ The socket was disconnected unexpectedly. The event value is the FD of the underlying network socket. Warning: this socket will be closed. ZMQ_EVENT_MONITOR_STOPPED ~~~~~~~~~~~~~~~~~~~~~~~~~ Monitoring on this socket ended. ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Unspecified error during handshake. The event value is an errno. ZMQ_EVENT_HANDSHAKE_SUCCEEDED ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The ZMTP security mechanism handshake succeeded. The event value is unspecified. ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The ZMTP security mechanism handshake failed due to some mechanism protocol error, either between the ZMTP mechanism peers, or between the mechanism server and the ZAP handler. This indicates a configuration or implementation error in either peer resp. the ZAP handler. The event value is one of the ZMQ_PROTOCOL_ERROR_* values: ZMQ_PROTOCOL_ERROR_ZMTP_UNSPECIFIED ZMQ_PROTOCOL_ERROR_ZMTP_UNEXPECTED_COMMAND ZMQ_PROTOCOL_ERROR_ZMTP_INVALID_SEQUENCE ZMQ_PROTOCOL_ERROR_ZMTP_KEY_EXCHANGE ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_UNSPECIFIED ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_MESSAGE ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_HELLO ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_INITIATE ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_ERROR ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_READY ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_WELCOME ZMQ_PROTOCOL_ERROR_ZMTP_INVALID_METADATA ZMQ_PROTOCOL_ERROR_ZMTP_CRYPTOGRAPHIC ZMQ_PROTOCOL_ERROR_ZMTP_MECHANISM_MISMATCH ZMQ_PROTOCOL_ERROR_ZAP_UNSPECIFIED ZMQ_PROTOCOL_ERROR_ZAP_MALFORMED_REPLY ZMQ_PROTOCOL_ERROR_ZAP_BAD_REQUEST_ID ZMQ_PROTOCOL_ERROR_ZAP_BAD_VERSION ZMQ_PROTOCOL_ERROR_ZAP_INVALID_STATUS_CODE ZMQ_PROTOCOL_ERROR_ZAP_INVALID_METADATA ZMQ_EVENT_HANDSHAKE_FAILED_AUTH ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The ZMTP security mechanism handshake failed due to an authentication failure. The event value is the status code returned by the ZAP handler (i.e. 300, 400 or 500). ---- Supported events (v2)
ZMQ_EVENT_PIPES_STATS
此事件提供两个值,分别表示与返回的 endpoint 相关联的两个队列(出站和入站)中的消息数量。此事件仅在调用函数 zmq_socket_monitor_pipes_stats() 后触发。注意:此测量是异步的,因此在收到消息时,内部状态可能已经改变。注意:当被监控套接字和监控器未用于 poll 中时,事件可能不会被传递,直到在被监控套接字上调用了某个 API,例如 zmq_getsockopt(选项无关紧要)。注意:处于 DRAFT(草案)状态,尚未在稳定版本中提供。
返回值
zmq_socket_monitor() 和 zmq_socket_monitor_pipes_stats() 函数成功时返回 0 或更大值。否则,它们返回 -1
并将 'errno' 设置为下面定义的值之一。
错误 - zmq_socket_monitor()
- ETERM
-
与指定的 'socket' 关联的 0MQ 'context' 已终止。
- EPROTONOSUPPORT
-
监控器 'endpoint' 的传输协议不受支持。监控套接字要求使用 inproc:// 传输。
- EINVAL
-
提供的监控器 'endpoint' 不存在或指定的套接字 'type' 不受支持。
错误 - zmq_socket_monitor_pipes_stats()
- ENOTSOCK
-
'socket' 参数不是有效的 0MQ 套接字。
- EINVAL
-
套接字未启用监控。
- EAGAIN
-
被监控套接字尚未有任何连接可供监控。
示例
// Read one event off the monitor socket; return values and addresses // by reference, if not null, and event number by value. Returns -1 // in case of error. static uint64_t get_monitor_event (void *monitor, uint64_t **value, char **local_address, char **remote_address) { // First frame in message contains event number zmq_msg_t msg; zmq_msg_init (&msg); if (zmq_msg_recv (&msg, monitor, 0) == -1) return -1; // Interrupted, presumably assert (zmq_msg_more (&msg)); uint64_t event; memcpy (&event, zmq_msg_data (&msg), sizeof (event)); zmq_msg_close (&msg); // Second frame in message contains the number of values zmq_msg_init (&msg); if (zmq_msg_recv (&msg, monitor, 0) == -1) return -1; // Interrupted, presumably assert (zmq_msg_more (&msg)); uint64_t value_count; memcpy (&value_count, zmq_msg_data (&msg), sizeof (value_count)); zmq_msg_close (&msg); if (value) { *value = (uint64_t *) malloc (value_count * sizeof (uint64_t)); assert (*value); } for (uint64_t i = 0; i < value_count; ++i) { // Subsequent frames in message contain event values zmq_msg_init (&msg); if (zmq_msg_recv (&msg, monitor, 0) == -1) return -1; // Interrupted, presumably assert (zmq_msg_more (&msg)); if (value && *value) memcpy (&(*value)[i], zmq_msg_data (&msg), sizeof (uint64_t)); zmq_msg_close (&msg); } // Second-to-last frame in message contains local address zmq_msg_init (&msg); if (zmq_msg_recv (&msg, monitor, 0) == -1) return -1; // Interrupted, presumably assert (zmq_msg_more (&msg)); if (local_address_) { uint8_t *data = (uint8_t *) zmq_msg_data (&msg); size_t size = zmq_msg_size (&msg); *local_address_ = (char *) malloc (size + 1); memcpy (*local_address_, data, size); (*local_address_)[size] = 0; } zmq_msg_close (&msg); // Last frame in message contains remote address zmq_msg_init (&msg); if (zmq_msg_recv (&msg, monitor, 0) == -1) return -1; // Interrupted, presumably assert (!zmq_msg_more (&msg)); if (remote_address_) { uint8_t *data = (uint8_t *) zmq_msg_data (&msg); size_t size = zmq_msg_size (&msg); *remote_address_ = (char *) malloc (size + 1); memcpy (*remote_address_, data, size); (*remote_address_)[size] = 0; } zmq_msg_close (&msg); return event; } int main (void) { void *ctx = zmq_ctx_new (); assert (ctx); // We'll monitor these two sockets void *client = zmq_socket (ctx, ZMQ_DEALER); assert (client); void *server = zmq_socket (ctx, ZMQ_DEALER); assert (server); // Socket monitoring only works over inproc:// int rc = zmq_socket_monitor_versioned (client, "tcp://127.0.0.1:9999", 0, 2); assert (rc == -1); assert (zmq_errno () == EPROTONOSUPPORT); // Monitor all events on client and server sockets rc = zmq_socket_monitor_versioned (client, "inproc://monitor-client", ZMQ_EVENT_ALL, 2); assert (rc == 0); rc = zmq_socket_monitor_versioned (server, "inproc://monitor-server", ZMQ_EVENT_ALL, 2); assert (rc == 0); // Create two sockets for collecting monitor events void *client_mon = zmq_socket (ctx, ZMQ_PAIR); assert (client_mon); void *server_mon = zmq_socket (ctx, ZMQ_PAIR); assert (server_mon); // Connect these to the inproc endpoints so they'll get events rc = zmq_connect (client_mon, "inproc://monitor-client"); assert (rc == 0); rc = zmq_connect (server_mon, "inproc://monitor-server"); assert (rc == 0); // Now do a basic ping test rc = zmq_bind (server, "tcp://127.0.0.1:9998"); assert (rc == 0); rc = zmq_connect (client, "tcp://127.0.0.1:9998"); assert (rc == 0); bounce (client, server); // Close client and server close_zero_linger (client); close_zero_linger (server); // Now collect and check events from both sockets int event = get_monitor_event (client_mon, NULL, NULL); if (event == ZMQ_EVENT_CONNECT_DELAYED) event = get_monitor_event (client_mon, NULL, NULL); assert (event == ZMQ_EVENT_CONNECTED); event = get_monitor_event (client_mon, NULL, NULL); assert (event == ZMQ_EVENT_MONITOR_STOPPED); // This is the flow of server events event = get_monitor_event (server_mon, NULL, NULL); assert (event == ZMQ_EVENT_LISTENING); event = get_monitor_event (server_mon, NULL, NULL); assert (event == ZMQ_EVENT_ACCEPTED); event = get_monitor_event (server_mon, NULL, NULL); assert (event == ZMQ_EVENT_CLOSED); event = get_monitor_event (server_mon, NULL, NULL); assert (event == ZMQ_EVENT_MONITOR_STOPPED); // Close down the sockets close_zero_linger (client_mon); close_zero_linger (server_mon); zmq_ctx_term (ctx); return 0 ; }
另请参阅
作者
本页面由 0MQ 社区编写。如需修改,请阅读位于 https://zeromq.cn/how-to-contribute/ 的 0MQ 贡献政策。