Threads, pipes, attente et annulation

Threads, pipes, attente et annulation - Java - Programmation

Marsh Posté le 28-07-2004 à 10:50:57    

Bonjour,
 
Je suis en train de faire une petite moulinette qui permet de faire passer un fichier XML par plusieurs transformations XSL successives, avec des PipedIn/OutputStream
 
Bon, globalement, ça marche plutôt bien : ça transforme, etc..
 
Mais j'ai un souci : je voudrais que ma méthode qui lance le process ne retourne qu'à la fin de l'opération : pour le moment, je lance une thread par transformation, qui lit dans un InputStream, et écrit dans un OutputStream, et la méthode start() retourne 'immmédiatement"
 
Donc, quand j'appelle transform(source, cible) , j'ai un retour immédiat, mais le fichier n'est pas disponible avant que toutes les threads n'aient fini leur boulot.
 
J'avais au départ fait un système de 'waitFor' avec un lock.wait() et un lock.notifyAll(), mais ça ne march epas au delà de 2 transformations, sans que j'aie pu piger pourquoi.
 
Voilà une partie du code. Toute suggestion sera la bienvenue!
 
 
- la thread qui fait une transformation :  

Code :
  1. public class TransformerThread extends Thread {
  2. private OutputStream out = null;
  3. private InputStream in = null;
  4. private Object lock;
  5. private boolean finished = false;
  6. private Transformer transformer;
  7. private XSLTransformer xslt;
  8. public TransformerThread(XSLTransformer xslt, Transformer transformer, OutputStream out, InputStream in) {
  9.  this.xslt = xslt;
  10.  this.out = out;
  11.  this.in = in;
  12.  this.transformer = transformer;
  13.  setName(transformer.getParameter("fr.actia.progress.xsl.XSLTransformer.Name" ).toString());
  14.  lock = new Object();
  15. }
  16. public void run() {
  17.  xslt.start(this);
  18.  System.out.println("Starting thread " + getName());
  19.  if (out != null && in != null && transformer != null) {
  20.   StreamSource source = new StreamSource(in);
  21.   StreamResult result = new StreamResult(out);
  22.   try {
  23.    transformer.transform(source, result);
  24.    out.flush();
  25.    out.close();
  26.   } catch (TransformerException e) {
  27.    // The listener relayed a TransformerException. We must stop!
  28.    try {
  29.     out.flush();
  30.     out.close();
  31.    } catch (IOException ioe) {
  32.    }
  33.   } catch (IOException e) {
  34.   }
  35.   xslt.finished(this);
  36.   System.out.println("Finished thread " + getName());
  37.  }
  38. }
  39. public void waitFor() {
  40.  start();
  41.  synchronized(lock) {
  42.   try {
  43.    lock.wait();
  44.   } catch (Exception e) {
  45.    e.printStackTrace();
  46.   }
  47.  }
  48. }
  49. public void cancelTransformation() {
  50.  synchronized (lock) {
  51.   try {
  52.    interrupt();
  53.    lock.notifyAll();
  54.   } catch (Exception e) {
  55.    e.printStackTrace();
  56.   }
  57.  }
  58. }
  59. }


 
Et le code qui utilise ce machin :  
 

Code :
  1. /**
  2.  *  
  3.  * @param source
  4.  * @param transformer
  5.  * @return
  6.  * @throws IOException
  7.  */
  8. private InputStream transform(InputStream source, Transformer transformer) throws IOException, InterruptedException {
  9.  PipedOutputStream pipeOut = new PipedOutputStream();
  10.  PipedInputStream pipeIn = new PipedInputStream(pipeOut);
  11.  transform(source, transformer, pipeOut);
  12.  return pipeIn;
  13. }
  14. private void transform(InputStream source, Transformer transformer, OutputStream out) throws IOException, InterruptedException {
  15.  BufferedInputStream in = new BufferedInputStream(source);
  16.  TransformerThread tt = new TransformerThread(this, transformer, out, in);
  17.  startTransformation(transformer.getParameter("fr.actia.progress.xsl.XSLTransformer.Name" ).toString());
  18.  tt.start();
  19. }
  20. public boolean transform(File sourceFile, File resultFile) throws FileNotFoundException, TransformerException, InterruptedException {
  21.  success = true;
  22.  lock = new Object();
  23.  if (!sourceFile.exists()) {
  24.   throw new FileNotFoundException(sourceFile.getAbsolutePath());
  25.  } else {
  26.   try {
  27.    StreamSource source = getSource(sourceFile);
  28.    InputStream sourceIn = new FileInputStream(sourceFile);
  29.    int last = transformers.size() - 1;
  30.    for (int i = 0; i < last; i++) {
  31.     Transformer t = (Transformer) transformers.get(i);
  32.     sourceIn = transform(sourceIn, t);
  33.    }
  34.    transform(sourceIn, (Transformer) transformers.get(last), new FileOutputStream(resultFile));
  35.   } catch (IOException ioe) {
  36.    ioe.printStackTrace();
  37.   }
  38.   return success;
  39.  }
  40. }


---------------
Tous les sud africains sont ségrégationistes, à part Ted. (P. Desproges)
Reply

Marsh Posté le 28-07-2004 à 10:50:57   

Reply

Marsh Posté le 28-07-2004 à 10:58:45    

passer en monoThread ?
tu fais une liste de filtres et une fonction de filtrage et en voiture simone ?  
 
en plus "extends Thread", ça me fait très peur là.


Message édité par nraynaud le 28-07-2004 à 11:01:16

---------------
trainoo.com, c'est fini
Reply

Marsh Posté le 28-07-2004 à 11:06:38    

bah, passer en mono thread, avec des Piped machins, c'est pas DU TOUT conseillé :
 

Citation :


A piped input stream should be connected to a piped output stream; the piped input stream then provides whatever data bytes are written to the piped output stream. Typically, data is read from a PipedInputStream  object by one thread and data is written to the corresponding PipedOutputStream  by some other thread. Attempting to use both objects from a single thread is not recommended, as it may deadlock the thread. The piped input stream contains a buffer, decoupling read operations from write operations, within limits.


 
Et pkoi 'extends Thread' te fait peur?
 


---------------
Tous les sud africains sont ségrégationistes, à part Ted. (P. Desproges)
Reply

Marsh Posté le 28-07-2004 à 11:14:53    

en fait, ce truc sert à faire de la communication asynchrone entre threads, c'est une boite à lettres, pas un filtre de transformation.


---------------
trainoo.com, c'est fini
Reply

Marsh Posté le 28-07-2004 à 11:19:07    


Huhu, j'ai trouvé! :)  
 
En fait, j'ai viré le lock de TransfomerThread, puisque je lm'en servait pas, et j'en ai mis un dans la classe qui l'appelle :  
 
 
 

Code :
  1. private InputStream transform(InputStream source, Transformer transformer) throws IOException, InterruptedException {
  2.  PipedOutputStream pipeOut = new PipedOutputStream();
  3.  PipedInputStream pipeIn = new PipedInputStream(pipeOut);
  4.  transform(source, transformer, pipeOut);
  5.  return pipeIn;
  6. }
  7. private void transform(InputStream source, Transformer transformer, OutputStream out) throws IOException, InterruptedException {
  8.  BufferedInputStream in = new BufferedInputStream(source);
  9.  TransformerThread tt = new TransformerThread(this, transformer, out, in);
  10.  startTransformation(transformer.getParameter("fr.actia.progress.xsl.XSLTransformer.Name" ).toString());
  11.  tt.start();
  12. }
  13. public boolean transform(File sourceFile, File resultFile) throws FileNotFoundException, TransformerException, InterruptedException {
  14.  success = true;
  15.  lock = new Object();
  16.  if (!sourceFile.exists()) {
  17.   throw new FileNotFoundException(sourceFile.getAbsolutePath());
  18.  } else {
  19.   try {
  20.    StreamSource source = getSource(sourceFile);
  21.    InputStream sourceIn = new FileInputStream(sourceFile);
  22.    int last = transformers.size() - 1;
  23.    for (int i = 0; i < last; i++) {
  24.     Transformer t = (Transformer) transformers.get(i);
  25.     sourceIn = transform(sourceIn, t);
  26.    }
  27.    transform(sourceIn, (Transformer) transformers.get(last), new FileOutputStream(resultFile));
  28.                                 synchronized(lock) {
  29.     try {
  30.      lock.wait();
  31.     } catch (InterruptedException e) {
  32.      e.printStackTrace();
  33.     }
  34.    }
  35.   } catch (IOException ioe) {
  36.    ioe.printStackTrace();
  37.   }
  38.   return success;
  39.  }
  40. }


 
Et la méthode finished(TransformerThread tt) de la classe appellante, qui est appellée à la fin de TransformerThread.run() :  
 

Code :
  1. protected void finished(TransformerThread tt) {
  2.  transformerThreads.remove(tt);
  3.  if (transformerThreads.isEmpty()) {
  4.          synchronized(lock) {
  5.    lock.notifyAll();
  6.   }
  7.  }
  8. }


 
Sachant que transformerThreads est une liste de mes threads.
 
Maintenant, il me reste à gérer l'annulation des transformations, donc l'interruption des threads, quand l'utilisateur appuie sur ''cancel'....


---------------
Tous les sud africains sont ségrégationistes, à part Ted. (P. Desproges)
Reply

Marsh Posté le 28-07-2004 à 11:19:56    

nraynaud a écrit :

en fait, ce truc sert à faire de la communication asynchrone entre threads, c'est une boite à lettres, pas un filtre de transformation.


 
bah, tu peux t'en servir comme tu veux, je vois pas où est le souci : dans le tutorial, il s'en servent pour 'retourner' le contenu d'un fichier texte...


---------------
Tous les sud africains sont ségrégationistes, à part Ted. (P. Desproges)
Reply

Marsh Posté le 28-07-2004 à 11:22:31    

biensûr, d'ailleur la mémoire et les ressources de calcul sont infies dans le modèle abstrait de java, alors on peut faire n'importe quoi avec n'importe quelle infrastructure.


---------------
trainoo.com, c'est fini
Reply

Marsh Posté le 28-07-2004 à 11:27:32    

baaah, spas la peine de faire le caustique! :)  
 
Mais ça me parraissait une solution plutôt élégante, quoi.
 
Après, c'est sûr qu'il ne faut pas chaîner 30 transformations de suite non plus, je pense, mais bon.
 
Et puis j'ai trouvé que c'était plutôt intréressant à faire, comme truc.


---------------
Tous les sud africains sont ségrégationistes, à part Ted. (P. Desproges)
Reply

Marsh Posté le 28-07-2004 à 11:31:25    

Ben tu as eu tort.


---------------
trainoo.com, c'est fini
Reply

Marsh Posté le 28-07-2004 à 11:35:14    

Et t'as rien de plus constructif, comme idée?  
 
Imaginons que tu veuilles faire 3 transformations, et pas écrire de fichier temporaire. Tu ferais comment?
 
[Edit] Sans déconner, ça m'intéresse, en fait.
 
J'ai envisagé de passer par des ByteArrayXxputStreams, et d'écrire dans un tableau de bytes, mais si j'ai un gros fichier, ça me pose le problème de la mise en mémoire de mon bouzin.
 
Là, ce qui m'a paru positif de ce point de vue, c'est que c'est lu/ecrit au fir et à mesure (la preuve : si je plante le process au milieu, je me retrouve avec un bout de fichier de sortie coupé n'importe où, même avec plus d'une transformation)


Message édité par gfive le 28-07-2004 à 11:40:59

---------------
Tous les sud africains sont ségrégationistes, à part Ted. (P. Desproges)
Reply

Marsh Posté le 28-07-2004 à 11:35:14   

Reply

Marsh Posté le 28-07-2004 à 11:43:52    

des filtres qui aggrègent un inputStream et un outPutStream, créés de manière fainéante ; au minimum.
 
peut-être même aller voir ce qu'on peut faire de org.xml.sax.InputSource pour voir s'il n'y a pas moyen d'accélérer les choses, par exemple parce qu'un filtre sait à quel encoding il sort.


---------------
trainoo.com, c'est fini
Reply

Marsh Posté le 28-07-2004 à 11:46:22    

[:drapo]  
ce topic m'interesse
dans mon cas c pour crypter un fichier que j'encode :
je lis un fichier que j'envoi dans un GZIPOutputStream(new FileOutputStream(outFilename))  
 
ensuite j'ai besoin d'un input stream a faire passer dans  out = new CipherOutputStream(out, encryptcipher);

Reply

Marsh Posté le 28-07-2004 à 11:47:49    

peut-être même voir si l'ont ne peut pas faire passer des arbres Dom entre les filtres.


---------------
trainoo.com, c'est fini
Reply

Marsh Posté le 28-07-2004 à 11:47:52    

nraynaud a écrit :

des filtres qui aggrègent un inputStream et un outPutStream, créés de manière fainéante ; au minimum.
 
peut-être même aller voir ce qu'on peut faire de org.xml.sax.InputSource pour voir s'il n'y a pas moyen d'accélérer les choses, par exemple parce qu'un filtre sait à quel encoding il sort.


 
Ah mais ça, j'ai!
 
Ma classe TransformerThread, par exemple, c'est un filtre, en fait :  
 
Elle prend un InputStream et un OutputStream en entrée, elle s'en sert pour créer (effectivement) des StreamSource et StreamResult, et elle transforme.
 
Le seul truc, c'est qu'elle fait ça dans sa méthode run(), quoi, ce qui ne change pas le rôle fondamental du machin.
 
Et niveau encoding, du coup, ça marche très bien : ça fait les transcodages tout seuls, si jamais la feuille de style sort en isomachin, et que l'entrée est en UTF-8
 


---------------
Tous les sud africains sont ségrégationistes, à part Ted. (P. Desproges)
Reply

Marsh Posté le 28-07-2004 à 11:51:38    

nraynaud a écrit :

peut-être même voir si l'ont ne peut pas faire passer des arbres Dom entre les filtres.


 
Bah ça, j'ai essayé, aussi, mais c'est une galère : par défaut, l'implémentation du DOM du jdk (je sais plus laquelle c'est) me sort des choses bizarres, du genre "stylesheet requires the version attribute" alors que la version y est....On a réussi en utilisant Saxon....
 
Mon souci, c'est que je fais des couches assez basses, et je ne peux pas (faut demander à mon chef pkoi) utiliser d'autres API que celles du JDK....(soupir, mais bon)  
 
Du coup, je suis un peu coincé...Bon, j'ai laissé des possibilités, hein, par exemple, on peut choisir l'implémentation de TransformerFactory à utiliser..(d'ailleurs, il faut que je voie si je peux mettre en place  le même mécanisme pour choisir les implémentations de Source et Result..)


---------------
Tous les sud africains sont ségrégationistes, à part Ted. (P. Desproges)
Reply

Marsh Posté le 28-07-2004 à 11:55:04    

mais alors pourquoi créer un thread par filtre, ça n'a aucun intérêt.


---------------
trainoo.com, c'est fini
Reply

Marsh Posté le 28-07-2004 à 11:58:46    

nraynaud a écrit :

mais alors pourquoi créer un thread par filtre, ça n'a aucun intérêt.


 
ben j'ai voulu suivre l'exemple du tutorial sur les Pipes, tout simplement!
 
Et puis mes threads, elles sont toutes lancées en même temps, hein, donc, ça va plus vite : la 2° transformation commence dès qu'elle a qqchose à lire, ce qui ne cesse pas de m'étonner, d'ailleurs : si la 1° donne un fichier xml non valide, ça me donne une sortie quand même (bon, pas du tout ce que j'attends, mais ça tourne)


---------------
Tous les sud africains sont ségrégationistes, à part Ted. (P. Desproges)
Reply

Marsh Posté le 28-07-2004 à 12:02:03    

si tu fais un empilage de stream aussi hein. tu connectes ton FilIputStream au premier filtre, tu connecte la chaîne de filtres et tu tires à l'autre bout et *pouf*. Comme maintenant, mais en divisant les ressources système utilisées par 2.


---------------
trainoo.com, c'est fini
Reply

Marsh Posté le 28-07-2004 à 12:10:56    

nraynaud a écrit :

si tu fais un empilage de stream aussi hein. tu connectes ton FilIputStream au premier filtre, tu connecte la chaîne de filtres et tu tires à l'autre bout et *pouf*. Comme maintenant, mais en divisant les ressources système utilisées par 2.


 
ouais...Faut que j'essaie.
 
En gros, ça correspond à ce que tu ferais??  

Code :
  1. InputStream in = new FileInputStream(masource);
  2. OutputStream out = new ByteArrayOutputStream();
  3. while (iterator.hasNext()) {
  4.   Transformer t = (Transformer) it.next();
  5.   StreamSource source = new StreamSource(in);
  6.   StreamResult result = new StreamResult(out);
  7.   t.transform(source, result);
  8.   in = new ByteArrayInputStream(out.toByteArray());
  9.   if (it.hasNext()) {
  10.      out = new ByteArrayOutputStream();
  11.   } else {
  12.      out = new FileOutputStream(monresultat);
  13.   }
  14. }


 
Je t'avoue que les IO, c'est pas trop ma tasse de thé, au départ, donc, il se peut que je me m'embourbe complètement, des fois!


Message édité par gfive le 28-07-2004 à 12:11:48

---------------
Tous les sud africains sont ségrégationistes, à part Ted. (P. Desproges)
Reply

Marsh Posté le 28-07-2004 à 12:35:11    

non, le contrat serait du type :
 

Code :
  1. Transformer t1, t2, t3;
  2. //affectations  
  3. t1.setInput(newFileInputStream(...));
  4. t2.setInput(t1.getOutPut());
  5. t3.setInput(t2.getOutPut());

ensuite, tu pompes les bytes de t2.getOutPut() et tu les pousses dans un fichier.
 
on voit que  
1) j'ai dit une connerie, pas besoin d'OutputStream dans les filtres. Tout est en tirant, on peut faire la même infrastructure en poussant, mais il faut éviter de mélanger les 2.
2) le filtre by-pass est simplement quand getOutPut() renvoie ce qu'on lui a passé en setInput(), ça veut dire que l'infrastructure est légère *en soi*.
 
D'autre part, j'ai pas mis le code de pompage/poussage, car j'ai encore rien de trouvé de statisfaisant.
peut-être par read() et write() avec un tableau de 1024 bytes :

Code :
  1. byte[] buff = new byte[1024];
  2. int i = out.read(buff, 0, 1024);
  3. while(i > 0) {
  4.   file.write(buff, 0, i);
  5.   i = out.read(buff, 0, 1024);
  6. }


---------------
trainoo.com, c'est fini
Reply

Marsh Posté le 28-07-2004 à 12:43:22    

ok, je vois ce que tu veux faire...
 
Mais je me demande si il n'y a pas un risque de lock, vu que t(i) lit dans un flux où t(i-1) est en train d'écrire, et ce depuis la même thread.


---------------
Tous les sud africains sont ségrégationistes, à part Ted. (P. Desproges)
Reply

Marsh Posté le 28-07-2004 à 12:48:19    

non. t(i-1) ne donne des bytes à t(i) que s'il lui en demande.


---------------
trainoo.com, c'est fini
Reply

Marsh Posté le 28-07-2004 à 12:56:04    

ok...
 
Bowdel, j'aime pas les IO!  
 
[Edit] ouais, enfin, ton machin, ça me laisse tout dubitatif
 
1- Je vois pas bien pkoi tu veux te faire chier à lire/ecrire dans un buffer, alors que normalement, les piped streams sont faits exactement pour ça, et sans doute mieux écrits que ce que je pourrais faire moi-même,
2- sans avoir une thread par transformation, je cours au deadlock, dela même manière que si j'essayais de lire les sorties d'un process sans créer une thread.
 
Mais bon. [:itm]


Message édité par gfive le 28-07-2004 à 13:18:46

---------------
Tous les sud africains sont ségrégationistes, à part Ted. (P. Desproges)
Reply

Marsh Posté le 02-08-2004 à 01:36:10    

gfive a écrit :

Bah ça, j'ai essayé, aussi, mais c'est une galère : par défaut, l'implémentation du DOM du jdk (je sais plus laquelle c'est) me sort des choses bizarres, du genre "stylesheet requires the version attribute" alors que la version y est....On a réussi en utilisant Saxon....


 
J'ai eu le même problème ; je m'en suis sorti en enlevant carément l'attribut version en faisant xmloutputter.setOmitEncodingVersion(true); je crois.  :)

Reply

Marsh Posté le 02-08-2004 à 11:04:35    

gfive a écrit :


[Edit] ouais, enfin, ton machin, ça me laisse tout dubitatif
 
1- Je vois pas bien pkoi tu veux te faire chier à lire/ecrire dans un buffer, alors que normalement, les piped streams sont faits exactement pour ça, et sans doute mieux écrits que ce que je pourrais faire moi-même,
2- sans avoir une thread par transformation, je cours au deadlock, dela même manière que si j'essayais de lire les sorties d'un process sans créer une thread.
 
Mais bon. [:itm]

1) oké d'accord. Fais ce que tu veux, mais ne prétends pas savoir lire une doc ou faire de la conception.
 
2) oké d'accord, tu es le premier mec au monde à avoir un deadlock avec un seul thread, je t'en félicite et te souhaite bon courage pour ton prochain record : un deadlock avec zero thread.
 
Quand je pense que je suis au chomedû et que des gens comme toi on plus d'espoir d'être embauché que moi parce qu'il ont leur bout de papier insignifiant. Même des trucs aussi scolaires que les deadlocks sont pas acquis. Monde de merde.


---------------
trainoo.com, c'est fini
Reply

Marsh Posté le    

Reply

Sujets relatifs:

Leave a Replay

Make sure you enter the(*)required information where indicate.HTML code is not allowed