Exemple de processus scalable

De Wiki1000

Dans cet exemple nous allons montrer comment écrire un processus scalable capable :

  • D'être exécuté manuellement par l'utilisateur
  • D'être exécuté simultanément sur plusieurs instances de l'Application
  • D'être planifié dans un automate
  • D'être exécuté au fil de l'eau.

Le processus transforme les objets d'une classe métier en changeant l'état d'un attribut énuméré.

  • WFClasseA est la classe des objets métiers à traiter
  • MyQueue est une file d'attente des objets prêt a être traité
  • MyProcess est le processus de traitement

Principe d'exécution

L'exécution du processus est séparée en deux étapes :

  • Dans une première étape le processus sélectionne les objets à traiter (doInQueue) et les place dans une file d'attente (MyQueue).
  • Dans une seconde étape le processus consomme les objets de la file d'attente (doDeQueue) et les traite (doProcess).

Le sujet de la file d'attente (queueTopic) reflète les variables d'état des objets traités.

Code du processus

Le code du processus est le suivant :

<source lang="delphi"> unit TestSYFREWF; interface

Type

 MyProcessus = Class(TitObject)
 public
   Procedure doDeQueue;
   Procedure doInQueue;
   Procedure doProcess(obj:TQueueObject);
   Procedure doTask;
   Procedure Execute;
 end;

Implementation

{MyProcessus}

Procedure MyProcessus.doDeQueue; //Procedure doDeQueue; var obj:MyQueue; indx:Integer; aTopic:string; begin

 // In this step we dequeue and process the ready objects.
 // Request objects in initial state in case of the queue contains multiple states
 aTopic := 'wfclassea/0/%';
 // This is just an estimation of the count, could be false if several processes run in parallel
 ProgressMax(MyQueue.CountWhere('queueTopic like %1',true,[aTopic]));
 // Get an enumerator from the queue
 foreach obj in MyQueue.Topic(aTopic) index indx do
  try
    // Progress indicator
    ProgressCount(indx);
    // Process this queue object
    // Separate the code to support straight through processing
    doProcess(obj);
    // delete the queue object
    obj.DeleteThisObject;
  except
    // on error requeue the object on an error topic
    // do not requeue on the same topic if the processus must be used in a straight through mode
    obj.ReQueue(StringReplace(obj.queueTopic,'/0/','/99/'));
  end;
 //
 ProgressFull();

end;;

Procedure MyProcessus.doInQueue; //Procedure doInQueue; var sel:TSelector; begin

 // In this step we InQueue all objects which match the process criteria.
 // In this example the process criteria is objects in initial state
 // Use a selector for objects in initial state
 sel := WFClasseA.CreateSelector('unEtat=%1',,true,[WFCAState_Initial]);
 // Insert the objects in the queue using the selector, this is a transactional operation.
 // Topic is //classname/state/code/oid
 sel.InQueue('MyQueue','wfclassea/@unEtat/@unCode/@oid');

end;;

Procedure MyProcessus.doProcess(obj:TQueueObject); //Procedure doProcess(obj:TQueueObject); var inst:WFClasseA; begin

 // One transaction by object
 withP transaction do
  begin
    // Get the actual object
    inst := obj.queueRef as WFClasseA;
    if Assigned(inst) then
     begin
       // this is the process core where business transformations are apply.
       // here we change the state of the object
       inst.unEtat.value := WFCAState_Etat2;
       ProgressMessage(obj.queueTopic);
     end;
  end;

end;;

Procedure MyProcessus.doTask; //Procedure doTask; begin

 // Step One : inqueue object to be processed.
 doInQueue;
 // Step two : Process objects from queue.
 doDeQueue;

end;;

Procedure MyProcessus.Execute; //Procedure Execute; var aMsg:TdbmQueueMessage; aTopic:string; idx:Integer; begin

 if Assigned(UserContext.TaskContext) then
  begin
    UserContext.TaskContext.AddMessage('MyProcessus.Execute');
    if Assigned(UserContext.TaskContext.EventContext.receivedMsg) then
      begin
        aMsg := UserContext.TaskContext.EventContext.receivedMsg;
        UserContext.TaskContext.AddMessage(Format('Message :%s',[aMsg.queueTopic]]);
        // a TdbmQueueMessage is also a TQueueObject
        // Original properties of the MyQueue object has been copied into aMsg
        doProcess(aMsg);
      end
      else
      begin
        UserContext.TaskContext.AddMessage('No message, process as a regular task');
        doTask;
      end;
  end
  else doTask; // started by the UI

end; </source>

Exécution par l'interface utilisateur

L'interface du processus est standard, l'exécution appelle la méthode Execute du processus.

{{#images:image1.png|multitenant/example1}}

Lorsque le processus est exécuté manuellement, ou bien dans une tâche d'automate planifié, il enchaine ces deux étapes (doTask).

Exécution au fil de l'eau

Lorsque le processus est exécuté par une tâche d'automate déclenché par un évènement file d'attente il extrait du contexte de l'automate le message reçu de la file d'attente (Execute) et exécute directement le code de traitement (doProcess).

L'automate est configuré ainsi :

{{#images:image2.png|multitenant/example1}}

Le paramétrage de la file d'attente utilisée par l'évènement file d'attente :

{{#images:image3.png|multitenant/example1}}

La file d'attente est définie sur la classe file d'attente utilisée par le processus (MyQueue) et filtre sur le sujet correspondant aux objets à traiter (wfclassea/0/%)

Le code de la méthode Execute teste le contexte d'exécution de l'automate pour extraire le message à traiter :

<source lang="delphi"> Procedure MyProcessus.Execute; //Procedure Execute; var aMsg:TdbmQueueMessage; aTopic:string; idx:Integer; begin

 if Assigned(UserContext.TaskContext) then
  begin
    UserContext.TaskContext.AddMessage('MyProcessus.Execute');
    if Assigned(UserContext.TaskContext.EventContext.receivedMsg) then
      begin
        aMsg := UserContext.TaskContext.EventContext.receivedMsg;
        UserContext.TaskContext.AddMessage(Format('Message :%s',[aMsg.queueTopic]]);
        // a TdbmQueueMessage is also a TQueueObject
        // Original properties of the MyQueue object has been copied into aMsg
        doProcess(aMsg);
      end
      else
      begin
        UserContext.TaskContext.AddMessage('No message, process as a regular task');
        doTask;
      end;
  end
  else doTask; // started by the UI

end; </source>

Notez que le message reçu dans le contexte de la tâche d'automate n'est pas directement du type MyQueue mais du type TdbmSoredQueueMessage. C'est pour cette raison que le type de l'objet message passé à dbProcess n'est pas MyQueue mais TQueueObject.

Gestion des erreurs

Erreur en traitement par énumérateur

Lorsque une erreur de traitement se produit, le code de deQueue réactive l'objet de la file tout en modifiant son sujet.

<source lang="delphi"> //Procedure doDeQueue; begin

 ...
 foreach obj in MyQueue.Topic(aTopic) index indx do
  try
    ....
  except
    // on error requeue the object on an error topic
    // do not requeue on the same topic if the processus must be used in a straight through mode
    obj.ReQueue(StringReplace(obj.queueTopic,'/0/','/99/'));
  end;

end; </source>

Tip : Dans la gestion des erreurs il faut être attentif à ne pas créer des situations de boucle sans fin. Dans le cadre d'une consommation par un énumérateur ce n'est pas possible car l'énumération est ordonnée sur queueID.

Réactiver l'objet de la file d'attente est un choix d'implémentation du processus, il serait aussi possible de supprimer cet objet.

En réactivant l'objet de la file d'attente sur un sujet différent il est possible de traiter ce objet dans un processus dédié aux erreurs.

En traitement au fil de l'eau

Dans ce cas c'est l'évènement file d'attente de la tâche d'automate qui va gérer l'erreur suivant le paramétrage défini dans la file d'attente.

{{#images:image3.png|multitenant/example1}}

  • Si RequeueTopicXXXPattern sont définis l'objet se réactivé dans la file d'attente (ici MyQueue) avec le nouveau sujet défini par
  StringReplace(queueTopic,RequeueTopicOldPattern,RequeueTopicNewPattern)
  • Si RequeueTopicXXXPattern ne sont pas définis l'objet de la file d'attente sera supprimé