diff --git a/README.md b/README.md index 36aba4a..507d9a4 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,42 @@ # sched Simple Golang parallel scheduler. + +## Usage +### Add one to each element in arr +```go +arr := make([]uint8, 64*1024*1024) // 64M +_, _ = NewTask(arr, func(_ int, x []byte) ([]byte, error) { + for i := range x { + arr[i]++ + } + return arr, nil +}, false, false).Collect(2*1024*1024, true, true) // 2M batch +``` +### Get HTTP contents for earh URL +```go +contents, err := NewTask(urls, func(_ int, urls []string) ([]string, error) { + for i, u := range url { + str, err := ... // get content + if err != nil { + return nil, err + } + urls[i] = str // overlap + } + return urls, nil // return result +}, false, false).Collect(4, false, false) // 4 URLs as a group +``` + +## Bechmark +A simple self-incrasing process is performed on a 64M uint8 array. +```c +goos: darwin +goarch: arm64 +pkg: github.com/fumiama/sched +cpu: Apple M1 +BenchmarkSched/single-8 1000000000 0.02992 ns/op 2242952686132.55 MB/s 0 B/op 0 allocs/op +BenchmarkSched/para512K-8 1000000000 0.02483 ns/op 2702751323377.81 MB/s 0 B/op 0 allocs/op +BenchmarkSched/para1M-8 1000000000 0.02492 ns/op 2693026104055.06 MB/s 0 B/op 0 allocs/op +BenchmarkSched/para2M-8 1000000000 0.02133 ns/op 3146490138908.33 MB/s 0 B/op 0 allocs/op +PASS +ok github.com/fumiama/sched 1.119s +``` diff --git a/error.go b/error.go new file mode 100644 index 0000000..ef00f35 --- /dev/null +++ b/error.go @@ -0,0 +1,24 @@ +package sched + +import ( + "strconv" + "strings" +) + +// Errors parallel errors with batch index. +type Errors []error + +func (errs Errors) Error() string { + sb := strings.Builder{} + for i, err := range []error(errs) { + if err == nil { + continue + } + sb.WriteByte('#') + sb.WriteString(strconv.Itoa(i)) + sb.WriteByte(':') + sb.WriteString(err.Error()) + sb.WriteByte(' ') + } + return sb.String() +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..440adb0 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module github.com/fumiama/sched + +go 1.23.2 diff --git a/sched.go b/sched.go new file mode 100644 index 0000000..899c4d6 --- /dev/null +++ b/sched.go @@ -0,0 +1,137 @@ +package sched + +import ( + "errors" + "slices" + "sync" +) + +var ( + ErrInvalidBatch = errors.New("invalid batch") + ErrEmptyItems = errors.New("empty items") +) + +// Task split target into slices and gather them +type Task[T any] struct { + items []T + sched func(int, []T) ([]T, error) + pseudo bool + single bool +} + +// NewTask on items. +// +// - pseudo: pseudo run +// - single: disable parallel +func NewTask[T any](items []T, sched func(int, []T) ([]T, error), pseudo, single bool) *Task[T] { + return &Task[T]{items: items, sched: sched, pseudo: pseudo, single: single} +} + +// Collect divide items by batch and send them to sched func parallelly. +func (sc *Task[T]) Collect(batch int, ignoreoutput, ignoreerror bool) ([]T, error) { + if batch <= 0 { + return nil, ErrInvalidBatch + } + cnt := len(sc.items) + if cnt == 0 { + return nil, ErrEmptyItems + } + n := cnt / batch + if n == 0 || batch >= cnt { // only one batch + if sc.pseudo { + return sc.items, nil + } + return sc.sched(0, sc.items) + } + remain := cnt % batch + wg := sync.WaitGroup{} + var ( + itemgroups [][]T + errorgroup Errors + ) + if !ignoreoutput { + if remain == 0 { + itemgroups = make([][]T, n) + } else { + itemgroups = make([][]T, n+1) + } + } + if !ignoreerror { + if remain == 0 { + errorgroup = make(Errors, n) + } else { + errorgroup = make(Errors, n+1) + } + } + iterfn := func(i int) { + a := i * batch + b := (i + 1) * batch + if sc.pseudo { + if ignoreoutput { + return + } + itemgroups[i] = sc.items[a:b] + return + } + if ignoreoutput { + if ignoreerror { + _, _ = sc.sched(i, sc.items[a:b]) + } else { + _, errorgroup[i] = sc.sched(i, sc.items[a:b]) + } + } else { + if ignoreerror { + itemgroups[i], _ = sc.sched(i, sc.items[a:b]) + } else { + itemgroups[i], errorgroup[i] = sc.sched(i, sc.items[a:b]) + } + } + } + if !sc.single { + wg.Add(n) + } + for i := 0; i < n; i++ { // full batches + if !sc.single { + go func(i int) { + defer wg.Done() + iterfn(i) + }(i) + } else { + iterfn(i) + } + } + if remain > 0 { + if ignoreoutput { + if !sc.pseudo { + if ignoreerror { + _, _ = sc.sched(n, sc.items[n*batch:]) + } else { + _, errorgroup[n] = sc.sched(n, sc.items[n*batch:]) + } + } + } else { + if sc.pseudo { + itemgroups[n] = sc.items[n*batch:] + } else { + if ignoreerror { + itemgroups[n], _ = sc.sched(n, sc.items[n*batch:]) + } else { + itemgroups[n], errorgroup[n] = sc.sched(n, sc.items[n*batch:]) + } + } + } + } + if !sc.single { + wg.Wait() + } + if !ignoreerror && errorgroup.Error() != "" { + if ignoreoutput { + return nil, errorgroup + } + return slices.Concat(itemgroups...), errorgroup + } + if ignoreoutput { + return nil, nil + } + return slices.Concat(itemgroups...), nil +} diff --git a/sched_test.go b/sched_test.go new file mode 100644 index 0000000..cdf0f59 --- /dev/null +++ b/sched_test.go @@ -0,0 +1,100 @@ +package sched + +import ( + "crypto/rand" + "slices" + "testing" +) + +func TestSched(t *testing.T) { + buf := make([]int, 4096) + for i := 0; i < len(buf); i++ { + buf[i] = i + } + for i := 1; i < len(buf); i++ { + sc := NewTask(buf[:i], func(_ int, t []int) ([]int, error) { + return t, nil + }, false, false) + for j := 1; j < 256; j++ { + r, err := sc.Collect(j, false, true) + if err != nil { + t.Fatal(err) + } + if !slices.Equal(r, buf[:i]) { + t.Fatal("expect", buf[:i], "got", r) + } + } + } +} + +func BenchmarkSched(b *testing.B) { + arr := make([]uint8, 64*1024*1024) // 64M + _, err := rand.Read(arr) + if err != nil { + b.Fatal(err) + } + b.Run("single", func(b *testing.B) { + b.SetBytes(int64(len(arr))) + b.ResetTimer() + for i := range arr { + arr[i]++ + } + }) + _, err = rand.Read(arr) + if err != nil { + b.Fatal(err) + } + b.Run("para512K", func(b *testing.B) { + b.SetBytes(int64(len(arr))) + b.ResetTimer() + _, _ = NewTask(arr, func(_ int, x []byte) ([]byte, error) { + for i := range x { + arr[i] /= 3 + arr[i]++ + arr[i] *= 5 + arr[i]++ + arr[i] /= 7 + arr[i]++ + } + return arr, nil + }, false, false).Collect(512*1024, true, true) // 512K batch + }) + _, err = rand.Read(arr) + if err != nil { + b.Fatal(err) + } + b.Run("para1M", func(b *testing.B) { + b.SetBytes(int64(len(arr))) + b.ResetTimer() + _, _ = NewTask(arr, func(_ int, x []byte) ([]byte, error) { + for i := range x { + arr[i] /= 3 + arr[i]++ + arr[i] *= 5 + arr[i]++ + arr[i] /= 7 + arr[i]++ + } + return arr, nil + }, false, false).Collect(1024*1024, true, true) // 1M batch + }) + _, err = rand.Read(arr) + if err != nil { + b.Fatal(err) + } + b.Run("para2M", func(b *testing.B) { + b.SetBytes(int64(len(arr))) + b.ResetTimer() + _, _ = NewTask(arr, func(_ int, x []byte) ([]byte, error) { + for i := range x { + arr[i] /= 3 + arr[i]++ + arr[i] *= 5 + arr[i]++ + arr[i] /= 7 + arr[i]++ + } + return arr, nil + }, false, false).Collect(2*1024*1024, true, true) // 2M batch + }) +}