1
0
mirror of https://github.com/fumiama/sched.git synced 2026-06-04 23:10:26 +08:00

init: add codes

This commit is contained in:
源文雨
2025-02-16 22:43:26 +09:00
parent cf70378fba
commit df0f69c08c
5 changed files with 304 additions and 0 deletions

View File

@@ -1,2 +1,42 @@
# sched
Simple Golang parallel scheduler.
## Usage
### Add one to each element in arr
```go
arr := make([]uint8, 64*1024*1024) // 64M
_, _ = NewTask(arr, func(_ int, x []byte) ([]byte, error) {
for i := range x {
arr[i]++
}
return arr, nil
}, false, false).Collect(2*1024*1024, true, true) // 2M batch
```
### Get HTTP contents for earh URL
```go
contents, err := NewTask(urls, func(_ int, urls []string) ([]string, error) {
for i, u := range url {
str, err := ... // get content
if err != nil {
return nil, err
}
urls[i] = str // overlap
}
return urls, nil // return result
}, false, false).Collect(4, false, false) // 4 URLs as a group
```
## Bechmark
A simple self-incrasing process is performed on a 64M uint8 array.
```c
goos: darwin
goarch: arm64
pkg: github.com/fumiama/sched
cpu: Apple M1
BenchmarkSched/single-8 1000000000 0.02992 ns/op 2242952686132.55 MB/s 0 B/op 0 allocs/op
BenchmarkSched/para512K-8 1000000000 0.02483 ns/op 2702751323377.81 MB/s 0 B/op 0 allocs/op
BenchmarkSched/para1M-8 1000000000 0.02492 ns/op 2693026104055.06 MB/s 0 B/op 0 allocs/op
BenchmarkSched/para2M-8 1000000000 0.02133 ns/op 3146490138908.33 MB/s 0 B/op 0 allocs/op
PASS
ok github.com/fumiama/sched 1.119s
```

24
error.go Normal file
View File

@@ -0,0 +1,24 @@
package sched
import (
"strconv"
"strings"
)
// Errors parallel errors with batch index.
type Errors []error
func (errs Errors) Error() string {
sb := strings.Builder{}
for i, err := range []error(errs) {
if err == nil {
continue
}
sb.WriteByte('#')
sb.WriteString(strconv.Itoa(i))
sb.WriteByte(':')
sb.WriteString(err.Error())
sb.WriteByte(' ')
}
return sb.String()
}

3
go.mod Normal file
View File

@@ -0,0 +1,3 @@
module github.com/fumiama/sched
go 1.23.2

137
sched.go Normal file
View File

@@ -0,0 +1,137 @@
package sched
import (
"errors"
"slices"
"sync"
)
var (
ErrInvalidBatch = errors.New("invalid batch")
ErrEmptyItems = errors.New("empty items")
)
// Task split target into slices and gather them
type Task[T any] struct {
items []T
sched func(int, []T) ([]T, error)
pseudo bool
single bool
}
// NewTask on items.
//
// - pseudo: pseudo run
// - single: disable parallel
func NewTask[T any](items []T, sched func(int, []T) ([]T, error), pseudo, single bool) *Task[T] {
return &Task[T]{items: items, sched: sched, pseudo: pseudo, single: single}
}
// Collect divide items by batch and send them to sched func parallelly.
func (sc *Task[T]) Collect(batch int, ignoreoutput, ignoreerror bool) ([]T, error) {
if batch <= 0 {
return nil, ErrInvalidBatch
}
cnt := len(sc.items)
if cnt == 0 {
return nil, ErrEmptyItems
}
n := cnt / batch
if n == 0 || batch >= cnt { // only one batch
if sc.pseudo {
return sc.items, nil
}
return sc.sched(0, sc.items)
}
remain := cnt % batch
wg := sync.WaitGroup{}
var (
itemgroups [][]T
errorgroup Errors
)
if !ignoreoutput {
if remain == 0 {
itemgroups = make([][]T, n)
} else {
itemgroups = make([][]T, n+1)
}
}
if !ignoreerror {
if remain == 0 {
errorgroup = make(Errors, n)
} else {
errorgroup = make(Errors, n+1)
}
}
iterfn := func(i int) {
a := i * batch
b := (i + 1) * batch
if sc.pseudo {
if ignoreoutput {
return
}
itemgroups[i] = sc.items[a:b]
return
}
if ignoreoutput {
if ignoreerror {
_, _ = sc.sched(i, sc.items[a:b])
} else {
_, errorgroup[i] = sc.sched(i, sc.items[a:b])
}
} else {
if ignoreerror {
itemgroups[i], _ = sc.sched(i, sc.items[a:b])
} else {
itemgroups[i], errorgroup[i] = sc.sched(i, sc.items[a:b])
}
}
}
if !sc.single {
wg.Add(n)
}
for i := 0; i < n; i++ { // full batches
if !sc.single {
go func(i int) {
defer wg.Done()
iterfn(i)
}(i)
} else {
iterfn(i)
}
}
if remain > 0 {
if ignoreoutput {
if !sc.pseudo {
if ignoreerror {
_, _ = sc.sched(n, sc.items[n*batch:])
} else {
_, errorgroup[n] = sc.sched(n, sc.items[n*batch:])
}
}
} else {
if sc.pseudo {
itemgroups[n] = sc.items[n*batch:]
} else {
if ignoreerror {
itemgroups[n], _ = sc.sched(n, sc.items[n*batch:])
} else {
itemgroups[n], errorgroup[n] = sc.sched(n, sc.items[n*batch:])
}
}
}
}
if !sc.single {
wg.Wait()
}
if !ignoreerror && errorgroup.Error() != "" {
if ignoreoutput {
return nil, errorgroup
}
return slices.Concat(itemgroups...), errorgroup
}
if ignoreoutput {
return nil, nil
}
return slices.Concat(itemgroups...), nil
}

100
sched_test.go Normal file
View File

@@ -0,0 +1,100 @@
package sched
import (
"crypto/rand"
"slices"
"testing"
)
func TestSched(t *testing.T) {
buf := make([]int, 4096)
for i := 0; i < len(buf); i++ {
buf[i] = i
}
for i := 1; i < len(buf); i++ {
sc := NewTask(buf[:i], func(_ int, t []int) ([]int, error) {
return t, nil
}, false, false)
for j := 1; j < 256; j++ {
r, err := sc.Collect(j, false, true)
if err != nil {
t.Fatal(err)
}
if !slices.Equal(r, buf[:i]) {
t.Fatal("expect", buf[:i], "got", r)
}
}
}
}
func BenchmarkSched(b *testing.B) {
arr := make([]uint8, 64*1024*1024) // 64M
_, err := rand.Read(arr)
if err != nil {
b.Fatal(err)
}
b.Run("single", func(b *testing.B) {
b.SetBytes(int64(len(arr)))
b.ResetTimer()
for i := range arr {
arr[i]++
}
})
_, err = rand.Read(arr)
if err != nil {
b.Fatal(err)
}
b.Run("para512K", func(b *testing.B) {
b.SetBytes(int64(len(arr)))
b.ResetTimer()
_, _ = NewTask(arr, func(_ int, x []byte) ([]byte, error) {
for i := range x {
arr[i] /= 3
arr[i]++
arr[i] *= 5
arr[i]++
arr[i] /= 7
arr[i]++
}
return arr, nil
}, false, false).Collect(512*1024, true, true) // 512K batch
})
_, err = rand.Read(arr)
if err != nil {
b.Fatal(err)
}
b.Run("para1M", func(b *testing.B) {
b.SetBytes(int64(len(arr)))
b.ResetTimer()
_, _ = NewTask(arr, func(_ int, x []byte) ([]byte, error) {
for i := range x {
arr[i] /= 3
arr[i]++
arr[i] *= 5
arr[i]++
arr[i] /= 7
arr[i]++
}
return arr, nil
}, false, false).Collect(1024*1024, true, true) // 1M batch
})
_, err = rand.Read(arr)
if err != nil {
b.Fatal(err)
}
b.Run("para2M", func(b *testing.B) {
b.SetBytes(int64(len(arr)))
b.ResetTimer()
_, _ = NewTask(arr, func(_ int, x []byte) ([]byte, error) {
for i := range x {
arr[i] /= 3
arr[i]++
arr[i] *= 5
arr[i]++
arr[i] /= 7
arr[i]++
}
return arr, nil
}, false, false).Collect(2*1024*1024, true, true) // 2M batch
})
}