charles@charles-Aspire-4741:~/mydir/mylib/redis$ ./src/redis-cli 127.0.0.1:6379> lpush list hello (integer) 1 127.0.0.1:6379> brpop list 0 1) "list" 2) "hello" 127.0.0.1:6379> brpop list 0 //阻塞在这里 /* ---------------------------------------------------- */ //当我在另一个客户端lpush一个元素之后,客户端输出为 127.0.0.1:6379> brpop list 0 1) "list" 2) "world" (50.60s)//阻塞的时间
//阻塞状态
typedef struct blockingState {
/* Generic fields. */
mstime_t timeout; /* 超时时间 */
/* REDIS_BLOCK_LIST */
dict *keys; /* The keys we are waiting to terminate a blocking
* operation such as BLPOP. Otherwise NULL. */
robj *target; /* The key that should receive the element,
* for BRPOPLPUSH. */
/* REDIS_BLOCK_WAIT */
int numreplicas; /* Number of replicas we are waiting for ACK. */
long long reploffset; /* Replication offset to reach. */
} blockingState;
//继续列表
typedef struct readyList {
redisDb *db;//就绪键所在的数据库
robj *key;//就绪键
} readyList;
//客户端有关属性
typedef struct redisClient {
int btype; /* Type of blocking op if REDIS_BLOCKED. */
blockingState bpop; /* blocking state */
}
//服务器有关属性
struct redisServer {
/* Blocked clients */
unsigned int bpop_blocked_clients; /* Number of clients blocked by lists */
list *unblocked_clients; /* list of clients to unblock before next loop */
list *ready_keys; /* List of readyList structures for BLPOP & co */
}
//数据库有关属性
typedef struct redisDb {
//keys->redisCLient映射
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */
dict *ready_keys; /* Blocked keys that received a PUSH */
}redisDB
void brpopCommand(redisClient *c) {
blockingPopGenericCommand(c,REDIS_TAIL);
}
//++++++++++++++++++++++++++++++++++++++++++++++++++
void blockingPopGenericCommand(redisClient *c, int where) {
robj *o;
mstime_t timeout;
int j;
if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS)
!= REDIS_OK) return;//将超时时间保存在timeout中
for (j = 1; j < c->argc-1; j++) {
o = lookupKeyWrite(c->db,c->argv[j]);//在数据库中查找操作的链表
if (o != NULL) {//如果不为空
if (o->type != REDIS_LIST) {//不是链表类型
addReply(c,shared.wrongtypeerr);//报错
return;
} else {
if (listTypeLength(o) != 0) {//链表不为空
/* Non empty list, this is like a non normal [LR]POP. */
char *event = (where == REDIS_HEAD) ? "lpop" : "rpop";
robj *value = listTypePop(o,where);//从链表中pop出一个元素
redisAssert(value != NULL);
//给客户端发送pop出来的元素信息
addReplyMultiBulkLen(c,2);
addReplyBulk(c,c->argv[j]);
addReplyBulk(c,value);
decrRefCount(value);
notifyKeyspaceEvent(REDIS_NOTIFY_LIST,event,
c->argv[j],c->db->id);
if (listTypeLength(o) == 0) {//如果链表为空,从数据库删除链表
dbDelete(c->db,c->argv[j]);
notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",
c->argv[j],c->db->id);
}
/* 省略一部分 */
}
}
}
}
/* 如果链表为空,则阻塞客户端 */
blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
}
void blockForKeys(redisClient *c, robj **keys, int numkeys, mstime_t timeout, robj *target) {
dictEntry *de;
list *l;
int j;
c->bpop.timeout = timeout;//超时时间赋值给客户端blockingState属性
c->bpop.target = target;//这属性适用于brpoplpush命令的输入对象,如果是brpop, //则target为空
if (target != NULL) incrRefCount(target);//不为空,增加引用计数
for (j = 0; j < numkeys; j++) {
/* 将阻塞的key存入c.bpop.keys字典中 */
if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;
incrRefCount(keys[j]);
/* And in the other "side", to map keys -> clients */
//将阻塞的key和客户端添加进c->db->blocking_keys
de = dictFind(c->db->blocking_keys,keys[j]);
if (de == NULL) {
int retval;
/* For every key we take a list of clients blocked for it */
l = listCreate();
retval = dictAdd(c->db->blocking_keys,keys[j],l);
incrRefCount(keys[j]);
redisAssertWithInfo(c,keys[j],retval == DICT_OK);
} else {
l = dictGetVal(de);
}
listAddNodeTail(l,c);//添加到阻塞键的客户点链表中
}
blockClient(c,REDIS_BLOCKED_LIST);//设置客户端阻塞标志
}
void blockClient(redisClient *c, int btype) {
c->flags |= REDIS_BLOCKED;//设置标志
c->btype = btype;//阻塞操作类型
server.bpop_blocked_clients++;
}
if (listLength(server.ready_keys)) handleClientsBlockedOnLists();
//db.c
void dbAdd(redisDb *db, robj *key, robj *val) {
sds copy = sdsdup(key->ptr);
int retval = dictAdd(db->dict, copy, val);//将数据添加进数据库
redisAssertWithInfo(NULL,key,retval == REDIS_OK);
//判断是否为链表类型,如果是,调用有链表已经ready函数
if (val->type == REDIS_LIST) signalListAsReady(db, key);
if (server.cluster_enabled) slotToKeyAdd(key);
}
//t_list.c
void signalListAsReady(redisDb *db, robj *key) {
readyList *rl;
/* 没有客户端阻塞在这个键上,则直接返回. */
if (dictFind(db->blocking_keys,key) == NULL) return;
/* 这个键已近被唤醒了,所以没必要重新入队 */
if (dictFind(db->ready_keys,key) != NULL) return;
/* Ok, 除了上述两情况,把这个键放入server.ready_keys */
rl = zmalloc(sizeof(*rl));
rl->key = key;
rl->db = db;
incrRefCount(key);
listAddNodeTail(server.ready_keys,rl);//添加链表末尾
/* We also add the key in the db->ready_keys dictionary in order
* to avoid adding it multiple times into a list with a simple O(1)
* check. */
incrRefCount(key);
//同时将这个阻塞键放入db->ready_keys
redisAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
}
void handleClientsBlockedOnLists(void) {
while(listLength(server.ready_keys) != 0) {
list *l;
/* 将server.ready_keys赋给一个新的list,再将server.ready_keys清空 */
l = server.ready_keys;
server.ready_keys = listCreate();
/* 迭代每一个就绪的每一个readyList */
while(listLength(l) != 0) {
listNode *ln = listFirst(l);//获取第一个就绪readyList
readyList *rl = ln->value;
/* 从rl所属的数据库中删除rl */
dictDelete(rl->db->ready_keys,rl->key);
/* 查询rl所属的数据库查找rl->key ,给阻塞客户端回复rl->key链表中的第一个元素*/
robj *o = lookupKeyWrite(rl->db,rl->key);
if (o != NULL && o->type == REDIS_LIST) {
dictEntry *de;
/* 在rl->db->blocking_keys查找阻塞在rl->key的客户端链表 */
de = dictFind(rl->db->blocking_keys,rl->key);
if (de) {
list *clients = dictGetVal(de);//转换为客户端链表
int numclients = listLength(clients);
while(numclients--) {//给每个客户端发送消息
listNode *clientnode = listFirst(clients);
redisClient *receiver = clientnode->value;//阻塞的客户端
robj *dstkey = receiver->bpop.target;//brpoplpush命令目的链表
int where = (receiver->lastcmd &&
receiver->lastcmd->proc == blpopCommand) ?
REDIS_HEAD : REDIS_TAIL;//获取取出的方向
robj *value = listTypePop(o,where);//取出就绪链表的元素
if (value) {
/* Protect receiver->bpop.target, that will be
* freed by the next unblockClient()
* call. */
if (dstkey) incrRefCount(dstkey);
unblockClient(receiver);//设置客户端为非阻塞状态
if (serveClientBlockedOnList(receiver,
rl->key,dstkey,rl->db,value,
where) == REDIS_ERR)
{
/* If we failed serving the client we need
* to also undo the POP operation. */
listTypePush(o,value,where);
}//给客户端回复链表中的元素内容
if (dstkey) decrRefCount(dstkey);
decrRefCount(value);
} else {
break;
}
}
}
//如果链表为空,则从数据库中删除
if (listTypeLength(o) == 0) dbDelete(rl->db,rl->key);
/* We don't call signalModifiedKey() as it was already called
* when an element was pushed on the list. */
}
/* 回收rl */
decrRefCount(rl->key);
zfree(rl);
listDelNode(l,ln);
}
listRelease(l); /* We have the new list on place at this point. */
}
}
int serveClientBlockedOnList(redisClient *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where)
{
robj *argv[3];
if (dstkey == NULL) {
/* Propagate the [LR]POP operation. */
argv[0] = (where == REDIS_HEAD) ? shared.lpop :
shared.rpop;
argv[1] = key;
propagate((where == REDIS_HEAD) ?
server.lpopCommand : server.rpopCommand,
db->id,argv,2,REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL);
/* BRPOP/BLPOP */
addReplyMultiBulkLen(receiver,2);
addReplyBulk(receiver,key);
addReplyBulk(receiver,value);
} else {
/* BRPOPLPUSH */
/* 省略 */
}
}
int clientsCronHandleTimeout(redisClient *c, mstime_t now_ms) {
time_t now = now_ms/1000;
//..........
else if (c->flags & REDIS_BLOCKED) {
/* Blocked OPS timeout is handled with milliseconds resolution.
* However note that the actual resolution is limited by
* server.hz. */
if (c->bpop.timeout != 0 && c->bpop.timeout < now_ms) {
/* Handle blocking operation specific timeout. */
replyToBlockedClientTimedOut(c);
unblockClient(c);
}
}
//.............
机械节能产品生产企业官网模板...
大气智能家居家具装修装饰类企业通用网站模板...
礼品公司网站模板
宽屏简约大气婚纱摄影影楼模板...
蓝白WAP手机综合医院类整站源码(独立后台)...苏ICP备2024110244号-2 苏公网安备32050702011978号 增值电信业务经营许可证编号:苏B2-20251499 | Copyright 2018 - 2025 源码网商城 (www.ymwmall.com) 版权所有