Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package ${package}.reader.mycomponent

import org.apache.commons.configuration2.Configuration
import org.apache.logging.log4j.LogManager
import org.slf4j.LoggerFactory
import org.apache.spark.sql.{DataFrame, SparkSession}
import za.co.absa.hyperdrive.ingestor.api.reader.{StreamReader, StreamReaderFactory, StreamReaderFactoryProvider}
import za.co.absa.hyperdrive.ingestor.api.{HasComponentAttributes, PropertyMetadata}
Expand All @@ -33,7 +33,7 @@ private[reader] class MyStreamReaderImpl() extends StreamReader {
}

object MyStreamReaderImpl extends StreamReaderFactory with MyStreamReaderImplAttributes {
private val logger = LogManager.getLogger
private val logger = LoggerFactory.getLogger(this.getClass)

override def apply(conf: Configuration): StreamReader = {
logger.info("Building MyStreamReaderImpl")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package ${package}.transformer.mycomponent

import org.apache.commons.configuration2.Configuration
import org.apache.logging.log4j.LogManager
import org.slf4j.LoggerFactory
import org.apache.spark.sql.DataFrame
import za.co.absa.hyperdrive.ingestor.api.transformer.{StreamTransformer, StreamTransformerFactory, StreamTransformerFactoryProvider}
import za.co.absa.hyperdrive.ingestor.api.{HasComponentAttributes, PropertyMetadata}
Expand All @@ -33,7 +33,7 @@ private[transformer] class MyStreamTransformerImpl() extends StreamTransformer {
}

object MyStreamTransformerImpl extends StreamTransformerFactory with MyStreamTransformerImplAttributes {
private val logger = LogManager.getLogger
private val logger = LoggerFactory.getLogger(this.getClass)

override def apply(conf: Configuration): StreamTransformer = {
logger.info("Building MyStreamTransformerImpl")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package ${package}.writer.mycomponent

import org.apache.commons.configuration2.Configuration
import org.apache.logging.log4j.LogManager
import org.slf4j.LoggerFactory
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQuery
import za.co.absa.hyperdrive.ingestor.api.writer.{StreamWriter, StreamWriterFactory, StreamWriterFactoryProvider}
Expand All @@ -34,7 +34,7 @@ private[writer] class MyStreamWriterImpl(val destination: String) extends Stream
}

object MyStreamWriterImpl extends StreamWriterFactory with MyStreamWriterImplAttributes {
private val logger = LogManager.getLogger
private val logger = LoggerFactory.getLogger(this.getClass)

override def apply(conf: Configuration): StreamWriter = {
logger.info("Building MyStreamWriterImpl")
Expand Down
28 changes: 0 additions & 28 deletions component-scanner/src/main/resources/log4j2.xml

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import java.net.URLClassLoader
import java.nio.file.{Files, Path}
import java.util.ServiceLoader

import org.apache.logging.log4j.LogManager
import org.slf4j.LoggerFactory
import za.co.absa.hyperdrive.ingestor.api.reader.{StreamReaderFactory, StreamReaderFactoryProvider}
import za.co.absa.hyperdrive.ingestor.api.transformer.{StreamTransformerFactory, StreamTransformerFactoryProvider}
import za.co.absa.hyperdrive.ingestor.api.writer.{StreamWriterFactory, StreamWriterFactoryProvider}
Expand All @@ -37,7 +37,7 @@ case class ComponentDescriptor(attributes: HasComponentAttributes,
jarPath: Path)

object ComponentScanner {
private val logger = LogManager.getLogger
private val logger = LoggerFactory.getLogger(this.getClass)
private val jarSuffix = ".jar"

def getComponents(baseDirectory: Path): Try[ComponentDescriptors] = getComponents(List(baseDirectory))
Expand Down
28 changes: 0 additions & 28 deletions driver/src/main/resources/log4j2.xml

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package za.co.absa.hyperdrive.driver

import org.apache.commons.configuration2.Configuration
import org.apache.logging.log4j.LogManager
import org.slf4j.LoggerFactory
import za.co.absa.hyperdrive.ingestor.api.reader.StreamReader
import za.co.absa.hyperdrive.ingestor.api.transformer.StreamTransformer
import za.co.absa.hyperdrive.ingestor.api.writer.StreamWriter
Expand All @@ -25,7 +25,7 @@ import za.co.absa.hyperdrive.ingestor.implementation.transformer.factories.Strea
import za.co.absa.hyperdrive.ingestor.implementation.writer.factories.StreamWriterAbstractFactory

private[driver] class IngestionDriver {
private val logger = LogManager.getLogger
private val logger = LoggerFactory.getLogger(this.getClass)
val ListDelimiter = ','

def ingest(configuration: Configuration): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package za.co.absa.hyperdrive.driver
import java.util.UUID

import org.apache.commons.configuration2.Configuration
import org.apache.logging.log4j.LogManager
import org.slf4j.LoggerFactory
import org.apache.spark.sql.SparkSession
import za.co.absa.hyperdrive.driver.TerminationMethodEnum.{AwaitTermination, ProcessAllAvailable, TerminationMethod}
import za.co.absa.hyperdrive.ingestor.api.reader.StreamReader
Expand All @@ -39,7 +39,7 @@ class SparkIngestor(val spark: SparkSession,
val awaitTerminationTimeout: Option[Long],
val conf: Configuration) {

private val logger = LogManager.getLogger
private val logger = LoggerFactory.getLogger(this.getClass)

/**
* This method performs the ingestion according to the components it receives.
Expand Down Expand Up @@ -100,7 +100,7 @@ class SparkIngestor(val spark: SparkSession,

object SparkIngestor extends SparkIngestorAttributes {

private val logger = LogManager.getLogger
private val logger = LoggerFactory.getLogger(this.getClass)

def apply(conf: Configuration): SparkIngestor = {
ComponentFactoryUtil.validateConfiguration(conf, getProperties)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,24 @@ import org.apache.commons.configuration2.builder.BasicConfigurationBuilder
import org.apache.commons.configuration2.builder.fluent.Parameters
import org.apache.commons.configuration2.convert.DefaultListDelimiterHandler
import org.apache.commons.configuration2.{BaseConfiguration, Configuration}
import org.apache.logging.log4j.LogManager
import org.apache.spark.internal.Logging
import za.co.absa.hyperdrive.driver.IngestionDriver
import za.co.absa.hyperdrive.driver.utils.DriverUtil

object CommandLineIngestionDriver extends IngestionDriver {
object CommandLineIngestionDriver extends IngestionDriver with Logging {

private val logger = LogManager.getLogger
private val PropertyDelimiter = "="

def main(args: Array[String]): Unit = {
if (args.isEmpty) {
throw new IllegalArgumentException("No configuration provided.")
}

logger.info(s"Starting Hyperdrive ${DriverUtil.getVersionString}")
logInfo(s"Starting Hyperdrive ${DriverUtil.getVersionString}")

logger.info(s"Going to load ${args.length} configurations from command line.")
logInfo(s"Going to load ${args.length} configurations from command line.")
val configuration = parseConfiguration(args)
logger.info("Configuration loaded. Going to invoke ingestion.")
logInfo("Configuration loaded. Going to invoke ingestion.")
ingest(configuration)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,33 @@
package za.co.absa.hyperdrive.driver.drivers

import java.nio.file.{Files, Paths}

import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder
import org.apache.commons.configuration2.builder.fluent.Parameters
import org.apache.commons.configuration2.convert.DefaultListDelimiterHandler
import org.apache.commons.configuration2.{Configuration, PropertiesConfiguration}
import org.apache.logging.log4j.LogManager
import org.apache.spark.internal.Logging
import za.co.absa.hyperdrive.driver.IngestionDriver
import za.co.absa.hyperdrive.driver.utils.DriverUtil

/**
* This driver launches ingestion by loading the configurations from a properties file.
*/
object PropertiesIngestionDriver extends IngestionDriver {

private val logger = LogManager.getLogger
object PropertiesIngestionDriver extends IngestionDriver with Logging {

def main(args: Array[String]): Unit = {
val propertiesFile = getPropertiesFilePath(args)
if (propertiesFile.isEmpty) {
throw new IllegalArgumentException("No properties file supplied.")
}
logger.info(s"Starting Hyperdrive ${DriverUtil.getVersionString}")
logInfo(s"Starting Hyperdrive ${DriverUtil.getVersionString}")

if (isInvalid(propertiesFile.get)) {
throw new IllegalArgumentException(s"Invalid properties file: '${propertiesFile.get}'.")
}

logger.info(s"Going to load ingestion configurations from '${propertiesFile.get}'.")
logInfo(s"Going to load ingestion configurations from '${propertiesFile.get}'.")
val configurations = loadConfiguration(propertiesFile.get)
logger.info(s"Configurations loaded. Going to invoke ingestion: [$configurations]")
logInfo(s"Configurations loaded. Going to invoke ingestion: [$configurations]")
ingest(configurations)
}

Expand All @@ -63,7 +60,7 @@ object PropertiesIngestionDriver extends IngestionDriver {
case v if v == 0 => None
case v =>
if (v > 1) {
logger.warn(s"Expected only properties file path, but got extra parameters. Returning first as the path. All parameters = [${args.mkString(",")}]")
logWarning(s"Expected only properties file path, but got extra parameters. Returning first as the path. All parameters = [${args.mkString(",")}]")
}
Some(args(0))
}
Expand Down
28 changes: 0 additions & 28 deletions driver/src/test/resources/log4j2-test.xml

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ import java.util.Properties

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.logging.log4j.LogManager
import org.slf4j.LoggerFactory
import org.testcontainers.containers.wait.strategy.Wait
import org.testcontainers.containers.{GenericContainer, KafkaContainer, Network}
import org.testcontainers.utility.DockerImageName

case class SchemaRegistryContainer(dockerImageName: String) extends GenericContainer[SchemaRegistryContainer](dockerImageName)

class KafkaSchemaRegistryWrapper {
private val logger = LogManager.getLogger
private val logger = LoggerFactory.getLogger(this.getClass)

private val confluentPlatformVersion = "5.3.1" // should be same as kafka.avro.serializer.version property in pom file
private val schemaRegistryPort = 8081
Expand Down
19 changes: 19 additions & 0 deletions ingestor-default/src/main/resources/log4j.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#
# Copyright 2018 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

log4j.rootLogger=INFO,stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} [%t] %p %c - %m%n
28 changes: 0 additions & 28 deletions ingestor-default/src/main/resources/log4j2.xml

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package za.co.absa.hyperdrive.ingestor.implementation.reader.factories

import org.apache.commons.configuration2.Configuration
import org.apache.logging.log4j.LogManager
import org.slf4j.LoggerFactory
import za.co.absa.hyperdrive.ingestor.api.reader.{StreamReader, StreamReaderFactory}
import za.co.absa.hyperdrive.shared.utils.ClassLoaderUtils

Expand All @@ -27,7 +27,7 @@ import za.co.absa.hyperdrive.shared.utils.ClassLoaderUtils
*/
object StreamReaderAbstractFactory {

private val logger = LogManager.getLogger
private val logger = LoggerFactory.getLogger(this.getClass)
val componentConfigKey = "component.reader"

def build(config: Configuration): StreamReader = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import java.net.URI
import org.apache.commons.configuration2.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.commons.lang3.StringUtils
import org.apache.logging.log4j.LogManager
import org.slf4j.LoggerFactory
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.DataStreamReader
import za.co.absa.hyperdrive.ingestor.api.reader.{StreamReader, StreamReaderFactory}
Expand Down Expand Up @@ -47,7 +47,7 @@ private[reader] class KafkaStreamReader(
val topic: String, val brokers: String, val checkpointLocation: String, val extraConfs: Map[String, String]) extends StreamReader {
import KafkaStreamReaderProps._

private val logger = LogManager.getLogger()
private val logger = LoggerFactory.getLogger(this.getClass)

if (StringUtils.isBlank(topic)) {
throw new IllegalArgumentException(s"Invalid topic: '$topic'")
Expand Down Expand Up @@ -109,7 +109,7 @@ private[reader] class KafkaStreamReader(
}

object KafkaStreamReader extends StreamReaderFactory with KafkaStreamReaderAttributes {
private val logger = LogManager.getLogger
private val logger = LoggerFactory.getLogger(this.getClass)

override def apply(conf: Configuration): StreamReader = {
val topic = getTopic(conf)
Expand Down
Loading