理解C/C++异步IO编程——做一个RAII的Socket抽象

前言

​ 在我们开始异步IO编程之前,咱们最好先理解一下Socket API有哪些。这篇博客,如我所言,只是说尝试给一个Reference,告诉看官那些抽象是我们需要做的。这样,我们后面构建更加现代的异步IO编程完全可以有更好的抽象。为此,这篇博客是必要的。

创建套接字并做一些设置

int socket(int domain, int type, int protocol);
  • domain:地址簇,常用 AF_INET(IPv4)、AF_INET6(IPv6)等。
  • type:套接字类型,SOCK_STREAM(TCP,面向连接)、SOCK_DGRAM(UDP)等。 内核也支持在这里用位标志(例如 Linux 上可位或 SOCK_NONBLOCKSOCK_CLOEXEC)。
  • protocol:通常为 0(让内核根据 domain/type 选择默认协议),或显式例如 IPPROTO_TCP
  • 返回:>=0 为新的 socket fd;<0 表示错误(设置 errno)。
int setsockopt(int sockfd, int level, int optname, const void *optval, socklen_t optlen);
  • sockfd:目标 socket 描述符。
  • level:选项所在协议层,例如 SOL_SOCKETIPPROTO_TCP
  • optname:选项名,例如 SO_REUSEADDRSO_RCVTIMEO
  • optval/optlen:指向值及其长度。
  • 返回:0 成功,-1 失败(errno 设置)。

绑定到给定的二元端口

int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
  • 绑定 socket 到本地地址(IP + port)。addr 指向 sockaddr_in/sockaddr_in6addrlen 为对应结构长度。
  • 返回:0 成功,-1 失败。

监听服务器

int listen(int sockfd, int backlog);
  • sockfd:已 bind 的 socket。
  • backlog:内核队列长度(未 accept 的连接队列)。常用 SOMAXCONN
  • 返回:0 成功,-1 失败。

接受一个新的链接(这里说Accept扩展)

int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
  • accept,但可直接传 flags(例如 SOCK_NONBLOCK | SOCK_CLOEXEC),避免后续 fcntl 调整。
  • addr/addrlen:输出参数,返回对端地址(可为 nullptr/0)。
  • 返回:新的连接 socket fd(>=0)或 -1(失败)。

IO操作

ssize_t recv(int sockfd, void *buf, size_t len, int flags);
ssize_t send(int sockfd, const void *buf, size_t len, int flags);
  • recv/send:数据收发。flags 常用 0;返回值为读取/发送字节数,0 表示对端关闭连接,-1 表示错误(可能设置 errno 为 EAGAIN/EWOULDBLOCK 等)。

关闭释放套接字

int close(int fd);
  • 关闭文件描述符(或 socket)。返回 0 成功,-1 失败。

其他的一些操作

uint16_t htons(uint16_t hostshort);
  • 主机字节序(host)转换到网络字节序(big-endian),常用于设置 sin_port

其他

  • AF_INETSOCK_STREAMSOCK_NONBLOCKSOCK_CLOEXECSOL_SOCKETSO_REUSEADDRINADDR_ANYSOMAXCONN

注意:INADDR_ANY 实际上是一个 in_addr_t 宏,表示绑定到任意本地地址(0.0.0.0)。

行动起来:做一个RAII的Socket抽象

using socket_raw_t = int;
static constexpr const socket_raw_t INVALID_SOCK_INTERNAL = -1;

class Socket {
public:
    Socket(const socket_raw_t fd);
    virtual ~Socket();
    socket_raw_t internal() const;
    bool is_valid() const;
    static bool is_valid(socket_raw_t fd);
    void close();
protected:
    socket_raw_t socket_fd;
    // delete copy, allow move
};

class PassiveClientSocket : public Socket {
public:
    PassiveClientSocket(const socket_raw_t fd);
    ssize_t read(void* buffer_ptr, size_t size);
    ssize_t write(const void* buffer_ptr, size_t size);
};

class ServerSocket : public Socket {
public:
    ServerSocket(const int port);
    ServerSocket(ServerSocket&&) = default;
    socket_raw_t listen();
    std::shared_ptr<PassiveClientSocket> accept();
    int port() const;
private:
    const int open_port;
    // copy/move assignments deleted
};
  • Socket::Socket(socket_raw_t fd):用现有 fd 构造对象。
  • Socket::~Socket():析构时会 close()(RAII)。
  • socket_raw_t Socket::internal() const:返回底层 fd。
  • PassiveClientSocket::read/ write:封装 recv/send,返回字节数或错误。
  • ServerSocket::listen():创建 socket、bind、listen,并把 socket_fd 设置为监听 fd,返回 fd。
  • ServerSocket::accept():调用 accept4,返回 shared_ptr<PassiveClientSocket>(出错返回 nullptr)。

逐个击破:Socket(基类)

代码片段(简化)

Socket::Socket(const socket_raw_t fd)
    : socket_fd(fd) { }

void Socket::close() {
    ::close(socket_fd);
}

socket_raw_t Socket::internal() const { return socket_fd; }
bool Socket::is_valid() const { return is_valid(socket_fd); }
  • Socket 作为资源所有者(RAII)是合理的:析构时会 close()。构造时可接受一个已经存在的 fd,便于从 accept 创建对象。
  • is_valid() 通过与 INVALID_SOCK_INTERNAL 比较判断 fd 是否有效。

PassiveClientSocket(被动客户端连接对象)

PassiveClientSocket::PassiveClientSocket(const socket_raw_t fd)
    : Socket(fd) {
}

ssize_t PassiveClientSocket::read(void* buffer_ptr, size_t size) {
    if (!is_valid())
        throw "invalid socket!";
    return ::recv(socket_fd, buffer_ptr, size, 0);
}
ssize_t PassiveClientSocket::write(const void* buffer_ptr, size_t size) {
    if (!is_valid())
        throw "invalid socket!";
    return ::send(socket_fd, buffer_ptr, size, 0);
}

说明

  • 简洁封装 recv/send。在调用前检查 socket 有效性是必要的。

ServerSocket(监听端)

socket_raw_t ServerSocket::listen() {
    int listen_fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
    if (listen_fd < 0) { throw "Socket Creation Error"; }

    int opt = 1;
    setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

    sockaddr_in addr {};
    addr.sin_family = AF_INET;
    addr.sin_port = htons(open_port);
    addr.sin_addr.s_addr = INADDR_ANY;

    if (bind(listen_fd, (sockaddr*)&addr, sizeof(addr)) != 0) {
        throw "Bind Error!";
    }

    if (::listen(listen_fd, SOMAXCONN) != 0) {
        throw "Listen Error!";
    }
    socket_fd = listen_fd;
    return listen_fd;
}
  • 创建非阻塞的监听 fd,并设置 SO_REUSEADDR,然后 bindlisten,最后把 fd 存入对象。

ServerSocket::accept()

​ 返回被构建的

std::shared_ptr<PassiveClientSocket> ServerSocket::accept() {
    sockaddr_in cli {};
    socklen_t cli_len = sizeof(cli);

    int fd = ::accept4(socket_fd, (sockaddr*)&cli, &cli_len, SOCK_NONBLOCK | SOCK_CLOEXEC);
    if (fd < 0) {
        return nullptr;
    }
    return std::make_shared<PassiveClientSocket>(fd);
}

附录demo和socket代码

文件:socket.hpp: socket的头文件

#pragma once

#include <cstddef>
#include <memory>
#include <sys/types.h>
#include <unistd.h>
using socket_raw_t = int;
static constexpr const socket_raw_t INVALID_SOCK_INTERNAL = -1;

class Socket {
public:
	socket_raw_t internal() const { return socket_fd; }
	virtual ~Socket() {
		close();
	};
	Socket(const socket_raw_t fd);
	Socket(Socket&& other) noexcept
	    : socket_fd(other.socket_fd) {
		other.socket_fd = INVALID_SOCK_INTERNAL;
	}
	bool is_valid() const {
		return is_valid(socket_fd);
	}

	static bool is_valid(socket_raw_t fd) {
		return fd != INVALID_SOCK_INTERNAL;
	}

	void close();

protected:
	socket_raw_t socket_fd;
	Socket() = delete;
	Socket(const Socket&) = delete;
	Socket& operator=(Socket&&) = delete;
	Socket& operator=(const Socket&) = delete;
};

class PassiveClientSocket : public Socket {
public:
	PassiveClientSocket(const socket_raw_t fd);

	ssize_t read(void* buffer_ptr, size_t size);
	ssize_t write(const void* buffer_ptr, size_t size);
};

class ServerSocket : public Socket {
public:
	ServerSocket(ServerSocket&&) = default;

	ServerSocket(const int port)
	    : Socket(INVALID_SOCK_INTERNAL)
	    , open_port(port) {
	}

	socket_raw_t listen();
	int port() const { return open_port; }
	std::shared_ptr<PassiveClientSocket> accept();

private:
	const int open_port;

private:
	ServerSocket() = delete;
	ServerSocket(const ServerSocket&) = delete;
	ServerSocket& operator=(ServerSocket&&) = delete;
	ServerSocket& operator=(const ServerSocket&) = delete;
};


文件:socket.cppsocket的实现源文件

#include "socket.hpp"
#include <memory>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

Socket::Socket(const socket_raw_t fd)
    : socket_fd(fd) { }

void Socket::close() {
	if (!is_valid())
		return;
	::close(socket_fd);
	socket_fd = INVALID_SOCK_INTERNAL;
}

PassiveClientSocket::PassiveClientSocket(const socket_raw_t fd)
    : Socket(fd) {
}

ssize_t PassiveClientSocket::read(void* buffer_ptr, size_t size) {
	if (!is_valid())
		throw "invalid socket!";
	return ::recv(socket_fd, buffer_ptr, size, 0);
}
ssize_t PassiveClientSocket::write(const void* buffer_ptr, size_t size) {
	if (!is_valid())
		throw "invalid socket!";
	return ::send(socket_fd, buffer_ptr, size, 0);
}

socket_raw_t ServerSocket::listen() {
	int listen_fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
	if (listen_fd < 0) {
		throw "Socket Creation Error";
	}

	int opt = 1;
	setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

	sockaddr_in addr {};
	addr.sin_family = AF_INET;
	addr.sin_port = htons(open_port);
	addr.sin_addr.s_addr = INADDR_ANY;

	if (bind(listen_fd, (sockaddr*)&addr, sizeof(addr)) != 0) {
		throw "Bind Error!";
	}

	if (::listen(listen_fd, SOMAXCONN) != 0) {
		throw "Listen Error!";
	}
	socket_fd = listen_fd;
	return listen_fd;
}

std::shared_ptr<PassiveClientSocket> ServerSocket::accept() {
	sockaddr_in cli {};
	socklen_t cli_len = sizeof(cli);

	int fd = ::accept4(socket_fd, (sockaddr*)&cli, &cli_len, SOCK_NONBLOCK | SOCK_CLOEXEC);
	if (fd < 0) {
		return nullptr;
	}
	return std::make_shared<PassiveClientSocket>(fd);
}

文件:echo_server.cpp(示例服务器)

// echo_server.cpp
#include "socket.hpp"
#include <iostream>
#include <vector>
#include <poll.h>
#include <fcntl.h>
#include <cstring>
#include <errno.h>
#include <unistd.h>

int set_blocking(int fd, bool block) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0) return -1;
    if (block) flags &= ~O_NONBLOCK;
    else flags |= O_NONBLOCK;
    return fcntl(fd, F_SETFL, flags);
}

int main() {
    const int port = 12345;
    ServerSocket srv(port);
    try {
        srv.listen();
    } catch (const std::exception &e) {
        std::cerr << "listen failed: " << e.what() << "\n";
        return 1;
    } catch (...) {
        std::cerr << "listen failed with unknown error\n";
        return 1;
    }

    std::cout << "Echo server listening on port " << port << "\n";

    struct pollfd pfd;
    pfd.fd = srv.internal();
    pfd.events = POLLIN;
    pfd.revents = 0;

    while (true) {
        int ret = poll(&pfd, 1, -1); // wait indefinitely
        if (ret < 0) {
            if (errno == EINTR) continue;
            perror("poll");
            break;
        }
        if (pfd.revents & POLLIN) {
            auto client = srv.accept();
            if (!client) {
                // either no incoming (EAGAIN) or an error; continue
                continue;
            }
            int cfd = client->internal();
            // For demo simplicity, set client socket to blocking to
            // allow simple synchronous recv() usage
            set_blocking(cfd, true);

            std::cout << "Accepted client fd=" << cfd << "\n";

            const size_t BUF_SIZE = 4096;
            std::vector<char> buf(BUF_SIZE);
            while (true) {
                ssize_t n = client->read(buf.data(), BUF_SIZE);
                if (n > 0) {
                    // echo back (handle partial write in loop)
                    ssize_t written = 0;
                    const char* p = buf.data();
                    while (written < n) {
                        ssize_t w = client->write(p + written, n - written);
                        if (w <= 0) {
                            perror("write");
                            break;
                        }
                        written += w;
                    }
                } else if (n == 0) {
                    std::cout << "client closed connection\n";
                    break;
                } else {
                    // error
                    if (errno == EAGAIN || errno == EWOULDBLOCK) {
                        // no data (non-blocking) — break for demo
                        break;
                    } else {
                        perror("recv");
                        break;
                    }
                }
            }
            // client shared_ptr goes out of scope here -> destructor will close fd
            std::cout << "Connection handler done for fd=" << cfd << "\n";
        }
    }

    return 0;
}

文件:echo_client.cpp(示例客户端)

// echo_client.cpp
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cstring>
#include <iostream>

int main() {
    const char* server_ip = "127.0.0.1";
    const int server_port = 12345;

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("socket");
        return 1;
    }

    sockaddr_in srv{};
    srv.sin_family = AF_INET;
    srv.sin_port = htons(server_port);
    if (inet_pton(AF_INET, server_ip, &srv.sin_addr) <= 0) {
        std::cerr << "inet_pton failed\n";
        close(sockfd);
        return 1;
    }

    if (connect(sockfd, (sockaddr*)&srv, sizeof(srv)) != 0) {
        perror("connect");
        close(sockfd);
        return 1;
    }

    const char* msg = "Hello from client!\n";
    ssize_t s = send(sockfd, msg, strlen(msg), 0);
    if (s < 0) {
        perror("send");
        close(sockfd);
        return 1;
    }

    char buf[4096];
    ssize_t n = recv(sockfd, buf, sizeof(buf)-1, 0);
    if (n > 0) {
        buf[n] = '\0';
        std::cout << "Received echo: " << buf;
    } else if (n == 0) {
        std::cout << "Server closed connection\n";
    } else {
        perror("recv");
    }

    close(sockfd);
    return 0;
}

编译与运行(在 Linux 下)

假设文件在同一目录:

  • socket.hpp
  • socket.cpp
  • echo_server.cpp
  • echo_client.cpp

编译:

g++ -std=c++17 socket.cpp echo_server.cpp -o echo_server
g++ -std=c++17 echo_client.cpp -o echo_client

运行:

  1. 先在一个终端运行 server:
./echo_server
  1. 在另一个终端运行 client:
./echo_client

你应该在 server 端看到 Accepted client fd=... 等输出,client 端会打印收到的回显。