Gradle Connector 连接建立
Client和Server通常运行在不同的进程,不同进程的通信最常用的就是socket。
确实Gradle也是使用的socket建立C/S的连接
对于Gradle来说一个连接既是
1 2 3 4 public interface Connection <T> extends Dispatch <T>, Receive<T>, Stoppable {} public class DaemonClientConnection implements Connection <Message> {}
所以如果想要创建连接即是实例化一个此类的对象。
DaemonClient.java
如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 public BuildActionResult execute (BuildAction action, BuildActionParameters parameters, BuildRequestContext requestContext) { UUID buildId = idGenerator.generateId(); List<DaemonInitialConnectException> accumulatedExceptions = Lists.newArrayList(); LOGGER.debug("Executing build {} in daemon client {pid={}}" , buildId, processEnvironment.maybeGetPid()); int saneNumberOfAttempts = 100 ; for (int i = 1 ; i < saneNumberOfAttempts; i++) { final DaemonClientConnection connection = connector.connect(compatibilitySpec); if (connection == null ) { break ; } try { Build build = new Build (buildId, connection.getDaemon().getToken(), action, requestContext.getClient(), requestContext.getStartTime(), requestContext.isInteractive(), parameters); return executeBuild(build, connection, requestContext.getCancellationToken(), requestContext.getEventConsumer()); } catch (DaemonInitialConnectException e) { LOGGER.debug("{}, Trying a different daemon..." , e.getMessage()); accumulatedExceptions.add(e); } finally { connection.stop(); } } final DaemonClientConnection connection = connector.startDaemon(compatibilitySpec); try { Build build = new Build (buildId, connection.getDaemon().getToken(), action, requestContext.getClient(), requestContext.getStartTime(), requestContext.isInteractive(), parameters); return executeBuild(build, connection, requestContext.getCancellationToken(), requestContext.getEventConsumer()); } catch (DaemonInitialConnectException e) { throw new NoUsableDaemonFoundException ("A new daemon was started but could not be connected to: " + "pid=" + connection.getDaemon() + ", address= " + connection.getDaemon().getAddress() + ". " + Documentation.userManual("troubleshooting" , "network_connection" ).consultDocumentationMessage(), accumulatedExceptions); } finally { connection.stop(); } }
Connector虽说是一个接口,但是只有一个实现类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public DaemonClientConnection connect (ExplainingSpec<DaemonContext> constraint) { final Pair<Collection<DaemonInfo>, Collection<DaemonInfo>> idleBusy = partitionByState(daemonRegistry.getAll(), Idle); final Collection<DaemonInfo> idleDaemons = idleBusy.getLeft(); final Collection<DaemonInfo> busyDaemons = idleBusy.getRight(); DaemonClientConnection connection = connectToIdleDaemon(idleDaemons, constraint); if (connection != null ) { return connection; } connection = connectToCanceledDaemon(busyDaemons, constraint); if (connection != null ) { return connection; } handleStopEvents(idleDaemons, busyDaemons); return null ; }
复用idle进程
连接符合条件的idle daemon进程
1 2 3 4 private DaemonClientConnection connectToIdleDaemon(Collection<DaemonInfo> idleDaemons, ExplainingSpec<DaemonContext> constraint) { final List<DaemonInfo> compatibleIdleDaemons = getCompatibleDaemons(idleDaemons, constraint); return findConnection(compatibleIdleDaemons); }
依此尝试连接
1 2 3 4 5 6 7 8 9 10 private DaemonClientConnection findConnection (List<DaemonInfo> compatibleDaemons) { for (DaemonInfo daemon : compatibleDaemons) { try { return connectToDaemon(daemon, new CleanupOnStaleAddress (daemon, true )); } catch (ConnectException e) { LOGGER.debug("Cannot connect to daemon {} due to {}. Trying a different daemon..." , daemon, e); } } return null ; }
连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 private DaemonClientConnection connectToDaemon (DaemonConnectDetails daemon, DaemonClientConnection.StaleAddressDetector staleAddressDetector) throws ConnectException { ProgressLogger progressLogger = progressLoggerFactory.newOperation(DefaultDaemonConnector.class) .start("Connecting to Gradle Daemon" , "Connecting to Daemon" ); RemoteConnection<Message> connection; try { connection = connector.connect(daemon.getAddress()).create(Serializers.stateful(serializer)); } catch (ConnectException e) { staleAddressDetector.maybeStaleAddress(e); throw e; } finally { progressLogger.completed(); } return new DaemonClientConnection (connection, daemon, staleAddressDetector); }
TcpOutgoingConnector.java
连接socket
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 public ConnectCompletion connect (Address destinationAddress) throws org.gradle.internal.remote.internal.ConnectException { InetEndpoint address = (InetEndpoint) destinationAddress; List<InetAddress> candidateAddresses = address.getCandidates(); try { Exception lastFailure = null ; for (InetAddress candidate : candidateAddresses) { LOGGER.debug("Trying to connect to address {}." , candidate); SocketChannel socketChannel; try { socketChannel = tryConnect(address, candidate); } catch (SocketException e) { LOGGER.debug("Cannot connect to address {}, skipping." , candidate); lastFailure = e; continue ; } catch (SocketTimeoutException e) { LOGGER.debug("Timeout connecting to address {}, skipping." , candidate); lastFailure = e; continue ; } LOGGER.debug("Connected to address {}." , socketChannel.socket().getRemoteSocketAddress()); return new SocketConnectCompletion (socketChannel); } throw new org .gradle.internal.remote.internal.ConnectException(String.format("Could not connect to server %s. Tried addresses: %s." , destinationAddress, candidateAddresses), lastFailure); } catch (org.gradle.internal.remote.internal.ConnectException e) { throw e; } catch (Exception e) { throw new RuntimeException (String.format("Could not connect to server %s. Tried addresses: %s." , destinationAddress, candidateAddresses), e); } } private SocketChannel tryConnect (InetEndpoint address, InetAddress candidate) throws IOException { SocketChannel socketChannel = SocketChannel.open(); try { socketChannel.socket().connect(new InetSocketAddress (candidate, address.getPort()), CONNECT_TIMEOUT); if (!detectSelfConnect(socketChannel)) { return socketChannel; } socketChannel.close(); } catch (IOException e) { socketChannel.close(); throw e; } catch (Throwable e) { socketChannel.close(); throw UncheckedException.throwAsUncheckedException(e); } throw new java .net.ConnectException(String.format("Socket connected to itself on %s port %s." , candidate, address.getPort())); }
创建connection实例
1 2 3 4 5 @Override public <T> RemoteConnection<T> create (StatefulSerializer<T> serializer) { return new SocketConnection <T>(socket, new KryoBackedMessageSerializer (), serializer); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public SocketConnection (SocketChannel socket, MessageSerializer streamSerializer, StatefulSerializer<T> messageSerializer) { this .socket = socket; try { socket.configureBlocking(false ); outstr = new SocketOutputStream (socket); instr = new SocketInputStream (socket); } catch (IOException e) { throw UncheckedException.throwAsUncheckedException(e); } InetSocketAddress localSocketAddress = (InetSocketAddress) socket.socket().getLocalSocketAddress(); localAddress = new SocketInetAddress (localSocketAddress.getAddress(), localSocketAddress.getPort()); InetSocketAddress remoteSocketAddress = (InetSocketAddress) socket.socket().getRemoteSocketAddress(); remoteAddress = new SocketInetAddress (remoteSocketAddress.getAddress(), remoteSocketAddress.getPort()); objectReader = messageSerializer.newReader(streamSerializer.newDecoder(instr)); encoder = streamSerializer.newEncoder(outstr); objectWriter = messageSerializer.newWriter(encoder); }
复用busy进程
连接已经取消任务的busy 进程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 private DaemonClientConnection connectToCanceledDaemon (Collection<DaemonInfo> busyDaemons, ExplainingSpec<DaemonContext> constraint) { DaemonClientConnection connection = null ; final Pair<Collection<DaemonInfo>, Collection<DaemonInfo>> canceledBusy = partitionByState(busyDaemons, Canceled); final Collection<DaemonInfo> compatibleCanceledDaemons = getCompatibleDaemons(canceledBusy.getLeft(), constraint); if (!compatibleCanceledDaemons.isEmpty()) { LOGGER.info(DaemonMessages.WAITING_ON_CANCELED); CountdownTimer timer = Time.startCountdownTimer(CANCELED_WAIT_TIMEOUT); while (connection == null && !timer.hasExpired()) { try { sleep(200 ); connection = connectToIdleDaemon(daemonRegistry.getIdle(), constraint); } catch (InterruptedException e) { throw UncheckedException.throwAsUncheckedException(e); } } } return connection; }
连接
1 2 3 4 private DaemonClientConnection connectToIdleDaemon (Collection<DaemonInfo> idleDaemons, ExplainingSpec<DaemonContext> constraint) { final List<DaemonInfo> compatibleIdleDaemons = getCompatibleDaemons(idleDaemons, constraint); return findConnection(compatibleIdleDaemons); }
开启新的进程 DefaultDaemonConnector.java
1 2 3 4 @Override public DaemonClientConnection startDaemon (ExplainingSpec<DaemonContext> constraint) { return doStartDaemon(constraint, false ); }
开启daemon进程并连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 private DaemonClientConnection doStartDaemon (ExplainingSpec<DaemonContext> constraint, boolean singleRun) { ProgressLogger progressLogger = progressLoggerFactory.newOperation(DefaultDaemonConnector.class) .start("Starting Gradle Daemon" , "Starting Daemon" ); final DaemonStartupInfo startupInfo = daemonStarter.startDaemon(singleRun); LOGGER.debug("Started Gradle daemon {}" , startupInfo); CountdownTimer timer = Time.startCountdownTimer(connectTimeout); try { do { DaemonClientConnection daemonConnection = connectToDaemonWithId(startupInfo, constraint); if (daemonConnection != null ) { startListener.daemonStarted(daemonConnection.getDaemon()); return daemonConnection; } try { sleep(200L ); } catch (InterruptedException e) { throw UncheckedException.throwAsUncheckedException(e); } } while (!timer.hasExpired()); } finally { progressLogger.completed(); } throw new DaemonConnectionException ("Timeout waiting to connect to the Gradle daemon.\n" + startupInfo.describe()); }
消息发送
连接既是一个发送者,也是一个接收者
1 2 public interface Connection <T> extends Dispatch <T>, Receive<T>, Stoppable {}
发送者
1 2 3 4 public interface Dispatch <T> { void dispatch (T message) ; }
DaemonClientConnection.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public void dispatch (Message message) throws DaemonConnectionException { LOG.debug("thread {}: dispatching {}" , Thread.currentThread().getId(), message.getClass()); try { dispatchLock.lock(); try { connection.dispatch(message); connection.flush(); } finally { dispatchLock.unlock(); } } catch (MessageIOException e) { LOG.debug("Problem dispatching message to the daemon. Performing 'on failure' operation..." ); if (!hasReceived && staleAddressDetector.maybeStaleAddress(e)) { throw new StaleDaemonAddressException ("Could not dispatch a message to the daemon." , e); } throw new DaemonConnectionException ("Could not dispatch a message to the daemon." , e); } }
SocketConnection.java
发送数据
1 2 3 4 5 6 7 8 9 10 11 12 13 public void dispatch (T message) throws MessageIOException { try { objectWriter.write(message); } catch (ObjectStreamException e) { throw new RecoverableMessageIOException (String.format("Could not write message %s to '%s'." , message, remoteAddress), e); } catch (ClassNotFoundException e) { throw new RecoverableMessageIOException (String.format("Could not write message %s to '%s'." , message, remoteAddress), e); } catch (IOException e) { throw new RecoverableMessageIOException (String.format("Could not write message %s to '%s'." , message, remoteAddress), e); } catch (Throwable e) { throw new MessageIOException (String.format("Could not write message %s to '%s'." , message, remoteAddress), e); } }
1 2 3 4 5 6 7 8 9 10 @Override public ObjectWriter<T> newWriter (final Encoder encoder) { return new ObjectWriter <T>() { @Override public void write (T value) throws Exception { serializer.write(encoder, value); } }; }
DefaultSerializerRegistry.java
1 2 3 4 5 6 7 8 public void write (Encoder encoder, T value) throws Exception { TypeInfo typeInfo = map(value.getClass()); encoder.writeSmallInt(typeInfo.tag); Cast.<Serializer<T>>uncheckedNonnullCast(typeInfo.serializer).write(encoder, value); }
由于发送的是Build
所以使用了如下序列化器
BuildSerializer.java
1 2 3 4 5 6 7 8 9 10 11 public void write (Encoder encoder, Build build) throws Exception { encoder.writeLong(build.getIdentifier().getMostSignificantBits()); encoder.writeLong(build.getIdentifier().getLeastSignificantBits()); encoder.writeBinary(build.getToken()); encoder.writeLong(build.getStartTime()); encoder.writeBoolean(build.isInteractive()); buildActionSerializer.write(encoder, build.getAction()); GradleLauncherMetaData metaData = (GradleLauncherMetaData) build.getBuildClientMetaData(); encoder.writeString(metaData.getAppName()); buildActionParametersSerializer.write(encoder, build.getParameters()); }
关于序列化器
支持read读取和write写入
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public interface Serializer <T> { T read (Decoder decoder) throws EOFException, Exception; void write (Encoder encoder, T value) throws Exception; }
每一个可序列化的事件都会有一个序列化器
消息接受 1 2 3 4 5 6 7 8 9 public interface Receive <T> { @Nullable T receive () ; }
DaemonClientConnection.java
1 2 3 4 5 6 7 8 9 10 11 12 13 public Message receive () throws DaemonConnectionException { try { return connection.receive(); } catch (MessageIOException e) { LOG.debug("Problem receiving message to the daemon. Performing 'on failure' operation..." ); if (!hasReceived && staleAddressDetector.maybeStaleAddress(e)) { throw new StaleDaemonAddressException ("Could not receive a message from the daemon." , e); } throw new DaemonConnectionException ("Could not receive a message from the daemon." , e); } finally { hasReceived = true ; } }
SocketConnection.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public T receive () throws MessageIOException { try { return objectReader.read(); } catch (EOFException e) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Discarding EOFException: {}" , e.toString()); } return null ; } catch (ObjectStreamException e) { throw new RecoverableMessageIOException (String.format("Could not read message from '%s'." , remoteAddress), e); } catch (ClassNotFoundException e) { throw new RecoverableMessageIOException (String.format("Could not read message from '%s'." , remoteAddress), e); } catch (IOException e) { throw new RecoverableMessageIOException (String.format("Could not read message from '%s'." , remoteAddress), e); } catch (Throwable e) { throw new MessageIOException (String.format("Could not read message from '%s'." , remoteAddress), e); } }
使用序列化器进行读取
1 2 3 4 5 6 7 8 public ObjectReader<T> newReader (final Decoder decoder) { return new ObjectReader <T>() { @Override public T read () throws Exception { return serializer.read(decoder); } }; }
总结
Gradle C/S连接的建立依托于Socket
Server的相关信息会写入到文件中,Client需要建立连接的时候会优先读取文件内容,从而确认服务端的端口号,如果没有满足的服务端进程才会考虑开启新的进程
C/S的通过依靠SocketConnection<T>
对象完成,C/S预先定义了交互的事件。消息对象发送会先经过Kryo(一个序列化框架)进行序列化,然后再通过Socket发送。