package com.xiepanpan.locks.lockstest.service;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
/**
* @author: xiepanpan
* @Date: 2020/2/27
* @Description:
*/
@Slf4j
public class Thread01 extends Thread{
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
log.error("创建线程异常:{}",e);
}
log.info("Thread01-当前线程:{}", Thread.currentThread());
}
}
package com.xiepanpan.locks.lockstest.service;
import lombok.extern.slf4j.Slf4j;
/**
* @author: xiepanpan
* @Date: 2020/2/27
* @Description: 多线程测试类
*/
@Slf4j
public class ThreadTest {
public static void main(String[] args) {
log.info("主线程开始。。。");
Thread01 thread01 = new Thread01();
//异步化
new Thread(thread01).start();
log.info("主线程结束......");
}
}
运行结果:
package com.xiepanpan.locks.lockstest.service;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
/**
* @author: xiepanpan
* @Date: 2020/2/27
* @Description:
*/
@Slf4j
public class Thread02 implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
log.error("创建线程异常:{}",e);
}
log.info("Thread02-当前线程:{}"+Thread.currentThread());
}
}
package com.xiepanpan.locks.lockstest.service;
import lombok.extern.slf4j.Slf4j;
/**
* @author: xiepanpan
* @Date: 2020/2/27
* @Description: 多线程测试类
*/
@Slf4j
public class ThreadTest {
public static void main(String[] args) {
log.info("主线程开始。。。");
new Thread(new Thread02()).start();
log.info("主线程结束......");
}
}
运行结果:
以上没法使用线程里面的返回值。而使用Callable可以
package com.xiepanpan.locks.lockstest.service;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
/**
* @author: xiepanpan
* @Date: 2020/2/27
* @Description:
*/
public class Thread03 implements Callable<String> {
@Override
public String call() throws Exception {
TimeUnit.SECONDS.sleep(3);
return "OK";
}
}
package com.xiepanpan.locks.lockstest.service;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
/**
* @author: xiepanpan
* @Date: 2020/2/27
* @Description: 多线程测试类
*/
@Slf4j
public class ThreadTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
log.info("主线程开始。。。");
FutureTask<String> task = new FutureTask<>(new Thread03());
new Thread(task).start();
String s = task.get();
log.info("主线程哈哈哈。。。。");
log.info("异步获取到的结果是:{}",s);
log.info("主线程结束......");
}
}
log.info("异步获取到的结果是:{}",s);
要获取s的结果 此时主线程处于等待状态 所以主线程结束
最后执行
Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。 Future提供了三种功能: 1)判断任务是否完成; 2)能够中断任务; 3)能够获取任务执行结果。 FutureTask是Future和Runable的实现
package java.util.concurrent;
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
假如不使用线程池
package com.xiepanpan.locks.lockstest.service;
import lombok.extern.slf4j.Slf4j;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
/**
* @author: xiepanpan
* @Date: 2020/2/27
* @Description: 多线程测试类
*/
@Slf4j
public class ThreadTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
log.info("主线程开始。。。");
for (int i=0;i<10;i++) {
new Thread(()->{
log.info("当前线程开始:{}",Thread.currentThread());
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
log.error("线程异常:{}",e);
}
log.info("当前线程结束: {}",Thread.currentThread());
}).start();
}
log.info("主线程结束......");
}
}
执行结果:
循环十次要创建十个线程
使用线程池
package com.xiepanpan.locks.lockstest.service;
import lombok.extern.slf4j.Slf4j;
import java.util.UUID;
import java.util.concurrent.*;
/**
* @author: xiepanpan
* @Date: 2020/2/27
* @Description: 多线程测试类
*/
@Slf4j
public class ThreadTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
log.info("主线程开始。。。");
ExecutorService threadPool = Executors.newFixedThreadPool(2);
for (int i=0;i<10;i++) {
Thread thread = new Thread(() -> {
log.info("当前线程开始:{}", Thread.currentThread());
try {
TimeUnit.SECONDS.sleep(5);
int j= 5/0;
} catch (InterruptedException e) {
log.error("线程异常:{}", e);
}
log.info("当前线程结束: {}", Thread.currentThread());
});
给线程池提交任务
threadPool.submit(thread);
}
log.info("主线程结束......");
}
}
执行结果:
可以看到 只用两个线程再执行 使用线程池控制系统资源,防止线程资源耗尽
ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); Java通过Executors提供四种线程池,分别为:
默认的线程池里面的Queue是一个无界队列。 极限情况。线程全部放进队列。无界队列撑爆内存。 ThreadPool:拒绝策略。四种:自己总结;默认出异常。
使用线程池有个问题 我写了个int j= 5/0;
但不报错 任务交给线程池,出现异常无法感知。
Future接口可以构建异步应用,但依然有其局限性。它很难直接表述多个Future 结果之间的依赖性。实际开发中,我们经常需要达成以下目的:
package com.xiepanpan.locks.lockstest.service;
import lombok.extern.slf4j.Slf4j;
import java.util.UUID;
import java.util.concurrent.*;
/**
* @author: xiepanpan
* @Date: 2020/2/27
* @Description: 多线程测试类
*/
@Slf4j
public class ThreadTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
log.info("主线程开始。。。");
ExecutorService threadPool = Executors.newFixedThreadPool(2);
CompletableFuture.supplyAsync(()->{
log.info("当前线程开始: {}",Thread.currentThread());
String uuid = UUID.randomUUID().toString();
// uuid+=10/0;
log.info("当前线程结束:{}",Thread.currentThread());
return uuid;
},threadPool).thenApply((r)->{
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
log.error("线程睡眠异常:{}",e);
}
log.info("上一步执行结果:{}",r);
int i = 10/0;
return r+10/0;
}).whenComplete((r,e)->{
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException ex) {
log.error("线程睡眠异常:{}",ex);
}
log.info("最终结果:{}",r);
log.error("异常信息:{}",e);
});
log.info("主线程结束......");
}
}
执行结果:
有异常就输出了 实际使用:
package com.xiepanpan.locks.lockstest.service;
import lombok.extern.slf4j.Slf4j;
import java.util.UUID;
import java.util.concurrent.*;
/**
* @author: xiepanpan
* @Date: 2020/2/27
* @Description: 多线程测试类
*/
@Slf4j
public class ThreadTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
log.info("主线程开始。。。");
ExecutorService threadPool = Executors.newFixedThreadPool(10);
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
log.info("查询商品基本数据...");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
log.error("线程睡眠异常:{}",e);
}
return "华为";
}, threadPool).whenComplete((r, e) -> {
log.info("结果是:{}", r);
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
log.info("查询商品属性数据...");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
log.error("线程睡眠异常:{}",e);
}
return "金色";
}, threadPool).whenComplete((r, e) -> {
log.info("结果是:{}", r);
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
log.info("查询商品营销数据...");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
log.error("线程睡眠异常:{}",e);
}
return "满199减100";
}, threadPool).whenComplete((r, e) -> {
log.info("结果是:{}" + r);
});
CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3);
//线程插队
allOf.join();
log.info("所有人都完事了:{}",future1.get()+future2.get()+future3.get());
}
}
这个方法5秒完事 一个线程的话需要10秒
以后异步任务的编程模式: CompletableFuture.supplyAsync(()->{},pool).whenComplete()
当前阶段正常完成以后执行,而且当前阶段的执行的结果会作为下一阶段的输入参数。thenApplyAsync默认是异步执行的。这里所谓的异步指的是不在当前线程内执行。 thenApply相当于回调函数(callback)(如ajax的success,error等回调)
thenAccept和thenRun都是无返回值的。
如果说thenApply是不停的输入输出的进行生产,那么thenAccept和thenRun就是在进行消耗。它们是整个计算的最后两个阶段。 同样是执行指定的动作,同样是消耗,二者也有区别:
1、thenAccept只接受上一步的结果
thenAccept(r){
r:上一步的结果
}
2、 thenApply(r){
r:把上一步的结果拿来进行修改再返回,
}
3、thenAccpet(){} 上一步结果1s+本次处理2s=3s
4、thenAccpetAsync(){} 上一步1s+异步2s = 最多等2s