源码网商城,靠谱的源码在线交易网站 我的订单 购物车 帮助

源码网商城

基于Java回顾之多线程同步的使用详解

  • 时间:2020-05-01 02:11 编辑: 来源: 阅读:
  • 扫一扫,手机访问
摘要:基于Java回顾之多线程同步的使用详解
首先阐述什么是同步,不同步有什么问题,然后讨论可以采取哪些措施控制同步,接下来我们会仿照回顾网络通信时那样,构建一个服务器端的“线程池”,JDK为我们提供了一个很大的concurrent工具包,最后我们会对里面的内容进行探索。 [b]为什么要线程同步?[/b] 说到线程同步,大部分情况下, 我们是在针对“[b]单对象多线程[/b]”的情况进行讨论,一般会将其分成两部分,一部分是关于“共享变量”,一部分关于“执行步骤”。 [b]共享变量[/b] 当我们在线程对象(Runnable)中定义了全局变量,run方法会修改该变量时,如果有多个线程同时使用该线程对象,那么就会造成全局变量的值被同时修改,造成错误。我们来看下面的代码:
[url=http://www.1sucai.cn/article/36550.htm]基于Java回顾之网络通信的应用分析[/url]>中,已经构建了一个Socket连接池,这里我们在此基础上,构建一个线程池,完成基本的启动、休眠、唤醒、停止操作。 基本思路还是以数组的形式保持一系列线程,通过Socket通信,客户端向服务器端发送命令,当服务器端接收到命令后,根据收到的命令对线程数组中的线程进行操作。 Socket客户端的代码保持不变,依然采用构建Socket连接池时的代码,我们主要针对服务器端进行改造。 首先,我们需要定义一个线程对象,它用来执行我们的业务操作,这里简化起见,只让线程进行休眠。
[u]复制代码[/u] 代码如下:
定义线程对象  enum ThreadStatus  {      Initial,      Running,      Sleeping,      Stopped  }  enum ThreadTask  {      Start,      Stop,      Sleep,      Wakeup  }    class MyThread extends Thread  {      public ThreadStatus status = ThreadStatus.Initial;      public ThreadTask task;      public void run()      {          status = ThreadStatus.Running;          while(true)          {              try {                  Thread.sleep(3000);                  if (status == ThreadStatus.Sleeping)                  {                      System.out.println(Thread.currentThread().getName() + " 进入休眠状态。");                      this.wait();                  }              } catch (InterruptedException e) {                  System.out.println(Thread.currentThread().getName() + " 运行过程中出现错误。");                  status = ThreadStatus.Stopped;              }          }      }  }
然后,我们需要定义一个线程管理器,它用来对线程池中的线程进行管理,代码如下:
[u]复制代码[/u] 代码如下:
定义线程池管理对象  class MyThreadManager  {      public static void manageThread(MyThread[] threads, ThreadTask task)      {          for (int i = 0; i < threads.length; i++)          {              synchronized(threads[i])              {                  manageThread(threads[i], task);              }          }          System.out.println(getThreadStatus(threads));      }      public static void manageThread(MyThread thread, ThreadTask task)      {          if (task == ThreadTask.Start)          {              if (thread.status == ThreadStatus.Running)              {                  return;              }              if (thread.status == ThreadStatus.Stopped)              {                  thread = new MyThread();              }              thread.status = ThreadStatus.Running;              thread.start();          }          else if (task == ThreadTask.Stop)          {              if (thread.status != ThreadStatus.Stopped)              {                  thread.interrupt();                  thread.status = ThreadStatus.Stopped;              }          }          else if (task == ThreadTask.Sleep)          {              thread.status = ThreadStatus.Sleeping;          }          else if (task == ThreadTask.Wakeup)          {              thread.notify();              thread.status = ThreadStatus.Running;          }      }      public static String getThreadStatus(MyThread[] threads)      {          StringBuffer sb = new StringBuffer();          for (int i = 0; i < threads.length; i++)          {              sb.append(threads[i].getName() + "的状态:" + threads[i].status).append("\r\n");          }          return sb.toString();      }  }
最后,是我们的服务器端,它不断接受客户端的请求,每收到一个连接请求,服务器端会新开一个线程,来处理后续客户端发来的各种操作指令。
[u]复制代码[/u] 代码如下:
定义服务器端线程池对象  public class MyThreadPool {      public static void main(String[] args) throws IOException      {          MyThreadPool pool = new MyThreadPool(5);      }      private int threadCount;      private MyThread[] threads = null;           public MyThreadPool(int count) throws IOException      {          this.threadCount = count;          threads = new MyThread[count];          for (int i = 0; i < threads.length; i++)          {              threads[i] = new MyThread();              threads[i].start();          }          Init();      }      private void Init() throws IOException      {          ServerSocket serverSocket = new ServerSocket(5678);          while(true)          {              final Socket socket = serverSocket.accept();              Thread thread = new Thread()              {                  public void run()                  {                      try                      {                          System.out.println("检测到一个新的Socket连接。");                          BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));                          PrintStream ps = new PrintStream(socket.getOutputStream());                          String line = null;                          while((line = br.readLine()) != null)                          {                              System.out.println(line);                              if (line.equals("Count"))                              {                                  System.out.println("线程池中有5个线程");                              }                              else if (line.equals("Status"))                              {                                  String status = MyThreadManager.getThreadStatus(threads);                                  System.out.println(status);                              }                              else if (line.equals("StartAll"))                              {                                  MyThreadManager.manageThread(threads, ThreadTask.Start);                              }                              else if (line.equals("StopAll"))                              {                                  MyThreadManager.manageThread(threads, ThreadTask.Stop);                              }                              else if (line.equals("SleepAll"))                              {                                  MyThreadManager.manageThread(threads, ThreadTask.Sleep);                              }                              else if (line.equals("WakeupAll"))                              {                                  MyThreadManager.manageThread(threads, ThreadTask.Wakeup);                              }                              else if (line.equals("End"))                              {                                  break;                              }                              else                              {                                  System.out.println("Command:" + line);                              }                              ps.println("OK");                              ps.flush();                          }                      }                      catch(Exception ex)                      {                          ex.printStackTrace();                      }                  }              };              thread.start();          }      }  }
[b]探索JDK中的concurrent工具包[/b] 为了简化开发人员在进行多线程开发时的工作量,并减少程序中的bug,JDK提供了一套concurrent工具包,我们可以用它来方便的开发多线程程序。 线程池 我们在上面实现了一个非常“简陋”的线程池,concurrent工具包中也提供了线程池,而且使用非常方便。 concurrent工具包中的线程池分为3类:ScheduledThreadPool、FixedThreadPool和CachedThreadPool。 首先我们来定义一个Runnable的对象
[u]复制代码[/u] 代码如下:
定义Runnable对象  class MyRunner implements Runnable  {      public void run() {          System.out.println(Thread.currentThread().getName() + "运行开始");          for(int i = 0; i < 1; i++)          {              try              {                  System.out.println(Thread.currentThread().getName() + "正在运行");                  Thread.sleep(200);              }              catch(Exception ex)              {                  ex.printStackTrace();              }          }          System.out.println(Thread.currentThread().getName() + "运行结束");      }  }
可以看出,它的功能非常简单,只是输出了线程的执行过程。 [b]ScheduledThreadPool[/b] 这和我们平时使用的ScheduledTask比较类似,或者说很像Timer,它可以使得一个线程在指定的一段时间内开始运行,并且在间隔另外一段时间后再次运行,直到线程池关闭。 示例代码如下:
[u]复制代码[/u] 代码如下:
ScheduledThreadPool示例  private static void scheduledThreadPoolTest()  {      final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3);      MyRunner runner = new MyRunner();      final ScheduledFuture<?> handler1 = scheduler.scheduleAtFixedRate(runner, 1, 10, TimeUnit.SECONDS);      final ScheduledFuture<?> handler2 = scheduler.scheduleWithFixedDelay(runner, 2, 10, TimeUnit.SECONDS);      scheduler.schedule(new Runnable()      {          public void run()          {              handler1.cancel(true);              handler2.cancel(true);              scheduler.shutdown();          }      }, 30, TimeUnit.SECONDS      );  }
FixedThreadPool 这是一个指定容量的线程池,即我们可以指定在同一时间,线程池中最多有多个线程在运行,超出的线程,需要等线程池中有空闲线程时,才能有机会运行。 来看下面的代码:
[u]复制代码[/u] 代码如下:
FixedThreadPool示例  private static void fixedThreadPoolTest()  {      ExecutorService exec = Executors.newFixedThreadPool(3);      for(int i = 0; i < 5; i++)      {          MyRunner runner = new MyRunner();          exec.execute(runner);      }      exec.shutdown();  }
注意它的输出结果:
[u]复制代码[/u] 代码如下:
pool-1-thread-1运行开始 pool-1-thread-1正在运行 pool-1-thread-2运行开始 pool-1-thread-2正在运行 pool-1-thread-3运行开始 pool-1-thread-3正在运行 pool-1-thread-1运行结束 pool-1-thread-1运行开始 pool-1-thread-1正在运行 pool-1-thread-2运行结束 pool-1-thread-2运行开始 pool-1-thread-2正在运行 pool-1-thread-3运行结束 pool-1-thread-1运行结束 pool-1-thread-2运行结束
可以看到从始至终,最多有3个线程在同时运行。 CachedThreadPool 这是另外一种线程池,它不需要指定容量,只要有需要,它就会创建新的线程。 它的使用方式和FixedThreadPool非常像,来看下面的代码:
[u]复制代码[/u] 代码如下:
CachedThreadPool示例  private static void cachedThreadPoolTest()  {      ExecutorService exec = Executors.newCachedThreadPool();      for(int i = 0; i < 5; i++)      {          MyRunner runner = new MyRunner();          exec.execute(runner);      }      exec.shutdown();  }
它的执行结果如下:
[u]复制代码[/u] 代码如下:
pool-1-thread-1运行开始 pool-1-thread-1正在运行 pool-1-thread-2运行开始 pool-1-thread-2正在运行 pool-1-thread-3运行开始 pool-1-thread-3正在运行 pool-1-thread-4运行开始 pool-1-thread-4正在运行 pool-1-thread-5运行开始 pool-1-thread-5正在运行 pool-1-thread-1运行结束 pool-1-thread-2运行结束 pool-1-thread-3运行结束 pool-1-thread-4运行结束 pool-1-thread-5运行结束
可以看到,它创建了5个线程。 处理线程返回值 在有些情况下,我们需要使用线程的返回值,在上述的所有代码中,线程这是执行了某些操作,没有任何返回值。 如何做到这一点呢?我们可以使用JDK中的Callable<T>和CompletionService<T>,前者返回单个线程的结果,后者返回一组线程的结果。 返回单个线程的结果 还是直接看代码吧:
[u]复制代码[/u] 代码如下:
Callable示例  private static void callableTest() throws InterruptedException, ExecutionException  {      ExecutorService exec = Executors.newFixedThreadPool(1);      Callable<String> call = new Callable<String>()      {          public String call()          {              return "Hello World.";          }      };      Future<String> result = exec.submit(call);      System.out.println("线程的返回值是" + result.get());      exec.shutdown();  }
执行结果如下:
[u]复制代码[/u] 代码如下:
线程的返回值是Hello World.
返回线程池中每个线程的结果 这里需要使用CompletionService<T>,代码如下:
[u]复制代码[/u] 代码如下:
CompletionService示例  private static void completionServiceTest() throws InterruptedException, ExecutionException  {      ExecutorService exec = Executors.newFixedThreadPool(10);      CompletionService<String> service = new ExecutorCompletionService<String>(exec);      for (int i = 0; i < 10; i++)      {          Callable<String> call = new Callable<String>()          {              public String call() throws InterruptedException              {                  return Thread.currentThread().getName();              }          };          service.submit(call);      }      Thread.sleep(1000);      for(int i = 0; i < 10; i++)      {          Future<String> result = service.take();          System.out.println("线程的返回值是" + result.get());      }      exec.shutdown();  }
执行结果如下:
[u]复制代码[/u] 代码如下:
线程的返回值是pool-2-thread-1 线程的返回值是pool-2-thread-2 线程的返回值是pool-2-thread-3 线程的返回值是pool-2-thread-5 线程的返回值是pool-2-thread-4 线程的返回值是pool-2-thread-6 线程的返回值是pool-2-thread-8 线程的返回值是pool-2-thread-7 线程的返回值是pool-2-thread-9 线程的返回值是pool-2-thread-10
实现生产者-消费者模型 对于生产者-消费者模型来说,我们应该都不会陌生,通常我们都会使用某种数据结构来实现它。在concurrent工具包中,我们可以使用BlockingQueue来实现生产者-消费者模型,如下:
[u]复制代码[/u] 代码如下:
BlockingQueue示例  public class BlockingQueueSample {      public static void main(String[] args)      {          blockingQueueTest();      }      private static void blockingQueueTest()      {          final BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();          final int maxSleepTimeForSetter = 10;          final int maxSleepTimerForGetter = 10;          Runnable setter = new Runnable()          {              public void run()              {                  Random r = new Random();                  while(true)                  {                      int value = r.nextInt(100);                      try                      {                          queue.put(new Integer(value));                          System.out.println(Thread.currentThread().getName() + "---向队列中插入值" + value);                          Thread.sleep(r.nextInt(maxSleepTimeForSetter) * 1000);                      }                      catch(Exception ex)                      {                          ex.printStackTrace();                      }                  }              }          };          Runnable getter = new Runnable()          {              public void run()              {                  Random r = new Random();                  while(true)                  {                      try                      {                          if (queue.size() == 0)                          {                              System.out.println(Thread.currentThread().getName() + "---队列为空");                          }                          else                          {                              int value = queue.take().intValue();                              System.out.println(Thread.currentThread().getName() + "---从队列中获取值" + value);                          }                          Thread.sleep(r.nextInt(maxSleepTimerForGetter) * 1000);                      }                      catch(Exception ex)                      {                          ex.printStackTrace();                      }                  }              }          };          ExecutorService exec = Executors.newFixedThreadPool(2);          exec.execute(setter);          exec.execute(getter);      }  }
我们定义了两个线程,一个线程向Queue中添加数据,一个线程从Queue中取数据。我们可以通过控制maxSleepTimeForSetter和maxSleepTimerForGetter的值,来使得程序得出不同的结果。 可能的执行结果如下:
[u]复制代码[/u] 代码如下:
pool-1-thread-1---向队列中插入值88 pool-1-thread-2---从队列中获取值88 pool-1-thread-1---向队列中插入值75 pool-1-thread-2---从队列中获取值75 pool-1-thread-2---队列为空 pool-1-thread-2---队列为空 pool-1-thread-2---队列为空 pool-1-thread-1---向队列中插入值50 pool-1-thread-2---从队列中获取值50 pool-1-thread-2---队列为空 pool-1-thread-2---队列为空 pool-1-thread-2---队列为空 pool-1-thread-2---队列为空 pool-1-thread-2---队列为空 pool-1-thread-1---向队列中插入值51 pool-1-thread-1---向队列中插入值92 pool-1-thread-2---从队列中获取值51 pool-1-thread-2---从队列中获取值92
因为Queue中的值和Thread的休眠时间都是随机的,所以执行结果也不是固定的。 [b]使用信号量来控制线程[/b] JDK提供了Semaphore来实现“信号量”的功能,它提供了两个方法分别用于获取和释放信号量:acquire和release,示例代码如下:
[u]复制代码[/u] 代码如下:
SemaPhore示例  private static void semaphoreTest()  {      ExecutorService exec = Executors.newFixedThreadPool(10);      final Semaphore semp = new Semaphore(2);      for (int i = 0; i < 10; i++)      {          Runnable runner = new Runnable()          {              public void run()              {                  try                  {                      semp.acquire();                      System.out.println(new Date() + " " + Thread.currentThread().getName() + "正在执行。");                      Thread.sleep(5000);                      semp.release();                  }                  catch(Exception ex)                  {                      ex.printStackTrace();                  }              }          };          exec.execute(runner);      }      exec.shutdown();  }
执行结果如下:
[u]复制代码[/u] 代码如下:
Tue May 07 11:22:11 CST 2013 pool-1-thread-1正在执行。 Tue May 07 11:22:11 CST 2013 pool-1-thread-2正在执行。 Tue May 07 11:22:17 CST 2013 pool-1-thread-3正在执行。 Tue May 07 11:22:17 CST 2013 pool-1-thread-4正在执行。 Tue May 07 11:22:22 CST 2013 pool-1-thread-5正在执行。 Tue May 07 11:22:22 CST 2013 pool-1-thread-6正在执行。 Tue May 07 11:22:27 CST 2013 pool-1-thread-7正在执行。 Tue May 07 11:22:27 CST 2013 pool-1-thread-8正在执行。 Tue May 07 11:22:32 CST 2013 pool-1-thread-10正在执行。 Tue May 07 11:22:32 CST 2013 pool-1-thread-9正在执行。
可以看出,尽管线程池中创建了10个线程,但是同时运行的,只有2个线程。 控制线程池中所有线程的执行步骤 在前面,我们已经提到,可以用synchronized关键字来控制单个线程中的执行步骤,那么如果我们想要对线程池中的所有线程的执行步骤进行控制的话,应该如何实现呢? 我们有两种方式,一种是使用CyclicBarrier,一种是使用CountDownLatch。 CyclicBarrier使用了类似于Object.wait的机制,它的构造函数中需要接收一个整型数字,用来说明它需要控制的线程数目,当在线程的run方法中调用它的await方法时,它会保证所有的线程都执行到这一步,才会继续执行后面的步骤。 示例代码如下:
[u]复制代码[/u] 代码如下:
CyclicBarrier示例  class MyRunner2 implements Runnable  {      private CyclicBarrier barrier = null;      public MyRunner2(CyclicBarrier barrier)      {          this.barrier = barrier;      }      public void run() {          Random r = new Random();          try          {              for (int i = 0; i < 3; i++)              {                  Thread.sleep(r.nextInt(10) * 1000);                  System.out.println(new Date() + "--" + Thread.currentThread().getName() + "--第" + (i + 1) + "次等待。");                  barrier.await();              }          }          catch(Exception ex)          {              ex.printStackTrace();          }      }  }  private static void cyclicBarrierTest()  {      CyclicBarrier barrier = new CyclicBarrier(3);      ExecutorService exec = Executors.newFixedThreadPool(3);      for (int i = 0; i < 3; i++)      {          exec.execute(new MyRunner2(barrier));      }      exec.shutdown();  }
执行结果如下:
[u]复制代码[/u] 代码如下:
Tue May 07 11:31:20 CST 2013--pool-1-thread-2--第1次等待。 Tue May 07 11:31:21 CST 2013--pool-1-thread-3--第1次等待。 Tue May 07 11:31:24 CST 2013--pool-1-thread-1--第1次等待。 Tue May 07 11:31:24 CST 2013--pool-1-thread-1--第2次等待。 Tue May 07 11:31:26 CST 2013--pool-1-thread-3--第2次等待。 Tue May 07 11:31:30 CST 2013--pool-1-thread-2--第2次等待。 Tue May 07 11:31:32 CST 2013--pool-1-thread-1--第3次等待。 Tue May 07 11:31:33 CST 2013--pool-1-thread-3--第3次等待。 Tue May 07 11:31:33 CST 2013--pool-1-thread-2--第3次等待。
可以看出,thread-2到第1次等待点时,一直等到thread-1到达后才继续执行。 CountDownLatch则是采取类似”倒计时计数器”的机制来控制线程池中的线程,它有CountDown和Await两个方法。示例代码如下:
[u]复制代码[/u] 代码如下:
CountDownLatch示例  private static void countdownLatchTest() throws InterruptedException  {      final CountDownLatch begin = new CountDownLatch(1);      final CountDownLatch end = new CountDownLatch(5);      ExecutorService exec = Executors.newFixedThreadPool(5);      for (int i = 0; i < 5; i++)      {          Runnable runner = new Runnable()          {              public void run()              {                  Random r = new Random();                  try                  {                      begin.await();                      System.out.println(Thread.currentThread().getName() + "运行开始");                      Thread.sleep(r.nextInt(10)*1000);                      System.out.println(Thread.currentThread().getName() + "运行结束");                  }                  catch(Exception ex)                  {                      ex.printStackTrace();                  }                  finally                  {                      end.countDown();                  }              }          };          exec.execute(runner);      }      begin.countDown();      end.await();      System.out.println(Thread.currentThread().getName() + "运行结束");      exec.shutdown();  }
执行结果如下:
[u]复制代码[/u] 代码如下:
pool-1-thread-1运行开始 pool-1-thread-5运行开始 pool-1-thread-2运行开始 pool-1-thread-3运行开始 pool-1-thread-4运行开始 pool-1-thread-2运行结束 pool-1-thread-1运行结束 pool-1-thread-3运行结束 pool-1-thread-5运行结束 pool-1-thread-4运行结束 main运行结束
  • 全部评论(0)
联系客服
客服电话:
400-000-3129
微信版

扫一扫进微信版
返回顶部