名称

zmq_socket_monitor_versioned - 监控套接字事件

概要

int zmq_socket_monitor_versioned (void '*socket', char '*endpoint', uint64_t 'events', int 'event_version', int 'type');

int zmq_socket_monitor_pipes_stats (void '*socket');

描述

zmq_socket_monitor_versioned() 方法允许应用线程跟踪 ZeroMQ 套接字上的套接字事件(例如连接)。每次调用此方法都会创建一个 'ZMQ_PAIR' 套接字,并将其绑定到指定的 inproc:// 'endpoint'。要收集套接字事件,您必须创建自己的 'ZMQ_PAIR' 套接字,并将其连接到该 endpoint。

'events' 参数是您希望监控的套接字事件的位掩码,详见下方的“支持的事件”。要监控某个版本的所有事件,请使用事件值 ZMQ_EVENT_ALL_V<version>,例如 ZMQ_EVENT_ALL_V1。对于非 DRAFT 事件版本,此值将来不会改变,因此新的事件类型只会添加到新版本中(请注意,这与 zmq_socket_monitor 和未版本化的 ZMQ_EVENT_ALL 有所不同)。

请注意,event_version 2 当前处于 DRAFT(草案)模式。DRAFT 中的 event_versions 的协议随时可能更改。

ZMQ_CURRENT_EVENT_VERSION 和 ZMQ_CURRENT_EVENT_VERSION_DRAFT 始终定义为最新的稳定版或 DRAFT 事件版本,目前分别是 1 和 2。

本页仅描述 'event_version' 2 的协议。对于与 'event_version' 1 一起使用的协议,请参阅 zmq_socket_monitor

每个事件以多帧发送。第一帧包含事件号(64 位)。后续帧的数量和内容取决于此事件号。

除非另有说明,第二帧包含紧随其后的值帧的数量,表示为一个 64 位整数。第三帧到第 N 帧包含一个事件值(64 位),根据事件号提供附加数据。每种事件类型可能具有不同数量的值。倒数第二帧和最后一帧包含字符串,指定受影响的连接或 endpoint。前一帧包含表示本地 endpoint 的字符串,而后一帧包含表示远程 endpoint 的字符串。根据事件类型以及连接是否使用绑定的或连接的本地 endpoint,这两者中的任一个可能为空。

请注意,第二帧及后续帧的格式以及帧的数量,对于将来添加的事件可能会有所不同。

'type' 参数用于指定监控套接字的类型。支持的类型有 'ZMQ_PAIR'、'ZMQ_PUB' 和 'ZMQ_PUSH'。请注意,事件的消费者必须与套接字类型兼容,例如,类型为 'ZMQ_PUB' 的监控套接字将要求消费者类型为 'ZMQ_SUB'。如果监控套接字类型为 'ZMQ_PUB',多部分消息的主题是事件号,因此消费者应订阅他们希望接收的事件。

zmq_socket_monitor_pipes_stats() 方法为被监控套接字的每个连接对端触发一个类型为 ZMQ_EVENT_PIPES_STATS 的事件。注意:zmq_socket_monitor_pipes_stats() 处于 DRAFT(草案)状态。

Monitoring events are only generated by some transports: At the moment these
are SOCKS, TCP, IPC, and TIPC. Note that it is not an error to call
'zmq_socket_monitor_versioned' on a socket that is connected or bound to some
other transport, as this may not be known at the time
'zmq_socket_monitor_versioned' is called. Also, a socket can be connected/bound
to multiple endpoints using different transports.

支持的事件 (v1)

ZMQ_EVENT_CONNECTED
~~~~~~~~~~~~~~~~~~~
The socket has successfully connected to a remote peer. The event value
is the file descriptor (FD) of the underlying network socket. Warning:
there is no guarantee that the FD is still valid by the time your code
receives this event.

ZMQ_EVENT_CONNECT_DELAYED
~~~~~~~~~~~~~~~~~~~~~~~~~
A connect request on the socket is pending. The event value is unspecified.

ZMQ_EVENT_CONNECT_RETRIED
~~~~~~~~~~~~~~~~~~~~~~~~~
A connect request failed, and is now being retried. The event value is the
reconnect interval in milliseconds. Note that the reconnect interval is
recalculated for each retry.

ZMQ_EVENT_LISTENING
~~~~~~~~~~~~~~~~~~~
The socket was successfully bound to a network interface. The event value
is the FD of the underlying network socket. Warning: there is no guarantee
that the FD is still valid by the time your code receives this event.

ZMQ_EVENT_BIND_FAILED
~~~~~~~~~~~~~~~~~~~~~
The socket could not bind to a given interface. The event value is the
errno generated by the system bind call.

ZMQ_EVENT_ACCEPTED
~~~~~~~~~~~~~~~~~~
The socket has accepted a connection from a remote peer. The event value is
the FD of the underlying network socket. Warning: there is no guarantee that
the FD is still valid by the time your code receives this event.

ZMQ_EVENT_ACCEPT_FAILED
~~~~~~~~~~~~~~~~~~~~~~~
The socket has rejected a connection from a remote peer. The event value is
the errno generated by the accept call.

ZMQ_EVENT_CLOSED
~~~~~~~~~~~~~~~~
The socket was closed. The event value is the FD of the (now closed) network
socket.

ZMQ_EVENT_CLOSE_FAILED
~~~~~~~~~~~~~~~~~~~~~~
The socket close failed. The event value is the errno returned by the system
call. Note that this event occurs only on IPC transports.

ZMQ_EVENT_DISCONNECTED
~~~~~~~~~~~~~~~~~~~~~~
The socket was disconnected unexpectedly. The event value is the FD of the
underlying network socket. Warning: this socket will be closed.

ZMQ_EVENT_MONITOR_STOPPED
~~~~~~~~~~~~~~~~~~~~~~~~~
Monitoring on this socket ended.

ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Unspecified error during handshake.
The event value is an errno.

ZMQ_EVENT_HANDSHAKE_SUCCEEDED
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The ZMTP security mechanism handshake succeeded.
The event value is unspecified.

ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The ZMTP security mechanism handshake failed due to some mechanism protocol
error, either between the ZMTP mechanism peers, or between the mechanism
server and the ZAP handler. This indicates a configuration or implementation
error in either peer resp. the ZAP handler.
The event value is one of the ZMQ_PROTOCOL_ERROR_* values:
ZMQ_PROTOCOL_ERROR_ZMTP_UNSPECIFIED
ZMQ_PROTOCOL_ERROR_ZMTP_UNEXPECTED_COMMAND
ZMQ_PROTOCOL_ERROR_ZMTP_INVALID_SEQUENCE
ZMQ_PROTOCOL_ERROR_ZMTP_KEY_EXCHANGE
ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_UNSPECIFIED
ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_MESSAGE
ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_HELLO
ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_INITIATE
ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_ERROR
ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_READY
ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_WELCOME
ZMQ_PROTOCOL_ERROR_ZMTP_INVALID_METADATA
ZMQ_PROTOCOL_ERROR_ZMTP_CRYPTOGRAPHIC
ZMQ_PROTOCOL_ERROR_ZMTP_MECHANISM_MISMATCH
ZMQ_PROTOCOL_ERROR_ZAP_UNSPECIFIED
ZMQ_PROTOCOL_ERROR_ZAP_MALFORMED_REPLY
ZMQ_PROTOCOL_ERROR_ZAP_BAD_REQUEST_ID
ZMQ_PROTOCOL_ERROR_ZAP_BAD_VERSION
ZMQ_PROTOCOL_ERROR_ZAP_INVALID_STATUS_CODE
ZMQ_PROTOCOL_ERROR_ZAP_INVALID_METADATA

ZMQ_EVENT_HANDSHAKE_FAILED_AUTH
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The ZMTP security mechanism handshake failed due to an authentication failure.
The event value is the status code returned by the ZAP handler (i.e. 300,
400 or 500).

----

Supported events (v2)

ZMQ_EVENT_PIPES_STATS

此事件提供两个值,分别表示与返回的 endpoint 相关联的两个队列(出站和入站)中的消息数量。此事件仅在调用函数 zmq_socket_monitor_pipes_stats() 后触发。注意:此测量是异步的,因此在收到消息时,内部状态可能已经改变。注意:当被监控套接字和监控器未用于 poll 中时,事件可能不会被传递,直到在被监控套接字上调用了某个 API,例如 zmq_getsockopt(选项无关紧要)。注意:处于 DRAFT(草案)状态,尚未在稳定版本中提供。

返回值

zmq_socket_monitor()zmq_socket_monitor_pipes_stats() 函数成功时返回 0 或更大值。否则,它们返回 -1 并将 'errno' 设置为下面定义的值之一。

错误 - zmq_socket_monitor()

ETERM

与指定的 'socket' 关联的 0MQ 'context' 已终止。

EPROTONOSUPPORT

监控器 'endpoint' 的传输协议不受支持。监控套接字要求使用 inproc:// 传输。

EINVAL

提供的监控器 'endpoint' 不存在或指定的套接字 'type' 不受支持。

错误 - zmq_socket_monitor_pipes_stats()

ENOTSOCK

'socket' 参数不是有效的 0MQ 套接字。

EINVAL

套接字未启用监控。

EAGAIN

被监控套接字尚未有任何连接可供监控。

示例

监控客户端和服务器套接字
//  Read one event off the monitor socket; return values and addresses
//  by reference, if not null, and event number by value. Returns -1
//  in case of error.

static uint64_t
get_monitor_event (void *monitor, uint64_t **value, char **local_address, char **remote_address)
{
    //  First frame in message contains event number
    zmq_msg_t msg;
    zmq_msg_init (&msg);
    if (zmq_msg_recv (&msg, monitor, 0) == -1)
        return -1;              //  Interrupted, presumably
    assert (zmq_msg_more (&msg));

    uint64_t event;
    memcpy (&event, zmq_msg_data (&msg), sizeof (event));
    zmq_msg_close (&msg);

    //  Second frame in message contains the number of values
    zmq_msg_init (&msg);
    if (zmq_msg_recv (&msg, monitor, 0) == -1)
        return -1;              //  Interrupted, presumably
    assert (zmq_msg_more (&msg));

    uint64_t value_count;
    memcpy (&value_count, zmq_msg_data (&msg), sizeof (value_count));
    zmq_msg_close (&msg);

    if (value) {
        *value = (uint64_t *) malloc (value_count * sizeof (uint64_t));
        assert (*value);
    }

    for (uint64_t i = 0; i < value_count; ++i) {
        //  Subsequent frames in message contain event values
        zmq_msg_init (&msg);
        if (zmq_msg_recv (&msg, monitor, 0) == -1)
            return -1;              //  Interrupted, presumably
        assert (zmq_msg_more (&msg));

        if (value && *value)
            memcpy (&(*value)[i], zmq_msg_data (&msg), sizeof (uint64_t));
        zmq_msg_close (&msg);
    }

    //  Second-to-last frame in message contains local address
    zmq_msg_init (&msg);
    if (zmq_msg_recv (&msg, monitor, 0) == -1)
        return -1;              //  Interrupted, presumably
    assert (zmq_msg_more (&msg));

    if (local_address_) {
        uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
        size_t size = zmq_msg_size (&msg);
        *local_address_ = (char *) malloc (size + 1);
        memcpy (*local_address_, data, size);
        (*local_address_)[size] = 0;
    }
    zmq_msg_close (&msg);

    //  Last frame in message contains remote address
    zmq_msg_init (&msg);
    if (zmq_msg_recv (&msg, monitor, 0) == -1)
        return -1;              //  Interrupted, presumably
    assert (!zmq_msg_more (&msg));

    if (remote_address_) {
        uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
        size_t size = zmq_msg_size (&msg);
        *remote_address_ = (char *) malloc (size + 1);
        memcpy (*remote_address_, data, size);
        (*remote_address_)[size] = 0;
    }
    zmq_msg_close (&msg);

    return event;
}

int main (void)
{
    void *ctx = zmq_ctx_new ();
    assert (ctx);

    //  We'll monitor these two sockets
    void *client = zmq_socket (ctx, ZMQ_DEALER);
    assert (client);
    void *server = zmq_socket (ctx, ZMQ_DEALER);
    assert (server);

    //  Socket monitoring only works over inproc://
    int rc = zmq_socket_monitor_versioned (client, "tcp://127.0.0.1:9999", 0, 2);
    assert (rc == -1);
    assert (zmq_errno () == EPROTONOSUPPORT);

    //  Monitor all events on client and server sockets
    rc = zmq_socket_monitor_versioned (client, "inproc://monitor-client", ZMQ_EVENT_ALL, 2);
    assert (rc == 0);
    rc = zmq_socket_monitor_versioned (server, "inproc://monitor-server", ZMQ_EVENT_ALL, 2);
    assert (rc == 0);

    //  Create two sockets for collecting monitor events
    void *client_mon = zmq_socket (ctx, ZMQ_PAIR);
    assert (client_mon);
    void *server_mon = zmq_socket (ctx, ZMQ_PAIR);
    assert (server_mon);

    //  Connect these to the inproc endpoints so they'll get events
    rc = zmq_connect (client_mon, "inproc://monitor-client");
    assert (rc == 0);
    rc = zmq_connect (server_mon, "inproc://monitor-server");
    assert (rc == 0);

    //  Now do a basic ping test
    rc = zmq_bind (server, "tcp://127.0.0.1:9998");
    assert (rc == 0);
    rc = zmq_connect (client, "tcp://127.0.0.1:9998");
    assert (rc == 0);
    bounce (client, server);

    //  Close client and server
    close_zero_linger (client);
    close_zero_linger (server);

    //  Now collect and check events from both sockets
    int event = get_monitor_event (client_mon, NULL, NULL);
    if (event == ZMQ_EVENT_CONNECT_DELAYED)
        event = get_monitor_event (client_mon, NULL, NULL);
    assert (event == ZMQ_EVENT_CONNECTED);
    event = get_monitor_event (client_mon, NULL, NULL);
    assert (event == ZMQ_EVENT_MONITOR_STOPPED);

    //  This is the flow of server events
    event = get_monitor_event (server_mon, NULL, NULL);
    assert (event == ZMQ_EVENT_LISTENING);
    event = get_monitor_event (server_mon, NULL, NULL);
    assert (event == ZMQ_EVENT_ACCEPTED);
    event = get_monitor_event (server_mon, NULL, NULL);
    assert (event == ZMQ_EVENT_CLOSED);
    event = get_monitor_event (server_mon, NULL, NULL);
    assert (event == ZMQ_EVENT_MONITOR_STOPPED);

    //  Close down the sockets
    close_zero_linger (client_mon);
    close_zero_linger (server_mon);
    zmq_ctx_term (ctx);

    return 0 ;
}

另请参阅

作者

本页面由 0MQ 社区编写。如需修改,请阅读位于 https://zeromq.cn/how-to-contribute/ 的 0MQ 贡献政策。