[netty5: HttpObjectEncoder HttpObjectDecoder]-源码解析
在阅读该篇文章之前,推荐先阅读以下内容:
- [netty5: HttpObject]-源码解析
- [netty5: MessageToMessageCodec & MessageToMessageEncoder & MessageToMessageDecoder]-源码分析
- [netty5: ByteToMessageCodec & MessageToByteEncoder & ByteToMessageDecoder]-源码分析
HttpObjectEncoder
`HttpObjectEncoder` 类用于编码 HTTP 消息对象(如请求和响应),并根据不同的消息类型(如普通内容、分块内容或无内容)来处理编码过程。它支持对 HTTP 头、初始行、内容和尾部进行编码,并以历史编码大小的加权平均值(新值权重 0.2、历史值权重 0.8)来预估头部和尾部缓冲区的初始容量,从而动态调整内存分配、提高编码效率。此外,还提供了一个方法来处理分块传输编码的内容,并在消息体为空时输出合适的空缓冲区。
public abstract class HttpObjectEncoder<H extends HttpMessage> extends MessageToMessageEncoder<Object> {static final short CRLF_SHORT = (CR << 8) | LF;private static final int ZERO_CRLF_MEDIUM = ('0' << 16) | CRLF_SHORT;// \r\nprivate static final byte[] CRLF = {CR, LF};// 0\r\nprivate static final byte[] ZERO_CRLF_CRLF = { '0', CR, LF, CR, LF };// 新的头部数据权重 0.2private static final float HEADERS_WEIGHT_NEW = 1 / 5f;// 历史头部数据权重 0.8private static final float HEADERS_WEIGHT_HISTORICAL = 1 - HEADERS_WEIGHT_NEW;// 新的尾部数据权重 0.2private static final float TRAILERS_WEIGHT_NEW = HEADERS_WEIGHT_NEW;// 历史尾部数据权重 0.8private static final float TRAILERS_WEIGHT_HISTORICAL = HEADERS_WEIGHT_HISTORICAL;// 初始假设的头部大小private float headersEncodedSizeAccumulator = 256;// 初始假设的尾部大小private float trailersEncodedSizeAccumulator = 256;private static final int ST_INIT = 0;private static final int ST_CONTENT_NON_CHUNK = 1;private static final int ST_CONTENT_CHUNK = 2;private static final int ST_CONTENT_ALWAYS_EMPTY = 3;private Supplier<Buffer> crlfBufferSupplier;private Supplier<Buffer> zeroCrlfCrlfBufferSupplier;@SuppressWarnings("RedundantFieldInitialization")private int state = ST_INIT;@Overrideprotected void encodeAndClose(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {Buffer buf = null;if (msg instanceof HttpMessage) {if (state != ST_INIT) {throw new IllegalStateException("unexpected message type: " + StringUtil.simpleClassName(msg) + ", state: " + state);}@SuppressWarnings({ "unchecked", "CastConflictsWithInstanceof" })H m = (H) msg;buf = ctx.bufferAllocator().allocate((int) headersEncodedSizeAccumulator);encodeInitialLine(buf, m);state = isContentAlwaysEmpty(m) ? ST_CONTENT_ALWAYS_EMPTY :HttpUtil.isTransferEncodingChunked(m) ? 
ST_CONTENT_CHUNK : ST_CONTENT_NON_CHUNK;sanitizeHeadersBeforeEncode(m, state == ST_CONTENT_ALWAYS_EMPTY);encodeHeaders(m.headers(), buf);buf.writeShort(CRLF_SHORT);headersEncodedSizeAccumulator = HEADERS_WEIGHT_NEW * padSizeForAccumulation(buf.readableBytes()) +HEADERS_WEIGHT_HISTORICAL * headersEncodedSizeAccumulator;}// 跳过空的消息,优化数据流的处理if (msg instanceof Buffer && ((Buffer) msg).readableBytes() == 0) {out.add(msg);return;}if (msg instanceof HttpContent || msg instanceof Buffer || msg instanceof FileRegion) {switch (state) {case ST_INIT:// 如果当前状态是 ST_INIT,但消息类型是 HttpContent 或 Buffer 或 FileRegion,则抛出 IllegalStateException,说明这个消息类型在 ST_INIT 状态下是不允许出现的。// 在抛出异常之前,释放消息 msg 占用的资源Resource.dispose(msg);throw new IllegalStateException("unexpected message type: " + StringUtil.simpleClassName(msg) + ", state: " + state);case ST_CONTENT_NON_CHUNK:// 处理非分块内容:对于 ST_CONTENT_NON_CHUNK 状态,首先获取内容的长度 contentLength(msg)final long contentLength = contentLength(msg);if (contentLength > 0) {// 如果内容长度大于零且 buf 缓冲区有足够的可写空间,则将内容合并到 buf 中(提高性能,避免频繁分配新的缓冲区)if (buf != null && buf.writableBytes() >= contentLength && msg instanceof HttpContent) {buf.writeBytes(((HttpContent<?>) msg).payload());Resource.dispose(msg);out.add(buf);} else {if (buf != null) {out.add(buf);}out.add(encode(msg));}// 如果 msg 是 LastHttpContent(即最后一部分 HTTP 内容),则将 state 重置为 ST_INIT,表示处理完成if (msg instanceof LastHttpContent) {state = ST_INIT;}break;} else {// do not break, let's fall-through}// fall-through!case ST_CONTENT_ALWAYS_EMPTY:Resource.dispose(msg);if (buf != null) {out.add(buf);} else {out.add(ctx.bufferAllocator().allocate(0));}break;case ST_CONTENT_CHUNK:// 先将现有的缓冲区 buf 添加到 out 中if (buf != null) {// We allocated a buffer so add it now.out.add(buf);}// 处理分块传输编码(chunked transfer encoding)内容encodeChunkedContent(ctx, msg, contentLength(msg), out);break;default:throw new Error();}if (msg instanceof LastHttpContent) {state = ST_INIT;}} else if (buf != null) {out.add(buf);}}// 将传入的 HttpHeaders 对象中的所有头部字段逐个编码,并将编码后的字节数据写入 
buf 中protected void encodeHeaders(HttpHeaders headers, Buffer buf) {for (Entry<CharSequence, CharSequence> header : headers) {HttpHeadersEncoder.encoderHeader(header.getKey(), header.getValue(), buf);}}// 负责将数据分块并添加到 out 列表中。具体处理流程包括:// 1. 编码分块大小并将其添加到输出缓冲区。// 2. 处理 LastHttpContent,如果有尾部头部(trailers),则处理并编码它们。// 3. 处理内容长度为 0 的情况,不进行分块传输,直接将消息添加到输出列表。private void encodeChunkedContent(ChannelHandlerContext ctx, Object msg, long contentLength, List<Object> out) {if (contentLength > 0) {String lengthHex = Long.toHexString(contentLength);Buffer buf = ctx.bufferAllocator().allocate(lengthHex.length() + 2);buf.writeCharSequence(lengthHex, StandardCharsets.US_ASCII);buf.writeShort(CRLF_SHORT);out.add(buf);out.add(encode(msg));out.add(crlfBuffer(ctx.bufferAllocator()));}if (msg instanceof LastHttpContent) {HttpHeaders headers = ((LastHttpContent<?>) msg).trailingHeaders();if (headers.isEmpty()) {out.add(zeroCrlfCrlfBuffer(ctx.bufferAllocator()));} else {Buffer buf = ctx.bufferAllocator().allocate((int) trailersEncodedSizeAccumulator);buf.writeMedium(ZERO_CRLF_MEDIUM);encodeHeaders(headers, buf);buf.writeShort(CRLF_SHORT);trailersEncodedSizeAccumulator = TRAILERS_WEIGHT_NEW * padSizeForAccumulation(buf.readableBytes()) +TRAILERS_WEIGHT_HISTORICAL * trailersEncodedSizeAccumulator;out.add(buf);}if (contentLength == 0) {// EmptyLastHttpContent or LastHttpContent with empty payload((LastHttpContent<?>) msg).close();}} else if (contentLength == 0) {out.add(encode(msg));}}// 在编码消息之前清理其头部,默认实现为空操作(noop),但可以根据需要进行扩展。protected void sanitizeHeadersBeforeEncode(@SuppressWarnings("unused") H msg, boolean isAlwaysEmpty) {// noop}// 判断某些特殊消息(如 `HEAD` 或 `CONNECT` 请求)是否始终没有消息体,以便跳过内容处理。protected boolean isContentAlwaysEmpty(@SuppressWarnings("unused") H msg) {return false;}// 判断是否接受传出的消息。// 检查消息是否为 HttpObject、Buffer 或 FileRegion 类型,只有这些类型的消息才会被接受并继续处理@Overridepublic boolean acceptOutboundMessage(Object msg) throws Exception {return msg instanceof HttpObject || msg instanceof Buffer || msg 
instanceof FileRegion;}private static Object encode(Object msg) {if (msg instanceof Buffer) {return msg;}if (msg instanceof HttpContent) {return ((HttpContent<?>) msg).payload();}if (msg instanceof FileRegion) {return msg;}Resource.dispose(msg);throw new IllegalStateException("unexpected message type: " + StringUtil.simpleClassName(msg));}private static long contentLength(Object msg) {if (msg instanceof HttpContent) {return ((HttpContent<?>) msg).payload().readableBytes();}if (msg instanceof Buffer) {return ((Buffer) msg).readableBytes();}if (msg instanceof FileRegion) {return ((FileRegion) msg).count();}Resource.dispose(msg);throw new IllegalStateException("unexpected message type: " + StringUtil.simpleClassName(msg));}// 为缓冲区增加一些额外的空间。其目的是避免内存过度分配和防止在需要时发生缓冲区扩展或复制private static int padSizeForAccumulation(int readableBytes) {return (readableBytes << 2) / 3;}// GET /index.html HTTP/1.1// HTTP/1.1 200 OKprotected abstract void encodeInitialLine(Buffer buf, H message) throws Exception;// \r\nprotected Buffer crlfBuffer(BufferAllocator allocator) {if (crlfBufferSupplier == null) {crlfBufferSupplier = allocator.constBufferSupplier(CRLF);}return crlfBufferSupplier.get();}// 0\r\n\r\nprotected Buffer zeroCrlfCrlfBuffer(BufferAllocator allocator) {if (zeroCrlfCrlfBufferSupplier == null) {zeroCrlfCrlfBufferSupplier = allocator.constBufferSupplier(ZERO_CRLF_CRLF);}return zeroCrlfCrlfBufferSupplier.get();}
}
HttpRequestEncoder
HttpRequestEncoder 类用于将 HTTP 请求消息(HttpRequest)编码为符合 HTTP 协议规范的字节数据。它的主要任务是将请求的初始行(包括请求方法、URI 和协议版本)正确地编码到 Buffer 中,并处理 URI 中可能存在的一些细节(如缺失的斜杠或查询参数)。
/*** Encodes an {@link HttpRequest} or an {@link HttpContent} into a {@link Buffer}.*/
public class HttpRequestEncoder extends HttpObjectEncoder<HttpRequest> {private static final char SLASH = '/';private static final char QUESTION_MARK = '?';private static final short SLASH_AND_SPACE_SHORT = SLASH << 8 | SP;private static final int SPACE_SLASH_AND_SPACE_MEDIUM = SP << 16 | SLASH_AND_SPACE_SHORT;@Overridepublic boolean acceptOutboundMessage(Object msg) throws Exception {return super.acceptOutboundMessage(msg) && !(msg instanceof HttpResponse);}// GET /index.html HTTP/1.1@Overrideprotected void encodeInitialLine(Buffer buf, HttpRequest request) throws Exception {buf.writeCharSequence(request.method().asciiName(), StandardCharsets.US_ASCII);String uri = request.uri();if (uri.isEmpty()) {// Add " / " as absolute path if uri is not present.// See https://tools.ietf.org/html/rfc2616#section-5.1.2buf.writeMedium(SPACE_SLASH_AND_SPACE_MEDIUM);} else {CharSequence uriCharSequence = uri;boolean needSlash = false;int start = uri.indexOf("://");if (start != -1 && uri.charAt(0) != SLASH) {start += 3;// Correctly handle query params.// See https://github.com/netty/netty/issues/2732int index = uri.indexOf(QUESTION_MARK, start);if (index == -1) {if (uri.lastIndexOf(SLASH) < start) {needSlash = true;}} else {if (uri.lastIndexOf(SLASH, index) < start) {uriCharSequence = new StringBuilder(uri).insert(index, SLASH);}}}buf.writeByte(SP).writeCharSequence(uriCharSequence, StandardCharsets.UTF_8);if (needSlash) {// write "/ " after uribuf.writeShort(SLASH_AND_SPACE_SHORT);} else {buf.writeByte(SP);}}request.protocolVersion().encode(buf);buf.writeShort(CRLF_SHORT);}
}
HttpResponseEncoder
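`HttpResponseEncoder` 类用于将 HTTP 响应消息(`HttpResponse`)编码为符合 HTTP 协议规范的字节数据。它的主要任务是将响应的状态行(包括协议版本、状态码和状态描述)编码到 `Buffer` 中,并根据响应的状态决定如何处理 `Content-Length` 和 `Transfer-Encoding` 等头部信息:对始终没有消息体的 1xx 和 `204 No Content` 响应移除这两个头部;对 `205 Reset Content` 响应移除 `Transfer-Encoding` 并将 `Content-Length` 设置为 0;`304 Not Modified` 同样被视为没有消息体,但其头部保持不变。这样可以确保响应符合 HTTP 规范。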
/*** Encodes an {@link HttpResponse} or an {@link HttpContent} into a {@link Buffer}.*/
public class HttpResponseEncoder extends HttpObjectEncoder<HttpResponse> {@Overridepublic boolean acceptOutboundMessage(Object msg) throws Exception {return super.acceptOutboundMessage(msg) && !(msg instanceof HttpRequest);}// HTTP/1.1 200 OK@Overrideprotected void encodeInitialLine(Buffer buf, HttpResponse response) throws Exception {response.protocolVersion().encode(buf);buf.writeByte(SP);response.status().encode(buf);buf.writeShort(CRLF_SHORT);}// 在响应头编码前,对响应头进行清理,去除不必要的 Content-Length 和 Transfer-Encoding 头,// 特别是对于无内容的响应(如 204 No Content、304 Not Modified)或 205 Reset Content 响应。@Overrideprotected void sanitizeHeadersBeforeEncode(HttpResponse msg, boolean isAlwaysEmpty) {if (isAlwaysEmpty) {HttpResponseStatus status = msg.status();if (status.codeClass() == HttpStatusClass.INFORMATIONAL ||status.code() == HttpResponseStatus.NO_CONTENT.code()) {msg.headers().remove(HttpHeaderNames.CONTENT_LENGTH);msg.headers().remove(HttpHeaderNames.TRANSFER_ENCODING);} else if (status.code() == HttpResponseStatus.RESET_CONTENT.code()) {msg.headers().remove(HttpHeaderNames.TRANSFER_ENCODING);msg.headers().set(HttpHeaderNames.CONTENT_LENGTH, HttpHeaderValues.ZERO);}}}// 根据响应的状态码判断该响应是否包含内容。// 如果是信息性状态、204 No Content、304 Not Modified 或 205 Reset Content 等,返回 true 表示响应没有内容;// 其他情况则返回 false,表示响应包含内容@Overrideprotected boolean isContentAlwaysEmpty(HttpResponse msg) {HttpResponseStatus status = msg.status();if (status.codeClass() == HttpStatusClass.INFORMATIONAL) {if (status.code() == HttpResponseStatus.SWITCHING_PROTOCOLS.code()) {return msg.headers().contains(HttpHeaderNames.SEC_WEBSOCKET_VERSION);}return true;}return status.code() == HttpResponseStatus.NO_CONTENT.code() ||status.code() == HttpResponseStatus.NOT_MODIFIED.code() ||status.code() == HttpResponseStatus.RESET_CONTENT.code();}
}
HttpObjectDecoder
public abstract class HttpObjectDecoder extends ByteToMessageDecoder {public static final int DEFAULT_MAX_INITIAL_LINE_LENGTH = 4096;public static final int DEFAULT_MAX_HEADER_SIZE = 8192;public static final boolean DEFAULT_CHUNKED_SUPPORTED = true;public static final boolean DEFAULT_VALIDATE_HEADERS = true;public static final int DEFAULT_INITIAL_BUFFER_SIZE = 128;public static final boolean DEFAULT_ALLOW_DUPLICATE_CONTENT_LENGTHS = false;private final boolean chunkedSupported;protected final HttpHeadersFactory headersFactory;protected final HttpHeadersFactory trailersFactory;private final boolean allowDuplicateContentLengths;private final Buffer parserScratchBuffer;private final HeaderParser headerParser;private final LineParser lineParser;private HttpMessage message;private long chunkSize;private long contentLength = Long.MIN_VALUE;private boolean chunked;private boolean isSwitchingToNonHttp1Protocol;private final AtomicBoolean resetRequested = new AtomicBoolean();// These will be updated by splitHeader(...)private AsciiString name;private String value;private LastHttpContent<?> trailer;private State currentState = State.SKIP_CONTROL_CHARS;protected HttpObjectDecoder() {this(new HttpDecoderConfig());}protected HttpObjectDecoder(HttpDecoderConfig config) {headersFactory = config.getHeadersFactory();trailersFactory = config.getTrailersFactory();parserScratchBuffer = MemoryManager.unpooledHeap(config.getInitialBufferSize());lineParser = new LineParser(parserScratchBuffer, config.getMaxInitialLineLength());headerParser = new HeaderParser(parserScratchBuffer, config.getMaxHeaderSize());chunkedSupported = config.isChunkedSupported();allowDuplicateContentLengths = config.isAllowDuplicateContentLengths();}@Overrideprotected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {try (parserScratchBuffer) {super.handlerRemoved0(ctx);}}@Overrideprotected void decode(ChannelHandlerContext ctx, Buffer buffer) throws Exception {if (resetRequested.get()) 
{resetNow();}switch (currentState) {case SKIP_CONTROL_CHARS:// Fall-throughcase READ_INITIAL: try {// 跳过前面错误的ASCII码小于32的内容,解析第一行内容Buffer line = lineParser.parse(buffer);if (line == null) {return;}// 用于将初始行拆分成多个部分(如 HTTP 方法、URL 和协议版本)final String[] initialLine = splitInitialLine(line);assert initialLine.length == 3 : "initialLine::length must be 3";// 根据解析出的初始行创建消息对象message = createMessage(initialLine);currentState = State.READ_HEADER;// fall-through} catch (Exception e) {ctx.fireChannelRead(invalidMessage(ctx, message, buffer, e));return;}case READ_HEADER: try {// 解析 HTTP 头部信息,根据头部中的字段(如 Content-Length, Transfer-Encoding)决定解码后续的内容// 如果没有消息体(Content-Length == 0 或 -1 且是请求消息),则直接跳到下一步。// 如果是分块传输编码,跳转到 READ_CHUNK_SIZE。// 否则,继续读取固定长度或可变长度的消息体。State nextState = readHeaders(buffer);if (nextState == null) {return;}currentState = nextState;switch (nextState) {case SKIP_CONTROL_CHARS:addCurrentMessage(ctx);ctx.fireChannelRead(new EmptyLastHttpContent(ctx.bufferAllocator()));resetNow();return;case READ_CHUNK_SIZE:if (!chunkedSupported) {throw new IllegalArgumentException("Chunked messages not supported");}// Chunked encoding - generate HttpMessage first. 
HttpChunks will follow.addCurrentMessage(ctx);return;default:if (contentLength == 0 || contentLength == -1 && isDecodingRequest()) {ctx.fireChannelRead(message);ctx.fireChannelRead(new EmptyLastHttpContent(ctx.bufferAllocator()));resetNow();return;}assert nextState == State.READ_FIXED_LENGTH_CONTENT ||nextState == State.READ_VARIABLE_LENGTH_CONTENT;addCurrentMessage(ctx);if (nextState == State.READ_FIXED_LENGTH_CONTENT) {chunkSize = contentLength;}return;}} catch (Exception e) {ctx.fireChannelRead(invalidMessage(ctx, message, buffer, e));return;}case READ_VARIABLE_LENGTH_CONTENT: {// 继续读取消息体,直到连接关闭。一般用于请求/响应没有明确的 Content-Length 或使用了 Transfer-Encoding: chunked。int toRead = buffer.readableBytes();if (toRead > 0) {Buffer content = buffer.split();ctx.fireChannelRead(new DefaultHttpContent(content));}return;}case READ_FIXED_LENGTH_CONTENT: {// 读取固定长度的消息体。如果读取到的字节数与剩余的 chunkSize 相等,则解码完成,跳转到下一个阶段。int toRead = buffer.readableBytes();if (toRead == 0) {return;}if (toRead > chunkSize) {toRead = (int) chunkSize;}Buffer content = buffer.readSplit(toRead);chunkSize -= toRead;if (chunkSize == 0) {// Read all content.ctx.fireChannelRead(new DefaultLastHttpContent(content, trailersFactory));resetNow();} else {ctx.fireChannelRead(new DefaultHttpContent(content));}return;}// 解析分块传输编码的块大小。每个块有自己的大小,解析出大小后进入 READ_CHUNKED_CONTENT 状态case READ_CHUNK_SIZE: try {Buffer line = lineParser.parse(buffer);if (line == null) {return;}assert line.countComponents() == 1: "line should have exactly one component";try (var componentIterator = line.forEachComponent()) {var component = componentIterator.first();int chunkSize = getChunkSize(component.readableArray(),component.readableArrayOffset() + line.readerOffset(),line.readableBytes());this.chunkSize = chunkSize;if (chunkSize == 0) {currentState = State.READ_CHUNK_FOOTER;return;}currentState = State.READ_CHUNKED_CONTENT;}// fall-through} catch (Exception e) {ctx.fireChannelRead(invalidChunk(ctx.bufferAllocator(), buffer, e));return;}// 
读取分块内容,根据解析出的块大小读取数据。如果读取完成,跳到 READ_CHUNK_DELIMITER 状态。case READ_CHUNKED_CONTENT: {assert chunkSize <= Integer.MAX_VALUE;int toRead = (int) chunkSize;toRead = Math.min(toRead, buffer.readableBytes());if (toRead == 0) {return;}HttpContent<?> chunk = new DefaultHttpContent(buffer.readSplit(toRead));chunkSize -= toRead;ctx.fireChannelRead(chunk);if (chunkSize != 0) {return;}currentState = State.READ_CHUNK_DELIMITER;// fall-through}// 处理分块传输编码中的块分隔符(即 CRLF)。根据是否有剩余数据,决定是否继续读取下一个数据块。case READ_CHUNK_DELIMITER: {// include LF in the bytes to skipint bytesToSkip = buffer.bytesBefore(HttpConstants.LF) + 1;if (bytesToSkip > 0) {currentState = State.READ_CHUNK_SIZE;buffer.skipReadableBytes(bytesToSkip);} else {buffer.skipReadableBytes(buffer.readableBytes());}return;}// 解析分块传输编码的尾部,通常是 HTTP 头部中的 Trailer 信息。解析完成后重置状态。case READ_CHUNK_FOOTER: try {LastHttpContent<?> trailer = readTrailingHeaders(ctx.bufferAllocator(), buffer);if (trailer == null) {return;}ctx.fireChannelRead(trailer);resetNow();return;} catch (Exception e) {ctx.fireChannelRead(invalidChunk(ctx.bufferAllocator(), buffer, e));return;}// 处理错误的消息。如果解码过程中发生异常,会进入该状态并丢弃数据。case BAD_MESSAGE: {// Keep discarding until disconnection.buffer.skipReadableBytes(buffer.readableBytes());break;}// 处理协议升级。在这种情况下,解码器会将剩余的数据交给新的协议解码器继续处理。case UPGRADED: {int readableBytes = buffer.readableBytes();if (readableBytes > 0) {ctx.fireChannelRead(buffer.split());}break;}default:break;}}// ...
}
State
`State` 枚举定义了 HTTP 消息解码过程中各个阶段的状态。每个状态代表了解码器在解析 HTTP 消息时的不同步骤。
State | Description |
---|---|
SKIP_CONTROL_CHARS | 跳过控制字符(如回车、换行符等)。 |
READ_INITIAL | 读取初始行(如请求行或响应行)。 |
READ_HEADER | 读取头部信息。 |
READ_VARIABLE_LENGTH_CONTENT | 读取可变长度的消息体。 |
READ_FIXED_LENGTH_CONTENT | 读取固定长度的消息体。 |
READ_CHUNK_SIZE | 读取分块传输编码中的块大小。 |
READ_CHUNKED_CONTENT | 读取分块传输编码中的数据内容。 |
READ_CHUNK_DELIMITER | 读取分块传输编码中的块分隔符(CRLF)。 |
READ_CHUNK_FOOTER | 读取分块传输编码的尾部(大小为 0 的最后一个分块之后的 trailer 头部)。 |
BAD_MESSAGE | 解析过程中发生错误,无法继续解码。 |
UPGRADED | 协议已升级,用于处理协议升级。 |
这些状态帮助控制解码器在不同解析阶段的行为,确保按正确的顺序和格式解析 HTTP 消息。
/**
 * Decoding stages of the HTTP object decoder, listed in declaration order.
 */
private enum State {
    /** Leading control characters are being skipped. */
    SKIP_CONTROL_CHARS,
    /** The initial line (request line or status line) is being read. */
    READ_INITIAL,
    /** The header block is being read. */
    READ_HEADER,
    /** A body without a known length is being read. */
    READ_VARIABLE_LENGTH_CONTENT,
    /** A body with a fixed length is being read. */
    READ_FIXED_LENGTH_CONTENT,
    /** The size line of a chunk is being read. */
    READ_CHUNK_SIZE,
    /** The data of a chunk is being read. */
    READ_CHUNKED_CONTENT,
    /** The delimiter after the chunk data is being read. */
    READ_CHUNK_DELIMITER,
    /** The footer of a chunked body is being read. */
    READ_CHUNK_FOOTER,
    /** Decoding failed; no further decoding takes place. */
    BAD_MESSAGE,
    /** The protocol has been upgraded. */
    UPGRADED
}
HeaderParser
`HeaderParser` 类用于解析 HTTP 请求或响应头部的内容。它接收一个缓冲区 `Buffer`,逐行解析其中的数据,并将解析结果存储在 `seq` 中。它会根据最大长度 `maxLength` 限制头部的解析大小,如果超过限制则抛出异常。该类还处理 CRLF(回车换行符)分隔符,确保解析到有效的头部数据,并且能够在头部数据解析完毕时更新缓冲区的读取位置。
private static class HeaderParser {protected final Buffer seq;protected final int maxLength;int size;HeaderParser(Buffer seq, int maxLength) {this.seq = seq;this.maxLength = maxLength;}public Buffer parse(Buffer buffer) {final int readableBytes = buffer.readableBytes();final int readerIndex = buffer.readerOffset();final int maxBodySize = maxLength - size;assert maxBodySize >= 0;// adding 2 to account for both CR (if present) and LF// don't remove 2L: it's key to cover maxLength = Integer.MAX_VALUEfinal long maxBodySizeWithCRLF = maxBodySize + 2L;final int toProcess = (int) Math.min(maxBodySizeWithCRLF, readableBytes);final int toIndexExclusive = readerIndex + toProcess;assert toIndexExclusive >= readerIndex;int toLf = buffer.bytesBefore(HttpConstants.LF);final int indexOfLf = readerIndex + toLf;if (toLf == -1) {if (readableBytes > maxBodySize) {// TODO: Respond with Bad Request and discard the traffic// or close the connection.// No need to notify the upstream handlers - just log.// If decoding a response, just throw an exception.throw newException(maxLength);}return null;}final int endOfSeqIncluded;if (indexOfLf > readerIndex && buffer.getByte(indexOfLf - 1) == HttpConstants.CR) {// Drop CR if we had a CRLF pairendOfSeqIncluded = indexOfLf - 1;} else {endOfSeqIncluded = indexOfLf;}final int newSize = endOfSeqIncluded - readerIndex;if (newSize == 0) {seq.resetOffsets();buffer.readerOffset(indexOfLf + 1);return seq;}int size = this.size + newSize;if (size > maxLength) {throw newException(maxLength);}this.size = size;seq.resetOffsets();seq.ensureWritable(newSize, newSize, false);buffer.copyInto(readerIndex, seq, 0, newSize);seq.writerOffset(newSize);buffer.readerOffset(indexOfLf + 1);return seq;}public void reset() {size = 0;}protected TooLongFrameException newException(int maxLength) {return new TooLongHttpHeaderException("HTTP header is larger than " + maxLength + " bytes.");}
}
LineParser
`LineParser` 类继承自 `HeaderParser`,用于解析 HTTP 请求或响应的单行(如请求行、状态行或分块大小行)。当解码器处于 `SKIP_CONTROL_CHARS` 状态时,它首先跳过控制字符(如回车、换行等),然后调用父类 `HeaderParser` 解析有效的 HTTP 行。类中的 `skipControlChars` 方法负责跳过这些不需要的字符,并在必要时抛出异常。如果超出了最大长度 `maxLength`,会抛出 `TooLongHttpLineException` 异常。
private final class LineParser extends HeaderParser {LineParser(Buffer seq, int maxLength) {super(seq, maxLength);}@Overridepublic Buffer parse(Buffer buffer) {// Suppress a warning because HeaderParser.reset() is supposed to be calledreset();final int readableBytes = buffer.readableBytes();if (readableBytes == 0) {return null;}final int readerIndex = buffer.readerOffset();if (currentState == State.SKIP_CONTROL_CHARS && skipControlChars(buffer, readableBytes, readerIndex)) {return null;}return super.parse(buffer);}private boolean skipControlChars(Buffer buffer, int readableBytes, int readerIndex) {assert currentState == State.SKIP_CONTROL_CHARS;final int maxToSkip = Math.min(maxLength, readableBytes);final int firstNonControlIndex = buffer.openCursor(readerIndex, maxToSkip).process(SKIP_CONTROL_CHARS_BYTES);if (firstNonControlIndex == -1) {buffer.skipReadableBytes(maxToSkip);if (readableBytes > maxLength) {throw newException(maxLength);}return true;}// from now on we don't care about control charsbuffer.readerOffset(readerIndex + firstNonControlIndex);currentState = State.READ_INITIAL;return false;}@Overrideprotected TooLongFrameException newException(int maxLength) {return new TooLongHttpLineException("An HTTP line is larger than " + maxLength + " bytes.");}
}
HttpRequestDecoder
HttpRequestDecoder 类用于解码 HTTP 请求消息(HttpRequest)并将其转换为 HttpMessage 对象。它继承自 HttpObjectDecoder,并通过解析请求行(包括请求方法、URI 和协议版本)以及请求头(如 Host、Content-Type、Content-Length)来完成解码过程。该类支持不同的 HTTP 方法(如 GET 和 POST)和协议版本(如 HTTP/1.0 和 HTTP/1.1)。它还能够根据特定的头部标识符对请求头进行拆分,以便正确识别请求中的各个字段。通过 HttpDecoderConfig,它允许自定义解码器的配置,如最大初始行长度和最大头部大小。
public class HttpRequestDecoder extends HttpObjectDecoder {private static final AsciiString Accept = AsciiString.cached("Accept");private static final AsciiString Host = AsciiString.cached("Host");private static final AsciiString Connection = AsciiString.cached("Connection");private static final AsciiString ContentType = AsciiString.cached("Content-Type");private static final AsciiString ContentLength = AsciiString.cached("Content-Length");private static final int GET_AS_INT = (int) charsToLong("GET");private static final int POST_AS_INT = (int) charsToLong("POST");private static final long HTTP_1_1_AS_LONG = charsToLong("HTTP/1.1");private static final long HTTP_1_0_AS_LONG = charsToLong("HTTP/1.0");;private static final int HOST_AS_INT = (int) charsToLong("Host");;private static final long CONNECTION_AS_LONG_0 = charsToLong("Connecti");private static final short CONNECTION_AS_SHORT_1 = (short) charsToLong("on");private static final long CONTENT_AS_LONG = charsToLong("Content-");;private static final int TYPE_AS_INT = (int) charsToLong("Type");private static final long LENGTH_AS_LONG = charsToLong("Length");private static final long ACCEPT_AS_LONG = charsToLong("Accept");private static long charsToLong(String cs) {long result = cs.charAt(0);int shift = 0;for (int i = 1; i < cs.length(); i++) {result |= (long) cs.charAt(i) << (shift += 8);}return result;}public HttpRequestDecoder() {}public HttpRequestDecoder(int maxInitialLineLength, int maxHeaderSize) {super(new HttpDecoderConfig().setMaxInitialLineLength(maxInitialLineLength).setMaxHeaderSize(maxHeaderSize));}public HttpRequestDecoder(HttpDecoderConfig config) {super(config);}// [GET, /index.html, HTTP/1.1]@Overrideprotected HttpMessage createMessage(String[] initialLine) throws Exception {return new DefaultHttpRequest(// Do strict version checkingHttpVersion.valueOf(initialLine[2], true),HttpMethod.valueOf(initialLine[0]), initialLine[1], headersFactory);}// 解析 HTTP 请求中的头部字段名(Header Name)@Overrideprotected 
AsciiString splitHeaderName(final byte[] sb, final int start, final int length) {// 获取头部字段的第一个字符final byte firstChar = sb[start];// 如果第一个字符是 'H',检查是否是 "Host" 头部字段if (firstChar == 'H') {if (length == 4 && isHost(sb, start)) {return Host;}} // 如果第一个字符是 'A',检查是否是 "Accept" 头部字段else if (firstChar == 'A') {if (length == 6 && isAccept(sb, start)) {return Accept;}} // 如果第一个字符是 'C',检查是否是 "Connection"、"Content-Type" 或 "Content-Length" 头部字段else if (firstChar == 'C') {if (length == 10) {if (isConnection(sb, start)) {return Connection;}} else if (length == 12) {if (isContentType(sb, start)) {return ContentType;}} else if (length == 14) {if (isContentLength(sb, start)) {return ContentLength;}}}// 如果没有匹配的情况,调用父类方法来处理return super.splitHeaderName(sb, start, length);}// 解析 HTTP 请求的初始行,并识别出 HTTP 方法@Overrideprotected String splitFirstWordInitialLine(final byte[] sb, final int start, final int length) {if (length == 3) {if (isGetMethod(sb, start)) {return HttpMethod.GET.name();}} else if (length == 4) {if (isPostMethod(sb, start)) {return HttpMethod.POST.name();}}return super.splitFirstWordInitialLine(sb, start, length);}// 解析 HTTP 请求的初始行,并识别出 HTTP 版本@Overrideprotected String splitThirdWordInitialLine(final byte[] sb, final int start, final int length) {if (length == 8) {final long maybeHttp1_x = sb[start] |sb[start + 1] << 8 |sb[start + 2] << 16 |sb[start + 3] << 24 |(long) sb[start + 4] << 32 |(long) sb[start + 5] << 40 |(long) sb[start + 6] << 48 |(long) sb[start + 7] << 56;if (maybeHttp1_x == HTTP_1_1_AS_LONG) {return HttpVersion.HTTP_1_1_STRING;} else if (maybeHttp1_x == HTTP_1_0_AS_LONG) {return HttpVersion.HTTP_1_0_STRING;}}return super.splitThirdWordInitialLine(sb, start, length);}@Overrideprotected HttpMessage createInvalidMessage(ChannelHandlerContext ctx) {return new DefaultFullHttpRequest(HttpVersion.HTTP_1_0, HttpMethod.GET, "/bad-request",ctx.bufferAllocator().allocate(0), headersFactory, trailersFactory);}// 指示当前正在解析的是一个 HTTP 请求而不是响应@Overrideprotected boolean 
isDecodingRequest() {return true;}// 判断消息类型来优化内容解析,对于 `DefaultHttpRequest` 返回 `false` 表示请求可能包含内容,而其他类型的消息则交由父类处理。@Overrideprotected boolean isContentAlwaysEmpty(final HttpMessage msg) {if (msg.getClass() == DefaultHttpRequest.class) {return false;}return super.isContentAlwaysEmpty(msg);}
}
HttpResponseDecoder
`HttpResponseDecoder` 类用于解析 HTTP 响应消息的初始行(版本、状态码和状态描述)和头部,生成对应的 `HttpResponse` 对象,并在解析失败时返回一个无效的响应。
public class HttpResponseDecoder extends HttpObjectDecoder {private static final HttpResponseStatus UNKNOWN_STATUS = new HttpResponseStatus(999, "Unknown");public HttpResponseDecoder() {}public HttpResponseDecoder(int maxInitialLineLength, int maxHeaderSize) {super(new HttpDecoderConfig().setMaxInitialLineLength(maxInitialLineLength).setMaxHeaderSize(maxHeaderSize));}public HttpResponseDecoder(HttpDecoderConfig config) {super(config);}@Overrideprotected HttpMessage createMessage(String[] initialLine) {return new DefaultHttpResponse(// Do strict version checkingHttpVersion.valueOf(initialLine[0], true),HttpResponseStatus.valueOf(Integer.parseInt(initialLine[1]), initialLine[2]), headersFactory);}@Overrideprotected HttpMessage createInvalidMessage(ChannelHandlerContext ctx) {return new DefaultFullHttpResponse(HttpVersion.HTTP_1_0, UNKNOWN_STATUS, ctx.bufferAllocator().allocate(0),headersFactory, trailersFactory);}@Overrideprotected boolean isDecodingRequest() {return false;}
}