Nginx是什么 Nginx(engine X)是一个开源的轻量级的HTTP服务器,能够提供高性能的HTTP和反向代理服务。 与传统的Apache服务器相比,在性能上Nginx占用系统资源更小、支持高并发,访问效率更高;在功能上,Nginx不仅作为Web服务软件,还适用于反向代理、负载均衡等场景;在安装配置上,Nginx更为简单、灵活。 Nginx因为并发性能和资源占用上的优势,已经广泛用于大中型互联网企业。 Nginx特点 支持高并发:Nginx是专门为性能优化而开发的,采用内核Poll模型,单机能够支持几万以上的并发连接;nginx支持高并发连接,处理2-3万并发连接数,官方监测能支持5万并发。对HTTP并发连接的处理能力高,单台物理服务器可支持30000~50000个并发请求。(实际操作,很多公司为了服务器的稳定,都会设置在20000个左右) 低资源消耗:Nginx采取了分阶段资源分配技术,使得CPU与内存的占用率非常低。一般1万个非活跃的HTTP Keep-Alive连接在Nginx中仅消耗几MB内存;可以跨平台,配置简单,内存消耗少,10个nginx才占用150M内存。 低成本,高稳定:成本低,且开源,稳定性高,宕机概率非常小; 高拓展性:设计极具扩展性,由多个不同功能、不同层次、不同类型且耦合度极低的模块组成 高可用性:Nginx支持热部署,其中的master管理进程与worker工作进程的分离设计;启动速度特别迅速,因此可以在不间断服务的情况下,对软件版本或者配置进行升级,即使运行数月也无需重新启动,几乎可以做到7x24小时不间断地运行 丰富的使用场景:可以作为Web服务端、HTTP反向代理、负载均衡和前端缓存服务等场景使用 开源协议:使用BSD许可协议,免费使用,且可修改源码。Nginx 是开源、高性能、高可靠的 Web 和反向代理服务器,而且支持热部署,几乎可以做到 7 * 24 小时不间断运行,即使运行几个月也不需要重新启动,还能在不间断服务的情况下对软件版本进行热更新。 异步非阻塞网络模型:使用的是epoll模型,这种模型是I/O多路复用技术(I/O多路复用是一种技术,它允许一个进程或线程监控多个网络连接,当其中某个或某几个连接有数据时,当前程序可以拿到网卡收到的数据进行下一步的处理;),异步非阻塞的模型(异步非阻塞模型可以提高程序的效率,在等待I/O操作完成的同时,可以继续执行其他代码。) 健康监测:内置的健康检查功能,可以允许在服务器宕机的时候,做健康检查,再发送的请求就不会发给宕机的服务器,会重新提交到其他节点上。 Nginx源码的目录结构: ├── auto 自动检测系统环境以及编译相关的脚本│ ├── cc 关于编译器相关的编译选项的检测脚本│ ├── lib nginx编译所需要的一些库的检测脚本│ ├── os 与平台相关的一些系统参数与系统调用相关的检测│ └── types 与数据类型相关的一些辅助脚本├── conf 存放默认配置文件,在make install后,会拷贝到安装目录中去├── contrib 存放一些实用工具,如geo配置生成工具(geo2nginx.pl)├── html 存放默认的网页文件,在make install后,会拷贝到安装目录中去├── man nginx的man手册└── src 存放nginx的源代码 ├── core nginx的核心源代码,包括常用数据结构的定义,以及nginx初始化运行的核心代码如main函数 ├── event 对系统事件处理机制的封装,以及定时器的实现相关代码 │ └── modules 不同事件处理方式的模块化,如select、poll、epoll、kqueue等 ├── http nginx作为http服务器相关的代码 │ └── modules 包含http的各种功能模块 ├── mail nginx作为邮件代理服务器相关的代码 ├── misc 一些辅助代码,测试c++头的兼容性,以及对google_perftools的支持 └── os 主要是对各种不同体系统结构所提供的系统函数的封装,对外提供统一的系统调用接口 1. Nginx架构简介 换张中文图,Nginx架构更容易理解 Master 进程 Master 进程: Nginx 的运行始于一个 master 进程,它负责管理所有的工作进程。 master 进程负责读取和解析配置文件,并启动工作进程。 当 Nginx 启动时,它会生成两种类型的进程:主进程(master)和工作进程(worker)。 主进程并不处理网络请求,而是负责调度工作进程,包括加载配置、启动工作进程以及进行非停升级。 因此,当 Nginx 启动后,查看操作系统的进程列表,至少会有两个 Nginx 进程。 工作进程 工作进程: 一旦 master 进程启动,它会生成一组工作进程。 每个工作进程都是独立运行的,负责处理来自客户端的连接和请求。 工作进程之间相互独立,可以并行处理请求,提高了 Nginx 的性能和吞吐量。 服务器实际 处理网络请求 及 响应 的是 工作进程(worker),在类 unix 系统上,Nginx 可以配置 多个 worker,而每个 worker 进程 都可以同时处理 数以千计 的 网络请求。 每个工作进程在启动时都会复制主进程的配置信息和相关资源,但它们彼此之间是相互独立的,这意味着它们可以并行地处理请求,互不影响。 此外,每个工作进程还会维护一个事件驱动的事件循环,通过事件驱动机制处理来自客户端的连接请求、数据读取和响应发送,这种异步非阻塞的 I/O 模型确保了 Nginx 的高性能和低资源消耗。 模块化设计 Nginx 核心模块: Nginx 的核心模块包括 HTTP 模块、事件模块、解析器模块等。 HTTP 模块处理 HTTP 请求和响应,包括 HTTP 头部解析、HTTP 请求方法解析、URI 解析等。 事件模块负责处理底层的事件通知机制,如 Epoll、Kqueue 等。 解析器模块负责解析 Nginx 配置文件。 Nginx 的 worker 进程分为核心模块和功能性模块。 核心模块主要负责维持一个运行循环(run-loop),在其中执行网络请求处理的不同阶段的模块功能,如网络读写、存储读写、内容传输、外出过滤,以及将请求发往上游服务器等。 Nginx 的代码采用了模块化设计,这使得我们可以根据需要选择和修改功能模块,然后编译成具有特定功能的服务器。 事件驱动模型 事件驱动模型: Nginx 采用了事件驱动的模型,主要利用了操作系统提供的异步 I/O 机制。 当有新的连接建立或者数据可读写时,Nginx 不会阻塞等待,而是通过事件通知机制处理这些事件,从而提高了处理效率。 Nginx 实现了高并发、高性能的关键在于其基于异步及非阻塞的事件驱动模型。 这种模型使得 Nginx 能够高效地处理大量并发请求,而不会因为阻塞等待而降低性能。 此外,Nginx 还充分利用了 Linux、Solaris 以及类 BSD 等操作系统内核中提供的事件通知和 I/O 性能增强功能,如 kqueue、epoll 以及 event ports,进一步提升了其性能表现。 代理设计 Nginx 作为高性能的代理服务器,其代理原理是其设计的核心之一。 无论是针对 HTTP 还是其他协议(如 FastCGI、Memcache、Redis等)的网络请求或响应,Nginx 都采用了代理机制来实现数据的转发和处理。 Nginx 的代理原理主要基于以下几个关键点: 接收请求:当 Nginx 接收到客户端的请求时,根据配置文件中的代理设置,确定是否需要进行代理转发。如果需要代理转发,则根据配置选择合适的代理方式。 建立连接:Nginx 会与目标服务器建立连接,可以是与远程服务器建立 TCP 连接,也可以是与本地应用程序之间建立的 Unix Socket 连接,取决于代理目标的具体情况。 数据传输:一旦连接建立成功,Nginx 会将客户端的请求数据转发给目标服务器,并且在接收到目标服务器的响应后,再将响应数据返回给客户端。这个过程可以是全双工的,意味着 Nginx 可以同时接收客户端请求和目标服务器响应,然后进行相应的转发和处理。 代理缓存:为了进一步提高性能,Nginx 还支持代理缓存功能。它可以将经常请求的数据缓存在本地,避免每次请求都要向后端服务器发起请求,从而减少响应时间和网络负载。 负载均衡:对于需要代理转发的请求,Nginx 还支持负载均衡功能,可以根据一定的策略将请求分发到多个后端服务器上,以实现负载均衡和高可用性。 2. Nginx架构详解 工作流程: 当有新的 HTTP 请求到达时,master 进程会将其分发给一个工作进程。 工作进程处理请求,根据配置文件进行请求的处理,包括反向代理、负载均衡、静态文件服务等。 处理完成后,工作进程将响应返回给客户端。 2.1 Nginx进程模型 Nginx默认采用多进程工作方式,在Nginx启动后,会运行一个master进程和多个worker进程。 master主要用来管理worker进程,充当整个进程组与用户的交互接口,同时对进程进行监护,实现worker进程的重启服务、平滑升级、更换日志文件、配置文件实时生效等功能; worker进程用来处理基本的网络事件,worker之间是平等的,他们共同竞争来处理来自客户端的请求。一个请求只能在一个worker进程中处理,一个worker进程不可能处理其它worker进程中的请求。 另外在Nginx架构中还有Cache Loader和Cache Manager进程,Cache Loader进程加载缓存索引文件信息;Cache Manager进程管理磁盘的缓存大小,超过预定值大小后最小使用的数据将被删除。 2.1.1 Master管理进程 Master进程主要用来管理worker进程,具体包括如下4个主要功能: 接收来自外界的信号; 向各worker进程发送信号; 监控woker进程的运行状态; 当woker进程退出后(异常情况下),会自动重新启动新的woker进程。 Master进程接受到命令重启Nginx进程(./nginx -s reload),会按照以下流程: 1) 首先master进程在收到重启命令后,会先重新加载配置文件,然后再启动新的worker进程,并向所有老的worker进程发送信号,告诉他们可以光荣退休了。 2) 新的worker进程在启动后,就开始接收新的请求,而老的worker在收到来自master的信号后,就不再接收新的请求,并且处理完当前进程中的所有未处理完的请求后,再退出。 2.1.2 Worker工作进程 Worker工作进程之间是对等的,每个进程处理请求的机会也是一样的。Nginx采用异步非阻塞的方式来处理网络事件,具体流程如下: 1) 接收请求:首先,每个worker进程都是从master进程fork过来,在master进程建立好需要listen的socket(listenfd)之后,然后再fork出多个worker进程。 a) 所有worker进程的listenfd会在新连接到来时变得可读,每个work进程都可以去accept这个socket(listenfd)。 b) 当一个client连接到来时,所有accept的work进程都会受到通知,但只有一个进程可以accept成功,其它的则会accept失败。 c) 为保证只有一个进程处理该连接,Nginx提供了一把共享锁accept_mutex来保证同一时刻只有一个work进程在accept连接。 d) 所有worker进程在注册listenfd读事件前抢accept_mutex,抢到互斥锁的那个进程注册listenfd读事件,在读事件里调用accept接受该连接。 2) 处理请求:当一个worker进程在accept这个连接之后,就开始读取请求,解析请求,处理请求,产生数据后,再返回给客户端,最后才断开连接。 由上可以看出,一个请求完全由worker进程处理,并且只在一个worker进程中处理。 2.2 Nginx请求处理流程 Nginx工作进程会监听套接字上的事件(accept_mutex和kernel socketsharding),来决定什么时候开始工作。 事件是由新的连接初始化的,这些连接会被分配给状态机,Nginx中有三大类状态机:处理应用层的HTTP状态机、处理TCP/UDP的4层的传输层状态机和处理邮件的MAIL状态机,其中HTTP状态机最为常见。 在多种流量进入Nginx后,Nginx的三种状态机在Nginx解析出请求后,会动用线程池处理调用,将静态资源、反向代理、错误日志等信息分别导向不同的出口,比如fastcgi会导向PHP处理、html会导向nginx处理,并将处理请求日志记录到本地或远程服务器中。 处理流程图: 在nginx中我们指的是http请求,具体到nginx中的数据结构是ngx_http_request_t。ngx_http_request_t是对一个http请求的封装。 我们知道,一个http请求,包含请求行、请求头、请求体、响应行、响应头、响应体。 http请求是典型的请求-响应类型的的网络协议,而http是文件协议,所以我们在分析请求行与请求头,以及输出响应行与响应头,往往是一行一行的进行处理。 如果我们自己来写一个http服务器,通常在一个连接建立好后,客户端会发送请求过来。 然后我们读取一行数据,分析出请求行中包含的method、uri、http_version信息。 然后再一行一行处理请求头,并根据请求method与请求头的信息来决定是否有请求体以及请求体的长度,然后再去读取请求体。 得到请求后,我们处理请求产生需要输出的数据,然后再生成响应行,响应头以及响应体。 在将响应发送给客户端之后,一个完整的请求就处理完了。 当然这是最简单的webserver的处理方式,其实nginx也是这样做的,只是有一些小小的区别,比如,当请求头读取完成后,就开始进行请求的处理了。 nginx通过ngx_http_request_t来保存解析请求与输出响应相关的数据。 2.3 Nginx多进程IO模型 2.3.1 Nginx多进程模型 Nginx默认使用多进程的工作方式,相比较多线程的方式,有以下好处: 1) 首先,对于每个worker进程来说,独立的进程不需要加锁,所以省掉了锁带来的开销,同时在编程以及问题查找时,也会方便很多; 2) 其次,采用独立的进程,可以让进程之间相互不会影响,一个进程退出后,其它进程还在工作,服务也不会中断,master进程则很快启动新的worker进程; 3) 再次,为Nginx热部署提供了支持。在修改配置文件nginx.conf后,重新生成新的worker进程,新的worker进程会以新的配置处理请求,而老的worker进程,等把以前的请求处理完成以后,kill掉就可以。 2.3.2 Nginx异步非阻塞事件模型 异步非阻塞事件是怎么回事? 先看一个请求的完整过程,首先请求过来建立连接,然后再接收数据再发送数据,具体到系统层就是IO读写事件。当读写事件没有准备好,如果不采用非阻塞的方式,就得阻塞调用,阻塞调用会进入内核等待,导致CPU资源被其它进程占用。当并发请求越大时,等待的事件越多,CPU利用不上去,并发也上不去。因此Nginx使用非阻塞的事件模型,系统中事件模型有很多中,比如select/poll/kqueue/epoll等,Nginx采用epoll模型。 Epoll模型基于事件驱动机制,可以监控多个事件是否准备完毕,如果可以,就放入epoll队列,这个过程是异步的,worker进程只需要从epoll队列循环处理即可。 Epoll调用过程如下图所示: 事件驱动&异步非阻塞: 本质来说,事件驱动是一种思想(事实上它不仅仅局限于编程) ,事件驱动思想是实现 异步非阻塞特性 的一个重要手段。 对于web服务器来说,造成性能拉胯不支持高并发的常见原因就是由于使用了传统的I/O模型造成在内核没有可读/可写事件(或者说没有数据可供用户进程读写)时,用户线程 一直在等待(其他事情啥也干不了就是干等等待内核上的数据可读/可写),这样的话其实是一个线程(ps:线程在Linux系统也是进程)对应一个请求,请求是无限的,而线程是有限的从而也就形成了并发瓶颈。 而大佬们为了解决此类问题,运用了事件驱动思想来对传统I/O模型做个改造,即在客户端发起请求后,用户线程不再阻塞等待内核数据就绪,而是立即返回(可以去执行其他业务逻辑或者继续处理其他请求)。 当内核的I/O操作完成后,内核系统会向用户线程发送一个事件通知,用户线程才来处理这个读/写操作,之后拿到数据再做些其他业务后响应给客户端,从而完成一次客户端请求的处理。 事件驱动的I/O模型中,程序不必阻塞等待I/O操作的完成,也无需为每个请求创建一个线程,从而提高了系统的并发处理能力和响应速度。 事件驱动型的I/O模型通常也被被称为I/O多路复用,即这种模型可以在一个线程中,处理多个连接(复用就是指多个连接复用一个线程,多路也即所谓的 多个连接),通过这种方式避免了线程间切换的开销,同时也使得用户线程不再被阻塞,提高了系统的性能和可靠性。 nginx支持事件驱动是因为他利用了操作系统提供的I/O多路复用接口,如Linux系统中,常用的I/O多路复用接口有select/poll,epoll。 这些接口可以监视多个文件描述符的状态变化,当文件描述符可读或可写时,就会向用户线程发送一个事件通知。 用户线程通过事件处理机制(读取/写入数据)来处理这个事件,之后进行对应的业务逻辑完了进行响应。 简单一句话概括: 事件驱动机制就是指当有读/写/连接事件就绪时 再去做读/写/接受连接这些事情,而不是一直在那里傻傻的等,也正应了他的名词: 【事件驱动!】,基于事件驱动思想设计的多路复用I/O(如select/poll,epoll),相对于传统I/O模型,达到了异步非阻塞的效果! 既然提到了select/poll,epoll 那么我们就简单说一下(注意我这里是简单描述,后续有时间会对相关知识点从源码层面做个系统的整理和图解): select: 将已连接的 Socket 都放到一个文件描述符集合,然后用户态调用 select 函数将文件描述符集合拷贝到内核里,让内核来检查是否有网络事件产生,检查的方式很粗暴,就是通过遍历文件描述符集合的方式,当检查到有事件产生后,将此 Socket 标记为可读或可写, 接着再把整个文件描述符集合拷贝回用户态里,然后用户态还需要再通过遍历的方法找到可读或可写的 Socket,然后再对其处理。 poll: poll函数的话其实和select大差不差,唯一区别可能就是socket列表的结构有所不同,不再受FD_SETSIZE的限制。这里就不多说了。 epoll: epoll在前边两者的基础上做了很大的优化,select/poll都需要遍历整个socket列表,当检测到传入的socket可读/可写时,则copy socket列表给用户空间,用户态仍然需要遍历(因为内核copy给用户态的是整个socket列表) ,而epoll则是通过红黑树结构将需要监控的socket插入到进去,然后当有socket可读时会通过回调机制来将其添加到可读列表中,然后内核将可读列表copy给用户态即可(据说此处使用了mmap这里我们不去验证探究,后续写相关文章时在深究吧),整个过程少了无效的遍历以及不用copy整个socket集合。 2.3.3 Nginx请求响应过程种的 I/O过程 1、用户态:应用程序,我们可以控制 2、内核态:操作系统层面,我们不容易去控制的 操作系统 ① 当用户发起 http 请求需要请求一个index.html 网页文件 ② 客户端请求与服务器端建立连接,建立连接后,会发送请求报文 ③ 服务端的网卡收到请求报文,会将该报文复制到内核空间,内核空间分析报文后交给对应的程序。nginx 分析该报文,将报文和自己的配置文件,一一比对,按照配置文件完成请求,分析后发现客户需要 index.html 文件 ④ 由于程序的权限问题,没有资格直接调用磁盘上的文件,程序会再将这个请求再次转发给内核,内核得到后请求去磁盘上找文件,找到文件后复制给程序 nginx ⑤ 程序会构建响应报文,构建好后再交给内核空间 ⑥ 内核空间得到响应报文后,再交给网卡发给客户 2.3.4 零拷贝技术 在传统的数据传输过程中,数据通常需要经过多次复制。 比如,当数据从磁盘读取到内存时,首先将数据读入内核缓冲区,然后再从内核缓冲区复制到用户空间的应用程序缓冲区。 零拷贝技术通过避免或减少数据在内存和设备之间的多次复制来提高效率。 具体做法包括直接内存访问(DMA)、文件映射(mmap)和发送文件(sendfile)等。 MMAP ( Memory Mapping ) mmap()系统调用使得进程之间通过映射同一个普通文件实现共享内存。普通文件被映射到进程地址空间后,进程可以向访问普通内存一样对文件进行访问。 mmap是一种内存映射文件的方法,即将一个文件或者其它对象映射到进程的地址空间,实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对映关系。 直接内存访问(DMA) 文件映射(mmap) 发送文件(sendfile) Nginx 的零拷贝技术主要是指在网络 I/O 操作中,使用 sendfile 方法来提高性能,这样可以避免用户空间和内核空间之间的数据拷贝,从而提高系统的吞吐量。 零拷贝技术的核心函数是 sendfile(),它在 Linux 2.0 版本以上的操作系统中得到支持。当 Nginx 配置文件中启用了 sendfile 指令时,Nginx 会使用这个系统调用来输出静态文件。 以下是一个简单的 Nginx 配置示例,其中启用了 sendfile: server { listen 80; server_name localhost; location / { root /usr/share/nginx/html; index index.html index.htm; sendfile on; # 启用 sendfile 方法 }} 在这个配置中,当客户端请求静态文件时,Nginx 会直接将文件内容通过 sendfile() 系统调用发送给客户端,而不是将文件内容读取到用户空间的缓冲区中,再从用户空间的缓冲区拷贝到内核空间去发送。这样就减少了数据拷贝的次数和 CPU 的参与,从而提高了性能。 2.3.5 Nginx为什么不使用多线程? Apache: 创建多个进程或线程,而每个进程或线程都会为其分配 cpu 和内存(线程要比进程小的多,所以worker支持比perfork高的并发),并发过大会耗光服务器资源 Nginx: 采用单线程来异步非阻塞处理请求(管理员可以配置Nginx主进程的工作进程的数量)(epoll),不会为每个请求分配cpu和内存资源,节省了大量资源,同时也减少了大量的CPU的上下文切换。所以才使得Nginx支持更高的并发。 2.3.6 Nginx 是如何实现高并发的? 1.异步,非阻塞,使用了epoll 和大量的底层代码优化 如果一个server采用一个进程负责一个request的方式,那么进程数就是并发数。正常情况下,会有很多进程一直在等待中 而nginx采用一个master进程,多个woker进程的模式 master进程主要负责收集、分发请求。每当一个请求过来时,master就拉起一个worker进程负责处理这个请求。同时master进程也负责监控woker的状态,保证高可靠性 woker进程一般设置为跟cpu核心数一致。nginx的woker进程在同一时间可以处理的请求数只受内存限制,可以处理多个请求 Nginx 的异步非阻塞工作方式正把当中的等待时间利用起来了。在需要等待的时候,这些进程就空闲出来待命了,因此表现为少数几个进程就解决了大量的并发问题 2.同步和异步 同步:一个服务的完成需要依赖其他服务时,只有等待被依赖的服务完成后,才算完成,这是一种可靠的服务序列。要么成功都成功,失败都失败,服务的状态可以保持一致 异步:一个服务的完成需要依赖其他服务时,只通知其他依赖服务开始执行,而不需要等待被依赖的服务完成,此时该服务就算完成了。被依赖的服务是否最终完成无法确定,因此它是一个不可靠的服务序列 3.阻塞与非阻塞 阻塞:阻塞调用是指调用结果返回之前,当前线程会被挂起,一直处于等待消息通知,不能够执行其他业务,函数只有在得到结果之后才会返回。 非阻塞:非阻塞和阻塞的概念相对应,指在不能立刻得到结果之前,该函数不会阻塞当前线程,而会立刻返回 3.Nginx功能模块和应用场景 3.1 Nginx功能模块说明 Nginx由内核和模块组成,其中内核在设计上非常简洁,完成的工作非常简单,仅仅通过查找配置文件将客户端请求映射到一个location block,而在这个location中所配置的每个指令将会启动不同的模块去完成相应的工作。 nginx 六大模块 核心模块:是 Nginx 服务器正常运行必不可少的模块,提供错误日志记录 、配置文件解析 、事件驱动机制 、进程管理等核心功能 标准HTTP模块:提供 HTTP 协议解析相关的功能,比如: 端口配置 、 网页编码设置 、 HTTP响应头设置 等等 可选HTTP模块:主要用于扩展标准的 HTTP 功能,让 Nginx 能处理一些特殊的服务,比如:Flash 多媒体传输 、解析 GeoIP 请求、 网络传输压缩 、 安全协议 SSL 支持等 邮件服务模块:主要用于支持 Nginx 的 邮件服务 ,包括对 POP3 协议、 IMAP 协议和 SMTP协议的支持 Stream服务模块: 实现反向代理功能,包括TCP协议代理 反向 第三方模块:是为了扩展 Nginx 服务器应用,完成开发者自定义功能,比如: Json 支持、 Lua 支持等 nginx高度模块化,但其模块早期不支持DSO机制;1.9.11 版本支持动态装载和卸载 3.1.1 Nginx模块分类 Nginx的模块从结构上分为核心模块、基础模块和第三方模块,其中用户根据自己的需要开发的模块都属于第三方模块: 核心模块:HTTP模块、EVENT模块和MAIL模块 基础模块:HTTP Access模块、HTTP FastCGI模块、HTTP Proxy模块和HTTP Rewrite模块; 第三方模块:HTTP Upstream Request Hash模块、Notice模块和HTTP Access Key模块。 3.1.2 Nginx模块功能 Nginx模块常规的HTTP请求和响应的过程如上图所示,Nginx模块从功能上分为以下三类: Handlers处理器模块:此类模块直接处理请求,并进行输出内容和修改headers信息等操作。Handlers处理器模块一般只能有一个。 Filters过滤器模块:此类模块主要对其他处理器模块输出的内容进行修改操作,最后由Nginx输出。 Proxies代理类模块:此类模块是Nginx的HTTP Upstream之类的模块,这些模块主要与后端一些服务比如FastCGI等进行交互,实现服务代理和负载均衡等功能。 Nginx本身处理的工作很少,当它接到一个HTTP请求时,通过查找配置文件将此次请求映射到一个location block,而此location中所配置的各个指令则会启动不同的模块去完成工作。 3.2 Nginx使用场景 首先,我们一般会将请求打到Nginx, 再把请求转发到我们的应用服务。 比如我们常用的php-fpm/golang程序或者tomcat,再由应用服务访问缓存,数据库等存储以提供基本的数据服务能力。 由于我们开发过程中要求应用程序开发效率较高,但其运行效率是较低的。 单个应用程序的qps,tps都是受限的,不足以支撑用户的请求量,那么为了提高整个服务的吞吐能力,就需要将多个应用程序组成一个集群来整体向外提供高可用服务。这样就会延伸出来2个需求,1.负载均衡,2.当有个别应用程序出问题的时候,需要做容灾。那么我们的反向代理就需要具备负载均衡的能力。 其次,Nginx一般处于边缘节点,离用户最近,我们可以将一些热点数据缓存在Nginx中,直接向用户提供访问,从而达到减少用户时延的效果。这也就衍生出了缓存功能。 第三,当应用程序的性能不及缓存,数据库的性能时,有一些接口我们可以由Nginx直接访问数据库,redis,第三方应用服务。如:使用Openresty,lua等。 第四,我们还可以将css, js, 小图片等静态资源直接放到Nginx服务中,没有必要再次请求应用服务。 3.2.1 静态文件服务 静态文件服务:Nginx在提供静态资源服务方面效率很高,可以快速的响应大量的静态请求,减轻其他动态服务器的负担,如CSS、JavaScript、Image、Audio和Video文件等。 以下是一个简单的配置示例,用于设置Nginx以服务静态文件: server { listen 80; server_name example.com; location / { root /usr/share/nginx/html; index index.html index.htm; try_files $uri $uri/ =404; }} 解释: listen 80; 表示Nginx监听80端口。 server_name Example Domain; 表示对应的域名。 location / { ... } 定义了一个处理所有请求的区块。 root /usr/share/nginx/html; 指定了静态文件存放的根目录。 index index.html index.htm; 指定了默认页面。 try_files $uri $uri/ =404; 尝试按顺序提供请求的文件,如果找不到文件则返回404错误。 确保/usr/share/nginx/html目录包含您的静态文件,并根据需要调整server_name和root指令。 3.2.2 缓存服务器 缓存服务器:Nginx可以缓存一些响应结果,降低后端服务器的负载,提高数据的访问速度,平衡访问压力等。 Nginx 可以用作缓存服务器,通过配置 ngx_http_proxy_module 和 ngx_http_cache_module 实现。以下是一个简单的配置示例,它将设置 Nginx 作为反向代理的同时开启缓存功能: http { proxy_cache_path /data/nginx/cache levels=1:2 keys_zone=my_cache:10m max_size=10g inactive=60m use_temp_path=off; server { listen 80; location / { proxy_pass http://upstream_server; proxy_cache my_cache; proxy_cache_valid 200 302 10m; proxy_cache_valid 404 1m; proxy_cache_use_stale error timeout updating http_500 http_502 http_503 http_504; } }} 解释: proxy_cache_path 定义了缓存的存储路径、缓存目录的层次结构、缓存键的区域、最大缓存大小以及缓存内容在多长时间没有被访问后应被视为不活动并可能被删除。 proxy_cache 指定使用名为 my_cache 的缓存。 proxy_cache_valid 设置了不同HTTP响应状态码的缓存有效期。 proxy_cache_use_stale 定义了在指定的错误情况下或缓存内容超时时是否可以使用过期的缓存数据。 确保替换 http://upstream_server 为你的后端服务器地址。这个配置假设 Nginx 已经安装了 ngx_http_proxy_module 和 ngx_http_cache_module。 3.2.3 SSL加速 SSL加速:Nginx 可以通过 HTTPS 访问加速,提高 HTTPS 访问的性能,减少SSL负载压力,保证数据的安全性。 Nginx SSL加速通常指的是通过优化SSL/TLS配置来提高性能,这可以包括禁用不必要的SSL/TLS协议版本、使用更快的加密算法、启用会话复用等。以下是一个基本的Nginx配置示例,展示了一些可以用来加速SSL的配置: server { listen 443 ssl; server_name yourdomain.com; ssl_certificate /path/to/your/certificate.pem; ssl_certificate_key /path/to/your/private.key; # 仅启用最安全的TLS协议版本 ssl_protocols TLSv1.2 TLSv1.3; # 仅使用安全的加密算法 ssl_ciphers 'ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:DHE-RSA-AES128-GCM-SHA256:DHE-RSA-AES256-GCM-SHA384'; ssl_prefer_server_ciphers on; # 启用会话复用 ssl_session_cache shared:SSL:10m; ssl_session_timeout 10m; # 其他SSL配置和安全头配置 ... # 服务器的其他配置 location / { ... }} 这个配置示例中,我们启用了TLSv1.2和TLSv1.3协议,并且指定了支持的加密算法。我们也启用了SSL会话复用,这可以减少SSL握手时间,从而加速连接建立的过程。 请根据你的实际需求和服务器的安全策略来调整这些配置。如果你需要进一步优化SSL性能,可以考虑使用HTTP/2,或者进行SSL/TLS性能分析来找到瓶颈。 3.2.4 WebSocket服务 WebSocket:Nginx也支持WebSocket协议,可用于实时通信应用程序。 在Nginx中使用WebSocket,你需要确保Nginx版本支持WebSocket,一般来说,较新版本的Nginx已经支持WebSocket。 以下是一个配置示例,它将Nginx配置为代理WebSocket请求: http { map $http_upgrade $connection_upgrade { default upgrade; '' close; } server { listen 80; server_name example.com; location / { proxy_pass http://websocket_backend; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection $connection_upgrade; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $scheme; } } upstream websocket_backend { server websocket_server_address:port; }} 在这个配置中,我们定义了一个map指令,它根据$http_upgrade变量的值设置$connection_upgrade变量。这样做是为了确保当Nginx收到一个包含Upgrade头部的HTTP请求时,它会将连接升级到Upgrade类型。 然后,在server块中,我们定义了监听端口和服务器名称。在location /块中,我们配置了Nginx来代理WebSocket请求。我们设置了代理的目标(upstream块中定义的服务器),并设置了必要的头部,以确保WebSocket连接可以正确建立。 确保替换example.com、websocket_server_address和port为你的实际域名和WebSocket服务器的IP地址和端口。 3.2.5 访问控制和安全 访问控制和安全:Nginx可以使用访问控制、基于IP地址的访问限制等来提高服务器的安全性,有效保护Web应用程序和服务器。 在Nginx中实现访问控制和安全性,可以通过配置文件来设置。 以下是一些常见的安全性配置示例: 禁止特定IP访问: location / { deny 192.168.1.1; allow all; } 使用Auth Basic进行访问认证: location / { auth_basic "Restricted Content"; auth_basic_user_file /etc/nginx/.htpasswd; } 设置SSL要求,强制使用HTTPS: server { listen 80; return 301 https://$host$request_uri; } server { listen 443 ssl; ssl_certificate /etc/nginx/ssl/nginx.crt; ssl_certificate_key /etc/nginx/ssl/nginx.key; # 其他配置... } 限制请求速率: http { limit_req_zone $binary_remote_addr zone=mylimit:10m rate=1r/s; server { location / { limit_req zone=mylimit burst=5; } } } 设置X-Frame-Options防止点击劫持: add_header X-Frame-Options "SAMEORIGIN"; 设置Content Security Policy(CSP): add_header Content-Security-Policy "default-src 'self' https:; script-src 'self''unsafe-inline' https: 'unsafe-eval';"; 禁止浏览器缓存: add_header Cache-Control "no-store, no-cache, must-revalidate, proxy-revalidate, max-age=0"; 这些配置可以根据具体需求进行组合和调整,以增强Nginx服务器的安全性和访问控制。 3.2.6 Api网关服务(综合) Nginx可以作为API网关,通过配置来处理API路由、负载均衡、请求限流、缓存、SSL/TLS终结等。以下是一个简单的Nginx配置示例,用于将API请求代理到后端服务: http { upstream userapi { server user1.example.com; server user2.example.com; } upstream orderapi { server order1.example.com; server order2.example.com; } server { listen 80; location /api/user/ { proxy_pass http://userapi; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $scheme; } location /api/order/ { proxy_pass http://orderapi; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $scheme; } location / { return 404; } }} 在这个配置中: upstream 块定义了一个后端服务器群组,可以通过server指令添加更多后端服务器。 server 块中的 listen 指令设置Nginx监听80端口。 location /api/ 块捕获所有到达/api/路径的请求,并使用proxy_pass指令将请求转发到backend群组。 proxy_set_header 指令用于设置转发给后端服务器的HTTP头部,以确保后端服务可以获取到正确的原始请求信息。 第二个 location / 块捕获所有其他请求,并返回404状态码。 这个配置可以根据具体需求进行扩展,比如添加缓存、请求限流、服务发现、TLS/SSL配置等功能。 3.2.7 代理:正向代理和反向代理 代理服务器一般指代局域网内部的机器通过代理服务发送请求到互联网上的服务器,代理服务器一般作用于客户端。代理服务器是介于客户端和Web服务器之间的服务器,客户端首先与代理服务器创建连接,然后根据代理服务器所使用的代理协议,请求对目标服务器创建连接、或则获得目标服务器的指定资源。 正向代理:为了从原始服务器取的内容,客户端向代理发送一个请求并指定目标(Web服务器),然后代理向Web服务器转交请求并将获得的内容返回给客户端,客户端必须要进行一些特别的设置才能使用正向代理。 像VPN就是正向代理,一般在浏览器中配置代理服务器的相关信息。 正向代理中代理的对象是客户端,代理服务器和客户端属于同一个LAN,对服务器端来说是透明的。 反向代理:客户端发送请求到代理服务器,由代理服务器转发给相应的Web服务器进行处理,最终返回结果给客户端。 像Nginx就是反向代理服务器软件,对客户端暴露的其实是一个VIP,不是真实的Web服务器的IP。 反向代理的是对象是Web服务器端,代理服务器和Web服务端属于同一个LAN,对客户端来说是透明的。 使用反向代理的好处是客户端不需要任何配置就可以访问,对外暴露的是代理服务器的地址隐藏了真实服务器的地址,客户端只需要把请求发送给代理服务器,由代理服务器去选择后端的Web服务器,获取到数据后再返回给客户端。 Nginx配置正向代理的示例(典型的用香港机器访问脸书和谷歌): server { listen 3128; location / { proxy_pass http://$http_host$request_uri; proxy_set_header Host $http_host; proxy_buffers 256 4k; client_max_body_size 10m; client_body_buffer_size 128k; proxy_connect_timeout 300; proxy_send_timeout 300; proxy_read_timeout 300; send_timeout 300; }} Nginx配置反向代理的示例: http { upstream backend { server backend1.example.com; server backend2.example.com; } server { listen 80; location / { proxy_pass http://backend; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; } }} 3.2.8 负载均衡 负载均衡建立在现有网络结构之上,它提供了一种廉价有效透明的方法扩展网络设备和服务器的带宽、增加吞吐量、加强网络数据处理能力、提高网络的灵活性和可用性。负载均衡(Load Balance)其意思就是分摊到多个操作单元上进行执行,例如Web服务器、FTP服务器、企业关键应用服务器和其它关键任务服务器等,从而共同完成工作任务。 简而言之,单个Web应用服务器不能承受日益增长的并发量请求,因此需要不断扩展web服务器来支撑高并发请求,根据不同的负载均衡策略将请求分配到各个服务器上。 Nginx支持6种不同的负载均衡策略: round_robin轮询:轮询(默认策略),每个请求按时间顺序依次分配至不同的后端服务器。每个请求按时间顺序逐一分配到不同的后端服务器,如果后端服务器down掉能够被自动剔除。轮询算法适合服务器配置相当,无状态且短平快的服务使用。 weight权重(基于轮询):指定轮询的几率,weight和后端的访问比例成比例,weight权重越高比例越大。通常用于后端服务器配置不均的情况。 ip_hash:上面两种算法存在一个问题是就是无法做到会话保持,当用户登录到服务器上后,第二次请求的时候会被定位到服务器集群中的某一个,那么已经登录到某个服务器上的用户会重新定位到另一台,之前的登录信息会丢失。ip_hash算法可以解决这个问题,当用户再次访问请求时,会通过hash算法自动定位到已经登录的服务器上,这样每个客户端可以固定在某个web服务器上,解决客户端session的问题。 hash $request_uri:uri哈希,根据请求的URI进行哈希计算,相同的URI将会被分配到相同的服务器。 hash $remote_addr:远程地址哈希,根据客户端IP地址进行哈希计算,相同的IP地址将会被分配到相同的服务器。 random:随机,每个请求随机分配至不同的后端服务器。 3.2.9 动静分离 动静分离技术是让动态网站里的动态网页根据一定规则把不变的资源和经常变的资源区分开来,将静态文件放在一个单独的web服务器上,加快解析速度,降低原来单个服务器的压力。在Nginx的配置中,在server{}段中加入带正则匹配的location来指定匹配项针对PHP的动静分离:静态页面交给Nginx处理,动态页面交给PHP-FPM模块或Apache处理。 Nginx 动静分离是一种常见的web服务器优化方法,通过将网站的静态内容(例如图片、CSS、JS等)与动态内容(例如PHP、Python脚本处理的内容)分开存储,可以有效提升网站的性能。 以下是一个简单的Nginx配置示例,实现动静分离: server { listen 80; server_name example.com; # 静态文件目录 location /static/ { alias /path/to/your/static/files/; expires 30d; add_header Cache-Control "public"; try_files $uri $uri/ =404; } # 动态内容处理 location / { include uwsgi_params; uwsgi_pass unix:/path/to/your/uwsgi/socket.sock; uwsgi_param UWSGI_SCRIPT your_app.wsgi; uwsgi_param UWSGI_CHDIR /path/to/your/project; client_max_body_size 32M; }} 在这个配置中: 静态文件位于 /path/to/your/static/files/ 目录下,并且通过 location /static/ 指定。 对于静态文件,设置了缓存时间为30天,并添加了Cache-Control头,以指示客户端和代理服务器缓存文件。 所有其他请求都被转发到uWSGI服务器(通过UNIX socket),以处理Python/Django等应用的动态内容。 确保替换 /path/to/your/、example.com、your_app.wsgi 和项目路径为你的实际路径和应用信息。
内核开发比用户空间开发更难的一个因素就是内核调试艰难。内核错误往往会导致系统宕机,很难保留出错时的现场。调试内核的关键在于你的对内核的深刻理解。 一 调试前的准备 在调试一个bug之前,我们所要做的准备工作有: 有一个被确认的bug。 包含这个bug的内核版本号,需要分析出这个bug在哪一个版本被引入,这个对于解决问题有极大的帮助。可以采用二分查找法来逐步锁定bug引入版本号。 对内核代码理解越深刻越好,同时还需要一点点运气。 该bug可以复现。如果能够找到复现规律,那么离找到问题的原因就不远了。 最小化系统。把可能产生bug的因素逐一排除掉。 二 内核中的bug 内核中的bug也是多种多样的。它们的产生有无数的原因,同时表象也变化多端。从隐藏在源代码中的错误到展现在目击者面前的bug,其发作往往是一系列连锁反应的事件才可能出发的。虽然内核调试有一定的困难,但是通过你的努力和理解,说不定你会喜欢上这样的挑战。 三 内核调试配置选项 学习编写驱动程序要构建安装自己的内核(标准主线内核)。最重要的原因之一是:内核开发者已经建立了多项用于调试的功能。但是由于这些功能会造成额外的输出,并导致能下降,因此发行版厂商通常会禁止发行版内核中的调试功能。 1 内核配置 为了实现内核调试,在内核配置上增加了几项: Kernel hacking ---> [*] Magic SysRq key [*] Kernel debugging [*] Debug slab memory allocations [*] Spinlock and rw-lock debugging: basic checks [*] Spinlock debugging: sleep-inside-spinlock checking [*] Compile the kernel with debug info Device Drivers ---> Generic Driver Options ---> [*] Driver Core verbose debug messages General setup ---> [*] Configure standard kernel features (for small systems) ---> [*] Load all symbols for debugging/ksymoops 启用选项例如: slab layer debugging(slab层调试选项) high-memory debugging(高端内存调试选项) I/O mapping debugging(I/O映射调试选项) spin-lock debugging(自旋锁调试选项) stack-overflow checking(栈溢出检查选项) sleep-inside-spinlock checking(自旋锁内睡眠选项) 2 调试原子操作 从内核2.5开发,为了检查各类由原子操作引发的问题,内核提供了极佳的工具。 内核提供了一个原子操作计数器,它可以配置成,一旦在原子操作过程中,进城进入睡眠或者做了一些可能引起睡眠的操作,就打印警告信息并提供追踪线索。 所以,包括在使用锁的时候调用schedule(),正使用锁的时候以阻塞方式请求分配内存等,各种潜在的bug都能够被探测到。 下面这些选项可以最大限度地利用该特性: CONFIG_PREEMPT = y CONFIG_DEBUG_KERNEL = y CONFIG_KLLSYMS = y CONFIG_SPINLOCK_SLEEP = y 四 引发bug并打印信息 1 BUG()和BUG_ON() 一些内核调用可以用来方便标记bug,提供断言并输出信息。最常用的两个是BUG()和BUG_ON()。 定义在中: #ifndef HAVE_ARCH_BUG #define BUG() do { printk("BUG: failure at %s:%d/%s()! ", __FILE__, __LINE__, __FUNCTION__); panic("BUG!"); /* 引发更严重的错误,不但打印错误消息,而且整个系统业会挂起 */ } while (0) #endif #ifndef HAVE_ARCH_BUG_ON #define BUG_ON(condition) do { if (unlikely(condition)) BUG(); } while(0) #endif 当调用这两个宏的时候,它们会引发OOPS,导致栈的回溯和错误消息的打印。 ※ 可以把这两个调用当作断言使用,如:BUG_ON(bad_thing); 2.WARN(x) 和 WARN_ON(x) 而WARN_ON则是调用dump_stack,打印堆栈信息,不会OOPS 定义在中: #ifndef __WARN_TAINT#ifndef __ASSEMBLY__extern void warn_slowpath_fmt(const char *file, const int line, const char *fmt, ...) __attribute__((format(printf, 3, 4)));extern void warn_slowpath_fmt_taint(const char *file, const int line, unsigned taint, const char *fmt, ...) __attribute__((format(printf, 4, 5)));extern void warn_slowpath_null(const char *file, const int line);#define WANT_WARN_ON_SLOWPATH#endif#define __WARN() warn_slowpath_null(__FILE__, __LINE__)#define __WARN_printf(arg...) warn_slowpath_fmt(__FILE__, __LINE__, arg)#define __WARN_printf_taint(taint, arg...) \ warn_slowpath_fmt_taint(__FILE__, __LINE__, taint, arg)#else#define __WARN() __WARN_TAINT(TAINT_WARN)#define __WARN_printf(arg...) do { printk(arg); __WARN(); } while (0)#define __WARN_printf_taint(taint, arg...) \ do { printk(arg); __WARN_TAINT(taint); } while (0)#endif #ifndef WARN_ON#define WARN_ON(condition) ({ \ int __ret_warn_on = !!(condition); \ if (unlikely(__ret_warn_on)) \ __WARN(); \ unlikely(__ret_warn_on); \})#endif #ifndef WARN#define WARN(condition, format...) ({ \ int __ret_warn_on = !!(condition); \ if (unlikely(__ret_warn_on)) \ __WARN_printf(format); \ unlikely(__ret_warn_on); \})#endif 3 dump_stack() 有些时候,只需要在终端上打印一下栈的回溯信息来帮助你调试。这时可以使用dump_stack()。这个函数只在终端上打印寄存器上下文和函数的跟踪线索。 if (!debug_check) { printk(KERN_DEBUG “provide some information…/n”); dump_stack(); } 五 printk() 内核提供的格式化打印函数。 1 printk函数的健壮性 健壮性是printk最容易被接受的一个特质,几乎在任何地方,任何时候内核都可以调用它(中断上下文、进程上下文、持有锁时、多处理器处理时等)。 2 printk函数脆弱之处 在系统启动过程中,终端初始化之前,在某些地方是不能调用的。如果真的需要调试系统启动过程最开始的地方,有以下方法可以使用: 使用串口调试,将调试信息输出到其他终端设备。 使用early_printk(),该函数在系统启动初期就有打印能力。但它只支持部分硬件体系。 3 LOG等级 printk和printf一个主要的区别就是前者可以指定一个LOG等级。内核根据这个等级来判断是否在终端上打印消息。内核把比指定等级高的所有消息显示在终端。 可以使用下面的方式指定一个LOG级别: printk(KERN_CRIT “Hello, world!\n”); 注意,第一个参数并不一个真正的参数,因为其中没有用于分隔级别(KERN_CRIT)和格式字符的逗号(,)。KERN_CRIT本身只是一个普通的字符串(事实上,它表示的是字符串 "";表 1 列出了完整的日志级别清单)。作为预处理程序的一部分,C 会自动地使用一个名为 字符串串联 的功能将这两个字符串组合在一起。组合的结果是将日志级别和用户指定的格式字符串包含在一个字符串中。 内核使用这个指定LOG级别与当前终端LOG等级console_loglevel来决定是不是向终端打印。 下面是可使用的LOG等级: #define KERN_EMERG "" /* system is unusable */#define KERN_ALERT "" /* action must be taken immediately */ #define KERN_CRIT "" /* critical conditions */#define KERN_ERR "" /* error conditions */#define KERN_WARNING "" /* warning conditions */#define KERN_NOTICE "" /* normal but significant condition */#define KERN_INFO "" /* informational */#define KERN_DEBUG "" /* debug-level messages */#define KERN_DEFAULT "" /* Use the default kernel loglevel */ 注意,如果调用者未将日志级别提供给 printk,那么系统就会使用默认值KERN_WARNING ""(表示只有KERN_WARNING 级别以上的日志消息会被记录)。由于默认值存在变化,所以在使用时最好指定LOG级别。有LOG级别的一个好处就是我们可以选择性的输出LOG。比如平时我们只需要打印KERN_WARNING级别以上的关键性LOG,但是调试的时候,我们可以选择打印KERN_DEBUG等以上的详细LOG。而这些都不需要我们修改代码,只需要通过命令修改默认日志输出级别: mtj@ubuntu :~$ cat /proc/sys/kernel/printk4 4 1 7mtj@ubuntu :~$ cat /proc/sys/kernel/printk_delay0mtj@ubuntu :~$ cat /proc/sys/kernel/printk_ratelimit5mtj@ubuntu :~$ cat /proc/sys/kernel/printk_ratelimit_burst10 第一项定义了 printk API 当前使用的日志级别。这些日志级别表示了控制台的日志级别、默认消息日志级别、最小控制台日志级别和默认控制台日志级别。printk_delay 值表示的是 printk 消息之间的延迟毫秒数(用于提高某些场景的可读性)。注意,这里它的值为 0,而它是不可以通过 /proc 设置的。printk_ratelimit 定义了消息之间允许的最小时间间隔(当前定义为每 5 秒内的某个内核消息数)。消息数量是由 printk_ratelimit_burst 定义的(当前定义为 10)。如果您拥有一个非正式内核而又使用有带宽限制的控制台设备(如通过串口), 那么这非常有用。注意,在内核中,速度限制是由调用者控制的,而不是在printk 中实现的。如果一个 printk 用户要求进行速度限制,那么该用户就需要调用printk_ratelimit 函数。 4 记录缓冲区 内核消息都被保存在一个LOG_BUF_LEN大小的环形队列中。 关于LOG_BUF_LEN定义: #define __LOG_BUF_LEN (1 << CONFIG_LOG_BUF_SHIFT) ※ 变量CONFIG_LOG_BUF_SHIFT在内核编译时由配置文件定义,对于i386平台,其值定义如下(在linux26/arch/i386/defconfig中): CONFIG_LOG_BUF_SHIFT=18 记录缓冲区操作: ① 消息被读出到用户空间时,此消息就会从环形队列中删除。 ② 当消息缓冲区满时,如果再有printk()调用时,新消息将覆盖队列中的老消息。 ③ 在读写环形队列时,同步问题很容易得到解决。 ※ 这个纪录缓冲区之所以称为环形,是因为它的读写都是按照环形队列的方式进行操作的。 5 syslogd/klogd 在标准的Linux系统上,用户空间的守护进程klogd从纪录缓冲区中获取内核消息,再通过syslogd守护进程把这些消息保存在系统日志文件中。klogd进程既可以从/proc/kmsg文件中,也可以通过syslog()系统调用读取这些消息。默认情况下,它选择读取/proc方式实现。klogd守护进程在消息缓冲区有新的消息之前,一直处于阻塞状态。一旦有新的内核消息,klogd被唤醒,读出内核消息并进行处理。默认情况下,处理例程就是把内核消息传给syslogd守护进程。syslogd守护进程一般把接收到的消息写入/var/log/messages文件中。不过,还是可以通过/etc/syslog.conf文件来进行配置,可以选择其他的输出文件。 6 dmesg dmesg 命令也可用于打印和控制内核环缓冲区。这个命令使用 klogctl 系统调用来读取内核环缓冲区,并将它转发到标准输出(stdout)。这个命令也可以用来清除内核环缓冲区(使用 -c 选项),设置控制台日志级别(-n 选项),以及定义用于读取内核日志消息的缓冲区大小(-s 选项)。注意,如果没有指定缓冲区大小,那么 dmesg 会使用 klogctl 的SYSLOG_ACTION_SIZE_BUFFER 操作确定缓冲区大小。 7 注意 a) 虽然printk很健壮,但是看了源码你就知道,这个函数的效率很低:做字符拷贝时一次只拷贝一个字节,且去调用console输出可能还产生中断。所以如果你的驱动在功能调试完成以后做性能测试或者发布的时候千万记得尽量减少printk输出,做到仅在出错时输出少量信息。否则往console输出无用信息影响性能。 b) printk的临时缓存printk_buf只有1K,所有一次printk函数只能记录<1K的信息到log buffer,并且printk使用的“ringbuffer”. 8 内核printk和日志系统的总体结构 9 动态调试 动态调试是通过动态的开启和禁止某些内核代码来获取额外的内核信息。 首先内核选项CONFIG_DYNAMIC_DEBUG应该被设置。所有通过pr_debug()/dev_debug()打印的信息都可以动态的显示或不显示。 可以通过简单的查询语句来筛选需要显示的信息。 -源文件名 -函数名 -行号(包括指定范围的行号) -模块名 -格式化字符串 将要打印信息的格式写入/dynamic_debug/control中。 nullarbor:~ # echo 'file svcsock.c line 1603 +p' >/dynamic_debug/control 六 内存调试工具 1 MEMWATCH MEMWATCH 由 Johan Lindh 编写,是一个开放源代码 C 语言内存错误检测工具,您可以自己下载它。只要在代码中添加一个头文件并在 gcc 语句中定义了 MEMWATCH 之后,您就可以跟踪程序中的内存泄漏和错误了。MEMWATCH 支持ANSIC,它提供结果日志纪录,能检测双重释放(double-free)、错误释放(erroneous free)、没有释放的内存(unfreedmemory)、溢出和下溢等等。 内存样本(test1.c) #include #include #include "memwatch.h"int main(void){ char *ptr1; char *ptr2; ptr1 = malloc(512); ptr2 = malloc(512); ptr2 = ptr1; free(ptr2); free(ptr1);} 内存样本(test1.c)中的代码将分配两个 512 字节的内存块,然后指向第一个内存块的指针被设定为指向第二个内存块。结果,第二个内存块的地址丢失,从而产生了内存泄漏。 现在我们编译内存样本(test1.c) 的 memwatch.c。下面是一个 makefile 示例: test1 gcc -DMEMWATCH -DMW_STDIO test1.c memwatchc -o test1 当您运行 test1 程序后,它会生成一个关于泄漏的内存的报告。下面展示了示例 memwatch.log 输出文件。 test1 memwatch.log 文件 MEMWATCH 2.67 Copyright (C) 1992-1999 Johan Lindh...double-free: <4> test1.c(15), 0x80517b4 was freed from test1.c(14)...unfreed: <2> test1.c(11), 512 bytes at 0x80519e4{FE FE FE FE FE FE FE FE FE FE FE FE ..............}Memory usage statistics (global): N)umber of allocations made: 2 L)argest memory usage : 1024 T)otal of all alloc() calls: 1024 U)nfreed bytes totals : 512 MEMWATCH 为您显示真正导致问题的行。如果您释放一个已经释放过的指针,它会告诉您。对于没有释放的内存也一样。日志结尾部分显示统计信息,包括泄漏了多少内存,使用了多少内存,以及总共分配了多少内存。 2 YAMD YAMD 软件包由 Nate Eldredge 编写,可以查找 C 和 C++ 中动态的、与内存分配有关的问题。在撰写本文时,YAMD 的最新版本为 0.32。请下载 yamd-0.32.tar.gz。执行 make 命令来构建程序;然后执行 make install 命令安装程序并设置工具。 一旦您下载了 YAMD 之后,请在 test1.c 上使用它。请删除 #include memwatch.h 并对 makefile 进行如下小小的修改: 使用 YAMD 的 test1 gcc -g test1.c -o test1 展示了来自 test1 上的 YAMD 的输出,使用 YAMD 的 test1 输出 YAMD version 0.32Executable: /usr/src/test/yamd-0.32/test1...INFO: Normal allocation of this blockAddress 0x40025e00, size 512...INFO: Normal allocation of this blockAddress 0x40028e00, size 512...INFO: Normal deallocation of this blockAddress 0x40025e00, size 512...ERROR: Multiple freeing Atfree of pointer already freedAddress 0x40025e00, size 512...WARNING: Memory leakAddress 0x40028e00, size 512WARNING: Total memory leaks:1 unfreed allocations totaling 512 bytes*** Finished at Tue ... 10:07:15 2002Allocated a grand total of 1024 bytes 2 allocationsAverage of 512 bytes per allocationMax bytes allocated at one time: 102424 K alloced internally / 12 K mapped now / 8 K maxVirtual program size is 1416 KEnd. YAMD 显示我们已经释放了内存,而且存在内存泄漏。让我们在另一个样本程序上试试 YAMD。 内存代码(test2.c) #include #include int main(void){ char *ptr1; char *ptr2; char *chptr; int i = 1; ptr1 = malloc(512); ptr2 = malloc(512); chptr = (char *)malloc(512); for (i; i <= 512; i++) { chptr[i] = 'S'; } ptr2 = ptr1; free(ptr2); free(ptr1); free(chptr);} 您可以使用下面的命令来启动 YAMD: ./run-yamd /usr/src/test/test2/test2 显示了在样本程序 test2 上使用 YAMD 得到的输出。YAMD 告诉我们在 for 循环中有“越界(out-of-bounds)”的情况,使用 YAMD 的 test2 输出 Running /usr/src/test/test2/test2Temp output to /tmp/yamd-out.1243*********./run-yamd: line 101: 1248 Segmentation fault (core dumped)YAMD version 0.32Starting run: /usr/src/test/test2/test2Executable: /usr/src/test/test2/test2Virtual program size is 1380 K...INFO: Normal allocation of this blockAddress 0x40025e00, size 512...INFO: Normal allocation of this blockAddress 0x40028e00, size 512...INFO: Normal allocation of this blockAddress 0x4002be00, size 512ERROR: Crash...Tried to write address 0x4002c000Seems to be part of this block:Address 0x4002be00, size 512...Address in question is at offset 512 (out of bounds)Will dump core after checking heap.Done. MEMWATCH 和 YAMD 都是很有用的调试工具,它们的使用方法有所不同。对于 MEMWATCH,您需要添加包含文件memwatch.h 并打开两个编译时间标记。对于链接(link)语句,YAMD 只需要 -g 选项。 3 Electric Fence 多数 Linux 分发版包含一个 Electric Fence 包,不过您也可以选择下载它。Electric Fence 是一个由 Bruce Perens 编写的malloc()调试库。它就在您分配内存后分配受保护的内存。如果存在 fencepost 错误(超过数组末尾运行),程序就会产生保护错误,并立即结束。通过结合 Electric Fence 和 gdb,您可以精确地跟踪到哪一行试图访问受保护内存。ElectricFence 的另一个功能就是能够检测内存泄漏。 七 strace strace 命令是一种强大的工具,它能够显示所有由用户空间程序发出的系统调用。strace 显示这些调用的参数并返回符号形式的值。strace 从内核接收信息,而且不需要以任何特殊的方式来构建内核。将跟踪信息发送到应用程序及内核开发者都很有用。在下面代码中,分区的一种格式有错误,显示了 strace 的开头部分,内容是关于调出创建文件系统操作(mkfs )的。strace 确定哪个调用导致问题出现。 mkfs 上 strace 的开头部分 execve("/sbin/mkfs.jfs", ["mkfs.jfs", "-f", "/dev/test1"], &...open("/dev/test1", O_RDWR|O_LARGEFILE) = 4stat64("/dev/test1", {st_mode=&, st_rdev=makedev(63, 255), ...}) = 0ioctl(4, 0x40041271, 0xbfffe128) = -1 EINVAL (Invalid argument)write(2, "mkfs.jfs: warning - cannot setb" ..., 98mkfs.jfs: warning -cannot set blocksize on block device /dev/test1: Invalid argument ) = 98stat64("/dev/test1", {st_mode=&, st_rdev=makedev(63, 255), ...}) = 0open("/dev/test1", O_RDONLY|O_LARGEFILE) = 5ioctl(5, 0x80041272, 0xbfffe124) = -1 EINVAL (Invalid argument)write(2, "mkfs.jfs: can\'t determine device"..., ..._exit(1) = ? 显示 ioctl 调用导致用来格式化分区的 mkfs 程序失败。ioctl BLKGETSIZE64 失败。( BLKGET-SIZE64 在调用 ioctl的源代码中定义。) BLKGETSIZE64 ioctl 将被添加到 Linux 中所有的设备,而在这里,逻辑卷管理器还不支持它。因此,如果BLKGETSIZE64 ioctl 调用失败,mkfs 代码将改为调用较早的 ioctl 调用;这使得 mkfs 适用于逻辑卷管理器。 八 OOPS OOPS(也称 Panic)消息包含系统错误的细节,如 CPU 寄存器的内容等。是内核告知用户有不幸发生的最常用的方式。 内核只能发布OOPS,这个过程包括向终端上输出错误消息,输出寄存器保存的信息,并输出可供跟踪的回溯线索。通常,发送完OOPS之后,内核会处于一种不稳定的状态。 OOPS的产生有很多可能原因,其中包括内存访问越界或非法的指令等。 ※ 作为内核的开发者,必定将会经常处理OOPS。 ※ OOPS中包含的重要信息,对所有体系结构的机器都是完全相同的:寄存器上下文和回溯线索(回溯线索显示了导致错误发生的函数调用链)。 1 ksymoops 在 Linux 中,调试系统崩溃的传统方法是分析在发生崩溃时发送到系统控制台的 Oops 消息。一旦您掌握了细节,就可以将消息发送到 ksymoops 实用程序,它将试图将代码转换为指令并将堆栈值映射到内核符号。 ※ 如:回溯线索中的地址,会通过ksymoops转化成名称可见的函数名。 ksymoops需要几项内容:Oops 消息输出、来自正在运行的内核的 System.map 文件,还有 /proc/ksyms、vmlinux和/proc/modules。 关于如何使用 ksymoops,内核源代码 /usr/src/linux/Documentation/oops-tracing.txt 中或 ksymoops 手册页上有完整的说明可以参考。Ksymoops 反汇编代码部分,指出发生错误的指令,并显示一个跟踪部分表明代码如何被调用。 首先,将 Oops 消息保存在一个文件中以便通过 ksymoops 实用程序运行它。下面显示了由安装 JFS 文件系统的 mount命令创建的 Oops 消息。 ksymoops 处理后的 Oops 消息 ksymoops 2.4.0 on i686 2.4.17. Options used... 15:59:37 sfb1 kernel: Unable to handle kernel NULL pointer dereference atvirtual address 0000000... 15:59:37 sfb1 kernel: c01588fc... 15:59:37 sfb1 kernel: *pde = 0000000... 15:59:37 sfb1 kernel: Oops: 0000... 15:59:37 sfb1 kernel: CPU: 0... 15:59:37 sfb1 kernel: EIP: 0010:[jfs_mount+60/704]... 15:59:37 sfb1 kernel: Call Trace: [jfs_read_super+287/688] [get_sb_bdev+563/736] [do_kern_mount+189/336] [do_add_mount+35/208][do_page_fault+0/1264]... 15:59:37 sfb1 kernel: Call Trace: []...... 15:59:37 sfb1 kernel: [
求平均值的方法:首先新建一个python文件;然后初始化sum总和的值;接着循环输入要计算平均数的数,并计算总和sum的值;最后利用“总和/数量”的公式计算出平均数即可。65a220812a55465d245f518b81fb68d2.png本文操作...
本文介绍ROS机器人操作系统(Robot Operating System)的实现原理,从最底层分析ROS代码是如何实现的。 1、序列化 把通信的内容(也就是消息message)序列化是通信的基础,所以我们先研究序列化。 尽管笔者从事机器人学习和研发很长时间了,但是在研究ROS的过程中,“序列化”这个词还是这辈子第一次听到。 所以可想而知很多人在看到“把一个消息序列化”这样的描述时是如何一脸懵逼。 但其实序列化是一个比较常见的概念,你虽然不知道它但一定接触过它。 下面我们先介绍“序列化”的一些常识,然后解释ROS里的序列化是怎么做的? 1.1 什么是序列化? “序列化”(Serialization )的意思是将一个对象转化为字节流。 这里说的对象可以理解为“面向对象”里的那个对象,具体的就是存储在内存中的对象数据。 与之相反的过程是“反序列化”(Deserialization )。 虽然挂着机器人的羊头,但是后面的介绍全部是计算机知识,跟机器人一丁点关系都没有,序列化就是一个纯粹的计算机概念。 序列化的英文Serialize就有把一个东西变成一串连续的东西之意。 形象的描述,数据对象是一团面,序列化就是将面团拉成一根面条,反序列化就将面条捏回面团。 另一个形象的类比是我们在对话或者打电话时,一个人的思想转换成一维的语音,然后在另一个人的头脑里重新变成结构化的思想,这也是一种序列化。 面对序列化,很多人心中可能会有很多疑问。 首先,为什么要序列化?或者更具体的说,既然对象的信息本来就是以字节的形式储存在内存中,那为什么要多此一举把一些字节数据转换成另一种形式的、一维的、连续的字节数据呢? 如果我们的程序在内存中存储了一个数字,比如25。那要怎么传递25这个数字给别的程序节点或者把这个数字永久存储起来呢? 很简单,直接传递25这个数字(的字节表示,即0X19,当然最终会变成二进制表示11001以高低电平传输存储)或者直接把这个数字(的字节表示)写进硬盘里即可。 所以,对于本来就是连续的、一维的、一连串的数据(例如字符串),序列化并不需要做太多东西,其本质是就是由内存向其它地方拷贝数据而已。 所以,如果你在一个序列化库里看到memcpy函数不用觉得奇怪,因为你知道序列化最底层不过就是在操作内存数据而已(还有些库使用了流的ostream.rdbuf()->sputn函数)。 可是实际程序操作的对象很少是这么简单的形式,大多数时候我们面对的是包含不同数据类型(int、double、string)的复杂数据结构(比如vector、list),它们很可能在内存中是不连续存储的而是分散在各处。比如ROS的很多消息都包含向量。 数据中还有各种指针和引用。而且,如果数据要在运行于不同架构的计算机之上的、由不同编程语言所编写的节点程序之间传递,那问题就更复杂了,它们的字节顺序endianness规定有可能不一样,基本数据类型(比如int)的长度也不一样(有的int是4个字节、有的是8个字节)。 这些都不是通过简单地、原封不动地复制粘贴原始数据就能解决的。这时候就需要序列化和反序列化了。 所以在程序之间需要通信时(ROS恰好就是这种情况),或者希望保存程序的中间运算结果时,序列化就登场了。 另外,在某种程度上,序列化还起到统一标准的作用。 我们把被序列化的东西叫object(对象),它可以是任意的数据结构或者对象:结构体、数组、类的实例等等。 把序列化后得到的东西叫archive,它既可以是人类可读的文本形式,也可以是二进制形式。 前者比如JSON和XML,这两个是网络应用里最常用的序列化格式,通过记事本就能打开阅读; 后者就是原始的二进制文件,比如后缀名是bin的文件,人类是没办法直接阅读一堆的0101或者0XC9D23E72的。 序列化算是一个比较常用的功能,所以大多数编程语言(比如C++、Python、Java等)都会附带用于序列化的库,不需要你再去造轮子。 以C++为例,虽然标准STL库没有提供序列化功能,但是第三方库Boost提供了[ 2 ]谷歌的protobuf也是一个序列化库,还有Fast-CDR,以及不太知名的Cereal,Java自带序列化函数,python可以使用第三方的pickle模块实现。 总之,序列化没有什么神秘的,用户可以看看这些开源的序列化库代码,或者自己写个小程序试试简单数据的序列化,例如这个例子,或者这个,有助于更好地理解ROS中的实现。 1.2 ROS中的序列化实现 理解了序列化,再回到ROS。我们发现,ROS没有采用第三方的序列化工具,而是选择自己实现,代码在roscpp_core项目下的roscpp_serialization中,见下图。这个功能涉及的代码量不是很多。 为什么ROS不使用现成的序列化工具或者库呢?可能ROS诞生的时候(2007年),有些序列化库可能还不存在(protobuf诞生于2008年),更有可能是ROS的创造者认为当时没有合适的工具。 1.2.1 serialization.h 核心的函数都在serialization.h里,简而言之,里面使用了C语言标准库的memcpy函数把消息拷贝到流中。 下面来看一下具体的实现。 序列化功能的特点是要处理很多种数据类型,针对每种具体的类型都要实现相应的序列化函数。 为了尽量减少代码量,ROS使用了模板的概念,所以代码里有一堆的template。 从后往前梳理,先看Stream这个结构体吧。在C++里结构体和类基本没什么区别,结构体里也可以定义函数。 Stream翻译为流,流是一个计算机中的抽象概念,前面我们提到过字节流,它是什么意思呢? 在需要传输数据的时候,我们可以把数据想象成传送带上连续排列的一个个被传送的物体,它们就是一个流。 更形象的,可以想象磁带或者图灵机里连续的纸带。在文件读写、使用串口、网络Socket通信等领域,流经常被使用。例如我们常用的输入输出流: cout<<"helllo"; 由于使用很多,流的概念也在演变。想了解更多可以看这里。 struct Stream{ // Returns a pointer to the current position of the stream inline uint8_t* getData() { return data_; } // Advances the stream, checking bounds, and returns a pointer to the position before it was advanced. // \throws StreamOverrunException if len would take this stream past the end of its buffer ROS_FORCE_INLINE uint8_t* advance(uint32_t len){ uint8_t* old_data = data_; data_ += len; if (data_ > end_) { // Throwing directly here causes a significant speed hit due to the extra code generated for the throw statement throwStreamOverrun(); } return old_data; } // Returns the amount of space left in the stream inline uint32_t getLength() { return static_cast(end_ - data_); } protected: Stream(uint8_t* _data, uint32_t _count) : data_(_data), end_(_data + _count) {} private: uint8_t* data_; uint8_t* end_;}; 注释表明Stream是个基类,输入输出流IStream和OStream都继承自它。 Stream的成员变量data_是个指针,指向序列化的字节流开始的位置,它的类型是uint8_t。 在Ubuntu系统中,uint8_t的定义是typedef unsigned char uint8_t; 所以uint8_t就是一个字节,可以用size_of()函数检验。data_指向的空间就是保存字节流的。 输出流类OStream用来序列化一个对象,它引用了serialize函数,如下。 struct OStream : public Stream{ static const StreamType stream_type = stream_types::Output; OStream(uint8_t* data, uint32_t count) : Stream(data, count) {} /* Serialize an item to this output stream*/ template ROS_FORCE_INLINE void next(const T& t){ serialize(*this, t); } template ROS_FORCE_INLINE OStream& operator<<(const T& t) { serialize(*this, t); return *this; }}; 输入流类IStream用来反序列化一个字节流,它引用了deserialize函数,如下。 struct ROSCPP_SERIALIZATION_DECL IStream : public Stream{ static const StreamType stream_type = stream_types::Input; IStream(uint8_t* data, uint32_t count) : Stream(data, count) {} /* Deserialize an item from this input stream */ template ROS_FORCE_INLINE void next(T& t){ deserialize(*this, t); } template ROS_FORCE_INLINE IStream& operator>>(T& t) { deserialize(*this, t); return *this; }}; 自然,serialize函数和deserialize函数就是改变数据形式的地方,它们的定义在比较靠前的地方。它们都接收两个模板,都是内联函数,然后里面没什么东西,只是又调用了Serializer类的成员函数write和read。所以,serialize和deserialize函数就是个二道贩子。 // Serialize an object. Stream here should normally be a ros::serialization::OStreamtemplateinline void serialize(Stream& stream, const T& t){ Serializer::write(stream, t);}// Deserialize an object. Stream here should normally be a ros::serialization::IStreamtemplateinline void deserialize(Stream& stream, T& t){ Serializer::read(stream, t);} 所以,我们来分析Serializer类,如下。我们发现,write和read函数又调用了类型里的serialize函数和deserialize函数。 头别晕,这里的serialize和deserialize函数跟上面的同名函数不是一回事。 注释中说:“Specializing the Serializer class is the only thing you need to do to get the ROS serialization system to work with a type”(要想让ROS的序列化功能适用于其它的某个类型,你唯一需要做的就是特化这个Serializer类)。 这就涉及到的另一个知识点——模板特化(template specialization)。 templatestruct Serializer{ // Write an object to the stream. Normally the stream passed in here will be a ros::serialization::OStream template inline static void write(Stream& stream, typename boost::call_traits::param_type t){ t.serialize(stream.getData(), 0); } // Read an object from the stream. Normally the stream passed in here will be a ros::serialization::IStream template inline static void read(Stream& stream, typename boost::call_traits::reference t){ t.deserialize(stream.getData()); } // Determine the serialized length of an object. inline static uint32_t serializedLength(typename boost::call_traits::param_type t){ return t.serializationLength(); }}; 接着又定义了一个带参数的宏函数ROS_CREATE_SIMPLE_SERIALIZER(Type),然后把这个宏作用到了ROS中的10种基本数据类型,分别是:uint8_t, int8_t, uint16_t, int16_t, uint32_t, int32_t, uint64_t, int64_t, float, double。 说明这10种数据类型的处理方式都是类似的。看到这里大家应该明白了,write和read函数都使用了memcpy函数进行数据的移动。 注意宏定义中的template<>语句,这正是模板特化的标志,关键词template后面跟一对尖括号。 关于模板特化可以看这里。 #define ROS_CREATE_SIMPLE_SERIALIZER(Type) \ template<> struct Serializer\ { \ templateinline static void write(Stream& stream, const Type v) \{ \ memcpy(stream.advance(sizeof(v)), &v, sizeof(v) ); \ } \ templateinline static void read(Stream& stream, Type& v) \{ \ memcpy(&v, stream.advance(sizeof(v)), sizeof(v) ); \ } \ inline static uint32_t serializedLength(const Type&) \{ \ return sizeof(Type); \ } \};ROS_CREATE_SIMPLE_SERIALIZER(uint8_t)ROS_CREATE_SIMPLE_SERIALIZER(int8_t)ROS_CREATE_SIMPLE_SERIALIZER(uint16_t)ROS_CREATE_SIMPLE_SERIALIZER(int16_t)ROS_CREATE_SIMPLE_SERIALIZER(uint32_t)ROS_CREATE_SIMPLE_SERIALIZER(int32_t)ROS_CREATE_SIMPLE_SERIALIZER(uint64_t)ROS_CREATE_SIMPLE_SERIALIZER(int64_t)ROS_CREATE_SIMPLE_SERIALIZER(float)ROS_CREATE_SIMPLE_SERIALIZER(double) 对于其它类型的数据,例如bool、std::string、std::vector、ros::Time、ros::Duration、boost::array等等,它们各自的处理方式有细微的不同,所以不再用上面的宏函数,而是用模板特化的方式每种单独定义,这也是为什么serialization.h这个文件这么冗长。 对于int、double这种单个元素的数据,直接用上面特化的Serializer类中的memcpy函数实现序列化。 对于vector、array这种多个元素的数据类型怎么办呢?方法是分成几种情况,对于固定长度简单类型的(fixed-size simple types),还是用各自特化的Serializer类中的memcpy函数实现,没啥太大区别。 对于固定但是类型不简单的(fixed-size non-simple types)或者既不固定也不简单的(non-fixed-size, non-simple types)或者固定但是不简单的(fixed-size, non-simple types),用for循环遍历,一个元素一个元素的单独处理。 那怎么判断一个数据是不是固定是不是简单呢?这是在roscpp_traits文件夹中的message_traits.h完成的。 其中采用了萃取Type Traits,这是相对高级一点的编程技巧了,笔者也不太懂。 对序列化的介绍暂时就到这里了,有一些细节还没讲,等笔者看懂了再补。 2、消息订阅发布 2.1 ROS的本质 如果问ROS的本质是什么,或者用一句话概括ROS的核心功能。那么,笔者认为ROS就是个通信库,让不同的程序节点能够相互对话。 很多文章和书籍在介绍ROS是什么的时候,经常使用“ROS是一个通信框架”这种描述。 但是笔者认为这种描述并不是太合适。“框架”是个对初学者非常不友好的抽象词汇,用一个更抽象难懂的概念去解释一个本来就不清楚的概念,对初学者起不到任何帮助。 而且笔者严重怀疑绝大多数作者能对机器人的本质或者软件框架能有什么太深的理解,他们的见解不会比你我深刻多少。 既然提到本质,那我们就深入到最基本的问题。 在接触无穷的细节之前,我们不妨先做一个哲学层面的思考。 那就是,为什么ROS要解决通信问题? 机器人涉及的东西千千万万,机械、电子、软件、人工智能无所不包,为什么底层的设计是一套用来通信的程序而不是别的东西。 到目前为止,我还没有看到有人讨论过这个问题。这要回到机器人或者智能的本质。 当我们在谈论机器人的时候,最首要的问题不是硬件设计,而是对信息的处理。一个机器人需要哪些信息,信息从何而来,如何传递,又被谁使用,这些才是最重要的问题。 人类飞不鸟,游不过鱼,跑不过马,力不如牛,为什么却自称万物之灵呢。 因为人有大脑,而且人类大脑处理的信息更多更复杂。 抛开物质,从信息的角度看,人与动物、与机器人存在很多相似的地方。 机器人由许多功能模块组成,它们之间需要协作才能形成一个有用的整体,机器人与机器人之间也需要协作才能形成一个有用的系统,要协作就离不开通信。 需要什么样的信息以及信息从何而来不是ROS首先关心的,因为这取决于机器人的应用场景。 因此,ROS首先要解决的是通信的问题,即如何建立通信、用什么方式通信、通信的格式是什么等等一系列具体问题。 带着这些问题,我们看看ROS是如何设计的。 2.2 客户端库 实现通信的代码在ros_comm包中,如下。 其中clients文件夹一共有127个文件,看来是最大的包了。 现在我们来到了ROS最核心的地带。 客户端这个名词出现的有些突然,一个机器人操作系统里为什么需要客户端。 原因是,节点与主节点master之间的关系是client/server,这时每个节点都是一个客户端(client),而master自然就是服务器端(server)。 那客户端库(client libraries)是干什么的?就是为实现节点之间通信的。 虽然整个文件夹中包含的文件众多,但是我们如果按照一定的脉络来分析就不会眼花缭乱。 节点之间最主要的通信方式就是基于消息的。为了实现这个目的,需要三个步骤,如下。 弄明白这三个步骤就明白ROS的工作方式了。这三个步骤看起来是比较合乎逻辑的,并不奇怪。 消息的发布者和订阅者(即消息的接收方)建立连接; 发布者向话题发布消息,订阅者在话题上接收消息,将消息保存在回调函数队列中; 调用回调函数队列中的回调函数处理消息。 2.2.1 一个节点的诞生 在建立连接之前,首先要有节点。 节点就是一个独立的程序,它运行起来后就是一个普通的进程,与计算机中其它的进程并没有太大区别。 一个问题是:ROS中为什么把一个独立的程序称为“节点” 这是因为ROS沿用了计算机网络中“节点”的概念。 在一个网络中,例如互联网,每一个上网的计算机就是一个节点。前面我们看到的客户端、服务器这样的称呼,也是从计算机网络中借用的。 下面来看一下节点是如何诞生的。我们在第一次使用ROS时,一般都会照着官方教程编写一个talker和一个listener节点,以熟悉ROS的使用方法。 我们以talker为例,它的部分代码如下。 #include "ros/ros.h"int main(int argc, char **argv){ /* You must call one of the versions of ros::init() before using any other part of the ROS system. */ ros::init(argc, argv, "talker"); ros::NodeHandle n; main函数里首先调用了init()函数初始化一个节点,该函数的定义在init.cpp文件中。 当我们的程序运行到init()函数时,一个节点就呱呱坠地了。 而且在出生的同时我们还顺道给他起好了名字,也就是"talker"。 名字是随便起的,但是起名是必须的。 我们进入init()函数里看看它做了什么,代码如下,看上去还是挺复杂的。它初始化了一个叫g_global_queue的数据,它的类型是CallbackQueuePtr。 这是个相当重要的类,叫“回调队列”,后面还会见到它。init()函数还调用了network、master、this_node、file_log、param这几个命名空间里的init初始化函数各自实现一些变量的初始化,这些变量都以g开头,例如g_host、g_uri,用来表明它们是全局变量。 其中,network::init完成节点主机名、IP地址等的初始化,master::init获取master的URI、主机号和端口号。 this_node::init定义节点的命名空间和节点的名字,没错,把我们给节点起的名字就存储在这里。file_log::init初始化日志文件的路径。 void init(const M_string& remappings, const std::string& name, uint32_t options){ if (!g_atexit_registered) { g_atexit_registered = true; atexit(atexitCallback); } if (!g_global_queue) { g_global_queue.reset(new CallbackQueue); } if (!g_initialized) { g_init_options = options; g_ok = true; ROSCONSOLE_AUTOINIT; // Disable SIGPIPE#ifndef WIN32 signal(SIGPIPE, SIG_IGN);#else WSADATA wsaData; WSAStartup(MAKEWORD(2, 0), &wsaData);#endif check_ipv6_environment(); network::init(remappings); master::init(remappings); // names:: namespace is initialized by this_node this_node::init(name, remappings, options); file_log::init(remappings); param::init(remappings); g_initialized = true; }} 完成初始化以后,就进入下一步ros::NodeHandle n定义句柄。 我们再进入node_handle.cpp文件,发现构造函数NodeHandle::NodeHandle调用了自己的construct函数。然后,顺藤摸瓜找到construct函数,它里面又调用了ros::start()函数。 没错,我们又绕回到了init.cpp文件。 ros::start()函数主要实例化了几个重要的类,如下。 完成实例化后马上又调用了各自的start()函数,启动相应的动作。 这些都做完了以后就可以发布或订阅消息了。 一个节点的故事暂时就到这了。 TopicManager::instance()->start();ServiceManager::instance()->start();ConnectionManager::instance()->start();PollManager::instance()->start();XMLRPCManager::instance()->start(); 2.2.1 XMLRPC是什么? 关于ROS节点建立连接的技术细节,官方文档说的非常简单,在这里ROS Technical Overview。没有基础的同学看这个介绍必然还是不懂。 在ROS中,节点与节点之间的通信依靠节点管理器(master)牵线搭桥。 master像一个中介,它介绍节点们互相认识。一旦节点们认识了以后,master就完成自己的任务了,它就不再掺和了。 这也是为什么你启动节点后再杀死master,节点之间的通信依然保持正常的原因。 使用过电驴和迅雷而且研究过BitTorrent的同学对master的工作方式应该很熟悉,master就相当于Tracker服务器,它存储着其它节点的信息。 我们每次下载之前都会查询Tracker服务器,找到有电影资源的节点,然后就可以与它们建立连接并开始下载电影了。 那么master是怎么给节点牵线搭桥的呢?ROS使用了一种叫XMLRPC的方式实现这个功能。 XMLRPC中的RPC的意思是远程过程调用(Remote Procedure Call)。 简单来说,远程过程调用的意思就是一个计算机中的程序(在我们这就是节点啦)可以调用另一个计算机中的函数,只要这两个计算机在一个网络中。 这是一种听上去很高大上的功能,它能让节点去访问网络中另一台计算机上的程序资源。 XMLRPC中的XML我们在1.1节讲消息序列化时提到了,它就是一种数据表示方式而已。 所以合起来,XMLRPC的意思就是把由XML表示的数据发送给其它计算机上的程序运行。 运行后返回的结果仍然以XML格式返回回来,然后我们通过解析它(还原回纯粹的数据)就能干别的事了。 想了解更多XMLRPC的细节可以看这个XML-RPC:概述。 举个例子,一个XMLRPC请求是下面这个样子的。因为XMLRPC是基于HTTP协议的,所以下面的就是个标准的HTTP报文。 POST / HTTP/1.1User-Agent: XMLRPC++ 0.7Host: localhost:11311Content-Type: text/xmlContent-length: 78 circleArea 2.41 如果你没学过HTTP协议,看上面的语句可能会感到陌生。《图解HTTP》这本小书可以让你快速入门。 HTTP报文比较简单,它分两部分,前半部分是头部,后半部分是主体。 头部和主体之间用空行分开,这都是HTTP协议规定的标准。 上面主体部分的格式就是XML,见的多了你就熟悉了。 所以,XMLRPC传递的消息其实就是主体部分是XML格式的HTTP报文而已,没什么神秘的。 对应客户端一个XMLRPC请求,服务器端会执行它并返回一个响应,它也是一个HTTP报文,如下。 它的结构和请求一样,不再解释了。所以,XMLRPC跟我们上网浏览网页的过程其实差不多。 HTTP/1.1 200 OKDate: Sat, 06 Oct 2001 23:20:04 GMTServer: Apache.1.3.12 (Unix)Connection: closeContent-Type: text/xmlContent-Length: 124 18.24668429131 2.2.2 ROS中XMLRPC的实现 上面的例子解释了XMLRPC是什么?下面我们看看ROS是如何实现XMLRPC的。 ROS使用的XMLRPC介绍在这里: http://wiki.ros.org/xmlrpcpp。这次ROS的创作者没有从零开始造轮子,而是在一个已有的XMLRPC库的基础上改造的。 XMLRPC的C++代码在下载后的ros_comm-noetic-devel\utilities\xmlrpcpp路径下。 还好,整个工程不算太大。XMLRPC分成客户端和服务器端两大部分。 咱们先看客户端,主要代码在XmlRpcClient.cpp文件里。 擒贼先擒王,XmlRpcClient.cpp文件中最核心的函数就是execute,用于执行远程调用,代码如下。 // Execute the named procedure on the remote server.// Params should be an array of the arguments for the method.// Returns true if the request was sent and a result received (although the result might be a fault).bool XmlRpcClient::execute(const char* method, XmlRpcValue const& params, XmlRpcValue& result){ XmlRpcUtil::log(1, "XmlRpcClient::execute: method %s (_connectionState %s).", method, connectionStateStr(_connectionState)); // This is not a thread-safe operation, if you want to do multithreading, use separate // clients for each thread. If you want to protect yourself from multiple threads // accessing the same client, replace this code with a real mutex. if (_executing) return false; _executing = true; ClearFlagOnExit cf(_executing); _sendAttempts = 0; _isFault = false; if ( ! setupConnection()) return false; if ( ! generateRequest(method, params)) return false; result.clear(); double msTime = -1.0; // Process until exit is called _disp.work(msTime); if (_connectionState != IDLE || ! parseResponse(result)) { _header = ""; return false; } // close() if server does not supports HTTP1.1 // otherwise, reusing the socket to write leads to a SIGPIPE because // the remote server could shut down the corresponding socket. if (_header.find("HTTP/1.1 200 OK", 0, 15) != 0) { close(); } XmlRpcUtil::log(1, "XmlRpcClient::execute: method %s completed.", method); _header = ""; _response = ""; return true;} 它首先调用setupConnection()函数与服务器端建立连接。 连接成功后,调用generateRequest()函数生成发送请求报文。 XMLRPC请求报文的头部又交给generateHeader()函数做了,代码如下。 // Prepend http headersstd::string XmlRpcClient::generateHeader(size_t length) const{ std::string header = "POST " + _uri + " HTTP/1.1\r\n" "User-Agent: "; header += XMLRPC_VERSION; header += "\r\nHost: "; header += _host; char buff[40]; std::snprintf(buff,40,":%d\r\n", _port); header += buff; header += "Content-Type: text/xml\r\nContent-length: "; std::snprintf(buff,40,"%zu\r\n\r\n", length); return header + buff;} 主体部分则先将远程调用的方法和参数变成XML格式,generateRequest()函数再将头部和主体组合成完整的报文,如下: std::string header = generateHeader(body.length());_request = header + body; 把报文发给服务器后,就开始静静地等待。 一旦接收到服务器返回的报文后,就调用parseResponse函数解析报文数据,也就是把XML格式变成纯净的数据格式。 我们发现,XMLRPC使用了socket功能实现客户端和服务器通信。 我们搜索socket这个单词,发现它原始的意思是插座,如下。这非常形象,建立连接实现通信就像把插头插入插座。 虽说XMLRPC也是ROS的一部分,但它毕竟只是一个基础功能,我们会用即可,暂时不去探究其实现细节, 所以对它的分析到此为止。下面我们来看节点是如何调用XMLRPC的。 2.2.3 节点间通过XMLRPC建立连接 在一个节点刚启动的时候,它并不知道其它节点的存在,更不知道它们在交谈什么,当然也就谈不上通信。 所以,它要先与master对话查询其它节点的状态,然后再与其它节点通信。 而节点与master对话使用的就是XMLRPC。 从这一点来看,master叫节点管理器确实名副其实,它是一个大管家,给刚出生的节点提供服务。 下面我们以两个节点:talker和listener为例,介绍其通过XMLRPC建立通信连接的过程,如下图所示。 talker注册 假设我们先启动talker。启动后,它通过1234端口使用XMLRPC向master注册自己的信息,包含所发布消息的话题名。master会将talker的注册信息加入注册列表中; 2.listener注册 listener启动后,同样通过XMLRPC向master注册自己的信息,包含需要订阅的话题名; 3.master进行匹配 master根据listener的订阅信息从注册列表中查找,如果没有找到匹配的发布者,则等待发布者的加入,如果找到匹配的发布者信息,则通过XMLRPC向listener发送talker的地址信息。 4.listener发送连接请求 listener接收到master发回的talker地址信息,尝试通过XMLRPC向talker发送连接请求,传输订阅的话题名、消息类型以及通信协议(TCP或者UDP); 5.talker确认连接请求 talker接收到listener发送的连接请求后,继续通过XMLRPC向listener确认连接信息,其中包含自身的TCP地址信息; 6.listener尝试与talker建立连接 listener接收到确认信息后,使用TCP尝试与talker建立网络连接。 7.talker向listener发布消息 成功建立连接后,talker开始向listener发送话题消息数据,master不再参与。 从上面的分析中可以发现,前五个步骤使用的通信协议都是XMLRPC,最后发布数据的过程才使用到TCP。 master只在节点建立连接的过程中起作用,但是并不参与节点之间最终的数据传输。 节点在请求建立连接时会通过master.cpp文件中的execute()函数调用XMLRPC库中的函数。 我们举个例子,加入talker节点要发布消息,它会调用topic_manager.cpp中的TopicManager::advertise()函数,在函数中会调用execute()函数,该部分代码如下。 XmlRpcValue args, result, payload; args[0] = this_node::getName(); args[1] = ops.topic; args[2] = ops.datatype; args[3] = xmlrpc_manager_->getServerURI(); master::execute("registerPublisher", args, result, payload, true); 其中,registerPublisher就是一个远程过程调用的方法(或者叫函数)。节点通过这个远程过程调用向master注册,表示自己要发布发消息了。 你可能会问,registerPublisher方法在哪里被执行了呢?我们来到ros_comm-noetic-devel\tools\rosmaster\src\rosmaster路径下,打开master_api.py文件,然后搜索registerPublisher这个方法,就会找到对应的代码,如下。 匆匆扫一眼就知道,它在通知所有订阅这个消息的节点,让它们做好接收消息的准备。 你可能注意到了,这个被调用的XMLRPC是用python语言实现的。 也就是说,XMLRPC通信时只要报文的格式是一致的,不管C++还是python语言,都可以实现远程调用的功能。 def registerPublisher(self, caller_id, topic, topic_type, caller_api): try: self.ps_lock.acquire() self.reg_manager.register_publisher(topic, caller_id, caller_api) # don't let '*' type squash valid typing if topic_type != rosgraph.names.ANYTYPE or not topic in self.topics_types: self.topics_types[topic] = topic_type pub_uris = self.publishers.get_apis(topic) sub_uris = self.subscribers.get_apis(topic) self._notify_topic_subscribers(topic, pub_uris, sub_uris) mloginfo("+PUB [%s] %s %s",topic, caller_id, caller_api) sub_uris = self.subscribers.get_apis(topic) finally: self.ps_lock.release() return 1, "Registered [%s] as publisher of [%s]"%(caller_id, topic), sub_uris 2.3 master是什么? 当我们在命令行中输入roscore想启动ROS的节点管理器时,背后到底发生了什么?我们先用Ubuntu的which命令找找roscore这个命令在什么地方,发现它位于/opt/ros/melodic/bin/roscore路径下,如下图。再用file命令查看它的属性,发现它是一个Python脚本。 2.3.1 roscore脚本 我们回到自己下载的源码:ros_comm文件夹中,找到roscore文件,它在\ros_comm-melodic-devel\tools\roslaunch\scripts路径下。 虽然它是个Python脚本,但是却没有.py后缀名。 用记事本打开它,迎面第一句话是#!/usr/bin/env python,说明这还是一个python 2版本的脚本。 我们发现这个roscore只是个空壳,真正重要的只有最后一行指令,如下 import roslaunchroslaunch.main(['roscore', '--core'] + sys.argv[1:]) 2.3.2 roslaunch模块 2.3.2.1 XMLRPC服务器如何启动? roscore调用了roslaunch.main,我们继续追踪,进到ros_comm-noetic-devel\tools\roslaunch\src\roslaunch文件夹中,发现有个__init__.py文件,说明这个文件夹是一个python包,打开__init__.py文件找到def main(argv=sys.argv),这就是roscore调用的函数roslaunch.main的实现,如下(这里只保留主要的代码,不太重要的删掉了)。 def main(argv=sys.argv): options = None logger = None try: from . import rlutil parser = _get_optparse() (options, args) = parser.parse_args(argv[1:]) args = rlutil.resolve_launch_arguments(args) write_pid_file(options.pid_fn, options.core, options.port) uuid = rlutil.get_or_generate_uuid(options.run_id, options.wait_for_master) configure_logging(uuid) # #3088: don't check disk usage on remote machines if not options.child_name and not options.skip_log_check: rlutil.check_log_disk_usage() logger = logging.getLogger('roslaunch') logger.info("roslaunch starting with args %s"%str(argv)) logger.info("roslaunch env is %s"%os.environ) if options.child_name: # 这里没执行到,就不列出来了 else: logger.info('starting in server mode') # #1491 change terminal name if not options.disable_title: rlutil.change_terminal_name(args, options.core) # Read roslaunch string from stdin when - is passed as launch filename. roslaunch_strs = [] # This is a roslaunch parent, spin up parent server and launch processes. # args are the roslaunch files to load from . import parent as roslaunch_parent # force a port binding spec if we are running a core if options.core: options.port = options.port or DEFAULT_MASTER_PORT p = roslaunch_parent.ROSLaunchParent(uuid, args, roslaunch_strs=roslaunch_strs, is_core=options.core, port=options.port, local_only=options.local_only, verbose=options.verbose, force_screen=options.force_screen, force_log=options.force_log, num_workers=options.num_workers, timeout=options.timeout, master_logger_level=options.master_logger_level, show_summary=not options.no_summary, force_required=options.force_required, sigint_timeout=options.sigint_timeout, sigterm_timeout=options.sigterm_timeout) p.start() p.spin() roslaunch.main开启了日志,日志记录的信息可以帮我们了解main函数执行的顺序。 我们去Ubuntu的.ros/log/路径下,打开roslaunch-ubuntu-52246.log日志文件,内容如下。 通过阅读日志我们发现,main函数首先检查日志文件夹磁盘占用情况,如果有剩余空间就继续往下运行。 然后把运行roscore的终端的标题给改了。 再调用ROSLaunchParent类中的函数,这大概就是main函数中最重要的地方了。 ROSLaunchParent类的定义是在同一路径下的parent.py文件中。为什么叫LaunchParent笔者也不清楚。 先不管它,我们再看日志,发现运行到了下面这个函数,它打算启动XMLRPC服务器端。 所以调用的顺序是:roslaunch\__init__.py文件中的main()函数调用parent.py\start()函数,start()函数调用自己类中的_start_infrastructure()函数,_start_infrastructure()函数调用自己类中的_start_server()函数,_start_server()函数再调用server.py中的start函数。 def _start_server(self): self.logger.info("starting parent XML-RPC server") self.server = roslaunch.server.ROSLaunchParentNode(self.config, self.pm) self.server.start() 我们再进到server.py文件中,找到ROSLaunchNode类,里面的start函数又调用了父类XmlRpcNode中的start函数。 class ROSLaunchNode(xmlrpc.XmlRpcNode): """ Base XML-RPC server for roslaunch parent/child processes """ def start(self): logger.info("starting roslaunch XML-RPC server") super(ROSLaunchNode, self).start() 我们来到ros_comm-noetic-devel\tools\rosgraph\src\rosgraph路径,找到xmlrpc.py文件。找到class XmlRpcNode(object)类,再进入start(self)函数,发现它调用了自己类的run函数,run函数又调用了自己类中的_run函数,_run函数又调用了自己类中的_run_init()函数,在这里才调用了真正起作用的ThreadingXMLRPCServer类。 因为master节点是用python实现的,所以,需要有python版的XMLRPC库。 幸运的是,python有现成的XMLRPC库,叫SimpleXMLRPCServer。SimpleXMLRPCServer已经内置到python中了,无需安装。 所以,ThreadingXMLRPCServer类直接继承了SimpleXMLRPCServer,如下。 class ThreadingXMLRPCServer(socketserver.ThreadingMixIn, SimpleXMLRPCServer) 2.3.2.2 master如何启动? 我们再来看看节点管理器master是如何被启动的,再回到parent.py\start()函数,如下。 我们发现它启动了XMLRPC服务器后,接下来就调用了_init_runner()函数。 def start(self, auto_terminate=True): self.logger.info("starting roslaunch parent run") # load config, start XMLRPC servers and process monitor try: self._start_infrastructure() except: self._stop_infrastructure() # Initialize the actual runner. self._init_runner() # Start the launch self.runner.launch() init_runner()函数实例化了ROSLaunchRunner类,这个类的定义在launch.py里。 def _init_runner(self): self.runner = roslaunch.launch.ROSLaunchRunner(self.run_id, self.config, server_uri=self.server.uri, ...) 实例化完成后,parent.py\start()函数就调用了它的launch()函数。 我们打开launch.py文件,找到launch()函数,发现它又调用了自己类中的_setup()函数,而_setup()函数又调用了_launch_master()函数。 看名字就能猜出来,_launch_master()函数肯定是启动节点管理器master的,它调用了create_master_process函数,这个函数在nodeprocess.py里。 所以我们打开nodeprocess.py,create_master_process函数使用了LocalProcess类。这个类继承自Process类。nodeprocess.py文件引用了python中用于创建新的进程的subprocess模块。 create_master_process函数实例化LocalProcess类用的是’rosmaster’,即ros_comm-noetic-devel\tools\rosmaster中的包。 main.py文件中的rosmaster_main函数使用了master.py中的Master类。 Master类中又用到了master_api.py中的ROSMasterHandler类,这个类包含所有的XMLRPC服务器接收的远程调用,一共24个,如下。 shutdown(self, caller_id, msg='')getUri(self, caller_id)getPid(self, caller_id)deleteParam(self, caller_id, key)setParam(self, caller_id, key, value)getParam(self, caller_id, key)searchParam(self, caller_id, key)subscribeParam(self, caller_id, caller_api, key)unsubscribeParam(self, caller_id, caller_api, key)hasParam(self, caller_id, key)getParamNames(self, caller_id)param_update_task(self, caller_id, caller_api, param_key, param_value)_notify_topic_subscribers(self, topic, pub_uris, sub_uris)registerService(self, caller_id, service, service_api, caller_api)lookupService(self, caller_id, service)unregisterService(self, caller_id, service, service_api)registerSubscriber(self, caller_id, topic, topic_type, caller_api)unregisterSubscriber(self, caller_id, topic, caller_api)registerPublisher(self, caller_id, topic, topic_type, caller_api)unregisterPublisher(self, caller_id, topic, caller_api)lookupNode(self, caller_id, node_name)getPublishedTopics(self, caller_id, subgraph)getTopicTypes(self, caller_id) getSystemState(self, caller_id) 2.3.2.1 检查log文件夹占用空间 roslaunch这个python包还负责检查保存log的文件夹有多大。在ros_comm-noetic-devel\tools\roslaunch\src\roslaunch\__init__.py文件中的main函数里,有以下语句。 看名字就知道是干啥的了。 rlutil.check_log_disk_usage() 再打开同一路径下的rlutil.py,发现它又调用了rosclean包中的get_disk_usage函数。 我们发现,这个函数里直接写死了比较的上限:disk_usage > 1073741824,当然这样不太好,应该改为可配置的。 数字1073741824的单位是字节,刚好就是1GB(102 4 3 1024^31024 3byte)。 我们要是想修改log文件夹报警的上限,直接改这个值即可。 def check_log_disk_usage(): """ Check size of log directory. If high, print warning to user """ try: d = rospkg.get_log_dir() roslaunch.core.printlog("Checking log directory for disk usage. This may take a while.\nPress Ctrl-C to interrupt") disk_usage = rosclean.get_disk_usage(d) # warn if over a gig if disk_usage > 1073741824: roslaunch.core.printerrlog("WARNING: disk usage in log directory [%s] is over 1GB.\nIt's recommended that you use the 'rosclean' command."%d) else: roslaunch.core.printlog("Done checking log file disk usage. Usage is <1GB.") except: pass 我们刨根问底,追查rosclean.get_disk_usage(d)是如何实现的。 这个rosclean包不在ros_comm里面,需要单独下载。 打开后发现这个包还是跨平台的,给出了Windows和Linux下的实现。 如果是Windows系统,用os.path.getsize函数获取文件的大小,通过os.walk函数遍历所有文件,加起来就是文件夹的大小。 如果是Linux系统,用Linux中的du -sb命令获取文件夹的大小。哎,搞个机器人不仅要学习python,还得熟悉Linux,容易吗? 主节点会获取用户设置的ROS_MASTER_URI变量中列出的URI地址和端口号(默认为当前的本地IP和11311端口号)。 3、时间 不只是机器人,在任何一个系统里,时间都是一个绕不开的重要话题。下面我们就从万物的起点——时间开始吧。 ROS中定义时间的程序都在roscpp_core项目下的rostime中,见下图。 如果细分一下,时间其实有两种,一种叫“时刻”,也就是某个时间点; 一种叫“时段”或者时间间隔,也就是两个时刻之间的部分。这两者的代码是分开实现的,时刻是time,时间间隔是duration。 在Ubuntu中把rostime文件夹中的文件打印出来,发现确实有time和duration两类文件,但是还多了个rate,如下图所示。 我们还注意到,里面有 CMakeLists.txt 和 package.xml 两个文件,那说明rostime本身也是个ROS的package,可以单独编译。 3.1 时刻time 先看下文件间的依赖关系。跟时刻有关的文件是两个time.h文件和一个time.cpp文件。 time.h给出了类的声明,而impl\time.h给出了类运算符重载的实现,time.cpp给出了其它函数的实现。 3.1.1 TimeBase基类 首先看time.h文件,它定义了一个叫TimeBase的类。注释中说,TimeBase是个基类,定义了两个成员变量uint32_t sec, nsec,还重载了+ ++、− -−、< <<、> >>、= ==等运算符。 成员变量uint32_t sec, nsec其实就是时间的秒和纳秒两部分,它们合起来构成一个完整的时刻。 至于为啥要分成两部分而不是用一个来定义,可能是考虑到整数表示精度的问题。 因为32位整数最大表示的数字是2147483647。如果我们要用纳秒这个范围估计是不够的。 你可能会问,机器人系统怎么会使用到纳秒这么高精度的时间分辨率,毕竟控制器的定时器最高精度可能也只能到微秒? 如果你做过自动驾驶项目,有使用过GPS和激光雷达传感器的经验,就会发现GPS的时钟精度就是纳秒级的,它可以同步激光雷达每个激光点的时间戳。 还有,为什么定义TimeBase这个基类,原因大家很容易就能猜到。 因为在程序里,时间本质上就是一个数字而已,数字系统的序关系(能比较大小)和运算(加减乘除)也同样适用于时间这个东西。 当然,这里只有加减没有乘除,因为时间的乘除没有意义。 3.1.2 Time类 紧接着TimeBase类的是Time类,它是TimeBase的子类。我们做机器人应用程序开发时用不到TimeBase基类,但是Time类会经常使用。 3.1.3 now()函数 Time类比TimeBase类多了now()函数,它是我们的老熟人了。在开发应用的时候,我们直接用下面的代码就能得到当前的时间戳: ros::Time begin = ros::Time::now(); //获取当前时间 now()函数的定义在rostime\src\time.cpp里,因为它很常用很重要,笔者就把代码粘贴在这里,如下。 函数很简单,可以看到,如果定义了使用仿真时间(g_use_sim_time为true),那就使用仿真时间,否则就使用墙上时间。 Time Time::now(){ if (!g_initialized) throw TimeNotInitializedException(); if (g_use_sim_time) { boost::mutex::scoped_lock lock(g_sim_time_mutex); Time t = g_sim_time; return t; } Time t; ros_walltime(t.sec, t.nsec); return t; } 在ROS里,时间分成两类,一种叫仿真时间,一种叫墙上时间。 顾名思义,墙上时间就是实际的客观世界的时间,它一秒一秒地流逝,谁都不能改变它,让它快一点慢一点都不可能,除非你有超能力。 仿真时间则是可以由你控制的,让它快它就快。之所以多了一个仿真时间,是因为有时我们在仿真机器人希望可以自己控制时间,例如为了提高验证算法的效率,让它按我们的期望速度推进。 在使用墙上时间的情况下,now()函数调用了ros_walltime函数,这个函数也在rostime\src\time.cpp里。 剥开层层洋葱皮,最后发现,这个ros_walltime函数才是真正调用操作系统时间函数的地方,而且它还是个跨平台的实现(Windows和Linux)。 如果操作系统是Linux,那它会使用clock_gettime函数,在笔者使用的Ubuntu 18.04系统中这个函数在usr\include路径下。 但是万一缺少这个函数,那么ROS会使用gettimeofday函数,但是gettimeofday没有clock_gettime精确,clock_gettime能提供纳秒的精确度。 如果操作系统是Windows,那它会使用标准库STL的chrono库获取当前的时刻,要用这个库只需要引用它的头文件,所以在time.cpp中引用了#include。 具体使用的函数就是 std::chrono::system_clock::now().time_since_epoch()。 当然,时间得是秒和纳秒的形式,所以用了count方法: uint64_t now_ns = std::chrono::duration_cast (std::chrono::system_clock::now().time_since_epoch()).count(); 3.1.4 WallTime类 后面又接着声明了WallTime类和SteadyTime类。 3.2 时间间隔duration 时间间隔duration的定义与实现跟time时刻差不多,但是Duration类里的sleep()延时函数是在time.cpp里定义的,其中使用了Linux操作系统的nanosleep系统调用。 这个系统调用虽然叫纳秒,但实际能实现的精度也就是几十毫秒,即便这样也比C语言提供的sleep函数的精度好多了。如果是Windows系统则调用STL chrono函数: std::this_thread::sleep_for(std::chrono::nanoseconds(static_cast(sec * 1e9 + nsec))); 3.3 频率rate 关于Rate类,声明注释中是这么解释的“Class to help run loops at a desired frequency”,也就是帮助(程序)按照期望的频率循环执行。 我们看看一个ROS节点是怎么使用Rate类的,如下图所示的例子。 首先用Rate的构造函数实例化一个对象loop_rate。调用的构造函数如下。 可见,构造函数使用输入完成了对三个参数的初始化。 然后在While循环里调用sleep()函数实现一段时间的延迟。 既然用到延迟,所以就使用了前面的time类。 Rate::Rate(double frequency): start_(Time::now()), expected_cycle_time_(1.0 / frequency), actual_cycle_time_(0.0){ } 3.4 总结 至此rostime就解释清楚了。可以看到,这部分实现主要是依靠STL标准库函数(比如chrono)、BOOST库(date_time)、Linux操作系统的系统调用以及标准的C函数。这就是ROS的底层了。 说句题外话,在百度Apollo自动驾驶项目中也受到了rostime的影响,其cyber\time中的Time、Duration、Rate类的定义方式与rostime中的类似,但是没有定义基类,实现更加简单了。
看到这个标题,相信大多数人的第一反应是:真的有人用 40 亿条 if 语句,只为判断一个数字是奇数还是偶数? 的确有,这个开发者名为 Andreas Karlsson,他还把整个过程都整理成文了。 或许由于这“40 亿条 if 语句”听起来实在震撼,Andreas Karlsson 分享的这篇文章在 Hacker News 上很快引起了极大的关注和讨论,而他在文中也直白表示:其实这个想法,最初源于一个充满恶评的短视频。 以下为译文: 大于 11 的数字,没有输出结果 我最近在火车上刷手机时,偶然发现了上面这个截图:“写了一个程序,来判断一个数字是偶数还是奇数。”点开评论区,果然是一连串的恶意评论,多数都在嘲笑这位新手程序员的稚嫩和无知,竟企图以这种方式解决计算机科学中的经典问题“取模运算”。 可看过截图中的代码和网友评论后,我莫名生出了一些不同的想法:现在,AI 正在分分钟取代程序员、抢走他们的饭碗,并彻底改变了我们对代码的思考方式,或许我们应该更加开放地接受这个行业新生代的思想? 其实仔细想来,上述代码是时间和空间的一种完美权衡:你在付出自己时间的同时,也换来了计算机的内存和时间——这难道不是一个神奇的算法吗? 于是,我开始探索这种只使用比较来判断一个数字是奇数还是偶数的想法,看看它在实际情况中的效果到底如何。由于我是一位高性能代码的忠实拥护者,因此我决定用 C 语言来实现这个想法。 然后,我就开始编码了: /* Copyright 2023. All unauthorized distribution of this source code will be persecuted to the fullest extent of the law*/ #include #include #include int main(int argc, char* argv[]) { uint8_t number = atoi(argv[1]); // No problems here if (number == 0) printf("even\n"); if (number == 1) printf("odd\n"); if (number == 2) printf("even\n"); if (number == 3) printf("odd\n"); if (number == 4) printf("even\n"); if (number == 5) printf("odd\n"); if (number == 6) printf("even\n"); if (number == 7) printf("odd\n"); if (number == 8) printf("even\n"); if (number == 9) printf("odd\n"); if (number == 10) printf("even\n"); } 接下来,我们要编译这段代码,使用 /Od 禁用优化,确保烦人的编译器不会干扰我们的算法。编译完成后,我们就可以对程序进行快速测试,看看结果如何: PS > cl.exe /Od program.c PS > .\program.exe 0 even PS > .\program.exe 4 even PS > .\program.exe 3 odd PS > .\program.exe 7 odd 结果显示:0、4 是偶数,3、7 是奇数。这么看来,程序似乎运行得挺好,但在进一步测试后,我发现了一些问题: PS > .\program.exe 50 PS > .\program.exe 11 PS > .\program.exe 99 大于 11 的数字没有输出,看来这个程序只对 11 以下的数字有效!回到原始代码中,可以发现问题出在最后一个 if 语句之后:我们需要更多的 if 语句! 向 32 位(32-bit)数扩展 这件事进行到这里,就需要我在时间和内存之间做出权衡了。考虑到我的寿命有限,我决定用另一种编程语言对 if 语句进行元编程。为了弥补这种“作弊”行为,我决定用“地球上速度最慢”的语言 Python。 print("/* Copyright 2023. All unauthorized distribution of this source code") print(" will be persecuted to the fullest extent of the law*/") print("#include ") print("#include ") print("#include ") print("int main(int argc, char* argv[])") print("{") print(" uint8_t number = atoi(argv[1]); // No problems here") for i in range(2**8): print(" if (number == "+str(i)+")") if i % 2 == 0: print(" printf(\"even\\n\");") else: print(" printf(\"odd\\n\");") print("}") 好了!现在我们可以生成一个程序,解决所有 8 位(8-bit)整数的奇偶问题! PS > python programmer.py > program.c PS > cl.exe /Od program.c PS > .\program.exe 99 odd PS > .\program.exe 50 even PS > .\program.exe 240 even PS > .\program.exe 241 odd 看看,这个效果简直完美!现在,让我们把它放大到 16 位(16-bit)! print(" uint16_t number = atoi(argv[1]); // No problems here") … for i in range(2**16): 这样就得到了一个约 13 万行、超长且漂亮的 c 文件。回顾了一下我多年工作所做的一些代码库,这其实不算什么。话不多说,开始编译! PS > python programmer.py > program.c PS > cl.exe /Od program.c PS > .\program.exe 21000 even PS > .\program.exe 3475 odd PS > .\program.exe 3 odd PS > .\program.exe 65001 odd PS > .\program.exe 65532 even 太棒了,我们的算法似乎能够处理大量数据!可执行文件大约只有 2 MB,但这与我拥有高达 31.8 GB 内存的强大游戏设备相比,简直不值一提。 但众所周知,32 位(32-bit)才是计算机领域的终极目标,也是我们解决所有实际工程和科学问题所需的最终位宽。毕竟,在 IPv4 因所谓的 "地址耗尽 "而被认为过时 60 年后,它如今仍然很强大。所以,让我们来看看最终的规模:32 位的数字是 16 位的 65536 倍,这会有什么问题吗? print(" uint32_t number = atoi(argv[1]); // No problems here") … for i in range(2**32): 于是,我让强大的 Python 开始它的工作。48 小时后,我喝了一杯咖啡,然后回来检查程序,就得到了一个美丽的 c 文件,大小接近 330 GB!我几乎可以肯定,这是历史上最大的 c 文件之一。当我输入下一条命令时,我的手指都在颤抖,我猜 MSVC 肯定从未遇到如此强大的源代码。 在我那台可怜而强大的电脑页面文件中遭受半小时的折磨后,输出如下: PS > cl /Od program.c Microsoft (R) C/C++ Optimizing Compiler Version 19.32.31329 for x64 Copyright (C) Microsoft Corporation. All rights reserved. program.c program.c(134397076): warning C4049: compiler limit: terminating line number emission program.c(134397076): note: Compiler limit for line number is 16777215 program.c(41133672): fatal error C1060: compiler is out of heap space 太令人失望了!不仅编译器让我失望,在研究 Windows 可移植可执行文件格式(.exe)的限制时,我发现它无法处理超过 4GB 的文件!由于需要将 40 多亿次比较语句编码到可执行文件中,这对于实现我们的算法是一个主要障碍。即使每次比较时使用的字节数少于一个,对我来说工作量也太大了。 不过,糟糕的编译器和文件格式不应该阻止我们实现梦想。毕竟,编译器所做的只是将一些花哨的机器代码写入文件,而文件格式只是一些结构,告诉操作系统如何将二进制代码放入内存——其实,我们自己就能做到。 解决最后一个问题,程序性能很不错 让我们先用 x86-64 汇编语言编写一个 IsEven 函数,因为这是我 Intel 处理器驱动的本地语言,它看起来是这样的: ; Argument is stored in ECX, return value in EAX XOR EAX, EAX ; Set eax to zero (return value for odd number) CMP ECX, 0h ; Compare arg to 0 JNE 3h ; Skip next two instructions if it wasn't equal INC EAX ; It was even, set even return value (1) RET ; Return CMP ECX, 1h ; Compare arg to 1 JNE 2 ; Skip next instruction if not equal RET ; Odd return value already in EAX, just RET ; add the next 2...2^32-1 comparisons here RET ; Fallback return 这并不是真正正确的汇编代码,但这不重要,因为我们要手动将其编译成机器代码。 你问我是怎么做到的?我上网查阅了x86(-64) 体系结构手册,还利用我早年编写仿真器和黑客经验,找出了每条指令的正确操作码和格式……开个玩笑,这是不可能的。实际上,我是直接问 ChatGPT 每条指令的正确操作码是什么,幸运的是,它也没有产生 x86-64 的任何新扩展。 所以现在我们只需编写一个“编译器”来输出这段代码。请注意,我们将直接使用从 AI 获取的指令操作码,下面是用 Python 编写的代码: import struct with open('isEven.bin', 'wb') as file: file.write(b"\x31\xC0") # XOR EAX, EAX for i in range(2**32): ib = struct.pack("
大家好,又见面了,我是你们的朋友全栈君。 文章目录 【前言】 有好一段时间都没敲py了, 今天将urllib库算是较全的学习了一下老实说还是敲py比较舒服,当然还有requests,Beautiful库,正则表达式这些对于进行对...