names = tables.tableNames;
if (names.isEmpty()) {
return simpleConnection.prepareStatement(sql);
}
diff --git a/core/src/main/java/com/qihoo/qsql/api/DynamicSqlRunner.java b/core/src/main/java/com/qihoo/qsql/api/DynamicSqlRunner.java
index edf739a5..dbf3d6af 100644
--- a/core/src/main/java/com/qihoo/qsql/api/DynamicSqlRunner.java
+++ b/core/src/main/java/com/qihoo/qsql/api/DynamicSqlRunner.java
@@ -1,8 +1,10 @@
package com.qihoo.qsql.api;
+import com.qihoo.qsql.api.SqlRunner.Builder.RunnerType;
import com.qihoo.qsql.exception.QsqlException;
import com.qihoo.qsql.metadata.MetadataPostman;
import com.qihoo.qsql.plan.QueryProcedureProducer;
+import com.qihoo.qsql.plan.QueryTables;
import com.qihoo.qsql.plan.proc.DirectQueryProcedure;
import com.qihoo.qsql.plan.proc.ExtractProcedure;
import com.qihoo.qsql.plan.proc.PreparedExtractProcedure;
@@ -66,7 +68,12 @@ private QueryProcedure createQueryPlan(String sql) {
@Override
public AbstractPipeline sql(String sql) {
LOGGER.info("The SQL that is ready to execute is: \n" + sql);
- tableNames = SqlUtil.parseTableName(sql);
+ QueryTables tables = SqlUtil.parseTableName(sql);
+ tableNames = tables.tableNames;
+
+ if (tables.isDml()) {
+ environment.setTransformRunner(RunnerType.SPARK);
+ }
LOGGER.debug("Parsed table names for upper SQL are: {}", tableNames);
QueryProcedure procedure = createQueryPlan(sql);
@@ -136,4 +143,7 @@ private AbstractPipeline getOrCreateClusterPipeline(QueryProcedure procedure) {
return this.pipeline;
}
+ //TODO extract all of SqlParser for parsing by one config
+ //TODO test set identifier escape in dialect
+ //TODO adjust code architecture: make JDBC and runner behave the same way (always translate to a new language)
}
diff --git a/core/src/main/java/com/qihoo/qsql/codegen/ClassBodyComposer.java b/core/src/main/java/com/qihoo/qsql/codegen/ClassBodyComposer.java
index b8f02ac8..5970bb2e 100644
--- a/core/src/main/java/com/qihoo/qsql/codegen/ClassBodyComposer.java
+++ b/core/src/main/java/com/qihoo/qsql/codegen/ClassBodyComposer.java
@@ -117,6 +117,7 @@ class ClassesLink extends BlockLink {
super(link);
}
+ //TODO change to generate the constructor dynamically
@Override
protected void decorateTrait(Class clazz, String... code) {
if (isMyResponsibility(clazz)) {
diff --git a/core/src/main/java/com/qihoo/qsql/codegen/IntegratedQueryWrapper.java b/core/src/main/java/com/qihoo/qsql/codegen/IntegratedQueryWrapper.java
index b1603987..c5f7e61b 100644
--- a/core/src/main/java/com/qihoo/qsql/codegen/IntegratedQueryWrapper.java
+++ b/core/src/main/java/com/qihoo/qsql/codegen/IntegratedQueryWrapper.java
@@ -2,8 +2,6 @@
import com.qihoo.qsql.plan.proc.QueryProcedure;
-import java.util.concurrent.atomic.AtomicInteger;
-
/**
* Provide several method, which can generate execution code in intermediate engine layer.
*
@@ -15,9 +13,6 @@
public abstract class IntegratedQueryWrapper extends ClassBodyWrapper {
- protected static final String VARIABLE_PREFIX = "$";
- protected AtomicInteger varId = new AtomicInteger(0);
-
public abstract IntegratedQueryWrapper run(QueryProcedure plan);
public abstract void interpretProcedure(QueryProcedure plan);
@@ -32,7 +27,4 @@ public abstract class IntegratedQueryWrapper extends ClassBodyWrapper {
public abstract void createTempTable(String tableName);
- protected String latestDeclaredVariable() {
- return VARIABLE_PREFIX + varId.get();
- }
}
diff --git a/core/src/main/java/com/qihoo/qsql/codegen/QueryGenerator.java b/core/src/main/java/com/qihoo/qsql/codegen/QueryGenerator.java
index 1bc73281..02e2d6d2 100644
--- a/core/src/main/java/com/qihoo/qsql/codegen/QueryGenerator.java
+++ b/core/src/main/java/com/qihoo/qsql/codegen/QueryGenerator.java
@@ -8,7 +8,7 @@
import com.qihoo.qsql.codegen.spark.SparkCsvGenerator;
import com.qihoo.qsql.codegen.spark.SparkElasticsearchGenerator;
import com.qihoo.qsql.codegen.spark.SparkHiveGenerator;
-import com.qihoo.qsql.codegen.spark.SparkMySqlGenerator;
+import com.qihoo.qsql.codegen.spark.SparkJdbcGenerator;
import com.qihoo.qsql.codegen.spark.SparkVirtualGenerator;
import com.qihoo.qsql.plan.proc.ExtractProcedure;
import com.qihoo.qsql.plan.proc.PreparedExtractProcedure;
@@ -26,7 +26,7 @@ public abstract class QueryGenerator {
private static QueryGenerator elasticSearch = null;
private static QueryGenerator hive = null;
- private static QueryGenerator mysql = null;
+ private static QueryGenerator jdbc = null;
private static QueryGenerator virtual = null;
private static QueryGenerator csv = null;
@@ -53,8 +53,9 @@ public static QueryGenerator getQueryGenerator(ExtractProcedure procedure,
return createHiveQueryGenerator(procedure, composer, isSpark);
} else if (procedure instanceof PreparedExtractProcedure.ElasticsearchExtractor) {
return createElasticsearchQueryGenerator(procedure, composer, isSpark);
- } else if (procedure instanceof PreparedExtractProcedure.MySqlExtractor) {
- return createMySqlQueryGenerator(procedure, composer, isSpark);
+ } else if (procedure instanceof PreparedExtractProcedure.MySqlExtractor
+ || procedure instanceof PreparedExtractProcedure.OracleExtractor) {
+ return createJdbcQueryGenerator(procedure, composer, isSpark);
} else if (procedure instanceof PreparedExtractProcedure.VirtualExtractor) {
return createVirtualQueryGenerator(procedure, composer, isSpark);
} else if (procedure instanceof PreparedExtractProcedure.CsvExtractor) {
@@ -100,21 +101,21 @@ private static QueryGenerator createElasticsearchQueryGenerator(ExtractProcedure
return elasticSearch;
}
- private static QueryGenerator createMySqlQueryGenerator(ExtractProcedure procedure,
+ private static QueryGenerator createJdbcQueryGenerator(ExtractProcedure procedure,
ClassBodyComposer composer,
boolean isSpark) {
- if (mysql == null) {
+ if (jdbc == null) {
if (isSpark) {
- mysql = new SparkMySqlGenerator();
+ jdbc = new SparkJdbcGenerator();
} else {
- mysql = new FlinkMySqlGenerator();
+ jdbc = new FlinkMySqlGenerator();
}
- setSpecificState(mysql, procedure, composer);
- mysql.prepare();
+ setSpecificState(jdbc, procedure, composer);
+ jdbc.prepare();
} else {
- setSpecificState(mysql, procedure, composer);
+ setSpecificState(jdbc, procedure, composer);
}
- return mysql;
+ return jdbc;
}
private static QueryGenerator createVirtualQueryGenerator(ExtractProcedure procedure,
@@ -166,7 +167,7 @@ private static void setSpecificState(QueryGenerator generator,
public static void close() {
elasticSearch = null;
hive = null;
- mysql = null;
+ jdbc = null;
virtual = null;
csv = null;
}
diff --git a/core/src/main/java/com/qihoo/qsql/codegen/flink/FlinkBodyWrapper.java b/core/src/main/java/com/qihoo/qsql/codegen/flink/FlinkBodyWrapper.java
index ed854218..60367f90 100644
--- a/core/src/main/java/com/qihoo/qsql/codegen/flink/FlinkBodyWrapper.java
+++ b/core/src/main/java/com/qihoo/qsql/codegen/flink/FlinkBodyWrapper.java
@@ -4,7 +4,6 @@
import com.qihoo.qsql.codegen.IntegratedQueryWrapper;
import com.qihoo.qsql.plan.proc.LoadProcedure;
import com.qihoo.qsql.plan.proc.QueryProcedure;
-import java.util.concurrent.atomic.AtomicInteger;
/**
* As a child of {@link IntegratedQueryWrapper}, {@link FlinkBodyWrapper} implement mixed operations code generation for
@@ -19,7 +18,7 @@ public IntegratedQueryWrapper run(QueryProcedure plan) {
@Override
public void interpretProcedure(QueryProcedure plan) {
- plan.accept(new SimpleFlinkProcVisitor(varId, composer));
+ plan.accept(new SimpleFlinkProcVisitor(composer));
}
@Override
@@ -40,8 +39,7 @@ public void importSpecificDependency() {
@Override
public IntegratedQueryWrapper show() {
- composer.handleComposition(ClassBodyComposer.CodeCategory.SENTENCE,
- latestDeclaredVariable() + ".print();\n");
+ composer.handleComposition(ClassBodyComposer.CodeCategory.SENTENCE, "tmp.print();\n");
return this;
}
@@ -62,9 +60,8 @@ public void createTempTable(String tableName) {
private class SimpleFlinkProcVisitor extends FlinkProcedureVisitor {
- SimpleFlinkProcVisitor(AtomicInteger varId,
- ClassBodyComposer composer) {
- super(varId, composer);
+ SimpleFlinkProcVisitor(ClassBodyComposer composer) {
+ super(composer);
}
@Override
diff --git a/core/src/main/java/com/qihoo/qsql/codegen/flink/FlinkProcedureVisitor.java b/core/src/main/java/com/qihoo/qsql/codegen/flink/FlinkProcedureVisitor.java
index 5c683f5d..ebcd0943 100644
--- a/core/src/main/java/com/qihoo/qsql/codegen/flink/FlinkProcedureVisitor.java
+++ b/core/src/main/java/com/qihoo/qsql/codegen/flink/FlinkProcedureVisitor.java
@@ -1,6 +1,7 @@
package com.qihoo.qsql.codegen.flink;
import com.qihoo.qsql.codegen.ClassBodyComposer;
+import com.qihoo.qsql.codegen.ClassBodyComposer.CodeCategory;
import com.qihoo.qsql.codegen.QueryGenerator;
import com.qihoo.qsql.plan.ProcedureVisitor;
import com.qihoo.qsql.plan.proc.DirectQueryProcedure;
@@ -8,7 +9,6 @@
import com.qihoo.qsql.plan.proc.LoadProcedure;
import com.qihoo.qsql.plan.proc.QueryProcedure;
import com.qihoo.qsql.plan.proc.TransformProcedure;
-import java.util.concurrent.atomic.AtomicInteger;
/**
* For traversing procedures to generate.
@@ -16,37 +16,35 @@
public class FlinkProcedureVisitor extends ProcedureVisitor {
private ClassBodyComposer composer;
- private AtomicInteger varId;
- private String variable;
- public FlinkProcedureVisitor(AtomicInteger varId, ClassBodyComposer composer) {
+ public FlinkProcedureVisitor(ClassBodyComposer composer) {
this.composer = composer;
- this.varId = varId;
}
@Override
public void visit(ExtractProcedure extractProcedure) {
- createVariableName();
+ composer.handleComposition(CodeCategory.SENTENCE, "{");
QueryGenerator builder = QueryGenerator.getQueryGenerator(
extractProcedure, composer, false);
builder.execute();
builder.saveToTempTable();
+ composer.handleComposition(CodeCategory.SENTENCE, "}");
visitNext(extractProcedure);
}
+ //TODO: `tmp` may not be declared at this point — ensure it is declared before use.
@Override
public void visit(TransformProcedure transformProcedure) {
composer.handleComposition(ClassBodyComposer.CodeCategory.SENTENCE,
"Table table = tEnv.sqlQuery(\"" + transformProcedure.sql() + "\");");
composer.handleComposition(ClassBodyComposer.CodeCategory.SENTENCE,
- "DataSet " + variable + " = tEnv.toDataSet(table, Row.class);");
+ "tmp = tEnv.toDataSet(table, Row.class);");
visitNext(transformProcedure);
}
@Override
public void visit(LoadProcedure loadProcedure) {
- composer.handleComposition(ClassBodyComposer.CodeCategory.SENTENCE,
- variable + ".print();\n");
+ composer.handleComposition(ClassBodyComposer.CodeCategory.SENTENCE, "tmp.print();\n");
visitNext(loadProcedure);
}
@@ -59,8 +57,4 @@ public void visit(QueryProcedure queryProcedure) {
public void visit(DirectQueryProcedure queryProcedure) {
visitNext(queryProcedure);
}
-
- protected void createVariableName() {
- this.variable = "$" + (varId.incrementAndGet());
- }
}
diff --git a/core/src/main/java/com/qihoo/qsql/codegen/spark/SparkBodyWrapper.java b/core/src/main/java/com/qihoo/qsql/codegen/spark/SparkBodyWrapper.java
index eeed647e..a81543df 100644
--- a/core/src/main/java/com/qihoo/qsql/codegen/spark/SparkBodyWrapper.java
+++ b/core/src/main/java/com/qihoo/qsql/codegen/spark/SparkBodyWrapper.java
@@ -2,9 +2,7 @@
import com.qihoo.qsql.codegen.ClassBodyComposer;
import com.qihoo.qsql.codegen.IntegratedQueryWrapper;
-import com.qihoo.qsql.plan.proc.LoadProcedure;
import com.qihoo.qsql.plan.proc.QueryProcedure;
-import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -15,13 +13,13 @@ public class SparkBodyWrapper extends IntegratedQueryWrapper {
@Override
public IntegratedQueryWrapper run(QueryProcedure plan) {
- plan.accept(new SparkProcedureVisitor(varId, composer));
+ plan.accept(new SparkProcedureVisitor(composer));
return this;
}
@Override
public void interpretProcedure(QueryProcedure plan) {
- plan.accept(new SimpleSparkProcVisitor(varId, composer));
+ plan.accept(new SimpleSparkProcVisitor(composer));
}
@Override
@@ -63,13 +61,8 @@ public void createTempTable(String tableName) {
private class SimpleSparkProcVisitor extends SparkProcedureVisitor {
- SimpleSparkProcVisitor(AtomicInteger varId,
- ClassBodyComposer composer) {
- super(varId, composer);
- }
-
- @Override
- public void visit(LoadProcedure procedure) {
+ SimpleSparkProcVisitor(ClassBodyComposer composer) {
+ super(composer);
}
}
}
diff --git a/core/src/main/java/com/qihoo/qsql/codegen/spark/SparkMySqlGenerator.java b/core/src/main/java/com/qihoo/qsql/codegen/spark/SparkJdbcGenerator.java
similarity index 79%
rename from core/src/main/java/com/qihoo/qsql/codegen/spark/SparkMySqlGenerator.java
rename to core/src/main/java/com/qihoo/qsql/codegen/spark/SparkJdbcGenerator.java
index bc8b97ab..03276355 100644
--- a/core/src/main/java/com/qihoo/qsql/codegen/spark/SparkMySqlGenerator.java
+++ b/core/src/main/java/com/qihoo/qsql/codegen/spark/SparkJdbcGenerator.java
@@ -1,53 +1,55 @@
-package com.qihoo.qsql.codegen.spark;
-
-import com.qihoo.qsql.codegen.ClassBodyComposer;
-import com.qihoo.qsql.codegen.QueryGenerator;
-import java.util.Properties;
-
-/**
- * Code generator, used when {@link com.qihoo.qsql.exec.spark.SparkPipeline} is chosen and source
- * data of query is in MySql at the same time.
- */
-public class SparkMySqlGenerator extends QueryGenerator {
-
- @Override
- public void importDependency() {
- String[] imports = {
- "import org.apache.spark.sql.Dataset",
- "import org.apache.spark.sql.Row",
- "import java.util.Properties",
- "import com.qihoo.qsql.codegen.spark.SparkMySqlGenerator"
- };
- composer.handleComposition(ClassBodyComposer.CodeCategory.IMPORT, imports);
- }
-
- @Override
- public void prepareQuery() {}
-
- //remember to remove temporary files after computing in hdfs or local machine
- @Override
- public void executeQuery() {
- //TODO change to generate invoking from reflection
- Invoker config = Invoker.registerMethod("SparkMySqlGenerator.config");
- String invokeWrap = config.invoke(convertProperties("jdbcUser", "jdbcPassword"));
- String invoked = String.format("tmp = spark.read().jdbc(\"%s\", \"%s\", %s);",
- properties.getOrDefault("jdbcUrl", ""), tableName, invokeWrap);
- composer.handleComposition(ClassBodyComposer.CodeCategory.SENTENCE, invoked);
- }
-
- @Override
- public void saveToTempTable() {
- String created = "tmp.createOrReplaceTempView(\"" + tableName + "\");";
- composer.handleComposition(ClassBodyComposer.CodeCategory.SENTENCE, created);
- }
-
- /**
- * .
- */
- public static Properties config(String user, String password) {
- Properties properties = new Properties();
- properties.put("user", user);
- properties.put("password", password);
- return properties;
- }
-}
+package com.qihoo.qsql.codegen.spark;
+
+import com.qihoo.qsql.codegen.ClassBodyComposer;
+import com.qihoo.qsql.codegen.QueryGenerator;
+import java.util.Properties;
+
+/**
+ * Code generator, used when {@link com.qihoo.qsql.exec.spark.SparkPipeline} is chosen and the
+ * source data of the query resides in a JDBC-accessible database (e.g. MySQL or Oracle).
+ */
+public class SparkJdbcGenerator extends QueryGenerator {
+
+ @Override
+ public void importDependency() {
+ String[] imports = {
+ "import org.apache.spark.sql.Dataset",
+ "import org.apache.spark.sql.Row",
+ "import java.util.Properties",
+ "import com.qihoo.qsql.codegen.spark.SparkJdbcGenerator"
+ };
+ composer.handleComposition(ClassBodyComposer.CodeCategory.IMPORT, imports);
+ }
+
+ @Override
+ public void prepareQuery() {}
+
+ //remember to remove temporary files after computing in hdfs or local machine
+ @Override
+ public void executeQuery() {
+ //TODO change to generate invoking from reflection
+ Invoker config = Invoker.registerMethod("SparkJdbcGenerator.config");
+ String invokeWrap = config.invoke(convertProperties("jdbcUser", "jdbcPassword", "jdbcDriver"));
+ String invoked = String.format("tmp = spark.read().jdbc(\"%s\", \"%s\", %s);",
+ properties.getOrDefault("jdbcUrl", ""),
+ "(" + query + ") " + tableName, invokeWrap);
+ composer.handleComposition(ClassBodyComposer.CodeCategory.SENTENCE, invoked);
+ }
+
+ @Override
+ public void saveToTempTable() {
+ String created = "tmp.createOrReplaceTempView(\"" + tableName + "\");";
+ composer.handleComposition(ClassBodyComposer.CodeCategory.SENTENCE, created);
+ }
+
+ /**
+ * .
+ */
+ public static Properties config(String user, String password, String driver) {
+ Properties properties = new Properties();
+ properties.put("user", user);
+ properties.put("password", password);
+ properties.put("driver", driver);
+ return properties;
+ }
+}
diff --git a/core/src/main/java/com/qihoo/qsql/codegen/spark/SparkProcedureVisitor.java b/core/src/main/java/com/qihoo/qsql/codegen/spark/SparkProcedureVisitor.java
index 1650c865..9d4bf0d0 100644
--- a/core/src/main/java/com/qihoo/qsql/codegen/spark/SparkProcedureVisitor.java
+++ b/core/src/main/java/com/qihoo/qsql/codegen/spark/SparkProcedureVisitor.java
@@ -3,15 +3,14 @@
import com.qihoo.qsql.codegen.ClassBodyComposer;
import com.qihoo.qsql.codegen.ClassBodyComposer.CodeCategory;
import com.qihoo.qsql.codegen.QueryGenerator;
+import com.qihoo.qsql.plan.ProcedureVisitor;
import com.qihoo.qsql.plan.proc.DirectQueryProcedure;
+import com.qihoo.qsql.plan.proc.DiskLoadProcedure;
import com.qihoo.qsql.plan.proc.ExtractProcedure;
import com.qihoo.qsql.plan.proc.LoadProcedure;
import com.qihoo.qsql.plan.proc.MemoryLoadProcedure;
import com.qihoo.qsql.plan.proc.QueryProcedure;
import com.qihoo.qsql.plan.proc.TransformProcedure;
-import com.qihoo.qsql.plan.ProcedureVisitor;
-
-import java.util.concurrent.atomic.AtomicInteger;
/**
* Provide several visit methods to traversing the whole {@link QueryProcedure} which will be execute on Spark.
@@ -20,7 +19,7 @@ public class SparkProcedureVisitor extends ProcedureVisitor {
private ClassBodyComposer composer;
- public SparkProcedureVisitor(AtomicInteger varId, ClassBodyComposer composer) {
+ public SparkProcedureVisitor(ClassBodyComposer composer) {
this.composer = composer;
}
@@ -46,6 +45,12 @@ public void visit(TransformProcedure transformProcedure) {
public void visit(LoadProcedure loadProcedure) {
if (loadProcedure instanceof MemoryLoadProcedure) {
composer.handleComposition(ClassBodyComposer.CodeCategory.SENTENCE, "tmp.show();\n");
+ } else if (loadProcedure instanceof DiskLoadProcedure) {
+ composer.handleComposition(ClassBodyComposer.CodeCategory.SENTENCE,
+ String.format("tmp.write().format(\"com.databricks.spark.csv\")"
+ + ".save(\"%s\");\n", ((DiskLoadProcedure) loadProcedure).path));
+ // composer.handleComposition(ClassBodyComposer.CodeCategory.SENTENCE,
+ // String.format("tmp.write().text(\"%s\");\n", ((DiskLoadProcedure) loadProcedure).path));
}
visitNext(loadProcedure);
}
diff --git a/core/src/main/java/com/qihoo/qsql/exec/JdbcPipeline.java b/core/src/main/java/com/qihoo/qsql/exec/JdbcPipeline.java
index 6fb45988..a9830a45 100644
--- a/core/src/main/java/com/qihoo/qsql/exec/JdbcPipeline.java
+++ b/core/src/main/java/com/qihoo/qsql/exec/JdbcPipeline.java
@@ -88,9 +88,9 @@ public static Connection createSpecificConnection(String json, List pars
try {
Map properties = parseJsonSchema(parsedTables, json);
switch (properties.get("type")) {
- case "mysql":
- LOGGER.debug("Connecting to MySQL server....");
- return createMySqlConnection(properties);
+ case "jdbc":
+ LOGGER.debug("Connecting to JDBC server....");
+ return createJdbcConnection(properties);
case "elasticsearch":
LOGGER.debug("Connection to Elasticsearch server....");
return createElasticsearchConnection(json);
@@ -133,8 +133,8 @@ public static Connection createSpecificConnection(List assemble
return createElasticsearchConnection(
"inline: " + MetadataPostman.assembleSchema(assemblers));
case JDBC:
- LOGGER.debug("Connecting to MySQL server....");
- return createMySqlConnection(conn);
+ LOGGER.debug("Connecting to JDBC server....");
+ return createJdbcConnection(conn);
default:
throw new RuntimeException("Unsupported jdbc type");
}
@@ -154,13 +154,19 @@ private static Connection createElasticsearchConnection(String json) throws SQLE
return connection;
}
- private static Connection createMySqlConnection(Map conn)
+ private static Connection createJdbcConnection(Map conn)
throws ClassNotFoundException, SQLException {
- Class.forName("com.mysql.jdbc.Driver");
- String ip = conn.getOrDefault("jdbcNode", "");
- String port = conn.getOrDefault("jdbcPort", "");
- String db = conn.getOrDefault("dbName", "");
- String url = conn.getOrDefault("jdbcUrl", "jdbc:mysql://" + ip + ":" + port + "/" + db);
+ if (! conn.containsKey("jdbcDriver")) {
+ throw new RuntimeException("The `jdbcDriver` property needed to be set.");
+ }
+ Class.forName(conn.get("jdbcDriver"));
+ // String ip = conn.getOrDefault("jdbcNode", "");
+ // String port = conn.getOrDefault("jdbcPort", "");
+ // String db = conn.getOrDefault("dbName", "");
+ if (! conn.containsKey("jdbcUrl")) {
+ throw new RuntimeException("The `jdbcUrl` property needed to be set.");
+ }
+ String url = conn.get("jdbcUrl");
String user = conn.getOrDefault("jdbcUser", "");
String password = conn.getOrDefault("jdbcPassword", "");
Connection connection = DriverManager.getConnection(url, user, password);
@@ -203,9 +209,14 @@ private static Map parseJsonSchema(List names, String ur
* @throws SQLException sql exception
*/
public static Connection createCsvConnection(String json) throws SQLException {
- Properties info = new Properties();
- info.put("model", json);
- Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
+ ConnectionFactory connectionFactory = new MapConnectionFactory(
+ ImmutableMap.of("unquotedCasing", "unchanged", "caseSensitive", "true"),
+ ImmutableList.of()
+ ).with("model", json);
+
+ // Properties info = new Properties();
+ // info.put("model", json);
+ Connection connection = connectionFactory.createConnection();
LOGGER.debug("Connect with embedded calcite server successfully!");
return connection;
}
@@ -219,7 +230,8 @@ public void run() {
QueryProcedure next = procedure.next();
ResultSet resultSet = establishStatement();
- if (next instanceof DiskLoadProcedure) {
+ //TODO add jdbc sql translate
+ if (next.hasNext() && next.next() instanceof DiskLoadProcedure) {
String path = ((DiskLoadProcedure) next).path;
String deliminator;
if (((DiskLoadProcedure) next).getDataFormat() == LoadProcedure.DataFormat.DEFAULT) {
@@ -358,7 +370,7 @@ private Connection getConnection() {
}
enum JdbcType {
- ELASTICSEARCH, MYSQL, CSV
+ ELASTICSEARCH, JDBC, CSV
}
public interface ConnectionPostProcessor {
@@ -428,7 +440,7 @@ public static class JsonVisitor {
private JdbcType type = null;
JsonVisitor(List names) {
- this.names = names.stream().collect(Collectors.toList());
+ this.names = new ArrayList<>(names);
}
Map getConnectionInfo() {
@@ -440,7 +452,7 @@ Map getConnectionInfo() {
case CSV:
connectionInfo.put("type", "csv");
break;
- case MYSQL:
+ case JDBC:
connectionInfo =
jdbcProps.stream().reduce((left, right) -> {
String leftUrl = left.getOrDefault("jdbcUrl", "");
@@ -456,7 +468,7 @@ Map getConnectionInfo() {
"Not find any schema info for given table names in "
+ "sql"));
- connectionInfo.put("type", "mysql");
+ connectionInfo.put("type", "jdbc");
break;
default:
throw new RuntimeException("Do not support this engine type: " + type);
@@ -473,10 +485,7 @@ void visit(JsonRoot jsonRoot) {
}
boolean visit(JsonSchema schema) {
- if (schema instanceof JsonCustomSchema) {
- return visit((JsonCustomSchema) schema);
- }
- return false;
+ return schema instanceof JsonCustomSchema && visit((JsonCustomSchema) schema);
}
boolean visit(JsonCustomSchema schema) {
@@ -499,8 +508,8 @@ boolean visit(JsonCustomSchema schema) {
}
if (jdbcProps.size() == names.size()) {
- if (schema.factory.toLowerCase().contains("mysql")) {
- type = JdbcType.MYSQL;
+ if (schema.factory.toLowerCase().contains("jdbc")) {
+ type = JdbcType.JDBC;
} else if (schema.factory.toLowerCase().contains("elasticsearch")) {
type = JdbcType.ELASTICSEARCH;
} else if (schema.factory.toLowerCase().contains("csv")) {
diff --git a/core/src/main/java/com/qihoo/qsql/exec/result/JdbcPipelineResult.java b/core/src/main/java/com/qihoo/qsql/exec/result/JdbcPipelineResult.java
index 64ad69d1..b25194dd 100644
--- a/core/src/main/java/com/qihoo/qsql/exec/result/JdbcPipelineResult.java
+++ b/core/src/main/java/com/qihoo/qsql/exec/result/JdbcPipelineResult.java
@@ -1,9 +1,16 @@
package com.qihoo.qsql.exec.result;
import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Types;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.function.Function;
+import java.util.function.Supplier;
/**
* Iterator of reading data from {@link JdbcPipelineResult}, which is the result of {@link
@@ -41,12 +48,126 @@ public ShowPipelineResult(CloseableIterator