源码网商城,靠谱的源码在线交易网站 我的订单 购物车 帮助

源码网商城

Java 多线程处理执行 Solr 创建索引示例

  • 时间:2020-04-18 13:45 编辑: 来源: 阅读:
  • 扫一扫,手机访问
摘要:Java 多线程处理执行 Solr 创建索引示例
复制代码 代码如下:
public class SolrIndexer implements Indexer, Searcher, DisposableBean {  //~ Static fields/initializers =============================================  static final Logger logger = LoggerFactory.getLogger(SolrIndexer.class);  private static final long SHUTDOWN_TIMEOUT    = 5 * 60 * 1000L; // long enough  private static final int  INPUT_QUEUE_LENGTH  = 16384;  //~ Instance fields ========================================================  private CommonsHttpSolrServer server;  private BlockingQueue<Operation> inputQueue;  private Thread updateThread;  volatile boolean running = true;  volatile boolean shuttingDown = false;  //~ Constructors ===========================================================  public SolrIndexer(String url) throws MalformedURLException {   server = new CommonsHttpSolrServer(url);   inputQueue = new ArrayBlockingQueue<Operation>(INPUT_QUEUE_LENGTH);   updateThread = new Thread(new UpdateTask());   updateThread.setName("SolrIndexer");   updateThread.start();  }  //~ Methods ================================================================  public void setSoTimeout(int timeout) {   server.setSoTimeout(timeout);  }  public void setConnectionTimeout(int timeout) {   server.setConnectionTimeout(timeout);  }  public void setAllowCompression(boolean allowCompression) {   server.setAllowCompression(allowCompression);  }  public void addIndex(Indexable indexable) throws IndexingException {   if (shuttingDown) {    throw new IllegalStateException("SolrIndexer is shutting down");   }   inputQueue.offer(new Operation(indexable, OperationType.UPDATE));  }    public void delIndex(Indexable indexable) throws IndexingException {   if (shuttingDown) {    throw new IllegalStateException("SolrIndexer is shutting down");   }   inputQueue.offer(new Operation(indexable, OperationType.DELETE));  }    private void updateIndices(String type, List<Indexable> indices) throws IndexingException {   if (indices == null || indices.size() == 0) {    return;   }   
logger.debug("Updating {} indices", indices.size());   UpdateRequest req = new UpdateRequest("/" + type + "/update");   req.setAction(UpdateRequest.ACTION.COMMIT, false, false);   for (Indexable idx : indices) {    Doc doc = idx.getDoc();    SolrInputDocument solrDoc = new SolrInputDocument();    solrDoc.setDocumentBoost(doc.getDocumentBoost());    for (Iterator<Field> i = doc.iterator(); i.hasNext();) {     Field field = i.next();     solrDoc.addField(field.getName(), field.getValue(), field.getBoost());    }    req.add(solrDoc);      }   try {    req.process(server);      } catch (SolrServerException e) {    logger.error("SolrServerException occurred", e);    throw new IndexingException(e);   } catch (IOException e) {    logger.error("IOException occurred", e);    throw new IndexingException(e);   }  }    private void delIndices(String type, List<Indexable> indices) throws IndexingException {   if (indices == null || indices.size() == 0) {    return;   }   logger.debug("Deleting {} indices", indices.size());   UpdateRequest req = new UpdateRequest("/" + type + "/update");   req.setAction(UpdateRequest.ACTION.COMMIT, false, false);   for (Indexable indexable : indices) {       req.deleteById(indexable.getDocId());   }   try {    req.process(server);   } catch (SolrServerException e) {    logger.error("SolrServerException occurred", e);    throw new IndexingException(e);   } catch (IOException e) {    logger.error("IOException occurred", e);    throw new IndexingException(e);   }  }    public QueryResult search(Query query) throws IndexingException {   SolrQuery sq = new SolrQuery();   sq.setQuery(query.getQuery());   if (query.getFilter() != null) {    sq.addFilterQuery(query.getFilter());   }   if (query.getOrderField() != null) {    sq.addSortField(query.getOrderField(), query.getOrder() == Query.Order.DESC ? 
SolrQuery.ORDER.desc : SolrQuery.ORDER.asc);   }   sq.setStart(query.getOffset());   sq.setRows(query.getLimit());   QueryRequest req = new QueryRequest(sq);   req.setPath("/" + query.getType() + "/select");   try {    QueryResponse rsp = req.process(server);    SolrDocumentList docs = rsp.getResults();    QueryResult result = new QueryResult();    result.setOffset(docs.getStart());    result.setTotal(docs.getNumFound());    result.setSize(sq.getRows());    List<Doc> resultDocs = new ArrayList<Doc>(result.getSize());    for (Iterator<SolrDocument> i = docs.iterator(); i.hasNext();) {     SolrDocument solrDocument = i.next();     Doc doc = new Doc();     for (Iterator<Map.Entry<String, Object>> iter = solrDocument.iterator(); iter.hasNext();) {      Map.Entry<String, Object> field = iter.next();      doc.addField(field.getKey(), field.getValue());     }     resultDocs.add(doc);    }    result.setDocs(resultDocs);    return result;   } catch (SolrServerException e) {    logger.error("SolrServerException occurred", e);    throw new IndexingException(e);   }  }    public void destroy() throws Exception {   shutdown(SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);    }  public boolean shutdown(long timeout, TimeUnit unit) {   if (shuttingDown) {    logger.info("Suppressing duplicate attempt to shut down");    return false;   }   shuttingDown = true;   String baseName = updateThread.getName();   updateThread.setName(baseName + " - SHUTTING DOWN");   boolean rv = false;   try {    // Conditionally wait    if (timeout > 0) {     updateThread.setName(baseName + " - SHUTTING DOWN (waiting)");     rv = waitForQueue(timeout, unit);    }   } finally {    // But always begin the shutdown sequence    running = false;    updateThread.setName(baseName + " - SHUTTING DOWN (informed client)");   }   return rv;  }  /**   * @param timeout   * @param unit   * @return   */  private boolean waitForQueue(long timeout, TimeUnit unit) {   CountDownLatch latch = new CountDownLatch(1);   
inputQueue.add(new StopOperation(latch));   try {    return latch.await(timeout, unit);   } catch (InterruptedException e) {    throw new RuntimeException("Interrupted waiting for queues", e);   }  }    class UpdateTask implements Runnable {   public void run() {    while (running) {     try {      syncIndices();     } catch (Throwable e) {      if (shuttingDown) {       logger.warn("Exception occurred during shutdown", e);      } else {       logger.error("Problem handling solr indexing updating", e);      }     }    }    logger.info("Shut down SolrIndexer");   }  }  void syncIndices() throws InterruptedException {   Operation op = inputQueue.poll(1000L, TimeUnit.MILLISECONDS);   if (op == null) {    return;   }   if (op instanceof StopOperation) {    ((StopOperation) op).stop();    return;   }   // wait 1 second   try {    Thread.sleep(1000);   } catch (InterruptedException e) {   }   List<Operation> ops = new ArrayList<Operation>(inputQueue.size() + 1);   ops.add(op);   inputQueue.drainTo(ops);   Map<String, List<Indexable>> deleteMap = new HashMap<String, List<Indexable>>(4);   Map<String, List<Indexable>> updateMap = new HashMap<String, List<Indexable>>(4);   for (Operation o : ops) {    if (o instanceof StopOperation) {     ((StopOperation) o).stop();    } else {     Indexable indexable = o.indexable;     if (o.type == OperationType.DELETE) {      List<Indexable> docs = deleteMap.get(indexable.getType());      if (docs == null) {       docs = new LinkedList<Indexable>();       deleteMap.put(indexable.getType(), docs);      }      docs.add(indexable);     } else {      List<Indexable> docs = updateMap.get(indexable.getType());      if (docs == null) {       docs = new LinkedList<Indexable>();       updateMap.put(indexable.getType(), docs);      }      docs.add(indexable);     }    }   }   for (Iterator<Map.Entry<String, List<Indexable>>> i = deleteMap.entrySet().iterator(); i.hasNext();) {    Map.Entry<String, List<Indexable>> entry = i.next();    
delIndices(entry.getKey(), entry.getValue());   }   for (Iterator<Map.Entry<String, List<Indexable>>> i = updateMap.entrySet().iterator(); i.hasNext();) {    Map.Entry<String, List<Indexable>> entry = i.next();    updateIndices(entry.getKey(), entry.getValue());   }  }  enum OperationType { DELETE, UPDATE, SHUTDOWN }  static class Operation {   OperationType type;   Indexable indexable;   Operation() {}   Operation(Indexable indexable, OperationType type) {    this.indexable = indexable;    this.type = type;   }  }  static class StopOperation extends Operation {   CountDownLatch latch;   StopOperation(CountDownLatch latch) {    this.latch = latch;    this.type = OperationType.SHUTDOWN;   }   public void stop() {    latch.countDown();   }  }  //~ Accessors =============== }
  • 全部评论(0)
联系客服
客服电话:
400-000-3129
微信版

扫一扫进微信版
返回顶部