Skip to content

Commit

Permalink
Prefer waitUndilDone over waitUntilFinish in test and examples (#5349)
Browse files Browse the repository at this point in the history
  • Loading branch information
RustedBones authored Apr 26, 2024
1 parent 37e8b3b commit 8ff8bba
Show file tree
Hide file tree
Showing 8 changed files with 19 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class BigtableIT extends PipelineSpec {
sc
.parallelize(data.map(kv => toWriteMutation(kv._1, kv._2)))
.saveAsBigtable(projectId, instanceId, tableId)
}.waitUntilFinish()
}.waitUntilDone()

// Read rows back
// Filter rows in case there are other keys in the table
Expand All @@ -112,16 +112,17 @@ class BigtableIT extends PipelineSpec {
sc
.bigtable(projectId, instanceId, tableId, rowFilter = rowFilter)
.map(fromRow) should containInAnyOrder(data)
}.waitUntilFinish()
}.waitUntilDone()
} catch {
case e: Throwable => throw e
} finally {
// Delete rows afterwards
runWithRealContext() { sc =>
sc.parallelize(data.map(kv => toDeleteMutation(kv._1)))
.saveAsBigtable(projectId, instanceId, tableId)
}
}
} finally
{
// Delete rows afterwards
runWithRealContext() { sc =>
sc.parallelize(data.map(kv => toDeleteMutation(kv._1)))
.saveAsBigtable(projectId, instanceId, tableId)
}
}.waitUntilFinish()
}

it should "work in bulk mode" in {
Expand All @@ -140,7 +141,7 @@ class BigtableIT extends PipelineSpec {
sc
.parallelize(data.map(kv => toWriteMutation(kv._1, kv._2)))
.saveAsBigtable(options, tableId, 1)
}.waitUntilFinish()
}.waitUntilDone()

// Read rows back
// Filter rows in case there are other keys in the table
Expand All @@ -152,7 +153,7 @@ class BigtableIT extends PipelineSpec {
sc
.bigtable(projectId, instanceId, tableId, rowFilter = rowFilter)
.map(fromRow) should containInAnyOrder(data)
}.waitUntilFinish()
}.waitUntilDone()
} catch {
case e: Throwable => throw e
} finally
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ object DebuggingWordCount {
.containsInAnyOrder(List(("Flourish", 3L), ("stomach", 1L)).asJava)

// Execute the pipeline and block until it finishes
val result = sc.run().waitUntilFinish()
val result = sc.run().waitUntilDone()

// Retrieve metric values
require(result.counter(matchedWords).committed.get == 2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ object WordCount {
.saveAsTextFile(output)

// Execute the pipeline and block until it finishes
val result = sc.run().waitUntilFinish()
val result = sc.run().waitUntilDone()

// Retrieve metric values
logger.info("Max: " + result.distribution(lineDist).committed.map(_.getMax))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ object ElasticsearchMinimalExample {
.saveAsElasticsearch(clusterOpts)(indexRequestBuilder)

// Run pipeline
sc.run().waitUntilFinish()
sc.run().waitUntilDone()
}

private def indexer(index: String): ((String, Long)) => Iterable[BulkOperation] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ object MetricsExample {
sum2.inc(i)
}

val result = sc.run().waitUntilFinish()
val result = sc.run().waitUntilDone()

// # Retrieving metrics

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,6 @@ object Neo4JExample {
|""".stripMargin
)
// Run pipeline
sc.run().waitUntilFinish()
sc.run().waitUntilDone()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ object TapOutputExample {
// Re-open taps in new `ScioContext`
(t1.open(sc2) ++ t2.open(sc2).map(_.toInt)).sum
// Execute the pipeline and block until it completes
val result = sc2.run().waitUntilFinish()
val result = sc2.run().waitUntilDone()

println(result.state)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ object WordCountScioIO {
.write(outputTextIO)(TextIO.DefaultWriteParam)

// Execute the pipeline and block until it finishes
val result = sc.run().waitUntilFinish()
val result = sc.run().waitUntilDone()

// Retrieve metric values
logger.info("Max: " + result.distribution(lineDist).committed.map(_.getMax))
Expand Down

0 comments on commit 8ff8bba

Please sign in to comment.