Đồng bộ dữ liệu khi truy xuất tập tin với Java

Có những chương trình ứng dụng đòi hỏi việc ghi thông tin vào tập tin (file) để quản lý (log) hay chia sẻ dữ liệu giữa các tiến trình (process) hay tiểu trình (thread). Một vấn đề thường đặt ra đó là đồng bộ dữ liệu trong khi đọc hay ghi file. Giả sử hai tiến trình A và B cùng lúc ghi dữ liệu vào file X (hoặc một tiến trình ghi và một tiến trình đọc) thì khi đó dữ liệu được xử lý ra sao?

Chúng ta xem một ứng dụng thực tế sau: Ta dùng file counter.dat để đếm số người truy cập vào một trang web. Giả sử con số trong file này đã là 1000. Giả sử có 2 người cùng truy cập site vào một thời điểm, khi đó có 2 giao dịch xảy ra:

 

Giao dịch 1:


f = open(“counter.dat”);

hits = f.readNumber();

f.close();

Như vậy giao dịch 1 đọc “1000” vào biến hits. Tương tự, giao dịch 2 cũng đọc “1000” vào biến hits. Sau đó:

++hits;

Print(“Hits on this page: “ + hits);

Mỗi giao dịch tăng giá trị hits của mình và mỗi giao dịch có được con số 1001. Sau đó, hai giao dịch thực hiện việc cập nhật số lần truy cập trong file counter.dat lên 1001.

f = open(“counter.dat”);

f.write(hits);

f.close();

Vấn đề ở đây là trang web đã được truy cập 2 lần, trong khi giá trị đếm chỉ tăng lên 1. Đó là chưa kể mỗi người sử dụng đều thấy giá trị đếm là 1001 thay vì một người thấy 1001 và một người thấy 1002.

Ở đây ta dùng cơ chế Semaphore để giải quyết vấn đề. Một tiểu trình trước khi truy xuất file (đọc/ ghi) phải đưa ra yêu cầu (acquire) để được cấp phát tài nguyên. Sau khi xong việc phải trả lại (release) tài nguyên cho trình quản lý.

Class Semaphore sẽ hiện thực interface trong Code 1. Ta hình dung Semaphore như một cơ chế cấp phát tài nguyên mà ở đó số lượng tài nguyên có hạn. Khi một tiểu trình xin cấp phát tài nguyên, nó kiểm tra xem có còn tài nguyên hay không. Nếu còn thì tiến hành cấp phát, ngược lại báo tiến trình đó chờ đến khi tiểu trình khác giải phóng tài nguyên thì tiến hành cấp phát. Ở đây có hai chế độ chờ: chờ cho đến khi được cấp phát (acquire) hay chờ trong khoảng thời gian bao lâu (attempt) – trường hợp này nhằm tránh tắt nghẽn. Xem Code 2.

Khi một tiểu trình muốn đọc file phải đưa ra yêu cầu đọc, còn khi muốn ghi file phải đưa ra yêu cầu ghi. Xem Code 3.

Nếu tiểu trình A đang đọc, tiểu trình B yêu cầu đọc thì cho phép. Ngược lại, nếu tiểu trình A đang đọc, tiểu trình B yêu cầu ghi thì tiểu trình B phải đợi cho đến khi không còn tiểu trình đọc nào đang được phục vụ.

Tương tự nếu tiểu trình A đang ghi thì không cho phép tiểu trình khác đọc hay ghi cho đến khi tiểu trình A ghi xong.

Class SemReadWrireLock trong Code 4 sẽ hiện thực interface trong Code 3.

Các lớp khi dùng chung file phải thiết lập interface như Code 5.

Code 6 là ví dụ ứng dụng với Class A và B đọc/ ghi chung file.

Hy vọng các đoạn code giới thiệu trong bài sẽ giúp ích cho các bạn trong việc xây dựng ứng dụng có dùng chung – chia sẻ tài nguyên hay đọc/ghi file.

Code 1
package semaphoreapp;
/**

* The Sync class represents a Synchronization Object.
*
*/
public interface Sync {
void acquire() throws InterruptedException;
void release();
boolean attempt(long msec) throws InterruptedException;
}

Code 2
package semaphoreapp;
/**

* The Semaphore class represents a Semaphore Synchronization Resource Object.
*
*/
public class Semaphore implements Sync {
protected long permits;
// current number of available permits

public Semaphore(long initialPermits) {
permits = initialPermits; }
public synchronized void release() {
++permits;
notify(); }
public void acquire() throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException();
synchronized(this) {
try {
while (permits <= 0) wait();
–permits; }
catch (InterruptedException ie) {
notify();
throw ie; } } }
public boolean attempt(long secs)throws InterruptedException{
if (Thread.interrupted()) throw new InterruptedException();
synchronized(this) {
if (permits > 0) {
// same as acquire but messier
–permits;
return true;
} else if (msecs <= 0)
// avoid timed wait if not needed
return false;
else {
try {
long startTime = System.currentTimeMillis();
long waitTime = msecs;
for (;;) {
wait(waitTime);
if (permits > 0) {
–permits;
return true; }
else {
// check for time-out
long now = System.currentTimeMillis();
waitTime = msecs – (now – startTime);
if (waitTime <= 0)
return false; } } }
catch(InterruptedException ie) {
notify();
throw ie; } } } } }

Code 3
package semaphoreapp;
/**

* The ReadWriteLock class represents a Read/Write Lock Object.
*
*/
public interface ReadWriteLock {
ync readLock();
Sync writeLock();
}

Code 4
package semaphoreapp;
/**

* The SemReadWriteLock class represents a Object Semaphore Read/Write Lock.
*
*/
public class SemReadWriteLock implements ReadWriteLock {
// Provide fair access to active slot
Sync active_ = new Semaphore(1);
// Control slot sharing by readers
class ReaderGate implements Sync {
int readers_ = 0;
public synchronized void acquire()
throws InterruptedException {
// readers pile up on lock until first passes
if (readers_++ == 0) active_.acquire(); }
public synchronized void release() {
if (–readers_ == 0) active_.release(); }
public synchronized boolean attempt(long msec)
throws InterruptedException {
if (readers_++ == 0) {
return active_.attempt(msec);
} else {
return true; }
} }
Sync rGate_ = new ReaderGate();
public Sync writeLock() { return active_; }
public Sync readLock() { return rGate_; } }

Code 5
package semaphoreapp;
/**

* The FileManager class represents a FileManager Object.
*
*/
public interface FileManager {
SemReadWriteLock oSemaphore = new SemReadWriteLock(); }

Code 6
public class A implements FileManager{
//before read
oSemaphore.readLock().acquire();
//do reading
;
//finish reading
oSemaphore.readLock().release();
}
public class B implements FileManager{
//before write
oSemaphore.writeLock().acquire();
//do writing
;
//finish writing
oSemaphore.writeLock().release();
}