Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Hstore support bulkload
  • Loading branch information
haohao0103 committed Oct 17, 2024
1 parent 06041bb commit f208576
Show file tree
Hide file tree
Showing 9 changed files with 110 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ public static String[] processAddresses(String addresses, String newPort) {

private Map<Long, Long> getGraphId(String graphName, String[] urls) {
RestClientConfig config = RestClientConfig.builder()
.connectTimeout(5 * 1000) // 连接超时时间 5s
.maxConns(10) // 最大连接数
.connectTimeout(5 * 1000)
.maxConns(10)
.build();


Expand Down Expand Up @@ -145,7 +145,7 @@ public Tuple2<byte[], Integer> getKeyBytes(GraphElement e, Directions direction)
Edge edge = (Edge) e;
buffer.writeId(IdGenerator.of(edge.sourceId()));
buffer.write(HugeType.EDGE_OUT.code());
buffer.writeId(IdGenerator.of(graphSchema.getEdgeLabel(e.label()).id())); //出现错误
buffer.writeId(IdGenerator.of(graphSchema.getEdgeLabel(e.label()).id()));
buffer.writeStringWithEnding("");
buffer.writeId(IdGenerator.of(edge.targetId()));
array = buffer.bytes();
Expand All @@ -162,7 +162,7 @@ public Tuple2<byte[], Integer> getKeyBytes(GraphElement e, Directions direction)
Edge edge = (Edge) e;
buffer.writeId(IdGenerator.of(edge.sourceId()));
buffer.write(HugeType.EDGE_IN.code());
buffer.writeId(IdGenerator.of(graphSchema.getEdgeLabel(e.label()).id())); //出现错误
buffer.writeId(IdGenerator.of(graphSchema.getEdgeLabel(e.label()).id()));
buffer.writeStringWithEnding("");
buffer.writeId(IdGenerator.of(edge.targetId()));
array = buffer.bytes();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package org.apache.hugegraph.serializer.direct.struct;

public enum Directions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ public class PartitionUtils {
public static final int MAX_VALUE = 0xffff;

/**
* 计算key的hashcode
*
* @param key
* @return hashcode
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.hugegraph.loader.builder;

import java.util.Collection;
import java.util.Collections;
import java.util.List;

import org.apache.hugegraph.loader.executor.LoadContext;
Expand Down Expand Up @@ -73,11 +72,6 @@ public List<Vertex> build(Row row) {
return kvPairs.buildVertices(true);
}

// @Override
// public List<Vertex> build(Row row, Directions directions) {
// return Collections.emptyList();
// }

@Override
public SchemaLabel schemaLabel() {
return this.vertexLabel;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package org.apache.hugegraph.loader.direct.loader;


Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package org.apache.hugegraph.loader.direct.loader;


Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package org.apache.hugegraph.loader.direct.loader;


Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package org.apache.hugegraph.loader.direct.outputformat;

import org.apache.hadoop.fs.FSDataOutputStream;
Expand Down Expand Up @@ -31,7 +48,7 @@ public static class RocksDBSSTFileRecordWriter extends RecordWriter<byte[], byte
private final Path outputPath;
private final FileSystem fs;
private final File localSSTFile;
private boolean hasData = false; // 标记是否有数据写入
private boolean hasData = false;

public RocksDBSSTFileRecordWriter(FSDataOutputStream out, Path outputPath, FileSystem fs) throws IOException {
this.out = out;
Expand All @@ -53,7 +70,7 @@ public void write(byte[] key, byte[] value) throws IOException {
try {
sstFileWriter.put(key, value);
if (!hasData) {
hasData = true; // 仅在第一次写入时设置
hasData = true;
}
} catch (Exception e) {
throw new IOException(e);
Expand All @@ -63,7 +80,7 @@ public void write(byte[] key, byte[] value) throws IOException {
@Override
public void close(TaskAttemptContext context) throws IOException {
try {
if (hasData) { // 只有在有数据写入时才进行后续操作
if (hasData) {
sstFileWriter.finish();
try (InputStream in = new FileInputStream(localSSTFile)) {
byte[] buffer = new byte[4096];
Expand All @@ -74,7 +91,6 @@ public void close(TaskAttemptContext context) throws IOException {
}
out.close();
} else {
// 没有数据写入时删除临时文件
localSSTFile.delete();
out.close();
fs.delete(outputPath, false);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package org.apache.hugegraph.loader.direct.partitioner;


Expand Down

0 comments on commit f208576

Please sign in to comment.