RxJava JDBC 简介

http://www.baeldung.com/rxjava-jdbc
作者:baeldung
译者:oopsguy.com

1、概述

简单地说,rxjava-jdbc 是一个用于与关系数据库交互的 API,其允许以链式的方式调用。在此快速教程中,我们将来了解这个类库,以及如何使用它的一些常用功能。

在阅读本教程之前,你需要有一定的 RxJava 基础知识。

2、Maven 依赖

从 Maven 依赖开始,我们需要把依赖添加到 pom.xml 文件中:

1
2
3
4
5
<dependency>
<groupId>com.github.davidmoten</groupId>
<artifactId>rxjava-jdbc</artifactId>
<version>0.7.11</version>
</dependency>

我们可以在 Maven Central 上找到最新版本的 API。

3、主要组件

Database 类是运行所有常见类型数据库交互的主入口点。要创建一个 Database 对象,我们可以将 ConnectionProvider 接口实现实例传递给 from() 静态方法:

1
2
3
4
public static ConnectionProvider connectionProvider
= new ConnectionProviderFromUrl(
DB_CONNECTION, DB_USER, DB_PASSWORD);
Database db = Database.from(connectionProvider);

ConnectionProvider 有几个值得注意的实现 — 例如 ConnectionProviderFromContextConnectionProviderFromDataSourceConnectionProviderFromUrlConnectionProviderPooled

为了做些基本操作,我们可以使用以下 Database 的 API:

  • select() — 用于 SQL select 查询
  • update() — 用于 DDL 语句,如 create 和 drop,以及 insert、update 和 delete

4、启动

在下面的快速示例中,我们将展示基本的数据库操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class BasicQueryTypesTest {

Observable<Integer> create,
insert1,
insert2,
insert3,
update,
delete = null;

@Test
public void whenCreateTableAndInsertRecords_thenCorrect() {
create = db.update(
"CREATE TABLE IF NOT EXISTS EMPLOYEE("
+ "id int primary key, name varchar(255))")
.count();
insert1 = db.update(
"INSERT INTO EMPLOYEE(id, name) VALUES(1, 'John')")
.dependsOn(create)
.count();
update = db.update(
"UPDATE EMPLOYEE SET name = 'Alan' WHERE id = 1")
.dependsOn(create)
.count();
insert2 = db.update(
"INSERT INTO EMPLOYEE(id, name) VALUES(2, 'Sarah')")
.dependsOn(create)
.count();
insert3 = db.update(
"INSERT INTO EMPLOYEE(id, name) VALUES(3, 'Mike')")
.dependsOn(create)
.count();
delete = db.update(
"DELETE FROM EMPLOYEE WHERE id = 2")
.dependsOn(create)
.count();
List<String> names = db.select(
"select name from EMPLOYEE where id < ?")
.parameter(3)
.dependsOn(create)
.dependsOn(insert1)
.dependsOn(insert2)
.dependsOn(insert3)
.dependsOn(update)
.dependsOn(delete)
.getAs(String.class)
.toList()
.toBlocking()
.single();

assertEquals(Arrays.asList("Alan"), names);
}
}

这里有一点需要注意 — 我们调用 dependsOn() 来决定查询的运行顺序。

否则,代码将失败或产生不可预测的结果,除非我们以一定的顺序指定要执行的查询。

5、自动映射

自动映射功能允许我们将指定的数据库记录映射到对象。

我们来看看两种数据库记录的自动映射方法。

5.1、使用接口自动映射

我们可以使用带注解的接口将数据库记录 automap() 到对象。为此,我们可以创建一个带注解的接口:

1
2
3
4
5
6
7
8
9
public interface Employee {

@Column("id")
int id();

@Column("name")
String name();

}

之后,运行测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Test
public void whenSelectFromTableAndAutomap_thenCorrect() {
List<Employee> employees = db.select("select id, name from EMPLOYEE")
.dependsOn(create)
.dependsOn(insert1)
.dependsOn(insert2)
.autoMap(Employee.class)
.toList()
.toBlocking()
.single();

assertThat(
employees.get(0).id()).isEqualTo(1);
assertThat(
employees.get(0).name()).isEqualTo("Alan");
assertThat(
employees.get(1).id()).isEqualTo(2);
assertThat(
employees.get(1).name()).isEqualTo("Sarah");
}

5.2、使用类自动映射

我们还可以使用具体的类来将数据库记录自动映射到对象。让我们看看该类的写法:

1
2
3
4
5
6
7
public class Manager {

private int id;
private String name;

// standard constructors, getters, and setters
}

运行测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Test
public void whenSelectManagersAndAutomap_thenCorrect() {
List<Manager> managers = db.select("select id, name from MANAGER")
.dependsOn(create)
.dependsOn(insert1)
.dependsOn(insert2)
.autoMap(Manager.class)
.toList()
.toBlocking()
.single();

assertThat(
managers.get(0).getId()).isEqualTo(1);
assertThat(
managers.get(0).getName()).isEqualTo("Alan");
assertThat(
managers.get(1).getId()).isEqualTo(2);
assertThat(
managers.get(1).getName()).isEqualTo("Sarah");
}

这里有几个要点:

  • createinsert1insert2 引用了在创建 Manager 表并将记录插入其中返回的 Observables
  • 我们查询中所 select 的列的数量必须与 Manager 类构造方法中的参数数量相匹配
  • 列必须是可以自动映射到构造方法中的类型

有关自动映射的更多信息,请访问 GitHub 上的 rxjava-jdbc 仓库

6、使用大对象

API 支持使用大对象(如 CLOB 和 BLOBS)。在接下来的小节中,我们将介绍如何利用这一功能。

6.1、CLOB

我们来如何 insert 和 select 一个 CLOB:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Before
public void setup() throws IOException {
create = db.update(
"CREATE TABLE IF NOT EXISTS " +
"SERVERLOG (id int primary key, document CLOB)")
.count();

InputStream actualInputStream
= new FileInputStream("src/test/resources/actual_clob");
actualDocument = getStringFromInputStream(actualInputStream);

InputStream expectedInputStream = new FileInputStream(
"src/test/resources/expected_clob");

expectedDocument = getStringFromInputStream(expectedInputStream);
insert = db.update(
"insert into SERVERLOG(id,document) values(?,?)")
.parameter(1)
.parameter(Database.toSentinelIfNull(actualDocument))
.dependsOn(create)
.count();
}

@Test
public void whenSelectCLOB_thenCorrect() throws IOException {
db.select("select document from SERVERLOG where id = 1")
.dependsOn(create)
.dependsOn(insert)
.getAs(String.class)
.toList()
.toBlocking()
.single();

assertEquals(expectedDocument, actualDocument);
}

请注意,getStringFromInputStream() 是将 InputStream 的内容转换为 String

6.2、BLOB

我们可以使用 API 以类似的方式使用 BLOB。唯一的区别是,我们不必传递一个 StringtoSentinelIfNull() 方法,而是传递一个字节数组。

我们可以这样做:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Before
public void setup() throws IOException {
create = db.update(
"CREATE TABLE IF NOT EXISTS "
+ "SERVERLOG (id int primary key, document BLOB)")
.count();

InputStream actualInputStream
= new FileInputStream("src/test/resources/actual_clob");
actualDocument = getStringFromInputStream(actualInputStream);
byte[] bytes = this.actualDocument.getBytes(StandardCharsets.UTF_8);

InputStream expectedInputStream = new FileInputStream(
"src/test/resources/expected_clob");
expectedDocument = getStringFromInputStream(expectedInputStream);
insert = db.update(
"insert into SERVERLOG(id,document) values(?,?)")
.parameter(1)
.parameter(Database.toSentinelIfNull(bytes))
.dependsOn(create)
.count();
}

之后,我们可以在之前的例子中服用同样的测试。

7、事务

接下来,我们来看看事务支持。

事务管理允许我们处理在单个事务中分组多个数据库操作的事务,以便它们都能被提交 — 永久保存到数据库中,或者完全回滚。

我们来看一个快速示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Test
public void whenCommitTransaction_thenRecordUpdated() {
Observable<Boolean> begin = db.beginTransaction();
Observable<Integer> createStatement = db.update(
"CREATE TABLE IF NOT EXISTS EMPLOYEE(id int primary key, name varchar(255))")
.dependsOn(begin)
.count();
Observable<Integer> insertStatement = db.update(
"INSERT INTO EMPLOYEE(id, name) VALUES(1, 'John')")
.dependsOn(createStatement)
.count();
Observable<Integer> updateStatement = db.update(
"UPDATE EMPLOYEE SET name = 'Tom' WHERE id = 1")
.dependsOn(insertStatement)
.count();
Observable<Boolean> commit = db.commit(updateStatement);
String name = db.select("select name from EMPLOYEE WHERE id = 1")
.dependsOn(commit)
.getAs(String.class)
.toBlocking()
.single();

assertEquals("Tom", name);
}

我们调用 beginTransaction() 方法来开始一个事务。调用此方法后,每个数据库操作都将在同一个事务中运行,直到调用 commit()rollback() 方法为止。

我们可以使用 rollback() 方法捕获 Exception 来回滚整个事务,以防代码由于某些原因而失败。我们可以为所有 Exception 或指定 Exception 进行此操作。

8、返回生成的 key

如果我们在处理的表中设置了 auto_increment 字段,我们可能需要检索生成的值。我们可以通过调用 returnGeneratedKeys() 方法来实现。

我们来看一个快速示例:

1
2
3
4
5
6
7
8
9
10
11
12
@Test
public void whenInsertAndReturnGeneratedKey_thenCorrect() {
Integer key = db.update("INSERT INTO EMPLOYEE(name) VALUES('John')")
.dependsOn(createStatement)
.returnGeneratedKeys()
.getAs(Integer.class)
.count()
.toBlocking()
.single();

assertThat(key).isEqualTo(1);
}

9、结论

在本教程中,我们了解了如何使用 rxjava-jdbc 的链式方法。我们还介绍了它提供的一些常用功能,例如自动化、使用大对象和事务。

您可以在 GitHub 上获取完整的代码。

相关链接和原文代码