// Only the SHARD backplane type is accepted here; any other configured type is rejected.
if (SHARD.equals(configs.getBackplane().getType())) {
  backplane =
      new RedisShardBackplane(identifier, this::stripOperation, this::stripQueuedOperation);
  // Start the backplane under this worker's configured public name.
  backplane.start(configs.getWorker().getPublicName());
} else {
  throw new IllegalArgumentException("Shard Backplane not set in config");
}
@Override publicvoidstart(String clientPublicName)throws IOException { // Construct a single redis client to be used throughout the entire backplane. // We wish to avoid various synchronous and error handling issues that could occur when using // multiple clients. client = newRedisClient(jedisClusterFactory.get()); // Create containers that make up the backplane state = DistributedStateCreator.create(client);
if (configs.getBackplane().isSubscribeToBackplane()) { startSubscriptionThread(); } if (configs.getBackplane().isRunFailsafeOperation()) { startFailsafeOperationThread(); }
// Record client start time client.call( jedis -> jedis.set("startTime/" + clientPublicName, Long.toString(newDate().getTime()))); }
voidonWorkerChange(WorkerChange workerChange) { switch (workerChange.getTypeCase()) { case TYPE_NOT_SET: log.log( Level.SEVERE, format( "WorkerChange oneof type is not set from %s at %s", workerChange.getName(), workerChange.getEffectiveAt())); break; case ADD: addWorker(workerChange.getName()); break; case REMOVE: removeWorker(workerChange.getName()); break; } }
voidonOperationChange(String channel, OperationChange operationChange) { switch (operationChange.getTypeCase()) { case TYPE_NOT_SET: log.log( Level.SEVERE, format( "OperationChange oneof type is not set from %s at %s", operationChange.getSource(), operationChange.getEffectiveAt())); break; case RESET: resetOperation(channel, operationChange.getReset()); break; case EXPIRE: terminateExpiredWatchers( channel, toInstant(operationChange.getEffectiveAt()), operationChange.getExpire().getForce()); break; } }
// We will build a worker's server based on it's capabilities. // A worker that is capable of execution will construct an execution pipeline. // It will use various execution phases for it's profile service. // On the other hand, a worker that is only capable of CAS storage does not need a pipeline. if (configs.getWorker().getCapabilities().isExecution()) { PipelineStagecompleteStage= newPutOperationStage((operation) -> context.deactivate(operation.getName())); PipelineStageerrorStage= completeStage; /* new ErrorStage(); */ PipelineStagereportResultStage=newReportResultStage(context, completeStage, errorStage); PipelineStageexecuteActionStage= newExecuteActionStage(context, reportResultStage, errorStage); PipelineStageinputFetchStage= newInputFetchStage(context, executeActionStage, newPutOperationStage(context::requeue)); PipelineStagematchStage=newMatchStage(context, inputFetchStage, errorStage);
@Override protectedvoiditerate()throws InterruptedException { // stop matching and picking up any works if the worker is in graceful shutdown. if (inGracefulShutdown) { return; } Stopwatchstopwatch= Stopwatch.createStarted(); OperationContextoperationContext= OperationContext.newBuilder().build(); if (!output.claim(operationContext)) { return; } MatchOperationListenerlistener=newMatchOperationListener(operationContext, stoxpwatch); try { logStart(); workerContext.match(listener); } finally { if (!listener.wasMatched()) { output.release(); } } }
/* if the operation is already in the dispatch list, fail the dispatch */
success =
    state.dispatchedOperations.insertIfMissing(jedis, operationName, dispatchedOperationJson);
} catch (InvalidProtocolBufferException e) {
  log.log(Level.SEVERE, "error printing dispatched operation", e);
  // very unlikely, printer would have to fail
}

if (success) {
  // The entry should still be in the dequeue; a miss is only logged, not treated as failure.
  if (!state.operationQueue.removeFromDequeue(jedis, queueEntryJson)) {
    log.log(
        Level.WARNING,
        format(
            "operation %s was missing in %s, may be orphaned",
            operationName, state.operationQueue.getDequeueName()));
  }
  state.dispatchingOperations.remove(jedis, operationName);

  // Return an entry so that if it needs re-queued, it will have the correct "requeue attempts".
  return queueEntryBuilder.setRequeueAttempts(queueEntry.getRequeueAttempts() + 1).build();
}
// The insert did not succeed, so there is no entry to hand back.
return null;
}
publicclassDockerExecutor { /** * @brief Run the action using the docker client and populate the results. * @details This will fetch any images as needed, spawn a container for execution, and clean up * docker resources if requested. * @param dockerClient Client used to interact with docker. * @param settings Settings used to perform action execition. * @param resultBuilder The action results to populate. * @return Grpc code as to whether buildfarm was able to run the action. * @note Suggested return identifier: code. */ publicstatic Code runActionWithDocker( DockerClient dockerClient, DockerExecutorSettings settings, ActionResult.Builder resultBuilder) throws InterruptedException, IOException { StringcontainerId= prepareRequestedContainer(dockerClient, settings); StringexecId= runActionInsideContainer(dockerClient, settings, containerId, resultBuilder); extractInformationFromContainer(dockerClient, settings, containerId, execId, resultBuilder); cleanUpContainer(dockerClient, containerId); return Code.OK; }
/** * @brief Setup the container for the action. * @details This ensures the image is fetched, the container is started, and that the container * has proper visibility to the action's execution root. After this call it should be safe to * spawn an action inside the container. * @param dockerClient Client used to interact with docker. * @param settings Settings used to perform action execition. * @return The ID of the started container. * @note Suggested return identifier: containerId. */ privatestatic String prepareRequestedContainer( DockerClient dockerClient, DockerExecutorSettings settings)throws InterruptedException { // this requires network access. Once complete, "docker image ls" will show the downloaded // image fetchImageIfMissing( dockerClient, settings.limits.containerSettings.containerImage, settings.fetchTimeout);
// build and start the container. Once complete, "docker container ls" will show the started // container StringcontainerId= createContainer(dockerClient, settings); dockerClient.startContainerCmd(containerId).exec();
// copy files into it populateContainer(dockerClient, containerId, settings.execDir);
// container is ready for running actions return containerId; }
/** * @brief Assuming the container is already created and properly populated/mounted with data, this * can be used to spawn an action inside of it. * @details The stdout / stderr of the action execution are populated to the results. * @param dockerClient Client used to interact with docker. * @param settings Settings used to perform action execition. * @param containerId The ID of the container. * @param resultBuilder The results to populate. * @return The ID of the container execution. * @note Suggested return identifier: execId. */ privatestatic String runActionInsideContainer( DockerClient dockerClient, DockerExecutorSettings settings, String containerId, ActionResult.Builder resultBuilder) throws InterruptedException { // decide command to run ExecCreateCmdexecCmd= dockerClient.execCreateCmd(containerId); execCmd.withWorkingDir(settings.execDir.toAbsolutePath().toString()); execCmd.withAttachStderr(true); execCmd.withAttachStdout(true); execCmd.withCmd(settings.arguments.toArray(newString[0])); StringexecId= execCmd.exec().getId(); // execute command (capture stdout / stderr) ExecStartCmdexecStartCmd= dockerClient.execStartCmd(execId); ByteArrayOutputStreamout=newByteArrayOutputStream(); ByteArrayOutputStreamerr=newByteArrayOutputStream(); execStartCmd.exec(newExecStartResultCallback(out, err)).awaitCompletion(); // store results resultBuilder.setStdoutRaw(ByteString.copyFromUtf8(out.toString())); resultBuilder.setStderrRaw(ByteString.copyFromUtf8(err.toString()));
/** * @brief Extract information from the container after the action ran. * @details This can include exit code, output artifacts, and various docker information. * @param dockerClient Client used to interact with docker. * @param settings Settings used to perform action execition. * @param containerId The ID of the container. * @param execId The ID of the execution. * @param resultBuilder The results to populate. */ privatestaticvoidextractInformationFromContainer( DockerClient dockerClient, DockerExecutorSettings settings, String containerId, String execId, ActionResult.Builder resultBuilder) throws IOException { extractExitCode(dockerClient, execId, resultBuilder); copyOutputsOutOfContainer(dockerClient, settings, containerId); }
publicvoidprepareWorkerForGracefulShutdown() { if (configs.getWorker().getGracefulShutdownSeconds() == 0) { log.info( "Graceful Shutdown is not enabled. Worker is shutting down without finishing executions" + " in progress."); } else { inGracefulShutdown = true; log.info( "Graceful Shutdown - The current worker will not be registered again and should be" + " shutdown gracefully!"); pipeline.stopMatchingOperations(); intscanRate=30; // check every 30 seconds inttimeWaited=0; inttimeOut= configs.getWorker().getGracefulShutdownSeconds(); try { if (pipeline.isEmpty()) { log.info("Graceful Shutdown - no work in the pipeline."); } else { log.info("Graceful Shutdown - waiting for executions to finish."); } while (!pipeline.isEmpty() && timeWaited < timeOut) { SECONDS.sleep(scanRate); timeWaited += scanRate; log.info( String.format( "Graceful Shutdown - Pipeline is still not empty after %d seconds.", timeWaited)); } } catch (InterruptedException e) { log.info( "Graceful Shutdown - The worker gracefully shutdown is interrupted: " + e.getMessage()); } finally { log.info( String.format( "Graceful Shutdown - It took the worker %d seconds to %s", timeWaited, pipeline.isEmpty() ? "finish all actions" : "gracefully shutdown but still cannot finish all actions")); } } }