GitXplorerGitXplorer
k

run_with_fork

public
18 stars
0 forks
0 issues

Commits

List of commits on branch master.
Unverified
bd69ce22fb5522308b40cd61db2d980452279c1c

fix

kkostya committed 6 years ago
Unverified
8ae5ef8b749552740ac637f693ebf8f233b1b5c9

adding pid

kkostya committed 7 years ago
Unverified
a60f30cf7e5a991922a6a3b759359f9936808b82

fix

kkostya committed 7 years ago
Unverified
d118f8c42548e1f86a23ad7230ed35f216d0939c

fix

kkostya committed 7 years ago
Unverified
f6c45ed6b6b1706e712b4d742af5e834f1d9ed19

fix

kkostya committed 7 years ago

README

The README file for this repository.

run_with_fork

Some simple parallelism for Crystal. Run some heavy or blocked thread operations in background fork. Fork created for every new task and exit when done.

Installation

Add this to your application's shard.yml:

dependencies:
  run_with_fork:
    github: kostya/run_with_fork

Usage

require "run_with_fork"
require "digest/md5"

def heavy_operation(str)
  1000.times { str = Digest::MD5.hexdigest(str) }
  str
end

pid, read_io = Process.run_with_fork do |write_io|
  write_io.puts heavy_operation("bla")
end

res = read_io.gets
read_io.close

puts res

Example concurrent

without fork:

crystal examples/2.cr --release -- 100 10000 0

00:00:03.380020000

with fork:

crystal examples/2.cr --release -- 100 10000 1

00:00:00.758754000
require "run_with_fork"
require "digest/md5"

t = Time.now

res = Channel(String).new

times = (ARGV[0]? || 100).to_i
count = (ARGV[1]? || 10000).to_i
use_fork = (ARGV[2]? == "1")

puts "use #{use_fork ? "fork" : "fiber"}"

def operation(count, data)
  s = ""
  count.times do
    s = Digest::MD5.hexdigest("#{data} bla #{s}")
  end
  "done #{data} #{s}"
end

times.times do |i|
  spawn do
    if use_fork
      pid, r = Process.run_with_fork do |w|
        w.puts operation(count, i)
      end

      s = r.gets
      r.close
      res.send s.not_nil!
    else
      res.send operation(count, i)
    end
  end
end

times.times do
  p res.receive
end

p Time.now - t

Example use msgpack to exchange data

require "run_with_fork"
require "msgpack"

pid, read_io = Process.run_with_fork do |write_io|
  1.to_msgpack(write_io)
  "done".to_msgpack(write_io)
  [1, 2, 3].to_msgpack(write_io)
end

pull = MessagePack::Unpacker.new(read_io)

p pull.read_uint         # => 1_u8
p pull.read_string       # => "done"
p Array(Int32).new(pull) # => [1, 2, 3]

read_io.close