博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
java.util.concurrent包API学习笔记
阅读量:4641 次
发布时间:2019-06-09

本文共 12410 字,大约阅读时间需要 41 分钟。

newFixedThreadPool创建一个固定大小的线程池。shutdown():用于关闭启动线程,如果不调用该语句,jvm不会关闭。awaitTermination():用于等待子线程结束,再继续执行下面的代码。该例中我设置一直等着子线程结束。  Java代码  收藏代码public class Test {        public static void main(String[] args) throws IOException, InterruptedException {          ExecutorService service = Executors.newFixedThreadPool(2);          for (int i = 0; i < 4; i++) {              Runnable run = new Runnable() {                  @Override                  public void run() {                      System.out.println("thread start");                  }              };              service.execute(run);          }          service.shutdown();          service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);          System.out.println("all thread complete");      }  }      输出:thread startthread startthread startthread startall thread completenewScheduledThreadPool这个先不说,我喜欢用spring quartz.CyclicBarrier假设有只有的一个场景:每个线程代表一个跑步运动员,当运动员都准备好后,才一起出发,只要有一个人没有准备好,大家都等待.  Java代码  收藏代码import java.io.IOException;  import java.util.Random;  import java.util.concurrent.BrokenBarrierException;  import java.util.concurrent.CyclicBarrier;  import java.util.concurrent.ExecutorService;  import java.util.concurrent.Executors;    class Runner implements Runnable {        private CyclicBarrier barrier;        private String name;        public Runner(CyclicBarrier barrier, String name) {          super();          this.barrier = barrier;          this.name = name;      }        @Override      public void run() {          try {              Thread.sleep(1000 * (new Random()).nextInt(8));              System.out.println(name + " 准备OK.");              barrier.await();          } catch (InterruptedException e) {              e.printStackTrace();          } catch (BrokenBarrierException e) {              e.printStackTrace();          }          System.out.println(name + " Go!!");      }  }    public class Race {        public static void main(String[] args) throws IOException, InterruptedException {          CyclicBarrier barrier = new CyclicBarrier(3);            ExecutorService executor = Executors.newFixedThreadPool(3);          executor.submit(new Thread(new Runner(barrier, "zhangsan")));          executor.submit(new Thread(new Runner(barrier, "lisi")));          executor.submit(new Thread(new Runner(barrier, "wangwu")));            executor.shutdown();      }    }    输出:wangwu 准备OK.zhangsan 准备OK.lisi 准备OK.lisi Go!!zhangsan Go!!wangwu Go!!ThreadPoolExecutor newFixedThreadPool生成一个固定的线程池,顾名思义,线程池的线程是不会释放的,即使它是Idle。这就会产生性能问题,比如如果线程池的大小为200,当全部使用完毕后,所有的线程会继续留在池中,相应的内存和线程切换(while(true)+sleep循环)都会增加。如果要避免这个问题,就必须直接使用ThreadPoolExecutor()来构造。可以像Tomcat的线程池一样设置“最大线程数”、“最小线程数”和“空闲线程keepAlive的时间”。  ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue
workQueue, RejectedExecutionHandler handler) corePoolSize:池中所保存的线程数,包括空闲线程(非最大同时干活的线程数)。如果池中线程数多于 corePoolSize,则这些多出的线程在空闲时间超过 keepAliveTime 时将会终止。maximumPoolSize:线程池中最大线程数keepAliveTime:线程空闲回收的时间unit:keepAliveTime的单位workQueue:保存任务的队列,可以如下选择: 无界队列: new LinkedBlockingQueue
(); 有界队列: new ArrayBlockingQueue
(8);你不想让客户端无限的请求吃光你的CPU和内存吧,那就用有界队列handler:当提交任务数大于队列size会抛出RejectedExecutionException,可选的值为: ThreadPoolExecutor.CallerRunsPolicy 等待队列空闲ThreadPoolExecutor.DiscardPolicy:丢弃要插入队列的任务ThreadPoolExecutor.DiscardOldestPolicy:删除队头的任务关于corePoolSize和maximumPoolSize: Java官方Docs写道:当新任务在方法 execute(java.lang.Runnable) 中提交时,如果运行的线程少于 corePoolSize,则创建新线程来处理请求(即使存在空闲线程)。如果运行的线程多于 corePoolSize 而少于 maximumPoolSize,则仅当队列(queue)满时才创建新线程。如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池。如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务。 Java代码 收藏代码public class Test { public static void main(String[] args) { BlockingQueue
queue = new LinkedBlockingQueue
(); ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 1, TimeUnit.DAYS, queue); for (int i = 0; i < 20; i++) { final int index = i; executor.execute(new Runnable() { public void run() { try { Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(String.format("thread %d finished", index)); } }); } executor.shutdown(); } } 原子变量(Atomic )并发库中的BlockingQueue是一个比较好玩的类,顾名思义,就是阻塞队列。该类主要提供了两个方法put()和take(),前者将一个对象放到队列中,如果队列已经满了,就等待直到有空闲节点;后者从head取一个对象,如果没有对象,就等待直到有可取的对象。 下面的例子比较简单,一个读线程,用于将要处理的文件对象添加到阻塞队列中,另外四个写线程用于取出文件对象,为了模拟写操作耗时长的特点,特让线程睡眠一段随机长度的时间。另外,该Demo也使用到了线程池和原子整型(AtomicInteger),AtomicInteger可以在并发情况下达到原子化更新,避免使用了synchronized,而且性能非常高。由于阻塞队列的put和take操作会阻塞,为了使线程退出,在队列中添加了一个“标识”,算法中也叫“哨兵”,当发现这个哨兵后,写线程就退出。 Java代码 收藏代码import java.io.File; import java.io.FileFilter; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; public class Test { static long randomTime() { return (long) (Math.random() * 1000); } public static void main(String[] args) { // 能容纳100个文件 final BlockingQueue
queue = new LinkedBlockingQueue
(100); // 线程池 final ExecutorService exec = Executors.newFixedThreadPool(5); final File root = new File("D:\\dist\\blank"); // 完成标志 final File exitFile = new File(""); // 读个数 final AtomicInteger rc = new AtomicInteger(); // 写个数 final AtomicInteger wc = new AtomicInteger(); // 读线程 Runnable read = new Runnable() { public void run() { scanFile(root); scanFile(exitFile); } public void scanFile(File file) { if (file.isDirectory()) { File[] files = file.listFiles(new FileFilter() { public boolean accept(File pathname) { return pathname.isDirectory() || pathname.getPath().endsWith(".log"); } }); for (File one : files) scanFile(one); } else { try { int index = rc.incrementAndGet(); System.out.println("Read0: " + index + " " + file.getPath()); queue.put(file); } catch (InterruptedException e) { } } } }; exec.submit(read); // 四个写线程 for (int index = 0; index < 4; index++) { // write thread final int num = index; Runnable write = new Runnable() { String threadName = "Write" + num; public void run() { while (true) { try { Thread.sleep(randomTime()); int index = wc.incrementAndGet(); File file = queue.take(); // 队列已经无对象 if (file == exitFile) { // 再次添加"标志",以让其他线程正常退出 queue.put(exitFile); break; } System.out.println(threadName + ": " + index + " " + file.getPath()); } catch (InterruptedException e) { } } } }; exec.submit(write); } exec.shutdown(); } } CountDownLatch 从名字可以看出,CountDownLatch是一个倒数计数的锁,当倒数到0时触发事件,也就是开锁,其他人就可以进入了。在一些应用场合中,需要等待某个条件达到要求后才能做后面的事情;同时当线程都完成后也会触发事件,以便进行后面的操作。 CountDownLatch最重要的方法是countDown()和await(),前者主要是倒数一次,后者是等待倒数到0,如果没有到达0,就只有阻塞等待了。一个CountDouwnLatch实例是不能重复使用的,也就是说它是一次性的,锁一经被打开就不能再关闭使用了,如果想重复使用,请考虑使用CyclicBarrier。下面的例子简单的说明了CountDownLatch的使用方法,模拟了100米赛跑,10名选手已经准备就绪,只等裁判一声令下。当所有人都到达终点时,比赛结束。 Java代码 收藏代码import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Test { public static void main(String[] args) throws InterruptedException { // 开始的倒数锁 final CountDownLatch begin = new CountDownLatch(1); // 结束的倒数锁 final CountDownLatch end = new CountDownLatch(10); // 十名选手 final ExecutorService exec = Executors.newFixedThreadPool(10); for (int index = 0; index < 10; index++) { final int NO = index + 1; Runnable run = new Runnable() { public void run() { try { begin.await(); Thread.sleep((long) (Math.random() * 10000)); System.out.println("No." + NO + " arrived"); } catch (InterruptedException e) { } finally { end.countDown(); } } }; exec.submit(run); } System.out.println("Game Start"); begin.countDown(); end.await(); System.out.println("Game Over"); exec.shutdown(); } } 使用Callable和Future实现线程等待和多线程返回值 假设在main线程启动一个线程,然后main线程需要等待子线程结束后,再继续下面的操作,我们会通过join方法阻塞main线程,代码如下: Java代码 收藏代码Runnable runnable = ...; Thread t = new Thread(runnable); t.start(); t.join(); ...... 通过JDK1.5线程池管理的线程可以使用Callable和Future实现(join()方法无法应用到在线程池线程) Java代码 收藏代码import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class Test { public static void main(String[] args) throws InterruptedException, ExecutionException { System.out.println("start main thread"); final ExecutorService exec = Executors.newFixedThreadPool(5); Callable
call = new Callable
() { public String call() throws Exception { System.out.println(" start new thread."); Thread.sleep(1000 * 5); System.out.println(" end new thread."); return "some value."; } }; Future
task = exec.submit(call); Thread.sleep(1000 * 2); task.get(); // 阻塞,并待子线程结束, exec.shutdown(); exec.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); System.out.println("end main thread"); } } Java代码 收藏代码import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * 多线程返回值测试 */ public class ThreadTest { public static void main(String[] args) throws InterruptedException, ExecutionException { System.out.println("start main thread"); int threadCount = 5; final ExecutorService exec = Executors.newFixedThreadPool(threadCount); List
> tasks = new ArrayList
>(); for (int i = 0; i < threadCount; i++) { Callable
call = new Callable
() { public Integer call() throws Exception { Thread.sleep(1000); return 1; } }; tasks.add(exec.submit(call)); } long total = 0; for (Future
future : tasks) { total += future.get(); } exec.shutdown(); System.out.println("total: " + total); System.out.println("end main thread"); } } CompletionService这个东西的使用上很类似上面的example,不同的是,它会首先取完成任务的线程。下面的参考文章里,专门提到这个,大家有兴趣可以看下,例子: Java代码 收藏代码import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class Test { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService exec = Executors.newFixedThreadPool(10); CompletionService
serv = new ExecutorCompletionService
(exec); for (int index = 0; index < 5; index++) { final int NO = index; Callable
downImg = new Callable
() { public String call() throws Exception { Thread.sleep((long) (Math.random() * 10000)); return "Downloaded Image " + NO; } }; serv.submit(downImg); } Thread.sleep(1000 * 2); System.out.println("Show web content"); for (int index = 0; index < 5; index++) { Future
task = serv.take(); String img = task.get(); System.out.println(img); } System.out.println("End"); // 关闭线程池 exec.shutdown(); } } Semaphore信号量 拿到信号量的线程可以进入代码,否则就等待。通过acquire()和release()获取和释放访问许可。下面的例子只允许5个线程同时进入执行acquire()和release()之间的代码 Java代码 收藏代码import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class Test { public static void main(String[] args) { // 线程池 ExecutorService exec = Executors.newCachedThreadPool(); // 只能5个线程同时访问 final Semaphore semp = new Semaphore(5); // 模拟20个客户端访问 for (int index = 0; index < 20; index++) { final int NO = index; Runnable run = new Runnable() { public void run() { try { // 获取许可 semp.acquire(); System.out.println("Accessing: " + NO); Thread.sleep((long) (Math.random() * 10000)); // 访问完后,释放 semp.release(); } catch (InterruptedException e) { } } }; exec.execute(run); } // 退出线程池 exec.shutdown(); } } 参考:jdk1.5中的线程池使用简介http://www.java3z.com/cwbwebhome/article/article2/2875.htmlCAS原理http://www.blogjava.net/syniii/archive/2010/11/18/338387.html?opt=adminjdk1.5中java.util.concurrent包编写多线程http://hi.baidu.com/luotoo/blog/item/b895c3c2d650591e0ef47731.htmlExecutorSerive vs CompletionServicehttp://www.coderanch.com/t/491704/threads/java/ExecutorSerive-vs-CompletionService -- end --

  

转载于:https://www.cnblogs.com/IamThat/p/4341125.html

你可能感兴趣的文章
爬取网站附件
查看>>
java基础图形界面和IO系统
查看>>
javascript学习笔记
查看>>
hdu 3996
查看>>
python第三十九课——面向对象(二)之初始化属性
查看>>
python学习笔记之函数装饰器
查看>>
FEM计算2D瞬态热传导方程
查看>>
四年时光,匆匆而过
查看>>
【php】【psr】psr1 基础编码规范
查看>>
WAF SSI
查看>>
LDAP & it's implementation
查看>>
Apache HttpComponents中的cookie匹配策略
查看>>
冰封的海盗攻略
查看>>
python from entry to abandon
查看>>
Netty4.x中文教程系列(四) 对象传输
查看>>
linux下find命令使用举例、
查看>>
GET请求在Tomcat中的传递及URI传递
查看>>
ubuntun 服务器与Mac
查看>>
重温JSP学习笔记--与日期数字格式化有关的jstl标签库
查看>>
java-Date-DateFormat-Calendar
查看>>