Skip to content

Commit 794ae36

Browse files
committed
多线程
1 parent b44bad2 commit 794ae36

15 files changed

Lines changed: 940 additions & 6 deletions
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package com.yale.test.math;
2+
3+
import java.util.concurrent.atomic.AtomicInteger;
4+
import java.util.concurrent.atomic.AtomicLong;
5+
6+
/*
7+
* Java的java.util.concurrent包除了提供底层锁、并发集合外,还提供了一组原子操作的封装类,它们位于java.util.concurrent.atomic包。
8+
* 我们以AtomicInteger为例,它提供的主要操作有:
9+
* 增加值并返回新值:int addAndGet(int delta)
10+
* 加1后返回新值:int incrementAndGet()
11+
* 获取当前值:int get()
12+
* 用CAS方式设置:int compareAndSet(int expect, int update)
13+
*Atomic类是通过无锁(lock-free)的方式实现的线程安全(thread-safe)访问。它的主要原理是利用了CAS:Compare and Set。
14+
* 如果我们自己通过CAS编写incrementAndGet(),它大概长这样:
15+
* 小结
16+
* 使用java.util.concurrent.atomic提供的原子操作可以简化多线程编程:
17+
* 原子操作实现了无锁的线程安全;
18+
* 适用于计数器,累加器等。
19+
*/
20+
public class AtomicIntegerDemo {
21+
22+
/*
23+
* CAS是指,在这个操作中,如果AtomicInteger的当前值是prev,那么就更新为next,返回true。如果AtomicInteger的当前值不是prev,就什么也不干,返回false。
24+
* 通过CAS操作并配合do ... while循环,即使其他线程修改了AtomicInteger的值,最终的结果也是正确的。
25+
* 通常情况下,我们并不需要直接用do ... while循环调用compareAndSet实现复杂的并发操作,而是用incrementAndGet()这样的封装好的方法,因此,使用起来非常简单。
26+
* 在高度竞争的情况下,还可以使用Java 8提供的LongAdder和LongAccumulator。
27+
*/
28+
public int incrementAndGet(AtomicInteger ai) {
29+
int prev, next;
30+
do {
31+
prev = ai.get();
32+
next = prev + 1;
33+
} while(!ai.compareAndSet(prev, next));
34+
return next;
35+
}
36+
}
37+
38+
class IdGenerator {
39+
//我们利用AtomicLong可以编写一个多线程安全的全局唯一ID生成器:
40+
AtomicLong al = new AtomicLong(0);
41+
public long getNextId() {
42+
return al.incrementAndGet();
43+
}
44+
}

src/com/yale/test/math/array/ArraysTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,15 @@ public static void main(String[] args) {
110110
Arrays.sort(dataC);
111111
System.out.println("排序后的数组:" + Arrays.toString(dataC));
112112

113+
int [] dataCopy = Arrays.copyOf(dataC, dataC.length);
114+
System.out.println("复制数组dataC:" + Arrays.toString(dataCopy));
115+
116+
/*
117+
* Fork/Join线程池在Java标准库中就有应用。Java标准库提供的java.util.Arrays.parallelSort(array)可以进行并行排序,
118+
* 它的原理就是内部通过Fork/Join对大数组分拆进行并行排序,在多核CPU上就可以大大提高排序的速度。
119+
*/
120+
Arrays.parallelSort(dataCopy);//1.8之后才有的
121+
113122
HashMap<String, String> paramMap = new HashMap<String, String>();
114123
paramMap.put("a", "A");
115124
paramMap.put("b", "B");

src/com/yale/test/math/array/CollectionsDemo.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,12 @@ public static void main(String[] args) {
7979

8080

8181
//Collections还提供了一组方法,可以把线程不安全的集合变为线程安全的集合:
82-
//因为从Java 5开始,引入了更高效的并发集合类,所以上述这几个同步方法已经没有什么用了。
82+
//因为从Java 5开始,引入了更高效的并发集合类,所以下面这几个同步方法已经没有什么用了。
83+
//https://www.liaoxuefeng.com/wiki/1252599548343744/1299919855943714
8384
List<String> listSafe = Collections.synchronizedList(listTest);
8485
Set<String> setSafe = Collections.synchronizedSet(singleSet);
8586
Map<String,String> mapSafe = Collections.synchronizedMap(singleMap);
87+
//但是它实际上synchronizedMap是用一个包装类包装了非线程安全的Map,然后对所有读写方法都用synchronized加锁,这样获得的线程安全集合的性能比java.util.concurrent集合要低很多,所以不推荐使用。
8688

8789
List<String> strList = new ArrayList<String>();
8890
Collections.addAll(strList, "A", "B", "C");//相当于调用了3次add方法

src/com/yale/test/thread/heima/zhangxiaoxiang/BlockingQueueTest.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,24 @@
66

77
/**
88
* 先进先出就叫队列,BlockingQueue是阻塞队列的接口,
9+
* 我们在前面已经通过ReentrantLock和Condition实现了一个BlockingQueue:
10+
* BlockingQueue的意思就是说,当一个线程调用这个TaskQueue的getTask()方法时,该方法内部可能会让线程变成等待状态,直到队列条件满足不为空,线程被唤醒后,getTask()方法才会返回。
11+
* 因为BlockingQueue非常有用,所以我们不必自己编写,可以直接使用Java标准库的java.util.concurrent包提供的线程安全的集合:ArrayBlockingQueue。
12+
* 除了BlockingQueue外,针对List、Map、Set、Deque等,java.util.concurrent包也提供了对应的并发集合类。我们归纳一下:
13+
* interface non-thread-safe thread-safe
14+
List ArrayList CopyOnWriteArrayList
15+
Map HashMap ConcurrentHashMap
16+
Set HashSet/TreeSet CopyOnWriteArraySet
17+
Queue ArrayDeque/LinkedList ArrayBlockingQueue/LinkedBlockingQueue
18+
Deque ArrayDeque/LinkedList LinkedBlockingDeque
19+
使用这些并发集合与使用非线程安全的集合类完全相同。我们以ConcurrentHashMap为例:
20+
因为所有的同步和加锁的逻辑都在集合内部实现,对外部调用者来说,只需要正常按接口引用,其他代码和原来的非线程安全代码完全一样。即当我们需要多线程访问时,把:
21+
Map<String, String> map = new HashMap<>();改为:Map<String, String> map = new ConcurrentHashMap<>();就可以了。
22+
java.util.Collections工具类还提供了一个旧的线程安全集合转换器,可以这么用:
23+
Map unsafeMap = new HashMap();
24+
Map threadSafeMap = Collections.synchronizedMap(unsafeMap);
25+
但是它实际上是用一个包装类包装了非线程安全的Map,然后对所有读写方法都用synchronized加锁,这样获得的线程安全集合的性能比java.util.concurrent集合要低很多,所以不推荐使用。
926
* @author dell
10-
*
1127
*/
1228
public class BlockingQueueTest {
1329

src/com/yale/test/thread/heima/zhangxiaoxiang/CallableAndFuture.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,26 @@
1313
import java.util.concurrent.Future;
1414
import java.util.concurrent.TimeoutException;
1515

16+
/*
17+
* Runnable接口有个问题,它的方法没有返回值。如果任务需要一个返回结果,那么只能保存到变量,还要提供额外的方法读取,非常不便。
18+
* 所以,Java标准库还提供了一个Callable接口,和Runnable接口比,它多了一个返回值:并且Callable接口是一个泛型接口,可以返回指定类型的结果。
19+
* 现在的问题是,如何获得异步执行的结果?
20+
* 如果仔细看ExecutorService.submit()方法,可以看到,它返回了一个Future类型,一个Future类型的实例代表一个未来能获取结果的对象:
21+
* ExecutorService executor = Executors.newFixedThreadPool(4);
22+
// 定义任务:
23+
Callable<String> task = new Task();
24+
// 提交任务并获得Future:
25+
Future<String> future = executor.submit(task);
26+
// 从Future获取异步执行返回的结果:
27+
String result = future.get(); // 可能阻塞
28+
当我们提交一个Callable任务后,我们会同时获得一个Future对象,然后,我们在主线程某个时刻调用Future对象的get()方法,就可以获得异步执行的结果。在调用get()时,
29+
如果异步任务已经完成,我们就直接获得结果。如果异步任务还没有完成,那么get()会阻塞,直到任务完成后才返回结果。
30+
一个Future<V>接口表示一个未来可能会返回的结果,它定义的方法有:
31+
get():获取结果(可能会等待)
32+
get(long timeout, TimeUnit unit):获取结果,但只等待指定的时间;
33+
cancel(boolean mayInterruptIfRunning):取消当前任务;
34+
isDone():判断任务是否已完成。
35+
*/
1636
public class CallableAndFuture {
1737

1838
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
package com.yale.test.thread.heima.zhangxiaoxiang;
2+
3+
import java.lang.management.ManagementFactory;
4+
import java.lang.management.MemoryMXBean;
5+
import java.lang.management.ThreadMXBean;
6+
import java.util.Random;
7+
import java.util.concurrent.Callable;
8+
import java.util.concurrent.CompletableFuture;
9+
import java.util.concurrent.CompletionService;
10+
import java.util.concurrent.ExecutionException;
11+
import java.util.concurrent.ExecutorCompletionService;
12+
import java.util.concurrent.ExecutorService;
13+
import java.util.concurrent.Executors;
14+
import java.util.concurrent.Future;
15+
import java.util.concurrent.TimeoutException;
16+
17+
/*
18+
* 使用CompletableFuture
19+
* 使用Future获得异步执行结果时,要么调用阻塞方法get(),要么轮询看isDone()是否为true,这两种方法都不是很好,因为主线程也会被迫等待。
20+
* 从Java 8开始引入了CompletableFuture,它针对Future做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
21+
* 我们以获取股票价格为例,看看如何使用CompletableFuture:
22+
* 小结
23+
CompletableFuture可以指定异步处理流程:
24+
thenAccept()处理正常结果;
25+
exceptional()处理异常结果;
26+
thenApplyAsync()用于串行化另一个CompletableFuture;
27+
anyOf()和allOf()用于并行化多个CompletableFuture。
28+
*/
29+
public class CompletableFutureDemo {
30+
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
31+
/*
32+
* 创建异步执行任务
33+
* 创建一个CompletableFuture是通过CompletableFuture.supplyAsync()实现的,它需要一个实现了Supplier接口的对象:
34+
* public interface Supplier<T> {
35+
T get();
36+
}
37+
* 这里我们用lambda语法简化了一下,直接传入CompletableFutureDemo::fetchPrice,因为CompletableFutureDemo.fetchPrice()静态方法的签名符合Supplier接口的定义(除了方法名外)。
38+
* 紧接着,CompletableFuture已经被提交给默认的线程池执行了,我们需要定义的是CompletableFuture完成时和异常时需要回调的实例。完成时,CompletableFuture会调用Consumer对象:
39+
* public interface Consumer<T> {
40+
void accept(T t);
41+
}
42+
* 异常时,CompletableFuture会调用Function对象:
43+
* public interface Function<T, R> {
44+
R apply(T t);
45+
}
46+
* 这里我们都用lambda语法简化了代码。
47+
* 可见CompletableFuture的优点是:
48+
异步任务结束时,会自动回调某个对象的方法;
49+
异步任务出错时,会自动回调某个对象的方法;
50+
主线程设置好回调后,不再关心异步任务的执行。
51+
*
52+
*/
53+
CompletableFuture<Double> cf = CompletableFuture.supplyAsync(CompletableFutureDemo::fetchPrice);//supplyAsync内部会启动线程
54+
55+
cf.thenAccept((result)->{//如果执行成功
56+
System.out.println("price:" + result);
57+
});
58+
59+
cf.exceptionally((e) -> {//如果执行异常
60+
e.printStackTrace();
61+
return null;
62+
});
63+
64+
//主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭;
65+
Thread.sleep(200);
66+
67+
68+
/*
69+
* 如果只是实现了异步回调机制,我们还看不出CompletableFuture相比Future的优势。CompletableFuture更强大的功能是,多个CompletableFuture可以串行执行,
70+
* 例如,定义两个CompletableFuture,第一个CompletableFuture根据证券名称查询证券代码,第二个CompletableFuture根据证券代码查询证券价格,这两个CompletableFuture实现串行操作如下:
71+
*/
72+
CompletableFuture<String> cfQuery = CompletableFuture.supplyAsync(()->{//第一个任务
73+
return queryCode("中国石油");
74+
});
75+
76+
CompletableFuture<Double> cfFetch = cfQuery.thenApplyAsync((code)->{//cfQuery成功后继续执行下一个任务
77+
return fetchPrice(code);
78+
});
79+
80+
cfFetch.thenAccept((result)->{//cfFetch成功后打印结果:
81+
System.out.println("price:" + result);
82+
});
83+
//主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭
84+
Thread.sleep(2000);
85+
86+
/*
87+
* 除了串行执行外,多个CompletableFuture还可以并行执行。例如,我们考虑这样的场景:
88+
* 同时从新浪和网易查询证券代码,只要任意一个返回结果,就进行下一步查询价格,查询价格也同时从新浪和网易查询,只要任意一个返回结果,就完成操作:
89+
* 下面逻辑实现的异步查询规则实际上是:
90+
┌─────────────┐ ┌─────────────┐
91+
│ Query Code │ │ Query Code │
92+
│ from sina │ │ from 163 │
93+
└─────────────┘ └─────────────┘
94+
│ │
95+
└───────┬───────┘
96+
97+
┌─────────────┐
98+
│ anyOf │
99+
└─────────────┘
100+
101+
┌───────┴────────┐
102+
▼ ▼
103+
┌─────────────┐ ┌─────────────┐
104+
│ Query Price │ │ Query Price │
105+
│ from sina │ │ from 163 │
106+
└─────────────┘ └─────────────┘
107+
│ │
108+
└────────┬───────┘
109+
110+
┌─────────────┐
111+
│ anyOf │
112+
└─────────────┘
113+
114+
115+
┌─────────────┐
116+
│Display Price│
117+
└─────────────┘
118+
* 除了anyOf()可以实现“任意个CompletableFuture只要一个成功”,allOf()可以实现“所有CompletableFuture都必须成功”,这些组合操作可以实现非常复杂的异步流程控制。
119+
* 最后我们注意CompletableFuture的命名规则:
120+
* xxx():表示该方法将继续在已有的线程中执行;
121+
* xxxAsync():表示将异步在线程池中执行。
122+
* CompletableFuture可以指定异步处理流程:
123+
* thenAccept()处理正常结果;
124+
* exceptional()处理异常结果;
125+
* thenApplyAsync()用于串行化另一个CompletableFuture;
126+
* anyOf()和allOf()用于并行化多个CompletableFuture。
127+
*/
128+
CompletableFuture<String> cfQueryFromSina = CompletableFuture.supplyAsync(()->{//俩个CompletableFuture执行异步查询
129+
return queryCode("中国石油", "https://finace.sina.com.cn/code/");
130+
});
131+
132+
CompletableFuture<String> cfQueryForm163 = CompletableFuture.supplyAsync(()->{
133+
return queryCode("中国石油", "https://money.163.com/code/");
134+
});
135+
136+
//用anyof合并为一个新的CompletableFuture:
137+
CompletableFuture<Object> cfQueryMegre = CompletableFuture.anyOf(cfQueryFromSina, cfQueryForm163);
138+
139+
CompletableFuture<Double> cfFetchFromSina = cfQueryMegre.thenApplyAsync((code)->{//俩个CompletableFuture执行异步查询
140+
return fetchPrice((String)code, "https://finance.sina.com.cn/price/");
141+
});
142+
143+
CompletableFuture<Double> cfFetchFrom163 = cfQueryMegre.thenApplyAsync((code)->{
144+
return fetchPrice((String)code, "https://money.163.com/price");
145+
});
146+
147+
//用anyof合并为一个新的CompletableFuture:
148+
CompletableFuture<Object> cfFetchSec = CompletableFuture.anyOf(cfFetchFromSina, cfFetchFrom163);
149+
cfFetchSec.thenAccept((result)->{//最终结果
150+
System.out.println("price:" + result);
151+
});
152+
//主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭
153+
Thread.sleep(2000);
154+
155+
/**
156+
* 咨询一下老师。我看到supplyAsync在thenAccept的前面,而且任务是在supplyAsync中启动的。那有没有可能在通过cf.thenAccept设置回调前,
157+
* 异步执行的任务已经完成了(假如fetchPrice是个空函数),这样在thenAccept中设置的回调就不会被执行了,有这种可能吗?
158+
* 答案是:可以执行
159+
*/
160+
// 创建异步执行任务:
161+
CompletableFuture<Double> cfTime = CompletableFuture.supplyAsync(CompletableFutureDemo::fetchPriceZer);
162+
// 暂停让Future完成:
163+
Thread.sleep(5000);
164+
// 如果执行成功:
165+
cfTime.thenAccept((result) -> {
166+
System.out.println("thenAccept: " + result);
167+
});
168+
// 如果执行异常:
169+
cfTime.exceptionally((e) -> {
170+
e.printStackTrace();
171+
return null;
172+
});
173+
// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
174+
Thread.sleep(2000);
175+
}
176+
177+
static Double fetchPriceZer() {
178+
System.out.println("fetched!");
179+
return 123.46;
180+
}
181+
182+
static Double fetchPrice() {
183+
try {
184+
Thread.sleep(100);
185+
} catch (InterruptedException e) {
186+
e.printStackTrace();
187+
}
188+
if (Math.random() < 0.3) {
189+
throw new RuntimeException("fetch price failed");
190+
}
191+
return 5 + Math.random() * 20;
192+
}
193+
194+
static String queryCode(String name) {
195+
try {
196+
Thread.sleep(100);
197+
} catch (InterruptedException e) {
198+
e.printStackTrace();
199+
}
200+
return "601857";
201+
}
202+
203+
static Double fetchPrice(String code) {
204+
try {
205+
Thread.sleep(100);
206+
} catch (InterruptedException e) {
207+
e.printStackTrace();
208+
}
209+
return 5 + Math.random() * 20;
210+
}
211+
212+
static String queryCode(String name, String url) {
213+
System.out.println("query Code " + name + " from " + url + ".....");
214+
try {
215+
Thread.sleep((long)(Math.random() * 100));
216+
} catch (InterruptedException e) {
217+
e.printStackTrace();
218+
}
219+
return "601857";
220+
}
221+
222+
static Double fetchPrice(String code, String url) {
223+
System.out.println("query price" + code + " from " + url);
224+
try {
225+
Thread.sleep((long)(Math.random() * 100));
226+
} catch (InterruptedException e) {
227+
e.printStackTrace();
228+
}
229+
return 5 + Math.random() * 20;
230+
}
231+
}

0 commit comments

Comments
 (0)