Java笔记5:多线程
作者:陆金龙
发表时间:2022-09-23 22:41
关键词:
5 多线程
5.1 使用Runnable实例创建线程
// 通过实现Runnable接口来创建线程类
public class RunnableTask implements Runnable { private int i ; // run方法同样是线程执行体 public void run() { for ( ; i < 100 ; i++ ) { // 当线程类实现Runnable接口时, // 如果想获取当前线程,只能用Thread.currentThread()方法。 System.out.println(Thread.currentThread().getName() + " " + i); } } public static void main(String[] args) { for (int i = 0; i < 100; i++) { System.out.println(Thread.currentThread().getName() + " " + i); if (i == 20) { RunnableTask task = new RunnableTask(); // ① // 通过new Thread(target , name)方法创建新线程 new Thread(task , "新线程1").start(); new Thread(task , "新线程2").start(); } } } }
5.2 使用FutureTask来包装Callable对象
public class ThirdThread { public static void main(String[] args) { // 创建Callable对象 ThirdThread rt = new ThirdThread(); // 先使用Lambda表达式创建Callable对象 // 使用FutureTask来包装Callable对象 FutureTask task = new FutureTask ((Callable )() -> { int i = 0; for ( ; i < 100 ; i++ ) { System.out.println(Thread.currentThread().getName()+ " 的循环变量i的值:" + i); } // call()方法可以有返回值 return i; }); for (int i = 0 ; i < 100 ; i++) { System.out.println(Thread.currentThread().getName()+ " 的循环变量i的值:" + i); if (i == 20) { // 实质还是以Callable对象来创建、并启动线程 new Thread(task , "有返回值的线程").start(); } } try { // 获取线程返回值 System.out.println("子线程的返回值:" + task.get()); } catch (Exception ex) { ex.printStackTrace(); } } }
5.4 控制线程
优先级高的线程获得更多的执行机会。
5.4.1 join线程
public class JoinThread extends Thread { // 提供一个有参数的构造器,用于设置该线程的名字 public JoinThread(String name) { super(name); } // 重写run()方法,定义线程执行体 public void run() { for (int i = 0; i < 100 ; i++ ) { System.out.println(getName() + " " + i); } } public static void main(String[] args)throws Exception { // 启动子线程 new JoinThread("新线程").start(); for (int i = 0; i < 100 ; i++ ) { if (i == 20) { JoinThread jt = new JoinThread("被Join的线程"); jt.start(); // main线程调用了jt线程的join()方法,main线程 // 必须等jt执行结束才会向下执行 jt.join(); } System.out.println(Thread.currentThread().getName()+ " " + i); } } }
5.4.2 后台线程
public class DaemonThread extends Thread { // 定义后台线程的线程执行体与普通线程没有任何区别 public void run() { for (int i = 0; i < 1000 ; i++ ) { System.out.println(getName() + " " + i); } } public static void main(String[] args) { DaemonThread t = new DaemonThread(); // 将此线程设置成后台线程 t.setDaemon(true); // 启动后台线程 t.start(); for (int i = 0 ; i < 10 ; i++ ) { System.out.println(Thread.currentThread().getName()+ " " + i); } // -----程序执行到此处,前台线程(main线程)结束------ // 后台线程也应该随之结束 } }
5.4.3 线程睡眠:sleep
当前正在执行的线程暂停一段时间,并进入阻塞状态。
sleep方法暂停当前线程后,会给其他线程机会,不会判断优先级。
Thread.sleep(1000);
5.4.4 线程让步:yield
yield方法只给优先级相同,或优先级更高的线程让步。
public class YieldTest extends Thread { public YieldTest(String name) { super(name); } // 定义run方法作为线程执行体 public void run() { for (int i = 0; i < 50 ; i++ ) { System.out.println(getName() + " " + i); // 当i等于20时,使用yield方法让当前线程让步 if (i == 20) { Thread.yield(); } } } public static void main(String[] args)throws Exception { // 启动两条并发线程 YieldTest yt1 = new YieldTest("高级"); // 将ty1线程设置成最高优先级 //yt1.setPriority(Thread.MAX_PRIORITY); yt1.start(); YieldTest yt2 = new YieldTest("低级"); // 将yt2线程设置成最低优先级 //yt2.setPriority(Thread.MIN_PRIORITY); yt2.start(); } }
5.5 线程同步
5.5.1 同步代码块
public class DrawThread extends Thread { // 模拟用户账户 private Account account; // 当前取钱线程所希望取的钱数 private double drawAmount; public DrawThread(String name , Account account , double drawAmount) { super(name); this.account = account; this.drawAmount = drawAmount; } // 当多条线程修改同一个共享数据时,将涉及数据安全问题。 public void run() { // 使用account作为同步监视器,任何线程进入下面同步代码块之前, // 必须先获得对account账户的锁定——其他线程无法获得锁,也就无法修改它 // 这种做法符合:“加锁 → 修改 → 释放锁”的逻辑 synchronized (account) { // 账户余额大于取钱数目 if (account.getBalance() >= drawAmount) { // 吐出钞票 System.out.println(getName() + "取钱成功!吐出钞票:" + drawAmount); try { Thread.sleep(1); } catch (InterruptedException ex) { ex.printStackTrace(); } // 修改余额 account.setBalance(account.getBalance() - drawAmount); System.out.println("\t余额为: " + account.getBalance()); } else { System.out.println(getName() + "取钱失败!余额不足!"); } } // 同步代码块结束,该线程释放同步锁 } }
5.5.2 同步方法
public class DrawThread extends Thread { // 模拟用户账户 private Account account; // 当前取钱线程所希望取的钱数 private double drawAmount; public DrawThread(String name , Account account, double drawAmount) { super(name); this.account = account; this.drawAmount = drawAmount; } // 当多条线程修改同一个共享数据时,将涉及数据安全问题。 public void run() { // 直接调用account对象的draw方法来执行取钱 // 同步方法的同步监视器是this,this代表调用draw()方法的对象。 // 也就是说:线程进入draw()方法之前,必须先对account对象的加锁。 account.draw(drawAmount); } } public class Account { // 封装账户编号、账户余额两个成员变量 private String accountNo; private double balance; public Account(){} // 构造器 public Account(String accountNo , double balance) { this.accountNo = accountNo; this.balance = balance; } // accountNo的setter和getter方法 public void setAccountNo(String accountNo) { this.accountNo = accountNo; } public String getAccountNo() { return this.accountNo; } // 因此账户余额不允许随便修改,所以只为balance提供getter方法, public double getBalance() { return this.balance; } // 提供一个线程安全draw()方法来完成取钱操作 public synchronized void draw(double drawAmount) { // 账户余额大于取钱数目 if (balance >= drawAmount) { // 吐出钞票 System.out.println(Thread.currentThread().getName()+ "取钱成功!吐出钞票:" + drawAmount); try { Thread.sleep(1); } catch (InterruptedException ex) { ex.printStackTrace(); } // 修改余额 balance -= drawAmount; System.out.println("\t余额为: " + balance); } else { System.out.println(Thread.currentThread().getName()+ "取钱失败!余额不足!"); } } // 下面两个方法根据accountNo来重写hashCode()和equals()方法 public int hashCode() { return accountNo.hashCode(); } public boolean equals(Object obj) { if(this == obj) return true; if (obj !=null && obj.getClass() == Account.class) { Account target = (Account)obj; return target.getAccountNo().equals(accountNo); } return false; } }
5.5.3 同步锁 ReentrantLock
public class Account { // 定义锁对象 private final ReentrantLock lock = new ReentrantLock(); //省略 同5.5.2 ... // 提供一个线程安全draw()方法来完成取钱操作 public void draw(double drawAmount) { // 加锁 lock.lock(); try { // 账户余额大于取钱数目 if (balance >= drawAmount) { // 吐出钞票 System.out.println(Thread.currentThread().getName() + "取钱成功!吐出钞票:" + drawAmount); try { Thread.sleep(1); } catch (InterruptedException ex) { ex.printStackTrace(); } // 修改余额 balance -= drawAmount; System.out.println("\t余额为: " + balance); } else { System.out.println(Thread.currentThread().getName() + "取钱失败!余额不足!"); } } finally { // 修改完成,释放锁 lock.unlock(); } } //... }
5.6 线程间通讯
5.6.1 与synchronized配合使用的wait notify notifyAll
wait 等待其他线程的唤醒
notify 唤醒处于等待的一个线程 选择是任意的
notifyAll 唤醒处于等待的所有线程
public synchronized void draw(double drawAmount) { try { // 如果flag为假,表明账户中还没有人存钱进去,取钱方法阻塞 if (!flag) { wait(); } else { // 执行取钱 System.out.println(Thread.currentThread().getName()+ " 取钱:" + drawAmount); balance -= drawAmount; System.out.println("账户余额为:" + balance); // 将标识账户是否已有存款的旗标设为false。 flag = false; // 唤醒其他线程 notifyAll(); } } catch (InterruptedException ex) { ex.printStackTrace(); } } public synchronized void deposit(double depositAmount) { try { // 如果flag为真,表明账户中已有人存钱进去,则存钱方法阻塞 if (flag) //① { wait(); } else { // 执行存款 System.out.println(Thread.currentThread().getName() + " 存款:" + depositAmount); balance += depositAmount; System.out.println("账户余额为:" + balance); // 将表示账户是否已有存款的旗标设为true flag = true; // 唤醒其他线程 notifyAll(); } } catch (InterruptedException ex) { ex.printStackTrace(); } }
5.6.2 与lock配合使用的Condition
// 显式定义Lock对象 private final Lock lock = new ReentrantLock(); // 获得指定Lock对象对应的Condition private final Condition cond = lock.newCondition(); public void draw(double drawAmount) { // 加锁 lock.lock(); try { // 如果flag为假,表明账户中还没有人存钱进去,取钱方法阻塞 if (!flag) { cond.await(); } else { // 执行取钱 System.out.println(Thread.currentThread().getName()+ " 取钱:" + drawAmount); balance -= drawAmount; System.out.println("账户余额为:" + balance); // 将标识账户是否已有存款的旗标设为false。 flag = false; // 唤醒其他线程 cond.signalAll(); } } catch (InterruptedException ex) { ex.printStackTrace(); } // 使用finally块来释放锁 finally { lock.unlock(); } } public void deposit(double depositAmount) { lock.lock(); try { // 如果flag为真,表明账户中已有人存钱进去,则存钱方法阻塞 if (flag) // ① { cond.await(); } else { // 执行存款 System.out.println(Thread.currentThread().getName()+ " 存款:" + depositAmount); balance += depositAmount; System.out.println("账户余额为:" + balance); // 将表示账户是否已有存款的旗标设为true flag = true; // 唤醒其他线程 cond.signalAll(); } } catch (InterruptedException ex) { ex.printStackTrace(); } // 使用finally块来释放锁 finally { lock.unlock(); } }
5.6.3 阻塞队列
public static void main(String[] args) throws Exception { // 定义一个长度为2的阻塞队列 BlockingQueuebq = new ArrayBlockingQueue<>(2); bq.put("Java"); // 与bq.add("Java"、bq.offer("Java")相同 bq.put("Java"); // 与bq.add("Java"、bq.offer("Java")相同 bq.put("Java"); // ① 阻塞线程。 } class Producer extends Thread { private BlockingQueue bq; public Producer(BlockingQueue bq) { this.bq = bq; } public void run() { String[] strArr = new String[] { "Java", "Struts", "Spring" }; for (int i = 0 ; i < 999999999 ; i++ ) { System.out.println(getName() + "生产者准备生产集合元素!"); try { Thread.sleep(200); // 尝试放入元素,如果队列已满,线程被阻塞 bq.put(strArr[i % 3]); } catch (Exception ex){ex.printStackTrace();} System.out.println(getName() + "生产完成:" + bq); } } } class Consumer extends Thread { private BlockingQueue bq; public Consumer(BlockingQueue bq) { this.bq = bq; } public void run() { while(true) { System.out.println(getName() + "消费者准备消费集合元素!"); try { Thread.sleep(200); // 尝试取出元素,如果队列已空,线程被阻塞 bq.take(); } catch (Exception ex){ex.printStackTrace();} System.out.println(getName() + "消费完成:" + bq); } } } public class BlockingQueueTest2 { public static void main(String[] args) { // 创建一个容量为1的BlockingQueue BlockingQueue bq = new ArrayBlockingQueue<>(1); // 启动3条生产者线程 new Producer(bq).start(); new Producer(bq).start(); new Producer(bq).start(); // 启动一条消费者线程 new Consumer(bq).start(); } }
5.8 线程池
5.8.1 ExecutorService 多线程同步工作
public static void main(String[] args) throws Exception { // 创建足够的线程来支持4个CPU并行的线程池 // 创建一个具有固定线程数(6)的线程池 ExecutorService pool = Executors.newFixedThreadPool(6); // 使用Lambda表达式创建Runnable对象 Runnable target = () -> { for (int i = 0; i < 100 ; i++ ) { System.out.println(Thread.currentThread().getName()+ "的i值为:" + i); } }; // 向线程池中提交两个线程 pool.submit(target); pool.submit(target); // 关闭线程池 pool.shutdown(); }
5.8.2 多线程分解任务:ForkJoinPool和RecursiveAction 执行无返回值的任务
// 继承RecursiveAction来实现"可分解"的任务
class PrintTask extends RecursiveAction { // 每个“小任务”只最多只打印50个数 private static final int THRESHOLD = 50; private int start; private int end; // 打印从start到end的任务 public PrintTask(int start, int end) { this.start = start; this.end = end; } @Override protected void compute() { // 当end与start之间的差小于THRESHOLD时,开始打印 if(end - start < THRESHOLD) { for (int i = start ; i < end ; i++ ) { System.out.println(Thread.currentThread().getName()+ "的i值:" + i); } } else { // 如果当end与start之间的差大于THRESHOLD时,即要打印的数超过50个 // 将大任务分解成两个小任务。 int middle = (start + end) / 2; PrintTask left = new PrintTask(start, middle); PrintTask right = new PrintTask(middle, end); // 并行执行两个“小任务” left.fork(); right.fork(); } } } public class ForkJoinPoolTest { public static void main(String[] args) throws Exception { ForkJoinPool pool = new ForkJoinPool(); // 提交可分解的PrintTask任务 pool.submit(new PrintTask(0 , 300)); pool.awaitTermination(2, TimeUnit.SECONDS); // 关闭线程池 pool.shutdown(); } }
5.8.3 多线程分解任务:ForkJoinPool和RecursiveTask 执行有返回值的任务并合并结果
// 继承RecursiveTask来实现"可分解"的任务
class CalTask extends RecursiveTask{ // 每个“小任务”只最多只累加20个数 private static final int THRESHOLD = 20; private int arr[]; private int start; private int end; // 累加从start到end的数组元素 public CalTask(int[] arr , int start, int end) { this.arr = arr; this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; // 当end与start之间的差小于THRESHOLD时,开始进行实际累加 if(end - start < THRESHOLD) { for (int i = start ; i < end ; i++ ) { sum += arr[i]; } return sum; } else { // 如果当end与start之间的差大于THRESHOLD时,即要累加的数超过20个时 // 将大任务分解成两个小任务。 int middle = (start + end) / 2; CalTask left = new CalTask(arr , start, middle); CalTask right = new CalTask(arr , middle, end); // 并行执行两个“小任务” left.fork(); right.fork(); // 把两个“小任务”累加的结果合并起来 return left.join() + right.join(); // ① } } } public class Sum { public static void main(String[] args) throws Exception { int[] arr = new int[100]; Random rand = new Random(); int total = 0; // 初始化100个数字元素 for (int i = 0 , len = arr.length; i < len ; i++ ) { int tmp = rand.nextInt(20); // 对数组元素赋值,并将数组元素的值添加到sum总和中。 total += (arr[i] = tmp); } System.out.println(total); // 创建一个通用池 ForkJoinPool pool = ForkJoinPool.commonPool(); // 提交可分解的CalTask任务 Future future = pool.submit(new CalTask(arr , 0 , arr.length)); System.out.println(future.get());//返回值 // 关闭线程池 pool.shutdown(); } }
Concurrent 开头的集合类代表了支持并发访问的集合,可支持多线程并发写入访问。