Redis基于多路复用技术实现了一套简单的事件驱动库,代码在ae.h、ae.c以及ae_epoll.c、ae_evport.c和ae_kqueue.c、ae_select.c这几个文件中。其中ae表示的是antirez eventloop的意思。
Redis里面包含两种事件类型:FileEvent和TimeEvent。
Redis采用IO多路复用技术,所有的事件都是在一个线程中进行处理。Redis的事件驱动模型可以以以下为代码进行表示:
int main(int argc,char **argv)
{
while(true) {
// 等待事件到来:wait4Event();
// 处理事件:processEvent()
}
}
在一个死循环中等待事件的到来,然后对事件进行处理,以此往复。这就是一个最经典的网络编程模型。
/* State of an event based program */
typedef struct aeEventLoop {
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
long long timeEventNextId;
time_t lastTime; /* Used to detect system clock skew */
aeFileEvent *events; /* Registered events */
aeFiredEvent *fired; /* Fired events */
aeTimeEvent *timeEventHead;
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
} aeEventLoop;
aeEventLoop是Redis中事件驱动模型的核心,封装了整个事件循环,其中每个字段解释如下:
这两个结构分别表示文件事件和时间事件,定义如下
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE) */
aeFileProc *rfileProc; // 函数指针,写事件处理
aeFileProc *wfileProc; // 函数指针,读事件处理
void *clientData; // 具体的数据
} aeFileEvent;
其中mask表示文件事件类型掩码,可以是AE_READABLE表示是可读事件,AE_WRITABLE为可写事件。aeFileProc是函数指针。
/* Time event structure */
typedef struct aeTimeEvent {
long long id; // 事件ID
long when_sec; // 事件触发的时间:s
long when_ms; // 事件触发的时间:ms
aeTimeProc *timeProc; // 函数指针
aeEventFinalizerProc *finalizerProc; // 函数指针:在对应的aeTieEvent节点被删除前调用,可以理解为aeTimeEvent的析构函数
void *clientData; // 指针,指向具体的数据
struct aeTimeEvent *next; // 指向下一个时间事件指针
} aeTimeEvent;
aeFiredEvent结构表示一个已经被触发的事件,结果如下:
/* A fired event */
typedef struct aeFiredEvent {
int fd; // 事件被触发的文件描述符
int mask; // 被触发事件的掩码,表示被触发事件的类型
} aeFiredEvent;
fd表示事件发生在哪个文件描述符上面,mask用来表示具体事件的类型。
Redis底层采用IO多路复用技术实现高并发,具体实现可以采用kqueue、select、epoll等技术。对于Linux来说,epoll的性能要优于select,所以以epoll为例来进行分析。
typedef struct aeApiState {
int epfd;
struct epoll_event *events;
} aeApiState;
aeApiState封装了跟epoll相关的数据,epfd保存epoll_create()返回的文件描述符。
事件驱动的启动代码位于ae.c的aeMain()函数中,代码如下:
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
从aeMain()方法中可以看到,整个事件驱动是在一个while()循环中不停地执行aeProcessEvents()方法,在这个方法中执行从客户端发送过来的请求。
初始化:aeCreateEventLoop()
aeEventLoop的初始化是在aeCreateEventLoop()方法中进行的,这个方法是在server.c中的initServer()中调用的。实现如下:
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
eventLoop->lastTime = time(NULL);
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
// 调用aeApiCreate()初始化epoll相关的数据
if (aeApiCreate(eventLoop) == -1) goto err;
/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */
for (i = 0; i < setsize; i++)
/**
* 把每个刚新建的aeFileEvent.mask设置为AE_NONE
* 这点是必须的
*/
eventLoop->events[i].mask = AE_NONE;
return eventLoop;
err:
if (eventLoop) {
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
return NULL;
}
在这个方法中主要就是给aeEventLoop对象分配内存然后并进行初始化。其中关键的地方有:
1、调用aeApiCreate()初始化epoll相关的数据。aeApiCreate()实现如下:
static int aeApiCreate(aeEventLoop *eventLoop) {
// 1.分配内存
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
// 1.分配events内存,epoll_event的大小为setsize
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}
// 2.调用epoll_create()生成epoll文件描述符,并保存在epfd这个域中
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}
// 把apidata指针指向第一步中分配的内存地址
eventLoop->apidata = state;
return 0;
}
在aeApiCreate()方法中主要完成以下三件事:
2、初始化events中的mask字段为为AE_NONE。
Redis使用aeCreateFileEvent()来生成fileEvent,代码如下:
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
// 1. 检查新增的fd是否超过所能容纳最大值
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
aeFileEvent *fe = &eventLoop->events[fd];
// 2. 调用aeApiAddEvent()方法把对应的fd以mask模式添加到epoll监听器中
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
// 3. 设置相应的字段值
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
// 如果有需要则修改maxfd字段的值
eventLoop->maxfd = fd;
return AE_OK;
}
aeCreateFileEvent()方法主要做了一下三件事:
检查新增的fd是否超过所能容纳最大值。 调用aeApiAddEvent()方法把对应的fd以mask模式添加到epoll监听器中。 设置相应的字段值。其中最关键的步骤是第二步,aeApiAddEvent()方法如下:
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning */
/* If the fd was already monitored for some event, we need a MOD
* operation. Otherwise we need an ADD operation. */
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events */
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}
生成timeEvent:aeCreateTimeEvent()
aeCreateTimeEvent()方法主要是用来生成timeEvent节点,其实现比较简单,代码如下所示:
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc)
{
// 1. 获取id
long long id = eventLoop->timeEventNextId++;
aeTimeEvent *te;
// 2. 分配内存
te = zmalloc(sizeof(*te));
if (te == NULL) return AE_ERR;
// 3. 设置aeTimeEvent的各个字段的值
te->id = id;
aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
te->timeProc = proc;
te->finalizerProc = finalizerProc;
te->clientData = clientData;
// timeEventHead总是指向最新添加的timeEvent节点
te->next = eventLoop->timeEventHead;
eventLoop->timeEventHead = te;
return id;
}
Redis在processTimeEvents()方法中来处理所有的timeEvent,实现如下:
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te, *prev;
long long maxId;
time_t now = time(NULL);
/**
* 如果系统时间被调整到将来某段时间然后又被设置回正确的时间,
* 这种情况下链表中的timeEvent有可能会被随机的延迟执行,因
* 此在这个情况下把所有的timeEvent的触发时间设置为0表示及执行
*/
if (now < eventLoop->lastTime) {
te = eventLoop->timeEventHead;
while(te) {
te->when_sec = 0;
te = te->next;
}
}
eventLoop->lastTime = now; // 设置上次运行时间为now
prev = NULL;
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
while(te) {
long now_sec, now_ms;
long long id;
/**
* 删除已经被标志位 删除 的时间事件
*/
if (te->id == AE_DELETED_EVENT_ID) {
aeTimeEvent *next = te->next;
if (prev == NULL)
eventLoop->timeEventHead = te->next;
else
prev->next = te->next;
if (te->finalizerProc)
// 在时间事件节点被删除前调用finlizerProce()方法
te->finalizerProc(eventLoop, te->clientData);
zfree(te);
te = next;
continue;
}
if (te->id > maxId) {
/**
* te->id > maxId 表示当前te指向的timeEvent为当前循环中新添加的,
* 对于新添加的节点在本次循环中不作处理。
* PS:为什么会出现这种情况呢?有可能是在timeProc()里面会注册新的timeEvent节点?
* 对于当前的Redis版本中不会出现te->id > maxId这种情况
*/
te = te->next;
continue;
}
aeGetTime(&now_sec, &now_ms);
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms))
{
// 如果当前时间已经超过了对应的timeEvent节点设置的触发时间,
// 则调用timeProc()方法执行对应的任务
int retval;
id = te->id;
retval = te->timeProc(eventLoop, id, te->clientData);
processed++;
if (retval != AE_NOMORE) {
// 要执行多次,则计算下次执行时间
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
} else {
// 如果只需要执行一次,则把id设置为-1,再下次循环中删除
te->id = AE_DELETED_EVENT_ID;
}
}
prev = te;
te = te->next;
}
return processed;
}
在这个方法中会
Redis中所有的事件,包括timeEvent和fileEvent都是在aeProcessEvents()方法中进行处理的,刚方法实现如下:
/* Process every pending time event, then every pending file event
* (that may be registered by time event callbacks just processed).
* Without special flags the function sleeps until some file event
* fires, or when the next time event occurs (if any).
*
* If flags is 0, the function does nothing and returns.
* if flags has AE_ALL_EVENTS set, all the kind of events are processed.
* if flags has AE_FILE_EVENTS set, file events are processed.
* if flags has AE_TIME_EVENTS set, time events are processed.
* if flags has AE_DONT_WAIT set the function returns ASAP until all
* the events that's possible to process without to wait are processed.
*
* The function returns the number of events processed. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
/**
* 既没有时间事件也没有文件事件,则直接返回
*/
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
/**
* -1 == eventloop->maxfd 表示还么有任何aeFileEvent被添加到epoll
* 事件循环中进行监听
*/
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
/**
* 如果有aeFileEvent需要处理,就先要从所有待处理的
* aeTimeEvent事件中找到最近的将要被执行的aeTimeEvent节点
* 并结算该节点触发时间
*/
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
long now_sec, now_ms;
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
/* How many milliseconds we need to wait for the next
* time event to fire? */
// 计算epoll_wait()需要等待的时间
long long ms =
(shortest->when_sec - now_sec)*1000 +
shortest->when_ms - now_ms;
if (ms > 0) {
tvp->tv_sec = ms/1000;
tvp->tv_usec = (ms % 1000)*1000;
} else {
tvp->tv_sec = 0;
tvp->tv_usec = 0;
}
} else {
// 如果flags设置了AE_DONT_WAIT,则设置epoll_wait()等待时间为0,
// 即立刻从epoll中返回
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
/* Otherwise we can block */
tvp = NULL; /* wait forever */
}
}
// 调用aeApiPoll()进行阻塞等待事件的到来,等待时间为tvp
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
/* note the fe->mask & mask & ... code: maybe an already processed
* event removed an element that fired and we still didn't
* processed, so we check if the event is still valid. */
// fe->mask && mask 的目的是确保对应事件时候还有效
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
/* Check time events */
if (flags & AE_TIME_EVENTS)
// 处理aeTimeEvent
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}
该方法的入参flag表示要处理哪些事件,可以取以下几个值 :
aeProcessEvents()方法会做下面几件事:
判断传入的flag的值,如果既不包含AE_TIME_EVENTS也不包含AE_FILE_EVENTS则直接返回。
计算如果有aeFileEvent事件需要进行处理,则先计算epoll_wait()方法需要阻塞等待的时间,计算方式如下:
调用aeApiPoll()方法阻塞等待事件的到来,阻塞时间为第二步中计算的时间。aeApiPoll()实现如下:
static int aeApiPoll(aeEventLoop eventLoop, struct timeval tvp) { aeApiState *state = eventLoop->apidata; int retval, numevents = 0;
// 1. 根据传入的tvp计算需要等待时间,然后调用epoll_wait()进行阻塞等待
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
// 有事件到来
if (retval > 0) {
int j;
numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
// 2. 计算到来的event的类型
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
// 3. 把有事件发生的fd以及对应的mask类型拷贝到eventloop->fired数组中
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
} aeApiPoll()会做下面几件事:
从aeApiPoll()方法返回之后,所有事件已经就绪了的fd以及对应事件的类型mask已经保存在eventLoop->fired[]数组中。依次遍历fired数组,根据mask类型,执行对应的frileProc()或者wfileProce()方法。
如果传入的flags中有AE_TIME_EVENTS,则调用processTimeEvents()执行所有已经到时间了的timeEvent。