理解C/C++异步IO编程——做一个RAII的Socket抽象
前言
在我们开始异步IO编程之前,咱们最好先理解一下Socket API有哪些。这篇博客,如我所言,只是说尝试给一个Reference,告诉看官那些抽象是我们需要做的。这样,我们后面构建更加现代的异步IO编程完全可以有更好的抽象。为此,这篇博客是必要的。
创建套接字并做一些设置
int socket(int domain, int type, int protocol);
domain
:地址簇,常用AF_INET
(IPv4)、AF_INET6
(IPv6)等。type
:套接字类型,SOCK_STREAM
(TCP,面向连接)、SOCK_DGRAM
(UDP)等。 内核也支持在这里用位标志(例如 Linux 上可位或SOCK_NONBLOCK
、SOCK_CLOEXEC
)。protocol
:通常为0
(让内核根据 domain/type 选择默认协议),或显式例如IPPROTO_TCP
。- 返回:>=0 为新的 socket fd;<0 表示错误(设置 errno)。
int setsockopt(int sockfd, int level, int optname, const void *optval, socklen_t optlen);
sockfd
:目标 socket 描述符。level
:选项所在协议层,例如SOL_SOCKET
、IPPROTO_TCP
。optname
:选项名,例如SO_REUSEADDR
、SO_RCVTIMEO
。optval
/optlen
:指向值及其长度。- 返回:0 成功,-1 失败(errno 设置)。
绑定到给定的二元端口
int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
- 绑定 socket 到本地地址(IP + port)。
addr
指向sockaddr_in
/sockaddr_in6
,addrlen
为对应结构长度。 - 返回:0 成功,-1 失败。
监听服务器
int listen(int sockfd, int backlog);
sockfd
:已 bind 的 socket。backlog
:内核队列长度(未 accept 的连接队列)。常用SOMAXCONN
。- 返回:0 成功,-1 失败。
接受一个新的链接(这里说Accept扩展)
int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
- 同
accept
,但可直接传flags
(例如SOCK_NONBLOCK | SOCK_CLOEXEC
),避免后续fcntl
调整。 addr
/addrlen
:输出参数,返回对端地址(可为 nullptr/0)。- 返回:新的连接 socket fd(>=0)或 -1(失败)。
IO操作
ssize_t recv(int sockfd, void *buf, size_t len, int flags);
ssize_t send(int sockfd, const void *buf, size_t len, int flags);
recv
/send
:数据收发。flags
常用0
;返回值为读取/发送字节数,0 表示对端关闭连接,-1 表示错误(可能设置 errno 为EAGAIN/EWOULDBLOCK
等)。
关闭释放套接字
int close(int fd);
- 关闭文件描述符(或 socket)。返回 0 成功,-1 失败。
其他的一些操作
uint16_t htons(uint16_t hostshort);
- 主机字节序(host)转换到网络字节序(big-endian),常用于设置
sin_port
。
其他
AF_INET
、SOCK_STREAM
、SOCK_NONBLOCK
、SOCK_CLOEXEC
、SOL_SOCKET
、SO_REUSEADDR
、INADDR_ANY
、SOMAXCONN
。
注意:
INADDR_ANY
实际上是一个in_addr_t
宏,表示绑定到任意本地地址(0.0.0.0)。
行动起来:做一个RAII的Socket抽象
using socket_raw_t = int;
static constexpr const socket_raw_t INVALID_SOCK_INTERNAL = -1;
class Socket {
public:
Socket(const socket_raw_t fd);
virtual ~Socket();
socket_raw_t internal() const;
bool is_valid() const;
static bool is_valid(socket_raw_t fd);
void close();
protected:
socket_raw_t socket_fd;
// delete copy, allow move
};
class PassiveClientSocket : public Socket {
public:
PassiveClientSocket(const socket_raw_t fd);
ssize_t read(void* buffer_ptr, size_t size);
ssize_t write(const void* buffer_ptr, size_t size);
};
class ServerSocket : public Socket {
public:
ServerSocket(const int port);
ServerSocket(ServerSocket&&) = default;
socket_raw_t listen();
std::shared_ptr<PassiveClientSocket> accept();
int port() const;
private:
const int open_port;
// copy/move assignments deleted
};
Socket::Socket(socket_raw_t fd)
:用现有 fd 构造对象。Socket::~Socket()
:析构时会close()
(RAII)。socket_raw_t Socket::internal() const
:返回底层 fd。PassiveClientSocket::read/ write
:封装recv
/send
,返回字节数或错误。ServerSocket::listen()
:创建 socket、bind、listen,并把socket_fd
设置为监听 fd,返回 fd。ServerSocket::accept()
:调用accept4
,返回shared_ptr<PassiveClientSocket>
(出错返回nullptr
)。
逐个击破:Socket(基类)
代码片段(简化)
Socket::Socket(const socket_raw_t fd)
: socket_fd(fd) { }
void Socket::close() {
::close(socket_fd);
}
socket_raw_t Socket::internal() const { return socket_fd; }
bool Socket::is_valid() const { return is_valid(socket_fd); }
Socket
作为资源所有者(RAII)是合理的:析构时会close()
。构造时可接受一个已经存在的 fd,便于从accept
创建对象。is_valid()
通过与INVALID_SOCK_INTERNAL
比较判断 fd 是否有效。
PassiveClientSocket(被动客户端连接对象)
PassiveClientSocket::PassiveClientSocket(const socket_raw_t fd)
: Socket(fd) {
}
ssize_t PassiveClientSocket::read(void* buffer_ptr, size_t size) {
if (!is_valid())
throw "invalid socket!";
return ::recv(socket_fd, buffer_ptr, size, 0);
}
ssize_t PassiveClientSocket::write(const void* buffer_ptr, size_t size) {
if (!is_valid())
throw "invalid socket!";
return ::send(socket_fd, buffer_ptr, size, 0);
}
说明
- 简洁封装
recv
/send
。在调用前检查 socket 有效性是必要的。
ServerSocket(监听端)
socket_raw_t ServerSocket::listen() {
int listen_fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
if (listen_fd < 0) { throw "Socket Creation Error"; }
int opt = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
sockaddr_in addr {};
addr.sin_family = AF_INET;
addr.sin_port = htons(open_port);
addr.sin_addr.s_addr = INADDR_ANY;
if (bind(listen_fd, (sockaddr*)&addr, sizeof(addr)) != 0) {
throw "Bind Error!";
}
if (::listen(listen_fd, SOMAXCONN) != 0) {
throw "Listen Error!";
}
socket_fd = listen_fd;
return listen_fd;
}
- 创建非阻塞的监听 fd,并设置
SO_REUSEADDR
,然后bind
、listen
,最后把 fd 存入对象。
ServerSocket::accept()
返回被构建的
std::shared_ptr<PassiveClientSocket> ServerSocket::accept() {
sockaddr_in cli {};
socklen_t cli_len = sizeof(cli);
int fd = ::accept4(socket_fd, (sockaddr*)&cli, &cli_len, SOCK_NONBLOCK | SOCK_CLOEXEC);
if (fd < 0) {
return nullptr;
}
return std::make_shared<PassiveClientSocket>(fd);
}
附录demo和socket代码
文件:socket.hpp
: socket的头文件
#pragma once
#include <cstddef>
#include <memory>
#include <sys/types.h>
#include <unistd.h>
using socket_raw_t = int;
static constexpr const socket_raw_t INVALID_SOCK_INTERNAL = -1;
class Socket {
public:
socket_raw_t internal() const { return socket_fd; }
virtual ~Socket() {
close();
};
Socket(const socket_raw_t fd);
Socket(Socket&& other) noexcept
: socket_fd(other.socket_fd) {
other.socket_fd = INVALID_SOCK_INTERNAL;
}
bool is_valid() const {
return is_valid(socket_fd);
}
static bool is_valid(socket_raw_t fd) {
return fd != INVALID_SOCK_INTERNAL;
}
void close();
protected:
socket_raw_t socket_fd;
Socket() = delete;
Socket(const Socket&) = delete;
Socket& operator=(Socket&&) = delete;
Socket& operator=(const Socket&) = delete;
};
class PassiveClientSocket : public Socket {
public:
PassiveClientSocket(const socket_raw_t fd);
ssize_t read(void* buffer_ptr, size_t size);
ssize_t write(const void* buffer_ptr, size_t size);
};
class ServerSocket : public Socket {
public:
ServerSocket(ServerSocket&&) = default;
ServerSocket(const int port)
: Socket(INVALID_SOCK_INTERNAL)
, open_port(port) {
}
socket_raw_t listen();
int port() const { return open_port; }
std::shared_ptr<PassiveClientSocket> accept();
private:
const int open_port;
private:
ServerSocket() = delete;
ServerSocket(const ServerSocket&) = delete;
ServerSocket& operator=(ServerSocket&&) = delete;
ServerSocket& operator=(const ServerSocket&) = delete;
};
文件:socket.cpp
socket的实现源文件
#include "socket.hpp"
#include <memory>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
Socket::Socket(const socket_raw_t fd)
: socket_fd(fd) { }
void Socket::close() {
if (!is_valid())
return;
::close(socket_fd);
socket_fd = INVALID_SOCK_INTERNAL;
}
PassiveClientSocket::PassiveClientSocket(const socket_raw_t fd)
: Socket(fd) {
}
ssize_t PassiveClientSocket::read(void* buffer_ptr, size_t size) {
if (!is_valid())
throw "invalid socket!";
return ::recv(socket_fd, buffer_ptr, size, 0);
}
ssize_t PassiveClientSocket::write(const void* buffer_ptr, size_t size) {
if (!is_valid())
throw "invalid socket!";
return ::send(socket_fd, buffer_ptr, size, 0);
}
socket_raw_t ServerSocket::listen() {
int listen_fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
if (listen_fd < 0) {
throw "Socket Creation Error";
}
int opt = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
sockaddr_in addr {};
addr.sin_family = AF_INET;
addr.sin_port = htons(open_port);
addr.sin_addr.s_addr = INADDR_ANY;
if (bind(listen_fd, (sockaddr*)&addr, sizeof(addr)) != 0) {
throw "Bind Error!";
}
if (::listen(listen_fd, SOMAXCONN) != 0) {
throw "Listen Error!";
}
socket_fd = listen_fd;
return listen_fd;
}
std::shared_ptr<PassiveClientSocket> ServerSocket::accept() {
sockaddr_in cli {};
socklen_t cli_len = sizeof(cli);
int fd = ::accept4(socket_fd, (sockaddr*)&cli, &cli_len, SOCK_NONBLOCK | SOCK_CLOEXEC);
if (fd < 0) {
return nullptr;
}
return std::make_shared<PassiveClientSocket>(fd);
}
文件:echo_server.cpp
(示例服务器)
// echo_server.cpp
#include "socket.hpp"
#include <iostream>
#include <vector>
#include <poll.h>
#include <fcntl.h>
#include <cstring>
#include <errno.h>
#include <unistd.h>
int set_blocking(int fd, bool block) {
int flags = fcntl(fd, F_GETFL, 0);
if (flags < 0) return -1;
if (block) flags &= ~O_NONBLOCK;
else flags |= O_NONBLOCK;
return fcntl(fd, F_SETFL, flags);
}
int main() {
const int port = 12345;
ServerSocket srv(port);
try {
srv.listen();
} catch (const std::exception &e) {
std::cerr << "listen failed: " << e.what() << "\n";
return 1;
} catch (...) {
std::cerr << "listen failed with unknown error\n";
return 1;
}
std::cout << "Echo server listening on port " << port << "\n";
struct pollfd pfd;
pfd.fd = srv.internal();
pfd.events = POLLIN;
pfd.revents = 0;
while (true) {
int ret = poll(&pfd, 1, -1); // wait indefinitely
if (ret < 0) {
if (errno == EINTR) continue;
perror("poll");
break;
}
if (pfd.revents & POLLIN) {
auto client = srv.accept();
if (!client) {
// either no incoming (EAGAIN) or an error; continue
continue;
}
int cfd = client->internal();
// For demo simplicity, set client socket to blocking to
// allow simple synchronous recv() usage
set_blocking(cfd, true);
std::cout << "Accepted client fd=" << cfd << "\n";
const size_t BUF_SIZE = 4096;
std::vector<char> buf(BUF_SIZE);
while (true) {
ssize_t n = client->read(buf.data(), BUF_SIZE);
if (n > 0) {
// echo back (handle partial write in loop)
ssize_t written = 0;
const char* p = buf.data();
while (written < n) {
ssize_t w = client->write(p + written, n - written);
if (w <= 0) {
perror("write");
break;
}
written += w;
}
} else if (n == 0) {
std::cout << "client closed connection\n";
break;
} else {
// error
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// no data (non-blocking) — break for demo
break;
} else {
perror("recv");
break;
}
}
}
// client shared_ptr goes out of scope here -> destructor will close fd
std::cout << "Connection handler done for fd=" << cfd << "\n";
}
}
return 0;
}
文件:echo_client.cpp
(示例客户端)
// echo_client.cpp
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cstring>
#include <iostream>
int main() {
const char* server_ip = "127.0.0.1";
const int server_port = 12345;
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
perror("socket");
return 1;
}
sockaddr_in srv{};
srv.sin_family = AF_INET;
srv.sin_port = htons(server_port);
if (inet_pton(AF_INET, server_ip, &srv.sin_addr) <= 0) {
std::cerr << "inet_pton failed\n";
close(sockfd);
return 1;
}
if (connect(sockfd, (sockaddr*)&srv, sizeof(srv)) != 0) {
perror("connect");
close(sockfd);
return 1;
}
const char* msg = "Hello from client!\n";
ssize_t s = send(sockfd, msg, strlen(msg), 0);
if (s < 0) {
perror("send");
close(sockfd);
return 1;
}
char buf[4096];
ssize_t n = recv(sockfd, buf, sizeof(buf)-1, 0);
if (n > 0) {
buf[n] = '\0';
std::cout << "Received echo: " << buf;
} else if (n == 0) {
std::cout << "Server closed connection\n";
} else {
perror("recv");
}
close(sockfd);
return 0;
}
编译与运行(在 Linux 下)
假设文件在同一目录:
socket.hpp
socket.cpp
echo_server.cpp
echo_client.cpp
编译:
g++ -std=c++17 socket.cpp echo_server.cpp -o echo_server
g++ -std=c++17 echo_client.cpp -o echo_client
运行:
- 先在一个终端运行 server:
./echo_server
- 在另一个终端运行 client:
./echo_client
你应该在 server 端看到 Accepted client fd=...
等输出,client 端会打印收到的回显。