JEP 462 - Structured Concurrency
Simplifier la programmation concurrente en traitant plusieurs tâches comme une unité de travail
462
Preview (2ème)
Java 21 (preview), Java 22 (2e preview)
En résumé
La concurrence structurée traite plusieurs tâches concurrentes comme une seule unité de travail, améliorant la fiabilité et l'observabilité. Combine parfaitement avec les virtual threads pour simplifier la gestion d'erreurs, les annulations et le cycle de vie des tâches parallèles.
Problème avec la concurrence classique
Quand vous lancez plusieurs tâches en parallèle, plusieurs problèmes se posent :
- Comment attendre que toutes les tâches soient terminées ?
- Que faire si une tâche échoue ? Faut-il annuler les autres ?
- Comment éviter les fuites de threads ?
- Comment débugger facilement ?
// Approche classique : complexe et fragile
ExecutorService executor = Executors.newCachedThreadPool();
try {
Future user = executor.submit(() -> fetchUser());
Future orders = executor.submit(() -> fetchOrders());
// Attendre les résultats
String userData = user.get(); // Bloquant
String orderData = orders.get(); // Bloquant
// Que faire si user.get() échoue ?
// orders continue d'exécuter inutilement !
} finally {
executor.shutdown(); // Peut laisser des threads zombies
}
Solution : StructuredTaskScope
La classe StructuredTaskScope groupe plusieurs tâches concurrentes :
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// Lancer les tâches
Subtask user = scope.fork(() -> fetchUser());
Subtask orders = scope.fork(() -> fetchOrders());
// Attendre la complétion de TOUTES les tâches
scope.join();
// Vérifier si erreur (lève une exception si échec)
scope.throwIfFailed();
// Récupérer les résultats
String userData = user.get();
String orderData = orders.get();
return new Response(userData, orderData);
} // Auto-fermeture : annule automatiquement les tâches restantes
Avantages de la concurrence structurée
1. Gestion automatique du cycle de vie
Les tâches enfants ne peuvent pas survivre à leur parent (scope). Pas de fuite de threads possible.
2. Propagation d'erreurs claire
Si une tâche échoue, les autres sont automatiquement annulées.
3. Annulation coopérative
Les threads virtuels peuvent être interrompus proprement.
4. Observabilité améliorée
Les outils de monitoring voient la hiérarchie des tâches.
Stratégies d'exécution
ShutdownOnFailure - Stop à la première erreur
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Subtask task1 = scope.fork(() -> operationRisquee1());
Subtask task2 = scope.fork(() -> operationRisquee2());
Subtask task3 = scope.fork(() -> operationRisquee3());
scope.join(); // Attend toutes les tâches
scope.throwIfFailed(); // Lance exception si une a échoué
// Si on arrive ici, toutes les tâches ont réussi
String result = task1.get() + task2.get() + task3.get();
}
ShutdownOnSuccess - Stop dès qu'une réussit
// Utile pour interroger plusieurs services (race condition)
try (var scope = new StructuredTaskScope.ShutdownOnSuccess()) {
// Interroger 3 services en parallèle
scope.fork(() -> callService1());
scope.fork(() -> callService2());
scope.fork(() -> callService3());
scope.join(); // Attend qu'UNE tâche réussisse
// Récupérer le premier résultat réussi
// Les autres tâches sont annulées automatiquement
String result = scope.result();
return result;
}
Exemples pratiques
Exemple 1 : Agrégation de données
record UserProfile(String user, List orders, String preferences) {}
UserProfile fetchUserProfile(int userId) throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// Lancer 3 requêtes en parallèle
Subtask userData =
scope.fork(() -> userService.getUser(userId));
Subtask> ordersData =
scope.fork(() -> orderService.getOrders(userId));
Subtask prefsData =
scope.fork(() -> preferenceService.getPreferences(userId));
// Attendre toutes les réponses
scope.join().throwIfFailed();
// Agréger les résultats
return new UserProfile(
userData.get(),
ordersData.get(),
prefsData.get()
);
}
}
Exemple 2 : Service avec timeout
String fetchWithTimeout(int timeoutSeconds) throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnSuccess()) {
// Lancer la tâche principale
scope.fork(() -> longRunningOperation());
// Joindre avec timeout
scope.join(Duration.ofSeconds(timeoutSeconds));
// Vérifier si timeout
if (scope.result() == null) {
throw new TimeoutException("Opération trop longue");
}
return scope.result();
}
}
Exemple 3 : Recherche dans plusieurs sources
List search(String query) throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// Chercher en parallèle dans plusieurs bases
Subtask> db1 =
scope.fork(() -> database1.search(query));
Subtask> db2 =
scope.fork(() -> database2.search(query));
Subtask> cache =
scope.fork(() -> cacheService.search(query));
// Attendre tous les résultats
scope.join().throwIfFailed();
// Fusionner et retourner
List results = new ArrayList<>();
results.addAll(db1.get());
results.addAll(db2.get());
results.addAll(cache.get());
return results;
}
}
Combinaison avec Virtual Threads
// Traiter 10 000 requêtes en parallèle avec structured concurrency
void traiterRequetes(List requests) throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// Fork un virtual thread par requête
List> tasks = requests.stream()
.map(req -> scope.fork(() -> traiterRequete(req)))
.toList();
// Attendre toutes les tâches
scope.join().throwIfFailed();
// Collecter les résultats
List responses = tasks.stream()
.map(Subtask::get)
.toList();
sauvegarderResponses(responses);
}
}
Scope personnalisé
Vous pouvez créer vos propres stratégies en étendant StructuredTaskScope :
class FirstNSuccessScope extends StructuredTaskScope {
private final int n;
private final List results = new CopyOnWriteArrayList<>();
FirstNSuccessScope(int n) {
this.n = n;
}
@Override
protected void handleComplete(Subtask extends T> subtask) {
if (subtask.state() == Subtask.State.SUCCESS) {
results.add(subtask.get());
if (results.size() >= n) {
shutdown(); // Annuler les autres
}
}
}
List results() {
return List.copyOf(results);
}
}
// Utilisation : obtenir les 3 premières réponses
try (var scope = new FirstNSuccessScope(3)) {
for (int i = 0; i < 10; i++) {
scope.fork(() -> callService(i));
}
scope.join();
List first3 = scope.results();
}
Bonnes pratiques
✅ À faire
- Toujours utiliser try-with-resources avec StructuredTaskScope
- Appeler
join()avant d'accéder aux résultats - Vérifier les erreurs avec
throwIfFailed() - Combiner avec virtual threads pour de meilleures performances
❌ À éviter
- Ne pas oublier le
join()avantget() - Éviter de partager des états mutables entre tâches
- Ne pas utiliser avec des tâches CPU-intensive (préférer ForkJoinPool)