Java笔记5:多线程

作者:陆金龙    发表时间:2022-09-23 22:41   

关键词:  

5 多线程
 
5.1 使用Runnable实例创建线程
// 通过实现Runnable接口来创建线程类
public class RunnableTask implements Runnable
{
	private int i ;
	// run方法同样是线程执行体
	public void run()
	{
		for ( ; i < 100 ; i++ )
		{
			// 当线程类实现Runnable接口时,
			// 如果想获取当前线程,只能用Thread.currentThread()方法。
			System.out.println(Thread.currentThread().getName() + "  " + i);
		}
	}

	public static void main(String[] args)
	{
		for (int i = 0; i < 100;  i++)
		{
			System.out.println(Thread.currentThread().getName() + "  " + i);
			if (i == 20)
			{
				RunnableTask task = new RunnableTask();     // ①
				// 通过new Thread(target , name)方法创建新线程
				new Thread(task , "新线程1").start();
				new Thread(task , "新线程2").start();
			}
		}
	}
}
 
5.2 使用FutureTask来包装Callable对象
 
public class ThirdThread
{
	public static void main(String[] args)
	{
		// 创建Callable对象
		ThirdThread rt = new ThirdThread();
		
		// 先使用Lambda表达式创建Callable对象
		// 使用FutureTask来包装Callable对象
		FutureTask task = new FutureTask((Callable)() -> {
			int i = 0;
			for ( ; i < 100 ; i++ )
			{
				System.out.println(Thread.currentThread().getName()+ " 的循环变量i的值:" + i);
			}
			// call()方法可以有返回值
			return i;
		});
		for (int i = 0 ; i < 100 ; i++)
		{
			System.out.println(Thread.currentThread().getName()+ " 的循环变量i的值:" + i);
			if (i == 20)
			{
				// 实质还是以Callable对象来创建、并启动线程
				new Thread(task , "有返回值的线程").start();
			}
		}
		try
		{
			// 获取线程返回值
			System.out.println("子线程的返回值:" + task.get());
		}
		catch (Exception ex)
		{
			ex.printStackTrace();
		}
	}
}
 
 
5.4 控制线程
优先级高的线程获得更多的执行机会。
 
5.4.1 join线程
public class JoinThread extends Thread
{
	// 提供一个有参数的构造器,用于设置该线程的名字
	public JoinThread(String name)
	{
		super(name);
	}
	// 重写run()方法,定义线程执行体
	public void run()
	{
		for (int i = 0; i < 100 ; i++ )
		{
			System.out.println(getName() + "  " + i);
		}
	}
	public static void main(String[] args)throws Exception
	{
		// 启动子线程
		new JoinThread("新线程").start();
		for (int i = 0; i < 100 ; i++ )
		{
			if (i == 20)
			{
				JoinThread jt = new JoinThread("被Join的线程");
				jt.start();
				// main线程调用了jt线程的join()方法,main线程
				// 必须等jt执行结束才会向下执行
				jt.join();
			}
			System.out.println(Thread.currentThread().getName()+ "  " + i);
		}
	}
}
5.4.2 后台线程
public class DaemonThread extends Thread
{
	// 定义后台线程的线程执行体与普通线程没有任何区别
	public void run()
	{
		for (int i = 0; i < 1000 ; i++ )
		{
			System.out.println(getName() + "  " + i);
		}
	}
	public static void main(String[] args)
	{
		DaemonThread t = new DaemonThread();
		// 将此线程设置成后台线程
		t.setDaemon(true);
		// 启动后台线程
		t.start();
		for (int i = 0 ; i < 10 ; i++ )
		{
			System.out.println(Thread.currentThread().getName()+ "  " + i);
		}
		// -----程序执行到此处,前台线程(main线程)结束------
		// 后台线程也应该随之结束
	}
}
 
5.4.3 线程睡眠:sleep
当前正在执行的线程暂停一段时间,并进入阻塞状态。
sleep方法暂停当前线程后,会给其他线程机会,不会判断优先级。
Thread.sleep(1000);
 
5.4.4 线程让步:yield
yield方法只给优先级相同,或优先级更高的线程让步。
public class YieldTest extends Thread
{
	public YieldTest(String name)
	{
		super(name);
	}
	// 定义run方法作为线程执行体
	public void run()
	{
		for (int i = 0; i < 50 ; i++ )
		{
			System.out.println(getName() + "  " + i);
			// 当i等于20时,使用yield方法让当前线程让步
			if (i == 20)
			{
				Thread.yield();
			}
		}
	}
	public static void main(String[] args)throws Exception
	{
		// 启动两条并发线程
		YieldTest yt1 = new YieldTest("高级");
		// 将ty1线程设置成最高优先级
		//yt1.setPriority(Thread.MAX_PRIORITY);
		yt1.start();
		
		YieldTest yt2 = new YieldTest("低级");
		// 将yt2线程设置成最低优先级
		//yt2.setPriority(Thread.MIN_PRIORITY);
		yt2.start();
	}
}
5.5 线程同步
5.5.1 同步代码块
public class DrawThread extends Thread
{
	// 模拟用户账户
	private Account account;
	// 当前取钱线程所希望取的钱数
	private double drawAmount;
	public DrawThread(String name , Account account
		, double drawAmount)
	{
		super(name);
		this.account = account;
		this.drawAmount = drawAmount;
	}
	// 当多条线程修改同一个共享数据时,将涉及数据安全问题。
	public void run()
	{
		// 使用account作为同步监视器,任何线程进入下面同步代码块之前,
		// 必须先获得对account账户的锁定——其他线程无法获得锁,也就无法修改它
		// 这种做法符合:“加锁 → 修改 → 释放锁”的逻辑
		synchronized (account)
		{
			// 账户余额大于取钱数目
			if (account.getBalance() >= drawAmount)
			{
				// 吐出钞票
				System.out.println(getName()
					+ "取钱成功!吐出钞票:" + drawAmount);
				try
				{
					Thread.sleep(1);
				}
				catch (InterruptedException ex)
				{
					ex.printStackTrace();
				}
				// 修改余额
				account.setBalance(account.getBalance() - drawAmount);
				System.out.println("\t余额为: " + account.getBalance());
			}
			else
			{
				System.out.println(getName() + "取钱失败!余额不足!");
			}
		}
		// 同步代码块结束,该线程释放同步锁
	}
}
 
5.5.2 同步方法
 
public class DrawThread extends Thread
{
	// 模拟用户账户
	private Account account;
	// 当前取钱线程所希望取的钱数
	private double drawAmount;
	public DrawThread(String name , Account account, double drawAmount)
	{
		super(name);
		this.account = account;
		this.drawAmount = drawAmount;
	}
	// 当多条线程修改同一个共享数据时,将涉及数据安全问题。
	public void run()
	{
		// 直接调用account对象的draw方法来执行取钱
		// 同步方法的同步监视器是this,this代表调用draw()方法的对象。
		// 也就是说:线程进入draw()方法之前,必须先对account对象的加锁。
		account.draw(drawAmount);
	}
}

public class Account
{
	// 封装账户编号、账户余额两个成员变量
	private String accountNo;
	private double balance;
	public Account(){}
	// 构造器
	public Account(String accountNo , double balance)
	{
		this.accountNo = accountNo;
		this.balance = balance;
	}

	// accountNo的setter和getter方法
	public void setAccountNo(String accountNo)
	{
		this.accountNo = accountNo;
	}
	public String getAccountNo()
	{
		return this.accountNo;
	}
	// 因此账户余额不允许随便修改,所以只为balance提供getter方法,
	public double getBalance()
	{
		return this.balance;
	}

	// 提供一个线程安全draw()方法来完成取钱操作
	public synchronized void draw(double drawAmount)
	{
		// 账户余额大于取钱数目
		if (balance >= drawAmount)
		{
			// 吐出钞票
			System.out.println(Thread.currentThread().getName()+ "取钱成功!吐出钞票:" + drawAmount);
			try
			{
				Thread.sleep(1);
			}
			catch (InterruptedException ex)
			{
				ex.printStackTrace();
			}
			// 修改余额
			balance -= drawAmount;
			System.out.println("\t余额为: " + balance);
		}
		else
		{
			System.out.println(Thread.currentThread().getName()+ "取钱失败!余额不足!");
		}
	}

	// 下面两个方法根据accountNo来重写hashCode()和equals()方法
	public int hashCode()
	{
		return accountNo.hashCode();
	}
	public boolean equals(Object obj)
	{
		if(this == obj)
			return true;
		if (obj !=null
			&& obj.getClass() == Account.class)
		{
			Account target = (Account)obj;
			return target.getAccountNo().equals(accountNo);
		}
		return false;
	}
}
 
5.5.3 同步锁 ReentrantLock
 
public class Account
{
	// 定义锁对象
	private final ReentrantLock lock = new ReentrantLock();
	
	//省略 同5.5.2 ...

	// 提供一个线程安全draw()方法来完成取钱操作
	public void draw(double drawAmount)
	{
		// 加锁
		lock.lock();
		try
		{
			// 账户余额大于取钱数目
			if (balance >= drawAmount)
			{
				// 吐出钞票
				System.out.println(Thread.currentThread().getName()
					+ "取钱成功!吐出钞票:" + drawAmount);
				try
				{
					Thread.sleep(1);
				}
				catch (InterruptedException ex)
				{
					ex.printStackTrace();
				}
				// 修改余额
				balance -= drawAmount;
				System.out.println("\t余额为: " + balance);
			}
			else
			{
				System.out.println(Thread.currentThread().getName()
					+ "取钱失败!余额不足!");
			}
		}
		finally
		{
			// 修改完成,释放锁
			lock.unlock();
		}
	}

	//...
}
5.6 线程间通讯
 
5.6.1 与synchronized配合使用的wait notify notifyAll
wait 等待其他线程的唤醒
notify 唤醒处于等待的一个线程 选择是任意的
notifyAll 唤醒处于等待的所有线程
 
public synchronized void draw(double drawAmount)
	{
		try
		{
			// 如果flag为假,表明账户中还没有人存钱进去,取钱方法阻塞
			if (!flag)
			{
				wait();
			}
			else
			{
				// 执行取钱
				System.out.println(Thread.currentThread().getName()+ " 取钱:" +  drawAmount);
				balance -= drawAmount;
				System.out.println("账户余额为:" + balance);
				// 将标识账户是否已有存款的旗标设为false。
				flag = false;
				// 唤醒其他线程
				notifyAll();
			}
		}
		catch (InterruptedException ex)
		{
			ex.printStackTrace();
		}
	}
	public synchronized void deposit(double depositAmount)
	{
		try
		{
			// 如果flag为真,表明账户中已有人存钱进去,则存钱方法阻塞
			if (flag)             //①
			{
				wait();
			}
			else
			{
				// 执行存款
				System.out.println(Thread.currentThread().getName()
					+ " 存款:" +  depositAmount);
				balance += depositAmount;
				System.out.println("账户余额为:" + balance);
				// 将表示账户是否已有存款的旗标设为true
				flag = true;
				// 唤醒其他线程
				notifyAll();
			}
		}
		catch (InterruptedException ex)
		{
			ex.printStackTrace();
		}
	}
 
5.6.2 与lock配合使用的Condition
 
// 显式定义Lock对象
private final Lock lock = new ReentrantLock();
// 获得指定Lock对象对应的Condition
private final Condition cond  = lock.newCondition();
public void draw(double drawAmount)
{
	// 加锁
	lock.lock();
	try
	{
		// 如果flag为假,表明账户中还没有人存钱进去,取钱方法阻塞
		if (!flag)
		{
			cond.await();
		}
		else
		{
			// 执行取钱
			System.out.println(Thread.currentThread().getName()+ " 取钱:" +  drawAmount);
			balance -= drawAmount;
			System.out.println("账户余额为:" + balance);
			// 将标识账户是否已有存款的旗标设为false。
			flag = false;
			// 唤醒其他线程
			cond.signalAll();
		}
	}
	catch (InterruptedException ex)
	{
		ex.printStackTrace();
	}
	// 使用finally块来释放锁
	finally
	{
		lock.unlock();
	}
}
public void deposit(double depositAmount)
{
	lock.lock();
	try
	{
		// 如果flag为真,表明账户中已有人存钱进去,则存钱方法阻塞
		if (flag)             // ①
		{
			cond.await();
		}
		else
		{
			// 执行存款
			System.out.println(Thread.currentThread().getName()+ " 存款:" +  depositAmount);
			balance += depositAmount;
			System.out.println("账户余额为:" + balance);
			// 将表示账户是否已有存款的旗标设为true
			flag = true;
			// 唤醒其他线程
			cond.signalAll();
		}
	}
	catch (InterruptedException ex)
	{
		ex.printStackTrace();
	}
	// 使用finally块来释放锁
	finally
	{
		lock.unlock();
	}
}
5.6.3 阻塞队列
public static void main(String[] args) throws Exception
{
	// 定义一个长度为2的阻塞队列
	BlockingQueue bq = new ArrayBlockingQueue<>(2);
	bq.put("Java"); // 与bq.add("Java"、bq.offer("Java")相同
	bq.put("Java"); // 与bq.add("Java"、bq.offer("Java")相同
	bq.put("Java"); // ① 阻塞线程。
}

class Producer extends Thread
{
	private BlockingQueue bq;
	public Producer(BlockingQueue bq)
	{
		this.bq = bq;
	}
	public void run()
	{
		String[] strArr = new String[]
		{
			"Java",
			"Struts",
			"Spring"
		};
		for (int i = 0 ; i < 999999999 ; i++ )
		{
			System.out.println(getName() + "生产者准备生产集合元素!");
			try
			{
				Thread.sleep(200);
				// 尝试放入元素,如果队列已满,线程被阻塞
				bq.put(strArr[i % 3]);
			}
			catch (Exception ex){ex.printStackTrace();}
			System.out.println(getName() + "生产完成:" + bq);
		}
	}
}
class Consumer extends Thread
{
	private BlockingQueue bq;
	public Consumer(BlockingQueue bq)
	{
		this.bq = bq;
	}
	public void run()
	{
		while(true)
		{
			System.out.println(getName() + "消费者准备消费集合元素!");
			try
			{
				Thread.sleep(200);
				// 尝试取出元素,如果队列已空,线程被阻塞
				bq.take();
			}
			catch (Exception ex){ex.printStackTrace();}
			System.out.println(getName() + "消费完成:" + bq);
		}
	}
}
public class BlockingQueueTest2
{
	public static void main(String[] args)
	{
		// 创建一个容量为1的BlockingQueue
		BlockingQueue bq = new ArrayBlockingQueue<>(1);
		
		// 启动3条生产者线程
		new Producer(bq).start();
		new Producer(bq).start();
		new Producer(bq).start();
		
		// 启动一条消费者线程
		new Consumer(bq).start();
	}
}
5.8 线程池
 
5.8.1 ExecutorService 多线程同步工作
public static void main(String[] args) throws Exception
{
	// 创建足够的线程来支持4个CPU并行的线程池
	// 创建一个具有固定线程数(6)的线程池
	ExecutorService pool = Executors.newFixedThreadPool(6);
	// 使用Lambda表达式创建Runnable对象
	Runnable target = () -> {
		for (int i = 0; i < 100 ; i++ )
		{
			System.out.println(Thread.currentThread().getName()+ "的i值为:" + i);
		}
	};
	// 向线程池中提交两个线程
	pool.submit(target);
	pool.submit(target);
	// 关闭线程池
	pool.shutdown();
}
5.8.2 多线程分解任务:ForkJoinPool和RecursiveAction 执行无返回值的任务
 
// 继承RecursiveAction来实现"可分解"的任务
class PrintTask extends RecursiveAction
{
	// 每个“小任务”只最多只打印50个数
	private static final int THRESHOLD = 50;
	private int start;
	private int end;
	// 打印从start到end的任务
	public PrintTask(int start, int end)
	{
		this.start = start;
		this.end = end;
	}
	@Override
	protected void compute()
	{
		// 当end与start之间的差小于THRESHOLD时,开始打印
		if(end - start < THRESHOLD)
		{
			for (int i = start ; i < end ; i++ )
			{
				System.out.println(Thread.currentThread().getName()+ "的i值:" + i);
			}
		}
		else
		{
			// 如果当end与start之间的差大于THRESHOLD时,即要打印的数超过50个
			// 将大任务分解成两个小任务。
			int middle = (start + end) / 2;
			PrintTask left = new PrintTask(start, middle);
			PrintTask right = new PrintTask(middle, end);
			// 并行执行两个“小任务”
			left.fork();
			right.fork();
		}
	}
}
public class ForkJoinPoolTest
{
	public static void main(String[] args)
		throws Exception
	{
		ForkJoinPool pool = new ForkJoinPool();
		
		// 提交可分解的PrintTask任务
		pool.submit(new PrintTask(0 , 300));
		pool.awaitTermination(2, TimeUnit.SECONDS);
		// 关闭线程池
		pool.shutdown();
	}
}
5.8.3 多线程分解任务:ForkJoinPool和RecursiveTask 执行有返回值的任务并合并结果
// 继承RecursiveTask来实现"可分解"的任务
class CalTask extends RecursiveTask
{
	// 每个“小任务”只最多只累加20个数
	private static final int THRESHOLD = 20;
	private int arr[];
	private int start;
	private int end;
	// 累加从start到end的数组元素
	public CalTask(int[] arr , int start, int end)
	{
		this.arr = arr;
		this.start = start;
		this.end = end;
	}
	@Override
	protected Integer compute()
	{
		int sum = 0;
		// 当end与start之间的差小于THRESHOLD时,开始进行实际累加
		if(end - start < THRESHOLD)
		{
			for (int i = start ; i < end ; i++ )
			{
				sum += arr[i];
			}
			return sum;
		}
		else
		{
			// 如果当end与start之间的差大于THRESHOLD时,即要累加的数超过20个时
			// 将大任务分解成两个小任务。
			int middle = (start + end) / 2;
			CalTask left = new CalTask(arr , start, middle);
			CalTask right = new CalTask(arr , middle, end);
			// 并行执行两个“小任务”
			left.fork();
			right.fork();
			// 把两个“小任务”累加的结果合并起来
			return left.join() + right.join();    // ①
		}
	}
}
public class Sum
{
	public static void main(String[] args)
		throws Exception
	{
		int[] arr = new int[100];
		Random rand = new Random();
		int total = 0;
		// 初始化100个数字元素
		for (int i = 0 , len = arr.length; i < len ; i++ )
		{
			int tmp = rand.nextInt(20);
			// 对数组元素赋值,并将数组元素的值添加到sum总和中。
			total += (arr[i] = tmp);
		}
		System.out.println(total);
		
		// 创建一个通用池
		ForkJoinPool pool = ForkJoinPool.commonPool();
		
		// 提交可分解的CalTask任务
		Future future = pool.submit(new CalTask(arr , 0 , arr.length));
		System.out.println(future.get());//返回值
		
		// 关闭线程池
		pool.shutdown();
	}
}
Concurrent 开头的集合类代表了支持并发访问的集合,可支持多线程并发写入访问。