侧边栏壁纸
博主头像
落叶人生博主等级

走进秋风,寻找秋天的落叶

  • 累计撰写 130557 篇文章
  • 累计创建 28 个标签
  • 累计收到 9 条评论
标签搜索

目 录CONTENT

文章目录

并发编程专题三-线程的并发工具类

2023-12-14 星期四 / 0 评论 / 0 点赞 / 19 阅读 / 13543 字

一、Fork-Join框架 1、分而治之 规模为N的问题,N<阈值,直接解决,N>阈值,将N分解为K个小规模子问题,子问题互相对立,与原问题形式相同,将子问题的解合并得到原问题的解 动态规范 2、工

一、Fork-Join框架

1、分而治之

规模为N的问题,N<阈值,直接解决,N>阈值,将N分解为K个小规模子问题,子问题互相对立,与原问题形式相同,将子问题的解合并得到原问题的解

动态规范

2、工作密取 workStealing

就是在任务分割的时候,前面的任务执行可能会比后面的执行速度快,当前面的执行完,后面的还没执行的时候,执行完前面的任务的线程不会停止,而是从后面的任务的尾部取出子任务继续工作。Fork-Join就是实现了这样的机制。

Fork/Join使用的标准范式

代码如下:

import java.util.concurrent.ForkJoinPool;import java.util.concurrent.RecursiveTask;/** * @Auther: BlackKingW * @Date: 2019/4/14 12:09 * @Description: */public class SumArray {    private static class SumTask extends RecursiveTask<Integer>{        private final static int THRESHOLD = MakeArray.ARRAY_LENGTH/10;        private int[] src; //表示我们要实际统计的数组        private int fromIndex;//开始统计的下标        private int toIndex;//统计到哪里结束的下标        public SumTask(int[] src, int fromIndex, int toIndex) {            this.src = src;            this.fromIndex = fromIndex;            this.toIndex = toIndex;        }		@Override		protected Integer compute() {			if(toIndex-fromIndex < THRESHOLD) {				int count = 0;				for(int i=fromIndex;i<=toIndex;i++) {			    	//SleepTools.ms(1);			    	count = count + src[i];				}				return count;			}else {				//fromIndex....mid....toIndex				//1...................70....100				int mid = (fromIndex+toIndex)/2;				SumTask left = new SumTask(src,fromIndex,mid);				SumTask right = new SumTask(src,mid+1,toIndex);				invokeAll(left,right);				return left.join()+right.join();			}		}    }    public static void main(String[] args) {        ForkJoinPool pool = new ForkJoinPool();        int[] src = MakeArray.makeArray();        SumTask innerFind = new SumTask(src,0,src.length-1);        long start = System.currentTimeMillis();        pool.invoke(innerFind);//同步调用        System.out.println("Task is Running.....");        System.out.println("The count is "+innerFind.join()                +" spend time:"+(System.currentTimeMillis()-start)+"ms");    }}public class MakeArray {    //数组长度    public static final int ARRAY_LENGTH  = 100000000;    public static int[] makeArray() {        //new一个随机数发生器        Random r = new Random();        int[] result = new int[ARRAY_LENGTH];        for(int i=0;i<ARRAY_LENGTH;i++){            //用随机数填充数组            result[i] =  r.nextInt(ARRAY_LENGTH*3);        }        return result;    }}

二、常用的并发工具类

1、CountDownLatch

作用:是一组线程等待其他的线程完成工作以后在执行,加强版join

await用来等待,countDown负责计数器的减一

代码示例:

import java.util.concurrent.CountDownLatch;/** * @Auther: BlackKingW * @Date: 2019/4/14 12:09 * @Description: */public class UseCountDownLatch {		static CountDownLatch latch = new CountDownLatch(6);	//初始化线程(只有一步,有4个)    private static class InitThread implements Runnable{        @Override        public void run() {        	System.out.println("Thread_"+Thread.currentThread().getId()        			+" ready init work......");        	latch.countDown();//初始化线程完成工作了,countDown方法只扣减一次;            for(int i =0;i<2;i++) {            	System.out.println("Thread_"+Thread.currentThread().getId()            			+" ........continue do its work");            }        }    }        //业务线程    private static class BusiThread implements Runnable{        @Override        public void run() {        	try {				latch.await();			} catch (InterruptedException e) {				e.printStackTrace();			}            for(int i =0;i<3;i++) {            	System.out.println("BusiThread_"+Thread.currentThread().getId()            			+" do business-----");            }        }    }    public static void main(String[] args) throws InterruptedException {    	//单独的初始化线程,初始化分为2步,需要扣减两次        new Thread(new Runnable() {            @Override            public void run() {            	SleepTools.ms(1); //休眠1s                System.out.println("Thread_"+Thread.currentThread().getId()            			+" ready init work step 1st......");                latch.countDown();//每完成一步初始化工作,扣减一次                System.out.println("begin step 2nd.......");                SleepTools.ms(1);                System.out.println("Thread_"+Thread.currentThread().getId()            			+" ready init work step 2nd......");                latch.countDown();//每完成一步初始化工作,扣减一次            }        }).start();        new Thread(new BusiThread()).start();        for(int i=0;i<=3;i++){            Thread thread = new Thread(new InitThread());            thread.start();        }        latch.await();        System.out.println("Main do ites work........");    }}

2、CyclicBarrier

让一组线程达到某个屏障,被阻塞,一直到组内最后一个线程达到屏障时,屏障开放,所有被阻塞的线程会继续运行CyclicBarrier(int parties)

CyclicBarrier(int parties, Runnable barrierAction),屏障开放,barrierAction定义的任务会执行

代码示例:

import java.util.Map;import java.util.Random;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.CyclicBarrier;/** * @Auther: BlackKingW * @Date: 2019/4/14 12:09 * @Description: */public class UseCyclicBarrier {		private static CyclicBarrier barrier 		= new CyclicBarrier(5,new CollectThread());	    private static ConcurrentHashMap<String,Long> resultMap            = new ConcurrentHashMap<>();//存放子线程工作结果的容器    public static void main(String[] args) {        for(int i=0;i<=4;i++){            Thread thread = new Thread(new SubThread());            thread.start();        }    }    //负责屏障开放以后的工作    private static class CollectThread implements Runnable{        @Override        public void run() {            StringBuilder result = new StringBuilder();            for(Map.Entry<String,Long> workResult:resultMap.entrySet()){            	result.append("["+workResult.getValue()+"]");            }            System.out.println(" the result = "+ result);            System.out.println("do other business........");        }    }    //工作线程    private static class SubThread implements Runnable{        @Override        public void run() {        	long id = Thread.currentThread().getId();//线程本身的处理结果            resultMap.put(Thread.currentThread().getId()+"",id);            Random r = new Random();//随机决定工作线程的是否睡眠            try {                if(r.nextBoolean()) {                	Thread.sleep(2000+id);                	System.out.println("Thread_"+id+" ....do something ");                }                System.out.println(id+"....is await");                barrier.await();            	Thread.sleep(1000+id);                System.out.println("Thread_"+id+" ....do its business ");            } catch (Exception e) {                e.printStackTrace();            }        }    }}

CountDownLatch和CyclicBarrier辨析

1、countdownlatch放行由第三者控制,CyclicBarrier放行由一组线程本身控制
2、countdownlatch放行条件》=线程数,CyclicBarrier放行条件=线程数

3、Semaphore

控制同时访问某个特定资源的线程数量,用在流量控制

import java.sql.Connection;import java.util.LinkedList;import java.util.concurrent.Semaphore;/** * @Auther: BlackKingW * @Date: 2019/4/14 12:09 * @Description: */public class DBPoolSemaphore {		private final static int POOL_SIZE = 10;	private final Semaphore useful,useless;//useful表示可用的数据库连接,useless表示已用的数据库连接		public DBPoolSemaphore() {		this. useful = new Semaphore(POOL_SIZE);		this.useless = new Semaphore(0);	}		//存放数据库连接的容器	private static LinkedList<Connection> pool = new LinkedList<Connection>();	//初始化池	static {        for (int i = 0; i < POOL_SIZE; i++) {            pool.addLast(SqlConnectImpl.fetchConnection());        }	}	/*归还连接*/	public void returnConnect(Connection connection) throws InterruptedException {		if(connection!=null) {			System.out.println("当前有"+useful.getQueueLength()+"个线程等待数据库连接!!"					+"可用连接数:"+useful.availablePermits());			useless.acquire();			synchronized (pool) {				pool.addLast(connection);			}				useful.release();		}	}		/*从池子拿连接*/	public Connection takeConnect() throws InterruptedException {		useful.acquire();		Connection conn;		synchronized (pool) {			conn = pool.removeFirst();		}		useless.release();		return conn;	}	}

4、Exchange

两个线程间的数据交换, 用的比较少

代码示例:

import java.util.ArrayList;import java.util.HashSet;import java.util.List;import java.util.Set;import java.util.concurrent.Exchanger;/** * @Auther: BlackKingW * @Date: 2019/4/14 12:09 * @Description: */public class UseExchange {    private static final Exchanger<Set<String>> exchange     	= new Exchanger<Set<String>>();    public static void main(String[] args) {    	//第一个线程        new Thread(new Runnable() {            @Override            public void run() {            	Set<String> setA = new HashSet<String>();//存放数据的容器                try {                	/*添加数据                	 * set.add(.....)                	 * */                	setA = exchange.exchange(setA);//交换set                	/*处理交换后的数据*/                } catch (InterruptedException e) {                }            }        }).start();      //第二个线程        new Thread(new Runnable() {            @Override            public void run() {            	Set<String> setB = new HashSet<String>();//存放数据的容器                try {                	/*添加数据                	 * set.add(.....)                	 * set.add(.....)                	 * */                	setB = exchange.exchange(setB);//交换set                	/*处理交换后的数据*/                } catch (InterruptedException e) {                }            }        }).start();    }}

5、Callable、Future和FutureTask

类之间的关系

isDone,结束,正常还是异常结束,或者自己取消,返回true;

isCancelled 任务完成前被取消,返回true;

cancel(boolean):

  1. 任务还没开始,返回false
  2. 任务已经启动,cancel(true),中断正在运行的任务,中断成功,返回true,cancel(false),不会去中断已经运行的任务
  3. 任务已经结束,返回false

代码示例:

import java.util.Random;import java.util.concurrent.Callable;import java.util.concurrent.ExecutionException;import java.util.concurrent.FutureTask;/** * @Auther: BlackKingW * @Date: 2019/4/14 12:09 * @Description: */public class UseFuture {		/*实现Callable接口,允许有返回值*/	private static class UseCallable implements Callable<Integer>{		private int sum;		@Override		public Integer call() throws Exception {			System.out.println("Callable子线程开始计算");			Thread.sleep(2000);			for(int i=0;i<5000;i++) {				sum = sum+i;			}			System.out.println("Callable子线程计算完成,结果="+sum);			return sum;		}	}		public static void main(String[] args) 			throws InterruptedException, ExecutionException {				UseCallable useCallable = new UseCallable();		FutureTask<Integer> futureTask = new FutureTask<Integer>(useCallable);		new Thread(futureTask).start();		Random r = new Random();		SleepTools.second(1);		if(r.nextBoolean()) {//随机决定是获得结果还是终止任务			System.out.println("Get UseCallable result = "+futureTask.get());		}else {			System.out.println("中断计算");			futureTask.cancel(true);		}			}}

场景举例:包含图片和文字的文档的处理:图片(云上),可以用future去取图片,主线程继续解析文字。

广告 广告

评论区