diff --git a/README.md b/README.md index b3c357e8..3c030280 100644 --- a/README.md +++ b/README.md @@ -65,6 +65,18 @@ cd incubator-samoa mvn -Papex package ``` +### Heron mode + +Simply clone the repository and install SAMOA. + +```bash +git clone http://git.apache.org/incubator-samoa.git +cd incubator-samoa +mvn -Pheron package +``` + +The deployable jar for SAMOA will be in `target/SAMOA-heron-0.5.0-incubating-SNAPSHOT.jar`. + ### Local mode If you want to test SAMOA in a local environment, simply clone the repository and install SAMOA. diff --git a/pom.xml b/pom.xml index 90d6a5f1..f200cc8c 100644 --- a/pom.xml +++ b/pom.xml @@ -19,7 +19,8 @@ #L% --> - + 4.0.0 Apache SAMOA @@ -102,6 +103,15 @@ samoa-test + + heron + + samoa-instances + samoa-api + samoa-heron + samoa-test + + all @@ -114,6 +124,7 @@ samoa-flink samoa-samza samoa-test + samoa-heron @@ -144,6 +155,8 @@ 0.9.4 3.4.6 + 0.17.8 + 3.4.6 1.7.7 @@ -198,7 +211,8 @@ true
Apache SAMOA ${project.version}
Scalable Advanced Massive Online Analysis, - ${project.version}
+ ${project.version} + Apache SAMOA API ${project.version} http://samoa.incubator.apache.org/docs/api/ @@ -270,7 +284,7 @@ - + diff --git a/samoa-api/src/main/java/org/apache/samoa/moa/core/Vote.java b/samoa-api/src/main/java/org/apache/samoa/moa/core/Vote.java index 24ea3f31..c1b5541d 100644 --- a/samoa-api/src/main/java/org/apache/samoa/moa/core/Vote.java +++ b/samoa-api/src/main/java/org/apache/samoa/moa/core/Vote.java @@ -1,5 +1,25 @@ package org.apache.samoa.moa.core; +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2018 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + import java.io.Serializable; /* diff --git a/samoa-heron/pom.xml b/samoa-heron/pom.xml new file mode 100644 index 00000000..328646d0 --- /dev/null +++ b/samoa-heron/pom.xml @@ -0,0 +1,151 @@ + + + + + 4.0.0 + + UTF-8 + + + samoa-heron + Heron bindings for SAMOA + + samoa-heron + + org.apache.samoa + samoa + 0.5.0-incubating-SNAPSHOT + + + + + + mvnrepository + Mvn repository + https://mvnrepository.com/artifact/ + + false + + + true + + + + + + + org.apache.samoa + samoa-api + ${project.version} + + + org.apache.samoa + samoa-test + test-jar + test-jar-with-dependencies + ${project.version} + test + + + com.twitter.heron + heron-storm + ${heron.version} + provided + + + org.apache.zookeeper + zookeeper + ${zookeeper.heron.version} + provided + + + org.slf4j + slf4j-log4j12 + ${slf4j-log4j12.version} + test + + + com.googlecode.json-simple + json-simple + 1.1 + + + org.apache.thrift + libthrift + 0.11.0 + + + + + + + + maven-assembly-plugin + ${maven-assembly-plugin.version} + + SAMOA-heron-${project.version} + false + false + ../target + + jar-with-dependencies + + + + ${parsedVersion.osgiVersion} + ${project.description} + ${project.version} + Yahoo Labs + SAMOA + + + + + + make-assembly + package + + single + + + + + + org.apache.maven.plugins + maven-surefire-plugin + ${maven-surefire-plugin.version} + + -Xmx1G + false + + + + + + ${project.basedir}/../bin + + *heron.properties + + + + + diff --git a/samoa-heron/src/main/java/org/apache/samoa/LocalHeronDoTask.java b/samoa-heron/src/main/java/org/apache/samoa/LocalHeronDoTask.java new file mode 100644 index 00000000..50c71fc0 --- /dev/null +++ b/samoa-heron/src/main/java/org/apache/samoa/LocalHeronDoTask.java @@ -0,0 +1,74 @@ +package org.apache.samoa.heron.topology; +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.samoa.heron.topology.impl.HeronSamoaUtils; +import org.apache.samoa.heron.topology.impl.HeronTopology; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.commons.configuration.Configuration; +import com.twitter.heron.api.Config; +import com.twitter.heron.api.utils.Utils; +import com.twitter.heron.api.exception.AlreadyAliveException; +import com.twitter.heron.api.exception.InvalidTopologyException; +import com.twitter.heron.simulator.Simulator; + + +/** + * The main class to execute a SAMOA task in LOCAL mode in Heron. + * + * @author Arinto Murdopo + */ +public class LocalHeronDoTask { + private static final Logger logger = LoggerFactory.getLogger(LocalHeronDoTask.class); + private static final String EXECUTION_DURATION_KEY = "samoa.storm.local.mode.execution.duration"; + private static final String SAMOA_STORM_PROPERTY_FILE_LOC = "samoa-heron.properties"; + + /** + * The main method. 
+ * + * @param args the arguments + */ + public static void main(String[] args) { + List tmpArgs = new ArrayList(Arrays.asList(args)); + int numWorker = HeronSamoaUtils.numWorkers(tmpArgs); + args = tmpArgs.toArray(new String[0]); + // convert the arguments into Storm topology + HeronTopology heronTopo = HeronSamoaUtils.argsToTopology(args); + String topologyName = heronTopo.getTopologyName(); + Config conf = new Config(); + // conf.putAll(Utils.readStormConfig()); + conf.setDebug(false); + // local mode + //conf.setMaxTaskParallelism(numWorker); + Simulator cluster = new Simulator(); + cluster.submitTopology(topologyName, conf, heronTopo.getHeronBuilder().createTopology()); + // Read local mode execution duration from property file + Configuration heronConfig = HeronSamoaUtils.getPropertyConfig(LocalHeronDoTask.SAMOA_STORM_PROPERTY_FILE_LOC); + long executionDuration = heronConfig.getLong(LocalHeronDoTask.EXECUTION_DURATION_KEY); + backtype.storm.utils.Utils.sleep(executionDuration * 1000); + cluster.killTopology(topologyName); + cluster.shutdown(); + } +} \ No newline at end of file diff --git a/samoa-heron/src/main/java/org/apache/samoa/heron/topology/impl/HeronBoltStream.java b/samoa-heron/src/main/java/org/apache/samoa/heron/topology/impl/HeronBoltStream.java new file mode 100644 index 00000000..c57de27c --- /dev/null +++ b/samoa-heron/src/main/java/org/apache/samoa/heron/topology/impl/HeronBoltStream.java @@ -0,0 +1,66 @@ +package org.apache.samoa.heron.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.core.ContentEvent; + +import com.twitter.heron.api.bolt.OutputCollector; +import com.twitter.heron.api.tuple.Values; + +/** + * Storm Stream that connects into Bolt. It wraps Storm's outputCollector class + * + * @author Arinto Murdopo + */ +class HeronBoltStream extends HeronStream { + + /** + * + */ + private static final long serialVersionUID = -5712513402991550847L; + + private OutputCollector outputCollector; + + HeronBoltStream(String stormComponentId) { + super(stormComponentId); + } + + @Override + public void put(ContentEvent contentEvent) { + outputCollector.emit(this.outputStreamId, new Values(contentEvent, contentEvent.getKey())); + } + + public void setCollector(OutputCollector outputCollector) { + this.outputCollector = outputCollector; + } + + // @Override + // public void setStreamId(String streamId) { + // // TODO Auto-generated method stub + // //this.outputStreamId = streamId; + // } + + @Override + public String getStreamId() { + // TODO Auto-generated method stub + return null; + } +} diff --git a/samoa-heron/src/main/java/org/apache/samoa/heron/topology/impl/HeronComponentFactory.java b/samoa-heron/src/main/java/org/apache/samoa/heron/topology/impl/HeronComponentFactory.java new file mode 100644 index 00000000..bf457480 --- /dev/null +++ b/samoa-heron/src/main/java/org/apache/samoa/heron/topology/impl/HeronComponentFactory.java @@ -0,0 +1,90 @@ +package org.apache.samoa.heron.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * 
Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.util.HashMap; +import java.util.Map; + +import org.apache.samoa.core.EntranceProcessor; +import org.apache.samoa.core.Processor; +import org.apache.samoa.topology.ComponentFactory; +import org.apache.samoa.topology.EntranceProcessingItem; +import org.apache.samoa.topology.IProcessingItem; +import org.apache.samoa.topology.ProcessingItem; +import org.apache.samoa.topology.Stream; +import org.apache.samoa.topology.Topology; + +/** + * Component factory implementation for samoa-storm + */ +public final class HeronComponentFactory implements ComponentFactory { + + private final Map processorList; + + public HeronComponentFactory() { + processorList = new HashMap<>(); + } + + @Override + public ProcessingItem createPi(Processor processor) { + return new HeronProcessingItem(processor, this.getComponentName(processor.getClass()), 1); + } + + @Override + public EntranceProcessingItem createEntrancePi(EntranceProcessor processor) { + return new HeronEntranceProcessingItem(processor, this.getComponentName(processor.getClass())); + } + + @Override + public Stream createStream(IProcessingItem sourcePi) { + HeronTopologyNode stormCompatiblePi = (HeronTopologyNode) sourcePi; + return stormCompatiblePi.createStream(); + } + + @Override + public Topology createTopology(String topoName) { + return new HeronTopology(topoName); + } + + private String getComponentName(Class clazz) { + StringBuilder 
componentName = new StringBuilder(clazz.getCanonicalName()); + String key = componentName.toString(); + Integer index; + + if (!processorList.containsKey(key)) { + index = 1; + } else { + index = processorList.get(key) + 1; + } + + processorList.put(key, index); + + componentName.append('_'); + componentName.append(index); + + return componentName.toString(); + } + + @Override + public ProcessingItem createPi(Processor processor, int parallelism) { + return new HeronProcessingItem(processor, this.getComponentName(processor.getClass()), parallelism); + } +} diff --git a/samoa-heron/src/main/java/org/apache/samoa/heron/topology/impl/HeronDoTask.java b/samoa-heron/src/main/java/org/apache/samoa/heron/topology/impl/HeronDoTask.java new file mode 100644 index 00000000..d0df9e9a --- /dev/null +++ b/samoa-heron/src/main/java/org/apache/samoa/heron/topology/impl/HeronDoTask.java @@ -0,0 +1,120 @@ +package org.apache.samoa.heron.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.twitter.heron.api.Config; +import com.twitter.heron.api.HeronSubmitter; +import com.twitter.heron.api.exception.AlreadyAliveException; +import com.twitter.heron.api.exception.InvalidTopologyException; +//import com.twitter.heron.api.exception.NotAliveException; +import com.twitter.heron.api.utils.Utils; +import com.twitter.heron.simulator.Simulator; + +/** + * The main class that used by samoa script to execute SAMOA task. + * + * @author Arinto Murdopo + */ +public class HeronDoTask { + private static final Logger logger = LoggerFactory.getLogger(HeronDoTask.class); + private static String localFlag = "local"; + private static String clusterFlag = "cluster"; + + /** + * The main method. + * + * @param args the arguments + */ + public static void main(String[] args) throws AlreadyAliveException { + + List tmpArgs = new ArrayList(Arrays.asList(args)); + + boolean isLocal = isLocal(tmpArgs); + int numWorker = HeronSamoaUtils.numWorkers(tmpArgs); + + args = tmpArgs.toArray(new String[0]); + + // convert the arguments into Storm topology + HeronTopology heronTopo = HeronSamoaUtils.argsToTopology(args); + String topologyName = heronTopo.getTopologyName(); + + Config conf = new Config(); + //conf.putAll(Utils.readStormConfig()); + conf.setDebug(false); + + try { + if (isLocal) { + // local mode + //conf.setMaxTaskParallelism(numWorker); + Simulator cluster = new Simulator(); + cluster.submitTopology(topologyName, conf, heronTopo.getHeronBuilder().createTopology()); + backtype.storm.utils.Utils.sleep(600 * 1000); + cluster.killTopology(topologyName); + cluster.shutdown(); + } else { + // cluster mode + conf.setNumStmgrs(numWorker); + HeronSubmitter.submitTopology(topologyName, conf, + heronTopo.getHeronBuilder().createTopology()); + + } + } catch (AlreadyAliveException aae) { + aae.printStackTrace(); + } 
catch (InvalidTopologyException ite) { + ite.printStackTrace(); + } + } + + private static boolean isLocal(List tmpArgs) { + ExecutionMode executionMode = ExecutionMode.UNDETERMINED; + + int position = tmpArgs.size() - 1; + String flag = tmpArgs.get(position); + boolean isLocal = true; + + if (flag.equals(clusterFlag)) { + executionMode = ExecutionMode.CLUSTER; + isLocal = false; + } else if (flag.equals(localFlag)) { + executionMode = ExecutionMode.LOCAL; + isLocal = true; + } + + if (executionMode != ExecutionMode.UNDETERMINED) { + tmpArgs.remove(position); + } + + return isLocal; + } + + private enum ExecutionMode { + LOCAL, CLUSTER, UNDETERMINED + } + + ; +} diff --git a/samoa-heron/src/main/java/org/apache/samoa/heron/topology/impl/HeronEntranceProcessingItem.java b/samoa-heron/src/main/java/org/apache/samoa/heron/topology/impl/HeronEntranceProcessingItem.java new file mode 100644 index 00000000..a9966ece --- /dev/null +++ b/samoa-heron/src/main/java/org/apache/samoa/heron/topology/impl/HeronEntranceProcessingItem.java @@ -0,0 +1,212 @@ +package org.apache.samoa.heron.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import java.util.Map; +import java.util.UUID; + +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.core.EntranceProcessor; +import org.apache.samoa.topology.AbstractEntranceProcessingItem; +import org.apache.samoa.topology.EntranceProcessingItem; +import org.apache.samoa.topology.Stream; + +import com.twitter.heron.api.spout.BaseRichSpout; +import com.twitter.heron.api.spout.SpoutOutputCollector; +import com.twitter.heron.api.topology.TopologyContext; +import com.twitter.heron.api.topology.OutputFieldsDeclarer; +import com.twitter.heron.api.tuple.Fields; +import com.twitter.heron.api.tuple.Values; +import com.twitter.heron.api.utils.Utils; + +/** + * EntranceProcessingItem implementation for Heron. + */ +class HeronEntranceProcessingItem extends AbstractEntranceProcessingItem implements HeronTopologyNode { + private final HeronEntranceSpout piSpout; + + HeronEntranceProcessingItem(EntranceProcessor processor) { + this(processor, UUID.randomUUID().toString()); + } + + HeronEntranceProcessingItem(EntranceProcessor processor, String friendlyId) { + super(processor); + this.setName(friendlyId); + this.piSpout = new HeronEntranceSpout(processor); + } + + @Override + public EntranceProcessingItem setOutputStream(Stream stream) { + // piSpout.streams.add(stream); + piSpout.setOutputStream((HeronStream) stream); + return this; + } + + @Override + public Stream getOutputStream() { + return piSpout.getOutputStream(); + } + + @Override + public void addToTopology(HeronTopology topology, int parallelismHint) { + topology.getHeronBuilder().setSpout(this.getName(), piSpout, parallelismHint); + } + + @Override + public HeronStream createStream() { + return piSpout.createStream(this.getName()); + } + + @Override + public String getId() { + return this.getName(); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(super.toString()); + sb.insert(0, String.format("id: %s, ", this.getName())); + return sb.toString(); 
+ } + + /** + * Resulting Spout of StormEntranceProcessingItem + */ + final static class HeronEntranceSpout extends BaseRichSpout { + + private static final long serialVersionUID = -9066409791668954099L; + + // private final Set streams; + private final EntranceProcessor entranceProcessor; + private HeronStream outputStream; + + // private transient SpoutStarter spoutStarter; + // private transient Executor spoutExecutors; + // private transient LinkedBlockingQueue tupleInfoQueue; + + private SpoutOutputCollector collector; + + HeronEntranceSpout(EntranceProcessor processor) { + // this.streams = new HashSet(); + this.entranceProcessor = processor; + } + + public HeronStream getOutputStream() { + return outputStream; + } + + public void setOutputStream(HeronStream stream) { + this.outputStream = stream; + } + + @Override + public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, SpoutOutputCollector collector) { + this.collector = collector; + // this.tupleInfoQueue = new LinkedBlockingQueue(); + + // Processor and this class share the same instance of stream + // for (StormSpoutStream stream : streams) { + // stream.setSpout(this); + // } + // outputStream.setSpout(this); + + this.entranceProcessor.onCreate(context.getThisTaskId()); + // this.spoutStarter = new SpoutStarter(this.starter); + + // this.spoutExecutors = Executors.newSingleThreadExecutor(); + // this.spoutExecutors.execute(spoutStarter); + } + + @Override + public void nextTuple() { + if (entranceProcessor.hasNext()) { + Values value = newValues(entranceProcessor.nextEvent()); + collector.emit(outputStream.getOutputId(), value); + } else + Utils.sleep(1000); + // StormTupleInfo tupleInfo = tupleInfoQueue.poll(50, + // TimeUnit.MILLISECONDS); + // if (tupleInfo != null) { + // Values value = newValues(tupleInfo.getContentEvent()); + // collector.emit(tupleInfo.getHeronStream().getOutputId(), value); + // } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer 
declarer) { + // for (HeronStream stream : streams) { + // declarer.declareStream(stream.getOutputId(), new + // Fields(StormSamoaUtils.CONTENT_EVENT_FIELD, + // StormSamoaUtils.KEY_FIELD)); + // } + declarer.declareStream(outputStream.getOutputId(), new Fields(HeronSamoaUtils.CONTENT_EVENT_FIELD, + HeronSamoaUtils.KEY_FIELD)); + } + + HeronStream createStream(String piId) { + // StormSpoutStream stream = new StormSpoutStream(piId); + HeronStream stream = new HeronBoltStream(piId); + // streams.add(stream); + return stream; + } + + // void put(StormSpoutStream stream, ContentEvent contentEvent) { + // tupleInfoQueue.add(new StormTupleInfo(stream, contentEvent)); + // } + + private Values newValues(ContentEvent contentEvent) { + return new Values(contentEvent, contentEvent.getKey()); + } + + // private final static class StormTupleInfo { + // + // private final HeronStream stream; + // private final ContentEvent event; + // + // StormTupleInfo(HeronStream stream, ContentEvent event) { + // this.stream = stream; + // this.event = event; + // } + // + // public HeronStream getHeronStream() { + // return this.stream; + // } + // + // public ContentEvent getContentEvent() { + // return this.event; + // } + // } + + // private final static class SpoutStarter implements Runnable { + // + // private final TopologyStarter topoStarter; + // + // SpoutStarter(TopologyStarter topoStarter) { + // this.topoStarter = topoStarter; + // } + // + // @Override + // public void run() { + // this.topoStarter.start(); + // } + // } + } +} diff --git a/samoa-heron/src/main/java/org/apache/samoa/heron/topology/impl/HeronJarSubmitter.java b/samoa-heron/src/main/java/org/apache/samoa/heron/topology/impl/HeronJarSubmitter.java new file mode 100644 index 00000000..1b4fd117 --- /dev/null +++ b/samoa-heron/src/main/java/org/apache/samoa/heron/topology/impl/HeronJarSubmitter.java @@ -0,0 +1,72 @@ +package org.apache.samoa.heron.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 
/**
 * Placeholder for the utility that uploads the samoa-heron jar to a Heron cluster.
 *
 * <p>The upload logic is currently disabled, so {@link #main(String[])} does nothing.
 *
 * @author Arinto Murdopo
 */
public class HeronJarSubmitter {

  /** Property key under which the location of the uploaded jar is meant to be stored. */
  public static final String UPLOADED_JAR_LOCATION_KEY = "UploadedJarLocation";

  /**
   * Currently a no-op.
   *
   * @param args command-line arguments; the disabled implementation read the jar path from
   *             {@code args[0]}
   * @throws IOException declared for the disabled implementation; never thrown at present
   */
  public static void main(String[] args) throws IOException {
    // TODO refactor this whole code to use submitTopology instead,
    // as submitJar is no longer available in heron-storm.
    //
    // The disabled implementation did the following:
    //   1. read the Storm configuration,
    //   2. upload the jar given in args[0] via StormSubmitter.submitJar,
    //   3. store the returned location under UPLOADED_JAR_LOCATION_KEY in
    //      src/main/resources/samoa-heron-cluster.properties.
  }
}
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.core.Processor; +import org.apache.samoa.topology.AbstractProcessingItem; +import org.apache.samoa.topology.ProcessingItem; +import org.apache.samoa.topology.Stream; +import org.apache.samoa.heron.topology.impl.HeronStream.InputStreamId; +import org.apache.samoa.utils.PartitioningScheme; + +import com.twitter.heron.api.bolt.BaseRichBolt; +import com.twitter.heron.api.bolt.OutputCollector; +import com.twitter.heron.api.topology.BoltDeclarer; +import com.twitter.heron.api.topology.OutputFieldsDeclarer; +import com.twitter.heron.api.topology.TopologyBuilder; +import com.twitter.heron.api.topology.TopologyContext; +import com.twitter.heron.api.tuple.Fields; +import com.twitter.heron.api.tuple.Tuple; + +/** + * ProcessingItem implementation for Heron. + * + * @author Arinto Murdopo + */ +public class HeronProcessingItem extends AbstractProcessingItem implements HeronTopologyNode { + private final ProcessingItemBolt piBolt; + private BoltDeclarer piBoltDeclarer; + + // TODO: should we put parallelism hint here? 
+ // imo, parallelism hint only declared when we add this PI in the topology + // open for dicussion :p + + HeronProcessingItem(Processor processor, int parallelismHint) { + this(processor, UUID.randomUUID().toString(), parallelismHint); + } + + HeronProcessingItem(Processor processor, String friendlyId, int parallelismHint) { + super(processor, parallelismHint); + this.piBolt = new ProcessingItemBolt(processor); + this.setName(friendlyId); + } + + @Override + protected ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme) { + HeronStream stormInputStream = (HeronStream) inputStream; + InputStreamId inputId = stormInputStream.getInputId(); + + switch (scheme) { + case SHUFFLE: + piBoltDeclarer.shuffleGrouping(inputId.getComponentId(), inputId.getStreamId()); + break; + case GROUP_BY_KEY: + piBoltDeclarer.fieldsGrouping( + inputId.getComponentId(), + inputId.getStreamId(), + new Fields(HeronSamoaUtils.KEY_FIELD)); + break; + case BROADCAST: + piBoltDeclarer.allGrouping( + inputId.getComponentId(), + inputId.getStreamId()); + break; + } + return this; + } + + @Override + public void addToTopology(HeronTopology topology, int parallelismHint) { + if (piBoltDeclarer != null) { + // throw exception that one PI only belong to one topology + } else { + TopologyBuilder heronBuilder = topology.getHeronBuilder(); + this.piBoltDeclarer = heronBuilder.setBolt(this.getName(), + this.piBolt, parallelismHint); + } + } + + @Override + public HeronStream createStream() { + return piBolt.createStream(this.getName()); + } + + @Override + public String getId() { + return this.getName(); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(super.toString()); + sb.insert(0, String.format("id: %s, ", this.getName())); + return sb.toString(); + } + + private final static class ProcessingItemBolt extends BaseRichBolt { + + private static final long serialVersionUID = -6637673741263199198L; + + private final Set streams; + private final 
Processor processor; + + private OutputCollector collector; + + ProcessingItemBolt(Processor processor) { + this.streams = new HashSet(); + this.processor = processor; + } + + @Override + public void prepare(@SuppressWarnings("rawtypes") Map stormConf, TopologyContext context, + OutputCollector collector) { + this.collector = collector; + // Processor and this class share the same instance of stream + for (HeronBoltStream stream : streams) { + stream.setCollector(this.collector); + } + + this.processor.onCreate(context.getThisTaskId()); + } + + @Override + public void execute(Tuple input) { + Object sentObject = input.getValue(0); + ContentEvent sentEvent = (ContentEvent) sentObject; + processor.process(sentEvent); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + for (HeronStream stream : streams) { + declarer.declareStream(stream.getOutputId(), + new Fields(HeronSamoaUtils.CONTENT_EVENT_FIELD, + HeronSamoaUtils.KEY_FIELD)); + } + } + + HeronStream createStream(String piId) { + HeronBoltStream stream = new HeronBoltStream(piId); + streams.add(stream); + return stream; + } + } +} diff --git a/samoa-heron/src/main/java/org/apache/samoa/heron/topology/impl/HeronSamoaUtils.java b/samoa-heron/src/main/java/org/apache/samoa/heron/topology/impl/HeronSamoaUtils.java new file mode 100644 index 00000000..a0e1ff2f --- /dev/null +++ b/samoa-heron/src/main/java/org/apache/samoa/heron/topology/impl/HeronSamoaUtils.java @@ -0,0 +1,127 @@ +package org.apache.samoa.heron.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.github.javacliparser.ClassOption; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import java.util.Properties; + +import org.apache.samoa.tasks.Task; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.PropertiesConfiguration; + +/** + * Utility class for samoa-heron project. It is used by HeronDoTask to process its arguments. 
+ *
+ * @author Arinto Murdopo
+ */
+public class HeronSamoaUtils {
+
+  private static final Logger logger = LoggerFactory.getLogger(HeronSamoaUtils.class);
+
+  static final String KEY_FIELD = "key";
+  static final String CONTENT_EVENT_FIELD = "content_event";
+
+  /** Location of the cluster properties file read by {@link #getProperties()}. */
+  private static final String CLUSTER_PROPERTIES_PATH = "src/main/resources/samoa-heron-cluster.properties";
+
+  /** Number of workers returned by {@link #numWorkers(List)} when no number is given. */
+  private static final int DEFAULT_NUM_WORKERS = 4;
+
+  /**
+   * Loads the cluster properties file.
+   *
+   * @return the loaded properties, never {@code null}
+   * @throws IOException if the file cannot be opened, read or closed
+   */
+  static Properties getProperties() throws IOException {
+    Properties props = new Properties();
+    // FIXME it does not exist anymore
+    File f = new File(CLUSTER_PROPERTIES_PATH);
+
+    // try-with-resources closes the stream on every path. A failed load is propagated instead of being turned
+    // into a null result that callers dereference.
+    try (InputStream is = new FileInputStream(f)) {
+      props.load(is);
+    } catch (IOException e) {
+      logger.error("Fail to load property file {}", CLUSTER_PROPERTIES_PATH, e);
+      throw e;
+    }
+
+    return props;
+  }
+
+  /**
+   * Joins the arguments into one CLI string, creates the task it describes and returns the task's topology.
+   *
+   * @param args command line arguments describing the task
+   * @return the topology of the initialized task
+   * @throws IllegalArgumentException if no task can be created from the arguments
+   */
+  public static HeronTopology argsToTopology(String[] args) {
+    StringBuilder cliString = new StringBuilder();
+    for (String arg : args) {
+      cliString.append(" ").append(arg);
+    }
+    String cli = cliString.toString();
+    logger.debug("Command line string = {}", cli);
+
+    Task task = getTask(cli);
+    if (task == null) {
+      // getTask returns null on failure; report the offending command line instead of failing with an NPE.
+      throw new IllegalArgumentException("Fail in initializing the task from:" + cli);
+    }
+
+    // TODO: remove setFactory method with DynamicBinding
+    task.setFactory(new HeronComponentFactory());
+    task.init();
+
+    return (HeronTopology) task.getTopology();
+  }
+
+  /**
+   * Reads the number of workers from the last argument. When the last argument is an integer it is removed from
+   * the list; otherwise the list is left untouched.
+   *
+   * @param tmpArgs mutable argument list
+   * @return the parsed number, or 4 when the list is empty or its last element is not an integer
+   */
+  public static int numWorkers(List<String> tmpArgs) {
+    if (tmpArgs.isEmpty()) {
+      return DEFAULT_NUM_WORKERS;
+    }
+
+    int position = tmpArgs.size() - 1;
+    try {
+      int numWorkers = Integer.parseInt(tmpArgs.get(position));
+      tmpArgs.remove(position);
+      return numWorkers;
+    } catch (NumberFormatException e) {
+      return DEFAULT_NUM_WORKERS;
+    }
+  }
+
+  /**
+   * Creates a task from its CLI description.
+   *
+   * @param cliString CLI string handed to {@link ClassOption#cliStringToObject}
+   * @return the task, or {@code null} when it cannot be created
+   */
+  public static Task getTask(String cliString) {
+    Task task = null;
+    try {
+      logger.debug("Providing task [{}]", cliString);
+      task = ClassOption.cliStringToObject(cliString, Task.class, null);
+    } catch (Exception e) {
+      logger.warn("Fail in initializing the task!", e);
+    }
+    return task;
+  }
+
+  /**
+   * Reads a properties file into a {@link Configuration}.
+   *
+   * @param configPropertyPath path handed to {@link PropertiesConfiguration}
+   * @return the non-empty configuration
+   * @throws RuntimeException if the file cannot be read or the configuration is empty
+   */
+  public static Configuration getPropertyConfig(String configPropertyPath) {
+    Configuration config;
+    try {
+      config = new PropertiesConfiguration(configPropertyPath);
+    } catch (ConfigurationException configurationException) {
+      logger.error("ConfigurationException while reading property file = {}", configPropertyPath,
+          configurationException);
+      throw new RuntimeException("ConfigurationException while reading property file : " + configPropertyPath,
+          configurationException);
+    }
+
+    if (config.isEmpty()) {
+      logger.error("Configuration is null or empty at file = {}", configPropertyPath);
+      throw new RuntimeException("Configuration is null or empty : " + configPropertyPath);
+    }
+    return config;
+  }
+}
diff --git a/samoa-heron/src/main/java/org/apache/samoa/heron/topology/impl/HeronSpoutStream.java b/samoa-heron/src/main/java/org/apache/samoa/heron/topology/impl/HeronSpoutStream.java
new file mode 100644
index 00000000..e2f1e2be
--- /dev/null
+++ b/samoa-heron/src/main/java/org/apache/samoa/heron/topology/impl/HeronSpoutStream.java
@@ -0,0 +1,65 @@
+package org.apache.samoa.heron.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+//
+//import org.apache.samoa.core.ContentEvent;
+//import org.apache.samoa.topology.impl.StormEntranceProcessingItem.StormEntranceSpout;
+//
+///**
+// * Storm Stream that connects into Spout.
It wraps the spout itself +// * @author Arinto Murdopo +// * +// */ +//final class StormSpoutStream extends StormStream{ +// +// /** +// * +// */ +// private static final long serialVersionUID = -7444653177614988650L; +// +// private StormEntranceSpout spout; +// +// StormSpoutStream(String stormComponentId) { +// super(stormComponentId); +// } +// +// @Override +// public void put(ContentEvent contentEvent) { +// spout.put(this, contentEvent); +// } +// +// void setSpout(StormEntranceSpout spout){ +// this.spout = spout; +// } +// +//// @Override +//// public void setStreamId(String stream) { +//// // TODO Auto-generated method stub +//// +//// } +// +// @Override +// public String getStreamId() { +// // TODO Auto-generated method stub +// return null; +// } +// +// } diff --git a/samoa-heron/src/main/java/org/apache/samoa/heron/topology/impl/HeronStream.java b/samoa-heron/src/main/java/org/apache/samoa/heron/topology/impl/HeronStream.java new file mode 100644 index 00000000..49e66845 --- /dev/null +++ b/samoa-heron/src/main/java/org/apache/samoa/heron/topology/impl/HeronStream.java @@ -0,0 +1,85 @@ +package org.apache.samoa.heron.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L%
+ */
+
+import java.util.UUID;
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.topology.Stream;
+
+/**
+ * Base class of the samoa-heron {@link Stream} implementations. Every stream gets a random UUID as its output
+ * stream id and an {@link InputStreamId} pairing that id with the component id passed to the constructor.
+ *
+ * @author Arinto Murdopo
+ */
+abstract class HeronStream implements Stream, java.io.Serializable {
+
+  private static final long serialVersionUID = 281835563756514852L;
+
+  protected final String outputStreamId;
+  protected final InputStreamId inputStreamId;
+
+  public HeronStream(String stormComponentId) {
+    String generatedId = UUID.randomUUID().toString();
+    outputStreamId = generatedId;
+    inputStreamId = new InputStreamId(stormComponentId, generatedId);
+  }
+
+  @Override
+  public abstract void put(ContentEvent contentEvent);
+
+  /** The batch size is not used by this stream. */
+  @Override
+  public void setBatchSize(int batchSize) {
+    // Ignore batch size
+  }
+
+  String getOutputId() {
+    return outputStreamId;
+  }
+
+  InputStreamId getInputId() {
+    return inputStreamId;
+  }
+
+  /** Immutable pair of a component id and a stream id. */
+  static final class InputStreamId implements java.io.Serializable {
+
+    private static final long serialVersionUID = -7457995634133691295L;
+
+    private final String componentId;
+    private final String streamId;
+
+    InputStreamId(String componentId, String streamId) {
+      this.componentId = componentId;
+      this.streamId = streamId;
+    }
+
+    String getComponentId() {
+      return componentId;
+    }
+
+    String getStreamId() {
+      return streamId;
+    }
+  }
+}
diff --git a/samoa-heron/src/main/java/org/apache/samoa/heron/topology/impl/HeronTopology.java b/samoa-heron/src/main/java/org/apache/samoa/heron/topology/impl/HeronTopology.java
new file mode 100644
index 00000000..a4989023
--- /dev/null
+++ b/samoa-heron/src/main/java/org/apache/samoa/heron/topology/impl/HeronTopology.java
@@ -0,0 +1,52 @@
+package org.apache.samoa.heron.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance
with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.topology.AbstractTopology;
+import org.apache.samoa.topology.IProcessingItem;
+
+import com.twitter.heron.api.topology.TopologyBuilder;
+
+/**
+ * SAMOA topology for samoa-heron; it owns the Heron {@link TopologyBuilder} exposed by {@link #getHeronBuilder()}.
+ *
+ * @author Arinto Murdopo
+ */
+public class HeronTopology extends AbstractTopology {
+
+  private final TopologyBuilder builder = new TopologyBuilder();
+
+  public HeronTopology(String topologyName) {
+    super(topologyName);
+  }
+
+  /** Lets the Heron node add itself to this topology, then registers the item through the parent class. */
+  @Override
+  public void addProcessingItem(IProcessingItem procItem, int parallelismHint) {
+    ((HeronTopologyNode) procItem).addToTopology(this, parallelismHint);
+    super.addProcessingItem(procItem, parallelismHint);
+  }
+
+  /** Returns the Heron builder created together with this topology. */
+  public TopologyBuilder getHeronBuilder() {
+    return builder;
+  }
+}
diff --git a/samoa-heron/src/main/java/org/apache/samoa/heron/topology/impl/HeronTopologyNode.java b/samoa-heron/src/main/java/org/apache/samoa/heron/topology/impl/HeronTopologyNode.java
new file mode 100644
index 00000000..376c048a
--- /dev/null
+++ b/samoa-heron/src/main/java/org/apache/samoa/heron/topology/impl/HeronTopologyNode.java
@@ -0,0 +1,36 @@
+package org.apache.samoa.heron.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +/** + * Interface to represent a node in a samoa-heron topology: a node is handed the HeronTopology and a parallelism hint by addToTopology, creates a HeronStream, and exposes a String id. + * + * @author Arinto Murdopo + */ +interface HeronTopologyNode { + + void addToTopology(HeronTopology topology, int parallelismHint); + + HeronStream createStream(); + + String getId(); + +} diff --git a/samoa-heron/src/main/java/org/apache/samoa/heron/topology/impl/HeronTopologySubmitter.java b/samoa-heron/src/main/java/org/apache/samoa/heron/topology/impl/HeronTopologySubmitter.java new file mode 100644 index 00000000..5b759083 --- /dev/null +++ b/samoa-heron/src/main/java/org/apache/samoa/heron/topology/impl/HeronTopologySubmitter.java @@ -0,0 +1,123 @@ +package org.apache.samoa.heron.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L%
+ */
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.thrift.TException;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.twitter.heron.api.HeronSubmitter;
+import com.twitter.heron.api.Config;
+import com.twitter.heron.api.exception.AlreadyAliveException;
+import com.twitter.heron.api.exception.InvalidTopologyException;
+import com.twitter.heron.api.utils.Utils;
+
+/**
+ * Helper class to submit SAMOA task into heron without the need of submitting the jar file. The jar file must be
+ * submitted first using HeronJarSubmitter class.
+ *
+ * @author Arinto Murdopo
+ */
+public class HeronTopologySubmitter {
+
+  /** Key of the optional profiler options entry in the cluster properties. */
+  public static final String YJP_OPTIONS_KEY = "YjpOptions";
+
+  private static final Logger logger = LoggerFactory.getLogger(HeronTopologySubmitter.class);
+
+  /**
+   * Builds the topology described by the command line and submits it to Heron.
+   *
+   * <p>The cluster properties must contain {@link HeronJarSubmitter#UPLOADED_JAR_LOCATION_KEY}; otherwise an error
+   * is logged and nothing is submitted. When {@link #YJP_OPTIONS_KEY} is set, its value is appended to the worker
+   * child options of the submitted configuration.
+   *
+   * @param args task command line; a trailing integer, if present, is used as the number of stream managers
+   * @throws IOException if the cluster properties cannot be read
+   */
+  public static void main(String[] args) throws IOException {
+    Properties props = HeronSamoaUtils.getProperties();
+    if (props == null) {
+      // Fail with a clear message instead of a NullPointerException on the lookup below.
+      logger.error("Cluster properties could not be loaded");
+      return;
+    }
+
+    String uploadedJarLocation = props.getProperty(HeronJarSubmitter.UPLOADED_JAR_LOCATION_KEY);
+    if (uploadedJarLocation == null) {
+      logger.error("Invalid properties file. It must have key {}",
+          HeronJarSubmitter.UPLOADED_JAR_LOCATION_KEY);
+      return;
+    }
+
+    List<String> tmpArgs = new ArrayList<>(Arrays.asList(args));
+    int numWorkers = HeronSamoaUtils.numWorkers(tmpArgs);
+    HeronTopology heronTopo = HeronSamoaUtils.argsToTopology(tmpArgs.toArray(new String[0]));
+
+    Config conf = new Config();
+    conf.putAll(Utils.readCommandLineOpts());
+    conf.setDebug(false);
+    conf.setNumStmgrs(numWorkers);
+
+    String profilerOption = props.getProperty(YJP_OPTIONS_KEY);
+    if (profilerOption != null) {
+      // Append the profiler options to any worker child options that are already configured.
+      String existingOpts = (String) conf.get(Config.TOPOLOGY_WORKER_CHILDOPTS);
+      String childOpts = existingOpts == null ? profilerOption : existingOpts + ' ' + profilerOption;
+      conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, childOpts);
+    }
+
+    String topologyName = heronTopo.getTopologyName();
+    try {
+      System.out.println("Submitting topology with name: " + topologyName);
+      HeronSubmitter.submitTopology(topologyName, conf, heronTopo.getHeronBuilder().createTopology());
+      System.out.println(topologyName + " is successfully submitted");
+    } catch (AlreadyAliveException aae) {
+      logger.error("Topology {} is already alive", topologyName, aae);
+    } catch (InvalidTopologyException ite) {
+      System.out.println("Invalid topology for " + topologyName);
+      logger.error("Invalid topology for {}", topologyName, ite);
+    }
+  }
+
+  /**
+   * Removes the last element of {@code tmpArgs} and returns it.
+   *
+   * @param tmpArgs mutable argument list; must not be empty
+   * @return the removed last element
+   */
+  private static String uploadedJarLocation(List<String> tmpArgs) {
+    int position = tmpArgs.size() - 1;
+    String uploadedJarLocation = tmpArgs.get(position);
+    tmpArgs.remove(position);
+    return uploadedJarLocation;
+  }
+}
diff --git
a/samoa-heron/src/test/java/org/apache/samoa/AlgosTest.java b/samoa-heron/src/test/java/org/apache/samoa/AlgosTest.java
new file mode 100644
index 00000000..91b493f7
--- /dev/null
+++ b/samoa-heron/src/test/java/org/apache/samoa/AlgosTest.java
@@ -0,0 +1,100 @@
+package org.apache.samoa.heron;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.heron.topology.LocalHeronDoTask;
+import org.apache.samoa.TestParams;
+import org.apache.samoa.TestUtils;
+import org.junit.Test;
+import org.junit.Ignore;
+
+/**
+ * Runs SAMOA evaluation tasks through {@link LocalHeronDoTask}, checked against the given {@link TestParams}.
+ *
+ * <p>NOTE(review): the package is org.apache.samoa.heron but the file sits under org/apache/samoa/ - confirm.
+ */
+public class AlgosTest {
+
+  @Test(timeout = 60000)
+  public void testVHTWithHeron() throws Exception {
+
+    TestParams vhtConfig = new TestParams.Builder()
+        .inputInstances(200_000)
+        .samplingSize(20_000)
+        .evaluationInstances(200_000)
+        .classifiedInstances(200_000)
+        .labelSamplingSize(10L)
+        .classificationsCorrect(55f)
+        .kappaStat(-0.1f)
+        .kappaTempStat(-0.1f)
+        .cliStringTemplate(TestParams.Templates.PREQEVAL_VHT_RANDOMTREE)
+        .resultFilePollTimeout(30)
+        .prePollWait(15)
+        .taskClassName(LocalHeronDoTask.class.getName())
+        .build();
+    TestUtils.test(vhtConfig);
+
+  }
+
+  @Test(timeout = 120000)
+  @Ignore
+  public void testBaggingWithHeron() throws Exception {
+    TestParams baggingConfig = new TestParams.Builder()
+        .inputInstances(200_000)
+        .samplingSize(20_000)
+        .evaluationInstances(180_000)
+        .classifiedInstances(190_000)
+        .labelSamplingSize(10L)
+        .classificationsCorrect(60f)
+        .kappaStat(0f)
+        .kappaTempStat(0f)
+        .cliStringTemplate(TestParams.Templates.PREQEVAL_BAGGING_RANDOMTREE)
+        .resultFilePollTimeout(40)
+        .prePollWait(20)
+        .taskClassName(LocalHeronDoTask.class.getName())
+        .build();
+    TestUtils.test(baggingConfig);
+
+  }
+
+  @Test(timeout = 240000)
+  @Ignore
+  public void testCVPReqVHTWithHeron() throws Exception {
+
+    TestParams vhtConfig = new TestParams.Builder()
+        .inputInstances(200_000)
+        .samplingSize(20_000)
+        .evaluationInstances(200_000)
+        .classifiedInstances(200_000)
+        .classificationsCorrect(55f)
+        .kappaStat(0f)
+        .kappaTempStat(0f)
+        .cliStringTemplate(TestParams.Templates.PREQCVEVAL_VHT_RANDOMTREE)
+        .resultFilePollTimeout(30)
+        .prePollWait(15)
+        .taskClassName(LocalHeronDoTask.class.getName())
+        .labelFileCreated(false)
+        .build();
+    TestUtils.test(vhtConfig);
+
+  }
+
+}
diff --git a/samoa-heron/src/test/java/org/apache/samoa/heron/topology/impl/HeronProcessingItemTest.java b/samoa-heron/src/test/java/org/apache/samoa/heron/topology/impl/HeronProcessingItemTest.java
new file mode 100644
index 00000000..38a80d01
--- /dev/null
+++ b/samoa-heron/src/test/java/org/apache/samoa/heron/topology/impl/HeronProcessingItemTest.java
@@ -0,0 +1,84 @@
+package org.apache.samoa.heron.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import static org.junit.Assert.assertEquals;
+
+import mockit.Expectations;
+import mockit.MockUp;
+import mockit.Mocked;
+import mockit.Tested;
+import mockit.Verifications;
+
+import org.apache.samoa.core.Processor;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.samoa.heron.topology.impl.HeronProcessingItem;
+
+import com.twitter.heron.api.bolt.IRichBolt;
+import com.twitter.heron.api.topology.BoltDeclarer;
+import com.twitter.heron.api.topology.TopologyBuilder;
+
+/** Checks that a {@link HeronProcessingItem} keeps its own settings when it is added to a topology. */
+public class HeronProcessingItemTest {
+  private static final int PARALLELISM_HINT_2 = 2;
+  private static final int PARALLELISM_HINT_4 = 4;
+  private static final String ID = "id";
+
+  @Tested
+  private HeronProcessingItem pi;
+  @Mocked
+  private Processor processor;
+  @Mocked
+  private HeronTopology topology;
+  @Mocked
+  private TopologyBuilder heronBuilder = new TopologyBuilder();
+
+  @Before
+  public void setUp() {
+    pi = new HeronProcessingItem(processor, ID, PARALLELISM_HINT_2);
+  }
+
+  @Test
+  public void testAddToTopology() {
+    new Expectations() {
+      {
+        topology.getHeronBuilder();
+        result = heronBuilder;
+
+        heronBuilder.setBolt(ID, (IRichBolt) any, anyInt);
+        // MockUp needs the mocked type as its type argument to create the BoltDeclarer instance.
+        result = new MockUp<BoltDeclarer>() {
+        }.getMockInstance();
+      }
+    };
+
+    pi.addToTopology(topology, PARALLELISM_HINT_4); // this parallelism hint is ignored
+
+    // TODO add methods to explore a topology and verify them
+    // JUnit's assertEquals takes the expected value first.
+    assertEquals(processor, pi.getProcessor());
+    assertEquals(PARALLELISM_HINT_2, pi.getParallelism());
+    assertEquals(ID, pi.getId());
+  }
+}