如何在不更改 main 的情况下在 Spring 服务中同时进行多个 API 调用?

我需要在现有的胖代码中创建一个服务以从 4 个 API 获取结果,并且我需要合并它们并重新格式化每个响应,但由于 4 个调用我不知道如何同时执行它,它需要非常慢。 我也无法更改 main 以在 main 中添加 Runnable 或此类执行程序,因为它可能对另一个代码产生滚雪球效应。

所以目前,我制作了一个处理请求的控制器,一个从用户那里获取请求并调用 5 个不同的服务中间件 (SM) 函数的服务。每个 SM 函数都用于调用外部 API,在每个 SM 中,我也重新格式化了 API 的每个返回映射。我使用 java.net.HttpURLConnection 进行 API 调用。因此,我的 API “工作”了,但不能超过 4 秒。这些 API 需要额外的 OAuth,因此总共大约需要 10 个 API 调用。

由于 API 调用的当前返回是 Object 类型,因此我可以将其视为 Map ,并通过对其中的数据进行循环来重新格式化输出。所以 SM 函数的代码可能与下面类似:

token = sendHttpRequest(authUrl, authRequestHeader, null, null, "GET");
Map response = sendHttpRequest(url, requestHeader, bodyParam, null, "POST");
List<Map> data = (List) ((Map) response.get("output")).get("data");
List<Map> result = new HashMap();
for(Map m : data) {
  Map temp = new HashMap();
  temp.put("name", m.get("Name"));
  temp.put("health_status", m.get("HealthStatus"));
  result.add(temp);
}

// This format is mandatory
Map finalResult = new HashMap();
finalResult.put("output", result);
finalResult.put("status", "OK");
return finalResult;

sendHttpRequest 是发送请求、将参数序列化为 JSON 并将 API 输出反序列化为 Object 的方法。这是 sendHttpRequest 的样子:

CloseableHttpClient httpClient = HttpClients.custom()
                        .setSSLSocketFactory(csf)
                        .build();

HttpComponentsClientHttpRequestFactory requestFactory =
                        new HttpComponentsClientHttpRequestFactory();
requestFactory.setConnectTimeout(this.connectTimeOut);
requestFactory.setReadTimeout(this.readTimeOut);
requestFactory.setHttpClient(httpClient);

RestTemplate rt = new RestTemplate(requestFactory);
HttpEntity<Map> request = null;
if(method.equals("POST"))
    request = new HttpEntity<Map>(objBody, headers);
else if(method.equals("GET"))
    request = new HttpEntity<Map>(headers);

try {
    ResponseEntity<Map> response = null;
    if(method.equals("POST"))
        restTemplate.postForEntity(url, request , Map.class);
    if(method.equals("GET"))
        restTemplate.postForEntity(url, request , Map.class);
    if(this.outputStream){
        logger.debug("Output : " + response.getBody());
    }
    return response.getBody();
} catch(HttpClientErrorException e) {
    logger.debug(e.getMessage());
}

sendHttpRequest 方法也是我不允许更改的现有方法,除非我只是创建一个新方法来执行我的请求。

简单地说,这是我需要做的事情:

  1. 对于每个 API 调用:

    • 从外部 API 获取授权令牌。

    • 向另一个外部 API 发出请求 (POST/GET) 以获取数据。

    • 将数据重新格式化为响应的预期格式(每个格式都有自己的格式)<大多数循环响应对象的数组以根据需要重新映射字段名称>。
      2.所有API调用完毕后,我需要做:

    • 将 API 1 和 3 的输出合并到地图/对象

    • 将 API 2 和 4 的输出合并到一个数组并全部排序

    • 将来自 API 5 的响应放入已定义属性/字段的内部对象中。

我尝试过的事情

我曾尝试使用 ExecutorCompletionService 来调用 5 个 SM。我还为此创建了一个实现 Callable 的内部类。

ExecutorService executor = Executors.newFixedThreadPool(5);
CompletionService completionService = new ExecutorCompletionService<>(executor);

List<Future<Map>> results = new ArrayList<>();
for(int i=1; i<6; i++) {
    // i here is used to define which api calls to be done
    results.add(completionService.submit(new CallAPIClass(paramMap, i)));
}

for (int i=0; i < results.size(); i++) {
    try {
        Map result = (Map) completionService.take().get();
        int code = (int) result.get("code");

        // Collect the results for each SM (SM function has described above)

    } catch (Exception e) {
        logger.debug(e.getMessage());
    }
}

// Merge the outputs.

合并输出中,我需要构建地图,所以它会是这样的:

{
  "details": {"api1": {...}, "api3": {...}},
  "list_items": [{...}, {...}, ...], // Results of sorted merged lists from api2 & api4
  "api5": [{...}, {...}, {...}, ...]
}

同时,从 api 响应中,基本上我只是在存在时检索它们的所有 output_schema


有什么技巧可以优化和加速这个 API 调用,所以通过相同的调用次数,这可以更快地执行???任何帮助是极大的赞赏。

编辑


我已经阅读了@Ananthapadmanabhan 的答案,但似乎我需要更改我不能做的主类文件。或者实际上是否可以在主类中不使用 @EnableAsync 的情况下应用 CompletableFuture 的使用?我还想知道如何使用 CompletableFuture 和 EnableAsync 以更快的速度完成此过程链。

stack overflow How to do multiple API calls concurrently in Spring service without changing main?
原文答案

答案:

作者头像

如果所有 4 个 api 调用彼此独立并且您使用的是 java 8 ,则可以根据需要将它们提取到单独的服务层中的单独函数,并在方法上使用 spring @Async 注释以及 CompletableFuture 作为返回类型以进行并行调用。

@Service
public class TestClient {
    RestTemplate restTemplate = new RestTemplate();

    @Async
    public CompletableFuture<List<TestPojo>> getTestPojoByLanguage(String language) {
        String url = "https://test.eu/rest/v2/lang/" + language + "?fields=name";
        Country[] response = restTemplate.getForObject(url, Country[].class);

        return CompletableFuture.completedFuture(Arrays.asList(response));
    }

    @Async
    public CompletableFuture<List<TestPojo>> getCountriesByRegion(String region) {
        String url = "https://testurl.eu/rest/v2/region/" + region + "?fields=name";
        Country[] response = restTemplate.getForObject(url, Country[].class);

        return CompletableFuture.completedFuture(Arrays.asList(response));
    }
}

可完成的未来 guide

作者头像

您尝试的解决方案对我来说看起来很不错:

ExecutorService executor = Executors.newFixedThreadPool(5);
CompletionService completionService = new ExecutorCompletionService<>(executor);

List<Future<Map>> results = new ArrayList<>();
for(int i=1; i<6; i++) {
    // i here is used to define which api calls to be done
    results.add(completionService.submit(new CallAPIClass(paramMap, i)));
}

for (int i=0; i < results.size(); i++) {
    try {
        Map result = (Map) completionService.take().get();
        int code = (int) result.get("code");

        // Collect the results for each SM (SM function has described above)

    } catch (Exception e) {
        logger.debug(e.getMessage());
    }
}

// Merge the outputs.

我不太确定除了可能更流利的API外,使用 CompletableFuture 是否会给您与程序性能相关的任何好处 - 主题在此处进行了广泛讨论,请参见例如{JXM = } 1 2 - 但这是一个可能的解决方案。

实际上,下一个代码基于 3 ,反过来又与Tomasz Nurkiewicz博客的 in one of my previous answers 密切相关。

您提供的代码的 this article 对应物将看起来像:

CompletableFuture

请验证上述代码,可能需要定义 ```
ExecutorService executor = Executors.newFixedThreadPool(5);

// List of the different parameters to perform every external API invocations
final List smParameters = Arrays.asList(
...
);

// Submit invoke external task to the thread pool
final List<CompletableFuture> futures = smParameters.stream().
map(paramMap -> CompletableFuture.supplyAsync(() -> invokeExternalAPI(paramMap), executor)).
collect(Collectors.<CompletableFuture>toList())
;

// The next code is based on the sequence method proposed in the blog I cited
// The idea is to turn the List<CompletableFuture<Map>> we have into a
// CompletableFuture<List> with the results of every single async task
final CompletableFuture allDoneFuture =
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));

final CompletableFuture<List> allDone = allDoneFuture.thenApply(v ->
futures.stream().
map(future -> future.join()).
collect(Collectors.toList())
);

// Merge the outputs.
final Map result = allDone.thenAccept(results ->
// Merge the outputs. The results variable contains the different Mapz
// obtained from the every different API invocation
);


提到的 `Map` 可以接受 `invokeExternalAPI` ,并具有执行单个API调用所需的不同参数,例如:

 `Map` 

我认为您不需要修改 ```
private Map invokeExternalAPI(Map configuration) {
  // Pass and extract from the configuration the authUrl, etcetera, everything you need to

  // Your code...

  token = sendHttpRequest(authUrl, authRequestHeader, null, null, "GET");
  Map response = sendHttpRequest(url, requestHeader, bodyParam, null, "POST");
  List<Map> data = (List) ((Map) response.get("output")).get("data");
  List<Map> result = new HashMap();
  for(Map m : data) {
    Map temp = new HashMap();
    temp.put("name", m.get("Name"));
    temp.put("health_status", m.get("HealthStatus"));
    result.add(temp);
  }

  // This format is mandatory
  Map finalResult = new HashMap();
  finalResult.put("output", result);
  finalResult.put("status", "OK");
  return finalResult;
}

``` 或任何配置,因为解决方案是基于Java的。

请记住,可以定制这种通用方法以满足不同的要求。

例如,根据您的评论,似乎您需要从服务中调用不同服务中间件中实现的功能。

为了定义您想同时执行的任务列表,您可以尝试以下内容,而不是我的初始建议:

 `main class` 

要处理错误,您有几个选择。

一个明显的是处理服务中间件本身中的错误,以使其永远不会引起任何例外,但是在其结果 ```
List<CompletableFuture<Map>> futures = new ArrayList<>(5);

// Obtain a reference to the second middleware, and submit it
final ServiceMiddleware1 sm1 = new ServiceMiddleware1();
final CompletableFuture<Map> sm1Cf = CompletableFuture.supplyAsync(() -> sm1.doYourStuff(), executor);
futures.add(sm1Cf);

// Now obtain a reference to the second middleware, and submit it again
final ServiceMiddleware2 sm2 = new ServiceMiddleware2();
final CompletableFuture<Map> sm2Cf = CompletableFuture.supplyAsync(() -> sm2.doYourStuff(), executor);
futures.add(sm2Cf);

// the rest of service middleware. I think here a common interface
// or some kind of inheritance could be of help in the invocation

// At the end, you will get the list of futures you wanna execute in parallel

// The rest of the code is the same
final CompletableFuture<Void> allDoneFuture =
  CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));

final CompletableFuture<List<Map>> allDone = allDoneFuture.thenApply(v ->
  futures.stream().
    map(future -> future.join()).
    collect(Collectors.<Map>toList())
);

// Merge the outputs.
final Map result = allDone.thenAccept(results ->
  // Merge the outputs. The results variable contains the different Mapz
  // obtained from the every different API invocation
);

``` 中返回某种信息,例如结果代码,状态等。

完整的未来本身为您提供了不同的选择来处理错误。由于您可能需要在结果 `Map` 中执行一些更改,因此可以在必要时使用 `Map` )方法。基本上,它将结果和假设异常在执行与 [`handle`](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/CompletionStage.html#handle(java.util.function.BiFunction) 关联的任务中获得,并根据该结果和可能的错误返回具有适当自定义的新的 `CompletableFuture` 。例如,在您的第四和第五服务中间Wares中,似乎会引起错误,您可以使用类似的内容:

 `CompletableFuture` 

final ServiceMiddleware4 sm4 = new ServiceMiddleware4();
final CompletableFuture sm4Cf = CompletableFuture.supplyAsync(() -> sm4.doYourStuff(), executor)
.handle((result, exception) -> {
if (exception == null) {
return result;
}

  Map actualResult = new HashMap();
  actualResult.put("errorCode", "xxx")
  actualResult.put("errorMessage", exception.getMessage());
  return actualResult; 
});

)
;
futures.add(sm4Cf);


所有这些方法都假定您的代码不会抛出检查的异常。如果您需要处理它们,则根据您的评论,可以使用Holger在 [This great article](https://mincong.io/2020/05/30/exception-handling-in-completable-future/) 中发布的代码的修改版本。这个想法是创建一种可以处理检查异常的方法,并在必要时使用适当的错误完成:

 [this SO answer](https://stackoverflow.com/questions/40795420/try-catch-when-calling-supplyasync) 

然后,使用此方法提交每个服务中间件任务:

public static CompletableFuture supplyAsync(Supplier supplier, Executor executor) {
CompletableFuture f=new CompletableFuture<>();
CompletableFuture.runAsync(() -> {
try { f.complete(supplier.get()); } catch(Throwable t) { f.completeExceptionally(t); }
}, executor);
return f;
}



我修改了一个示例,以提供合并结果的合并方法。

请考虑在服务中间软件代码中包含记录信息,如果您需要跟踪并改善整体解决方案提供的调试功能。

如果您以前使用过它们作为替代方案,则可以在 ```
List<CompletableFuture<Map>> futures = new ArrayList<>(5);

// Obtain a reference to the second middleware, and submit it
final ServiceMiddleware1 sm1 = new ServiceMiddleware1();
final CompletableFuture<Map> sm1Cf = supplyAsync(() -> sm1.doYourStuff(), executor)
  // this method will only be executed if any exception is thrown
  .exceptionally(exception -> { 
    Map errorResult = new HashMap();
    errorResult.put("errorCode", "xxx")
    errorResult.put("errorMessage", exception.getMessage());
    return errorResult; 
  });
futures.add(sm1Cf);

// Apply a similar logic to the rest of services middlewares...

// The rest of the code is the same as above
final CompletableFuture<Void> allDoneFuture =
  CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));

final CompletableFuture<List<Map>> allDone = allDoneFuture.thenApply(v ->
  futures.stream().
    map(future -> future.join()).
    collect(Collectors.<Map>toList())
);

// Merge the outputs.
// Please, be aware that in the lambda expression results
// is a List of the different Maps obtained as the individual
// results of every single service middleware call
// I would create an object that agglutinates these results in
// the right format, as you indicated in your question. Let's call
// this container class ServiceMiddlewareResult. Then, the merge
// results code will looks like similar to this
final ServiceMiddlewareResult result = allDone.thenAccept(results -> {
  ServiceMiddlewareResult serviceMiddlewareResult = new ServiceMiddlewareResult();
  // Variable used for storing temporarily the Api 2 and 4 results
  // Parameterize it as necessary
  List tempResultsFromApi2AndApi4 = new ArrayList();
  // Honestly I don't remember if the order of the results is the
  // same as the order of the futures that generated them, my guess
  // is that not, as it depends on the actual future completion,
  // but in any way I always try thinking that the results can be 
  // in any order, so it is important that every Map contains the 
  // minimal information to identify the corresponding service 
  // middleware. With that assumption in mind, your code will look
  // similar to this:
  results.forEach(result -> {
    // The suggested idea, identify the service middleware that
    // produced the results
    String serviceMiddleware = result.get("serviceMiddleware");
    switch(serviceMiddleware) {
      // handle every case appropriately
      case 'sm1': {
        // it should be similar to sm3
        serviceMiddlewareResult.getDetails().setApi1(...);
        break;
      }

      case 'sm2':
      case 'sm4': {
        // Extract results from the Map, and add to the temporary list
        tempResultsFromApi2AndApi4.add(...)
        break;
      }

      case 'sm5': {
        // extract results and populate corresponding object
        serviceMiddlewareResult.setApi5(...);
        break;
      }
    }
  });

  List sortedResultsFromApi2AndApi4 = Collections.sort(
    sortedResultsFromApi2AndApi4, ... the appropriate comparator...
  );
  result.setListItems(sortedResultsFromApi2AndApi4);

  return result;  
});

``` 或 `RxJava` 之类的库中尝试解决方案。

相关问题