Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gh 5121 fedx bind left join #5122

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.eclipse.rdf4j.collection.factory.api.CollectionFactory;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.EmptyIteration;
import org.eclipse.rdf4j.common.iteration.SingletonIteration;
Expand All @@ -39,6 +37,7 @@
import org.eclipse.rdf4j.federated.algebra.FedXZeroLengthPath;
import org.eclipse.rdf4j.federated.algebra.FederatedDescribeOperator;
import org.eclipse.rdf4j.federated.algebra.FilterExpr;
import org.eclipse.rdf4j.federated.algebra.FilterTuple;
import org.eclipse.rdf4j.federated.algebra.FilterValueExpr;
import org.eclipse.rdf4j.federated.algebra.HolderNode;
import org.eclipse.rdf4j.federated.algebra.NJoin;
Expand All @@ -53,12 +52,13 @@
import org.eclipse.rdf4j.federated.endpoint.Endpoint;
import org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler;
import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelServiceExecutor;
import org.eclipse.rdf4j.federated.evaluation.iterator.BindLeftJoinIteration;
import org.eclipse.rdf4j.federated.evaluation.iterator.FedXPathIteration;
import org.eclipse.rdf4j.federated.evaluation.iterator.FederatedDescribeIteration;
import org.eclipse.rdf4j.federated.evaluation.iterator.FilteringIteration;
import org.eclipse.rdf4j.federated.evaluation.iterator.SingleBindingSetIteration;
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBoundJoin;
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerJoin;
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerLeftJoin;
import org.eclipse.rdf4j.federated.evaluation.join.SynchronousBoundJoin;
import org.eclipse.rdf4j.federated.evaluation.join.SynchronousJoin;
import org.eclipse.rdf4j.federated.evaluation.union.ControlledWorkerUnion;
Expand All @@ -68,6 +68,7 @@
import org.eclipse.rdf4j.federated.evaluation.union.ParallelUnionOperatorTask;
import org.eclipse.rdf4j.federated.evaluation.union.SynchronousWorkerUnion;
import org.eclipse.rdf4j.federated.evaluation.union.WorkerUnionBase;
import org.eclipse.rdf4j.federated.exception.ExceptionUtil;
import org.eclipse.rdf4j.federated.exception.FedXRuntimeException;
import org.eclipse.rdf4j.federated.exception.IllegalQueryException;
import org.eclipse.rdf4j.federated.optimizer.DefaultFedXCostModel;
Expand Down Expand Up @@ -97,6 +98,7 @@
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.algebra.DescribeOperator;
import org.eclipse.rdf4j.query.algebra.Join;
import org.eclipse.rdf4j.query.algebra.LeftJoin;
import org.eclipse.rdf4j.query.algebra.QueryRoot;
import org.eclipse.rdf4j.query.algebra.Service;
import org.eclipse.rdf4j.query.algebra.StatementPattern;
Expand All @@ -108,12 +110,10 @@
import org.eclipse.rdf4j.query.algebra.evaluation.ValueExprEvaluationException;
import org.eclipse.rdf4j.query.algebra.evaluation.federation.FederatedService;
import org.eclipse.rdf4j.query.algebra.evaluation.federation.ServiceJoinIterator;
import org.eclipse.rdf4j.query.algebra.evaluation.impl.DefaultEvaluationStrategy;
import org.eclipse.rdf4j.query.algebra.evaluation.impl.EvaluationStatistics;
import org.eclipse.rdf4j.query.algebra.evaluation.impl.QueryEvaluationContext;
import org.eclipse.rdf4j.query.algebra.evaluation.impl.StrictEvaluationStrategy;
import org.eclipse.rdf4j.query.algebra.evaluation.iterator.BadlyDesignedLeftJoinIterator;
import org.eclipse.rdf4j.query.algebra.evaluation.iterator.DescribeIteration;
import org.eclipse.rdf4j.query.algebra.evaluation.iterator.HashJoinIteration;
import org.eclipse.rdf4j.query.algebra.evaluation.optimizer.ConstantOptimizer;
import org.eclipse.rdf4j.query.algebra.evaluation.optimizer.DisjunctiveConstraintOptimizer;
Expand Down Expand Up @@ -748,10 +748,7 @@ public CloseableIteration<BindingSet> evaluate(BindingSet bindings) {

if (problemVars.containsAll(bindings.getBindingNames())) {
var leftIter = leftPrepared.evaluate(bindings);
ControlledWorkerLeftJoin join = new ControlledWorkerLeftJoin(scheduler, FederationEvalStrategy.this,
leftIter, leftJoin, bindings, leftJoin.getQueryInfo());
executor.execute(join);
return join;
return executeLeftJoin(scheduler, leftIter, leftJoin, bindings, leftJoin.getQueryInfo());
} else {
Set<String> problemVarsClone = new HashSet<>(problemVars);
problemVarsClone.retainAll(bindings.getBindingNames());
Expand Down Expand Up @@ -815,8 +812,14 @@ public QueryEvaluationStep prepareNaryUnion(NUnion union, QueryEvaluationContext
/**
* Execute the join in a separate thread using some join executor.
*
* Join executors are for instance: - {@link SynchronousJoin} - {@link SynchronousBoundJoin} -
* {@link ControlledWorkerJoin} - {@link ControlledWorkerBoundJoin}
* Join executors are for instance:
*
* <ul>
* <li>{@link SynchronousJoin}</li>
* <li>{@link SynchronousBoundJoin}</li>
* <li>{@link ControlledWorkerJoin}</li>
* <li>{@link ControlledWorkerBoundJoin}</li>
* </ul>
*
* For endpoint federation use controlled worker bound join, for local federation use controlled worker join. The
* other operators are there for completeness.
Expand All @@ -836,6 +839,21 @@ protected abstract CloseableIteration<BindingSet> executeJoin(
CloseableIteration<BindingSet> leftIter, TupleExpr rightArg,
Set<String> joinVariables, BindingSet bindings, QueryInfo queryInfo) throws QueryEvaluationException;

/**
* Execute the left join in a separate thread using some join executor.
*
* @param joinScheduler
* @param leftIter
* @param leftJoin
* @param bindings
* @return the result
* @throws QueryEvaluationException
*/
protected abstract CloseableIteration<BindingSet> executeLeftJoin(
ControlledWorkerScheduler<BindingSet> joinScheduler,
CloseableIteration<BindingSet> leftIter, LeftJoin leftJoin,
BindingSet bindings, QueryInfo queryInfo) throws QueryEvaluationException;

public abstract CloseableIteration<BindingSet> evaluateExclusiveGroup(
ExclusiveGroup group, BindingSet bindings)
throws RepositoryException, MalformedQueryException, QueryEvaluationException;
Expand Down Expand Up @@ -920,6 +938,59 @@ public abstract CloseableIteration<BindingSet> evaluateBoundJoinStatementPattern
public abstract CloseableIteration<BindingSet> evaluateGroupedCheck(
CheckStatementPattern stmt, final List<BindingSet> bindings) throws QueryEvaluationException;

/**
* Evaluate the left bind join for the given {@link StatementTupleExpr} and bindings at the relevant endpoints.
*
* @param stmt
* @param bindings
* @return the result iteration
* @throws QueryEvaluationException
* @see {@link BindLeftJoinIteration}
*/
public CloseableIteration<BindingSet> evaluateLeftBoundJoinStatementPattern(
StatementTupleExpr stmt, final List<BindingSet> bindings) throws QueryEvaluationException {
// we can omit the bound join handling
if (bindings.size() == 1) {
return evaluate(stmt, bindings.get(0));
}

FilterValueExpr filterExpr = null;
if (stmt instanceof FilterTuple) {
filterExpr = ((FilterTuple) stmt).getFilterExpr();
}

AtomicBoolean isEvaluated = new AtomicBoolean(false);
String preparedQuery = QueryStringUtil.selectQueryStringBoundJoinVALUES((StatementPattern) stmt, bindings,
filterExpr, isEvaluated, stmt.getQueryInfo().getDataset());

CloseableIteration<BindingSet> result = null;
try {
result = evaluateAtStatementSources(preparedQuery, stmt.getStatementSources(), stmt.getQueryInfo());

// apply filter and/or convert to original bindings
if (filterExpr != null && !isEvaluated.get()) {
result = new BindLeftJoinIteration(result, bindings); // apply conversion
result = new FilteringIteration(filterExpr, result, this); // apply filter
if (!result.hasNext()) {
result.close();
return new EmptyIteration<>();
}
} else {
result = new BindLeftJoinIteration(result, bindings);
}

return result;
} catch (Throwable t) {
if (result != null) {
result.close();
}
if (t instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throw ExceptionUtil.toQueryEvaluationException(t);
}
}

/**
* Evaluate a SERVICE using vectored evaluation, taking the provided bindings as input.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@
import org.eclipse.rdf4j.federated.evaluation.iterator.FilteringIteration;
import org.eclipse.rdf4j.federated.evaluation.iterator.GroupedCheckConversionIteration;
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerJoin;
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerLeftJoin;
import org.eclipse.rdf4j.federated.structures.QueryInfo;
import org.eclipse.rdf4j.federated.util.QueryAlgebraUtil;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.MalformedQueryException;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.algebra.LeftJoin;
import org.eclipse.rdf4j.query.algebra.StatementPattern;
import org.eclipse.rdf4j.query.algebra.TupleExpr;
import org.eclipse.rdf4j.repository.RepositoryException;
Expand Down Expand Up @@ -119,6 +121,16 @@ public CloseableIteration<BindingSet> executeJoin(
return join;
}

@Override
protected CloseableIteration<BindingSet> executeLeftJoin(ControlledWorkerScheduler<BindingSet> joinScheduler,
CloseableIteration<BindingSet> leftIter, LeftJoin leftJoin, BindingSet bindings, QueryInfo queryInfo)
throws QueryEvaluationException {
ControlledWorkerLeftJoin join = new ControlledWorkerLeftJoin(joinScheduler, this,
leftIter, leftJoin, bindings, queryInfo);
executor.execute(join);
return join;
}

@Override
public CloseableIteration<BindingSet> evaluateExclusiveGroup(
ExclusiveGroup group, BindingSet bindings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.EmptyIteration;
import org.eclipse.rdf4j.federated.FederationContext;
import org.eclipse.rdf4j.federated.algebra.BoundJoinTupleExpr;
import org.eclipse.rdf4j.federated.algebra.CheckStatementPattern;
import org.eclipse.rdf4j.federated.algebra.ExclusiveGroup;
import org.eclipse.rdf4j.federated.algebra.FedXService;
import org.eclipse.rdf4j.federated.algebra.FilterTuple;
import org.eclipse.rdf4j.federated.algebra.FilterValueExpr;
import org.eclipse.rdf4j.federated.algebra.StatementTupleExpr;
Expand All @@ -29,14 +31,19 @@
import org.eclipse.rdf4j.federated.evaluation.iterator.GroupedCheckConversionIteration;
import org.eclipse.rdf4j.federated.evaluation.iterator.InsertBindingsIteration;
import org.eclipse.rdf4j.federated.evaluation.iterator.SingleBindingSetIteration;
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindLeftJoin;
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBoundJoin;
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerJoin;
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerLeftJoin;
import org.eclipse.rdf4j.federated.evaluation.join.JoinExecutorBase;
import org.eclipse.rdf4j.federated.exception.ExceptionUtil;
import org.eclipse.rdf4j.federated.exception.IllegalQueryException;
import org.eclipse.rdf4j.federated.structures.QueryInfo;
import org.eclipse.rdf4j.federated.util.QueryStringUtil;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.MalformedQueryException;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.algebra.LeftJoin;
import org.eclipse.rdf4j.query.algebra.StatementPattern;
import org.eclipse.rdf4j.query.algebra.TupleExpr;
import org.eclipse.rdf4j.repository.RepositoryException;
Expand Down Expand Up @@ -173,13 +180,60 @@ public CloseableIteration<BindingSet> executeJoin(
TupleExpr rightArg, Set<String> joinVars, BindingSet bindings, QueryInfo queryInfo)
throws QueryEvaluationException {

ControlledWorkerBoundJoin join = new ControlledWorkerBoundJoin(joinScheduler, this, leftIter, rightArg,
bindings, queryInfo);
// determine if we can execute the expr as bind join
boolean executeAsBindJoin = false;
if (rightArg instanceof BoundJoinTupleExpr) {
if (rightArg instanceof FedXService) {
executeAsBindJoin = queryInfo.getFederationContext().getConfig().getEnableServiceAsBoundJoin();
} else {
executeAsBindJoin = true;
}
}

JoinExecutorBase<BindingSet> join;
if (executeAsBindJoin) {
join = new ControlledWorkerBoundJoin(joinScheduler, this, leftIter, rightArg,
bindings, queryInfo);
} else {
join = new ControlledWorkerJoin(joinScheduler, this, leftIter, rightArg, bindings,
queryInfo);
}

join.setJoinVars(joinVars);
executor.execute(join);
return join;
}

@Override
protected CloseableIteration<BindingSet> executeLeftJoin(ControlledWorkerScheduler<BindingSet> joinScheduler,
CloseableIteration<BindingSet> leftIter, LeftJoin leftJoin, BindingSet bindings, QueryInfo queryInfo)
throws QueryEvaluationException {

var rightArg = leftJoin.getRightArg();

// determine if we can execute the expr as bind join
boolean executeAsBindJoin = false;
if (rightArg instanceof BoundJoinTupleExpr) {
if (rightArg instanceof FedXService) {
executeAsBindJoin = false;
} else {
executeAsBindJoin = true;
}
}

JoinExecutorBase<BindingSet> join;
if (executeAsBindJoin) {
join = new ControlledWorkerBindLeftJoin(joinScheduler, this, leftIter, rightArg,
bindings, queryInfo);
} else {
join = new ControlledWorkerLeftJoin(joinScheduler, this,
leftIter, leftJoin, bindings, queryInfo);
}

executor.execute(join);
return join;
}

@Override
public CloseableIteration<BindingSet> evaluateExclusiveGroup(
ExclusiveGroup group, BindingSet bindings) throws RepositoryException,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*******************************************************************************
* Copyright (c) 2024 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.federated.evaluation.iterator;

import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Set;

import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.LookAheadIteration;
import org.eclipse.rdf4j.query.Binding;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.algebra.evaluation.QueryBindingSet;

/**
* A {@link LookAheadIteration} for processing bind left join results.
*
* Algorithm:
*
* <ul>
* <li>execute left bind join using regular bound join query</li>
* <li>process result iteration similar to {@link BoundJoinVALUESConversionIteration}</li>
* <li>remember seen set of bindings (using index) and add original bindings to those, i.e. put to result return all
* non-seen bindings directly from the input</li>
*
*
* @author Andreas Schwarte
*/
Comment on lines +26 to +39
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add that this is used for OPTIONAL clauses. Is it used for anything else?

public class BindLeftJoinIteration extends LookAheadIteration<BindingSet> {

protected final CloseableIteration<BindingSet> iter;
protected final List<BindingSet> bindings;

protected Set<Integer> seenBindingIndexes = new HashSet<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would the IntHashSet be appropriate here?

protected final ListIterator<BindingSet> bindingsIterator;

public BindLeftJoinIteration(CloseableIteration<BindingSet> iter,
List<BindingSet> bindings) {
this.iter = iter;
this.bindings = bindings;
this.bindingsIterator = bindings.listIterator();
}

@Override
protected BindingSet getNextElement() {

if (iter.hasNext()) {
var bIn = iter.next();
int bIndex = Integer.parseInt(
bIn.getBinding(BoundJoinVALUESConversionIteration.INDEX_BINDING_NAME).getValue().stringValue());
Comment on lines +60 to +61
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the binding Value object ever a literal? So it would be possible to check if it's instanceof IntegerLiteral or something like that?

seenBindingIndexes.add(bIndex);
return convert(bIn, bIndex);
}

while (bindingsIterator.hasNext()) {
if (seenBindingIndexes.contains(bindingsIterator.nextIndex())) {
// the binding was already processed as part of the optional
bindingsIterator.next();
continue;
}
return bindingsIterator.next();
}

return null;
}

@Override
protected void handleClose() {
iter.close();
}

protected BindingSet convert(BindingSet bIn, int bIndex) throws QueryEvaluationException {
QueryBindingSet res = new QueryBindingSet();
Iterator<Binding> bIter = bIn.iterator();
while (bIter.hasNext()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be a for-each look?

for (Binding bs : bIn){
  ...
}

Binding b = bIter.next();
if (b.getName().equals(BoundJoinVALUESConversionIteration.INDEX_BINDING_NAME)) {
continue;
}
res.addBinding(b);
}
for (Binding bs : bindings.get(bIndex)) {
res.setBinding(bs);
}
return res;
}

}
Loading
Loading