Go to D
Multitasking - Go's multitasking capabilities are good, though not perfect. It has a nice syntax with a sweet aftertaste, simple and powerful abstraction, elegant compared to other imperative languages. It exhibits good taste, so one does not wish to compromise with mediocrity. Therefore, to translate it to another language, it must be even more expressive and with no less sensible multitasking.
If you have played enough with Go, and are tired of copy-paste, hand-juggling mutexes and seriously considering the acquisition of a prosthetic hand, allow me to offer you a translation of the Tour of the Go with equivalent code in D accompanied by brief explanations.
Contents
Coroutines
Go
package main import ( "fmt" "time" ) func say(s string) { for i := 0; i < 5; i++ { time.Sleep(100 * time.Millisecond) fmt.Println(s) } } func main() { go say("world") say("hello") }
Go allows you to easily run any function in parallel and continue working without waiting for its completion. All Go streams (fibers, fibers, coroutines, coroutines, greenlets) are run cooperatively on a limited number of native threads (yarns, threads), thereby utilizing the maximum CPU core (cores). D standard library supports fibers, but only within a single thread, and is not able to balance several fiber strands. Such a scheduler is implemented in the project vibe.d, but the syntax for running parallel streams still is not as succinct in Go. Therefore, we use the library go.d which provides the "go!" parallel start template function. In addition, following the best practices, code examples, we will arrange a test.
D
unittest { import core.time; import std.range; import jin.go; __gshared static string[] log; static void saying( string message ) { foreach( _ ; 3.iota ) { sleep( 100.msecs ); log ~= message; } } go!saying( "hello" ); sleep( 50.msecs ); saying( "world" ); log.assertEq([ "hello" , "world" , "hello" , "world" , "hello" , "world" ]); }
In D it is not necessary to write for-loops manually, so we have implemented the loop via iteration over a sequence of positive integers. The function "saying" must be declared "static", because it does not have access to a local variable, which would not be safe for parallel execution in different threads. If you make this function a closure by removing "static", this code will not compile - thanks to magic algorithms the compiler will not allow us to shoot ourselves in the foot. In Go this concurrency issue depends on the programmer's conscience, which, in most cases, is absent.
Buffered Channels
Go
package main import "fmt" func main() { ch := make(chan int, 2) ch <- 1 ch <- 2 fmt.Println(<-ch) fmt.Println(<-ch) }
Running parallel streams would not be so useful if it was not possible to synchronize them. Go uses a rather elegant abstraction for this - channels. Channels represent a typed message queue. If a thread tries to read something from an empty channel, it will block, waiting for another thread that writes data to it. Conversely, if a thread attempts to write to a full channel, it will be blocked until another thread removes at least one message from the channel. Channels can quickly and easily replace constructs such as a lazy abstraction generators, events and promises, bringing with them a lot more usage scenarios.
D's standard library for communication between threads uses the receiving and sending of abstract messages. That is, knowing the thread id, you can send it a custom message, and it should unpack and somehow process it. A pretty inconvenient mechanism. Vibe.d introduces the byte stream abstraction that behaves similarly to Go channels. But often it requires not only the bytes to be transmitted, but also some additional structures. Furthermore, just as in Go, in D inter-thread communication is realized through the acquiring of mutexes, which is a notorious problem. Therefore, we again use the library go.d, providing us typed wait-free channels.
D
unittest { import jin.go; auto numbers = new Channel!int(2); numbers.next = 1; numbers.next = 2; numbers.next.assertEq( 1 ); numbers.next.assertEq( 2 ); }
The virtual property "next" is certainly not as clear as the arrow in the Go, but the compiler is closely monitoring the situation of our proverbial gun, and does not us allow to pass through the channel types that are not safe for the parallel use of different threads. However, there is one thing - these channels are required to have no more than one reader, and no more than one writer. Unfortunately, we currently have to monitor this manually; but in the future the compiler will surely become our ally.
Also worth noting is that the size of the channel in Go defaults to one element, and in go.d about 512 bytes.
Channels
Go
package main import "fmt" func sum(s []int, c chan int) { sum := 0 for _, v := range s { sum += v } c <- sum // send sum to c } func main() { s := []int{7, 2, 8, -9, 4, 0} c := make(chan int) go sum(s[:len(s)/2], c) go sum(s[len(s)/2:], c) x, y := <-c, <-c // receive from c fmt.Println(x, y, x+y) // -5 17 12 }
In Go, working with a channel is protected by a mutex, so you can use it to communicate directly with multiple threads when you do not care in what order they will provide the data. Channels from go.d library, on the other hand, are non-blocking, so in this scenario, they cannot be used - for each stream, you must create a communication channel. To simplify the work with channel lists the library provides a structure: the balancers Inputs and Outputs. In this case we need Inputs, which reads from each non-empty channel registered therein in order.
D
unittest { import std.algorithm; import std.range; import jin.go; static auto summing( Channel!int sums , const int[] numbers ) { sums.next = numbers.sum; } immutable int[] numbers = [ 7 , 2 , 8 , -9 , 4 , 0 ]; Inputs!int sums; go!summing( sums.make(1) , numbers[ 0 .. $/2 ] ); go!summing( sums.make(1) , numbers[ $/2 .. $ ] ); auto res = sums.take(2).array; ( res ~ res.sum ).assertEq([ 17 , -5 , 12 ]); }
As usual, we do not write the summation range of hands, and making use of the standard "sum" generic algorithm. To these algorithms to work with your data type is sufficient to implement one of the ranges interfaces , which, of course, implemented in the Channel, and in the Inputs, and Outputs. Algorithm "take" produces a range of slothful, returns a specified number of the first elements of the original range. A algorithm "array" rake from a range of all the elements of an array and returns the native with them. Note that each stream we pass a separate channel of length and cut immutable array (hello, parallelism!).
Range and Close
Go
package main import ( "fmt" ) func fibonacci(n int, c chan int) { x, y := 0, 1 for i := 0; i < n; i++ { c <- x x, y = y, x+y } close(c) } func main() { c := make(chan int, 10) go fibonacci(cap(c), c) for i := range c { fmt.Println(i) } }
As can be seen, in Go, we can also be iterated over a channel, consistently getting out of it the next elements. To hang in an endless loop, such channels shall be closed by the transmission side, so that the host could understand that the more data there will be no time to finish the cycle. In D we would write almost the same thing, except that the Fibonacci series would be announced in the form of a mathematical formula recursively.
D
unittest { import std.range; import jin.go; static auto fibonacci( Channel!int numbers , int count ) { auto range = recurrence!q{ a[n-1] + a[n-2] }( 0 , 1 ).take( count ); foreach( x ; range ) numbers.next = x; numbers.close(); } auto numbers = new Channel!int(10); go!fibonacci( numbers , numbers.size ); numbers.array.assertEq([ 0 , 1 , 1 , 2 , 3 , 5 , 8 , 13 , 21 , 34 ]); }
But we can simplify the code even more, knowing that the pattern "go!" he is able to shift the value of the range in the channel.
D
unittest { import std.range; import jin.go; static auto fibonacci( int limit ) { return recurrence!q{ a[n-1] + a[n-2] }( 0 , 1 ).take( limit ); } fibonacci( 10 ).array.assertEq([ 0 , 1 , 1 , 2 , 3 , 5 , 8 , 13 , 21 , 34 ]); go!fibonacci( 10 ).array.assertEq([ 0 , 1 , 1 , 2 , 3 , 5 , 8 , 13 , 21 , 34 ]); }
Thus, the function does not need to know anything about the channels to be able to run it in parallel flow, and then wait for the result of it.
Select
Go
package main import "fmt" func fibonacci(c, quit chan int) { x, y := 0, 1 for { select { case c <- x: x, y = y, x+y case <-quit: fmt.Println("quit") return } } } func main() { c := make(chan int) quit := make(chan int) go func() { for i := 0; i < 10; i++ { fmt.Println(<-c) } quit <- 0 }() fibonacci(c, quit) }
Go has a special terse syntax for simultaneous operation of multiple channels. D is nothing, of course, does not matter. However, the equivalent functionality is implemented is not particularly complicated manual implementation cycle monitoring.
D
unittest { import std.range; import jin.go; __gshared int[] log; static auto fibonacci( Channel!int numbers , Channel!bool control ) { auto range = recurrence!q{ a[n-1] + a[n-2] }( 0 , 1 ); while( !control.closed ) { if( numbers.needed ) numbers.next = range.next; yield; } log ~= -1; numbers.close(); } static void print( Channel!bool control , Channel!int numbers ) { foreach( i ; 10.iota ) log ~= numbers.next; control.close(); } auto numbers = new Channel!int(1); auto control = new Channel!bool(1); go!print( control , numbers ); go!fibonacci( numbers , control ); while( !control.empty || !numbers.empty ) yield; log.assertEq([ 0 , 1 , 1 , 2 , 3 , 5 , 8 , 13 , 21 , 34 , -1 ]); }
As you can see, we had to get rid of the circuit, but in cycles add "yield", to the competing fibers, too, could do something, while the present hanging in suspense.
Default Selection
Go
package main import ( "fmt" "time" ) func main() { tick := time.Tick(100 * time.Millisecond) boom := time.After(500 * time.Millisecond) for { select { case <-tick: fmt.Println("tick.") case <-boom: fmt.Println("BOOM!") return default: fmt.Println(" .") time.Sleep(50 * time.Millisecond) } } }
A special syntax allows Go do something, or if one of the channels were not active. In D you, however, more control over the execution flow.
D
unittest { import core.time; import jin.go; static auto after( Channel!bool channel , Duration dur ) { sleep( dur ); if( !channel.closed ) channel.next = true; } static auto tick( Channel!bool channel , Duration dur ) { while( !channel.closed ) after( channel , dur ); } auto ticks = go!tick( 101.msecs ); auto booms = go!after( 501.msecs ); string log; while( booms.clear ) { while( !ticks.clear ) { log ~= "tick"; ticks.popFront; } log ~= "."; sleep( 51.msecs ); } log ~= "BOOM!"; log.assertEq( "..tick..tick..tick..tick..BOOM!" ); }
A notable feature is that we are not required to manually create a channel. If the function takes the first argument to the channel and we did not pass, it will be created automatically and returned as the result of the template "go!", Which is very convenient. Functions of "after" and "tick" is too specific to make them into a shared library, but the implementation of them is very simple.
Mutex
In some cases, no shared mutable state still can not do, and here we come to the aid of the lock.
Go
package main import ( "fmt" "sync" "time" ) // SafeCounter is safe to use concurrently. type SafeCounter struct { v map[string]int mux sync.Mutex } // Inc increments the counter for the given key. func (c *SafeCounter) Inc(key string) { c.mux.Lock() // Lock so only one goroutine at a time can access the map c.v. c.v[key]++ c.mux.Unlock() } // Value returns the current value of the counter for the given key. func (c *SafeCounter) Value(key string) int { c.mux.Lock() // Lock so only one goroutine at a time can access the map c.v. defer c.mux.Unlock() return c.v[key] } func main() { c := SafeCounter{v: make(map[string]int)} for i := 0; i < 1000; i++ { go c.Inc("somekey") } time.Sleep(time.Second) fmt.Println(c.Value("somekey")) }
Yes, that's right, the realization of the shared mutable state in the Go - it's a pain and suffering. One wrong move when you work with mutex and you suddenly find yourself phantom limb . Not to mention the fact that the compiler does not even nameknёt you about where mutexes needed. But D compiler you narugaet much for trying to work with unprotected mutable state of different threads. And the easiest way to protect state when multithreading - to implement a synchronized class.
D
unittest { import core.atomic; import core.time; import std.range; import std.typecons; import jin.go; synchronized class SafeCounter { private int[string] store; void inc( string key ) { ++ store[key]; } auto opIndex( string key ) { return store[ key ]; } void opIndexUnary( string op = "++" )( string key ) { this.inc( key ); } } static counter = new shared SafeCounter; static void working( int i ) { ++ counter["somekey"]; } foreach( i ; 1000.iota ) { go!working( i ); } sleep( 1.seconds ); counter["somekey"].assertEq( 1000 ); }
Feature synchronized class that automatically creates a mutex and call any public method of the mutex is captured for him, freed only when the output from the method. At the same time all the internal state is required to be kept private. But there is one unpleasant feature (and actually very dangerous and annoying bug compiler): template methods, such as, for example, "! OpIndexUnary", not wrapped in the capture of the mutex. Therefore, we have created a separate public method "inc", which causes of a generic method. The internal implementation has turned out not so beautiful, but it turned out the external interface as a native. The resulting "shared SafeCounter" we can safely pass through the channel and used directly from the different streams.