源码网商城,靠谱的源码在线交易网站 我的订单 购物车 帮助

源码网商城

详解spring集成mina实现服务端主动推送(包含心跳检测)

  • 时间:2021-06-28 14:55 编辑: 来源: 阅读:
  • 扫一扫,手机访问
摘要:详解spring集成mina实现服务端主动推送(包含心跳检测)
本文介绍了spring集成mina实现服务端主动推送(包含心跳检测),分享给大家,具体如下: [b]服务端[/b] 1.常规的spring工程集成mina时,pom.xml中需要加入如下配置:
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-jdk14</artifactId>
    <version>1.7.7</version>
  </dependency>
  <dependency>
    <groupId>org.apache.mina</groupId>
    <artifactId>mina-integration-beans</artifactId>
    <version>2.0.13</version>
  </dependency>
  <dependency>
    <groupId>org.apache.mina</groupId>
    <artifactId>mina-core</artifactId>
    <version>2.0.13</version>
    <type>bundle</type> 
    <scope>compile</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.mina</groupId>
    <artifactId>mina-integration-spring</artifactId>
    <version>1.1.7</version>
  </dependency>
注意此处mina-core的配置写法。如果工程中引入上述依赖之后报错:Missing artifact xxx bundle,则需要在pom.xml的plugins之间加入如下插件配置:
  <plugin>
    <groupId>org.apache.felix</groupId>
    <artifactId>maven-bundle-plugin</artifactId>
    <extensions>true</extensions>
  </plugin>
2.Filter1:编解码器,实现ProtocolCodecFactory解码工厂
package com.he.server;
import java.nio.charset.Charset;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;
import org.apache.mina.filter.codec.textline.LineDelimiter;
import org.apache.mina.filter.codec.textline.TextLineDecoder;
import org.apache.mina.filter.codec.textline.TextLineEncoder;
public class MyCodeFactory implements ProtocolCodecFactory {
  private final TextLineEncoder encoder;
  private final TextLineDecoder decoder;
  public MyCodeFactory() {
    this(Charset.forName("utf-8"));
  }
  public MyCodeFactory(Charset charset) {
    encoder = new TextLineEncoder(charset, LineDelimiter.UNIX);
    decoder = new TextLineDecoder(charset, LineDelimiter.AUTO);
  }
  public ProtocolDecoder getDecoder(IoSession arg0) throws Exception {
    // TODO Auto-generated method stub
    return decoder;
  }
  public ProtocolEncoder getEncoder(IoSession arg0) throws Exception {
    // TODO Auto-generated method stub
    return encoder;
  }
  public int getEncoderMaxLineLength() {
    return encoder.getMaxLineLength();
  }
  public void setEncoderMaxLineLength(int maxLineLength) {
    encoder.setMaxLineLength(maxLineLength);
  }
  public int getDecoderMaxLineLength() {
    return decoder.getMaxLineLength();
  }
  public void setDecoderMaxLineLength(int maxLineLength) {
    decoder.setMaxLineLength(maxLineLength);
  }
}

此处使用了mina自带的TextLineEncoder编解码器,此解码器支持使用固定长度或者固定分隔符来区分上下两条消息。如果要使用自定义协议,则需要自己编写解码器。要使用websocket,也需要重新编写解码器,关于mina结合websocket,jira上有一个开源项目[url=https://issues.apache.org/jira/browse/DIRMINA-907]https://issues.apache.org/jira/browse/DIRMINA-907[/url],专门为mina编写了支持websocket的编解码器,亲测可用。。。此部分不是本文重点,略。 3.Filter2:心跳工厂,加入心跳检测功能需要实现KeepAliveMessageFactory:
package com.he.server;
import org.apache.log4j.Logger;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;
public class MyKeepAliveMessageFactory implements KeepAliveMessageFactory{

  private final Logger LOG = Logger.getLogger(MyKeepAliveMessageFactory.class);

  /** 心跳包内容 */ 
  private static final String HEARTBEATREQUEST = "1111"; 
  private static final String HEARTBEATRESPONSE = "1112"; 
  public Object getRequest(IoSession session) {
    LOG.warn("请求预设信息: " + HEARTBEATREQUEST); 
     return HEARTBEATREQUEST;
  }
  public Object getResponse(IoSession session, Object request) {
    LOG.warn("响应预设信息: " + HEARTBEATRESPONSE); 
    /** 返回预设语句 */ 
    return HEARTBEATRESPONSE; 
  }
  public boolean isRequest(IoSession session, Object message) {
     LOG.warn("请求心跳包信息: " + message); 
     if (message.equals(HEARTBEATREQUEST)) 
       return true; 
     return false; 
  }
  public boolean isResponse(IoSession session, Object message) {
   LOG.warn("响应心跳包信息: " + message); 
   if(message.equals(HEARTBEATRESPONSE)) 
     return true;
    return false;
  }
}

此处我设置服务端发送的心跳包是1111,客户端应该返回1112. 4.实现必不可少的IoHandlerAdapter,得到监听事件处理权:
package com.he.server;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
public class MyHandler extends IoHandlerAdapter {
  //private final int IDLE = 3000;//(单位s)
  private final Logger LOG = Logger.getLogger(MyHandler.class);
  // public static Set<IoSession> sessions = Collections.synchronizedSet(new HashSet<IoSession>());
  public static ConcurrentHashMap<Long, IoSession> sessionsConcurrentHashMap = new ConcurrentHashMap<Long, IoSession>();
  @Override
  public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
    session.closeOnFlush();
    LOG.warn("session occured exception, so close it." + cause.getMessage());
  }
  @Override
  public void messageReceived(IoSession session, Object message) throws Exception {
    String str = message.toString();
    LOG.warn("客户端" + ((InetSocketAddress) session.getRemoteAddress()).getAddress().getHostAddress() + "连接成功!");
    session.setAttribute("type", message);
    String remoteAddress = ((InetSocketAddress) session.getRemoteAddress()).getAddress().getHostAddress();
    session.setAttribute("ip", remoteAddress);
    LOG.warn("服务器收到的消息是:" + str);
    session.write("welcome by he");
  }
  @Override
  public void messageSent(IoSession session, Object message) throws Exception {
    LOG.warn("messageSent:" + message);
  }
  @Override
  public void sessionCreated(IoSession session) throws Exception {
    LOG.warn("remote client [" + session.getRemoteAddress().toString() + "] connected.");
    // my
    Long time = System.currentTimeMillis();
    session.setAttribute("id", time);
    sessionsConcurrentHashMap.put(time, session);
  }
  @Override
  public void sessionClosed(IoSession session) throws Exception {
    LOG.warn("sessionClosed.");
    session.closeOnFlush();
    // my
    sessionsConcurrentHashMap.remove(session.getAttribute("id"));
  }
  @Override
  public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
    LOG.warn("session idle, so disconnecting......");
    session.closeOnFlush();
    LOG.warn("disconnected.");
  }
  @Override
  public void sessionOpened(IoSession session) throws Exception {
    LOG.warn("sessionOpened.");
    // 
    //session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, IDLE);
  }
}

此处有几点说明: 第一点:网上教程会在此处(sessionOpened()方法中)设置IDLE,IDLE表示session经过多久判定为空闲的时间,单位s,上述代码中已经注释掉了,因为后面在spring配置中加入心跳检测部分时会进行IDLE的配置,已经不需要在此处进行配置了,而且如果在心跳配置部分和此处都对BOTH_IDLE模式设置了空闲时间,亲测发现此处配置不生效。 第二点:关于存放session的容器,建议使用
[url=https://github.com/smile326/minaSpring]https://github.com/smile326/minaSpring[/url] 以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持编程素材网。
  • 全部评论(0)
联系客服
客服电话:
400-000-3129
微信版

扫一扫进微信版
返回顶部