dangdang的扩展 sharding-jdbc实现动态数据源分库分表分页查询

dangdang的分库分表扩展 sharding-jdbc

这段时间应公司需求需要分库查询数据,还好不是分表
研究了一下sharding-jdbc
我这里需求有点复杂,说是数据源不固定,随时添加一个数据源我就想到不能配置到项目文件中,就在业务库中新建一个表用来存储数据源的信息
表信息
用最原始的jdbc连接数据库的方式实现分库查询

封装的DBUtil

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263

import com.dangdang.ddframe.rdb.sharding.api.rule.DataSourceRule;
import com.dangdang.ddframe.rdb.sharding.api.rule.ShardingRule;
import com.dangdang.ddframe.rdb.sharding.api.rule.TableRule;
import com.dangdang.ddframe.rdb.sharding.api.strategy.database.DatabaseShardingStrategy;
import com.dangdang.ddframe.rdb.sharding.api.strategy.table.TableShardingStrategy;
import com.dangdang.ddframe.rdb.sharding.jdbc.core.datasource.ShardingDataSource;
import com.maven.znly.entity.NewDataSource;
import org.apache.commons.dbcp.BasicDataSource;

import javax.sql.DataSource;
import java.sql.*;
import java.util.*;

/**
* @ClassName DBUtil
* @Description 为了方便建立数据库链接 只为查询封装
* @Author asus
* @Date Created by asus on 2018/11/1917:22
* @Version 1.0
**/
public class DBUtil {
public static Page getPage(List<NewDataSource> dataSourceList, String sql,PageRequest pageRequest){
ResultSet rs=null;
Connection conn=null;
List<Map<String, Object>> list = new ArrayList<>();
Integer pageNo=pageRequest.getPageNumber();//第几页
Integer pageSize=pageRequest.getPageSize();//一页几条
Integer count=0;
try {
Map<String, DataSource> dataSourceMap = createDataSourceMap(dataSourceList);
DataSourceRule dataSourceRule = new DataSourceRule(dataSourceMap);
TableRule[] tableRules=new TableRule[dataSourceList.size()];
for (int i = 0; i <dataSourceList.size() ; i++) {
TableRule tableRule = TableRule.builder("device_log").actualTables(Arrays.asList("device_log")).dataSourceRule(dataSourceRule).build();
tableRules[i]=tableRule;
}
List<TableRule> newTableRule=Arrays.asList(tableRules);
//TableRule orderTableRule = TableRule.builder("device_log").actualTables(Arrays.asList("device_log")).dataSourceRule(dataSourceRule).build();
ShardingRule shardingRule = ShardingRule.builder().dataSourceRule(dataSourceRule).tableRules(newTableRule)
.databaseShardingStrategy(new DatabaseShardingStrategy("create_time", new ModuloDatabaseShardingAlgorithm()))
.tableShardingStrategy(new TableShardingStrategy("create_time", new ModuloTableShardingAlgorithm())).build();
DataSource dataSource = new ShardingDataSource(shardingRule);
//2.获得数据库的连接
conn=dataSource.getConnection();
//构造一个statement对象来执行sql语句:主要有Statement,PreparedStatement,CallableStatement三种实例来实现
// stmt=conn.createStatement();
//获得总个数
String countSql = "select count(*) totalCount from (" + sql + " ) cout";
PreparedStatement pstmt = conn.prepareStatement(countSql);
rs=pstmt.executeQuery();
count=rs.getInt("totalCount");
StringBuilder sb = new StringBuilder("SELECT * FROM ");
sb.append("( ");
sb.append(sql);
sb.append(") A limit " + ( pageNo* pageSize) + "," + pageSize + "");
PreparedStatement pstmts = conn.prepareStatement(sb.toString());
rs=pstmts.executeQuery();
//PreparedStatement pstmt = conn.prepareStatement(sql);
//执行sql并返还结束 ;

ResultSetMetaData md = rs.getMetaData(); //获得结果集结构信息,元数据
int columnCount = md.getColumnCount(); //获得列数
while (rs.next()) {
Map<String,Object> rowData = new HashMap<>();
for (int i = 1; i <= columnCount; i++) {
rowData.put(md.getColumnName(i), rs.getObject(i));
}
list.add(rowData);
}
if(rs !=null){//11.关闭记录集
try{
rs.close();
} catch (SQLException e){
e.printStackTrace();
}
}
if(conn !=null){//13.关闭连接 (记住一定要先关闭前面的11.12.然后在关闭连接,就像关门一样,先关里面的,最后关最外面的)
try{
conn.close();
}catch(SQLException e){
e.printStackTrace();
}
}
}catch (Exception e){
e.printStackTrace();
}
return new MyPage(pageRequest, list, count);
}
//将获得的数据库与java的链接返回(返回的类型为Connection)
public static List<Map<String,Object>> getConnection(List<NewDataSource> dataSourceList, String sql){
ResultSet rs=null;
Connection conn=null;
List<Map<String, Object>> list = new ArrayList<>();
Integer count=0;
try {
Map<String, DataSource> dataSourceMap = createDataSourceMap(dataSourceList);
DataSourceRule dataSourceRule = new DataSourceRule(dataSourceMap);
TableRule[] tableRules=new TableRule[dataSourceList.size()];
for (int i = 0; i <dataSourceList.size() ; i++) {
TableRule tableRule = TableRule.builder("device_log").actualTables(Arrays.asList("device_log")).dataSourceRule(dataSourceRule).build();
tableRules[i]=tableRule;
}
List<TableRule> newTableRule=Arrays.asList(tableRules);
//TableRule orderTableRule = TableRule.builder("device_log").actualTables(Arrays.asList("device_log")).dataSourceRule(dataSourceRule).build();
ShardingRule shardingRule = ShardingRule.builder().dataSourceRule(dataSourceRule).tableRules(newTableRule)
.databaseShardingStrategy(new DatabaseShardingStrategy("create_time", new ModuloDatabaseShardingAlgorithm()))
.tableShardingStrategy(new TableShardingStrategy("create_time", new ModuloTableShardingAlgorithm())).build();
DataSource dataSource = new ShardingDataSource(shardingRule);
//2.获得数据库的连接
conn=dataSource.getConnection();
PreparedStatement pstmt = conn.prepareStatement(sql);
rs=pstmt.executeQuery();
ResultSetMetaData md = rs.getMetaData(); //获得结果集结构信息,元数据
int columnCount = md.getColumnCount(); //获得列数
while (rs.next()) {
Map<String,Object> rowData = new HashMap<>();
for (int i = 1; i <= columnCount; i++) {
rowData.put(md.getColumnName(i), rs.getObject(i));
}
list.add(rowData);
}
if(rs !=null){//11.关闭记录集
try{
rs.close();
} catch (SQLException e){
e.printStackTrace();
}
}
if(conn !=null){//13.关闭连接 (记住一定要先关闭前面的11.12.然后在关闭连接,就像关门一样,先关里面的,最后关最外面的)
try{
conn.close();
}catch(SQLException e){
e.printStackTrace();
}
}
}catch (Exception e){
e.printStackTrace();
}
return list;
}
private static DataSource createDataSource(NewDataSource dataSource) {
BasicDataSource result = new BasicDataSource();
result.setDriverClassName("com.mysql.jdbc.Driver");
String URL="jdbc:mysql://"+dataSource.getDataIp()+":"+dataSource.getDataCode()+"/"+dataSource.getDataDbName()+"?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=CONVERT_TO_NULL&autoReconnect=true&useSSL=false&serverTimezone=GMT%2B8";
result.setUrl(URL);
result.setUsername(dataSource.getDataUserName());
result.setPassword(dataSource.getDataUserPassword());
return result;
}
private static Map<String, DataSource> createDataSourceMap(List<NewDataSource> dataSourceList) {
Map<String, DataSource> mapList = new HashMap<>();
for (int i = 0; i < dataSourceList.size(); i++) {
NewDataSource dataSource=dataSourceList.get(i);
DataSource dataSource1=createDataSource(dataSource);
mapList.put(dataSource.getDataDbName(),dataSource1);
}
return mapList;
}
static class MyPage implements Page {
private PageRequest pageRequest;
private List<Map<String, Object>> objectList;

private int total;

public MyPage(PageRequest pageRequest, List<Map<String,Object>> objectList, int total) {
this.pageRequest = pageRequest;
this.objectList = objectList;
this.total = total;
}

@Override
public int getNumber() {
return pageRequest.getPageNumber();
}

@Override
public int getSize() {
return pageRequest.getPageSize();
}

@Override
public int getTotalPages() {

if (total % pageRequest.getPageSize() == 0) {
return total/pageRequest.getPageSize();
} else {
return total/pageRequest.getPageSize()+1;
}


}

@Override
public int getNumberOfElements() {
return 0;
}

@Override
public long getTotalElements() {
return this.total;
}

@Override
public Page map(Converter converter) {
return null;
}


@Override
public Iterator<Map<String,Object>> iterator() {
return objectList.iterator();
}

@Override
public List<Map<String,Object>> getContent() {
return objectList;
}



@Override
public boolean hasContent() {
return objectList!=null&&objectList.size()>0;
}

@Override
public Sort getSort() {
return null;
}

@Override
public boolean isFirst() {
return pageRequest.getPageNumber() == 0;
}

@Override
public boolean isLast() {
return pageRequest.getPageNumber()==getTotalPages()-1;
}

@Override
public boolean hasNext() {
return pageRequest.getPageNumber()<getTotalPages()-1;
}

@Override
public boolean hasPrevious() {
return pageRequest.getPageNumber() >0;
}

@Override
public Pageable nextPageable() {
return null;
}

@Override
public Pageable previousPageable() {
return null;
}
}

}

因为我需要分页所以这里就再次封装一个

ModuloDatabaseShardingAlgorithm

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import com.dangdang.ddframe.rdb.sharding.api.ShardingValue;
import com.dangdang.ddframe.rdb.sharding.api.strategy.database.SingleKeyDatabaseShardingAlgorithm;
import com.google.common.collect.Range;

import java.util.Collection;
import java.util.LinkedHashSet;

/**
* @ClassName ModuloDatabaseShardingAlgorithm
* @Description TODO
* @Author asus
* @Date Created by asus on 2018/12/1012:10
* @Version 1.0
**/
public class ModuloDatabaseShardingAlgorithm implements SingleKeyDatabaseShardingAlgorithm<Integer>{
@Override
public String doEqualSharding(Collection<String> collection, ShardingValue<Integer> shardingValue) {
for (String each : collection) {
if (each.endsWith(shardingValue.getValue() % 2 + "")) {
return each;
}
}
throw new IllegalArgumentException();
}

@Override
public Collection<String> doInSharding(Collection<String> collection, ShardingValue<Integer> shardingValue) {
Collection<String> result = new LinkedHashSet<>(collection.size());
for (Integer value : shardingValue.getValues()) {
for (String tableName : collection) {
if (tableName.endsWith(value % 2 + "")) {
result.add(tableName);
}
}
}
return result;

}

@Override
public Collection<String> doBetweenSharding(Collection<String> collection, ShardingValue<Integer> shardingValue) {
Collection<String> result = new LinkedHashSet<>(collection.size());
Range<Integer> range = (Range<Integer>) shardingValue.getValueRange();
for (Integer i = range.lowerEndpoint(); i <= range.upperEndpoint(); i++) {
for (String each : collection) {
if (each.endsWith(i % 2 + "")) {
result.add(each);
}
}
}
return result;

}
}

ModuloTableShardingAlgorithm

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import com.dangdang.ddframe.rdb.sharding.api.ShardingValue;
import com.dangdang.ddframe.rdb.sharding.api.strategy.table.SingleKeyTableShardingAlgorithm;
import com.google.common.collect.Range;

import java.util.Collection;
import java.util.LinkedHashSet;

/**
* @ClassName ModuloTableShardingAlgorithm
* @Description TODO
* @Author asus
* @Date Created by asus on 2018/12/1012:13
* @Version 1.0
**/
public class ModuloTableShardingAlgorithm implements SingleKeyTableShardingAlgorithm<Integer> {
@Override
public String doEqualSharding(Collection<String> collection, ShardingValue<Integer> shardingValue) {
for (String each : collection) {
if (each.endsWith(shardingValue.getValue() % 2 + "")) {
return each;
}
}
throw new IllegalArgumentException();
}

@Override
public Collection<String> doInSharding(Collection<String> collection, ShardingValue<Integer> shardingValue) {
Collection<String> result = new LinkedHashSet<>(collection.size());
for (Integer value : shardingValue.getValues()) {
for (String tableName : collection) {
if (tableName.endsWith(value % 2 + "")) {
result.add(tableName);
}
}
}
return result;
}

@Override
public Collection<String> doBetweenSharding(Collection<String> collection, ShardingValue<Integer> shardingValue) {
Collection<String> result = new LinkedHashSet<>(collection.size());
Range<Integer> range = (Range<Integer>) shardingValue.getValueRange();
for (Integer i = range.lowerEndpoint(); i <= range.upperEndpoint(); i++) {
for (String each : collection) {
if (each.endsWith(i % 2 + "")) {
result.add(each);
}
}
}
return result;
}
}

我把table的也给贴出来了
引用的时候
先查出动态的数据源信息 dataSourceList和编写好sql

1
List<Map<String, Object>> mapList=DBUtil.getConnection(dataSourceList,sql);

就可以得到查出的map集合,下面是我封装的分页
pageRequest就是这样来的
分页
分页调用

1
Page<Map<String,Object>> pageInfo = DBUtil.getPage(dataSourceList,sql,pageRequest);

到此动态数据源分库分页查询就结束了

本文标题:dangdang的扩展 sharding-jdbc实现动态数据源分库分表分页查询

文章作者:範範

发布时间:2019年06月04日 - 14:06

最后更新:2019年09月11日 - 15:09

原始链接:https://trhsy.github.io/dangdang.html

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

-------------The End-------------
範範 wechat
欢迎关注微信公众号,孬蛋等着你!
坚持原创技术分享,您的支持将鼓励我继续创作!