
MapReduce Programming 2

2024/10/25 18:23:32 Source: https://blog.csdn.net/qq_73841617/article/details/139901713

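Requirements

1. The stock-daily dataset contains the daily data since the beginning of this year for nearly 4,000 A-share stocks; the stock-daily-30d dataset contains only the most recent 30 trading days. Choose one according to your computer's performance.

Data source: https://www.joinquant.com/help/api/help?name=JQData

2. The stock-concept dataset contains the code, name and concepts of each of the nearly 4,000 A-share stocks.

Data source: Wind financial terminal

Using the stock-daily dataset, compute for each stock the probability that its 5-day rolling return is positive. Rolling return: the return obtained by entering an investment target (such as a stock, fund, gold, futures, bonds or real estate) at an arbitrary moment and holding it for a fixed period, for example 5 days (one week), 22 days (one month), 44 days (two months), 66 days (one quarter), 123 days (half a year) or 245 days (one year). It is a mathematical model that describes how likely an investment target is to make money, and it can also be used to measure the performance of securities such as stocks, funds and bonds.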

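3. How to compute the rolling return:

(1) Ignore a stock's data on days marked N/A. Food for thought: an interpolation algorithm could fill in abnormal N/A values, but delisted stocks also produce N/A data, so you would need to tell N/A values caused by delisting apart from those caused by data anomalies.

(2) The 5-day rolling return on day t

$R_t = \frac{C_t - C_{t-5}}{C_{t-5}}$, where $C_t$ is the closing price on day $t$, $C_{t-5}$ is the closing price five trading days earlier, and $R_t$ is the rolling return on day $t$.

(3) The 5-day rolling positive-return rate

The probability, over all trading days, that the 5-day rolling return is positive (that is, that the position makes money).

* All calculations ignore non-trading days (holidays).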
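Written as a formula, assuming the trading days of one stock are indexed $t = 0, \dots, n-1$ after dropping N/A and non-trading days, and that only days with a defined $R_t$ enter the denominator (one reasonable reading of "all trading days", since the first five days have no $C_{t-5}$), the positive-return rate is:

```latex
P_{+} = \frac{\left|\{\, t \mid 5 \le t \le n-1,\ R_t > 0 \,\}\right|}{n-5},
\qquad R_t = \frac{C_t - C_{t-5}}{C_{t-5}}, \qquad n > 5
```

Because closing prices are positive, $R_t > 0$ holds exactly when $C_t > C_{t-5}$. For example, with closing prices 10, 11, 9, 10, 12, 11, 10 ($n = 7$), $R_5 = (11-10)/10 = 0.1 > 0$ and $R_6 = (10-11)/11 < 0$, so $P_{+} = 1/2$.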

Dataset

Code

Configure the pom file and create the package

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>stock_daily</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.1.2</version>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.1.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-configuration2</artifactId>
            <version>2.7</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.30</version>
        </dependency>
    </dependencies>
</project>

 

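The MapReduce job class

Create a class that extends Configured and implements the Tool interface.

Override the run method to configure the MapReduce job: specify the Mapper, Reducer and grouping comparator classes, then submit the job.

The Map class extends Mapper and declares the input and output key/value types.

The basic logic splits each input line on tab characters, takes the fields at fixed positions, wraps the stock code and the date in a CodeTimeTuple as the key, and writes the closing price as the value to the context for the reducer. The date is converted with SimpleDateFormat using the pattern yyyy-MM-dd HH:mm:ss (the date field is padded with 00:00:00) into a millisecond timestamp, which makes it easy to sort.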
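The parsing step can be tried outside Hadoop. The plain-Java sketch below is hypothetical: `StockLineParser` and its `ParsedRow` type are illustration names, not part of the article. It reuses the column positions 0 (code), 3 (close) and 13 (date) from the mapper, which are assumptions about the stock-daily layout, and it swaps `SimpleDateFormat` for `java.time.LocalDate`, whose epoch-day number sorts dates the same way the mapper's millisecond timestamp does.

```java
import java.time.LocalDate;
import java.time.format.DateTimeParseException;

/**
 * Hypothetical plain-Java sketch of the mapper's per-line parsing (no Hadoop needed).
 * Column positions are copied from the article's mapper and are assumptions.
 */
class StockLineParser {
    static final int CODE_COL = 0;
    static final int CLOSE_COL = 3;
    static final int DATE_COL = 13;

    /** One parsed row: stock code, trading date and closing price. */
    static final class ParsedRow {
        final String code;
        final long epochDay;   // days since 1970-01-01; sorts like the mapper's timestamp
        final float close;

        ParsedRow(String code, long epochDay, float close) {
            this.code = code;
            this.epochDay = epochDay;
            this.close = close;
        }
    }

    /** Returns the parsed row, or null if the line must be skipped. */
    static ParsedRow parse(String line) {
        // rows containing N/A are ignored, as the requirements demand
        if (line.contains("N/A")) {
            return null;
        }
        String[] fields = line.split("\t");
        // the date is column 13, so at least 14 columns are needed
        if (fields.length <= DATE_COL) {
            return null;
        }
        try {
            float close = Float.parseFloat(fields[CLOSE_COL]);
            // LocalDate.parse expects ISO yyyy-MM-dd, the same date layout the article assumes
            long epochDay = LocalDate.parse(fields[DATE_COL].trim()).toEpochDay();
            return new ParsedRow(fields[CODE_COL], epochDay, close);
        } catch (NumberFormatException | DateTimeParseException e) {
            // malformed rows (e.g. a header line) are skipped
            return null;
        }
    }
}
```

Returning `null` for skipped rows mirrors the mapper's early `return`; in the Hadoop version the non-null case becomes a `context.write` of the composite key and the closing price.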

CodeTimeTuple

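It has two fields, time and code. Because the class is used as a MapReduce key, it must be written to and read from a byte stream (write and readFields, in the same field order). It also implements a comparison method, used for the sort that precedes grouping. Sorting happens at two levels: first by stock code, then, for the same stock code, by date in ascending order, so that the reducer sees $C_{t-5}$ before $C_t$.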
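To see the Writable contract and the two-level ordering without a cluster, here is a hypothetical plain-Java stand-in (`CodeTimeKey` is an illustration name, not the article's class). It uses `DataOutputStream.writeUTF` rather than Hadoop's `Text` encoding, so the bytes differ from the real key, but the rule is the same: `readFields` must read the fields in the order `write` wrote them. The time comparison is ascending, oldest date first.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical Hadoop-free stand-in for CodeTimeTuple. */
class CodeTimeKey implements Comparable<CodeTimeKey> {
    String code = "";
    long time;

    CodeTimeKey() { }

    CodeTimeKey(String code, long time) {
        this.code = code;
        this.time = time;
    }

    // serialize: code first, then time
    void write(DataOutput out) throws IOException {
        out.writeUTF(code);
        out.writeLong(time);
    }

    // deserialize in exactly the same order as write()
    void readFields(DataInput in) throws IOException {
        code = in.readUTF();
        time = in.readLong();
    }

    @Override
    public int compareTo(CodeTimeKey o) {
        int cmp = code.compareTo(o.code);      // primary: stock code
        if (cmp != 0) {
            return cmp;
        }
        return Long.compare(time, o.time);     // secondary: oldest date first
    }

    /** Serializes and deserializes a key, as happens between map and reduce. */
    static CodeTimeKey roundTrip(CodeTimeKey key) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            key.write(new DataOutputStream(bytes));
            CodeTimeKey copy = new CodeTimeKey();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Swapping the two lines in `readFields` would make the round trip fail, which is the same failure mode a mismatched `write`/`readFields` pair causes in a real job.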

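Grouping comparator class

Casts the two keys being compared to CodeTimeTuple and compares only their stock codes, so that all records of one stock form a single group and are handled by one reduce() call. Comparing the full key (code and time) would put every trading day into a group of its own.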
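The effect of the grouping comparator can be simulated in plain Java. The sketch below is a hypothetical model of the reduce side (`GroupingDemo`, `Key` and `groupSizes` are illustration names): after sorting, a new `reduce()` call starts whenever the grouping comparator reports that two neighbouring keys differ. It compares the full (code, time) comparator with a code-only comparator on the same sorted keys.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical simulation of how sorted keys are split into reduce() calls. */
class GroupingDemo {
    static final class Key {
        final String code;
        final long time;

        Key(String code, long time) {
            this.code = code;
            this.time = time;
        }
    }

    // compares code and time: every (code, date) pair becomes its own group
    static final Comparator<Key> FULL_KEY =
            Comparator.comparing((Key k) -> k.code).thenComparingLong(k -> k.time);

    // compares the code only: all days of one stock share a group
    static final Comparator<Key> CODE_ONLY = Comparator.comparing((Key k) -> k.code);

    /** Returns how many values each reduce() call would receive, in order. */
    static List<Integer> groupSizes(List<Key> sortedKeys, Comparator<Key> grouping) {
        List<Integer> sizes = new ArrayList<>();
        Key previous = null;
        for (Key key : sortedKeys) {
            if (previous == null || grouping.compare(previous, key) != 0) {
                sizes.add(1);                               // a new group starts here
            } else {
                int last = sizes.size() - 1;
                sizes.set(last, sizes.get(last) + 1);       // same group continues
            }
            previous = key;
        }
        return sizes;
    }
}
```

For three days of one stock followed by two days of another, `FULL_KEY` yields group sizes [1, 1, 1, 1, 1] while `CODE_ONLY` yields [3, 2]; only the latter gives the reducer a whole price history to compute rolling returns from.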

Reduce

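The reducer receives the sorted records of one stock, stores each day's closing price in a list (needed to compute the rolling return), and iterates over them. The first five trading days have no closing price five days earlier, so no rolling return is computed for them; from the sixth day on, $R_t = (C_t - C_{t-5}) / C_{t-5}$, where $C_t$ is the closing price on day $t$ and $R_t$ the rolling return on day $t$. Finally, the number of days with a positive return is divided by the number of days for which a return was computed, giving the probability that the return is positive.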
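The reducer's arithmetic for one stock fits in a few lines of plain Java. This is a sketch under two assumptions: the prices arrive in ascending date order, and only days with a defined $R_t$ (from the sixth trading day on) count in the denominator. `RollingReturnSketch` and `positiveRate` are hypothetical names; in the Hadoop reducer the same loop would iterate over `Iterable<FloatWritable>` and call `val.get()`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the per-stock rolling-return computation. */
class RollingReturnSketch {
    static final int WINDOW = 5;   // holding period in trading days

    /** Share of days with R_t > 0, or NaN if there are not more than WINDOW prices. */
    static double positiveRate(Iterable<Float> closesInDateOrder) {
        List<Float> prices = new ArrayList<>();
        int positiveDays = 0;
        for (float current : closesInDateOrder) {
            int t = prices.size();                        // index of the current day
            if (t >= WINDOW) {
                float base = prices.get(t - WINDOW);      // C_{t-5}
                double rt = (current - base) / base;      // R_t = (C_t - C_{t-5}) / C_{t-5}
                if (rt > 0) {
                    positiveDays++;
                }
            }
            prices.add(current);
        }
        int rollingDays = prices.size() - WINDOW;         // days with a defined R_t
        return rollingDays > 0 ? (double) positiveDays / rollingDays : Double.NaN;
    }
}
```

For the closing prices 10, 11, 9, 10, 12, 11, 10 only $R_5 = 0.1$ and $R_6 \approx -0.091$ are defined, so `positiveRate` returns 0.5.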

Complete code

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
 * Calculate the five-day rolling return of every stock.
 */
public class RollYield extends Configured implements Tool {

    /**
     * The entrance of the program.
     * @param args args[0] is the input path and args[1] is the output path.
     */
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new RollYield(), args);
        System.exit(res);
    }//of main

    /**
     * Set the map class and reduce class and construct the job.
     */
    @Override
    public int run(String[] args) throws Exception {
        // use the configuration prepared by ToolRunner (it includes generic -D options)
        Configuration conf = getConf();
        // construct the job
        System.out.println("-----------Create and configure the Job-------------");
        Job job = Job.getInstance(conf, "RollYield");
        // indicate the class of the job
        job.setJarByClass(RollYield.class);
        // indicate the classes of the Map and Reduce
        job.setMapperClass(yieldMapper.class);
        job.setReducerClass(yieldReduce.class);
        // no combiner: the reducer's output types differ from its input types
        //job.setCombinerClass(yieldReduce.class);
        // the input format: text files
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path(args[0]));
        // the grouping comparator: one reduce() call per stock code
        job.setGroupingComparatorClass(GroupSort.class);
        // the map output: key is CodeTimeTuple, value is float
        job.setMapOutputKeyClass(CodeTimeTuple.class);
        job.setMapOutputValueClass(FloatWritable.class);
        // the final output: key is text, value is float
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);
        TextOutputFormat.setOutputPath(job, new Path(args[1]));
        // execute the job and wait for it to finish
        boolean res = job.waitForCompletion(true);
        return res ? 0 : -1;
    }//of run

    /**
     * The map class for converting the input data into key-value pairs.
     */
    public static class yieldMapper extends Mapper<LongWritable, Text, CodeTimeTuple, FloatWritable> {
        // reused for every record; each map task has its own mapper instance
        private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        /**
         * Parse one line and emit ((stock code, date), closing price).
         */
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            // ignore rows with invalid data
            if (line.contains("N/A")) {
                return;
            }
            // split every line of the file into a string array
            String[] fields = line.split("\t");
            // the date is read from fields[13], so at least 14 columns are required
            if (fields.length <= 13) {
                return;
            }
            try {
                // according to the position, get the corresponding value
                float closePrice = Float.parseFloat(fields[3]);
                // put the code and the date into the timeTuple
                CodeTimeTuple timeTuple = new CodeTimeTuple();
                timeTuple.setCode(new Text(fields[0]));
                // convert the date into a timestamp for sorting
                Date date = dateFormat.parse(fields[13] + " 00:00:00");
                timeTuple.setTime(new LongWritable(date.getTime()));
                context.write(timeTuple, new FloatWritable(closePrice));
            } catch (ParseException | NumberFormatException e) {
                // malformed rows (e.g. a header line) are logged and skipped
                System.out.println(line);
                System.out.println(e.getMessage());
            }//of catch
        }//of map
    }//of class yieldMapper

    /**
     * Grouping comparator: keys with the same stock code belong to the same group.
     */
    public static class GroupSort extends WritableComparator {
        protected GroupSort() {
            super(CodeTimeTuple.class, true);
        }

        /**
         * Compare only the stock codes, so that one reduce() call receives all days of a stock.
         */
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            CodeTimeTuple key1 = (CodeTimeTuple) a;
            CodeTimeTuple key2 = (CodeTimeTuple) b;
            return key1.getCode().compareTo(key2.getCode());
        }//of compare
    }//of class GroupSort

    /**
     * The reduce class: computes the share of days with a positive 5-day rolling return.
     */
    public static class yieldReduce extends Reducer<CodeTimeTuple, FloatWritable, Text, FloatWritable> {
        // holding period in trading days
        private static final int WINDOW = 5;

        @Override
        public void reduce(CodeTimeTuple key, Iterable<FloatWritable> values, Context context)
                throws IOException, InterruptedException {
            // closing prices of one stock, in ascending date order thanks to the secondary sort
            List<Float> priceOfEveryday = new ArrayList<>();
            // the days which have a positive rolling return
            int positiveDays = 0;
            for (FloatWritable val : values) {
                float currentPrice = val.get();
                int t = priceOfEveryday.size();
                // R_t exists only from the sixth trading day on; prices are positive,
                // so R_t > 0 exactly when C_t > C_{t-5}
                if (t >= WINDOW && currentPrice > priceOfEveryday.get(t - WINDOW)) {
                    positiveDays++;
                }
                priceOfEveryday.add(currentPrice);
            }//of for
            // the number of days for which a rolling return was computed
            int rollingDays = priceOfEveryday.size() - WINDOW;
            if (rollingDays > 0) {
                context.write(new Text(key.getCode()), new FloatWritable((float) positiveDays / rollingDays));
            }//of if
        }//of reduce
    }//of class yieldReduce
}//of class RollYield

 

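import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Custom composite key: (stock code, trading date)
public class CodeTimeTuple implements WritableComparable<CodeTimeTuple> {
    private LongWritable time = new LongWritable();
    private Text code = new Text();

    public LongWritable getTime() { return time; }
    public void setTime(LongWritable time) { this.time = time; }
    public Text getCode() { return code; }
    public void setCode(Text code) { this.code = code; }

    // Write the fields to the stream for the framework.
    // The order must be the same in write and readFields.
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        code.write(dataOutput);
        time.write(dataOutput);
    }

    // Read the fields back from the stream into this object.
    // The order must be the same in write and readFields.
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        code.readFields(dataInput);
        time.readFields(dataInput);
    }

    // Key sort order
    @Override
    public int compareTo(CodeTimeTuple o) {
        // primary sort: stock code (must be consistent with the grouping comparator)
        int cmp = this.getCode().compareTo(o.getCode());
        if (cmp != 0) {
            return cmp;
        }
        // secondary sort: date in ascending order, so the reducer sees C_{t-5} before C_t;
        // negating the result would sort descending and flip the sign of every rolling return
        return this.getTime().compareTo(o.getTime());
    }

    // The default HashPartitioner uses hashCode(); hashing only the code sends all days
    // of one stock to the same reducer when the job runs with several reduce tasks.
    @Override
    public int hashCode() {
        return code.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof CodeTimeTuple)) {
            return false;
        }
        CodeTimeTuple other = (CodeTimeTuple) obj;
        return code.equals(other.code) && time.equals(other.time);
    }
}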

Build the jar and upload it to Hadoop

Run the job

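PS:

A combiner is effectively a small reducer that runs on the map side, and its output is fed to the reducer, so both its input and its output types must equal the map output types. Here the reducer's input types (CodeTimeTuple, FloatWritable) differ from its output types (Text, FloatWritable), so registering yieldReduce as the combiner makes the job fail with a type mismatch on the reduce input. The options are to write a dedicated combiner class or to use no combiner at all. I chose the second, which works but leaves all the work to the reducer.

Output printed with System.out ends up in the userlogs directory under Hadoop's logs directory; the corresponding log files can also be viewed in the cluster's web UI.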

 

 
