Java Stream流编程

Java Stream流编程

Stream流编程也是java8中的新特性。Stream是一个高级的迭代器,不是一个数据结构,不是一个集合,不会存放数据。它是将数据放在一个流水线中处理,在流水线的一边输入数据,在流水线的尾端得到结果,中间有一系列的操作。

外部迭代和内部迭代

先看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import java.util.stream.IntStream;

public class StreamDemo1 {
public static void main(String[] args) {
int[] nums = {1,2,3};
//外部迭代
int sum = 0;
for(int i:nums){
sum += i;
}
System.out.println("结果为" + sum);
//使用stream的内部迭代
int sum2 = IntStream.of(nums).sum();
System.out.println("结果为" + sum2);
}
}

外部迭代就是使用for、while等循环进行迭代的操作,串行的,如果数据量大要自己实现线程池等操作;内部迭代更加简短,只需要知道要做什么,并不需要知道内部实现细节。

中间操作/终止操作和惰性求值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import java.util.stream.IntStream;

public class StreamDemo1 {
public static void main(String[] args) {
int[] nums = {1,2,3};
//外部迭代
int sum = 0;
for(int i:nums){
sum += i;
}
System.out.println("结果为" + sum);
//使用stream的内部迭代
int sum2 = IntStream.of(nums).sum();
System.out.println("结果为" + sum2);
//map是中间操作(返回stream的操作)、sum是终止操作
int sum3 = IntStream.of(nums).map(StreamDemo1::doubleNum).sum();
System.out.println("结果为" + sum3);
//惰性求值
System.out.println("惰性求值示例:");
IntStream.of(nums).map(StreamDemo1::doubleNum);

}

public static int doubleNum(int i){
System.out.println("执行了乘以2");
return i*2;
}
}

中间操作是返回流的操作,只是操作中的一个步骤;终止操作产生的是一个结果(副作用)。

惰性求值就是终止操作没有调用的情况下,中间操作不会执行。在上面代码清单中,main函数最后一行IntStream.of(nums).map(StreamDemo1::doubleNum);不会有输出,因为其没有终止操作。

流的创建

类型 相关方法
集合 Collection.stream/parallelStream
数组 Arrays.stream
数字Stream IntStream/LongStream.range/rangeClosed
随机数Stream Radom.ints/longs/doubles
自己创建 Stream.generate/iterate

下面代码清单是一些简单的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class StreamDemo2 {
public static void main(String[] args) {
List<String> list = new ArrayList<>();

//从集合创建流
list.stream();
//从集合创建并行流
list.parallelStream();

//从数组创建
Arrays.stream(new int[]{2,3,4});

//创建数字流
IntStream.of(1,2,3);
//创建1到5区间数字(1,2,3,4 有头无尾)的流
IntStream.range(1,5);

//随机数字创建一个无限流,无限流使用limit可以限定产生的个数
new Random().ints().limit(10);

//自己生成流
Random random = new Random();
Stream.generate(()-> random.nextInt()).limit(20);

}
}

流的中间操作

流的中间操作都返回stream,大致有两类,一类是 有状态操作,表示结果需要依赖其他一些元素,例如排序依赖于其他操作都计算完毕才能有排序结果;一类是无状态操作,表示当前操作与前后无依赖关系。

方法 说明
map/mapToInt/Long… 无状态操作,对每个元素执行操作
flatMap/flatMapToInt/Long… 无状态操作,对每个元素的一个属性执行操作
filter 无状态操作,过滤
peek 无状态操作,类似于foreach
unordered 无状态操作,多用于并行流,无序
distinct 有状态操作,去重
sorted 有状态操作,排序
limit/skip 有状态操作,limit是无限流的限制,skip是跳过一些数据

下面代码清单是一些示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import java.util.Random;
import java.util.stream.Stream;

public class StreamDemo3 {
public static void main(String[] args) {
String str = "hello world hello java";

//打印每个单词的长度
Stream.of(str.split(" ")).map(s -> s.length())
.forEach(System.out::println);

//把长度大于4的单词的长度打印出来
Stream.of(str.split(" ")).filter(s -> s.length()>4)
.map(s -> s.length())
.forEach(System.out::println);

//flatMap A->B属性(是个集合),最终得到所有的A元素里面的所有B属性的集合
//intStream/longStream 不是Stream的子类,所以要进行装箱 boxed
//打印所有字符
Stream.of(str.split(" ")).flatMap(s -> s.chars().boxed())
.forEach(i -> System.out.println((char)i.intValue()));

//peek 多用于debug 是个中间操作,foreach是终止操作
//下面代码会打印两次
Stream.of(str.split(" ")).peek(System.out::println)
.forEach(System.out::println);

//limit使用,主要用于无限流
//打印10个大于100小于1000的数
new Random().ints().filter(i -> i>100&&i<1000)
.limit(10)
.forEach(System.out::println);
}
}

流的终止操作

终止操作分为两类,一类是非短路操作,;一类是短路操作,短路操作是一些不用等结果全部计算完就可以结束流的操作。

方法 说明
forEach/forEachOrdered 非短路操作,遍历
collect/toArray 非短路操作,收集到集合或数组
reduce 非短路操作,归约
min/max/count 非短路操作,最大值,最小值,计数
findFirst/findAny 短路操作,找到第一个或任意一个
allMatch/anyMatch/noneMatch 短路操作,匹配

下面代码清单是一些示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamDemo4 {
public static void main(String[] args) {
String str = "hello world hello java";

//使用并行流时用forEach打印每个字符,发现是乱序的
str.chars().parallel().forEach(i -> System.out.print((char)i));
System.out.println();
//使用forEachOrdered解决
str.chars().parallel().forEachOrdered(i -> System.out.print((char)i));
System.out.println();

//将流收集到list中(也有toSet等)
List<String> list = Stream.of(str.split(" "))
.collect(Collectors.toList());

//在每个单词中间加一个|
Optional<String> letters = Stream.of(str.split(" "))
.reduce((s1, s2) -> s1+"|"+s2);
System.out.println(letters.orElse(""));
//带有初始化的操作方式
String reduce = Stream.of(str.split(" "))
.reduce("",(s1, s2) -> s1+"|"+s2);
System.out.println(reduce);
//计算单词总长度
Integer length = Stream.of(str.split(" "))
.map(s -> s.length())
.reduce(0,(i1, i2) -> i1 + i2);
System.out.println(length);

//最长的单词
Optional<String> max = Stream.of(str.split(" "))
.max((s1,s2) -> s1.length()-s2.length());
System.out.println(max.get());

//使用findFirst短路操作
OptionalInt findFirst = new Random().ints().findFirst();
System.out.println(findFirst.getAsInt());
}
}

并行流

使用Stream流编程有个非常重要的特点,就是可以非常方便的使用并行流,而不需要自己去管理多线程,自己去拆分任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class StreamDemo5 {

public static void main(String[] args) {
//调用parallel产生一个并行流
IntStream.range(1,100).parallel().peek(StreamDemo5::debug).count();
}

public static void debug(int i){
System.out.println(Thread.currentThread().getName() + "debug " + i);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

运行结果

我们发现,使用并行流只需要在调用的时候加一个parallel()方法即可(串行流使用sequential()),运行时,我的电脑会一次打印4个数字(乱序),表示当前有4个线程。有两个注意点,第一,多次调用parallel()sequential(),以最后一次调用为准;第二,多线程运行时,我们发现并行流使用的线程池是:ForkJoinPool.commonPool,默认线程数时当前电脑的cpu核数(我的为4核,因此有4个)。

修改默认的线程数的方法,下面代码清单是修改为20个线程:

1
2
//在main函数中调用并行流的代码之前加上下面代码即可。
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","20");

还有一个问题,所有的并行流都使用同一个默认的并行池会形成阻塞,假设现在增加了一些任务并行流,但是由于默认线程池有其他的一些并行任务在处理,那么这个任务可能会处理的非常非常晚,这是不可预知的。那么我们可以让并行流使用自己的线程池,而不使用默认的线程池,以防止任务被阻塞。下面代码清单是使用自己创建的线程池示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class StreamDemo5 {

public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool(20);
pool.submit(() -> IntStream.range(1,100).parallel().peek(StreamDemo5::debug).count());

pool.shutdown();

//线程池为守护线程,防止主线程退出导致线程池也自动退出
//因此加入代码防止主线程退出
synchronized (pool){
try {
pool.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}

public static void debug(int i){
System.out.println(Thread.currentThread().getName() + "debug " + i);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

收集器

收集器是Stream流编程里面一个非常重要以及非常有用的知识点。收集器的意思就是把流处理后的东西收集起来,可以收集到一些集合类,比如List、Set、Map等等,或者把最后处理后的数据再处理成一条,比如求和,字符串拼接等。下面代码清单是一些常用的收集器的用法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import org.apache.commons.collections4.MapUtils;

import java.util.*;
import java.util.stream.Collectors;

/**
* 学生
*/
class Student{
//姓名
private String name;
//年龄
private int age;
//性别
private Gender gender;
//班级
private Grade grade;

public Student(String name, int age, Gender gender, Grade grade) {
super();
this.name = name;
this.age = age;
this.gender = gender;
this.grade = grade;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public int getAge() {
return age;
}

public void setAge(int age) {
this.age = age;
}

public Gender getGender() {
return gender;
}

public void setGender(Gender gender) {
this.gender = gender;
}

public Grade getGrade() {
return grade;
}

public void setGrade(Grade grade) {
this.grade = grade;
}

@Override
public String toString() {
return "Student{" +
"name='" + name + '\'' +
", age=" + age +
", gender=" + gender +
", grade=" + grade +
'}';
}
}


/**
* 性别
*/
enum Gender{
MALE,FEMALE;
}

/**
* 班级
*/
enum Grade{
ONE,TWO,THREE,FOUR;
}

public class CollectDemo {

public static void main(String[] args) {
//测试数据
List<Student> students = Arrays.asList(
new Student("小零",10,Gender.MALE,Grade.ONE),
new Student("小一",9,Gender.MALE,Grade.THREE),
new Student("小二",11,Gender.FEMALE,Grade.TWO),
new Student("小三",13,Gender.MALE,Grade.FOUR),
new Student("小四",12,Gender.FEMALE,Grade.ONE),
new Student("小五",10,Gender.FEMALE,Grade.FOUR),
new Student("小六",9,Gender.MALE,Grade.THREE),
new Student("小七",13,Gender.MALE,Grade.TWO),
new Student("小八",11,Gender.MALE,Grade.ONE),
new Student("小九",12,Gender.FEMALE,Grade.THREE)
);
//得到所有学生的年龄列表
//s -> s.getAge() 修改成方法引用的方式Student::getAge,不会多生成一个类似lambda$0这样的函数
List<Integer> ages = students.stream().map(Student::getAge).collect(Collectors.toList());//也可以改成toSet()等等
System.out.println("所有学生的年龄" + ages);
//指定集合类型
Set<Integer> set = students.stream().map(Student::getAge).collect(Collectors.toCollection(TreeSet::new));

//统计汇总信息
IntSummaryStatistics ageSummaryStatistics = students.stream().collect(Collectors.summarizingInt(Student::getAge));
//打印结果为年龄信息汇总 IntSummaryStatistics{count=10, sum=110, min=9, average=11.000000, max=13}
System.out.println("年龄信息汇总" + ageSummaryStatistics);

//按性别分块 (分块是特殊的分组)
Map<Boolean, List<Student>> genders = students.stream().collect(Collectors.partitioningBy(s -> s.getGender() == Gender.MALE));
//使用apache commons中的工具类打印map
/* 打印结果:
男女学生列表 =
{
false = [Student{name='小二', age=11, gender=FEMALE, grade=TWO}, Student{name='小四', age=12, gender=FEMALE, grade=ONE}, Student{name='小五', age=10, gender=FEMALE, grade=FOUR}, Student{name='小九', age=12, gender=FEMALE, grade=THREE}]
true = [Student{name='小零', age=10, gender=MALE, grade=ONE}, Student{name='小一', age=9, gender=MALE, grade=THREE}, Student{name='小三', age=13, gender=MALE, grade=FOUR}, Student{name='小六', age=9, gender=MALE, grade=THREE}, Student{name='小七', age=13, gender=MALE, grade=TWO}, Student{name='小八', age=11, gender=MALE, grade=ONE}]
}
*/
MapUtils.verbosePrint(System.out,"男女学生列表",genders);

//按年级分组
Map<Grade, List<Student>> grades = students.stream().collect(Collectors.groupingBy(Student::getGrade));
/*打印结果
各年级学生列表 =
{
ONE = [Student{name='小零', age=10, gender=MALE, grade=ONE}, Student{name='小四', age=12, gender=FEMALE, grade=ONE}, Student{name='小八', age=11, gender=MALE, grade=ONE}]
THREE = [Student{name='小一', age=9, gender=MALE, grade=THREE}, Student{name='小六', age=9, gender=MALE, grade=THREE}, Student{name='小九', age=12, gender=FEMALE, grade=THREE}]
FOUR = [Student{name='小三', age=13, gender=MALE, grade=FOUR}, Student{name='小五', age=10, gender=FEMALE, grade=FOUR}]
TWO = [Student{name='小二', age=11, gender=FEMALE, grade=TWO}, Student{name='小七', age=13, gender=MALE, grade=TWO}]
}
*/
MapUtils.verbosePrint(System.out,"各年级学生列表",grades);

//得到所有班级学生的个数,
Map<Grade, Long> gradesCount = students.stream().collect(Collectors.groupingBy(Student::getGrade,Collectors.counting()));
/*打印结果
各年级学生数量列表 =
{
ONE = 3
THREE = 3
FOUR = 2
TWO = 2
}
*/
MapUtils.verbosePrint(System.out,"各年级学生数量列表",gradesCount);
}
}
# Java
Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×