Domanda Una fornitura condivisa può eseguire più tap contemporaneamente?


Considera questo codice in cui un tocco richiede un po 'di tempo per essere completato. Tutti i blocchi sono in esecuzione simultaneamente (immediatamente in uscita), quindi dormono. La maggior parte non finisce perché il programma finisce prima di loro:

my $supply = Supply.interval(0.2);
my $tap = $supply.tap: { say "1 $^a"; sleep 5;  };
sleep 5;

L'output (elided) ha 25 righe (una per ogni tick di 0.2 in 5 secondi):

1. 0
1. 1
...
1. 24

Quindi cambio l'offerta in .share:

my $supply = Supply.interval(0.2).share;
my $tap = $supply.tap: { say "1. $^a"; sleep 5 };
sleep 5;

Vedo solo una riga di input ma mi aspettavo lo stesso risultato:

1. 1

Il .share rende possibile che più tocchi ottengano gli stessi valori.

my $supply = Supply.interval(0.2).share;
my $tap  = $supply.tap: { say "1. $^a"; sleep 5 };
my $tap2 = $supply.tap: { say "2. $^a";  };
sleep 5;

Tuttavia, l'output è stato emesso solo per il primo tocco e ha ancora una sola riga. Mi aspettavo 25 linee per ciascuna:

1. 1

13
2018-03-28 16:18


origine


risposte:


Le regole di base per Supply siamo:

  1. Nessuna introduzione di concorrenza senza che sia esplicitamente richiesta
  2. Back-pressure attraverso un modello mittente-paga
  3. Un messaggio viene elaborato per intero prima di quello successivo (quindi .map({ ...something with state... }) ci si può fidare di non causare conflitti sullo stato)

La regola 3 non si applica in realtà share dal momento che ci sono catene operative a valle separate dopo quel punto, ma le regole 1 e 2 fanno. Lo scopo di share è quello di consentire la pubblicazione / sottoscrizione e anche di prevedere il riutilizzo di un blocco di elaborazione da parte di più processori di messaggi a valle. L'introduzione dell'elaborazione parallela dei messaggi è una preoccupazione separata da questo.

Sono varie opzioni. Uno è quello di avere i messaggi per l'elaborazione parallela bloccati in a Channel. Questo introduce esplicitamente un posto dove i messaggi devono essere memorizzati nel buffer (beh, finché non si esaurisce la memoria ... che è esattamente il motivo Supply viene fornito con un modello di contropressione che paga il mittente). Coercizione a Channel di nuovo in a Supply ottiene i valori tirati da Channel ed emesso su quello Supply su un filo della piscina. In questo modo sembra:

my $supply = Supply.interval(0.2).share;
my $tap  = $supply.Channel.Supply.tap: { say "1. $^a"; sleep 5 };
my $tap2 = $supply.tap: { say "2. $^a";  };
sleep 5;

Da notare che da allora whenever costringe automaticamente la cosa a cui è stato chiesto di reagire a a Supply, quindi sarebbe whenever $supply.Channel { }, che lo rende una soluzione piuttosto breve - ma allo stesso tempo ben esplicito in quanto indica Come il normale meccanismo di contropressione viene messo di lato. L'altra proprietà di questa soluzione è che conserva l'ordine dei messaggi e continua a elaborare una volta per volta la parte Channel.

L'alternativa è reagire a ciascun messaggio, invece di iniziare un lavoro asincrono per gestirlo. Il start operazione su a Supply pianifica il blocco che viene passato per l'esecuzione nel pool di thread per ciascun messaggio ricevuto, quindi non blocca l'arrivo del messaggio successivo. Il risultato è a Supply di Supply. Questo costringe a toccare ogni interno Supply in realtà fare accadere qualsiasi cosa, che all'inizio sembra leggermente controintuitivo, ma in realtà è per il bene del programmatore: rende chiaro che c'è un po 'di lavoro asincrono da tenere sotto controllo. Consiglio vivamente di usarlo in combinazione con il react/whenever sintassi, che fa automaticamente la gestione degli abbonamenti e la propagazione degli errori. La trasformazione più diretta del codice nella domanda è:

my $supply = Supply.interval(0.2).share;
my $tap  = supply { whenever $supply.start({ say "1. $^a"; sleep 5 }) { whenever $_ {} } }.tap;
my $tap2 = $supply.tap: { say "2. $^a";  };
sleep 5;

Sebbene sia anche possibile scriverlo come:

my $supply = Supply.interval(0.2).share;
my $tap  = supply { whenever $supply -> $a { whenever start { say "1. $a"; sleep 5 } {} } }.tap;
my $tap2 = $supply.tap: { say "2. $^a";  };
sleep 5;

Che indica la possibilità di scrivere a parallelize  Supply Combinator:

my $supply = Supply.interval(0.2).share;
my $tap  = parallelize($supply, { say "1. $^a"; sleep 5 }).tap;
my $tap2 = $supply.tap: { say "2. $^a";  };
sleep 5;

sub parallelize(Supply $messages, &operation) {
    supply {
        whenever $messages -> $value {
            whenever start operation($value) {
                emit $_;
            }
        }
     }
}

L'output di questo approccio è piuttosto diverso dal Channel uno, poiché le operazioni vengono tutte avviate non appena il messaggio arriva. Inoltre non mantiene l'ordine dei messaggi. C'è ancora una coda implicita (a differenza di quella esplicita con il Channel approccio), è solo che ora è la coda di lavoro del programma di pianificazione del pool di thread e lo scheduler del sistema operativo che deve tenere traccia del lavoro in corso. E ancora, non c'è alcuna contropressione, ma si noti che sarebbe del tutto possibile implementarlo tenendo traccia di ciò che è eccezionale Promises e bloccando ulteriori messaggi in arrivo con un await Promise.anyof(@outstanding).

Infine, prenderò nota che c'è qualche considerazione su hyper whenever e race whenever costruisce per fornire un meccanismo a livello di linguaggio per affrontare l'elaborazione parallela di Supply messaggi. Tuttavia la semantica di tali e come essi giocano nel supply-sviluppare obiettivi di progettazione e proprietà di sicurezza, rappresentano sfide progettuali significative.


13
2018-03-28 21:08



I rubinetti di a Supply vengono eseguiti sequenzialmente all'interno di un singolo thread. Quindi il codice del secondo tap verrà eseguito solo dopo il primo tap (che dorme per 5 secondi). Questo mostra nel seguente codice:

my $supply = Supply.interval(0.2).share;
my $tap  = $supply.tap: { say "1. $^a in #{+$*THREAD}" };
my $tap2 = $supply.tap: { say "2. $^a in #{+$*THREAD}" };
sleep 0.5;
===================
1. 1 in #4
2. 1 in #4
1. 2 in #4
2. 2 in #4

Quindi la risposta è al momento: no


6
2018-03-28 20:25