Skip to content

Commit

Permalink
Genesis export stream (#519)
Browse files Browse the repository at this point in the history
## Describe your changes and provide context
Writes genesis export to a file but with each module being appended to
the file individually. We need this because we are running in an issue
where the app state is too large to hold in memory causing an OOM. We
currently assume that each individual module's app state is small enough
to hold in memory.

## Testing performed to validate your change
manual testing locally.
  • Loading branch information
jewei1997 authored Jul 29, 2024
1 parent bf3b12b commit 2d2472d
Show file tree
Hide file tree
Showing 34 changed files with 1,758 additions and 50 deletions.
18 changes: 18 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,15 @@ type StateSyncConfig struct {
SnapshotDirectory string `mapstructure:"snapshot-directory"`
}

// GenesisConfig defines the genesis export, validation, and import configuration
type GenesisConfig struct {
// StreamImport defines if the genesis.json is in stream form or not.
StreamImport bool `mapstructure:"stream-import"`

// GenesisStreamFile sets the genesis json file from which to stream from
GenesisStreamFile string `mapstructure:"genesis-stream-file"`
}

// Config defines the server's top level configuration
type Config struct {
BaseConfig `mapstructure:",squash"`
Expand All @@ -205,6 +214,7 @@ type Config struct {
StateSync StateSyncConfig `mapstructure:"state-sync"`
StateCommit config.StateCommitConfig `mapstructure:"state-commit"`
StateStore config.StateStoreConfig `mapstructure:"state-store"`
Genesis GenesisConfig `mapstructure:genesis`
}

// SetMinGasPrices sets the validator's minimum gas prices.
Expand Down Expand Up @@ -288,6 +298,10 @@ func DefaultConfig() *Config {
},
StateCommit: config.DefaultStateCommitConfig(),
StateStore: config.DefaultStateStoreConfig(),
Genesis: GenesisConfig{
StreamImport: false,
GenesisStreamFile: "",
},
}
}

Expand Down Expand Up @@ -391,6 +405,10 @@ func GetConfig(v *viper.Viper) (Config, error) {
PruneIntervalSeconds: v.GetInt("state-store.prune-interval-seconds"),
ImportNumWorkers: v.GetInt("state-store.import-num-workers"),
},
Genesis: GenesisConfig{
StreamImport: v.GetBool("genesis.stream-import"),
GenesisStreamFile: v.GetString("genesis.genesis-stream-file"),
},
}, nil
}

Expand Down
14 changes: 14 additions & 0 deletions server/config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,20 @@ snapshot-keep-recent = {{ .StateSync.SnapshotKeepRecent }}
# default is emtpy which will then store under the app home directory same as before.
snapshot-directory = "{{ .StateSync.SnapshotDirectory }}"
###############################################################################
### Genesis Configuration ###
###############################################################################
# Genesis config allows configuring whether to stream from an genesis json file in streamed form
[genesis]
# stream-import specifies whether to the stream the import from the genesis json file. The genesis
# file must be in stream form and exported in a streaming fashion.
stream-import = {{ .Genesis.StreamImport }}
# genesis-stream-file specifies the path of the genesis json file to stream from.
genesis-stream-file = "{{ .Genesis.GenesisStreamFile }}"
` + config.DefaultConfigTemplate

var configTemplate *template.Template
Expand Down
80 changes: 79 additions & 1 deletion server/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import (
"fmt"
"io/ioutil"
"os"
"time"

"github.com/spf13/cobra"
tmbytes "github.com/tendermint/tendermint/libs/bytes"
tmtypes "github.com/tendermint/tendermint/types"

"github.com/cosmos/cosmos-sdk/client/flags"
Expand All @@ -17,11 +19,22 @@ import (
)

const (
FlagIsStreaming = "streaming"
FlagStreamingFile = "streaming-file"
FlagHeight = "height"
FlagForZeroHeight = "for-zero-height"
FlagJailAllowedAddrs = "jail-allowed-addrs"
)

type GenesisDocNoAppState struct {
GenesisTime time.Time `json:"genesis_time"`
ChainID string `json:"chain_id"`
InitialHeight int64 `json:"initial_height,string"`
ConsensusParams *tmtypes.ConsensusParams `json:"consensus_params,omitempty"`
Validators []tmtypes.GenesisValidator `json:"validators,omitempty"`
AppHash tmbytes.HexBytes `json:"app_hash"`
}

// ExportCmd dumps app state to JSON.
func ExportCmd(appExporter types.AppExporter, defaultNodeHome string) *cobra.Command {
cmd := &cobra.Command{
Expand All @@ -38,6 +51,20 @@ func ExportCmd(appExporter types.AppExporter, defaultNodeHome string) *cobra.Com
return err
}

isStreaming, err := cmd.Flags().GetBool(FlagIsStreaming)
if err != nil {
return err
}

streamingFile, err := cmd.Flags().GetString(FlagStreamingFile)
if err != nil {
return err
}

if isStreaming && streamingFile == "" {
return fmt.Errorf("file to export stream to not provided, please specify --streaming-file")
}

db, err := openDB(config.RootDir)
if err != nil {
return err
Expand Down Expand Up @@ -67,7 +94,56 @@ func ExportCmd(appExporter types.AppExporter, defaultNodeHome string) *cobra.Com
forZeroHeight, _ := cmd.Flags().GetBool(FlagForZeroHeight)
jailAllowedAddrs, _ := cmd.Flags().GetStringSlice(FlagJailAllowedAddrs)

exported, err := appExporter(serverCtx.Logger, db, traceWriter, height, forZeroHeight, jailAllowedAddrs, serverCtx.Viper)
if isStreaming {
file, err := os.Create(streamingFile)
if err != nil {
return err
}
exported, err := appExporter(serverCtx.Logger, db, traceWriter, height, forZeroHeight, jailAllowedAddrs, serverCtx.Viper, file)
if err != nil {
return fmt.Errorf("error exporting state: %v", err)
}

doc, err := tmtypes.GenesisDocFromFile(serverCtx.Config.GenesisFile())
if err != nil {
return err
}

genesisDocNoAppHash := GenesisDocNoAppState{
GenesisTime: doc.GenesisTime,
ChainID: doc.ChainID,
AppHash: doc.AppHash,
InitialHeight: exported.Height,
ConsensusParams: &tmtypes.ConsensusParams{
Block: tmtypes.BlockParams{
MaxBytes: exported.ConsensusParams.Block.MaxBytes,
MaxGas: exported.ConsensusParams.Block.MaxGas,
},
Evidence: tmtypes.EvidenceParams{
MaxAgeNumBlocks: exported.ConsensusParams.Evidence.MaxAgeNumBlocks,
MaxAgeDuration: exported.ConsensusParams.Evidence.MaxAgeDuration,
MaxBytes: exported.ConsensusParams.Evidence.MaxBytes,
},
Validator: tmtypes.ValidatorParams{
PubKeyTypes: exported.ConsensusParams.Validator.PubKeyTypes,
},
},
Validators: exported.Validators,
}

// NOTE: Tendermint uses a custom JSON decoder for GenesisDoc
// (except for stuff inside AppState). Inside AppState, we're free
// to encode as protobuf or amino.
encoded, err := json.Marshal(genesisDocNoAppHash)
if err != nil {
return err
}

file.Write([]byte(fmt.Sprintf("%s", string(sdk.MustSortJSON(encoded)))))
return nil
}

exported, err := appExporter(serverCtx.Logger, db, traceWriter, height, forZeroHeight, jailAllowedAddrs, serverCtx.Viper, nil)
if err != nil {
return fmt.Errorf("error exporting state: %v", err)
}
Expand Down Expand Up @@ -108,6 +184,8 @@ func ExportCmd(appExporter types.AppExporter, defaultNodeHome string) *cobra.Com
},
}

cmd.Flags().Bool(FlagIsStreaming, false, "Whether to stream the export in chunks. Useful when genesis is extremely large and cannot fit into memory.")
cmd.Flags().String(FlagStreamingFile, "genesis-stream.json", "The file to export the streamed genesis to")
cmd.Flags().String(flags.FlagHome, defaultNodeHome, "The application home directory")
cmd.Flags().Int64(FlagHeight, -1, "Export state from a particular height (-1 means latest height)")
cmd.Flags().Bool(FlagForZeroHeight, false, "Export state to start at height zero (perform preproccessing)")
Expand Down
2 changes: 1 addition & 1 deletion server/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func setupApp(t *testing.T, tempDir string) (*simapp.SimApp, context.Context, *t
app.Commit(context.Background())

cmd := server.ExportCmd(
func(_ log.Logger, _ dbm.DB, _ io.Writer, height int64, forZeroHeight bool, jailAllowedAddrs []string, appOptons types.AppOptions) (types.ExportedApp, error) {
func(_ log.Logger, _ dbm.DB, _ io.Writer, height int64, forZeroHeight bool, jailAllowedAddrs []string, appOptons types.AppOptions, file *os.File) (types.ExportedApp, error) {
encCfg := simapp.MakeTestEncodingConfig()

var simApp *simapp.SimApp
Expand Down
28 changes: 22 additions & 6 deletions server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

clientconfig "github.com/cosmos/cosmos-sdk/client/config"

genesistypes "github.com/cosmos/cosmos-sdk/types/genesis"
"github.com/spf13/cobra"
abciclient "github.com/tendermint/tendermint/abci/client"
"github.com/tendermint/tendermint/abci/server"
Expand Down Expand Up @@ -170,11 +171,6 @@ is performed. Note, when enabled, gRPC will also be automatically enabled.

serverCtx.Viper.Set(flags.FlagChainID, chainID)

genesisFile, _ := tmtypes.GenesisDocFromFile(serverCtx.Config.GenesisFile())
if genesisFile.ChainID != clientCtx.ChainID {
panic(fmt.Sprintf("genesis file chain-id=%s does not equal config.toml chain-id=%s", genesisFile.ChainID, clientCtx.ChainID))
}

if enableTracing, _ := cmd.Flags().GetBool(tracing.FlagTracing); !enableTracing {
serverCtx.Logger.Info("--tracing not passed in, tracing is not enabled")
tracerProviderOptions = []trace.TracerProviderOption{}
Expand All @@ -197,6 +193,12 @@ is performed. Note, when enabled, gRPC will also be automatically enabled.
if err != nil {
return fmt.Errorf("failed to initialize telemetry: %w", err)
}
if !config.Genesis.StreamImport {
genesisFile, _ := tmtypes.GenesisDocFromFile(serverCtx.Config.GenesisFile())
if genesisFile.ChainID != clientCtx.ChainID {
panic(fmt.Sprintf("genesis file chain-id=%s does not equal config.toml chain-id=%s", genesisFile.ChainID, clientCtx.ChainID))
}
}

restartCoolDownDuration := time.Second * time.Duration(serverCtx.Config.SelfRemediation.RestartCooldownSeconds)
// Set the first restart time to be now - restartCoolDownDuration so that the first restart can trigger whenever
Expand Down Expand Up @@ -386,13 +388,27 @@ func startInProcess(
config.GRPC.Enable = true
} else {
ctx.Logger.Info("starting node with ABCI Tendermint in-process")
var gen *tmtypes.GenesisDoc
if config.Genesis.StreamImport {
lines := genesistypes.IngestGenesisFileLineByLine(config.Genesis.GenesisStreamFile)
for line := range lines {
genDoc, err := tmtypes.GenesisDocFromJSON([]byte(line))
if err != nil {
return err
}
if gen != nil {
return fmt.Errorf("error: multiple genesis docs found in stream")
}
gen = genDoc
}
}
tmNode, err = node.New(
goCtx,
ctx.Config,
ctx.Logger,
restartCh,
abciclient.NewLocalClient(ctx.Logger, app),
nil,
gen,
tracerProviderOptions,
nodeMetricsProvider,
)
Expand Down
4 changes: 3 additions & 1 deletion server/types/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package types
import (
"encoding/json"
"io"
"os"
"time"

sdk "github.com/cosmos/cosmos-sdk/types"
Expand Down Expand Up @@ -84,5 +85,6 @@ type (

// AppExporter is a function that dumps all app state to
// JSON-serializable structure and returns the current validator set.
AppExporter func(log.Logger, dbm.DB, io.Writer, int64, bool, []string, AppOptions) (ExportedApp, error)
// If a file is specified,
AppExporter func(log.Logger, dbm.DB, io.Writer, int64, bool, []string, AppOptions, *os.File) (ExportedApp, error)
)
3 changes: 2 additions & 1 deletion simapp/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cosmos/cosmos-sdk/store/streaming"
"github.com/cosmos/cosmos-sdk/testutil/testdata"
sdk "github.com/cosmos/cosmos-sdk/types"
genesistypes "github.com/cosmos/cosmos-sdk/types/genesis"
"github.com/cosmos/cosmos-sdk/types/module"
"github.com/cosmos/cosmos-sdk/utils"
"github.com/cosmos/cosmos-sdk/version"
Expand Down Expand Up @@ -594,7 +595,7 @@ func (app *SimApp) InitChainer(ctx sdk.Context, req abci.RequestInitChain) abci.
panic(err)
}
app.UpgradeKeeper.SetModuleVersionMap(ctx, app.mm.GetVersionMap())
return app.mm.InitGenesis(ctx, app.appCodec, genesisState)
return app.mm.InitGenesis(ctx, app.appCodec, genesisState, genesistypes.GenesisImportConfig{})
}

// LoadHeight loads a particular height
Expand Down
3 changes: 2 additions & 1 deletion simapp/sim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cosmos/cosmos-sdk/simapp/helpers"
"github.com/cosmos/cosmos-sdk/store"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/types/genesis"
simtypes "github.com/cosmos/cosmos-sdk/types/simulation"
authtypes "github.com/cosmos/cosmos-sdk/x/auth/types"
authzkeeper "github.com/cosmos/cosmos-sdk/x/authz/keeper"
Expand Down Expand Up @@ -155,7 +156,7 @@ func TestAppImportExport(t *testing.T) {

ctxA := app.NewContext(true, tmproto.Header{Height: app.LastBlockHeight()})
ctxB := newApp.NewContext(true, tmproto.Header{Height: app.LastBlockHeight()})
newApp.mm.InitGenesis(ctxB, app.AppCodec(), genesisState)
newApp.mm.InitGenesis(ctxB, app.AppCodec(), genesisState, genesis.GenesisImportConfig{})
newApp.StoreConsensusParams(ctxB, exported.ConsensusParams)

fmt.Printf("comparing stores...\n")
Expand Down
2 changes: 1 addition & 1 deletion simapp/simd/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func (a appCreator) newApp(logger log.Logger, db dbm.DB, traceStore io.Writer, t
// and exports state.
func (a appCreator) appExport(
logger log.Logger, db dbm.DB, traceStore io.Writer, height int64, forZeroHeight bool, jailAllowedAddrs []string,
appOpts servertypes.AppOptions) (servertypes.ExportedApp, error) {
appOpts servertypes.AppOptions, file *os.File) (servertypes.ExportedApp, error) {

var simApp *simapp.SimApp
homePath, ok := appOpts.Get(flags.FlagHome).(string)
Expand Down
Loading

0 comments on commit 2d2472d

Please sign in to comment.