前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >springcloud gateway 自定义协议转化实现

springcloud gateway 自定义协议转化实现

作者头像
张泽旭
发布2019-12-10 18:14:22
3.3K0
发布2019-12-10 18:14:22
举报
文章被收录于专栏:张泽旭的专栏

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

本文链接:https://blog.csdn.net/qq_17655941/article/details/103362115

这几天公司要求实现 springcloud gateway 自定义协议 需要对外统一提供http/https 的接口,但是后端有很多服务提供了不同的方式,包括dubbo 协议,和dubbo 上提供的各种访问协议等(dubbo服务上协议的支持),需要从网关直接代理去访问。仔细分析后,发现gateway 只有自带的几种协议 ,http/https 和 ws 这个几种 ps:这就很难受,网上也百度了许多没有找到具体的实现方案,或许没有人这样干吧,在需求的推动下,决定自己研究一把.

1、源码分析

在没有头绪的情况下,还是得从阅读源码开始,源码下载地址,然后配置官方提供的私服地址,因为源码引用的有些包是未发布版本,在项目下找到对应 .flattened-pom.xml 文件下,上面的地址就是私服地址。(我这个说几个需要用到的类,其他的自己看百度查看 推荐学习地址)

org.springframework.cloud.gateway.config.GatewayAutoConfiguration

这个类是对gateway GatewayFilter 的实例类注入,在源代码调试过程中要在这里添加自己自定义的GatewayFilter,不然用注解注入不能起作用。

org.springframework.cloud.gateway.filter.GlobalFilter

全局网关过滤器,是一个接口,自定义需要实现此接口(下面说明的类都实现此接口,并在同一个包中)

NettyRoutingFilter

代码语言:javascript
复制
@Override
	@SuppressWarnings("Duplicates")
	public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
		URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);

		/**
		 * 获取访问的url地址
		 */
		String scheme = requestUrl.getScheme();
		/**
		 * 判断这次请求是否被处理过,请求协议是否是http/https
		 * 如果满足则不处理
		 */
		if (isAlreadyRouted(exchange)
				|| (!"http".equals(scheme) && !"https".equals(scheme))) {
			return chain.filter(exchange);
		}
		/**
		 * 开始处理,首先将状态标志为已处理
		 */
		setAlreadyRouted(exchange);


		/**
		 * 获取ServerHttpRequest,相当于HttpServletRequest请求,在gateway
		 * 中使用的是webflux 做的web服务,所有处理有点变化
		 */
		ServerHttpRequest request = exchange.getRequest();

		/**
		 * 获取 请求方式
		 */
		final HttpMethod method = HttpMethod.valueOf(request.getMethodValue());
		final String url = requestUrl.toASCIIString();
		System.out.println(url);
		/**
		 * 获取 HttpHeaders
		 */
		HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange);

		final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
		filtered.forEach(httpHeaders::set);

		boolean preserveHost = exchange
				.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);


		/**
		 * httpClient 这次采用httpclient 去代理访问要去代理的地址,相当于网关去
		 * 帮你请求了一次
		 */
		Flux<HttpClientResponse> responseFlux = this.httpClient.headers(headers -> {
			headers.add(httpHeaders);
			/**
			 * 判断请求主机是否一致,一起是访问地址和路由地址是否是一个
			 * ip 不是则需要替换
			 */
			if (preserveHost) {
				String host = request.getHeaders().getFirst(HttpHeaders.HOST);
				headers.add(HttpHeaders.HOST, host);
			}
			else {
				// let Netty set it based on hostname
				headers.remove(HttpHeaders.HOST);
			}
		}).request(method).uri(url).send((req, nettyOutbound) -> {
			if (log.isTraceEnabled()) {
				nettyOutbound.withConnection(connection -> log.trace(
						"outbound route: " + connection.channel().id().asShortText()
								+ ", inbound: " + exchange.getLogPrefix()));
			}
			NettyOutbound outbound = nettyOutbound.send(request.getBody()
					.map(dataBuffer -> ((NettyDataBuffer) dataBuffer).getNativeBuffer()));

			return outbound;
		}).responseConnection((res, connection) -> {

			// Defer committing the response until all route filters have run
			// Put client response as ServerWebExchange attribute and write
			// response later NettyWriteResponseFilter
			exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
			exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);
			/**
			 * 异步获取 httpClient 访问的响应结果,下面就是对响应结果的封装
			 */

			ServerHttpResponse response = exchange.getResponse();
			// put headers and status so filters can modify the response
			HttpHeaders headers = new HttpHeaders();

			res.responseHeaders()
					.forEach(entry -> headers.add(entry.getKey(), entry.getValue()));

			String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
			if (StringUtils.hasLength(contentTypeValue)) {
				exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR,
						contentTypeValue);
			}

			HttpStatus status = HttpStatus.resolve(res.status().code());
			if (status != null) {
				response.setStatusCode(status);
			}
			else if (response instanceof AbstractServerHttpResponse) {
				// https://jira.spring.io/browse/SPR-16748
				((AbstractServerHttpResponse) response)
						.setStatusCodeValue(res.status().code());
			}
			else {
				// TODO: log warning here, not throw error?
				throw new IllegalStateException("Unable to set status code on response: "
						+ res.status().code() + ", " + response.getClass());
			}

			// make sure headers filters run after setting status so it is
			// available in response
			HttpHeaders filteredResponseHeaders = HttpHeadersFilter
					.filter(getHeadersFilters(), headers, exchange, Type.RESPONSE);

			if (!filteredResponseHeaders.containsKey(HttpHeaders.TRANSFER_ENCODING)
					&& filteredResponseHeaders.containsKey(HttpHeaders.CONTENT_LENGTH)) {
				// It is not valid to have both the transfer-encoding header and
				// the content-length header.
				// Remove the transfer-encoding header in the response if the
				// content-length header is present.
				response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);
			}

			exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES,
					filteredResponseHeaders.keySet());

			response.getHeaders().putAll(filteredResponseHeaders);

			System.out.println(res+"///////");
			return Mono.just(res);
		});

		if (properties.getResponseTimeout() != null) {
			responseFlux = responseFlux.timeout(properties.getResponseTimeout(),
					Mono.error(new TimeoutException("Response took longer than timeout: "
							+ properties.getResponseTimeout())))
					.onErrorMap(TimeoutException.class,
							th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT,
									th.getMessage(), th));
		}


		return responseFlux.then(chain.filter(exchange));
	}

NettyWriteResponseFilter(此类接收和处理上面的代理响应结果)

代码语言:javascript
复制
@Override
	public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
		// NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_CONN_ATTR is not added
		// until the NettyRoutingFilter is run
		// @formatter:off

		return chain.filter(exchange)
				.doOnError(throwable -> cleanup(exchange))
				.then(Mono.defer(() -> {
					Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR);

					if (connection == null) {
						return Mono.empty();
					}
					if (log.isTraceEnabled()) {
						log.trace("NettyWriteResponseFilter start inbound: "
								+ connection.channel().id().asShortText() + ", outbound: "
								+ exchange.getLogPrefix());
					}
					ServerHttpResponse response = exchange.getResponse();

					// TODO: what if it's not netty
					NettyDataBufferFactory factory = (NettyDataBufferFactory) response
							.bufferFactory();


					// TODO: needed? 对响应结果进一个解析,获取到的数据是一个NettyDataBuffer,可以百度下
					// 是netty的数据
					final Flux<NettyDataBuffer> body = connection
							.inbound()
							.receive()
							.retain()
							.map(factory::wrap);

					MediaType contentType = null;
					try {
						contentType = response.getHeaders().getContentType();
					}
					catch (Exception e) {
						if (log.isTraceEnabled()) {
							log.trace("invalid media type", e);
						}
					}

					return (isStreamingMediaType(contentType)
							? response.writeAndFlushWith(body.map(Flux::just))
							: response.writeWith(body));
				})).doOnCancel(() -> cleanup(exchange));
		// @formatter:on
	}

上面的代码变量和编写方式都和框架使用了webflux 有关 大家可以学习一下就很容易理解了阿里JAVA架构师详解Spring5新特性之WebFlux 我也是看了这个视频学会的 2个小时,感觉讲的挺好的

2、自定义代码实现

代码语言:javascript
复制
package com.neo.config;

import com.alibaba.fastjson.JSONObject;
import io.netty.buffer.UnpooledByteBufAllocator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.gateway.filter.headers.HttpHeadersFilter;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.util.StringUtils;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Method;
import java.net.URI;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.*;

/**
 * @ClassName DubboResponseGlobalFilter
 * @Desription 协议转换的过滤器类
 * @Author zhangzexu
 * @Date 2019/11/28 17:14
 * @Version V1.0
 */
@Configuration
public class DubboResponseGlobalFilter implements GlobalFilter, Ordered {

    @Value("${plugin.calssName}")
    private String className;

    private static Logger LOGGER = LoggerFactory.getLogger(DubboResponseGlobalFilter.class);
    private volatile List<HttpHeadersFilter> headersFilters;

    @Override
    public int getOrder() {
        return Ordered.LOWEST_PRECEDENCE;
    }

    public DubboResponseGlobalFilter() {

    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);


        final String scheme = requestUrl.getScheme();
        if (isAlreadyRouted(exchange) || "http".equals(scheme) || "https".equals(scheme) || "lb".equals(scheme) || "ws".equals(scheme)) {
            return chain.filter(exchange);
        }
        LOGGER.info("请求的url为{},协议为{}",requestUrl,scheme);
        setAlreadyRouted(exchange);
        /**
         * 获取请求的url 对路径进行重新编码
         */
        final String url = requestUrl.toASCIIString();

        Flux<DataBuffer> flux = exchange.getRequest().getBody();
        AtomicReference<byte[]> atomicReference = new AtomicReference<>();
        /**
         * 获取客户端请求的数据,body体
         */
        flux.subscribe(buffer -> {
            byte[] bytes = new byte[buffer.readableByteCount()];
            buffer.read(bytes);
            DataBufferUtils.release(buffer);
            atomicReference.set(bytes);
        });
        return chain.filter(exchange)
                .then(Mono.defer(() -> {
                    ServerHttpResponse response = exchange.getResponse();
                    return response.writeWith(Flux.create(sink -> {
                        NettyDataBufferFactory nettyDataBufferFactory = new NettyDataBufferFactory(new UnpooledByteBufAllocator(false));
                        JSONObject json = new JSONObject();
                        Class c = null;
                        DataBuffer dataBuffer = null;
                        String charset = "UTF-8";
                        try {
                            /**
                             * 初始化反射数据,将要调用的类反射获取,反射的类的名称结构,
                             * 用 dubbo 协议举例 
                             * 则插件的类名组合为 DubboGatewayImpl
                             */
                            StringBuilder sb = new StringBuilder(className);
                            sb.append(".");
                            char[] name = scheme.toCharArray();
                            name[0] -= 32;
                            sb.append(String.valueOf(name));
                            sb.append("GatewayPluginImpl");
                            c = Class.forName(sb.toString());
                            c.getMethods();
                            Method method = c.getMethod("send", String.class, byte[].class);
                            Object obj = c.getConstructor().newInstance();
                            Object result = method.invoke(obj, url, atomicReference.get());
                            HttpStatus status = HttpStatus.resolve(500);
                            /**
                             * 判断结果是否返回,如果没有数据则直接返回
                             */
                            if (result == null) {

                            } else {
                                json = (JSONObject) result;
                                status = HttpStatus.resolve(json.getInteger("code"));
                                json.remove("code");
                                /**
                                 * 获取字符集编码格式 默认 utf-8
                                 */
                                if (!StringUtils.isEmpty(json.getString("charset"))) {
                                    charset = json.getString("charset");
                                }
                            }
                            response.setStatusCode(status);
                            try {
                                dataBuffer = nettyDataBufferFactory.wrap(json.toJSONString().getBytes(charset));
                            } catch (UnsupportedEncodingException e) {
                                dataBuffer = nettyDataBufferFactory.wrap(e.toString().getBytes(charset));
                                LOGGER.error("返回调用请求数据错误{}",e);
                                e.printStackTrace();
                            }
                        } catch (Exception e) {
                            try {
                                dataBuffer = nettyDataBufferFactory.wrap(e.toString().getBytes(charset));
                                LOGGER.error("获取远程数据错误{}",e);
                            } catch (UnsupportedEncodingException ex) {
                                ex.printStackTrace();
                                LOGGER.error("返回调用请求数据错误{}",ex);
                            }
                            e.printStackTrace();
                        }
                        /**
                         * 将数据进行发射到下一个过滤器
                         */
                        sink.next(dataBuffer);
                        sink.complete();
                    }));

                }));
    }
}

通过反射机制类完成除过gateway 自定义协议外的所有解析进行处理。

这就完了,简单把 具体协议插件实现可以下载源代码

完整项目下载 github

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019/12/03 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1、源码分析
    • org.springframework.cloud.gateway.config.GatewayAutoConfiguration
      • org.springframework.cloud.gateway.filter.GlobalFilter
        • NettyRoutingFilter
          • NettyWriteResponseFilter(此类接收和处理上面的代理响应结果)
          • 2、自定义代码实现
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档