JEP 462 - Structured Concurrency

Simplifier la programmation concurrente en traitant plusieurs tâches comme une unité de travail

Numéro JEP

462

Statut

Preview (2ème)

Introduit dans

Java 21 (preview), Java 22 (2e preview)

En résumé

La concurrence structurée traite plusieurs tâches concurrentes comme une seule unité de travail, améliorant la fiabilité et l'observabilité. Combine parfaitement avec les virtual threads pour simplifier la gestion d'erreurs, les annulations et le cycle de vie des tâches parallèles.

Problème avec la concurrence classique

Quand vous lancez plusieurs tâches en parallèle, plusieurs problèmes se posent :

  • Comment attendre que toutes les tâches soient terminées ?
  • Que faire si une tâche échoue ? Faut-il annuler les autres ?
  • Comment éviter les fuites de threads ?
  • Comment débugger facilement ?
// Approche classique : complexe et fragile
ExecutorService executor = Executors.newCachedThreadPool();
try {
    Future user = executor.submit(() -> fetchUser());
    Future orders = executor.submit(() -> fetchOrders());

    // Attendre les résultats
    String userData = user.get();      // Bloquant
    String orderData = orders.get();   // Bloquant

    // Que faire si user.get() échoue ?
    // orders continue d'exécuter inutilement !

} finally {
    executor.shutdown();  // Peut laisser des threads zombies
}

Solution : StructuredTaskScope

La classe StructuredTaskScope groupe plusieurs tâches concurrentes :

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {

    // Lancer les tâches
    Subtask user = scope.fork(() -> fetchUser());
    Subtask orders = scope.fork(() -> fetchOrders());

    // Attendre la complétion de TOUTES les tâches
    scope.join();

    // Vérifier si erreur (lève une exception si échec)
    scope.throwIfFailed();

    // Récupérer les résultats
    String userData = user.get();
    String orderData = orders.get();

    return new Response(userData, orderData);

} // Auto-fermeture : annule automatiquement les tâches restantes

Avantages de la concurrence structurée

1. Gestion automatique du cycle de vie

Les tâches enfants ne peuvent pas survivre à leur parent (scope). Pas de fuite de threads possible.

2. Propagation d'erreurs claire

Si une tâche échoue, les autres sont automatiquement annulées.

3. Annulation coopérative

Les threads virtuels peuvent être interrompus proprement.

4. Observabilité améliorée

Les outils de monitoring voient la hiérarchie des tâches.

Stratégies d'exécution

ShutdownOnFailure - Stop à la première erreur

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    Subtask task1 = scope.fork(() -> operationRisquee1());
    Subtask task2 = scope.fork(() -> operationRisquee2());
    Subtask task3 = scope.fork(() -> operationRisquee3());

    scope.join();            // Attend toutes les tâches
    scope.throwIfFailed();   // Lance exception si une a échoué

    // Si on arrive ici, toutes les tâches ont réussi
    String result = task1.get() + task2.get() + task3.get();
}

ShutdownOnSuccess - Stop dès qu'une réussit

// Utile pour interroger plusieurs services (race condition)
try (var scope = new StructuredTaskScope.ShutdownOnSuccess()) {

    // Interroger 3 services en parallèle
    scope.fork(() -> callService1());
    scope.fork(() -> callService2());
    scope.fork(() -> callService3());

    scope.join();  // Attend qu'UNE tâche réussisse

    // Récupérer le premier résultat réussi
    // Les autres tâches sont annulées automatiquement
    String result = scope.result();
    return result;
}

Exemples pratiques

Exemple 1 : Agrégation de données

record UserProfile(String user, List orders, String preferences) {}

UserProfile fetchUserProfile(int userId) throws Exception {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {

        // Lancer 3 requêtes en parallèle
        Subtask userData =
            scope.fork(() -> userService.getUser(userId));

        Subtask> ordersData =
            scope.fork(() -> orderService.getOrders(userId));

        Subtask prefsData =
            scope.fork(() -> preferenceService.getPreferences(userId));

        // Attendre toutes les réponses
        scope.join().throwIfFailed();

        // Agréger les résultats
        return new UserProfile(
            userData.get(),
            ordersData.get(),
            prefsData.get()
        );
    }
}

Exemple 2 : Service avec timeout

String fetchWithTimeout(int timeoutSeconds) throws Exception {
    try (var scope = new StructuredTaskScope.ShutdownOnSuccess()) {

        // Lancer la tâche principale
        scope.fork(() -> longRunningOperation());

        // Joindre avec timeout
        scope.join(Duration.ofSeconds(timeoutSeconds));

        // Vérifier si timeout
        if (scope.result() == null) {
            throw new TimeoutException("Opération trop longue");
        }

        return scope.result();
    }
}

Exemple 3 : Recherche dans plusieurs sources

List search(String query) throws Exception {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {

        // Chercher en parallèle dans plusieurs bases
        Subtask> db1 =
            scope.fork(() -> database1.search(query));

        Subtask> db2 =
            scope.fork(() -> database2.search(query));

        Subtask> cache =
            scope.fork(() -> cacheService.search(query));

        // Attendre tous les résultats
        scope.join().throwIfFailed();

        // Fusionner et retourner
        List results = new ArrayList<>();
        results.addAll(db1.get());
        results.addAll(db2.get());
        results.addAll(cache.get());

        return results;
    }
}

Combinaison avec Virtual Threads

// Traiter 10 000 requêtes en parallèle avec structured concurrency
void traiterRequetes(List requests) throws Exception {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {

        // Fork un virtual thread par requête
        List> tasks = requests.stream()
            .map(req -> scope.fork(() -> traiterRequete(req)))
            .toList();

        // Attendre toutes les tâches
        scope.join().throwIfFailed();

        // Collecter les résultats
        List responses = tasks.stream()
            .map(Subtask::get)
            .toList();

        sauvegarderResponses(responses);
    }
}

Scope personnalisé

Vous pouvez créer vos propres stratégies en étendant StructuredTaskScope :

class FirstNSuccessScope extends StructuredTaskScope {
    private final int n;
    private final List results = new CopyOnWriteArrayList<>();

    FirstNSuccessScope(int n) {
        this.n = n;
    }

    @Override
    protected void handleComplete(Subtask subtask) {
        if (subtask.state() == Subtask.State.SUCCESS) {
            results.add(subtask.get());
            if (results.size() >= n) {
                shutdown();  // Annuler les autres
            }
        }
    }

    List results() {
        return List.copyOf(results);
    }
}

// Utilisation : obtenir les 3 premières réponses
try (var scope = new FirstNSuccessScope(3)) {
    for (int i = 0; i < 10; i++) {
        scope.fork(() -> callService(i));
    }
    scope.join();
    List first3 = scope.results();
}

Bonnes pratiques

✅ À faire

  • Toujours utiliser try-with-resources avec StructuredTaskScope
  • Appeler join() avant d'accéder aux résultats
  • Vérifier les erreurs avec throwIfFailed()
  • Combiner avec virtual threads pour de meilleures performances

❌ À éviter

  • Ne pas oublier le join() avant get()
  • Éviter de partager des états mutables entre tâches
  • Ne pas utiliser avec des tâches CPU-intensive (préférer ForkJoinPool)

Ressources