任务背景
在研究如何使用 sidecar 模式实现 Buildfarm Worker 在自定义镜像中运行构建任务的过程中,注意到如下类:
可以看到,ProcessBuilder
的实例记录了一次构建任务的全部上下文,包括具体执行的指令,输出目录,环境变量。
Buildfarm Worker 在生成了这样一个对象后,运行start
方法,生成Process
类实例,在新的线程中执行任务,并通过Process
对象获取任务的状态(标准输出、标准错误、是否结束、退出码等)。
因此,如果能在 sidecar 容器(这个容器从自定义镜像创建)中运行一个代理,将对ProcessBuilder
和Process
的操作发送到代理中,就可以实现我们的需求。
RMI 概述
RMI远程调用 - Java教程 - 廖雪峰的官方网站 (liaoxuefeng.com)
Java RMI(Remote Method Invocation)是一个用于构建分布式应用程序的API。它允许开发者在不同的Java虚拟机(JVM)间调用远程对象的方法,像调用本地对象一样简单。以下是RMI的几个关键特性:
远程对象 :在RMI中,远程对象是运行在服务器上的对象,客户端通过代理(stub)与其进行交互。
代理和骨架 :RMI框架生成代理和骨架代码。代理在客户端,负责请求的发送;骨架在服务器,处理这些请求并调用相应的方法。
序列化 :RMI使用Java的序列化机制将方法参数和返回值转换为字节流,以便通过网络传输。
网络通信 :RMI通常使用TCP/IP协议在客户端与服务器之间建立可靠的通讯。
简化开发 :开发者可以像调用本地方法那样调用远程方法,从而简化了分布式编程的复杂性。
动态发现 :RMI注册中心(RMI Registry)允许客户端动态查找和绑定远程对象。
RMI适用于构建需要跨网络调用的分布式系统,尤其是在纯Java环境中。通过RMI,开发者可以创建灵活且易于维护的网络应用程序。
注意点
可序列化的对象
通过 Java RMI 传输的对象,必须是可序列化的(实现java.io.Serializable接口),而ProcessBuilder
,Process
对象都是不可序列化的。因此,不能直接将这两个对象进行传输。
生命周期
远程对象同样遵循JVM的垃圾回收机制。客户端对远程对象的引用是通过一个代理(stub)实现的。当客户端本地对应的对象由于某种原因失去引用时,会导致远程对象的引用状态变化。
举例两个会导致远程对象失去引用的情形:
局部变量超出作用域: 如果客户端在某个方法中定义了一个局部变量来持有对远程对象的引用,由于这个变量生命周期有限,当方法执行完后,变量会被垃圾回收。
将引用显式设置为 null: 如果客户端代码将持有远程对象的引用显式设置为 null,则该引用将失去对远程对象的引用。
RMI 实现
对象代理
实现ProcessBuilder
和Process
的代理类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 package com.alicloud.basic_tech;import java.io.File;import java.rmi.RemoteException;import java.rmi.server.UnicastRemoteObject;import java.util.ArrayList;import java.util.HashMap;import java.util.List;import java.util.Map;public class RemoteProcessBuilderImpl extends UnicastRemoteObject implements RemoteProcessBuilder { private String command; private File directory; private Map<String, String> environment; protected RemoteProcessBuilderImpl () throws RemoteException { super (); environment = new HashMap <>(); } @Override public RemoteProcessBuilder directory (File directory) throws RemoteException { this .directory = directory; System.out.println("Directory set to: " + (directory != null ? directory.getAbsolutePath() : "null" )); return this ; } @Override public RemoteProcessBuilder setEnvironmentVariable (String key, String value) throws RemoteException { this .environment.put(key, value); System.out.println("Environment variable set: " + key + " = " + value); return this ; } @Override public Map<String, String> environment () throws RemoteException { return environment; } @Override public RemoteProcessBuilder command (List<String> command) throws RemoteException { this .command = String.join(" " , new ArrayList <>(command)); System.out.println("Command set to: " + this .command); return this ; } @Override public RemoteProcessBuilder command (String... command) throws RemoteException { this .command = String.join(" " , command); System.out.println("Command set to: " + this .command); return this ; } @Override public RemoteProcess start () throws RemoteException { try { System.out.println("Starting process with command: " + command); ProcessBuilder builder = new ProcessBuilder ("bash" , "-c" , command); if (directory != null ) { builder.directory(directory); System.out.println("Process will run in directory: " + directory.getAbsolutePath()); } else { System.out.println("Process will run in the default directory." ); } Map<String, String> currentEnv = builder.environment(); currentEnv.clear(); if (environment != null ) { currentEnv.putAll(environment); } System.out.println("Environment variables set: " + currentEnv); Process process = builder.start(); System.out.println("Process started successfully. PID: " + process.hashCode()); return new RemoteProcessImpl (process); } catch (Exception e) { System.err.println("Error starting process: " + e.getMessage()); throw new RemoteException ("Error starting process" , e); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 package com.alicloud.basic_tech;import java.rmi.RemoteException;import java.rmi.server.UnicastRemoteObject;import java.io.BufferedReader;import java.io.InputStreamReader;import java.util.concurrent.TimeUnit;public class RemoteProcessImpl extends UnicastRemoteObject implements RemoteProcess { private String stdOutput; private String stdError; private Process process; private Thread outputThread; private Thread errorThread; public RemoteProcessImpl (Process process) throws RemoteException { this .process = process; System.out.println("Process started. Capturing output..." ); String command = String.join(" " , process.info().command().orElse("Unknown Command" )); System.out.println("Command executed: " + command); captureOutput(); } private void captureOutput () { outputThread = new Thread (() -> { StringBuilder stdOutputBuilder = new StringBuilder (); try (BufferedReader reader = new BufferedReader (new InputStreamReader (process.getInputStream()))) { String line; int lineCount = 0 ; while ((line = reader.readLine()) != null ) { stdOutputBuilder.append(line).append("\n" ); System.out.println("Standard Output (Line " + (++lineCount) + "): " + line); } System.out.println("Total lines read from Standard Output: " + lineCount); } catch (Exception e) { System.err.println("Error reading standard output: " + e.getMessage()); } stdOutput = stdOutputBuilder.toString(); }); errorThread = new Thread (() -> { StringBuilder stdErrorBuilder = new StringBuilder (); try (BufferedReader reader = new BufferedReader (new InputStreamReader (process.getErrorStream()))) { String line; int lineCount = 0 ; while ((line = reader.readLine()) != null ) { stdErrorBuilder.append(line).append("\n" ); System.err.println("Standard Error (Line " + (++lineCount) + "): " + line); } System.err.println("Total lines read from Standard Error: " + lineCount); } catch (Exception e) { System.err.println("Error reading standard error: " + e.getMessage()); } stdError = stdErrorBuilder.toString(); }); outputThread.start(); errorThread.start(); } @Override public String getStdInput () throws RemoteException { return stdOutput; } @Override public String getStdError () throws RemoteException { return stdError; } @Override public int waitFor () throws RemoteException { try { process.waitFor(); outputThread.join(); errorThread.join(); System.out.println("Process finished successfully." ); return process.exitValue(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RemoteException ("Process was interrupted" , e); } } @Override public boolean waitFor (long timeout, TimeUnit unit) throws RemoteException { boolean finished; try { finished = process.waitFor(timeout, unit); outputThread.join(); errorThread.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RemoteException ("Process was interrupted while waiting" , e); } if (finished) { System.out.println("Process finished within timeout." ); } else { System.out.println("Process did not finish within timeout." ); } return finished; } @Override public void destroy () throws RemoteException { if (process != null ) { process.destroy(); System.out.println("Process destroyed" ); } } @Override public Process destroyForcibly () throws RemoteException { if (process != null ) { process.destroyForcibly(); System.out.println("Process forcibly destroyed" ); return process; } return null ; } @Override public int exitValue () throws RemoteException { try { return process.exitValue(); } catch (IllegalThreadStateException e) { throw new RemoteException ("Process has not finished yet." , e); } } }
值的注意的是,Porcess.getInputStream
获取的对象同样是不可序列化的,因此需要在代理对象中先将其输出收集存储,然后再以String
类型传输。
RMI 服务器
运行一个服务器,监听本地1099端口。将我们的代理对象注册到其中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 package com.alicloud.basic_tech;import java.rmi.registry.LocateRegistry;import java.rmi.registry.Registry;public class RMIServer { public static void main (String[] args) { try { RemoteProcessBuilder processBuilder = new RemoteProcessBuilderImpl (); Registry registry = LocateRegistry.createRegistry(1099 ); registry.rebind("RemoteProcessBuilder" , processBuilder); System.out.println("RMI Server is running and ready to manage processes." ); }catch (Exception e) { e.printStackTrace(); } } }
客户端
在需要调用远程对象的地方,只需要用 RMI 客户端获取远程对象。这里写一个测试程序进行模拟。
sidecar 模式中,两个容器运行在同一个 Pod 中,共享网络空间,因此直接访问本地的 1099 端口即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 package com.alicloud.basic_tech;import java.io.File;import java.rmi.registry.LocateRegistry;import java.rmi.registry.Registry;public class RMIServerClient { public static void main (String[] args) { try { Registry registry = LocateRegistry.getRegistry("localhost" , 1099 ); RemoteProcessBuilder remoteProcessBuilder = (RemoteProcessBuilder) registry.lookup("RemoteProcessBuilder" ); String logMessage = "Hello, World!" ; String outputFilePath = "log" ; String command = String.format("echo \"%s\" | tee %s" , logMessage, outputFilePath); RemoteProcess remoteProcess = remoteProcessBuilder .command(command) .directory(new File ("/mesh-tmp" )) .setEnvironmentVariable("PATH" , "/usr/bin" ) .start(); int exitCode = remoteProcess.waitFor(); String output = remoteProcess.getStdInput(); String error = remoteProcess.getStdError(); System.out.println("Standard Output: \n" + output); System.out.println("Standard Error: \n" + error); System.out.println("Process exited with code: " + exitCode); } catch (Exception e) { e.printStackTrace(); } } }
测试
我们用 maven 将服务器和客户端编译为 jar 包(这里采用 fat jar 格式,引入guava库,使其可以在任何地方运行)。
在同一个 Pod 中,一个容器运行服务端:
在另一个容器中,运行客户端,结果如下