博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
etcd使用经历
阅读量:4319 次
发布时间:2019-06-06

本文共 21878 字,大约阅读时间需要 72 分钟。

  etcd是一个开源的、分布式的键值对数据存储系统,提供共享配置、服务的注册和发现。etcd与zookeeper相比算是轻量级系统,两者的一致性协议也一样,etcd的raft比zookeeper的paxos简单。关于windows版本的etcd服务端和nodejs浏览器下载和安装见。

  我们用etcd,就需要etcd客户端,这里用的是java客户端etcd4j。etcd客户端通过http发送get、put、post、delete等操作到服务端执行对目录信息的增删查改。etcd应用于微服务架构中的角色是服务注册中心,通过对接口调用信息的添加和查询,提供服务的注册和发现能力。

  下面结合etcd4j的主要功能类EtcdClient,我们看看etcd的一些操作:

* Copyright (c) 2015, Jurriaan Mous and contributors as indicated by the @author tags.package mousio.etcd4j;import io.netty.handler.ssl.SslContext;import mousio.client.retry.RetryPolicy;import mousio.client.retry.RetryWithExponentialBackOff;import mousio.etcd4j.requests.*;import mousio.etcd4j.responses.EtcdAuthenticationException;import mousio.etcd4j.responses.EtcdException;import mousio.etcd4j.responses.EtcdHealthResponse;import mousio.etcd4j.responses.EtcdMembersResponse;import mousio.etcd4j.responses.EtcdSelfStatsResponse;import mousio.etcd4j.responses.EtcdStoreStatsResponse;import mousio.etcd4j.responses.EtcdVersionResponse;import mousio.etcd4j.responses.EtcdLeaderStatsResponse;import mousio.etcd4j.transport.EtcdClientImpl;import mousio.etcd4j.transport.EtcdNettyClient;import java.io.Closeable;import java.io.IOException;import java.net.URI;import java.util.concurrent.TimeoutException;/** * Etcd client. */public class EtcdClient implements Closeable {  private final EtcdClientImpl client;  private RetryPolicy retryHandler;  /**   * Constructor   *   * @param baseUri URI to create connection on   */  public EtcdClient(URI... baseUri) {    this(EtcdSecurityContext.NONE, baseUri);  }  /**   * Constructor   *   * @param username  username   * @param password  password   * @param baseUri   URI to create connection on   */  public EtcdClient(String username, String password, URI... baseUri) {    this(EtcdSecurityContext.withCredential(username, password), baseUri);  }  /**   * Constructor   *   * @param sslContext  context for Ssl connections   * @param username    username   * @param password    password   * @param baseUri     URI to create connection on   */  public EtcdClient(SslContext sslContext, String username, String password, URI... baseUri) {    this(new EtcdSecurityContext(sslContext, username, password), baseUri);  }  /**   * Constructor   *   * @param sslContext  context for Ssl connections   * @param baseUri     URI to create connection on   */  public EtcdClient(SslContext sslContext, URI... baseUri) {    this(EtcdSecurityContext.withSslContext(sslContext), baseUri);  }  /**   * Constructor   *   * @param securityContext context for security   * @param baseUri URI to create connection on   */  public EtcdClient(EtcdSecurityContext securityContext, URI... baseUri) {    this(new EtcdNettyClient(      securityContext,      (baseUri.length == 0)        ? new URI[] { URI.create("https://127.0.0.1:4001") }        : baseUri    ));  }  /**   * Create a client with a custom implementation   *   * @param etcdClientImpl to create client with.   */  public EtcdClient(EtcdClientImpl etcdClientImpl) {    this.client = etcdClientImpl;    this.retryHandler = RetryWithExponentialBackOff.DEFAULT;  }  /**   * Get the version of the Etcd server   *   * @return version as String   * @deprecated use version() when using etcd 2.1+.   */  @Deprecated  public String getVersion() {    try {      return new EtcdOldVersionRequest(this.client, retryHandler).send().get();    } catch (IOException | EtcdException | EtcdAuthenticationException | TimeoutException e) {      return null;    }  }  /**   * Get the version of the Etcd server   *   * @return version   */  public EtcdVersionResponse version() {    try {      return new EtcdVersionRequest(this.client, retryHandler).send().get();    } catch (IOException | EtcdException | EtcdAuthenticationException | TimeoutException e) {      return null;    }  }  /**   * Get the Self Statistics of Etcd   *   * @return EtcdSelfStatsResponse   */  public EtcdSelfStatsResponse getSelfStats() {    try {      return new EtcdSelfStatsRequest(this.client, retryHandler).send().get();    } catch (IOException | EtcdException | EtcdAuthenticationException | TimeoutException e) {      return null;    }  }  /**   * Get the Leader Statistics of Etcd   *   * @return EtcdLeaderStatsResponse   */  public EtcdLeaderStatsResponse getLeaderStats() {    try {      return new EtcdLeaderStatsRequest(this.client, retryHandler).send().get();    } catch (IOException | EtcdException | EtcdAuthenticationException | TimeoutException e) {      return null;    }  }  /**   * Get the Store Statistics of Etcd   *   * @return vEtcdStoreStatsResponse   */  public EtcdStoreStatsResponse getStoreStats() {    try {      return new EtcdStoreStatsRequest(this.client, retryHandler).send().get();    } catch (IOException | EtcdException | EtcdAuthenticationException | TimeoutException e) {      return null;    }  }  /**   * Get the Members of Etcd   *   * @return vEtcdMembersResponse   */  public EtcdMembersResponse getMembers() {    try {      return new EtcdMembersRequest(this.client,retryHandler).send().get();    } catch (IOException | EtcdException | EtcdAuthenticationException | TimeoutException e) {      return null;    }  }  /**   * Get the Members of Etcd   *   * @return vEtcdMembersResponse   */  public EtcdHealthResponse getHealth() {    try {      return new EtcdHealthRequest(this.client,retryHandler).send().get();    } catch (IOException | EtcdException | EtcdAuthenticationException | TimeoutException e) {      return null;    }  }  /**   * Put a key with a value   *   * @param key to put   * @param value to put on key   * @return EtcdKeysRequest   */  public EtcdKeyPutRequest put(String key, String value) {    return new EtcdKeyPutRequest(client, key, retryHandler).value(value);  }  /**  * Refresh a key with new ttl  * (without notifying watchers when using etcd 2.3+)  *  * @param key to refresh  * @param ttl to update key with  * @return EtcdKeysRequest  */  public EtcdKeyPutRequest refresh(String key, Integer ttl) {    return new EtcdKeyPutRequest(client, key, retryHandler).refresh(ttl);  }  /**   * Create a dir   *   * @param dir to create   * @return EtcdKeysRequest   */  public EtcdKeyPutRequest putDir(String dir) {    return new EtcdKeyPutRequest(client, dir, retryHandler).isDir();  }  /**   * Post a value to a key for in-order keys.   *   * @param key to post to   * @param value to post   * @return EtcdKeysRequest   */  public EtcdKeyPostRequest post(String key, String value) {    return new EtcdKeyPostRequest(client, key, retryHandler).value(value);  }  /**   * Deletes a key   *   * @param key to delete   * @return EtcdKeysRequest   */  public EtcdKeyDeleteRequest delete(String key) {    return new EtcdKeyDeleteRequest(client, key, retryHandler);  }  /**   * Deletes a directory   *   * @param dir to delete   * @return EtcdKeysRequest   */  public EtcdKeyDeleteRequest deleteDir(String dir) {    return new EtcdKeyDeleteRequest(client, dir, retryHandler).dir();  }  /**   * Get by key   *   * @param key to get   * @return EtcdKeysRequest   */  public EtcdKeyGetRequest get(String key) {    return new EtcdKeyGetRequest(client, key, retryHandler);  }  /**   * Get directory   *   * @param dir to get   * @return EtcdKeysGetRequest   */  public EtcdKeyGetRequest getDir(String dir) {    return new EtcdKeyGetRequest(client, dir, retryHandler).dir();  }  /**   * Get all keys   *   * @return EtcdKeysRequest   */  public EtcdKeyGetRequest getAll() {    return new EtcdKeyGetRequest(client, retryHandler);  }  @Override  public void close() throws IOException {    if (client != null) {      client.close();    }  }  /**   * Set the retry handler. Default is an exponential back-off with start of 20ms.   *   * @param retryHandler to set   * @return this instance   */  public EtcdClient setRetryHandler(RetryPolicy retryHandler) {    this.retryHandler = retryHandler;    return this;  }}

  这个类提供能etcd连接的方法,也就是构造器方法,提供了对etcd的操作方法(get、put、post、putDir、delete、deleteDir等),还提供了重试策略(参见RetryPolicy)。我们跟一下EtcdClient(URI... baseUri),进入EtcdClient(EtcdSecurityContext securityContext, URI... baseUri),实例化EtcdNettyClient:

* Copyright (c) 2015, Jurriaan Mous and contributors as indicated by the @author tags.package mousio.etcd4j.transport;import io.netty.bootstrap.Bootstrap;import io.netty.buffer.PooledByteBufAllocator;import io.netty.buffer.Unpooled;import io.netty.channel.*;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioDatagramChannel;import io.netty.handler.codec.base64.Base64;import io.netty.handler.codec.http.*;import io.netty.handler.codec.http.multipart.HttpPostRequestEncoder;import io.netty.handler.ssl.SslContext;import io.netty.handler.ssl.SslHandler;import io.netty.handler.stream.ChunkedWriteHandler;import io.netty.handler.timeout.ReadTimeoutHandler;import io.netty.resolver.dns.DnsAddressResolverGroup;import io.netty.resolver.dns.DnsServerAddresses;import io.netty.util.CharsetUtil;import io.netty.util.concurrent.Future;import io.netty.util.concurrent.GenericFutureListener;import io.netty.util.concurrent.Promise;import mousio.client.ConnectionState;import mousio.client.retry.RetryHandler;import mousio.etcd4j.EtcdSecurityContext;import mousio.etcd4j.promises.EtcdResponsePromise;import mousio.etcd4j.requests.EtcdRequest;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.IOException;import java.net.InetSocketAddress;import java.net.URI;import java.nio.channels.ClosedChannelException;import java.util.Map;import java.util.concurrent.CancellationException;/** * @author Jurriaan Mous * @author Luca Burgazzoli * * Netty client for the requests and responses */public class EtcdNettyClient implements EtcdClientImpl {  private static final Logger logger = LoggerFactory.getLogger(EtcdNettyClient.class);  // default etcd port  private static final int DEFAULT_PORT = 2379;  private static final String ENV_ETCD4J_ENDPOINT = "ETCD4J_ENDPOINT";  private final EventLoopGroup eventLoopGroup;  private final URI[] uris;  private final Bootstrap bootstrap;  //private final String hostName;  private final EtcdNettyConfig config;  private final EtcdSecurityContext securityContext;  protected volatile int lastWorkingUriIndex;  /**   * Constructor   *   * @param sslContext SSL context if connecting with SSL. Null if not connecting with SSL.   * @param uri        to connect to   */  public EtcdNettyClient(final SslContext sslContext, final URI... uri) {    this(new EtcdNettyConfig(), sslContext, uri);  }  /**   * Constructor   *   * @param securityContext security context.   * @param uri             to connect to   */  public EtcdNettyClient(final EtcdSecurityContext securityContext, final URI... uri) {    this(new EtcdNettyConfig(), securityContext, uri);  }  /**   * Constructor with custom eventloop group and timeout   *   * @param config     for netty   * @param sslContext SSL context if connecting with SSL. Null if not connecting with SSL.   * @param uris       to connect to   */  public EtcdNettyClient(final EtcdNettyConfig config,                         final SslContext sslContext, final URI... uris) {    this(config, new EtcdSecurityContext(sslContext), uris);  }  /**   * Constructor with custom eventloop group and timeout   *   * @param config     for netty   * @param uris       to connect to   */  public EtcdNettyClient(final EtcdNettyConfig config, final URI... uris) {    this(config, EtcdSecurityContext.NONE, uris);  }  /**   * Constructor with custom eventloop group and timeout   *   * @param config          for netty   * @param securityContext security context (ssl, authentication)   * @param uris            to connect to   */  public EtcdNettyClient(final EtcdNettyConfig config,                         final EtcdSecurityContext securityContext, final URI... uris) {    logger.info("Setting up Etcd4j Netty client");    this.lastWorkingUriIndex = 0;    this.config = config.clone();    this.securityContext = securityContext.clone();    this.uris = uris;    this.eventLoopGroup = config.getEventLoopGroup();    this.bootstrap = new Bootstrap()      .group(eventLoopGroup)      .channel(config.getSocketChannelClass())      .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)      .option(ChannelOption.TCP_NODELAY, true)      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout())      .resolver(new DnsAddressResolverGroup(        NioDatagramChannel.class,        DnsServerAddresses.defaultAddresses()))      .handler(new ChannelInitializer
() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (securityContext.hasNettySsl()) { p.addLast(securityContext.nettySslContext().newHandler(ch.alloc())); } else if (securityContext.hasSsl()) { p.addLast(new SslHandler(securityContext.sslContext().createSSLEngine())); } p.addLast("codec", new HttpClientCodec()); p.addLast("auth", new HttpBasicAuthHandler()); p.addLast("chunkedWriter", new ChunkedWriteHandler()); p.addLast("aggregate", new HttpObjectAggregator(config.getMaxFrameSize())); } }); } /** * For tests * * @return the current bootstrap */ protected Bootstrap getBootstrap() { return bootstrap; } /** * Send a request and get a future. * * @param etcdRequest Etcd Request to send * @return Promise for the request. */ public
EtcdResponsePromise
send(final EtcdRequest
etcdRequest) throws IOException { ConnectionState connectionState = new ConnectionState(uris, lastWorkingUriIndex); if (etcdRequest.getPromise() == null) { etcdRequest.setPromise(new EtcdResponsePromise
( etcdRequest.getRetryPolicy(), connectionState, new RetryHandler() { @Override public void doRetry(ConnectionState connectionState) throws IOException { connect(etcdRequest, connectionState); } })); } connect(etcdRequest, connectionState); return etcdRequest.getPromise(); } /** * Connect to server * * @param etcdRequest to request with * @param
Type of response * @throws IOException if request could not be sent. */ @SuppressWarnings("unchecked") protected
void connect(final EtcdRequest
etcdRequest) throws IOException { this.connect(etcdRequest, etcdRequest.getPromise().getConnectionState()); } /** * Connect to server * * @param etcdRequest to request with * @param connectionState for retries * @param
Type of response * @throws IOException if request could not be sent. */ @SuppressWarnings("unchecked") protected
void connect(final EtcdRequest
etcdRequest, final ConnectionState connectionState) throws IOException { if(eventLoopGroup.isShuttingDown() || eventLoopGroup.isShutdown() || eventLoopGroup.isTerminated()){ etcdRequest.getPromise().getNettyPromise().cancel(true); logger.debug("Retry canceled because of closed etcd client"); return; } final URI uri; // when we are called from a redirect, the url in the request may also // contain host and port! URI requestUri = URI.create(etcdRequest.getUrl()); if (requestUri.getHost() != null && requestUri.getPort() > -1) { uri = requestUri; } else if (connectionState.uris.length == 0 && System.getenv(ENV_ETCD4J_ENDPOINT) != null) { // read uri from environment variable String endpoint_uri = System.getenv(ENV_ETCD4J_ENDPOINT); if(logger.isDebugEnabled()) { logger.debug("Will use environment variable {} as uri with value {}", ENV_ETCD4J_ENDPOINT, endpoint_uri); } uri = URI.create(endpoint_uri); } else { uri = connectionState.uris[connectionState.uriIndex]; } // Start the connection attempt. final ChannelFuture connectFuture = bootstrap.connect(connectAddress(uri)); etcdRequest.getPromise().getConnectionState().loop = connectFuture.channel().eventLoop(); etcdRequest.getPromise().attachNettyPromise(connectFuture.channel().eventLoop().
newPromise()); connectFuture.addListener(new GenericFutureListener
() { @Override public void operationComplete(final ChannelFuture f) throws Exception { if (!f.isSuccess()) { final Throwable cause = f.cause(); if (logger.isDebugEnabled()) { logger.debug("Connection failed to {}, cause {}", connectionState.uris[connectionState.uriIndex], cause); } if (cause instanceof ClosedChannelException || cause instanceof IllegalStateException) { etcdRequest.getPromise().cancel(new CancellationException("Channel closed")); } else { etcdRequest.getPromise().handleRetry(f.cause()); } return; } // Handle already cancelled promises if (etcdRequest.getPromise().getNettyPromise().isCancelled()) { f.channel().close(); etcdRequest.getPromise().getNettyPromise().setFailure(new CancellationException()); return; } final Promise listenedToPromise = etcdRequest.getPromise().getNettyPromise(); // Close channel when promise is satisfied or cancelled later listenedToPromise.addListener(new GenericFutureListener
>() { @Override public void operationComplete(Future
future) throws Exception { // Only close if it was not redirected to new promise if (etcdRequest.getPromise().getNettyPromise() == listenedToPromise) { f.channel().close(); } } }); if (logger.isDebugEnabled()) { logger.debug("Connected to {} ({})", f.channel().remoteAddress().toString(), connectionState.uriIndex); } lastWorkingUriIndex = connectionState.uriIndex; modifyPipeLine(etcdRequest, f.channel().pipeline()); createAndSendHttpRequest(uri, etcdRequest.getUrl(), etcdRequest, f.channel()) .addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { etcdRequest.getPromise().setException(future.cause()); if (!f.channel().eventLoop().inEventLoop()) { f.channel().eventLoop().shutdownGracefully(); } f.channel().close(); } } }); f.channel().closeFuture().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (logger.isDebugEnabled()) { logger.debug("Connection closed for request {} on uri {} ", etcdRequest.getMethod().name(), etcdRequest.getUri()); } } }); } }); } /** * Modify the pipeline for the request * * @param req to process * @param pipeline to modify * @param
Type of Response */ private
void modifyPipeLine(final EtcdRequest
req, final ChannelPipeline pipeline) { final EtcdResponseHandler
handler = new EtcdResponseHandler<>(this, req); if (req.hasTimeout()) { pipeline.addFirst(new ReadTimeoutHandler(req.getTimeout(), req.getTimeoutUnit())); } pipeline.addLast(handler); pipeline.addLast(new ChannelHandlerAdapter() { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { handler.retried(true); req.getPromise().handleRetry(cause); } }); } /** * Get HttpRequest belonging to etcdRequest * * @param server server for http request * @param uri to send request to * @param etcdRequest to send * @param channel to send request on * @param
Response type * @return HttpRequest * @throws Exception when creating or sending HTTP request fails */ private
ChannelFuture createAndSendHttpRequest(URI server, String uri, EtcdRequest
etcdRequest, Channel channel) throws Exception { HttpRequest httpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, etcdRequest.getMethod(), uri); httpRequest.headers().add(HttpHeaderNames.CONNECTION, "keep-alive"); if(!this.config.hasHostName()) { httpRequest.headers().add(HttpHeaderNames.HOST, server.getHost() + ":" + server.getPort()); } else { httpRequest.headers().add(HttpHeaderNames.HOST, this.config.getHostName()); } HttpPostRequestEncoder bodyRequestEncoder = null; Map
keyValuePairs = etcdRequest.getRequestParams(); if (keyValuePairs != null && !keyValuePairs.isEmpty()) { HttpMethod etcdRequestMethod = etcdRequest.getMethod(); if (etcdRequestMethod == HttpMethod.POST || etcdRequestMethod == HttpMethod.PUT) { bodyRequestEncoder = new HttpPostRequestEncoder(httpRequest, false); for (Map.Entry
entry : keyValuePairs.entrySet()) { bodyRequestEncoder.addBodyAttribute(entry.getKey(), entry.getValue()); } httpRequest = bodyRequestEncoder.finalizeRequest(); } else { QueryStringEncoder encoder = new QueryStringEncoder(uri); for (Map.Entry
entry : keyValuePairs.entrySet()) { encoder.addParam(entry.getKey() , entry.getValue()); } httpRequest.setUri(encoder.toString()); } } etcdRequest.setHttpRequest(httpRequest); ChannelFuture future = channel.write(httpRequest); if (bodyRequestEncoder != null && bodyRequestEncoder.isChunked()) { future = channel.write(bodyRequestEncoder); } channel.flush(); return future; } /** * Close netty */ @Override public void close() { logger.info("Shutting down Etcd4j Netty client"); if (config.isManagedEventLoopGroup()) { logger.debug("Shutting down Netty Loop"); eventLoopGroup.shutdownGracefully(); } } private InetSocketAddress connectAddress(URI uri) { return InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort() == -1 ? DEFAULT_PORT : uri.getPort()); } private class HttpBasicAuthHandler extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { if (securityContext.hasCredentials() && msg instanceof HttpRequest) { addBasicAuthHeader((HttpRequest)msg); } ctx.write(msg, promise); } private void addBasicAuthHeader(HttpRequest request) { final String auth = Base64.encode( Unpooled.copiedBuffer( securityContext.username() + ":" + securityContext.password(), CharsetUtil.UTF_8) ).toString(CharsetUtil.UTF_8); request.headers().add(HttpHeaderNames.AUTHORIZATION, "Basic " + auth); } }}

  进入EtcdNettyClient(final EtcdSecurityContext securityContext, final URI... uri),再进入public EtcdNettyClient(final EtcdNettyConfig config,

                         final EtcdSecurityContext securityContext, final URI... uris),我们发现etcd集成了netty框架。

 

转载于:https://www.cnblogs.com/wuxun1997/p/8137753.html

你可能感兴趣的文章
XDM、GDM和KDM
查看>>
Java并发编程
查看>>
导致“mysql has gone away”的两种情况
查看>>
DTRACE -MYSQL
查看>>
autoScroll不显示滚动条
查看>>
LabVIEW动态添加控件
查看>>
Azure SQL 数据库与新的数据库吞吐量单位
查看>>
VC++2012编程演练数据结构《13》单链表
查看>>
Yii2简单的 yii2-phpexcel导出
查看>>
ubuntu/deepin 下下载wxpython
查看>>
A011 Activiti工作流程开发的一些统一规则和实现原理(完整版)
查看>>
Apache JServ protocol服务 怎么关闭?
查看>>
使用 SQL SERVER PROFILER 监测死锁
查看>>
男人的眼泪是血 很容易就会流完
查看>>
#define 中#和##的作用
查看>>
心情日记
查看>>
20145320\20145319 《信息安全系统设计基础》实验三
查看>>
Python 类
查看>>
oledbException 未指定的错误解决过程
查看>>
Scarlet的字符串不可能这么可爱
查看>>