//static修饰的成员变量和成员方法独立于该类的任何对象。也就是说,它不依赖类特定的实例,被类的所有实例共享。 private static List<Queue> queueCache = new LinkedList<Queue>();
private Integer offerMaxQueue = 2000;
new Thread(){
public void run(){
while(true){
String ip = null;
try {
synchronized (queueCache) {
Integer size = queueCache.size();
if(size==0){
//队列缓存池没有消息,等待。。。。 queueCache.wait();
}
Queue queue = queueCache.remove(0);
if(isIpLock(queueStr)){//假若这个是一个多应用的分布式系统,那么这个判断应该是分布式锁,这里说的锁不是线程停止,而是跳过该消息,滞后处理
queueCache.add(queue);该queue重新加入队列缓冲池,滞后处理,
continue;
}else{
;//这里是处理该消息的操作。
}
size = queueCache.size();
if(size<offerMaxQueue&&size>=0){ queueCache.notifyAll();//在队列缓存池不超过最大值的前提下,假若检入正在等待中,那么那么让他们排队检入。
}
}
} catch (Exception e) {
e.printStackTrace();
}finally{
try {//检出该消息队列的锁
unIpLock(queueStr);
} catch (Execption e) {//捕获异常,不能让线程挂掉
e.printStackTrace();
}
}
}
}.start();
synchronized (queueCache) {
while(true){
Integer size = queueCache.size();
if(size>=offerMaxQueue){
try {
queueCache.wait();
continue;//继续执行等待中的检入任务。
} catch (InterruptedException e) {
e.printStackTrace();
}
}//IF
if(size<=offerMaxQueue&&size>0){
queueCache.notifyAll();
}
break;//检入完毕
}//while
}
/**
* 锁
* @param ip
* @return
* @throws
*/
public Boolean isLock(String queueStr) {
return this.redisManager.setnx(queueStr+"_lock", "LOCK", 10000)!=1;
}
//解锁
public void unIpLock(String queueStr) {
if(ip!=null){
this.redisManager.del(queueStr+"_lock");
// lock.unlock();
}
}
机械节能产品生产企业官网模板...
大气智能家居家具装修装饰类企业通用网站模板...
礼品公司网站模板
宽屏简约大气婚纱摄影影楼模板...
蓝白WAP手机综合医院类整站源码(独立后台)...苏ICP备2024110244号-2 苏公网安备32050702011978号 增值电信业务经营许可证编号:苏B2-20251499 | Copyright 2018 - 2025 源码网商城 (www.ymwmall.com) 版权所有