Skip to content

Commit

Permalink
Staged Update
Browse files Browse the repository at this point in the history
  • Loading branch information
angelini committed Mar 20, 2024
1 parent e157a6f commit 6230388
Show file tree
Hide file tree
Showing 16 changed files with 777 additions and 330 deletions.
25 changes: 14 additions & 11 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ PROTO_FILES := $(shell find internal/pb/ -type f -name '*.proto')
MIGRATE_DIR := ./migrations
SERVICE := $(PROJECT).server

export DL_SKIP_SSL_VERIFICATION := 1

.PHONY: install migrate migrate-create clean build lint release
.PHONY: test test-one test-fuzz test-js lint-js build-js
.PHONY: reset-db setup-local server server-profile install-js
Expand Down Expand Up @@ -107,12 +109,11 @@ else
endif

test-fuzz: export DL_TOKEN=$(DEV_TOKEN_ADMIN)
test-fuzz: export DL_SKIP_SSL_VERIFICATION=1
test-fuzz: reset-db
go run cmd/fuzz-test/main.go --host $(GRPC_HOST) --iterations 1000 --projects 5

reset-db: migrate
psql $(DB_URI) -c "truncate dl.objects; truncate dl.contents; truncate dl.projects; truncate dl.cache_versions;"
psql $(DB_URI) -c "truncate dl.objects; truncate dl.contents; truncate dl.projects; truncate dl.cache_versions; truncate dl.staged_objects"

setup-local: reset-db
psql $(DB_URI) -c "insert into dl.projects (id, latest_version, pack_patterns) values (1, 0, '{\"node_modules/.*/\"}');"
Expand All @@ -126,7 +127,6 @@ server-profile: internal/pb/fs.pb.go internal/pb/fs_grpc.pb.go
go run cmd/server/main.go --dburi $(DB_URI) --port $(GRPC_PORT) --profile cpu.prof --log-level info

client-update: export DL_TOKEN=$(DEV_TOKEN_PROJECT_1)
client-update: export DL_SKIP_SSL_VERIFICATION=1
client-update:
development/scripts/simple_input.sh 1
go run cmd/client/main.go update --host $(GRPC_HOST) --project 1 --dir input/simple
Expand All @@ -136,7 +136,6 @@ client-update:
go run cmd/client/main.go update --host $(GRPC_HOST) --project 1 --dir input/simple

client-large-update: export DL_TOKEN=$(DEV_TOKEN_PROJECT_1)
client-large-update: export DL_SKIP_SSL_VERIFICATION=1
client-large-update:
development/scripts/complex_input.sh 1
go run cmd/client/main.go update --host $(GRPC_HOST) --project 1 --dir input/complex
Expand All @@ -145,8 +144,18 @@ client-large-update:
development/scripts/complex_input.sh 3
go run cmd/client/main.go update --host $(GRPC_HOST) --project 1 --dir input/complex

# client-staged-update: run a normal update followed by a staged update
# against project 1.
#
# DL_TOKEN is exported for this target only, taken from DEV_TOKEN_PROJECT_1.
# Steps:
#   1. generate input with simple_input.sh 1 and push it with a plain `update`;
#   2. regenerate input with simple_input.sh 2 and push it with `update --staged`.
# NOTE(review): the staged update presumably leaves its changes pending until
# a `commit` is run for that version (see client-commit) -- confirm against
# the client implementation.
client-staged-update: export DL_TOKEN=$(DEV_TOKEN_PROJECT_1)
client-staged-update:
development/scripts/simple_input.sh 1
go run cmd/client/main.go update --host $(GRPC_HOST) --project 1 --dir input/simple
development/scripts/simple_input.sh 2
go run cmd/client/main.go update --host $(GRPC_HOST) --project 1 --dir input/simple --staged

# client-commit: run the client's `commit` command for project 1, version 2.
#
# DL_TOKEN is exported for this target only, taken from DEV_TOKEN_PROJECT_1.
# The version is hard-coded to 2.
# NOTE(review): 2 is presumably the version produced by the staged step of
# client-staged-update when run against a freshly reset database -- confirm
# before relying on it.
client-commit: export DL_TOKEN=$(DEV_TOKEN_PROJECT_1)
client-commit:
go run cmd/client/main.go commit --host $(GRPC_HOST) --project 1 --version 2

client-get: export DL_TOKEN=$(DEV_TOKEN_PROJECT_1)
client-get: export DL_SKIP_SSL_VERIFICATION=1
client-get:
ifndef to_version
go run cmd/client/main.go get --host $(GRPC_HOST) --project 1 --prefix "$(prefix)"
Expand All @@ -155,7 +164,6 @@ else
endif

client-rebuild: export DL_TOKEN=$(DEV_TOKEN_ADMIN)
client-rebuild: export DL_SKIP_SSL_VERIFICATION=1
client-rebuild:
ifndef to_version
go run cmd/client/main.go rebuild --host $(GRPC_HOST) --project 1 --prefix "$(prefix)" --dir $(dir)
Expand All @@ -164,27 +172,22 @@ else
endif

client-rebuild-with-cache: export DL_TOKEN=$(DEV_TOKEN_ADMIN)
client-rebuild-with-cache: export DL_SKIP_SSL_VERIFICATION=1
client-rebuild-with-cache:
go run cmd/client/main.go rebuild --host $(GRPC_HOST) --project 1 --prefix "$(prefix)" --dir $(dir) --cachedir input/cache

client-getcache: export DL_TOKEN=$(DEV_TOKEN_ADMIN)
client-getcache: export DL_SKIP_SSL_VERIFICATION=1
client-getcache:
go run cmd/client/main.go getcache --host $(GRPC_HOST) --path input/cache

client-gc-contents: export DL_TOKEN=$(DEV_TOKEN_ADMIN)
client-gc-contents: export DL_SKIP_SSL_VERIFICATION=1
client-gc-contents:
go run cmd/client/main.go gc --host $(GRPC_HOST) --mode contents --sample 25

client-gc-project: export DL_TOKEN=$(DEV_TOKEN_ADMIN)
client-gc-project: export DL_SKIP_SSL_VERIFICATION=1
client-gc-project:
go run cmd/client/main.go gc --host $(GRPC_HOST) --mode project --project 1 --keep 1

client-gc-random-projects: export DL_TOKEN=$(DEV_TOKEN_ADMIN)
client-gc-random-projects: export DL_SKIP_SSL_VERIFICATION=1
client-gc-random-projects:
go run cmd/client/main.go gc --host $(GRPC_HOST) --mode random-projects --sample 25 --keep 1

Expand Down
2 changes: 1 addition & 1 deletion cmd/fuzz-test/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ func runIteration(ctx context.Context, client *dlc.Client, project int64, operat
return -1, fmt.Errorf("failed to apply operation %s: %w", operation.String(), err)
}

version, _, err := client.Update(ctx, project, dirs.Base(project))
version, _, err := client.Update(ctx, project, dirs.Base(project), false)
if err != nil {
return -1, fmt.Errorf("failed to update project %d: %w", project, err)
}
Expand Down
145 changes: 113 additions & 32 deletions internal/db/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,19 @@ func UpdateLatestVersion(ctx context.Context, tx pgx.Tx, project int64, version
return nil
}

func DeleteObject(ctx context.Context, tx pgx.Tx, project int64, version int64, path string) error {
func DeleteObject(ctx context.Context, tx pgx.Tx, project int64, version int64, path string, isStaged bool) error {
if isStaged {
_, err := tx.Exec(ctx, `
INSERT INTO dl.staged_objects (project, start_version, stop_version, path, hash, mode, size, packed)
VALUES ($1, NULL, $2, $3, NULL, NULL, NULL, NULL)
`, project, version, path)
if err != nil {
return fmt.Errorf("delete staged object, project %v, version %v, path %v: %w", project, version, path, err)
}

return nil
}

_, err := tx.Exec(ctx, `
UPDATE dl.objects
SET stop_version = $1
Expand All @@ -39,7 +51,7 @@ func DeleteObject(ctx context.Context, tx pgx.Tx, project int64, version int64,
}

// UpdateObject returns true if content changed, false otherwise
func UpdateObject(ctx context.Context, tx pgx.Tx, conn DbConnector, encoder *ContentEncoder, project int64, version int64, object *pb.Object) (bool, error) {
func UpdateObject(ctx context.Context, tx pgx.Tx, conn DbConnector, encoder *ContentEncoder, project int64, version int64, object *pb.Object, isStaged bool) (bool, error) {
content := object.Content
if content == nil {
content = []byte("")
Expand All @@ -61,13 +73,18 @@ func UpdateObject(ctx context.Context, tx pgx.Tx, conn DbConnector, encoder *Con
return false, fmt.Errorf("insert objects content, hash %x-%x: %w", hash.H1, hash.H2, err)
}

rows, err := tx.Query(ctx, `
INSERT INTO dl.objects (project, start_version, stop_version, path, hash, mode, size, packed)
objectTable := "dl.objects"
if isStaged {
objectTable = "dl.staged_objects"
}

rows, err := tx.Query(ctx, fmt.Sprintf(`
INSERT INTO %s (project, start_version, stop_version, path, hash, mode, size, packed)
VALUES ($1, $2, NULL, $3, ($4, $5), $6, $7, $8)
ON CONFLICT
DO NOTHING
RETURNING project
`, project, version, object.Path, hash.H1, hash.H2, object.Mode, object.Size, false)
`, objectTable), project, version, object.Path, hash.H1, hash.H2, object.Mode, object.Size, false)
if err != nil {
return false, fmt.Errorf("insert new object, project %v, version %v, path %v: %w", project, version, object.Path, err)
}
Expand All @@ -79,30 +96,31 @@ func UpdateObject(ctx context.Context, tx pgx.Tx, conn DbConnector, encoder *Con
return false, nil
}

previousPaths := []string{object.Path}
pathChunks := strings.Split(object.Path, "/")

for i := 1; i < len(pathChunks); i++ {
previousPaths = append(previousPaths, fmt.Sprintf("%s/", strings.Join(pathChunks[:i], "/")))
}
if !isStaged {
previousPaths := []string{object.Path}
pathChunks := strings.Split(object.Path, "/")

_, err = tx.Exec(ctx, `
UPDATE dl.objects SET stop_version = $1
WHERE project = $2
AND path = ANY($3)
AND stop_version IS NULL
AND start_version != $4
`, version, project, previousPaths, version)
for i := 1; i < len(pathChunks); i++ {
previousPaths = append(previousPaths, fmt.Sprintf("%s/", strings.Join(pathChunks[:i], "/")))
}

if err != nil {
return false, fmt.Errorf("update previous object, project %v, version %v, path %v: %w", project, version, object.Path, err)
_, err = tx.Exec(ctx, `
UPDATE dl.objects SET stop_version = $1
WHERE project = $2
AND path = ANY($3)
AND stop_version IS NULL
AND start_version != $4
`, version, project, previousPaths, version)
if err != nil {
return false, fmt.Errorf("update previous object, project %v, version %v, path %v: %w", project, version, object.Path, err)
}
}

return true, nil
}

// UpdatePackedObjects returns true if content changed, false otherwise
func UpdatePackedObjects(ctx context.Context, tx pgx.Tx, conn DbConnector, project int64, version int64, parent string, updates []*pb.Object) (bool, error) {
func UpdatePackedObjects(ctx context.Context, tx pgx.Tx, conn DbConnector, project int64, version int64, parent string, updates []*pb.Object, isStaged bool) (bool, error) {
var hash Hash
var content []byte

Expand Down Expand Up @@ -146,30 +164,43 @@ func UpdatePackedObjects(ctx context.Context, tx pgx.Tx, conn DbConnector, proje

batch := &pgx.Batch{}

batch.Queue(`
UPDATE dl.objects SET stop_version = $1
WHERE project = $2
AND path = $3
AND packed IS true
AND stop_version IS NULL
`, version, project, parent)
if isStaged {
batch.Queue(`
INSERT INTO dl.staged_objects (project, start_version, stop_version, path, hash, mode, size, packed)
VALUES ($1, NULL, $2, $3, NULL, NULL, NULL, NULL)
`, project, version, parent)
} else {
batch.Queue(`
UPDATE dl.objects SET stop_version = $1
WHERE project = $2
AND path = $3
AND packed IS true
AND stop_version IS NULL
`, version, project, parent)
}

if shouldInsert {
// insert the content outside the transaction to avoid deadlocks and to keep smaller transactions
_, err = conn.Exec(ctx, `
INSERT INTO dl.contents (hash, bytes)
VALUES (($1, $2), $3)
ON CONFLICT DO NOTHING
ON CONFLICT
DO NOTHING
`, newHash.H1, newHash.H2, updated)

if err != nil {
return false, fmt.Errorf("insert packed content, hash %x-%x: %w", newHash.H1, newHash.H2, err)
}

batch.Queue(`
INSERT INTO dl.objects (project, start_version, stop_version, path, hash, mode, size, packed)
objectTable := "dl.objects"
if isStaged {
objectTable = "dl.staged_objects"
}

batch.Queue(fmt.Sprintf(`
INSERT INTO %s (project, start_version, stop_version, path, hash, mode, size, packed)
VALUES ($1, $2, NULL, $3, ($4, $5), $6, $7, $8)
`, project, version, parent, newHash.H1, newHash.H2, 0, len(updated), true)
`, objectTable), project, version, parent, newHash.H1, newHash.H2, 0, len(updated), true)
}

results := tx.SendBatch(ctx, batch)
Expand All @@ -190,3 +221,53 @@ func UpdatePackedObjects(ctx context.Context, tx pgx.Tx, conn DbConnector, proje
// content did change
return true, nil
}

// CommitStagedObjects promotes the rows staged for project at version from
// dl.staged_objects into dl.objects and then removes those staging rows.
//
// Three statements are sent on tx as a single batch, in this order:
//  1. staged additions (start_version = version, stop_version IS NULL) are
//     copied into dl.objects as live rows;
//  2. staged deletions (start_version IS NULL, stop_version = version) stop
//     the matching live rows in dl.objects;
//  3. every staging row for this project that starts or stops at version is
//     deleted.
//
// It returns an error wrapping the first statement that failed, or nil when
// all three succeeded.
func CommitStagedObjects(ctx context.Context, tx pgx.Tx, project int64, version int64) error {
	batch := &pgx.Batch{}

	batch.Queue(`
		INSERT INTO dl.objects (project, start_version, stop_version, path, hash, mode, size, packed)
		SELECT project, start_version, NULL, path, hash, mode, size, packed
		FROM dl.staged_objects
		WHERE project = $1
		  AND start_version = $2
		  AND stop_version IS NULL
	`, project, version)

	// Only rows that are still live and that predate this version may be
	// stopped. Without these two guards the statement would:
	//   - overwrite stop_version on rows that were already stopped by an
	//     earlier version, and
	//   - immediately stop the row inserted by the statement above whenever a
	//     path has both a staged deletion and a staged addition (the staged
	//     packed-object update queues exactly that pair for its parent path).
	// This mirrors the conditions used by the non-staged object update.
	batch.Queue(`
		UPDATE dl.objects SET stop_version = $2
		FROM (
			SELECT project, path
			FROM dl.staged_objects
			WHERE project = $1
			  AND stop_version = $2
			  AND start_version IS NULL
		) staged
		WHERE staged.project = dl.objects.project
		  AND staged.path = dl.objects.path
		  AND dl.objects.stop_version IS NULL
		  AND dl.objects.start_version != $2
	`, project, version)

	batch.Queue(`
		DELETE FROM dl.staged_objects
		WHERE project = $1
		  AND (start_version = $2 OR stop_version = $2)
	`, project, version)

	results := tx.SendBatch(ctx, batch)
	defer results.Close()

	// Results must be read in the same order the statements were queued.
	_, err := results.Exec()
	if err != nil {
		return fmt.Errorf("commit new objects, project %v, version %v: %w", project, version, err)
	}
	_, err = results.Exec()
	if err != nil {
		return fmt.Errorf("commit deleted objects, project %v, version %v: %w", project, version, err)
	}
	_, err = results.Exec()
	if err != nil {
		return fmt.Errorf("deleted staged objects, project %v, version %v: %w", project, version, err)
	}

	return nil
}
1 change: 1 addition & 0 deletions internal/key/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const (
Directory = StringKey("dl.directory")
Environment = StringKey("dl.environment")
FromVersion = Int64pKey("dl.from_version")
IsStaged = BoolKey("dl.is_staged")
KeepVersions = Int64Key("dl.keep_versions")
LatestVersion = Int64Key("dl.latest_version")
LiveObjectsCount = Int64Key("dl.live_objects_count")
Expand Down
Loading

0 comments on commit 6230388

Please sign in to comment.