江蘇省城鄉(xiāng)建設(shè)官網(wǎng)站免費私人網(wǎng)站建設(shè)
用Java實現(xiàn)一個簡單的KV數(shù)據(jù)庫
開發(fā)思路:
用map存儲數(shù)據(jù),再用一個List記錄操作日志,開一個新線程將List中的操作寫入日志文件中,再開一個線程用于網(wǎng)絡(luò)IO服務(wù)接收客戶端的命令,再啟動時檢查日志,如果有數(shù)據(jù)就讀入map中
關(guān)于redis:
- 存儲結(jié)構(gòu):
- redis:
redis的數(shù)據(jù)保存其實比較復(fù)雜,使用一個哈希表保存所有鍵值對,一個哈希表就是一個數(shù)組,數(shù)組的每一個元素是一個哈希桶,哈希桶中保存的是key和value的指針目錄,再通過,指針去找對應(yīng)的key和value,當(dāng)然對于value是List等數(shù)據(jù)結(jié)構(gòu)還用到跳表,雙向列表,壓縮列表,整數(shù)數(shù)組等數(shù)據(jù)結(jié)構(gòu) - SimpleKVDB:
只用了Java的HashMap(偷懶~)
- redis:
- 線程:
- redis:
redis雖然成為單線程,但是redis的網(wǎng)絡(luò)IO和鍵值對讀寫是由一個線程,但是另外的持久化,異步刪除,集群數(shù)據(jù)同步等,都是額外線程 - SimpleKVDB:
數(shù)據(jù)讀寫網(wǎng)絡(luò)IO一個線程,持久化一個線程(集群同步本來想做但是后來沒有寫,也是新開一條線程)
- redis:
- 網(wǎng)絡(luò)IO:
- redis:
單線程多路復(fù)用高性能IO模式 - SimpleKVDB:
直接用Java標(biāo)準(zhǔn)庫NIO,多路復(fù)用IO模式
- redis:
- 持久化:
- redis:
AOF操作日志,RDB快照,AOF用來記錄每一次的操作(增刪改)可以實時同步也可以每隔一個時間同步文件中,RDB全量數(shù)據(jù)快照但是需要開一條子進(jìn)程開銷比較大,redis4.0以后使用一種新的模式,RDB每隔一段時間全量快照內(nèi)存數(shù)據(jù),AOF記錄每個RDB之間的操作記錄,當(dāng)下一次全量RDB以后清空AOF再重新記錄操作日志 - SimpleKVDB
只記錄AOF操作日志,開一個新線程,有新的操作就寫入(后來我發(fā)現(xiàn)可以使用mmap內(nèi)存映射的方法,這樣更快效率更高)
- redis:
- 主從數(shù)據(jù)一致
- redis:
選一臺主服務(wù)器用于寫入,從服務(wù)器用于讀取,主服務(wù)器有數(shù)據(jù)寫入就同步從服務(wù)器,哨兵機(jī)制,用于監(jiān)控所有服務(wù)器,如果主服務(wù)器崩潰,就選擇一臺從服務(wù)器作為主服務(wù)器(會根據(jù)是否下線,網(wǎng)絡(luò)速度,讀寫速度等選擇主服務(wù)器),然后通知其他從服務(wù)器連接到新的主服務(wù)器 - SimpleKVDB:
沒寫,設(shè)想:本來是想寫一個配置文件,寫入主服務(wù)器IP,其他從服務(wù)器IP,開一個線程在服務(wù)端中寫一個客戶端當(dāng)作主服務(wù)器,讀取配置文件,只有主服務(wù)器才能開這個線程,其他從服務(wù)器還是開啟服務(wù),用來接收主服務(wù)器的數(shù)據(jù),同步從數(shù)據(jù)庫的內(nèi)存和操作日志里
- redis:
操作展示:
客戶端:
服務(wù)端:
日志文件:
目錄結(jié)構(gòu):
- SimpleKVDB
- SimpleKVDBClient(客戶端)
- SimpleKVDBClient.java(客戶端)
- SimpleKVDBService(服務(wù)端)
- AofAnnotation.java (注解)
- AofInterface.java(接口)
- DynamicAgent.java(動態(tài)代理)
- SimpleKVDBService.java(服務(wù)端)
- SimpleKVDBClient(客戶端)
SimpleKVDBClient.java(客戶端):
package SimpleKVDB.SimpleKVDBClient;import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.time.LocalDateTime;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class SimpleKVDBClient {public static void main(String[] args) throws Exception {SocketChannel socketChannel = SocketChannel.open();socketChannel.configureBlocking(false);Selector selector = Selector.open();socketChannel.register(selector, SelectionKey.OP_CONNECT);socketChannel.connect(new InetSocketAddress("127.0.0.1",5555));while (true){selector.select();//阻塞 等待事件發(fā)生Set<SelectionKey> selectionKeys = selector.selectedKeys();selectionKeys.forEach(key ->{try {if (key.isConnectable()){SocketChannel channel = (SocketChannel) key.channel();if (channel.isConnectionPending()){//是否正在連接channel.finishConnect(); //結(jié)束正在連接ByteBuffer writeBuffer = ByteBuffer.allocate(1024);writeBuffer.put((LocalDateTime.now() + " 連接成功").getBytes());writeBuffer.flip();channel.write(writeBuffer);//將buffer寫入channelExecutorService service = Executors.newSingleThreadExecutor(Executors.defaultThreadFactory());service.submit(()->{//線程,從鍵盤讀入數(shù)據(jù)try {while (true){writeBuffer.clear();//清空bufferInputStreamReader input = new InputStreamReader(System.in);BufferedReader bufferedReader = new BufferedReader(input);String senderMessage = bufferedReader.readLine();writeBuffer.put(senderMessage.getBytes());writeBuffer.flip();channel.write(writeBuffer);}}catch (Exception e){e.printStackTrace();}});}channel.register(selector,SelectionKey.OP_READ);//注冊事件}else if (key.isReadable()){//channel 有信息的輸入SocketChannel channel = (SocketChannel) key.channel();//哪個channel 觸發(fā)了 readByteBuffer readBuffer = ByteBuffer.allocate(1024);int count = channel.read(readBuffer);//server發(fā)來的if (count > 0){String receiveMessage = new String(readBuffer.array(),0,count);System.out.println("響應(yīng)結(jié)果:"+receiveMessage);}}}catch (Exception e){e.printStackTrace();}finally {selectionKeys.clear();//移除已經(jīng)發(fā)生的事件}});}}
}
AofAnnotation.java(注解):
package SimpleKVDB.SimpleKVDBService;import java.lang.annotation.*;// ----------- 自定義的注解,用于區(qū)分是什么操作(其實也可以不用,直接獲取方法名區(qū)分也一樣) -----------
// 自定義的注解
@Retention(RetentionPolicy.RUNTIME)//注解會在class中存在,運行時可通過反射獲取
@Target(ElementType.METHOD)//目標(biāo)是方法
@Documented
//文檔生成時,該注解將被包含在javadoc中,可去掉
@interface AofAnnotation {String name() default "";
}
AofInterface.java(動態(tài)代理接口):
package SimpleKVDB.SimpleKVDBService;// ----------- 動態(tài)代理需要的接口,主要想實現(xiàn)切面效果在每一個操作后面加一個日志 -----------
// 動態(tài)代理需要的接口
// 只需要給增刪改上加操作日志,保證數(shù)據(jù)一致性
interface AofInterface {
// @AofAnnotation(name="clear")
// int hashClear();@AofAnnotation(name="set")Object hashSet(String key, Object value);@AofAnnotation(name="remove")Object hashRemove(String key);
}
DynamicAgent.java(動態(tài)代理):
package SimpleKVDB.SimpleKVDBService;import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.List;
import java.util.Map;// ----------- 動態(tài)代理(實現(xiàn)切面效果的邏輯代碼) -----------
// 動態(tài)代理
public class DynamicAgent<T> implements InvocationHandler {// 接口實現(xiàn)類實例,如果不使用泛型,這里可以直接用ObjectT rent;void setObject(T obj){this.rent = obj;}// aof內(nèi)存List<String> listData;public void setListData(List<String> list){this.listData = list;}// 生成代碼類public Object getProxy(){// 第一個參數(shù)是代理類的類加載器,第二個參數(shù)是代理類要實現(xiàn)的接口,第三個參數(shù)是處理接口方法的程序// 這里代理類是自己,所以直接this,getClass().getClassLoader()是獲取加載器// getClass().getInterfaces() 是獲取實現(xiàn)類的接口// 因為invoke()就是執(zhí)行方法,所以第三個參數(shù)也是本身thisreturn Proxy.newProxyInstance(this.getClass().getClassLoader(), rent.getClass().getInterfaces(),this);}// 處理代理實例,并返回執(zhí)行結(jié)果public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {// 動態(tài)代理本質(zhì)就是通過反射實現(xiàn),這里就是執(zhí)行這個對象的方法Object result = method.invoke(rent, args);// 獲取注解AofAnnotation say = method.getAnnotation(AofAnnotation.class);// 注解的name內(nèi)容String name = say.name();System.out.println("name::"+name);// aof日志寫入aofSetLog(name, args);return result;}// 給aof開辟一個內(nèi)存public void aofSetLog(String name, Object[] args){Map<String, Object> dataMap = new HashMap<String, Object>();// 日志格式String aofData = "*|";if("set".equals(name)){dataMap.put(args[0].toString(), args[1]);aofData = aofData + name+"|"+args[0].toString()+"|"+dataMap.get(args[0].toString());}if("remove".equals(name)){if(null != dataMap && dataMap.size()>0){dataMap.remove(args[0].toString());}aofData = aofData + name+"|"+args[0].toString()+"|";}// 日志內(nèi)存listData.add(aofData);
// System.out.println("listData:::"+listData);}// 返回日志數(shù)據(jù)public List<String> getAofDatas(){return listData;}
}
SimpleKVDBService.java(服務(wù)端):
package SimpleKVDB.SimpleKVDBService;import java.io.*;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.*;// ----------- KV數(shù)據(jù)庫的服務(wù)端實現(xiàn) -----------
public class SimpleKVDBService implements AofInterface {// 全局存儲Map<String, Object> globalMap;public void setGlobalMap(Map<String, Object> map){this.globalMap = map;}// 動態(tài)代理對象AofInterface dl;public void setAofInterface(AofInterface i){this.dl = i;}// 寫入修改操作public Object hashSet(String key, Object value){return globalMap.put(key, value);}// 讀取操作public Object hashGet(String key){return globalMap.get(key);}// 刪除操作public Object hashRemove(String key){return globalMap.remove(key);}// 獲取長度操作public int hashSize(){return globalMap.size();}// 是否為空操作操作public boolean hashIsEmpty(){return globalMap.isEmpty();}// aof日志List<String> aofList;// 引用全局aof日志變量,用來存儲aof操作日志public void setAofList(List<String> list){this.aofList = list;}// 創(chuàng)建aof文件public File createAofFile(){final String ROOT = '.' + File.separator;File newFolder = new File(ROOT+"simpleKVDB");if(newFolder.exists() && newFolder.isDirectory()){System.out.println("文件夾已經(jīng)存在");}else {boolean isFolder = newFolder.mkdir();if(!isFolder){System.out.println("文件夾創(chuàng)建失敗");}}// 創(chuàng)建一個文件File newFile = new File(newFolder.getPath(),"aofDatas.aof");if(newFile.exists() && newFile.isFile()){System.out.println("文件已經(jīng)存在");}boolean isFile;try {isFile = newFile.createNewFile();if(!isFile){System.out.println("文件創(chuàng)建失敗");}} catch (IOException e) {e.printStackTrace();}return newFile;}// 開一個線程,寫aof寫入文件public void aofFileThread() {new Thread(()->{System.out.println("aof日志寫入線程:"+Thread.currentThread().getName());while (true){this.setAofFile(this.aofList);}}).start();}// aof寫入日志文件邏輯,將aof操作日志寫入文件中,持久化public void setAofFile(List<String> aofList){if(null != aofList && aofList.size()>0){// 休眠一秒再寫入,不頻繁使用IO寫入try{Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}// 為什么文件夾和文件檢測放這里每次都要檢測是防止文件被誤刪除File newFile = this.createAofFile();// 使用try的話自動回收/關(guān)閉資源,會自動調(diào)用close方法,不需要手動關(guān)閉// 將需要關(guān)閉的資源放在try(xxx; yyy;zzz;)// 流的關(guān)閉是有順序的,自己手動關(guān)閉很繁瑣,自動關(guān)閉大大降低了難度,非常方便try(// 創(chuàng)建一個FileOutputStream,Output是寫入,文件的byte數(shù)據(jù)傳輸流// FileOutputStream 第二參數(shù)是否追加FileOutputStream fos = new FileOutputStream(newFile, true);// FileOutputStream是通過byte字節(jié)流的,OutputStreamWriter是將字節(jié)流包裝成想要的字符集的字符流寫入OutputStreamWriter osw = new OutputStreamWriter(fos, StandardCharsets.UTF_8);// 使用PrintWriter,可以方便的寫入一行字符,第二個參數(shù)自動清空緩沖區(qū)PrintWriter pw = new PrintWriter(osw, true);){// 一邊遍歷一邊刪除aof操作日志Iterator<String> iterator = aofList.iterator();// 判斷是否還有下一個元素while (iterator.hasNext()){// 獲取下一個元素String str = iterator.next();// println是每段換行寫入,print是不換行寫入// 寫入其實是一層一層走的,先是寫入內(nèi)容進(jìn)入PrintWriter中,然后再OutputStreamWriter根據(jù)編碼轉(zhuǎn)成字節(jié)byte,然后再是FileOutputStream字節(jié)流寫入文件pw.println(str);// 因為是引用傳遞,所以直接刪除元素iterator.remove();}// 清空緩沖區(qū),因為數(shù)據(jù)是先進(jìn)入緩沖區(qū)再寫入文件,需要在關(guān)閉前將緩沖區(qū)的數(shù)據(jù)全部寫入文件才算完成,這樣才能關(guān)閉整個流,緩存區(qū)的作用是,一個字節(jié)一個字節(jié)寫入太費事兒,所以會等到一定量的字節(jié)再一起寫入,所以會出現(xiàn)一種可能就是緩存區(qū)還有少量的字節(jié)因為沒達(dá)到量沒有寫入,所以需要清空一下,將里面所有剩余的字節(jié)都寫入// PrintWriter中設(shè)置了自動清空緩沖區(qū)
// pw.flush();}catch (IOException e){e.printStackTrace();}}}// socket服務(wù),與客戶端通訊public void socketServer(AofInterface dl){try {//創(chuàng)建ServerSocketChannel,-->> ServerSocket// 打開通道ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();// 打開 SocketChannel 并連接到端口InetSocketAddress inetSocketAddress = new InetSocketAddress(5555);serverSocketChannel.socket().bind(inetSocketAddress);// 配置通道為非阻塞模式serverSocketChannel.configureBlocking(false);//開啟selector,并注冊accept事件// 獲取一個選擇器實例Selector selector = Selector.open();// 將套接字通過到注冊到選擇器serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);while (true){// 阻塞,等待事件發(fā)生selector.select();// 返回已發(fā)生的注冊事件Set<SelectionKey> selectionKeys = selector.selectedKeys();// 判斷事件類型,進(jìn)行相應(yīng)操作selectionKeys.forEach(key ->{final SocketChannel client;try {// 根據(jù)key獲得channelif (key.isAcceptable()){// 之所以轉(zhuǎn)換ServerSocketChannel,因為前面注冊的就是這個類ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();// 新的channel 和客戶端建立了通道client = serverChannel.accept();// 非阻塞client.configureBlocking(false);// 將新的channel和selector,綁定client.register(selector,SelectionKey.OP_READ);//是否有數(shù)據(jù)可讀}else if (key.isReadable()){client = (SocketChannel) key.channel();ByteBuffer readBuffer = ByteBuffer.allocate(1024);int count = client.read(readBuffer);if (count>0){readBuffer.flip();Charset charset = StandardCharsets.UTF_8;String receiveMassage = String.valueOf(charset.decode(readBuffer).array());// 顯示哪個client發(fā)消息System.out.println(client +": "+receiveMassage);// 向客戶端返回的信息String serverStr = "";// 根據(jù)客戶端不同的命令,執(zhí)行不同的方法if(Objects.equals(receiveMassage.split(" ")[0], "set")){dl.hashSet(receiveMassage.split(" ")[1], receiveMassage.split(" ")[2]);serverStr = "set OK";}if(Objects.equals(receiveMassage.split(" ")[0], "remove")){dl.hashRemove(receiveMassage.split(" ")[1]);serverStr = "remove OK";}if(Objects.equals(receiveMassage.split(" ")[0], "get")){serverStr = this.hashGet(receiveMassage.split(" ")[1]).toString();}if(Objects.equals(receiveMassage.split(" ")[0], "isempty")){serverStr = String.valueOf(this.hashIsEmpty());}if(Objects.equals(receiveMassage.split(" ")[0], "size")){serverStr = String.valueOf(this.hashSize());}if(receiveMassage.contains("連接成功")){serverStr = receiveMassage;}SocketChannel channel = (SocketChannel) key.channel();;ByteBuffer writeBuffer = ByteBuffer.allocate(1024);//返回客戶端數(shù)據(jù)writeBuffer.put((serverStr).getBytes());writeBuffer.flip();channel.write(writeBuffer);}}// 處理完事件一定要移除//selectionKeys.clear();}catch (Exception e){e.printStackTrace();}finally {// 處理完事件一定要移除selectionKeys.clear();}});}}catch (IOException e){e.printStackTrace();}}// socket服務(wù)線程public void socketThread(){new Thread(()->{System.out.println("socketServer線程:"+Thread.currentThread().getName());this.socketServer(this.dl);}).start();}// 啟動時檢查持久化aof日志文件public void setAofToMap(){System.out.println("開始從AOF中恢復(fù)數(shù)據(jù)!");File readFile = this.createAofFile();// 使用try的話自動回收/關(guān)閉資源,會自動調(diào)用close方法,不需要手動關(guān)閉// 將需要關(guān)閉的資源放在try(xxx; yyy;zzz;)// 流的關(guān)閉是有順序的,自己手動關(guān)閉很繁瑣,自動關(guān)閉大大降低了難度,非常方便try(// 創(chuàng)建一個FileInputStream,Input是寫入,文件的byte數(shù)據(jù)傳輸流FileInputStream fis = new FileInputStream(readFile);// FileInputStream是通過byte字節(jié)流的,InputStreamReader是將字節(jié)流包裝成想要的字符集的字符流寫入InputStreamReader isr = new InputStreamReader(fis, StandardCharsets.UTF_8);// 使用BufferedReader,增加緩存,可以方便的寫入一行字符BufferedReader reader = new BufferedReader(isr);){// reader.lines().map(String::trim).forEach(System.out::println); 這是一種lambda寫法,效果和下面一樣String str;// 為什么要放在while的條件里面賦值呢?是因為readLine()一行一行讀取如果到文件結(jié)尾了會返回一個null,如果放在while的代碼體里賦值,就需要多一步null的判斷// 讀取和寫入正好相反,是先從文件讀取內(nèi)容到緩存區(qū),然后從緩存區(qū)讀出來while ((str = reader.readLine()) != null){String methodStr = str.split("\\|")[1];String keyStr = str.split("\\|")[2];// 根據(jù)不同指令操作不同方法if("set".equals(methodStr)){Object valueStr = str.split("\\|")[3];this.hashSet(keyStr, valueStr);}if("remove".equals(methodStr)){this.hashRemove(keyStr);}}System.out.println("AOF中恢復(fù)數(shù)據(jù)結(jié)束!");} catch (IOException e) {e.printStackTrace();}}public static void main(String[] args) {System.out.println("主線程: "+Thread.currentThread().getName());// 全局內(nèi)存Map<String, Object> maps = new HashMap<>();// 全局aof日志內(nèi)存List<String> lists = new ArrayList<>();// 服務(wù)主體類SimpleKVDBService sKvService = new SimpleKVDBService();// 全局存儲內(nèi)存sKvService.setGlobalMap(maps);// 動態(tài)代理,主要是用于給操作添加日志DynamicAgent<AofInterface> nd = new DynamicAgent<AofInterface>();// 全局aof內(nèi)存nd.setListData(lists);nd.setObject(sKvService);// 獲取代理對象AofInterface dl = (AofInterface) nd.getProxy();// 啟動時檢查aof文件是否存在sKvService.setAofToMap();// 服務(wù)主體獲取已經(jīng)有日志信息的aof日志信息sKvService.setAofList(nd.getAofDatas());// 引用動態(tài)代理sKvService.setAofInterface(dl);// 子線程,寫aof寫入文件sKvService.aofFileThread();// 子線程,socket服務(wù)線程sKvService.socketThread(); System.out.println(sKvService.globalMap);
System.out.println("22222:"+nd.getAofDatas());
System.out.println("list:"+sKvService.aofList);
System.out.println("333333:"+sKvService.globalMap);}
}