Go to D

From D Wiki
Revision as of 17:37, 29 March 2016 by Quickfur (talk | contribs) (Coroutines: grammar)
Jump to: navigation, search

Original article in Russian

Multitasking - Go's multitasking capabilities are good, though not perfect. It has a nice syntax with a sweet aftertaste, simple and powerful abstraction, elegant compared to other imperative languages. It exhibits good taste, so one does not wish to compromise with mediocrity. Therefore, to translate it to another language, it must be even more expressive and with no less sensible multitasking.

If you have played enough with Go, and are tired of copy-paste, hand-juggling mutexes and seriously considering the acquisition of a prosthetic hand, allow me to offer you a translation of the Tour of the Go with equivalent code in D accompanied by brief explanations.

Coroutines

Go

   package main
   
   import (
       "fmt"
       "time"
   )
   
   func say(s string) {
       for i := 0; i < 5; i++ {
           time.Sleep(100 * time.Millisecond)
           fmt.Println(s)
       }
   }
   
   func main() {
       go say("world")
       say("hello")
   }

Go allows you to easily run any function in parallel and continue working without waiting for its completion. All Go streams (fibers, fibers, coroutines, coroutines, greenlets) are run cooperatively on a limited number of native threads (yarns, threads), thereby utilizing the maximum CPU core (cores). D standard library supports fibers, but only within a single thread, and is not able to balance several fiber strands. Such a scheduler is implemented in the project vibe.d, but the syntax for running parallel streams still is not as succinct in Go. Therefore, we use the library go.d which provides the "go!" parallel start template function. In addition, following the best practices, code examples, we will arrange a test.

D

   unittest
   {
       import core.time;
       import std.range;
       import jin.go;
   
       __gshared static string[] log;
   
       static void saying( string message )
       {
           foreach( _ ; 3.iota ) {
               sleep( 100.msecs );
               log ~= message;
           }
       }
   
       go!saying( "hello" );
       sleep( 50.msecs );
       saying( "world" );
   
       log.assertEq([ "hello" , "world" , "hello" , "world" , "hello" , "world" ]);
   } 

In D it is not necessary to write for-loops manually, so we have implemented the loop via iteration over a sequence of positive integers. The function "saying" must be declared "static", because it does not have access to a local variable, which would not be safe for parallel execution in different threads. If you make this function a closure by removing "static", this code will not compile - thanks to magic algorithms the compiler will not allow us to shoot ourselves in the foot. In Go this concurrency issue dependsw on the programmer's conscience, which, in most cases, is absent.

Buffered Channels

Go

   package main
   
   import "fmt"
   
   func main() {
       ch := make(chan int, 2)
       ch <- 1
       ch <- 2
       fmt.Println(<-ch)
       fmt.Println(<-ch)
   }

Run parallel streams would not be so useful if it was not possible to synchronize them. Go uses a rather elegant abstraction for this - channels. Channels represent a typed message queues. If a thread tries to read something from an empty channel, it is blocked waiting for another thread that writes to the data. Conversely, if an attempt to write in a crowded channel, blocked until another thread does not subtract from the channel at least one message. Channels quickly and easily replaced, such as a lazy abstraction generators, events and promises, bringing with them a lot more usage scenarios. The standard library D for communication between threads used by the reception / transmission of abstract messages . That is, knowing the flow id, you can send him a custom message, and it should unpack it and somehow process. Pretty is not convenient mechanism. Vibe.d introduces abstraction byte stream behavior similar gokanalam. But often it requires not only the bytes transmitted, and some structures. Furthermore, in Go, that D, inter-thread communication is realized through the nip mutex that is notorious problem . Therefore, we again use the library go.d, providing us typed wait-free channels.

D

   unittest
   {
       import jin.go;
   
       auto numbers = new Channel!int(2);
       numbers.next = 1;
       numbers.next = 2;
       numbers.next.assertEq( 1 );
       numbers.next.assertEq( 2 );
   }

Virtual property "next", certainly not as clearly as the arrow in the Go, but the compiler is closely monitoring the situation of our gun, and does not allow to pass through the channel types that are not safe for the parallel use of different yarns. However, there is one thing - these channels require them to have no more than one reader, and no more than one writer. Unfortunately, for this we have to monitor manually, but in the future for sure and then the compiler will go to our allies. Also worth noting is that the size of the channel in Go defaults to one element, and in go.d about 512 bytes.


Channels

Go

   package main
   
   import "fmt"
   
   func sum(s []int, c chan int) {
       sum := 0
       for _, v := range s {
           sum += v
       }
       c <- sum // send sum to c
   }
   
   func main() {
       s := []int{7, 2, 8, -9, 4, 0}
   
       c := make(chan int)
       go sum(s[:len(s)/2], c)
       go sum(s[len(s)/2:], c)
       x, y := <-c, <-c // receive from c
   
       fmt.Println(x, y, x+y) // -5 17 12
   } 

Go work in the channel is protected by a mutex, so you can use it to communicate directly with multiple threads when you do not care in what order they will provide the data. Channels from go.d library, on the other hand, non-blocking, so in this scenario, they can not be used - for each stream, you must create a communication channel. To simplify the work with the channel list library provides a structure-balancers Inputs and Outputs. In this case we need Inputs, which in turn reads from each non-empty channel registered therein.

D

   unittest
   {
       import std.algorithm;
       import std.range;
       import jin.go;
   
       static auto summing( Channel!int sums , const int[] numbers ) {
           sums.next = numbers.sum;
       }
   
       immutable int[] numbers = [ 7 , 2 , 8 , -9 , 4 , 0 ];
   
       Inputs!int sums;
       go!summing( sums.make(1) , numbers[ 0 .. $/2 ] );
       go!summing( sums.make(1) , numbers[ $/2 .. $ ] );
       auto res = sums.take(2).array;
   
       ( res ~ res.sum ).assertEq([ 17 , -5 , 12 ]);
   }

As usual, we do not write the summation range of hands, and making use of the standard "sum" generic algorithm. To these algorithms to work with your data type is sufficient to implement one of the ranges interfaces , which, of course, implemented in the Channel, and in the Inputs, and Outputs. Algorithm "take" produces a range of slothful, returns a specified number of the first elements of the original range. A algorithm "array" rake from a range of all the elements of an array and returns the native with them. Note that each stream we pass a separate channel of length and cut immutable array (hello, parallelism!).


Range and Close

Go

   package main
   
   import (
       "fmt"
   )
   
   func fibonacci(n int, c chan int) {
       x, y := 0, 1
       for i := 0; i < n; i++ {
           c <- x
           x, y = y, x+y
       }
       close(c)
   }
   
   func main() {
       c := make(chan int, 10)
       go fibonacci(cap(c), c)
       for i := range c {
           fmt.Println(i)
       }
   }

As can be seen, in Go, we can also be iterated over a channel, consistently getting out of it the next elements. To hang in an endless loop, such channels shall be closed by the transmission side, so that the host could understand that the more data there will be no time to finish the cycle. In D we would write almost the same thing, except that the Fibonacci series would be announced in the form of a mathematical formula recursively.

D

   unittest
   {
       import std.range;
       import jin.go;
   
       static auto fibonacci( Channel!int numbers , int count )
       {
           auto range = recurrence!q{ a[n-1] + a[n-2] }( 0 , 1 ).take( count );
           foreach( x ; range ) numbers.next = x;
           numbers.close();
       }
   
       auto numbers = new Channel!int(10);
       go!fibonacci( numbers , numbers.size );
   
       numbers.array.assertEq([ 0 , 1 , 1 , 2 , 3 , 5 , 8 , 13 , 21 , 34 ]);
   }

But we can simplify the code even more, knowing that the pattern "go!" he is able to shift the value of the range in the channel.

D

   unittest
   {
       import std.range;
       import jin.go;
   
       static auto fibonacci( int limit )
       {
           return recurrence!q{ a[n-1] + a[n-2] }( 0 , 1 ).take( limit );
       }
   
       fibonacci( 10 ).array.assertEq([ 0 , 1 , 1 , 2 , 3 , 5 , 8 , 13 , 21 , 34 ]);
       go!fibonacci( 10 ).array.assertEq([ 0 , 1 , 1 , 2 , 3 , 5 , 8 , 13 , 21 , 34 ]);
   }

Thus, the function does not need to know anything about the channels to be able to run it in parallel flow, and then wait for the result of it.


Select

Go

   package main
   
   import "fmt"
   
   func fibonacci(c, quit chan int) {
       x, y := 0, 1
       for {
           select {
           case c <- x:
               x, y = y, x+y
           case <-quit:
               fmt.Println("quit")
               return
           }
       }
   }
   
   func main() {
       c := make(chan int)
       quit := make(chan int)
       go func() {
           for i := 0; i < 10; i++ {
               fmt.Println(<-c)
           }
           quit <- 0
       }()
       fibonacci(c, quit)
   }

Go has a special terse syntax for simultaneous operation of multiple channels. D is nothing, of course, does not matter. However, the equivalent functionality is implemented is not particularly complicated manual implementation cycle monitoring.

D

   unittest
   {
       import std.range;
       import jin.go;
   
       __gshared int[] log;
   
       static auto fibonacci( Channel!int numbers , Channel!bool control )
       {
           auto range = recurrence!q{ a[n-1] + a[n-2] }( 0 , 1 );
   
           while( !control.closed )
           {
               if( numbers.needed ) numbers.next = range.next;
               yield;
           }
   
           log ~= -1;
           numbers.close();
       }
   
       static void print( Channel!bool control , Channel!int numbers )
       {
           foreach( i ; 10.iota ) log ~= numbers.next;
           control.close();
       }
   
       auto numbers = new Channel!int(1);
       auto control = new Channel!bool(1);
   
       go!print( control , numbers );
       go!fibonacci( numbers , control );
   
       while( !control.empty || !numbers.empty ) yield;
   
       log.assertEq([ 0 , 1 , 1 , 2 , 3 , 5 , 8 , 13 , 21 , 34 , -1 ]);
   }

As you can see, we had to get rid of the circuit, but in cycles add "yield", to the competing fibers, too, could do something, while the present hanging in suspense.


Default Selection

Go

   package main
   
   import (
       "fmt"
       "time"
   )
   
   func main() {
       tick := time.Tick(100 * time.Millisecond)
       boom := time.After(500 * time.Millisecond)
       for {
           select {
           case <-tick:
               fmt.Println("tick.")
           case <-boom:
               fmt.Println("BOOM!")
               return
           default:
               fmt.Println("    .")
               time.Sleep(50 * time.Millisecond)
           }
       }
   }

A special syntax allows Go do something, or if one of the channels were not active. In D you, however, more control over the execution flow.

D

   unittest
   {
       import core.time;
       import jin.go;
   
       static auto after( Channel!bool channel , Duration dur )
       {
           sleep( dur );
           if( !channel.closed ) channel.next = true;
       }
   
       static auto tick( Channel!bool channel , Duration dur )
       {
           while( !channel.closed ) after( channel , dur );
       }
   
       auto ticks = go!tick( 101.msecs );
       auto booms = go!after( 501.msecs );
   
       string log;
   
       while( booms.clear )
       {
           while( !ticks.clear ) {
               log ~= "tick";
               ticks.popFront;
           }
           log ~= ".";
           sleep( 51.msecs );
       }
       log ~= "BOOM!";
   
       log.assertEq( "..tick..tick..tick..tick..BOOM!" );
   }

A notable feature is that we are not required to manually create a channel. If the function takes the first argument to the channel and we did not pass, it will be created automatically and returned as the result of the template "go!", Which is very convenient. Functions of "after" and "tick" is too specific to make them into a shared library, but the implementation of them is very simple.


Mutex

In some cases, no shared mutable state still can not do, and here we come to the aid of the lock.

Go

   package main
   
   import (
       "fmt"
       "sync"
       "time"
   )
   
   // SafeCounter is safe to use concurrently.
   type SafeCounter struct {
       v   map[string]int
       mux sync.Mutex
   }
   
   // Inc increments the counter for the given key.
   func (c *SafeCounter) Inc(key string) {
       c.mux.Lock()
       // Lock so only one goroutine at a time can access the map c.v.
       c.v[key]++
       c.mux.Unlock()
   }
   
   // Value returns the current value of the counter for the given key.
   func (c *SafeCounter) Value(key string) int {
       c.mux.Lock()
       // Lock so only one goroutine at a time can access the map c.v.
       defer c.mux.Unlock()
       return c.v[key]
   }
   
   func main() {
       c := SafeCounter{v: make(map[string]int)}
       for i := 0; i < 1000; i++ {
           go c.Inc("somekey")
       }
   
       time.Sleep(time.Second)
       fmt.Println(c.Value("somekey"))
   }

Yes, that's right, the realization of the shared mutable state in the Go - it's a pain and suffering. One wrong move when you work with mutex and you suddenly find yourself phantom limb . Not to mention the fact that the compiler does not even nameknёt you about where mutexes needed. But D compiler you narugaet much for trying to work with unprotected mutable state of different threads. And the easiest way to protect state when multithreading - to implement a synchronized class.

D

   unittest
   {
       import core.atomic;
       import core.time;
       import std.range;
       import std.typecons;
       import jin.go;
   
       synchronized class SafeCounter
       {
           private int[string] store;
   
           void inc( string key )
           {
               ++ store[key];
           }
   
           auto opIndex( string key )
           {
               return store[ key ];
           }
           void opIndexUnary( string op = "++" )( string key )
           {
               this.inc( key );
           }
       }
   
       static counter = new shared SafeCounter;
   
       static void working( int i )
       {
           ++ counter["somekey"];
       }
   
       foreach( i ; 1000.iota ) {
           go!working( i );
       }
   
       sleep( 1.seconds );
   
       counter["somekey"].assertEq( 1000 );
   }

Feature synchronized class that automatically creates a mutex and call any public method of the mutex is captured for him, freed only when the output from the method. At the same time all the internal state is required to be kept private. But there is one unpleasant feature (and actually very dangerous and annoying bug compiler): template methods, such as, for example, "! OpIndexUnary", not wrapped in the capture of the mutex. Therefore, we have created a separate public method "inc", which causes of a generic method. The internal implementation has turned out not so beautiful, but it turned out the external interface as a native. The resulting "shared SafeCounter" we can safely pass through the channel and used directly from the different streams.