[performance] ffmpeg ffprobe wrapper improvements (#3225)

* use a single instance of wazero runtime and compiled modules

* remove test output 🤦

* undo process-{media,emoji} changes

* update test runner to include wazero compilation cache

* sign drone.yml

---------

Co-authored-by: tobi <tobi.smethurst@protonmail.com>
This commit is contained in:
kim 2024-08-23 15:15:35 +00:00 committed by GitHub
parent 53fccb8af8
commit 8e5a72ac5c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 301 additions and 384 deletions

View file

@ -34,8 +34,11 @@ steps:
path: /root/.cache/go-build path: /root/.cache/go-build
- name: go-src - name: go-src
path: /go path: /go
- name: wazero-compilation-cache
path: /root/.cache/wazero
environment: environment:
CGO_ENABLED: "0" CGO_ENABLED: "0"
GTS_WAZERO_COMPILATION_CACHE: "/root/.cache/wazero"
commands: commands:
- apk update --no-cache && apk add git - apk update --no-cache && apk add git
- >- - >-
@ -204,6 +207,6 @@ steps:
--- ---
kind: signature kind: signature
hmac: 86ebddcd630792cac43aa92fa7f45118943c51b5157491d05eb480ac21762329 hmac: f4008d87e4e5b67251eb89f255c1224e6ab5818828cab24fc319b8f829176058
... ...

View file

@ -27,7 +27,6 @@ import (
"codeberg.org/gruf/go-byteutil" "codeberg.org/gruf/go-byteutil"
"codeberg.org/gruf/go-ffmpreg/wasm"
_ffmpeg "github.com/superseriousbusiness/gotosocial/internal/media/ffmpeg" _ffmpeg "github.com/superseriousbusiness/gotosocial/internal/media/ffmpeg"
"github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtserror"
@ -161,7 +160,7 @@ func ffmpegGenerateStatic(ctx context.Context, filepath string) (string, error)
// ffmpeg calls `ffmpeg [args...]` (WASM) with directory path mounted in runtime. // ffmpeg calls `ffmpeg [args...]` (WASM) with directory path mounted in runtime.
func ffmpeg(ctx context.Context, dirpath string, args ...string) error { func ffmpeg(ctx context.Context, dirpath string, args ...string) error {
var stderr byteutil.Buffer var stderr byteutil.Buffer
rc, err := _ffmpeg.Ffmpeg(ctx, wasm.Args{ rc, err := _ffmpeg.Ffmpeg(ctx, _ffmpeg.Args{
Stderr: &stderr, Stderr: &stderr,
Args: args, Args: args,
Config: func(modcfg wazero.ModuleConfig) wazero.ModuleConfig { Config: func(modcfg wazero.ModuleConfig) wazero.ModuleConfig {
@ -188,7 +187,7 @@ func ffprobe(ctx context.Context, filepath string) (*result, error) {
dirpath := path.Dir(filepath) dirpath := path.Dir(filepath)
// Run ffprobe on our given file at path. // Run ffprobe on our given file at path.
_, err := _ffmpeg.Ffprobe(ctx, wasm.Args{ _, err := _ffmpeg.Ffprobe(ctx, _ffmpeg.Args{
Stdout: &stdout, Stdout: &stdout,
Args: []string{ Args: []string{

View file

@ -1,46 +0,0 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package ffmpeg
import (
"os"
"github.com/tetratelabs/wazero"
)
// shared WASM compilation cache.
var cache wazero.CompilationCache
func initCache() {
if cache != nil {
return
}
if dir := os.Getenv("WAZERO_COMPILATION_CACHE"); dir != "" {
var err error
// Use on-filesystem compilation cache given by env.
cache, err = wazero.NewCompilationCacheWithDir(dir)
if err != nil {
panic(err)
}
} else {
// Use in-memory compilation cache.
cache = wazero.NewCompilationCache()
}
}

View file

@ -19,65 +19,22 @@ package ffmpeg
import ( import (
"context" "context"
ffmpeglib "codeberg.org/gruf/go-ffmpreg/embed/ffmpeg"
"codeberg.org/gruf/go-ffmpreg/wasm"
"github.com/tetratelabs/wazero"
"github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1"
) )
// InitFfmpeg initializes the ffmpeg WebAssembly instance pool, // ffmpegRunner limits the number of
// with given maximum limiting the number of concurrent instances. // ffmpeg WebAssembly instances that
// may be concurrently running, in
// order to reduce memory usage.
var ffmpegRunner runner
// InitFfmpeg precompiles the ffmpeg WebAssembly source into memory and
// prepares the runner to only allow max given concurrent running instances.
func InitFfmpeg(ctx context.Context, max int) error { func InitFfmpeg(ctx context.Context, max int) error {
initCache() // ensure compilation cache initialized ffmpegRunner.Init(max)
return ffmpegPool.Init(ctx, max) return compileFfmpeg(ctx)
} }
// Ffmpeg runs the given arguments with an instance of ffmpeg. // Ffmpeg runs the given arguments with an instance of ffmpeg.
func Ffmpeg(ctx context.Context, args wasm.Args) (uint32, error) { func Ffmpeg(ctx context.Context, args Args) (uint32, error) {
return ffmpegPool.Run(ctx, args) return ffmpegRunner.Run(ctx, ffmpeg, args)
}
var ffmpegPool = wasmInstancePool{
inst: wasm.Instantiator{
// WASM module name.
Module: "ffmpeg",
// Per-instance WebAssembly runtime (with shared cache).
Runtime: func(ctx context.Context) wazero.Runtime {
// Prepare config with cache.
cfg := wazero.NewRuntimeConfig()
cfg = cfg.WithCoreFeatures(ffmpeglib.CoreFeatures)
cfg = cfg.WithCompilationCache(cache)
// Instantiate runtime with our config.
rt := wazero.NewRuntimeWithConfig(ctx, cfg)
// Prepare default "env" host module.
env := rt.NewHostModuleBuilder("env")
// Instantiate "env" module in our runtime.
_, err := env.Instantiate(context.Background())
if err != nil {
panic(err)
}
// Instantiate the wasi snapshot preview 1 in runtime.
_, err = wasi_snapshot_preview1.Instantiate(ctx, rt)
if err != nil {
panic(err)
}
return rt
},
// Per-run module configuration.
Config: wazero.NewModuleConfig,
// Embedded WASM.
Source: ffmpeglib.B,
},
} }

View file

@ -19,65 +19,22 @@ package ffmpeg
import ( import (
"context" "context"
ffprobelib "codeberg.org/gruf/go-ffmpreg/embed/ffprobe"
"codeberg.org/gruf/go-ffmpreg/wasm"
"github.com/tetratelabs/wazero"
"github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1"
) )
// InitFfprobe initializes the ffprobe WebAssembly instance pool, // ffprobeRunner limits the number of
// with given maximum limiting the number of concurrent instances. // ffprobe WebAssembly instances that
// may be concurrently running, in
// order to reduce memory usage.
var ffprobeRunner runner
// InitFfprobe precompiles the ffprobe WebAssembly source into memory and
// prepares the runner to only allow max given concurrent running instances.
func InitFfprobe(ctx context.Context, max int) error { func InitFfprobe(ctx context.Context, max int) error {
initCache() // ensure compilation cache initialized ffprobeRunner.Init(max)
return ffprobePool.Init(ctx, max) return compileFfprobe(ctx)
} }
// Ffprobe runs the given arguments with an instance of ffprobe. // Ffprobe runs the given arguments with an instance of ffprobe.
func Ffprobe(ctx context.Context, args wasm.Args) (uint32, error) { func Ffprobe(ctx context.Context, args Args) (uint32, error) {
return ffprobePool.Run(ctx, args) return ffprobeRunner.Run(ctx, ffprobe, args)
}
var ffprobePool = wasmInstancePool{
inst: wasm.Instantiator{
// WASM module name.
Module: "ffprobe",
// Per-instance WebAssembly runtime (with shared cache).
Runtime: func(ctx context.Context) wazero.Runtime {
// Prepare config with cache.
cfg := wazero.NewRuntimeConfig()
cfg = cfg.WithCoreFeatures(ffprobelib.CoreFeatures)
cfg = cfg.WithCompilationCache(cache)
// Instantiate runtime with our config.
rt := wazero.NewRuntimeWithConfig(ctx, cfg)
// Prepare default "env" host module.
env := rt.NewHostModuleBuilder("env")
// Instantiate "env" module in our runtime.
_, err := env.Instantiate(context.Background())
if err != nil {
panic(err)
}
// Instantiate the wasi snapshot preview 1 in runtime.
_, err = wasi_snapshot_preview1.Instantiate(ctx, rt)
if err != nil {
panic(err)
}
return rt
},
// Per-run module configuration.
Config: wazero.NewModuleConfig,
// Embedded WASM.
Source: ffprobelib.B,
},
} }

View file

@ -1,94 +0,0 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package ffmpeg
import (
"context"
"codeberg.org/gruf/go-ffmpreg/wasm"
)
// wasmInstancePool wraps a wasm.Instantiator{} and a
// channel of wasm.Instance{}s to provide a concurrency
// safe pool of WebAssembly module instances capable of
// compiling new instances on-the-fly, with a predetermined
// maximum number of concurrent instances at any one time.
type wasmInstancePool struct {
inst wasm.Instantiator
pool chan *wasm.Instance
}
func (p *wasmInstancePool) Init(ctx context.Context, sz int) error {
// Initialize for first time
// to preload module into the
// wazero compilation cache.
inst, err := p.inst.New(ctx)
if err != nil {
return err
}
// Clamp to 1.
if sz <= 0 {
sz = 1
}
// Allocate new pool instance channel.
p.pool = make(chan *wasm.Instance, sz)
// Store only one
// open instance
// at init time.
p.pool <- inst
// Fill reminaing with closed
// instances for later opening.
for i := 0; i < sz-1; i++ {
p.pool <- new(wasm.Instance)
}
return nil
}
func (p *wasmInstancePool) Run(ctx context.Context, args wasm.Args) (uint32, error) {
var inst *wasm.Instance
select {
// Context canceled.
case <-ctx.Done():
return 0, ctx.Err()
// Acquire instance.
case inst = <-p.pool:
// Ensure instance is
// ready for running.
if inst.IsClosed() {
var err error
inst, err = p.inst.New(ctx)
if err != nil {
return 0, err
}
}
}
// Release instance to pool on end.
defer func() { p.pool <- inst }()
// Pass args to instance.
return inst.Run(ctx, args)
}

View file

@ -0,0 +1,70 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package ffmpeg
import (
"context"
"github.com/tetratelabs/wazero"
)
// runner simply abstracts away the complexities
// of limiting the number of concurrent running
// instances of a particular WebAssembly module.
type runner struct{ pool chan struct{} }
// Init initializes the runner to
// only allow 'n' concurrently running
// instances. Special cases include 0
// which clamps to 1, and < 0 which
// disables the limit alltogether.
func (r *runner) Init(n int) {
// Reset pool.
r.pool = nil
// Clamp to 1.
if n <= 0 {
n = 1
}
// Allocate new pool channel.
r.pool = make(chan struct{}, n)
for i := 0; i < n; i++ {
r.pool <- struct{}{}
}
}
// Run will attempt to pass the given compiled WebAssembly module with args to run(), waiting on
// the receiving runner until a free slot is available to run an instance, (if a limit is enabled).
func (r *runner) Run(ctx context.Context, cmod wazero.CompiledModule, args Args) (uint32, error) {
select {
// Context canceled.
case <-ctx.Done():
return 0, ctx.Err()
// Slot acquired.
case <-r.pool:
}
// Release slot back to pool on end.
defer func() { r.pool <- struct{}{} }()
// Pass to main module runner.
return run(ctx, cmod, args)
}

View file

@ -0,0 +1,201 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package ffmpeg
import (
"context"
"io"
"os"
ffmpeglib "codeberg.org/gruf/go-ffmpreg/embed/ffmpeg"
ffprobelib "codeberg.org/gruf/go-ffmpreg/embed/ffprobe"
"github.com/tetratelabs/wazero"
"github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1"
"github.com/tetratelabs/wazero/sys"
)
// Use all core features required by ffmpeg / ffprobe
// (these should be the same but we OR just in case).
const corefeatures = ffprobelib.CoreFeatures |
ffmpeglib.CoreFeatures
var (
// shared WASM runtime instance.
runtime wazero.Runtime
// ffmpeg / ffprobe compiled WASM.
ffmpeg wazero.CompiledModule
ffprobe wazero.CompiledModule
)
// Args encapsulates the passing of common
// configuration options to run an instance
// of a compiled WebAssembly module that is
// run in a typical CLI manner.
type Args struct {
// Optional further module configuration function.
// (e.g. to mount filesystem dir, set env vars, etc).
Config func(wazero.ModuleConfig) wazero.ModuleConfig
// Standard FDs.
Stdin io.Reader
Stdout io.Writer
Stderr io.Writer
// CLI args.
Args []string
}
// run will run the given compiled
// WebAssembly module using given args,
// using the global wazero runtime.
func run(
ctx context.Context,
cmod wazero.CompiledModule,
args Args,
) (
uint32, // exit code
error,
) {
// Prefix module name as argv0 to args.
cargs := make([]string, len(args.Args)+1)
copy(cargs[1:], args.Args)
cargs[0] = cmod.Name()
// Create base module config.
modcfg := wazero.NewModuleConfig()
modcfg = modcfg.WithArgs(cargs...)
modcfg = modcfg.WithStdin(args.Stdin)
modcfg = modcfg.WithStdout(args.Stdout)
modcfg = modcfg.WithStderr(args.Stderr)
if args.Config != nil {
// Pass through config fn.
modcfg = args.Config(modcfg)
}
// Instantiate the module from precompiled wasm module data.
mod, err := runtime.InstantiateModule(ctx, cmod, modcfg)
if mod != nil {
// Ensure closed.
_ = mod.Close(ctx)
}
// Try extract exit code.
switch err := err.(type) {
case *sys.ExitError:
return err.ExitCode(), nil
default:
return 0, err
}
}
// compileFfmpeg ensures the ffmpeg WebAssembly has been
// pre-compiled into memory. If already compiled is a no-op.
func compileFfmpeg(ctx context.Context) error {
if ffmpeg != nil {
return nil
}
// Ensure runtime already initialized.
if err := initRuntime(ctx); err != nil {
return err
}
// Compile the ffmpeg WebAssembly module into memory.
cmod, err := runtime.CompileModule(ctx, ffmpeglib.B)
if err != nil {
return err
}
// Set module.
ffmpeg = cmod
return nil
}
// compileFfprobe ensures the ffprobe WebAssembly has been
// pre-compiled into memory. If already compiled is a no-op.
func compileFfprobe(ctx context.Context) error {
if ffprobe != nil {
return nil
}
// Ensure runtime already initialized.
if err := initRuntime(ctx); err != nil {
return err
}
// Compile the ffprobe WebAssembly module into memory.
cmod, err := runtime.CompileModule(ctx, ffprobelib.B)
if err != nil {
return err
}
// Set module.
ffprobe = cmod
return nil
}
// initRuntime initializes the global wazero.Runtime,
// if already initialized this function is a no-op.
func initRuntime(ctx context.Context) error {
if runtime != nil {
return nil
}
var cache wazero.CompilationCache
if dir := os.Getenv("GTS_WAZERO_COMPILATION_CACHE"); dir != "" {
var err error
// Use on-filesystem compilation cache given by env.
cache, err = wazero.NewCompilationCacheWithDir(dir)
if err != nil {
return err
}
}
// Prepare config with cache.
cfg := wazero.NewRuntimeConfig()
cfg = cfg.WithCoreFeatures(corefeatures)
cfg = cfg.WithCompilationCache(cache)
// Instantiate runtime with prepared config.
rt := wazero.NewRuntimeWithConfig(ctx, cfg)
// Prepare default "env" host module.
env := rt.NewHostModuleBuilder("env")
// Instantiate host "env" module.
_, err := env.Instantiate(ctx)
if err != nil {
return err
}
// Instantiate wasi snapshot preview features in runtime.
_, err = wasi_snapshot_preview1.Instantiate(ctx, rt)
if err != nil {
return err
}
// Set runtime.
runtime = rt
return nil
}

View file

@ -1,129 +0,0 @@
package wasm
import (
"context"
"errors"
"io"
"github.com/tetratelabs/wazero"
"github.com/tetratelabs/wazero/sys"
)
type Args struct {
// Optional further module configuration function.
// (e.g. to mount filesystem dir, set env vars, etc).
Config func(wazero.ModuleConfig) wazero.ModuleConfig
// Standard FDs.
Stdin io.Reader
Stdout io.Writer
Stderr io.Writer
// CLI args.
Args []string
}
type Instantiator struct {
// Module ...
Module string
// Runtime ...
Runtime func(context.Context) wazero.Runtime
// Config ...
Config func() wazero.ModuleConfig
// Source ...
Source []byte
}
func (inst *Instantiator) New(ctx context.Context) (*Instance, error) {
switch {
case inst.Module == "":
panic("missing module name")
case inst.Runtime == nil:
panic("missing runtime instantiator")
case inst.Config == nil:
panic("missing module configuration")
case len(inst.Source) == 0:
panic("missing module source")
}
// Create new host runtime.
rt := inst.Runtime(ctx)
// Compile guest module from WebAssembly source.
mod, err := rt.CompileModule(ctx, inst.Source)
if err != nil {
return nil, err
}
return &Instance{
inst: inst,
wzrt: rt,
cmod: mod,
}, nil
}
// Instance ...
//
// NOTE: Instance is NOT concurrency
// safe. One at a time please!!
type Instance struct {
inst *Instantiator
wzrt wazero.Runtime
cmod wazero.CompiledModule
}
func (inst *Instance) Run(ctx context.Context, args Args) (uint32, error) {
if inst.inst == nil {
panic("not initialized")
}
// Check instance open.
if inst.IsClosed() {
return 0, errors.New("instance closed")
}
// Prefix binary name as argv0 to args.
cargs := make([]string, len(args.Args)+1)
copy(cargs[1:], args.Args)
cargs[0] = inst.inst.Module
// Create base module config.
modcfg := inst.inst.Config()
modcfg = modcfg.WithName(inst.inst.Module)
modcfg = modcfg.WithArgs(cargs...)
modcfg = modcfg.WithStdin(args.Stdin)
modcfg = modcfg.WithStdout(args.Stdout)
modcfg = modcfg.WithStderr(args.Stderr)
if args.Config != nil {
// Pass through config fn.
modcfg = args.Config(modcfg)
}
// Instantiate the module from precompiled wasm module data.
mod, err := inst.wzrt.InstantiateModule(ctx, inst.cmod, modcfg)
switch err := err.(type) {
case nil:
return 0, mod.Close(ctx)
case *sys.ExitError:
return err.ExitCode(), nil
default:
return 0, err
}
}
func (inst *Instance) IsClosed() bool {
return (inst.wzrt == nil || inst.cmod == nil)
}
func (inst *Instance) Close(ctx context.Context) error {
if inst.IsClosed() {
return nil
}
err1 := inst.cmod.Close(ctx)
err2 := inst.wzrt.Close(ctx)
return errors.Join(err1, err2)
}

1
vendor/modules.txt vendored
View file

@ -34,7 +34,6 @@ codeberg.org/gruf/go-fastpath/v2
## explicit; go 1.22.0 ## explicit; go 1.22.0
codeberg.org/gruf/go-ffmpreg/embed/ffmpeg codeberg.org/gruf/go-ffmpreg/embed/ffmpeg
codeberg.org/gruf/go-ffmpreg/embed/ffprobe codeberg.org/gruf/go-ffmpreg/embed/ffprobe
codeberg.org/gruf/go-ffmpreg/wasm
# codeberg.org/gruf/go-iotools v0.0.0-20240710125620-934ae9c654cf # codeberg.org/gruf/go-iotools v0.0.0-20240710125620-934ae9c654cf
## explicit; go 1.21 ## explicit; go 1.21
codeberg.org/gruf/go-iotools codeberg.org/gruf/go-iotools