ETT 5 years ago
commit
e865102da4
27 changed files with 1641 additions and 0 deletions
  1. 31 0
      .gitignore
  2. 114 0
      .mvn/wrapper/MavenWrapperDownloader.java
  3. BIN
      .mvn/wrapper/maven-wrapper.jar
  4. 1 0
      .mvn/wrapper/maven-wrapper.properties
  5. 286 0
      mvnw
  6. 161 0
      mvnw.cmd
  7. 74 0
      pom.xml
  8. 105 0
      src/main/java/com/ett/rocketmq/Abstract/AbstractMqConsumer.java
  9. 24 0
      src/main/java/com/ett/rocketmq/Abstract/AbstractMqProducer.java
  10. 13 0
      src/main/java/com/ett/rocketmq/RocketmqApplication.java
  11. 115 0
      src/main/java/com/ett/rocketmq/consumer/MQConsumer.java
  12. 64 0
      src/main/java/com/ett/rocketmq/consumer/PullConsumer.java
  13. 36 0
      src/main/java/com/ett/rocketmq/controller/Consumer.java
  14. 29 0
      src/main/java/com/ett/rocketmq/controller/MQController.java
  15. 8 0
      src/main/java/com/ett/rocketmq/dto/TestMqMessageDto.java
  16. 88 0
      src/main/java/com/ett/rocketmq/exception/AppException.java
  17. 19 0
      src/main/java/com/ett/rocketmq/exception/ErrorCode.java
  18. 31 0
      src/main/java/com/ett/rocketmq/exception/RocketMQErrorEnum.java
  19. 40 0
      src/main/java/com/ett/rocketmq/exception/RocketMQException.java
  20. 44 0
      src/main/java/com/ett/rocketmq/listener/MQConsumeMsgListenerProcessor.java
  21. 35 0
      src/main/java/com/ett/rocketmq/listener/MqMessageEvent.java
  22. 110 0
      src/main/java/com/ett/rocketmq/producer/MQProducer.java
  23. 74 0
      src/main/java/com/ett/rocketmq/service/ConsumerService.java
  24. 45 0
      src/main/java/com/ett/rocketmq/service/ProducerService.java
  25. 43 0
      src/main/resources/application.properties
  26. 35 0
      src/test/java/com/ett/rocketmq/DefaultProductTest.java
  27. 16 0
      src/test/java/com/ett/rocketmq/RocketmqApplicationTests.java

+ 31 - 0
.gitignore

@@ -0,0 +1,31 @@
+HELP.md
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**
+!**/src/test/**
+
+### STS ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### IntelliJ IDEA ###
+.idea
+*.iws
+*.iml
+*.ipr
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+
+### VS Code ###
+.vscode/

+ 114 - 0
.mvn/wrapper/MavenWrapperDownloader.java

@@ -0,0 +1,114 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  https://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.Properties;
+
+public class MavenWrapperDownloader {
+
+    /**
+     * Default URL to download the maven-wrapper.jar from, if no 'downloadUrl' is provided.
+     */
+    private static final String DEFAULT_DOWNLOAD_URL =
+            "https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.4.2/maven-wrapper-0.4.2.jar";
+
+    /**
+     * Path to the maven-wrapper.properties file, which might contain a downloadUrl property to
+     * use instead of the default one.
+     */
+    private static final String MAVEN_WRAPPER_PROPERTIES_PATH =
+            ".mvn/wrapper/maven-wrapper.properties";
+
+    /**
+     * Path where the maven-wrapper.jar will be saved to.
+     */
+    private static final String MAVEN_WRAPPER_JAR_PATH =
+            ".mvn/wrapper/maven-wrapper.jar";
+
+    /**
+     * Name of the property which should be used to override the default download url for the wrapper.
+     */
+    private static final String PROPERTY_NAME_WRAPPER_URL = "wrapperUrl";
+
+    public static void main(String args[]) {
+        System.out.println("- Downloader started");
+        File baseDirectory = new File(args[0]);
+        System.out.println("- Using base directory: " + baseDirectory.getAbsolutePath());
+
+        // If the maven-wrapper.properties exists, read it and check if it contains a custom
+        // wrapperUrl parameter.
+        File mavenWrapperPropertyFile = new File(baseDirectory, MAVEN_WRAPPER_PROPERTIES_PATH);
+        String url = DEFAULT_DOWNLOAD_URL;
+        if (mavenWrapperPropertyFile.exists()) {
+            FileInputStream mavenWrapperPropertyFileInputStream = null;
+            try {
+                mavenWrapperPropertyFileInputStream = new FileInputStream(mavenWrapperPropertyFile);
+                Properties mavenWrapperProperties = new Properties();
+                mavenWrapperProperties.load(mavenWrapperPropertyFileInputStream);
+                url = mavenWrapperProperties.getProperty(PROPERTY_NAME_WRAPPER_URL, url);
+            } catch (IOException e) {
+                System.out.println("- ERROR loading '" + MAVEN_WRAPPER_PROPERTIES_PATH + "'");
+            } finally {
+                try {
+                    if (mavenWrapperPropertyFileInputStream != null) {
+                        mavenWrapperPropertyFileInputStream.close();
+                    }
+                } catch (IOException e) {
+                    // Ignore ...
+                }
+            }
+        }
+        System.out.println("- Downloading from: : " + url);
+
+        File outputFile = new File(baseDirectory.getAbsolutePath(), MAVEN_WRAPPER_JAR_PATH);
+        if (!outputFile.getParentFile().exists()) {
+            if (!outputFile.getParentFile().mkdirs()) {
+                System.out.println(
+                        "- ERROR creating output direcrory '" + outputFile.getParentFile().getAbsolutePath() + "'");
+            }
+        }
+        System.out.println("- Downloading to: " + outputFile.getAbsolutePath());
+        try {
+            downloadFileFromURL(url, outputFile);
+            System.out.println("Done");
+            System.exit(0);
+        } catch (Throwable e) {
+            System.out.println("- Error downloading");
+            e.printStackTrace();
+            System.exit(1);
+        }
+    }
+
+    private static void downloadFileFromURL(String urlString, File destination) throws Exception {
+        URL website = new URL(urlString);
+        ReadableByteChannel rbc;
+        rbc = Channels.newChannel(website.openStream());
+        FileOutputStream fos = new FileOutputStream(destination);
+        fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE);
+        fos.close();
+        rbc.close();
+    }
+
+}

BIN
.mvn/wrapper/maven-wrapper.jar


+ 1 - 0
.mvn/wrapper/maven-wrapper.properties

@@ -0,0 +1 @@
+distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.6.0/apache-maven-3.6.0-bin.zip

+ 286 - 0
mvnw

@@ -0,0 +1,286 @@
+#!/bin/sh
+# ----------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#    https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# ----------------------------------------------------------------------------
+
+# ----------------------------------------------------------------------------
+# Maven2 Start Up Batch script
+#
+# Required ENV vars:
+# ------------------
+#   JAVA_HOME - location of a JDK home dir
+#
+# Optional ENV vars
+# -----------------
+#   M2_HOME - location of maven2's installed home dir
+#   MAVEN_OPTS - parameters passed to the Java VM when running Maven
+#     e.g. to debug Maven itself, use
+#       set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
+#   MAVEN_SKIP_RC - flag to disable loading of mavenrc files
+# ----------------------------------------------------------------------------
+
+if [ -z "$MAVEN_SKIP_RC" ] ; then
+
+  if [ -f /etc/mavenrc ] ; then
+    . /etc/mavenrc
+  fi
+
+  if [ -f "$HOME/.mavenrc" ] ; then
+    . "$HOME/.mavenrc"
+  fi
+
+fi
+
+# OS specific support.  $var _must_ be set to either true or false.
+cygwin=false;
+darwin=false;
+mingw=false
+case "`uname`" in
+  CYGWIN*) cygwin=true ;;
+  MINGW*) mingw=true;;
+  Darwin*) darwin=true
+    # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home
+    # See https://developer.apple.com/library/mac/qa/qa1170/_index.html
+    if [ -z "$JAVA_HOME" ]; then
+      if [ -x "/usr/libexec/java_home" ]; then
+        export JAVA_HOME="`/usr/libexec/java_home`"
+      else
+        export JAVA_HOME="/Library/Java/Home"
+      fi
+    fi
+    ;;
+esac
+
+if [ -z "$JAVA_HOME" ] ; then
+  if [ -r /etc/gentoo-release ] ; then
+    JAVA_HOME=`java-config --jre-home`
+  fi
+fi
+
+if [ -z "$M2_HOME" ] ; then
+  ## resolve links - $0 may be a link to maven's home
+  PRG="$0"
+
+  # need this for relative symlinks
+  while [ -h "$PRG" ] ; do
+    ls=`ls -ld "$PRG"`
+    link=`expr "$ls" : '.*-> \(.*\)$'`
+    if expr "$link" : '/.*' > /dev/null; then
+      PRG="$link"
+    else
+      PRG="`dirname "$PRG"`/$link"
+    fi
+  done
+
+  saveddir=`pwd`
+
+  M2_HOME=`dirname "$PRG"`/..
+
+  # make it fully qualified
+  M2_HOME=`cd "$M2_HOME" && pwd`
+
+  cd "$saveddir"
+  # echo Using m2 at $M2_HOME
+fi
+
+# For Cygwin, ensure paths are in UNIX format before anything is touched
+if $cygwin ; then
+  [ -n "$M2_HOME" ] &&
+    M2_HOME=`cygpath --unix "$M2_HOME"`
+  [ -n "$JAVA_HOME" ] &&
+    JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
+  [ -n "$CLASSPATH" ] &&
+    CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
+fi
+
+# For Mingw, ensure paths are in UNIX format before anything is touched
+if $mingw ; then
+  [ -n "$M2_HOME" ] &&
+    M2_HOME="`(cd "$M2_HOME"; pwd)`"
+  [ -n "$JAVA_HOME" ] &&
+    JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`"
+  # TODO classpath?
+fi
+
+if [ -z "$JAVA_HOME" ]; then
+  javaExecutable="`which javac`"
+  if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then
+    # readlink(1) is not available as standard on Solaris 10.
+    readLink=`which readlink`
+    if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then
+      if $darwin ; then
+        javaHome="`dirname \"$javaExecutable\"`"
+        javaExecutable="`cd \"$javaHome\" && pwd -P`/javac"
+      else
+        javaExecutable="`readlink -f \"$javaExecutable\"`"
+      fi
+      javaHome="`dirname \"$javaExecutable\"`"
+      javaHome=`expr "$javaHome" : '\(.*\)/bin'`
+      JAVA_HOME="$javaHome"
+      export JAVA_HOME
+    fi
+  fi
+fi
+
+if [ -z "$JAVACMD" ] ; then
+  if [ -n "$JAVA_HOME"  ] ; then
+    if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+      # IBM's JDK on AIX uses strange locations for the executables
+      JAVACMD="$JAVA_HOME/jre/sh/java"
+    else
+      JAVACMD="$JAVA_HOME/bin/java"
+    fi
+  else
+    JAVACMD="`which java`"
+  fi
+fi
+
+if [ ! -x "$JAVACMD" ] ; then
+  echo "Error: JAVA_HOME is not defined correctly." >&2
+  echo "  We cannot execute $JAVACMD" >&2
+  exit 1
+fi
+
+if [ -z "$JAVA_HOME" ] ; then
+  echo "Warning: JAVA_HOME environment variable is not set."
+fi
+
+CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher
+
+# traverses directory structure from process work directory to filesystem root
+# first directory with .mvn subdirectory is considered project base directory
+find_maven_basedir() {
+
+  if [ -z "$1" ]
+  then
+    echo "Path not specified to find_maven_basedir"
+    return 1
+  fi
+
+  basedir="$1"
+  wdir="$1"
+  while [ "$wdir" != '/' ] ; do
+    if [ -d "$wdir"/.mvn ] ; then
+      basedir=$wdir
+      break
+    fi
+    # workaround for JBEAP-8937 (on Solaris 10/Sparc)
+    if [ -d "${wdir}" ]; then
+      wdir=`cd "$wdir/.."; pwd`
+    fi
+    # end of workaround
+  done
+  echo "${basedir}"
+}
+
+# concatenates all lines of a file
+concat_lines() {
+  if [ -f "$1" ]; then
+    echo "$(tr -s '\n' ' ' < "$1")"
+  fi
+}
+
+BASE_DIR=`find_maven_basedir "$(pwd)"`
+if [ -z "$BASE_DIR" ]; then
+  exit 1;
+fi
+
+##########################################################################################
+# Extension to allow automatically downloading the maven-wrapper.jar from Maven-central
+# This allows using the maven wrapper in projects that prohibit checking in binary data.
+##########################################################################################
+if [ -r "$BASE_DIR/.mvn/wrapper/maven-wrapper.jar" ]; then
+    if [ "$MVNW_VERBOSE" = true ]; then
+      echo "Found .mvn/wrapper/maven-wrapper.jar"
+    fi
+else
+    if [ "$MVNW_VERBOSE" = true ]; then
+      echo "Couldn't find .mvn/wrapper/maven-wrapper.jar, downloading it ..."
+    fi
+    jarUrl="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.4.2/maven-wrapper-0.4.2.jar"
+    while IFS="=" read key value; do
+      case "$key" in (wrapperUrl) jarUrl="$value"; break ;;
+      esac
+    done < "$BASE_DIR/.mvn/wrapper/maven-wrapper.properties"
+    if [ "$MVNW_VERBOSE" = true ]; then
+      echo "Downloading from: $jarUrl"
+    fi
+    wrapperJarPath="$BASE_DIR/.mvn/wrapper/maven-wrapper.jar"
+
+    if command -v wget > /dev/null; then
+        if [ "$MVNW_VERBOSE" = true ]; then
+          echo "Found wget ... using wget"
+        fi
+        wget "$jarUrl" -O "$wrapperJarPath"
+    elif command -v curl > /dev/null; then
+        if [ "$MVNW_VERBOSE" = true ]; then
+          echo "Found curl ... using curl"
+        fi
+        curl -o "$wrapperJarPath" "$jarUrl"
+    else
+        if [ "$MVNW_VERBOSE" = true ]; then
+          echo "Falling back to using Java to download"
+        fi
+        javaClass="$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.java"
+        if [ -e "$javaClass" ]; then
+            if [ ! -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then
+                if [ "$MVNW_VERBOSE" = true ]; then
+                  echo " - Compiling MavenWrapperDownloader.java ..."
+                fi
+                # Compiling the Java class
+                ("$JAVA_HOME/bin/javac" "$javaClass")
+            fi
+            if [ -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then
+                # Running the downloader
+                if [ "$MVNW_VERBOSE" = true ]; then
+                  echo " - Running MavenWrapperDownloader.java ..."
+                fi
+                ("$JAVA_HOME/bin/java" -cp .mvn/wrapper MavenWrapperDownloader "$MAVEN_PROJECTBASEDIR")
+            fi
+        fi
+    fi
+fi
+##########################################################################################
+# End of extension
+##########################################################################################
+
+export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"}
+if [ "$MVNW_VERBOSE" = true ]; then
+  echo $MAVEN_PROJECTBASEDIR
+fi
+MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS"
+
+# For Cygwin, switch paths to Windows format before running java
+if $cygwin; then
+  [ -n "$M2_HOME" ] &&
+    M2_HOME=`cygpath --path --windows "$M2_HOME"`
+  [ -n "$JAVA_HOME" ] &&
+    JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
+  [ -n "$CLASSPATH" ] &&
+    CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
+  [ -n "$MAVEN_PROJECTBASEDIR" ] &&
+    MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"`
+fi
+
+WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
+
+exec "$JAVACMD" \
+  $MAVEN_OPTS \
+  -classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \
+  "-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \
+  ${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@"

+ 161 - 0
mvnw.cmd

@@ -0,0 +1,161 @@
+@REM ----------------------------------------------------------------------------
+@REM Licensed to the Apache Software Foundation (ASF) under one
+@REM or more contributor license agreements.  See the NOTICE file
+@REM distributed with this work for additional information
+@REM regarding copyright ownership.  The ASF licenses this file
+@REM to you under the Apache License, Version 2.0 (the
+@REM "License"); you may not use this file except in compliance
+@REM with the License.  You may obtain a copy of the License at
+@REM
+@REM    https://www.apache.org/licenses/LICENSE-2.0
+@REM
+@REM Unless required by applicable law or agreed to in writing,
+@REM software distributed under the License is distributed on an
+@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+@REM KIND, either express or implied.  See the License for the
+@REM specific language governing permissions and limitations
+@REM under the License.
+@REM ----------------------------------------------------------------------------
+
+@REM ----------------------------------------------------------------------------
+@REM Maven2 Start Up Batch script
+@REM
+@REM Required ENV vars:
+@REM JAVA_HOME - location of a JDK home dir
+@REM
+@REM Optional ENV vars
+@REM M2_HOME - location of maven2's installed home dir
+@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands
+@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a key stroke before ending
+@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven
+@REM     e.g. to debug Maven itself, use
+@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
+@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files
+@REM ----------------------------------------------------------------------------
+
+@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on'
+@echo off
+@REM set title of command window
+title %0
+@REM enable echoing my setting MAVEN_BATCH_ECHO to 'on'
+@if "%MAVEN_BATCH_ECHO%" == "on"  echo %MAVEN_BATCH_ECHO%
+
+@REM set %HOME% to equivalent of $HOME
+if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%")
+
+@REM Execute a user defined script before this one
+if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre
+@REM check for pre script, once with legacy .bat ending and once with .cmd ending
+if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat"
+if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd"
+:skipRcPre
+
+@setlocal
+
+set ERROR_CODE=0
+
+@REM To isolate internal variables from possible post scripts, we use another setlocal
+@setlocal
+
+@REM ==== START VALIDATION ====
+if not "%JAVA_HOME%" == "" goto OkJHome
+
+echo.
+echo Error: JAVA_HOME not found in your environment. >&2
+echo Please set the JAVA_HOME variable in your environment to match the >&2
+echo location of your Java installation. >&2
+echo.
+goto error
+
+:OkJHome
+if exist "%JAVA_HOME%\bin\java.exe" goto init
+
+echo.
+echo Error: JAVA_HOME is set to an invalid directory. >&2
+echo JAVA_HOME = "%JAVA_HOME%" >&2
+echo Please set the JAVA_HOME variable in your environment to match the >&2
+echo location of your Java installation. >&2
+echo.
+goto error
+
+@REM ==== END VALIDATION ====
+
+:init
+
+@REM Find the project base dir, i.e. the directory that contains the folder ".mvn".
+@REM Fallback to current working directory if not found.
+
+set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR%
+IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir
+
+set EXEC_DIR=%CD%
+set WDIR=%EXEC_DIR%
+:findBaseDir
+IF EXIST "%WDIR%"\.mvn goto baseDirFound
+cd ..
+IF "%WDIR%"=="%CD%" goto baseDirNotFound
+set WDIR=%CD%
+goto findBaseDir
+
+:baseDirFound
+set MAVEN_PROJECTBASEDIR=%WDIR%
+cd "%EXEC_DIR%"
+goto endDetectBaseDir
+
+:baseDirNotFound
+set MAVEN_PROJECTBASEDIR=%EXEC_DIR%
+cd "%EXEC_DIR%"
+
+:endDetectBaseDir
+
+IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig
+
+@setlocal EnableExtensions EnableDelayedExpansion
+for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a
+@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS%
+
+:endReadAdditionalConfig
+
+SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe"
+set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar"
+set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
+
+set DOWNLOAD_URL="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.4.2/maven-wrapper-0.4.2.jar"
+FOR /F "tokens=1,2 delims==" %%A IN (%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties) DO (
+	IF "%%A"=="wrapperUrl" SET DOWNLOAD_URL=%%B 
+)
+
+@REM Extension to allow automatically downloading the maven-wrapper.jar from Maven-central
+@REM This allows using the maven wrapper in projects that prohibit checking in binary data.
+if exist %WRAPPER_JAR% (
+    echo Found %WRAPPER_JAR%
+) else (
+    echo Couldn't find %WRAPPER_JAR%, downloading it ...
+	echo Downloading from: %DOWNLOAD_URL%
+    powershell -Command "(New-Object Net.WebClient).DownloadFile('%DOWNLOAD_URL%', '%WRAPPER_JAR%')"
+    echo Finished downloading %WRAPPER_JAR%
+)
+@REM End of extension
+
+%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %*
+if ERRORLEVEL 1 goto error
+goto end
+
+:error
+set ERROR_CODE=1
+
+:end
+@endlocal & set ERROR_CODE=%ERROR_CODE%
+
+if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost
+@REM check for post script, once with legacy .bat ending and once with .cmd ending
+if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat"
+if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd"
+:skipRcPost
+
+@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on'
+if "%MAVEN_BATCH_PAUSE%" == "on" pause
+
+if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE%
+
+exit /B %ERROR_CODE%

+ 74 - 0
pom.xml

@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-starter-parent</artifactId>
+        <version>2.1.8.RELEASE</version>
+        <relativePath/> <!-- lookup parent from repository -->
+    </parent>
+    <groupId>com.ett</groupId>
+    <artifactId>rocketmq</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+    <name>rocketmq</name>
+    <description>Demo project for Spring Boot</description>
+
+    <properties>
+        <java.version>1.8</java.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-client</artifactId>
+            <version>4.1.0-incubating</version>
+        </dependency>
+
+
+<!--        <dependency>-->
+<!--            <groupId>org.apache.rocketmq</groupId>-->
+<!--            <artifactId>rocketmq-tools</artifactId>-->
+<!--            <version>4.3.0</version>-->
+<!--        </dependency>-->
+
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>1.2.47</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <optional>true</optional>
+        </dependency>
+<!--        <dependency>-->
+<!--            <groupId>com.alibaba.rocketmq</groupId>-->
+<!--            <artifactId>rocketmq-client</artifactId>-->
+<!--            <version>3.2.6</version>-->
+<!--        </dependency>-->
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

+ 105 - 0
src/main/java/com/ett/rocketmq/Abstract/AbstractMqConsumer.java

@@ -0,0 +1,105 @@
+package com.ett.rocketmq.Abstract;
+
+import com.ett.rocketmq.listener.MqMessageEvent;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.*;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationEventPublisher;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.List;
+
+public abstract class AbstractMqConsumer {
+
+    protected DefaultMQPushConsumer consumer;
+    // 是否允许顺序消费
+    protected boolean isOrderConsumer = false;
+
+    @Autowired
+    protected ApplicationEventPublisher publisher;
+
+    /**
+     * 初始化consumer,由开发者控制
+     *
+     * 例如:
+     * try {
+     *      consumer = new DefaultMQPushConsumer(consumerGroup);
+     *      consumer.setNamesrvAddr(namesrvAddr);
+     *      consumer.setMessageModel(MessageModel.CLUSTERING);
+     *      consumer.setConsumeMessageBatchMaxSize(1);
+     *      consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+     *      consumer.subscribe("TopicTest", "*");
+     *  } catch (MQClientException e) {
+     *      e.printStackTrace();
+     *  }
+     */
+    public abstract void start0();
+
+    @PostConstruct
+    private void start() {
+        if (null == consumer) {
+            start0();
+        }
+
+        if (isOrderConsumer) {
+            consumer.registerMessageListener(new MessageListenerOrderly() {
+                @Override
+                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) {
+                    try {
+                        consumeOrderlyContext.setAutoCommit(true);
+                        if (null == msgs || msgs.size() == 0) {
+                            return ConsumeOrderlyStatus.SUCCESS;
+                        }
+                        publisher.publishEvent(new MqMessageEvent(consumer, msgs));
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
+                    }
+                    return ConsumeOrderlyStatus.SUCCESS;
+                }
+            });
+        }
+        else {
+            consumer.registerMessageListener(new MessageListenerConcurrently() {
+                @Override
+                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
+                    try {
+                        if (null == msgs || msgs.size() == 0) {
+                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                        }
+                        publisher.publishEvent(new MqMessageEvent(consumer, msgs));
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+                    }
+
+                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                }
+            });
+        }
+
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+
+                try {
+                    Thread.sleep(5000L);
+
+                    consumer.start();
+                    System.err.println("rocketmq consumer server is starting...");
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }).start();
+    }
+
+    @PreDestroy
+    public void shutdown() {
+        consumer.shutdown();
+    }
+}

+ 24 - 0
src/main/java/com/ett/rocketmq/Abstract/AbstractMqProducer.java

@@ -0,0 +1,24 @@
+package com.ett.rocketmq.Abstract;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+/**
+ * 长连接producer抽象
+ * 生产者的抽象类,定义了start()和shutdown()方法。
+ * 其中start()方法需要开发者进行重写。开发者需要在start()方法中,为producer进行初始化和启动工作。
+ */
+public abstract class AbstractMqProducer {
+    protected DefaultMQProducer producer;
+
+    @PostConstruct
+    public abstract void start() throws MQClientException;
+
+    @PreDestroy
+    public void shutdown() {
+        System.err.println("AbstractMqProducer @PreDestroy调用");
+        producer.shutdown();
+    }
+}

+ 13 - 0
src/main/java/com/ett/rocketmq/RocketmqApplication.java

@@ -0,0 +1,13 @@
+package com.ett.rocketmq;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class RocketmqApplication {
+
+    public static void main(String[] args) {
+        SpringApplication.run(RocketmqApplication.class, args);
+    }
+
+}

+ 115 - 0
src/main/java/com/ett/rocketmq/consumer/MQConsumer.java

@@ -0,0 +1,115 @@
+package com.ett.rocketmq.consumer;
+
+import com.ett.rocketmq.Abstract.AbstractMqConsumer;
+import com.ett.rocketmq.exception.RocketMQException;
+import com.ett.rocketmq.listener.MQConsumeMsgListenerProcessor;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.stereotype.Component;
+
+@Component
+public class MQConsumer extends AbstractMqConsumer {
+    public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumer.class);
+
+    @Value("${rocketmq.consumer.groupName}")
+    private String groupName;
+
+    @Value("${rocketmq.consumer.namesrvAddr}")
+    private String namesrvAddr;
+
+    @Value("${rocketmq.consumer.consumeThreadMin}")
+    private int consumeThreadMin;
+
+    @Value("${rocketmq.consumer.consumeThreadMax}")
+    private int consumeThreadMax;
+
+    @Value("${rocketmq.consumer.topic}")
+    private String topic;
+
+    @Value("${rocketmq.consumer.tag}")
+    private String tag;
+
+    @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")
+    private int consumeMessageBatchMaxSize;
+    //监听
+    @Autowired
+    private MQConsumeMsgListenerProcessor mqMessageListenerProcessor;
+
+    @Override
+    public void start0() {
+        try {
+            consumer = new DefaultMQPushConsumer("ett");
+            // 设置namesrv地址
+            consumer.setNamesrvAddr(namesrvAddr);
+            // 设置集群消费
+            consumer.setMessageModel(MessageModel.CLUSTERING);
+
+            consumer.setConsumeThreadMin(consumeThreadMin);
+
+            consumer.setConsumeThreadMax(consumeThreadMax);
+
+            // 设置每次消费消息的数量,官方一般建议1条,除非你有批量处理的需求
+            consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
+            // 设置消费策略
+            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+            // 设置订阅的topic和tags
+            consumer.subscribe(topic, tag);
+            // ... 根据自己的需求设置consumer其他参数
+
+        } catch (MQClientException e) {
+            e.printStackTrace();
+        }
+
+    }
+
+    public DefaultMQPushConsumer getRocketMQConsumer() throws RocketMQException {
+
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
+        consumer.setNamesrvAddr(namesrvAddr);
+        consumer.setConsumeThreadMin(consumeThreadMin);
+        consumer.setConsumeThreadMax(consumeThreadMax);
+
+        //我们自己实现的监听类
+        consumer.registerMessageListener(mqMessageListenerProcessor);
+        consumer.setVipChannelEnabled(false);
+
+        /**
+         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
+         * 如果非第一次启动,那么按照上次消费的位置继续消费
+         */
+        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+        /**
+         * 设置消费模型,集群还是广播,默认为集群
+         */
+        //consumer.setMessageModel(MessageModel.CLUSTERING);
+        /**
+         * 设置一次消费消息的条数,默认为1条
+         */
+        consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
+        try {
+          /**
+           * 设置该消费者订阅的主题和tag,如果是订阅该主题下的所有tag,则tag使用*;如果需要指定订阅该主题下的某些tag,则使用||分割,例如tag1||tag2||tag3
+           */
+//            String[] topicTagsArr = topics.split(";");
+//            for (String topicTags : topicTagsArr) {
+//                String[] topicTag = topicTags.split("~");
+//                consumer.subscribe(topicTag[0],topicTag[1]);
+//            }
+            //订阅
+            consumer.subscribe(topic,tag);
+            consumer.start();
+            LOGGER.info("consumer is start !!! groupName:{},topic:{},namesrvAddr:{}",groupName,topic,namesrvAddr);
+        }catch (MQClientException e){
+            LOGGER.error("consumer is start !!! groupName:{},topic:{},namesrvAddr:{}",groupName,topic,namesrvAddr,e);
+            throw new RocketMQException(e);
+        }
+        return consumer;
+    }
+}

+ 64 - 0
src/main/java/com/ett/rocketmq/consumer/PullConsumer.java

@@ -0,0 +1,64 @@
+package com.ett.rocketmq.consumer;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.consumer.PullResultExt;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class PullConsumer {
+    private static final Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();
+    public static void main(String[] args) throws Exception {
+        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("ett");
+        consumer.setNamesrvAddr("127.0.0.1:9876");
+        consumer.start();
+        try {
+            Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("DemoTopic");
+            for(MessageQueue mq:mqs) {
+                System.out.println("Consume from the queue: " + mq);
+                //	long offset = consumer.fetchConsumeOffset(mq, true);
+                //	PullResultExt pullResult =(PullResultExt)consumer.pull(mq, null, getMessageQueueOffset(mq), 32);
+                //消息未到达默认是阻塞10秒,private long consumerPullTimeoutMillis = 1000 * 10;
+                PullResultExt pullResult =(PullResultExt)consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
+                putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
+                switch (pullResult.getPullStatus()) {
+                    case FOUND:
+                        List<MessageExt> messageExtList = pullResult.getMsgFoundList();
+                        for (MessageExt m : messageExtList) {
+                            System.out.println(new String(m.getBody()));
+                        }
+                        break;
+                    case NO_MATCHED_MSG:
+                        break;
+                    case NO_NEW_MSG:
+                        break ;
+                    case OFFSET_ILLEGAL:
+                        break;
+                    default:
+                        break;
+                }
+            }
+        } catch (MQClientException e) {
+            e.printStackTrace();
+        }
+
+
+    }
+
+    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
+        offsetTable.put(mq, offset);
+    }
+
+    private static long getMessageQueueOffset(MessageQueue mq) {
+        Long offset = offsetTable.get(mq);
+        if (offset != null)
+            return offset;
+        return 0;
+    }
+
+}

+ 36 - 0
src/main/java/com/ett/rocketmq/controller/Consumer.java

@@ -0,0 +1,36 @@
+package com.ett.rocketmq.controller;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+
+import java.util.List;
+
+public class Consumer {
+    public static void main(String[] args) throws MQClientException {
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
+
+        consumer.setNamesrvAddr("172.18.4.114:9876");
+        consumer.setInstanceName("consumer");
+        consumer.subscribe("DemoTopic", "test");
+
+        consumer.registerMessageListener(new MessageListenerConcurrently() {
+            @Override
+            public ConsumeConcurrentlyStatus consumeMessage(
+                    List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
+                for (MessageExt msg : msgs) {
+                    System.out.println(new String(msg.getTopic()));
+                    System.out.println(new String(msg.getTags()));
+                    System.out.println("=== " + new String(msg.getBody()));
+                }
+
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+        });
+        consumer.start();
+        System.out.println("Consumer Started.");
+    }
+}

+ 29 - 0
src/main/java/com/ett/rocketmq/controller/MQController.java

@@ -0,0 +1,29 @@
+package com.ett.rocketmq.controller;
+
+import com.ett.rocketmq.listener.MqMessageEvent;
+import com.ett.rocketmq.service.ConsumerService;
+import com.ett.rocketmq.service.ProducerService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+@RequestMapping("/MQController")
+public class MQController {
+    @Autowired
+    private ProducerService producerService;
+
+    @Autowired
+    private ConsumerService consumerService;
+
+    @RequestMapping("/send")
+    public void send(){
+        producerService.send();
+    }
+//    @RequestMapping("/get")
+//    public void get(){
+//            consumerService.get();
+//    }
+
+
+}

+ 8 - 0
src/main/java/com/ett/rocketmq/dto/TestMqMessageDto.java

@@ -0,0 +1,8 @@
+package com.ett.rocketmq.dto;
+
+import lombok.Data;
+
+@Data
+public class TestMqMessageDto {
+    private String s;
+}

+ 88 - 0
src/main/java/com/ett/rocketmq/exception/AppException.java

@@ -0,0 +1,88 @@
+package com.ett.rocketmq.exception;
+
+public class AppException extends RuntimeException {
+    /**
+     *
+     */
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * 错误编码
+     */
+    protected ErrorCode errCode;
+
+    /**
+     * 错误信息
+     */
+    protected String errMsg;
+
+    /**
+     * 无参构造函数
+     */
+    public AppException() {
+        super();
+    }
+    public AppException(Throwable e) {
+        super(e);
+    }
+
+    public AppException(ErrorCode errCode, String... errMsg) {
+        super(errCode.getMsg());
+        this.errCode = errCode;
+        setErrMsg(errMsg,true);
+    }
+
+    public AppException(ErrorCode errCode, String errMsg,Boolean isTransfer) {
+        super(errMsg);
+        this.errCode = errCode;
+        setErrMsg(new String[]{errMsg},isTransfer);
+    }
+
+    /**
+     * 构造函数
+     *
+     * @param cause 异常
+     */
+    public AppException(ErrorCode errCode, Throwable cause, String... errMsg) {
+        super(errCode.getCode() + errCode.getMsg(), cause);
+        this.errCode = errCode;
+        setErrMsg(errMsg,true);
+    }
+
+    public ErrorCode getErrCode() {
+        return errCode;
+    }
+
+    public void setErrCode(ErrorCode errCode) {
+        this.errCode = errCode;
+    }
+
+    public String getErrMsg() {
+        return this.errMsg;
+    }
+
+    public void setErrMsg(String[] errMsg,Boolean isTransfer) {
+
+        if (null != errMsg &&errMsg.length>0) {
+            if(errCode.getMsg().contains("%s") && isTransfer){
+                this.errMsg = String.format(errCode.getMsg(), errMsg);
+            }else{
+                StringBuffer sf = new StringBuffer();
+                for (String msg : errMsg) {
+                    sf.append(msg+";");
+                }
+                this.errMsg = sf.toString();
+            }
+        }else{
+            this.errMsg = errCode.getMsg();
+        }
+
+    }
+
+    public static void main(String[] args) {
+        String str = "ERRCode:1004--对象不存在:[%s]";
+        if (str.contains("%s")){
+            System.out.println("包含");
+        }
+    }
+}

+ 19 - 0
src/main/java/com/ett/rocketmq/exception/ErrorCode.java

@@ -0,0 +1,19 @@
+package com.ett.rocketmq.exception;
+
+import java.io.Serializable;
+
+public interface ErrorCode extends Serializable {
+    /**
+     * 错误码
+     *
+     * @return
+     */
+    String getCode();
+
+    /**
+     * 错误信息
+     *
+     * @return
+     */
+    String getMsg();
+}

+ 31 - 0
src/main/java/com/ett/rocketmq/exception/RocketMQErrorEnum.java

@@ -0,0 +1,31 @@
+package com.ett.rocketmq.exception;
+
+public enum RocketMQErrorEnum implements ErrorCode {
+    /******** 公共 ********/
+    PARAMM_NULL("MQ_001", "参数为空"),
+
+    /******** 生产者 *******/
+
+    /******** 消费者 *******/
+    NOT_FOUND_CONSUMESERVICE("MQ_100", "根据topic和tag没有找到对应的消费服务"), HANDLE_RESULT_NULL("MQ_101", "消费方法返回值为空"), CONSUME_FAIL("MQ_102", "消费失败")
+
+            ;
+
+    private String code;
+    private String msg;
+
+    private RocketMQErrorEnum(String code, String msg) {
+        this.code = code;
+        this.msg = msg;
+    }
+
+    @Override
+    public String getCode() {
+        return this.code;
+    }
+
+    @Override
+    public String getMsg() {
+        return this.msg;
+    }
+}

+ 40 - 0
src/main/java/com/ett/rocketmq/exception/RocketMQException.java

@@ -0,0 +1,40 @@
+package com.ett.rocketmq.exception;
+
+public class RocketMQException extends AppException {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * 无参构造函数
+     */
+    public RocketMQException() {
+        super();
+    }
+
+    public RocketMQException(Throwable e) {
+        super(e);
+    }
+
+    public RocketMQException(ErrorCode errorType) {
+        super(errorType);
+    }
+
+    public RocketMQException(ErrorCode errorCode, String... errMsg) {
+        super(errorCode, errMsg);
+    }
+
+    /**
+     * 封装异常
+     *
+     * @param errorCode
+     * @param errMsg
+     * @param isTransfer
+     *            是否转换异常信息,如果为false,则直接使用errMsg信息
+     */
+    public RocketMQException(ErrorCode errorCode, String errMsg, Boolean isTransfer) {
+        super(errorCode, errMsg, isTransfer);
+    }
+
+    public RocketMQException(ErrorCode errCode, Throwable cause, String... errMsg) {
+        super(errCode, cause, errMsg);
+    }
+}

+ 44 - 0
src/main/java/com/ett/rocketmq/listener/MQConsumeMsgListenerProcessor.java

@@ -0,0 +1,44 @@
+package com.ett.rocketmq.listener;
+
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+
+import java.util.List;
+
+@Component
+public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently {
+    private static final Logger logger = LoggerFactory.getLogger(MQConsumeMsgListenerProcessor.class);
+
+    /**
+     *  默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息<br/>
+     *  不要抛异常,如果没有return CONSUME_SUCCESS ,consumer会重新消费该消息,直到return CONSUME_SUCCESS
+     */
+    @Override
+    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
+        if(CollectionUtils.isEmpty(msgs)){
+            logger.info("接受到的消息为空,不处理,直接返回成功");
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        }
+        MessageExt messageExt = msgs.get(0);
+        logger.info("接受到的消息为:"+messageExt.toString());
+        if(messageExt.getTopic().equals("你的Topic")){
+            if(messageExt.getTags().equals("你的Tag")){
+                //TODO 判断该消息是否重复消费(RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重)
+                //TODO 获取该消息重试次数
+                int reconsume = messageExt.getReconsumeTimes();
+                if(reconsume ==3){//消息已经重试了3次,如果不需要再次消费,则返回成功
+                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                }
+                //TODO 处理对应的业务逻辑
+            }
+        }
+        // 如果没有return success ,consumer会重新消费该消息,直到return success
+        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+    }
+}

+ 35 - 0
src/main/java/com/ett/rocketmq/listener/MqMessageEvent.java

@@ -0,0 +1,35 @@
+package com.ett.rocketmq.listener;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.springframework.context.ApplicationEvent;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+
+public class MqMessageEvent extends ApplicationEvent {
+    private DefaultMQPushConsumer consumer;
+    private List<MessageExt> msgs;
+
+    public MqMessageEvent(DefaultMQPushConsumer consumer, List<MessageExt> msgs) {
+        super(msgs);
+        this.consumer = consumer;
+        this.msgs = msgs;
+    }
+
+    public DefaultMQPushConsumer getConsumer() {
+        return consumer;
+    }
+
+    public void setConsumer(DefaultMQPushConsumer consumer) {
+        this.consumer = consumer;
+    }
+
+    public List<MessageExt> getMsgs() {
+        return msgs;
+    }
+
+    public void setMsgs(List<MessageExt> msgs) {
+        this.msgs = msgs;
+    }
+}

+ 110 - 0
src/main/java/com/ett/rocketmq/producer/MQProducer.java

@@ -0,0 +1,110 @@
+package com.ett.rocketmq.producer;
+
+import com.alibaba.fastjson.JSONObject;
+import com.ett.rocketmq.Abstract.AbstractMqProducer;
+import com.ett.rocketmq.dto.TestMqMessageDto;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+@Component
+public class MQProducer extends AbstractMqProducer {
+    //public static final Logger LOGGER = LoggerFactory.getLogger(MQProducer.class);
+
+    @Value("${rocketmq.producer.instanceName}")
+    private String instanceName;
+
+    @Value("${rocketmq.producer.groupName}")
+    private String groupName;
+
+    @Value("${rocketmq.producer.namesrvAddr}")
+    private String namesrvAddr;
+    /**
+     * 消息最大大小,默认4M
+     */
+    @Value("${rocketmq.producer.maxMessageSize}")
+    private Integer maxMessageSize ;
+    /**
+     * 消息发送超时时间,默认3秒
+     */
+    @Value("${rocketmq.producer.sendMsgTimeout}")
+    private Integer sendMsgTimeout;
+    /**
+     * 消息发送失败重试次数,默认2次
+     */
+    @Value("${rocketmq.producer.retryTimesWhenSendFailed}")
+    private Integer retryTimesWhenSendFailed;
+
+    @Override
+    public void start() throws MQClientException {
+        if (null == producer) {
+            producer = new DefaultMQProducer("ett");
+            producer.setNamesrvAddr(namesrvAddr);
+            //如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName
+            producer.setInstanceName(instanceName);
+
+            producer.setMaxMessageSize(maxMessageSize);
+
+            producer.setSendMsgTimeout(sendMsgTimeout);
+            //如果发送消息失败,设置重试次数,默认为2次
+            producer.setRetryTimesWhenSendFailed(retryTimesWhenSendFailed);
+        }
+        producer.start();
+        System.out.println(namesrvAddr);
+        System.err.println("rocketmq producer is starting...");
+    }
+
+    public boolean send(String topic, String tag, TestMqMessageDto msg) {
+        try {
+            Message message = new Message(
+                    topic, tag,JSONObject.toJSONString(msg).getBytes("utf-8")
+            );
+            SendResult sendResult = producer.send(message);
+            System.err.println("消息生产结果:" + JSONObject.toJSON(sendResult));
+            return true;
+        } catch (Exception e) {
+            e.printStackTrace();
+            return false;
+        }
+    }
+
+
+
+
+
+
+
+
+
+
+//    public DefaultMQProducer getRocketMQProducer() throws RocketMQException {
+//        DefaultMQProducer producer;
+//        producer = new DefaultMQProducer(groupName
+//        );
+//        producer.setNamesrvAddr(namesrvAddr);
+//        //如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName
+//        producer.setInstanceName(instanceName);
+//
+//        producer.setMaxMessageSize(maxMessageSize);
+//
+//        producer.setSendMsgTimeout(sendMsgTimeout);
+//        //如果发送消息失败,设置重试次数,默认为2次
+//        producer.setRetryTimesWhenSendFailed(retryTimesWhenSendFailed);
+//
+//        producer.setVipChannelEnabled(false);
+//        try {
+//            producer.start();
+//
+//            LOGGER.info(String.format("producer is start ! groupName:[%s],namesrvAddr:[%s]"
+//                    , groupName, this.namesrvAddr));
+//        } catch (MQClientException e) {
+//            LOGGER.error(String.format("producer is error {}"
+//                    , e.getMessage(),e));
+//            throw new RocketMQException(e);
+//        }
+//        return producer;
+//    }
+}

+ 74 - 0
src/main/java/com/ett/rocketmq/service/ConsumerService.java

@@ -0,0 +1,74 @@
+package com.ett.rocketmq.service;
+
+import com.ett.rocketmq.consumer.MQConsumer;
+import com.ett.rocketmq.listener.MqMessageEvent;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.event.EventListener;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+/**
+ * 用于监听MqMessageEvent的服务
+ * 消费MQ消息
+ *
+ * 一般两种方式:
+ *     (1)第一种:这个类的作用就是监听SpringEvent事件,
+ *     然后再根据消息分发给其他Service进行处理,所以这里一般不会包含业务逻辑代码
+ * (2)第二种:这个类的作用就是具体的消费消息类
+ */
+@Service
+public class ConsumerService {
+
+    @Value("${rocketmq.consumer.topic}")
+    private String topic;
+
+    @Value("${rocketmq.consumer.tag}")
+    private String tag;
+
+//    @Autowired
+//    private MqMessageEvent mqMessageEvent;
+
+    /**
+     * 消费
+     * @param event
+     */
+    @EventListener(condition = "#event.msgs[0].topic==topic && #event.msgs[0].tags==tag")
+    public void testConsumer(MqMessageEvent event) {
+       // mqMessageEvent.getConsumer();
+
+        // 由于mq消费者设置了batch=1,所以每次都只会消费一条
+        MessageExt msg = event.getMsgs().get(0);
+        if (null != msg) {
+            // 具体的消费MessageExt的逻辑
+            System.out.println(msg);
+        }
+    }
+
+
+    /**
+     * 消费TopicTestB
+     * @param event
+     */
+//    @EventListener(condition = "#event.msgs[0].topic=='TopicTestB' ")
+//    public void testConsumer(MqMessageEvent event) {
+//        // 由于mq消费者设置了batch=1,所以每次都只会消费一条
+//        MessageExt msg = event.getMsgs().get(0);
+//        if (null != msg) {
+//            // 具体的消费MessageExt的逻辑
+//            if (msg.getTags() == "TagA") {
+//                //消费TagA的消息
+//            }
+//            else if (msg.getTags() == 'TagB'){
+//                //消费TagB的消息
+//            }
+//        }
+//    }
+}

+ 45 - 0
src/main/java/com/ett/rocketmq/service/ProducerService.java

@@ -0,0 +1,45 @@
+package com.ett.rocketmq.service;
+
+import com.ett.rocketmq.dto.TestMqMessageDto;
+import com.ett.rocketmq.producer.MQProducer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+@Service
+public class ProducerService {
+
+    @Value("${rocketmq.consumer.topic}")
+    private String topic;
+
+    @Value("${rocketmq.consumer.tag}")
+    private String tag;
+
+    @Autowired
+    private MQProducer mqProducer;
+
+    public void start(){
+        try {
+            mqProducer.start();
+        } catch (MQClientException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public void send(){
+        try {
+            mqProducer.start();
+        } catch (MQClientException e) {
+            e.printStackTrace();
+        }
+        String test = "hellow word";
+        TestMqMessageDto testMqMessageDto = new TestMqMessageDto();
+        testMqMessageDto.setS(test);
+        mqProducer.send("DemoTopic",tag,testMqMessageDto);
+        //关闭,根据实际情况操作
+        //mqProducer.shutdown();
+    }
+
+
+}

+ 43 - 0
src/main/resources/application.properties

@@ -0,0 +1,43 @@
+server.port=8095
+
+###producer
+#该应用是否启用生产者
+rocketmq.producer.isOnOff=on
+#发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
+rocketmq.producer.groupName=ett
+
+rocketmq.producer.instanceName=instanceName
+
+rocketmq.producer.topic=DemoTopic
+
+rocketmq.producer.tag=test
+#mq的nameserver地址
+rocketmq.producer.namesrvAddr=127.0.0.1:9876
+#消息最大长度 默认1024*4(4M)
+rocketmq.producer.maxMessageSize=4096
+#发送消息超时时间,默认3000
+rocketmq.producer.sendMsgTimeout=3000
+#发送消息失败重试次数,默认2
+rocketmq.producer.retryTimesWhenSendFailed=2
+
+
+###consumer
+##该应用是否启用消费者
+rocketmq.consumer.isOnOff=on
+
+rocketmq.consumer.groupName=ett
+
+#该消费者订阅的主题和tags("*"号表示订阅该主题下所有的tags),格式:topic~tag1||tag2||tag3;topic2~*;
+
+rocketmq.consumer.topic=DemoTopic
+
+rocketmq.consumer.tag=test
+
+#mq的nameserver地址
+rocketmq.consumer.namesrvAddr=127.0.0.1:9876
+
+rocketmq.consumer.consumeThreadMin=20
+
+rocketmq.consumer.consumeThreadMax=64
+#设置一次消费消息的条数,默认为1条
+rocketmq.consumer.consumeMessageBatchMaxSize=1

+ 35 - 0
src/test/java/com/ett/rocketmq/DefaultProductTest.java

@@ -0,0 +1,35 @@
+package com.ett.rocketmq;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+
+@RunWith(SpringRunner.class)
+@SpringBootTest
+public class DefaultProductTest {
+    private static final Logger logger = LoggerFactory.getLogger(DefaultProductTest.class);
+
+    /**使用RocketMq的生产者*/
+    @Autowired
+    private DefaultMQProducer defaultMQProducer;
+
+    @Test
+    public void send() throws MQClientException, RemotingException, MQBrokerException, InterruptedException{
+        String msg = "demo msg test";
+        logger.info("开始发送消息:"+msg);
+        Message sendMsg = new Message("DemoTopic","DemoTag",msg.getBytes());
+        //默认3秒超时
+        SendResult sendResult = defaultMQProducer.send(sendMsg);
+        logger.info("消息发送响应信息:"+sendResult.toString());
+    }
+}

+ 16 - 0
src/test/java/com/ett/rocketmq/RocketmqApplicationTests.java

@@ -0,0 +1,16 @@
+package com.ett.rocketmq;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest
+public class RocketmqApplicationTests {
+
+    @Test
+    public void contextLoads() {
+    }
+
+}