豫ICP备17040950号-2

使用信号量Semaphore实现数据库连接池

我们先来看一下Semaphore 的定义;
Semaphore:一个计数信号量。从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。
Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。

我们简单的来讲,Semaphore里面存放着固定数量的许可证,假设为10,那么每个线程执行的时候,都需要先获得一个许可证,否则将进行阻塞,当10个许可证分别被10个线程所持有时,后面的线程将无法执行,全部在获取许可证这个操作时被阻塞,除非有线程执行完毕,归还许可证;
这就像极了数据库连接池,固定数量的连接被初始化好,当连接资源被线程消耗完毕时,其它线程暂时无法获得数据库连接,当其中有线程SQL操作完毕归还连接资源时,其它线程才能继续获得许可拿到线程资源;

我们来模拟一下吧!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
//数据库连接池  
class ConnectPool{
private int size;
private Connect[] connects ;

//记录下标为X的Connect是否已被使用
private boolean [] connectFlag;
//信号量对象
private Semaphore semaphore;

//size:初始化连接池大小
public ConnectPool(int size) {
this.size = size;
//信号量初始值为10,每成功进行一次acquire()操作,信号数减1,release()操作,信号数加1
semaphore = new Semaphore(size,true);
connects = new Connect[size];
connectFlag = new boolean[size];
initConnects();
}

//初始化数据库连接
private void initConnects(){
for (int i = 0; i < this.size; i++) {
connects[i] = new Connect();
}
}

//获得某个数据库连接
public Connect openConnect() throws InterruptedException{
//得先 获得使用许可证,如果信号量为0,则拿不到许可证,一直阻塞直到能获得
semaphore.acquire();
return getConnect();
}
private synchronized Connect getConnect(){
for (int i = 0; i < connectFlag.length; i++) {
if(!connectFlag[i]){
//标记该连接已被使用
connectFlag[i] = true;
return connects[i];
}
}
return null;
}

//释放某个数据库连接
public synchronized void releaseConnect( Connect connect ){
for (int i = 0; i < this.size; i++) {
if( connect==connects[i] ){
connectFlag[i] = false;
semaphore.release();
}
}
}
}

//数据库连接
class Connect{
private static int count = 1;
private int id = count++;
public Connect() {
//假设打开一个连接很耗费资源,需要等待1秒
try {
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("连接#"+ id +"#已与数据库建立通道!");
}
@Override
public String toString() {
return "#"+id+"#";
}

}

//Semaphore是信号量,使用方式具体查看API文档,这里只做一个示范,这个示范基本和Api上演示的差不多…
//什么是信号量呢?假设我们有数据库连接池,只有固定数量的2个连接,这个数量2就是一个信号量, 每当一个线程拿到连接时,
//信号量减一,当线程将连接释放时,信号量加一,当信号量为0时,想获取连接的线程将在阻塞中等待连接被释放;

//Semaphore类保证我们的信号量的增加和减少是线程安全的,也保证在信号量为0时,线程能正确的被阻塞直到信号量大于0;
//so,我们就来模仿数据库连接池的实现吧!(注意:这并非真正数据库连接池的实现,这里的示范只是做个非常简单的例子!)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public static void semaphore(){  
final ConnectPool pool = new ConnectPool(2);
ExecutorService exec = Executors.newCachedThreadPool();

//30个并发来争抢连接资源
for (int i = 0; i < 5; i++) {
final int id = i+1;
exec.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println("线程"+id+"等待获取数据库连接");
Connect connect = pool.openConnect();
System.out.println("线程"+id+"已拿到数据库连接:"+connect);
//进行数据库操作2秒...然后释放连接
TimeUnit.MILLISECONDS.sleep(2000);
System.out.println("线程"+id+"释放数据库连接:"+connect);
pool.releaseConnect(connect);
} catch (InterruptedException e) {
e.printStackTrace();
}

}
});
}
}

先看看ConnectPool类,它使用initConnects()方法初始化数据库资源池大小,这里传的参数是2,意思就是只有两个数据库连接;
我们在测试方法中开启5个线程,分别调用它的openConnect()来获取连接资源以及releaseConnect()来释放连接;
在openConnect()中,我们先调用信号量的acquire()方法拿到许可证,接着在getConnect()中判断某个连接是否正在被使用中,如果没有被使用,则取到该连接并将其标识为使用中;
在releaseConnect()中,我们将标识设置为false并调用信号量的release()归还许可证,这样后面的线程就能继续取得连接了;

看一下输出*************************************************************************************
连接#1#已与数据库建立通道!
连接#2#已与数据库建立通道!
线程2等待获取数据库连接
线程1等待获取数据库连接
线程2已拿到数据库连接:#1#
线程4等待获取数据库连接
线程1已拿到数据库连接:#2#
线程3等待获取数据库连接
线程5等待获取数据库连接
线程2释放数据库连接:#1#
线程1释放数据库连接:#2#
线程4已拿到数据库连接:#1#
线程3已拿到数据库连接:#2#
线程3释放数据库连接:#2#
线程4释放数据库连接:#1#
线程5已拿到数据库连接:#1#
线程5释放数据库连接:#1#


仔细看看输出,是否是多个线程在等待获取数据库连接,当某个线程释放数据库连接时,另一个线程马上获取到接着消费呢…