通过 Override Thread类中的start方法,不断循环调用传递过来的Runnable对象
线程池主要由4部分组成:
线程池 使用 Executor,Executors,ExecutorService,ThreadPoolExecutor,Callable,Future,FutureTask
// ThreadPoolExecutor 内部构造
public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime
, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
final CountDownLatch latch = new CountDownLatch(2);
new Thread(){
public void run() {
System.out.println("子线程"+Thread.currentThread().getName()+"正在执行");
Thread.sleep(3000);
System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");
latch.countDown();
};
}.start();
new Thread(){
public void run() {
System.out.println("子线程"+Thread.currentThread().getName()+"正在执行");
Thread.sleep(3000);
System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");
latch.countDown();
};
}.start();
System.out.println("等待 2 个子线程执行完毕..."); latch.await();
System.out.println("2 个子线程已经执行完毕"); System.out.println("继续执行主线程");
让一组线程等待到某个状态后再全部同时执行;调用 await()方法后 线程就处于barrier了
public static void main(String[] args) {
int N = 4;
CyclicBarrier barrier = new CyclicBarrier(N);
for(int i=0;i<N;i++)
new Writer(barrier).start();
}
static class Writer extends Thread {
private CyclicBarrier cyclicBarrier;
public Writer(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
Thread.sleep(5000); //以睡眠来模拟线程需要预定写入数据操作
System.out.println(" 线 程 "+Thread.currentThread().getName()+"写入数据完
毕,等待其他线程写入完毕");
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch(BrokenBarrierException e){
e.printStackTrace();
}
System.out.println("所有线程写入完毕,继续处理其他任务,比如数据操作");
}
}
例如:若一个工厂有5台机器,但有8个工人,一台机器同时只能被一个工人使用,只有使用完,其他工人才能继续使用
public static void main (String[] args) {
int N = 8; //工人数
Semaphore semaphore = new Semaphore(5); //机器数目
for(int i=0;i<N;i++)
new Worker(i,semaphore).start();
}
static class Worker extends Thread {
private int num;
private Semaphore semaphore;
public Worker(int num,Semaphore semaphore){
this.num = num;
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("工人"+this.num+"占用一个机器在生产...");
Thread.sleep(2000);
System.out.println("工人"+this.num+"释放出机器");
semaphore.release();
}catch (InterruptedException e) {
e.printStackTrace();
}
}
}
保证volatile的变量被所有线程可见
volatile是比synchronized更轻量的同步所,一个变量被多个线程共享
public class MyData {
private int j = 0;
public synchronized void add() {
j++;
System.out.println("线程" + Thread.currentThread().getName()+"j为: "+j);
}
public synchronized void dec() {
j--;
System.out.println("线程" + Thread.currentThread().getName()+"j为: "+j);
public int getData() {
return j;
}
}
}
public class AddRunnable implements Runnable {
MyData data;
public AddRunnable(MyData data) {
this.data = data;
}
public void run() {
data.add();
}
}
public class DecRunnable implements Runnable {
MyData data;
public DecRunnable(MyData data) {
this.data = data;
}
public void run() {
data.dec();s
}
}
public static void main (String[] args) {
MyData data = new MyData();
Runnable add = new AddRunnable(data);
Runnable dec = new DecRunnable(data);
for (int i=0; i < 2; i++) {
new Thread(add).start();
new Thread(dec).start();
}
}
public class MyData {
private int j=0;
public synchronized void add() {
j++;
System.out.println("线程" + Thread.currentThread().getName() + "j为: " + j);
}
public synchronized void dec() {
j--;
System.out.println("线程" + Thread.currentThread().getName() + "j为: " + j);
}
public int getData() {
return j;
}
}
public class TestThread {
public static void main (String[] args) {
final MyData data = new MyData();
for (int i=0; i<2; i++) {
new Thread(new Runnable(){
public void run() {
data.add();
}
}).start();
new Thread(new Runnable(){
public void run() {
data.dec();
}
}).start();
}
}
}
threadlocal 只在thread的生命周期内起作用
每一个线程都有自己独有的ThreadLocalMap,将 ThreadLocal 的静态实例作为key;有 set(v), get(), remove() 方法使用
ThreadLocal 被用来解决 数据库连接,Session管理等
private static final ThreadLocal threadSession = new ThreadLocal();
public static Session getSession() throws InfrastructureException {
Session s = (Session) threadSession.get();
try {
if (null == s) {
s = getSessionFactory().openSession;
threadSession.set(s);
}
} catch (HibernateException ex) {
throw new InfrastructureException(ex);
}
return s;
}
缩小锁定Object的范围
ConcurrentHashMap 内部被分为若干个小的HashMap,成为 Segment;
默认情况下一个ConcurrentHashmap 被细分为16个Segment
只对相应的Segment 加锁, Segment 和 Segment 之间是并行的
线程执行、切换都由系统控制,这种调度机制不会让一个thread的堵塞导致整个process堵塞
某一线程执行完主动通知系统切换另一个线程; 不存在线程同步问题; 线程切换可以预知
一个thread阻塞会导致整个process堵塞
JVM线程采用抢占式调度;优先级越高越先执行;优先级高不代表可以独自占用CPU时间片
CAS(V,E,N) V代表更新的变量,E表示旧值,N表示新值;当V=E,V才被设为N;如果V!=E,表示其他线程更新;当前不做操作
CAS 是 乐观锁
public class AtomicInteger extends Number implements java.io.Serializable {
private volatile int value;
public final int get() {
return value;
}
public final int getAndIncrement() {
for(;;) {// CAS 自旋,一直尝试直到成功
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return current;
}
}
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
}
数据在并发过程中被其他thread影响形成阶段性脏数据
部分CAS采用version number (版本号解决ABA问题)
AQS = AbstractQueuedSynchronizer; AQS 定义了一套多线程访问共享资源的同步器
ReentrantLock,Semaphore,CountDownLatch 都依赖于他
独占,只有一个thread能执行
共享,多个threads可以执行
本文系转载,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文系转载,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。