数据压倒一切。如果选择了正确的数据结构并把一切组织的井井有条,正确的算法就不言自明。编程的核心是数据结构,而不是算法。——Rob Pike 说明 本文基于这样的认识:数据是易变的,逻辑是稳定的。本文例举的编程实现多为代码片段,但不影响描述的完整性。本文例举的编程虽然基于C语言,但其编程思想也适用于其他语言。此外,本文不涉及语言相关的运行效率讨论。 概念提出 所谓表驱动法(Table-Driven Approach)简而言之就是用查表的方法获取数据。此处的“表”通常为数组,但可视为数据库的一种体现。 根据字典中的部首检字表查找读音未知的汉字就是典型的表驱动法,即以每个字的字形为依据,计算出一个索引值,并映射到对应的页数。相比一页一页地顺序翻字典查字,部首检字法效率极高。 具体到编程方面,在数据不多时可用逻辑判断语句(if…else或switch…case)来获取值;但随着数据的增多,逻辑语句会越来越长,此时表驱动法的优势就开始显现。 例如,用36进制(A表示10,B表示11,…)表示更大的数字,逻辑判断语句如下: if(ucNum < 10) { ucNumChar = ConvertToChar(ucNum); } else if(ucNum == 10) { ucNumChar = 'A'; } else if(ucNum == 11) { ucNumChar = 'B'; } else if(ucNum == 12) { ucNumChar = 'C'; } //... ... else if(ucNum == 35) { ucNumChar = 'Z'; } 当然也可以用 switch…case 结构,但实现都很冗长。而用表驱动法(将numChar 存入数组)则非常直观和简洁。如: CHAR aNumChars[] = {'0', '1', '2', /*3~9*/'A', 'B', 'C', /*D~Y*/'Z'}; CHAR ucNumChar = aNumChars[ucNum % sizeof(aNumChars)]; 像这样直接将变量当作下数组下标来读取数值的方法就是直接查表法。 注意,如果熟悉字符串操作,则上述写法可以更简洁: CHAR ucNumChar = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"[ucNum]; 使用表驱动法时需要关注两个问题:一是如何查表,从表中读取正确的数据;二是表里存放什么,如数值或函数指针。前者参见1.1节“查表方式”内容,后者参见1.2节“实战示例”内容。 查表方式 常用的查表方式有直接查找、索引查找和分段查找等。 直接查找 即直接通过数组下标获取到数据。如果熟悉哈希表的话,可以很容易看出这种查表方式就是哈希表的直接访问法。 如获取星期名称,逻辑判断语句如下: if(0 == ucDay) { pszDayName = "Sunday"; } else if(1 == ucDay) { pszDayName = "Monday"; } //... ... else if(6 == ucDay) { pszDayName = "Saturday"; } 而实现同样的功能,可将这些数据存储到一个表里: CHAR *paNumChars[] = {"Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday"}; CHAR *pszDayName = paNumChars[ucDay]; 类似哈希表特性,表驱动法适用于无需有序遍历数据,且数据量大小可提前预测的情况。 对于过于复杂和庞大的判断,可将数据存为文件,需要时加载文件初始化数组,从而在不修改程序的情况下调整里面的数值。 有时,访问之前需要先进行一次键值转换。如表驱动法表示端口忙闲时,需将槽位端口号映射为全局编号。所生成的端口数目大小的数组,其下标对应全局端口编号,元素值表示相应端口的忙闲状态。 索引查找 有时通过一次键值转换,依然无法把数据(如英文单词等)转为键值。此时可将转换的对应关系写到一个索引表里,即索引访问。 如现有100件商品,4位编号,范围从0000到9999。此时只需要申请一个长度为100的数组,且对应2位键值。但将4位的编号转换为2位的键值,可能过于复杂或没有规律,最合适的方法是建立一个保存该转换关系的索引表。采用索引访问既节省内存,又方便维护。比如索引A表示通过名称访问,索引B表示通过编号访问。 分段查找 通过确定数据所处的范围确定分类(下标)。有的数据可分成若干区间,即具有阶梯性,如分数等级。此时可将每个区间的上限(或下限)存到一个表中,将对应的值存到另一表中,通过第一个表确定所处的区段,再由区段下标在第二个表里读取相应数值。注意要留意端点,可用二分法查找,另外可考虑通过索引方法来代替。 如根据分数查绩效等级: #define MAX_GRADE_LEVEL (INT8U)5 DOUBLE aRangeLimit[MAX_GRADE_LEVEL] = {50.0, 60.0, 70.0, 80.0, 100.0}; CHAR *paGrades[MAX_GRADE_LEVEL] = {"Fail", "Pass", "Credit", "Distinction", "High Distinction"}; static CHAR* EvaluateGrade(DOUBLE dScore) { INT8U ucLevel = 0; for(; ucLevel < MAX_GRADE_LEVEL; ucLevel++) { if(dScore < aRangeLimit[ucLevel]) return paGrades[ucLevel]; } return paGrades[0]; } 上述两张表(数组)也可合并为一张表(结构体数组),如下所示: typedef struct{ DOUBLE aRangeLimit; CHAR *pszGrade; }T_GRADE_MAP; T_GRADE_MAP gGradeMap[MAX_GRADE_LEVEL] = { {50.0, "Fail"}, {60.0, "Pass"}, {70.0, "Credit"}, {80.0, "Distinction"}, {100.0, "High Distinction"} }; static CHAR* EvaluateGrade(DOUBLE dScore) { INT8U ucLevel = 0; for(; ucLevel < MAX_GRADE_LEVEL; ucLevel++) { if(dScore < gGradeMap[ucLevel].aRangeLimit) return gGradeMap[ucLevel].pszGrade; } return gGradeMap[0].pszGrade; } 该表结构已具备的数据库的雏形,并可扩展支持更为复杂的数据。其查表方式通常为索引查找,偶尔也为分段查找;当索引具有规律性(如连续整数)时,退化为直接查找。 使用分段查找法时应注意边界,将每一分段范围的上界值都考虑在内。 找出所有不在最高一级范围内的值,然后把剩下的值全部归入最高一级中。有时需要人为地为最高一级范围添加一个上界。 同时应小心不要错误地用“<”来代替“<=”。要保证循环在找出属于最高一级范围内的值后恰当地结束,同时也要保证恰当处理范围边界。 实战示例 本节多数示例取自实际项目。表形式为一维数组、二维数组和结构体数组;表内容有数据、字符串和函数指针。基于表驱动的思想,表形式和表内容可衍生出丰富的组合。 字符统计 问题:统计用户输入的一串数字中每个数字出现的次数。 普通解法主体代码如下: INT32U aDigitCharNum[10] = {0}; /* 输入字符串中各数字字符出现的次数 */ INT32U dwStrLen = strlen(szDigits); INT32U dwStrIdx = 0; for(; dwStrIdx < dwStrLen; dwStrIdx++) { switch(szDigits[dwStrIdx]) { case '1': aDigitCharNum[0]++; break; case '2': aDigitCharNum[1]++; break; //... ... case '9': aDigitCharNum[8]++; break; } } 这种解法的缺点显而易见,既不美观也不灵活。其问题关键在于未将数字字符与数组aDigitCharNum下标直接关联起来。 以下示出更简洁的实现方式: for(; dwStrIdx < dwStrLen; dwStrIdx++) { aDigitCharNum[szDigits[dwStrIdx] - '0']++; } 上述实现考虑到0也为数字字符。该解法也可扩展至统计所有ASCII可见字符。 月天校验 问题:对给定年份和月份的天数进行校验(需区分平年和闰年)。 普通解法主体代码如下: switch(OnuTime.Month) { case 1: case 3: case 5: case 7: case 8: case 10: case 12: if(OnuTime.Day>31 || OnuTime.Day<1) { CtcOamLog(FUNCTION_Pon,"Don't support this Day: %d(1~31)!!!\n", OnuTime.Day); retcode = S_ERROR; } break; case 2: if(((OnuTime.Year%4 == 0) && (OnuTime.Year%100 != 0)) || (OnuTime.Year%400 == 0)) { if(OnuTime.Day>29 || OnuTime.Day<1) { CtcOamLog(FUNCTION_Pon,"Don't support this Day: %d(1~29)!!!\n", OnuTime.Day); retcode = S_ERROR; } } else { if(OnuTime.Day>28 || OnuTime.Day<1) { CtcOamLog(FUNCTION_Pon,"Don't support this Day: %d(1~28)!!!\n", OnuTime.Day); retcode = S_ERROR; } } break; case 4: case 6: case 9: case 11: if(OnuTime.Day>30 || OnuTime.Day<1) { CtcOamLog(FUNCTION_Pon,"Don't support this Day: %d(1~30)!!!\n", OnuTime.Day); retcode = S_ERROR; } break; default: CtcOamLog(FUNCTION_Pon,"Don't support this Month: %d(1~12)!!!\n", OnuTime.Month); retcode = S_ERROR; break; } 以下示出更简洁的实现方式: #define MONTH_OF_YEAR 12 /* 一年中的月份数 */ /* 闰年:能被4整除且不能被100整除,或能被400整除 */ #define IS_LEAP_YEAR(year) ((((year) % 4 == 0) && ((year) % 100 != 0)) || ((year) % 400 == 0)) /* 平年中的各月天数,下标对应月份 */ INT8U aDayOfCommonMonth[MONTH_OF_YEAR] = {31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31}; INT8U ucMaxDay = 0; if((OnuTime.Month == 2) && (IS_LEAP_YEAR(OnuTime.Year))) ucMaxDay = aDayOfCommonMonth[1] + 1; else ucMaxDay = aDayOfCommonMonth[OnuTime.Month-1]; if((OnuTime.Day < 1) || (OnuTime.Day > ucMaxDay) { CtcOamLog(FUNCTION_Pon,"Month %d doesn't have this Day: %d(1~%d)!!!\n", OnuTime.Month, OnuTime.Day, ucMaxDay); retcode = S_ERROR; } 名称构造 问题:根据WAN接口承载的业务类型(Bitmap)构造业务类型名称字符串。 普通解法主体代码如下: void Sub_SetServerType(INT8U *ServerType, INT16U wan_servertype) { if ((wan_servertype & 0x0001) == 0x0001) { strcat(ServerType, "_INTERNET"); } if ((wan_servertype & 0x0002) == 0x0002) { strcat(ServerType, "_TR069"); } if ((wan_servertype & 0x0004) == 0x0004) { strcat(ServerType, "_VOIP"); } if ((wan_servertype & 0x0008) == 0x0008) { strcat(ServerType, "_OTHER"); } } 以下示出C语言中更简洁的实现方式: /* 获取var变量第bit位,编号从右至左 */ #define GET_BIT(var, bit) (((var) >> (bit)) & 0x1) const CHAR* paSvrNames[] = {"_INTERNET", "_TR069", "_VOIP", "_OTHER"}; const INT8U ucSvrNameNum = sizeof(paSvrNames) / sizeof(paSvrNames[0]); VOID SetServerType(CHAR *pszSvrType, INT16U wSvrType) { INT8U ucIdx = 0; for(; ucIdx < ucSvrNameNum; ucIdx++) { if(1 == GET_BIT(wSvrType, ucIdx)) strcat(pszSvrType, paSvrNames[ucIdx]); } } 新的实现将数据和逻辑分离,维护起来非常方便。只要逻辑(规则)不变,则唯一可能的改动就是数据(paSvrNames)。 值名解析 问题:根据枚举变量取值输出其对应的字符串,如PORT_FE(1)输出“Fe”。 //值名映射表结构体定义,用于数值解析器 typedef struct{ INT32U dwElem; //待解析数值,通常为枚举变量 CHAR* pszName; //指向数值所对应解析名字符串的指针 }T_NAME_PARSER; /****************************************************************************** * 函数名称: NameParser * 功能说明: 数值解析器,将给定数值转换为对应的具名字符串 * 输入参数: VOID *pvMap :值名映射表数组,含T_NAME_PARSER结构体类型元素 VOID指针允许用户在保持成员数目和类型不变的前提下, 定制更有意义的结构体名和/或成员名。 INT32U dwEntryNum :值名映射表数组条目数 INT32U dwElem :待解析数值,通常为枚举变量 INT8U* pszDefName :缺省具名字符串指针,可为空 * 输出参数: NA * 返回值 : INT8U *: 数值所对应的具名字符串 当无法解析给定数值时,若pszDefName为空,则返回数值对应的16进制格式 字符串;否则返回pszDefName。 ******************************************************************************/ INT8U *NameParser(VOID *pvMap, INT32U dwEntryNum, INT32U dwElem, INT8U* pszDefName) { CHECK_SINGLE_POINTER(pvMap, "NullPoniter"); INT32U dwEntryIdx = 0; for(dwEntryIdx = 0; dwEntryIdx < dwEntryNum; dwEntryIdx++) { T_NAME_PARSER *ptNameParser = (T_NAME_PARSER *)pvMap; if(dwElem == ptNameParser->dwElem) { return ptNameParser->pszName; } //ANSI标准禁止对void指针进行算法操作;GNU标准则指定void*算法操作与char*一致。 //若考虑移植性,可将pvMap类型改为INT8U*,或定义INT8U*局部变量指向pvMap。 pvMap += sizeof(T_NAME_PARSER); } if(NULL != pszDefName) { return pszDefName; } else { static INT8U szName[12] = {0}; //Max:"0xFFFFFFFF" sprintf(szName, "0x%X", dwElem); return szName; } } 以下给出NameParser的简单应用示例: //UNI端口类型值名映射表结构体定义 typedef struct{ INT32U dwPortType; INT8U* pszPortName; }T_PORT_NAME; //UNI端口类型解析器 T_PORT_NAME gUniNameMap[] = { {1, "Fe"}, {3, "Pots"}, {99, "Vuni"} }; const INT32U UNI_NAM_MAP_NUM = (INT32U)(sizeof(gUniNameMap)/sizeof(T_PORT_NAME)); VOID NameParserTest(VOID) { INT8U ucTestIndex = 1; printf("[%s] Result: %s!\n", __FUNCTION__, ucTestIndex++, strcmp("Unknown", NameParser(gUniNameMap, UNI_NAM_MAP_NUM, 0, "Unknown")) ? "ERROR" : "OK"); printf("[%s] Result: %s!\n", __FUNCTION__, ucTestIndex++, strcmp("DefName", NameParser(gUniNameMap, UNI_NAM_MAP_NUM, 0, "DefName")) ? "ERROR" : "OK"); printf("[%s] Result: %s!\n", __FUNCTION__, ucTestIndex++, strcmp("Fe", NameParser(gUniNameMap, UNI_NAM_MAP_NUM, 1, "Unknown")) ? "ERROR" : "OK"); printf("[%s] Result: %s!\n", __FUNCTION__, ucTestIndex++, strcmp("Pots", NameParser(gUniNameMap, UNI_NAM_MAP_NUM, 3, "Unknown")) ? "ERROR" : "OK"); printf("[%s] Result: %s!\n", __FUNCTION__, ucTestIndex++, strcmp("Vuni", NameParser(gUniNameMap, UNI_NAM_MAP_NUM, 99, NULL)) ? "ERROR" : "OK"); printf("[%s] Result: %s!\n", __FUNCTION__, ucTestIndex++, strcmp("Unknown", NameParser(gUniNameMap, UNI_NAM_MAP_NUM, 255, "Unknown")) ? "ERROR" : "OK"); printf("[%s] Result: %s!\n", __FUNCTION__, ucTestIndex++, strcmp("0xABCD", NameParser(gUniNameMap, UNI_NAM_MAP_NUM, 0xABCD, NULL)) ? "ERROR" : "OK"); printf("[%s] Result: %s!\n", __FUNCTION__, ucTestIndex++, strcmp("NullPoniter", NameParser(NULL, UNI_NAM_MAP_NUM, 0xABCD, NULL)) ? "ERROR" : "OK"); } gUniNameMap在实际项目中有十余个条目,若采用逻辑链实现将非常冗长。 取值映射 问题:不同模块间同一参数枚举值取值可能有所差异,需要适配。 此处不再给出普通的switch…case或if…else if…else结构,而直接示出以下表驱动实现: typedef struct{ PORTSTATE loopMEState; PORTSTATE loopMIBState; }LOOPMAPSTRUCT; static LOOPMAPSTRUCT s_CesLoop[] = { {NO_LOOP, e_ds1_looptype_noloop}, {PAYLOAD_LOOP, e_ds1_looptype_PayloadLoop}, {LINE_LOOP, e_ds1_looptype_LineLoop}, {PON_LOOP, e_ds1_looptype_OtherLoop}, {CES_LOOP, e_ds1_looptype_InwardLoop}}; PORTSTATE ConvertLoopMEStateToMIBState(PORTSTATE vPortState) { INT32U num = 0, ii; num = ARRAY_NUM(s_CesLoop); for(ii = 0; ii < num; ii++) { if(vPortState == s_CesLoop[ii].loopMEState) return s_CesLoop[ii].loopMIBState; } return e_ds1_looptype_noloop; } 相应地,从loopMIBState映射到loopMEState需要定义一个ConvertLoopMIBStateToMEState函数。更进一步,所有类似的一对一映射关系都必须如上的映射(转换)函数,相当繁琐。事实上,从抽象层面看,该映射关系非常简单。提取共性后定义带参数宏,如下所示: /********************************************************** * 功能描述:进行二维数组映射表的一对一映射,用于参数适配 * 参数说明:map -- 二维数组映射表 elemSrc -- 映射源,即待映射的元素值 elemDest -- 映射源对应的映射结果 direction -- 映射方向字节,表示从数组哪列映射至哪列。 高4位对应映射源列,低4位对应映射结果列。 defaultVal -- 映射失败时置映射结果为缺省值 * 示例:ARRAY_MAPPER(gCesLoopMap, 3, ucLoop, 0x10, NO_LOOP); 则ucLoop = 2(LINE_LOOP) **********************************************************/ #define ARRAY_MAPPER(map, elemSrc, elemDest, direction, defaultVal) do{\ INT8U ucMapIdx = 0, ucMapNum = 0; \ ucMapNum = sizeof(map)/sizeof(map[0]); \ for(ucMapIdx = 0; ucMapIdx < ucMapNum; ucMapIdx++) \ { \ if((elemSrc) == map[ucMapIdx][((direction)&0xF0)>>4]) \ { \ elemDest = map[ucMapIdx][(direction)&0x0F]; \ break; \ } \ } \ if(ucMapIdx == ucMapNum) \ { \ elemDest = (defaultVal); \ } \ }while(0) 参数取值转换时直接调用统一的映射器宏,如下: static INT8U gCesLoopMap[][2] = { {NO_LOOP, e_ds1_looptype_noloop}, {PAYLOAD_LOOP, e_ds1_looptype_PayloadLoop}, {LINE_LOOP, e_ds1_looptype_LineLoop}, {PON_LOOP, e_ds1_looptype_OtherLoop}, {CES_LOOP, e_ds1_looptype_InwardLoop}}; ARRAY_MAPPER(gCesLoopMap, tPara.dwParaVal[0], dwLoopConf, 0x01, e_ds1_looptype_noloop); 另举一例: #define CES_DEFAULT_JITTERBUF (INT32U)2000 /* 默认jitterbuf为2000us,而1帧=125us */ #define CES_JITTERBUF_STEP (INT32U)125 /* jitterbuf步长为125us,即1帧 */ #define CES_DEFAULT_QUEUESIZE (INT32U)5 #define CES_DEFAULT_MAX_QUEUESIZE (INT32U)7 #define ARRAY_NUM(array) (sizeof(array) / sizeof((array)[0])) /* 数组元素个数 */ typedef struct{ INT32U dwJitterBuffer; INT32U dwFramePerPkt; INT32U dwQueueSize; }QUEUE_SIZE_MAP; /* gCesQueueSizeMap也可以(JitterBuffer / FramePerPkt)值为索引,更加紧凑 */ static QUEUE_SIZE_MAP gCesQueueSizeMap[]= { {1,1,1}, {1,2,1}, {2,1,2}, {2,2,1}, {3,1,3}, {3,2,1}, {4,1,3}, {4,2,1}, {5,1,4}, {5,2,3}, {6,1,4}, {6,2,3}, {7,1,4}, {7,2,3}, {8,1,4}, {8,2,3}, {9,1,5}, {9,2,4}, {10,1,5}, {10,2,4}, {11,1,5}, {11,2,4}, {12,1,5}, {12,2,4}, {13,1,5}, {13,2,4}, {14,1,5}, {14,2,4}, {15,1,5}, {15,2,4}, {16,1,5}, {16,2,4}, {17,1,6}, {17,2,5}, {18,1,6}, {18,2,5}, {19,1,6}, {19,2,5}, {20,1,6}, {20,2,5}, {21,1,6}, {21,2,5}, {22,1,6}, {22,2,5}, {23,1,6}, {23,2,5}, {24,1,6}, {24,2,5}, {25,1,6}, {25,2,5}, {26,1,6}, {26,2,5}, {27,1,6}, {27,2,5}, {28,1,6}, {28,2,5}, {29,1,6}, {29,2,5}, {30,1,6}, {30,2,5}, {31,1,6}, {31,2,5}, {32,1,6}, {32,2,5}}; /********************************************************** * 函数名称:CalcQueueSize * 功能描述:根据JitterBuffer和FramePerPkt计算QueueSize * 注意事项:配置的最大缓存深度 * = 2 * JitterBuffer / FramePerPkt * = 2 * N Packet = 2 ^ QueueSize * JitterBuffer为125us帧速率的倍数, * FramePerPkt为每个分组的帧数, * QueueSize向上取整,最大为7。 **********************************************************/ INT32U CalcQueueSize(INT32U dwJitterBuffer, INT32U dwFramePerPkt) { INT8U ucIdx = 0, ucNum = 0; //本函数暂时仅考虑E1 ucNum = ARRAY_NUM(gCesQueueSizeMap); for(ucIdx = 0; ucIdx < ucNum; ucIdx++) { if((dwJitterBuffer == gCesQueueSizeMap[ucIdx].dwJitterBuffer) && (dwFramePerPkt == gCesQueueSizeMap[ucIdx].dwFramePerPkt)) { return gCesQueueSizeMap[ucIdx].dwQueueSize; } } return CES_DEFAULT_MAX_QUEUESIZE; } 版本控制 问题:控制OLT与ONU之间的版本协商。ONU本地设置三比特控制字,其中bit2(MSB)~bit0(LSB)分别对应0x21、0x30和0xAA版本号;且bitX为0表示上报对应版本号,bitX为1表示不上报对应版本号。其他版本号如0x20、0x13和0x1必须上报,即不受控制。 最初的实现采用if…else if…else结构,代码非常冗长,如下: pstSendTlv->ucLength = 0x1f; if (gOamCtrlCode == 0) { vosMemCpy(pstSendTlv->aucVersionList, ctc_oui, 3); pstSendTlv->aucVersionList[3] = 0x30; vosMemCpy(&(pstSendTlv->aucVersionList[4]), ctc_oui, 3); pstSendTlv->aucVersionList[7] = 0x21; vosMemCpy(&(pstSendTlv->aucVersionList[8]), ctc_oui, 3); pstSendTlv->aucVersionList[11] = 0x20; vosMemCpy(&(pstSendTlv->aucVersionList[12]), ctc_oui, 3); pstSendTlv->aucVersionList[15] = 0x13; vosMemCpy(&(pstSendTlv->aucVersionList[16]), ctc_oui, 3); pstSendTlv->aucVersionList[19] = 0x01; vosMemCpy(&(pstSendTlv->aucVersionList[20]), ctc_oui, 3); pstSendTlv->aucVersionList[23] = 0xaa; } else if (gOamCtrlCode == 1) { vosMemCpy(pstSendTlv->aucVersionList, ctc_oui, 3); pstSendTlv->aucVersionList[3] = 0x30; vosMemCpy(&(pstSendTlv->aucVersionList[4]), ctc_oui, 3); pstSendTlv->aucVersionList[7] = 0x21; vosMemCpy(&(pstSendTlv->aucVersionList[8]), ctc_oui, 3); pstSendTlv->aucVersionList[11] = 0x20; vosMemCpy(&(pstSendTlv->aucVersionList[12]), ctc_oui, 3); pstSendTlv->aucVersionList[15] = 0x13; vosMemCpy(&(pstSendTlv->aucVersionList[16]), ctc_oui, 3); pstSendTlv->aucVersionList[19] = 0x01; } //此处省略gOamCtrlCode == 2~6的处理代码 else if (gOamCtrlCode == 7) { vosMemCpy(&(pstSendTlv->aucVersionList), ctc_oui, 3); pstSendTlv->aucVersionList[3] = 0x20; vosMemCpy(&(pstSendTlv->aucVersionList[4]), ctc_oui, 3); pstSendTlv->aucVersionList[7] = 0x13; vosMemCpy(&(pstSendTlv->aucVersionList[8]), ctc_oui, 3); pstSendTlv->aucVersionList[11] = 0x01; } 以下示出C语言中更简洁的实现方式(基于二维数组): /********************************************************************** * 版本控制字数组定义 * gOamCtrlCode: Bitmap控制字。Bit-X为0时上报对应版本,Bit-X为1时屏蔽对应版本。 * CTRL_VERS_NUM: 可控版本个数。 * CTRL_CODE_NUM: 控制字个数。与CTRL_VERS_NUM有关。 * gOamVerCtrlMap: 版本控制字数组。行对应控制字,列对应可控版本。 元素值为0时不上报对应版本,元素值非0时上报该元素值。 * Note: 该数组旨在实现“数据与控制隔离”。后续若要新增可控版本,只需修改 -- CTRL_VERS_NUM -- gOamVerCtrlMap新增行(控制字) -- gOamVerCtrlMap新增列(可控版本) **********************************************************************/ #define CTRL_VERS_NUM 3 #define CTRL_CODE_NUM (1< u8_t gOamVerCtrlMap[CTRL_CODE_NUM][CTRL_VERS_NUM]
一、io_uring 概述 io_uring 是 Linux 内核中的一种高效异步 I/O 框架,于 Linux 5.1 版本引入,旨在提高大规模并发 I/O 操作的性能。与传统的异步 I/O 接口(如 epoll、select、poll)相比,io_uring 提供了更低的延迟和更高的吞吐量。 二、核心概念 1.提交队列(Submission Queue, SQ): 用户空间应用程序将 I/O 请求添加到提交队列中。每个请求都会被描述为一个提交队列条目(Submission Queue Entry, SQE),包含操作类型、目标文件描述符、缓冲区等信息。 2.完成队列(Completion Queue, CQ): 当 I/O 操作完成时,内核会将结果添加到完成队列中。每个结果都是一个完成队列条目(Completion Queue Entry, CQE),其中包含了操作的返回值、状态码以及用户自定义的数据。 3.异步操作: io_uring 允许用户将 I/O 操作提交给内核,内核在后台异步处理这些操作。用户不需要等待操作完成,而是可以在稍后查询完成队列以获取操作结果。 三、主要系统调用 1. io_uring_setup 功能: io_uring_setup 是用于创建和初始化一个 io_uring 实例的系统调用。它分配和配置提交队列(SQ)和完成队列(CQ),并返回一个用于标识 io_uring 实例的文件描述符。 依赖函数: io_uring_queue_init:io_uring_queue_init 是最常用的初始化函数,内部调用 io_uring_setup 来创建一个 io_uring 实例。 io_uring_queue_init_params:io_uring_queue_init_params 是一个增强的初始化函数,它允许用户传递 io_uring_params结构体以配置额外的参数。它也依赖于io_uring_setup系统调用来创建和初始化io_uring实例。 2. io_uring_enter 功能: io_uring_enter 是用于将已准备好的 I/O 操作提交给内核并处理这些操作的系统调用。它可以用于提交操作、等待操作完成,或者两者兼而有之。 依赖函数: io_uring_submit:io_uring_submit 是用户提交操作到内核的函数,它在内部调用 io_uring_enter,将所有在提交队列中的 I/O 请求提交给内核。 io_uring_submit_and_wait:io_uring_submit_and_wait 提交 I/O 操作后,还可以等待至少一个操作完成,它也是通过调用 io_uring_enter 来实现这一功能。 io_uring_wait_cqe和io_uring_wait_cqe_nr:这些函数用于等待一个或多个操作完成,它们在内部也依赖于 io_uring_enter,通过传递适当的参数来等待完成队列中的事件。 3. io_uring_register 功能: io_uring_register 是用于将文件描述符、缓冲区或其他资源预先注册到 io_uring 实例中的系统调用。这可以提高操作的效率,因为内核在处理这些操作时可以直接访问预先注册的资源,而无需每次都重新设置。 依赖函数: io_uring_register_buffers:这个函数用于注册一组内存缓冲区,使它们可以在后续的 I/O 操作中重复使用。它在内部调用 io_uring_register 系统调用。 io_uring_unregister_buffers:这个函数用于取消之前注册的缓冲区,它也依赖于 io_uring_register 系统调用来取消注册。 io_uring_register_files和io_uring_unregister_files:这些函数分别用于注册和取消注册文件描述符集合,均依赖于 io_uring_register 系统调用。 io_uring_register_eventfd和io_uring_unregister_eventfd:这些函数用于注册和取消注册一个 eventfd,用来通知完成事件,同样依赖于 io_uring_register 系统调用。 四、常用操作 io_uring_prep_* 系列函数: 用于准备 I/O 操作,如 io_uring_prep_read、io_uring_prep_write、io_uring_prep_accept、io_uring_prep_send 等。这些函数将操作的细节填写到提交队列条目(SQE)中。 io_uring_submit: 将准备好的 SQE 提交给内核,触发内核执行操作,内部依赖 io_uring_enter 系统调用。 io_uring_wait_cqe与io_uring_peek_batch_cqe: io_uring_wait_cqe:阻塞等待至少一个操作完成,并返回完成的 CQE。 io_uring_peek_batch_cqe:非阻塞地检查完成队列,获取已经完成的操作。 五、优势 减少系统调用开销:通过批量提交和批量获取结果,减少了系统调用的次数,降低了上下文切换的开销。 高效的异步操作:内核异步处理 I/O 操作,用户空间无需阻塞等待,可以在处理其他任务的同时等待操作完成。 灵活的事件模型:io_uring 支持多种 I/O 操作,并可以在不同的操作之间灵活切换,适用于网络 I/O、文件 I/O、内存映射等多种场景。 扩展性强:io_uring 支持大量并发的 I/O 操作,适合需要处理高并发连接的应用程序,如高性能服务器和数据库。 六、代码实践 完整代码: #include #include #include #include #include #define EVENT_ACCEPT 0#define EVENT_READ 1#define EVENT_WRITE 2 struct conn_info{ int fd; int event;}; int init_server(unsigned short port){ int sockfd = socket(AF_INET, SOCK_STREAM, 0); struct sockaddr_in serveraddr; memset(&serveraddr, 0, sizeof(struct sockaddr_in)); serveraddr.sin_family = AF_INET; serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); serveraddr.sin_port = htons(port); if (-1 == bind(sockfd, (struct sockaddr *)&serveraddr, sizeof(struct sockaddr))) { perror("bind"); return -1; } listen(sockfd, 10); return sockfd;} #define ENTRIES_LENGTH 1024#define BUFFER_LENGTH 1024 int set_event_recv(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags){ struct io_uring_sqe *sqe = io_uring_get_sqe(ring); struct conn_info accept_info = { .fd = sockfd, .event = EVENT_READ, }; io_uring_prep_recv(sqe, sockfd, buf, len, flags); memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));} int set_event_send(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags){ struct io_uring_sqe *sqe = io_uring_get_sqe(ring); struct conn_info accept_info = { .fd = sockfd, .event = EVENT_WRITE, }; io_uring_prep_send(sqe, sockfd, buf, len, flags); memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));} int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags){ struct io_uring_sqe *sqe = io_uring_get_sqe(ring); struct conn_info accept_info = { .fd = sockfd, .event = EVENT_ACCEPT, }; io_uring_prep_accept(sqe, sockfd, (struct sockaddr *)addr, addrlen, flags); memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));} int main(int argc, char *argv[]){ unsigned short port = 9999; int sockfd = init_server(port); struct io_uring_params params; memset(¶ms, 0, sizeof(params)); struct io_uring ring; io_uring_queue_init_params(ENTRIES_LENGTH, &ring, ¶ms); #if 0 struct sockaddr_in clientaddr; socklen_t len = sizeof(clientaddr); accept(sockfd, (struct sockaddr*)&clientaddr, &len);#else struct sockaddr_in clientaddr; socklen_t len = sizeof(clientaddr); set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0); #endif char buffer[BUFFER_LENGTH] = {0}; while (1) { io_uring_submit(&ring); struct io_uring_cqe *cqe; io_uring_wait_cqe(&ring, &cqe); struct io_uring_cqe *cqes[128]; int nready = io_uring_peek_batch_cqe(&ring, cqes, 128); // epoll_wait int i = 0; for (i = 0; i < nready; i++) { struct io_uring_cqe *entries = cqes[i]; struct conn_info result; memcpy(&result, &entries->user_data, sizeof(struct conn_info)); if (result.event == EVENT_ACCEPT) { set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0); // printf("set_event_accept\n"); // int connfd = entries->res; set_event_recv(&ring, connfd, buffer, BUFFER_LENGTH, 0); } else if (result.event == EVENT_READ) { // int ret = entries->res; // printf("set_event_recv ret: %d, %s\n", ret, buffer); // if (ret == 0) { close(result.fd); } else if (ret > 0) { set_event_send(&ring, result.fd, buffer, ret, 0); } } else if (result.event == EVENT_WRITE) { // int ret = entries->res; // printf("set_event_send ret: %d, %s\n", ret, buffer); set_event_recv(&ring, result.fd, buffer, BUFFER_LENGTH, 0); } } io_uring_cq_advance(&ring, nready); }} 1. 服务器初始化 int init_server(unsigned short port){ int sockfd = socket(AF_INET, SOCK_STREAM, 0); struct sockaddr_in serveraddr; memset(&serveraddr, 0, sizeof(struct sockaddr_in)); serveraddr.sin_family = AF_INET; serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); serveraddr.sin_port = htons(port); if (-1 == bind(sockfd, (struct sockaddr *)&serveraddr, sizeof(struct sockaddr))) { perror("bind"); return -1; } listen(sockfd, 10); return sockfd;} 解释: 该函数初始化了一个 TCP 服务器套接字,用于监听客户端连接请求。 socket、bind 和 listen 是常规的服务器初始化步骤,将服务器绑定到指定的端口,并使其开始监听客户端连接。 2. io_uring 环境初始化 struct io_uring_params params;memset(¶ms, 0, sizeof(params)); struct io_uring ring;io_uring_queue_init_params(ENTRIES_LENGTH, &ring, ¶ms); 解释: io_uring_queue_init_params 函数初始化了一个 io_uring 实例,这个实例将用于管理所有的异步 I/O 操作。 ENTRIES_LENGTH 定义了提交队列和完成队列的大小,表示可以同时处理的最大 I/O 操作数量。 3. 设置 accept 事件 struct sockaddr_in clientaddr;socklen_t len = sizeof(clientaddr);set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0); 解释: set_event_accept 函数将一个 accept 操作添加到 io_uring 的提交队列中。这个操作用于接受客户端连接请求。 这一步是服务器启动时的初始操作,它告诉 io_uring 开始监听并处理客户端连接。 4. 主循环:提交操作和处理完成事件 while (1){ io_uring_submit(&ring); struct io_uring_cqe *cqe; io_uring_wait_cqe(&ring, &cqe); struct io_uring_cqe *cqes[128]; int nready = io_uring_peek_batch_cqe(&ring, cqes, 128); 解释: io_uring_submit:将之前添加到提交队列中的所有操作提交给内核,由内核异步执行这些操作。 io_uring_wait_cqe:等待至少一个操作完成,这是一个阻塞调用。 io_uring_peek_batch_cqe:批量获取已经完成的操作结果,nready 表示完成的操作数量。 5. 处理完成的事件 for (i = 0; i < nready; i++){ struct io_uring_cqe *entries = cqes[i]; struct conn_info result; memcpy(&result, &entries->user_data, sizeof(struct conn_info)); if (result.event == EVENT_ACCEPT) { set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0); int connfd = entries->res; set_event_recv(&ring, connfd, buffer, BUFFER_LENGTH, 0); } else if (result.event == EVENT_READ) { int ret = entries->res; if (ret == 0) { close(result.fd); } else if (ret > 0) { set_event_send(&ring, result.fd, buffer, ret, 0); } } else if (result.event == EVENT_WRITE) { int ret = entries->res; set_event_recv(&ring, result.fd, buffer, BUFFER_LENGTH, 0); }} 解释: EVENT_ACCEPT:处理 accept 事件。当一个新的客户端连接到来时,io_uring 完成队列会返回 EVENT_ACCEPT 事件,表示一个新的连接已经建立。此时,服务器会: 重新设置 accept 事件,继续监听新的客户端连接。 获取新连接的文件描述符 connfd,并设置一个 recv 事件来准备接收数据。 EVENT_READ:处理 recv 事件。当从客户端接收到数据时,io_uring 返回 EVENT_READ 事件。如果接收到的数据长度大于0,则会设置一个 send 事件来将数据发送回客户端。如果 ret == 0,说明客户端关闭了连接,则关闭文件描述符。 EVENT_WRITE:处理 send 事件。当数据成功发送给客户端后,io_uring 返回 EVENT_WRITE 事件。此时,服务器会再次设置一个 recv 事件,准备接收更多数据。 6. 完成队列的推进 io_uring_cq_advance(&ring, nready); 解释: 这个函数通知 io_uring,你已经处理完了 nready 个完成队列条目(CQE)。io_uring 可以释放这些 CQE 供后续操作使用。 7. 总结 io_uring 的作用:在这个示例中,io_uring 被用来高效地处理网络 I/O 操作。通过异步提交和处理 accept、recv、send 操作,服务器能够高效处理多个并发连接,而无需阻塞等待每个I/O操作完成。 异步模型:io_uring 提供了一种低延迟、高并发的异步 I/O 处理方式。操作在提交后由内核异步执行,完成后再由应用程序查询并处理结果。这种方式大大减少了系统调用的开销,提高了程序的并发处理能力。 关键点: 提交操作:使用 io_uring_prep_* 函数准备操作,并提交给内核处理。 等待完成:使用 io_uring_wait_cqe 等方法等待操作完成,并获取结果。 处理结果:根据完成队列中的事件类型(如 EVENT_ACCEPT、EVENT_READ、EVENT_WRITE)进行相应的处理和后续操作。 七、问题与思考 1.提交队列肯定需要多线程操作,需要枷锁? 对于io_uring的提交队列来说,在多线程操作下不需要加锁。io_uring使用了无锁环形队列,通过原子操作来管理队列的头尾指针,确保多个线程可以安全且高效地并发提交I/O请求。这种设计避免了传统锁的开销,尤其适合高并发场景下的性能优化。 无锁环形队列的原理 1.环形缓冲区: 环形队列是一种常用的数据结构,其中队列的头和尾指针在到达缓冲区末尾时回绕到开始位置,从而形成一个“环”。 在io_uring 中,提交队列和完成队列实际上是一个环形缓冲区,允许多个线程高效地提交和消费I/O操作。 2.原子操作: 无锁设计的关键在于使用原子操作来管理队列指针,确保即使在多线程环境下,多个线程同时访问队列时也不会产生竞争条件。 io_uring 使用原子性操作(例如 cmpxchg 或 fetch_add)来更新队列的头尾指针,从而避免了锁的使用。 3.多生产者/多消费者支持: io_uring 的设计支持多生产者(即多个线程提交I/O请求)和多消费者(即多个线程处理I/O完成事件)。在这种模式下,通过原子操作,多个线程可以无锁地同时对提交队列或完成队列进行操作。 2.io_uring如何避免频繁的拷贝的? io_uring 的实现旨在通过减少用户空间和内核空间之间的数据复制来提高 I/O 操作的性能。虽然io_uring并不是直接依赖于mmap来避免拷贝,但它使用了多种技术和机制来减少不必要的数据复制和系统资源消耗。 io_uring 的关键机制 1.提交队列和完成队列(SQ 和 CQ): io_uring 使用了两个环形队列:提交队列(Submission Queue, SQ)和完成队列(Completion Queue, CQ)。用户空间通过 SQ 提交 I/O 请求,而内核通过 CQ 返回操作的完成状态。这两个队列都可以通过 mmap 映射到用户空间,允许用户空间直接操作这些队列,减少了系统调用的频率。 2.直接提交和批量处理: 用户空间可以将多个 I/O 请求直接写入 SQ,然后通过一个系统调用将它们提交给内核。内核可以批量处理这些请求,并将结果写入 CQ。这减少了频繁的系统调用和数据拷贝。 3.注册固定缓冲区和文件描述符: io_uring 允许用户提前注册缓冲区和文件描述符,这些缓冲区和文件描述符在后续的 I/O 操作中可以被重复使用。由于这些资源已经预先注册并映射到内核,因此在实际的 I/O 操作中不需要再次传递和复制这些资源。 4.直接 I/O 支持: io_uring 可以与直接 I/O(Direct I/O)结合使用,使得数据可以直接从用户空间传输到存储设备或网络设备,或者从设备直接读取到用户空间,绕过内核缓冲区。这进一步减少了内核空间和用户空间之间的数据拷贝。 5.零拷贝发送和接收: 在网络传输中,io_uring 支持零拷贝发送和接收,特别是在高性能网络应用中,这意味着数据可以直接从用户空间传输到网络栈,而不需要在内核缓冲区和用户缓冲区之间进行拷贝。 实现细节 内存映射(mmap):SQ 和 CQ 通常会通过 mmap 映射到用户空间,这样用户空间可以直接访问这些队列的数据结构。这不仅减少了系统调用的开销,也避免了在用户空间和内核空间之间的数据复制。 原子操作和无锁队列:在多线程环境下,io_uring 使用原子操作来管理队列的头和尾指针,从而避免了锁的使用,进一步提高了性能。 批量提交与处理:io_uring 支持批量提交 I/O 请求,允许多个请求在一次系统调用中被提交到内核。这种机制减少了系统调用的次数,并且通过批量处理可以减少内核空间与用户空间之间的数据交换。 总结 io_uring 通过设计高效的提交和完成队列、支持直接 I/O 和零拷贝技术,以及允许注册和重用缓冲区来避免频繁的数据复制。这种设计使得 io_uring 在处理大规模异步 I/O 操作时,能够提供极高的性能和低延迟。 3.当服务器通过 listen 函数开始监听端口时,程序会阻塞在哪里? 阻塞点的分析 在程序运行后,当服务器通过 listen 函数开始监听端口时,它并不会立即处理任何连接请求,而是等待客户端连接的到来。在这期间,程序的执行流会阻塞在某些地方,具体来说,它阻塞在两个可能的地方: 1.io_uring_wait_cqe(&ring, &cqe);: 这个调用是 io_uring 的一个重要部分,它用于等待完成队列(CQE)中至少有一个事件完成。在没有事件发生的情况下,比如没有客户端发起连接请求时,这个函数会阻塞,直到有新的事件到达为止。因此,如果没有客户端连接请求,程序会阻塞在这里。 2.io_uring_peek_batch_cqe(&ring, cqes, 128);: 这个函数用于检查 io_uring 完成队列中是否有已完成的事件。虽然它本身并不阻塞,但在 io_uring_wait_cqe 解除了阻塞之后,这个函数会获取已经完成的事件列表并处理它们。 重点总结 阻塞点:程序在没有客户端连接请求时,会阻塞在 io_uring_wait_cqe 函数上,等待 io_uring 完成队列中的事件。 异步处理:一旦有事件完成(例如客户端连接到来),程序会解除阻塞并处理该事件,接着继续等待下一个事件的完成。 4.io_Uring和epoll有什么相同点和区别 相同点 1.高并发I/O处理:两者都旨在高效处理大量并发I/O操作,特别是在网络服务器等需要处理众多客户端连接的场景中。 2.事件驱动模型:两者都采用事件驱动的模型,程序通过等待I/O事件发生然后进行相应处理,避免了轮询的低效性。 3.减少阻塞:epoll 和 io_uring 都通过异步或非阻塞的方式减少了I/O操作中的等待时间,提高了应用程序的响应速度和整体性能。 区别 1.设计与用途: epoll: epoll 是基于 poll 和 select 的改进版,专门用于监控多个文件描述符(socket、文件等)的事件(如读、写、异常等)。 它本身并不执行I/O操作,而是等待并通知I/O事件的发生。 epoll 适合事件驱动的网络编程,例如监视多个客户端连接的服务器。 io_uring: io_uring 是一个更广泛的异步I/O框架,它不仅仅用于事件通知,还直接执行I/O操作。 支持文件读写、网络I/O等操作,并且设计上避免了频繁的上下文切换和数据复制。 io_uring 适合需要处理大量I/O操作的高性能应用,例如高吞吐量的服务器、数据库系统等。 2.系统调用的数量与性能: epoll: 使用时需要多次系统调用。例如,你需要用 epoll_ctl 注册或修改事件,再用 epoll_wait 等待事件发生。 每次等待事件都需要从用户空间切换到内核空间,尽管 epoll 的性能比 select 和 poll 高,但频繁的系统调用仍然是一个瓶颈。 io_uring: 通过提交和完成队列(SQ和CQ)机制,大大减少了系统调用的数量。你可以批量提交多个I/O操作,然后一次性等待它们的完成。 io_uring 利用共享内存区域在用户空间和内核空间之间传递I/O请求和结果,减少了上下文切换和系统调用开销,性能优势明显。 3.I/O操作类型的支持: epoll: 主要用于监听和处理文件描述符上的事件,不直接执行I/O操作。 你可以监控 EPOLLIN、EPOLLOUT 等事件,但具体的I/O操作仍需由用户代码完成。 io_uring: 不仅可以处理事件通知,还可以直接执行I/O操作(如读写文件、网络I/O)。 支持零拷贝传输、固定缓冲区等高级功能,适合需要高效I/O处理的复杂场景。 4.阻塞与非阻塞: epoll: epoll_wait 可以设置为阻塞或非阻塞模式,通常情况下会阻塞直到有事件发生。 io_uring: io_uring 支持完全异步的操作,通过提交和完成队列的机制实现了非阻塞I/O。 可以同时处理多个I/O操作并等待它们的完成,无需像 epoll 那样分别等待每个事件的发生。 开发复杂度: epoll: 相对来说更简单,只需关注文件描述符的事件注册和处理。 io_uring: 功能更强大,支持更多操作类型,但开发复杂度较高。需要管理提交队列和完成队列,以及处理可能的错误和资源管理。
为什么要学这个项目 传统的webserver已经烂大街了,只有一个webserver项目大概率是找不到工作的,今天给大家分享一个C++Linux进阶项目-仿写Redis之Qedis,Redis是C++ Linux开发必备的核心知识,通过学习Qedis开源项目,不仅可以深入理解redis,也更能提升自己的编程能力,比如C++11任意函数作为任务的线程池,Reactor网络模型的C++封装,时间轮定时器,Redis数据结构的实现等。 视频地址 C++Linux进阶项目分析-仿写Redis之Qedis(B站搜索程序员老廖) 1.项目介绍 Qedis网上很多编译方式是错误的。 C++后台开发偏基础服务,比如分布式存储相关的岗位,基本上都是使用c++。 校招的时候如果想从事分布式存储相关的岗位,则需要深入掌握MySQL/Redis,而且即使不是做分布式存储,C++后台开发也是必须掌握Redis,所以建议大家可以参考Qedis项目仿写Redis,这样和其他人写webserver就有区分度。 Qedis 是一个基于 C++11 实现的 Redis 服务器,支持集群功能,并使用 LevelDB 作为持久化存储。该项目旨在提供一个高性能、可扩展的内存数据库解决方案,适用于需要快速数据访问和处理的应用场景。 Qedis 可以广泛应用于需要高性能数据存储和访问的场景,例如: 实时数据分析:Qedis 的高性能和低延迟特性使其成为实时数据分析的理想选择。 缓存系统:Qedis 可以作为缓存层,加速应用程序的数据访问速度。 消息队列:结合 Redis 的发布/订阅功能,Qedis 可以用于构建高效的消息队列系统 2 项目快速启动 确保系统已安装 C++11 编译器。 2.1 安装 LevelDB 库 2.1.0 leveldb介绍 LevelDB是一种快速的键-值存储库,由Google开发,用于提供高性能的数据持久性存储。它通常被用作支持各种应用程序的底层数据库引擎,包括分布式数据库、区块链、分布式文件系统等。 2.1.1 下载Level git clone https://gitclone.com/github.com/google/leveldb 2.1.2 下载编译leveldb需要的googletest、benchmark 1、进入下载好的leveldb cd leveldb 2、进入third_party目录 cd third_party 3、下载 git clone https://gitclone.com/github.com/google/googletestgit clone https://gitclone.com/github.com/google/benchmark 2.1.3 编译安装 回到leveldb目录 cd ../mkdir build && cd build# 步骤4: 使用CMake生成构建文件,这里以Debug模式为例cmake -DCMAKE_BUILD_TYPE=Debug -DBUILD_SHARED_LIBS=1 ..makesudo make install 2.1.4 刷新环境 sudo ldconfig 2.1.5 c++测试范例 测试leveldb,文件名为hello.cc,内容如下所示: #include #include #include #include using namespace leveldb; int main(){ leveldb::DB* db; leveldb::Options options; options.create_if_missing = true; // 打开一个数据库,不存在就创建 leveldb::Status status = leveldb::DB::Open(options, "/tmp/testdb", &db); assert(status.ok()); // 插入一个键值对 status = db->Put(leveldb::WriteOptions(), "hello", "LevelDB"); assert(status.ok()); // 读取键值对 std::string value; status = db->Get(leveldb::ReadOptions(), "hello", &value); assert(status.ok()); std::cout << value << std::endl; delete db; return 0;} 编译代码: g++ hello.cc -o hello -lpthread -lleveldb 执行程序: ./hello 显示: LevelDB 2.2 下载和编译Qedis git clone https://github.com/loveyacper/Qedis.gitcd Qedis mkdir buildcd build# 以debug模式编译目的是为了方便后续debug源码cmake -DCMAKE_BUILD_TYPE=Debug ..make 编译后的执行文件在Qedis/bin目录,所以需要切换目录: lqf@ubuntu:~/long/Qedis/build$ cd ../bin/ 2.3 启动Qedis和测试 需要确保之前已经安装过redis,因为测试我们需要redis-cli,但redis-server不要启动。 2.3.1 默认方式启动Qedis lqf@ubuntu:~/long/Qedis/bin$ ./qedis_server 打印: _____ _____ _____ _ _____ / _ \ | ____| | _ \ | | / ___/ | | | | | |__ | | | | | | | |___ Qedis(1.0.0) 64 bits, another redis written in C++11 | | | | | __| | | | | | | \___ \ Port: 6379 | |_| |_ | |___ | |_| | | | ___| | Author: Bert Young \_______| |_____| |_____/ |_| /_____/ https://github.com/loveyacper/Qedis start log thread2024-11-27[14:49:20.419][USR]:Success: listen on (127.0.0.1:6379) 通过redis-cli控制台进行测试: # 启动redis-cli控制台lqf@ubuntu:~/long/Qedis/bin$ redis-cli # 设置key127.0.0.1:6379> set key qedisOK# 获取key,测试正常127.0.0.1:6379> get key"qedis"127.0.0.1:6379> 2.3.2 通过配置文件启动Qedis 如果需要使用leveldb做持久化,需要修改配置文件Qedis/qedis.conf ,将backend 设置为1: 然后重新启动qedis_server,注意你配置文件路径 ./qedis_server ~/long/Qedis/qedis.conf 打印: Load libqedismodule.dylib failed because runtime error Load libnotexist.dylib failed because runtime error / _ \ | | | _ \ | | / ___/| | | | | |__ | | | | | | | |___ Qedis(1.0.0) 64 bits, another redis written in C++11| | | | | | | | | | | | _ \ Port: 6379| || | | |_ | |_| | | | _| | Author: Bert Young___| |_| |_/ |_| /_/ https://github.com/loveyacper/Qedisstart log thread2024-11-2818:32:21.938:Success: listen on (127.0.0.1:6379)2024-11-2818:32:21.946:Open leveldb dump02024-11-2818:32:21.951:Open leveldb dump12024-11-2818:32:21.957:Open leveldb dump22024-11-2818:32:21.961:Open leveldb dump32024-11-2818:32:21.967:Open leveldb dump42024-11-2818:32:21.971:Open leveldb dump52024-11-2818:32:21.976:Open leveldb dump62024-11-2818:32:21.980:Open leveldb dump72024-11-2818:32:21.984:Open leveldb dump82024-11-2818:32:21.989:Open leveldb dump92024-11-2818:32:21.994:Open leveldb dump102024-11-2818:32:21.999:Open leveldb dump112024-11-2818:32:22.004:Open leveldb dump122024-11-2818:32:22.009:Open leveldb dump132024-11-2818:32:22.014:Open leveldb dump142024-11-2818:32:22.019:Open leveldb dump15 3 如何仿写redis 熟悉redis原理,但这个不是说一定要每行代码都看懂,推荐书籍:《Redis设计与实现》 熟悉leveldb,也是不说一定要每行代码看懂,主要是先理解下leveldb如何使用,我这里有整理的leveldb相关的资料,大家可以加laoliao6668微信获取。 熟悉Qedis源码,参考Qedis仿写redis,前期可以先照抄Qedis源码都可以,我这里主要梳理Qedis的流程。 4 Qedis框架流程 这里是服务器开源项目调试的经典流程,不只局限于qedis源码。 4.0 通过gdb启动服务程序 我们通过gdb的方式启动qedis_server gdb ./qedis_server 如果需要设置配置文件,则在gdb启动后,这样就能加载配置文件 (gdb) set args ../qedis.conf 4.1 main函数在哪个文件 4.1.1 获取main函数所在源文件位置 在main函数打断点: (gdb) b mainBreakpoint 1 at 0x5f60:file/home/lqf/long/Qedis/QedisSvr/Qedis.cc, line 447. 这样能快速定位main函数所在的文件,结合vscode,点击 自动跳转到main函数,是不是非常方便。 这里我们没有提供配置文件参数,默认是前台运行,前台运行更方便debug 4.1.2 r运行程序 我们输入r先运行到main函数再打其他断点,输入r然后回车: Breakpoint 1 at 0x5f60: file /home/lqf/long/Qedis/QedisSvr/Qedis.cc, line 447.(gdb) rStarting program: /home/lqf/long/Qedis/bin/qedis_server [Thread debugging using libthread_db enabled]Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1". Breakpoint 1, main (ac=1, av=0x7fffffffe038) at /home/lqf/long/Qedis/QedisSvr/Qedis.cc:447447 {(gdb) 接下来我们就要根据tcp 服务的套路打断点: 4.2 qedis服务框架 设置断点 所有的tcp服务器都遵循 函数: bind listen accept epoll_wait epoll_ctl 发送 send或者write,不同的项目可能用的函数不一样,先断点send 接收recv或者read,不同的项目可能用的函数不一样,先断点recv (gdb) b bindBreakpoint 2 at 0x7ffff7b71380: file ../sysdeps/unix/syscall-template.S, line 78.(gdb) b listenBreakpoint 3 at 0x7ffff7b714e0: file ../sysdeps/unix/syscall-template.S, line 78.(gdb) b acceptBreakpoint 4 at 0x7ffff798e4b0: accept. (2 locations)(gdb) b epoll_waitBreakpoint 5 at 0x7ffff7b70630: file ../sysdeps/unix/sysv/linux/epoll_wait.c, line 28. (gdb) b epoll_ctlBreakpoint 6 at 0x7ffff7b70d10: file ../sysdeps/unix/syscall-template.S, line 78.(gdb) b sendBreakpoint 7 at 0x7ffff798e770: send. (2 locations)(gdb) b recvBreakpoint 8 at 0x7ffff798e5f0: recv. (2 locations) bind和listen查看调用栈 打好断点后,我们输入c继续运行程序: Thread 1 "qedis_server" hit Breakpoint 2, bind () at ../sysdeps/unix/syscall-template.S:7878 ../sysdeps/unix/syscall-template.S: No such file or directory. 通过查看堆栈的方式: #0 bind () at ../sysdeps/unix/syscall-template.S:78#1 0x00007ffff7f11289 in Internal::ListenSocket::Bind (this=this@entry=0x5555555759a0, addr=...) at /home/lqf/long/Qedis/QBase/ListenSocket.cc:48#2 0x00007ffff7f1935a in Server::TCPBind (this=
Nginx是什么 Nginx(engine X)是一个开源的轻量级的HTTP服务器,能够提供高性能的HTTP和反向代理服务。 与传统的Apache服务器相比,在性能上Nginx占用系统资源更小、支持高并发,访问效率更高;在功能上,Nginx不仅作为Web服务软件,还适用于反向代理、负载均衡等场景;在安装配置上,Nginx更为简单、灵活。 Nginx因为并发性能和资源占用上的优势,已经广泛用于大中型互联网企业。 Nginx特点 支持高并发:Nginx是专门为性能优化而开发的,采用内核Poll模型,单机能够支持几万以上的并发连接;nginx支持高并发连接,处理2-3万并发连接数,官方监测能支持5万并发。对HTTP并发连接的处理能力高,单台物理服务器可支持30000~50000个并发请求。(实际操作,很多公司为了服务器的稳定,都会设置在20000个左右) 低资源消耗:Nginx采取了分阶段资源分配技术,使得CPU与内存的占用率非常低。一般1万个非活跃的HTTP Keep-Alive连接在Nginx中仅消耗几MB内存;可以跨平台,配置简单,内存消耗少,10个nginx才占用150M内存。 低成本,高稳定:成本低,且开源,稳定性高,宕机概率非常小; 高拓展性:设计极具扩展性,由多个不同功能、不同层次、不同类型且耦合度极低的模块组成 高可用性:Nginx支持热部署,其中的master管理进程与worker工作进程的分离设计;启动速度特别迅速,因此可以在不间断服务的情况下,对软件版本或者配置进行升级,即使运行数月也无需重新启动,几乎可以做到7x24小时不间断地运行 丰富的使用场景:可以作为Web服务端、HTTP反向代理、负载均衡和前端缓存服务等场景使用 开源协议:使用BSD许可协议,免费使用,且可修改源码。Nginx 是开源、高性能、高可靠的 Web 和反向代理服务器,而且支持热部署,几乎可以做到 7 * 24 小时不间断运行,即使运行几个月也不需要重新启动,还能在不间断服务的情况下对软件版本进行热更新。 异步非阻塞网络模型:使用的是epoll模型,这种模型是I/O多路复用技术(I/O多路复用是一种技术,它允许一个进程或线程监控多个网络连接,当其中某个或某几个连接有数据时,当前程序可以拿到网卡收到的数据进行下一步的处理;),异步非阻塞的模型(异步非阻塞模型可以提高程序的效率,在等待I/O操作完成的同时,可以继续执行其他代码。) 健康监测:内置的健康检查功能,可以允许在服务器宕机的时候,做健康检查,再发送的请求就不会发给宕机的服务器,会重新提交到其他节点上。 Nginx源码的目录结构: ├── auto 自动检测系统环境以及编译相关的脚本│ ├── cc 关于编译器相关的编译选项的检测脚本│ ├── lib nginx编译所需要的一些库的检测脚本│ ├── os 与平台相关的一些系统参数与系统调用相关的检测│ └── types 与数据类型相关的一些辅助脚本├── conf 存放默认配置文件,在make install后,会拷贝到安装目录中去├── contrib 存放一些实用工具,如geo配置生成工具(geo2nginx.pl)├── html 存放默认的网页文件,在make install后,会拷贝到安装目录中去├── man nginx的man手册└── src 存放nginx的源代码 ├── core nginx的核心源代码,包括常用数据结构的定义,以及nginx初始化运行的核心代码如main函数 ├── event 对系统事件处理机制的封装,以及定时器的实现相关代码 │ └── modules 不同事件处理方式的模块化,如select、poll、epoll、kqueue等 ├── http nginx作为http服务器相关的代码 │ └── modules 包含http的各种功能模块 ├── mail nginx作为邮件代理服务器相关的代码 ├── misc 一些辅助代码,测试c++头的兼容性,以及对google_perftools的支持 └── os 主要是对各种不同体系统结构所提供的系统函数的封装,对外提供统一的系统调用接口 1. Nginx架构简介 换张中文图,Nginx架构更容易理解 Master 进程 Master 进程: Nginx 的运行始于一个 master 进程,它负责管理所有的工作进程。 master 进程负责读取和解析配置文件,并启动工作进程。 当 Nginx 启动时,它会生成两种类型的进程:主进程(master)和工作进程(worker)。 主进程并不处理网络请求,而是负责调度工作进程,包括加载配置、启动工作进程以及进行非停升级。 因此,当 Nginx 启动后,查看操作系统的进程列表,至少会有两个 Nginx 进程。 工作进程 工作进程: 一旦 master 进程启动,它会生成一组工作进程。 每个工作进程都是独立运行的,负责处理来自客户端的连接和请求。 工作进程之间相互独立,可以并行处理请求,提高了 Nginx 的性能和吞吐量。 服务器实际 处理网络请求 及 响应 的是 工作进程(worker),在类 unix 系统上,Nginx 可以配置 多个 worker,而每个 worker 进程 都可以同时处理 数以千计 的 网络请求。 每个工作进程在启动时都会复制主进程的配置信息和相关资源,但它们彼此之间是相互独立的,这意味着它们可以并行地处理请求,互不影响。 此外,每个工作进程还会维护一个事件驱动的事件循环,通过事件驱动机制处理来自客户端的连接请求、数据读取和响应发送,这种异步非阻塞的 I/O 模型确保了 Nginx 的高性能和低资源消耗。 模块化设计 Nginx 核心模块: Nginx 的核心模块包括 HTTP 模块、事件模块、解析器模块等。 HTTP 模块处理 HTTP 请求和响应,包括 HTTP 头部解析、HTTP 请求方法解析、URI 解析等。 事件模块负责处理底层的事件通知机制,如 Epoll、Kqueue 等。 解析器模块负责解析 Nginx 配置文件。 Nginx 的 worker 进程分为核心模块和功能性模块。 核心模块主要负责维持一个运行循环(run-loop),在其中执行网络请求处理的不同阶段的模块功能,如网络读写、存储读写、内容传输、外出过滤,以及将请求发往上游服务器等。 Nginx 的代码采用了模块化设计,这使得我们可以根据需要选择和修改功能模块,然后编译成具有特定功能的服务器。 事件驱动模型 事件驱动模型: Nginx 采用了事件驱动的模型,主要利用了操作系统提供的异步 I/O 机制。 当有新的连接建立或者数据可读写时,Nginx 不会阻塞等待,而是通过事件通知机制处理这些事件,从而提高了处理效率。 Nginx 实现了高并发、高性能的关键在于其基于异步及非阻塞的事件驱动模型。 这种模型使得 Nginx 能够高效地处理大量并发请求,而不会因为阻塞等待而降低性能。 此外,Nginx 还充分利用了 Linux、Solaris 以及类 BSD 等操作系统内核中提供的事件通知和 I/O 性能增强功能,如 kqueue、epoll 以及 event ports,进一步提升了其性能表现。 代理设计 Nginx 作为高性能的代理服务器,其代理原理是其设计的核心之一。 无论是针对 HTTP 还是其他协议(如 FastCGI、Memcache、Redis等)的网络请求或响应,Nginx 都采用了代理机制来实现数据的转发和处理。 Nginx 的代理原理主要基于以下几个关键点: 接收请求:当 Nginx 接收到客户端的请求时,根据配置文件中的代理设置,确定是否需要进行代理转发。如果需要代理转发,则根据配置选择合适的代理方式。 建立连接:Nginx 会与目标服务器建立连接,可以是与远程服务器建立 TCP 连接,也可以是与本地应用程序之间建立的 Unix Socket 连接,取决于代理目标的具体情况。 数据传输:一旦连接建立成功,Nginx 会将客户端的请求数据转发给目标服务器,并且在接收到目标服务器的响应后,再将响应数据返回给客户端。这个过程可以是全双工的,意味着 Nginx 可以同时接收客户端请求和目标服务器响应,然后进行相应的转发和处理。 代理缓存:为了进一步提高性能,Nginx 还支持代理缓存功能。它可以将经常请求的数据缓存在本地,避免每次请求都要向后端服务器发起请求,从而减少响应时间和网络负载。 负载均衡:对于需要代理转发的请求,Nginx 还支持负载均衡功能,可以根据一定的策略将请求分发到多个后端服务器上,以实现负载均衡和高可用性。 2. Nginx架构详解 工作流程: 当有新的 HTTP 请求到达时,master 进程会将其分发给一个工作进程。 工作进程处理请求,根据配置文件进行请求的处理,包括反向代理、负载均衡、静态文件服务等。 处理完成后,工作进程将响应返回给客户端。 2.1 Nginx进程模型 Nginx默认采用多进程工作方式,在Nginx启动后,会运行一个master进程和多个worker进程。 master主要用来管理worker进程,充当整个进程组与用户的交互接口,同时对进程进行监护,实现worker进程的重启服务、平滑升级、更换日志文件、配置文件实时生效等功能; worker进程用来处理基本的网络事件,worker之间是平等的,他们共同竞争来处理来自客户端的请求。一个请求只能在一个worker进程中处理,一个worker进程不可能处理其它worker进程中的请求。 另外在Nginx架构中还有Cache Loader和Cache Manager进程,Cache Loader进程加载缓存索引文件信息;Cache Manager进程管理磁盘的缓存大小,超过预定值大小后最小使用的数据将被删除。 2.1.1 Master管理进程 Master进程主要用来管理worker进程,具体包括如下4个主要功能: 接收来自外界的信号; 向各worker进程发送信号; 监控woker进程的运行状态; 当woker进程退出后(异常情况下),会自动重新启动新的woker进程。 Master进程接受到命令重启Nginx进程(./nginx -s reload),会按照以下流程: 1) 首先master进程在收到重启命令后,会先重新加载配置文件,然后再启动新的worker进程,并向所有老的worker进程发送信号,告诉他们可以光荣退休了。 2) 新的worker进程在启动后,就开始接收新的请求,而老的worker在收到来自master的信号后,就不再接收新的请求,并且处理完当前进程中的所有未处理完的请求后,再退出。 2.1.2 Worker工作进程 Worker工作进程之间是对等的,每个进程处理请求的机会也是一样的。Nginx采用异步非阻塞的方式来处理网络事件,具体流程如下: 1) 接收请求:首先,每个worker进程都是从master进程fork过来,在master进程建立好需要listen的socket(listenfd)之后,然后再fork出多个worker进程。 a) 所有worker进程的listenfd会在新连接到来时变得可读,每个work进程都可以去accept这个socket(listenfd)。 b) 当一个client连接到来时,所有accept的work进程都会受到通知,但只有一个进程可以accept成功,其它的则会accept失败。 c) 为保证只有一个进程处理该连接,Nginx提供了一把共享锁accept_mutex来保证同一时刻只有一个work进程在accept连接。 d) 所有worker进程在注册listenfd读事件前抢accept_mutex,抢到互斥锁的那个进程注册listenfd读事件,在读事件里调用accept接受该连接。 2) 处理请求:当一个worker进程在accept这个连接之后,就开始读取请求,解析请求,处理请求,产生数据后,再返回给客户端,最后才断开连接。 由上可以看出,一个请求完全由worker进程处理,并且只在一个worker进程中处理。 2.2 Nginx请求处理流程 Nginx工作进程会监听套接字上的事件(accept_mutex和kernel socketsharding),来决定什么时候开始工作。 事件是由新的连接初始化的,这些连接会被分配给状态机,Nginx中有三大类状态机:处理应用层的HTTP状态机、处理TCP/UDP的4层的传输层状态机和处理邮件的MAIL状态机,其中HTTP状态机最为常见。 在多种流量进入Nginx后,Nginx的三种状态机在Nginx解析出请求后,会动用线程池处理调用,将静态资源、反向代理、错误日志等信息分别导向不同的出口,比如fastcgi会导向PHP处理、html会导向nginx处理,并将处理请求日志记录到本地或远程服务器中。 处理流程图: 在nginx中我们指的是http请求,具体到nginx中的数据结构是ngx_http_request_t。ngx_http_request_t是对一个http请求的封装。 我们知道,一个http请求,包含请求行、请求头、请求体、响应行、响应头、响应体。 http请求是典型的请求-响应类型的的网络协议,而http是文件协议,所以我们在分析请求行与请求头,以及输出响应行与响应头,往往是一行一行的进行处理。 如果我们自己来写一个http服务器,通常在一个连接建立好后,客户端会发送请求过来。 然后我们读取一行数据,分析出请求行中包含的method、uri、http_version信息。 然后再一行一行处理请求头,并根据请求method与请求头的信息来决定是否有请求体以及请求体的长度,然后再去读取请求体。 得到请求后,我们处理请求产生需要输出的数据,然后再生成响应行,响应头以及响应体。 在将响应发送给客户端之后,一个完整的请求就处理完了。 当然这是最简单的webserver的处理方式,其实nginx也是这样做的,只是有一些小小的区别,比如,当请求头读取完成后,就开始进行请求的处理了。 nginx通过ngx_http_request_t来保存解析请求与输出响应相关的数据。 2.3 Nginx多进程IO模型 2.3.1 Nginx多进程模型 Nginx默认使用多进程的工作方式,相比较多线程的方式,有以下好处: 1) 首先,对于每个worker进程来说,独立的进程不需要加锁,所以省掉了锁带来的开销,同时在编程以及问题查找时,也会方便很多; 2) 其次,采用独立的进程,可以让进程之间相互不会影响,一个进程退出后,其它进程还在工作,服务也不会中断,master进程则很快启动新的worker进程; 3) 再次,为Nginx热部署提供了支持。在修改配置文件nginx.conf后,重新生成新的worker进程,新的worker进程会以新的配置处理请求,而老的worker进程,等把以前的请求处理完成以后,kill掉就可以。 2.3.2 Nginx异步非阻塞事件模型 异步非阻塞事件是怎么回事? 先看一个请求的完整过程,首先请求过来建立连接,然后再接收数据再发送数据,具体到系统层就是IO读写事件。当读写事件没有准备好,如果不采用非阻塞的方式,就得阻塞调用,阻塞调用会进入内核等待,导致CPU资源被其它进程占用。当并发请求越大时,等待的事件越多,CPU利用不上去,并发也上不去。因此Nginx使用非阻塞的事件模型,系统中事件模型有很多中,比如select/poll/kqueue/epoll等,Nginx采用epoll模型。 Epoll模型基于事件驱动机制,可以监控多个事件是否准备完毕,如果可以,就放入epoll队列,这个过程是异步的,worker进程只需要从epoll队列循环处理即可。 Epoll调用过程如下图所示: 事件驱动&异步非阻塞: 本质来说,事件驱动是一种思想(事实上它不仅仅局限于编程) ,事件驱动思想是实现 异步非阻塞特性 的一个重要手段。 对于web服务器来说,造成性能拉胯不支持高并发的常见原因就是由于使用了传统的I/O模型造成在内核没有可读/可写事件(或者说没有数据可供用户进程读写)时,用户线程 一直在等待(其他事情啥也干不了就是干等等待内核上的数据可读/可写),这样的话其实是一个线程(ps:线程在Linux系统也是进程)对应一个请求,请求是无限的,而线程是有限的从而也就形成了并发瓶颈。 而大佬们为了解决此类问题,运用了事件驱动思想来对传统I/O模型做个改造,即在客户端发起请求后,用户线程不再阻塞等待内核数据就绪,而是立即返回(可以去执行其他业务逻辑或者继续处理其他请求)。 当内核的I/O操作完成后,内核系统会向用户线程发送一个事件通知,用户线程才来处理这个读/写操作,之后拿到数据再做些其他业务后响应给客户端,从而完成一次客户端请求的处理。 事件驱动的I/O模型中,程序不必阻塞等待I/O操作的完成,也无需为每个请求创建一个线程,从而提高了系统的并发处理能力和响应速度。 事件驱动型的I/O模型通常也被被称为I/O多路复用,即这种模型可以在一个线程中,处理多个连接(复用就是指多个连接复用一个线程,多路也即所谓的 多个连接),通过这种方式避免了线程间切换的开销,同时也使得用户线程不再被阻塞,提高了系统的性能和可靠性。 nginx支持事件驱动是因为他利用了操作系统提供的I/O多路复用接口,如Linux系统中,常用的I/O多路复用接口有select/poll,epoll。 这些接口可以监视多个文件描述符的状态变化,当文件描述符可读或可写时,就会向用户线程发送一个事件通知。 用户线程通过事件处理机制(读取/写入数据)来处理这个事件,之后进行对应的业务逻辑完了进行响应。 简单一句话概括: 事件驱动机制就是指当有读/写/连接事件就绪时 再去做读/写/接受连接这些事情,而不是一直在那里傻傻的等,也正应了他的名词: 【事件驱动!】,基于事件驱动思想设计的多路复用I/O(如select/poll,epoll),相对于传统I/O模型,达到了异步非阻塞的效果! 既然提到了select/poll,epoll 那么我们就简单说一下(注意我这里是简单描述,后续有时间会对相关知识点从源码层面做个系统的整理和图解): select: 将已连接的 Socket 都放到一个文件描述符集合,然后用户态调用 select 函数将文件描述符集合拷贝到内核里,让内核来检查是否有网络事件产生,检查的方式很粗暴,就是通过遍历文件描述符集合的方式,当检查到有事件产生后,将此 Socket 标记为可读或可写, 接着再把整个文件描述符集合拷贝回用户态里,然后用户态还需要再通过遍历的方法找到可读或可写的 Socket,然后再对其处理。 poll: poll函数的话其实和select大差不差,唯一区别可能就是socket列表的结构有所不同,不再受FD_SETSIZE的限制。这里就不多说了。 epoll: epoll在前边两者的基础上做了很大的优化,select/poll都需要遍历整个socket列表,当检测到传入的socket可读/可写时,则copy socket列表给用户空间,用户态仍然需要遍历(因为内核copy给用户态的是整个socket列表) ,而epoll则是通过红黑树结构将需要监控的socket插入到进去,然后当有socket可读时会通过回调机制来将其添加到可读列表中,然后内核将可读列表copy给用户态即可(据说此处使用了mmap这里我们不去验证探究,后续写相关文章时在深究吧),整个过程少了无效的遍历以及不用copy整个socket集合。 2.3.3 Nginx请求响应过程种的 I/O过程 1、用户态:应用程序,我们可以控制 2、内核态:操作系统层面,我们不容易去控制的 操作系统 ① 当用户发起 http 请求需要请求一个index.html 网页文件 ② 客户端请求与服务器端建立连接,建立连接后,会发送请求报文 ③ 服务端的网卡收到请求报文,会将该报文复制到内核空间,内核空间分析报文后交给对应的程序。nginx 分析该报文,将报文和自己的配置文件,一一比对,按照配置文件完成请求,分析后发现客户需要 index.html 文件 ④ 由于程序的权限问题,没有资格直接调用磁盘上的文件,程序会再将这个请求再次转发给内核,内核得到后请求去磁盘上找文件,找到文件后复制给程序 nginx ⑤ 程序会构建响应报文,构建好后再交给内核空间 ⑥ 内核空间得到响应报文后,再交给网卡发给客户 2.3.4 零拷贝技术 在传统的数据传输过程中,数据通常需要经过多次复制。 比如,当数据从磁盘读取到内存时,首先将数据读入内核缓冲区,然后再从内核缓冲区复制到用户空间的应用程序缓冲区。 零拷贝技术通过避免或减少数据在内存和设备之间的多次复制来提高效率。 具体做法包括直接内存访问(DMA)、文件映射(mmap)和发送文件(sendfile)等。 MMAP ( Memory Mapping ) mmap()系统调用使得进程之间通过映射同一个普通文件实现共享内存。普通文件被映射到进程地址空间后,进程可以向访问普通内存一样对文件进行访问。 mmap是一种内存映射文件的方法,即将一个文件或者其它对象映射到进程的地址空间,实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对映关系。 直接内存访问(DMA) 文件映射(mmap) 发送文件(sendfile) Nginx 的零拷贝技术主要是指在网络 I/O 操作中,使用 sendfile 方法来提高性能,这样可以避免用户空间和内核空间之间的数据拷贝,从而提高系统的吞吐量。 零拷贝技术的核心函数是 sendfile(),它在 Linux 2.0 版本以上的操作系统中得到支持。当 Nginx 配置文件中启用了 sendfile 指令时,Nginx 会使用这个系统调用来输出静态文件。 以下是一个简单的 Nginx 配置示例,其中启用了 sendfile: server { listen 80; server_name localhost; location / { root /usr/share/nginx/html; index index.html index.htm; sendfile on; # 启用 sendfile 方法 }} 在这个配置中,当客户端请求静态文件时,Nginx 会直接将文件内容通过 sendfile() 系统调用发送给客户端,而不是将文件内容读取到用户空间的缓冲区中,再从用户空间的缓冲区拷贝到内核空间去发送。这样就减少了数据拷贝的次数和 CPU 的参与,从而提高了性能。 2.3.5 Nginx为什么不使用多线程? Apache: 创建多个进程或线程,而每个进程或线程都会为其分配 cpu 和内存(线程要比进程小的多,所以worker支持比perfork高的并发),并发过大会耗光服务器资源 Nginx: 采用单线程来异步非阻塞处理请求(管理员可以配置Nginx主进程的工作进程的数量)(epoll),不会为每个请求分配cpu和内存资源,节省了大量资源,同时也减少了大量的CPU的上下文切换。所以才使得Nginx支持更高的并发。 2.3.6 Nginx 是如何实现高并发的? 1.异步,非阻塞,使用了epoll 和大量的底层代码优化 如果一个server采用一个进程负责一个request的方式,那么进程数就是并发数。正常情况下,会有很多进程一直在等待中 而nginx采用一个master进程,多个woker进程的模式 master进程主要负责收集、分发请求。每当一个请求过来时,master就拉起一个worker进程负责处理这个请求。同时master进程也负责监控woker的状态,保证高可靠性 woker进程一般设置为跟cpu核心数一致。nginx的woker进程在同一时间可以处理的请求数只受内存限制,可以处理多个请求 Nginx 的异步非阻塞工作方式正把当中的等待时间利用起来了。在需要等待的时候,这些进程就空闲出来待命了,因此表现为少数几个进程就解决了大量的并发问题 2.同步和异步 同步:一个服务的完成需要依赖其他服务时,只有等待被依赖的服务完成后,才算完成,这是一种可靠的服务序列。要么成功都成功,失败都失败,服务的状态可以保持一致 异步:一个服务的完成需要依赖其他服务时,只通知其他依赖服务开始执行,而不需要等待被依赖的服务完成,此时该服务就算完成了。被依赖的服务是否最终完成无法确定,因此它是一个不可靠的服务序列 3.阻塞与非阻塞 阻塞:阻塞调用是指调用结果返回之前,当前线程会被挂起,一直处于等待消息通知,不能够执行其他业务,函数只有在得到结果之后才会返回。 非阻塞:非阻塞和阻塞的概念相对应,指在不能立刻得到结果之前,该函数不会阻塞当前线程,而会立刻返回 3.Nginx功能模块和应用场景 3.1 Nginx功能模块说明 Nginx由内核和模块组成,其中内核在设计上非常简洁,完成的工作非常简单,仅仅通过查找配置文件将客户端请求映射到一个location block,而在这个location中所配置的每个指令将会启动不同的模块去完成相应的工作。 nginx 六大模块 核心模块:是 Nginx 服务器正常运行必不可少的模块,提供错误日志记录 、配置文件解析 、事件驱动机制 、进程管理等核心功能 标准HTTP模块:提供 HTTP 协议解析相关的功能,比如: 端口配置 、 网页编码设置 、 HTTP响应头设置 等等 可选HTTP模块:主要用于扩展标准的 HTTP 功能,让 Nginx 能处理一些特殊的服务,比如:Flash 多媒体传输 、解析 GeoIP 请求、 网络传输压缩 、 安全协议 SSL 支持等 邮件服务模块:主要用于支持 Nginx 的 邮件服务 ,包括对 POP3 协议、 IMAP 协议和 SMTP协议的支持 Stream服务模块: 实现反向代理功能,包括TCP协议代理 反向 第三方模块:是为了扩展 Nginx 服务器应用,完成开发者自定义功能,比如: Json 支持、 Lua 支持等 nginx高度模块化,但其模块早期不支持DSO机制;1.9.11 版本支持动态装载和卸载 3.1.1 Nginx模块分类 Nginx的模块从结构上分为核心模块、基础模块和第三方模块,其中用户根据自己的需要开发的模块都属于第三方模块: 核心模块:HTTP模块、EVENT模块和MAIL模块 基础模块:HTTP Access模块、HTTP FastCGI模块、HTTP Proxy模块和HTTP Rewrite模块; 第三方模块:HTTP Upstream Request Hash模块、Notice模块和HTTP Access Key模块。 3.1.2 Nginx模块功能 Nginx模块常规的HTTP请求和响应的过程如上图所示,Nginx模块从功能上分为以下三类: Handlers处理器模块:此类模块直接处理请求,并进行输出内容和修改headers信息等操作。Handlers处理器模块一般只能有一个。 Filters过滤器模块:此类模块主要对其他处理器模块输出的内容进行修改操作,最后由Nginx输出。 Proxies代理类模块:此类模块是Nginx的HTTP Upstream之类的模块,这些模块主要与后端一些服务比如FastCGI等进行交互,实现服务代理和负载均衡等功能。 Nginx本身处理的工作很少,当它接到一个HTTP请求时,通过查找配置文件将此次请求映射到一个location block,而此location中所配置的各个指令则会启动不同的模块去完成工作。 3.2 Nginx使用场景 首先,我们一般会将请求打到Nginx, 再把请求转发到我们的应用服务。 比如我们常用的php-fpm/golang程序或者tomcat,再由应用服务访问缓存,数据库等存储以提供基本的数据服务能力。 由于我们开发过程中要求应用程序开发效率较高,但其运行效率是较低的。 单个应用程序的qps,tps都是受限的,不足以支撑用户的请求量,那么为了提高整个服务的吞吐能力,就需要将多个应用程序组成一个集群来整体向外提供高可用服务。这样就会延伸出来2个需求,1.负载均衡,2.当有个别应用程序出问题的时候,需要做容灾。那么我们的反向代理就需要具备负载均衡的能力。 其次,Nginx一般处于边缘节点,离用户最近,我们可以将一些热点数据缓存在Nginx中,直接向用户提供访问,从而达到减少用户时延的效果。这也就衍生出了缓存功能。 第三,当应用程序的性能不及缓存,数据库的性能时,有一些接口我们可以由Nginx直接访问数据库,redis,第三方应用服务。如:使用Openresty,lua等。 第四,我们还可以将css, js, 小图片等静态资源直接放到Nginx服务中,没有必要再次请求应用服务。 3.2.1 静态文件服务 静态文件服务:Nginx在提供静态资源服务方面效率很高,可以快速的响应大量的静态请求,减轻其他动态服务器的负担,如CSS、JavaScript、Image、Audio和Video文件等。 以下是一个简单的配置示例,用于设置Nginx以服务静态文件: server { listen 80; server_name example.com; location / { root /usr/share/nginx/html; index index.html index.htm; try_files $uri $uri/ =404; }} 解释: listen 80; 表示Nginx监听80端口。 server_name Example Domain; 表示对应的域名。 location / { ... } 定义了一个处理所有请求的区块。 root /usr/share/nginx/html; 指定了静态文件存放的根目录。 index index.html index.htm; 指定了默认页面。 try_files $uri $uri/ =404; 尝试按顺序提供请求的文件,如果找不到文件则返回404错误。 确保/usr/share/nginx/html目录包含您的静态文件,并根据需要调整server_name和root指令。 3.2.2 缓存服务器 缓存服务器:Nginx可以缓存一些响应结果,降低后端服务器的负载,提高数据的访问速度,平衡访问压力等。 Nginx 可以用作缓存服务器,通过配置 ngx_http_proxy_module 和 ngx_http_cache_module 实现。以下是一个简单的配置示例,它将设置 Nginx 作为反向代理的同时开启缓存功能: http { proxy_cache_path /data/nginx/cache levels=1:2 keys_zone=my_cache:10m max_size=10g inactive=60m use_temp_path=off; server { listen 80; location / { proxy_pass http://upstream_server; proxy_cache my_cache; proxy_cache_valid 200 302 10m; proxy_cache_valid 404 1m; proxy_cache_use_stale error timeout updating http_500 http_502 http_503 http_504; } }} 解释: proxy_cache_path 定义了缓存的存储路径、缓存目录的层次结构、缓存键的区域、最大缓存大小以及缓存内容在多长时间没有被访问后应被视为不活动并可能被删除。 proxy_cache 指定使用名为 my_cache 的缓存。 proxy_cache_valid 设置了不同HTTP响应状态码的缓存有效期。 proxy_cache_use_stale 定义了在指定的错误情况下或缓存内容超时时是否可以使用过期的缓存数据。 确保替换 http://upstream_server 为你的后端服务器地址。这个配置假设 Nginx 已经安装了 ngx_http_proxy_module 和 ngx_http_cache_module。 3.2.3 SSL加速 SSL加速:Nginx 可以通过 HTTPS 访问加速,提高 HTTPS 访问的性能,减少SSL负载压力,保证数据的安全性。 Nginx SSL加速通常指的是通过优化SSL/TLS配置来提高性能,这可以包括禁用不必要的SSL/TLS协议版本、使用更快的加密算法、启用会话复用等。以下是一个基本的Nginx配置示例,展示了一些可以用来加速SSL的配置: server { listen 443 ssl; server_name yourdomain.com; ssl_certificate /path/to/your/certificate.pem; ssl_certificate_key /path/to/your/private.key; # 仅启用最安全的TLS协议版本 ssl_protocols TLSv1.2 TLSv1.3; # 仅使用安全的加密算法 ssl_ciphers 'ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:DHE-RSA-AES128-GCM-SHA256:DHE-RSA-AES256-GCM-SHA384'; ssl_prefer_server_ciphers on; # 启用会话复用 ssl_session_cache shared:SSL:10m; ssl_session_timeout 10m; # 其他SSL配置和安全头配置 ... # 服务器的其他配置 location / { ... }} 这个配置示例中,我们启用了TLSv1.2和TLSv1.3协议,并且指定了支持的加密算法。我们也启用了SSL会话复用,这可以减少SSL握手时间,从而加速连接建立的过程。 请根据你的实际需求和服务器的安全策略来调整这些配置。如果你需要进一步优化SSL性能,可以考虑使用HTTP/2,或者进行SSL/TLS性能分析来找到瓶颈。 3.2.4 WebSocket服务 WebSocket:Nginx也支持WebSocket协议,可用于实时通信应用程序。 在Nginx中使用WebSocket,你需要确保Nginx版本支持WebSocket,一般来说,较新版本的Nginx已经支持WebSocket。 以下是一个配置示例,它将Nginx配置为代理WebSocket请求: http { map $http_upgrade $connection_upgrade { default upgrade; '' close; } server { listen 80; server_name example.com; location / { proxy_pass http://websocket_backend; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection $connection_upgrade; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $scheme; } } upstream websocket_backend { server websocket_server_address:port; }} 在这个配置中,我们定义了一个map指令,它根据$http_upgrade变量的值设置$connection_upgrade变量。这样做是为了确保当Nginx收到一个包含Upgrade头部的HTTP请求时,它会将连接升级到Upgrade类型。 然后,在server块中,我们定义了监听端口和服务器名称。在location /块中,我们配置了Nginx来代理WebSocket请求。我们设置了代理的目标(upstream块中定义的服务器),并设置了必要的头部,以确保WebSocket连接可以正确建立。 确保替换example.com、websocket_server_address和port为你的实际域名和WebSocket服务器的IP地址和端口。 3.2.5 访问控制和安全 访问控制和安全:Nginx可以使用访问控制、基于IP地址的访问限制等来提高服务器的安全性,有效保护Web应用程序和服务器。 在Nginx中实现访问控制和安全性,可以通过配置文件来设置。 以下是一些常见的安全性配置示例: 禁止特定IP访问: location / { deny 192.168.1.1; allow all; } 使用Auth Basic进行访问认证: location / { auth_basic "Restricted Content"; auth_basic_user_file /etc/nginx/.htpasswd; } 设置SSL要求,强制使用HTTPS: server { listen 80; return 301 https://$host$request_uri; } server { listen 443 ssl; ssl_certificate /etc/nginx/ssl/nginx.crt; ssl_certificate_key /etc/nginx/ssl/nginx.key; # 其他配置... } 限制请求速率: http { limit_req_zone $binary_remote_addr zone=mylimit:10m rate=1r/s; server { location / { limit_req zone=mylimit burst=5; } } } 设置X-Frame-Options防止点击劫持: add_header X-Frame-Options "SAMEORIGIN"; 设置Content Security Policy(CSP): add_header Content-Security-Policy "default-src 'self' https:; script-src 'self''unsafe-inline' https: 'unsafe-eval';"; 禁止浏览器缓存: add_header Cache-Control "no-store, no-cache, must-revalidate, proxy-revalidate, max-age=0"; 这些配置可以根据具体需求进行组合和调整,以增强Nginx服务器的安全性和访问控制。 3.2.6 Api网关服务(综合) Nginx可以作为API网关,通过配置来处理API路由、负载均衡、请求限流、缓存、SSL/TLS终结等。以下是一个简单的Nginx配置示例,用于将API请求代理到后端服务: http { upstream userapi { server user1.example.com; server user2.example.com; } upstream orderapi { server order1.example.com; server order2.example.com; } server { listen 80; location /api/user/ { proxy_pass http://userapi; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $scheme; } location /api/order/ { proxy_pass http://orderapi; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $scheme; } location / { return 404; } }} 在这个配置中: upstream 块定义了一个后端服务器群组,可以通过server指令添加更多后端服务器。 server 块中的 listen 指令设置Nginx监听80端口。 location /api/ 块捕获所有到达/api/路径的请求,并使用proxy_pass指令将请求转发到backend群组。 proxy_set_header 指令用于设置转发给后端服务器的HTTP头部,以确保后端服务可以获取到正确的原始请求信息。 第二个 location / 块捕获所有其他请求,并返回404状态码。 这个配置可以根据具体需求进行扩展,比如添加缓存、请求限流、服务发现、TLS/SSL配置等功能。 3.2.7 代理:正向代理和反向代理 代理服务器一般指代局域网内部的机器通过代理服务发送请求到互联网上的服务器,代理服务器一般作用于客户端。代理服务器是介于客户端和Web服务器之间的服务器,客户端首先与代理服务器创建连接,然后根据代理服务器所使用的代理协议,请求对目标服务器创建连接、或则获得目标服务器的指定资源。 正向代理:为了从原始服务器取的内容,客户端向代理发送一个请求并指定目标(Web服务器),然后代理向Web服务器转交请求并将获得的内容返回给客户端,客户端必须要进行一些特别的设置才能使用正向代理。 像VPN就是正向代理,一般在浏览器中配置代理服务器的相关信息。 正向代理中代理的对象是客户端,代理服务器和客户端属于同一个LAN,对服务器端来说是透明的。 反向代理:客户端发送请求到代理服务器,由代理服务器转发给相应的Web服务器进行处理,最终返回结果给客户端。 像Nginx就是反向代理服务器软件,对客户端暴露的其实是一个VIP,不是真实的Web服务器的IP。 反向代理的是对象是Web服务器端,代理服务器和Web服务端属于同一个LAN,对客户端来说是透明的。 使用反向代理的好处是客户端不需要任何配置就可以访问,对外暴露的是代理服务器的地址隐藏了真实服务器的地址,客户端只需要把请求发送给代理服务器,由代理服务器去选择后端的Web服务器,获取到数据后再返回给客户端。 Nginx配置正向代理的示例(典型的用香港机器访问脸书和谷歌): server { listen 3128; location / { proxy_pass http://$http_host$request_uri; proxy_set_header Host $http_host; proxy_buffers 256 4k; client_max_body_size 10m; client_body_buffer_size 128k; proxy_connect_timeout 300; proxy_send_timeout 300; proxy_read_timeout 300; send_timeout 300; }} Nginx配置反向代理的示例: http { upstream backend { server backend1.example.com; server backend2.example.com; } server { listen 80; location / { proxy_pass http://backend; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; } }} 3.2.8 负载均衡 负载均衡建立在现有网络结构之上,它提供了一种廉价有效透明的方法扩展网络设备和服务器的带宽、增加吞吐量、加强网络数据处理能力、提高网络的灵活性和可用性。负载均衡(Load Balance)其意思就是分摊到多个操作单元上进行执行,例如Web服务器、FTP服务器、企业关键应用服务器和其它关键任务服务器等,从而共同完成工作任务。 简而言之,单个Web应用服务器不能承受日益增长的并发量请求,因此需要不断扩展web服务器来支撑高并发请求,根据不同的负载均衡策略将请求分配到各个服务器上。 Nginx支持6种不同的负载均衡策略: round_robin轮询:轮询(默认策略),每个请求按时间顺序依次分配至不同的后端服务器。每个请求按时间顺序逐一分配到不同的后端服务器,如果后端服务器down掉能够被自动剔除。轮询算法适合服务器配置相当,无状态且短平快的服务使用。 weight权重(基于轮询):指定轮询的几率,weight和后端的访问比例成比例,weight权重越高比例越大。通常用于后端服务器配置不均的情况。 ip_hash:上面两种算法存在一个问题是就是无法做到会话保持,当用户登录到服务器上后,第二次请求的时候会被定位到服务器集群中的某一个,那么已经登录到某个服务器上的用户会重新定位到另一台,之前的登录信息会丢失。ip_hash算法可以解决这个问题,当用户再次访问请求时,会通过hash算法自动定位到已经登录的服务器上,这样每个客户端可以固定在某个web服务器上,解决客户端session的问题。 hash $request_uri:uri哈希,根据请求的URI进行哈希计算,相同的URI将会被分配到相同的服务器。 hash $remote_addr:远程地址哈希,根据客户端IP地址进行哈希计算,相同的IP地址将会被分配到相同的服务器。 random:随机,每个请求随机分配至不同的后端服务器。 3.2.9 动静分离 动静分离技术是让动态网站里的动态网页根据一定规则把不变的资源和经常变的资源区分开来,将静态文件放在一个单独的web服务器上,加快解析速度,降低原来单个服务器的压力。在Nginx的配置中,在server{}段中加入带正则匹配的location来指定匹配项针对PHP的动静分离:静态页面交给Nginx处理,动态页面交给PHP-FPM模块或Apache处理。 Nginx 动静分离是一种常见的web服务器优化方法,通过将网站的静态内容(例如图片、CSS、JS等)与动态内容(例如PHP、Python脚本处理的内容)分开存储,可以有效提升网站的性能。 以下是一个简单的Nginx配置示例,实现动静分离: server { listen 80; server_name example.com; # 静态文件目录 location /static/ { alias /path/to/your/static/files/; expires 30d; add_header Cache-Control "public"; try_files $uri $uri/ =404; } # 动态内容处理 location / { include uwsgi_params; uwsgi_pass unix:/path/to/your/uwsgi/socket.sock; uwsgi_param UWSGI_SCRIPT your_app.wsgi; uwsgi_param UWSGI_CHDIR /path/to/your/project; client_max_body_size 32M; }} 在这个配置中: 静态文件位于 /path/to/your/static/files/ 目录下,并且通过 location /static/ 指定。 对于静态文件,设置了缓存时间为30天,并添加了Cache-Control头,以指示客户端和代理服务器缓存文件。 所有其他请求都被转发到uWSGI服务器(通过UNIX socket),以处理Python/Django等应用的动态内容。 确保替换 /path/to/your/、example.com、your_app.wsgi 和项目路径为你的实际路径和应用信息。
一、通信模式介绍 引用官方的说法: “ZMQ(以下ZeroMQ简称ZMQ)是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。ZMQ的明确目标是“成为标准网络协议栈的一部分,之后进入Linux内核”。还未看到它们的成功。但是,它无疑是极具前景的、并且是人们更加需要的“传统”BSD套接字之上的一 层封装。ZMQ让编写高性能网络应用程序极为简单和有趣。” ---来自百度 ZeroMQ已经支持多种通信模式: Request-reply pattern(请求-回复模式) Publish-subscribe pattern(发布-订阅模式) Pipeline pattern(管道模式) Exclusive pair pattern (独占对等模式) 1-N、N-1 和 N-M消息传递模式 1-N 模式 本质:一个发送者(源)将消息广播或分发给多个接收者。发送者主动推送消息,而接收者被动接收消息。这种模式常用于广播和消息分发。 举例: Pub-Sub(发布-订阅)模式:一个发布者发布消息,多个订阅者接收这些消息。 场景:新闻广播系统 发布者:新闻机构 订阅者:各种新闻应用和用户 N-1 模式 本质:多个发送者将消息发送到一个接收者。接收者主动从多个发送者处拉取消息,而发送者被动推送消息。这种模式适用于任务收集或负载均衡。 举例: Pipeline(Push-Pull)模式:多个任务生成者(Push)将任务推送到一个任务处理器(Pull)。 场景:日志收集系统 任务生成者:各个服务器的日志生成模块 任务处理器:日志分析或存储系统 N-M 模式 本质:多个发送者将消息发送到多个接收者。每个发送者可以向多个接收者发送消息,多个接收者可以从多个发送者处接收消息。这种模式适用于复杂的广播、聚合或分布式任务处理。 举例: Pub-Sub(发布-订阅):多个发布者广播消息到多个订阅者。每个发布者可以广播不同的主题,订阅者可以订阅多个主题。 场景:实时数据流系统 发布者:数据源(传感器、日志系统) 订阅者:多个应用程序(监控仪表盘、分析工具) 1. Request-reply pattern(请求-回复模式) REQ套接字 REQ套接字用于客户端向服务发送请求并接收回复。该套接字类型仅允许发送和随后的接收调用交替进行。REQ套接字可以连接到任意数量的REP或ROUTER套接字。每个发送的请求会在所有连接的服务之间进行轮询,每个接收到的回复与最后发出的请求匹配。它适用于简单的请求-回复模型,其中对失效对等方的可靠性不是问题。 如果没有服务可用,则套接字上的任何发送操作将阻塞,直到至少有一个服务可用。REQ套接字不会丢弃任何消息。 特性总结: 兼容的对等套接字:REP, ROUTER 方向:双向 发送/接收模式:发送,接收,发送,接收,… 出站路由策略:轮询 入站路由策略:最后一个对等方 静默状态下的动作:阻塞 REP套接字 REP套接字用于服务从客户端接收请求并发送回复。该套接字类型仅允许接收和随后的发送调用交替进行。每个接收到的请求从所有客户端中公平排队,每个发送的回复路由到发出最后请求的客户端。如果原始请求者不存在,则回复会被静默丢弃。 特性总结: 兼容的对等套接字:REQ, DEALER 方向:双向 发送/接收模式:接收,发送,接收,发送,… 出站路由策略:公平轮询 入站路由策略:最后一个对等方 DEALER套接字 DEALER套接字类型与一组匿名对等方通信,使用轮询算法发送和接收消息。它是可靠的,因为它不会丢弃消息。DEALER作为REQ的异步替代,适用于与REP或ROUTER服务器通信的客户端。DEALER接收到的消息从所有连接的对等方中公平排队。 当DEALER套接字由于达到所有对等方的高水位标记而进入静默状态,或者如果根本没有对等方,则套接字上的任何发送操作将阻塞,直到静默状态结束或至少有一个对等方可用;消息不会被丢弃。 当DEALER套接字连接到REP套接字时,发送的消息必须包含一个空帧作为消息的第一部分(分隔符),后跟一个或多个正文部分。 特性总结: 兼容的对等套接字:ROUTER, REP, DEALER 方向:双向 发送/接收模式:不受限制 出站路由策略:轮询 入站路由策略:公平排队 静默状态下的动作:阻塞 ROUTER套接字 ROUTER套接字类型与一组对等方通信,使用显式地址,以便每个传出的消息发送到特定的对等连接。ROUTER作为REP的异步替代,通常用于与DEALER客户端通信的服务器。 当接收消息时,ROUTER套接字将在消息前添加一个包含原始对等方路由ID的消息部分,然后传递给应用程序。接收到的消息从所有连接的对等方中公平排队。当发送消息时,ROUTER套接字将移除消息的第一部分,并使用它来确定消息应路由到的对等方的路由ID。如果对等方不再存在,或者从未存在,则消息将被静默丢弃。 当ROUTER套接字由于达到所有对等方的高水位标记而进入静默状态时,发送到套接字的任何消息将被丢弃,直到静默状态结束。同样,任何路由到已达到其单个高水位标记的对等方的消息也将被丢弃。 当REQ套接字连接到ROUTER套接字时,除了原始对等方的路由ID,每个接收到的消息应包含一个空分隔符消息部分。因此,应用程序看到的每个接收消息的整个结构变为:一个或多个路由ID部分,分隔符部分,一个或多个正文部分。当发送回复给REQ套接字时,应用程序必须包含分隔符部分。 特性总结: 兼容的对等套接字:DEALER, REQ, ROUTER 方向:双向 发送/接收模式:不受限制 出站路由策略:见正文 入站路由策略:公平排队 静默状态下的动作:丢弃(见正文) 特性详细解释 REQ套接字 1.兼容的对等套接字: REP (Reply)和ROUTER套接字与 REQ 套接字兼容。也就是说,REQ 套接字可以与 REP 套接字和 ROUTER 套接字进行通信。 REP:这是一个典型的服务端套接字,负责接收请求并发送响应。 ROUTER:这是一个更复杂的服务端套接字,能够处理多个客户端,并可以进行更复杂的路由操作。 2.方向: 双向:虽然 REQ 套接字只处理请求和响应的交替操作,但它的通信方向是双向的。即,客户端发送请求,服务端返回响应,整个过程是双向交互的。 3.发送/接收模式: 发送,接收,发送,接收:REQ 套接字的工作模式是交替的。每次发送请求后,必须等待响应才能发送下一个请求。也就是说,发送和接收操作是严格交替进行的,这保证了每个请求都有一个响应对应。 4.出站路由策略: 轮询:REQ 套接字会在所有连接的服务端之间轮询发送请求。假设客户端连接到多个服务端(如 REP 套接字),REQ 套接字会按照轮询的方式将请求发送给各个服务端。这有助于实现负载均衡。 5.入站路由策略: 最后一个对等方:每个发送的请求的响应必须来自于最后一个接收到请求的对等方。这意味着每个请求和其对应的响应是匹配的,并且每个请求只会从一个特定的服务端得到回应。 6.静默状态下的动作: 阻塞:如果 REQ 套接字没有任何服务端可用时,发送操作会阻塞。也就是说,客户端会等待,直到至少有一个服务端可以接受请求。在没有服务端的情况下,请求不会丢失,而是会被阻塞,等待服务端的出现。 2. Publish-subscribe pattern(发布-订阅模式) 发布-订阅模式用于将数据从单个发布者以扇出方式分发到多个订阅者。 发布-订阅模式由RFC 29/PUBSUB正式定义。 ZeroMQ通过四种套接字类型支持发布/订阅: PUB 套接字类型 XPUB 套接字类型 SUB 套接字类型 XSUB 套接字类型 PUB 套接字 PUB套接字用于发布者分发数据。发送的消息以扇出方式分发给所有连接的对等方。此套接字类型不能接收任何消息。 当PUB套接字由于达到订阅者的高水位标记而进入静默状态时,将丢弃发送给该订阅者的消息,直到静默状态结束。发送功能永远不会阻塞此套接字类型。 特性总结: 兼容的对等套接字:SUB, XSUB 方向:单向 发送/接收模式:仅发送 入站路由策略:不适用 出站路由策略:扇出 静默状态下的动作:丢弃 SUB 套接字 SUB套接字用于订阅者订阅发布者分发的数据。最初SUB套接字不订阅任何消息。此套接字类型没有实现发送功能。 特性总结: 兼容的对等套接字:PUB, XPUB 方向:单向 发送/接收模式:仅接收 入站路由策略:公平排队 出站路由策略:不适用 XPUB 套接字 与PUB套接字相同,不同之处在于可以以传入消息的形式接收来自对等方的订阅。订阅消息是一个字节1(表示订阅)或字节0(表示取消订阅),后跟订阅体。没有订阅/取消订阅前缀的消息也会被接收,但对订阅状态没有影响。 特性总结: 兼容的对等套接字:ZMQ_SUB, ZMQ_XSUB 方向:单向 发送/接收模式:发送消息,接收订阅 入站路由策略:不适用 出站路由策略:扇出 静默状态下的动作:丢弃 XSUB 套接字 与SUB套接字相同,不同之处在于通过向套接字发送订阅消息来订阅。订阅消息是一个字节1(表示订阅)或字节0(表示取消订阅),后跟订阅体。没有订阅/取消订阅前缀的消息也可以发送,但对订阅状态没有影响。 特性总结: 兼容的对等套接字:ZMQ_PUB, ZMQ_XPUB 方向:单向 发送/接收模式:接收消息,发送订阅 入站路由策略:公平排队 出站路由策略:不适用 静默状态下的动作:丢弃 3. Pipeline pattern(管道模式) 管道模式用于任务分发,通常在多阶段流水线中,其中一个或少数节点将工作推送给许多工作节点,然后它们再将结果推送给一个或少数收集节点。这个模式主要是可靠的,只要节点不意外断开连接,消息就不会被丢弃。它具有可扩展性,节点可以随时加入。 管道模式由RFC 30/PIPELINE正式定义。 ZeroMQ通过两种套接字类型支持管道模式: PUSH 套接字类型 PULL 套接字类型 PUSH 套接字 PUSH套接字类型与一组匿名的PULL对等方通信,使用轮询算法发送消息。此套接字类型没有实现接收操作。 当PUSH套接字由于达到所有下游节点的高水位标记而进入静默状态,或者根本没有下游节点时,套接字上的任何发送操作将阻塞,直到静默状态结束或至少有一个下游节点可用进行发送;消息不会被丢弃。 特性总结: 兼容的对等套接字:PULL 方向:单向 发送/接收模式:仅发送 入站路由策略:不适用 出站路由策略:轮询 静默状态下的动作:阻塞 PULL 套接字 PULL套接字类型与一组匿名的PUSH对等方通信,使用公平排队算法接收消息。 此套接字类型没有实现发送操作。 特性总结: 兼容的对等套接字:PUSH 方向:单向 发送/接收模式:仅接收 入站路由策略:公平排队 出站路由策略:不适用 静默状态下的动作:阻塞 小结:在管道模式中,PUSH套接字负责将任务分发给多个PULL套接字,PULL套接字则负责接收这些任务。该模式通过轮询和公平排队算法确保任务的有效分配和处理,并且具备高可靠性和可扩展性,是实现任务分发和工作负载均衡的有效方式。 4. Exclusive pair pattern (独占 PAIR 模式) 独占对等模式 PAIR套接字不是通用套接字,而是用于特定用例,其中两个对等方在架构上是稳定的。这通常将PAIR限制在单个进程内,用于线程间通信。 独占对等模式由RFC 31/EXPAIR正式定义。 PAIR 套接字 PAIR套接字类型只能与一个对等方建立连接。消息在PAIR套接字上发送时不会进行路由或过滤。 当PAIR套接字由于达到连接对等方的高水位标记而进入静默状态,或者没有对等方连接时,套接字上的任何发送操作将阻塞,直到对等方变得可用进行发送;消息不会被丢弃。 尽管PAIR套接字可以通过除inproc之外的其他传输方式使用,但由于它们无法自动重连,并且在存在任何先前连接(包括处于关闭状态的连接)时,新入站连接将被终止,因此在大多数情况下,它们不适用于TCP。 小结:PAIR套接字专用于架构上稳定的两个对等方之间的通信,通常用于单个进程内的线程间通信。它只能与一个对等方连接,并且不支持消息路由或过滤。当达到高水位标记时,发送操作会阻塞,直到连接恢复。由于缺乏自动重连功能和处理新连接的限制,PAIR套接字不适合用于TCP连接。 二、请求-响应模式实现 请求响应模式是通信中最简单和基础的模式,ZeroMQ同样支持这个模式。 请求响应基础 请求-响应模式是计算机科学和网络通信中一种常见的通信模式。这个模式通常涉及两个主要角色:客户端和服务器。 基本概念 客户端: 向服务器发送请求的实体。它可以是浏览器、应用程序或任何其他发起通信的设备或程序。 服务器: 接收客户端请求并返回响应的实体。它通常是一个提供服务、资源或数据的程序或设备。 工作流程 客户端发起请求: 客户端构造一个请求消息,通常包含请求的类型(如 GET、POST)、请求的资源(如网页、API端点)、以及可能的附加数据(如表单数据)。 服务器处理请求: 服务器接收到请求后,解析请求内容,根据请求的类型和资源进行处理。处理可能包括访问数据库、执行计算或调用其他服务。 服务器返回响应: 处理完成后,服务器生成一个响应消息,通常包含状态码(如200表示成功、404表示资源未找到)、响应的内容(如网页内容、数据结果)以及其他信息(如响应时间、服务器信息)。 客户端接收响应: 客户端接收到响应后,解析响应内容并根据需要展示或处理这些数据。 典型应用 网页浏览: 当你在浏览器中输入网址并按下回车时,浏览器(客户端)会向服务器发出一个请求,服务器会返回网页内容作为响应。 API调用: 在应用程序中调用API时,客户端发送请求(如获取数据、提交表单),服务器处理请求并返回结果。 请求-响应模式的特点 同步通信: 通常情况下,请求-响应模式是同步的,即客户端发送请求后会等待服务器响应完成后才继续执行后续操作。 单向通信: 这种模式是一种单向通信,客户端请求数据,服务器响应数据,但服务器不会主动向客户端发送消息(除非使用长轮询或WebSocket等技术)。 简单直观: 由于其简单的结构和流程,很多网络协议和应用程序设计都是基于这种模式的。 应用实例 HTTP/HTTPS: 用于Web浏览和API交互的协议,客户端(浏览器或应用)发送HTTP请求,服务器返回HTTP响应。 REST API: 基于HTTP协议的API风格,允许客户端通过标准的HTTP请求(如GET、POST、PUT、DELETE)与服务器进行交互。 优点 易于理解和实现: 请求-响应模式简洁明了,易于理解和实现。 兼容性强: 许多网络协议和技术都基于这种模式,因此具有良好的兼容性。 缺点 延迟问题: 在网络不稳定的情况下,请求和响应的延迟可能影响用户体验。 同步阻塞: 客户端通常需要等待响应完成才能继续执行,可能导致性能瓶颈。 总的来说,请求-响应模式是现代计算和通信中的基础构建块,为各种网络应用和服务提供了一个标准化的通信方式。 ZEROMQ C语言 在 C 语言中使用 ZeroMQ 实现请求-回复模式(Request-Reply Pattern)涉及创建一个请求端和一个回复端,通过 ZeroMQ 套接字进行通信。 服务器端代码(Reply Server) 下面是使用 C 语言编写的 ZeroMQ 请求-回复模式的示例代码。我们将分别实现一个服务器端(Reply Server)和一个客户端(Request Client)。 #include #include #include #include int main() { // 初始化 ZeroMQ 上下文 void *context = zmq_ctx_new(); // 创建 REP (reply) 套接字 void *responder = zmq_socket(context, ZMQ_REP); // 将套接字绑定到端口 zmq_bind(responder, "tcp://*:5555"); while (1) { // 接收请求 char buffer[256]; zmq_recv(responder, buffer, 255, 0); printf("Received request: %s\n", buffer); // 发送回复 const char *reply = "World"; zmq_send(responder, reply, strlen(reply), 0); } // 清理资源 zmq_close(responder); zmq_ctx_destroy(context); return 0;} 客户端代码(Request Client) #include #include #include int main() { // 初始化 ZeroMQ 上下文 void *context = zmq_ctx_new(); // 创建 REQ (request) 套接字 void *requester = zmq_socket(context, ZMQ_REQ); // 连接到服务器 zmq_connect(requester, "tcp://localhost:5555"); // 发送请求 const char *request = "Hello"; zmq_send(requester, request, strlen(request), 0); // 接收回复 char buffer[256]; zmq_recv(requester, buffer, 255, 0); buffer[255] = '\0'; // 确保字符串以 null 结尾 printf("Received reply: %s\n", buffer); // 清理资源 zmq_close(requester); zmq_ctx_destroy(context); return 0;} 编译代码 编译上述代码时,需要链接 ZeroMQ 库。下面是使用 gcc 编译器的示例命令: gcc -o server server.c -lzmqgcc -o client client.c -lzmq 详细说明 ZeroMQ Context: zmq_ctx_new() 用于创建 ZeroMQ 上下文,管理所有的套接字和连接。每个应用程序应该有一个上下文对象。 Sockets: 使用 zmq_socket() 创建套接字。请求-回复模式中,客户端使用 ZMQ_REQ 套接字,服务器使用 ZMQ_REP 套接字。 Bind 和 Connect: zmq_bind() 绑定服务器端套接字到指定的地址和端口。 zmq_connect() 连接客户端套接字到服务器端的地址和端口。 消息传递: 使用 zmq_send() 发送消息。 使用 zmq_recv() 接收消息。 资源清理: zmq_close() 关闭套接字。 zmq_ctx_destroy() 销毁上下文,释放相关资源。 ZEROMQ C++ 安装 ZeroMQ C++ Bindings (cppzmq) ZeroMQ 的 C++ 绑定库 cppzmq 提供了对 ZeroMQ 的 C++ 封装。你可以通过包管理工具或者从 GitHub 下载源代码来安装它。 使用 vcpkg vcpkg install cppzmq 从 GitHub 安装 git clone https://github.com/zeromq/cppzmq.git 编写代码 下面是一个简单的示例,展示了如何在 C++ 中使用 ZeroMQ 实现请求-回复模式。我们将分别编写一个服务器(Reply Server)和一个客户端(Request Client)。 服务器端代码(Reply Server) #include #include #include int main() { // Initialize ZeroMQ context zmq::context_t context(1); // Create a REP (reply) socket zmq::socket_t socket(context, ZMQ_REP); // Bind the socket to an endpoint (address:port) socket.bind("tcp://*:5555"); while (true) { // Receive a request zmq::message_t request; socket.recv(request); std::string request_str(static_cast<char*>(request.data()), request.size()); std::cout << "Received request: " << request_str << std::endl; // Send a reply std::string reply_str = "World"; zmq::message_t reply(reply_str.size()); memcpy(reply.data(), reply_str.data(), reply_str.size()); socket.send(reply); } return 0;} 客户端代码(Request Client) #include #include #include int main() { // Initialize ZeroMQ context zmq::context_t context(1); // Create a REQ (request) socket zmq::socket_t socket(context, ZMQ_REQ); // Connect to the server (address:port) socket.connect("tcp://localhost:5555"); // Send a request std::string request_str = "Hello"; zmq::message_t request(request_str.size()); memcpy(request.data(), request_str.data(), request_str.size()); socket.send(request); // Receive a reply zmq::message_t reply; socket.recv(reply); std::string reply_str(static_cast<char*>(reply.data()), reply.size()); std::cout << "Received reply: " << reply_str << std::endl; return 0;} 解释 ZeroMQ Context: 这是 ZeroMQ 的基础对象,负责管理所有的套接字和连接。每个线程应该有一个唯一的上下文对象。 Sockets: ZeroMQ 的套接字对象用于发送和接收消息。在请求-回复模式中,客户端使用 ZMQ_REQ 套接字,服务器使用 ZMQ_REP 套接字。 Bind 和 Connect: 服务器使用 bind 将套接字绑定到一个特定的地址和端口,客户端使用 connect 连接到该地址和端口。 消息传递: 使用 send 和 recv 方法来传递消息。消息是通过 zmq::message_t 对象来表示的。 小结 使用 ZeroMQ 实现请求-响应模式可以带来显著的性能提升和灵活性。它不仅支持高性能的消息传递,还提供了丰富的特性,如自动重连、负载均衡、多语言支持等,使得它成为构建高性能、可靠的分布式系统和微服务架构的理想选择。 三、发布-订阅模式实现 发布订阅模式是典型的异步模式,通过ZeroMq来看看他的原理与实现。 简称 Pub-Sub,是一种消息传递模式。 允许发送者(发布者)和接收者(订阅者)之间解耦。 它广泛应用于消息队列、事件驱动系统和实时通知等场景。 基本原理 参与者: 发布者(Publisher):发送消息的实体。 订阅者(Subscriber):接收消息的实体。 消息代理(Message Broker):中介实体,负责接收发布者的消息并分发给相应的订阅者。 主题(Topic): 消息根据主题分类,订阅者订阅一个或多个主题,发布者将消息发布到特定主题。 消息传递: 发布者将消息发送到消息代理,并指定消息的主题。 消息代理根据主题将消息分发给所有订阅了该主题的订阅者。 工作流程 订阅(Subscribe): 订阅者向消息代理注册自己对某个主题的兴趣。 订阅者可以订阅多个主题。 发布(Publish): 发布者向消息代理发送消息,并指定消息的主题。 分发(Distribute): 消息代理接收到消息后,根据主题查找所有订阅了该主题的订阅者。 消息代理将消息分发给所有符合条件的订阅者。 举例说明 假设有一个天气预报系统: 发布者:天气预报服务 订阅者:用户手机应用、网页应用等 主题:不同城市的天气(如“北京天气”、“上海天气”) 用户 A 通过手机应用订阅了“北京天气”主题。 用户 B 通过网页应用订阅了“上海天气”主题。 天气预报服务发布了一条“北京天气”的消息到消息代理。 消息代理接收到消息后,将其分发给所有订阅了“北京天气”主题的用户应用(如用户 A 的手机应用)。 优点 解耦:发布者和订阅者不直接交互,彼此独立。 扩展性:可以方便地增加或减少订阅者,不影响发布者。 灵活性:可以动态改变订阅关系,适应不同需求。 缺点 复杂性:需要维护消息代理和订阅关系,增加系统复杂性。 可靠性:消息传递的可靠性需要额外保障,如消息丢失和重复的问题。 C代码实现天气预报系统 #include #include #include #include #include #include const char* cities[] = { "北京", "上海", "广州", "深圳", "杭州", "成都", "武汉", "重庆", "南京", "天津"}; const char* weather_conditions[] = { "晴", "多云", "阴", "小雨", "中雨", "大雨", "雷阵雨", "雪", "雾", "霾"}; // 随机生成天气信息void generate_weather_update(char* buffer, size_t buffer_size, const char* city) { int temp = rand() % 35; // 随机温度 const char* condition = weather_conditions[rand() % 10]; // 随机天气情况 snprintf(buffer, buffer_size, "%s天气 %s %d°C", city, condition, temp);} // 发布天气更新消息DWORD WINAPI publish_weather_update(LPVOID arg) { void* context = arg; const char* address = "tcp://*:5556"; // 创建发布者套接字 void* publisher = zmq_socket(context, ZMQ_PUB); zmq_bind(publisher, address); srand((unsigned)time(NULL)); while (1) { // 随机选择城市并生成天气更新 for (int i = 0; i < 10; ++i) { char update[256]; generate_weather_update(update, sizeof(update), cities[i]); zmq_send(publisher, update, strlen(update), 0); printf("发布消息: %s\n", update); } // 模拟发布间隔 Sleep(10000); // Windows 中使用 Sleep } // 关闭套接字 zmq_close(publisher); return 0;} // 订阅天气更新消息DWORD WINAPI subscribe_weather_updates(LPVOID arg) { void* context = zmq_ctx_new(); const char* city = (char*)arg; const char* address = "tcp://localhost:5556"; // 创建订阅者套接字 void* subscriber = zmq_socket(context, ZMQ_SUB); zmq_connect(subscriber, address); zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, city, strlen(city)); while (1) { // 接收消息 char update[256]; int size = zmq_recv(subscriber, update, 255, 0); if (size != -1) { update[size] = '\0'; printf("接收消息 (%s): %s\n", city, update); } } // 关闭套接字 zmq_close(subscriber); zmq_ctx_destroy(context); return 0;} int main() { void* context = zmq_ctx_new(); // 创建发布者线程 HANDLE publisher_thread = CreateThread(NULL, 0, publish_weather_update, context, 0, NULL); // 创建10个订阅者线程 HANDLE subscriber_threads[10]; for (int i = 0; i < 10; ++i) { subscriber_threads[i] = CreateThread(NULL, 0, subscribe_weather_updates, (LPVOID)cities[i], 0, NULL); } // 等待线程完成 WaitForSingleObject(publisher_thread, INFINITE); for (int i = 0; i < 10; ++i) { WaitForSingleObject(subscriber_threads[i], INFINITE); CloseHandle(subscriber_threads[i]); } // 关闭线程句柄 CloseHandle(publisher_thread); zmq_ctx_destroy(context); return 0;} 说明 发布者线程:publish_weather_update 函数在一个独立线程中运行,发布10个城市的天气预报消息,天气信息随机变化。 订阅者线程:subscribe_weather_updates 函数在10个独立线程中运行,每个线程订阅一个特定城市的天气预报消息。 主程序:在主程序中创建并启动发布者和10个订阅者线程,然后等待它们完成。 C++代码 #include #include #include #include #include #include #include #include const std::vector<std::string> cities = { "北京", "上海", "广州", "深圳", "杭州", "成都", "武汉", "重庆", "南京", "天津"}; const std::vector<std::string> weather_conditions = { "晴", "多云", "阴", "小雨", "中雨", "大雨", "雷阵雨", "雪", "雾", "霾"}; // 随机生成天气信息std::string generate_weather_update(const std::string& city) { int temp = rand() % 35; // 随机温度 const std::string& condition = weather_conditions[rand() % weather_conditions.size()]; // 随机天气情况 return city + "天气 " + condition + " " + std::to_string(temp) + "°C";} // 发布天气更新消息void publish_weather_update(zmq::context_t& context) { zmq::socket_t publisher(context, ZMQ_PUB); publisher.bind("tcp://*:5556"); srand(static_cast<unsigned>(time(nullptr))); while (true) { for (const auto& city : cities) { std::string update = generate_weather_update(city); zmq::message_t message(update.begin(), update.end()); publisher.send(message, zmq::send_flags::none); std::cout << "发布消息: " << update << std::endl; } std::this_thread::sleep_for(std::chrono::seconds(1)); }} // 订阅天气更新消息void subscribe_weather_updates(const std::string& city) { zmq::context_t context(1); zmq::socket_t subscriber(context, ZMQ_SUB); subscriber.connect("tcp://localhost:5556"); subscriber.set(zmq::sockopt::subscribe, city); while (true) { zmq::message_t message; subscriber.recv(message, zmq::recv_flags::none); std::string update(static_cast<char*>(message.data()), message.size()); std::cout << "接收消息 (" << city << "): " << update << std::endl; }} int main() { zmq::context_t context(1); // 创建发布者线程 std::thread publisher_thread(publish_weather_update, std::ref(context)); // 创建10个订阅者线程 std::vector<std::thread> subscriber_threads; for (const auto& city : cities) { subscriber_threads.emplace_back(subscribe_weather_updates, city); } // 等待线程完成 publisher_thread.join(); for (auto& thread : subscriber_threads) { thread.join(); } return 0;} 其他场景举例 1. 金融市场数据分发 应用场景: 股票市场:股票价格、交易量等市场数据的实时分发。 外汇市场:汇率变动、交易量等数据的实时更新。 详细举例: Bloomberg 和 Reuters 等金融信息服务商使用发布-订阅模式来分发实时的市场数据给订阅者,这些订阅者可能是金融机构、投资者等。数据包括股票报价、外汇汇率、商品价格等。 交易平台:交易所使用发布-订阅系统向交易参与者推送交易数据和市场信息。 2. 日志和监控系统 应用场景: 服务器日志:实时收集和分析分布式系统的日志数据。 系统监控:监控系统健康状态、资源使用等。 详细举例: Elasticsearch, Logstash, Kibana (ELK) Stack:使用发布-订阅模式在分布式系统中收集和分析日志数据。Logstash 作为数据收集器,可以接收来自不同源的日志并发布到 Elasticsearch,Kibana 订阅这些日志并进行可视化展示。 Prometheus 和 Grafana:Prometheus 收集系统和应用的监控数据,Grafana 订阅这些数据并生成实时的监控仪表板。 3. 社交媒体和消息通知 应用场景: 消息推送:向用户实时推送消息、通知。 活动流:发布和订阅用户活动、帖子、评论等。 详细举例: Facebook 和 Twitter 等社交媒体平台使用发布-订阅模式来处理用户的状态更新、评论、点赞等事件。用户可以订阅朋友或关注的人的动态。 即时通讯应用:如 Slack 和 WhatsApp,用户接收消息的过程是通过发布-订阅模式实现的,确保消息的即时性和可靠性。 4. 物联网(IoT) 应用场景: 设备状态监控:实时监控和控制物联网设备。 传感器数据收集:从传感器收集数据并实时处理。 详细举例: 智能家居:智能家居系统中的传感器(如温度、湿度传感器)使用发布-订阅模式将数据发送到中央控制系统,用户可以通过应用程序订阅这些数据并进行监控和控制。 工业物联网:在工业环境中,机器和设备通过发布-订阅模式报告运行状态和故障信息,管理系统订阅这些信息进行实时监控和预警。 5. 分布式系统和微服务架构 应用场景: 服务通信:微服务之间的通信和协调。 事件驱动架构:基于事件的系统设计。 详细举例: Apache Kafka:广泛用于构建分布式系统和微服务架构中的事件流处理。不同的微服务可以发布和订阅事件,确保系统的松耦合和高扩展性。 Amazon Web Services (AWS) SNS:Simple Notification Service (SNS) 是一个托管的发布-订阅服务,用于将消息从一个应用程序发送到多个订阅者,常用于触发基于事件的处理流程。 6. 在线游戏 应用场景: 游戏状态更新:实时同步游戏状态和玩家操作。 聊天系统:游戏内聊天消息的实时分发。 详细举例: 多人在线游戏(MMO):如《魔兽世界》,使用发布-订阅模式在服务器和客户端之间同步游戏状态和玩家操作。游戏服务器发布玩家动作和状态,其他玩家的客户端订阅这些消息以保持同步。 在线棋牌类游戏:如《炉石传说》,游戏服务器发布牌局状态和玩家操作,玩家客户端订阅这些消息以实时更新游戏界面。 7. 新闻和内容分发 应用场景: 新闻推送:实时向用户推送新闻和内容更新。 订阅服务:用户订阅特定主题或作者的内容更新。 详细举例: RSS Feeds:使用发布-订阅模式向用户提供新闻和博客更新。用户订阅感兴趣的内容源,新的内容发布时会自动推送到用户的阅读器。 内容聚合平台:如 Flipboard 和 Feedly,用户订阅不同的内容源,平台通过发布-订阅模式实时获取和推送内容更新。 这些实际应用展示了发布-订阅模式在各种领域中的广泛应用,利用这一模式可以实现系统的解耦、扩展和实时性。 四、管道模式实现 ZeroMQ 的管道模式通过简单而高效的PUSH和PULL套接字实现了负载均衡和任务分发,适用于分布式任务处理和数据处理流水线等场景。其自动负载均衡和高可扩展性使得它成为构建高效、可扩展分布式系统的理想选择。 基础原理介绍 ZeroMQ 的管道模式是一种特殊的消息传递模式,通常用于将任务或消息从一个生产者传递到多个消费者。其主要特性是: 消息一对多传递:单个生产者可以将消息发送到多个消费者。 负载均衡:消息在消费者之间进行负载均衡,以确保每个消费者都能接收到均衡的消息量。 下游消费者处理:消费者处理消息,并且可以将处理后的消息传递给下游消费者,形成消息处理链。 在 ZeroMQ 中,管道模式由PUSH和PULL套接字类型实现: PUSH 套接字:生产者使用 PUSH 套接字发送消息。消息按照负载均衡的方式发送到连接的 PULL 套接字。 PULL 套接字:消费者使用 PULL 套接字接收消息。每个 PULL 套接字接收到的消息数量大致相等。 内部原理 PUSH 套接字 特性:PUSH 套接字负责将消息发送给一个或多个连接的 PULL 套接字。 消息分发:当有多个 PULL 套接字连接到一个 PUSH 套接字时,PUSH 套接字会按照循环(round-robin)的方式将消息分发给每个 PULL 套接字。这意味着每个消息都会发送给下一个连接的 PULL 套接字,直到轮回到第一个 PULL 套接字。 PULL 套接字 特性:PULL 套接字负责接收从 PUSH 套接字发送来的消息。 消息接收:每个 PULL 套接字只会接收发送给它的消息,不会与其他 PULL 套接字共享消息。 实现负载均衡的方法 1.循环分发(Round-Robin Dispatching): PUSH 套接字在有多个 PULL 套接字连接时,按照循环分发的方式将消息逐个发送给每个 PULL 套接字。 这种方式确保了所有连接的 PULL 套接字能够接收到大致相同数量的消息,从而实现了负载均衡。 2.消息队列: 每个 PULL 套接字维护一个接收队列,PUSH 套接字发送的消息会被分发到这些接收队列中。 PULL 套接字从其接收队列中取出消息进行处理,避免了消息丢失或重复处理的情况。 Round-Robin 原理 Round-robin 是一种简单而常用的调度算法,广泛应用于任务调度、资源分配和网络通信中。在 round-robin 调度中,资源(如 CPU 时间、网络带宽或消息)按顺序分配给各个请求者,循环地进行,确保每个请求者都能公平地获得资源。 Round-Robin 在 ZeroMQ 的应用 在 ZeroMQ 中,PUSH 套接字使用 round-robin 算法将消息分发给多个连接的 PULL 套接字。具体工作原理如下: 1.初始化: 当一个 PUSH 套接字被创建并绑定到一个端口时,它开始监听来自 PULL 套接字的连接。 当 PULL 套接字连接到 PUSH 套接字时,PUSH 套接字维护一个内部队列,记录所有连接的 PULL 套接字。 2.消息分发: 每当 PUSH 套接字发送一条消息时,它会按照 round-robin 的顺序选择下一个 PULL 套接字,将消息发送给它。 这种分发方式确保每个连接的 PULL 套接字接收到的消息数量大致相同。 3.循环: 当所有的 PULL 套接字都接收过一次消息后,PUSH 套接字重新开始,从第一个 PULL 套接字继续发送消息。 Round-Robin 调度算法的优点 公平性:Round-robin 算法确保每个连接的 PULL 套接字都能公平地接收到消息,没有任何一个 PULL 套接字会被饿死。 简单性:该算法非常简单,不需要复杂的计算和维护,只需要一个循环计数器即可实现。 负载均衡:由于消息均匀地分布到各个 PULL 套接字,系统能够实现负载均衡,防止某个 PULL 套接字过载。 Round-robin算法: #include #define WORKER_COUNT 5#define TASK_COUNT 20 int main() { int workers[WORKER_COUNT] = {0}; // 存储每个工作者处理的任务数量 int tasks[TASK_COUNT]; // 初始化任务 for (int i = 0; i < TASK_COUNT; i++) { tasks[i] = i + 1; // 每个任务用一个整数表示 } // Round-Robin 分配任务 for (int i = 0; i < TASK_COUNT; i++) { int worker_id = i % WORKER_COUNT; // 使用模运算实现循环分配 workers[worker_id]++; printf("任务 %d 分配给工作者 %d\n", tasks[i], worker_id); } // 输出每个工作者处理的任务数量 for (int i = 0; i < WORKER_COUNT; i++) { printf("工作者 %d 处理了 %d 个任务\n", i, workers[i]); } return 0;} 主要用途 管道模式在以下场景中非常有用: 任务分发:将任务从生产者分发到多个工作者进行处理。例如,分布式计算中的任务调度。 负载均衡:在多个工作者之间平衡负载,确保每个工作者处理的任务量大致相同。 数据处理流水线:将数据流从一个阶段传递到下一个阶段,每个阶段由不同的消费者处理。 优势 简单且高效:管道模式通过简单的 PUSH 和 PULL 套接字实现高效的消息传递和负载均衡。 自动负载均衡:消息在多个消费者之间自动均衡分配,无需手动干预。 高可扩展性:可以轻松增加或减少消费者,调整系统的处理能力。 灵活性:可以形成复杂的消息处理流水线,适用于多种分布式任务处理场景。 实际场景 1.ELK Stack 中的 Logstash: 用途:集中收集和处理分布式系统的日志。 工作原理:Logstash 从不同来源接收日志数据,处理后推送到 Elasticsearch。通过管道模式,多个 Logstash 实例可以同时处理日志数据,并将处理后的数据均衡分发到 Elasticsearch 节点。 2.视频处理流水线: 用途:在视频直播或视频编辑场景中,视频帧需要经过多个处理阶段,如解码、特效处理、编码等。 工作原理:视频帧通过管道模式从一个处理单元推送到下一个处理单元。每个处理单元可以并行处理不同的视频帧,最终生成处理后的视频流。 3.分布式交易系统: 用途:在金融市场中,实现高频交易和低延迟的数据传输。 工作原理:交易指令和市场数据通过管道模式在交易系统的各个节点间传递,确保数据的快速分发和处理。每个交易节点可以并行处理接收到的数据,优化交易性能和响应速度。 与发布订阅的区别 发布-订阅模式 基础原理 参与者:发布者(Publisher)和订阅者(Subscriber)。 主题:消息通过主题(Topic)进行分类,订阅者可以选择感兴趣的主题。 消息分发:发布者将消息发布到特定主题,消息代理将消息分发给所有订阅了该主题的订阅者。 主要特点 多对多:一个发布者可以有多个订阅者,多个发布者也可以有多个订阅者。 去耦合:发布者和订阅者彼此之间是解耦的,通过消息代理进行通信。 灵活的订阅机制:订阅者可以动态订阅或取消订阅主题。 应用场景 通知系统:如新闻推送、股票行情更新。 事件驱动架构:系统组件之间通过事件进行松耦合通信。 示例 消息队列系统:如 Apache Kafka、RabbitMQ 等。 实时通知系统:如 Slack、Twitter 等。 管道模式 基础原理 参与者:推送者(PUSH)和拉取者(PULL)。 消息流动:消息从推送者流向拉取者。 负载均衡:消息在拉取者之间进行负载均衡,确保每个拉取者接收到的消息量大致相同。 主要特点 一对多:一个推送者可以有多个拉取者,但一个消息只能被一个拉取者处理。 负载均衡:自动将消息均衡地分发给多个拉取者。 顺序处理:消息按照推送的顺序被处理,适用于流水线处理。 应用场景 任务分发系统:将任务分发给多个工作者进行处理。 流水线处理:如视频处理、数据处理流水线。 示例 分布式任务调度系统:如高性能计算任务调度。 日志处理系统:如 ELK Stack 中的 Logstash。 主要区别 1.消息分发机制: 发布-订阅模式:一个消息可以被多个订阅者接收。消息通过主题进行分类,订阅者可以根据兴趣选择订阅特定主题。 管道模式:一个消息只能被一个拉取者接收。消息从推送者传递到拉取者,并在拉取者之间进行负载均衡。 2.使用场景: 发布-订阅模式:适用于通知系统、事件驱动架构等需要广播消息的场景。 管道模式:适用于任务分发、流水线处理等需要消息均衡处理的场景。 3.参与者关系: 发布-订阅模式:发布者和订阅者之间没有直接联系,通过消息代理实现解耦。 管道模式:推送者和拉取者之间有直接联系,消息直接从推送者传递到拉取者。 为什么会觉得相似 两者都是消息传递模式,解决的是分布式系统中的通信问题。它们的相似之处在于: 都是基于消息的异步通信:两者都通过消息实现参与者之间的异步通信,解耦发送方和接收方。 都可以实现多对多的消息传递:尽管实现方式不同,发布-订阅模式通过主题,管道模式通过多个拉取者,都可以实现消息的多对多传递。 然而,它们在消息分发的具体机制和应用场景上有显著区别,选择哪种模式取决于具体的应用需求。 任务发放C代码 #include #include #include #include #include #include #define WORKER_COUNT 20#define TASK_COUNT 1000 // 全局变量,存储每个消费者处理的任务数量int task_count[WORKER_COUNT] = { 0 };HANDLE mutex;volatile int keep_running = 1; // 用于控制任务处理者线程的运行 // 任务发布者(生产者)函数DWORD WINAPI producer_thread(LPVOID arg) { void* context = zmq_ctx_new(); void* producer = zmq_socket(context, ZMQ_PUSH); zmq_bind(producer, "tcp://*:5557"); srand((unsigned int)time(NULL)); for (int i = 0; i < TASK_COUNT; ++i) { char message[10]; int workload = rand() % 100; // 随机生成任务负载 snprintf(message, sizeof(message), "%d", workload); zmq_send(producer, message, strlen(message), 0); printf("发送任务: %s\n", message); Sleep(10); // 模拟任务生成间隔 } zmq_close(producer); zmq_ctx_destroy(context); keep_running = 0; // 停止任务处理者线程 return 0;} // 任务处理者(消费者)函数DWORD WINAPI consumer_thread(LPVOID arg) { int worker_id = (int)arg; void* context = zmq_ctx_new(); void* consumer = zmq_socket(context, ZMQ_PULL); zmq_connect(consumer, "tcp://localhost:5557"); while (keep_running || zmq_recv(consumer, NULL, 0, ZMQ_DONTWAIT) != -1) { char message[10]; int size = zmq_recv(consumer, message, sizeof(message) - 1, ZMQ_DONTWAIT); if (size != -1) { message[size] = '\0'; int workload = atoi(message); printf("Worker %d 接收任务: %s\n", worker_id, message); Sleep(workload); // 模拟处理任务 // 更新任务计数器 WaitForSingleObject(mutex, INFINITE); task_count[worker_id]++; ReleaseMutex(mutex); } } zmq_close(consumer); zmq_ctx_destroy(context); return 0;} int main() { mutex = CreateMutex(NULL, FALSE, NULL); // 创建任务发布者线程 HANDLE producer = CreateThread(NULL, 0, producer_thread, NULL, 0, NULL); // 创建 5 个任务处理者线程 HANDLE consumers[WORKER_COUNT]; for (int i = 0; i < WORKER_COUNT; ++i) { consumers[i] = CreateThread(NULL, 0, consumer_thread, (LPVOID)i, 0, NULL); } // 等待任务发布者线程完成 WaitForSingleObject(producer, INFINITE); CloseHandle(producer); // 等待所有任务处理者线程完成 WaitForMultipleObjects(WORKER_COUNT, consumers, TRUE, INFINITE); for (int i = 0; i < WORKER_COUNT; ++i) { CloseHandle(consumers[i]); } printf("#########################################################################"); // 输出每个处理者处理的任务个数 for (int i = 0; i < WORKER_COUNT; ++i) { printf("Worker %d 处理了 %d 个任务\n", i, task_count[i]); } CloseHandle(mutex); return 0;} 代码说明 1.全局变量: task_count[WORKER_COUNT]:用于存储每个消费者处理的任务数量。 mutex:用于保护对 task_count 数组的访问,确保线程安全。 volatile int keep_running:用于控制任务处理者线程的运行。 2.任务发布者线程: 创建一个 ZeroMQ 上下文和一个 PUSH 套接字。 绑定 PUSH 套接字到 TCP 端口 5557。 生成 1000个随机任务,并通过 PUSH 套接字发送这些任务。 在完成任务发送后,设置 keep_running 为 0,通知任务处理者线程可以停止。 3.任务处理者线程: 创建一个 ZeroMQ 上下文和一个 PULL 套接字。 连接 PULL 套接字到任务发布者的地址(TCP 端口 5557)。 进入无限循环,接收任务消息并模拟处理这些任务。 处理完任务后更新任务计数器,并打印当前处理者处理的任务总数。 当 keep_running 为 0 并且没有待处理的消息时,退出循环。 4.主程序: 创建一个任务发布者线程。 创建 5 个任务处理者线程。 等待任务发布者线程完成后关闭其句柄。 等待所有任务处理者线程完成后关闭各自的句柄。 输出每个处理者处理的任务个数。 该程序将启动一个任务发布者线程和 100个任务处理者线程,发布者每隔 10 毫秒发送一个任务,任务处理者实时接收并处理这些任务。每个任务处理者线程独立运行,并且可以同时处理不同的任务,从而实现了任务的并行处理和负载均衡。程序结束时,将输出每个处理者处理的任务个数,从而可以清晰地展示负载均衡的效果。 任务发放C++ #include #include #include #include #include #include #include #include #include #define WORKER_COUNT 20#define TASK_COUNT 1000 // 全局变量,存储每个消费者处理的任务数量std::vector<int> task_count(WORKER_COUNT, 0);std::mutex mtx;std::atomic<bool> keep_running(true); // 用于控制任务处理者线程的运行 // 任务发布者(生产者)函数void producer_thread() { zmq::context_t context(1); zmq::socket_t producer(context, ZMQ_PUSH); producer.bind("tcp://*:5557"); srand(static_cast<unsigned int>(time(NULL))); for (int i = 0; i < TASK_COUNT; ++i) { int workload = rand() % 100; // 随机生成任务负载 std::string message = std::to_string(workload); zmq::message_t zmq_message(message.data(), message.size()); producer.send(zmq_message, zmq::send_flags::none); std::cout << "发送任务: " << message << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(10)); // 模拟任务生成间隔 } keep_running = false; // 停止任务处理者线程} // 任务处理者(消费者)函数void consumer_thread(int worker_id) { zmq::context_t context(1); zmq::socket_t consumer(context, ZMQ_PULL); consumer.connect("tcp://localhost:5557"); while (keep_running) { zmq::message_t message; zmq::recv_result_t result = consumer.recv(message, zmq::recv_flags::dontwait); if (result) { std::string message_str(static_cast<char*>(message.data()), message.size()); int workload = std::stoi(message_str); std::cout << "Worker " << worker_id << " 接收任务: " << message_str << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(workload)); // 模拟处理任务 // 更新任务计数器 { std::lock_guard<std::mutex> lock(mtx); task_count[worker_id]++; } } else { std::this_thread::sleep_for(std::chrono::milliseconds(1)); // 短暂休眠以避免空转 } } // 当 keep_running 为 false 后,确保处理完所有剩余消息 while (true) { zmq::message_t message; zmq::recv_result_t result = consumer.recv(message, zmq::recv_flags::dontwait); if (!result) { break; // 无消息可接收时退出 } std::string message_str(static_cast<char*>(message.data()), message.size()); int workload = std::stoi(message_str); std::cout << "Worker " << worker_id << " 接收任务: " << message_str << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(workload)); // 模拟处理任务 // 更新任务计数器 { std::lock_guard<std::mutex> lock(mtx); task_count[worker_id]++; } }} int main() { // 创建任务发布者线程 std::thread producer(producer_thread); // 创建 20 个任务处理者线程 std::vector<std::thread> consumers; for (int i = 0; i < WORKER_COUNT; ++i) { consumers.emplace_back(consumer_thread, i); } // 等待任务发布者线程完成 producer.join(); // 等待所有任务处理者线程完成 for (auto& consumer : consumers) { consumer.join(); } std::cout << "#########################################################################" << std::endl; // 输出每个处理者处理的任务个数 for (int i = 0; i < WORKER_COUNT; ++i) { std::cout << "Worker " << i << " 处理了 " << task_count[i] << " 个任务" << std::endl; } return 0;} 五、独占模式实现 独占PAIR模式是一种设计方法,旨在确保在并发环境中,两个线程或资源对在同一时间内独占地操作共享资源,从而避免资源竞争、数据不一致、死锁等问题。这种模式在高并发环境中显得尤为重要,特别是在需要高可靠性和稳定性的系统中,如金融交易系统、物流管理系统、在线购物平台等。 首先,本节通过一个使用Mutex实现的独占PAIR模式的示例,展示了如何通过互斥锁和条件变量来确保两个线程能够协同工作,共同对共享资源进行操作。这个示例简单而直观地解释了独占PAIR模式的基本概念和工作机制。 接下来,深入探讨了ZeroMQ在进程内的独占模式实现,特别是通过PAIR套接字模式,展示了如何在同一进程的不同线程之间建立高效、安全的双向通信通道。ZeroMQ的inproc传输方式为线程间的消息传递提供了一个高效、线程安全的解决方案,避免了传统线程同步机制的复杂性。 在实际应用方面,列举了多个场景,如金融交易系统中的交易撮合、物流系统中的任务分配、在线购物平台的订单处理、社交媒体平台的消息传递,以及微服务架构中的分布式锁管理。这些场景中的每一个都强调了独占PAIR模式的重要性,特别是在保证数据一致性和系统稳定性方面。 随后,提供了一个基于ZeroMQ的金融交易撮合系统的示例,通过C语言和C++语言的实现展示了如何利用ZeroMQ的消息队列和独占模式确保交易撮合的准确性和独占性。该示例不仅展示了基本的撮合操作,还引入了并发撮合、持久化订单数据和复杂的撮合逻辑(如部分成交和优先级撮合)等高级功能,进一步提高了系统的可扩展性和可靠性。 最后,总结了ZeroMQ独占模式的价值,特别是在高并发环境中的应用。通过限制资源的并发访问,ZeroMQ独占模式有效地避免了并发编程中的常见问题,如数据竞争和死锁,保证了系统在极端情况下的稳定性和一致性。ZeroMQ提供了多种套接字模式和线程同步机制,支持独占模式的实现,使得它在复杂的分布式系统中具有广泛的应用前景。 本节有力地展示了ZeroMQ在并发编程中的强大功能,并为如何在实际项目中应用独占PAIR模式提供了详细的指导。 基础介绍 Exclusive Pair Pattern,中文翻译为“独占PAIR模式”,是一种设计模式或技术,用于在并发编程或资源管理中确保两个资源、线程或实体在同一时间段内只能由唯一的一对(PAIR)对象使用或操作。这种模式通常用于避免资源竞争、死锁或不一致的数据状态。 关键概念: 独占性(Exclusivity): 在独占PAIR模式中,一个资源或对象在某个时刻只能被唯一的一对对象(PAIR)访问或操作,防止其他对象同时访问。 PAIR: 这里的PAIR指的是成对的两个对象或线程,它们一起对某个资源或任务进行操作,必须协同工作。在给定时间内,只有这一对对象可以对资源进行操作。 并发安全: 该模式旨在确保系统在并发情况下的安全性和稳定性,防止资源争用或数据不一致。 示例:使用Mutex实现独占PAIR模式 这个例子模拟了两个线程(Thread A和Thread B)成对地访问和操作共享资源。 #include #include #include // 共享资源int shared_resource = 0; // 互斥锁,用于保护共享资源pthread_mutex_t resource_mutex; // 条件变量,用于实现PAIR的协调pthread_cond_t pair_condition;int pair_ready = 0; // 操作共享资源的函数void* thread_function(void* arg) { int thread_id = *(int*)arg; // 加锁,确保独占访问资源 pthread_mutex_lock(&resource_mutex); // 等待另一个线程准备好(模拟PAIR模式) while (pair_ready == 0) { printf("Thread %d is waiting for its pair...\n", thread_id); pthread_cond_wait(&pair_condition, &resource_mutex); } // 执行资源操作 printf("Thread %d is working with its pair...\n", thread_id); shared_resource += thread_id; // 模拟操作 printf("Thread %d updated shared resource to %d\n", thread_id, shared_resource); // 解除PAIR状态 pair_ready = 0; pthread_cond_signal(&pair_condition); // 解锁 pthread_mutex_unlock(&resource_mutex); return NULL;} int main() { pthread_t thread_a, thread_b; int thread_id_a = 1; int thread_id_b = 2; // 初始化互斥锁和条件变量 pthread_mutex_init(&resource_mutex, NULL); pthread_cond_init(&pair_condition, NULL); // 创建线程 pthread_create(&thread_a, NULL, thread_function, &thread_id_a); pthread_create(&thread_b, NULL, thread_function, &thread_id_b); // 模拟PAIR的准备状态 pthread_mutex_lock(&resource_mutex); pair_ready = 1; pthread_cond_broadcast(&pair_condition); pthread_mutex_unlock(&resource_mutex); // 等待线程完成 pthread_join(thread_a, NULL); pthread_join(thread_b, NULL); // 销毁互斥锁和条件变量 pthread_mutex_destroy(&resource_mutex); pthread_cond_destroy(&pair_condition); return 0;} 代码解析 1.共享资源(shared_resource):这是线程操作的资源,在该例子中,它只是一个简单的整数。 2.互斥锁(pthread_mutex_t):用于保护共享资源,确保只有一对线程能够同时访问和操作资源。 3.条件变量(pthread_cond_t):用于线程之间的协调,确保在某个线程准备好之前,另一个线程必须等待。这模拟了PAIR之间的协同工作。 4.线程函数(thread_function): 线程首先锁定互斥锁,确保对共享资源的独占访问。 线程会检查PAIR是否准备好,如果没有准备好,它将等待(pthread_cond_wait),直到PAIR准备完毕。 一旦PAIR准备好,线程对共享资源进行操作。 操作完成后,解除PAIR状态,允许下一个PAIR操作资源。 5.主线程: 主线程创建了两个子线程(Thread A和Thread B)。 主线程通过设置pair_ready为1并使用pthread_cond_broadcast,模拟了PAIR的准备状态。 主线程等待子线程完成操作,然后清理互斥锁和条件变量。 ZeroMq 进程内独占 #include // 包含 ZeroMQ 库头文件,用于消息队列通信#include // 包含 Windows API 头文件,用于线程管理#include // 包含标准输入输出库头文件#include // 包含字符串操作库头文件#include // 包含 locale.h 头文件,用于设置区域语言环境 // 线程 1 的函数DWORD WINAPI thread1(LPVOID context) { void* socket = zmq_socket(context, ZMQ_PAIR); // 创建一个 PAIR 套接字,用于双向通信 zmq_bind(socket, "inproc://pair1"); // 绑定套接字到 "inproc://pair1" 地址 char buffer[256]; // 用于接收消息的缓冲区 while (1) { memset(buffer, 0, sizeof(buffer)); // 清理缓冲区 zmq_recv(socket, buffer, 256, 0); // 接收来自线程 2 的消息 printf("Thread 1 received: %s\n", buffer); // 打印接收到的消息 Sleep(1000); // 等待 1 秒,Windows 下使用 Sleep 函数,单位是毫秒 const char* msg = "Message from Thread 1"; zmq_send(socket, msg, strlen(msg) + 1, 0); // 发送消息给线程 2,+1 包括结束符 '\0' } zmq_close(socket); // 关闭套接字(这段代码通常不会被执行到,因为 while 循环是无限的) return 0;} // 线程 2 的函数DWORD WINAPI thread2(LPVOID context) { void* socket = zmq_socket(context, ZMQ_PAIR); // 创建一个 PAIR 套接字,用于双向通信 zmq_connect(socket, "inproc://pair1"); // 连接到 "inproc://pair1" 地址,链接线程 1 的套接字 char buffer[256]; // 用于接收消息的缓冲区 while (1) { const char* msg = "Message from Thread 2"; zmq_send(socket, msg, strlen(msg) + 1, 0); // 发送消息给线程 1,+1 包括结束符 '\0' memset(buffer, 0, sizeof(buffer)); // 清理缓冲区 zmq_recv(socket, buffer, 256, 0); // 接收来自线程 1 的消息 printf("Thread 2 received: %s\n", buffer); // 打印接收到的消息 Sleep(1000); // 等待 1 秒,Windows 下使用 Sleep 函数,单位是毫秒 } zmq_close(socket); // 关闭套接字(这段代码通常不会被执行到,因为 while 循环是无限的) return 0;} int main() { setlocale(LC_ALL, ""); // 设置区域语言环境 void* context = zmq_ctx_new(); // 创建一个新的 ZeroMQ 上下文,用于管理套接字和它们之间的通信 HANDLE t1, t2; // 定义两个线程句柄变量 t1 = CreateThread(NULL, 0, thread1, context, 0, NULL); // 创建线程 1,并启动执行 thread1 函数 t2 = CreateThread(NULL, 0, thread2, context, 0, NULL); // 创建线程 2,并启动执行 thread2 函数 WaitForSingleObject(t1, INFINITE); // 等待线程 1 结束(在这个例子中,线程实际上不会结束) WaitForSingleObject(t2, INFINITE); // 等待线程 2 结束(在这个例子中,线程实际上不会结束) zmq_ctx_destroy(context); // 销毁 ZeroMQ 上下文,清理资源 return 0;} 代码解析 1.头文件包含: zmq.h: 包含 ZeroMQ 库的函数和数据结构声明,用于跨线程或跨进程的消息队列通信。 windows.h: 包含 Windows API 函数的声明,用于线程管理、睡眠等操作。 stdio.h: 提供标准输入输出功能,如 printf。 string.h: 提供字符串操作功能,如 strlen 和 memcpy。 unistd.h: 用于包含 POSIX 标准库函数(如 sleep),但是在 Windows 下可以替换为 Sleep 函数。 2.线程 1 和 线程 2 函数: 两个线程分别创建一个 PAIR 套接字,线程 1 绑定到 inproc://pair1 地址,线程 2 连接到 inproc://pair1。 PAIR 套接字允许线程 1 和线程 2 之间进行双向通信。 在无限循环中,两个线程轮流发送和接收消息,并将接收到的消息打印到控制台。 3.主程序: 创建一个 ZeroMQ 上下文,用于管理套接字和它们之间的通信。 使用 Windows API CreateThread 创建两个线程,每个线程分别执行 thread1 和 thread2 函数。 使用 WaitForSingleObject 函数等待两个线程的完成。在本例中,由于线程处于无限循环中,程序实际上会一直运行。 inproc://pair1 的解释 inproc://pair1 是什么: inproc 是 ZeroMQ 的一种传输方式,用于在同一进程内的线程之间通信。inproc://pair1 是一个地址标识符,pair1 是这个地址的标记(可以任意命名)。 在 zmq_bind 函数中,线程 1 的 PAIR 套接字绑定到这个地址,使它成为服务器端。zmq_connect 函数则使线程 2 的 PAIR 套接字连接到这个地址,成为客户端。 能干什么: inproc://pair1 允许两个线程在同一个进程内通过 ZeroMQ 实现快速、高效的消息通信。这种通信是线程安全的,并且比使用其他跨进程传输协议(如 TCP 或 IPC)更加高效。 使用 inproc 可以避免线程间使用共享内存或复杂的同步机制(如信号量、互斥锁等)进行通信。 在本例中,inproc://pair1用于在线程 1 和线程 2 之间建立可靠的双向通信,利用 PAIR 套接字确保每条消息都被正确接收和发送。这使得线程可以方便地互相通信,而不需要管理复杂的同步和资源共享问题。 C++ 循环发消息 #include #include #include #include void thread_function(int id, zmq::context_t& context) { zmq::socket_t socket(context, ZMQ_PAIR); // 每个线程绑定到唯一的地址 std::string address = "inproc://pair" + std::to_string(id); socket.bind(address); // 等待所有线程连接后开始通信 std::this_thread::sleep_for(std::chrono::seconds(1)); for (int i = 0; i < 10; ++i) { // 接收来自其他线程的消息 zmq::message_t request; socket.recv(request, zmq::recv_flags::none); std::string recv_msg(static_cast<char*>(request.data()), request.size()); std::cout << "Thread " << id << " received: " << recv_msg << std::endl; // 发送消息给下一个线程 std::string next_address = "inproc://pair" + std::to_string((id + 1) % 10); zmq::socket_t next_socket(context, ZMQ_PAIR); next_socket.connect(next_address); std::string msg = "Hello from thread " + std::to_string(id); zmq::message_t reply(msg.size()); memcpy(reply.data(), msg.c_str(), msg.size()); next_socket.send(reply, zmq::send_flags::none); }} int main() { zmq::context_t context(1); std::vector<std::thread> threads; // 创建并启动 10 个线程 for (int i = 0; i < 10; ++i) { threads.push_back(std::thread(thread_function, i, std::ref(context))); } // 等待所有线程结束 for (auto& t : threads) { t.join(); } return 0;} 程序解释 1.头文件包含: iostream: 用于标准输入输出流操作。 thread: 用于创建和管理多线程。 vector: 用于存储和管理线程对象的动态数组。 zmq.hpp: ZeroMQ C++ API 的头文件,用于消息传递和套接字管理。 2.thread_function函数: 每个线程都创建了一个 ZMQ_PAIR 类型的套接字,并绑定到一个唯一的 inproc://pair 地址。 线程在开始通信前等待 1 秒,以确保所有线程都已经绑定到它们的地址。 每个线程循环 10 次,在每次循环中执行以下步骤:1.接收来自其他线程的消息,并将消息内容打印到控制台。2.生成一个要发送给下一个线程的消息,并将其发送到下一个线程的地址。3.下一个线程的地址通过 id 计算得出,确保消息按顺序传递到下一个线程。 3.main函数: 创建一个 zmq::context_t 对象来管理 ZeroMQ 的套接字和通信。 使用 std::thread 创建并启动 10 个线程,每个线程调用 thread_function,并传递它们的线程 ID 和上下文对象的引用。 主线程使用 join 等待所有子线程执行完毕,确保所有线程的生命周期在程序结束前都得到正确的管理。 程序能用在哪些业务上? 线程间通信:该程序展示了如何在同一进程内的多个线程之间使用 ZeroMQ 实现消息传递。类似的模式可以应用于任何需要线程间通信的场景,例如多线程计算任务的协调、负载平衡等。 消息路由:在一些业务场景中,多个线程或模块需要按照某种逻辑顺序传递消息,例如流水线处理、任务分配等。这种环状的通信模式可以确保消息在各个节点之间按照特定顺序流转。 分布式计算:尽管此示例仅在单进程内工作,但类似的模式可以扩展到跨进程甚至跨机器的场景。例如,在分布式系统中,多个节点之间的消息传递和协作可以使用类似的机制进行管理。 事件驱动系统:在事件驱动系统中,不同的事件处理模块可能需要依次处理消息。通过这种线程间的消息传递机制,可以确保事件按照指定的顺序被处理和传递。 应用场景 独占PAIR模式在以下场景中特别有用: 线程同步: 在多线程环境中,多个线程可能需要成对地对共享资源进行操作。例如,读写锁(read-write lock)可以看作是独占PAIR模式的一种应用,只有读锁和写锁之间能够成对操作共享资源。 数据库事务: 当两个数据库连接(PAIR)需要并行工作以完成某个复杂的事务时,独占PAIR模式可以确保在事务完成之前,其他连接无法访问相关的数据或资源,从而保持数据的一致性。 双向通信协议: 在一些双向通信协议中,发送方和接收方作为PAIR进行操作,独占资源,如信号通道或数据包,在操作完成前,其他通信不能干涉。 设备驱动程序: 在硬件设备的驱动程序中,控制信号和数据传输通常成对进行操作,独占设备的访问权,以确保数据传输的完整性。 分布式系统: 在分布式系统中,不同节点之间可能需要成对操作某个共享的资源,独占PAIR模式确保在某对操作完成之前,其他节点无法进行冲突性的操作。 具体场景 1. 金融交易系统中的交易撮合 应用场景: 在金融交易系统中,买方和卖方的订单需要进行撮合。每一笔交易撮合都需要独占地访问交易引擎的某个部分资源,以确保交易的正确性和一致性。 具体应用: 当一个买方订单和卖方订单撮合成功时,这两个订单的状态需要同时更新为“已完成”。 在订单撮合过程中,独占PAIR模式确保只有这对买卖订单在这一时刻可以操作这个交易引擎的资源,防止其他订单干扰。 这种模式避免了在高频交易情况下,由于并发导致的订单状态不一致或重复撮合问题。 2. 物流系统中的运输任务分配 应用场景: 在物流系统中,运输任务的分配需要同时考虑配送任务和可用的运输资源(如车辆、司机)。为了避免同一辆车被分配到多个不同的运输任务,系统需要确保任务和资源的独占性匹配。 具体应用: 当一个运输任务(如从仓库A到地点B的货物运输)需要分配给某辆车时,系统会使用独占PAIR模式,将该任务与该车的资源配对。 该车辆和任务配对后,系统锁定这一对资源,防止其他任务占用这辆车。 任务完成后,车辆资源释放,再进行下一次任务分配。 这样避免了由于资源竞争导致的任务分配混乱或资源浪费。 3. 在线购物平台的订单处理 应用场景: 在在线购物平台中,多个用户可能会同时购买同一件商品。在库存有限的情况下,系统需要保证每次下单操作能独占处理对应的库存数据,防止出现超卖的情况。 具体应用: 每当一个用户发起购买请求时,系统会为用户订单和库存数据配对,确保这对资源(订单和库存)在处理过程中不被其他订单干扰。 在确认支付和扣减库存的操作中,独占PAIR模式确保其他订单不能同时操作相同的库存数据,防止多名用户同时购买超出库存量的商品。 一旦订单处理完成,库存数据的锁定状态解除,其他用户的订单可以继续操作。 这种方式有效避免了超卖问题,确保用户体验和库存管理的准确性。 4. 社交媒体平台的消息传递 应用场景: 在社交媒体平台上,用户之间的私密消息需要确保传输过程的私密性和准确性,特别是当消息可能同时发送给多名接收者时。 具体应用: 当用户A向用户B发送消息时,系统会确保这条消息和用户B的接收信箱形成独占的配对关系。 在这个过程中,系统确保用户B的信箱不会同时接受来自其他来源的同一消息,防止重复发送或接收错误。 一旦消息传输成功,配对关系解除,用户B可以接收其他新的消息。 独占PAIR模式确保了消息传递的完整性和准确性,避免了多线程环境下的消息混乱或丢失。 5. 分布式锁在微服务中的使用 应用场景:在微服务架构中,多个服务实例可能同时尝试操作同一资源,如更新数据库中的某一条记录。为了防止并发更新导致的数据不一致问题,分布式锁通常被用来控制资源的独占访问。 金融交易系统中的交易撮合 ZEROMQ C语言 这个示例将展示如何使用ZeroMQ消息队列进行买卖订单的撮合,同时确保交易撮合的独占性。 这个示例包含两个主要部分: 订单生成器:生成买卖订单,并通过ZeroMQ推送到交易撮合引擎。 交易撮合引擎:接收订单并进行撮合处理,确保每一对订单的独占性。 #include #include #include #include #include #include // 定义订单结构体,包含订单ID、买卖方向、数量和价格typedef struct { int order_id; char side; // 'B' 代表买单(Buy),'S' 代表卖单(Sell) int quantity; float price;} Order; // 定义互斥锁,用于确保撮合操作的独占性HANDLE match_mutex; // 订单撮合函数,用于撮合买单和卖单void match_orders(Order* buy_order, Order* sell_order) { // 检查买单和卖单是否匹配 if (buy_order->price >= sell_order->price && buy_order->quantity == sell_order->quantity) { // 如果匹配成功,打印匹配信息 printf("Matched Order ID: %d (Buy) with Order ID: %d (Sell) - Quantity: %d at Price: %.2f\n", buy_order->order_id, sell_order->order_id, buy_order->quantity, buy_order->price); } else { // 如果不匹配,打印未匹配的信息 printf("Order ID: %d (Buy) and Order ID: %d (Sell) not matched.\n", buy_order->order_id, sell_order->order_id); }} // 订单生成器线程函数,模拟订单的生成并发送到撮合引擎DWORD WINAPI order_generator(LPVOID context) { // 创建ZeroMQ PUSH套接字用于发送订单 void* socket = zmq_socket(context, ZMQ_PUSH); zmq_connect(socket, "tcp://localhost:5555"); // 连接到撮合引擎的PULL套接字 int order_id = 1; srand((unsigned int)time(NULL)); // 初始化随机数种子 while (1) { // 生成随机订单 Order order; order.order_id = order_id++; order.side = (order_id % 2 == 0) ? 'B' : 'S'; // 偶数ID为买单,奇数ID为卖单 order.quantity = rand() % 100 + 1; // 随机生成数量(1到100之间) order.price = (float)(rand() % 1000) / 10.0; // 随机生成价格(0到99.9之间) // 发送订单到撮合引擎 zmq_send(socket, &order, sizeof(Order), 0); printf("Generated Order ID: %d, Side: %c, Quantity: %d, Price: %.2f\n", order.order_id, order.side, order.quantity, order.price); Sleep(100); // 模拟订单生成频率,休眠100毫秒 } zmq_close(socket); // 关闭套接字 return 0;} // 订单撮合线程函数,接收订单并进行撮合DWORD WINAPI order_matcher(LPVOID context) { // 创建ZeroMQ PULL套接字用于接收订单 void* socket = zmq_socket(context, ZMQ_PULL); zmq_bind(socket, "tcp://*:5555"); // 绑定到端口5555,等待订单到达 Order buy_order, sell_order; // 用于保存待撮合的买单和卖单 int has_buy_order = 0; // 标记是否有待撮合的买单 while (1) { Order order; // 从ZeroMQ套接字接收订单 zmq_recv(socket, &order, sizeof(Order), 0); // 获取互斥锁,确保撮合操作的独占性 WaitForSingleObject(match_mutex, INFINITE); if (order.side == 'B' && !has_buy_order) { // 如果收到的是买单且当前没有待撮合的买单,将其保存 buy_order = order; has_buy_order = 1; // 设置标记,表示有待撮合的买单 } else if (order.side == 'S' && has_buy_order) { // 如果收到的是卖单且已有待撮合的买单,尝试撮合 sell_order = order; match_orders(&buy_order, &sell_order); has_buy_order = 0; // 重置标记,表示买单已撮合 } else { // 如果订单无法撮合,打印提示信息 printf("Order %d queued for future matching.\n", order.order_id); } // 释放互斥锁,允许其他线程进行撮合操作 ReleaseMutex(match_mutex); } zmq_close(socket); // 关闭套接字 return 0;} // 主程序入口int main() { // 创建ZeroMQ上下文 void* context = zmq_ctx_new(); // 创建互斥锁,用于保护撮合操作的独占性 match_mutex = CreateMutex(NULL, FALSE, NULL); // 定义线程句柄 HANDLE generator_thread, matcher_thread; // 创建订单生成器线程 generator_thread = CreateThread(NULL, 0, order_generator, context, 0, NULL); // 创建订单撮合线程 matcher_thread = CreateThread(NULL, 0, order_matcher, context, 0, NULL); // 等待线程执行完毕 WaitForSingleObject(generator_thread, INFINITE); WaitForSingleObject(matcher_thread, INFINITE); // 关闭线程句柄 CloseHandle(generator_thread); CloseHandle(matcher_thread); // 关闭互斥锁 CloseHandle(match_mutex); // 销毁ZeroMQ上下文 zmq_ctx_destroy(context); return 0;} 代码解析 1.订单生成器(order_generator): 使用ZeroMQ PUSH套接字将生成的订单推送到交易撮合引擎。 每个订单都包含一个唯一的订单ID、买卖方向、数量和价格。 每隔一定时间生成一个新的订单,模拟真实的交易环境。 2.交易撮合引擎(order_matcher): 使用ZeroMQ PULL套接字接收订单。 通过互斥锁(pthread_mutex_t)实现独占PAIR模式,确保每次撮合操作的并发安全性。 如果当前有买单并收到卖单,尝试撮合。如果买单价格大于或等于卖单价格且数量相等,则匹配成功。 匹配后,释放互斥锁,允许下一个订单对继续撮合。 3.主程序: 创建并启动订单生成器和交易撮合引擎线程。 主线程等待子线程完成。 扩展与优化 并发撮合:可以使用多个撮合引擎线程,以处理更高并发的订单流。 持久化订单数据:在实际系统中,订单信息和撮合结果通常需要持久化到数据库。 复杂撮合逻辑:实际的交易撮合可能包括部分成交、不同优先级的订单撮合等,可以根据需求进一步扩展。 在现有的基础上,可以进行以下扩展与优化,以实现更高效、更复杂的交易撮合系统。我们将: 并发撮合:增加多个撮合引擎线程,以处理更高并发的订单流。 持久化订单数据:模拟将订单信息和撮合结果持久化到数据库。 复杂撮合逻辑:引入部分成交和不同优先级的订单撮合逻辑。 #include #include #include #include #include #include // 定义订单结构体,包含订单ID、买卖方向、数量、价格和优先级typedef struct { int order_id; char side; // 'B' 代表买单(Buy),'S' 代表卖单(Sell) int quantity; float price; int priority; // 优先级(数值越高,优先级越高)} Order; // 定义互斥锁,用于确保撮合操作的独占性HANDLE match_mutex; // 订单撮合函数,用于撮合买单和卖单,支持部分成交和优先级撮合void match_orders(Order* buy_order, Order* sell_order) { // 检查买单和卖单是否匹配 if (buy_order->price >= sell_order->price) { int matched_quantity = buy_order->quantity < sell_order->quantity ? buy_order->quantity : sell_order->quantity; printf("Matched Order ID: %d (Buy) with Order ID: %d (Sell) - Quantity: %d at Price: %.2f\n", buy_order->order_id, sell_order->order_id, matched_quantity, buy_order->price); // 模拟持久化撮合结果 printf("Persisting Match: Buy Order %d, Sell Order %d, Quantity %d, Price %.2f\n", buy_order->order_id, sell_order->order_id, matched_quantity, buy_order->price); // 更新剩余数量 buy_order->quantity -= matched_quantity; sell_order->quantity -= matched_quantity; // 检查是否有剩余订单需要处理 if (buy_order->quantity > 0) { printf("Remaining Buy Order ID: %d, Quantity: %d\n", buy_order->order_id, buy_order->quantity); } if (sell_order->quantity > 0) { printf("Remaining Sell Order ID: %d, Quantity: %d\n", sell_order->order_id, sell_order->quantity); } } else { printf("Order ID: %d (Buy) and Order ID: %d (Sell) not matched due to price.\n", buy_order->order_id, sell_order->order_id); }} // 订单生成器线程函数,模拟订单的生成并发送到撮合引擎DWORD WINAPI order_generator(LPVOID context) { // 创建ZeroMQ PUSH套接字用于发送订单 void* socket = zmq_socket(context, ZMQ_PUSH); zmq_connect(socket, "tcp://localhost:5555"); // 连接到撮合引擎的PULL套接字 int order_id = 1; srand((unsigned int)time(NULL)); // 初始化随机数种子 while (1) { // 生成随机订单 Order order; order.order_id = order_id++; order.side = (order_id % 2 == 0) ? 'B' : 'S'; // 偶数ID为买单,奇数ID为卖单 order.quantity = rand() % 100 + 1; // 随机生成数量(1到100之间) order.price = (float)(rand() % 1000) / 10.0; // 随机生成价格(0到99.9之间) order.priority = rand() % 10 + 1; // 随机生成优先级(1到10之间) // 发送订单到撮合引擎 zmq_send(socket, &order, sizeof(Order), 0); printf("Generated Order ID: %d, Side: %c, Quantity: %d, Price: %.2f, Priority: %d\n", order.order_id, order.side, order.quantity, order.price, order.priority); Sleep(100); // 模拟订单生成频率,休眠100毫秒 } zmq_close(socket); // 关闭套接字 return 0;} // 订单撮合线程函数,接收订单并进行撮合DWORD WINAPI order_matcher(LPVOID context) { // 创建ZeroMQ PULL套接字用于接收订单 void* socket = zmq_socket(context, ZMQ_PULL); zmq_bind(socket, "tcp://*:5555"); // 绑定到端口5555,等待订单到达 Order buy_order, sell_order; // 用于保存待撮合的买单和卖单 int has_buy_order = 0; // 标记是否有待撮合的买单 while (1) { Order order; // 从ZeroMQ套接字接收订单 zmq_recv(socket, &order, sizeof(Order), 0); // 获取互斥锁,确保撮合操作的独占性 WaitForSingleObject(match_mutex, INFINITE); if (order.side == 'B' && !has_buy_order) { // 如果收到的是买单且当前没有待撮合的买单,将其保存 buy_order = order; has_buy_order = 1; // 设置标记,表示有待撮合的买单 } else if (order.side == 'S' && has_buy_order) { // 如果收到的是卖单且已有待撮合的买单,尝试撮合 sell_order = order; // 进行优先级比较,如果新订单优先级较高,则放弃当前订单,保存新订单 if (sell_order.priority > buy_order.priority) { printf("New Sell Order ID: %d has higher priority than Buy Order ID: %d\n", sell_order.order_id, buy_order.order_id); has_buy_order = 0; // 放弃当前订单,等待新撮合 } else { match_orders(&buy_order, &sell_order); has_buy_order = (buy_order.quantity > 0) ? 1 : 0; // 检查买单是否还有剩余 } } else { printf("Order %d queued for future matching.\n", order.order_id); } // 释放互斥锁,允许其他线程进行撮合操作 ReleaseMutex(match_mutex); } zmq_close(socket); // 关闭套接字 return 0;} int main() { // 创建ZeroMQ上下文 void* context = zmq_ctx_new(); // 创建互斥锁,用于保护撮合操作的独占性 match_mutex = CreateMutex(NULL, FALSE, NULL); // 定义线程句柄 HANDLE generator_thread, matcher_thread[4]; // 创建订单生成器线程 generator_thread = CreateThread(NULL, 0, order_generator, context, 0, NULL); // 创建多个订单撮合线程,用于并发处理 for (int i = 0; i < 4; i++) { matcher_thread[i] = CreateThread(NULL, 0, order_matcher, context, 0, NULL); } // 等待订单生成器线程执行完毕 WaitForSingleObject(generator_thread, INFINITE); // 等待所有撮合线程执行完毕 for (int i = 0; i < 4; i++) { WaitForSingleObject(matcher_thread[i], INFINITE); } // 关闭线程句柄 CloseHandle(generator_thread); for (int i = 0; i < 4; i++) { CloseHandle(matcher_thread[i]); } // 关闭互斥锁 CloseHandle(match_mutex); // 销毁ZeroMQ上下文 zmq_ctx_destroy(context); return 0;} 扩展与优化解释 1.并发撮合: 增加了4个并发的撮合线程,以提高系统的吞吐量和处理能力。 每个撮合线程独立处理接收到的订单,并通过互斥锁保证撮合操作的独占性。 2.持久化订单数据: 在订单撮合完成后,系统会模拟将撮合结果“持久化”到数据库。这一部分目前通过打印日志来模拟。 实际系统中,可以将这些数据写入数据库(例如使用SQLite、MySQL等)。 3.复杂撮合逻辑: 引入了部分成交:如果买单和卖单的数量不一致,系统会撮合其中较小的部分,并保留未撮合的部分。 引入了优先级撮合:每个订单都有一个优先级(1到10之间)。当有新订单到达时,,如果其优先级高于当前待撮合订单,会优先处理高优先级订单。 #include #include #include #include #include #include // 定义订单结构体,包含订单ID、买卖方向、数量、价格和优先级typedef struct { int order_id; char side; // 'B' 代表买单(Buy),'S' 代表卖单(Sell) int quantity; float price; int priority; // 优先级(数值越高,优先级越高)} Order; // 定义互斥锁,用于确保撮合操作的独占性HANDLE match_mutex; // 订单撮合函数,用于撮合买单和卖单,支持部分成交和优先级撮合void match_orders(Order* buy_order, Order* sell_order) { // 检查买单和卖单是否匹配 if (buy_order->price >= sell_order->price) { int matched_quantity = buy_order->quantity < sell_order->quantity ? buy_order->quantity : sell_order->quantity; printf("Matched Order ID: %d (Buy) with Order ID: %d (Sell) - Quantity: %d at Price: %.2f\n", buy_order->order_id, sell_order->order_id, matched_quantity, buy_order->price); // 模拟持久化撮合结果 printf("Persisting Match: Buy Order %d, Sell Order %d, Quantity %d, Price %.2f\n", buy_order->order_id, sell_order->order_id, matched_quantity, buy_order->price); // 更新剩余数量 buy_order->quantity -= matched_quantity; sell_order->quantity -= matched_quantity; // 检查是否有剩余订单需要处理 if (buy_order->quantity > 0) { printf("Remaining Buy Order ID: %d, Quantity: %d\n", buy_order->order_id, buy_order->quantity); } if (sell_order->quantity > 0) { printf("Remaining Sell Order ID: %d, Quantity: %d\n", sell_order->order_id, sell_order->quantity); } } else { printf("Order ID: %d (Buy) and Order ID: %d (Sell) not matched due to price.\n", buy_order->order_id, sell_order->order_id); }} // 订单生成器线程函数,模拟订单的生成并发送到撮合引擎DWORD WINAPI order_generator(LPVOID context) { // 创建ZeroMQ PUSH套接字用于发送订单 void* socket = zmq_socket(context, ZMQ_PUSH); zmq_connect(socket, "tcp://localhost:5555"); // 连接到撮合引擎的PULL套接字 int order_id = 1; srand((unsigned int)time(NULL)); // 初始化随机数种子 while (1) { // 生成随机订单 Order order; order.order_id = order_id++; order.side = (order_id % 2 == 0) ? 'B' : 'S'; // 偶数ID为买单,奇数ID为卖单 order.quantity = rand() % 100 + 1; // 随机生成数量(1到100之间) order.price = (float)(rand() % 1000) / 10.0; // 随机生成价格(0到99.9之间) order.priority = rand() % 10 + 1; // 随机生成优先级(1到10之间) // 发送订单到撮合引擎 zmq_send(socket, &order, sizeof(Order), 0); printf("Generated Order ID: %d, Side: %c, Quantity: %d, Price: %.2f, Priority: %d\n", order.order_id, order.side, order.quantity, order.price, order.priority); Sleep(100); // 模拟订单生成频率,休眠100毫秒 } zmq_close(socket); // 关闭套接字 return 0;} // 订单撮合线程函数,接收订单并进行撮合DWORD WINAPI order_matcher(LPVOID context) { // 创建ZeroMQ PULL套接字用于接收订单 void* socket = zmq_socket(context, ZMQ_PULL); zmq_bind(socket, "tcp://*:5555"); // 绑定到端口5555,等待订单到达 Order buy_order, sell_order; // 用于保存待撮合的买单和卖单 int has_buy_order = 0; // 标记是否有待撮合的买单 while (1) { Order order; // 从ZeroMQ套接字接收订单 zmq_recv(socket, &order, sizeof(Order), 0); // 获取互斥锁,确保撮合操作的独占性 WaitForSingleObject(match_mutex, INFINITE); if (order.side == 'B' && !has_buy_order) { // 如果收到的是买单且当前没有待撮合的买单,将其保存 buy_order = order; has_buy_order = 1; // 设置标记,表示有待撮合的买单 } else if (order.side == 'S' && has_buy_order) { // 如果收到的是卖单且已有待撮合的买单,尝试撮合 sell_order = order; // 进行优先级比较,如果新订单优先级较高,则放弃当前订单,保存新订单 if (sell_order.priority > buy_order.priority) { printf("New Sell Order ID: %d has higher priority than Buy Order ID: %d\n", sell_order.order_id, buy_order.order_id); has_buy_order = 0; // 放弃当前订单,等待新撮合 } else { match_orders(&buy_order, &sell_order); has_buy_order = (buy_order.quantity > 0) ? 1 : 0; // 检查买单是否还有剩余 } } else { printf("Order %d queued for future matching.\n", order.order_id); } // 释放互斥锁,允许其他线程进行撮合操作 ReleaseMutex(match_mutex); } zmq_close(socket); // 关闭套接字 return 0;} int main() { // 创建ZeroMQ上下文 void* context = zmq_ctx_new(); // 创建互斥锁,用于保护撮合操作的独占性 match_mutex = CreateMutex(NULL, FALSE, NULL); // 定义线程句柄 HANDLE generator_thread, matcher_thread[4]; // 创建订单生成器线程 generator_thread = CreateThread(NULL, 0, order_generator, context, 0, NULL); // 创建多个订单撮合线程,用于并发处理 for (int i = 0; i < 4; i++) { matcher_thread[i] = CreateThread(NULL, 0, order_matcher, context, 0, NULL); } // 等待订单生成器线程执行完毕 WaitForSingleObject(generator_thread, INFINITE); // 等待所有撮合线程执行完毕 for (int i = 0; i < 4; i++) { WaitForSingleObject(matcher_thread[i], INFINITE); } // 关闭线程句柄 CloseHandle(generator_thread); for (int i = 0; i < 4; i++) { CloseHandle(matcher_thread[i]); } // 关闭互斥锁 CloseHandle(match_mutex); // 销毁ZeroMQ上下文 zmq_ctx_destroy(context); return 0;} ZEROMQ C++语言 #include #include #include #include #include #include #include #include #include // 定义订单结构体,包含订单ID、买卖方向、数量、价格和优先级struct Order { int order_id; char side; // 'B' 代表买单(Buy),'S' 代表卖单(Sell) int quantity; float price; int priority; // 优先级(数值越高,优先级越高)}; // 全局互斥锁,用于确保撮合操作的独占性std::mutex match_mutex; // 订单撮合函数,用于撮合买单和卖单,支持部分成交和优先级撮合void match_orders(Order& buy_order, Order& sell_order) { // 检查买单和卖单是否匹配 if (buy_order.price >= sell_order.price) { int matched_quantity = std::min(buy_order.quantity, sell_order.quantity); std::cout << "Matched Order ID: " << buy_order.order_id << " (Buy) with Order ID: " << sell_order.order_id << " (Sell) - Quantity: " << matched_quantity << " at Price: " << buy_order.price << std::endl; // 模拟持久化撮合结果 std::cout << "Persisting Match: Buy Order " << buy_order.order_id << ", Sell Order " << sell_order.order_id << ", Quantity " << matched_quantity << ", Price " << buy_order.price << std::endl; // 更新剩余数量 buy_order.quantity -= matched_quantity; sell_order.quantity -= matched_quantity; // 检查是否有剩余订单需要处理 if (buy_order.quantity > 0) { std::cout << "Remaining Buy Order ID: " << buy_order.order_id << ", Quantity: " << buy_order.quantity << std::endl; } if (sell_order.quantity > 0) { std::cout << "Remaining Sell Order ID: " << sell_order.order_id << ", Quantity: " << sell_order.quantity << std::endl; } } else { std::cout << "Order ID: " << buy_order.order_id << " (Buy) and Order ID: " << sell_order.order_id << " (Sell) not matched due to price." << std::endl; }} // 订单生成器线程函数,模拟订单的生成并发送到撮合引擎void order_generator(zmq::context_t* context) { try { // 创建ZeroMQ PUSH套接字用于发送订单 zmq::socket_t socket(*context, zmq::socket_type::push); socket.connect("tcp://localhost:5555"); // 连接到撮合引擎的PULL套接字 int order_id = 1; srand(static_cast<unsigned int>(time(NULL))); // 初始化随机数种子 while (true) { // 生成随机订单 Order order; order.order_id = order_id++; order.side = (order_id % 2 == 0) ? 'B' : 'S'; // 偶数ID为买单,奇数ID为卖单 order.quantity = rand() % 100 + 1; // 随机生成数量(1到100之间) order.price = static_cast<float>(rand() % 1000) / 10.0f; // 随机生成价格(0到99.9之间) order.priority = rand() % 10 + 1; // 随机生成优先级(1到10之间) // 发送订单到撮合引擎 socket.send(zmq::buffer(&order, sizeof(Order)), zmq::send_flags::none); std::cout << "Generated Order ID: " << order.order_id << ", Side: " << order.side << ", Quantity: " << order.quantity << ", Price: " << order.price << ", Priority: " << order.priority << std::endl; Sleep(100); // 模拟订单生成频率,休眠100毫秒 } } catch (const zmq::error_t& e) { std::cerr << "ZMQ Error in order_generator: " << e.what() << std::endl; } catch (const std::exception& e) { std::cerr << "Exception in order_generator: " << e.what() << std::endl; } catch (...) { std::cerr << "Unknown error in order_generator" << std::endl; }} // 订单撮合线程函数,接收订单并进行撮合void order_matcher(zmq::context_t* context) { try { // 创建ZeroMQ PULL套接字用于接收订单 zmq::socket_t socket(*context, zmq::socket_type::pull); socket.bind("tcp://*:5555"); // 绑定到端口5555,等待订单到达 Order buy_order, sell_order; // 用于保存待撮合的买单和卖单 bool has_buy_order = false; // 标记是否有待撮合的买单 while (true) { Order order; // 从ZeroMQ套接字接收订单 socket.recv(zmq::buffer(&order, sizeof(Order)), zmq::recv_flags::none); // 锁定互斥锁,确保撮合操作的独占性 std::lock_guard<std::mutex> lock(match_mutex); if (order.side == 'B' && !has_buy_order) { // 如果收到的是买单且当前没有待撮合的买单,将其保存 buy_order = order; has_buy_order = true; // 设置标记,表示有待撮合的买单 } else if (order.side == 'S' && has_buy_order) { // 如果收到的是卖单且已有待撮合的买单,尝试撮合 sell_order = order; // 进行优先级比较,如果新订单优先级较高,则放弃当前订单,保存新订单 if (sell_order.priority > buy_order.priority) { std::cout << "New Sell Order ID: " << sell_order.order_id << " has higher priority than Buy Order ID: " << buy_order.order_id << std::endl; has_buy_order = false; // 放弃当前订单,等待新撮合 } else { match_orders(buy_order, sell_order); has_buy_order = (buy_order.quantity > 0); // 检查买单是否还有剩余 } } else { std::cout << "Order " << order.order_id << " queued for future matching." << std::endl; } } } catch (const zmq::error_t& e) { std::cerr << "ZMQ Error in order_matcher: " << e.what() << std::endl; } catch (const std::exception& e) { std::cerr << "Exception in order_matcher: " << e.what() << std::endl; } catch (...) { std::cerr << "Unknown error in order_matcher" << std::endl; }} int main() { try { // 创建ZeroMQ上下文 zmq::context_t context(1); // 定义线程容器 std::vector<std::thread> matcher_threads; // 创建订单生成器线程 std::thread generator_thread(order_generator, &context); // 创建多个订单撮合线程,用于并发处理 for (int i = 0; i < 4; ++i) { matcher_threads.emplace_back(order_matcher, &context); } // 等待订单生成器线程执行完毕 generator_thread.join(); // 等待所有撮合线程执行完毕 for (auto& thread : matcher_threads) { thread.join(); } } catch (const std::exception& e) { std::cerr << "Exception in main: " << e.what() << std::endl; } catch (...) { std::cerr << "Unknown error in main" << std::endl; } return 0;} 主要改动与解释 1.使用C++特性: #include代替了 #include ,使用 std::cout 和 std::endl 进行输出。 使用了 std::thread 来代替 Windows 的 CreateThread。 使用了 std::vector容器来管理多个撮合线程,避免手动管理线程句柄。 2.ZeroMQ C++ API: C++代码使用了 zmq.hpp,它是ZeroMQ的C++绑定库,比纯C语言API更具可读性和便利性。 zmq::context_t 和 zmq::socket_t 代替了C语言中的 zmq_ctx_new() 和 zmq_socket()。 3.线程管理: 使用 std::thread 来创建并管理线程,thread.join() 替代了 WaitForSingleObject。 std::vector 容器被用来存储多个撮合线程,这样代码更加整洁和易于扩展。 4.使用标准库: 使用 std::min 来替代手动的 if-else 判断更简洁地获取最小值。 小结:ZeroMQ的独占模式(Exclusive Pair Pattern)是一种设计模式,旨在确保两个通信对端(PAIR)在同一时间段内独占地访问共享资源,防止其他对端干扰。它通过限制并发访问,确保资源的安全性和一致性。在高并发场景中,独占模式有助于避免数据竞争、死锁和不一致性问题。ZeroMQ通过多种套接字模式(如PAIR、REQ/REP)和线程同步机制,支持独占模式的实现,保证消息在严格的单对单通信通道中传递,从而提高系统的稳定性和可靠性。
一、数据结构 数据结构大致可以分为两种 —— 线性结构 和 非线性结构。 1. 线性结构 线性结构是数据结构中的一种基本结构,它的特点是数据元素之间存在一对一的前后关系。线性结构中的数据元素排列成一个线性序列,可以用顺序存储或链式存储来实现。 常见的线性结构有以下几种: 数组:连续存储的一组相同类型的元素,可以通过索引直接访问。 链表:由节点组成的集合,每个节点包含数据和指向下一个节点的指针。 栈:具有先进后出(LIFO)特性的线性表,只能在栈顶进行插入和删除操作。 队列:具有先进先出(FIFO)特性的线性表,只能在队尾插入,在队头删除。 哈希表:使用哈希函数将键映射到存储位置,并通过解决冲突处理碰撞问题。 2. 非线性结构 非线性结构是数据结构中的另一种类型,与线性结构不同,它的数据元素之间并不是简单的前后关系。 常见的非线性结构有以下几种: 树:由节点和边组成的层次结构,每个节点可以有多个子节点。 图:由节点和边组成的集合,节点之间可以存在多种关系,如有向图和无向图。 堆:一种特殊的树形结构,通常用于实现优先队列,具有父节点小于(或大于)子节点的特性。 散列表:基于哈希函数将键映射到存储位置,并通过解决冲突处理碰撞问题。 图表:由多个表格连接而成,用于表示复杂的关系和数据依赖。 还有例如跳表之类的其他的数据结构,也都是从基础数据结构演化出来的,用来解决指定的场景问题。 二、索引的数据结构 我们先把记忆中的 Mysql的索引是使用B+树做的,因为B+树有 xxx 的优点 抹去,没有人在开发的时候就能直接想到完美的解决方案,所以我们也来推导一下索引的数据结构。 1. 索引的作用 索引是用来做什么的? 索引在数据库和数据结构中起到了重要的作用,它能够提高数据的查询效率和访问速度。以下是索引的几个主要作用: 加快数据检索:索引可以按照特定的规则对数据进行排序和组织,从而加快数据的查找速度。通过使用合适的索引,可以避免全表扫描,减少查询所需的时间复杂度。 提高查询性能:当数据库中有大量记录时,使用索引可以显著提升查询性能。索引将数据按照特定列进行排序和分组,使得数据库系统只需搜索一部分数据而不是全部数据。 优化排序和分组操作:索引可以帮助数据库系统快速地进行排序和分组操作。如果在执行SQL语句时需要对结果进行排序或者分组,使用合适的索引可以节省大量计算时间。 约束唯一性:通过在某些字段上创建唯一索引,可以保证该字段值的唯一性,防止重复插入相同值的情况发生。 改善连接操作:当多个表之间需要进行连接操作时,在关联列上创建合适的索引能够极大地提高连接操作的效率。 尽管索引能够提高查询效率,但也会增加存储空间和更新数据的成本。 索引存储在哪里? 索引存储在数据库管理系统的内存和磁盘中。具体来说,有以下几种常见的索引存储方式: 内存中的索引:为了提高查询效率,数据库管理系统通常会将常用的索引数据加载到内存中进行操作。这样可以避免频繁的磁盘访问,加快查询速度。 磁盘上的索引:当内存无法容纳全部索引数据时,数据库管理系统会将部分或全部索引数据存储在磁盘上。通常使用B+树等数据结构来组织和管理索引数据,以支持高效地查找、插入和删除操作。 辅助文件:一些数据库系统会将较大或者不常用的索引存储在单独的文件中,而不是放在主要的数据文件中。这样可以降低主要数据文件的大小,减少IO开销,并且更灵活地管理索引。 我们都说数据持久化数据持久化,其实就是把内存里的数据转移到硬盘上,这样即便是设备断电了,数据也不会受到影响。但是有利必有弊,数据存储在硬盘上带来的后果就是读取的速度变慢。又是变慢,能变多慢呢?内存是纳秒级的处理速度,硬盘是毫秒级的处理速度,二者相差百万倍,这就是速度的差异。所以我们实际使用索引的时候,会把索引从硬盘中读到内存里,然后通过内存里的索引,从硬盘中找到数据。 但是这样优化了又如何呢,只要需要读硬盘,那就会消耗时间,硬盘IO越多,时间消耗越多。 除此之外,我们使用索引不只是为了能够迅速找到某一个数据,而是能够迅速找到某一个范围区间的数据,能够动态的执行有关数据的操作。 那么在上述的描述下,索引能够使用的数据结构就有这么几个 —— 哈希表、跳表、树。 2. 哈希表 哈希表:也叫做散列表。是根据关键字和值(Key-Value)直接进行访问的数据结构。也就是说,它通过关键字 key 和一个映射函数 Hash(key) 计算出对应的值 value,然后把键值对映射到表中一个位置来访问记录,以加快查找的速度。这个映射函数叫做哈希函数(散列函数),用于存放记录的数组叫做 哈希表(散列表)。哈希表的关键思想是使用哈希函数,将键 key 和值 value 映射到对应表的某个区块中。可以将算法思想分为两个部分: 向哈希表中插入一个关键字:哈希函数决定该关键字的对应值应该存放到表中的哪个区块,并将对应值存放到该区块中 在哈希表中搜索一个关键字:使用相同的哈希函数从哈希表中查找对应的区块,并在特定的区块搜索该关键字对应的值 哈希表的原理示例图如下所示: 哈希表的精确查询时间复杂度是 O(1) ,为什么呢?因为计算目标 key 的 hash 值,然后直接对应到数组的下标,这个过程大大的减少了查询所需要的时间。在产生了 hash 碰撞的时候,也会使用链表和红黑树的方式加快碰撞情况下,查询目标值的速度。 这样的话,我们索引的数据结构完全可以采用哈希表的形式来做,效率非常高。但是为什么不这么做呢? 如果使用哈希表来当作索引的数据结构,在进行范围查询的时候需要全部扫描,这是一笔不菲的代价。 3. 跳表 跳表(Skip List)是一种用于实现有序数据结构的数据结构,它在链表的基础上通过添加多级索引来加速搜索操作。 跳表由William Pugh在1989年提出,其设计灵感来自于平衡二叉树。相比于传统的链表,在查找元素时,跳表可以通过使用多级索引进行快速定位,并且具备较高的插入和删除效率。 跳表中的每个节点包含一个值和若干个指向下一层节点的指针。最底层是原始链表,每个节点按照值从小到大排列;而上方的各级索引则以不同步长稀疏地链接部分节点。这样,在搜索时可以先沿着最顶层索引开始查找,逐渐向下层细化范围,直到找到目标节点或者确定目标不存在。 跳表的时间复杂度为 O(log n),其中 n 是元素数量。它相对简单、易于实现,并且支持高效的插入、删除和搜索操作。因此,在某些场景下,跳表可以作为替代平衡二叉树等数据结构的选择。 如图所示,跳表就是在链表的基础上加了索引层,这样就能够实现区间查询的效果。比如我们要查找key = 5,那就先遍历索引层,遍历到3,然后发现下一个索引是6,那么直接从索引层的3往下进入链表,在往后走2步就到key = 5了。 如果数据量非常非常大呢?(图方便这里用 excel 绘制,美观上会差一些) 这样是不是就能发现跳表的好处了,用多个索引划分链表,从高级索引定位到更低级的索引,直到定位到链表中。效率看起来也很高。 但是这还是存在一个问题,我读取完三级索引到内存,然后我还要硬盘IO去读取二级索引,然后还要读取一级索引。还是在硬盘的IO上费了太多的操作。跳表的数据越多,索引层越高,读取索引带来的硬盘IO次数越多,性能降低,这又违背了一开始使用索引的理念。 4. 树 树结构的特性决定了遍历数据本身就支持按区间查询。再加上树是非线性结构的优势相比于线性结构的数组,不必像数组的数据是连续存放的。那么当树结构在插入新数据时就不用像数组插入数据前时,需要将数据所在往后的所有数据节点都得往后挪动的开销。所以树结构更适合插入更新等动态操作的数据结构。 需要C/C++ Linux服务器架构师学习资料加qun579733396获取(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享 三、树 实现索引使用的数据结构看来是要使用树结构了,常用的树都有哪些呢?二叉树、二叉查找树、平衡二叉查找树、红黑树、B树。 二叉树 二叉树是n(n>=0)个结点的有限集合。当n=0时为空树,当n不为0时,二叉树有以下特点:1.每个结点的度不超过2(可以理解为二孩政策下的结点最多只能有两个孩子); 每个结点的左子树和右子树顺序不能颠倒,所以二叉树是有序树。 特殊二叉树 满二叉树:每一层结点数都达到最大,那么它就是满二叉树。如第1层最多有2 ^0个结点,第2层最多有 2 ^1个结点,则第k层最多有2 ^(k-1)个结点,假设这棵满二叉树有k层,那么它总共有2 ^0+2 ^1+……+2 ^(k-1) = 2 ^k-1个结点。 完全二叉树:深度为k,有n个结点的二叉树当且仅当其每一个结点都与深度为k的满二叉树中编号从1至n的结点一一对应时,称为完全二叉树。(简介版:完全二叉树的前k-1层是满二叉树,最后一层从左到右依次连续) 二叉查找树 二叉查找树可以理解为融合了二分查找的二叉树。二分查找大家都熟悉吧,时间复杂度 O(logN) ,比直接遍历的线性查找快的多,但是需要数组是有序的。 所以说,二叉查找树不同于普通的二叉树,二叉查找树是将小于根节点的元素放在左子树,将大于根节点的元素放在右子树。其实就是从某种含义了实现了二分查找的先决条件 —— 数值有序。 但是二叉树是存在弊端的,如果我们每次都插入一个更小的数或者更大的数,那么二叉树就会在一个方向上无限延长,退化成了链表。那链表的时间复杂度是 O(N),而且又加大了硬盘的IO操作。所以这种结构还是不太行。 平衡二叉查找树 面说一直放更小或者更大的数,让他不断延长,变成一个极端的“很高很瘦”的数,那么用平衡二叉查找树就能解决这个问题。 平衡二叉查找树的关键是 平衡 ,指的是每个节点的左右子树高度差不能超过 1。这样左右子树都能平衡,时间复杂度为 O(logN) 。 无论是二叉树还是二叉查找树还是平衡二叉查找树还是红黑树,他最终都存在一个问题 —— 每个节点只能有 2 个子树。这意味着只要数据量足够大,它总会变成一个深度非常大的树。深度越大,硬盘IO次数越多,性能效率越低,这又双叒叕与索引的初衷背道而驰。 红黑树 与AVL树相比,红黑树并不追求严格的平衡,而是大致的平衡:只是确保从根到叶子的最长的可能路径不多于最短的可能路径的两倍长。从实现来看,红黑树最大的特点是每个节点都属于两种颜色(红色或黑色)之一,且节点颜色的划分需要满足特定的规则。 与AVL树相比,红黑树的查询效率会有所下降,这是因为树的平衡性变差,高度更高。但红黑树的删除效率大大提高了,因为红黑树同时引入了颜色,当插入或删除数据时,只需要进行O(1)次数的旋转以及变色就能保证基本的平衡,不需要像AVL树进行O(lgn)次数的旋转。总的来说,红黑树的统计性能高于AVL。 因此,在实际应用中,AVL树的使用相对较少,而红黑树的使用非常广泛。 对于数据在内存中的情况(如上述的TreeMap和HashMap),红黑树的表现是非常优异的。但是对于数据在磁盘等辅助存储设备中的情况(如MySQL等数据库),红黑树并不擅长,因为红黑树长得还是太高了。当数据在磁盘中时,磁盘IO会成为最大的性能瓶颈,设计的目标应该是尽量减少IO次数;而树的高度越高,增删改查所需要的IO次数也越多,会严重影响性能。 B树 新的数据结构的产生肯定是为了解决之前繁琐的问题。在树的深度不断变大的情况下,B树就应运而生了。 B树的出现解决了树高度的问题,从名字上也能看出来,它不叫B二叉树而是直接叫B树,因为它摆脱了二叉这个概念。它不再限制一个父节点中只能有两个子节点,而是允许拥有 M 个子节点(M > 2)。不仅如此,B树的一个节点可以存储多个元素,相比较于前面的那些二叉树数据结构又将整体的树高度降低了。那么B树实际上就是多叉树。 图中每一个节点叫做 页,是Mysql数据读取的基本单位,也就是上面的磁盘块。其中的 P 是指向子节点的指针。 当数据量足够大的时候,使用平衡二叉查找树则会不断纵向扩展子节点,让整个树变得更高。而B树可以横向扩展子节点,变得更胖,但是树的高度不高,硬盘IO的次数更少。 综上所述,B树已经非常适合用来给Mysql做索引的数据结构了。那么为什么还要去使用B+树呢?实际上B树存在一个缺点,虽然B树实现了区间查找,但是B树的去检查找是基于中序遍历来做的,中序遍历的算法题大家应该都做过,需要来回切换父子节点,切换父子节点在这里就意味着硬盘不断的IO操作,这显然也是不好的。 B+树 B+树其实就是B树的升级版。MySQL 中innoDB引擎中的索引底层数据结构采用的正是B+树。 B+树相对于树做了这些方面的改动:B+树中的非叶子节点只作为索引,不存储数据。转而由叶子节点存放整棵树的所有数据。叶子节点之间再构成一个从小到大的有序的链表并互相指向相邻的叶子节点,也就是在叶子节点之间形成了有序的双向链表。 画图画的不是很清楚,忘记展示双向链表的特点,最下面的箭头指的是相邻的两个叶子是双向的,所有的叶子节点构成双向链表。 再来看B+树的插入和删除,B+树做了大量冗余节点,从上面可以发现父节点的所有元素都会在子节点中出现,这样当删除一个节点时,可以直接从叶子节点中删除,这样效率更快。 相邻的两个叶子是双向的,所有的叶子节点构成双向链表。 再来看B+树的插入和删除,B+树做了大量冗余节点,从上面可以发现父节点的所有元素都会在子节点中出现,这样当删除一个节点时,可以直接从叶子节点中删除,这样效率更快。 B树相比于B+树,B树没有冗余节点,删除节点时会发生复杂的树变形,而B+树有冗余节点,不会涉及到复杂的树变形。而且B+树的插入也是如此,最多只涉及树的一条分支路径。B+树也不用更多复杂算法,可以类似红黑树的旋转去自动平衡。 总结: mysql选择B+树的原因在于其独特的优势: 良好的平衡性:B+树是一种自平衡的树结构,不论是在插入、删除还是查询操作中,它都能保持相对较好的平衡状态。这使得B+树能够快速定位到目标数据,提高查询效率。 顺序访问性:B+树的所有叶子节点是按照索引键的顺序排序的。这使得范围查询和顺序访问非常高效,因为相邻的数据通常在物理上也是相邻存储的,可以利用磁盘预读提高IO效率。 存储效率:B+树在内存中的节点大小通常比其他树结构更大,这样可以减少磁盘I/O操作的次数。同时,B+树的非叶子节点只存储索引列的值,而不包含实际数据,这进一步减小了索引的尺寸。 支持高并发:B+树的特性使得它能够支持高并发的读写操作。通过使用合适的锁或事务隔离级别,多个并发查询和更新操作可以同时进行而不会出现严重的阻塞或冲突。 易于扩展和维护:B+树的结构相对简单,可以较容易地进行扩展和维护。当插入或删除数据时,B+树只需要调整路径上的少数节点,而不需要整颗树的重构。这样能够有效降低维护成本,并保证索引的高性能。
0 引言 空间定向测试仪是一种应用非常广泛的电子测量仪器,尤其是伴随着微电子技术的发展,空间定向测试仪在车辆、舰船、飞行器等导航领域中的应用日趋成熟。本文所研究的空间定向测试技术主要是以MSP430单片机为基...
所谓软件是指为方便使用计算机和提高使用效率而组织的程序以及用于开发、使用和维护的有关文档。软件系统可分为系统软件和应用软件两大类。 一、系统软件系统软件System software,由一组控制计算机系统并管理其资...
1 TBOX简介 TBOX针对各个平台,封装了统一的接口,简化了通用开发过程中常用的操作,使你在开发过程中,更加关注实际应用的开发,而不是把时间浪费在琐碎的接口兼容性上面,并且充分利用了各个平台突出的一些特性进行优化。这个项目的目的,是为了高效使C开发更加简单。目前支持的平台有:Windows, Macosx, Linux, Android, iOS, *BSD等等。通过xmake支持多种编译模式: 发布:正式版编译,禁止调试信息、断言,各种检测机制,启用编译器优化 Debug:调试模式,默认实现详细调试信息、断言、内存越界检测、内存泄漏、锁竞争分析等检测机制 Small:最小化编译,取消默认所有扩展模块,实现编译器最小化优化 Micro:针对嵌入式平台,只需编译tbox微内核,仅提供最基础的跨平台接口,生成库仅64K左右(内置轻量libc接口实现) 2 特性 流庫 针对http、file、socket、data等流数据,实现统一接口进行读写,并且支持:阻塞、非阻塞、异步清晰读写模式。支持中间增加多层filter流进行流过滤,实现边读取,内部边进行解压、编码转换、加密等操作,极大地减少了内存使用。主要提供以下模块: Stream:通用非阻塞流,用于一般的单独io处理,同时支持协程以实现异步传输。 传输:流传输器,维护两路流的传输。 static_stream:针对静态数据缓冲区优化的静态流,用于轻量快速的数据解析。 协程库 快速高效的协程切换支持 提供跨平台支持,核心切换算法参考boost,并且由此进行重写和优化,目前支持架构:x86、x86_64、arm、arm64、mips32 提供渠道协程间数据通信支持,基于生产、消费者模型 提供信号量、协程锁支持 socket、stream都模块默认支持协程,并且可以在线程和协程间进行无缝切换 提供http、file等基于协程的简单服务器实例,只需几个行代码,就可以从socket开始写个高性能io服务器,代码逻辑比异步回调模式更清晰 同时提供stackfull、stackless两种协程模式支持,stackless协程更轻量(每个协程只占用几十个字节),切换更快捷(会占用部分射击性) 支持epoll、kqueue、poll、select和IOCP 在协程和poller中支持同时等待和调度socket,pipe io和process 資料 统一并简化数据库操作接口,配备各种数据源,通过统一的url来自动连接打开支持的数据库,数据枚举的采用迭代器模型。 目前支持sqlite3以及mysql两种关系型数据库,也可以自定义扩展使用其他关系型数据库。 xml库 针对xml提供DOM和SAX两种解析模式,SAX方式采用外部迭代模式,灵活和性能更高,并且可以选择路径指定,进行解析。 解析过程基于流,所以是高度流化的,可以实现边下载、边解压、完全边转码、边解析一条龙服务,利用较低的内存也可以解析大规模数据。 提供 xml writer 支持对 xml 生成 内存库 参考linux内核内存池管理机制的实现,并对其进行了各种改造和优化,所实现的TBOX突出了整套内存池管理架构。 调试模式下,可以轻松检测并定位内存丢失、内存越界溢出、内存重叠覆盖等常见内存问题,对磁盘整体内存的使用情况进行了统计和简要分析。 针对大块数据、小块数据、字符串数据进行了充分的利用,避免了大量外部碎片和内部碎片的产生。分配操作进行了各种优化,96%的情况下,效率都在O(1 )。 容器库 提供哈希、链表、队列、队列、堆栈、最小最大堆等常用容器。 支持各种常用的成员类型,在原有的容器期初上,其成员类型还可以自定义完全扩展。 所有容器都支持迭代器操作。 大多数容器都可以支持基于流的序列化和反序列化操作。 算法库 提供各种排序算法:冒泡排序、堆排序、快速排序、插入排序。 提供各种求解算法:线性遍历、二分法搜索。 提供各种遍历、删除、统计算法。 以迭代器为接口,但是实现算法和容器的分离,类似stl,c实现的,更加轻量。 网络库 实现http客户端功能 实现cookies 实现dns解析与缓存 实现ssl(支持openssl、polarssl、mbedtls) 支持ipv4、ipv6 支持通过协程实现异步模式 数学运算库 提供各种精度的定点攻击支持 提供随机数生成器 libc库 libc的一个轻量级实现,跨平台,并且针对不同的架构完全进行了优化。 支持大部分字符串、宽字符串操作。 扩展字符串、宽字符串的各种大小写不敏感操作接口 扩展memset_u16、memset_u32等接口,并对其进行高度优化,尤其适合图形渲染程序 libm库 libm部分接口的一个轻量级实现,以及对常用系统接口的封装。(目前只实现了部分,之后有时间会完全实现掉) 扩展部分常用接口,增加对sqrt、log2等常用函数的整数版本计算,进行高度优化,不涉及浮点侵犯,适合嵌入式环境使用。 对象库 轻量级类apple的CoreFoundation库,支持对象、字典、数组、字符串、数字、日期、数据等常用对象,并且可以方便扩展自定义对象的序列化。 对xml、json、binary以及apple的plist(xplist/bplist)格式序列化和反序列化支持。并且实现了自有的二进制序列化格式,针对明文进行了简单的加密,在不影响性能的前提下,序列化后的大小比bplist节省30%。 平台库 提供文件、目录、socket、线程、时间等常用系统接口 提供atomic、atomic64接口 提供高精度、低精度 提供高性能的线程池操作 提供事件、互斥量、信号量、自旋锁等事件、互斥、信号量、自旋锁操作 提供获取函数堆栈信息的接口,方便调试和错误定位 提供跨平台动态库加载接口(如果系统支持的话) 提供io轮询器,针对epoll、poll、select、kqueue进行跨平台封装 提供跨平台上下文切换接口,主要用于协程实现,切换效率非常高 压缩库 支持zlib/zlibraw/gzip的压缩与解压(需要第三方zlib库支持)。 字符编码库 支持utf8、utf16、gbk、gb2312、uc2、uc4之间的相互转码,并且支持大小端格式。 实用工具库 实现base64/32解码 实现crc32、adler32、md5、sha1等常用哈希算法 实现日志输出、断言等辅助调试工具 实现url编 实现位操作相关接口,支持各种数据格式的解析,可以对8bits、16bits、32bits、64bits、float、double以及任意bits的字段进行解析操作,并且同时支持大端、小端和本地端模式,并针对部分操作进行了优化,像static_stream、stream都有相关接口对其进行了封装,方便在流上进行快速数据解析。 实现了swap16、swap32、swap64等位交换操作,并针对各个平台进行了优化。 实现一些高级的位处理接口,例如:位0的快速统计、前导0和前导1的快速位计数、后导01的位计数 实现单例模块,可以对静态对象、实例对象进行快速的单例封装,实现全局线程安全 实现选项模块,快速对命令行参数进行解析,提供方便的命令行选项创建和解析操作,对于编写终端程序还是很有帮助的 正當理責库 支持匹配和替换操作 支持全局、多行、大小写不敏感等模式 使用pcre, pcre2和posix正则库 3一些使用tbox项目: 格盒 vm86 制作 伊特拉斯 更多项目 4 编译 请先安装: xmake #默认直接编译当前主机平台$ cd./tbox$ xmake #编译mingw平台$ cd./tbox$ xmake f -p mingw --sdk=/home/mingwsdk$ xmake #编译iphoneos平台$ cd./tbox$ xmake f -p iphoneos$ xmake #编译android平台$ cd./tbox$ xmake f -p android --ndk=xxxxx$ xmake #交叉编译$ cd./tbox$ xmake f -p linux --sdk=/home/sdk #--bin=/home/sdk/bin$ xmake 5 个例子 #包括 “tbox/tbox.h” int main (int argc,char ** argv){ // 初始化 tbox 如果(!tb_init(tb_null,tb_null))返回 0; // 痕迹tb_trace_i( "你好 tbox" ); // 初始化向量 tb_vector_ref_t 向量= tb_vector_init( 0 , tb_element_str(tb_true)); 如果(向量){ // 插入项目tb_vector_insert_tail(向量, “你好” );tb_vector_insert_tail(向量,"tbox" ); // 转储所有项目tb_for_all ( tb_char_t const *, cstr,向量){ // 痕迹tb_trace_i( “%s”,cstr);} // 退出向量tb_vector_exit(向量);} // 初始化流 tb_stream_ref_t流 = tb_stream_init_from_url( "http://www.xxx.com/file.txt" ); 如果(流){ // 打开流 如果(tb_stream_open(流)){ // 读取行 tb_long_t大小 = 0 ; tb_char_t行[TB_STREAM_BLOCK_MAXN]; 当((size = tb_stream_bread_line(stream,line,sizeof(line)))> = 0){ // 痕迹tb_trace_i( "行:%s",行);}} // 退出流tb_stream_exit(流);} // 等待tb_getchar(); // 退出 tbox退出(); 返回 0;} 点击阅读原文,了解更多。https://gitee.com/tboox/tbox